20 Commits

Author SHA1 Message Date
Mathias Bergqvist
e74320a8e8 feat(ingestion): wire watcher into server startup + fix Procfile env vars
Some checks failed
cd / Build and deploy (push) Successful in 10s
CI / Lint / Test / Vet (push) Failing after 5s
CI / Mirror to GitHub (push) Has been skipped
- Start background watcher on startup when INGEST_WATCH_INTERVAL > 0
- Procfile: add INGEST_WATCH_INTERVAL=30 and INGEST_SVC_URL for supervisor

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 23:09:00 +02:00
Mathias Bergqvist
1b0706f270 chore(brain): rename CLAUDE.md to schema.md for clarity
CLAUDE.md has a specific meaning in the Claude Code ecosystem (agent
instructions). The wiki schema for the ingestion pipeline should live
in schema.md to avoid confusion.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 23:06:32 +02:00
Mathias Bergqvist
2ae6bfe81e fix(brain): enforce mutual exclusivity and clarify brain_ingest schema
- Return error when both path and content are supplied simultaneously
- Improve tool description to clearly state the two valid call forms
- Add per-field descriptions so LLMs understand what each parameter requires

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 23:03:03 +02:00
Mathias Bergqvist
a6dce972d6 feat(brain): add path field to brain_ingest for /ingest-path routing
Adds an optional path field to brain_ingest so Claude can ingest files
or directories directly by path without embedding content in the call.
Routing: path set → /ingest-path; content+source set → /ingest; neither → error.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 23:01:05 +02:00
Mathias Bergqvist
2f4b577131 fix(ingestion): address code review issues in api and watcher packages
- Strip internal error detail from 500 responses (leak prevention)
- Add path containment assertion in /write handler
- Use Go 1.22 method-prefixed mux routes for automatic 405 responses
- Clarify watch_interval log when watcher not yet wired
- Consolidate validation tests into table-driven TestIngest_Validation
- Watcher: return nil after successful quarantine to avoid double-logging
- Watcher: append timestamp suffix to processed dest if file already exists

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 22:59:39 +02:00
Mathias Bergqvist
a25bb18c54 feat(ingestion): add /ingest and /ingest-path HTTP handlers
Wires pipeline.Run into the HTTP layer so callers can ingest raw text
or files/directories without touching the filesystem directly. Rewrites
main.go to parse LLM and watcher env vars and build pipeline.Config.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 22:54:28 +02:00
Mathias Bergqvist
78531bb238 feat(ingestion): add background file watcher for brain/raw/
Polls brain/raw/ on a configurable ticker, derives human-readable source
names from filenames, runs the pipeline, and moves files to
processed/YYYY-MM-DD/ on success or failed/ on error with a log.md entry.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 22:54:03 +02:00
Mathias Bergqvist
04fefe8e9c fix(ingestion): wrap naked error returns and harden mustJSON helper
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 22:51:19 +02:00
Mathias Bergqvist
103f4d90bf feat(ingestion): add pipeline orchestrator with prompt builder
Adds prompt.go (BuildPrompt + systemPrompt) and pipeline.go (Run, Config,
Result, mergeAll) that wire chunking, LLM calls, parse, merge, index rebuild,
and log append into a single ingestion pipeline. Includes integration tests
covering write, dry-run, and duplicate-path merge scenarios.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-22 22:45:19 +02:00
Mathias Bergqvist
9b11719481 feat(ingestion): add content chunking and LLM JSON output parser 2026-04-22 22:37:14 +02:00
Mathias Bergqvist
d405346f07 feat(ingestion): add wiki index rebuilder and audit log 2026-04-22 22:36:55 +02:00
Mathias Bergqvist
bf8a3fc11c feat(ingestion): add OpenAI-compatible LLM HTTP client with 429 retry 2026-04-22 22:29:24 +02:00
Mathias Bergqvist
ae5a4d04f0 feat(ingestion): add wiki page merge logic 2026-04-22 22:28:55 +02:00
Mathias Bergqvist
3a0424a6b4 feat(ingestion): add wiki inventory loader 2026-04-22 22:28:53 +02:00
Mathias Bergqvist
08dd7b9365 docs(brain): add wiki schema document for ingest prompt 2026-04-22 22:25:52 +02:00
Mathias Bergqvist
91e02b930c feat(ingestion): add wiki package with Page types and slug generation 2026-04-22 22:25:45 +02:00
Mathias Bergqvist
c7341a2607 feat(config): add IngestSvcURL and KBRetrievalURL to supervisor config 2026-04-22 22:24:27 +02:00
Mathias Bergqvist
b5a0085c0a feat(brain): add brain_ingest, brain_search tools and extend search to wiki/ 2026-04-22 22:16:02 +02:00
Mathias Bergqvist
d6daa37c71 docs: add brain ingestion pipeline implementation plan 2026-04-22 22:14:59 +02:00
Mathias Bergqvist
62fc3989f2 docs: add brain ingestion pipeline design spec 2026-04-22 22:05:19 +02:00
35 changed files with 5250 additions and 68 deletions

1
.gitignore vendored
View File

@@ -34,6 +34,7 @@ secrets/
# ── Documented examples (commit these) ── # ── Documented examples (commit these) ──
!.env.example !.env.example
!config/supervisor/CLAUDE.md !config/supervisor/CLAUDE.md
!brain/CLAUDE.md
# IDE # IDE
.idea/ .idea/

View File

@@ -1,2 +1,2 @@
ingestion: cd ingestion && INGEST_BRAIN_DIR=../brain INGEST_PORT=3300 go run ./cmd/server/ ingestion: cd ingestion && INGEST_BRAIN_DIR=../brain INGEST_PORT=3300 INGEST_WATCH_INTERVAL=30 go run ./cmd/server/
supervisor: SUPERVISOR_CONFIG_DIR=./config/supervisor SUPERVISOR_MODELS_FILE=./config/models.yaml SUPERVISOR_SESSIONS_DIR=./brain/sessions INGEST_BASE_URL=http://localhost:3300 go run ./cmd/supervisor/ supervisor: SUPERVISOR_CONFIG_DIR=./config/supervisor SUPERVISOR_MODELS_FILE=./config/models.yaml SUPERVISOR_SESSIONS_DIR=./brain/sessions INGEST_BASE_URL=http://localhost:3300 INGEST_SVC_URL=http://localhost:3300 go run ./cmd/supervisor/

154
brain/schema.md Normal file
View File

@@ -0,0 +1,154 @@
# Brain Wiki Schema
This document defines the three page types in the brain wiki.
The LLM must follow this schema exactly when generating wiki pages.
## Wikilink Format
All cross-references use `[[slug|Display Text]]`.
Rules:
- slug = lowercase filename without .md, spaces → hyphens, strip all non-alphanumeric except hyphens
- The `|` separator is REQUIRED — never use `[[Title]]` without a slug
- Examples: `[[domain-driven-design|Domain Driven Design]]`, `[[ryan-singer|Ryan Singer]]`
- Slugs must resolve to an existing file in the inventory, or a file you are creating in this response
Slug generation examples:
- "Domain Driven Design" → `domain-driven-design`
- "It's Complicated" → `its-complicated`
- "gRPC" → `grpc`
- "GPT-4o" → `gpt-4o`
## Domains
Use one of: `ai-llm`, `software-engineering`, `product-strategy`, `finance-markets`,
`personal`, `consulting`, `climate`, `infrastructure`, `security`
---
## Source Pages — wiki/sources/<slug>.md
One page per ingested source. Books are NEVER split across multiple source pages — update the existing one.
Required frontmatter:
```yaml
title: <exact title>
type: article | pdf | book | video | note | project
domain: <domain>
date_ingested: YYYY-MM-DD
last_updated: YYYY-MM-DD
aliases:
- <exact title>
```
Body sections (in this order):
### Summary
23 sentences. Core argument or finding.
### Key Claims
Bulleted list. Paraphrase — no verbatim quotes or code.
### Concepts Introduced or Reinforced
Wikilinks to wiki/concepts/ ONLY. One per line.
### Entities Mentioned
Wikilinks to wiki/entities/ ONLY. One per line.
### Open Questions Raised
Gaps or follow-up questions from this source.
For books only, also add:
### Chapters
One bullet per chapter with 12 sentence summary.
### Argument Arc
Overall narrative as it becomes clear across chapters.
### Updates
Dated entries appended on re-ingestion. NEVER rewrite — only append.
---
## Concept Pages — wiki/concepts/<slug>.md
One page per idea, framework, methodology, or pattern.
Required frontmatter:
```yaml
title: <concept name>
domain: <domain>
last_updated: YYYY-MM-DD
aliases:
- <exact title>
```
Body sections (in this order):
### Definition
One-paragraph plain-language explanation.
### Why It Matters
Practical significance. Why should anyone care?
### Related Concepts
Wikilinks to wiki/concepts/ ONLY.
### Related Entities
Wikilinks to wiki/entities/ ONLY.
### Sources
Wikilinks to wiki/sources/ ONLY.
### Evolving Notes
Updated as new sources arrive. Append, do not rewrite.
---
## Entity Pages — wiki/entities/<slug>.md
One page per person, tool, organisation, technology, or product.
Required frontmatter:
```yaml
title: <name>
type: person | company | tool | model | framework | technology
domain: <domain>
last_updated: YYYY-MM-DD
aliases:
- <exact title>
```
Body sections (in this order):
### Description
One-line description.
### Relevance
Why this entity matters to this knowledge base.
### Key Positions, Products, or Claims
With dates where known.
### Related Concepts
Wikilinks to wiki/concepts/ ONLY.
### Related Entities
Wikilinks to wiki/entities/ ONLY.
### Sources
Wikilinks to wiki/sources/ ONLY.
---
## Non-Negotiable Rules
1. Output ONLY a valid JSON array — no markdown fences, no prose before or after
2. Each element: `{"path": "wiki/<type>/<slug>.md", "content": "...full markdown..."}`
3. Slugs are kebab-case: lowercase, spaces→hyphens, strip special characters
4. Every wikilink must be `[[slug|Display Text]]` — the pipe separator is required
5. Dates always YYYY-MM-DD
6. Never reproduce verbatim code — describe the pattern or technique
7. Section links must match their section type (Related Concepts → concepts/ only, etc.)
8. One source page per book — if inventory shows it exists, include it as an UPDATE

View File

