feat(vectorstore): re-embed on file mtime > store updated_at (#23)

Removes the TODO in Sync that left files static after their first embed.
Edits to brain/wiki/ and brain/knowledge/ now surface in subsequent
syncs without manual /backfill-embeddings calls.

Approach
- Store interface: KnownPaths is replaced by KnownPathsWithTime, which
  returns a map of chunk path → updated_at. Callers compare each
  timestamp against the file's mtime to detect edits.
- PGStore: SELECT path, updated_at FROM brain_embeddings.
- Sync groups known chunks by parent path and tracks the EARLIEST
  updated_at per parent. A file is stale when its mtime is after that
  oldest chunk's timestamp — any chunk older than the file means at
  least one chunk hasn't been refreshed since the last edit.
- Stale-path rewrite: delete every old chunk for the parent (handles
  "file shrunk → fewer chunks → orphan rows at higher #NNNN" cleanly),
  then re-chunk + re-embed + re-upsert.

Tests
- New: TestSync_ReembedsFileWhenMtimeNewer — the file's mtime is forced
  later than the store's updated_at; Sync deletes the old chunk and
  upserts a fresh one.
- New: TestSync_SkipsFileWhenMtimeOlder — file mtime backdated; Sync is
  a no-op (no upserts, no deletes).
- Updated: stubStore.known is now map[string]time.Time. A zero value
  resolves to a far-future sentinel so existing "skip if already known"
  tests keep passing without per-test setup.
- pg_test: the KnownPaths integration test is renamed to
  TestIntegration_KnownPathsWithTime; it now asserts that updated_at is
  non-zero and within 5s of the wall-clock time of the insert.

Backward compat
- brain_embeddings rows pre-dating this change carry valid updated_at
  values (column was always populated via `DEFAULT now()` + ON CONFLICT
  `updated_at = now()`). No migration needed. Live pod will start
  re-embedding any file whose source has been edited since its chunks
  were originally written.

Closes gitea/mathias/hyperguild#23.
This commit is contained in:
Mathias
2026-05-20 09:50:45 +02:00
parent 6f1cb53295
commit 437ccad904
4 changed files with 139 additions and 40 deletions

View File

@@ -8,6 +8,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"strings" "strings"
"time"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
@@ -120,21 +121,26 @@ func (s *PGStore) Search(ctx context.Context, query []float32, limit int) ([]Hit
return hits, nil return hits, nil
} }
// KnownPaths returns the path set already present in the store. Used by // KnownPathsWithTime returns every embedded chunk path paired with the
// the watcher to diff against the wiki/ tree and decide what to upsert. // row's updated_at. Sync uses the timestamps to decide whether a file
func (s *PGStore) KnownPaths(ctx context.Context) (map[string]struct{}, error) { // has been edited since its chunks were last embedded — when the file's
rows, err := s.pool.Query(ctx, `SELECT path FROM brain_embeddings`) // mtime exceeds the oldest chunk's updated_at, the file is re-embedded.
func (s *PGStore) KnownPathsWithTime(ctx context.Context) (map[string]time.Time, error) {
rows, err := s.pool.Query(ctx, `SELECT path, updated_at FROM brain_embeddings`)
if err != nil { if err != nil {
return nil, fmt.Errorf("query paths: %w", err) return nil, fmt.Errorf("query paths: %w", err)
} }
defer rows.Close() defer rows.Close()
out := make(map[string]struct{}) out := make(map[string]time.Time)
for rows.Next() { for rows.Next() {
var p string var (
if err := rows.Scan(&p); err != nil { p string
t time.Time
)
if err := rows.Scan(&p, &t); err != nil {
return nil, err return nil, err
} }
out[p] = struct{}{} out[p] = t
} }
return out, rows.Err() return out, rows.Err()
} }

View File

