Skip to content

Commit 680ced2

Browse files
authored
fix: make COPY TO STDOUT robust to SQL errors (#323)
1 parent 76120eb commit 680ced2

File tree

3 files changed

+38
-15
lines changed

3 files changed

+38
-15
lines changed

.github/workflows/replication-test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ jobs:
118118
apecloud/myduckserver:latest
119119
120120
# Wait and check container status
121-
for i in {1..10}; do
121+
for i in {1..15}; do
122122
if ! docker ps | grep -q myduck; then
123123
echo "MyDuck container exited unexpectedly"
124124
docker logs myduck

pgserver/connection_handler.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1420,20 +1420,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
14201420
}
14211421

14221422
done := make(chan struct{})
1423-
var sendErr atomic.Value
1423+
var globalErr atomic.Value
1424+
var blocked atomic.Bool
1425+
blocked.Store(true)
14241426
go func() {
14251427
defer close(done)
14261428

14271429
// Open the pipe for reading.
14281430
ctx.GetLogger().Tracef("Opening FIFO pipe for reading: %s", pipePath)
14291431
pipe, err := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
1432+
blocked.Store(false)
14301433
if err != nil {
1431-
sendErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err))
1434+
globalErr.Store(fmt.Errorf("failed to open pipe for reading: %w", err))
14321435
cancel()
14331436
return
14341437
}
14351438
defer pipe.Close()
14361439

1440+
// If the error has been set, then we should cancel the operation.
1441+
if globalErr.Load() != nil {
1442+
cancel()
1443+
return
1444+
}
1445+
14371446
ctx.GetLogger().Debug("Copying data from the pipe to the client")
14381447
defer func() {
14391448
ctx.GetLogger().Debug("Finished copying data from the pipe to the client")
@@ -1464,7 +1473,7 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
14641473
if err == io.EOF {
14651474
break
14661475
}
1467-
sendErr.Store(err)
1476+
globalErr.Store(err)
14681477
cancel()
14691478
return
14701479
}
@@ -1473,22 +1482,22 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
14731482
count := bytes.Count(line, []byte{'\t'})
14741483
err := sendCopyOutResponse(count + 1)
14751484
if err != nil {
1476-
sendErr.Store(err)
1485+
globalErr.Store(err)
14771486
cancel()
14781487
return
14791488
}
14801489
}
14811490
err = sendCopyData(line)
14821491
if err != nil {
1483-
sendErr.Store(err)
1492+
globalErr.Store(err)
14841493
cancel()
14851494
return
14861495
}
14871496
}
14881497
default:
14891498
err := sendCopyOutResponse(1)
14901499
if err != nil {
1491-
sendErr.Store(err)
1500+
globalErr.Store(err)
14921501
cancel()
14931502
return
14941503
}
@@ -1500,14 +1509,14 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
15001509
if err == io.EOF {
15011510
break
15021511
}
1503-
sendErr.Store(err)
1512+
globalErr.Store(err)
15041513
cancel()
15051514
return
15061515
}
15071516
if n > 0 {
15081517
err := sendCopyData(buf[:n])
15091518
if err != nil {
1510-
sendErr.Store(err)
1519+
globalErr.Store(err)
15111520
cancel()
15121521
return
15131522
}
@@ -1519,16 +1528,29 @@ func (h *ConnectionHandler) handleCopyToStdout(query ConvertedStatement, copyTo
15191528
select {
15201529
case <-ctx.Done(): // Context is canceled
15211530
<-done
1522-
err, _ := sendErr.Load().(error)
1531+
err, _ := globalErr.Load().(error)
15231532
return errors.Join(ctx.Err(), err)
15241533
case result := <-ch:
1534+
if blocked.Load() {
1535+
// If the pipe is still opened for reading but the writer has exited,
1536+
// then we need to open the pipe for writing again to unblock the reader.
1537+
globalErr.Store(errors.Join(
1538+
fmt.Errorf("pipe is opened for reading but the writer has exited"),
1539+
result.Err,
1540+
))
1541+
pipe, _ := os.OpenFile(pipePath, os.O_WRONLY, os.ModeNamedPipe)
1542+
if pipe != nil {
1543+
pipe.Close()
1544+
}
1545+
}
1546+
15251547
<-done
15261548

15271549
if result.Err != nil {
15281550
return fmt.Errorf("failed to copy data: %w", result.Err)
15291551
}
15301552

1531-
if err, ok := sendErr.Load().(error); ok {
1553+
if err, ok := globalErr.Load().(error); ok {
15321554
return err
15331555
}
15341556

pgserver/dataloader.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,16 @@ type PipeDataLoader struct {
5757
pipePath string
5858
read func()
5959
pipe atomic.Pointer[os.File] // for writing
60-
opening atomic.Bool // for writing
60+
blocked atomic.Bool // for writing
6161
errPipe atomic.Pointer[os.File] // for error handling
6262
rowCount chan int64
6363
err atomic.Pointer[error]
6464
logger *logrus.Entry
6565
}
6666

6767
func (loader *PipeDataLoader) Start() <-chan error {
68+
loader.blocked.Store(true)
69+
6870
// Open the reader.
6971
go loader.read()
7072

@@ -76,9 +78,8 @@ func (loader *PipeDataLoader) Start() <-chan error {
7678
// Open the pipe for writing.
7779
// This operation will block until the reader opens the pipe for reading.
7880
loader.logger.Debugf("Opening pipe for writing: %s", loader.pipePath)
79-
loader.opening.Store(true)
8081
pipe, err := os.OpenFile(loader.pipePath, os.O_WRONLY, os.ModeNamedPipe)
81-
loader.opening.Store(false)
82+
loader.blocked.Store(false)
8283
if err != nil {
8384
ready <- err
8485
return
@@ -259,7 +260,7 @@ func (loader *CsvDataLoader) executeCopy(sql string, pipePath string) {
259260
if err != nil {
260261
loader.ctx.GetLogger().Error(err)
261262
loader.err.Store(&err)
262-
if loader.opening.Load() {
263+
if loader.blocked.Load() {
263264
// Open the pipe once to unblock the writer
264265
pipe, _ := os.OpenFile(pipePath, os.O_RDONLY, os.ModeNamedPipe)
265266
loader.errPipe.Store(pipe)

0 commit comments

Comments
 (0)