@@ -105,6 +105,8 @@ func main() {
})) }))
reg.Register(brain.New(brain.Config{ reg.Register(brain.New(brain.Config{
IngestBaseURL: cfg.IngestBaseURL, IngestBaseURL: cfg.IngestBaseURL,
IngestSvcURL: cfg.IngestSvcURL,
KBRetrievalURL: cfg.KBRetrievalURL,
})) }))
reg.Register(org.New(org.Config{ reg.Register(org.New(org.Config{
TierFn: tierFn, TierFn: tierFn,

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,240 @@
# Brain Ingestion Pipeline — Design Spec
**Date:** 2026-04-22
**Status:** approved
**Author:** Mathias + Claude
---
## Overview
Add a structured ingestion pipeline to the hyperguild brain. The pipeline accepts raw content (directly or from files) and uses an LLM to produce structured wiki pages in `brain/wiki/` — the declarative layer of the Two-Layer Brain. Three fixed knowledge classes: **concepts**, **entities**, **sources**.
This spec covers:
- Three new packages in the `ingestion` Go module (`llm`, `wiki`, `pipeline`, `watcher`)
- Two new HTTP endpoints on the ingestion server (`/ingest`, `/ingest-path`)
- A background file watcher for `brain/raw/`
- Config additions to both the ingestion server and the supervisor
It does **not** cover Layer 2 (training data, `brain/training-data/`) — that is the trainer worker's concern.
---
## Information Model
Three fixed wiki page classes, matching the Two-Layer Brain design spec and the existing `ingestion-svc` model:
### `wiki/sources/<slug>.md`
One page per ingested source (project, book, article, note). Updated (not replaced) on re-ingestion.
Required frontmatter: `title`, `type` (article|pdf|book|video|note|project), `domain`, `source_url`, `date_ingested`, `last_updated`, `aliases`.
Body sections: Summary · Key Claims · Concepts Introduced or Reinforced · Entities Mentioned · Open Questions Raised. Books add: Chapters · Argument Arc · Updates (dated, append-only).
### `wiki/concepts/<slug>.md`
One page per idea, framework, methodology, or pattern (e.g. Domain Driven Design, TDD, event sourcing).
Required frontmatter: `title`, `domain`, `last_updated`, `aliases`.
Body sections: Definition · Why It Matters · Related Concepts · Related Entities · Sources · Evolving Notes.
### `wiki/entities/<slug>.md`
One page per person, tool, organisation, technology, or product.
Required frontmatter: `title`, `type` (person|company|tool|model|framework|technology), `domain`, `last_updated`, `aliases`.
Body sections: Description · Relevance · Key Positions/Products/Claims · Related Concepts · Related Entities · Sources.
### Wikilink format
All cross-references use `[[slug|Display Text]]`. Slug = lowercase title, spaces→hyphens, non-alphanumeric stripped. Slugs must resolve to an existing file in the wiki.
### Supporting files
- `brain/wiki/index.md` — auto-rebuilt on every ingest: one-sentence summary per page, grouped by type
- `brain/log.md` — append-only audit trail: date, source, pages written, warnings
---
## Architecture
### New packages (`ingestion` module)
```
ingestion/internal/
llm/ — OpenAI-compatible HTTP client (chat completions, retry on 429,
configurable timeout and temperature)
wiki/ — Page types, slug utilities, merge logic, inventory loader,
index rebuilder, log appender
pipeline/ — Orchestrates one ingest run end-to-end (content or extracted file text)
watcher/ — Polls brain/raw/ and triggers pipeline on new files
```
The existing `api/` and `search/` packages are updated; no other existing packages change.
### Brain directory layout
```
brain/
wiki/
concepts/ ← LLM-structured concept pages
entities/ ← LLM-structured entity pages
sources/ ← LLM-structured source pages
index.md ← auto-rebuilt on each ingest
knowledge/ ← quick raw notes via brain_write (BM25-searchable, unchanged)
raw/ ← drop zone; watcher picks up files here
processed/ ← moved here on success (organised by date: processed/YYYY-MM-DD/)
failed/ ← moved here on failure
sessions/ ← session logs (retrospective/trainer concern, not touched here)
training-data/ ← Layer 2 (trainer worker concern, not touched here)
log.md ← append-only audit trail
CLAUDE.md ← schema document injected into every ingest prompt
```
If `brain/CLAUDE.md` is absent, the pipeline falls back to an embedded default schema compiled into the binary.
---
## API
### `POST /ingest`
Ingest content provided directly by the caller.
**Request:**
```json
{
"content": "...",
"source": "shape-up-book",
"dry_run": false
}
```
**Response:**
```json
{
"pages": ["wiki/sources/shape-up.md", "wiki/concepts/betting-table.md"],
"warnings": []
}
```
`source` is the human-readable name used when writing/updating `wiki/sources/<slug>.md`. `dry_run: true` returns the page contents without writing.
### `POST /ingest-path`
Ingest a file or walk a directory recursively. Supports `.md`, `.txt`, `.pdf`.
**Request:**
```json
{
"path": "/Users/mathias/brain/raw/shape-up.pdf",
"source": "shape-up-book",
"dry_run": false
}
```
If `path` is a directory, all supported files within it are ingested in sequence. `source` is optional for directory ingestion — if omitted, the LLM derives it from each file's name and content.
**Response:** same shape as `/ingest`, with pages and warnings aggregated across all files.
### Supervisor skill update
`brain_ingest` in `internal/skills/brain/handlers.go` gains an optional `path` field. If `path` is set, it calls `/ingest-path`; otherwise `/ingest`.
---
## Pipeline
`pipeline.Run(ctx, cfg, brainDir, content, source, dryRun)` — called by both HTTP handlers after any file reading is done.
Steps:
1. **Load inventory** — walk `brain/wiki/{concepts,entities,sources}/`, build slug index grouped by type. Injected into prompt so LLM knows what to update vs create.
2. **Load schema** — read `brain/CLAUDE.md`; fall back to embedded default if absent.
3. **Chunk** — split content at `INGEST_CHUNK_SIZE` chars (default 6000; split on paragraph boundary). If `INGEST_CHUNK_SIZE=0`, no chunking.
4. **LLM call per chunk** — returns JSON array of `{"path": "wiki/concepts/foo.md", "content": "..."}`. Prompt structure: system instruction → date → schema → inventory → non-negotiable slug/wikilink rules → source content.
5. **Parse + truncation recovery** — strip markdown fences if present. If JSON array is truncated mid-object (token limit), salvage all complete objects before the break and log a warning.
6. **Merge** — combine pages with the same path across chunks:
- Bullet sections (Related Concepts, Related Entities, Sources, Key Claims): union unique lines
- Append sections (Evolving Notes, Updates, Open Questions): append new content
- All other sections: keep first occurrence
- Frontmatter: keep first occurrence
7. **Write** — create subdirs as needed, write files atomically. In dry-run mode, return page map without writing.
8. **Rebuild `index.md`** — one-sentence summary per page (derived from first body paragraph), grouped by type, with page count header.
9. **Append to `log.md`** — date, source, list of pages written, warning count.
---
## File Watcher
Background goroutine started at server startup (when `INGEST_WATCH_INTERVAL > 0`).
**Poll loop:**
1. Walk `brain/raw/` for files with supported extensions (`.md`, `.txt`, `.pdf`), excluding `processed/` and `failed/` subdirs.
2. For each file found: derive source from filename (strip extension, kebab-to-title), call `pipeline.Run` with the file content.
3. On success: move file to `brain/raw/processed/YYYY-MM-DD/<filename>`.
4. On failure: move file to `brain/raw/failed/<filename>`, append error to `brain/log.md`.
5. Sleep `INGEST_WATCH_INTERVAL` seconds, repeat.
Files are processed one at a time (no concurrency within the watcher) to avoid LLM rate-limit collisions.
---
## LLM Prompt
**System:**
> You are a wiki agent. Read the source material and produce structured wiki pages following the schema provided. Output ONLY a valid JSON array — no markdown fences, no other text. Each element must have: `"path"` (relative path within wiki, e.g. `"wiki/sources/foo.md"`) and `"content"` (full markdown including YAML frontmatter). Follow the schema strictly: correct frontmatter fields, wikilinks as `[[slug|Display Text]]`, dates in YYYY-MM-DD format, paraphrase rather than quoting verbatim.
**User (built dynamically):**
1. Today's date
2. Full schema (`brain/CLAUDE.md` content)
3. Existing wiki inventory grouped by type (for update-vs-create decisions)
4. Non-negotiable rules: slug format, wikilink format, one-source-per-book, section type enforcement
5. Source content (the chunk)
Temperature: 0.2 for reproducibility.
---
## Configuration
### Ingestion server (new env vars)
| Variable | Default | Description |
|---|---|---|
| `INGEST_LLM_URL` | `http://iguana:4000/v1` | OpenAI-compatible endpoint |
| `INGEST_LLM_KEY` | (empty) | API key |
| `INGEST_LLM_MODEL` | `koala/qwen35-9b-fast` | Model name |
| `INGEST_LLM_TIMEOUT` | `15` | LLM call timeout (minutes) |
| `INGEST_CHUNK_SIZE` | `6000` | Max chars per LLM call (0 = no chunking) |
| `INGEST_WATCH_INTERVAL` | `30` | Watcher poll interval in seconds (0 = disabled) |
### Supervisor (new env vars + wiring)
| Variable | Default | Description |
|---|---|---|
| `INGEST_SVC_URL` | (empty) | URL of ingestion server for `brain_ingest` |
| `KB_RETRIEVAL_URL` | (empty) | URL of KB retrieval server for `brain_search` |
`config.go` gets two new fields. `main.go` passes them to `brain.New()`. Both tools are only registered as MCP tools when the respective URL is configured (already implemented in `skill.go`).
---
## Testing
| Package | What is tested |
|---|---|
| `wiki/` | Slug generation (edge cases: apostrophes, colons, version strings), merge logic (bullets union, append, keep-first), inventory loading from temp dir, truncation recovery (valid partial JSON), index rebuild output |
| `pipeline/` | Integration test: temp brain dir + mock LLM HTTP server returning fixture JSON; verify files written to correct paths, index rebuilt, log appended |
| `api/` | Handler tests for `/ingest` and `/ingest-path` using mock pipeline; 400 on missing fields, 200 with expected response shape |
| `watcher/` | File placed in `brain/raw/` is moved to `processed/` on mock-pipeline success; moved to `failed/` on error |
All tests are table-driven. No real LLM calls in tests.
---
## Out of Scope
- Python validation/correction loop (can be added later; the LLM prompt enforces schema rules as non-negotiable instructions)
- `brain/training-data/` — trainer worker concern
- `brain/sessions/` — retrospective/sessionlog concern
- Upload endpoint (multipart HTTP) — `scp`/rsync to `brain/raw/` + watcher covers this
- Qdrant vector indexing — `brain_search` calls a separate KB retrieval service; ingestion does not write to Qdrant

View File

@@ -2,34 +2,86 @@
package main package main
import ( import (
"context"
"fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"strconv"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/api" "github.com/mathiasbq/hyperguild/ingestion/internal/api"
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/watcher"
) )
func envOr(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
func envInt(key string, fallback int) int {
if v := os.Getenv(key); v != "" {
if n, err := strconv.Atoi(v); err == nil {
return n
}
}
return fallback
}
func main() { func main() {
logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) logger := slog.New(slog.NewJSONHandler(os.Stdout, nil))
brainDir := os.Getenv("INGEST_BRAIN_DIR") brainDir := envOr("INGEST_BRAIN_DIR", "../brain")
if brainDir == "" { port := envOr("INGEST_PORT", "3300")
brainDir = "../brain"
llmURL := envOr("INGEST_LLM_URL", "http://iguana:4000/v1")
llmKey := os.Getenv("INGEST_LLM_KEY")
llmModel := envOr("INGEST_LLM_MODEL", "koala/qwen35-9b-fast")
llmTimeoutMins := envInt("INGEST_LLM_TIMEOUT", 15)
chunkSize := envInt("INGEST_CHUNK_SIZE", 6000)
watchInterval := envInt("INGEST_WATCH_INTERVAL", 30)
llmClient := llm.New(llmURL, llmKey, llmModel, time.Duration(llmTimeoutMins)*time.Minute)
pipelineCfg := pipeline.Config{
Complete: llmClient.Complete,
ChunkSize: chunkSize,
} }
port := os.Getenv("INGEST_PORT") h := api.NewHandler(brainDir, logger, pipelineCfg)
if port == "" {
port = "3300"
}
h := api.NewHandler(brainDir, logger) ctx := context.Background()
if watchInterval > 0 {
watcher.Start(ctx, watcher.Config{
BrainDir: brainDir,
Interval: time.Duration(watchInterval) * time.Second,
Pipeline: pipelineCfg,
})
}
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/query", h.Query) mux.HandleFunc("POST /query", h.Query)
mux.HandleFunc("/write", h.Write) mux.HandleFunc("POST /write", h.Write)
mux.HandleFunc("POST /ingest", h.Ingest)
mux.HandleFunc("POST /ingest-path", h.IngestPath)
addr := ":" + port addr := ":" + port
logger.Info("ingestion server starting", "addr", addr, "brain_dir", brainDir) watchIntervalLog := "disabled"
if watchInterval > 0 {
watchIntervalLog = fmt.Sprintf("%ds", watchInterval)
}
logger.Info("ingestion server starting",
"addr", addr,
"brain_dir", brainDir,
"llm_url", llmURL,
"llm_model", llmModel,
"chunk_size", chunkSize,
"watch_interval", watchIntervalLog,
)
if err := http.ListenAndServe(addr, mux); err != nil { if err := http.ListenAndServe(addr, mux); err != nil {
logger.Error("server stopped", "err", err) logger.Error("server stopped", "err", err)
os.Exit(1) os.Exit(1)

View File

@@ -11,6 +11,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/search" "github.com/mathiasbq/hyperguild/ingestion/internal/search"
) )
@@ -18,11 +19,15 @@ import (
type Handler struct { type Handler struct {
brainDir string brainDir string
logger *slog.Logger logger *slog.Logger
pipeline pipeline.Config
} }
// NewHandler constructs a Handler. brainDir is the absolute path to brain/. // NewHandler constructs a Handler. brainDir is the absolute path to brain/.
func NewHandler(brainDir string, logger *slog.Logger) *Handler { func NewHandler(brainDir string, logger *slog.Logger, pipelineCfg pipeline.Config) *Handler {
return &Handler{brainDir: brainDir, logger: logger} if logger == nil {
logger = slog.Default()
}
return &Handler{brainDir: brainDir, logger: logger, pipeline: pipelineCfg}
} }
type queryRequest struct { type queryRequest struct {
@@ -37,15 +42,32 @@ type writeRequest struct {
Domain string `json:"domain,omitempty"` Domain string `json:"domain,omitempty"`
} }
type ingestRequest struct {
Content string `json:"content"`
Source string `json:"source"`
DryRun bool `json:"dry_run"`
}
type ingestPathRequest struct {
Path string `json:"path"`
Source string `json:"source"`
DryRun bool `json:"dry_run"`
}
type ingestResponse struct {
Pages []string `json:"pages"`
Warnings []string `json:"warnings"`
}
// Query handles POST /query — full-text search across the brain wiki. // Query handles POST /query — full-text search across the brain wiki.
func (h *Handler) Query(w http.ResponseWriter, r *http.Request) { func (h *Handler) Query(w http.ResponseWriter, r *http.Request) {
var req queryRequest var req queryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON", http.StatusBadRequest) writeError(w, http.StatusBadRequest, "invalid JSON")
return return
} }
if strings.TrimSpace(req.Query) == "" { if strings.TrimSpace(req.Query) == "" {
http.Error(w, "query is required", http.StatusBadRequest) writeError(w, http.StatusBadRequest, "query is required")
return return
} }
if req.Limit == 0 { if req.Limit == 0 {
@@ -55,22 +77,22 @@ func (h *Handler) Query(w http.ResponseWriter, r *http.Request) {
results, err := search.Query(h.brainDir, req.Query, req.Limit) results, err := search.Query(h.brainDir, req.Query, req.Limit)
if err != nil { if err != nil {
h.logger.Error("query failed", "err", err) h.logger.Error("query failed", "err", err)
http.Error(w, "search error", http.StatusInternalServerError) writeError(w, http.StatusInternalServerError, "search error")
return return
} }
writeJSON(w, map[string]any{"results": results}) writeJSON(w, map[string]any{"results": results})
} }
// Write handles POST /write — write raw content to brain/raw/. // Write handles POST /write — write raw content to brain/knowledge/.
func (h *Handler) Write(w http.ResponseWriter, r *http.Request) { func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
var req writeRequest var req writeRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "invalid JSON", http.StatusBadRequest) writeError(w, http.StatusBadRequest, "invalid JSON")
return return
} }
if req.Content == "" { if req.Content == "" {
http.Error(w, "content is required", http.StatusBadRequest) writeError(w, http.StatusBadRequest, "content is required")
return return
} }
@@ -81,7 +103,7 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
rawDir := filepath.Join(h.brainDir, "knowledge") rawDir := filepath.Join(h.brainDir, "knowledge")
if err := os.MkdirAll(rawDir, 0o755); err != nil { if err := os.MkdirAll(rawDir, 0o755); err != nil {
http.Error(w, "failed to create raw dir", http.StatusInternalServerError) writeError(w, http.StatusInternalServerError, "failed to create raw dir")
return return
} }
@@ -104,9 +126,13 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
base += ".md" base += ".md"
} }
dest := filepath.Join(rawDir, base) dest := filepath.Join(rawDir, base)
if !strings.HasPrefix(filepath.Clean(dest)+string(os.PathSeparator), filepath.Clean(rawDir)+string(os.PathSeparator)) {
writeError(w, http.StatusBadRequest, "invalid filename")
return
}
if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil { if err := os.WriteFile(dest, []byte(finalContent), 0o644); err != nil {
h.logger.Error("write failed", "err", err) h.logger.Error("write failed", "err", err)
http.Error(w, "write error", http.StatusInternalServerError) writeError(w, http.StatusInternalServerError, "write error")
return return
} }
@@ -114,7 +140,144 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
writeJSON(w, map[string]string{"path": filepath.ToSlash(rel)}) writeJSON(w, map[string]string{"path": filepath.ToSlash(rel)})
} }
// Ingest handles POST /ingest — run the pipeline on provided content.
func (h *Handler) Ingest(w http.ResponseWriter, r *http.Request) {
var req ingestRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid JSON")
return
}
if strings.TrimSpace(req.Content) == "" {
writeError(w, http.StatusBadRequest, "content is required")
return
}
if strings.TrimSpace(req.Source) == "" {
writeError(w, http.StatusBadRequest, "source is required")
return
}
result, err := pipeline.Run(r.Context(), h.pipeline, h.brainDir, req.Content, req.Source, req.DryRun)
if err != nil {
h.logger.Error("ingest failed", "source", req.Source, "err", err)
writeError(w, http.StatusInternalServerError, "ingest error")
return
}
pages := result.Pages
if pages == nil {
pages = []string{}
}
warnings := result.Warnings
if warnings == nil {
warnings = []string{}
}
writeJSON(w, ingestResponse{Pages: pages, Warnings: warnings})
}
// supportedExtensions lists file extensions that IngestPath will process.
var supportedExtensions = map[string]bool{
".md": true,
".txt": true,
".pdf": true,
}
// IngestPath handles POST /ingest-path — ingest a file or directory.
func (h *Handler) IngestPath(w http.ResponseWriter, r *http.Request) {
var req ingestPathRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid JSON")
return
}
if strings.TrimSpace(req.Path) == "" {
writeError(w, http.StatusBadRequest, "path is required")
return
}
info, err := os.Stat(req.Path)
if err != nil {
writeError(w, http.StatusBadRequest, fmt.Sprintf("path not accessible: %v", err))
return
}
var allPages []string
var allWarnings []string
if info.IsDir() {
err = filepath.WalkDir(req.Path, func(path string, d os.DirEntry, walkErr error) error {
if walkErr != nil {
return walkErr
}
if d.IsDir() {
return nil
}
ext := strings.ToLower(filepath.Ext(path))
if !supportedExtensions[ext] {
return nil
}
content, readErr := os.ReadFile(path)
if readErr != nil {
allWarnings = append(allWarnings, fmt.Sprintf("read %s: %v", path, readErr))
return nil
}
source := req.Source
if source == "" {
source = filepath.Base(path)
}
result, runErr := pipeline.Run(r.Context(), h.pipeline, h.brainDir, string(content), source, req.DryRun)
if runErr != nil {
allWarnings = append(allWarnings, fmt.Sprintf("ingest %s: %v", path, runErr))
return nil
}
allPages = append(allPages, result.Pages...)
allWarnings = append(allWarnings, result.Warnings...)
return nil
})
if err != nil {
h.logger.Error("walk dir failed", "path", req.Path, "err", err)
writeError(w, http.StatusInternalServerError, fmt.Sprintf("walk error: %v", err))
return
}
} else {
ext := strings.ToLower(filepath.Ext(req.Path))
if !supportedExtensions[ext] {
writeError(w, http.StatusBadRequest, fmt.Sprintf("unsupported file extension: %s", ext))
return
}
content, readErr := os.ReadFile(req.Path)
if readErr != nil {
writeError(w, http.StatusInternalServerError, fmt.Sprintf("read file: %v", readErr))
return
}
source := req.Source
if source == "" {
source = filepath.Base(req.Path)
}
result, runErr := pipeline.Run(r.Context(), h.pipeline, h.brainDir, string(content), source, req.DryRun)
if runErr != nil {
h.logger.Error("ingest-path failed", "path", req.Path, "err", runErr)
writeError(w, http.StatusInternalServerError, "ingest error")
return
}
allPages = result.Pages
allWarnings = result.Warnings
}
if allPages == nil {
allPages = []string{}
}
if allWarnings == nil {
allWarnings = []string{}
}
writeJSON(w, ingestResponse{Pages: allPages, Warnings: allWarnings})
}
func writeJSON(w http.ResponseWriter, v any) { func writeJSON(w http.ResponseWriter, v any) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(v) //nolint:errcheck json.NewEncoder(w).Encode(v) //nolint:errcheck
} }
func writeError(w http.ResponseWriter, code int, msg string) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
json.NewEncoder(w).Encode(map[string]string{"error": msg}) //nolint:errcheck
}

View File

@@ -3,6 +3,7 @@ package api_test
import ( import (
"bytes" "bytes"
"context"
"encoding/json" "encoding/json"
"log/slog" "log/slog"
"net/http" "net/http"
@@ -12,11 +13,26 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/mathiasbq/hyperguild/ingestion/internal/api"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
) )
// stubComplete returns a fixed JSON page so tests never call a real LLM.
func stubComplete(_ context.Context, _, _ string) (string, error) {
return `[{"path":"wiki/sources/test-source.md","content":"# Test Source\n\nSome content here.\n"}]`, nil
}
func stubPipelineCfg() pipeline.Config {
return pipeline.Config{
Complete: stubComplete,
ChunkSize: 0,
Schema: "# Test Schema\nwiki/sources/, wiki/concepts/, wiki/entities/",
}
}
func setup(t *testing.T) (string, *api.Handler) { func setup(t *testing.T) (string, *api.Handler) {
t.Helper() t.Helper()
dir := t.TempDir() dir := t.TempDir()
@@ -27,9 +43,13 @@ func setup(t *testing.T) (string, *api.Handler) {
0o644, 0o644,
)) ))
logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) logger := slog.New(slog.NewTextHandler(os.Stderr, nil))
return dir, api.NewHandler(dir, logger) return dir, api.NewHandler(dir, logger, stubPipelineCfg())
} }
// ---------------------------------------------------------------------------
// Existing tests (Write / Query)
// ---------------------------------------------------------------------------
func TestQuery_ReturnsResults(t *testing.T) { func TestQuery_ReturnsResults(t *testing.T) {
_, h := setup(t) _, h := setup(t)
body, _ := json.Marshal(map[string]any{"query": "test driven", "limit": 5}) body, _ := json.Marshal(map[string]any{"query": "test driven", "limit": 5})
@@ -112,3 +132,122 @@ func TestWrite_GeneratesFilenameIfAbsent(t *testing.T) {
assert.Len(t, entries, 2) assert.Len(t, entries, 2)
assert.True(t, strings.HasSuffix(entries[1].Name(), ".md")) assert.True(t, strings.HasSuffix(entries[1].Name(), ".md"))
} }
// ---------------------------------------------------------------------------
// POST /ingest
// ---------------------------------------------------------------------------
func TestIngest_Validation(t *testing.T) {
cases := []struct {
name string
body map[string]any
}{
{"missing content", map[string]any{"source": "test-source"}},
{"missing source", map[string]any{"content": "some content"}},
{"whitespace content", map[string]any{"content": " ", "source": "test-source"}},
{"whitespace source", map[string]any{"content": "some content", "source": " "}},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
_, h := setup(t)
body, _ := json.Marshal(tc.body)
req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.Ingest(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
})
}
}
func TestIngest_Success(t *testing.T) {
_, h := setup(t)
body, _ := json.Marshal(map[string]any{
"content": "some content about shape-up methodology",
"source": "shape-up-book",
"dry_run": true,
})
req := httptest.NewRequest(http.MethodPost, "/ingest", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.Ingest(rec, req)
require.Equal(t, http.StatusOK, rec.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp))
pages, ok := resp["pages"]
require.True(t, ok, "response must have pages field")
pagesSlice, ok := pages.([]any)
require.True(t, ok, "pages must be an array")
assert.NotEmpty(t, pagesSlice)
}
// ---------------------------------------------------------------------------
// POST /ingest-path
// ---------------------------------------------------------------------------
func TestIngestPath_MissingPath(t *testing.T) {
_, h := setup(t)
body, _ := json.Marshal(map[string]any{"source": "test-source"})
req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.IngestPath(rec, req)
assert.Equal(t, http.StatusBadRequest, rec.Code)
}
func TestIngestPath_File(t *testing.T) {
_, h := setup(t)
// Create a temp file with content
dir := t.TempDir()
f := filepath.Join(dir, "doc.md")
require.NoError(t, os.WriteFile(f, []byte("# Hello\nThis is markdown content."), 0o644))
body, _ := json.Marshal(map[string]any{
"path": f,
"source": "test-doc",
"dry_run": true,
})
req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.IngestPath(rec, req)
require.Equal(t, http.StatusOK, rec.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp))
pages, ok := resp["pages"]
require.True(t, ok, "response must have pages field")
pagesSlice, ok := pages.([]any)
require.True(t, ok, "pages must be an array")
assert.NotEmpty(t, pagesSlice)
}
func TestIngestPath_Directory(t *testing.T) {
_, h := setup(t)
// Create a temp dir with one .md file
dir := t.TempDir()
require.NoError(t, os.WriteFile(filepath.Join(dir, "notes.md"), []byte("# Notes\nSome notes."), 0o644))
body, _ := json.Marshal(map[string]any{
"path": dir,
"dry_run": true,
})
req := httptest.NewRequest(http.MethodPost, "/ingest-path", bytes.NewReader(body))
rec := httptest.NewRecorder()
h.IngestPath(rec, req)
require.Equal(t, http.StatusOK, rec.Code)
var resp map[string]any
require.NoError(t, json.Unmarshal(rec.Body.Bytes(), &resp))
pages, ok := resp["pages"]
require.True(t, ok, "response must have pages field")
pagesSlice, ok := pages.([]any)
require.True(t, ok, "pages must be an array")
assert.NotEmpty(t, pagesSlice)
}

