// Package graphstore stores the brain knowledge graph (entities + // directed edges) in PostgreSQL on the shared postgres18 instance, // alongside the pgvector embeddings in [vectorstore]. // // Schema (created idempotently by Init): // // brain_entities(slug PK, type, wing, hall, doc_path, title, updated_at) // brain_edges(id PK, src_slug FK, dst_slug, edge_type, src_doc, src_line, // weight, updated_at) // // Edges fan-out from a source document; calling [PGStore.ReplaceEdgesForDoc] // replaces every edge previously emitted from that document so re-ingest is // idempotent without bookkeeping. // // All slug strings are stored verbatim — callers are expected to canonicalise // before persisting. Dst slugs may reference entities that don't yet exist // (dangling edges); resolution is deferred to query time so ingestion order // doesn't matter. package graphstore import ( "context" "errors" "fmt" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/mathiasbq/hyperguild/ingestion/internal/graph" ) // PGStore is the postgres-backed brain knowledge-graph store. Construct // with New + call Init once to create tables and indexes. Use Close to // release the pool. type PGStore struct { pool *pgxpool.Pool } // New opens a pgxpool against dsn and pings to verify connectivity. The // caller owns the resulting PGStore and must invoke Close. func New(ctx context.Context, dsn string) (*PGStore, error) { pool, err := pgxpool.New(ctx, dsn) if err != nil { return nil, fmt.Errorf("pgxpool: %w", err) } if err := pool.Ping(ctx); err != nil { pool.Close() return nil, fmt.Errorf("ping: %w", err) } return &PGStore{pool: pool}, nil } // Close releases the underlying connection pool. func (s *PGStore) Close() { if s.pool != nil { s.pool.Close() } } // Init creates brain_entities + brain_edges tables and their indexes if // they don't yet exist. Safe to call on every startup. No-op when the // schema already matches. func (s *PGStore) Init(ctx context.Context) error { const ddl = ` CREATE TABLE IF NOT EXISTS brain_entities ( slug TEXT PRIMARY KEY, type TEXT NOT NULL DEFAULT 'knowledge', wing TEXT NOT NULL DEFAULT '', hall TEXT NOT NULL DEFAULT '', doc_path TEXT NOT NULL, title TEXT NOT NULL DEFAULT '', updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS brain_entities_wing_idx ON brain_entities (wing) WHERE wing <> ''; CREATE INDEX IF NOT EXISTS brain_entities_type_idx ON brain_entities (type); CREATE TABLE IF NOT EXISTS brain_edges ( id BIGSERIAL PRIMARY KEY, src_slug TEXT NOT NULL, dst_slug TEXT NOT NULL, edge_type TEXT NOT NULL DEFAULT 'wikilink', src_doc TEXT NOT NULL, src_line INTEGER NOT NULL DEFAULT 0, weight REAL NOT NULL DEFAULT 1.0, updated_at TIMESTAMPTZ NOT NULL DEFAULT now() ); CREATE INDEX IF NOT EXISTS brain_edges_src_idx ON brain_edges (src_slug, edge_type); CREATE INDEX IF NOT EXISTS brain_edges_dst_idx ON brain_edges (dst_slug, edge_type); CREATE INDEX IF NOT EXISTS brain_edges_src_doc_idx ON brain_edges (src_doc); ` _, err := s.pool.Exec(ctx, ddl) return err } // UpsertEntity inserts or updates one entity by slug. func (s *PGStore) UpsertEntity(ctx context.Context, e graph.Entity) error { if e.Slug == "" { return errors.New("entity slug is required") } if e.Type == "" { e.Type = "knowledge" } _, err := s.pool.Exec(ctx, ` INSERT INTO brain_entities (slug, type, wing, hall, doc_path, title, updated_at) VALUES ($1, $2, $3, $4, $5, $6, now()) ON CONFLICT (slug) DO UPDATE SET type = EXCLUDED.type, wing = EXCLUDED.wing, hall = EXCLUDED.hall, doc_path = EXCLUDED.doc_path, title = EXCLUDED.title, updated_at = now() `, e.Slug, e.Type, e.Wing, e.Hall, e.DocPath, e.Title) if err != nil { return fmt.Errorf("upsert entity %q: %w", e.Slug, err) } return nil } // ReplaceEdgesForDoc deletes every edge previously emitted from docPath // and inserts the new set in one transaction. Caller should pass the // complete edge set for the doc — partial updates are not supported. func (s *PGStore) ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error { if docPath == "" { return errors.New("doc path is required") } tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return fmt.Errorf("begin: %w", err) } defer func() { _ = tx.Rollback(ctx) }() if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil { return fmt.Errorf("delete prior edges for %q: %w", docPath, err) } for _, e := range edges { if e.SrcSlug == "" || e.DstSlug == "" { continue } if _, err := tx.Exec(ctx, ` INSERT INTO brain_edges (src_slug, dst_slug, edge_type, src_doc, src_line, weight) VALUES ($1, $2, $3, $4, $5, 1.0) `, e.SrcSlug, e.DstSlug, e.EdgeType, e.SrcDoc, e.SrcLine); err != nil { return fmt.Errorf("insert edge %s->%s: %w", e.SrcSlug, e.DstSlug, err) } } if err := tx.Commit(ctx); err != nil { return fmt.Errorf("commit: %w", err) } return nil } // DeleteByDoc removes the entity at docPath and every edge it sourced. // Use when a wiki page is deleted on disk. func (s *PGStore) DeleteByDoc(ctx context.Context, docPath string) error { if docPath == "" { return errors.New("doc path is required") } tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{}) if err != nil { return fmt.Errorf("begin: %w", err) } defer func() { _ = tx.Rollback(ctx) }() if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil { return fmt.Errorf("delete edges: %w", err) } if _, err := tx.Exec(ctx, `DELETE FROM brain_entities WHERE doc_path = $1`, docPath); err != nil { return fmt.Errorf("delete entity: %w", err) } return tx.Commit(ctx) } // Neighbor is one row in a Neighbors / Subgraph response. type Neighbor struct { Slug string Type string Wing string Hall string DocPath string Title string EdgeType string Distance int // hop count from origin; 1 for direct neighbors } // Neighbors returns the direct (1-hop) outgoing neighbours of slug. // edgeType filters by relationship kind; "" returns all kinds. // limit defaults to 25 when <= 0. func (s *PGStore) Neighbors(ctx context.Context, slug, edgeType string, limit int) ([]Neighbor, error) { if slug == "" { return nil, errors.New("slug is required") } if limit <= 0 { limit = 25 } q := ` SELECT e.dst_slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''), COALESCE(t.doc_path,''), COALESCE(t.title,''), e.edge_type, 1 FROM brain_edges e LEFT JOIN brain_entities t ON t.slug = e.dst_slug WHERE e.src_slug = $1 AND ($2 = '' OR e.edge_type = $2) ORDER BY e.updated_at DESC LIMIT $3 ` rows, err := s.pool.Query(ctx, q, slug, edgeType, limit) if err != nil { return nil, fmt.Errorf("query neighbors: %w", err) } defer rows.Close() return scanNeighbors(rows) } // Subgraph returns every distinct slug reachable from origin within // depth outgoing hops, annotated with the shortest hop distance. The // origin itself is omitted. depth defaults to 2 when <= 0; values // above 6 are clamped to 6 to bound traversal cost. func (s *PGStore) Subgraph(ctx context.Context, origin string, depth int) ([]Neighbor, error) { if origin == "" { return nil, errors.New("origin slug is required") } if depth <= 0 { depth = 2 } if depth > 6 { depth = 6 } q := ` WITH RECURSIVE walk(slug, edge_type, distance) AS ( SELECT e.dst_slug, e.edge_type, 1 FROM brain_edges e WHERE e.src_slug = $1 UNION SELECT e.dst_slug, e.edge_type, w.distance + 1 FROM walk w JOIN brain_edges e ON e.src_slug = w.slug WHERE w.distance < $2 ) SELECT w.slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''), COALESCE(t.doc_path,''), COALESCE(t.title,''), w.edge_type, MIN(w.distance) FROM walk w LEFT JOIN brain_entities t ON t.slug = w.slug WHERE w.slug <> $1 GROUP BY w.slug, t.type, t.wing, t.hall, t.doc_path, t.title, w.edge_type ORDER BY MIN(w.distance), w.slug ` rows, err := s.pool.Query(ctx, q, origin, depth) if err != nil { return nil, fmt.Errorf("query subgraph: %w", err) } defer rows.Close() return scanNeighbors(rows) } // PathStep is one hop in a Path response. type PathStep struct { FromSlug string ToSlug string EdgeType string } // Path returns the shortest directed path from src to dst within // maxDepth hops, as an ordered list of edges. Empty slice means no // path exists. maxDepth defaults to 4 when <= 0; values above 8 are // clamped to 8. func (s *PGStore) Path(ctx context.Context, src, dst string, maxDepth int) ([]PathStep, error) { if src == "" || dst == "" { return nil, errors.New("src and dst are required") } if maxDepth <= 0 { maxDepth = 4 } if maxDepth > 8 { maxDepth = 8 } q := ` WITH RECURSIVE walk(cur, path_slugs, path_edges, distance) AS ( SELECT e.dst_slug, ARRAY[e.src_slug, e.dst_slug]::TEXT[], ARRAY[e.edge_type]::TEXT[], 1 FROM brain_edges e WHERE e.src_slug = $1 UNION ALL SELECT e.dst_slug, w.path_slugs || e.dst_slug, w.path_edges || e.edge_type, w.distance + 1 FROM walk w JOIN brain_edges e ON e.src_slug = w.cur WHERE w.distance < $3 AND NOT (e.dst_slug = ANY(w.path_slugs)) ) SELECT path_slugs, path_edges FROM walk WHERE cur = $2 ORDER BY distance ASC LIMIT 1 ` row := s.pool.QueryRow(ctx, q, src, dst, maxDepth) var ( slugs []string kinds []string ) if err := row.Scan(&slugs, &kinds); err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil, nil } return nil, fmt.Errorf("scan path: %w", err) } if len(slugs) < 2 || len(kinds) == 0 { return nil, nil } steps := make([]PathStep, 0, len(kinds)) for i := 0; i < len(kinds) && i+1 < len(slugs); i++ { steps = append(steps, PathStep{ FromSlug: slugs[i], ToSlug: slugs[i+1], EdgeType: kinds[i], }) } return steps, nil } // CountEdges is a debug helper — returns the total edges currently stored. // Used by tests and by the volume-gate diagnostic. func (s *PGStore) CountEdges(ctx context.Context) (int64, error) { var n int64 err := s.pool.QueryRow(ctx, `SELECT count(*) FROM brain_edges`).Scan(&n) return n, err } func scanNeighbors(rows pgx.Rows) ([]Neighbor, error) { var out []Neighbor for rows.Next() { var n Neighbor if err := rows.Scan( &n.Slug, &n.Type, &n.Wing, &n.Hall, &n.DocPath, &n.Title, &n.EdgeType, &n.Distance, ); err != nil { return nil, fmt.Errorf("scan: %w", err) } out = append(out, n) } return out, rows.Err() }