From 37fdd33b2dc22d8a2a0f8672b4cc5ce99945ea14 Mon Sep 17 00:00:00 2001 From: Mathias Date: Tue, 19 May 2026 21:57:09 +0200 Subject: [PATCH] feat(ingestion): chunk markdown before embedding (#38) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Long markdown files (>~8KB) silently failed to embed because nomic-embed-text on iguana has a 2048-token context. embed sync logged errors=1 every cycle with no useful body until #37 added per-item logging — three files exceed the ceiling: finbert source (8 KB), koala-machine-state (7.1 KB), litellm-absorption (8.8 KB). Curated knowledge entries should never be vector-blind. Approach: chunk-before-embed, no schema change. vectorstore/chunk.go (new) - ChunkMarkdown splits at H1/H2 boundaries; sections over maxBytes are further split at paragraph boundaries, packing greedily under budget. - NumberChunks assigns "{parent}#NNNN" storage paths (1-based, zero-padded to 4 digits — handles files with up to ~10k sections in stable sort order). - ParentPath strips the chunk suffix for retrieval-side dedup. vectorstore/sync.go - After ChunkMarkdown produces N pieces, each is embedded + upserted as a separate brain_embeddings row at "{parent}#NNNN". maxChunkBytes = 4000 (≈1000 nomic tokens, well under the 2048 ceiling with headroom for unicode/code blocks). - "Already embedded?" check now reduces known paths to parent set via ParentPath, so the first chunk hit short-circuits the file. - Delete walk also reduces via ParentPath; when a parent file disappears, every chunk row (and any pre-existing bare-path row, for backward compatibility with rows written before this change) gets dropped. search/search.go - hybridMerge collapses chunk-path vector hits to parent via ParentPath before scope check, RRF accumulation, and hydration. A file with three chunk hits returns one result row, not three. 
Backward compatibility: pre-existing bare-path rows in brain_embeddings keep working — ParentPath returns them unchanged, knownParents handles them as if they were "wiki/foo.md#NNNN" hits, sync skips re-embed, and search dedup is a no-op for them. No migration required to ship. Tests: - chunk_test.go covers short / heading split / oversized section / content preservation / chunk numbering / parent-path stripping. - sync_test.go adds long-file chunking, single-chunk-row short file, skip-if-any-chunk-known, delete-all-chunks-of-disappeared-file. Existing tests updated for #NNNN paths. - search_test.go adds chunk-paths-dedupe-to-parent. Closes gitea/mathias/infra#38. --- ingestion/internal/search/search.go | 17 ++- ingestion/internal/search/search_test.go | 30 ++++ ingestion/internal/vectorstore/chunk.go | 137 +++++++++++++++++++ ingestion/internal/vectorstore/chunk_test.go | 72 ++++++++++ ingestion/internal/vectorstore/sync.go | 49 ++++--- ingestion/internal/vectorstore/sync_test.go | 82 ++++++++++- 6 files changed, 358 insertions(+), 29 deletions(-) create mode 100644 ingestion/internal/vectorstore/chunk.go create mode 100644 ingestion/internal/vectorstore/chunk_test.go diff --git a/ingestion/internal/search/search.go b/ingestion/internal/search/search.go index 6a008a4..d5c289b 100644 --- a/ingestion/internal/search/search.go +++ b/ingestion/internal/search/search.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/mathiasbq/hyperguild/ingestion/internal/brain" + "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" ) // VectorSearcher returns the top-limit nearest paths by cosine @@ -186,17 +187,21 @@ func hybridMerge(ctx context.Context, brainDir string, opts QueryOptions, bm25 [ byPath[r.Path] = r } for rank, h := range hits { - if opts.Wing != "" && !pathInScope(h.Path, opts.Wing, opts.Hall) { + // Vector store keys are chunk paths ("wiki/foo.md#0001"); collapse + // back to the parent so multiple chunk hits from the same file + // score against a 
single result row. + parent := vectorstore.ParentPath(h.Path) + if opts.Wing != "" && !pathInScope(parent, opts.Wing, opts.Hall) { continue } - rrf[h.Path] += 1.0 / (rrfK + float64(rank+1)) - if _, seen := byPath[h.Path]; !seen { - r, err := hydrate(brainDir, h.Path) + rrf[parent] += 1.0 / (rrfK + float64(rank+1)) + if _, seen := byPath[parent]; !seen { + r, err := hydrate(brainDir, parent) if err != nil { - slog.Warn("search: hydrate failed for vector hit", "path", h.Path, "err", err) + slog.Warn("search: hydrate failed for vector hit", "path", parent, "err", err) continue } - byPath[h.Path] = r + byPath[parent] = r } } diff --git a/ingestion/internal/search/search_test.go b/ingestion/internal/search/search_test.go index 87af707..db7a7f1 100644 --- a/ingestion/internal/search/search_test.go +++ b/ingestion/internal/search/search_test.go @@ -55,6 +55,36 @@ func TestSearch_HybridRRFPromotesVectorOnlyHit(t *testing.T) { assert.Contains(t, paths, "wiki/jepa-fx/facts/semantic.md") } +func TestSearch_HybridDedupesChunkPathsToParent(t *testing.T) { + dir := t.TempDir() + full := filepath.Join(dir, "knowledge", "long.md") + require.NoError(t, os.MkdirAll(filepath.Dir(full), 0o755)) + // Body contains the BM25 keyword "alpaca" so hybridMerge actually runs + // (it only kicks in when BM25 returns at least one candidate). + require.NoError(t, os.WriteFile(full, []byte("---\ntitle: Long\n---\nalpaca content.\n"), 0o644)) + + embedder := stubEmbedder{vec: []float32{0.1}} + // Vector store returns three chunk-path hits all pointing at the same + // parent file. The merged result must surface ONE row per parent — not + // three rows with chunk-suffixed paths. 
+ vector := stubVector{hits: []search.VectorHit{ + {Path: "knowledge/long.md#0001", Distance: 0.05}, + {Path: "knowledge/long.md#0002", Distance: 0.07}, + {Path: "knowledge/long.md#0003", Distance: 0.09}, + }} + + got, err := search.Query(dir, search.QueryOptions{ + Query: "alpaca", + Limit: 5, + Vector: vector, + Embedder: embedder, + }) + require.NoError(t, err) + require.Len(t, got, 1, "three chunk hits for one parent must merge to one result") + assert.Equal(t, "knowledge/long.md", got[0].Path) + assert.Equal(t, "Long", got[0].Title) +} + func TestSearch_HybridFallsBackOnEmbedderError(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) diff --git a/ingestion/internal/vectorstore/chunk.go b/ingestion/internal/vectorstore/chunk.go new file mode 100644 index 0000000..b1e8aad --- /dev/null +++ b/ingestion/internal/vectorstore/chunk.go @@ -0,0 +1,137 @@ +package vectorstore + +import ( + "fmt" + "strings" +) + +// NumberedChunk pairs a chunk's body with the storage path it will use +// in brain_embeddings. Path format: "{parent}#NNNN" where NNNN is the +// 1-based chunk index zero-padded to 4 digits. +type NumberedChunk struct { + Path string + Content string +} + +// ParentPath returns p cut at its first "#", dropping any "#NNNN" chunk suffix. +// Inputs without a "#" are returned unchanged. Used by search to dedupe +// chunk-level hits back to a single document per result. +func ParentPath(p string) string { + if i := strings.Index(p, "#"); i >= 0 { + return p[:i] + } + return p +} + +// NumberChunks assigns "{parent}#NNNN" storage paths to a slice of chunk +// bodies, indexed from 0001. Empty chunks are dropped. 
+func NumberChunks(parent string, chunks []string) []NumberedChunk { + out := make([]NumberedChunk, 0, len(chunks)) + idx := 1 + for _, c := range chunks { + if strings.TrimSpace(c) == "" { + continue + } + out = append(out, NumberedChunk{ + Path: fmt.Sprintf("%s#%04d", parent, idx), + Content: c, + }) + idx++ + } + return out +} + +// ChunkMarkdown splits a markdown document into embedding-sized pieces. +// Strategy: +// 1. Split at H1/H2 headings (top-of-line "#" or "##"). The intro before +// the first heading is its own chunk. +// 2. Any section larger than maxBytes is further split at paragraph +// boundaries (blank lines), packing paragraphs greedily under the +// byte budget. +// +// The function aims for "fits comfortably under nomic-embed-text's 2048- +// token context" — at ~4 chars/token for English markdown, maxBytes ≈ 4000 +// is a safe call-site default. +func ChunkMarkdown(content string, maxBytes int) []string { + if maxBytes <= 0 { + maxBytes = 4000 + } + sections := splitAtHeadings(content) + + out := make([]string, 0, len(sections)) + for _, s := range sections { + if len(s) <= maxBytes { + out = append(out, s) + continue + } + out = append(out, splitAtParagraphs(s, maxBytes)...) + } + return out +} + +// splitAtHeadings cuts content into sections that each start with an +// "# " or "## " line (intro before any heading is the leading section). +func splitAtHeadings(content string) []string { + lines := strings.Split(content, "\n") + var sections []string + var cur strings.Builder + flush := func() { + if cur.Len() == 0 { + return + } + // Trim all trailing whitespace then re-add a single newline so a + // single-paragraph file round-trips to its original content rather + // than accumulating extra newlines from the empty-line split. 
+ s := strings.TrimRight(cur.String(), "\n") + sections = append(sections, s+"\n") + cur.Reset() + } + for _, ln := range lines { + trimmed := strings.TrimLeft(ln, " ") + isH := strings.HasPrefix(trimmed, "# ") || strings.HasPrefix(trimmed, "## ") + if isH && cur.Len() > 0 { + flush() + } + cur.WriteString(ln) + cur.WriteByte('\n') + } + flush() + // Drop empty / whitespace-only trailing section (common when content + // itself ends with a "\n" — Split leaves a final empty element). + if n := len(sections); n > 0 && strings.TrimSpace(sections[n-1]) == "" { + sections = sections[:n-1] + } + return sections +} + +// splitAtParagraphs packs paragraphs (blank-line separated blocks) into +// sub-chunks of at most maxBytes. A single paragraph that itself exceeds +// maxBytes is emitted as one over-budget chunk rather than being split +// mid-sentence — better to over-spend a little than truncate prose. +func splitAtParagraphs(section string, maxBytes int) []string { + paras := strings.Split(section, "\n\n") + var out []string + var cur strings.Builder + for _, p := range paras { + if p == "" { + continue + } + // +2 for the "\n\n" rejoin if cur isn't empty + need := len(p) + if cur.Len() > 0 { + need += 2 + } + if cur.Len() > 0 && cur.Len()+need > maxBytes { + out = append(out, cur.String()) + cur.Reset() + } + if cur.Len() > 0 { + cur.WriteString("\n\n") + } + cur.WriteString(p) + } + if cur.Len() > 0 { + out = append(out, cur.String()) + } + return out +} diff --git a/ingestion/internal/vectorstore/chunk_test.go b/ingestion/internal/vectorstore/chunk_test.go new file mode 100644 index 0000000..ab9414a --- /dev/null +++ b/ingestion/internal/vectorstore/chunk_test.go @@ -0,0 +1,72 @@ +package vectorstore_test + +import ( + "strings" + "testing" + + "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestChunkMarkdown_ShortFileFitsInOne(t *testing.T) { + out := 
vectorstore.ChunkMarkdown("Just a short paragraph.\n", 4000) + require.Len(t, out, 1) + assert.Equal(t, "Just a short paragraph.\n", out[0]) +} + +func TestChunkMarkdown_SplitsAtHeadings(t *testing.T) { + src := "# Top\n\nintro\n\n## A\n\nbody a\n\n## B\n\nbody b\n" + out := vectorstore.ChunkMarkdown(src, 50) // tiny limit forces per-section split + + assert.GreaterOrEqual(t, len(out), 2, "should split at H2 boundaries") + // Each chunk should start with a heading (top-level intro chunk OK without one) + for i, c := range out { + if i == 0 { + continue + } + assert.True(t, strings.HasPrefix(strings.TrimSpace(c), "#"), + "non-first chunk %d should start with heading: %q", i, c) + } +} + +func TestChunkMarkdown_FurtherSplitsOversizedSection(t *testing.T) { + // One H2 section with 4 paragraphs of ~80 chars each, limit 100. + src := "## big\n\n" + + strings.Repeat("paragraph one is moderately long.\n\n", 1) + + strings.Repeat("paragraph two also moderately long.\n\n", 1) + + strings.Repeat("paragraph three is moderately long.\n\n", 1) + + strings.Repeat("paragraph four is moderately long.\n\n", 1) + out := vectorstore.ChunkMarkdown(src, 100) + + assert.Greater(t, len(out), 1, "oversized section should sub-split at paragraph boundaries") + for i, c := range out { + assert.LessOrEqual(t, len(c), 200, + "chunk %d exceeds 2x maxBytes: %d", i, len(c)) + } +} + +func TestChunkMarkdown_PreservesContent(t *testing.T) { + src := "# H1\n\nfirst section body.\n\n## H2a\n\nsecond section body.\n\n## H2b\n\nthird section body.\n" + out := vectorstore.ChunkMarkdown(src, 50) + joined := strings.Join(out, "") + // All non-whitespace tokens from src must appear in the joined output + for _, token := range []string{"H1", "first", "H2a", "second", "H2b", "third"} { + assert.Contains(t, joined, token, "token %q missing after chunking", token) + } +} + +func TestChunkMarkdown_NumberedSuffix(t *testing.T) { + out := vectorstore.NumberChunks("knowledge/foo.md", []string{"a", "b", "c"}) + 
require.Len(t, out, 3) + assert.Equal(t, "knowledge/foo.md#0001", out[0].Path) + assert.Equal(t, "knowledge/foo.md#0002", out[1].Path) + assert.Equal(t, "knowledge/foo.md#0003", out[2].Path) + assert.Equal(t, "a", out[0].Content) +} + +func TestParentPath_StripsChunkSuffix(t *testing.T) { + assert.Equal(t, "knowledge/foo.md", vectorstore.ParentPath("knowledge/foo.md#0001")) + assert.Equal(t, "knowledge/foo.md", vectorstore.ParentPath("knowledge/foo.md")) + assert.Equal(t, "wiki/a/b.md", vectorstore.ParentPath("wiki/a/b.md#9999")) +} diff --git a/ingestion/internal/vectorstore/sync.go b/ingestion/internal/vectorstore/sync.go index 9cc33fb..d1591fa 100644 --- a/ingestion/internal/vectorstore/sync.go +++ b/ingestion/internal/vectorstore/sync.go @@ -37,6 +37,13 @@ type SyncResult struct { // source pages; knowledge/ holds curated hand-written entries. var scanDirs = []string{"wiki", "knowledge"} +// maxChunkBytes is the per-chunk byte budget passed to ChunkMarkdown. +// Sized to fit comfortably under nomic-embed-text's 2048-token default +// context (~4 chars/token for English markdown → ~8 KB ceiling; we sit +// at 4 KB to leave headroom for unicode, code blocks, and tokenizer +// variance). +const maxChunkBytes = 4000 + // Sync brings the embedding store in line with brain/{wiki,knowledge}/ // on disk: // - new files (in the tree, not in the store) get embedded + upserted @@ -55,7 +62,13 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) if err != nil { return res, fmt.Errorf("known paths: %w", err) } - seen := make(map[string]struct{}) + // Build a parent → "any chunk known?" set so we can skip files that + // already have at least one chunk row in the store. 
+ knownParents := make(map[string]struct{}, len(known)) + for p := range known { + knownParents[ParentPath(p)] = struct{}{} + } + seenParents := make(map[string]struct{}) for _, sub := range scanDirs { root := filepath.Join(brainDir, sub) @@ -75,11 +88,12 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) return err } relSlash := filepath.ToSlash(rel) - seen[relSlash] = struct{}{} + seenParents[relSlash] = struct{}{} - if _, ok := known[relSlash]; ok { - // Already embedded — TODO: compare mtime once Store exposes - // updated_at so we re-embed on edit. For now, skip. + if _, ok := knownParents[relSlash]; ok { + // File has at least one chunk in the store already. + // TODO: compare mtime once Store exposes updated_at so we + // re-embed on edit. For now, skip. return nil } @@ -88,16 +102,19 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", relSlash, readErr)) return nil } - vec, embErr := embedder.Embed(ctx, string(content)) - if embErr != nil { - res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", relSlash, embErr)) - return nil + chunks := NumberChunks(relSlash, ChunkMarkdown(string(content), maxChunkBytes)) + for _, ch := range chunks { + vec, embErr := embedder.Embed(ctx, ch.Content) + if embErr != nil { + res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", ch.Path, embErr)) + continue + } + if upErr := store.Upsert(ctx, ch.Path, vec); upErr != nil { + res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", ch.Path, upErr)) + continue + } + res.Added++ } - if upErr := store.Upsert(ctx, relSlash, vec); upErr != nil { - res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", relSlash, upErr)) - return nil - } - res.Added++ return nil }) if err != nil { @@ -105,9 +122,9 @@ func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) } } - // Drop rows whose file is gone. 
+ // Drop chunk rows whose parent file is gone. for path := range known { - if _, ok := seen[path]; ok { + if _, ok := seenParents[ParentPath(path)]; ok { continue } if err := store.Delete(ctx, path); err != nil { diff --git a/ingestion/internal/vectorstore/sync_test.go b/ingestion/internal/vectorstore/sync_test.go index a4139c7..54e67a6 100644 --- a/ingestion/internal/vectorstore/sync_test.go +++ b/ingestion/internal/vectorstore/sync_test.go @@ -5,6 +5,7 @@ import ( "errors" "os" "path/filepath" + "strings" "testing" "github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore" @@ -72,15 +73,15 @@ func TestSync_AddsNewFiles(t *testing.T) { require.NoError(t, err) assert.Equal(t, 2, res.Added) assert.Empty(t, res.Deleted) - assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/x.md") - assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/y.md") + assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/x.md#0001") + assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/y.md#0001") } func TestSync_SkipsAlreadyKnown(t *testing.T) { dir := t.TempDir() writeNote(t, dir, "wiki/a/facts/x.md", "x") - store := &stubStore{known: map[string]struct{}{"wiki/a/facts/x.md": {}}} + store := &stubStore{known: map[string]struct{}{"wiki/a/facts/x.md#0001": {}}} emb := stubEmbedder{vec: make([]float32, 768)} res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err) @@ -92,7 +93,7 @@ func TestSync_DeletesDisappearedFiles(t *testing.T) { dir := t.TempDir() require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) // store has a path that doesn't exist on disk anymore - store := &stubStore{known: map[string]struct{}{"wiki/old/facts/ghost.md": {}}} + store := &stubStore{known: map[string]struct{}{"wiki/old/facts/ghost.md#0001": {}}} res, err := vectorstore.Sync(context.Background(), dir, &stubStoreWithDelete{stubStore: store}, stubEmbedder{vec: make([]float32, 768)}) require.NoError(t, err) assert.Equal(t, 1, res.Deleted) @@ -114,7 +115,7 @@ 
func TestSync_SkipsIndexFiles(t *testing.T) { res, err := vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)}) require.NoError(t, err) assert.Equal(t, 1, res.Added) - assert.NotContains(t, store.upserts, "wiki/a/_index.md") + assert.NotContains(t, store.upserts, "wiki/a/_index.md#0001") } func TestSync_ScansKnowledgeDir(t *testing.T) { @@ -127,8 +128,75 @@ func TestSync_ScansKnowledgeDir(t *testing.T) { res, err := vectorstore.Sync(context.Background(), dir, store, emb) require.NoError(t, err) assert.Equal(t, 2, res.Added) - assert.Contains(t, store.upserts, "wiki/a/facts/x.md") - assert.Contains(t, store.upserts, "knowledge/2026-05-19-koala-gpu-setup.md") + assert.Contains(t, store.upserts, "wiki/a/facts/x.md#0001") + assert.Contains(t, store.upserts, "knowledge/2026-05-19-koala-gpu-setup.md#0001") +} + +func TestSync_ChunksLongFiles(t *testing.T) { + dir := t.TempDir() + // Build a file that's well over the chunk byte budget. Multi-section + // markdown so the chunker has heading boundaries to cut on. + body := "# Doc\n\nintro line.\n\n" + for i := 0; i < 10; i++ { + body += "## Section " + string(rune('A'+i)) + "\n\n" + body += strings.Repeat("This section has a fair amount of content. ", 50) + "\n\n" + } + writeNote(t, dir, "knowledge/long.md", body) + + store := &stubStore{known: map[string]struct{}{}} + emb := stubEmbedder{vec: make([]float32, 768)} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Greater(t, res.Added, 1, "long file should produce multiple chunk rows") + // Every upserted path for this file must be a chunk path. + chunkCount := 0 + for p := range store.upserts { + if strings.HasPrefix(p, "knowledge/long.md#") { + chunkCount++ + } + } + assert.Equal(t, res.Added, chunkCount, "all rows for long file should be chunk-suffixed") + // The bare parent path must NOT be upserted directly. 
+ assert.NotContains(t, store.upserts, "knowledge/long.md") +} + +func TestSync_ShortFileGetsSingleChunkRow(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/short.md", "tiny body\n") + + store := &stubStore{known: map[string]struct{}{}} + emb := stubEmbedder{vec: make([]float32, 768)} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Equal(t, 1, res.Added) + assert.Contains(t, store.upserts, "wiki/short.md#0001") +} + +func TestSync_SkipsFileIfAnyChunkAlreadyKnown(t *testing.T) { + dir := t.TempDir() + writeNote(t, dir, "wiki/foo.md", "body\n") + + store := &stubStore{known: map[string]struct{}{ + "wiki/foo.md#0001": {}, + }} + emb := stubEmbedder{vec: make([]float32, 768)} + res, err := vectorstore.Sync(context.Background(), dir, store, emb) + require.NoError(t, err) + assert.Equal(t, 0, res.Added) + assert.Empty(t, store.upserts) +} + +func TestSync_DeletesAllChunksOfDisappearedFile(t *testing.T) { + dir := t.TempDir() + require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755)) + store := &stubStore{known: map[string]struct{}{ + "wiki/ghost.md#0001": {}, + "wiki/ghost.md#0002": {}, + "wiki/ghost.md#0003": {}, + }} + res, err := vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)}) + require.NoError(t, err) + assert.Equal(t, 3, res.Deleted) } func TestSync_NoOpWhenComponentsNil(t *testing.T) {