View File

@@ -0,0 +1,119 @@
package llm
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"time"
)
// Client calls an OpenAI-compatible chat completions endpoint.
type Client struct {
baseURL string
apiKey string
model string
httpClient *http.Client
}
// New constructs a Client.
func New(baseURL, apiKey, model string, timeout time.Duration) *Client {
return &Client{
baseURL: strings.TrimRight(baseURL, "/"),
apiKey: apiKey,
model: model,
httpClient: &http.Client{Timeout: timeout},
}
}
type chatRequest struct {
Model string `json:"model"`
Messages []message `json:"messages"`
Temperature float64 `json:"temperature"`
}
type message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type chatResponse struct {
Choices []struct {
Message message `json:"message"`
} `json:"choices"`
}
// Complete sends a system + user message and returns the assistant's reply.
// Retries once on HTTP 429 using Retry-After header or 5s backoff.
func (c *Client) Complete(ctx context.Context, system, user string) (string, error) {
body := chatRequest{
Model: c.model,
Messages: []message{
{Role: "system", Content: system},
{Role: "user", Content: user},
},
Temperature: 0.2,
}
b, err := json.Marshal(body)
if err != nil {
return "", fmt.Errorf("marshal request: %w", err)
}
do := func() (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.baseURL+"/chat/completions", bytes.NewReader(b))
if err != nil {
return nil, fmt.Errorf("build request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
if c.apiKey != "" {
req.Header.Set("Authorization", "Bearer "+c.apiKey)
}
return c.httpClient.Do(req)
}
resp, err := do()
if err != nil {
return "", fmt.Errorf("call LLM: %w", err)
}
if resp.StatusCode == http.StatusTooManyRequests {
resp.Body.Close()
wait := 5 * time.Second
if ra := resp.Header.Get("Retry-After"); ra != "" {
if secs, err := strconv.Atoi(ra); err == nil {
wait = time.Duration(secs) * time.Second
}
}
select {
case <-ctx.Done():
return "", ctx.Err()
case <-time.After(wait):
}
resp, err = do()
if err != nil {
return "", fmt.Errorf("retry LLM call: %w", err)
}
}
defer resp.Body.Close()
out, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("read response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("LLM returned %d: %s", resp.StatusCode, out)
}
var cr chatResponse
if err := json.Unmarshal(out, &cr); err != nil {
return "", fmt.Errorf("parse response: %w", err)
}
if len(cr.Choices) == 0 {
return "", fmt.Errorf("LLM returned no choices")
}
return cr.Choices[0].Message.Content, nil
}

View File

@@ -0,0 +1,86 @@
package llm
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func mockServer(t *testing.T, response string) *httptest.Server {
t.Helper()
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/chat/completions", r.URL.Path)
assert.Equal(t, "application/json", r.Header.Get("Content-Type"))
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{
{"message": map[string]any{"role": "assistant", "content": response}},
},
})
}))
}
func TestClient_Complete(t *testing.T) {
srv := mockServer(t, "hello world")
defer srv.Close()
c := New(srv.URL, "", "test-model", 10*time.Second)
got, err := c.Complete(context.Background(), "you are helpful", "say hello")
require.NoError(t, err)
assert.Equal(t, "hello world", got)
}
func TestClient_ReturnsErrorOnNon200(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
http.Error(w, "overloaded", http.StatusServiceUnavailable)
}))
defer srv.Close()
c := New(srv.URL, "", "test-model", 10*time.Second)
_, err := c.Complete(context.Background(), "sys", "user")
assert.Error(t, err)
}
func TestClient_SendsAuthHeader(t *testing.T) {
var gotAuth string
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gotAuth = r.Header.Get("Authorization")
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{{"message": map[string]any{"content": "ok"}}},
})
}))
defer srv.Close()
c := New(srv.URL, "my-key", "test-model", 10*time.Second)
_, err := c.Complete(context.Background(), "sys", "user")
require.NoError(t, err)
assert.Equal(t, "Bearer my-key", gotAuth)
}
func TestClient_Retries429(t *testing.T) {
calls := 0
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
calls++
if calls == 1 {
w.Header().Set("Retry-After", "0")
w.WriteHeader(http.StatusTooManyRequests)
return
}
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{{"message": map[string]any{"content": "retried"}}},
})
}))
defer srv.Close()
c := New(srv.URL, "", "test-model", 10*time.Second)
got, err := c.Complete(context.Background(), "sys", "user")
require.NoError(t, err)
assert.Equal(t, "retried", got)
assert.Equal(t, 2, calls)
}

