Skip to content

Commit da1015b

Browse files
authored
Reject concurrent worker session operations (#711)
* Reject concurrent worker session operations * Fix abandoned session operation cleanup * Allow cleanup of abandoned session continuations * Avoid mutating rejected transaction cleanup * Clean up abandoned metadata operations on rollback * Keep rejected transaction cleanup from consuming metadata * Fix lint after main merge
1 parent 6926069 commit da1015b

12 files changed

Lines changed: 1010 additions & 73 deletions

controlplane/provisioning/api_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ func TestProvisionCnpgShard(t *testing.T) {
511511
w := store.warehouses["shardco"]
512512
if w == nil {
513513
t.Fatal("expected warehouse to be created")
514+
return
514515
}
515516
if w.MetadataStore.Kind != configstore.MetadataStoreKindCnpgShard {
516517
t.Errorf("metadata store kind = %q, want cnpg-shard", w.MetadataStore.Kind)
@@ -545,6 +546,7 @@ func TestProvisionIcebergExternal(t *testing.T) {
545546
w := store.warehouses["extice"]
546547
if w == nil {
547548
t.Fatal("expected warehouse to be created")
549+
return
548550
}
549551
if w.MetadataStore.Kind != configstore.MetadataStoreKindExternal {
550552
t.Errorf("metadata store kind = %q, want external", w.MetadataStore.Kind)
@@ -632,6 +634,7 @@ func TestProvisionDuckLakeExternal(t *testing.T) {
632634
w := store.warehouses["extdl"]
633635
if w == nil {
634636
t.Fatal("expected warehouse to be created")
637+
return
635638
}
636639
if w.MetadataStore.Kind != configstore.MetadataStoreKindExternal {
637640
t.Errorf("metadata store kind = %q, want external", w.MetadataStore.Kind)

duckdbservice/activation_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ func TestSessionPoolActivateTenantAllowsSameOwnerSameEpochCredentialRetry(t *tes
355355
current := pool.currentActivation()
356356
if current == nil {
357357
t.Fatal("expected activation to remain present")
358+
return
358359
}
359360
if current.payload.DuckLake.S3AccessKey != "NEW_ACCESS_KEY" {
360361
t.Fatalf("expected activation payload to be refreshed, got %q", current.payload.DuckLake.S3AccessKey)

duckdbservice/copy_from_stdin.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ func (h *FlightSQLHandler) doCopyFromStdin(
4848
if err != nil {
4949
return err
5050
}
51+
finishOperation, ok := session.beginOperation()
52+
if busyErr := sessionBusyStatus(ok); busyErr != nil {
53+
return busyErr
54+
}
55+
defer finishOperation()
5156
endConnWork, ok := session.beginConnWork()
5257
if !ok {
5358
return status.Error(codes.NotFound, "session closed")

duckdbservice/copy_from_stdin_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,32 @@ func TestDoCopyFromStdinIngestsCSVAndRunsCOPY(t *testing.T) {
186186
}
187187
}
188188

189+
func TestDoCopyFromStdinRejectsConcurrentSessionOperation(t *testing.T) {
190+
session, cleanup := newSessionWithInMemoryDuckDB(t)
191+
defer cleanup()
192+
finishOperation, ok := session.beginOperation()
193+
if !ok {
194+
t.Fatal("expected operation to start")
195+
}
196+
defer finishOperation()
197+
198+
pool := &SessionPool{sessions: map[string]*Session{session.ID: session}}
199+
handler := &FlightSQLHandler{pool: pool}
200+
ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("x-duckgres-session", session.ID))
201+
202+
first := &flight.FlightData{
203+
FlightDescriptor: &flight.FlightDescriptor{
204+
Type: flight.DescriptorPATH,
205+
Path: []string{flightclient.CopyFromStdinDescriptorPath},
206+
Cmd: []byte("COPY t FROM '" + flightclient.CopyFromStdinPathPlaceholder + "' (FORMAT CSV)"),
207+
},
208+
}
209+
err := handler.doCopyFromStdin(ctx, first, &fakeDoPutStream{ctx: ctx})
210+
if status.Code(err) != codes.FailedPrecondition {
211+
t.Fatalf("expected concurrent COPY to be rejected, got %v", err)
212+
}
213+
}
214+
189215
func TestDoCopyFromStdinRejectsMissingPlaceholder(t *testing.T) {
190216
session, cleanup := newSessionWithInMemoryDuckDB(t)
191217
defer cleanup()

0 commit comments

Comments
 (0)