Files
Mathias bc011cc1f0 feat(claudewatcher): ingest Claude Code session transcripts into brain
New package internal/claudewatcher. The volume gate (24 turns/week of
agentsquad logs vs 500/week gate) exposed that the real signal lives
in daily Claude Code usage at ~/.claude/projects/*/<uuid>.jsonl, not
in agentsquad output. This package captures that signal. See infra#73
Track E + hyperguild#27 for the full reframe.

Components:
- parser: tolerant JSONL parser over the observed Claude Code session
  schema (user / assistant / attachment / system + bookkeeping types).
  Skip-flag fast-paths queue-operation, last-prompt, permission-mode,
  ai-title, bridge-session, file-history-snapshot.
- scrubber: 11-rule fail-closed regex set for credential shapes
  (bearer, postgres URIs, PEM, ssh-key, ghp_/sk-/sk-ant-/AKIA, homelab
  env tokens, SOPS markers). Drop turn + log on match.
- cursor: postgres-backed claude_session_cursors table, keyed by
  (host, file_path) with byte_offset. Resumable across pod restarts.
- watcher: poll loop. Walks SessionsDir, processes each .jsonl from
  its cursor offset, runs scrubber, emits a Batch per file to a
  Sink interface, advances cursor on successful Ingest.

No classifier integration in this commit — every kept turn is emitted
in a per-session batch. The cmd/server wiring (next commit) routes
batches to brain/wiki/claude-sessions/facts/. Classifier-driven hall
routing (decisions / failures / hypotheses) is a follow-up.

19 unit tests across parser + scrubber + watcher. task check green.

Refs: infra#73, hyperguild#27
2026-05-25 19:58:58 +02:00

111 lines
3.6 KiB
Go

package claudewatcher
import (
"context"
"errors"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
// CursorStore tracks how far the watcher has ingested into each
// session JSONL file. Keyed by (host, file_path) so the same `~/.claude`
// path on different hosts doesn't collide and resumability survives
// pod restarts. Idempotent Init lives alongside the rest of the
// claudewatcher schema; no separate migration framework.
type CursorStore struct {
	pool *pgxpool.Pool
	// borrowed is true when pool was injected via NewCursorStoreFromPool
	// and therefore belongs to the caller; Close leaves such pools open.
	// The zero value (false) means the store owns the pool and Close
	// releases it.
	borrowed bool
}

// NewCursorStore opens a new pool against dsn and pings it to verify
// connectivity. The returned store owns the pool; the caller releases
// it with Close. On ping failure the freshly opened pool is closed
// before the error is returned so no connections leak.
func NewCursorStore(ctx context.Context, dsn string) (*CursorStore, error) {
	pool, err := pgxpool.New(ctx, dsn)
	if err != nil {
		return nil, fmt.Errorf("pgxpool: %w", err)
	}
	if err := pool.Ping(ctx); err != nil {
		pool.Close()
		return nil, fmt.Errorf("ping: %w", err)
	}
	return &CursorStore{pool: pool}, nil
}

// NewCursorStoreFromPool wraps an existing pool (so the watcher can
// share the brain DSN pool with vectorstore/graphstore without a
// second connection set). The store does not take ownership: Close on
// the returned store is a no-op, and the caller closes the pool
// directly.
func NewCursorStoreFromPool(pool *pgxpool.Pool) *CursorStore {
	return &CursorStore{pool: pool, borrowed: true}
}

// Close releases the underlying connection pool when this store owns
// it (i.e. it was created by NewCursorStore). It is a no-op when the
// pool was injected via NewCursorStoreFromPool, so closing the store
// never tears down a pool that other components still share.
func (s *CursorStore) Close() {
	if s.pool == nil || s.borrowed {
		return
	}
	s.pool.Close()
}
// Init creates the claude_session_cursors table and its host index
// when missing. Both statements use IF NOT EXISTS, so repeated calls
// against an already-initialised database are harmless. The two
// statements are sent in a single Exec call. Any database error is
// returned wrapped with an "init schema" prefix, matching the wrapping
// style of the other CursorStore methods.
func (s *CursorStore) Init(ctx context.Context) error {
	const ddl = `
CREATE TABLE IF NOT EXISTS claude_session_cursors (
host TEXT NOT NULL,
file_path TEXT NOT NULL,
byte_offset BIGINT NOT NULL DEFAULT 0,
last_seen_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (host, file_path)
);
CREATE INDEX IF NOT EXISTS claude_session_cursors_host_idx
ON claude_session_cursors (host);
`
	if _, err := s.pool.Exec(ctx, ddl); err != nil {
		return fmt.Errorf("init schema: %w", err)
	}
	return nil
}
// GetOffset looks up the byte offset stored for (host, filePath).
//
// It returns (offset, true, nil) when a cursor row exists and
// (0, false, nil) when none does, letting callers tell "never
// ingested" apart from "ingested up to byte 0". An empty host or
// filePath is rejected before the database is touched; any other
// query failure is returned wrapped with a "query" prefix.
func (s *CursorStore) GetOffset(ctx context.Context, host, filePath string) (int64, bool, error) {
	if host == "" || filePath == "" {
		return 0, false, errors.New("host and file_path are required")
	}

	const lookup = `
SELECT byte_offset FROM claude_session_cursors WHERE host = $1 AND file_path = $2
`
	var stored int64
	row := s.pool.QueryRow(ctx, lookup, host, filePath)
	switch scanErr := row.Scan(&stored); {
	case scanErr == nil:
		return stored, true, nil
	case errors.Is(scanErr, pgx.ErrNoRows):
		// No cursor row yet: not an error, just "never seen".
		return 0, false, nil
	default:
		return 0, false, fmt.Errorf("query: %w", scanErr)
	}
}
// SetOffset records offset as the cursor for (host, filePath),
// inserting the row if it is new and overwriting byte_offset and
// last_seen_at otherwise. Intended to run after each successful
// parse + ingest batch so a crash mid-file rewinds only to the last
// committed checkpoint.
//
// An empty host or filePath, or a negative offset, is rejected before
// the database is touched. Database failures are returned wrapped
// with an "upsert offset" prefix.
func (s *CursorStore) SetOffset(ctx context.Context, host, filePath string, offset int64) error {
	switch {
	case host == "" || filePath == "":
		return errors.New("host and file_path are required")
	case offset < 0:
		return errors.New("offset must be >= 0")
	}

	const upsert = `
INSERT INTO claude_session_cursors (host, file_path, byte_offset, last_seen_at)
VALUES ($1, $2, $3, now())
ON CONFLICT (host, file_path) DO UPDATE
SET byte_offset = EXCLUDED.byte_offset,
last_seen_at = now()
`
	if _, execErr := s.pool.Exec(ctx, upsert, host, filePath, offset); execErr != nil {
		return fmt.Errorf("upsert offset: %w", execErr)
	}
	return nil
}