View File

@@ -0,0 +1,39 @@
// ingestion/internal/pipeline/chunk.go
package pipeline
import "strings"
// Chunk splits content into pieces of at most maxSize bytes, splitting at
// paragraph boundaries (\n\n). If maxSize <= 0, returns content as one chunk.
func Chunk(content string, maxSize int) []string {
content = strings.TrimSpace(content)
if maxSize <= 0 || len(content) <= maxSize {
return []string{content}
}
paragraphs := strings.Split(content, "\n\n")
var chunks []string
var cur strings.Builder
for _, para := range paragraphs {
para = strings.TrimSpace(para)
if para == "" {
continue
}
addition := para
if cur.Len() > 0 {
addition = "\n\n" + para
}
if cur.Len() > 0 && cur.Len()+len(addition) > maxSize {
chunks = append(chunks, cur.String())
cur.Reset()
cur.WriteString(para)
} else {
cur.WriteString(addition)
}
}
if cur.Len() > 0 {
chunks = append(chunks, cur.String())
}
return chunks
}

View File

@@ -0,0 +1,36 @@
// ingestion/internal/pipeline/chunk_test.go
package pipeline
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestChunk_NoChunkingWhenZero(t *testing.T) {
content := strings.Repeat("word ", 1000)
chunks := Chunk(content, 0)
assert.Len(t, chunks, 1)
}
func TestChunk_SplitsAtParagraph(t *testing.T) {
content := "First paragraph here.\n\nSecond paragraph here."
chunks := Chunk(content, 40)
assert.Len(t, chunks, 2)
assert.Equal(t, "First paragraph here.", chunks[0])
assert.Equal(t, "Second paragraph here.", chunks[1])
}
func TestChunk_SingleLargeParagraph(t *testing.T) {
content := strings.Repeat("x", 100)
chunks := Chunk(content, 50)
assert.Len(t, chunks, 1)
}
func TestChunk_NoChunkingWhenContentFits(t *testing.T) {
content := "Short content."
chunks := Chunk(content, 1000)
assert.Len(t, chunks, 1)
assert.Equal(t, "Short content.", chunks[0])
}

View File

