From 49b188e9c90565a377f1914943c203e33be1e10e Mon Sep 17 00:00:00 2001 From: Mathias Date: Mon, 25 May 2026 19:59:07 +0200 Subject: [PATCH] feat(server): wire claudewatcher behind CLAUDE_SESSIONS_DIR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Opt-in by setting CLAUDE_SESSIONS_DIR to the ~/.claude/projects path. When set, the server starts claudewatcher.Watch in a goroutine that ticks every CLAUDE_INGEST_INTERVAL seconds (default 60). Requires BRAIN_PG_DSN for the cursor table — fail-fast if missing. Each Batch becomes one wiki note at: brain/wiki/claude-sessions/facts/session-<host>-<session-id>.md with frontmatter type=source + domain=<project-id>. Per-turn content capped at 2000 chars (full transcripts stay in ~/.claude/projects already); the brain entry is a digest, not a mirror. CLAUDE_INGEST_HOST overrides the os.Hostname()-derived host label, useful when multiple ingestion pods consume the same DSN from different machines. Closes hyperguild#27. Bump-Type: minor --- ingestion/cmd/server/main.go | 92 ++++++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) diff --git a/ingestion/cmd/server/main.go b/ingestion/cmd/server/main.go index 0db641b..f47e29a 100644 --- a/ingestion/cmd/server/main.go +++ b/ingestion/cmd/server/main.go @@ -15,6 +15,7 @@ import ( chassisauth "gitea.d-ma.be/mathias/mcp-chassis/auth" "github.com/mathiasbq/hyperguild/ingestion/internal/api" + "github.com/mathiasbq/hyperguild/ingestion/internal/claudewatcher" "github.com/mathiasbq/hyperguild/ingestion/internal/embed" "github.com/mathiasbq/hyperguild/ingestion/internal/graphstore" "github.com/mathiasbq/hyperguild/ingestion/internal/graphsync" @@ -29,6 +30,50 @@ import ( "github.com/mathiasbq/hyperguild/ingestion/internal/watcher" ) +// claudeSink converts each claudewatcher.Batch into one wiki note under +// brain/wiki/claude-sessions/facts/.
v1 emits one note per session +// keyed by host + session id; classifier-driven hall routing is a +// follow-up (hyperguild#27 v2). +type claudeSink struct { + brainDir string + logger *slog.Logger +} + +func (s *claudeSink) Ingest(ctx context.Context, b claudewatcher.Batch) error { + if len(b.Turns) == 0 { + return nil + } + var sb strings.Builder + fmt.Fprintf(&sb, "# Claude session %s (%s)\n\n", b.SessionID, b.Host) + fmt.Fprintf(&sb, "_Project: `%s`. File: `%s`. Turns: %d._\n\n", b.ProjectID, b.FilePath, len(b.Turns)) + for _, t := range b.Turns { + fmt.Fprintf(&sb, "## %s — %s\n\n", t.Type, t.Timestamp.UTC().Format(time.RFC3339)) + if t.ToolName != "" { + fmt.Fprintf(&sb, "_tool: `%s`_\n\n", t.ToolName) + } + // Cap per-turn excerpt to keep page size bounded; the full + // transcript lives on disk under ~/.claude/projects/ already. + content := t.Content + if len(content) > 2000 { + content = content[:2000] + "…" + } + sb.WriteString(content) + sb.WriteString("\n\n") + } + slug := "session-" + b.Host + "-" + b.SessionID + if _, err := api.WriteNote(s.brainDir, api.WriteNoteOptions{ + Filename: slug, + Wing: "claude-sessions", + Hall: "facts", + Type: "source", + Domain: b.ProjectID, + Content: sb.String(), + }); err != nil { + return fmt.Errorf("write claude session note: %w", err) + } + return nil +} + // redactDSN parses a Postgres URL and replaces its password with `***` // for safe inclusion in logs. Falls back to a non-leaking placeholder // if parsing fails — we never log a raw DSN. @@ -73,6 +118,16 @@ func envInt(key string, fallback int) int { return fallback } +// systemHostname returns os.Hostname() with a "unknown" fallback so the +// caller never has to handle the rare error path. 
+func systemHostname() string { + h, err := os.Hostname() + if err != nil || h == "" { + return "unknown" + } + return h +} + func main() { logger := slog.New(slog.NewJSONHandler(os.Stdout, nil)) @@ -191,6 +246,43 @@ func main() { Pipeline: pipelineCfg, }) } + + // Claude Code session ingestion (hyperguild#27 / infra#73 Track E.1). + // Off by default — explicitly opt in by setting CLAUDE_SESSIONS_DIR + // to the ~/.claude/projects path. Requires BRAIN_PG_DSN for the + // cursor table (resumable offsets across restarts). + if claudeDir := os.Getenv("CLAUDE_SESSIONS_DIR"); claudeDir != "" { + if pgDSN == "" { + logger.Error("CLAUDE_SESSIONS_DIR set but BRAIN_PG_DSN missing — claudewatcher needs the cursor table") + os.Exit(1) + } + cursorStore, cerr := claudewatcher.NewCursorStore(ctx, pgDSN) + if cerr != nil { + logger.Error("claudewatcher cursor init", "err", cerr) + os.Exit(1) + } + if cerr := cursorStore.Init(ctx); cerr != nil { + logger.Error("claudewatcher cursor migrate", "err", cerr) + os.Exit(1) + } + host := envOr("CLAUDE_INGEST_HOST", systemHostname()) + interval := time.Duration(envInt("CLAUDE_INGEST_INTERVAL", 60)) * time.Second + sink := &claudeSink{brainDir: brainDir, logger: logger} + go func() { + if err := claudewatcher.Watch(ctx, claudewatcher.Config{ + SessionsDir: claudeDir, + Host: host, + Interval: interval, + Sink: sink, + Cursors: cursorStore, + Logger: logger, + }); err != nil && err != context.Canceled { + logger.Error("claudewatcher exited", "err", err) + } + }() + logger.Info("claudewatcher started", + "sessions_dir", claudeDir, "host", host, "interval", interval) + } if vectorStore != nil { embedSyncInterval := envInt("BRAIN_EMBED_SYNC_INTERVAL", 300) vectorstore.StartSync(ctx, brainDir, vectorStore,