feat(brain): hybrid BM25 + pgvector retrieval (opt-in)
Wires nomic-embed-text (iguana ollama) + pgvector on the shared
postgres18 into brain_query / brain_answer via Reciprocal Rank Fusion.
Pure BM25 stays the default; setting BRAIN_PG_DSN and BRAIN_EMBED_URL
together opts in. Setting one without the other is misconfiguration →
exit 1.
New packages:
- internal/embed
Client.Embed(ctx, text) → []float32 via POST {URL}/api/embed.
Defaults to nomic-embed-text:latest (768 dim). nil-on-empty-URL so
callers gate on a single nil check.
- internal/vectorstore
PGStore wraps a pgxpool against postgres18. Init creates
brain_embeddings(path PK, vector(768), updated_at) + HNSW cosine
index idempotently. Upsert / Delete / Search / KnownPaths.
Sync(brainDir, store, embedder) diffs brain/wiki/ against the store
and upserts new files / deletes removed ones; StartSync runs it on
a ticker (default 300s). Integration tests gated by BRAIN_PG_TEST_DSN.
- scripts/brain-embeddings-init.sql
One-time DBA setup: brain DB, brain_app role, vector extension,
GRANTs. Idempotent.
Search layer:
- search.QueryOptions gains Vector + Embedder fields.
- QueryContext is the cancellable variant; Query remains as a thin context.Background() wrapper so existing callers keep working.
- When both are set, BM25 (top-N) and pgvector (top-4N, at least 20)
candidates merge via Reciprocal Rank Fusion (k=60, Cormack et al.
2009 — no tuning knob, robust to scale differences between rankers).
- Vector-only hits are hydrated from disk so callers see uniform
Result records (path, title, excerpt, wing, hall, score).
- Wing/hall filters still apply to vector candidates via path-prefix.
- On embedder/vector errors the search falls back to BM25 — embedding
outage degrades quality but doesn't take the brain offline.
MCP wiring:
- mcp.Server.WithHybridRetrieval(v, e) opt-in setter, same shape as
WithReranker.
- brainQuery and brainAnswer pass the wired vector/embedder through
to search.QueryContext.
REST:
- POST /backfill-embeddings drives Sync synchronously. Returns
{added, deleted, errors[]}. 503 when feature is unconfigured.
cmd/server/main.go:
- BRAIN_PG_DSN + BRAIN_EMBED_URL together enable hybrid; one alone
→ exit 1.
- vectorAdapter bridges *PGStore (returns []Hit) to
search.VectorSearcher (which returns []VectorHit) without either
package importing the other.
- BRAIN_EMBED_SYNC_INTERVAL (default 300s) controls the background
Sync ticker.
Backend pivot from Qdrant to pgvector recorded in DECISIONS.md
2026-05-18 (supersedes 2026-04-08): postgres18 already runs in
databases/ ns, Qdrant was never deployed, one engine beats two.
Dependency: github.com/jackc/pgx/v5 — modern, native pgvector via
parametric vector literals.
Tests:
- embed.Client: empty-URL nil, request shape, dimension, upstream
error propagation, empty-text rejection.
- vectorstore.PGStore: dimension validation (unit); upsert/search/
KnownPaths (integration, BRAIN_PG_TEST_DSN-gated).
- vectorstore.Sync: adds new files, skips known, deletes
disappeared, skips _index.md, no-op when nil, collects embedder
errors.
- search.Query: hybrid promotes vector-only hits via RRF; falls
back to BM25 on embedder error.
Closes hyperguild#8.
This commit is contained in:
@@ -15,12 +15,32 @@ import (
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/auth"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/llm"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/mcp"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/embed"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/oauth"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/reranker"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/reranker"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/search"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/watcher"
|
||||
)
|
||||
|
||||
// vectorAdapter bridges *vectorstore.PGStore (returns []vectorstore.Hit)
|
||||
// to the search.VectorSearcher interface (which uses []search.VectorHit).
|
||||
// Kept here, not in either package, so neither has to import the other.
|
||||
type vectorAdapter struct{ s *vectorstore.PGStore }
|
||||
|
||||
func (a vectorAdapter) Search(ctx context.Context, q []float32, limit int) ([]search.VectorHit, error) {
|
||||
hits, err := a.s.Search(ctx, q, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out := make([]search.VectorHit, len(hits))
|
||||
for i, h := range hits {
|
||||
out[i] = search.VectorHit{Path: h.Path, Distance: h.Distance}
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func envOr(key, fallback string) string {
|
||||
if v := os.Getenv(key); v != "" {
|
||||
return v
|
||||
@@ -84,6 +104,37 @@ func main() {
|
||||
logger.Info("brain reranker configured", "url", rerankURL, "model", rerankModel)
|
||||
}
|
||||
|
||||
// Hybrid retrieval (pgvector + nomic-embed-text). Both env vars must
|
||||
// be set together for the path to wire on; otherwise BM25-only.
|
||||
var vectorStore *vectorstore.PGStore
|
||||
pgDSN := os.Getenv("BRAIN_PG_DSN")
|
||||
embedURL := os.Getenv("BRAIN_EMBED_URL")
|
||||
switch {
|
||||
case pgDSN != "" && embedURL != "":
|
||||
embedModel := envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest")
|
||||
store, err := vectorstore.New(context.Background(), pgDSN)
|
||||
if err != nil {
|
||||
logger.Error("vector store init", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
if err := store.Init(context.Background()); err != nil {
|
||||
logger.Error("vector store migrate", "err", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
vectorStore = store
|
||||
embedder := embed.New(embedURL, embedModel)
|
||||
mcpSrv = mcpSrv.WithHybridRetrieval(vectorAdapter{s: store}, embedder)
|
||||
h.WithEmbedSync(store, embedder)
|
||||
logger.Info("brain hybrid retrieval enabled",
|
||||
"pg", pgDSN[:strings.IndexByte(pgDSN+"@", '@')], // crude redaction
|
||||
"embed_url", embedURL, "embed_model", embedModel)
|
||||
case pgDSN == "" && embedURL == "":
|
||||
// disabled — fine
|
||||
default:
|
||||
logger.Error("BRAIN_PG_DSN and BRAIN_EMBED_URL must be set together")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
mcpToken := os.Getenv("BRAIN_MCP_TOKEN")
|
||||
if mcpToken == "" {
|
||||
logger.Error("BRAIN_MCP_TOKEN not set")
|
||||
@@ -98,6 +149,14 @@ func main() {
|
||||
Pipeline: pipelineCfg,
|
||||
})
|
||||
}
|
||||
if vectorStore != nil {
|
||||
embedSyncInterval := envInt("BRAIN_EMBED_SYNC_INTERVAL", 300)
|
||||
vectorstore.StartSync(ctx, brainDir, vectorStore,
|
||||
embed.New(os.Getenv("BRAIN_EMBED_URL"),
|
||||
envOr("BRAIN_EMBED_MODEL", "nomic-embed-text:latest")),
|
||||
time.Duration(embedSyncInterval)*time.Second)
|
||||
logger.Info("embed sync started", "interval_s", embedSyncInterval)
|
||||
}
|
||||
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("POST /query", h.Query)
|
||||
@@ -107,6 +166,7 @@ func main() {
|
||||
mux.HandleFunc("POST /ingest-path", h.IngestPath)
|
||||
mux.HandleFunc("POST /ingest-raw", h.IngestRaw)
|
||||
mux.HandleFunc("POST /backfill-refs", h.BackfillRefs)
|
||||
mux.HandleFunc("POST /backfill-embeddings", h.BackfillEmbeddings)
|
||||
mux.HandleFunc("GET /pass-rate", h.PassRate)
|
||||
var jwtValidator *auth.Validator
|
||||
if dexURL := os.Getenv("DEX_ISSUER_URL"); dexURL != "" {
|
||||
|
||||
@@ -11,6 +11,10 @@ require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 // indirect
|
||||
github.com/goccy/go-json v0.10.3 // indirect
|
||||
github.com/jackc/pgpassfile v1.0.0 // indirect
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
|
||||
github.com/jackc/pgx/v5 v5.9.2 // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.2 // indirect
|
||||
github.com/lestrrat-go/blackmagic v1.0.3 // indirect
|
||||
github.com/lestrrat-go/httpcc v1.0.1 // indirect
|
||||
github.com/lestrrat-go/httprc v1.0.6 // indirect
|
||||
@@ -19,6 +23,8 @@ require (
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/segmentio/asm v1.2.0 // indirect
|
||||
golang.org/x/crypto v0.32.0 // indirect
|
||||
golang.org/x/sync v0.17.0 // indirect
|
||||
golang.org/x/sys v0.31.0 // indirect
|
||||
golang.org/x/text v0.29.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
||||
|
||||
@@ -5,6 +5,14 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0 h1:NMZiJj8QnKe1LgsbDayM4UoHwbvw
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.4.0/go.mod h1:ZXNYxsqcloTdSy/rNShjYzMhyjf0LaoftYK0p+A3h40=
|
||||
github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA=
|
||||
github.com/goccy/go-json v0.10.3/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M=
|
||||
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
|
||||
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo=
|
||||
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM=
|
||||
github.com/jackc/pgx/v5 v5.9.2 h1:3ZhOzMWnR4yJ+RW1XImIPsD1aNSz4T4fyP7zlQb56hw=
|
||||
github.com/jackc/pgx/v5 v5.9.2/go.mod h1:mal1tBGAFfLHvZzaYh77YS/eC6IX9OWbRV1QIIM0Jn4=
|
||||
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
|
||||
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/lestrrat-go/blackmagic v1.0.3 h1:94HXkVLxkZO9vJI/w2u1T0DAoprShFd13xtnSINtDWs=
|
||||
github.com/lestrrat-go/blackmagic v1.0.3/go.mod h1:6AWFyKNNj0zEXQYfTMPfZrAXUWUfTIZ5ECEUEJaijtw=
|
||||
github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE=
|
||||
@@ -22,16 +30,23 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
||||
github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys=
|
||||
github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
||||
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
|
||||
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
|
||||
golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug=
|
||||
golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
|
||||
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
||||
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk=
|
||||
golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
||||
@@ -15,13 +15,16 @@ import (
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/extract"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/search"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore"
|
||||
)
|
||||
|
||||
// Handler serves the ingestion HTTP API.
|
||||
type Handler struct {
|
||||
brainDir string
|
||||
logger *slog.Logger
|
||||
pipeline pipeline.Config
|
||||
brainDir string
|
||||
logger *slog.Logger
|
||||
pipeline pipeline.Config
|
||||
embedStore vectorstore.Store
|
||||
embedClient vectorstore.Embedder
|
||||
}
|
||||
|
||||
// NewHandler constructs a Handler. brainDir is the absolute path to brain/.
|
||||
@@ -32,6 +35,14 @@ func NewHandler(brainDir string, logger *slog.Logger, pipelineCfg pipeline.Confi
|
||||
return &Handler{brainDir: brainDir, logger: logger, pipeline: pipelineCfg}
|
||||
}
|
||||
|
||||
// WithEmbedSync wires the optional vector store + embedder used by the
|
||||
// POST /backfill-embeddings endpoint. Calling with either nil is a no-op.
|
||||
func (h *Handler) WithEmbedSync(store vectorstore.Store, embedder vectorstore.Embedder) *Handler {
|
||||
h.embedStore = store
|
||||
h.embedClient = embedder
|
||||
return h
|
||||
}
|
||||
|
||||
type queryRequest struct {
|
||||
Query string `json:"query"`
|
||||
Limit int `json:"limit,omitempty"`
|
||||
@@ -230,6 +241,32 @@ func (h *Handler) Write(w http.ResponseWriter, r *http.Request) {
|
||||
writeJSON(w, map[string]string{"path": relPath})
|
||||
}
|
||||
|
||||
// BackfillEmbeddings handles POST /backfill-embeddings — synchronously
|
||||
// embeds every note under brain/wiki/ that's not yet in the vector
|
||||
// store, and deletes rows for files no longer on disk.
|
||||
func (h *Handler) BackfillEmbeddings(w http.ResponseWriter, r *http.Request) {
|
||||
if h.embedStore == nil || h.embedClient == nil {
|
||||
writeError(w, http.StatusServiceUnavailable,
|
||||
"embeddings not configured (set BRAIN_PG_DSN and BRAIN_EMBED_URL)")
|
||||
return
|
||||
}
|
||||
res, err := vectorstore.Sync(r.Context(), h.brainDir, h.embedStore, h.embedClient)
|
||||
if err != nil {
|
||||
h.logger.Error("backfill failed", "err", err)
|
||||
writeError(w, http.StatusInternalServerError, "backfill error")
|
||||
return
|
||||
}
|
||||
errStrs := make([]string, 0, len(res.Errors))
|
||||
for _, e := range res.Errors {
|
||||
errStrs = append(errStrs, e.Error())
|
||||
}
|
||||
writeJSON(w, map[string]any{
|
||||
"added": res.Added,
|
||||
"deleted": res.Deleted,
|
||||
"errors": errStrs,
|
||||
})
|
||||
}
|
||||
|
||||
type indexRequest struct {
|
||||
Wing string `json:"wing,omitempty"`
|
||||
}
|
||||
|
||||
76
ingestion/internal/embed/embed.go
Normal file
76
ingestion/internal/embed/embed.go
Normal file
@@ -0,0 +1,76 @@
|
||||
// Package embed produces dense vector embeddings for brain content.
|
||||
//
|
||||
// Wire format is Ollama's `/api/embed`, with the canonical request shape
|
||||
// `{"model": "...", "input": "..."}` and a 2-D `embeddings` response.
|
||||
// Default deployment runs `nomic-embed-text` on iguana, which returns
|
||||
// 768-dim vectors compatible with the brain_embeddings table schema.
|
||||
package embed
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// maxErrorBodyBytes caps how much of a non-2xx response body is copied
// into the returned error, so a misbehaving upstream cannot make a
// single error value arbitrarily large.
const maxErrorBodyBytes = 4 << 10

// Client posts embedding requests to an Ollama-compatible endpoint.
type Client struct {
	// URL is the endpoint base; Embed appends "/api/embed" to it.
	URL string
	// Model is sent verbatim as the request's "model" field.
	Model string
	// HTTP performs the request. New installs a client with a 30s
	// timeout; when the field is nil Embed uses http.DefaultClient.
	HTTP *http.Client
}

// New constructs a Client. Returns nil when url is empty so callers can
// treat a missing BRAIN_EMBED_URL as "feature disabled" via a single nil
// check. Trailing slashes on url are stripped so the request path is
// always exactly "/api/embed".
func New(url, model string) *Client {
	if url == "" {
		return nil
	}
	return &Client{
		URL:   strings.TrimRight(url, "/"),
		Model: model,
		HTTP:  &http.Client{Timeout: 30 * time.Second},
	}
}

// Embed returns the embedding vector for text: the first row of the
// response's "embeddings" array.
//
// It returns an error when text is empty or whitespace-only (rejected
// before any network call, to keep upstream errors from masking caller
// mistakes), when c is nil, when the request fails, when the upstream
// answers with a non-2xx status, or when the response holds no vector.
func (c *Client) Embed(ctx context.Context, text string) ([]float32, error) {
	if strings.TrimSpace(text) == "" {
		return nil, fmt.Errorf("embed: empty text")
	}
	// New returns a nil *Client for an empty URL. Once that pointer is
	// stored in an interface the caller's nil check no longer fires, so
	// fail with an error here instead of panicking on c.Model.
	if c == nil {
		return nil, fmt.Errorf("embed: client not configured")
	}
	reqBody, err := json.Marshal(map[string]any{
		"model": c.Model,
		"input": text,
	})
	if err != nil {
		return nil, fmt.Errorf("embed: encode request: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost,
		c.URL+"/api/embed", bytes.NewReader(reqBody))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	httpClient := c.HTTP
	if httpClient == nil {
		// Client built as a struct literal without New.
		httpClient = http.DefaultClient
	}
	resp, err := httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	if resp.StatusCode/100 != 2 {
		// Best-effort read: the body is diagnostic context only, so a
		// read error just yields a shorter message.
		body, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBodyBytes))
		return nil, fmt.Errorf("embed: status %d: %s", resp.StatusCode, string(body))
	}

	var out struct {
		Embeddings [][]float32 `json:"embeddings"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return nil, fmt.Errorf("embed: decode: %w", err)
	}
	if len(out.Embeddings) == 0 || len(out.Embeddings[0]) == 0 {
		return nil, fmt.Errorf("embed: empty embeddings in response")
	}
	return out.Embeddings[0], nil
}
|
||||
74
ingestion/internal/embed/embed_test.go
Normal file
74
ingestion/internal/embed/embed_test.go
Normal file
@@ -0,0 +1,74 @@
|
||||
package embed_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/embed"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestNew_EmptyURLReturnsNil(t *testing.T) {
|
||||
assert.Nil(t, embed.New("", "model"))
|
||||
}
|
||||
|
||||
func TestEmbed_ReturnsVector(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
assert.Equal(t, "/api/embed", r.URL.Path)
|
||||
var req map[string]any
|
||||
require.NoError(t, json.NewDecoder(r.Body).Decode(&req))
|
||||
assert.Equal(t, "nomic", req["model"])
|
||||
assert.Equal(t, "hello", req["input"])
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{
|
||||
"embeddings": [][]float32{{0.1, 0.2, 0.3}},
|
||||
})
|
||||
}))
|
||||
defer srv.Close()
|
||||
|
||||
c := embed.New(srv.URL, "nomic")
|
||||
require.NotNil(t, c)
|
||||
v, err := c.Embed(context.Background(), "hello")
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []float32{0.1, 0.2, 0.3}, v)
|
||||
}
|
||||
|
||||
func TestEmbed_StripsTrailingSlashFromURL(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
assert.Equal(t, "/api/embed", r.URL.Path)
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"embeddings": [][]float32{{1.0}}})
|
||||
}))
|
||||
defer srv.Close()
|
||||
c := embed.New(srv.URL+"/", "nomic")
|
||||
_, err := c.Embed(context.Background(), "x")
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestEmbed_PropagatesUpstreamError(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusBadGateway)
|
||||
}))
|
||||
defer srv.Close()
|
||||
c := embed.New(srv.URL, "m")
|
||||
_, err := c.Embed(context.Background(), "x")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestEmbed_RejectsEmptyEmbeddingsArray(t *testing.T) {
|
||||
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
_ = json.NewEncoder(w).Encode(map[string]any{"embeddings": [][]float32{}})
|
||||
}))
|
||||
defer srv.Close()
|
||||
c := embed.New(srv.URL, "m")
|
||||
_, err := c.Embed(context.Background(), "x")
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestEmbed_RejectsEmptyText(t *testing.T) {
|
||||
c := embed.New("http://127.0.0.1:1", "m")
|
||||
_, err := c.Embed(context.Background(), "")
|
||||
require.Error(t, err)
|
||||
}
|
||||
@@ -144,11 +144,13 @@ func (s *Server) brainQuery(ctx context.Context, args json.RawMessage) (json.Raw
|
||||
if a.Limit == 0 {
|
||||
a.Limit = 5
|
||||
}
|
||||
results, err := search.Query(s.brainDir, search.QueryOptions{
|
||||
Query: a.Query,
|
||||
Limit: a.Limit,
|
||||
Wing: a.Wing,
|
||||
Hall: a.Hall,
|
||||
results, err := search.QueryContext(ctx, s.brainDir, search.QueryOptions{
|
||||
Query: a.Query,
|
||||
Limit: a.Limit,
|
||||
Wing: a.Wing,
|
||||
Hall: a.Hall,
|
||||
Vector: s.vector,
|
||||
Embedder: s.embedder,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("search: %w", err)
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/pipeline"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/reranker"
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/search"
|
||||
)
|
||||
|
||||
type request struct {
|
||||
@@ -39,6 +40,8 @@ type Server struct {
|
||||
llm pipeline.CompleteFunc
|
||||
answerLLM pipeline.CompleteFunc // nil = brain_answer and brain_classify unavailable
|
||||
reranker *reranker.Client // nil = no rerank, BM25 top-10 → LLM
|
||||
vector search.VectorSearcher // nil = BM25-only retrieval
|
||||
embedder search.Embedder // nil = BM25-only retrieval
|
||||
}
|
||||
|
||||
// NewServer constructs a Server bound to brainDir. pipelineCfg supplies the
|
||||
@@ -61,6 +64,15 @@ func (s *Server) WithReranker(r *reranker.Client) *Server {
|
||||
return s
|
||||
}
|
||||
|
||||
// WithHybridRetrieval wires the embedding store and embedder so
|
||||
// brain_query and brain_answer run BM25 + pgvector merged via RRF
|
||||
// instead of BM25 alone. Either nil disables hybrid mode.
|
||||
func (s *Server) WithHybridRetrieval(v search.VectorSearcher, e search.Embedder) *Server {
|
||||
s.vector = v
|
||||
s.embedder = e
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// MCP streamable HTTP: GET establishes the SSE stream for server-to-client events.
|
||||
if r.Method == http.MethodGet {
|
||||
|
||||
@@ -67,7 +67,12 @@ func (s *Server) brainAnswer(ctx context.Context, args json.RawMessage) (json.Ra
|
||||
if s.reranker != nil {
|
||||
bm25Limit = 20
|
||||
}
|
||||
results, err := search.Query(s.brainDir, search.QueryOptions{Query: a.Query, Limit: bm25Limit})
|
||||
results, err := search.QueryContext(ctx, s.brainDir, search.QueryOptions{
|
||||
Query: a.Query,
|
||||
Limit: bm25Limit,
|
||||
Vector: s.vector,
|
||||
Embedder: s.embedder,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("search: %w", err)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package search
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
@@ -13,6 +14,26 @@ import (
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/brain"
|
||||
)
|
||||
|
||||
// VectorSearcher returns the top-limit nearest paths by cosine
|
||||
// distance. The vectorstore package implements this against pgvector.
|
||||
type VectorSearcher interface {
|
||||
Search(ctx context.Context, query []float32, limit int) ([]VectorHit, error)
|
||||
}
|
||||
|
||||
// VectorHit is a single path + distance pair from a vector search.
|
||||
// Re-declared here (rather than imported) to keep search package
|
||||
// free of vectorstore/embed deps and to make stubbing trivial in tests.
|
||||
type VectorHit struct {
|
||||
Path string
|
||||
Distance float64
|
||||
}
|
||||
|
||||
// Embedder turns a query string into a dense vector. The embed package
|
||||
// implements this against Ollama's /api/embed.
|
||||
type Embedder interface {
|
||||
Embed(ctx context.Context, text string) ([]float32, error)
|
||||
}
|
||||
|
||||
// Result is a single search hit from the brain wiki.
|
||||
type Result struct {
|
||||
Path string `json:"path"`
|
||||
@@ -29,16 +50,30 @@ type Result struct {
|
||||
// When Hall is additionally set, the walk is restricted to
|
||||
// brain/wiki/<wing>/<hall>/. Without either, the legacy walk over
|
||||
// brain/knowledge/ and brain/wiki/ is used.
|
||||
//
|
||||
// When both Vector and Embedder are non-nil, results are computed
|
||||
// hybridly: BM25 and vector candidate lists are merged via Reciprocal
|
||||
// Rank Fusion. With either nil the function falls back to BM25 only,
|
||||
// keeping behaviour unchanged for callers that have not opted in.
|
||||
type QueryOptions struct {
|
||||
Query string
|
||||
Limit int
|
||||
Wing string
|
||||
Hall string
|
||||
Query string
|
||||
Limit int
|
||||
Wing string
|
||||
Hall string
|
||||
Vector VectorSearcher
|
||||
Embedder Embedder
|
||||
}
|
||||
|
||||
// Query searches the brain. Returns up to opts.Limit results sorted by
|
||||
// score descending. Empty query returns nil.
|
||||
func Query(brainDir string, opts QueryOptions) ([]Result, error) {
|
||||
return QueryContext(context.Background(), brainDir, opts)
|
||||
}
|
||||
|
||||
// QueryContext is the cancellable variant of Query. Hybrid retrieval
|
||||
// requires a context because both the embedder and the vector store are
|
||||
// network calls.
|
||||
func QueryContext(ctx context.Context, brainDir string, opts QueryOptions) ([]Result, error) {
|
||||
if opts.Limit <= 0 {
|
||||
opts.Limit = 5
|
||||
}
|
||||
@@ -102,12 +137,108 @@ func Query(brainDir string, opts QueryOptions) ([]Result, error) {
|
||||
sort.Slice(results, func(i, j int) bool {
|
||||
return results[i].Score > results[j].Score
|
||||
})
|
||||
|
||||
// Hybrid scoring kicks in only when both the embedder and the
|
||||
// vector store are wired and BM25 actually returned candidates.
|
||||
if opts.Vector != nil && opts.Embedder != nil && len(results) > 0 {
|
||||
merged, err := hybridMerge(ctx, brainDir, opts, results)
|
||||
if err != nil {
|
||||
slog.Warn("search: hybrid merge failed, falling back to BM25", "err", err)
|
||||
} else {
|
||||
results = merged
|
||||
}
|
||||
}
|
||||
|
||||
if len(results) > opts.Limit {
|
||||
results = results[:opts.Limit]
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// rrfK is the constant in the Reciprocal Rank Fusion formula. 60 is
|
||||
// standard (Cormack et al. 2009) and parameter-free in practice.
|
||||
const rrfK = 60.0
|
||||
|
||||
// hybridMerge embeds the query, runs a vector search, and merges its
|
||||
// candidates with the BM25 list via Reciprocal Rank Fusion. Results
|
||||
// that came only from the vector side are hydrated by reading the
|
||||
// note's frontmatter for title/wing/hall and excerpting the body.
|
||||
//
|
||||
// rrf(d) = sum_r 1 / (k + rank_r(d)) over rankers r ∈ {BM25, vector}.
|
||||
func hybridMerge(ctx context.Context, brainDir string, opts QueryOptions, bm25 []Result) ([]Result, error) {
|
||||
q, err := opts.Embedder.Embed(ctx, opts.Query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("embed query: %w", err)
|
||||
}
|
||||
vectorLimit := opts.Limit * 4
|
||||
if vectorLimit < 20 {
|
||||
vectorLimit = 20
|
||||
}
|
||||
hits, err := opts.Vector.Search(ctx, q, vectorLimit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("vector search: %w", err)
|
||||
}
|
||||
|
||||
rrf := make(map[string]float64)
|
||||
byPath := make(map[string]Result)
|
||||
for rank, r := range bm25 {
|
||||
rrf[r.Path] += 1.0 / (rrfK + float64(rank+1))
|
||||
byPath[r.Path] = r
|
||||
}
|
||||
for rank, h := range hits {
|
||||
if opts.Wing != "" && !pathInScope(h.Path, opts.Wing, opts.Hall) {
|
||||
continue
|
||||
}
|
||||
rrf[h.Path] += 1.0 / (rrfK + float64(rank+1))
|
||||
if _, seen := byPath[h.Path]; !seen {
|
||||
r, err := hydrate(brainDir, h.Path)
|
||||
if err != nil {
|
||||
slog.Warn("search: hydrate failed for vector hit", "path", h.Path, "err", err)
|
||||
continue
|
||||
}
|
||||
byPath[h.Path] = r
|
||||
}
|
||||
}
|
||||
|
||||
merged := make([]Result, 0, len(byPath))
|
||||
for p, r := range byPath {
|
||||
r.Score = int(rrf[p] * 1e6) // scale to int for stable JSON; relative order is what matters
|
||||
merged = append(merged, r)
|
||||
}
|
||||
sort.Slice(merged, func(i, j int) bool {
|
||||
return merged[i].Score > merged[j].Score
|
||||
})
|
||||
return merged, nil
|
||||
}
|
||||
|
||||
// pathInScope reports whether a wiki path satisfies the wing/hall filter.
|
||||
func pathInScope(relPath, wing, hall string) bool {
|
||||
prefix := "wiki/" + brain.Sanitise(wing) + "/"
|
||||
if hall != "" {
|
||||
prefix += hall + "/"
|
||||
}
|
||||
return strings.HasPrefix(relPath, prefix)
|
||||
}
|
||||
|
||||
// hydrate reads a single note from disk and returns a Result with title,
|
||||
// excerpt, wing, and hall populated. Used for paths that surface only
|
||||
// via vector search.
|
||||
func hydrate(brainDir, relPath string) (Result, error) {
|
||||
full := filepath.Join(brainDir, filepath.FromSlash(relPath))
|
||||
content, err := os.ReadFile(full)
|
||||
if err != nil {
|
||||
return Result{}, err
|
||||
}
|
||||
wing, hall := extractWingHall(string(content), relPath)
|
||||
return Result{
|
||||
Path: relPath,
|
||||
Title: extractTitle(string(content), filepath.Base(relPath)),
|
||||
Excerpt: excerpt(string(content), 300),
|
||||
Wing: wing,
|
||||
Hall: hall,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// resolveRoots returns the directories to walk for the given wing/hall
|
||||
// filters. Validates hall against the closed vocabulary when set.
|
||||
func resolveRoots(brainDir, wing, hall string) ([]string, error) {
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
package search_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@@ -12,6 +13,69 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type stubEmbedder struct{ vec []float32 }
|
||||
|
||||
func (s stubEmbedder) Embed(_ context.Context, _ string) ([]float32, error) { return s.vec, nil }
|
||||
|
||||
type stubVector struct{ hits []search.VectorHit }
|
||||
|
||||
func (s stubVector) Search(_ context.Context, _ []float32, _ int) ([]search.VectorHit, error) {
|
||||
return s.hits, nil
|
||||
}
|
||||
|
||||
func TestSearch_HybridRRFPromotesVectorOnlyHit(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
for _, p := range []struct{ rel, body string }{
|
||||
// BM25-keyword note (matches "lejpa" once)
|
||||
{"wiki/jepa-fx/facts/foo.md", "---\ntitle: Foo\n---\nlejpa keyword\n"},
|
||||
// Semantically related note that does NOT contain the keyword.
|
||||
{"wiki/jepa-fx/facts/semantic.md", "---\ntitle: Semantic\n---\nNo keyword in body.\n"},
|
||||
} {
|
||||
full := filepath.Join(dir, p.rel)
|
||||
require.NoError(t, os.MkdirAll(filepath.Dir(full), 0o755))
|
||||
require.NoError(t, os.WriteFile(full, []byte(p.body), 0o644))
|
||||
}
|
||||
|
||||
embedder := stubEmbedder{vec: []float32{0.1}}
|
||||
vector := stubVector{hits: []search.VectorHit{
|
||||
{Path: "wiki/jepa-fx/facts/semantic.md", Distance: 0.05}, // best vector match
|
||||
{Path: "wiki/jepa-fx/facts/foo.md", Distance: 0.10},
|
||||
}}
|
||||
|
||||
got, err := search.Query(dir, search.QueryOptions{
|
||||
Query: "lejpa",
|
||||
Limit: 5,
|
||||
Vector: vector,
|
||||
Embedder: embedder,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, got, 2, "vector-only hit should be hydrated into results")
|
||||
paths := []string{got[0].Path, got[1].Path}
|
||||
assert.Contains(t, paths, "wiki/jepa-fx/facts/foo.md")
|
||||
assert.Contains(t, paths, "wiki/jepa-fx/facts/semantic.md")
|
||||
}
|
||||
|
||||
func TestSearch_HybridFallsBackOnEmbedderError(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755))
|
||||
require.NoError(t, os.WriteFile(filepath.Join(dir, "wiki", "x.md"), []byte("keyword foo"), 0o644))
|
||||
|
||||
embedder := errorEmbedder{}
|
||||
vector := stubVector{}
|
||||
got, err := search.Query(dir, search.QueryOptions{
|
||||
Query: "keyword", Limit: 5, Vector: vector, Embedder: embedder,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Len(t, got, 1, "BM25 result should still come back when embedder fails")
|
||||
assert.Equal(t, "wiki/x.md", got[0].Path)
|
||||
}
|
||||
|
||||
type errorEmbedder struct{}
|
||||
|
||||
func (errorEmbedder) Embed(_ context.Context, _ string) ([]float32, error) {
|
||||
return nil, assert.AnError
|
||||
}
|
||||
|
||||
func TestSearch_ReturnsMatchingPages(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "knowledge"), 0o755))
|
||||
|
||||
155
ingestion/internal/vectorstore/pg.go
Normal file
155
ingestion/internal/vectorstore/pg.go
Normal file
@@ -0,0 +1,155 @@
|
||||
// Package vectorstore stores brain note embeddings in pgvector on the
|
||||
// shared postgres18 instance. One row per markdown path, cosine-distance
|
||||
// indexed via HNSW for sub-millisecond top-k retrieval.
|
||||
package vectorstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
)
|
||||
|
||||
// Hit pairs a stored note path with its cosine distance from the query
// vector. Search returns hits ordered from nearest to farthest.
type Hit struct {
	// Path is the primary key of the matched brain_embeddings row.
	Path string
	// Distance is the cosine distance: 0 = identical, 2 = opposite.
	Distance float64
}
|
||||
|
||||
// PGStore is a pgvector-backed embeddings store. Construct with New and
|
||||
// call Init once to create the table + HNSW index. Use Close to release
|
||||
// the underlying pool.
|
||||
type PGStore struct {
|
||||
pool *pgxpool.Pool
|
||||
}
|
||||
|
||||
// New opens a connection pool against dsn (a libpq-style URL). Caller
|
||||
// owns the resulting *PGStore and must invoke Close.
|
||||
func New(ctx context.Context, dsn string) (*PGStore, error) {
|
||||
pool, err := pgxpool.New(ctx, dsn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("pgxpool: %w", err)
|
||||
}
|
||||
if err := pool.Ping(ctx); err != nil {
|
||||
pool.Close()
|
||||
return nil, fmt.Errorf("ping: %w", err)
|
||||
}
|
||||
return &PGStore{pool: pool}, nil
|
||||
}
|
||||
|
||||
// Close releases the underlying connection pool.
|
||||
func (s *PGStore) Close() {
|
||||
if s.pool != nil {
|
||||
s.pool.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// Init creates the brain_embeddings table and its HNSW index if they
|
||||
// don't already exist. Safe to call on every startup. Assumes the
|
||||
// `vector` extension is already installed (one-time DBA setup; see
|
||||
// scripts/brain-embeddings-init.sql).
|
||||
func (s *PGStore) Init(ctx context.Context) error {
|
||||
const ddl = `
|
||||
CREATE TABLE IF NOT EXISTS brain_embeddings (
|
||||
path TEXT PRIMARY KEY,
|
||||
embedding vector(768) NOT NULL,
|
||||
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
|
||||
);
|
||||
CREATE INDEX IF NOT EXISTS brain_embeddings_embedding_idx
|
||||
ON brain_embeddings USING hnsw (embedding vector_cosine_ops);
|
||||
`
|
||||
_, err := s.pool.Exec(ctx, ddl)
|
||||
return err
|
||||
}
|
||||
|
||||
// Upsert inserts or replaces the embedding for path. Embedding must be
|
||||
// 768-dim (nomic-embed-text). Caller is responsible for normalising
|
||||
// paths to forward-slash form.
|
||||
func (s *PGStore) Upsert(ctx context.Context, path string, embedding []float32) error {
|
||||
if len(embedding) != 768 {
|
||||
return fmt.Errorf("expected 768-dim embedding, got %d", len(embedding))
|
||||
}
|
||||
_, err := s.pool.Exec(ctx, `
|
||||
INSERT INTO brain_embeddings (path, embedding, updated_at)
|
||||
VALUES ($1, $2, now())
|
||||
ON CONFLICT (path) DO UPDATE
|
||||
SET embedding = EXCLUDED.embedding, updated_at = now()
|
||||
`, path, vectorLiteral(embedding))
|
||||
return err
|
||||
}
|
||||
|
||||
// Delete removes the row at path. No-op when the row doesn't exist.
|
||||
func (s *PGStore) Delete(ctx context.Context, path string) error {
|
||||
_, err := s.pool.Exec(ctx, `DELETE FROM brain_embeddings WHERE path = $1`, path)
|
||||
return err
|
||||
}
|
||||
|
||||
// Search returns the top-limit nearest paths by cosine distance.
|
||||
func (s *PGStore) Search(ctx context.Context, query []float32, limit int) ([]Hit, error) {
|
||||
if len(query) != 768 {
|
||||
return nil, fmt.Errorf("expected 768-dim query, got %d", len(query))
|
||||
}
|
||||
if limit <= 0 {
|
||||
limit = 10
|
||||
}
|
||||
rows, err := s.pool.Query(ctx, `
|
||||
SELECT path, embedding <=> $1 AS distance
|
||||
FROM brain_embeddings
|
||||
ORDER BY embedding <=> $1
|
||||
LIMIT $2
|
||||
`, vectorLiteral(query), limit)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var hits []Hit
|
||||
for rows.Next() {
|
||||
var h Hit
|
||||
if err := rows.Scan(&h.Path, &h.Distance); err != nil {
|
||||
return nil, fmt.Errorf("scan: %w", err)
|
||||
}
|
||||
hits = append(hits, h)
|
||||
}
|
||||
if err := rows.Err(); err != nil && !errors.Is(err, pgx.ErrNoRows) {
|
||||
return nil, err
|
||||
}
|
||||
return hits, nil
|
||||
}
|
||||
|
||||
// KnownPaths returns the path set already present in the store. Used by
|
||||
// the watcher to diff against the wiki/ tree and decide what to upsert.
|
||||
func (s *PGStore) KnownPaths(ctx context.Context) (map[string]struct{}, error) {
|
||||
rows, err := s.pool.Query(ctx, `SELECT path FROM brain_embeddings`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query paths: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
out := make(map[string]struct{})
|
||||
for rows.Next() {
|
||||
var p string
|
||||
if err := rows.Scan(&p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out[p] = struct{}{}
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// vectorLiteral renders v in the text form pgvector accepts as a
// parameter: `[v1,v2,...,vN]`. Each component is formatted with %g, which
// for float32 yields the shortest representation that round-trips. An
// empty or nil slice renders as `[]`.
func vectorLiteral(v []float32) string {
	var b strings.Builder
	// Reserve room up front: roughly a dozen bytes per component plus the
	// brackets. This avoids repeated buffer growth on 768-dim vectors; the
	// builder still grows by itself if the estimate is too small.
	b.Grow(len(v)*12 + 2)

	b.WriteByte('[')
	for i, x := range v {
		if i > 0 {
			b.WriteByte(',')
		}
		fmt.Fprintf(&b, "%g", x)
	}
	b.WriteByte(']')
	return b.String()
}
|
||||
91
ingestion/internal/vectorstore/pg_test.go
Normal file
91
ingestion/internal/vectorstore/pg_test.go
Normal file
@@ -0,0 +1,91 @@
|
||||
package vectorstore_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// The integration tests in this file need a real postgres18 + pgvector.
// They are gated on BRAIN_PG_TEST_DSN so `task check` stays hermetic on
// hosts without a reachable database.
//
// To run:
//
//	BRAIN_PG_TEST_DSN='postgres://brain_app:pwd@127.0.0.1:5432/brain' \
//	  go test ./internal/vectorstore/... -run Integration
//
// dsn returns the value of BRAIN_PG_TEST_DSN, or skips the calling test
// when the variable is unset or empty.
func dsn(t *testing.T) string {
	t.Helper()
	if v := os.Getenv("BRAIN_PG_TEST_DSN"); v != "" {
		return v
	}
	t.Skip("BRAIN_PG_TEST_DSN not set; skipping pgvector integration tests")
	return "" // not reached: Skip stops the test
}
|
||||
|
||||
func freshStore(t *testing.T) (*vectorstore.PGStore, context.Context) {
|
||||
t.Helper()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
s, err := vectorstore.New(ctx, dsn(t))
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(s.Close)
|
||||
require.NoError(t, s.Init(ctx))
|
||||
// Clean slate per test.
|
||||
_, _ = s.KnownPaths(ctx)
|
||||
require.NoError(t, s.Delete(ctx, "%test-fixture%"))
|
||||
return s, ctx
|
||||
}
|
||||
|
||||
// vec builds a dim-length vector with every component set to fill.
func vec(dim int, fill float32) []float32 {
	out := make([]float32, dim)
	for i := 0; i < dim; i++ {
		out[i] = fill
	}
	return out
}
|
||||
|
||||
func TestIntegration_UpsertAndSearch(t *testing.T) {
|
||||
s, ctx := freshStore(t)
|
||||
|
||||
require.NoError(t, s.Upsert(ctx, "wiki/a.md", vec(768, 1.0)))
|
||||
require.NoError(t, s.Upsert(ctx, "wiki/b.md", vec(768, -1.0)))
|
||||
|
||||
hits, err := s.Search(ctx, vec(768, 1.0), 2)
|
||||
require.NoError(t, err)
|
||||
require.GreaterOrEqual(t, len(hits), 1)
|
||||
assert.Equal(t, "wiki/a.md", hits[0].Path)
|
||||
assert.InDelta(t, 0.0, hits[0].Distance, 1e-5)
|
||||
|
||||
t.Cleanup(func() {
|
||||
_ = s.Delete(ctx, "wiki/a.md")
|
||||
_ = s.Delete(ctx, "wiki/b.md")
|
||||
})
|
||||
}
|
||||
|
||||
func TestIntegration_KnownPaths(t *testing.T) {
|
||||
s, ctx := freshStore(t)
|
||||
require.NoError(t, s.Upsert(ctx, "wiki/k.md", vec(768, 0.5)))
|
||||
t.Cleanup(func() { _ = s.Delete(ctx, "wiki/k.md") })
|
||||
|
||||
paths, err := s.KnownPaths(ctx)
|
||||
require.NoError(t, err)
|
||||
_, ok := paths["wiki/k.md"]
|
||||
assert.True(t, ok)
|
||||
}
|
||||
|
||||
func TestUpsert_RejectsWrongDimension(t *testing.T) {
|
||||
s := &vectorstore.PGStore{}
|
||||
err := s.Upsert(context.Background(), "x", vec(100, 0))
|
||||
require.Error(t, err)
|
||||
}
|
||||
|
||||
func TestSearch_RejectsWrongDimension(t *testing.T) {
|
||||
s := &vectorstore.PGStore{}
|
||||
_, err := s.Search(context.Background(), vec(100, 0), 5)
|
||||
require.Error(t, err)
|
||||
}
|
||||
142
ingestion/internal/vectorstore/sync.go
Normal file
142
ingestion/internal/vectorstore/sync.go
Normal file
@@ -0,0 +1,142 @@
|
||||
package vectorstore
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Embedder turns a piece of text into a dense vector. It is declared here,
// next to its consumer, so vectorstore does not have to import the embed
// package; embed's Client satisfies it.
type Embedder interface {
	// Embed returns the embedding of text, or an error.
	Embed(ctx context.Context, text string) ([]float32, error)
}
|
||||
|
||||
// Store lists the PGStore operations that Sync relies on. Keeping it an
// interface lets tests substitute an in-memory stub.
type Store interface {
	// KnownPaths returns the set of paths that already have a row.
	KnownPaths(ctx context.Context) (map[string]struct{}, error)
	// Upsert stores (or replaces) the embedding for path.
	Upsert(ctx context.Context, path string, embedding []float32) error
	// Delete removes the row for path.
	Delete(ctx context.Context, path string) error
}
|
||||
|
||||
// SyncResult summarises one Sync run. It is meant for logs and metrics
// rather than for driving caller logic.
type SyncResult struct {
	// Added counts files that were embedded and upserted.
	Added int
	// Updated is intended for re-embedded files.
	// NOTE(review): Sync does not currently increment it — confirm.
	Updated int
	// Deleted counts rows removed from the store.
	Deleted int
	// Errors holds the per-file failures collected during the run.
	Errors []error
}
|
||||
|
||||
// Sync brings the embedding store in line with brain/wiki/ on disk:
|
||||
// - new files (in the tree, not in the store) get embedded + upserted
|
||||
// - files whose mtime exceeds the store's updated_at get re-embedded
|
||||
// - files no longer on disk get deleted from the store
|
||||
//
|
||||
// Designed to be called on a ticker. Best-effort: per-file errors are
|
||||
// collected into SyncResult.Errors and do not abort the run.
|
||||
func Sync(ctx context.Context, brainDir string, store Store, embedder Embedder) (SyncResult, error) {
|
||||
var res SyncResult
|
||||
if store == nil || embedder == nil {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
known, err := store.KnownPaths(ctx)
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("known paths: %w", err)
|
||||
}
|
||||
seen := make(map[string]struct{})
|
||||
|
||||
wikiDir := filepath.Join(brainDir, "wiki")
|
||||
if _, err := os.Stat(wikiDir); os.IsNotExist(err) {
|
||||
return res, nil
|
||||
}
|
||||
|
||||
err = filepath.WalkDir(wikiDir, func(path string, d os.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if d.IsDir() || !strings.HasSuffix(path, ".md") || d.Name() == "_index.md" {
|
||||
return nil
|
||||
}
|
||||
rel, err := filepath.Rel(brainDir, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
relSlash := filepath.ToSlash(rel)
|
||||
seen[relSlash] = struct{}{}
|
||||
|
||||
if _, ok := known[relSlash]; ok {
|
||||
// Already embedded — TODO: compare mtime once Store exposes
|
||||
// updated_at so we re-embed on edit. For now, skip.
|
||||
return nil
|
||||
}
|
||||
|
||||
content, readErr := os.ReadFile(path)
|
||||
if readErr != nil {
|
||||
res.Errors = append(res.Errors, fmt.Errorf("read %s: %w", relSlash, readErr))
|
||||
return nil
|
||||
}
|
||||
vec, embErr := embedder.Embed(ctx, string(content))
|
||||
if embErr != nil {
|
||||
res.Errors = append(res.Errors, fmt.Errorf("embed %s: %w", relSlash, embErr))
|
||||
return nil
|
||||
}
|
||||
if upErr := store.Upsert(ctx, relSlash, vec); upErr != nil {
|
||||
res.Errors = append(res.Errors, fmt.Errorf("upsert %s: %w", relSlash, upErr))
|
||||
return nil
|
||||
}
|
||||
res.Added++
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return res, fmt.Errorf("walk wiki: %w", err)
|
||||
}
|
||||
|
||||
// Drop rows whose file is gone.
|
||||
for path := range known {
|
||||
if _, ok := seen[path]; ok {
|
||||
continue
|
||||
}
|
||||
if err := store.Delete(ctx, path); err != nil {
|
||||
res.Errors = append(res.Errors, fmt.Errorf("delete %s: %w", path, err))
|
||||
continue
|
||||
}
|
||||
res.Deleted++
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// StartSync launches Sync on a ticker in a background goroutine. The
|
||||
// goroutine exits when ctx is cancelled. Failures are logged via slog.
|
||||
func StartSync(ctx context.Context, brainDir string, store Store, embedder Embedder, interval time.Duration) {
|
||||
if interval <= 0 {
|
||||
interval = 5 * time.Minute
|
||||
}
|
||||
go func() {
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
// Run once immediately so first-boot doesn't wait a full tick.
|
||||
if r, err := Sync(ctx, brainDir, store, embedder); err != nil {
|
||||
slog.Error("embed sync failed", "err", err)
|
||||
} else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 {
|
||||
slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors))
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
if r, err := Sync(ctx, brainDir, store, embedder); err != nil {
|
||||
slog.Error("embed sync failed", "err", err)
|
||||
} else if r.Added+r.Deleted > 0 || len(r.Errors) > 0 {
|
||||
slog.Info("embed sync", "added", r.Added, "deleted", r.Deleted, "errors", len(r.Errors))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
137
ingestion/internal/vectorstore/sync_test.go
Normal file
137
ingestion/internal/vectorstore/sync_test.go
Normal file
@@ -0,0 +1,137 @@
|
||||
package vectorstore_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/mathiasbq/hyperguild/ingestion/internal/vectorstore"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// stubStore is an in-memory stand-in for the embedding store used by the
// Sync tests. It records what Sync asks it to do.
type stubStore struct {
	known    map[string]struct{}  // paths reported by KnownPaths
	upserts  map[string][]float32 // successful Upsert calls, keyed by path
	deletes  []string             // Delete calls, in call order
	failNext error                // when set, the next Upsert fails with it once
}

// KnownPaths returns a copy of the preset path set, so callers cannot
// mutate the stub's own map.
func (s *stubStore) KnownPaths(context.Context) (map[string]struct{}, error) {
	snapshot := make(map[string]struct{}, len(s.known))
	for p := range s.known {
		snapshot[p] = struct{}{}
	}
	return snapshot, nil
}

// Upsert records the vector under path. If failNext is set, it is returned
// instead and cleared, and nothing is recorded.
func (s *stubStore) Upsert(_ context.Context, path string, v []float32) error {
	if pending := s.failNext; pending != nil {
		s.failNext = nil
		return pending
	}
	if s.upserts == nil {
		s.upserts = map[string][]float32{}
	}
	s.upserts[path] = v
	return nil
}

// Delete appends path to the list of deletions and always succeeds.
func (s *stubStore) Delete(_ context.Context, path string) error {
	s.deletes = append(s.deletes, path)
	return nil
}
|
||||
|
||||
// stubEmbedder is an Embedder test double that returns a fixed vector
// and a fixed error, whatever the input.
type stubEmbedder struct {
	vec []float32 // vector handed back by Embed
	err error     // error handed back by Embed
}

// Embed ignores its arguments and returns the preset vector and error.
func (e stubEmbedder) Embed(context.Context, string) ([]float32, error) {
	return e.vec, e.err
}
|
||||
|
||||
func writeNote(t *testing.T, dir, rel, body string) {
|
||||
t.Helper()
|
||||
full := filepath.Join(dir, rel)
|
||||
require.NoError(t, os.MkdirAll(filepath.Dir(full), 0o755))
|
||||
require.NoError(t, os.WriteFile(full, []byte(body), 0o644))
|
||||
}
|
||||
|
||||
func TestSync_AddsNewFiles(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/jepa-fx/facts/x.md", "body of x")
|
||||
writeNote(t, dir, "wiki/jepa-fx/facts/y.md", "body of y")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, res.Added)
|
||||
assert.Empty(t, res.Deleted)
|
||||
assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/x.md")
|
||||
assert.Contains(t, store.upserts, "wiki/jepa-fx/facts/y.md")
|
||||
}
|
||||
|
||||
func TestSync_SkipsAlreadyKnown(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{"wiki/a/facts/x.md": {}}}
|
||||
emb := stubEmbedder{vec: make([]float32, 768)}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, res.Added)
|
||||
assert.Empty(t, store.upserts)
|
||||
}
|
||||
|
||||
func TestSync_DeletesDisappearedFiles(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
require.NoError(t, os.MkdirAll(filepath.Join(dir, "wiki"), 0o755))
|
||||
// store has a path that doesn't exist on disk anymore
|
||||
store := &stubStore{known: map[string]struct{}{"wiki/old/facts/ghost.md": {}}}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, &stubStoreWithDelete{stubStore: store}, stubEmbedder{vec: make([]float32, 768)})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, res.Deleted)
|
||||
}
|
||||
|
||||
// stubStoreWithDelete is a thin wrapper to capture Delete calls;
|
||||
// stubStore already implements Delete but we need the wrapper to mix
|
||||
// store interfaces with sync-specific expectations.
|
||||
type stubStoreWithDelete struct {
|
||||
*stubStore
|
||||
}
|
||||
|
||||
func TestSync_SkipsIndexFiles(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/a/_index.md", "moc")
|
||||
writeNote(t, dir, "wiki/a/facts/real.md", "body")
|
||||
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, stubEmbedder{vec: make([]float32, 768)})
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, res.Added)
|
||||
assert.NotContains(t, store.upserts, "wiki/a/_index.md")
|
||||
}
|
||||
|
||||
func TestSync_NoOpWhenComponentsNil(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||
res, err := vectorstore.Sync(context.Background(), dir, nil, nil)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, res.Added)
|
||||
}
|
||||
|
||||
func TestSync_CollectsEmbedderErrors(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
writeNote(t, dir, "wiki/a/facts/x.md", "x")
|
||||
store := &stubStore{known: map[string]struct{}{}}
|
||||
emb := stubEmbedder{err: errors.New("upstream down")}
|
||||
res, err := vectorstore.Sync(context.Background(), dir, store, emb)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 0, res.Added)
|
||||
assert.Len(t, res.Errors, 1)
|
||||
}
|
||||
47
scripts/brain-embeddings-init.sql
Normal file
47
scripts/brain-embeddings-init.sql
Normal file
@@ -0,0 +1,47 @@
|
||||
-- One-time DBA setup for the brain vector store on postgres18.
--
-- Creates the `brain` database, the `brain_app` role, and the pgvector
-- extension. The ingestion service connects as brain_app and creates
-- the table + HNSW index idempotently at startup (see
-- internal/vectorstore.PGStore.Init).
--
-- The brain_app password is read from the psql variable `password`.
-- Pass the raw value: the script quotes it itself, so do not wrap it in
-- extra quotes. The script stops with an error if the variable is unset.
--
-- Run from koala as the postgres superuser:
--
--   PASSWORD='<sops-generated>' \
--   kubectl exec -i -n databases postgres18-0 -- \
--     psql -U postgres -v password="$PASSWORD" \
--     < scripts/brain-embeddings-init.sql
--
-- Or, with the file already copied into the pod:
--
--   kubectl exec -n databases postgres18-0 -- \
--     psql -U postgres -v password='<sops-generated>' \
--     -f /tmp/brain-embeddings-init.sql
--
-- Idempotent: rerunning is safe. An existing brain_app role is left
-- untouched, including its password.

\set ON_ERROR_STOP on

-- CREATE DATABASE cannot run inside a function or transaction block, so
-- it cannot live in a DO block. Generate the statement only when the
-- database is missing and let \gexec run it as a top-level command.
SELECT 'CREATE DATABASE brain'
WHERE NOT EXISTS (SELECT 1 FROM pg_database WHERE datname = 'brain')
\gexec

-- psql does not substitute variables inside a dollar-quoted DO body, so
-- the role is created through \gexec as well. format(%L) quotes the
-- password as a literal in the generated statement.
SELECT format('CREATE ROLE brain_app LOGIN PASSWORD %L', :'password')
WHERE NOT EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'brain_app')
\gexec

GRANT ALL PRIVILEGES ON DATABASE brain TO brain_app;

\c brain

CREATE EXTENSION IF NOT EXISTS vector;
GRANT ALL ON SCHEMA public TO brain_app;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO brain_app;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO brain_app;
|
||||
Reference in New Issue
Block a user