@@ -36,7 +36,7 @@ func freshStore(t *testing.T) (*vectorstore.PGStore, context.Context) {
t.Cleanup(s.Close) t.Cleanup(s.Close)
require.NoError(t, s.Init(ctx)) require.NoError(t, s.Init(ctx))
// Clean slate per test. // Clean slate per test.
_, _ = s.KnownPaths(ctx) _, _ = s.KnownPathsWithTime(ctx)
require.NoError(t, s.Delete(ctx, "%test-fixture%")) require.NoError(t, s.Delete(ctx, "%test-fixture%"))
return s, ctx return s, ctx
} }
@@ -67,15 +67,18 @@ func TestIntegration_UpsertAndSearch(t *testing.T) {
}) })
} }
func TestIntegration_KnownPaths(t *testing.T) { func TestIntegration_KnownPathsWithTime(t *testing.T) {
s, ctx := freshStore(t) s, ctx := freshStore(t)
before := time.Now()
require.NoError(t, s.Upsert(ctx, "wiki/k.md", vec(768, 0.5))) require.NoError(t, s.Upsert(ctx, "wiki/k.md", vec(768, 0.5)))
t.Cleanup(func() { _ = s.Delete(ctx, "wiki/k.md") }) t.Cleanup(func() { _ = s.Delete(ctx, "wiki/k.md") })
paths, err := s.KnownPaths(ctx) paths, err := s.KnownPathsWithTime(ctx)
require.NoError(t, err) require.NoError(t, err)
_, ok := paths["wiki/k.md"] at, ok := paths["wiki/k.md"]
assert.True(t, ok) require.True(t, ok)
assert.False(t, at.IsZero(), "updated_at must not be zero")
assert.WithinDuration(t, before, at, 5*time.Second, "updated_at must be recent")
} }
func TestUpsert_RejectsWrongDimension(t *testing.T) { func TestUpsert_RejectsWrongDimension(t *testing.T) {

View File

@@ -18,7 +18,11 @@ type Embedder interface {
// Store is the subset of PGStore that Sync needs. Lets tests stub it. // Store is the subset of PGStore that Sync needs. Lets tests stub it.
type Store interface { type Store interface {
KnownPaths(ctx context.Context) (map[string]struct{}, error) // KnownPathsWithTime returns every embedded chunk path paired with the
// row's updated_at. Sync uses the timestamp to detect edits — a file
// whose mtime is newer than ANY of its chunks' updated_at is re-embedded
// from scratch (old chunks deleted, fresh chunks upserted).
KnownPathsWithTime(ctx context.Context) (map[string]time.Time, error)
Upsert(ctx context.Context, path string, embedding []float32) error Upsert(ctx context.Context, path string, embedding []float32) error
Delete(ctx context.Context, path string) error Delete(ctx context.Context, path string) error
} }
@@ -58,15 +62,31 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder)
return res, nil return res, nil
} }
known, err := store.KnownPaths(ctx) known, err := store.KnownPathsWithTime(ctx)
if err != nil { if err != nil {
return res, fmt.Errorf("known paths: %w", err) return res, fmt.Errorf("known paths: %w", err)
} }
// Build a parent → "any chunk known?" set so we can skip files that // Group known chunks by parent path and remember the EARLIEST
// already have at least one chunk row in the store. // updated_at per parent. A file is considered stale if its mtime is
knownParents := make(map[string]struct{}, len(known)) // after the oldest of its chunk rows — i.e. at least one chunk hasn't
for p := range known { // been refreshed since the last edit. Also keep the full chunk-path
knownParents[ParentPath(p)] = struct{}{} // list per parent so we can delete every old chunk before re-embedding
// (handles "file shrunk → fewer chunks → orphan rows" cleanly).
type parentState struct {
minUpdatedAt time.Time
chunkPaths []string
}
parents := make(map[string]*parentState, len(known))
for p, t := range known {
parent := ParentPath(p)
ps, ok := parents[parent]
if !ok {
ps = &parentState{minUpdatedAt: t}
parents[parent] = ps
} else if t.Before(ps.minUpdatedAt) {
ps.minUpdatedAt = t
}
ps.chunkPaths = append(ps.chunkPaths, p)
} }
seenParents := make(map[string]struct{}) seenParents := make(map[string]struct{})
@@ -90,11 +110,26 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder)
relSlash := filepath.ToSlash(rel) relSlash := filepath.ToSlash(rel)
seenParents[relSlash] = struct{}{} seenParents[relSlash] = struct{}{}
if _, ok := knownParents[relSlash]; ok { if ps, ok := parents[relSlash]; ok {
// File has at least one chunk in the store already. // File already has chunks in the store. Re-embed only when
// TODO: compare mtime once Store exposes updated_at so we // the file has been edited since the oldest chunk was
// re-embed on edit. For now, skip. // written. Tolerate clock skew with a sub-second grace.
return nil info, statErr := d.Info()
if statErr != nil {
res.Errors = append(res.Errors, fmt.Errorf("stat %s: %w", relSlash, statErr))
return nil
}
if !info.ModTime().After(ps.minUpdatedAt) {
return nil
}
// Stale: delete old chunks before re-embedding so a shrunk
// file doesn't leave orphan rows at higher #NNNN indexes.
for _, oldPath := range ps.chunkPaths {
if delErr := store.Delete(ctx, oldPath); delErr != nil {
res.Errors = append(res.Errors, fmt.Errorf("delete %s for re-embed: %w", oldPath, delErr))
return nil
}
}
} }
content, readErr := os.ReadFile(path) content, readErr := os.ReadFile(path)