@@ -0,0 +1,55 @@
// ingestion/internal/pipeline/parse.go
package pipeline
import (
"encoding/json"
"fmt"
"strings"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
// ParsePages parses LLM output as a JSON array of {path, content} objects.
// If the array is truncated mid-object (token limit), it salvages all complete objects.
func ParsePages(output string) ([]wiki.Page, []string) {
output = strings.TrimSpace(output)
if output == "" {
return nil, []string{"LLM returned empty output"}
}
output = stripFences(output)
var pages []wiki.Page
if err := json.Unmarshal([]byte(output), &pages); err == nil {
return pages, nil
}
// Truncation recovery: find last `}` that closes a complete object.
idx := strings.LastIndex(output, "}")
if idx < 0 {
return nil, []string{"LLM output contained no complete JSON objects"}
}
start := strings.Index(output, "[")
if start < 0 {
return nil, []string{"LLM output contained no JSON array opening bracket"}
}
candidate := output[start:idx+1] + "]"
if err := json.Unmarshal([]byte(candidate), &pages); err != nil {
return nil, []string{fmt.Sprintf("truncation recovery failed: %v", err)}
}
return pages, []string{fmt.Sprintf("LLM output was truncated; recovered %d page(s)", len(pages))}
}
func stripFences(s string) string {
for _, prefix := range []string{"```json\n", "```json\r\n", "```\n", "```\r\n"} {
if strings.HasPrefix(s, prefix) {
s = strings.TrimPrefix(s, prefix)
s = strings.TrimSuffix(strings.TrimSpace(s), "```")
return strings.TrimSpace(s)
}
}
return s
}

View File

@@ -0,0 +1,46 @@
// ingestion/internal/pipeline/parse_test.go
package pipeline
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestParsePages_ValidJSON(t *testing.T) {
input := `[{"path":"wiki/sources/foo.md","content":"# Foo"},{"path":"wiki/concepts/bar.md","content":"# Bar"}]`
pages, warnings := ParsePages(input)
require.Len(t, pages, 2)
assert.Empty(t, warnings)
assert.Equal(t, "wiki/sources/foo.md", pages[0].Path)
assert.Equal(t, "wiki/concepts/bar.md", pages[1].Path)
}
func TestParsePages_StripsFences(t *testing.T) {
input := "```json\n[{\"path\":\"wiki/sources/foo.md\",\"content\":\"# Foo\"}]\n```"
pages, warnings := ParsePages(input)
assert.Len(t, pages, 1)
assert.Empty(t, warnings)
}
func TestParsePages_TruncationRecovery(t *testing.T) {
input := `[{"path":"wiki/sources/foo.md","content":"# Foo"},{"path":"wiki/concepts/bar.md","content":"trunc`
pages, warnings := ParsePages(input)
require.Len(t, pages, 1)
assert.Equal(t, "wiki/sources/foo.md", pages[0].Path)
assert.NotEmpty(t, warnings)
}
func TestParsePages_EmptyInput(t *testing.T) {
pages, warnings := ParsePages("")
assert.Empty(t, pages)
assert.NotEmpty(t, warnings)
}
func TestParsePages_PlainFence(t *testing.T) {
input := "```\n[{\"path\":\"wiki/sources/foo.md\",\"content\":\"ok\"}]\n```"
pages, warnings := ParsePages(input)
assert.Len(t, pages, 1)
assert.Empty(t, warnings)
}

View File

@@ -0,0 +1,120 @@
// ingestion/internal/pipeline/pipeline.go
package pipeline
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
// CompleteFunc is the function signature for LLM calls.
type CompleteFunc func(ctx context.Context, system, user string) (string, error)
// Config holds pipeline configuration.
type Config struct {
Complete CompleteFunc
ChunkSize int // 0 = no chunking
Schema string // overrides brain/schema.md when set (useful in tests)
}
// Result is the outcome of a pipeline run.
type Result struct {
Pages []string // relative paths written (or would-be written in dry-run)
Warnings []string
}
// Run ingests content and writes structured wiki pages to brainDir/wiki/.
// In dry-run mode, pages are returned but not written to disk.
func Run(ctx context.Context, cfg Config, brainDir, content, source string, dryRun bool) (Result, error) {
inventory, err := wiki.LoadInventory(brainDir)
if err != nil {
return Result{}, fmt.Errorf("load inventory: %w", err)
}
schema := cfg.Schema
if schema == "" {
schema = loadSchema(brainDir)
}
chunks := Chunk(content, cfg.ChunkSize)
var allPages []wiki.Page
var allWarnings []string
for _, chunk := range chunks {
userPrompt := BuildPrompt(schema, source, chunk, inventory)
output, err := cfg.Complete(ctx, systemPrompt, userPrompt)
if err != nil {
return Result{}, fmt.Errorf("LLM call: %w", err)
}
pages, warnings := ParsePages(output)
allPages = append(allPages, pages...)
allWarnings = append(allWarnings, warnings...)
}
merged := mergeAll(allPages)
date := time.Now().UTC().Format("2006-01-02")
var written []string
for _, page := range merged {
if !dryRun {
dest := filepath.Join(brainDir, filepath.FromSlash(page.Path))
if err := os.MkdirAll(filepath.Dir(dest), 0o755); err != nil {
return Result{}, fmt.Errorf("mkdir for %s: %w", page.Path, err)
}
if err := os.WriteFile(dest, []byte(page.Content), 0o644); err != nil {
return Result{}, fmt.Errorf("write %s: %w", page.Path, err)
}
}
written = append(written, page.Path)
}
if !dryRun {
if err := wiki.RebuildIndex(brainDir, date); err != nil {
allWarnings = append(allWarnings, fmt.Sprintf("rebuild index: %v", err))
}
if err := wiki.AppendLog(brainDir, source, written, allWarnings, date); err != nil {
allWarnings = append(allWarnings, fmt.Sprintf("append log: %v", err))
}
}
return Result{Pages: written, Warnings: allWarnings}, nil
}
// mergeAll deduplicates pages by path, merging content from later occurrences.
func mergeAll(pages []wiki.Page) []wiki.Page {
order := make([]string, 0, len(pages))
byPath := make(map[string]wiki.Page, len(pages))
for _, p := range pages {
if _, seen := byPath[p.Path]; !seen {
order = append(order, p.Path)
byPath[p.Path] = p
} else {
byPath[p.Path] = wiki.Merge(byPath[p.Path], p)
}
}
result := make([]wiki.Page, 0, len(order))
for _, path := range order {
result = append(result, byPath[path])
}
return result
}
const defaultSchema = `# Brain Wiki Schema
Three page types: wiki/sources/, wiki/concepts/, wiki/entities/.
See brain/schema.md for the full schema.
`
func loadSchema(brainDir string) string {
b, err := os.ReadFile(filepath.Join(brainDir, "schema.md"))
if err != nil {
return defaultSchema
}
return strings.TrimSpace(string(b))
}

View File

@@ -0,0 +1,133 @@
// ingestion/internal/pipeline/pipeline_test.go
package pipeline
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
func TestRun_WritesPages(t *testing.T) {
brainDir := t.TempDir()
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
llmResponse := mustJSON([]wiki.Page{
{
Path: "wiki/sources/test-article.md",
Content: "---\ntitle: Test Article\ntype: article\ndomain: software-engineering\ndate_ingested: 2026-04-22\nlast_updated: 2026-04-22\naliases:\n - Test Article\n---\n\n## Summary\n\nA test article.\n\n## Key Claims\n\n- It tests things.\n\n## Concepts Introduced or Reinforced\n\n## Entities Mentioned\n\n## Open Questions Raised\n",
},
{
Path: "wiki/concepts/testing.md",
Content: "---\ntitle: Testing\ndomain: software-engineering\nlast_updated: 2026-04-22\naliases:\n - Testing\n---\n\n## Definition\n\nThe practice of verifying software.\n\n## Why It Matters\n\nCatches bugs.\n\n## Related Concepts\n\n## Related Entities\n\n## Sources\n\n## Evolving Notes\n",
},
})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{
{"message": map[string]any{"role": "assistant", "content": llmResponse}},
},
})
}))
defer srv.Close()
cfg := Config{
Complete: llm.New(srv.URL, "", "test-model", 30*time.Second).Complete,
ChunkSize: 0,
}
result, err := Run(context.Background(), cfg, brainDir, "An article about testing.", "test-article", false)
require.NoError(t, err)
assert.Len(t, result.Pages, 2)
assert.Empty(t, result.Warnings)
_, err = os.Stat(filepath.Join(brainDir, "wiki", "sources", "test-article.md"))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(brainDir, "wiki", "concepts", "testing.md"))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(brainDir, "wiki", "index.md"))
require.NoError(t, err)
_, err = os.Stat(filepath.Join(brainDir, "log.md"))
require.NoError(t, err)
}
func TestRun_DryRunDoesNotWrite(t *testing.T) {
brainDir := t.TempDir()
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
llmResponse := mustJSON([]wiki.Page{{
Path: "wiki/sources/foo.md",
Content: "---\ntitle: Foo\n---\n\n## Summary\n\nFoo.\n",
}})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{{"message": map[string]any{"content": llmResponse}}},
})
}))
defer srv.Close()
cfg := Config{Complete: llm.New(srv.URL, "", "m", 30*time.Second).Complete}
result, err := Run(context.Background(), cfg, brainDir, "foo content", "foo", true)
require.NoError(t, err)
assert.Len(t, result.Pages, 1)
_, err = os.Stat(filepath.Join(brainDir, "wiki", "sources", "foo.md"))
assert.True(t, os.IsNotExist(err))
}
func TestRun_MergesDuplicatePaths(t *testing.T) {
brainDir := t.TempDir()
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
// LLM returns same path twice (simulates multi-chunk merge)
llmResponse := mustJSON([]wiki.Page{
{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nFirst.\n\n## Related Concepts\n\n- [[bar|Bar]]\n"},
{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nSecond.\n\n## Related Concepts\n\n- [[baz|Baz]]\n"},
})
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(map[string]any{
"choices": []map[string]any{{"message": map[string]any{"content": llmResponse}}},
})
}))
defer srv.Close()
cfg := Config{Complete: llm.New(srv.URL, "", "m", 30*time.Second).Complete}
result, err := Run(context.Background(), cfg, brainDir, "content", "foo", false)
require.NoError(t, err)
assert.Len(t, result.Pages, 1) // deduplicated
content, err := os.ReadFile(filepath.Join(brainDir, "wiki", "concepts", "foo.md"))
require.NoError(t, err)
// keep-first for Definition, union for Related Concepts
assert.Contains(t, string(content), "First.")
assert.Contains(t, string(content), "[[bar|Bar]]")
assert.Contains(t, string(content), "[[baz|Baz]]")
}
func mustJSON(v any) string {
b, err := json.Marshal(v)
if err != nil {
panic(err)
}
return string(b)
}

View File

