package claudewatcher import ( "context" "fmt" "log/slog" "os" "path/filepath" "strings" "time" ) // Sink consumes batches of ingest-ready turns from the watcher. The // production implementation builds wiki pages and calls pipeline.RunRaw // against the brain. Tests substitute a counter. // // A Batch represents the turns ingested from one session file between // two cursor checkpoints. Implementations must be idempotent — the // watcher only advances the cursor on a nil return. type Sink interface { Ingest(ctx context.Context, b Batch) error } // Batch is a per-file slice of turns plus identifying metadata. type Batch struct { Host string // origin host, e.g. "koala" FilePath string // absolute path to the source .jsonl file SessionID string // first session_id seen in the batch ProjectID string // basename of the parent dir, e.g. "-home-mathias-dev" Turns []Turn // never empty; caller filters Skip + scrubber matches } // Config drives one Watch loop. SessionsDir is the absolute path to the // Claude Code projects directory (~/.claude/projects). Host is the // label written into cursors and ingested page frontmatter. Interval // is the poll cadence; a zero or negative value disables the loop. // // Sink is required. Cursors is optional — when nil the watcher // re-reads from byte 0 on every tick (useful for first-run testing // without a postgres dependency). type Config struct { SessionsDir string Host string Interval time.Duration Sink Sink Cursors *CursorStore Logger *slog.Logger } // Watch runs the polling loop until ctx is cancelled. Returns ctx.Err() // on shutdown. Each tick walks SessionsDir for *.jsonl files, advances // each file's cursor, and emits one Batch per file with new turns. // Errors during a single file's parse or ingest are logged but do not // abort the loop — a single bad file shouldn't block the others. func Watch(ctx context.Context, cfg Config) error { if cfg.SessionsDir == "" { return fmt.Errorf("sessions dir is required") } if cfg.Sink == nil { return fmt.Errorf("sink is required") } if cfg.Interval <= 0 { return fmt.Errorf("interval must be positive") } if cfg.Host == "" { cfg.Host = "unknown" } if cfg.Logger == nil { cfg.Logger = slog.Default() } cfg.Logger.Info("claudewatcher: started", "sessions_dir", cfg.SessionsDir, "host", cfg.Host, "interval", cfg.Interval) ticker := time.NewTicker(cfg.Interval) defer ticker.Stop() // Run an immediate first sweep so first-launch users don't wait one // tick before anything happens. runTick(ctx, cfg) for { select { case <-ctx.Done(): return ctx.Err() case <-ticker.C: runTick(ctx, cfg) } } } // runTick is one polling pass. Exposed (lowercase) for tests via // TickOnce. func runTick(ctx context.Context, cfg Config) { files, err := listSessionFiles(cfg.SessionsDir) if err != nil { cfg.Logger.Warn("claudewatcher: list session files", "err", err) return } for _, f := range files { if ctx.Err() != nil { return } if err := processFile(ctx, cfg, f); err != nil { cfg.Logger.Warn("claudewatcher: file failed", "path", f, "err", err) } } } // TickOnce runs one sweep synchronously and returns. Used by tests + // by ad-hoc CLI invocations. func TickOnce(ctx context.Context, cfg Config) error { if cfg.SessionsDir == "" || cfg.Sink == nil { return fmt.Errorf("config invalid") } if cfg.Host == "" { cfg.Host = "unknown" } if cfg.Logger == nil { cfg.Logger = slog.Default() } runTick(ctx, cfg) return nil } func listSessionFiles(root string) ([]string, error) { var out []string err := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error { if walkErr != nil { return walkErr } if d.IsDir() { return nil } if !strings.HasSuffix(path, ".jsonl") { return nil } out = append(out, path) return nil }) if err != nil { return nil, fmt.Errorf("walk %s: %w", root, err) } return out, nil } func processFile(ctx context.Context, cfg Config, path string) error { startOffset := int64(0) if cfg.Cursors != nil { off, _, err := cfg.Cursors.GetOffset(ctx, cfg.Host, path) if err != nil { return fmt.Errorf("get cursor: %w", err) } startOffset = off } stat, err := os.Stat(path) if err != nil { return fmt.Errorf("stat: %w", err) } if stat.Size() <= startOffset { return nil // nothing new } f, err := os.Open(path) if err != nil { return fmt.Errorf("open: %w", err) } defer func() { _ = f.Close() }() if _, err := f.Seek(startOffset, 0); err != nil { return fmt.Errorf("seek: %w", err) } var keep []Turn var sessionID string var droppedScrub int endOffset, err := ParseStream(f, startOffset, func(format string, args ...any) { cfg.Logger.Warn(fmt.Sprintf("claudewatcher: parse: "+format, args...)) }, func(t Turn) error { if t.Skip || t.Content == "" { return nil } if rule := Scrub(t.Content); rule != "" { droppedScrub++ cfg.Logger.Warn("claudewatcher: turn dropped by scrubber", "rule", rule, "path", path, "session_id", t.SessionID) return nil } if sessionID == "" { sessionID = t.SessionID } keep = append(keep, t) return nil }) if err != nil { return fmt.Errorf("parse stream: %w", err) } if len(keep) == 0 { if cfg.Cursors != nil { if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil { return fmt.Errorf("advance cursor (no-turns): %w", err) } } if droppedScrub > 0 { cfg.Logger.Info("claudewatcher: only scrubbed turns this tick", "path", path, "dropped", droppedScrub) } return nil } batch := Batch{ Host: cfg.Host, FilePath: path, SessionID: sessionID, ProjectID: filepath.Base(filepath.Dir(path)), Turns: keep, } if err := cfg.Sink.Ingest(ctx, batch); err != nil { return fmt.Errorf("sink ingest: %w", err) } if cfg.Cursors != nil { if err := cfg.Cursors.SetOffset(ctx, cfg.Host, path, endOffset); err != nil { return fmt.Errorf("advance cursor: %w", err) } } cfg.Logger.Info("claudewatcher: ingested batch", "path", path, "session_id", sessionID, "turns_kept", len(keep), "dropped_scrub", droppedScrub, "new_offset", endOffset) return nil }