fix(ingestion): embed sync also scans brain/knowledge/ + logs per-item errors
The embed sync goroutine only walked brain/wiki/. brain/knowledge/ (112 curated entries; per CLAUDE.md, the most important brain content) had zero coverage in brain_embeddings — vector retrieval was blind to it. Hybrid BM25 + pgvector retrieval would never surface a curated knowledge entry via the vector arm. Extract the per-root walk into a loop over a small subdir list and add "knowledge" alongside "wiki". scanDirs is package-level so it stays a single source of truth for what gets embedded. Also log each failing item's path + error string from StartSync. Previously only the aggregate count was logged, so a persistent `errors=1` per cycle was opaque. With per-item warnings, the actual ollama "input length exceeds the context length" error surfaces immediately. Refs gitea/mathias/infra#37 (this commit covers the knowledge/ scan bug; the long-file chunking bug is a separate change.)
This commit is contained in:
@@ -32,7 +32,13 @@ type SyncResult struct {
|
|||||||
Errors []error
|
Errors []error
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync brings the embedding store in line with brain/wiki/ on disk:
|
// scanDirs is the set of brainDir subdirectories whose .md files are
|
||||||
|
// embedded for vector retrieval. wiki/ holds LLM-extracted entity and
|
||||||
|
// source pages; knowledge/ holds curated hand-written entries.
|
||||||
|
var scanDirs = []string{"wiki", "knowledge"}
|
||||||
|
|
||||||
|
// Sync brings the embedding store in line with brain/{wiki,knowledge}/
|
||||||
|
// on disk:
|
||||||
// - new files (in the tree, not in the store) get embedded + upserted
|
// - new files (in the tree, not in the store) get embedded + upserted
|
||||||
// - files whose mtime exceeds the store's updated_at get re-embedded
|
// - files whose mtime exceeds the store's updated_at get re-embedded
|
||||||
// - files no longer on disk get deleted from the store
|
// - files no longer on disk get deleted from the store
|
||||||
@@ -51,50 +57,52 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder)
|
|||||||
}
|
}
|
||||||
seen := make(map[string]struct{})
|
seen := make(map[string]struct{})
|
||||||
|
|
||||||
wikiDir := filepath.Join(brainDir, "wiki")
|
for _, sub := range scanDirs {
|
||||||
if _, err := os.Stat(wikiDir); os.IsNotExist(err) {
|
root := filepath.Join(brainDir, sub)
|
||||||
return res, nil
|
if _, err := os.Stat(root); os.IsNotExist(err) {
|
||||||
}
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
err = filepath.WalkDir(wikiDir, func(path string, d os.DirEntry, err error) error {
|
err = filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if d.IsDir() || !strings.HasSuffix(path, ".md") || d.Name() == "_index.md" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
rel, err := filepath.Rel(brainDir, path)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
relSlash := filepath.ToSlash(rel)
|
||||||
|
seen[relSlash] = struct{}{}
|
||||||
|
|
||||||
|
if _, ok := known[relSlash]; ok {
|
||||||
|
// Already embedded — TODO: compare mtime once Store exposes
|
||||||
|
// updated_at so we re-embed on edit. For now, skip.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
content, readErr := os.ReadFile(path)
|
||||||
|
if readErr != nil {
|
||||||
|
res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", relSlash, readErr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
vec, embErr := embedder.Embed(ctx, string(content))
|
||||||
|
if embErr != nil {
|
||||||
|
res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", relSlash, embErr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if upErr := store.Upsert(ctx, relSlash, vec); upErr != nil {
|
||||||
|
res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", relSlash, upErr))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
res.Added++
|
||||||
|
return nil
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return res, fmt.Errorf("walk %s: %w", sub, err)
|
||||||
}
|
}
|
||||||
if d.IsDir() || !strings.HasSuffix(path, ".md") || d.Name() == "_index.md" {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
rel, err := filepath.Rel(brainDir, path)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
relSlash := filepath.ToSlash(rel)
|
|
||||||
seen[relSlash] = struct{}{}
|
|
||||||
|
|
||||||
if _, ok := known[relSlash]; ok {
|
|
||||||
// Already embedded — TODO: compare mtime once Store exposes
|
|
||||||
// updated_at so we re-embed on edit. For now, skip.
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
content, readErr := os.ReadFile(path)
|
|
||||||
if readErr != nil {
|
|
||||||
res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", relSlash, readErr))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
vec, embErr := embedder.Embed(ctx, string(content))
|
|
||||||
if embErr != nil {
|
|
||||||
res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", relSlash, embErr))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
if upErr := store.Upsert(ctx, relSlash, vec); upErr != nil {
|
|
||||||
res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", relSlash, upErr))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
res.Added++
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return res, fmt.Errorf("walk wiki: %w", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drop rows whose file is gone.
|
// Drop rows whose file is gone.
|
||||||
@@ -125,6 +133,9 @@ func StartSync(ctx context.Context, brainDir string, store Store, embedder Embed
|
|||||||
slog.Error("embed sync failed", "err", err)
|
slog.Error("embed sync failed", "err", err)
|
||||||
} else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 {
|
} else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 {
|
||||||
slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors))
|
slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors))
|
||||||
|
for _, e := range r.Errors {
|
||||||
|
slog.Warn("embed sync item failed", "err", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
|||||||
@@ -117,6 +117,20 @@ func TestSync_SkipsIndexFiles(t *testing.T) {
|
|||||||
assert.NotContains(t, store.upserts, "wiki/a/_index.md")
|
assert.NotContains(t, store.upserts, "wiki/a/_index.md")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSync_ScansKnowledgeDir(t *testing.T) {
|
||||||
|
dir := t.TempDir()
|
||||||
|
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||||
|
writeNote(t, dir, "knowledge/2026-05-19-koala-gpu-setup.md", "knowledge body")
|
||||||
|
|
||||||
|
store := &stubStore{known: map[string]struct{}{}}
|
||||||
|
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||||
|
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, 2, res.Added)
|
||||||
|
assert.Contains(t, store.upserts, "wiki/a/facts/x.md")
|
||||||
|
assert.Contains(t, store.upserts, "knowledge/2026-05-19-koala-gpu-setup.md")
|
||||||
|
}
|
||||||
|
|
||||||
func TestSync_NoOpWhenComponentsNil(t *testing.T) {
|
func TestSync_NoOpWhenComponentsNil(t *testing.T) {
|
||||||
dir := t.TempDir()
|
dir := t.TempDir()
|
||||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||||
|
|||||||
Reference in New Issue
Block a user