Add BranchExists/CreateBranch/UpsertFile gitea client methods and the file_write_branch MCP tool. Branch is auto-created from base (or repo default_branch) when it doesn't exist; file is upserted via PUT contents. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
174 lines
6.1 KiB
Go
174 lines
6.1 KiB
Go
package tools_test
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"io"
|
|
"net/http"
|
|
"net/http/httptest"
|
|
"sync/atomic"
|
|
"testing"
|
|
|
|
"gitea.d-ma.be/mathias/gitea-mcp/internal/allowlist"
|
|
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
|
|
"gitea.d-ma.be/mathias/gitea-mcp/internal/tools"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
)
|
|
|
|
// Canned Gitea API response bodies served by the stub HTTP handlers in this file.
const (
	// branchCheckExistsResp is served by the branch-lookup stub when the branch exists.
	branchCheckExistsResp = `{"name":"feat/x","commit":{"id":"abc","url":"http://example.com"}}`

	// createBranchResp is served by the POST /branches stub.
	createBranchResp = `{"name":"feat/new","commit":{"id":"abc","url":"http://example.com"}}`

	// upsertFileResp is served by the contents stub; its commit sha is "cmt1".
	upsertFileResp = `{"content":{"path":"doc.md","sha":"filsha","html_url":"http://example.com/doc.md"},"commit":{"sha":"cmt1","html_url":"http://example.com/commit/cmt1"}}`

	// getRepoResp is served by the repository stub; its default_branch is "main".
	getRepoResp = `{"name":"myrepo","full_name":"owner/myrepo","default_branch":"main"}`
)
|
|
|
|
func TestFileWriteBranchCreatesBranchAndFile(t *testing.T) {
|
|
mux := http.NewServeMux()
|
|
|
|
// Branch check → 404 (branch doesn't exist)
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method == http.MethodGet {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
_, _ = w.Write([]byte(`{"message":"branch not found"}`))
|
|
}
|
|
})
|
|
|
|
// Create branch → 201
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
|
|
require.Equal(t, http.MethodPost, r.Method)
|
|
w.WriteHeader(http.StatusCreated)
|
|
_, _ = w.Write([]byte(createBranchResp))
|
|
})
|
|
|
|
// Upsert file → 201
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
|
|
require.Equal(t, http.MethodPut, r.Method)
|
|
w.WriteHeader(http.StatusCreated)
|
|
_, _ = w.Write([]byte(upsertFileResp))
|
|
})
|
|
|
|
srv := httptest.NewServer(mux)
|
|
defer srv.Close()
|
|
|
|
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
|
|
out, err := tool.Call(context.Background(), json.RawMessage(`{
|
|
"owner":"owner","name":"myrepo","path":"doc.md",
|
|
"content":"hello","branch":"feat/new","base":"main",
|
|
"message":"add doc.md"
|
|
}`))
|
|
require.NoError(t, err)
|
|
|
|
var result map[string]any
|
|
require.NoError(t, json.Unmarshal(out, &result))
|
|
assert.Equal(t, "feat/new", result["branch"])
|
|
assert.Equal(t, "doc.md", result["path"])
|
|
assert.Equal(t, "cmt1", result["commit_sha"])
|
|
}
|
|
|
|
func TestFileWriteBranchUsesDefaultBaseWhenBaseEmpty(t *testing.T) {
|
|
var createBody []byte
|
|
mux := http.NewServeMux()
|
|
|
|
// Branch check → 404
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/new", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusNotFound)
|
|
_, _ = w.Write([]byte(`{"message":"not found"}`))
|
|
})
|
|
|
|
// GET repo (to resolve default_branch)
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo", func(w http.ResponseWriter, r *http.Request) {
|
|
require.Equal(t, http.MethodGet, r.Method)
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_, _ = w.Write([]byte(getRepoResp))
|
|
})
|
|
|
|
// Create branch → capture body to assert old_branch_name
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
|
|
require.Equal(t, http.MethodPost, r.Method)
|
|
var err error
|
|
createBody, err = io.ReadAll(r.Body)
|
|
require.NoError(t, err)
|
|
w.WriteHeader(http.StatusCreated)
|
|
_, _ = w.Write([]byte(createBranchResp))
|
|
})
|
|
|
|
// Upsert file
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusCreated)
|
|
_, _ = w.Write([]byte(upsertFileResp))
|
|
})
|
|
|
|
srv := httptest.NewServer(mux)
|
|
defer srv.Close()
|
|
|
|
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
|
|
out, err := tool.Call(context.Background(), json.RawMessage(`{
|
|
"owner":"owner","name":"myrepo","path":"doc.md",
|
|
"content":"hello","branch":"feat/new",
|
|
"message":"add doc.md"
|
|
}`))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, out)
|
|
|
|
var payload map[string]string
|
|
require.NoError(t, json.Unmarshal(createBody, &payload))
|
|
assert.Equal(t, "main", payload["old_branch_name"])
|
|
assert.Equal(t, "feat/new", payload["new_branch_name"])
|
|
}
|
|
|
|
func TestFileWriteBranchSkipsCreateWhenBranchExists(t *testing.T) {
|
|
var createCallCount atomic.Int32
|
|
mux := http.NewServeMux()
|
|
|
|
// Branch check → 200 (branch exists)
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches/feat/existing", func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
_, _ = w.Write([]byte(branchCheckExistsResp))
|
|
})
|
|
|
|
// Create branch — should NOT be called
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/branches", func(w http.ResponseWriter, r *http.Request) {
|
|
createCallCount.Add(1)
|
|
w.WriteHeader(http.StatusCreated)
|
|
_, _ = w.Write([]byte(createBranchResp))
|
|
})
|
|
|
|
// Upsert file
|
|
mux.HandleFunc("/api/v1/repos/owner/myrepo/contents/doc.md", func(w http.ResponseWriter, r *http.Request) {
|
|
w.WriteHeader(http.StatusCreated)
|
|
_, _ = w.Write([]byte(upsertFileResp))
|
|
})
|
|
|
|
srv := httptest.NewServer(mux)
|
|
defer srv.Close()
|
|
|
|
tool := tools.NewFileWriteBranch(gitea.NewClient(srv.URL, "tok"), allowlist.New([]string{"owner"}))
|
|
out, err := tool.Call(context.Background(), json.RawMessage(`{
|
|
"owner":"owner","name":"myrepo","path":"doc.md",
|
|
"content":"hello","branch":"feat/existing",
|
|
"message":"update doc.md"
|
|
}`))
|
|
require.NoError(t, err)
|
|
require.NotNil(t, out)
|
|
|
|
assert.Equal(t, int32(0), createCallCount.Load(), "POST /branches should not be called when branch exists")
|
|
}
|
|
|
|
func TestFileWriteBranchAllowlistRejects(t *testing.T) {
|
|
tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"allowed"}))
|
|
_, err := tool.Call(context.Background(), json.RawMessage(`{
|
|
"owner":"evil","name":"repo","path":"f.md",
|
|
"content":"x","branch":"feat/x","message":"msg"
|
|
}`))
|
|
require.Error(t, err)
|
|
}
|
|
|
|
func TestFileWriteBranchRequiresMessage(t *testing.T) {
|
|
tool := tools.NewFileWriteBranch(gitea.NewClient("http://unused", ""), allowlist.New([]string{"owner"}))
|
|
_, err := tool.Call(context.Background(), json.RawMessage(`{
|
|
"owner":"owner","name":"repo","path":"f.md",
|
|
"content":"x","branch":"feat/x"
|
|
}`))
|
|
require.Error(t, err)
|
|
assert.ErrorIs(t, err, gitea.ErrValidation)
|
|
}
|