@@ -0,0 +1,60 @@
// ingestion/internal/pipeline/prompt.go
package pipeline
import (
"fmt"
"strings"
"time"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
const systemPrompt = `You are a wiki agent. Read the source material and produce structured wiki pages following the schema provided.
Output ONLY a valid JSON array — no markdown fences, no other text before or after.
Each element must have:
"path" — relative path within the wiki, e.g. "wiki/sources/foo.md"
"content" — full markdown content of the page including YAML frontmatter
Follow the schema strictly: correct frontmatter fields, wikilinks as [[slug|Display Text]],
dates in YYYY-MM-DD format, and paraphrase rather than quoting verbatim.`
// BuildPrompt constructs the user prompt for a single chunk.
func BuildPrompt(schema, source, content string, inventory map[wiki.PageType][]wiki.Entry) string {
var sb strings.Builder
fmt.Fprintf(&sb, "Today's date is %s.\n\n", time.Now().UTC().Format("2006-01-02"))
sb.WriteString("## Schema\n\n")
sb.WriteString(schema)
sb.WriteString("\n\n")
sb.WriteString("## Existing wiki pages\n\n")
sb.WriteString("Link ONLY to pages in this inventory or pages you are creating in this response.\n\n")
for _, pt := range []wiki.PageType{wiki.PageTypeConcept, wiki.PageTypeEntity, wiki.PageTypeSource} {
entries := inventory[pt]
label := strings.ToUpper(string(pt)[:1]) + string(pt)[1:]
if len(entries) == 0 {
fmt.Fprintf(&sb, "%s — (none yet)\n\n", label)
continue
}
fmt.Fprintf(&sb, "%s — link ONLY under the matching section:\n", label)
for _, e := range entries {
fmt.Fprintf(&sb, " - [[%s|%s]]\n", e.Slug, e.Title)
}
sb.WriteString("\n")
}
sb.WriteString("## Non-negotiable rules\n\n")
sb.WriteString("1. Output ONLY a valid JSON array — no prose, no fences.\n")
sb.WriteString("2. Slugs are kebab-case: lowercase, spaces→hyphens, no special chars.\n")
sb.WriteString("3. Wikilinks: [[slug|Display Text]] — the pipe is required.\n")
sb.WriteString("4. Section links must match their section type.\n")
sb.WriteString("5. One source page per book — update it if inventory shows it exists.\n\n")
fmt.Fprintf(&sb, "## Source: %s\n\n", source)
sb.WriteString(content)
return sb.String()
}

View File

@@ -33,7 +33,12 @@ func Query(brainDir, query string, limit int) ([]Result, error) {
var results []Result var results []Result
err := filepath.WalkDir(filepath.Join(brainDir, "knowledge"), func(path string, d os.DirEntry, err error) error { for _, subdir := range []string{"knowledge", "wiki"} {
dir := filepath.Join(brainDir, subdir)
if _, statErr := os.Stat(dir); os.IsNotExist(statErr) {
continue
}
err := filepath.WalkDir(dir, func(path string, d os.DirEntry, err error) error {
if err != nil { if err != nil {
slog.Warn("search: skipping path", "path", path, "err", err) slog.Warn("search: skipping path", "path", path, "err", err)
return nil return nil
@@ -74,6 +79,7 @@ func Query(brainDir, query string, limit int) ([]Result, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
}
sort.Slice(results, func(i, j int) bool { sort.Slice(results, func(i, j int) bool {
return results[i].Score > results[j].Score return results[i].Score > results[j].Score

View File

@@ -0,0 +1,173 @@
// ingestion/internal/watcher/watcher.go
package watcher
import (
"context"
"fmt"
"log/slog"
"os"
"path/filepath"
"strings"
"time"
"unicode"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
)
// Config holds watcher configuration.
type Config struct {
BrainDir string
Interval time.Duration
Pipeline pipeline.Config
}
// Start launches the watcher in a background goroutine.
// It returns immediately. The watcher stops when ctx is cancelled.
func Start(ctx context.Context, cfg Config) {
go func() {
ticker := time.NewTicker(cfg.Interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
date := time.Now().UTC().Format("2006-01-02")
errs := processDir(ctx, cfg, date)
for _, err := range errs {
slog.Error("watcher: error processing file", "error", err)
}
}
}
}()
}
// processDir walks brain/raw/, processes each eligible file, returns any errors encountered.
func processDir(ctx context.Context, cfg Config, date string) []error {
rawDir := filepath.Join(cfg.BrainDir, "raw")
var errs []error
err := filepath.WalkDir(rawDir, func(path string, d os.DirEntry, err error) error {
if err != nil {
return err
}
// Skip the root itself.
if path == rawDir {
return nil
}
// Skip processed/ and failed/ subdirectories entirely.
if d.IsDir() {
name := d.Name()
if name == "processed" || name == "failed" {
return filepath.SkipDir
}
return nil
}
// Only process supported extensions.
ext := strings.ToLower(filepath.Ext(path))
if ext != ".md" && ext != ".txt" && ext != ".pdf" {
return nil
}
if err := processFile(ctx, cfg, path, date); err != nil {
errs = append(errs, fmt.Errorf("process %s: %w", filepath.Base(path), err))
}
return nil
})
if err != nil {
errs = append(errs, fmt.Errorf("walk raw dir: %w", err))
}
return errs
}
// processFile reads a file, calls pipeline.Run, moves it to processed/ or failed/.
func processFile(ctx context.Context, cfg Config, path, date string) error {
filename := filepath.Base(path)
source := deriveSource(filename)
content, err := os.ReadFile(path)
if err != nil {
return fmt.Errorf("read file: %w", err)
}
_, runErr := pipeline.Run(ctx, cfg.Pipeline, cfg.BrainDir, string(content), source, false)
if runErr != nil {
// Move to failed/.
failedDir := filepath.Join(cfg.BrainDir, "raw", "failed")
if mkErr := os.MkdirAll(failedDir, 0o755); mkErr != nil {
return fmt.Errorf("mkdir failed dir: %w", mkErr)
}
dest := filepath.Join(failedDir, filename)
if mvErr := os.Rename(path, dest); mvErr != nil {
return fmt.Errorf("move to failed: %w", mvErr)
}
slog.Warn("watcher: file failed, moved to failed/", "file", filename, "error", runErr)
if logErr := appendWatcherLog(cfg.BrainDir, filename, runErr, date); logErr != nil {
slog.Error("watcher: failed to write log entry", "error", logErr)
}
// Return nil: the file was quarantined successfully; the error was already
// logged. Returning runErr would cause processDir to log it again at Error level.
return nil
}
// Move to processed/YYYY-MM-DD/.
processedDir := filepath.Join(cfg.BrainDir, "raw", "processed", date)
if err := os.MkdirAll(processedDir, 0o755); err != nil {
return fmt.Errorf("mkdir processed dir: %w", err)
}
dest := filepath.Join(processedDir, filename)
if _, err := os.Stat(dest); err == nil {
// File already exists in processed; append timestamp to avoid overwriting the archive.
ext := filepath.Ext(filename)
base := strings.TrimSuffix(filename, ext)
dest = filepath.Join(processedDir, base+"-"+time.Now().UTC().Format("150405")+ext)
}
if err := os.Rename(path, dest); err != nil {
return fmt.Errorf("move to processed: %w", err)
}
slog.Info("watcher: file processed", "file", filename, "source", source)
return nil
}
// deriveSource turns a filename into a human-readable source name.
// "shape-up-book.md" → "Shape Up Book"
func deriveSource(filename string) string {
// Strip extension.
name := strings.TrimSuffix(filename, filepath.Ext(filename))
// Split on hyphens.
words := strings.Split(name, "-")
// Title-case each word.
for i, w := range words {
if w == "" {
continue
}
runes := []rune(w)
runes[0] = unicode.ToUpper(runes[0])
words[i] = string(runes)
}
return strings.Join(words, " ")
}
// appendWatcherLog appends a watcher error entry to brain/log.md.
func appendWatcherLog(brainDir, filename string, runErr error, date string) error {
entry := fmt.Sprintf("## %s — watcher error\n\n- **File:** %s\n- **Error:** %s\n\n",
date, filename, runErr.Error())
logPath := filepath.Join(brainDir, "log.md")
f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return fmt.Errorf("open log: %w", err)
}
defer f.Close()
if _, err = f.WriteString(entry); err != nil {
return fmt.Errorf("write log: %w", err)
}
return nil
}

View File

@@ -0,0 +1,219 @@
// ingestion/internal/watcher/watcher_test.go
package watcher
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
"github.com/mathiasbq/hyperguild/ingestion/internal/wiki"
)
// successComplete returns a valid JSON-encoded page array for any call.
func successComplete(page wiki.Page) pipeline.CompleteFunc {
return func(ctx context.Context, system, user string) (string, error) {
b, err := json.Marshal([]wiki.Page{page})
if err != nil {
return "", err
}
return string(b), nil
}
}
// errorComplete always returns an error simulating an LLM failure.
func errorComplete(_ context.Context, _, _ string) (string, error) {
return "", fmt.Errorf("LLM unavailable")
}
func setupBrainDir(t *testing.T) string {
t.Helper()
brainDir := t.TempDir()
for _, sub := range []string{"wiki/concepts", "wiki/entities", "wiki/sources", "raw"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, sub), 0o755))
}
return brainDir
}
func TestStart_ProcessesFile(t *testing.T) {
brainDir := setupBrainDir(t)
// Place a .md file in raw/.
rawFile := filepath.Join(brainDir, "raw", "shape-up-book.md")
require.NoError(t, os.WriteFile(rawFile, []byte("Content about Shape Up."), 0o644))
date := time.Now().UTC().Format("2006-01-02")
wikiPage := wiki.Page{
Path: "wiki/sources/shape-up-book.md",
Content: "---\ntitle: Shape Up Book\ntype: article\ndomain: product-management\ndate_ingested: " + date + "\nlast_updated: " + date + "\naliases:\n - Shape Up Book\n---\n\n## Summary\n\nA book about Shape Up.\n",
}
cfg := Config{
BrainDir: brainDir,
Interval: 50 * time.Millisecond,
Pipeline: pipeline.Config{
Complete: successComplete(wikiPage),
ChunkSize: 0,
Schema: "# Schema\nThree page types.",
},
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
Start(ctx, cfg)
// Poll until the file is moved to processed/.
processedPath := filepath.Join(brainDir, "raw", "processed", date, "shape-up-book.md")
var found bool
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if _, err := os.Stat(processedPath); err == nil {
found = true
break
}
time.Sleep(20 * time.Millisecond)
}
require.True(t, found, "file should be moved to processed/")
// Original file should be gone.
_, err := os.Stat(rawFile)
assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
// Wiki page should exist.
wikiPath := filepath.Join(brainDir, "wiki", "sources", "shape-up-book.md")
_, err = os.Stat(wikiPath)
assert.NoError(t, err, "wiki page should be written")
// log.md should contain an ingest record.
logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))
require.NoError(t, err)
assert.Contains(t, string(logContent), "— ingest")
}
func TestStart_MovesToFailedOnError(t *testing.T) {
brainDir := setupBrainDir(t)
rawFile := filepath.Join(brainDir, "raw", "bad-file.md")
require.NoError(t, os.WriteFile(rawFile, []byte("Some content."), 0o644))
cfg := Config{
BrainDir: brainDir,
Interval: 50 * time.Millisecond,
Pipeline: pipeline.Config{
Complete: errorComplete,
ChunkSize: 0,
Schema: "# Schema\nThree page types.",
},
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
Start(ctx, cfg)
// Poll until the file is moved to failed/.
failedPath := filepath.Join(brainDir, "raw", "failed", "bad-file.md")
var found bool
deadline := time.Now().Add(2 * time.Second)
for time.Now().Before(deadline) {
if _, err := os.Stat(failedPath); err == nil {
found = true
break
}
time.Sleep(20 * time.Millisecond)
}
require.True(t, found, "file should be moved to failed/")
// Original file should be gone from raw/.
_, err := os.Stat(rawFile)
assert.True(t, os.IsNotExist(err), "original file should be gone from raw/")
// log.md should contain a watcher error entry.
logContent, err := os.ReadFile(filepath.Join(brainDir, "log.md"))
require.NoError(t, err)
assert.Contains(t, string(logContent), "— watcher error")
assert.Contains(t, string(logContent), "bad-file.md")
}
func TestDeriveSource(t *testing.T) {
tests := []struct {
filename string
want string
}{
{"shape-up-book.md", "Shape Up Book"},
{"raft-consensus.txt", "Raft Consensus"},
{"my-note.md", "My Note"},
{"single.md", "Single"},
{"no-extension", "No Extension"},
}
for _, tc := range tests {
t.Run(tc.filename, func(t *testing.T) {
got := deriveSource(tc.filename)
assert.Equal(t, tc.want, got)
})
}
}
func TestProcessDir_SkipsSubdirs(t *testing.T) {
brainDir := setupBrainDir(t)
// Create processed/ and failed/ subdirs with files inside.
for _, sub := range []string{"processed/2026-04-22", "failed"} {
require.NoError(t, os.MkdirAll(filepath.Join(brainDir, "raw", sub), 0o755))
}
processedFile := filepath.Join(brainDir, "raw", "processed", "2026-04-22", "old-file.md")
failedFile := filepath.Join(brainDir, "raw", "failed", "broken-file.md")
require.NoError(t, os.WriteFile(processedFile, []byte("old"), 0o644))
require.NoError(t, os.WriteFile(failedFile, []byte("broken"), 0o644))
// Also place a valid file in raw/ root that should be processed.
validFile := filepath.Join(brainDir, "raw", "valid.md")
require.NoError(t, os.WriteFile(validFile, []byte("valid content"), 0o644))
date := time.Now().UTC().Format("2006-01-02")
// Track which sources were passed to Complete.
var processedSources []string
completeFn := func(ctx context.Context, system, user string) (string, error) {
// Record that this was called; return a minimal valid page.
page := wiki.Page{
Path: "wiki/sources/valid.md",
Content: "---\ntitle: Valid\n---\n\n## Summary\n\nValid.\n",
}
b, _ := json.Marshal([]wiki.Page{page})
processedSources = append(processedSources, "called")
return string(b), nil
}
cfg := Config{
BrainDir: brainDir,
Interval: time.Hour, // not used; we call processDir directly
Pipeline: pipeline.Config{
Complete: completeFn,
ChunkSize: 0,
Schema: "# Schema\nThree page types.",
},
}
errs := processDir(context.Background(), cfg, date)
assert.Empty(t, errs, "no errors expected")
// Complete should have been called exactly once (for valid.md, not for files in subdirs).
assert.Len(t, processedSources, 1, "only the file in raw/ root should be processed")
// Files in processed/ and failed/ must remain untouched.
_, err := os.Stat(processedFile)
assert.NoError(t, err, "processed subdir file should be untouched")
_, err = os.Stat(failedFile)
assert.NoError(t, err, "failed subdir file should be untouched")
}

View File

@@ -0,0 +1,71 @@
// ingestion/internal/wiki/index.go
package wiki
import (
"fmt"
"os"
"path/filepath"
"strings"
)
// RebuildIndex writes brain/wiki/index.md from the current wiki contents.
func RebuildIndex(brainDir, date string) error {
inv, err := LoadInventory(brainDir)
if err != nil {
return fmt.Errorf("load inventory: %w", err)
}
total := len(inv[PageTypeConcept]) + len(inv[PageTypeEntity]) + len(inv[PageTypeSource])
var sb strings.Builder
fmt.Fprintf(&sb, "# Wiki Index\n\n")
fmt.Fprintf(&sb, "_Updated: %s — %d pages (%d concepts, %d entities, %d sources)_\n\n",
date, total,
len(inv[PageTypeConcept]),
len(inv[PageTypeEntity]),
len(inv[PageTypeSource]))
for _, pt := range []PageType{PageTypeConcept, PageTypeEntity, PageTypeSource} {
entries := inv[pt]
if len(entries) == 0 {
continue
}
label := strings.ToUpper(string(pt)[:1]) + string(pt)[1:]
fmt.Fprintf(&sb, "## %s\n\n", label)
for _, e := range entries {
summary := pageFirstSentence(brainDir, e)
if summary != "" {
fmt.Fprintf(&sb, "- [[%s|%s]] — %s\n", e.Slug, e.Title, summary)
} else {
fmt.Fprintf(&sb, "- [[%s|%s]]\n", e.Slug, e.Title)
}
}
sb.WriteString("\n")
}
dest := filepath.Join(brainDir, "wiki", "index.md")
return os.WriteFile(dest, []byte(sb.String()), 0o644)
}
func pageFirstSentence(brainDir string, e Entry) string {
path := filepath.Join(brainDir, "wiki", string(e.Type), e.Slug+".md")
content, err := os.ReadFile(path)
if err != nil {
return ""
}
parts := strings.SplitN(string(content), "---", 3)
body := string(content)
if len(parts) == 3 {
body = parts[2]
}
for _, line := range strings.Split(body, "\n") {
line = strings.TrimSpace(line)
if line == "" || strings.HasPrefix(line, "#") {
continue
}
if len(line) > 100 {
return line[:100] + "…"
}
return line
}
return ""
}

View File

@@ -0,0 +1,76 @@
// ingestion/internal/wiki/index_test.go
package wiki
import (
"os"
"path/filepath"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func setupWikiDir(t *testing.T) string {
t.Helper()
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "concepts"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "entities"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "sources"), 0o755))
require.NoError(t, os.WriteFile(
filepath.Join(dir, "wiki", "concepts", "tdd.md"),
[]byte("---\ntitle: TDD\n---\n\n## Definition\n\nTest-driven development is a discipline.\n"),
0o644,
))
return dir
}
func TestRebuildIndex(t *testing.T) {
dir := setupWikiDir(t)
require.NoError(t, RebuildIndex(dir, "2026-04-22"))
content, err := os.ReadFile(filepath.Join(dir, "wiki", "index.md"))
require.NoError(t, err)
s := string(content)
assert.Contains(t, s, "# Wiki Index")
assert.Contains(t, s, "2026-04-22")
assert.Contains(t, s, "[[tdd|TDD]]")
assert.Contains(t, s, "## Concepts")
}
func TestRebuildIndex_EmptyWiki(t *testing.T) {
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "concepts"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "entities"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "sources"), 0o755))
require.NoError(t, RebuildIndex(dir, "2026-04-22"))
content, err := os.ReadFile(filepath.Join(dir, "wiki", "index.md"))
require.NoError(t, err)
assert.Contains(t, string(content), "# Wiki Index")
}
func TestAppendLog(t *testing.T) {
dir := t.TempDir()
require.NoError(t, AppendLog(dir, "shape-up-book",
[]string{"wiki/sources/shape-up.md", "wiki/concepts/betting-table.md"},
nil, "2026-04-22"))
content, err := os.ReadFile(filepath.Join(dir, "log.md"))
require.NoError(t, err)
s := string(content)
assert.Contains(t, s, "shape-up-book")
assert.Contains(t, s, "wiki/sources/shape-up.md")
assert.True(t, strings.HasPrefix(s, "## 2026-04-22"))
}
func TestAppendLog_AppendsOnSecondCall(t *testing.T) {
dir := t.TempDir()
require.NoError(t, AppendLog(dir, "source-a", []string{"wiki/sources/a.md"}, nil, "2026-04-22"))
require.NoError(t, AppendLog(dir, "source-b", []string{"wiki/sources/b.md"}, nil, "2026-04-22"))
content, err := os.ReadFile(filepath.Join(dir, "log.md"))
require.NoError(t, err)
assert.Contains(t, string(content), "source-a")
assert.Contains(t, string(content), "source-b")
}

View File

@@ -0,0 +1,69 @@
// ingestion/internal/wiki/inventory.go
package wiki
import (
"bufio"
"fmt"
"os"
"path/filepath"
"strings"
)
// LoadInventory walks brain/wiki/ and returns all pages grouped by type.
// Missing subdirectories are silently skipped.
func LoadInventory(brainDir string) (map[PageType][]Entry, error) {
result := map[PageType][]Entry{
PageTypeConcept: {},
PageTypeEntity: {},
PageTypeSource: {},
}
for pt := range result {
dir := filepath.Join(brainDir, "wiki", string(pt))
entries, err := os.ReadDir(dir)
if os.IsNotExist(err) {
continue
}
if err != nil {
return nil, fmt.Errorf("read dir %s: %w", dir, err)
}
for _, e := range entries {
if e.IsDir() || !strings.HasSuffix(e.Name(), ".md") {
continue
}
slug := strings.TrimSuffix(e.Name(), ".md")
path := filepath.Join(dir, e.Name())
title := readTitle(path, slug)
result[pt] = append(result[pt], Entry{Slug: slug, Title: title, Type: pt})
}
}
return result, nil
}
// readTitle extracts the title from YAML frontmatter, falling back to slug.
func readTitle(path, fallback string) string {
f, err := os.Open(path)
if err != nil {
return fallback
}
defer f.Close()
scanner := bufio.NewScanner(f)
inFM := false
for scanner.Scan() {
line := scanner.Text()
if strings.TrimSpace(line) == "---" {
if !inFM {
inFM = true
continue
}
break
}
if inFM {
key, val, ok := strings.Cut(line, ":")
if ok && strings.TrimSpace(key) == "title" {
return strings.Trim(strings.TrimSpace(val), `"'`)
}
}
}
return fallback
}

View File

@@ -0,0 +1,62 @@
// ingestion/internal/wiki/inventory_test.go
package wiki
import (
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLoadInventory(t *testing.T) {
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "concepts"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "entities"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "sources"), 0o755))
require.NoError(t, os.WriteFile(
filepath.Join(dir, "wiki", "concepts", "domain-driven-design.md"),
[]byte("---\ntitle: Domain Driven Design\n---\n\n## Definition\n\nA thing.\n"),
0o644,
))
require.NoError(t, os.WriteFile(
filepath.Join(dir, "wiki", "entities", "ryan-singer.md"),
[]byte("---\ntitle: Ryan Singer\n---\n\n## Description\n\nDesigner.\n"),
0o644,
))
inv, err := LoadInventory(dir)
require.NoError(t, err)
assert.Len(t, inv[PageTypeConcept], 1)
assert.Equal(t, "domain-driven-design", inv[PageTypeConcept][0].Slug)
assert.Equal(t, "Domain Driven Design", inv[PageTypeConcept][0].Title)
assert.Len(t, inv[PageTypeEntity], 1)
assert.Equal(t, "ryan-singer", inv[PageTypeEntity][0].Slug)
assert.Empty(t, inv[PageTypeSource])
}
func TestLoadInventory_EmptyDirs(t *testing.T) {
dir := t.TempDir()
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "concepts"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "entities"), 0o755))
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki", "sources"), 0o755))
inv, err := LoadInventory(dir)
require.NoError(t, err)
assert.Empty(t, inv[PageTypeConcept])
assert.Empty(t, inv[PageTypeEntity])
assert.Empty(t, inv[PageTypeSource])
}
func TestLoadInventory_MissingDirsOk(t *testing.T) {
dir := t.TempDir()
// No wiki/ subdirs at all
inv, err := LoadInventory(dir)
require.NoError(t, err)
assert.NotNil(t, inv)
}

