Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions internal/gitproto/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/go-git/go-git/v6/plumbing/protocol/packp/sideband"
"github.com/go-git/go-git/v6/plumbing/storer"
"github.com/go-git/go-git/v6/plumbing/transport"

"github.com/entirehq/git-sync/pkg/gitsync/syncerr"
)

// PushCommand represents a single ref update command.
Expand Down Expand Up @@ -138,13 +140,30 @@ func sendReceivePack(
if err := report.Decode(respReader); err != nil {
return fmt.Errorf("decode report-status: %w", err)
}
if err := report.Error(); err != nil {
return fmt.Errorf("report-status: %w", err)
if reportErr := buildReportError(report); reportErr != nil {
return reportErr
}
}
return nil
}

func buildReportError(report *packp.ReportStatus) *syncerr.PushReportError {
if report.UnpackStatus != "ok" {
return &syncerr.PushReportError{UnpackStatus: report.UnpackStatus}
}
var failures []syncerr.PushRefFailure
for _, cs := range report.CommandStatuses {
if cs.Status == "ok" {
continue
}
failures = append(failures, syncerr.PushRefFailure{Ref: cs.ReferenceName.String(), Status: cs.Status})
}
if len(failures) == 0 {
return nil
}
return &syncerr.PushReportError{Failures: failures}
}

// PushObjects pushes locally-materialized objects to the target.
func PushObjects(
ctx context.Context,
Expand Down
78 changes: 78 additions & 0 deletions internal/gitproto/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gitproto
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"net/http/httptest"
Expand All @@ -15,6 +16,8 @@ import (
"github.com/go-git/go-git/v6/plumbing/protocol/packp/capability"
"github.com/go-git/go-git/v6/plumbing/transport"
"github.com/stretchr/testify/require"

"github.com/entirehq/git-sync/pkg/gitsync/syncerr"
)

func TestPrefixedLineWriter(t *testing.T) {
Expand Down Expand Up @@ -318,6 +321,81 @@ func TestBuildUpdateRequestDeleteWithoutCapability(t *testing.T) {
}
}

func TestPushPackReturnsPushReportErrorForPerRefFailures(t *testing.T) {
refA := plumbing.ReferenceName("refs/heads/main")
refB := plumbing.ReferenceName("refs/heads/feature")

srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if _, err := io.Copy(io.Discard, r.Body); err != nil {
t.Logf("drain request body: %v", err)
}
_ = r.Body.Close()

w.Header().Set("Content-Type", "application/x-git-receive-pack-result")
w.WriteHeader(http.StatusOK)

report := packp.NewReportStatus()
report.UnpackStatus = "ok"
report.CommandStatuses = []*packp.CommandStatus{
{ReferenceName: refA, Status: "remote ref has changed"},
{ReferenceName: refB, Status: "already exists"},
}
if err := report.Encode(w); err != nil {
t.Logf("encode report: %v", err)
}
}))
defer srv.Close()

pack := &trackingReadCloser{ReadCloser: io.NopCloser(bytes.NewBufferString("PACK"))}
conn := connForServer(t, srv)
adv := packp.NewAdvRefs()
adv.Capabilities = capability.NewList()
require.NoError(t, adv.Capabilities.Set(capability.ReportStatus))

err := PushPack(context.Background(), conn, adv, []PushCommand{
{Name: refA, New: plumbing.NewHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")},
{Name: refB, New: plumbing.NewHash("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb")},
}, pack, false)
if err == nil {
t.Fatal("expected PushPack to return an error")
}

var prErr *syncerr.PushReportError
if !errors.As(err, &prErr) {
t.Fatalf("expected *syncerr.PushReportError, got %T: %v", err, err)
}
if prErr.UnpackStatus != "" {
t.Errorf("unexpected UnpackStatus %q; expected empty for per-ref failures", prErr.UnpackStatus)
}
if len(prErr.Failures) != 2 {
t.Fatalf("expected 2 failures, got %d: %+v", len(prErr.Failures), prErr.Failures)
}
got := map[string]string{}
for _, f := range prErr.Failures {
got[f.Ref] = f.Status
}
if got[refA.String()] != "remote ref has changed" {
t.Errorf("ref %s status: want %q, got %q", refA, "remote ref has changed", got[refA.String()])
}
if got[refB.String()] != "already exists" {
t.Errorf("ref %s status: want %q, got %q", refB, "already exists", got[refB.String()])
}
}

func TestBuildReportErrorTreatsEmptyUnpackStatusAsFatal(t *testing.T) {
// A malformed / degraded receive-pack response with an empty unpack
// status must be treated as failure (matches go-git's ReportStatus.Error
// semantics), not silently passed through.
report := &packp.ReportStatus{UnpackStatus: ""}
got := buildReportError(report)
if got == nil {
t.Fatal("expected non-nil PushReportError for empty unpack status; empty is not 'ok'")
}
if got.UnpackStatus != "" {
t.Errorf("UnpackStatus: want empty (propagated), got %q", got.UnpackStatus)
}
}

