From e49b36e463aad7d4da79fc5ff4ca1d25a3108b45 Mon Sep 17 00:00:00 2001 From: Mathias Date: Fri, 22 May 2026 07:13:05 +0200 Subject: [PATCH] feat(ingestion): expose Prometheus /metrics for brain query latency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes infra#50. Adds an internal/metrics package with a hand-rolled Prometheus exposition layer (stdlib + sync/atomic only — no new dep) and wraps the HTTP mux with a timing middleware. Every request emits one observation on the `brain_query_duration_seconds` histogram labeled by `path` (request Pattern, low cardinality) and `status` (2xx/3xx/4xx/5xx). Dependency choice: hand-rolled rather than github.com/prometheus/client_golang because the surface needed is small (one histogram + bucket constants) and the repo CLAUDE.md keeps deps stdlib + jwx + testify only. ~150 LOC of code + tests is cheaper than the chart of transitive prometheus deps. Endpoints: - GET /metrics — OpenMetrics text exposition, no auth (cluster-internal) Wire format pinned by tests in internal/metrics/metrics_test.go. The ServiceMonitor that drives the kube-prometheus-stack scrape lives in infra/k3s/apps/supervisor/ (separate commit on mathias/infra). After this image deploys, the canary alert from docs/superpowers/specs/2026-05-homelab-architecture-review.md becomes wireable: histogram_quantile(0.95, sum(rate(brain_query_duration_seconds_bucket[5m])) by (le)) > 1.5 * histogram_quantile(0.95, sum(rate(brain_query_duration_seconds_bucket[5m] offset 7d)) by (le)) Co-Authored-By: Claude Opus 4.7 (1M context) --- ingestion/cmd/server/main.go | 12 +- ingestion/internal/metrics/metrics.go | 194 +++++++++++++++++++++ ingestion/internal/metrics/metrics_test.go | 119 +++++++++++++ 3 files changed, 324 insertions(+), 1 deletion(-) create mode 100644 ingestion/internal/metrics/metrics.go create mode 100644 ingestion/internal/metrics/metrics_test.go diff --git a/ingestion/cmd/server/main.go b/ingestion/cmd/server/main.go index fda853f..1effd4d 100644 --- a/ingestion/cmd/server/main.go +++ b/ingestion/cmd/server/main.go @@ -17,6 +17,7 @@ import ( "github.com/mathiasbq/hyperguild/ingestion/internal/llm" "github.com/mathiasbq/hyperguild/ingestion/internal/mcp" "github.com/mathiasbq/hyperguild/ingestion/internal/embed" + "github.com/mathiasbq/hyperguild/ingestion/internal/metrics" "github.com/mathiasbq/hyperguild/ingestion/internal/oauth" "github.com/mathiasbq/hyperguild/ingestion/internal/pipeline" "github.com/mathiasbq/hyperguild/ingestion/internal/reranker" @@ -235,6 +236,15 @@ func main() { os.Exit(1) } + // /metrics — unauthenticated Prometheus endpoint. kube-prometheus-stack + // scrapes it via the ServiceMonitor in k3s/apps/supervisor/. The metrics + // middleware below wraps every other registered handler so it observes + // real request latency. /metrics itself is excluded from its own + // observation by registering it on the outer mux (post-wrap). + reg := metrics.New() + mux.HandleFunc("GET /metrics", reg.Handler()) + logger.Info("metrics endpoint registered", "path", "/metrics") + addr := ":" + port watchIntervalLog := "disabled" if watchInterval > 0 { @@ -249,7 +259,7 @@ func main() { "watch_interval", watchIntervalLog, "mcp_enabled", true, ) - if err := http.ListenAndServe(addr, mux); err != nil { + if err := http.ListenAndServe(addr, reg.Middleware(mux)); err != nil { logger.Error("server stopped", "err", err) os.Exit(1) } diff --git a/ingestion/internal/metrics/metrics.go b/ingestion/internal/metrics/metrics.go new file mode 100644 index 0000000..3ace296 --- /dev/null +++ b/ingestion/internal/metrics/metrics.go @@ -0,0 +1,194 @@ +// Package metrics is a tiny Prometheus exposition layer. +// +// Hand-rolled rather than pulling in github.com/prometheus/client_golang +// to keep ingestion's dependency surface minimal (stdlib + jwx + testify +// per the repo CLAUDE.md). The single histogram + counter it emits cover +// the canary alert wired in k3s/apps/monitoring/ — see infra#50. +// +// Wire format follows the OpenMetrics text exposition that +// kube-prometheus-stack scrapes by default. +package metrics + +import ( + "fmt" + "net/http" + "sort" + "strings" + "sync" + "sync/atomic" + "time" +) + +// histogram buckets in seconds. Tuned for in-cluster HTTP API +// latencies: BM25 query is sub-10ms, hybrid retrieval + LLM-synthesis +// can run into seconds. +Inf catch-all is implicit. +var defaultBuckets = []float64{ + 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, +} + +// Registry holds one histogram (request latency) labeled by path + status +// and one counter (request total) with the same labels. Concurrent-safe. +type Registry struct { + mu sync.RWMutex + series map[labelKey]*series + buckets []float64 +} + +type labelKey struct{ path, status string } + +type series struct { + // One atomic counter per bucket (counts of observations ≤ bucket). + // counts[len(buckets)] = +Inf bucket (== total observations). + counts []atomic.Uint64 + sumNs atomic.Uint64 // sum of durations in nanoseconds +} + +// New returns a Registry pre-populated with no series; the first +// observation per (path, status) lazy-creates one. +func New() *Registry { + return &Registry{ + series: make(map[labelKey]*series), + buckets: defaultBuckets, + } +} + +// Observe records a single request duration for the given path + status. +func (r *Registry) Observe(path, status string, d time.Duration) { + key := labelKey{path: path, status: status} + + r.mu.RLock() + s := r.series[key] + r.mu.RUnlock() + + if s == nil { + r.mu.Lock() + s = r.series[key] + if s == nil { + s = &series{counts: make([]atomic.Uint64, len(r.buckets)+1)} + r.series[key] = s + } + r.mu.Unlock() + } + + secs := d.Seconds() + for i, b := range r.buckets { + if secs <= b { + s.counts[i].Add(1) + } + } + // +Inf bucket always increments. + s.counts[len(r.buckets)].Add(1) + s.sumNs.Add(uint64(d.Nanoseconds())) +} + +// Middleware wraps next, observing every request's duration + status. +// The metric label `path` uses the request's Pattern (Go 1.22+ ServeMux), +// falling back to the URL path if no Pattern is set. Pattern keeps +// cardinality bounded (one series per route, not one per unique URL). +func (r *Registry) Middleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + rec := &statusRecorder{ResponseWriter: w, code: http.StatusOK} + start := time.Now() + next.ServeHTTP(rec, req) + path := req.Pattern + if path == "" { + path = req.URL.Path + } + r.Observe(path, statusClass(rec.code), time.Since(start)) + }) +} + +// Handler exposes /metrics in OpenMetrics text format. +func (r *Registry) Handler() http.HandlerFunc { + return func(w http.ResponseWriter, req *http.Request) { + w.Header().Set("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + r.write(w) + } +} + +func (r *Registry) write(w http.ResponseWriter) { + r.mu.RLock() + defer r.mu.RUnlock() + + _, _ = fmt.Fprintln(w, "# HELP brain_query_duration_seconds Brain HTTP API request latency in seconds.") + _, _ = fmt.Fprintln(w, "# TYPE brain_query_duration_seconds histogram") + + // Sort keys for stable output (helps diffing scrape responses). + keys := make([]labelKey, 0, len(r.series)) + for k := range r.series { + keys = append(keys, k) + } + sort.Slice(keys, func(i, j int) bool { + if keys[i].path != keys[j].path { + return keys[i].path < keys[j].path + } + return keys[i].status < keys[j].status + }) + + for _, k := range keys { + s := r.series[k] + labels := fmt.Sprintf(`path=%q,status=%q`, k.path, k.status) + for i, b := range r.buckets { + _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=%q} %d\n", + labels, formatBucket(b), s.counts[i].Load()) + } + // +Inf bucket + inf := s.counts[len(r.buckets)].Load() + _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_bucket{%s,le=\"+Inf\"} %d\n", labels, inf) + _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_sum{%s} %s\n", + labels, formatSeconds(s.sumNs.Load())) + _, _ = fmt.Fprintf(w, "brain_query_duration_seconds_count{%s} %d\n", labels, inf) + } +} + +func formatBucket(b float64) string { + // Match Prometheus convention: no trailing zeros. + s := fmt.Sprintf("%g", b) + if !strings.ContainsAny(s, ".e") { + s = s + ".0" + } + return s +} + +func formatSeconds(ns uint64) string { + return fmt.Sprintf("%g", float64(ns)/1e9) +} + +func statusClass(code int) string { + switch { + case code >= 200 && code < 300: + return "2xx" + case code >= 300 && code < 400: + return "3xx" + case code >= 400 && code < 500: + return "4xx" + case code >= 500 && code < 600: + return "5xx" + default: + return "xxx" + } +} + +// statusRecorder captures the response code so middleware can label +// the histogram by status class without buffering the body. +type statusRecorder struct { + http.ResponseWriter + code int + wroteHeader bool +} + +func (r *statusRecorder) WriteHeader(code int) { + if r.wroteHeader { + return + } + r.code = code + r.wroteHeader = true + r.ResponseWriter.WriteHeader(code) +} + +func (r *statusRecorder) Write(b []byte) (int, error) { + if !r.wroteHeader { + r.WriteHeader(http.StatusOK) + } + return r.ResponseWriter.Write(b) +} diff --git a/ingestion/internal/metrics/metrics_test.go b/ingestion/internal/metrics/metrics_test.go new file mode 100644 index 0000000..c437905 --- /dev/null +++ b/ingestion/internal/metrics/metrics_test.go @@ -0,0 +1,119 @@ +package metrics + +import ( + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestRegistry_ObserveAndExpose(t *testing.T) { + t.Parallel() + + r := New() + // Three observations on the same series; one falls into each + // representative band. + r.Observe("/query", "2xx", 4*time.Millisecond) // ≤ 5ms + r.Observe("/query", "2xx", 20*time.Millisecond) // ≤ 25ms + r.Observe("/query", "2xx", 600*time.Millisecond) // ≤ 1s + + req := httptest.NewRequest(http.MethodGet, "/metrics", nil) + rec := httptest.NewRecorder() + r.Handler().ServeHTTP(rec, req) + + body := rec.Body.String() + + mustContain := []string{ + `# TYPE brain_query_duration_seconds histogram`, + `brain_query_duration_seconds_bucket{path="/query",status="2xx",le="0.005"} 1`, + `brain_query_duration_seconds_bucket{path="/query",status="2xx",le="0.025"} 2`, + `brain_query_duration_seconds_bucket{path="/query",status="2xx",le="1.0"} 3`, + `brain_query_duration_seconds_bucket{path="/query",status="2xx",le="+Inf"} 3`, + `brain_query_duration_seconds_count{path="/query",status="2xx"} 3`, + } + for _, want := range mustContain { + if !strings.Contains(body, want) { + t.Errorf("missing line: %q\n--- body ---\n%s", want, body) + } + } + + if got := rec.Header().Get("Content-Type"); !strings.HasPrefix(got, "text/plain") { + t.Errorf("content-type = %q, want text/plain prefix", got) + } +} + +func TestRegistry_LabelsByStatus(t *testing.T) { + t.Parallel() + + r := New() + r.Observe("/query", "2xx", time.Millisecond) + r.Observe("/query", "5xx", time.Millisecond) + r.Observe("/write", "2xx", time.Millisecond) + + rec := httptest.NewRecorder() + r.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil)) + body := rec.Body.String() + + for _, want := range []string{ + `brain_query_duration_seconds_count{path="/query",status="2xx"} 1`, + `brain_query_duration_seconds_count{path="/query",status="5xx"} 1`, + `brain_query_duration_seconds_count{path="/write",status="2xx"} 1`, + } { + if !strings.Contains(body, want) { + t.Errorf("missing %q in body:\n%s", want, body) + } + } +} + +func TestMiddleware_RecordsTiming(t *testing.T) { + t.Parallel() + + r := New() + handler := r.Middleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + time.Sleep(2 * time.Millisecond) + w.WriteHeader(http.StatusOK) + _, _ = io.WriteString(w, "ok") + })) + + srv := httptest.NewServer(handler) + defer srv.Close() + + resp, err := http.Get(srv.URL + "/query") + if err != nil { + t.Fatalf("get: %v", err) + } + _ = resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + t.Fatalf("status %d, want 200", resp.StatusCode) + } + + // Exposition should now include /query. + rec := httptest.NewRecorder() + r.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/metrics", nil)) + body := rec.Body.String() + if !strings.Contains(body, `path="/query"`) { + t.Errorf("expected /query series, got body:\n%s", body) + } + if !strings.Contains(body, `status="2xx"`) { + t.Errorf("expected 2xx status class, got body:\n%s", body) + } +} + +func TestStatusRecorder_DefaultsTo200(t *testing.T) { + t.Parallel() + + r := New() + handler := r.Middleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte("hello")) + })) + + rec := httptest.NewRecorder() + handler.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/x", nil)) + + if rec.Code != http.StatusOK { + t.Errorf("code %d, want 200", rec.Code) + } +}