View File

@@ -0,0 +1,40 @@
// ingestion/internal/wiki/log.go
package wiki
import (
"fmt"
"os"
"path/filepath"
"strings"
)
// AppendLog appends one ingestion record to brain/log.md.
func AppendLog(brainDir, source string, pages, warnings []string, date string) error {
var sb strings.Builder
fmt.Fprintf(&sb, "## %s — ingest\n\n", date)
fmt.Fprintf(&sb, "- **Source:** %s\n", source)
if len(pages) > 0 {
sb.WriteString("- **Pages written:**\n")
for _, p := range pages {
fmt.Fprintf(&sb, " - %s\n", p)
}
}
if len(warnings) > 0 {
sb.WriteString("- **Warnings:**\n")
for _, w := range warnings {
fmt.Fprintf(&sb, " - %s\n", w)
}
}
sb.WriteString("\n")
logPath := filepath.Join(brainDir, "log.md")
f, err := os.OpenFile(logPath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return fmt.Errorf("open log: %w", err)
}
defer f.Close()
if _, err = f.WriteString(sb.String()); err != nil {
return fmt.Errorf("write log: %w", err)
}
return nil
}

View File

@@ -0,0 +1,120 @@
// ingestion/internal/wiki/merge.go
package wiki
import (
"fmt"
"strings"
)
var bulletSections = map[string]bool{
"Related Concepts": true,
"Related Entities": true,
"Sources": true,
"Key Claims": true,
"Entities Mentioned": true,
"Concepts Introduced or Reinforced": true,
"Chapters": true,
}
var appendSections = map[string]bool{
"Evolving Notes": true,
"Updates": true,
"Open Questions Raised": true,
"Open Questions": true,
}
type section struct {
heading string
content string
}
// Merge combines two Page values with the same path.
// Frontmatter is taken from a. Sections are merged by strategy:
// bullet sections union unique lines, append sections concatenate,
// all others keep a's version. Sections in b not present in a are appended.
func Merge(a, b Page) Page {
fmA, secsA := parseSections(a.Content)
_, secsB := parseSections(b.Content)
idx := make(map[string]int, len(secsA))
for i, s := range secsA {
idx[s.heading] = i
}
for _, sB := range secsB {
i, exists := idx[sB.heading]
if !exists {
idx[sB.heading] = len(secsA)
secsA = append(secsA, sB)
continue
}
sA := secsA[i]
switch {
case bulletSections[sB.heading]:
secsA[i].content = mergeBullets(sA.content, sB.content)
case appendSections[sB.heading]:
secsA[i].content = strings.TrimRight(sA.content, "\n") + "\n\n" + strings.TrimLeft(sB.content, "\n")
}
}
return Page{Path: a.Path, Content: rebuildContent(fmA, secsA)}
}
func parseSections(markdown string) (frontmatter string, sections []section) {
lines := strings.Split(markdown, "\n")
i := 0
if i < len(lines) && strings.TrimSpace(lines[i]) == "---" {
i++
var fmLines []string
for i < len(lines) {
if strings.TrimSpace(lines[i]) == "---" {
i++
break
}
fmLines = append(fmLines, lines[i])
i++
}
frontmatter = fmt.Sprintf("---\n%s\n---\n", strings.Join(fmLines, "\n"))
}
var cur *section
for ; i < len(lines); i++ {
line := lines[i]
if strings.HasPrefix(line, "## ") {
if cur != nil {
sections = append(sections, *cur)
}
cur = &section{heading: strings.TrimPrefix(line, "## ")}
} else if cur != nil {
cur.content += line + "\n"
}
}
if cur != nil {
sections = append(sections, *cur)
}
return
}
func rebuildContent(frontmatter string, sections []section) string {
var sb strings.Builder
sb.WriteString(frontmatter)
for _, sec := range sections {
fmt.Fprintf(&sb, "\n## %s\n\n%s", sec.heading, sec.content)
}
return sb.String()
}
func mergeBullets(a, b string) string {
seen := make(map[string]bool)
var lines []string
for _, line := range strings.Split(a+b, "\n") {
trimmed := strings.TrimSpace(line)
if trimmed == "" || seen[trimmed] {
continue
}
seen[trimmed] = true
lines = append(lines, line)
}
return strings.Join(lines, "\n") + "\n"
}

View File

@@ -0,0 +1,55 @@
// ingestion/internal/wiki/merge_test.go
package wiki
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMerge_BulletSectionsUnion(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Related Concepts\n\n- [[bar|Bar]]\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Related Concepts\n\n- [[bar|Bar]]\n- [[baz|Baz]]\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "[[bar|Bar]]")
assert.Contains(t, got.Content, "[[baz|Baz]]")
assert.Equal(t, 1, strings.Count(got.Content, "[[bar|Bar]]"))
}
func TestMerge_AppendSections(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Evolving Notes\n\nFirst note.\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Evolving Notes\n\nSecond note.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "First note.")
assert.Contains(t, got.Content, "Second note.")
}
func TestMerge_KeepFirstForOtherSections(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nFirst definition.\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nSecond definition.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "First definition.")
assert.NotContains(t, got.Content, "Second definition.")
}
func TestMerge_NewSectionFromB(t *testing.T) {
a := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Definition\n\nA thing.\n"}
b := Page{Path: "wiki/concepts/foo.md", Content: "---\ntitle: Foo\n---\n\n## Why It Matters\n\nBecause reasons.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "A thing.")
assert.Contains(t, got.Content, "Because reasons.")
}
func TestMerge_KeepsFrontmatterFromA(t *testing.T) {
a := Page{Path: "p.md", Content: "---\ntitle: A\nlast_updated: 2026-01-01\n---\n\n## Definition\n\nA.\n"}
b := Page{Path: "p.md", Content: "---\ntitle: B\nlast_updated: 2026-06-01\n---\n\n## Definition\n\nB.\n"}
got := Merge(a, b)
assert.Contains(t, got.Content, "title: A")
assert.NotContains(t, got.Content, "title: B")
}

View File

@@ -0,0 +1,28 @@
// ingestion/internal/wiki/slug.go
package wiki
import (
"strings"
"unicode"
)
// Slug converts a title to a kebab-case slug suitable for wiki filenames.
// Rules: lowercase, spaces/hyphens/underscores → hyphens, strip everything else.
func Slug(title string) string {
var b strings.Builder
prevHyphen := true // start true to trim leading hyphens
for _, r := range strings.ToLower(title) {
switch {
case r == ' ' || r == '-' || r == '_':
if !prevHyphen {
b.WriteRune('-')
prevHyphen = true
}
case unicode.IsLetter(r) || unicode.IsDigit(r):
b.WriteRune(r)
prevHyphen = false
// all other characters (apostrophes, colons, dots, etc.) are dropped
}
}
return strings.TrimRight(b.String(), "-")
}

View File

@@ -0,0 +1,29 @@
// ingestion/internal/wiki/slug_test.go
package wiki
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestSlug(t *testing.T) {
tests := []struct {
input string
want string
}{
{"Domain Driven Design", "domain-driven-design"},
{"It's Complicated", "its-complicated"},
{"gRPC", "grpc"},
{"GPT-4o", "gpt-4o"},
{"Property 1: It's Rough", "property-1-its-rough"},
{" leading spaces ", "leading-spaces"},
{"multiple spaces", "multiple-spaces"},
{"already-kebab", "already-kebab"},
}
for _, tc := range tests {
t.Run(tc.input, func(t *testing.T) {
assert.Equal(t, tc.want, Slug(tc.input))
})
}
}

View File

@@ -0,0 +1,24 @@
// ingestion/internal/wiki/types.go
package wiki
// PageType identifies the wiki subdirectory for a page.
type PageType string
const (
PageTypeConcept PageType = "concepts"
PageTypeEntity PageType = "entities"
PageTypeSource PageType = "sources"
)
// Page is a wiki page to be written to disk.
type Page struct {
Path string // relative to brainDir, e.g. "wiki/sources/foo.md"
Content string // full markdown including YAML frontmatter
}
// Entry is a summary of an existing wiki page used to build the inventory.
type Entry struct {
Slug string
Title string
Type PageType
}

View File

@@ -9,6 +9,8 @@ type Config struct {
ConfigDir string // SUPERVISOR_CONFIG_DIR, default ./config/supervisor ConfigDir string // SUPERVISOR_CONFIG_DIR, default ./config/supervisor
ModelsFile string // SUPERVISOR_MODELS_FILE, default <ConfigDir>/../models.yaml ModelsFile string // SUPERVISOR_MODELS_FILE, default <ConfigDir>/../models.yaml
IngestBaseURL string // INGEST_BASE_URL, default http://localhost:3300 IngestBaseURL string // INGEST_BASE_URL, default http://localhost:3300
IngestSvcURL string // INGEST_SVC_URL — base URL for brain_ingest (/ingest, /ingest-path)
KBRetrievalURL string // KB_RETRIEVAL_URL — base URL for brain_search
SessionsDir string // SUPERVISOR_SESSIONS_DIR, default ./brain/sessions SessionsDir string // SUPERVISOR_SESSIONS_DIR, default ./brain/sessions
BrainDir string // SUPERVISOR_BRAIN_DIR, default ./brain BrainDir string // SUPERVISOR_BRAIN_DIR, default ./brain
} }
@@ -22,6 +24,8 @@ func Load() (Config, error) {
} }
cfg.ModelsFile = envOr("SUPERVISOR_MODELS_FILE", cfg.ConfigDir+"/../models.yaml") cfg.ModelsFile = envOr("SUPERVISOR_MODELS_FILE", cfg.ConfigDir+"/../models.yaml")
cfg.IngestBaseURL = envOr("INGEST_BASE_URL", "http://localhost:3300") cfg.IngestBaseURL = envOr("INGEST_BASE_URL", "http://localhost:3300")
cfg.IngestSvcURL = envOr("INGEST_SVC_URL", "")
cfg.KBRetrievalURL = envOr("KB_RETRIEVAL_URL", "")
cfg.SessionsDir = envOr("SUPERVISOR_SESSIONS_DIR", "./brain/sessions") cfg.SessionsDir = envOr("SUPERVISOR_SESSIONS_DIR", "./brain/sessions")
cfg.BrainDir = envOr("SUPERVISOR_BRAIN_DIR", "./brain") cfg.BrainDir = envOr("SUPERVISOR_BRAIN_DIR", "./brain")
return cfg, nil return cfg, nil

