Files
Mathias bc011cc1f0 feat(claudewatcher): ingest Claude Code session transcripts into brain
New package internal/claudewatcher. The volume gate (24 turns/week of
agentsquad logs vs 500/week gate) exposed that the real signal lives
in daily Claude Code usage at ~/.claude/projects/*/<uuid>.jsonl, not
in agentsquad output. This package captures that signal. See infra#73
Track E + hyperguild#27 for the full reframe.

Components:
- parser: tolerant JSONL parser over the observed Claude Code session
  schema (user / assistant / attachment / system + bookkeeping types).
  Skip-flag fast-paths queue-operation, last-prompt, permission-mode,
  ai-title, bridge-session, file-history-snapshot.
- scrubber: 11-rule fail-closed regex set for credential shapes
  (bearer, postgres URIs, PEM, ssh-key, ghp_/sk-/sk-ant-/AKIA, homelab
  env tokens, SOPS markers). Drop turn + log on match.
- cursor: postgres-backed claude_session_cursors table, keyed by
  (host, file_path) with byte_offset. Resumable across pod restarts.
- watcher: poll loop. Walks SessionsDir, processes each .jsonl from
  its cursor offset, runs scrubber, emits a Batch per file to a
  Sink interface, advances cursor on successful Ingest.

No classifier integration in this commit — every kept turn is emitted
in a per-session batch. The cmd/server wiring (next commit) routes
batches to brain/wiki/claude-sessions/facts/. Classifier-driven hall
routing (decisions / failures / hypotheses) is a follow-up.

19 unit tests across parser + scrubber + watcher. task check green.

Refs: infra#73, hyperguild#27
2026-05-25 19:58:58 +02:00

306 lines
8.9 KiB
Go

// Package claudewatcher ingests Claude Code session transcripts
// (`~/.claude/projects/*/<uuid>.jsonl`) into the brain corpus.
//
// Schema (observed 2026-05-25 across ~30 session files on koala):
//
// type=user — user prompts + tool results
// type=assistant — model turns; tool_use blocks live in message.content
// type=attachment — hook outputs, ingested files
// type=system — turn-boundary metadata
// type=file-history-snapshot — git-style snapshot of edited files, ignored
// type=queue-operation, last-prompt, permission-mode, ai-title,
// bridge-session — internal bookkeeping, ignored
//
// The parser is intentionally tolerant: malformed lines are skipped
// (caller logs and advances), missing optional fields default to "",
// and unknown `type` values are returned as Turn entries with
// `Skip=true` so callers can filter cheaply.
package claudewatcher
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"time"
)
// Turn is one parsed JSONL entry from a Claude Code session log.
//
// parseTurn copies the identity fields (SessionID, Type, ParentUUID,
// Cwd, GitBranch) for every entry that unmarshals. Entries whose type
// is listed in skipTypes are returned right after that, so their
// Timestamp, Content and ToolName stay at the zero value.
//
// Skip is true for entry types we never want to ingest (queue
// bookkeeping, snapshots, etc.) and for types parseTurn does not
// recognise. Callers fast-path these without running the scrubber or
// classifier.
type Turn struct {
// SessionID is the entry's `sessionId` value.
SessionID string
// Type is the entry's raw `type` value ("user", "assistant", ...).
Type string
// ParentUUID is the entry's `parentUuid` value.
ParentUUID string
// Timestamp is the entry's `timestamp` parsed with time.RFC3339Nano.
// It is the zero time when the field is absent or fails to parse.
Timestamp time.Time
// Cwd is the entry's `cwd` value.
Cwd string
// GitBranch is the entry's `gitBranch` value.
GitBranch string
Content string // plain-text projection of the entry, ready for the scrubber/classifier
ToolName string // populated when an assistant turn invokes a tool
OffsetAfter int64 // byte offset in the file just past this entry
Skip bool
// ParseWarning names the sub-field that could not be normalised; the
// only value parseTurn sets is "timestamp".
ParseWarning string // non-empty when the entry parsed but had a sub-field we couldn't normalise
}
// ParseStream reads JSONL lines from r and invokes emit for each entry
// that parses. It returns the byte offset up to which the stream has
// been consumed, which is what the caller should persist as its resume
// cursor.
//
// startOffset is informational — the caller is expected to have already
// seeked the underlying reader to that offset. ParseStream adds the
// exact number of bytes consumed per line, terminator included ("\n"
// or "\r\n"), to it to compute Turn.OffsetAfter.
//
// emit may return ErrStop to terminate the scan cleanly; the returned
// offset then points just past the entry that was handed to emit. Any
// other emit error is wrapped and returned together with the offset of
// the start of the failing entry, so that entry is seen again on the
// next pass.
//
// Newline-terminated lines that fail to parse are reported through
// warnf (when non-nil) and skipped: the returned offset moves past
// them. Blank lines are skipped without a warning.
//
// A final line with no terminator is handled conservatively because
// the writer may still be appending to it: if it parses it is emitted
// and consumed (without counting a newline that is not there); if it
// does not parse it is left unconsumed and the returned offset points
// at its first byte, so a later pass can pick up the completed line.
//
// A single line larger than 8 MiB makes the scan fail with a wrapped
// bufio.ErrTooLong.
func ParseStream(
	r io.Reader,
	startOffset int64,
	warnf func(format string, args ...any),
	emit func(Turn) error,
) (int64, error) {
	scanner := bufio.NewScanner(r)
	scanner.Buffer(make([]byte, 0, 64*1024), 8*1024*1024) // some lines are big (tool outputs)
	// bufio.ScanLines strips "\n" / "\r\n" and hides whether the last
	// line was terminated, which makes exact byte accounting impossible.
	scanner.Split(splitLinesKeepEOL)

	offset := startOffset
	for scanner.Scan() {
		raw := scanner.Bytes()
		consumed := int64(len(raw))
		terminated := len(raw) > 0 && raw[len(raw)-1] == '\n'
		line := trimEOL(raw)

		if len(line) == 0 {
			// Blank line: nothing to parse, but its bytes still count.
			offset += consumed
			continue
		}

		t, err := parseTurn(line)
		if err != nil {
			if !terminated {
				// Most likely a write in progress. Do not move the
				// cursor past it, or the completed entry would be lost.
				return offset, nil
			}
			if warnf != nil {
				warnf("parse: %v (%d bytes)", err, len(line))
			}
			offset += consumed
			continue
		}

		t.OffsetAfter = offset + consumed
		if err := emit(t); err != nil {
			if errors.Is(err, ErrStop) {
				return t.OffsetAfter, nil
			}
			return offset, fmt.Errorf("emit: %w", err)
		}
		offset = t.OffsetAfter
	}
	if err := scanner.Err(); err != nil {
		return offset, fmt.Errorf("scan: %w", err)
	}
	return offset, nil
}

// splitLinesKeepEOL is a bufio.SplitFunc that returns each line with
// its terminator still attached, so len(token) is exactly the number
// of bytes the line occupies in the stream. At EOF a non-empty
// unterminated remainder is returned as the last token.
func splitLinesKeepEOL(data []byte, atEOF bool) (advance int, token []byte, err error) {
	for i, b := range data {
		if b == '\n' {
			return i + 1, data[:i+1], nil
		}
	}
	if atEOF && len(data) > 0 {
		return len(data), data, nil
	}
	// Need more data (or nothing left at EOF).
	return 0, nil, nil
}

// trimEOL returns b without any trailing '\n' or '\r' bytes.
func trimEOL(b []byte) []byte {
	for len(b) > 0 && (b[len(b)-1] == '\n' || b[len(b)-1] == '\r') {
		b = b[:len(b)-1]
	}
	return b
}
// ErrStop terminates a ParseStream loop without surfacing an error.
//
// An emit callback returns it (optionally wrapped; ParseStream matches
// with errors.Is) to end the scan early. ParseStream then returns a nil
// error and the offset just past the entry that was passed to emit.
var ErrStop = errors.New("claudewatcher: stop")
// rawEntry is a permissive shape that covers every type observed in
// the JSONL files. Fields we don't care about are intentionally
// omitted to keep the unmarshal cheap.
//
// Message and Attachment are kept as raw JSON and decoded later by the
// extract* helpers, depending on Type.
//
// NOTE(review): Content and LastPrompt are decoded but not read by
// parseTurn. Because they are typed as string, a line carrying a
// non-string `content` or `lastPrompt` makes json.Unmarshal return an
// error and parseTurn rejects the whole line — confirm these keys are
// always strings in the observed schema.
type rawEntry struct {
Type string `json:"type"`
SessionID string `json:"sessionId"`
ParentUUID string `json:"parentUuid"`
// Timestamp is kept as text; parseTurn parses it with time.RFC3339Nano.
Timestamp string `json:"timestamp"`
Cwd string `json:"cwd"`
GitBranch string `json:"gitBranch"`
Message json.RawMessage `json:"message"`
Attachment json.RawMessage `json:"attachment"`
Content string `json:"content"` // queue-operation
LastPrompt string `json:"lastPrompt"` // last-prompt
Subtype string `json:"subtype"` // system
}
// skipTypes lists every entry type we want to never ingest. Marked Skip
// at parse time so the caller's filter is a single boolean check.
//
// parseTurn returns early for these types, before the timestamp or any
// content is extracted. Types that are neither listed here nor handled
// by parseTurn's switch are also marked Skip, but only after the
// timestamp has been parsed.
var skipTypes = map[string]struct{}{
"queue-operation": {},
"last-prompt": {},
"permission-mode": {},
"ai-title": {},
"bridge-session": {},
"file-history-snapshot": {},
}
// parseTurn decodes a single JSONL line into a Turn.
//
// It returns an error only when the line does not unmarshal into
// rawEntry. Bookkeeping types listed in skipTypes come back with
// Skip=true and nothing but the identity fields set. For every other
// entry the timestamp is parsed (a failure is recorded in ParseWarning
// rather than returned) and Content/ToolName are filled according to
// the entry type. Types that are not recognised are also marked Skip.
func parseTurn(raw []byte) (Turn, error) {
	var entry rawEntry
	if err := json.Unmarshal(raw, &entry); err != nil {
		return Turn{}, fmt.Errorf("unmarshal: %w", err)
	}

	turn := Turn{
		SessionID:  entry.SessionID,
		Type:       entry.Type,
		ParentUUID: entry.ParentUUID,
		Cwd:        entry.Cwd,
		GitBranch:  entry.GitBranch,
	}

	// Bookkeeping entries: no timestamp or content work at all.
	if _, ignored := skipTypes[entry.Type]; ignored {
		turn.Skip = true
		return turn, nil
	}

	if stamp := entry.Timestamp; stamp != "" {
		parsed, err := time.Parse(time.RFC3339Nano, stamp)
		if err != nil {
			turn.ParseWarning = "timestamp"
		} else {
			turn.Timestamp = parsed
		}
	}

	switch kind := entry.Type; kind {
	case "assistant":
		turn.Content, turn.ToolName = extractAssistantTurn(entry.Message)
	case "user":
		turn.Content = extractMessageText(entry.Message)
	case "system":
		turn.Content = "[system " + entry.Subtype + "]"
	case "attachment":
		turn.Content = extractAttachmentText(entry.Attachment)
	default:
		// Unknown type — keep the row but mark Skip so callers ignore.
		turn.Skip = true
	}
	return turn, nil
}
// extractMessageText pulls the textual projection out of a `message`
// field.
//
// Accepted shapes:
//   - a bare JSON string: returned as is;
//   - an object whose `content` is a string: that string;
//   - an object whose `content` is an array of content blocks
//     ({type, text|content, ...}): every `text` block and every
//     `tool_result` block is written on its own line, tool results
//     prefixed with "[tool_result] ", and the trailing newline is
//     trimmed. Other block types are ignored.
//
// Empty input, invalid JSON and any other `content` shape yield "".
//
// Only `content` is decoded from the message object. Decoding further
// envelope fields into typed Go fields would make an unexpected value
// in one of them (for example a non-string entry under `meta`) fail
// the whole unmarshal and blank out otherwise valid content.
func extractMessageText(raw json.RawMessage) string {
	if len(raw) == 0 {
		return ""
	}
	var msg struct {
		Content json.RawMessage `json:"content"`
	}
	if err := json.Unmarshal(raw, &msg); err != nil {
		// Some user turns have message as plain string.
		var s string
		if err := json.Unmarshal(raw, &s); err == nil {
			return s
		}
		return ""
	}

	// Content can be a string OR an array.
	var asString string
	if err := json.Unmarshal(msg.Content, &asString); err == nil {
		return asString
	}
	var blocks []struct {
		Type    string          `json:"type"`
		Text    string          `json:"text"`
		Content json.RawMessage `json:"content"`
	}
	if err := json.Unmarshal(msg.Content, &blocks); err != nil {
		return ""
	}

	var sb strings.Builder
	for _, b := range blocks {
		switch b.Type {
		case "text":
			sb.WriteString(b.Text)
			sb.WriteByte('\n')
		case "tool_result":
			// Tool result content may itself be a string or array of blocks.
			var s string
			if err := json.Unmarshal(b.Content, &s); err == nil {
				sb.WriteString("[tool_result] ")
				sb.WriteString(s)
				sb.WriteByte('\n')
				continue
			}
			var sub []struct {
				Type string `json:"type"`
				Text string `json:"text"`
			}
			if err := json.Unmarshal(b.Content, &sub); err != nil {
				continue
			}
			for _, part := range sub {
				if part.Type != "text" {
					continue
				}
				sb.WriteString("[tool_result] ")
				sb.WriteString(part.Text)
				sb.WriteByte('\n')
			}
		}
	}
	return strings.TrimRight(sb.String(), "\n")
}
// extractAssistantTurn pulls text + the first tool name (if any) from
// an assistant `message` payload.
//
// `content` may be a plain string, in which case it is returned as the
// text with no tool name (the same shorthand extractMessageText
// accepts), or an array of content blocks. For an array, every `text`
// block is written on its own line and every `tool_use` block is
// rendered as "[tool_use:<name>]"; other block types are ignored and
// the trailing newline is trimmed.
//
// The second result is the name of the first tool_use block. Multi-tool
// turns lose the later names; the goal is signal for classification,
// not perfect fidelity.
//
// Empty input, invalid JSON and any other `content` shape yield
// ("", "").
func extractAssistantTurn(raw json.RawMessage) (string, string) {
	if len(raw) == 0 {
		return "", ""
	}
	var msg struct {
		Content json.RawMessage `json:"content"`
	}
	if err := json.Unmarshal(raw, &msg); err != nil {
		return "", ""
	}

	// Content can be a string OR an array.
	var asString string
	if err := json.Unmarshal(msg.Content, &asString); err == nil {
		return asString, ""
	}
	var blocks []struct {
		Type string `json:"type"`
		Text string `json:"text"`
		Name string `json:"name"`
	}
	if err := json.Unmarshal(msg.Content, &blocks); err != nil {
		return "", ""
	}

	var sb strings.Builder
	var firstTool string
	for _, b := range blocks {
		switch b.Type {
		case "text":
			sb.WriteString(b.Text)
			sb.WriteByte('\n')
		case "tool_use":
			if firstTool == "" {
				firstTool = b.Name
			}
			sb.WriteString("[tool_use:")
			sb.WriteString(b.Name)
			sb.WriteString("]\n")
		}
	}
	return strings.TrimRight(sb.String(), "\n"), firstTool
}
// extractAttachmentText pulls text content from an attachment payload,
// or returns a short tag when the attachment is a hook event.
//
// Precedence: a non-empty string `content`, then a non-empty string
// `text`, then "[hook <hookEvent>:<hookName>]" when `hookName` is set.
// Empty input, invalid JSON and payloads with none of those fields
// yield "".
//
// A field whose JSON value has an unexpected type (for example an
// object-valued `content`) is treated as absent instead of discarding
// the whole attachment: encoding/json skips such a field, finishes
// decoding the others and reports an *json.UnmarshalTypeError, which is
// tolerated here.
func extractAttachmentText(raw json.RawMessage) string {
	if len(raw) == 0 {
		return ""
	}
	var a struct {
		HookName  string `json:"hookName"`
		HookEvent string `json:"hookEvent"`
		Content   string `json:"content"`
		Text      string `json:"text"`
	}
	if err := json.Unmarshal(raw, &a); err != nil {
		var typeErr *json.UnmarshalTypeError
		if !errors.As(err, &typeErr) {
			// Syntax error: nothing was decoded.
			return ""
		}
		// Type mismatch on some field: keep whatever did decode.
	}

	switch {
	case a.Content != "":
		return a.Content
	case a.Text != "":
		return a.Text
	case a.HookName != "":
		return "[hook " + a.HookEvent + ":" + a.HookName + "]"
	}
	return ""
}