Schema-only change. DDL adds tier + topic on fresh tables and uses ADD COLUMN IF NOT EXISTS on existing tables (idempotent across pod restarts). New conditional indexes match the wing/hall pattern. No behavior change in this commit — UpsertEntity still writes only the original columns; tier + topic stay '' on every row. M2 plumbs the parser through. The empty default means existing queries are untouched until the rest of the chain lands. Part of infra#72 — brain DIKW tier redesign.
364 lines
11 KiB
Go
364 lines
11 KiB
Go
// Package graphstore stores the brain knowledge graph (entities +
// directed edges) in PostgreSQL on the shared postgres18 instance,
// alongside the pgvector embeddings in [vectorstore].
//
// Schema (created idempotently by Init):
//
//	brain_entities(slug PK, type, wing, hall, doc_path, title, tier, topic,
//	               updated_at)
//	brain_edges(id PK, src_slug, dst_slug, edge_type, src_doc, src_line,
//	            weight, updated_at)
//
// Neither src_slug nor dst_slug carries a foreign-key constraint in the DDL.
//
// Edges fan out from a source document; calling [PGStore.ReplaceEdgesForDoc]
// replaces every edge previously emitted from that document so re-ingest is
// idempotent without bookkeeping.
//
// All slug strings are stored verbatim — callers are expected to canonicalise
// before persisting. Dst slugs may reference entities that don't yet exist
// (dangling edges); resolution is deferred to query time so ingestion order
// doesn't matter.
|
|
package graphstore
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/mathiasbq/hyperguild/ingestion/internal/graph"
|
|
)
|
|
|
|
// PGStore is the postgres-backed brain knowledge-graph store. Construct
|
|
// with New + call Init once to create tables and indexes. Use Close to
|
|
// release the pool.
|
|
type PGStore struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
// New opens a pgxpool against dsn and pings to verify connectivity. The
|
|
// caller owns the resulting PGStore and must invoke Close.
|
|
func New(ctx context.Context, dsn string) (*PGStore, error) {
|
|
pool, err := pgxpool.New(ctx, dsn)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("pgxpool: %w", err)
|
|
}
|
|
if err := pool.Ping(ctx); err != nil {
|
|
pool.Close()
|
|
return nil, fmt.Errorf("ping: %w", err)
|
|
}
|
|
return &PGStore{pool: pool}, nil
|
|
}
|
|
|
|
// Close releases the underlying connection pool.
|
|
func (s *PGStore) Close() {
|
|
if s.pool != nil {
|
|
s.pool.Close()
|
|
}
|
|
}
|
|
|
|
// Init creates brain_entities + brain_edges tables and their indexes if
|
|
// they don't yet exist. Safe to call on every startup. No-op when the
|
|
// schema already matches.
|
|
func (s *PGStore) Init(ctx context.Context) error {
|
|
const ddl = `
|
|
CREATE TABLE IF NOT EXISTS brain_entities (
|
|
slug TEXT PRIMARY KEY,
|
|
type TEXT NOT NULL DEFAULT 'knowledge',
|
|
wing TEXT NOT NULL DEFAULT '',
|
|
hall TEXT NOT NULL DEFAULT '',
|
|
doc_path TEXT NOT NULL,
|
|
title TEXT NOT NULL DEFAULT '',
|
|
tier TEXT NOT NULL DEFAULT '',
|
|
topic TEXT NOT NULL DEFAULT '',
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
);
|
|
-- Idempotent migration for clusters created before the DIKW tier
|
|
-- redesign (infra#72). ADD COLUMN IF NOT EXISTS is safe across
|
|
-- repeated startups.
|
|
ALTER TABLE brain_entities
|
|
ADD COLUMN IF NOT EXISTS tier TEXT NOT NULL DEFAULT '',
|
|
ADD COLUMN IF NOT EXISTS topic TEXT NOT NULL DEFAULT '';
|
|
CREATE INDEX IF NOT EXISTS brain_entities_wing_idx
|
|
ON brain_entities (wing) WHERE wing <> '';
|
|
CREATE INDEX IF NOT EXISTS brain_entities_type_idx
|
|
ON brain_entities (type);
|
|
CREATE INDEX IF NOT EXISTS brain_entities_tier_idx
|
|
ON brain_entities (tier) WHERE tier <> '';
|
|
CREATE INDEX IF NOT EXISTS brain_entities_topic_idx
|
|
ON brain_entities (topic) WHERE topic <> '';
|
|
|
|
CREATE TABLE IF NOT EXISTS brain_edges (
|
|
id BIGSERIAL PRIMARY KEY,
|
|
src_slug TEXT NOT NULL,
|
|
dst_slug TEXT NOT NULL,
|
|
edge_type TEXT NOT NULL DEFAULT 'wikilink',
|
|
src_doc TEXT NOT NULL,
|
|
src_line INTEGER NOT NULL DEFAULT 0,
|
|
weight REAL NOT NULL DEFAULT 1.0,
|
|
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
|
);
|
|
CREATE INDEX IF NOT EXISTS brain_edges_src_idx
|
|
ON brain_edges (src_slug, edge_type);
|
|
CREATE INDEX IF NOT EXISTS brain_edges_dst_idx
|
|
ON brain_edges (dst_slug, edge_type);
|
|
CREATE INDEX IF NOT EXISTS brain_edges_src_doc_idx
|
|
ON brain_edges (src_doc);
|
|
`
|
|
_, err := s.pool.Exec(ctx, ddl)
|
|
return err
|
|
}
|
|
|
|
// UpsertEntity inserts or updates one entity by slug.
|
|
func (s *PGStore) UpsertEntity(ctx context.Context, e graph.Entity) error {
|
|
if e.Slug == "" {
|
|
return errors.New("entity slug is required")
|
|
}
|
|
if e.Type == "" {
|
|
e.Type = "knowledge"
|
|
}
|
|
_, err := s.pool.Exec(ctx, `
|
|
INSERT INTO brain_entities (slug, type, wing, hall, doc_path, title, updated_at)
|
|
VALUES ($1, $2, $3, $4, $5, $6, now())
|
|
ON CONFLICT (slug) DO UPDATE
|
|
SET type = EXCLUDED.type,
|
|
wing = EXCLUDED.wing,
|
|
hall = EXCLUDED.hall,
|
|
doc_path = EXCLUDED.doc_path,
|
|
title = EXCLUDED.title,
|
|
updated_at = now()
|
|
`, e.Slug, e.Type, e.Wing, e.Hall, e.DocPath, e.Title)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert entity %q: %w", e.Slug, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReplaceEdgesForDoc deletes every edge previously emitted from docPath
|
|
// and inserts the new set in one transaction. Caller should pass the
|
|
// complete edge set for the doc — partial updates are not supported.
|
|
func (s *PGStore) ReplaceEdgesForDoc(ctx context.Context, docPath string, edges []graph.Edge) error {
|
|
if docPath == "" {
|
|
return errors.New("doc path is required")
|
|
}
|
|
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("begin: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
|
|
return fmt.Errorf("delete prior edges for %q: %w", docPath, err)
|
|
}
|
|
for _, e := range edges {
|
|
if e.SrcSlug == "" || e.DstSlug == "" {
|
|
continue
|
|
}
|
|
if _, err := tx.Exec(ctx, `
|
|
INSERT INTO brain_edges (src_slug, dst_slug, edge_type, src_doc, src_line, weight)
|
|
VALUES ($1, $2, $3, $4, $5, 1.0)
|
|
`, e.SrcSlug, e.DstSlug, e.EdgeType, e.SrcDoc, e.SrcLine); err != nil {
|
|
return fmt.Errorf("insert edge %s->%s: %w", e.SrcSlug, e.DstSlug, err)
|
|
}
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return fmt.Errorf("commit: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteByDoc removes the entity at docPath and every edge it sourced.
|
|
// Use when a wiki page is deleted on disk.
|
|
func (s *PGStore) DeleteByDoc(ctx context.Context, docPath string) error {
|
|
if docPath == "" {
|
|
return errors.New("doc path is required")
|
|
}
|
|
tx, err := s.pool.BeginTx(ctx, pgx.TxOptions{})
|
|
if err != nil {
|
|
return fmt.Errorf("begin: %w", err)
|
|
}
|
|
defer func() { _ = tx.Rollback(ctx) }()
|
|
|
|
if _, err := tx.Exec(ctx, `DELETE FROM brain_edges WHERE src_doc = $1`, docPath); err != nil {
|
|
return fmt.Errorf("delete edges: %w", err)
|
|
}
|
|
if _, err := tx.Exec(ctx, `DELETE FROM brain_entities WHERE doc_path = $1`, docPath); err != nil {
|
|
return fmt.Errorf("delete entity: %w", err)
|
|
}
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
// Neighbor is one row in a Neighbors / Subgraph response.
//
// Type, Wing, Hall, DocPath and Title come from the brain_entities row for
// Slug and are empty strings when no such row exists (a dangling edge).
type Neighbor struct {
	Slug     string // slug of the reached entity (the edge's dst_slug)
	Type     string // entity type; "" when the entity row is missing
	Wing     string // entity wing; "" when unset or the entity row is missing
	Hall     string // entity hall; "" when unset or the entity row is missing
	DocPath  string // document the entity was ingested from; "" when missing
	Title    string // entity title; "" when unset or the entity row is missing
	EdgeType string // type of the edge through which Slug was reached
	Distance int    // hop count from origin; 1 for direct neighbors
}
|
|
|
|
// Neighbors returns the direct (1-hop) outgoing neighbours of slug.
|
|
// edgeType filters by relationship kind; "" returns all kinds.
|
|
// limit defaults to 25 when <= 0.
|
|
func (s *PGStore) Neighbors(ctx context.Context, slug, edgeType string, limit int) ([]Neighbor, error) {
|
|
if slug == "" {
|
|
return nil, errors.New("slug is required")
|
|
}
|
|
if limit <= 0 {
|
|
limit = 25
|
|
}
|
|
q := `
|
|
SELECT e.dst_slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
|
|
COALESCE(t.doc_path,''), COALESCE(t.title,''), e.edge_type, 1
|
|
FROM brain_edges e
|
|
LEFT JOIN brain_entities t ON t.slug = e.dst_slug
|
|
WHERE e.src_slug = $1
|
|
AND ($2 = '' OR e.edge_type = $2)
|
|
ORDER BY e.updated_at DESC
|
|
LIMIT $3
|
|
`
|
|
rows, err := s.pool.Query(ctx, q, slug, edgeType, limit)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query neighbors: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanNeighbors(rows)
|
|
}
|
|
|
|
// Subgraph returns every distinct slug reachable from origin within
|
|
// depth outgoing hops, annotated with the shortest hop distance. The
|
|
// origin itself is omitted. depth defaults to 2 when <= 0; values
|
|
// above 6 are clamped to 6 to bound traversal cost.
|
|
func (s *PGStore) Subgraph(ctx context.Context, origin string, depth int) ([]Neighbor, error) {
|
|
if origin == "" {
|
|
return nil, errors.New("origin slug is required")
|
|
}
|
|
if depth <= 0 {
|
|
depth = 2
|
|
}
|
|
if depth > 6 {
|
|
depth = 6
|
|
}
|
|
q := `
|
|
WITH RECURSIVE walk(slug, edge_type, distance) AS (
|
|
SELECT e.dst_slug, e.edge_type, 1
|
|
FROM brain_edges e
|
|
WHERE e.src_slug = $1
|
|
UNION
|
|
SELECT e.dst_slug, e.edge_type, w.distance + 1
|
|
FROM walk w
|
|
JOIN brain_edges e ON e.src_slug = w.slug
|
|
WHERE w.distance < $2
|
|
)
|
|
SELECT w.slug, COALESCE(t.type,''), COALESCE(t.wing,''), COALESCE(t.hall,''),
|
|
COALESCE(t.doc_path,''), COALESCE(t.title,''), w.edge_type, MIN(w.distance)
|
|
FROM walk w
|
|
LEFT JOIN brain_entities t ON t.slug = w.slug
|
|
WHERE w.slug <> $1
|
|
GROUP BY w.slug, t.type, t.wing, t.hall, t.doc_path, t.title, w.edge_type
|
|
ORDER BY MIN(w.distance), w.slug
|
|
`
|
|
rows, err := s.pool.Query(ctx, q, origin, depth)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query subgraph: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
return scanNeighbors(rows)
|
|
}
|
|
|
|
// PathStep is one hop in a Path response.
type PathStep struct {
	FromSlug string // slug the hop starts at
	ToSlug   string // slug the hop arrives at
	EdgeType string // type of the edge traversed for this hop
}
|
|
|
|
// Path returns the shortest directed path from src to dst within
|
|
// maxDepth hops, as an ordered list of edges. Empty slice means no
|
|
// path exists. maxDepth defaults to 4 when <= 0; values above 8 are
|
|
// clamped to 8.
|
|
func (s *PGStore) Path(ctx context.Context, src, dst string, maxDepth int) ([]PathStep, error) {
|
|
if src == "" || dst == "" {
|
|
return nil, errors.New("src and dst are required")
|
|
}
|
|
if maxDepth <= 0 {
|
|
maxDepth = 4
|
|
}
|
|
if maxDepth > 8 {
|
|
maxDepth = 8
|
|
}
|
|
q := `
|
|
WITH RECURSIVE walk(cur, path_slugs, path_edges, distance) AS (
|
|
SELECT e.dst_slug,
|
|
ARRAY[e.src_slug, e.dst_slug]::TEXT[],
|
|
ARRAY[e.edge_type]::TEXT[],
|
|
1
|
|
FROM brain_edges e
|
|
WHERE e.src_slug = $1
|
|
UNION ALL
|
|
SELECT e.dst_slug,
|
|
w.path_slugs || e.dst_slug,
|
|
w.path_edges || e.edge_type,
|
|
w.distance + 1
|
|
FROM walk w
|
|
JOIN brain_edges e ON e.src_slug = w.cur
|
|
WHERE w.distance < $3
|
|
AND NOT (e.dst_slug = ANY(w.path_slugs))
|
|
)
|
|
SELECT path_slugs, path_edges
|
|
FROM walk
|
|
WHERE cur = $2
|
|
ORDER BY distance ASC
|
|
LIMIT 1
|
|
`
|
|
row := s.pool.QueryRow(ctx, q, src, dst, maxDepth)
|
|
var (
|
|
slugs []string
|
|
kinds []string
|
|
)
|
|
if err := row.Scan(&slugs, &kinds); err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("scan path: %w", err)
|
|
}
|
|
if len(slugs) < 2 || len(kinds) == 0 {
|
|
return nil, nil
|
|
}
|
|
steps := make([]PathStep, 0, len(kinds))
|
|
for i := 0; i < len(kinds) && i+1 < len(slugs); i++ {
|
|
steps = append(steps, PathStep{
|
|
FromSlug: slugs[i],
|
|
ToSlug: slugs[i+1],
|
|
EdgeType: kinds[i],
|
|
})
|
|
}
|
|
return steps, nil
|
|
}
|
|
|
|
// CountEdges is a debug helper — returns the total edges currently stored.
|
|
// Used by tests and by the volume-gate diagnostic.
|
|
func (s *PGStore) CountEdges(ctx context.Context) (int64, error) {
|
|
var n int64
|
|
err := s.pool.QueryRow(ctx, `SELECT count(*) FROM brain_edges`).Scan(&n)
|
|
return n, err
|
|
}
|
|
|
|
func scanNeighbors(rows pgx.Rows) ([]Neighbor, error) {
|
|
var out []Neighbor
|
|
for rows.Next() {
|
|
var n Neighbor
|
|
if err := rows.Scan(
|
|
&n.Slug, &n.Type, &n.Wing, &n.Hall,
|
|
&n.DocPath, &n.Title, &n.EdgeType, &n.Distance,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scan: %w", err)
|
|
}
|
|
out = append(out, n)
|
|
}
|
|
return out, rows.Err()
|
|
}
|