Description
Using the Appender, I can write from multiple goroutines perfectly fine. However, when I set up a ticker to call Flush at a regular interval, a race condition appears to be created that materialises with the following error:
database/sql/driver: could not flush appender: duckdb error: PRIMARY KEY or UNIQUE constraint violated: duplicate key "80000000-0000-0000-0000-000000000000": appended data has been invalidated due to corrupt row
My reason for calling Flush on a regular basis is to minimise the risk of data loss in the event of a process or system issue. Where loss does occur, I would like to be able to quantify it, e.g. "30 seconds of data".
The documentation mentions that concurrency works within a single process, which this is:
DuckDB supports concurrency within a single process according to the following rules. As long as there are no write conflicts, multiple concurrent writes will succeed. Appends will never conflict, even on the same table.
If I surround all calls to Append and Flush with calls to Lock and Unlock on a sync.Mutex, it operates fine, though this would then seem to cancel out the concurrency benefits. While in my use case I can switch to a channel, this seems like something that could impact many other users. Digging into the C implementation of the Appender, I can't find any race protection there.
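For reference, a minimal sketch of that workaround (the lockedAppender wrapper is illustrative, not part of go-duckdb, and assumes the same imports as the replication sample below):

type lockedAppender struct {
    mu  sync.Mutex
    app *duckdb.Appender
}

// AppendRow and Flush take the same lock, so a flush can never observe a
// half-appended row, at the cost of serialising all writers.
func (l *lockedAppender) AppendRow(args ...driver.Value) error {
    l.mu.Lock()
    defer l.mu.Unlock()
    return l.app.AppendRow(args...)
}

func (l *lockedAppender) Flush() error {
    l.mu.Lock()
    defer l.mu.Unlock()
    return l.app.Flush()
}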
Happy to look into contributing a PR to fix this, but keen to scope it out first: is this a feature/use-case that simply needs a little extra documentation, or a bug where the fix might be either a lower-level use of a mutex or the addition of a SafeFlush function?
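For completeness, a rough sketch of the channel-based alternative mentioned above, under the assumption that a single goroutine owns the appender (row and appendLoop are illustrative names, not part of go-duckdb):

type row struct {
    id   duckdb.UUID
    date time.Time
    desc string
}

// One goroutine owns the appender, so appends and flushes can never
// interleave; writers send rows down the channel instead of touching
// the appender directly.
func appendLoop(app *duckdb.Appender, rows <-chan row, ticker *time.Ticker) {
    for {
        select {
        case r, ok := <-rows:
            if !ok {
                return
            }
            if err := app.AppendRow(r.id, r.date, r.desc); err != nil {
                panic(err)
            }
        case <-ticker.C:
            if err := app.Flush(); err != nil {
                panic(err)
            }
        }
    }
}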
Environment
Replicated on: iOS, Linux
Go-DuckDB: v1.8.3
DuckDB: v1.0.0 1f98600c2c
Replication
The following code sample can be used to replicate the issue:
package main

import (
    "context"
    "database/sql/driver"
    "fmt"
    "sync"
    "time"

    "github.com/gofrs/uuid/v5"
    "github.com/marcboeker/go-duckdb"
)
func main() {
    fmt.Println("Start")

    db := NewDuckDb()
    db.RunEventLoop()

    wg := &sync.WaitGroup{}
    asyncAppend(db, 1, wg)
    asyncAppend(db, 2, wg)
    asyncAppend(db, 3, wg)
    wg.Wait()

    fmt.Println("End")
}
func asyncAppend(db *DuckDbAppender, id int, wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 1000; i++ {
            db.Append(uuid.Must(uuid.NewV4()), time.Now(), fmt.Sprintf("row from routine %d iteration %d", id, i))
            time.Sleep(time.Millisecond * 20)
        }
    }()
}
type DuckDbAppender struct {
    ddb    *duckdb.Connector
    con    driver.Conn
    app    *duckdb.Appender
    ticker *time.Ticker
}
func NewDuckDb() *DuckDbAppender {
    dbFilepath := "test.duckdb"
    tableName := "test"
    schema := `CREATE TABLE IF NOT EXISTS test (
        request_id uuid PRIMARY KEY DEFAULT gen_random_uuid(),
        created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
        description varchar(255) NULL
    );`

    o := &DuckDbAppender{
        ticker: time.NewTicker(time.Second * 5),
    }

    var err error
    o.ddb, err = duckdb.NewConnector(fmt.Sprintf("%s?threads=1", dbFilepath), func(execer driver.ExecerContext) error {
        _, err := execer.ExecContext(context.TODO(), schema, nil)
        if err != nil {
            panic(fmt.Errorf("unexpected error executing connection query: %w", err))
        }
        return nil
    })
    if err != nil {
        panic(fmt.Errorf("unexpected error creating connector: %w", err))
    }

    o.con, err = o.ddb.Connect(context.TODO())
    if err != nil {
        panic(fmt.Errorf("unexpected error while connecting: %w", err))
    }

    o.app, err = duckdb.NewAppenderFromConn(o.con, "", tableName)
    if err != nil {
        panic(fmt.Errorf("unexpected error creating appender: %w", err))
    }

    return o
}
func (o *DuckDbAppender) RunEventLoop() {
    go func() {
        for range o.ticker.C {
            err := o.app.Flush()
            if err != nil {
                panic(fmt.Errorf("unexpected error while flushing: %w", err))
            }
        }
    }()
}
func (o *DuckDbAppender) Append(id uuid.UUID, date time.Time, desc string) error {
    return o.app.AppendRow(
        duckdb.UUID(id),
        date,
        desc,
    )
}