View File

@@ -7,6 +7,7 @@ import (
"path/filepath" "path/filepath"
"strings" "strings"
"testing" "testing"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@@ -14,16 +15,27 @@ import (
) )
type stubStore struct { type stubStore struct {
known map[string]struct{} // known maps chunk-path → updated_at. Tests that don't care about
// re-embed-on-mtime use a far-future time so the Sync skip path
// always wins. Tests that do exercise the mtime path set the
// updated_at explicitly.
known map[string]time.Time
upserts map[string][]float32 upserts map[string][]float32
deletes []string deletes []string
failNext error failNext error
} }
func (s *stubStore) KnownPaths(_ context.Context) (map[string]struct{}, error) { // farFuture is "newer than any file mtime", used as the default
out := make(map[string]struct{}, len(s.known)) // updated_at in stubs that don't care about re-embed behavior.
for k := range s.known { var farFuture = time.Now().Add(24 * time.Hour)
out[k] = struct{}{}
func (s *stubStore) KnownPathsWithTime(_ context.Context) (map[string]time.Time, error) {
out := make(map[string]time.Time, len(s.known))
for k, t := range s.known {
if t.IsZero() {
t = farFuture
}
out[k] = t
} }
return out, nil return out, nil
} }
@@ -67,7 +79,7 @@ func TestSync_AddsNewFiles(t *testing.T) {
writeNote(t, dir, "wiki/jepa-fx/facts/x.md", "body of x") writeNote(t, dir, "wiki/jepa-fx/facts/x.md", "body of x")
writeNote(t, dir, "wiki/jepa-fx/facts/y.md", "body of y") writeNote(t, dir, "wiki/jepa-fx/facts/y.md", "body of y")
store := &stubStore{known: map[string]struct{}{}} store := &stubStore{known: map[string]time.Time{}}
emb := stubEmbedder{vec: make([]float32, 768)} emb := stubEmbedder{vec: make([]float32, 768)}
res, err := vectorstore.Sync(context.Background(), dir, store, emb) res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err) require.NoError(t, err)
@@ -81,7 +93,7 @@ func TestSync_SkipsAlreadyKnown(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
writeNote(t, dir, "wiki/a/facts/x.md", "x") writeNote(t, dir, "wiki/a/facts/x.md", "x")
store := &stubStore{known: map[string]struct{}{"wiki/a/facts/x.md#0001": {}}} store := &stubStore{known: map[string]time.Time{"wiki/a/facts/x.md#0001": {}}}
emb := stubEmbedder{vec: make([]float32, 768)} emb := stubEmbedder{vec: make([]float32, 768)}
res, err := vectorstore.Sync(context.Background(), dir, store, emb) res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err) require.NoError(t, err)
@@ -93,7 +105,7 @@ func TestSync_DeletesDisappearedFiles(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755))
// store has a path that doesn't exist on disk anymore // store has a path that doesn't exist on disk anymore
store := &stubStore{known: map[string]struct{}{"wiki/old/facts/ghost.md#0001": {}}} store := &stubStore{known: map[string]time.Time{"wiki/old/facts/ghost.md#0001": {}}}
res, err := vectorstore.Sync(context.Background(), dir, &stubStoreWithDelete{stubStore: store}, stubEmbedder{vec: make([]float32, 768)}) res, err := vectorstore.Sync(context.Background(), dir, &stubStoreWithDelete{stubStore: store}, stubEmbedder{vec: make([]float32, 768)})
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, res.Deleted) assert.Equal(t, 1, res.Deleted)
@@ -111,7 +123,7 @@ func TestSync_SkipsIndexFiles(t *testing.T) {
writeNote(t, dir, "wiki/a/_index.md", "moc") writeNote(t, dir, "wiki/a/_index.md", "moc")
writeNote(t, dir, "wiki/a/facts/real.md", "body") writeNote(t, dir, "wiki/a/facts/real.md", "body")
store := &stubStore{known: map[string]struct{}{}} store := &stubStore{known: map[string]time.Time{}}
res, err := vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)}) res, err := vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)})
require.NoError(t, err) require.NoError(t, err)
assert.Equal(t, 1, res.Added) assert.Equal(t, 1, res.Added)
@@ -123,7 +135,7 @@ func TestSync_ScansKnowledgeDir(t *testing.T) {
writeNote(t, dir, "wiki/a/facts/x.md", "x") writeNote(t, dir, "wiki/a/facts/x.md", "x")
writeNote(t, dir, "knowledge/2026-05-19-koala-gpu-setup.md", "knowledge body") writeNote(t, dir, "knowledge/2026-05-19-koala-gpu-setup.md", "knowledge body")
store := &stubStore{known: map[string]struct{}{}} store := &stubStore{known: map[string]time.Time{}}
emb := stubEmbedder{vec: make([]float32, 768)} emb := stubEmbedder{vec: make([]float32, 768)}
res, err := vectorstore.Sync(context.Background(), dir, store, emb) res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err) require.NoError(t, err)
@@ -143,7 +155,7 @@ func TestSync_ChunksLongFiles(t *testing.T) {
} }
writeNote(t, dir, "knowledge/long.md", body) writeNote(t, dir, "knowledge/long.md", body)
store := &stubStore{known: map[string]struct{}{}} store := &stubStore{known: map[string]time.Time{}}
emb := stubEmbedder{vec: make([]float32, 768)} emb := stubEmbedder{vec: make([]float32, 768)}
res, err := vectorstore.Sync(context.Background(), dir, store, emb) res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err) require.NoError(t, err)
@@ -164,7 +176,7 @@ func TestSync_ShortFileGetsSingleChunkRow(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
writeNote(t, dir, "wiki/short.md", "tiny body\n") writeNote(t, dir, "wiki/short.md", "tiny body\n")
store := &stubStore{known: map[string]struct{}{}} store := &stubStore{known: map[string]time.Time{}}
emb := stubEmbedder{vec: make([]float32, 768)} emb := stubEmbedder{vec: make([]float32, 768)}
res, err := vectorstore.Sync(context.Background(), dir, store, emb) res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err) require.NoError(t, err)
@@ -176,7 +188,7 @@ func TestSync_SkipsFileIfAnyChunkAlreadyKnown(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
writeNote(t, dir, "wiki/foo.md", "body\n") writeNote(t, dir, "wiki/foo.md", "body\n")
store := &stubStore{known: map[string]struct{}{ store := &stubStore{known: map[string]time.Time{
"wiki/foo.md#0001": {}, "wiki/foo.md#0001": {},
}} }}
emb := stubEmbedder{vec: make([]float32, 768)} emb := stubEmbedder{vec: make([]float32, 768)}
@@ -189,7 +201,7 @@ func TestSync_SkipsFileIfAnyChunkAlreadyKnown(t *testing.T) {
func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) { func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755))
store := &stubStore{known: map[string]struct{}{ store := &stubStore{known: map[string]time.Time{
"wiki/ghost.md#0001": {}, "wiki/ghost.md#0001": {},
"wiki/ghost.md#0002": {}, "wiki/ghost.md#0002": {},
"wiki/ghost.md#0003": {}, "wiki/ghost.md#0003": {},
@@ -199,6 +211,49 @@ func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) {
assert.Equal(t, 3, res.Deleted) assert.Equal(t, 3, res.Deleted)
} }
func TestSync_ReembedsFileWhenMtimeNewer(t *testing.T) {
dir := t.TempDir()
writeNote(t, dir, "wiki/edited.md", "original body\n")
// Force the file's mtime ahead of any plausible store updated_at.
future := time.Now().Add(1 * time.Hour)
require.NoError(t, os.Chtimes(filepath.Join(dir, "wiki/edited.md"), future, future))
store := &stubStore{
known: map[string]time.Time{
// Existing chunk row pre-dates the file's mtime.
"wiki/edited.md#0001": time.Now().Add(-1 * time.Hour),
},
}
emb := stubEmbedder{vec: make([]float32, 768)}
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err)
assert.Equal(t, 1, res.Added, "file with newer mtime should be re-embedded")
assert.Contains(t, store.upserts, "wiki/edited.md#0001")
// Old chunks of the same parent must be deleted before re-embed so
// shrunk files don't leave orphan rows at higher #NNNN indexes.
assert.Contains(t, store.deletes, "wiki/edited.md#0001")
}
func TestSync_SkipsFileWhenMtimeOlder(t *testing.T) {
dir := t.TempDir()
writeNote(t, dir, "wiki/stable.md", "body\n")
// Backdate mtime to before the store's recorded updated_at.
past := time.Now().Add(-2 * time.Hour)
require.NoError(t, os.Chtimes(filepath.Join(dir, "wiki/stable.md"), past, past))
store := &stubStore{
known: map[string]time.Time{
"wiki/stable.md#0001": time.Now(),
},
}
emb := stubEmbedder{vec: make([]float32, 768)}
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err)
assert.Equal(t, 0, res.Added)
assert.Empty(t, store.upserts)
assert.Empty(t, store.deletes)
}
func TestSync_NoOpWhenComponentsNil(t *testing.T) { func TestSync_NoOpWhenComponentsNil(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
writeNote(t, dir, "wiki/a/facts/x.md", "x") writeNote(t, dir, "wiki/a/facts/x.md", "x")
@@ -210,7 +265,7 @@ func TestSync_NoOpWhenComponentsNil(t *testing.T) {
func TestSync_CollectsEmbedderErrors(t *testing.T) { func TestSync_CollectsEmbedderErrors(t *testing.T) {
dir := t.TempDir() dir := t.TempDir()
writeNote(t, dir, "wiki/a/facts/x.md", "x") writeNote(t, dir, "wiki/a/facts/x.md", "x")
store := &stubStore{known: map[string]struct{}{}} store := &stubStore{known: map[string]time.Time{}}
emb := stubEmbedder{err: errors.New("upstream down")} emb := stubEmbedder{err: errors.New("upstream down")}
res, err := vectorstore.Sync(context.Background(), dir, store, emb) res, err := vectorstore.Sync(context.Background(), dir, store, emb)
require.NoError(t, err) require.NoError(t, err)