Files
hyperguild/ingestion/internal/pipeline/pipeline.go
2026-04-23 16:36:22 +02:00

123 lines
3.4 KiB
Go

// ingestion/internal/pipeline/pipeline.go
package pipeline
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
// CompleteFunc is the function signature for LLM calls.
//
// Run invokes it once per content chunk, passing a system prompt and a user
// prompt, and hands the returned text to ParsePages. A non-nil error aborts
// the run.
type CompleteFunc func(ctx context.Context, system, user string) (string, error)
// Config holds pipeline configuration.
type Config struct {
// Complete performs the LLM call; Run invokes it once per chunk.
Complete CompleteFunc
// ChunkSize is passed to Chunk when splitting the input content.
ChunkSize int // 0 = no chunking
// Schema, when non-empty, is used instead of the schema loaded from brainDir.
Schema string // overrides brain/schema.md when set (useful in tests)
}
// Result is the outcome of a pipeline run.
type Result struct {
// Pages lists the Path of every merged page, in the order the pages were
// processed. Run joins each path onto brainDir to obtain the destination.
Pages []string // relative paths written (or would-be written in dry-run)
// Warnings collects the warnings returned by ParsePages plus, outside
// dry-run, any failure to rebuild the index or append to the log.
Warnings []string
}
// Run ingests content and writes structured wiki pages beneath brainDir.
//
// The content is split with Chunk using cfg.ChunkSize. Each chunk is sent to
// cfg.Complete together with the schema (cfg.Schema, or the one loaded from
// brainDir when that is empty) and the current wiki inventory. The parsed
// pages are then passed through Resolve, injectSourceRefs and mergeAll before
// being written.
//
// In dry-run mode nothing is written to disk and the index and log are left
// untouched; Result.Pages then lists the paths that would have been written.
//
// Run returns an error when cfg.Complete is nil, when the inventory cannot be
// loaded, when an LLM call fails, or when a page cannot be written. Pages
// whose path would resolve outside brainDir are skipped with a warning, and
// failures to rebuild the index or append to the log are reported as warnings
// rather than errors.
func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) {
	// Fail fast on a missing completion function instead of panicking with a
	// nil function call on the first chunk.
	if cfg.Complete == nil {
		return Result{}, fmt.Errorf("pipeline: Complete function is not configured")
	}

	inventory, err := wiki.LoadInventory(brainDir)
	if err != nil {
		return Result{}, fmt.Errorf("load inventory: %w", err)
	}

	schema := cfg.Schema
	if schema == "" {
		schema = loadSchema(brainDir)
	}

	var allPages []wiki.Page
	var allWarnings []string
	for _, chunk := range Chunk(content, cfg.ChunkSize) {
		userPrompt := BuildPrompt(schema, source, chunk, inventory)
		output, err := cfg.Complete(ctx, systemPrompt, userPrompt)
		if err != nil {
			return Result{}, fmt.Errorf("LLM call: %w", err)
		}
		pages, warnings := ParsePages(output)
		allPages = append(allPages, pages...)
		allWarnings = append(allWarnings, warnings...)
	}

	resolved := Resolve(allPages, inventory)
	withRefs := injectSourceRefs(resolved, inventory, brainDir)
	merged := mergeAll(withRefs)

	date := time.Now().UTC().Format("2006-01-02")

	var written []string
	for _, page := range merged {
		dest := filepath.Join(brainDir, filepath.FromSlash(page.Path))

		// Page paths derive from LLM output, so refuse anything that would
		// resolve outside brainDir (for example through ".." segments). The
		// check runs in dry-run mode too, so the preview matches a real run.
		rel, relErr := filepath.Rel(brainDir, dest)
		if relErr != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
			allWarnings = append(allWarnings, fmt.Sprintf("skip %q: path escapes brain directory", page.Path))
			continue
		}

		if !dryRun {
			if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
				return Result{}, fmt.Errorf("mkdir for %s: %w", page.Path, err)
			}
			if err := os.WriteFile(dest, []byte(page.Content), 0o644); err != nil {
				return Result{}, fmt.Errorf("write %s: %w", page.Path, err)
			}
		}
		written = append(written, page.Path)
	}

	if !dryRun {
		// Index and log maintenance is best effort: the pages are already on
		// disk, so failures here are surfaced as warnings, not errors.
		if err := wiki.RebuildIndex(brainDir, date); err != nil {
			allWarnings = append(allWarnings, fmt.Sprintf("rebuild index: %v", err))
		}
		if err := wiki.AppendLog(brainDir, source, written, allWarnings, date); err != nil {
			allWarnings = append(allWarnings, fmt.Sprintf("append log: %v", err))
		}
	}

	return Result{Pages: written, Warnings: allWarnings}, nil
}
// mergeAll collapses pages that share a Path into a single entry. The first
// occurrence of a path fixes its position in the output; each later
// occurrence is folded into that entry with wiki.Merge.
func mergeAll(pages []wiki.Page) []wiki.Page {
	merged := make([]wiki.Page, 0, len(pages))
	// slot maps a page path to its index in merged.
	slot := make(map[string]int, len(pages))
	for _, pg := range pages {
		if idx, dup := slot[pg.Path]; dup {
			merged[idx] = wiki.Merge(merged[idx], pg)
			continue
		}
		slot[pg.Path] = len(merged)
		merged = append(merged, pg)
	}
	return merged
}
// defaultSchema is the fallback schema text used when brainDir has no usable
// schema.md.
const defaultSchema = `# Brain Wiki Schema
Three page types: wiki/sources/, wiki/concepts/, wiki/entities/.
See brain/schema.md for the full schema.
`

// loadSchema returns the contents of brainDir/schema.md with surrounding
// whitespace trimmed. It falls back to defaultSchema when the file cannot be
// read or contains only whitespace, so callers never receive an empty schema.
func loadSchema(brainDir string) string {
	raw, err := os.ReadFile(filepath.Join(brainDir, "schema.md"))
	if err != nil {
		// Best effort: a missing or unreadable schema is not fatal.
		return defaultSchema
	}
	if schema := strings.TrimSpace(string(raw)); schema != "" {
		return schema
	}
	// An empty file carries no schema; treat it the same as a missing one.
	return defaultSchema
}