Skip to content

Commit 2be2909

Browse files
authored
feat: add chunked uploads for large stack payloads (#2251)
* feat: add chunked uploads for large stack payloads to avoid 413 errors
* docs: add blog post and roadmap entry for chunked stack uploads
* fix: guard receivedBodies with sync.Mutex in chunked upload test
* fix: avoid require in handler goroutine and guard against empty bodies slice
* fix: replace atomic.Value with mutex-guarded error in chunked upload test
* docs: add max_payload_bytes to Atmos Pro configuration reference
1 parent 9e14d6f commit 2be2909

File tree

11 files changed

+616
-30
lines changed

11 files changed

+616
-30
lines changed

pkg/pro/api_client.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
cfg "github.com/cloudposse/atmos/pkg/config"
1818
"github.com/cloudposse/atmos/pkg/pro/dtos"
1919
"github.com/cloudposse/atmos/pkg/schema"
20-
"github.com/cloudposse/atmos/pkg/utils"
2120
)
2221

2322
const (
@@ -93,6 +92,7 @@ type AtmosProAPIClient struct {
9392
BaseAPIEndpoint string
9493
BaseURL string
9594
HTTPClient *http.Client
95+
MaxPayloadBytes int // Configurable max payload size before chunking. 0 uses default.
9696
}
9797

9898
// NewAtmosProAPIClient creates a new instance of AtmosProAPIClient.
@@ -120,11 +120,15 @@ func NewAtmosProAPIClientFromEnv(atmosConfig *schema.AtmosConfiguration) (*Atmos
120120
}
121121
log.Debug("Using baseAPIEndpoint", "baseAPIEndpoint", baseAPIEndpoint)
122122

123+
maxPayloadBytes := atmosConfig.Settings.Pro.MaxPayloadBytes
124+
123125
// First, check if the API key is set via environment variable
124126
apiToken := atmosConfig.Settings.Pro.Token
125127
if apiToken != "" {
126128
log.Debug("Creating API client with API token from environment variable")
127-
return NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken), nil
129+
client := NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken)
130+
client.MaxPayloadBytes = maxPayloadBytes
131+
return client, nil
128132
}
129133

130134
// If API key is not set, attempt to use GitHub OIDC token exchange
@@ -146,7 +150,9 @@ func NewAtmosProAPIClientFromEnv(atmosConfig *schema.AtmosConfiguration) (*Atmos
146150
return nil, errors.Join(errUtils.ErrOIDCTokenExchangeFailed, err)
147151
}
148152

149-
return NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken), nil
153+
client := NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken)
154+
client.MaxPayloadBytes = maxPayloadBytes
155+
return client, nil
150156
}
151157