func TestPushPackRejectsDeletes(t *testing.T) {
pack := &trackingReadCloser{ReadCloser: io.NopCloser(bytes.NewBufferString("PACK"))}
// PushPack should reject delete commands before even trying to connect.
Expand Down
19 changes: 18 additions & 1 deletion internal/gitproto/smarthttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,19 @@ import (

const maxHTTPErrorBody = 64 * 1024

// HTTPError is the error returned by httpError for non-2xx responses. Callers
// can use errors.As to inspect StatusCode directly instead of parsing the
// formatted string.
type HTTPError struct {
StatusCode int
URL string
Reason string
}

func (e *HTTPError) Error() string {
return fmt.Sprintf("http %d: %s %s", e.StatusCode, e.URL, e.Reason)
}

// httpError checks an HTTP response status and returns an error for non-2xx responses.
func httpError(res *http.Response) error {
if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusMultipleChoices {
Expand All @@ -31,7 +44,11 @@ func httpError(res *http.Response) error {
reason = string(data)
}
}
return fmt.Errorf("http %d: %s %s", res.StatusCode, res.Request.URL.Redacted(), reason)
return &HTTPError{
StatusCode: res.StatusCode,
URL: res.Request.URL.Redacted(),
Reason: reason,
}
}

// StatsPhaseHeader is the HTTP header used to annotate requests with the
Expand Down
30 changes: 30 additions & 0 deletions internal/gitproto/smarthttp_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package gitproto

import (
"bytes"
"context"
"errors"
"io"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -192,6 +194,34 @@ func TestHTTPErrorBoundsBodyRead(t *testing.T) {
}
}

func TestHTTPErrorTypedStatusCode(t *testing.T) {
req, err := http.NewRequestWithContext(t.Context(), http.MethodGet, "https://example.com/repo.git/info/refs", nil)
if err != nil {
t.Fatalf("NewRequestWithContext: %v", err)
}
res := &http.Response{
StatusCode: http.StatusNotFound,
Request: req,
Body: io.NopCloser(bytes.NewReader([]byte("Repository not found."))),
}

err = httpError(res)
if err == nil {
t.Fatal("expected error")
}

var he *HTTPError
if !errors.As(err, &he) {
t.Fatalf("expected *HTTPError via errors.As, got %T: %v", err, err)
}
if he.StatusCode != http.StatusNotFound {
t.Errorf("StatusCode = %d, want %d", he.StatusCode, http.StatusNotFound)
}
if !strings.Contains(he.Error(), "http 404") {
t.Errorf("Error() = %q, want to contain \"http 404\"", he.Error())
}
}

type roundTripperFunc func(*http.Request) (*http.Response, error)

func (f roundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) {
Expand Down
51 changes: 25 additions & 26 deletions internal/strategy/incremental/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,43 +55,42 @@ func Execute(ctx context.Context, p Params, cfg planner.PlanConfig) (Result, err
}
cmds := convert.PlansToPushCommands(p.PushPlans)
if ok, reason := canRelay(cfg.Force, cfg.Prune, false, p.PushPlans); ok {
desired := convert.DesiredRefsForPlans(p.DesiredRefs, p.PushPlans)
packReader, err := p.SourceService.FetchPack(ctx, p.SourceConn, desired, p.TargetRefs)
if err != nil {
return Result{}, fmt.Errorf("fetch source pack: %w", err)
}
packReader = gitproto.LimitPackReader(packReader, p.MaxPackBytes)
packReader = closeOnce(packReader)
if err := p.TargetPusher.PushPack(ctx, cmds, packReader); err != nil {
_ = packReader.Close()
return Result{}, fmt.Errorf("push target refs: %w", err)
}
_ = packReader.Close()
return Result{Relay: true, RelayMode: "incremental", RelayReason: reason}, nil
return p.relay(ctx, cmds, p.TargetRefs, reason, "fetch source pack")
}

if p.CanTagRelay == nil {
return Result{}, errors.New("incremental strategy requires CanTagRelay")
}
if ok, reason := p.CanTagRelay(p.PushPlans); ok {
desired := convert.DesiredRefsForPlans(p.DesiredRefs, p.PushPlans)
packReader, err := p.SourceService.FetchPack(ctx, p.SourceConn, desired, nil)
if err != nil {
return Result{}, fmt.Errorf("fetch source tag pack: %w", err)
}
packReader = gitproto.LimitPackReader(packReader, p.MaxPackBytes)
packReader = closeOnce(packReader)
if err := p.TargetPusher.PushPack(ctx, cmds, packReader); err != nil {
_ = packReader.Close()
return Result{}, fmt.Errorf("push target refs: %w", err)
}
_ = packReader.Close()
return Result{Relay: true, RelayMode: "incremental", RelayReason: reason}, nil
return p.relay(ctx, cmds, nil, reason, "fetch source tag pack")
}

return Result{}, nil
}

// relay fetches a pack from source with the given haves and pushes it to the
// target.
func (p Params) relay(
ctx context.Context,
cmds []gitproto.PushCommand,
haves map[plumbing.ReferenceName]plumbing.Hash,
reason, fetchErrPrefix string,
) (Result, error) {
desired := convert.DesiredRefsForPlans(p.DesiredRefs, p.PushPlans)
packReader, err := p.SourceService.FetchPack(ctx, p.SourceConn, desired, haves)
if err != nil {
return Result{}, fmt.Errorf("%s: %w", fetchErrPrefix, err)
}
packReader = gitproto.LimitPackReader(packReader, p.MaxPackBytes)
packReader = closeOnce(packReader)
if err := p.TargetPusher.PushPack(ctx, cmds, packReader); err != nil {
_ = packReader.Close()
return Result{}, fmt.Errorf("push target refs: %w", err)
}
_ = packReader.Close()
return Result{Relay: true, RelayMode: "incremental", RelayReason: reason}, nil
}

type closeOnceReadCloser struct {
io.ReadCloser

Expand Down
4 changes: 3 additions & 1 deletion internal/strategy/replicate/replicate.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Result struct {
RelayReason string
}

const ReasonOverwriteRelay = "replicate-overwrite-relay"

// Execute runs relay-only replication. Create/update refs are pushed via pack
// relay and deletes are sent afterwards as ref-only commands.
func Execute(ctx context.Context, p Params) (Result, error) {
Expand Down Expand Up @@ -81,7 +83,7 @@ func Execute(ctx context.Context, p Params) (Result, error) {
}
}

return Result{Relay: true, RelayMode: "replicate", RelayReason: "replicate-overwrite-relay"}, nil
return Result{Relay: true, RelayMode: "replicate", RelayReason: ReasonOverwriteRelay}, nil
}

type closeOnceReadCloser struct {
Expand Down
23 changes: 12 additions & 11 deletions internal/strategy/replicate/replicate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ func TestExecuteReplicateRelaysUpdatesAndDeletesSeparately(t *testing.T) {
var pushed []gitproto.PushCommand
var deleted []gitproto.PushCommand

tp := fakeTargetPusher{
pushPack: func(_ context.Context, cmds []gitproto.PushCommand, pack io.ReadCloser) error {
defer pack.Close()
pushed = append([]gitproto.PushCommand(nil), cmds...)
return nil
},
pushCommands: func(_ context.Context, cmds []gitproto.PushCommand) error {
deleted = append([]gitproto.PushCommand(nil), cmds...)
return nil
},
}
result, err := Execute(context.Background(), Params{
SourceService: fakeSourceService{
fetchPack: func(_ context.Context, _ *gitproto.Conn, desired map[plumbing.ReferenceName]gitproto.DesiredRef, targetRefs map[plumbing.ReferenceName]plumbing.Hash) (io.ReadCloser, error) {
Expand All @@ -57,17 +68,7 @@ func TestExecuteReplicateRelaysUpdatesAndDeletesSeparately(t *testing.T) {
return io.NopCloser(bytes.NewReader([]byte("PACK"))), nil
},
},
TargetPusher: fakeTargetPusher{
pushPack: func(_ context.Context, cmds []gitproto.PushCommand, pack io.ReadCloser) error {
defer pack.Close()
pushed = append([]gitproto.PushCommand(nil), cmds...)
return nil
},
pushCommands: func(_ context.Context, cmds []gitproto.PushCommand) error {
deleted = append([]gitproto.PushCommand(nil), cmds...)
return nil
},
},
TargetPusher: tp,
DesiredRefs: map[plumbing.ReferenceName]planner.DesiredRef{
mainRef: {
SourceRef: mainRef,
Expand Down
22 changes: 13 additions & 9 deletions pkg/gitsync/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,17 @@ func (c *Client) Plan(ctx context.Context, req PlanRequest) (PlanResult, error)
if err != nil {
return PlanResult{}, err
}
result, err := internalbridge.Run(ctx, cfg)
if err != nil {
return PlanResult{}, fmt.Errorf("plan: %w", err)
result, runErr := internalbridge.Run(ctx, cfg)
public := internalbridge.FromSyncResult(result)
if runErr != nil {
return public, fmt.Errorf("plan: %w", runErr)
}
return internalbridge.FromSyncResult(result), nil
return public, nil
}

// Sync executes a sync between two remotes.
// Sync executes a sync between two remotes. On error the returned result
// still carries any plans that were built before the failure, so callers
// can inspect per-ref actions (e.g. when reconciling a PushReportError).
func (c *Client) Sync(ctx context.Context, req SyncRequest) (SyncResult, error) {
if err := req.Validate(); err != nil {
return SyncResult{}, err
Expand All @@ -68,11 +71,12 @@ func (c *Client) Sync(ctx context.Context, req SyncRequest) (SyncResult, error)
if err != nil {
return SyncResult{}, err
}
result, err := internalbridge.Run(ctx, cfg)
if err != nil {
return SyncResult{}, fmt.Errorf("sync: %w", err)
result, runErr := internalbridge.Run(ctx, cfg)
public := internalbridge.FromSyncResult(result)
if runErr != nil {
return public, fmt.Errorf("sync: %w", runErr)
}
return internalbridge.FromSyncResult(result), nil
return public, nil
}

// Replicate executes source-authoritative relay-only replication between two remotes.
Expand Down
Loading
Loading