feat(ingestion): add pipeline orchestrator with prompt builder
Adds prompt.go (BuildPrompt + systemPrompt) and pipeline.go (Run, Config, Result, mergeAll) that wire chunking, LLM calls, parse, merge, index rebuild, and log append into a single ingestion pipeline. Includes integration tests covering write, dry-run, and duplicate-path merge scenarios. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
120
ingestion/internal/pipeline/pipeline.go
Normal file
120
ingestion/internal/pipeline/pipeline.go
Normal file
@@ -0,0 +1,120 @@
|
||||
// ingestion/internal/pipeline/pipeline.go
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
|
||||
)
|
||||
|
||||
// CompleteFunc is the function signature for LLM calls.
|
||||
type CompleteFunc func(ctx context.Context, system, user string) (string, error)
|
||||
|
||||
// Config holds pipeline configuration.
|
||||
type Config struct {
|
||||
Complete CompleteFunc
|
||||
ChunkSize int // 0 = no chunking
|
||||
Schema string // overrides brain/CLAUDE.md when set (useful in tests)
|
||||
}
|
||||
|
||||
// Result is the outcome of a pipeline run.
|
||||
type Result struct {
|
||||
Pages []string // relative paths written (or would-be written in dry-run)
|
||||
Warnings []string
|
||||
}
|
||||
|
||||
// Run ingests content and writes structured wiki pages to brainDir/wiki/.
|
||||
// In dry-run mode, pages are returned but not written to disk.
|
||||
func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) {
|
||||
inventory, err := wiki.LoadInventory(brainDir)
|
||||
if err != nil {
|
||||
return Result{}, fmt.Errorf("load inventory: %w", err)
|
||||
}
|
||||
|
||||
schema := cfg.Schema
|
||||
if schema == "" {
|
||||
schema = loadSchema(brainDir)
|
||||
}
|
||||
|
||||
chunks := Chunk(content, cfg.ChunkSize)
|
||||
|
||||
var allPages []wiki.Page
|
||||
var allWarnings []string
|
||||
|
||||
for _, chunk := range chunks {
|
||||
userPrompt := BuildPrompt(schema, source, chunk, inventory)
|
||||
output, err := cfg.Complete(ctx, systemPrompt, userPrompt)
|
||||
if err != nil {
|
||||
return Result{}, fmt.Errorf("LLM call: %w", err)
|
||||
}
|
||||
pages, warnings := ParsePages(output)
|
||||
allPages = append(allPages, pages...)
|
||||
allWarnings = append(allWarnings, warnings...)
|
||||
}
|
||||
|
||||
merged := mergeAll(allPages)
|
||||
|
||||
date := time.Now().UTC().Format("2006-01-02")
|
||||
var written []string
|
||||
|
||||
for _, page := range merged {
|
||||
if !dryRun {
|
||||
dest := filepath.Join(brainDir, filepath.FromSlash(page.Path))
|
||||
if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
|
||||
return Result{}, fmt.Errorf("mkdir for %s: %w", page.Path, err)
|
||||
}
|
||||
if err := os.WriteFile(dest, []byte(page.Content), 0o644); err != nil {
|
||||
return Result{}, fmt.Errorf("write %s: %w", page.Path, err)
|
||||
}
|
||||
}
|
||||
written = append(written, page.Path)
|
||||
}
|
||||
|
||||
if !dryRun {
|
||||
if err := wiki.RebuildIndex(brainDir, date); err != nil {
|
||||
allWarnings = append(allWarnings, fmt.Sprintf("rebuild index: %v", err))
|
||||
}
|
||||
if err := wiki.AppendLog(brainDir, source, written, allWarnings, date); err != nil {
|
||||
allWarnings = append(allWarnings, fmt.Sprintf("append log: %v", err))
|
||||
}
|
||||
}
|
||||
|
||||
return Result{Pages: written, Warnings: allWarnings}, nil
|
||||
}
|
||||
|
||||
// mergeAll deduplicates pages by path, merging content from later occurrences.
|
||||
func mergeAll(pages []wiki.Page) []wiki.Page {
|
||||
order := make([]string, 0, len(pages))
|
||||
byPath := make(map[string]wiki.Page, len(pages))
|
||||
for _, p := range pages {
|
||||
if _, seen := byPath[p.Path]; !seen {
|
||||
order = append(order, p.Path)
|
||||
byPath[p.Path] = p
|
||||
} else {
|
||||
byPath[p.Path] = wiki.Merge(byPath[p.Path], p)
|
||||
}
|
||||
}
|
||||
result := make([]wiki.Page, 0, len(order))
|
||||
for _, path := range order {
|
||||
result = append(result, byPath[path])
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
const defaultSchema = `# Brain Wiki Schema
|
||||
Three page types: wiki/sources/, wiki/concepts/, wiki/entities/.
|
||||
See brain/CLAUDE.md for the full schema.
|
||||
`
|
||||
|
||||
func loadSchema(brainDir string) string {
|
||||
b, err := os.ReadFile(filepath.Join(brainDir, "CLAUDE.md"))
|
||||
if err != nil {
|
||||
return defaultSchema
|
||||
}
|
||||
return strings.TrimSpace(string(b))
|
||||
}
|
||||
130
ingestion/internal/pipeline/pipeline_test.go
Normal file
130
ingestion/internal/pipeline/pipeline_test.go
Normal file
@@ -0,0 +1,130 @@
|
||||
// ingestion/internal/pipeline/pipeline_test.go
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
|
||||
)
|
||||
|
||||
func TestRun_WritesPages(t *testing.T) {
|
||||
brainDir := t.TempDir()
|
||||
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
|
||||
}
|
||||
|
||||
llmResponse := mustJSON([]wiki.Page{
|
||||
{
|
||||
Path: "wiki/sources/test-article.md",
|
||||
Content: "---\ntitle: Test Article\ntype: article\ndomain: software-engineering\ndate_ingested: 2026-04-22\nlast_updated: 2026-04-22\naliases:\n - Test Article\n---\n\n## Summary\n\nA test article.\n\n## Key Claims\n\n- It tests things.\n\n## Concepts Introduced or Reinforced\n\n## Entities Mentioned\n\n## Open Questions Raised\n",
|
||||
},
|
||||
{
|
||||
Path: "wiki/concepts/testing.md",
|
||||
Content: "---\ntitle: Testing\ndomain: software-engineering\nlast_updated: 2026-04-22\naliases:\n - Testing\n---\n\n## Definition\n\nThe practice of verifying software.\n\n## Why It Matters\n\nCatches bugs.\n\n## Related Concepts\n\n## Related Entities\n\n## Sources\n\n## Evolving Notes\n",
|
||||
},
|
||||
})
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"choices": []map[string]any{
|
||||
{"message": map[string]any{"role": "assistant", "content": llmResponse}},
|
||||
},
|
||||
})
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
cfg := Config{
|
||||
Complete: llm.New(srv.URL, "", "test-model", 30*time.Second).Complete,
|
||||
ChunkSize: 0,
|
||||
}
|
||||
|
||||
result, err := Run(context.Background(), cfg, brainDir, "An article about testing.", "test-article", false)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, result.Pages, 2)
|
||||
assert.Empty(t, result.Warnings)
|
||||
|
||||
_, err = os.Stat(filepath.Join(brainDir, "wiki", "sources", "test-article.md"))
|
||||
require.NoError(t, err)
|
||||
_, err = os.Stat(filepath.Join(brainDir, "wiki", "concepts", "testing.md"))
|
||||
require.NoError(t, err)
|
||||
_, err = os.Stat(filepath.Join(brainDir, "wiki", "index.md"))
|
||||
require.NoError(t, err)
|
||||
_, err = os.Stat(filepath.Join(brainDir, "log.md"))
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestRun_DryRunDoesNotWrite(t *testing.T) {
|
||||
brainDir := t.TempDir()
|
||||
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
|
||||
}
|
||||
|
||||
llmResponse := mustJSON([]wiki.Page{{
|
||||
Path: "wiki/sources/foo.md",
|
||||
Content: "---\ntitle: Foo\n---\n\n## Summary\n\nFoo.\n",
|
||||
}})
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"choices": []map[string]any{{"message": map[string]any{"content": llmResponse}}},
|
||||
})
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
cfg := Config{Complete: llm.New(srv.URL, "", "m", 30*time.Second).Complete}
|
||||
result, err := Run(context.Background(), cfg, brainDir, "foo content", "foo", true)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, result.Pages, 1)
|
||||
|
||||
_, err = os.Stat(filepath.Join(brainDir, "wiki", "sources", "foo.md"))
|
||||
assert.True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
func TestRun_MergesDuplicatePaths(t *testing.T) {
|
||||
brainDir := t.TempDir()
|
||||
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
|
||||
}
|
||||
|
||||
// LLM returns same path twice (simulates multi-chunk merge)
|
||||
llmResponse := mustJSON([]wiki.Page{
|
||||
{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nFirst.\n\n## Related Concepts\n\n- [[bar|Bar]]\n"},
|
||||
{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nSecond.\n\n## Related Concepts\n\n- [[baz|Baz]]\n"},
|
||||
})
|
||||
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
json.NewEncoder(w).Encode(map[string]any{
|
||||
"choices": []map[string]any{{"message": map[string]any{"content": llmResponse}}},
|
||||
})
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
cfg := Config{Complete: llm.New(srv.URL, "", "m", 30*time.Second).Complete}
|
||||
result, err := Run(context.Background(), cfg, brainDir, "content", "foo", false)
|
||||
require.NoError(t, err)
|
||||
assert.Len(t, result.Pages, 1) // deduplicated
|
||||
|
||||
content, err := os.ReadFile(filepath.Join(brainDir, "wiki", "concepts", "foo.md"))
|
||||
require.NoError(t, err)
|
||||
// keep-first for Definition, union for Related Concepts
|
||||
assert.Contains(t, string(content), "First.")
|
||||
assert.Contains(t, string(content), "[[bar|Bar]]")
|
||||
assert.Contains(t, string(content), "[[baz|Baz]]")
|
||||
}
|
||||
|
||||
func mustJSON(v any) string {
|
||||
b, _ := json.Marshal(v)
|
||||
return string(b)
|
||||
}
|
||||
60
ingestion/internal/pipeline/prompt.go
Normal file
60
ingestion/internal/pipeline/prompt.go
Normal file
@@ -0,0 +1,60 @@
|
||||
// ingestion/internal/pipeline/prompt.go
|
||||
package pipeline
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
|
||||
)
|
||||
|
||||
const systemPrompt = `You are a wiki agent. Read the source material and produce structured wiki pages following the schema provided.
|
||||
|
||||
Output ONLY a valid JSON array — no markdown fences, no other text before or after.
|
||||
Each element must have:
|
||||
"path" — relative path within the wiki, e.g. "wiki/sources/foo.md"
|
||||
"content" — full markdown content of the page including YAML frontmatter
|
||||
|
||||
Follow the schema strictly: correct frontmatter fields, wikilinks as [[slug|Display Text]],
|
||||
dates in YYYY-MM-DD format, and paraphrase rather than quoting verbatim.`
|
||||
|
||||
// BuildPrompt constructs the user prompt for a single chunk.
|
||||
func BuildPrompt(schema, source, content string, inventory map[wiki.PageType][]wiki.Entry) string {
|
||||
var sb strings.Builder
|
||||
|
||||
fmt.Fprintf(&sb, "Today's date is %s.\n\n", time.Now().UTC().Format("2006-01-02"))
|
||||
|
||||
sb.WriteString("## Schema\n\n")
|
||||
sb.WriteString(schema)
|
||||
sb.WriteString("\n\n")
|
||||
|
||||
sb.WriteString("## Existing wiki pages\n\n")
|
||||
sb.WriteString("Link ONLY to pages in this inventory or pages you are creating in this response.\n\n")
|
||||
|
||||
for _, pt := range []wiki.PageType{wiki.PageTypeConcept, wiki.PageTypeEntity, wiki.PageTypeSource} {
|
||||
entries := inventory[pt]
|
||||
label := strings.ToUpper(string(pt)[:1]) + string(pt)[1:]
|
||||
if len(entries) == 0 {
|
||||
fmt.Fprintf(&sb, "%s — (none yet)\n\n", label)
|
||||
continue
|
||||
}
|
||||
fmt.Fprintf(&sb, "%s — link ONLY under the matching section:\n", label)
|
||||
for _, e := range entries {
|
||||
fmt.Fprintf(&sb, " - [[%s|%s]]\n", e.Slug, e.Title)
|
||||
}
|
||||
sb.WriteString("\n")
|
||||
}
|
||||
|
||||
sb.WriteString("## Non-negotiable rules\n\n")
|
||||
sb.WriteString("1. Output ONLY a valid JSON array — no prose, no fences.\n")
|
||||
sb.WriteString("2. Slugs are kebab-case: lowercase, spaces→hyphens, no special chars.\n")
|
||||
sb.WriteString("3. Wikilinks: [[slug|Display Text]] — the pipe is required.\n")
|
||||
sb.WriteString("4. Section links must match their section type.\n")
|
||||
sb.WriteString("5. One source page per book — update it if inventory shows it exists.\n\n")
|
||||
|
||||
fmt.Fprintf(&sb, "## Source: %s\n\n", source)
|
||||
sb.WriteString(content)
|
||||
|
||||
return sb.String()
|
||||
}
|
||||
Reference in New Issue
Block a user