Files
gitea-mcp/internal/gitea/files_test.go
2026-05-06 22:51:21 +02:00

268 lines
9.3 KiB
Go

package gitea_test
import (
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"testing"
"gitea.d-ma.be/mathias/gitea-mcp/internal/gitea"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestGetFileContents(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/mathias/infra/contents/README.md", r.URL.Path)
assert.Equal(t, "main", r.URL.Query().Get("ref"))
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"path":"README.md","sha":"deadbeef","size":13,"content":"SGVsbG8sIHdvcmxkIQ==","encoding":"base64"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
fc, err := c.GetFileContents(context.Background(), "mathias", "infra", "README.md", "main")
require.NoError(t, err)
assert.Equal(t, "README.md", fc.Path)
assert.Equal(t, "deadbeef", fc.Sha)
assert.Equal(t, int64(13), fc.Size)
assert.Equal(t, "SGVsbG8sIHdvcmxkIQ==", fc.Content)
}
func TestBranchExistsTrue(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches/main", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"name":"main","commit":{"id":"abc123","url":"http://example.com"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
exists, err := c.BranchExists(context.Background(), "o", "r", "main")
require.NoError(t, err)
assert.True(t, exists)
}
func TestBranchExistsFalseOn404(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches/nonexistent", r.URL.Path)
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"branch not found"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
exists, err := c.BranchExists(context.Background(), "o", "r", "nonexistent")
require.NoError(t, err)
assert.False(t, exists)
}
func TestCreateBranchSendsPayload(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches", r.URL.Path)
assert.Equal(t, http.MethodPost, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(`{"name":"feat/x","commit":{"id":"abc","url":"http://example.com"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
err := c.CreateBranch(context.Background(), "o", "r", "feat/x", "main")
require.NoError(t, err)
var payload map[string]string
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "feat/x", payload["new_branch_name"])
assert.Equal(t, "main", payload["old_branch_name"])
}
func TestListBranches(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches", r.URL.Path)
assert.Equal(t, "1", r.URL.Query().Get("page"))
assert.Equal(t, "30", r.URL.Query().Get("limit"))
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`[
{"name":"main","commit":{"id":"abc","url":"http://example.com"}},
{"name":"feat/x","commit":{"id":"def","url":"http://example.com"}}
]`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
branches, err := c.ListBranches(context.Background(), "o", "r", 0, 0)
require.NoError(t, err)
require.Len(t, branches, 2)
assert.Equal(t, "main", branches[0].Name)
assert.Equal(t, "abc", branches[0].Commit.ID)
assert.Equal(t, "feat/x", branches[1].Name)
}
func TestUpsertFileSendsPayloadAndDecodesResult(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/contents/p.md", r.URL.Path)
assert.Equal(t, http.MethodPut, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusCreated)
_, _ = w.Write([]byte(`{"content":{"path":"p.md","sha":"newsha","html_url":"http://example.com/p.md"},"commit":{"sha":"abc","html_url":"http://example.com/commit/abc"}}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
result, err := c.UpsertFile(context.Background(), "o", "r", "p.md", gitea.UpsertFileArgs{
Branch: "feat/x",
Content: "aGVsbG8=",
Message: "add p.md",
Sha: "oldsha",
})
require.NoError(t, err)
var payload map[string]string
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "feat/x", payload["branch"])
assert.Equal(t, "aGVsbG8=", payload["content"])
assert.Equal(t, "add p.md", payload["message"])
assert.Equal(t, "oldsha", payload["sha"])
assert.Equal(t, "p.md", result.Content.Path)
assert.Equal(t, "newsha", result.Content.Sha)
assert.Equal(t, "http://example.com/p.md", result.Content.HTMLURL)
assert.Equal(t, "abc", result.Commit.Sha)
}
func TestDeleteBranch(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branches/feat/x", r.URL.Path)
assert.Equal(t, http.MethodDelete, r.Method)
w.WriteHeader(http.StatusNoContent)
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
err := c.DeleteBranch(context.Background(), "o", "r", "feat/x")
require.NoError(t, err)
}
func TestDeleteBranchProtected(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusForbidden)
_, _ = w.Write([]byte(`{"message":"branch is protected"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
err := c.DeleteBranch(context.Background(), "o", "r", "main")
require.Error(t, err)
assert.ErrorIs(t, err, gitea.ErrPermissionDenied)
}
func TestGetBranchProtectionFound(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/branch_protections/main", r.URL.Path)
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{
"required_approvals": 2,
"push_whitelist_usernames": ["alice"],
"merge_whitelist_usernames": ["bob"]
}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
bp, err := c.GetBranchProtection(context.Background(), "o", "r", "main")
require.NoError(t, err)
assert.True(t, bp.Protected)
assert.Equal(t, int64(2), bp.RequiredApprovals)
assert.Equal(t, []string{"alice"}, bp.PushWhitelist)
assert.Equal(t, []string{"bob"}, bp.MergeWhitelist)
}
func TestGetBranchProtectionNotFoundReturnsUnprotected(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNotFound)
_, _ = w.Write([]byte(`{"message":"not found"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
bp, err := c.GetBranchProtection(context.Background(), "o", "r", "feat/x")
require.NoError(t, err)
assert.False(t, bp.Protected)
}
func TestListContentsDirectory(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/contents/src", r.URL.Path)
assert.Equal(t, "main", r.URL.Query().Get("ref"))
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`[
{"name":"main.go","path":"src/main.go","type":"file","sha":"abc","size":100},
{"name":"lib","path":"src/lib","type":"dir","sha":"def","size":0}
]`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
entries, err := c.ListContents(context.Background(), "o", "r", "src", "main")
require.NoError(t, err)
require.Len(t, entries, 2)
assert.Equal(t, "main.go", entries[0].Name)
assert.Equal(t, "file", entries[0].Type)
assert.Equal(t, "lib", entries[1].Name)
assert.Equal(t, "dir", entries[1].Type)
}
func TestListContentsOnFileReturnsError(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write([]byte(`{"path":"main.go","sha":"abc","size":100,"content":"","encoding":"base64"}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
_, err := c.ListContents(context.Background(), "o", "r", "main.go", "")
require.Error(t, err)
assert.ErrorIs(t, err, gitea.ErrValidation)
}
func TestDeleteFile(t *testing.T) {
var captured []byte
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, "/api/v1/repos/o/r/contents/src/old.go", r.URL.Path)
assert.Equal(t, http.MethodDelete, r.Method)
var err error
captured, err = io.ReadAll(r.Body)
require.NoError(t, err)
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(`{
"content":null,
"commit":{"sha":"cmt1","html_url":"http://example.com/commit/cmt1"}
}`))
}))
defer srv.Close()
c := gitea.NewClient(srv.URL, "tok")
result, err := c.DeleteFile(context.Background(), "o", "r", "src/old.go", gitea.DeleteFileArgs{
Branch: "main",
Message: "remove old.go",
Sha: "blobsha",
})
require.NoError(t, err)
assert.Equal(t, "cmt1", result.Commit.Sha)
var payload map[string]string
require.NoError(t, json.Unmarshal(captured, &payload))
assert.Equal(t, "main", payload["branch"])
assert.Equal(t, "remove old.go", payload["message"])
assert.Equal(t, "blobsha", payload["sha"])
}