Skip to content

Commit c7cfd0c

Browse files
MatthewAThompson, cursoragent, nickmazzi, Chris Jones, claude
authored
feat(autorag): add dropzone for uploading input data file (opendatahub-io#6950)
* feat(autorag): add dropzone for uploading input data file Co-authored-by: Cursor <cursoragent@cursor.com> * chore(autorag): move mutation to mutations.ts Co-authored-by: Cursor <cursoragent@cursor.com> * chore(autorag): address coderabbitai comments Co-authored-by: Cursor <cursoragent@cursor.com> * chore(autorag): address potential race condition * fix(autorag): handle key collision race condition Co-authored-by: Cursor <cursoragent@cursor.com> Made-with: Cursor * fix(autorag): context unit tests Co-authored-by: Cursor <cursoragent@cursor.com> * fix(autorag): guard empty file drops * fix(autorag): schema validation timing for edit button * fix(autorag): dropzone a11y * chore(autorag): namespaced global css class * chore(autorag,automl): additional bff test coverage Co-authored-by: Cursor <cursoragent@cursor.com> * chore(autorag): missing watch value Co-authored-by: Cursor <cursoragent@cursor.com> * chore(autorag): remove json from accepted input upload type * feat(autorag): upload immediately instead of deferring to submit Co-authored-by: Cursor <cursoragent@cursor.com> * chore: fix merge, adjust test --------- Co-authored-by: Cursor <cursoragent@cursor.com> Co-authored-by: Nicholas Mazzitelli <nickmazz@ca.ibm.com> Co-authored-by: Chris Jones <chrisj@ca.ibm.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e1a93a9 commit c7cfd0c

16 files changed

Lines changed: 1234 additions & 148 deletions

File tree

packages/automl/bff/internal/integrations/s3/client_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package s3
22

33
import (
44
"errors"
5+
"fmt"
56
"testing"
67

78
"github.com/stretchr/testify/assert"
@@ -207,3 +208,50 @@ func TestCountLines(t *testing.T) {
207208
assert.Equal(t, 1, countLines([]byte("a\nb")))
208209
assert.Equal(t, 3, countLines([]byte("a\nb\nc\n")))
209210
}
211+
212+
// mockS3CodedError is a test double for AWS SDK style errors: besides the
// error interface it exposes an ErrorCode method carrying the service code.
type mockS3CodedError struct {
	msg  string
	code string
}

// Error reports msg when one was supplied and falls back to the code otherwise.
func (e mockS3CodedError) Error() string {
	if e.msg == "" {
		return e.code
	}
	return e.msg
}

// ErrorCode returns the code the mock was constructed with.
func (e mockS3CodedError) ErrorCode() string { return e.code }
228+
229+
func TestIsS3ConditionalCreateConflict(t *testing.T) {
230+
t.Parallel()
231+
tests := []struct {
232+
name string
233+
err error
234+
want bool
235+
}{
236+
{name: "PreconditionFailed", err: mockS3CodedError{code: "PreconditionFailed"}, want: true},
237+
{name: "ConditionalRequestConflict", err: mockS3CodedError{code: "ConditionalRequestConflict"}, want: true},
238+
{
239+
name: "wrapped PreconditionFailed",
240+
err: fmt.Errorf("upload failed: %w", mockS3CodedError{code: "PreconditionFailed"}),
241+
want: true,
242+
},
243+
{
244+
name: "wrapped ConditionalRequestConflict",
245+
err: fmt.Errorf("upload failed: %w", mockS3CodedError{code: "ConditionalRequestConflict"}),
246+
want: true,
247+
},
248+
{name: "other ErrorCode", err: mockS3CodedError{code: "NoSuchKey"}, want: false},
249+
{name: "plain error", err: errors.New("failed"), want: false},
250+
{name: "nil", err: nil, want: false},
251+
}
252+
for _, tt := range tests {
253+
t.Run(tt.name, func(t *testing.T) {
254+
assert.Equal(t, tt.want, isS3ConditionalCreateConflict(tt.err))
255+
})
256+
}
257+
}

packages/autorag/api/openapi/autorag.yaml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,8 @@ paths:
316316
application/json:
317317
schema:
318318
$ref: "#/components/schemas/ErrorEnvelope"
319+
"409":
320+
$ref: "#/components/responses/Conflict"
319321
"500":
320322
$ref: "#/components/responses/InternalServerError"
321323
operationId: uploadS3File
@@ -329,6 +331,12 @@ paths:
329331
1 GiB plus allowance for multipart framing (~64 MiB) or the request is rejected with 413.
330332
The file part is streamed via http.MaxBytesReader (1 GiB max); larger file parts fail with 413.
331333
334+
On success, returns JSON with `uploaded: true` and the resolved `key` (which may differ
335+
from the requested key if a collision was avoided by probing existing keys).
336+
337+
Returns 409 if the object key chosen after collision resolution still conflicts at upload time
338+
(e.g. concurrent writer); the client should retry the upload.
339+
332340
# TODO [ Gustavo ] Ongoing discussion on what this endpoint and the other S3/file endpoint should be structured as
333341
/api/v1/s3/files:
334342
summary: Endpoints for dealing with multiple files within an S3-compatible connection
@@ -1135,11 +1143,18 @@ components:
11351143
description: Response body for successful S3 file upload
11361144
required:
11371145
- uploaded
1146+
- key
11381147
type: object
11391148
properties:
11401149
uploaded:
11411150
type: boolean
11421151
example: true
1152+
key:
1153+
type: string
1154+
description: >-
1155+
The actual S3 key used for the uploaded file. When the requested key already exists,
1156+
the server appends or increments a numeric suffix (for example: file.pdf -> file-1.pdf).
1157+
example: documents/myfile-1.pdf
11431158

11441159
S3ListObjectsResult:
11451160
type: object
@@ -1403,6 +1418,14 @@ components:
14031418
schema:
14041419
$ref: "#/components/schemas/ErrorEnvelope"
14051420
description: Forbidden - insufficient permissions
1421+
Conflict:
1422+
description: >-
1423+
Conflict (e.g. S3 conditional upload failed because the object already exists at the
1424+
resolved key; client may retry the request)
1425+
content:
1426+
application/json:
1427+
schema:
1428+
$ref: "#/components/schemas/ErrorEnvelope"
14061429
InternalServerError:
14071430
content:
14081431
application/json:

packages/autorag/bff/internal/api/app.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ type App struct {
5656
s3PostMaxFilePartBytes int64
5757
// s3PostMaxRequestBodyBytes caps total POST body in tests (0 = file max + multipart envelope).
5858
s3PostMaxRequestBodyBytes int64
59+
// s3PostMaxCollisionAttempts limits HeadObject-based key suffix attempts in tests (0 = default cap).
60+
s3PostMaxCollisionAttempts int
5961
//used only on mocked k8s client
6062
testEnv *envtest.Environment
6163
// rootCAs used for outbound TLS connections to Client Service

packages/autorag/bff/internal/api/errors.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,17 @@ func (app *App) forbiddenResponse(w http.ResponseWriter, r *http.Request, messag
5454
app.errorResponse(w, r, httpError)
5555
}
5656

57+
func (app *App) conflictResponse(w http.ResponseWriter, r *http.Request, message string) {
58+
httpError := &integrations.HTTPError{
59+
StatusCode: http.StatusConflict,
60+
ErrorResponse: integrations.ErrorResponse{
61+
Code: strconv.Itoa(http.StatusConflict),
62+
Message: message,
63+
},
64+
}
65+
app.errorResponse(w, r, httpError)
66+
}
67+
5768
func (app *App) unauthorizedResponse(w http.ResponseWriter, r *http.Request, message string) {
5869
// Log unauthorized access without sensitive details
5970
app.logger.Warn("Unauthorized access attempt", "method", r.Method, "uri", r.URL.Path)

packages/autorag/bff/internal/api/s3_handler.go

Lines changed: 100 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"mime"
99
"mime/multipart"
1010
"net/http"
11+
"path"
12+
"regexp"
1113
"strconv"
1214
"strings"
1315

@@ -24,6 +26,8 @@ import (
2426

2527
type S3FilesEnvelope Envelope[models.S3ListObjectsResponse, None]
2628

29+
var trailingNumberPattern = regexp.MustCompile(`^(.*)-(\d+)$`)
30+
2731
// resolvedS3 holds a ready-to-use S3 client and the resolved bucket name.
2832
type resolvedS3 struct {
2933
client s3int.S3ClientInterface
@@ -232,9 +236,20 @@ func (app *App) GetS3FileHandler(w http.ResponseWriter, r *http.Request, _ httpr
232236
}
233237
}
234238

239+
const defaultS3PostMaxCollisionAttempts = 10
240+
241+
func (app *App) effectivePostS3CollisionAttempts() int {
242+
if app != nil && app.s3PostMaxCollisionAttempts > 0 {
243+
return app.s3PostMaxCollisionAttempts
244+
}
245+
return defaultS3PostMaxCollisionAttempts
246+
}
247+
235248
// PostS3FileHandler uploads a file to S3 storage using credentials from a Kubernetes secret.
236249
// Query parameters: namespace, secretName, key (required); bucket (optional, uses AWS_S3_BUCKET from secret if not provided).
237250
// Request body: multipart/form-data with a file part named "file". Streams the file to S3 without buffering.
251+
// Candidate keys are chosen via HeadObject; the file is streamed to S3 once with If-None-Match (no full-file buffer).
252+
// If HeadObject and PUT disagree (concurrent writer), the handler returns 409 Conflict without retrying.
238253
//
239254
// Note: namespace is provided via the AttachNamespace middleware
240255
func (app *App) PostS3FileHandler(w http.ResponseWriter, r *http.Request, _ httprouter.Params) {
@@ -263,6 +278,11 @@ func (app *App) PostS3FileHandler(w http.ResponseWriter, r *http.Request, _ http
263278

264279
ctx := r.Context()
265280
bucket := s3.bucket
281+
resolvedKey, err := resolveNonCollidingS3Key(ctx, s3.client, bucket, key, app.effectivePostS3CollisionAttempts())
282+
if err != nil {
283+
app.serverErrorResponse(w, r, fmt.Errorf("error resolving S3 key for upload: %w", err))
284+
return
285+
}
266286

267287
maxUploadSize := app.s3PostMaxTotalBodyBytes()
268288
// Cap the entire request body so chunked/unknown-length clients cannot force the multipart
@@ -331,25 +351,102 @@ func (app *App) PostS3FileHandler(w http.ResponseWriter, r *http.Request, _ http
331351
limitedFile := http.MaxBytesReader(nil, filePart, maxFilePartBytes)
332352
// MaxBytesReader.Close forwards to Part.Close, which drains the rest of the file part.
333353
defer limitedFile.Close()
334-
if err := s3.client.UploadObject(ctx, bucket, key, limitedFile, contentType); err != nil {
354+
if err := s3.client.UploadObject(ctx, bucket, resolvedKey, limitedFile, contentType); err != nil {
335355
var maxBytesErr *http.MaxBytesError
336356
if errors.As(err, &maxBytesErr) {
337357
app.payloadTooLargeResponse(w, r, "file exceeds maximum size of 1 GiB")
338358
return
339359
}
360+
if errors.Is(err, s3int.ErrObjectAlreadyExists) {
361+
app.conflictResponse(w, r, fmt.Sprintf("object key %q already exists in S3 (upload conflict); retry with a different key", resolvedKey))
362+
return
363+
}
340364
var accessDenied interface{ ErrorCode() string }
341365
if errors.As(err, &accessDenied) && accessDenied.ErrorCode() == "AccessDenied" {
342-
app.forbiddenResponse(w, r, fmt.Sprintf("access denied uploading to S3 '%s/%s'", bucket, key))
366+
app.forbiddenResponse(w, r, fmt.Sprintf("access denied uploading to S3 '%s/%s'", bucket, resolvedKey))
343367
return
344368
}
345369
app.serverErrorResponse(w, r, fmt.Errorf("error uploading file to S3: %w", err))
346370
return
347371
}
348372

349-
body := map[string]bool{"uploaded": true}
373+
body := map[string]any{
374+
"uploaded": true,
375+
"key": resolvedKey,
376+
}
350377
_ = app.WriteJSON(w, http.StatusCreated, body, nil)
351378
}
352379

380+
// resolveNonCollidingS3Key picks a candidate object key using HeadObject only (no upload body read).
381+
// When the requested key exists, it tries name-1, name-2, … up to maxSuffixAttempts times.
382+
func resolveNonCollidingS3Key(
383+
ctx context.Context,
384+
client s3int.S3ClientInterface,
385+
bucket string,
386+
requestedKey string,
387+
maxCollisionAttempts int,
388+
) (string, error) {
389+
exists, err := client.ObjectExists(ctx, bucket, requestedKey)
390+
if err != nil {
391+
return "", err
392+
}
393+
if !exists {
394+
return requestedKey, nil
395+
}
396+
397+
dir, name := splitS3ObjectPath(requestedKey)
398+
stem, ext := splitNameAndExtension(name)
399+
stemBase, nextIndex := splitStemAndNextIndex(stem)
400+
401+
for range maxCollisionAttempts {
402+
candidateName := fmt.Sprintf("%s-%d%s", stemBase, nextIndex, ext)
403+
candidateKey := dir + candidateName
404+
405+
candidateExists, checkErr := client.ObjectExists(ctx, bucket, candidateKey)
406+
if checkErr != nil {
407+
return "", checkErr
408+
}
409+
if !candidateExists {
410+
return candidateKey, nil
411+
}
412+
nextIndex++
413+
}
414+
return "", fmt.Errorf("failed to resolve non-colliding S3 key after %d attempts", maxCollisionAttempts)
415+
}
416+
417+
// splitS3ObjectPath cuts key after its last "/" and returns both halves.
// dir keeps the trailing slash so that dir+name always reproduces key; a key
// without any slash yields an empty dir and the whole key as name.
func splitS3ObjectPath(key string) (dir string, name string) {
	if cut := strings.LastIndexByte(key, '/'); cut >= 0 {
		return key[:cut+1], key[cut+1:]
	}
	return "", key
}
424+
425+
// splitNameAndExtension separates fileName into the part before its extension
// and the extension itself, as reported by path.Ext (leading dot included).
// If there is no extension, or the extension is the entire name (for example
// ".gitignore"), the full name is returned as stem with an empty ext.
func splitNameAndExtension(fileName string) (stem string, ext string) {
	suffix := path.Ext(fileName)
	// path.Ext always returns a suffix of its argument, so slicing it off is safe.
	prefix := fileName[:len(fileName)-len(suffix)]
	if suffix == "" || prefix == "" {
		return fileName, ""
	}
	return prefix, suffix
}
436+
437+
func splitStemAndNextIndex(stem string) (base string, nextIndex int) {
438+
match := trailingNumberPattern.FindStringSubmatch(stem)
439+
if len(match) != 3 {
440+
return stem, 1
441+
}
442+
443+
parsedIndex, err := strconv.Atoi(match[2])
444+
if err != nil {
445+
return stem, 1
446+
}
447+
return match[1], parsedIndex + 1
448+
}
449+
353450
// rejectDeclaredOversizedS3Post returns 413 when Content-Length is set and exceeds
354451
// s3PostMaxTotalBodyBytes. Chunked or unknown length passes here; PostS3FileHandler still wraps
355452
// r.Body with http.MaxBytesReader before MultipartReader so total bytes read are capped.

0 commit comments

Comments
 (0)