Skip to content

Commit 215e2a2

Browse files
committed
feat: add chunked uploads for large stack payloads to avoid 413 errors
1 parent f941c79 commit 215e2a2

File tree

8 files changed

+511
-30
lines changed

8 files changed

+511
-30
lines changed

pkg/pro/api_client.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
cfg "github.com/cloudposse/atmos/pkg/config"
1818
"github.com/cloudposse/atmos/pkg/pro/dtos"
1919
"github.com/cloudposse/atmos/pkg/schema"
20-
"github.com/cloudposse/atmos/pkg/utils"
2120
)
2221

2322
const (
@@ -93,6 +92,7 @@ type AtmosProAPIClient struct {
9392
BaseAPIEndpoint string
9493
BaseURL string
9594
HTTPClient *http.Client
95+
MaxPayloadBytes int // Configurable max payload size before chunking. 0 uses default.
9696
}
9797

9898
// NewAtmosProAPIClient creates a new instance of AtmosProAPIClient.
@@ -120,11 +120,15 @@ func NewAtmosProAPIClientFromEnv(atmosConfig *schema.AtmosConfiguration) (*Atmos
120120
}
121121
log.Debug("Using baseAPIEndpoint", "baseAPIEndpoint", baseAPIEndpoint)
122122

123+
maxPayloadBytes := atmosConfig.Settings.Pro.MaxPayloadBytes
124+
123125
// First, check if the API key is set via environment variable
124126
apiToken := atmosConfig.Settings.Pro.Token
125127
if apiToken != "" {
126128
log.Debug("Creating API client with API token from environment variable")
127-
return NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken), nil
129+
client := NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken)
130+
client.MaxPayloadBytes = maxPayloadBytes
131+
return client, nil
128132
}
129133

130134
// If API key is not set, attempt to use GitHub OIDC token exchange
@@ -146,7 +150,9 @@ func NewAtmosProAPIClientFromEnv(atmosConfig *schema.AtmosConfiguration) (*Atmos
146150
return nil, errors.Join(errUtils.ErrOIDCTokenExchangeFailed, err)
147151
}
148152

149-
return NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken), nil
153+
client := NewAtmosProAPIClient(baseURL, baseAPIEndpoint, apiToken)
154+
client.MaxPayloadBytes = maxPayloadBytes
155+
return client, nil
150156
}
151157

