// ingestion/internal/pipeline/pipeline.go package pipeline import ( "context" "fmt" "os" "path/filepath" "strings" "time" "github.com/mathiasbq/hyperguild/ingestion/internal/wiki" ) // CompleteFunc is the function signature for LLM calls. type CompleteFunc func(ctx context.Context, system, user string) (string, error) // Config holds pipeline configuration. type Config struct { Complete CompleteFunc ChunkSize int // 0 = no chunking Schema string // overrides brain/schema.md when set (useful in tests) } // Result is the outcome of a pipeline run. type Result struct { Pages []string // relative paths written (or would-be written in dry-run) Warnings []string } // Run ingests content and writes structured wiki pages to brainDir/wiki/. // In dry-run mode, pages are returned but not written to disk. func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) { inventory, err := wiki.LoadInventory(brainDir) if err != nil { return Result{}, fmt.Errorf("load inventory: %w", err) } schema := cfg.Schema if schema == "" { schema = loadSchema(brainDir) } chunks := Chunk(content, cfg.ChunkSize) var allPages []wiki.Page var allWarnings []string for _, chunk := range chunks { userPrompt := BuildPrompt(schema, source, chunk, inventory) output, err := cfg.Complete(ctx, systemPrompt, userPrompt) if err != nil { return Result{}, fmt.Errorf("LLM call: %w", err) } pages, warnings := ParsePages(output) allPages = append(allPages, pages...) allWarnings = append(allWarnings, warnings...) } merged := mergeAll(allPages) date := time.Now().UTC().Format("2006-01-02") var written []string for _, page := range merged { if !dryRun { dest := filepath.Join(brainDir, filepath.FromSlash(page.Path)) if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil { return Result{}, fmt.Errorf("mkdir for %s: %w", page.Path, err) } if err := os.WriteFile(dest, []byte(page.Content), 0o644); err != nil { return Result{}, fmt.Errorf("write %s: %w", page.Path, err) } } written = append(written, page.Path) } if !dryRun { if err := wiki.RebuildIndex(brainDir, date); err != nil { allWarnings = append(allWarnings, fmt.Sprintf("rebuild index: %v", err)) } if err := wiki.AppendLog(brainDir, source, written, allWarnings, date); err != nil { allWarnings = append(allWarnings, fmt.Sprintf("append log: %v", err)) } } return Result{Pages: written, Warnings: allWarnings}, nil } // mergeAll deduplicates pages by path, merging content from later occurrences. func mergeAll(pages []wiki.Page) []wiki.Page { order := make([]string, 0, len(pages)) byPath := make(map[string]wiki.Page, len(pages)) for _, p := range pages { if _, seen := byPath[p.Path]; !seen { order = append(order, p.Path) byPath[p.Path] = p } else { byPath[p.Path] = wiki.Merge(byPath[p.Path], p) } } result := make([]wiki.Page, 0, len(order)) for _, path := range order { result = append(result, byPath[path]) } return result } const defaultSchema = `# Brain Wiki Schema Three page types: wiki/sources/, wiki/concepts/, wiki/entities/. See brain/schema.md for the full schema. ` func loadSchema(brainDir string) string { b, err := os.ReadFile(filepath.Join(brainDir, "schema.md")) if err != nil { return defaultSchema } return strings.TrimSpace(string(b)) }