Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions agent/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ go_library(
go_test(
name = "agent_test",
srcs = [
"agent_worker_test.go",
"artifact_batch_hints_test.go",
"fake_api_server_test.go",
"gcp_meta_data_test.go",
Expand Down
53 changes: 53 additions & 0 deletions agent/agent_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,20 @@ type AgentWorker struct {
// The time when this agent worker started
startTime time.Time

// Server-provided hints for artifact upload batching, updated from ping.
artifactBatchHintsMu sync.RWMutex
artifactCreateBatchSize int
artifactUpdateBatchSizeMax int

// disable the delay between pings, to speed up certain testing scenarios
noWaitBetweenPingsForTesting bool
}

// Names of the environment variables injected into a job's environment to
// carry server-provided artifact batching hints (see
// applyArtifactBatchHintsToJob). The artifact-upload subcommand reads them
// back via its flag EnvVar bindings.
const (
	artifactCreateBatchSizeEnv    = "BUILDKITE_ARTIFACT_CREATE_BATCH_SIZE"
	artifactUpdateBatchSizeMaxEnv = "BUILDKITE_ARTIFACT_UPDATE_BATCH_SIZE_MAX"
)

type agentWorkerState string

const (
Expand Down Expand Up @@ -505,6 +515,8 @@ func (a *AgentWorker) AcquireAndRunJob(ctx context.Context, jobId string) error
}

func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job, ignoreAgentInDispatches *bool) error {
a.applyArtifactBatchHintsToJob(acceptResponse)

a.setBusy(acceptResponse.ID)
defer a.setIdle()

Expand Down Expand Up @@ -549,6 +561,47 @@ func (a *AgentWorker) RunJob(ctx context.Context, acceptResponse *api.Job, ignor
return nil
}

// setArtifactBatchHintsFromPing records any artifact batching hints carried
// on a ping response. Only positive values are stored, so a ping without
// hints (or with zero values) leaves previously stored hints untouched.
func (a *AgentWorker) setArtifactBatchHintsFromPing(ping *api.Ping) {
	if ping == nil {
		return
	}

	a.artifactBatchHintsMu.Lock()
	defer a.artifactBatchHintsMu.Unlock()

	if n := ping.ArtifactCreateBatchSize; n > 0 {
		a.artifactCreateBatchSize = n
	}
	if n := ping.ArtifactUpdateBatchSizeMax; n > 0 {
		a.artifactUpdateBatchSizeMax = n
	}
}

// artifactBatchHints returns the most recently stored server hints for
// artifact create/update batching. A zero value means "no hint received".
func (a *AgentWorker) artifactBatchHints() (createBatchSize, updateBatchSizeMax int) {
	a.artifactBatchHintsMu.RLock()
	createBatchSize = a.artifactCreateBatchSize
	updateBatchSizeMax = a.artifactUpdateBatchSizeMax
	a.artifactBatchHintsMu.RUnlock()

	return createBatchSize, updateBatchSizeMax
}

// applyArtifactBatchHintsToJob exports any stored artifact batching hints
// into the job's environment, so that artifact-upload invocations within the
// job pick them up. The job's Env map is only allocated if there is at least
// one hint to write.
func (a *AgentWorker) applyArtifactBatchHintsToJob(job *api.Job) {
	createBatchSize, updateBatchSizeMax := a.artifactBatchHints()

	// Write a hint into the job env, lazily allocating the map. Non-positive
	// values mean "no hint" and are skipped.
	setEnv := func(key string, value int) {
		if value <= 0 {
			return
		}
		if job.Env == nil {
			job.Env = make(map[string]string)
		}
		job.Env[key] = fmt.Sprint(value)
	}

	setEnv(artifactCreateBatchSizeEnv, createBatchSize)
	setEnv(artifactUpdateBatchSizeMaxEnv, updateBatchSizeMax)
}

// Disconnect notifies the Buildkite API that this agent worker/session is
// permanently disconnecting. Don't spend long retrying, because we want to
// disconnect as fast as possible.
Expand Down
3 changes: 3 additions & 0 deletions agent/agent_worker_ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (a *AgentWorker) Ping(ctx context.Context) (jobID, action string, err error
// once we've done that, we can do the error handling for pingErr

if ping != nil {
a.setArtifactBatchHintsFromPing(ping)

// Is there a message that should be shown in the logs?
if ping.Message != "" {
a.logger.Info(ping.Message)
Expand Down Expand Up @@ -258,6 +260,7 @@ func (a *AgentWorker) Ping(ctx context.Context) (jobID, action string, err error
a.apiClient = newAPIClient
a.agent.Endpoint = ping.Endpoint
ping = newPing
a.setArtifactBatchHintsFromPing(ping)
}
}

Expand Down
39 changes: 39 additions & 0 deletions agent/artifact_batch_hints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package agent

import (
"testing"

"github.com/buildkite/agent/v3/api"
"github.com/stretchr/testify/require"
)

// TestApplyArtifactBatchHintsToJobSetsEnv checks that hints captured from a
// ping are rendered into the job environment as decimal strings.
func TestApplyArtifactBatchHintsToJobSetsEnv(t *testing.T) {
	t.Parallel()

	w := &AgentWorker{}
	w.setArtifactBatchHintsFromPing(&api.Ping{
		ArtifactCreateBatchSize:    60,
		ArtifactUpdateBatchSizeMax: 240,
	})

	j := &api.Job{}
	w.applyArtifactBatchHintsToJob(j)

	want := map[string]string{
		artifactCreateBatchSizeEnv:    "60",
		artifactUpdateBatchSizeMaxEnv: "240",
	}
	for key, value := range want {
		require.Equal(t, value, j.Env[key])
	}
}

// TestSetArtifactBatchHintsFromPingIgnoresZeroValues checks that a ping with
// zero-valued hints does not clobber hints stored from an earlier ping.
func TestSetArtifactBatchHintsFromPingIgnoresZeroValues(t *testing.T) {
	t.Parallel()

	w := &AgentWorker{}

	// First ping supplies real hints; the second carries none.
	w.setArtifactBatchHintsFromPing(&api.Ping{
		ArtifactCreateBatchSize:    45,
		ArtifactUpdateBatchSizeMax: 180,
	})
	w.setArtifactBatchHintsFromPing(&api.Ping{})

	create, updateMax := w.artifactBatchHints()
	require.Equal(t, 45, create)
	require.Equal(t, 180, updateMax)
}
12 changes: 7 additions & 5 deletions api/pings.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import "context"

// Ping represents a Buildkite Agent API Ping
type Ping struct {
	// Action is the next action the server wants the agent to take.
	Action string `json:"action,omitempty"`
	// Message, when non-empty, is logged by the agent worker.
	Message string `json:"message,omitempty"`
	// Job is a job being offered to the agent, if any.
	Job *Job `json:"job,omitempty"`
	// Endpoint, when non-empty, tells the agent to switch its API client to
	// this endpoint for subsequent requests.
	Endpoint string `json:"endpoint,omitempty"`
	RequestHeaders map[string]string `json:"request_headers,omitzero"` // omit nil, keep empty map
	// ArtifactCreateBatchSize is a server hint for how many artifacts to
	// include per create-artifacts request. Zero means no hint.
	ArtifactCreateBatchSize int `json:"artifact_create_batch_size,omitempty"`
	// ArtifactUpdateBatchSizeMax is a server hint for the maximum number of
	// artifact states per update-artifacts request. Zero means no hint.
	ArtifactUpdateBatchSizeMax int `json:"artifact_update_batch_size_max,omitempty"`
}

// Pings the API and returns any work the client needs to perform
Expand Down
14 changes: 14 additions & 0 deletions clicommand/artifact_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ type ArtifactUploadConfig struct {
GlobResolveFollowSymlinks bool `cli:"glob-resolve-follow-symlinks"`
UploadSkipSymlinks bool `cli:"upload-skip-symlinks"`
NoMultipartUpload bool `cli:"no-multipart-artifact-upload"`
CreateBatchSize int `cli:"artifact-create-batch-size"`
UpdateBatchSizeMax int `cli:"artifact-update-batch-size-max"`

// deprecated
FollowSymlinks bool `cli:"follow-symlinks" deprecated-and-renamed-to:"GlobResolveFollowSymlinks"`
Expand Down Expand Up @@ -125,6 +127,16 @@ var ArtifactUploadCommand = cli.Command{
Usage: "After the glob has been resolved to a list of files to upload, skip uploading those that are symlinks to files (default: false)",
EnvVar: "BUILDKITE_ARTIFACT_UPLOAD_SKIP_SYMLINKS",
},
cli.IntFlag{
Name: "artifact-create-batch-size",
Usage: "Maximum number of artifacts to include in each create-artifacts API request (default: 30)",
EnvVar: "BUILDKITE_ARTIFACT_CREATE_BATCH_SIZE",
},
cli.IntFlag{
Name: "artifact-update-batch-size-max",
Usage: "Maximum number of artifact states to include in each update-artifacts API request (default: unlimited)",
EnvVar: "BUILDKITE_ARTIFACT_UPDATE_BATCH_SIZE_MAX",
},
cli.BoolFlag{ // Deprecated
Name: "follow-symlinks",
Usage: "Follow symbolic links while resolving globs. Note this argument is deprecated. Use `--glob-resolve-follow-symlinks` instead (default: false)",
Expand Down Expand Up @@ -157,6 +169,8 @@ var ArtifactUploadCommand = cli.Command{
// this works as long as the user only sets one of the two flags
GlobResolveFollowSymlinks: (cfg.GlobResolveFollowSymlinks || cfg.FollowSymlinks),
UploadSkipSymlinks: cfg.UploadSkipSymlinks,
CreateBatchSize: cfg.CreateBatchSize,
UpdateBatchSizeMax: cfg.UpdateBatchSizeMax,
})

// Upload the artifacts
Expand Down
2 changes: 2 additions & 0 deletions internal/artifact/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ go_test(
"artifactory_downloader_test.go",
"artifactory_uploader_test.go",
"azure_blob_test.go",
"batching_test.go",
"bk_uploader_test.go",
"download_test.go",
"downloader_test.go",
Expand All @@ -113,5 +114,6 @@ go_test(
"@com_github_google_go_cmp//cmp",
"@com_github_google_go_cmp//cmp/cmpopts",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
],
)
28 changes: 25 additions & 3 deletions internal/artifact/batch_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type BatchCreatorConfig struct {

// Whether to allow multipart uploads to the BK-hosted bucket.
AllowMultipart bool

// Number of artifacts in each CreateArtifacts request.
// If zero, a default is used.
CreateBatchSize int
}

type BatchCreator struct {
Expand All @@ -48,12 +52,16 @@ func NewArtifactBatchCreator(l logger.Logger, ac APIClient, c BatchCreatorConfig

func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
length := len(a.conf.Artifacts)
chunks := 30
batchSize := a.conf.CreateBatchSize
if batchSize <= 0 {
batchSize = 30
}
const maxCreateBatchSize = 500

// Split into the artifacts into chunks so we're not uploading a ton of
// files at once.
for i := 0; i < length; i += chunks {
j := min(i+chunks, length)
for i := 0; i < length; {
j := min(i+batchSize, length)

// The artifacts that will be uploaded in this chunk
theseArtifacts := a.conf.Artifacts[i:j]
Expand All @@ -72,6 +80,7 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
a.logger.Info("Creating (%d-%d)/%d artifacts", i, j, length)

timeout := a.conf.CreateArtifactsTimeout
saw429 := false

// Retry the batch upload a couple of times
r := roko.NewRetrier(
Expand All @@ -87,6 +96,9 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
}

creation, resp, err := a.apiClient.CreateArtifacts(ctxTimeout, a.conf.JobID, batch)
if resp != nil && resp.StatusCode == 429 {
saw429 = true
}
// the server returns a 403 code if the artifact has exceeded the service quota
// Break the retry on any 4xx code except for 429 Too Many Requests.
if resp != nil && (resp.StatusCode != 429 && resp.StatusCode >= 400 && resp.StatusCode <= 499) {
Expand Down Expand Up @@ -121,6 +133,16 @@ func (a *BatchCreator) Create(ctx context.Context) ([]*api.Artifact, error) {
theseArtifacts[index].UploadInstructions = specific
}
}

if saw429 && batchSize < maxCreateBatchSize {
newBatchSize := min(batchSize*2, maxCreateBatchSize)
if newBatchSize != batchSize {
a.logger.Info("Received 429 while creating artifacts, increasing create batch size from %d to %d", batchSize, newBatchSize)
batchSize = newBatchSize
}
}

i = j
}

return a.conf.Artifacts, nil
Expand Down
119 changes: 119 additions & 0 deletions internal/artifact/batching_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package artifact

import (
"context"
"errors"
"fmt"
"net/http"
"testing"

"github.com/buildkite/agent/v3/api"
"github.com/buildkite/agent/v3/logger"
"github.com/stretchr/testify/require"
)

// stubArtifactAPIClient is a test double for the artifact API client. Each
// method delegates to the corresponding function field; calling a method
// whose field is nil panics, so tests fail loudly on unexpected API calls.
type stubArtifactAPIClient struct {
	createFn func(context.Context, string, *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error)
	updateFn func(context.Context, string, []api.ArtifactState) (*api.Response, error)
}

// CreateArtifacts delegates to the createFn stub, panicking if the test did
// not provide one.
func (s *stubArtifactAPIClient) CreateArtifacts(ctx context.Context, jobID string, batch *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) {
	if fn := s.createFn; fn != nil {
		return fn(ctx, jobID, batch)
	}
	panic("unexpected CreateArtifacts call")
}

// SearchArtifacts is an inert stub; none of these tests exercise searching.
func (s *stubArtifactAPIClient) SearchArtifacts(context.Context, string, *api.ArtifactSearchOptions) ([]*api.Artifact, *api.Response, error) {
	return nil, nil, nil
}

// UpdateArtifacts delegates to the updateFn stub, panicking if the test did
// not provide one.
func (s *stubArtifactAPIClient) UpdateArtifacts(ctx context.Context, jobID string, states []api.ArtifactState) (*api.Response, error) {
	if fn := s.updateFn; fn != nil {
		return fn(ctx, jobID, states)
	}
	panic("unexpected UpdateArtifacts call")
}

// TestBatchCreatorIncreasesCreateBatchSizeAfter429 checks that after a batch
// hits a 429 response, the creator doubles the batch size for subsequent
// batches (fewer, larger requests reduce request-rate pressure).
func TestBatchCreatorIncreasesCreateBatchSizeAfter429(t *testing.T) {
	t.Parallel()

	const artifactCount = 12
	artifacts := make([]*api.Artifact, 0, artifactCount)
	for i := 0; i < artifactCount; i++ {
		artifacts = append(artifacts, &api.Artifact{Path: fmt.Sprintf("artifact-%d", i)})
	}

	var (
		attempt    int
		batchSizes []int
	)
	apiClient := &stubArtifactAPIClient{
		createFn: func(_ context.Context, _ string, batch *api.ArtifactBatch) (*api.ArtifactBatchCreateResponse, *api.Response, error) {
			batchSizes = append(batchSizes, len(batch.Artifacts))

			// The very first attempt is rate limited; every later attempt
			// succeeds.
			if attempt == 0 {
				attempt++
				resp := &api.Response{Response: &http.Response{StatusCode: http.StatusTooManyRequests, Status: "429 Too Many Requests"}}
				return nil, resp, errors.New("rate limited")
			}

			ids := make([]string, len(batch.Artifacts))
			for i := range ids {
				ids[i] = fmt.Sprintf("id-%d-%d", attempt, i)
			}
			attempt++

			created := &api.ArtifactBatchCreateResponse{
				ArtifactIDs:          ids,
				InstructionsTemplate: &api.ArtifactUploadInstructions{},
			}
			resp := &api.Response{Response: &http.Response{StatusCode: http.StatusCreated, Status: "201 Created"}}
			return created, resp, nil
		},
	}

	creator := NewArtifactBatchCreator(logger.Discard, apiClient, BatchCreatorConfig{
		JobID:           "job-id",
		Artifacts:       artifacts,
		CreateBatchSize: 5,
	})

	_, err := creator.Create(t.Context())
	require.NoError(t, err)

	// 5 (rate limited) + 5 (the retry of the same batch succeeds); the batch
	// size then doubles, so the remaining 7 artifacts fit in one request.
	require.Equal(t, []int{5, 5, 7}, batchSizes)
}

// TestUpdateStatesRespectsConfiguredBatchMax checks that updateStates splits
// artifact-state updates into requests no larger than UpdateBatchSizeMax, and
// marks every tracker as sent afterwards.
func TestUpdateStatesRespectsConfiguredBatchMax(t *testing.T) {
	t.Parallel()

	var updateSizes []int
	apiClient := &stubArtifactAPIClient{
		updateFn: func(_ context.Context, _ string, states []api.ArtifactState) (*api.Response, error) {
			updateSizes = append(updateSizes, len(states))
			resp := &api.Response{Response: &http.Response{StatusCode: http.StatusOK, Status: "200 OK"}}
			return resp, nil
		},
	}

	uploader := &Uploader{
		conf: UploaderConfig{
			JobID:              "job-id",
			UpdateBatchSizeMax: 2,
		},
		logger:    logger.Discard,
		apiClient: apiClient,
	}

	worker := &artifactUploadWorker{
		Uploader: uploader,
		trackers: make(map[*api.Artifact]*artifactTracker),
	}

	// Five finished artifacts, with a max batch of 2, should go out as 2+2+1.
	for i := 0; i < 5; i++ {
		a := &api.Artifact{ID: fmt.Sprintf("artifact-%d", i)}
		worker.trackers[a] = &artifactTracker{
			ArtifactState: api.ArtifactState{ID: a.ID, State: "finished"},
		}
	}

	require.NoError(t, worker.updateStates(t.Context()))
	require.Equal(t, []int{2, 2, 1}, updateSizes)

	for _, tracker := range worker.trackers {
		require.Equal(t, "sent", tracker.State)
	}
}
Loading