feat(claudewatcher): ingest Claude Code session transcripts into brain
New package internal/claudewatcher. The volume gate (24 turns/week of agentsquad logs vs 500/week gate) exposed that the real signal lives in daily Claude Code usage at ~/.claude/projects/*/<uuid>.jsonl, not in agentsquad output. This package captures that signal. See infra#73 Track E + hyperguild#27 for the full reframe. Components: - parser: tolerant JSONL parser over the observed Claude Code session schema (user / assistant / attachment / system + bookkeeping types). Skip-flag fast-paths queue-operation, last-prompt, permission-mode, ai-title, bridge-session, file-history-snapshot. - scrubber: 11-rule fail-closed regex set for credential shapes (bearer, postgres URIs, PEM, ssh-key, ghp_/sk-/sk-ant-/AKIA, homelab env tokens, SOPS markers). Drop turn + log on match. - cursor: postgres-backed claude_session_cursors table, keyed by (host, file_path) with byte_offset. Resumable across pod restarts. - watcher: poll loop. Walks SessionsDir, processes each .jsonl from its cursor offset, runs scrubber, emits a Batch per file to a Sink interface, advances cursor on successful Ingest. No classifier integration in this commit — every kept turn is emitted in a per-session batch. The cmd/server wiring (next commit) routes batches to brain/wiki/claude-sessions/facts/. Classifier-driven hall routing (decisions / failures / hypotheses) is a follow-up. 19 unit tests across parser + scrubber + watcher. task check green. Refs: infra#73, hyperguild#27
This commit is contained in:
234
ingestion/internal/claudewatcher/watcher.go
Normal file
234
ingestion/internal/claudewatcher/watcher.go
Normal file
@@ -0,0 +1,234 @@
|
||||
package claudewatcher
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Sink consumes batches of ingest-ready turns from the watcher. The
|
||||
// production implementation builds wiki pages and calls pipeline.RunRaw
|
||||
// against the brain. Tests substitute a counter.
|
||||
//
|
||||
// A Batch represents the turns ingested from one session file between
|
||||
// two cursor checkpoints. Implementations must be idempotent — the
|
||||
// watcher only advances the cursor on a nil return.
|
||||
type Sink interface {
|
||||
Ingest(ctx context.Context, b Batch) error
|
||||
}
|
||||
|
||||
// Batch is a per-file slice of turns plus identifying metadata.
|
||||
type Batch struct {
|
||||
Host string // origin host, e.g. "koala"
|
||||
FilePath string // absolute path to the source .jsonl file
|
||||
SessionID string // first session_id seen in the batch
|
||||
ProjectID string // basename of the parent dir, e.g. "-home-mathias-dev"
|
||||
Turns []Turn // never empty; caller filters Skip + scrubber matches
|
||||
}
|
||||
|
||||
// Config drives one Watch loop. SessionsDir is the absolute path to the
|
||||
// Claude Code projects directory (~/.claude/projects). Host is the
|
||||
// label written into cursors and ingested page frontmatter. Interval
|
||||
// is the poll cadence; a zero or negative value disables the loop.
|
||||
//
|
||||
// Sink is required. Cursors is optional — when nil the watcher
|
||||
// re-reads from byte 0 on every tick (useful for first-run testing
|
||||
// without a postgres dependency).
|
||||
type Config struct {
|
||||
SessionsDir string
|
||||
Host string
|
||||
Interval time.Duration
|
||||
Sink Sink
|
||||
Cursors *CursorStore
|
||||
Logger *slog.Logger
|
||||
}
|
||||
|
||||
// Watch runs the polling loop until ctx is cancelled. Returns ctx.Err()
|
||||
// on shutdown. Each tick walks SessionsDir for *.jsonl files, advances
|
||||
// each file's cursor, and emits one Batch per file with new turns.
|
||||
// Errors during a single file's parse or ingest are logged but do not
|
||||
// abort the loop — a single bad file shouldn't block the others.
|
||||
func Watch(ctx context.Context, cfg Config) error {
|
||||
if cfg.SessionsDir == "" {
|
||||
return fmt.Errorf("sessions dir is required")
|
||||
}
|
||||
if cfg.Sink == nil {
|
||||
return fmt.Errorf("sink is required")
|
||||
}
|
||||
if cfg.Interval <= 0 {
|
||||
return fmt.Errorf("interval must be positive")
|
||||
}
|
||||
if cfg.Host == "" {
|
||||
cfg.Host = "unknown"
|
||||
}
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = slog.Default()
|
||||
}
|
||||
cfg.Logger.Info("claudewatcher: started",
|
||||
"sessions_dir", cfg.SessionsDir,
|
||||
"host", cfg.Host,
|
||||
"interval", cfg.Interval)
|
||||
|
||||
ticker := time.NewTicker(cfg.Interval)
|
||||
defer ticker.Stop()
|
||||
// Run an immediate first sweep so first-launch users don't wait one
|
||||
// tick before anything happens.
|
||||
runTick(ctx, cfg)
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-ticker.C:
|
||||
runTick(ctx, cfg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runTick is one polling pass. Exposed (lowercase) for tests via
|
||||
// TickOnce.
|
||||
func runTick(ctx context.Context, cfg Config) {
|
||||
files, err := listSessionFiles(cfg.SessionsDir)
|
||||
if err != nil {
|
||||
cfg.Logger.Warn("claudewatcher: list session files", "err", err)
|
||||
return
|
||||
}
|
||||
for _, f := range files {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
if err := processFile(ctx, cfg, f); err != nil {
|
||||
cfg.Logger.Warn("claudewatcher: file failed",
|
||||
"path", f, "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TickOnce runs one sweep synchronously and returns. Used by tests +
|
||||
// by ad-hoc CLI invocations.
|
||||
func TickOnce(ctx context.Context, cfg Config) error {
|
||||
if cfg.SessionsDir == "" || cfg.Sink == nil {
|
||||
return fmt.Errorf("config invalid")
|
||||
}
|
||||
if cfg.Host == "" {
|
||||
cfg.Host = "unknown"
|
||||
}
|
||||
if cfg.Logger == nil {
|
||||
cfg.Logger = slog.Default()
|
||||
}
|
||||
runTick(ctx, cfg)
|
||||
return nil
|
||||
}
|
||||
|
||||
func listSessionFiles(root string) ([]string, error) {
|
||||
var out []string
|
||||
err := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
if !strings.HasSuffix(path, ".jsonl") {
|
||||
return nil
|
||||
}
|
||||
out = append(out, path)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("walk %s: %w", root, err)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func processFile(ctx context.Context, cfg Config, path string) error {
|
||||
startOffset := int64(0)
|
||||
if cfg.Cursors != nil {
|
||||
off, _, err := cfg.Cursors.GetOffset(ctx, cfg.Host, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get cursor: %w", err)
|
||||
}
|
||||
startOffset = off
|
||||
}
|
||||
|
||||
stat, err := os.Stat(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat: %w", err)
|
||||
}
|
||||
if stat.Size() <= startOffset {
|
||||
return nil // nothing new
|
||||
}
|
||||
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open: %w", err)
|
||||
}
|
||||
defer func() { _ = f.Close() }()
|
||||
if _, err := f.Seek(startOffset, 0); err != nil {
|
||||
return fmt.Errorf("seek: %w", err)
|
||||
}
|
||||
|
||||
var keep []Turn
|
||||
var sessionID string
|
||||
var droppedScrub int
|
||||
endOffset, err := ParseStream(f, startOffset,
|
||||
func(format string, args ...any) {
|
||||
cfg.Logger.Warn(fmt.Sprintf("claudewatcher: parse: "+format, args...))
|
||||
},
|
||||
func(t Turn) error {
|
||||
if t.Skip || t.Content == "" {
|
||||
return nil
|
||||
}
|
||||
if rule := Scrub(t.Content); rule != "" {
|
||||
droppedScrub++
|
||||
cfg.Logger.Warn("claudewatcher: turn dropped by scrubber",
|
||||
"rule", rule, "path", path, "session_id", t.SessionID)
|
||||
return nil
|
||||
}
|
||||
if sessionID == "" {
|
||||
sessionID = t.SessionID
|
||||
}
|
||||
keep = append(keep, t)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse stream: %w", err)
|
||||
}
|
||||
|
||||
if len(keep) == 0 {
|
||||
if cfg.Cursors != nil {
|
||||
if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil {
|
||||
return fmt.Errorf("advance cursor (no-turns): %w", err)
|
||||
}
|
||||
}
|
||||
if droppedScrub > 0 {
|
||||
cfg.Logger.Info("claudewatcher: only scrubbed turns this tick",
|
||||
"path", path, "dropped", droppedScrub)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
batch := Batch{
|
||||
Host: cfg.Host,
|
||||
FilePath: path,
|
||||
SessionID: sessionID,
|
||||
ProjectID: filepath.Base(filepath.Dir(path)),
|
||||
Turns: keep,
|
||||
}
|
||||
if err := cfg.Sink.Ingest(ctx, batch); err != nil {
|
||||
return fmt.Errorf("sink ingest: %w", err)
|
||||
}
|
||||
if cfg.Cursors != nil {
|
||||
if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil {
|
||||
return fmt.Errorf("advance cursor: %w", err)
|
||||
}
|
||||
}
|
||||
cfg.Logger.Info("claudewatcher: ingested batch",
|
||||
"path", path, "session_id", sessionID,
|
||||
"turns_kept", len(keep), "dropped_scrub", droppedScrub,
|
||||
"new_offset", endOffset)
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user