Skip to content

kvdb/sqlbase: make NextSequence atomic, using a single SQL statement #9676

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 79 additions & 23 deletions kvdb/sqlbase/readwrite_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,46 +323,71 @@ func (b *readWriteBucket) Put(key, value []byte) error {
return nil
}

// Delete deletes the key/value pointed to by the passed key.
// Returns ErrKeyRequired if the passed key is empty.
// Delete deletes the key/value pointed to by the passed key. Returns
// ErrKeyRequired if the passed key is empty.
func (b *readWriteBucket) Delete(key []byte) error {
if key == nil {
// Deleting a nil key seems like a no-op in original context,
// maintain that.
return nil
}

if len(key) == 0 {
return walletdb.ErrKeyRequired
}

// Check to see if a bucket with this key exists.
var dummy int
// First, try to delete the key directly, but only if it's a value
// (value IS NOT NULL).
result, err := b.tx.Exec(
"DELETE FROM "+b.table+" WHERE key=$1 AND "+
parentSelector(b.id)+" AND value IS NOT NULL",
key,
)
if err != nil {
return fmt.Errorf("error attempting to delete "+
"key %x: %w", key, err)
}

rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("error getting rows affected for "+
"key %x: %w", key, err)
}

// If we deleted exactly one row, we're done. It was a key-value pair.
if rowsAffected == 1 {
return nil
}

// If rowsAffected is 0, it means either:
// 1. The key doesn't exist at all.
// 2. The key exists, but `value IS NULL` (it's a bucket).
//
// We need to check for case 2 to return ErrIncompatibleValue.
var existsAsBucket int
row, cancel := b.tx.QueryRow(
"SELECT 1 FROM "+b.table+" WHERE "+parentSelector(b.id)+
" AND key=$1 AND value IS NULL", key,
)
defer cancel()
err := row.Scan(&dummy)
switch {
// No bucket exists, proceed to deletion of the key.
case err == sql.ErrNoRows:
err = row.Scan(&existsAsBucket)

case err != nil:
return err

// Bucket exists.
default:
if err == nil {
// Scan succeeded without error, meaning we found a row where
// value IS NULL. It's a bucket.
return walletdb.ErrIncompatibleValue
}

_, err = b.tx.Exec(
"DELETE FROM "+b.table+" WHERE key=$1 AND "+
parentSelector(b.id)+" AND value IS NOT NULL",
key,
)
if err != nil {
return err
if err == sql.ErrNoRows {
// Key didn't exist as a value (rowsAffected==0) AND it doesn't
// exist as a bucket. So the key just wasn't found. Deleting a
// non-existent key is often a no-op.
}

return nil

// Some other database error occurred during the check.
return fmt.Errorf("error checking if key %x exists as bucket: %w",
key, err)
}

// ReadWriteCursor returns a new read-write cursor for this bucket.
Expand All @@ -379,9 +404,40 @@ func (b *readWriteBucket) Tx() walletdb.ReadWriteTx {
// Note that this is not a thread safe function and as such it must not be used
// for synchronization.
func (b *readWriteBucket) NextSequence() (uint64, error) {
seq := b.Sequence() + 1
if b.id == nil {
// Sequence numbers are only supported for nested buckets, as
// top-level buckets don't have a unique row ID in the same way.
panic("sequence not supported on top level bucket")
}

var nextSeq uint64
row, cancel := b.tx.QueryRow(
// Atomically increment the sequence number for the bucket ID.
// We use COALESCE to handle the case where the sequence is NULL
// (never set before), treating it as 0 before incrementing. The
// RETURNING clause gives us the new value.
"UPDATE "+b.table+" SET sequence = COALESCE(sequence, 0) + 1 "+
"WHERE id=$1 RETURNING sequence",
b.id,
)
defer cancel()

err := row.Scan(&nextSeq)
if err != nil {
// If we get sql.ErrNoRows, it means the bucket ID didn't exist,
// which shouldn't happen if the bucket was created correctly.
// We wrap the error for clarity.
if errors.Is(err, sql.ErrNoRows) {
return 0, fmt.Errorf("bucket with id %d not found "+
"for sequence update", *b.id)
}

// Return other potential scan or query errors directly.
return 0, fmt.Errorf("failed to update and retrieve "+
"sequence for bucket id %d: %w", *b.id, err)
}

return seq, b.SetSequence(seq)
return nextSeq, nil
}

// SetSequence updates the sequence number for the bucket.
Expand Down
65 changes: 39 additions & 26 deletions kvdb/sqlbase/readwrite_cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ package sqlbase

import (
"database/sql"
"errors"
"fmt"

"github.com/btcsuite/btcwallet/walletdb"
)
Expand Down Expand Up @@ -184,49 +186,60 @@ func (c *readWriteCursor) Seek(seek []byte) ([]byte, []byte) {
}

// Delete removes the current key/value pair the cursor is at without
// invalidating the cursor. Returns ErrIncompatibleValue if attempted
// when the cursor points to a nested bucket.
// invalidating the cursor. Returns ErrIncompatibleValue if attempted when the
// cursor points to a nested bucket.
func (c *readWriteCursor) Delete() error {
// Get first record at or after cursor.
var key []byte
row, cancel := c.bucket.tx.QueryRow(
"SELECT key FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
" AND key>=$1 ORDER BY key LIMIT 1",
c.currKey,
)
defer cancel()
err := row.Scan(&key)

switch {
case err == sql.ErrNoRows:
return nil

case err != nil:
panic(err)
if c.currKey == nil {
// Cursor might not be positioned on a valid key.
return errors.New("cursor not positioned on a key")
}

// Delete record.
// Attempt to delete the key the cursor is pointing at, but only if it's
// a value.
result, err := c.bucket.tx.Exec(
"DELETE FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
" AND key=$1 AND value IS NOT NULL",
key,
c.currKey,
)
if err != nil {
panic(err)
}

rows, err := result.RowsAffected()
if err != nil {
return err
return fmt.Errorf("error getting rows affected "+
"for cursor key %x: %w", c.currKey, err)
}

// If deletion succeeded, we are done.
if rows == 1 {
return nil
}

// The key exists but nothing has been deleted. This means that the key
// must have been a bucket key.
if rows != 1 {
// If rows == 0, the key either didn't exist anymore (concurrent
// delete?) or it was a bucket. Check if it's a bucket.
var existsAsBucket int
rowCheck, cancelCheck := c.bucket.tx.QueryRow(
"SELECT 1 FROM "+c.bucket.table+" WHERE "+
parentSelector(c.bucket.id)+
" AND key=$1 AND value IS NULL", c.currKey,
)
defer cancelCheck()

errCheck := rowCheck.Scan(&existsAsBucket)
if errCheck == nil {
// Found it, and value IS NULL -> It's a bucket.
return walletdb.ErrIncompatibleValue
}

return err
if errCheck == sql.ErrNoRows {
return nil
}

// Some other error during the check.
//
// TODO(roasbeef): panic here like above/
return fmt.Errorf("error checking if cursor key %x is "+
"bucket: %w", c.currKey, errCheck)
}
Loading