// ingestion/internal/pipeline/pipeline.go package pipeline import ( "context" "fmt" "os" "path/filepath" "strings" "time" "github.com/mathiasbq/hyperguild/ingestion/internal/wiki" ) // CompleteFunc is the function signature for LLM calls. type CompleteFunc func(ctx context.Context, system, user string) (string, error) // Config holds pipeline configuration. type Config struct { Complete CompleteFunc ChunkSize int // 0 = no chunking Schema string // overrides brain/schema.md when set (useful in tests) } // Result is the outcome of a pipeline run. type Result struct { Pages []string // relative paths written (or would-be written in dry-run) Warnings []string } // Run ingests content and writes structured wiki pages to brainDir/wiki/. // In dry-run mode, pages are returned but not written to disk. func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) { inventory, err := wiki.LoadInventory(brainDir) if err != nil { return Result{}, fmt.Errorf("load inventory: %w", err) } schema := cfg.Schema if schema == "" { schema = loadSchema(brainDir) } sourceSlug := wiki.Slug(source) date := time.Now().UTC().Format("2006-01-02") chunks := Chunk(content, cfg.ChunkSize) var allRaw []RawPage var allWarnings []string for _, chunk := range chunks { userPrompt := BuildPrompt(schema, source, chunk, inventory) output, err := cfg.Complete(ctx, systemPrompt, userPrompt) if err != nil { return Result{}, fmt.Errorf("LLM call: %w", err) } raw, warnings := ParseRawPages(output) allRaw = append(allRaw, raw...) allWarnings = append(allWarnings, warnings...) } return buildAndWrite(allRaw, sourceSlug, date, brainDir, source, inventory, allWarnings, dryRun) } // RunRaw runs the pipeline on pre-parsed RawPages, skipping the LLM extraction // step. Use this when the caller has already produced the structured RawPage data // (e.g. from a more capable model or manual curation). func RunRaw(brainDir, source string, rawPages []RawPage, dryRun bool) (Result, error) { inventory, err := wiki.LoadInventory(brainDir) if err != nil { return Result{}, fmt.Errorf("load inventory: %w", err) } sourceSlug := wiki.Slug(source) date := time.Now().UTC().Format("2006-01-02") return buildAndWrite(rawPages, sourceSlug, date, brainDir, source, inventory, nil, dryRun) } // buildAndWrite runs BuildPages through write for both Run and RunRaw. func buildAndWrite(rawPages []RawPage, sourceSlug, date, brainDir, source string, inventory map[wiki.PageType][]wiki.Entry, warnings []string, dryRun bool) (Result, error) { pages, buildWarnings := BuildPages(rawPages, sourceSlug, date) warnings = append(warnings, buildWarnings...) resolved := Resolve(pages, inventory) canonicalized, linkWarnings := CanonicalizeLinks(resolved, inventory) warnings = append(warnings, linkWarnings...) withRefs := injectSourceRefs(canonicalized, inventory, brainDir) merged := mergeAll(withRefs) var written []string for _, page := range merged { if !dryRun { dest := filepath.Join(brainDir, filepath.FromSlash(page.Path)) if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil { return Result{}, fmt.Errorf("mkdir for %s: %w", page.Path, err) } if err := os.WriteFile(dest, []byte(page.Content), 0o644); err != nil { return Result{}, fmt.Errorf("write %s: %w", page.Path, err) } } written = append(written, page.Path) } if !dryRun { if err := wiki.RebuildIndex(brainDir, date); err != nil { warnings = append(warnings, fmt.Sprintf("rebuild index: %v", err)) } if err := wiki.AppendLog(brainDir, source, written, warnings, date); err != nil { warnings = append(warnings, fmt.Sprintf("append log: %v", err)) } } return Result{Pages: written, Warnings: warnings}, nil } // mergeAll deduplicates pages by path, merging content from later occurrences. func mergeAll(pages []wiki.Page) []wiki.Page { order := make([]string, 0, len(pages)) byPath := make(map[string]wiki.Page, len(pages)) for _, p := range pages { if _, seen := byPath[p.Path]; !seen { order = append(order, p.Path) byPath[p.Path] = p } else { byPath[p.Path] = wiki.Merge(byPath[p.Path], p) } } result := make([]wiki.Page, 0, len(order)) for _, path := range order { result = append(result, byPath[path]) } return result } const defaultSchema = `# Brain Wiki Schema Three page types: wiki/sources/, wiki/concepts/, wiki/entities/. See brain/schema.md for the full schema. ` func loadSchema(brainDir string) string { b, err := os.ReadFile(filepath.Join(brainDir, "schema.md")) if err != nil { return defaultSchema } return strings.TrimSpace(string(b)) }