-
Notifications
You must be signed in to change notification settings - Fork 339
Expand file tree
/
Copy pathbatching_test.go
More file actions
119 lines (99 loc) · 3.48 KB
/
batching_test.go
File metadata and controls
119 lines (99 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package artifact
import (
"context"
"errors"
"fmt"
"net/http"
"testing"
"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/logger"
"github.com/stretchr/testify/require"
)
type stubArtifactAPIClient struct {
createFn func(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error)
updateFn func(context.Context, string, []api.ArtifactState) (*api.Response, error)
}
func (s *stubArtifactAPIClient) CreateArtifacts(ctx context.Context, jobID string, batch *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) {
if s.createFn == nil {
panic("unexpected CreateArtifacts call")
}
return s.createFn(ctx, jobID, batch)
}
func (s *stubArtifactAPIClient) SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error) {
return nil, nil, nil
}
func (s *stubArtifactAPIClient) UpdateArtifacts(ctx context.Context, jobID string, states []api.ArtifactState) (*api.Response, error) {
if s.updateFn == nil {
panic("unexpected UpdateArtifacts call")
}
return s.updateFn(ctx, jobID, states)
}
func TestBatchCreatorIncreasesCreateBatchSizeAfter429(t *testing.T) {
t.Parallel()
artifacts := make([]*api.Artifact, 12)
for i := range artifacts {
artifacts[i] = &api.Artifact{Path: fmt.Sprintf("artifact-%d", i)}
}
var call int
var batchSizes []int
apiClient := &stubArtifactAPIClient{
createFn: func(_ context.Context, _ string, batch *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) {
batchSizes = append(batchSizes, len(batch.Artifacts))
if call == 0 {
call++
return nil, &api.Response{Response: &http.Response{StatusCode: http.StatusTooManyRequests, Status: "429 Too Many Requests"}}, errors.New("rate limited")
}
ids := make([]string, len(batch.Artifacts))
for i := range ids {
ids[i] = fmt.Sprintf("id-%d-%d", call, i)
}
call++
return &api.ArtifactBatchCreateResponse{
ArtifactIDs: ids,
InstructionsTemplate: &api.ArtifactUploadInstructions{},
}, &api.Response{Response: &http.Response{StatusCode: http.StatusCreated, Status: "201 Created"}}, nil
},
}
creator := NewArtifactBatchCreator(logger.Discard, apiClient, BatchCreatorConfig{
JobID: "job-id",
Artifacts: artifacts,
CreateBatchSize: 5,
})
_, err := creator.Create(t.Context())
require.NoError(t, err)
require.Equal(t, []int{5, 5, 7}, batchSizes)
}
func TestUpdateStatesRespectsConfiguredBatchMax(t *testing.T) {
t.Parallel()
var updateSizes []int
apiClient := &stubArtifactAPIClient{
updateFn: func(_ context.Context, _ string, states []api.ArtifactState) (*api.Response, error) {
updateSizes = append(updateSizes, len(states))
return &api.Response{Response: &http.Response{StatusCode: http.StatusOK, Status: "200 OK"}}, nil
},
}
u := &Uploader{
conf: UploaderConfig{
JobID: "job-id",
UpdateBatchSizeMax: 2,
},
logger: logger.Discard,
apiClient: apiClient,
}
worker := &artifactUploadWorker{
Uploader: u,
trackers: map[*api.Artifact]*artifactTracker{},
}
for i := 0; i < 5; i++ {
artifact := &api.Artifact{ID: fmt.Sprintf("artifact-%d", i)}
worker.trackers[artifact] = &artifactTracker{
ArtifactState: api.ArtifactState{ID: artifact.ID, State: "finished"},
}
}
err := worker.updateStates(t.Context())
require.NoError(t, err)
require.Equal(t, []int{2, 2, 1}, updateSizes)
for _, tracker := range worker.trackers {
require.Equal(t, "sent", tracker.State)
}
}