152158
func getAuthenticatedRequest(c *AtmosProAPIClient, method, url string, body io.Reader) (*http.Request, error) {
@@ -162,15 +168,49 @@ func getAuthenticatedRequest(c *AtmosProAPIClient, method, url string, body io.R
162168
}
163169

164170
// UploadAffectedStacks uploads information about affected stacks.
171+
// Large payloads are automatically split into chunks to stay within server body size limits.
165172
func (c *AtmosProAPIClient) UploadAffectedStacks(dto *dtos.UploadAffectedStacksRequest) error {
166173
url := fmt.Sprintf("%s/%s/affected-stacks", c.BaseURL, c.BaseAPIEndpoint)
167174

168-
data, err := utils.ConvertToJSON(dto)
175+
// Estimate metadata overhead (everything except the stacks array).
176+
overheadDTO := dtos.UploadAffectedStacksRequest{
177+
HeadSHA: dto.HeadSHA,
178+
BaseSHA: dto.BaseSHA,
179+
RepoURL: dto.RepoURL,
180+
RepoName: dto.RepoName,
181+
RepoOwner: dto.RepoOwner,
182+
RepoHost: dto.RepoHost,
183+
Stacks: []schema.Affected{},
184+
}
185+
overhead := metadataOverhead(overheadDTO)
186+
187+
return sendChunked(dto.Stacks, c.MaxPayloadBytes, overhead, func(chunk []schema.Affected, batch *BatchInfo) error {
188+
chunkDTO := &dtos.UploadAffectedStacksRequest{
189+
HeadSHA: dto.HeadSHA,
190+
BaseSHA: dto.BaseSHA,
191+
RepoURL: dto.RepoURL,
192+
RepoName: dto.RepoName,
193+
RepoOwner: dto.RepoOwner,
194+
RepoHost: dto.RepoHost,
195+
Stacks: chunk,
196+
}
197+
if batch != nil {
198+
chunkDTO.BatchID = batch.BatchID
199+
chunkDTO.BatchIndex = &batch.BatchIndex
200+
chunkDTO.BatchTotal = &batch.BatchTotal
201+
}
202+
return c.sendAffectedStacksRequest(url, chunkDTO)
203+
})
204+
}
205+
206+
// sendAffectedStacksRequest sends a single affected stacks upload request.
207+
func (c *AtmosProAPIClient) sendAffectedStacksRequest(url string, dto *dtos.UploadAffectedStacksRequest) error {
208+
data, err := json.Marshal(dto)
169209
if err != nil {
170210
return errors.Join(errUtils.ErrFailedToMarshalPayload, err)
171211
}
172212

173-
req, err := getAuthenticatedRequest(c, "POST", url, bytes.NewBuffer([]byte(data)))
213+
req, err := getAuthenticatedRequest(c, "POST", url, bytes.NewBuffer(data))
174214
if err != nil {
175215
return errors.Join(errUtils.ErrFailedToCreateAuthRequest, err)
176216
}

pkg/pro/api_client_instances.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
package pro
22

33
import (
4+
"bytes"
45
"crypto/sha256"
56
"encoding/hex"
7+
"encoding/json"
68
"errors"
79
"fmt"
810
"net/http"
9-
"strings"
1011
"time"
1112

1213
log "github.com/cloudposse/atmos/pkg/logger"
1314

1415
errUtils "github.com/cloudposse/atmos/errors"
1516
"github.com/cloudposse/atmos/pkg/pro/dtos"
16-
"github.com/cloudposse/atmos/pkg/utils"
17+
"github.com/cloudposse/atmos/pkg/schema"
1718
)
1819

1920
const (
@@ -22,6 +23,7 @@ const (
2223
)
2324

2425
// UploadInstances uploads drift detection data to the API.
26+
// Large payloads are automatically split into chunks to stay within server body size limits.
2527
func (c *AtmosProAPIClient) UploadInstances(dto *dtos.InstancesUploadRequest) error {
2628
if dto == nil {
2729
return errors.Join(
@@ -31,22 +33,57 @@ func (c *AtmosProAPIClient) UploadInstances(dto *dtos.InstancesUploadRequest) er
3133
}
3234
endpoint := fmt.Sprintf("%s/%s/instances", c.BaseURL, c.BaseAPIEndpoint)
3335

34-
// Guard against nil HTTPClient by ensuring a default client with a sane timeout
36+
// Estimate metadata overhead (everything except the instances array).
37+
overheadDTO := dtos.InstancesUploadRequest{
38+
RepoURL: dto.RepoURL,
39+
RepoName: dto.RepoName,
40+
RepoOwner: dto.RepoOwner,
41+
RepoHost: dto.RepoHost,
42+
Instances: []schema.Instance{},
43+
}
44+
overhead := metadataOverhead(overheadDTO)
45+
46+
return sendChunked(dto.Instances, c.MaxPayloadBytes, overhead, func(chunk []schema.Instance, batch *BatchInfo) error {
47+
chunkDTO := &dtos.InstancesUploadRequest{
48+
RepoURL: dto.RepoURL,
49+
RepoName: dto.RepoName,
50+
RepoOwner: dto.RepoOwner,
51+
RepoHost: dto.RepoHost,
52+
Instances: chunk,
53+
}
54+
if batch != nil {
55+
chunkDTO.BatchID = batch.BatchID
56+
chunkDTO.BatchIndex = &batch.BatchIndex
57+
chunkDTO.BatchTotal = &batch.BatchTotal
58+
}
59+
return c.sendInstancesRequest(endpoint, chunkDTO)
60+
})
61+
}
62+
63+
// sendInstancesRequest sends a single instances upload request.
64+
func (c *AtmosProAPIClient) sendInstancesRequest(endpoint string, dto *dtos.InstancesUploadRequest) error {
65+
// Guard against nil HTTPClient by ensuring a default client with a sane timeout.
3566
client := c.HTTPClient
3667
if client == nil {
3768
client = &http.Client{Timeout: DefaultHTTPClientTimeout}
3869
}
3970

40-
data, err := utils.ConvertToJSON(dto)
71+
data, err := json.Marshal(dto)
4172
if err != nil {
4273
return errors.Join(errUtils.ErrFailedToMarshalPayload, err)
4374
}
4475

45-
// Log safe metadata instead of full payload to prevent secret leakage
46-
hash := sha256.Sum256([]byte(data))
47-
log.Debug("Uploading instances DTO.", "repo_owner", dto.RepoOwner, "repo_name", dto.RepoName, "instances_count", len(dto.Instances), "payload_hash", hex.EncodeToString(hash[:]))
76+
// Log safe metadata instead of full payload to prevent secret leakage.
77+
hash := sha256.Sum256(data)
78+
log.Debug("Uploading instances DTO.",
79+
"repo_owner", dto.RepoOwner,
80+
"repo_name", dto.RepoName,
81+
"instances_count", len(dto.Instances),
82+
"payload_bytes", len(data),
83+
"payload_hash", hex.EncodeToString(hash[:]),
84+
)
4885

49-
req, err := getAuthenticatedRequest(c, "POST", endpoint, strings.NewReader(data))
86+
req, err := getAuthenticatedRequest(c, "POST", endpoint, bytes.NewReader(data))
5087
if err != nil {
5188
return errors.Join(errUtils.ErrFailedToCreateAuthRequest, err)
5289
}

pkg/pro/api_client_upload_affected_stacks_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package pro
22

33
import (
4+
"encoding/json"
5+
"io"
46
"net/http"
57
"net/http/httptest"
8+
"sync/atomic"
69
"testing"
710
"time"
811

912
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
1014

1115
errUtils "github.com/cloudposse/atmos/errors"
1216
"github.com/cloudposse/atmos/pkg/pro/dtos"
@@ -148,3 +152,83 @@ func TestUploadAffectedStacks_RequestCreationError(t *testing.T) {
148152
assert.Error(t, err)
149153
assert.ErrorIs(t, err, errUtils.ErrFailedToCreateAuthRequest)
150154
}
155+
156+
func TestUploadAffectedStacks_Chunked(t *testing.T) {
157+
var requestCount atomic.Int32
158+
var receivedBodies []dtos.UploadAffectedStacksRequest
159+
160+
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
161+
requestCount.Add(1)
162+
body, err := io.ReadAll(r.Body)
163+
require.NoError(t, err)
164+
165+
var req dtos.UploadAffectedStacksRequest
166+
require.NoError(t, json.Unmarshal(body, &req))
167+
receivedBodies = append(receivedBodies, req)
168+
169+
w.WriteHeader(http.StatusOK)
170+
w.Write([]byte(`{"success": true}`))
171+
}))
172+
defer server.Close()
173+
174+
client := &AtmosProAPIClient{
175+
BaseURL: server.URL,
176+
BaseAPIEndpoint: "api",
177+
APIToken: "test-token",
178+
HTTPClient: &http.Client{Timeout: 10 * time.Second},
179+
}
180+
181+
// Create a large payload that exceeds DefaultMaxPayloadBytes.
182+
largeSettings := make(schema.AtmosSectionMapType)
183+
bigValue := make([]byte, 1000)
184+
for i := range bigValue {
185+
bigValue[i] = 'a'
186+
}
187+
largeSettings["big"] = string(bigValue)
188+
189+
numStacks := (DefaultMaxPayloadBytes / 1000) + 50
190+
stacks := make([]schema.Affected, numStacks)
191+
for i := range stacks {
192+
stacks[i] = schema.Affected{
193+
Component: "component",
194+
Stack: "stack",
195+
Settings: largeSettings,
196+
}
197+
}
198+
199+
dto := dtos.UploadAffectedStacksRequest{
200+
HeadSHA: "head-sha",
201+
BaseSHA: "base-sha",
202+
RepoURL: "https://github.com/test/repo",
203+
RepoName: "repo",
204+
RepoOwner: "test",
205+
RepoHost: "github.com",
206+
Stacks: stacks,
207+
}
208+
209+
err := client.UploadAffectedStacks(&dto)
210+
require.NoError(t, err)
211+
212+
// Should have sent multiple requests.
213+
totalRequests := int(requestCount.Load())
214+
assert.Greater(t, totalRequests, 1, "large payload should be chunked into multiple requests")
215+
216+
// All requests should have the same batch_id.
217+
batchID := receivedBodies[0].BatchID
218+
assert.NotEmpty(t, batchID)
219+
220+
totalStacks := 0
221+
for i, body := range receivedBodies {
222+
assert.Equal(t, batchID, body.BatchID)
223+
require.NotNil(t, body.BatchIndex)
224+
assert.Equal(t, i, *body.BatchIndex)
225+
require.NotNil(t, body.BatchTotal)
226+
assert.Equal(t, totalRequests, *body.BatchTotal)
227+
// Metadata should be preserved in each chunk.
228+
assert.Equal(t, "head-sha", body.HeadSHA)
229+
assert.Equal(t, "base-sha", body.BaseSHA)
230+
assert.Equal(t, "test", body.RepoOwner)
231+
totalStacks += len(body.Stacks)
232+
}
233+
assert.Equal(t, numStacks, totalStacks, "all stacks should be accounted for across chunks")
234+
}

0 commit comments

Comments (0)