Files
Mathias bc011cc1f0 feat(claudewatcher): ingest Claude Code session transcripts into brain
New package internal/claudewatcher. The volume gate (24 turns/week of
agentsquad logs vs 500/week gate) exposed that the real signal lives
in daily Claude Code usage at ~/.claude/projects/*/<uuid>.jsonl, not
in agentsquad output. This package captures that signal. See infra#73
Track E + hyperguild#27 for the full reframe.

Components:
- parser: tolerant JSONL parser over the observed Claude Code session
  schema (user / assistant / attachment / system + bookkeeping types).
  Skip-flag fast-paths queue-operation, last-prompt, permission-mode,
  ai-title, bridge-session, file-history-snapshot.
- scrubber: 11-rule fail-closed regex set for credential shapes
  (bearer, postgres URIs, PEM, ssh-key, ghp_/sk-/sk-ant-/AKIA, homelab
  env tokens, SOPS markers). Drop turn + log on match.
- cursor: postgres-backed claude_session_cursors table, keyed by
  (host, file_path) with byte_offset. Resumable across pod restarts.
- watcher: poll loop. Walks SessionsDir, processes each .jsonl from
  its cursor offset, runs scrubber, emits a Batch per file to a
  Sink interface, advances cursor on successful Ingest.

No classifier integration in this commit — every kept turn is emitted
in a per-session batch. The cmd/server wiring (next commit) routes
batches to brain/wiki/claude-sessions/facts/. Classifier-driven hall
routing (decisions / failures / hypotheses) is a follow-up.

19 unit tests across parser + scrubber + watcher. task check green.

Refs: infra#73, hyperguild#27
2026-05-25 19:58:58 +02:00

175 lines
5.8 KiB
Go

package claudewatcher
import (
"context"
"os"
"path/filepath"
"strings"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// memSink captures batches in memory without touching postgres. The
// tests in this file plug it into Config.Sink. Ingest serializes its
// work with mu, so it may be called from any goroutine in concurrent
// tests.
type memSink struct {
	mu sync.Mutex
	// batches holds every accepted batch, in the order Ingest received them.
	batches []Batch
	// failOn, when non-empty, makes Ingest reject any batch whose file
	// base name contains it.
	failOn string
}

// Ingest appends b to m.batches and returns nil. When m.failOn is set
// and occurs in the base name of b.FilePath, it returns assert.AnError
// instead and records nothing.
//
// Only the base name is inspected. Directory components (for example the
// per-test temp dir, or the project directory) must not be able to
// trigger the failure by accident.
func (m *memSink) Ingest(_ context.Context, b Batch) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.failOn != "" && strings.Contains(filepath.Base(b.FilePath), m.failOn) {
		return assert.AnError
	}
	m.batches = append(m.batches, b)
	return nil
}
// writeSession writes <dir>/<sessionID>.jsonl containing lines joined by
// newlines plus one trailing newline, and returns the path it wrote.
// The calling test is aborted if the write fails.
func writeSession(t *testing.T, dir, sessionID string, lines []string) string {
	t.Helper()

	target := filepath.Join(dir, sessionID+".jsonl")
	payload := append([]byte(strings.Join(lines, "\n")), '\n')

	err := os.WriteFile(target, payload, 0o644)
	require.NoError(t, err)
	return target
}
// TestTickOnce_NoCursorReingestsEverythingEveryTick runs TickOnce twice
// over a single two-turn session, using a Config that sets only
// SessionsDir, Host and Sink. It expects one batch per tick and checks
// the identifying fields of the first batch.
func TestTickOnce_NoCursorReingestsEverythingEveryTick(t *testing.T) {
	const project = "-home-mathias-dev"

	root := t.TempDir()
	sessions := filepath.Join(root, project)
	require.NoError(t, os.MkdirAll(sessions, 0o755))
	writeSession(t, sessions, "sess1", []string{
		`{"type":"user","sessionId":"sess1","message":"first prompt"}`,
		`{"type":"assistant","sessionId":"sess1","message":{"content":[{"type":"text","text":"first answer"}]}}`,
	})

	captured := &memSink{}
	cfg := Config{SessionsDir: root, Host: "koala", Sink: captured}
	ctx := context.Background()
	for tick := 0; tick < 2; tick++ {
		require.NoError(t, TickOnce(ctx, cfg))
	}

	require.Len(t, captured.batches, 2, "no cursor => re-emits same batch every tick")
	first := captured.batches[0]
	assert.Equal(t, "sess1", first.SessionID)
	assert.Equal(t, "koala", first.Host)
	assert.Equal(t, project, first.ProjectID)
	assert.Len(t, first.Turns, 2)
}
// TestTickOnce_FiltersSkipTurnsAndScrubberMatches feeds TickOnce a
// session with three lines: a queue-operation entry, a plain user
// prompt, and an assistant reply carrying a POSTGRES_PASSWORD value.
// Only the user prompt is expected to reach the sink.
func TestTickOnce_FiltersSkipTurnsAndScrubberMatches(t *testing.T) {
	root := t.TempDir()
	projectDir := filepath.Join(root, "-home-mathias-dev")
	require.NoError(t, os.MkdirAll(projectDir, 0o755))

	transcript := []string{
		// Bookkeeping entry: expected to be skipped.
		`{"type":"queue-operation","sessionId":"sess-scrub","content":"x"}`,
		// Ordinary prompt: the one turn expected to survive.
		`{"type":"user","sessionId":"sess-scrub","message":"normal prompt"}`,
		// Credential-shaped content: expected to be scrubbed.
		`{"type":"assistant","sessionId":"sess-scrub","message":{"content":[{"type":"text","text":"value POSTGRES_PASSWORD=hunter2supersecretvalue"}]}}`,
	}
	writeSession(t, projectDir, "sess-scrub", transcript)

	captured := &memSink{}
	cfg := Config{SessionsDir: root, Host: "koala", Sink: captured}
	require.NoError(t, TickOnce(context.Background(), cfg))

	require.Len(t, captured.batches, 1)
	kept := captured.batches[0].Turns
	require.Len(t, kept, 1, "skip + scrubbed turns must not reach the sink")
	assert.Equal(t, "user", kept[0].Type)
}
// TestTickOnce_AllScrubbedNoBatchEmitted covers a session whose only
// turn contains an "Authorization: Bearer ..." value. With no usable
// turns left, the sink is expected to receive no batch at all.
func TestTickOnce_AllScrubbedNoBatchEmitted(t *testing.T) {
	root := t.TempDir()
	projectDir := filepath.Join(root, "-home-mathias-dev")
	require.NoError(t, os.MkdirAll(projectDir, 0o755))

	onlyTurn := `{"type":"user","sessionId":"all-bad","message":"Authorization: Bearer abcdef1234567890ghijklmnop"}`
	writeSession(t, projectDir, "all-bad", []string{onlyTurn})

	captured := &memSink{}
	cfg := Config{SessionsDir: root, Host: "koala", Sink: captured}
	require.NoError(t, TickOnce(context.Background(), cfg))

	assert.Empty(t, captured.batches, "no usable turns => no batch")
}
// TestTickOnce_IgnoresNonJsonlFiles populates a project directory with
// files that do not end in .jsonl and expects TickOnce to succeed
// without handing anything to the sink.
func TestTickOnce_IgnoresNonJsonlFiles(t *testing.T) {
	root := t.TempDir()
	projectDir := filepath.Join(root, "-home-mathias-dev")
	require.NoError(t, os.MkdirAll(projectDir, 0o755))

	strays := []struct{ name, body string }{
		{name: "README.md", body: "ignore me"},
		{name: "config.json", body: "{}"},
	}
	for _, stray := range strays {
		dest := filepath.Join(projectDir, stray.name)
		require.NoError(t, os.WriteFile(dest, []byte(stray.body), 0o644))
	}

	captured := &memSink{}
	cfg := Config{SessionsDir: root, Host: "koala", Sink: captured}
	require.NoError(t, TickOnce(context.Background(), cfg))

	assert.Empty(t, captured.batches)
}
// TestTickOnce_HandlesMultipleProjectsAndSessions spreads three
// single-turn sessions over two project directories. It expects one
// batch per session, each attributed to the directory it came from.
func TestTickOnce_HandlesMultipleProjectsAndSessions(t *testing.T) {
	root := t.TempDir()
	devDir := filepath.Join(root, "-home-mathias-dev")
	infraDir := filepath.Join(root, "-home-mathias-AI-infra")
	require.NoError(t, os.MkdirAll(devDir, 0o755))
	require.NoError(t, os.MkdirAll(infraDir, 0o755))

	fixtures := []struct{ dir, session, line string }{
		{dir: devDir, session: "a1", line: `{"type":"user","sessionId":"a1","message":"q1"}`},
		{dir: devDir, session: "a2", line: `{"type":"user","sessionId":"a2","message":"q2"}`},
		{dir: infraDir, session: "b1", line: `{"type":"user","sessionId":"b1","message":"q3"}`},
	}
	for _, fx := range fixtures {
		writeSession(t, fx.dir, fx.session, []string{fx.line})
	}

	captured := &memSink{}
	cfg := Config{SessionsDir: root, Host: "koala", Sink: captured}
	require.NoError(t, TickOnce(context.Background(), cfg))

	require.Len(t, captured.batches, 3)
	// Tally batches per project so the check does not depend on walk order.
	perProject := make(map[string]int)
	for i := range captured.batches {
		perProject[captured.batches[i].ProjectID]++
	}
	assert.Equal(t, 2, perProject["-home-mathias-dev"])
	assert.Equal(t, 1, perProject["-home-mathias-AI-infra"])
}
// TestTickOnce_SinkErrorDoesNotKillOtherFiles makes the sink reject the
// "bad-session" file. It expects TickOnce to return nil anyway and the
// other session in the same project to be ingested.
func TestTickOnce_SinkErrorDoesNotKillOtherFiles(t *testing.T) {
	root := t.TempDir()
	projectDir := filepath.Join(root, "-home-mathias-dev")
	require.NoError(t, os.MkdirAll(projectDir, 0o755))

	writeSession(t, projectDir, "good", []string{
		`{"type":"user","sessionId":"good","message":"q"}`,
	})
	writeSession(t, projectDir, "bad-session", []string{
		`{"type":"user","sessionId":"bad-session","message":"q"}`,
	})

	captured := &memSink{failOn: "bad-session"}
	cfg := Config{SessionsDir: root, Host: "koala", Sink: captured}
	require.NoError(t, TickOnce(context.Background(), cfg))

	require.Len(t, captured.batches, 1, "good session still ingested")
	assert.Equal(t, "good", captured.batches[0].SessionID)
}
// TestWatch_RespectsContextCancel starts Watch in a goroutine with a
// 10ms Interval, cancels its context after 50ms, and expects Watch to
// return an error matching context.Canceled within two seconds.
func TestWatch_RespectsContextCancel(t *testing.T) {
	root := t.TempDir()
	require.NoError(t, os.MkdirAll(filepath.Join(root, "-home-mathias-dev"), 0o755))

	cfg := Config{
		SessionsDir: root,
		Host:        "koala",
		Interval:    10 * time.Millisecond,
		Sink:        &memSink{},
	}
	ctx, stop := context.WithCancel(context.Background())

	// Buffered so the goroutine can finish even if the test has already
	// given up waiting.
	exited := make(chan error, 1)
	go func() {
		exited <- Watch(ctx, cfg)
	}()

	// Let several Interval periods elapse before cancelling.
	time.Sleep(50 * time.Millisecond)
	stop()

	deadline := time.After(2 * time.Second)
	select {
	case <-deadline:
		t.Fatal("Watch did not return after cancel")
	case err := <-exited:
		assert.ErrorIs(t, err, context.Canceled)
	}
}