152158
func getAuthenticatedRequest(c *AtmosProAPIClient, method, url string, body io.Reader) (*http.Request, error) {
@@ -162,15 +168,49 @@ func getAuthenticatedRequest(c *AtmosProAPIClient, method, url string, body io.R
162168
}
163169

164170
// UploadAffectedStacks uploads information about affected stacks.
171+
// Large payloads are automatically split into chunks to stay within server body size limits.
165172
func (c *AtmosProAPIClient) UploadAffectedStacks(dto *dtos.UploadAffectedStacksRequest) error {
166173
url := fmt.Sprintf("%s/%s/affected-stacks", c.BaseURL, c.BaseAPIEndpoint)
167174

168-
data, err := utils.ConvertToJSON(dto)
175+
// Estimate metadata overhead (everything except the stacks array).
176+
overheadDTO := dtos.UploadAffectedStacksRequest{
177+
HeadSHA: dto.HeadSHA,
178+
BaseSHA: dto.BaseSHA,
179+
RepoURL: dto.RepoURL,
180+
RepoName: dto.RepoName,
181+
RepoOwner: dto.RepoOwner,
182+
RepoHost: dto.RepoHost,
183+
Stacks: []schema.Affected{},
184+
}
185+
overhead := metadataOverhead(overheadDTO)
186+
187+
return sendChunked(dto.Stacks, c.MaxPayloadBytes, overhead, func(chunk []schema.Affected, batch *BatchInfo) error {
188+
chunkDTO := &dtos.UploadAffectedStacksRequest{
189+
HeadSHA: dto.HeadSHA,
190+
BaseSHA: dto.BaseSHA,
191+
RepoURL: dto.RepoURL,
192+
RepoName: dto.RepoName,
193+
RepoOwner: dto.RepoOwner,
194+
RepoHost: dto.RepoHost,
195+
Stacks: chunk,
196+
}
197+
if batch != nil {
198+
chunkDTO.BatchID = batch.BatchID
199+
chunkDTO.BatchIndex = &batch.BatchIndex
200+
chunkDTO.BatchTotal = &batch.BatchTotal
201+
}
202+
return c.sendAffectedStacksRequest(url, chunkDTO)
203+
})
204+
}
205+
206+
// sendAffectedStacksRequest sends a single affected stacks upload request.
207+
func (c *AtmosProAPIClient) sendAffectedStacksRequest(url string, dto *dtos.UploadAffectedStacksRequest) error {
208+
data, err := json.Marshal(dto)
169209
if err != nil {
170210
return errors.Join(errUtils.ErrFailedToMarshalPayload, err)
171211
}
172212

173-
req, err := getAuthenticatedRequest(c, "POST", url, bytes.NewBuffer([]byte(data)))
213+
req, err := getAuthenticatedRequest(c, "POST", url, bytes.NewBuffer(data))
174214
if err != nil {
175215
return errors.Join(errUtils.ErrFailedToCreateAuthRequest, err)
176216
}

pkg/pro/api_client_instances.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
package pro
22

33
import (
4+
"bytes"
45
"crypto/sha256"
56
"encoding/hex"
7+
"encoding/json"
68
"errors"
79
"fmt"
810
"net/http"
9-
"strings"
1011
"time"
1112

1213
log "github.com/cloudposse/atmos/pkg/logger"
1314

1415
errUtils "github.com/cloudposse/atmos/errors"
1516
"github.com/cloudposse/atmos/pkg/pro/dtos"
16-
"github.com/cloudposse/atmos/pkg/utils"
17+
"github.com/cloudposse/atmos/pkg/schema"
1718
)
1819

1920
const (
@@ -22,6 +23,7 @@ const (
2223
)
2324

2425
// UploadInstances uploads drift detection data to the API.
26+
// Large payloads are automatically split into chunks to stay within server body size limits.
2527
func (c *AtmosProAPIClient) UploadInstances(dto *dtos.InstancesUploadRequest) error {
2628
if dto == nil {
2729
return errors.Join(
@@ -31,22 +33,57 @@ func (c *AtmosProAPIClient) UploadInstances(dto *dtos.InstancesUploadRequest) er
3133
}
3234
endpoint := fmt.Sprintf("%s/%s/instances", c.BaseURL, c.BaseAPIEndpoint)
3335

34-
// Guard against nil HTTPClient by ensuring a default client with a sane timeout
36+
// Estimate metadata overhead (everything except the instances array).
37+
overheadDTO := dtos.InstancesUploadRequest{
38+
RepoURL: dto.RepoURL,
39+
RepoName: dto.RepoName,
40+
RepoOwner: dto.RepoOwner,
41+
RepoHost: dto.RepoHost,
42+
Instances: []schema.Instance{},
43+
}
44+
overhead := metadataOverhead(overheadDTO)
45+
46+
return sendChunked(dto.Instances, c.MaxPayloadBytes, overhead, func(chunk []schema.Instance, batch *BatchInfo) error {
47+
chunkDTO := &dtos.InstancesUploadRequest{
48+
RepoURL: dto.RepoURL,
49+
RepoName: dto.RepoName,
50+
RepoOwner: dto.RepoOwner,
51+
RepoHost: dto.RepoHost,
52+
Instances: chunk,
53+
}
54+
if batch != nil {
55+
chunkDTO.BatchID = batch.BatchID
56+
chunkDTO.BatchIndex = &batch.BatchIndex
57+
chunkDTO.BatchTotal = &batch.BatchTotal
58+
}
59+
return c.sendInstancesRequest(endpoint, chunkDTO)
60+
})
61+
}
62+
63+
// sendInstancesRequest sends a single instances upload request.
64+
func (c *AtmosProAPIClient) sendInstancesRequest(endpoint string, dto *dtos.InstancesUploadRequest) error {
65+
// Guard against nil HTTPClient by ensuring a default client with a sane timeout.
3566
client := c.HTTPClient
3667
if client == nil {
3768
client = &http.Client{Timeout: DefaultHTTPClientTimeout}
3869
}
3970

40-
data, err := utils.ConvertToJSON(dto)
71+
data, err := json.Marshal(dto)
4172
if err != nil {
4273
return errors.Join(errUtils.ErrFailedToMarshalPayload, err)
4374
}
4475

45-
// Log safe metadata instead of full payload to prevent secret leakage
46-
hash := sha256.Sum256([]byte(data))
47-
log.Debug("Uploading instances DTO.", "repo_owner", dto.RepoOwner, "repo_name", dto.RepoName, "instances_count", len(dto.Instances), "payload_hash", hex.EncodeToString(hash[:]))
76+
// Log safe metadata instead of full payload to prevent secret leakage.
77+
hash := sha256.Sum256(data)
78+
log.Debug("Uploading instances DTO.",
79+
"repo_owner", dto.RepoOwner,
80+
"repo_name", dto.RepoName,
81+
"instances_count", len(dto.Instances),
82+
"payload_bytes", len(data),
83+
"payload_hash", hex.EncodeToString(hash[:]),
84+
)
4885

49-
req, err := getAuthenticatedRequest(c, "POST", endpoint, strings.NewReader(data))
86+
req, err := getAuthenticatedRequest(c, "POST", endpoint, bytes.NewReader(data))
5087
if err != nil {
5188
return errors.Join(errUtils.ErrFailedToCreateAuthRequest, err)
5289
}

pkg/pro/api_client_upload_affected_stacks_test.go

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
package pro
22

33
import (
4+
"encoding/json"
5+
"io"
46
"net/http"
57
"net/http/httptest"
8+
"sync"
9+
"sync/atomic"
610
"testing"
711
"time"
812

913
"github.com/stretchr/testify/assert"
14+
"github.com/stretchr/testify/require"
1015

1116
errUtils "github.com/cloudposse/atmos/errors"
1217
"github.com/cloudposse/atmos/pkg/pro/dtos"
@@ -148,3 +153,118 @@ func TestUploadAffectedStacks_RequestCreationError(t *testing.T) {
148153
assert.Error(t, err)
149154
assert.ErrorIs(t, err, errUtils.ErrFailedToCreateAuthRequest)
150155
}
156+
157+
func TestUploadAffectedStacks_Chunked(t *testing.T) {
158+
var requestCount atomic.Int32
159+
var mu sync.Mutex
160+
var receivedBodies []dtos.UploadAffectedStacksRequest
161+
var handlerErr error // Captures first error from handler goroutine, guarded by mu.
162+
163+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
164+
requestCount.Add(1)
165+
body, err := io.ReadAll(r.Body)
166+
if err != nil {
167+
mu.Lock()
168+
if handlerErr == nil {
169+
handlerErr = err
170+
}
171+
mu.Unlock()
172+
http.Error(w, err.Error(), http.StatusInternalServerError)
173+
return
174+
}
175+
176+
var req dtos.UploadAffectedStacksRequest
177+
if err := json.Unmarshal(body, &req); err != nil {
178+
mu.Lock()
179+
if handlerErr == nil {
180+
handlerErr = err
181+
}
182+
mu.Unlock()
183+
http.Error(w, err.Error(), http.StatusBadRequest)
184+
return
185+
}
186+
187+
mu.Lock()
188+
receivedBodies = append(receivedBodies, req)
189+
mu.Unlock()
190+
191+
w.WriteHeader(http.StatusOK)
192+
w.Write([]byte(`{"success": true}`))
193+
}))
194+
defer server.Close()
195+
196+
client := &AtmosProAPIClient{
197+
BaseURL: server.URL,
198+
BaseAPIEndpoint: "api",
199+
APIToken: "test-token",
200+
HTTPClient: &http.Client{Timeout: 10 * time.Second},
201+
}
202+
203+
// Create a large payload that exceeds DefaultMaxPayloadBytes.
204+
largeSettings := make(schema.AtmosSectionMapType)
205+
bigValue := make([]byte, 1000)
206+
for i := range bigValue {
207+
bigValue[i] = 'a'
208+
}
209+
largeSettings["big"] = string(bigValue)
210+
211+
numStacks := (DefaultMaxPayloadBytes / 1000) + 50
212+
stacks := make([]schema.Affected, numStacks)
213+
for i := range stacks {
214+
stacks[i] = schema.Affected{
215+
Component: "component",
216+
Stack: "stack",
217+
Settings: largeSettings,
218+
}
219+
}
220+
221+
dto := dtos.UploadAffectedStacksRequest{
222+
HeadSHA: "head-sha",
223+
BaseSHA: "base-sha",
224+
RepoURL: "https://github.com/test/repo",
225+
RepoName: "repo",
226+
RepoOwner: "test",
227+
RepoHost: "github.com",
228+
Stacks: stacks,
229+
}
230+
231+
err := client.UploadAffectedStacks(&dto)
232+
require.NoError(t, err)
233+
234+
// Check for handler-side errors (read/unmarshal failures).
235+
mu.Lock()
236+
hErr := handlerErr
237+
mu.Unlock()
238+
if hErr != nil {
239+
t.Fatalf("httptest handler error: %v", hErr)
240+
}
241+
242+
// Should have sent multiple requests.
243+
totalRequests := int(requestCount.Load())
244+
require.Greater(t, totalRequests, 1, "large payload should be chunked into multiple requests")
245+
246+
// All requests should have the same batch_id.
247+
mu.Lock()
248+
bodies := make([]dtos.UploadAffectedStacksRequest, len(receivedBodies))
249+
copy(bodies, receivedBodies)
250+
mu.Unlock()
251+
252+
require.NotEmpty(t, bodies, "receivedBodies should not be empty")
253+
batchID := bodies[0].BatchID
254+
assert.NotEmpty(t, batchID)
255+
256+
totalStacks := 0
257+
for i, body := range bodies {
258+
assert.Equal(t, batchID, body.BatchID)
259+
require.NotNil(t, body.BatchIndex)
260+
assert.Equal(t, i, *body.BatchIndex)
261+
require.NotNil(t, body.BatchTotal)
262+
assert.Equal(t, totalRequests, *body.BatchTotal)
263+
// Metadata should be preserved in each chunk.
264+
assert.Equal(t, "head-sha", body.HeadSHA)
265+
assert.Equal(t, "base-sha", body.BaseSHA)
266+
assert.Equal(t, "test", body.RepoOwner)
267+
totalStacks += len(body.Stacks)
268+
}
269+
assert.Equal(t, numStacks, totalStacks, "all stacks should be accounted for across chunks")
270+
}

0 commit comments

Comments (0)