package vectorstore

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"path/filepath"
	"strings"
	"time"
)

// Embedder produces dense vectors. The embed package's Client satisfies
// this; it's declared locally so vectorstore doesn't depend on embed.
type Embedder interface {
	Embed(ctx context.Context, text string) ([]float32, error)
}

// Store is the subset of PGStore that Sync needs. Lets tests stub it.
type Store interface {
	// KnownPathsWithTime returns every embedded chunk path paired with the
	// row's updated_at. Sync uses the timestamp to detect edits — a file
	// whose mtime is newer than ANY of its chunks' updated_at is re-embedded
	// from scratch (fresh chunks upserted, superseded chunks deleted).
	KnownPathsWithTime(ctx context.Context) (map[string]time.Time, error)
	// Upsert stores embedding under path (insert-or-update, going by the
	// name). NOTE(review): Sync assumes this refreshes the row's
	// updated_at — confirm against PGStore.
	Upsert(ctx context.Context, path string, embedding []float32) error
	// Delete removes the row stored under path.
	Delete(ctx context.Context, path string) error
}

// SyncResult tallies what Sync did. Returned for logs / metrics; callers
// generally don't act on the fields directly. Counts are in chunk rows,
// not files.
type SyncResult struct {
	Added   int     // chunks embedded for files the store had never seen
	Updated int     // chunks re-embedded for files edited since their last sync
	Deleted int     // chunk rows removed: parent file gone, or file shrank
	Errors  []error // per-file / per-chunk failures; none abort the run
}

// scanDirs is the set of brainDir subdirectories whose .md files are
// embedded for vector retrieval. wiki/ holds LLM-extracted entity and
// source pages; knowledge/ holds curated hand-written entries.
var scanDirs = []string{"wiki", "knowledge"}

// maxChunkBytes is the per-chunk byte budget passed to ChunkMarkdown.
// Sized to fit comfortably under nomic-embed-text's 2048-token default
// context (~4 chars/token for English markdown → ~8 KB ceiling; we sit
// at 4 KB to leave headroom for unicode, code blocks, and tokenizer
// variance).
const maxChunkBytes = 4000

// chunkGroup is the store's view of one source file: every chunk row it
// owns and the oldest updated_at among them.
type chunkGroup struct {
	minUpdatedAt time.Time
	chunkPaths   []string
}

// groupChunksByParent folds the store's chunk rows into one chunkGroup per
// parent file. Keeping the EARLIEST updated_at means a file counts as stale
// when even one of its chunks predates the latest edit; keeping every
// chunk path lets a shrunk file's orphan rows be removed.
func groupChunksByParent(known map[string]time.Time) map[string]*chunkGroup {
	groups := make(map[string]*chunkGroup, len(known))
	for p, t := range known {
		parent := ParentPath(p)
		g, ok := groups[parent]
		if !ok {
			g = &chunkGroup{minUpdatedAt: t}
			groups[parent] = g
		} else if t.Before(g.minUpdatedAt) {
			g.minUpdatedAt = t
		}
		g.chunkPaths = append(g.chunkPaths, p)
	}
	return groups
}

// Sync brings the embedding store in line with brain/{wiki,knowledge}/
// on disk:
//   - new files (in the tree, not in the store) get embedded + upserted
//   - files whose mtime exceeds the oldest updated_at among their chunk
//     rows get re-embedded
//   - files no longer on disk get deleted from the store
//
// Designed to be called on a ticker. Best-effort: per-file errors are
// collected into SyncResult.Errors and do not abort the run, and a file
// whose sync fails is normally left in a state the next run retries.
//
// A non-nil error is returned only when listing the store fails or the
// directory walk aborts (including ctx cancellation). In that case the
// orphan-deletion pass is skipped: an incomplete walk can't tell a
// deleted file from one it simply didn't reach.
func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) (SyncResult, error) {
	var res SyncResult
	if store == nil || embedder == nil {
		return res, nil
	}

	known, err := store.KnownPathsWithTime(ctx)
	if err != nil {
		return res, fmt.Errorf("known paths: %w", err)
	}
	groups := groupChunksByParent(known)

	seen := make(map[string]struct{})
	for _, sub := range scanDirs {
		root := filepath.Join(brainDir, sub)
		if _, err := os.Stat(root); os.IsNotExist(err) {
			continue
		}
		walkErr := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
			if err != nil {
				return err
			}
			// Stop promptly on shutdown instead of reading and embedding
			// every remaining file just to collect a cancellation error each.
			if err := ctx.Err(); err != nil {
				return err
			}
			if d.IsDir() || !strings.HasSuffix(path, ".md") || d.Name() == "_index.md" {
				return nil
			}
			rel, err := filepath.Rel(brainDir, path)
			if err != nil {
				return err
			}
			relSlash := filepath.ToSlash(rel)
			seen[relSlash] = struct{}{}
			syncMarkdownFile(ctx, store, embedder, path, relSlash, d, groups[relSlash], &res)
			return nil
		})
		if walkErr != nil {
			return res, fmt.Errorf("walk %s: %w", sub, walkErr)
		}
	}

	// Drop chunk rows whose parent file is gone.
	for parent, g := range groups {
		if _, ok := seen[parent]; ok {
			continue
		}
		for _, p := range g.chunkPaths {
			if err := store.Delete(ctx, p); err != nil {
				res.Errors = append(res.Errors, fmt.Errorf("delete %s: %w", p, err))
				continue
			}
			res.Deleted++
		}
	}
	return res, nil
}