View File

@@ -10,13 +10,17 @@ import (
"net/http" "net/http"
) )
// Handle dispatches brain_query and brain_write tool calls. // Handle dispatches brain tool calls.
func (s *Skill) Handle(ctx context.Context, tool string, args json.RawMessage) (json.RawMessage, error) { func (s *Skill) Handle(ctx context.Context, tool string, args json.RawMessage) (json.RawMessage, error) {
switch tool { switch tool {
case "brain_query": case "brain_query":
return s.query(ctx, args) return s.query(ctx, args)
case "brain_write": case "brain_write":
return s.write(ctx, args) return s.write(ctx, args)
case "brain_ingest":
return s.ingest(ctx, args)
case "brain_search":
return s.search(ctx, args)
default: default:
return nil, fmt.Errorf("unknown brain tool: %s", tool) return nil, fmt.Errorf("unknown brain tool: %s", tool)
} }
@@ -59,12 +63,74 @@ func (s *Skill) write(ctx context.Context, args json.RawMessage) (json.RawMessag
return s.post(ctx, "/write", a) return s.post(ctx, "/write", a)
} }
type ingestArgs struct {
Content string `json:"content,omitempty"`
Source string `json:"source,omitempty"`
Path string `json:"path,omitempty"`
DryRun bool `json:"dry_run,omitempty"`
}
func (s *Skill) ingest(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
var a ingestArgs
if err := json.Unmarshal(args, &a); err != nil {
return nil, fmt.Errorf("parse args: %w", err)
}
if s.cfg.IngestSvcURL == "" {
return nil, fmt.Errorf("brain_ingest: INGEST_SVC_URL not configured")
}
if a.Path != "" && a.Content != "" {
return nil, fmt.Errorf("path and content+source are mutually exclusive: provide one or the other")
}
if a.Path != "" {
return s.postTo(ctx, s.cfg.IngestSvcURL+"/ingest-path", map[string]any{
"path": a.Path,
"source": a.Source,
"dry_run": a.DryRun,
})
}
if a.Content != "" && a.Source != "" {
return s.postTo(ctx, s.cfg.IngestSvcURL+"/ingest", map[string]any{
"content": a.Content,
"source": a.Source,
"dry_run": a.DryRun,
})
}
return nil, fmt.Errorf("either content+source or path is required")
}
type searchArgs struct {
Query string `json:"query"`
Collection string `json:"collection,omitempty"`
Limit int `json:"limit,omitempty"`
}
func (s *Skill) search(ctx context.Context, args json.RawMessage) (json.RawMessage, error) {
var a searchArgs
if err := json.Unmarshal(args, &a); err != nil {
return nil, fmt.Errorf("parse args: %w", err)
}
if a.Query == "" {
return nil, fmt.Errorf("query is required")
}
if a.Limit == 0 {
a.Limit = 5
}
if s.cfg.KBRetrievalURL == "" {
return nil, fmt.Errorf("brain_search: KB_RETRIEVAL_URL not configured")
}
return s.postTo(ctx, s.cfg.KBRetrievalURL+"/api/v1/search", a)
}
func (s *Skill) post(ctx context.Context, path string, body any) (json.RawMessage, error) { func (s *Skill) post(ctx context.Context, path string, body any) (json.RawMessage, error) {
return s.postTo(ctx, s.cfg.IngestBaseURL+path, body)
}
func (s *Skill) postTo(ctx context.Context, url string, body any) (json.RawMessage, error) {
b, err := json.Marshal(body) b, err := json.Marshal(body)
if err != nil { if err != nil {
return nil, fmt.Errorf("marshal request: %w", err) return nil, fmt.Errorf("marshal request: %w", err)
} }
req, err := http.NewRequestWithContext(ctx, http.MethodPost, s.cfg.IngestBaseURL+path, bytes.NewReader(b)) req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(b))
if err != nil { if err != nil {
return nil, fmt.Errorf("build request: %w", err) return nil, fmt.Errorf("build request: %w", err)
} }

View File

@@ -63,3 +63,60 @@ func TestHandle_UnknownTool_ReturnsError(t *testing.T) {
_, err := s.Handle(context.Background(), "brain_unknown", nil) _, err := s.Handle(context.Background(), "brain_unknown", nil)
assert.Error(t, err) assert.Error(t, err)
} }
func TestIngest_RoutesToIngestPath(t *testing.T) {
var capturedPath string
var capturedBody map[string]any
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedPath = r.URL.Path
require.NoError(t, json.NewDecoder(r.Body).Decode(&capturedBody))
_ = json.NewEncoder(w).Encode(map[string]any{"pages": []string{"wiki/foo.md"}})
}))
defer srv.Close()
s := brain.New(brain.Config{IngestSvcURL: srv.URL})
args, _ := json.Marshal(map[string]any{"path": "/tmp/some-file.md"})
out, err := s.Handle(context.Background(), "brain_ingest", args)
require.NoError(t, err)
assert.Equal(t, "/ingest-path", capturedPath)
assert.Equal(t, "/tmp/some-file.md", capturedBody["path"])
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
pages := result["pages"].([]any)
assert.Len(t, pages, 1)
}
func TestIngest_RoutesToIngest(t *testing.T) {
var capturedPath string
var capturedBody map[string]any
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
capturedPath = r.URL.Path
require.NoError(t, json.NewDecoder(r.Body).Decode(&capturedBody))
_ = json.NewEncoder(w).Encode(map[string]any{"pages": []string{"wiki/bar.md"}})
}))
defer srv.Close()
s := brain.New(brain.Config{IngestSvcURL: srv.URL})
args, _ := json.Marshal(map[string]any{"content": "some content", "source": "my-source.md"})
out, err := s.Handle(context.Background(), "brain_ingest", args)
require.NoError(t, err)
assert.Equal(t, "/ingest", capturedPath)
assert.Equal(t, "some content", capturedBody["content"])
assert.Equal(t, "my-source.md", capturedBody["source"])
var result map[string]any
require.NoError(t, json.Unmarshal(out, &result))
pages := result["pages"].([]any)
assert.Len(t, pages, 1)
}
func TestIngest_MissingRequiredFields(t *testing.T) {
s := brain.New(brain.Config{IngestSvcURL: "http://localhost:3300"})
args, _ := json.Marshal(map[string]any{})
_, err := s.Handle(context.Background(), "brain_ingest", args)
require.Error(t, err)
assert.Contains(t, err.Error(), "either content+source or path is required")
}

View File

@@ -9,7 +9,9 @@ import (
// Config holds brain skill configuration. // Config holds brain skill configuration.
type Config struct { type Config struct {
IngestBaseURL string // base URL of the ingestion HTTP server, e.g. http://localhost:3300 IngestBaseURL string // base URL of the ingestion HTTP server (brain_query, brain_write)
IngestSvcURL string // base URL of the ingestion-svc HTTP server (brain_ingest)
KBRetrievalURL string // base URL of the kb-retrieval server (brain_search)
} }
// Skill implements registry.Skill for brain_query and brain_write. // Skill implements registry.Skill for brain_query and brain_write.
@@ -32,10 +34,10 @@ func (s *Skill) Tools() []registry.ToolDef {
str := map[string]any{"type": "string"} str := map[string]any{"type": "string"}
num := map[string]any{"type": "integer"} num := map[string]any{"type": "integer"}
return []registry.ToolDef{ tools := []registry.ToolDef{
{ {
Name: "brain_query", Name: "brain_query",
Description: "Search the hyperguild brain wiki for relevant knowledge. Call this before starting any significant task.", Description: "BM25 full-text search across brain/knowledge/ and brain/wiki/ markdown files. Fast, no embeddings needed. Call before any significant task.",
InputSchema: schema([]string{"query"}, map[string]any{ InputSchema: schema([]string{"query"}, map[string]any{
"query": str, "query": str,
"limit": num, "limit": num,
@@ -43,7 +45,7 @@ func (s *Skill) Tools() []registry.ToolDef {
}, },
{ {
Name: "brain_write", Name: "brain_write",
Description: "Write a raw knowledge note to the brain for later ingestion into the wiki.", Description: "Write a raw knowledge note to brain/knowledge/ for later ingestion.",
InputSchema: schema([]string{"content"}, map[string]any{ InputSchema: schema([]string{"content"}, map[string]any{
"content": str, "content": str,
"type": str, "type": str,
@@ -52,4 +54,32 @@ func (s *Skill) Tools() []registry.ToolDef {
}), }),
}, },
} }
if s.cfg.IngestSvcURL != "" {
tools = append(tools, registry.ToolDef{
Name: "brain_ingest",
Description: "Ingest content into the brain wiki (brain/wiki/). Calls an LLM to produce structured wiki pages. " +
"Use for substantial documents, articles, or knowledge worth structuring. " +
"Provide EITHER (a) path — absolute path to a file or directory, " +
"OR (b) content + source — raw text and a human-readable name. " +
"Providing both is an error. Returns the list of wiki pages written.",
InputSchema: schema([]string{}, map[string]any{
"content": map[string]any{"type": "string", "description": "raw text to ingest; required when path is not set"},
"source": map[string]any{"type": "string", "description": "human-readable name for the content, e.g. 'shape-up-book'; required when path is not set"},
"path": map[string]any{"type": "string", "description": "absolute path to a file or directory to ingest; mutually exclusive with content+source"},
"dry_run": map[string]any{"type": "boolean"},
}),
})
}
if s.cfg.KBRetrievalURL != "" {
tools = append(tools, registry.ToolDef{
Name: "brain_search",
Description: "Semantic vector search across the brain wiki using embeddings. Use when brain_query returns no results or you need conceptually-related results rather than keyword matches.",
InputSchema: schema([]string{"query"}, map[string]any{
"query": str,
"collection": str,
"limit": num,
}),
})
}
return tools
} }