diff --git a/ingestion/internal/graph/extract.go b/ingestion/internal/graph/extract.go new file mode 100644 index 0000000..57550cd --- /dev/null +++ b/ingestion/internal/graph/extract.go @@ -0,0 +1,200 @@ +// Package graph extracts entity + edge records from brain markdown +// documents for the brain_entities / brain_edges relational graph. +// +// The extractor is pure: it takes markdown bytes and a document path and +// returns the entity (one per doc) and the wikilink edges (zero or more) +// it found, with source line numbers so the graph store can record +// provenance. +// +// Edge types in v1: only "wikilink" — derived from [[slug]] and +// [[slug|Display]] occurrences in the body. Section-header edges are +// deferred (see infra#62 grill addendum). +package graph + +import ( + "bufio" + "bytes" + "path/filepath" + "regexp" + "strings" +) + +// Entity represents one brain document for graph indexing. +// +// Slug is the basename without ".md" — the same identity used by +// wiki canonicalization and the wikilink target syntax. +// +// Type categorises the doc into a coarse bucket so callers can filter +// graph traversals (e.g. "only entity nodes"). When the doc lives +// under brain/wiki///, Wing and Hall capture the +// taxonomy; otherwise they're empty (legacy brain/knowledge/ docs). +type Entity struct { + DocPath string // forward-slash, relative to brainDir + Slug string + Type string // "concept" | "entity" | "source" | "hall" | "knowledge" + Wing string // optional; from frontmatter or path + Hall string // optional; from frontmatter or path + Title string // optional; from frontmatter +} + +// Edge represents a directed relationship between two slugs. +// +// SrcLine is the 1-indexed line in the source document where the link +// was found, so callers can re-find the linking text after an edit. +type Edge struct { + SrcDoc string // forward-slash, relative to brainDir + SrcSlug string // == Entity.Slug for SrcDoc + DstSlug string + EdgeType string // "wikilink" in v1 + SrcLine int // 1-indexed +} + +// linkRE matches both [[slug]] and [[slug|Display Name]] wikilinks. +// Group 1 is the slug; group 2 (if present) is the display. +var linkRE = regexp.MustCompile(`\[\[([^\]|]+)(?:\|([^\]]+))?\]\]`) + +// Extract parses one markdown document and returns its Entity plus the +// outgoing wikilink Edges. docPath is forward-slash, relative to +// brainDir; content is the raw markdown bytes. +// +// Returns ok=false when docPath does not yield a usable slug (e.g. +// non-markdown file slipped through). +func Extract(docPath string, content []byte) (Entity, []Edge, bool) { + slug := slugFromPath(docPath) + if slug == "" { + return Entity{}, nil, false + } + ent := Entity{DocPath: docPath, Slug: slug} + classifyByPath(&ent, docPath) + readFrontmatter(&ent, content) + + edges := extractEdges(docPath, slug, content) + return ent, edges, true +} + +func slugFromPath(docPath string) string { + base := filepath.Base(docPath) + if !strings.HasSuffix(base, ".md") { + return "" + } + return strings.TrimSuffix(base, ".md") +} + +// classifyByPath fills Type / Wing / Hall from the path layout when the +// doc lives under brain/wiki/. Layout: wiki///.md +// or wiki//.md for the legacy concept/entity/source dirs. +func classifyByPath(e *Entity, docPath string) { + parts := strings.Split(docPath, "/") + if len(parts) < 2 || parts[0] != "wiki" { + e.Type = "knowledge" + return + } + switch parts[1] { + case "concepts": + e.Type = "concept" + case "entities": + e.Type = "entity" + case "sources": + e.Type = "source" + default: + // wiki///.md + e.Type = "hall" + e.Wing = parts[1] + if len(parts) >= 4 { + e.Hall = parts[2] + } + } +} + +// readFrontmatter pulls title/wing/hall from a YAML frontmatter block. +// Frontmatter is optional; missing fields leave the entity unchanged. +func readFrontmatter(e *Entity, content []byte) { + scanner := bufio.NewScanner(bytes.NewReader(content)) + inFM := false + for scanner.Scan() { + line := scanner.Text() + if strings.TrimSpace(line) == "---" { + if !inFM { + inFM = true + continue + } + return + } + if !inFM { + return + } + key, val, ok := strings.Cut(line, ":") + if !ok { + continue + } + v := strings.Trim(strings.TrimSpace(val), `"'`) + switch strings.TrimSpace(key) { + case "title": + if e.Title == "" { + e.Title = v + } + case "wing": + if e.Wing == "" { + e.Wing = v + } + case "hall": + if e.Hall == "" { + e.Hall = v + } + } + } +} + +func extractEdges(docPath, srcSlug string, content []byte) []Edge { + var edges []Edge + seen := make(map[string]struct{}) // dedupe (dst, line) + scanner := bufio.NewScanner(bytes.NewReader(content)) + line := 0 + for scanner.Scan() { + line++ + matches := linkRE.FindAllStringSubmatch(scanner.Text(), -1) + for _, m := range matches { + dst := strings.TrimSpace(m[1]) + if dst == "" || dst == srcSlug { + continue + } + key := dst + "|" + itoa(line) + if _, dup := seen[key]; dup { + continue + } + seen[key] = struct{}{} + edges = append(edges, Edge{ + SrcDoc: docPath, + SrcSlug: srcSlug, + DstSlug: dst, + EdgeType: "wikilink", + SrcLine: line, + }) + } + } + return edges +} + +// itoa avoids the fmt dependency on a hot path. Single-digit fast path +// keeps overhead negligible for typical line counts. +func itoa(n int) string { + if n == 0 { + return "0" + } + var buf [20]byte + i := len(buf) + neg := n < 0 + if neg { + n = -n + } + for n > 0 { + i-- + buf[i] = byte('0' + n%10) + n /= 10 + } + if neg { + i-- + buf[i] = '-' + } + return string(buf[i:]) +} diff --git a/ingestion/internal/graph/extract_test.go b/ingestion/internal/graph/extract_test.go new file mode 100644 index 0000000..c785837 --- /dev/null +++ b/ingestion/internal/graph/extract_test.go @@ -0,0 +1,106 @@ +package graph + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtract_HallDoc(t *testing.T) { + content := []byte(`--- +wing: jepa-fx +hall: decisions +title: Val Vol Decision +--- +# Val Vol + +See also [[other-decision]] and [[parent-concept|Parent Concept]]. + +Linking to [[unrelated]]. +`) + + ent, edges, ok := Extract("wiki/jepa-fx/decisions/val-vol.md", content) + require.True(t, ok) + assert.Equal(t, "val-vol", ent.Slug) + assert.Equal(t, "hall", ent.Type) + assert.Equal(t, "jepa-fx", ent.Wing) + assert.Equal(t, "decisions", ent.Hall) + assert.Equal(t, "Val Vol Decision", ent.Title) + + require.Len(t, edges, 3) + assert.Equal(t, "other-decision", edges[0].DstSlug) + assert.Equal(t, "parent-concept", edges[1].DstSlug) + assert.Equal(t, "unrelated", edges[2].DstSlug) + for _, e := range edges { + assert.Equal(t, "wikilink", e.EdgeType) + assert.Equal(t, "val-vol", e.SrcSlug) + assert.Equal(t, "wiki/jepa-fx/decisions/val-vol.md", e.SrcDoc) + assert.Greater(t, e.SrcLine, 0) + } +} + +func TestExtract_LegacyConceptDoc(t *testing.T) { + content := []byte(`--- +title: Hash Encoding +--- +# Hash Encoding + +Linked to [[financial-sentiment-analysis|FSA]]. +`) + ent, edges, ok := Extract("wiki/concepts/hash-encoding.md", content) + require.True(t, ok) + assert.Equal(t, "hash-encoding", ent.Slug) + assert.Equal(t, "concept", ent.Type) + assert.Empty(t, ent.Wing) + assert.Empty(t, ent.Hall) + assert.Equal(t, "Hash Encoding", ent.Title) + + require.Len(t, edges, 1) + assert.Equal(t, "financial-sentiment-analysis", edges[0].DstSlug) +} + +func TestExtract_KnowledgeDoc(t *testing.T) { + content := []byte("# No frontmatter, no links here.\n") + ent, edges, ok := Extract("knowledge/some-note.md", content) + require.True(t, ok) + assert.Equal(t, "some-note", ent.Slug) + assert.Equal(t, "knowledge", ent.Type) + assert.Empty(t, edges) +} + +func TestExtract_DedupesRepeatedLinkOnSameLine(t *testing.T) { + content := []byte("See [[foo]] and [[foo]] again on the same line.\n") + _, edges, ok := Extract("knowledge/dup.md", content) + require.True(t, ok) + require.Len(t, edges, 1) + assert.Equal(t, "foo", edges[0].DstSlug) +} + +func TestExtract_KeepsMultipleEdgesOnDifferentLines(t *testing.T) { + content := []byte("First mention [[foo]].\n\nSecond mention [[foo]].\n") + _, edges, ok := Extract("knowledge/multi.md", content) + require.True(t, ok) + require.Len(t, edges, 2) + assert.NotEqual(t, edges[0].SrcLine, edges[1].SrcLine) +} + +func TestExtract_IgnoresSelfLinks(t *testing.T) { + content := []byte("Self-reference [[self]] should be ignored.\n") + _, edges, ok := Extract("knowledge/self.md", content) + require.True(t, ok) + assert.Empty(t, edges) +} + +func TestExtract_RejectsNonMarkdown(t *testing.T) { + _, _, ok := Extract("wiki/concepts/not-markdown.txt", []byte("anything")) + assert.False(t, ok) +} + +func TestExtract_LineNumbersAre1Indexed(t *testing.T) { + content := []byte("line 1\nline 2 [[bar]]\n") + _, edges, ok := Extract("knowledge/lines.md", content) + require.True(t, ok) + require.Len(t, edges, 1) + assert.Equal(t, 2, edges[0].SrcLine) +} diff --git a/ingestion/internal/graphstore/pg.go b/ingestion/internal/graphstore/pg.go new file mode 100644 index 0000000..9061cfe --- /dev/null +++ b/ingestion/internal/graphstore/pg.go @@ -0,0 +1,351 @@ +// Package graphstore stores the brain knowledge graph (entities + +// directed edges) in PostgreSQL on the shared postgres18 instance, +// alongside the pgvector embeddings in [vectorstore]. +// +// Schema (created idempotently by Init): +// +// brain_entities(slug PK, type, wing, hall, doc_path, title, updated_at) +// brain_edges(id PK, src_slug FK, dst_slug, edge_type, src_doc, src_line, +// weight, updated_at) +// +// Edges fan-out from a source document; calling [PGStore.ReplaceEdgesForDoc] +// replaces every edge previously emitted from that document so re-ingest is +// idempotent without bookkeeping. +// +// All slug strings are stored verbatim — callers are expected to canonicalise +// before persisting. Dst slugs may reference entities that don't yet exist +// (dangling edges); resolution is deferred to query time so ingestion order +// doesn't matter. +package graphstore + +import ( + "context" + "errors" + "fmt" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + + "github.com/mathiasbq/hyperguild/ingestion/internal/graph" +) + +// PGStore is the postgres-backed brain knowledge-graph store. Construct +// with New + call Init once to create tables and indexes. Use Close to +// release the pool. +type PGStore struct { + pool *pgxpool.Pool +} + +// New opens a pgxpool against dsn and pings to verify connectivity. The +// caller owns the resulting PGStore and must invoke Close. +func New(ctx context.Context, dsn string) (*PGStore, error) { + pool, err := pgxpool.New(ctx, dsn) + if err != nil { + return nil, fmt.Errorf("pgxpool: %w", err) + } + if err := pool.Ping(ctx); err != nil { + pool.Close() + return nil, fmt.Errorf("ping: %w", err) + } + return &PGStore{pool: pool}, nil +} + +// Close releases the underlying connection pool. +func (s *PGStore) Close() { + if s.pool != nil { + s.pool.Close() + } +} + +// Init creates brain_entities + brain_edges tables and their indexes if +// they don't yet exist. Safe to call on every startup. No-op when the +// schema already matches. +func (s *PGStore) Init(ctx context.Context) error { + const ddl = ` +CREATE TABLE IF NOT EXISTS brain_entities ( + slug TEXT PRIMARY KEY, + type TEXT NOT NULL DEFAULT 'knowledge', + wing TEXT NOT NULL DEFAULT '', + hall TEXT NOT NULL DEFAULT '', + doc_path TEXT NOT NULL, + title TEXT NOT NULL DEFAULT '', + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX IF NOT EXISTS brain_entities_wing_idx + ON brain_entities (wing) WHERE wing <> ''; +CREATE INDEX IF NOT EXISTS brain_entities_type_idx + ON brain_entities (type); + +CREATE TABLE IF NOT EXISTS brain_edges ( + id BIGSERIAL PRIMARY KEY, + src_slug TEXT NOT NULL, + dst_slug TEXT NOT NULL, + edge_type TEXT NOT NULL DEFAULT 'wikilink', + src_doc TEXT NOT NULL, + src_line INTEGER NOT NULL DEFAULT 0, + weight REAL NOT NULL DEFAULT 1.0, + updated_at TIMESTAMPTZ NOT NULL DEFAULT now() +); +CREATE INDEX IF NOT EXISTS brain_edges_src_idx + ON brain_edges (src_slug, edge_type); +CREATE INDEX IF NOT EXISTS brain_edges_dst_idx + ON brain_edges (dst_slug, edge_type); +CREATE INDEX IF NOT EXISTS brain_edges_src_doc_idx + ON brain_edges (src_doc); +` + _, err := s.pool.Exec(ctx, ddl) + return err +} + +// UpsertEntity inserts or updates one entity by slug. +func (s *PGStore) UpsertEntity(ctx context.Context, e graph.Entity) error { + if e.Slug == "" { + return errors.New("entity slug is required") + } + if e.Type == "" { + e.Type = "knowledge" + } + _, err := s.pool.Exec(ctx, ` + INSERT INTO brain_entities (slug, type, wing, hall, doc_path, title, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, now()) + ON CONFLICT (slug) DO UPDATE + SET type = EXCLUDED.type, + wing = EXCLUDED.wing, + hall = EXCLUDED.hall, + doc_path = EXCLUDED.doc_path, + title = EXCLUDED.title, + updated_at = now() + `, e.Slug, e.Type, e.Wing, e.Hall, e.DocPath, e.Title) + if err != nil { + return fmt.Errorf("upsert entity %q: %w", e.Slug, err) + } + return nil +} + +// ReplaceEdgesForDoc deletes every edge previously emitted from docPath +// and inserts the new set in one transaction. Caller should pass the +// complete edge set for the doc — partial updates are not supported. +func (s *PGStore) ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error { + if docPath == "" { + return errors.New("doc path is required") + } + tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return fmt.Errorf("begin: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil { + return fmt.Errorf("delete prior edges for %q: %w", docPath, err) + } + for _, e := range edges { + if e.SrcSlug == "" || e.DstSlug == "" { + continue + } + if _, err := tx.Exec(ctx, ` + INSERT INTO brain_edges (src_slug, dst_slug, edge_type, src_doc, src_line, weight) + VALUES ($1, $2, $3, $4, $5, 1.0) + `, e.SrcSlug, e.DstSlug, e.EdgeType, e.SrcDoc, e.SrcLine); err != nil { + return fmt.Errorf("insert edge %s->%s: %w", e.SrcSlug, e.DstSlug, err) + } + } + if err := tx.Commit(ctx); err != nil { + return fmt.Errorf("commit: %w", err) + } + return nil +} + +// DeleteByDoc removes the entity at docPath and every edge it sourced. +// Use when a wiki page is deleted on disk. +func (s *PGStore) DeleteByDoc(ctx context.Context, docPath string) error { + if docPath == "" { + return errors.New("doc path is required") + } + tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{}) + if err != nil { + return fmt.Errorf("begin: %w", err) + } + defer func() { _ = tx.Rollback(ctx) }() + + if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil { + return fmt.Errorf("delete edges: %w", err) + } + if _, err := tx.Exec(ctx, `DELETE FROM brain_entities WHERE doc_path = $1`, docPath); err != nil { + return fmt.Errorf("delete entity: %w", err) + } + return tx.Commit(ctx) +} + +// Neighbor is one row in a Neighbors / Subgraph response. +type Neighbor struct { + Slug string + Type string + Wing string + Hall string + DocPath string + Title string + EdgeType string + Distance int // hop count from origin; 1 for direct neighbors +} + +// Neighbors returns the direct (1-hop) outgoing neighbours of slug. +// edgeType filters by relationship kind; "" returns all kinds. +// limit defaults to 25 when <= 0. +func (s *PGStore) Neighbors(ctx context.Context, slug, edgeType string, limit int) ([]Neighbor, error) { + if slug == "" { + return nil, errors.New("slug is required") + } + if limit <= 0 { + limit = 25 + } + q := ` +SELECT e.dst_slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''), + COALESCE(t.doc_path,''), COALESCE(t.title,''), e.edge_type, 1 +FROM brain_edges e +LEFT JOIN brain_entities t ON t.slug = e.dst_slug +WHERE e.src_slug = $1 + AND ($2 = '' OR e.edge_type = $2) +ORDER BY e.updated_at DESC +LIMIT $3 +` + rows, err := s.pool.Query(ctx, q, slug, edgeType, limit) + if err != nil { + return nil, fmt.Errorf("query neighbors: %w", err) + } + defer rows.Close() + return scanNeighbors(rows) +} + +// Subgraph returns every distinct slug reachable from origin within +// depth outgoing hops, annotated with the shortest hop distance. The +// origin itself is omitted. depth defaults to 2 when <= 0; values +// above 6 are clamped to 6 to bound traversal cost. +func (s *PGStore) Subgraph(ctx context.Context, origin string, depth int) ([]Neighbor, error) { + if origin == "" { + return nil, errors.New("origin slug is required") + } + if depth <= 0 { + depth = 2 + } + if depth > 6 { + depth = 6 + } + q := ` +WITH RECURSIVE walk(slug, edge_type, distance) AS ( + SELECT e.dst_slug, e.edge_type, 1 + FROM brain_edges e + WHERE e.src_slug = $1 + UNION + SELECT e.dst_slug, e.edge_type, w.distance + 1 + FROM walk w + JOIN brain_edges e ON e.src_slug = w.slug + WHERE w.distance < $2 +) +SELECT w.slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''), + COALESCE(t.doc_path,''), COALESCE(t.title,''), w.edge_type, MIN(w.distance) +FROM walk w +LEFT JOIN brain_entities t ON t.slug = w.slug +WHERE w.slug <> $1 +GROUP BY w.slug, t.type, t.wing, t.hall, t.doc_path, t.title, w.edge_type +ORDER BY MIN(w.distance), w.slug +` + rows, err := s.pool.Query(ctx, q, origin, depth) + if err != nil { + return nil, fmt.Errorf("query subgraph: %w", err) + } + defer rows.Close() + return scanNeighbors(rows) +} + +// PathStep is one hop in a Path response. +type PathStep struct { + FromSlug string + ToSlug string + EdgeType string +} + +// Path returns the shortest directed path from src to dst within +// maxDepth hops, as an ordered list of edges. Empty slice means no +// path exists. maxDepth defaults to 4 when <= 0; values above 8 are +// clamped to 8. +func (s *PGStore) Path(ctx context.Context, src, dst string, maxDepth int) ([]PathStep, error) { + if src == "" || dst == "" { + return nil, errors.New("src and dst are required") + } + if maxDepth <= 0 { + maxDepth = 4 + } + if maxDepth > 8 { + maxDepth = 8 + } + q := ` +WITH RECURSIVE walk(cur, path_slugs, path_edges, distance) AS ( + SELECT e.dst_slug, + ARRAY[e.src_slug, e.dst_slug]::TEXT[], + ARRAY[e.edge_type]::TEXT[], + 1 + FROM brain_edges e + WHERE e.src_slug = $1 + UNION ALL + SELECT e.dst_slug, + w.path_slugs || e.dst_slug, + w.path_edges || e.edge_type, + w.distance + 1 + FROM walk w + JOIN brain_edges e ON e.src_slug = w.cur + WHERE w.distance < $3 + AND NOT (e.dst_slug = ANY(w.path_slugs)) +) +SELECT path_slugs, path_edges +FROM walk +WHERE cur = $2 +ORDER BY distance ASC +LIMIT 1 +` + row := s.pool.QueryRow(ctx, q, src, dst, maxDepth) + var ( + slugs []string + kinds []string + ) + if err := row.Scan(&slugs, &kinds); err != nil { + if errors.Is(err, pgx.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("scan path: %w", err) + } + if len(slugs) < 2 || len(kinds) == 0 { + return nil, nil + } + steps := make([]PathStep, 0, len(kinds)) + for i := 0; i < len(kinds) && i+1 < len(slugs); i++ { + steps = append(steps, PathStep{ + FromSlug: slugs[i], + ToSlug: slugs[i+1], + EdgeType: kinds[i], + }) + } + return steps, nil +} + +// CountEdges is a debug helper — returns the total edges currently stored. +// Used by tests and by the volume-gate diagnostic. +func (s *PGStore) CountEdges(ctx context.Context) (int64, error) { + var n int64 + err := s.pool.QueryRow(ctx, `SELECT count(*) FROM brain_edges`).Scan(&n) + return n, err +} + +func scanNeighbors(rows pgx.Rows) ([]Neighbor, error) { + var out []Neighbor + for rows.Next() { + var n Neighbor + if err := rows.Scan( + &n.Slug, &n.Type, &n.Wing, &n.Hall, + &n.DocPath, &n.Title, &n.EdgeType, &n.Distance, + ); err != nil { + return nil, fmt.Errorf("scan: %w", err) + } + out = append(out, n) + } + return out, rows.Err() +}