// syncMarkdownFile embeds one markdown file (read from path, stored under
// the slash-separated key rel) and reconciles its chunk rows. g is the
// file's existing chunk group, or nil when the store has never seen it.
// Failures are recorded in res rather than returned.
//
// Every chunk is embedded before the store is touched, so an embedder
// failure leaves the previous rows — and their old timestamps — intact and
// the file is retried next run. Superseded rows are deleted only after all
// fresh chunks are upserted, so retrieval never sees the file vanish
// mid-update.
func syncMarkdownFile(ctx context.Context, store Store, embedder Embedder, path, rel string, d os.DirEntry, g *chunkGroup, res *SyncResult) {
	if g != nil {
		// Re-embed only when the file was edited after its oldest chunk was
		// written. NOTE(review): filesystem mtime is compared with the
		// store's updated_at directly, with no skew allowance, so this
		// assumes both clocks agree. An edit landing between ReadFile and
		// Upsert below is also missed until the file changes again.
		info, err := d.Info()
		if err != nil {
			res.Errors = append(res.Errors, fmt.Errorf("stat %s: %w", rel, err))
			return
		}
		if !info.ModTime().After(g.minUpdatedAt) {
			return
		}
	}

	content, err := os.ReadFile(path)
	if err != nil {
		res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", rel, err))
		return
	}
	chunks := NumberChunks(rel, ChunkMarkdown(string(content), maxChunkBytes))

	vecs := make([][]float32, len(chunks))
	for i, ch := range chunks {
		vec, err := embedder.Embed(ctx, ch.Content)
		if err != nil {
			res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", ch.Path, err))
			return
		}
		vecs[i] = vec
	}

	written := make(map[string]struct{}, len(chunks))
	for i, ch := range chunks {
		if err := store.Upsert(ctx, ch.Path, vecs[i]); err != nil {
			res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", ch.Path, err))
			discardChunks(ctx, store, written, res)
			return
		}
		written[ch.Path] = struct{}{}
	}
	if g == nil {
		res.Added += len(written)
		return
	}
	res.Updated += len(written)

	// The file may have shrunk: drop rows at chunk indexes it no longer
	// produces. A failed delete leaves a row with its pre-edit timestamp,
	// which usually keeps the file stale so the next run retries cleanup.
	for _, old := range g.chunkPaths {
		if _, ok := written[old]; ok {
			continue
		}
		if err := store.Delete(ctx, old); err != nil {
			res.Errors = append(res.Errors, fmt.Errorf("delete %s for re-embed: %w", old, err))
			continue
		}
		res.Deleted++
	}
}

// discardChunks deletes the rows upserted during a file sync that failed
// partway. Without them the file either has no rows (re-embedded as new
// next run) or keeps only pre-edit rows (still stale), so a partial set of
// fresh rows is never mistaken for an up-to-date file. Delete failures are
// recorded in res; such a leftover row can hide the file until its next edit.
func discardChunks(ctx context.Context, store Store, paths map[string]struct{}, res *SyncResult) {
	for p := range paths {
		if err := store.Delete(ctx, p); err != nil {
			res.Errors = append(res.Errors, fmt.Errorf("delete %s after failed upsert: %w", p, err))
		}
	}
}

// StartSync launches Sync on a ticker in a background goroutine: once
// immediately, then every interval (a non-positive interval means 5
// minutes). The goroutine exits when ctx is cancelled; callers get no
// handle to wait for it. Failures are logged via slog. Nothing is started
// when store or embedder is nil, since every Sync would be a no-op.
func StartSync(ctx context.Context, brainDir string, store Store, embedder Embedder, interval time.Duration) {
	if store == nil || embedder == nil {
		return
	}
	if interval <= 0 {
		interval = 5 * time.Minute
	}
	go func() {
		t := time.NewTicker(interval)
		defer t.Stop()
		// Run once immediately so first-boot doesn't wait a full tick.
		runSyncAndLog(ctx, brainDir, store, embedder)
		for {
			select {
			case <-ctx.Done():
				return
			case <-t.C:
				runSyncAndLog(ctx, brainDir, store, embedder)
			}
		}
	}()
}

// runSyncAndLog runs one Sync and reports the outcome, so the first run and
// every tick log identically (including each per-item error).
func runSyncAndLog(ctx context.Context, brainDir string, store Store, embedder Embedder) {
	r, err := Sync(ctx, brainDir, store, embedder)
	if err != nil {
		if ctx.Err() != nil {
			return // shutting down; cancellation is not a sync failure
		}
		slog.Error("embed sync failed", "err", err)
		return
	}
	if r.Added+r.Updated+r.Deleted == 0 && len(r.Errors) == 0 {
		return
	}
	slog.Info("embed sync", "added", r.Added, "updated", r.Updated, "deleted", r.Deleted, "errors", len(r.Errors))
	for _, e := range r.Errors {
		slog.Warn("embed sync item failed", "err", e)
	}
}