Files
gitea-mcp/internal/tools/code_search_test.go
Mathias Bergqvist 2c6b9986e4 feat(tools): code_search (org-wide fan-out)
When repo is omitted, lists owner's repos then concurrently searches
each one (semaphore cap 5, 5s per-repo timeout). Merges and sorts
hits by score desc with deterministic tiebreak. Partial failures
tracked in partial_repos without aborting the whole fan-out.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-04 22:48:39 +02:00

188 lines
7.0 KiB
Go

package tools_test
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestCodeSearchSingleRepo covers the call shape where both owner and repo are
// given. The stub server checks that the request targets the repo-scoped
// search endpoint with q=ListRepos and type=code, then serves a single hit.
// The test expects exactly one result carrying the "owner/repo" label, the
// path, and the snippet from the stubbed payload.
func TestCodeSearchSingleRepo(t *testing.T) {
	const searchPayload = `{
"data":[{
"path":"internal/gitea/repos.go",
"snippet":"func (c *Client) ListRepos",
"html_url":"http://gitea.example.com/mathias/infra/src/branch/main/internal/gitea/repos.go",
"score":3.0
}],
"ok":true
}`

	handler := func(w http.ResponseWriter, r *http.Request) {
		params := r.URL.Query()
		assert.Equal(t, "/api/v1/repos/mathias/infra/search", r.URL.Path)
		assert.Equal(t, "ListRepos", params.Get("q"))
		assert.Equal(t, "code", params.Get("type"))

		w.Header().Set("Content-Type", "application/json")
		_, _ = w.Write([]byte(searchPayload))
	}
	srv := httptest.NewServer(http.HandlerFunc(handler))
	defer srv.Close()

	client := gitea.NewClient(srv.URL, "tok")
	allowed := allowlist.New([]string{"mathias"})
	tool := tools.NewCodeSearch(client, allowed)

	args := json.RawMessage(`{"q":"ListRepos","owner":"mathias","repo":"infra"}`)
	out, err := tool.Call(context.Background(), args)
	require.NoError(t, err)

	// Only the fields this test inspects (plus score) are decoded.
	type hit struct {
		Repo    string  `json:"repo"`
		Path    string  `json:"path"`
		Snippet string  `json:"snippet"`
		Score   float64 `json:"score"`
	}
	var decoded struct {
		Results []hit `json:"results"`
	}
	require.NoError(t, json.Unmarshal(out, &decoded))
	require.Len(t, decoded.Results, 1)

	got := decoded.Results[0]
	assert.Equal(t, "mathias/infra", got.Repo)
	assert.Equal(t, "internal/gitea/repos.go", got.Path)
	assert.Equal(t, "func (c *Client) ListRepos", got.Snippet)
}
func TestCodeSearchAllowlistRejects(t *testing.T) {
tool := tools.NewCodeSearch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"q":"foo","owner":"evil","repo":"infra"}`))
require.Error(t, err)
}
func TestCodeSearchRequiresQ(t *testing.T) {
tool := tools.NewCodeSearch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"mathias"}))
_, err := tool.Call(context.Background(), json.RawMessage(`{"owner":"mathias","repo":"infra"}`))
require.Error(t, err)
assert.True(t, errors.Is(err, gitea.ErrValidation))
}
func TestCodeSearchFanOutHappyPath(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch r.URL.Path {
case "/api/v1/users/mathias/repos":
_, _ = w.Write([]byte(`[
{"name":"infra","full_name":"mathias/infra","default_branch":"main"},
{"name":"gitea-mcp","full_name":"mathias/gitea-mcp","default_branch":"main"}
]`))
case "/api/v1/repos/mathias/infra/search":
_, _ = w.Write([]byte(`{"data":[{"path":"main.go","snippet":"infra hit","html_url":"http://x/infra/main.go","score":2.0}],"ok":true}`))
case "/api/v1/repos/mathias/gitea-mcp/search":
_, _ = w.Write([]byte(`{"data":[{"path":"cmd/main.go","snippet":"gitea-mcp hit","html_url":"http://x/gitea-mcp/main.go","score":1.0}],"ok":true}`))
default:
http.NotFound(w, r)
}
}))
defer srv.Close()
tool := tools.NewCodeSearch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"q":"hit","owner":"mathias"}`))
require.NoError(t, err)
var result struct {
Results []struct {
Repo string `json:"repo"`
Path string `json:"path"`
Snippet string `json:"snippet"`
} `json:"results"`
Partial bool `json:"partial"`
}
require.NoError(t, json.Unmarshal(out, &result))
assert.False(t, result.Partial)
require.Len(t, result.Results, 2)
repos := make([]string, 0, 2)
for _, r := range result.Results {
repos = append(repos, r.Repo)
}
assert.Contains(t, repos, "mathias/infra")
assert.Contains(t, repos, "mathias/gitea-mcp")
}
func TestCodeSearchFanOutPartialFailure(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch r.URL.Path {
case "/api/v1/users/mathias/repos":
_, _ = w.Write([]byte(`[
{"name":"infra","full_name":"mathias/infra","default_branch":"main"},
{"name":"broken","full_name":"mathias/broken","default_branch":"main"}
]`))
case "/api/v1/repos/mathias/infra/search":
_, _ = w.Write([]byte(`{"data":[{"path":"main.go","snippet":"infra hit","html_url":"http://x/infra/main.go","score":1.0}],"ok":true}`))
case "/api/v1/repos/mathias/broken/search":
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(`{"message":"internal error"}`))
default:
http.NotFound(w, r)
}
}))
defer srv.Close()
tool := tools.NewCodeSearch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"q":"hit","owner":"mathias"}`))
require.NoError(t, err)
var result struct {
Results []struct{ Repo string `json:"repo"` } `json:"results"`
Partial bool `json:"partial"`
PartialRepos []string `json:"partial_repos"`
}
require.NoError(t, json.Unmarshal(out, &result))
assert.True(t, result.Partial)
require.Len(t, result.PartialRepos, 1)
assert.Equal(t, "mathias/broken", result.PartialRepos[0])
require.Len(t, result.Results, 1)
assert.Equal(t, "mathias/infra", result.Results[0].Repo)
}
func TestCodeSearchFanOutSortsByScore(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch r.URL.Path {
case "/api/v1/users/mathias/repos":
_, _ = w.Write([]byte(`[
{"name":"alpha","full_name":"mathias/alpha","default_branch":"main"},
{"name":"beta","full_name":"mathias/beta","default_branch":"main"}
]`))
case "/api/v1/repos/mathias/alpha/search":
// low score
_, _ = w.Write([]byte(`{"data":[{"path":"a.go","snippet":"low","html_url":"http://x/alpha/a.go","score":1.0}],"ok":true}`))
case "/api/v1/repos/mathias/beta/search":
// high score
_, _ = w.Write([]byte(`{"data":[{"path":"b.go","snippet":"high","html_url":"http://x/beta/b.go","score":5.0}],"ok":true}`))
default:
http.NotFound(w, r)
}
}))
defer srv.Close()
tool := tools.NewCodeSearch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"mathias"}))
out, err := tool.Call(context.Background(), json.RawMessage(`{"q":"something","owner":"mathias"}`))
require.NoError(t, err)
var result struct {
Results []struct {
Snippet string `json:"snippet"`
Score float64 `json:"score"`
} `json:"results"`
}
require.NoError(t, json.Unmarshal(out, &result))
require.Len(t, result.Results, 2)
// First result must be the high-score one
assert.True(t, result.Results[0].Score > result.Results[1].Score,
"expected results sorted by score desc, got %v then %v",
result.Results[0].Score, result.Results[1].Score)
assert.True(t, strings.Contains(result.Results[0].Snippet, "high"))
}