2 changes: 2 additions & 0 deletions .clang-format
@@ -0,0 +1,2 @@
BasedOnStyle: Google
IndentWidth: 4
8 changes: 0 additions & 8 deletions pkg/backend/dqlite/driver.go
@@ -6,7 +6,6 @@ import (
"fmt"
"strings"

"github.com/canonical/go-dqlite/v3"
"github.com/canonical/go-dqlite/v3/app"
"github.com/canonical/go-dqlite/v3/driver"
"github.com/canonical/k8s-dqlite/pkg/backend/sqlite"
@@ -15,13 +14,6 @@ import (
"github.com/sirupsen/logrus"
)

- func init() {
- // We assume SQLite will be used multi-threaded
- if err := dqlite.ConfigMultiThread(); err != nil {
- panic(fmt.Errorf("failed to set dqlite multithreaded mode: %v", err))
- }
- }

type Driver struct {
*sqlite.Driver

56 changes: 49 additions & 7 deletions pkg/backend/sqlite/driver.go
@@ -272,8 +272,8 @@ var (
AND id = ?`

getSizeSQL = `
- SELECT (page_count - freelist_count) * page_size
- FROM pragma_page_count(), pragma_page_size(), pragma_freelist_count()`
+ SELECT page_count * page_size
+ FROM pragma_page_count(), pragma_page_size()`
)

const maxRetries = 500
@@ -351,10 +351,14 @@ func setup(ctx context.Context, db database.Interface) (SchemaVersion, error) {
}
defer conn.Close()

+ err = maybeEnableAutovacuum(ctx, conn)
+ if err != nil {
+ return 0, err
+ }
+
// Optimistically ask for the user_version without starting a transaction
- var currentSchemaVersion SchemaVersion
- row := conn.QueryRowContext(ctx, "PRAGMA user_version")
- if err := row.Scan(&currentSchemaVersion); err != nil {
+ currentSchemaVersion, err := queryValue[SchemaVersion](ctx, conn, "PRAGMA user_version")
+ if err != nil {
return 0, err
}

@@ -373,8 +377,8 @@ func setup(ctx context.Context, db database.Interface) (SchemaVersion, error) {
}
defer txn.Rollback()

- row = txn.QueryRowContext(ctx, `PRAGMA user_version`)
- if err := row.Scan(&currentSchemaVersion); err != nil {
+ currentSchemaVersion, err = queryValue[SchemaVersion](ctx, txn, `PRAGMA user_version`)
+ if err != nil {
return 0, err
}

@@ -402,6 +406,44 @@ func setup(ctx context.Context, db database.Interface) (SchemaVersion, error) {
return currentSchemaVersion, txn.Commit()
}

// maybeEnableAutovacuum enables autovacuum for small databases. This is necessary
// to avoid large vacuum transactions, which might create issues for clusters.
func maybeEnableAutovacuum(ctx context.Context, conn *sql.Conn) error {
const autovacuumThreshold = 4 * 1024 * 1024 // 4 MiB

size, err := queryValue[int32](ctx, conn, `
SELECT (page_count - freelist_count) * page_size
FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size()`)
if err != nil {
return err
}

// If the database is small, we can run an initial VACUUM operation which we know
// will always yield a transaction smaller than the initial database size and, as
// such, will not create big problems when using the Raft protocol.
if size <= autovacuumThreshold {
if _, err := conn.ExecContext(ctx, "PRAGMA auto_vacuum = FULL; VACUUM"); err != nil {
return err
}
}

return nil
}

type Queryer interface {
QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error)
QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row
}

func queryValue[T any](ctx context.Context, db Queryer, query string, args ...interface{}) (T, error) {
var value T
row := db.QueryRowContext(ctx, query, args...)
if err := row.Scan(&value); err != nil {
return value, err
}
return value, nil
}

func (d *Driver) query(ctx context.Context, txName, query string, args ...interface{}) (rows *sql.Rows, err error) {
ctx, span := otelTracer.Start(ctx, fmt.Sprintf("%s.query", otelName))
defer func() {
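A side note on the new generic helper: queryValue works for any single-value query, so it can be reused beyond the user_version pragma. Below is a small sketch of one possible reuse; freeBytes is a hypothetical name, not part of this change, and it assumes the surrounding file's context and database/sql imports.

// freeBytes reports the number of free (reclaimable) bytes in the database
// file, using the same pragma table-valued functions as maybeEnableAutovacuum.
// Sketch only; not part of this change.
func freeBytes(ctx context.Context, conn *sql.Conn) (int64, error) {
	return queryValue[int64](ctx, conn, `
		SELECT freelist_count * page_size
		FROM pragma_freelist_count(), pragma_page_size()`)
}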
60 changes: 60 additions & 0 deletions pkg/backend/sqlite/extensions.c
@@ -0,0 +1,60 @@
#include "extensions.h"

#include <assert.h>
#include <sqlite3.h>
#include <stdatomic.h>
#include <stdint.h>
#include <string.h>

#define UNUSED(x) (void)(x)

struct vacuum_config_s {
size_t percent;
size_t max_pages;
} vacuum_config;

static unsigned int autovacuum_pages_callback(void *pClientData,
const char *zSchema,
unsigned int nDbPage,
unsigned int nFreePage,
unsigned int nBytePerPage) {
UNUSED(pClientData);
UNUSED(zSchema);
UNUSED(nBytePerPage);

assert(nDbPage >= nFreePage);

unsigned int keep_amount =
(nDbPage - nFreePage) * vacuum_config.percent / 100;
if (nFreePage < keep_amount) {
return 0;
}

unsigned int free_amount = nFreePage - keep_amount;
if (free_amount > vacuum_config.max_pages) {
free_amount = vacuum_config.max_pages;
}
return free_amount;
}

static sqlite3_error auto_instrument_connection(
sqlite3 *connection, const char **pzErrMsg,
const struct sqlite3_api_routines *pThunk) {
UNUSED(pzErrMsg);
UNUSED(pThunk);

return sqlite3_autovacuum_pages(connection, autovacuum_pages_callback, NULL,
NULL);
}

sqlite3_error sqlite3_enable_autovacuum_limit(size_t percent,
size_t max_pages) {
vacuum_config.percent = percent;
vacuum_config.max_pages = max_pages;

return sqlite3_auto_extension((void (*)())(auto_instrument_connection));
}

void sqlite3_disable_autovacuum_limit() {
sqlite3_cancel_auto_extension((void (*)())(auto_instrument_connection));
}
13 changes: 13 additions & 0 deletions pkg/backend/sqlite/extensions.go
@@ -0,0 +1,13 @@
package sqlite

// #cgo LDFLAGS: -lsqlite3
// #include "extensions.h"
import "C"

import "github.com/mattn/go-sqlite3"

func init() {
if err := C.sqlite3_enable_autovacuum_limit(10, 16); err != C.SQLITE_OK {
Contributor: Are we/users going to need to tune the percentage and max pages used in auto-vacuuming?

panic(sqlite3.ErrNo(err))
}
}
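Regarding the contributor question above about tuning: a minimal sketch of how the hard-coded (10, 16) pair could be made configurable, assuming hypothetical environment variables (K8S_DQLITE_AUTOVACUUM_PERCENT, K8S_DQLITE_AUTOVACUUM_MAX_PAGES). None of these names exist in the change; this is illustrative only.

package sqlite

// #cgo LDFLAGS: -lsqlite3
// #include "extensions.h"
import "C"

import (
	"os"
	"strconv"

	"github.com/mattn/go-sqlite3"
)

func init() {
	// Defaults mirror the values used in the change; the env vars are assumptions.
	percent := envOr("K8S_DQLITE_AUTOVACUUM_PERCENT", 10)
	maxPages := envOr("K8S_DQLITE_AUTOVACUUM_MAX_PAGES", 16)
	if err := C.sqlite3_enable_autovacuum_limit(C.size_t(percent), C.size_t(maxPages)); err != C.SQLITE_OK {
		panic(sqlite3.ErrNo(err))
	}
}

// envOr returns the named environment variable parsed as an unsigned integer,
// or def when the variable is unset or malformed.
func envOr(name string, def uint64) uint64 {
	if v, ok := os.LookupEnv(name); ok {
		if n, err := strconv.ParseUint(v, 10, 64); err == nil {
			return n
		}
	}
	return def
}

A config file or command-line flag would work equally well; the point is only that the C API already exposes the two knobs.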
17 changes: 17 additions & 0 deletions pkg/backend/sqlite/extensions.h
@@ -0,0 +1,17 @@
#include <stddef.h>  // for size_t
#include <sqlite3.h>
#include <stdint.h>

typedef int sqlite3_error;

// sqlite3_enable_autovacuum_limit limits the number of autovacuumed
// pages to a percentage of the total database size. This is useful
// to strike a compromise between performance and resource usage.
// - percent: percentage of the used database size to keep as free pages,
//   so that the database does not need to grow again
// - max_pages: maximum number of pages to autovacuum at each commit
sqlite3_error sqlite3_enable_autovacuum_limit(size_t percent, size_t max_pages);

// sqlite3_disable_autovacuum_limit disables the autovacuum limit enabled
// by @sqlite3_enable_autovacuum_limit.
void sqlite3_disable_autovacuum_limit();
49 changes: 46 additions & 3 deletions test/compaction_test.go
@@ -48,7 +48,7 @@ func TestCompaction(t *testing.T) {
g.Expect(finalSize).To(BeNumerically("==", initialSize)) // Expecting no compaction.
})

t.Run("LargeDatabaseDeleteFivePercent", func(t *testing.T) {
t.Run("LargeDatabaseDeleteFew", func(t *testing.T) {
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
@@ -60,7 +60,7 @@ func TestCompaction(t *testing.T) {
if _, err := insertMany(ctx, tx, "key", 100, 10_000); err != nil {
return err
}
if _, err := updateMany(ctx, tx, "key", 100, 500); err != nil {
if _, err := updateMany(ctx, tx, "key", 100, 5_000); err != nil {
return err
}
if _, err := deleteMany(ctx, tx, "key", 500); err != nil {
@@ -76,6 +76,49 @@
err = server.backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

// Expect compaction not to increase the size.
finalSize, err := server.backend.DbSize(ctx)
g.Expect(err).To(BeNil())
g.Expect(finalSize).To(BeNumerically("<=", initialSize))

// Expect for keys to still be there.
rev, count, err := server.backend.Count(ctx, []byte("key/"), []byte("key0"), 0)
g.Expect(err).To(BeNil())
g.Expect(count).To(Equal(int64(10_000 - 500)))

// Expect old revisions not to be there anymore.
_, _, err = server.backend.List(ctx, []byte("key/"), []byte("key0"), 0, rev-400)
g.Expect(err).To(Not(BeNil()))
})

t.Run("LargeDatabaseDeleteMost", func(t *testing.T) {
g := NewWithT(t)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

server := newK8sDqliteServer(ctx, t, &k8sDqliteConfig{
backendType: backendType,
setup: func(ctx context.Context, tx *sql.Tx) error {
if _, err := insertMany(ctx, tx, "key", 100, 10_000); err != nil {
return err
}
if _, err := updateMany(ctx, tx, "key", 100, 500); err != nil {
return err
}
if _, err := deleteMany(ctx, tx, "key", 9_500); err != nil {
return err
}
return nil
},
})

initialSize, err := server.backend.DbSize(ctx)
g.Expect(err).To(BeNil())

err = server.backend.DoCompact(ctx)
g.Expect(err).To(BeNil())

// Expect compaction to reduce the size.
finalSize, err := server.backend.DbSize(ctx)
Contributor: Could we please be more precise here and check whether the size aligns with the percentage we set in sqlite3_enable_autovacuum_limit?

g.Expect(err).To(BeNil())
@@ -84,7 +127,7 @@ func TestCompaction(t *testing.T) {
// Expect for keys to still be there.
rev, count, err := server.backend.Count(ctx, []byte("key/"), []byte("key0"), 0)
g.Expect(err).To(BeNil())
- g.Expect(count).To(Equal(int64(10_000 - 500)))
+ g.Expect(count).To(Equal(int64(10_000 - 9_500)))

// Expect old revisions not to be there anymore.
_, _, err = server.backend.List(ctx, []byte("key/"), []byte("key0"), 0, rev-400)
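On the second contributor comment above (checking that the final size aligns with the configured percentage), one possible shape for such an assertion is sketched below. It assumes direct access to the underlying database as server.db (a hypothetical handle on the test harness), an assumed 4096-byte page size, and that compaction commits in enough batches that the 16-page-per-commit cap does not dominate; the 10% figure mirrors the value passed to sqlite3_enable_autovacuum_limit in extensions.go.

// Sketch only: measure live data via the same pragmas used by the driver.
var liveBytes int64
row := server.db.QueryRowContext(ctx, `
	SELECT (page_count - freelist_count) * page_size
	FROM pragma_page_count(), pragma_freelist_count(), pragma_page_size()`)
err = row.Scan(&liveBytes)
g.Expect(err).To(BeNil())

// With a 10% keep-back and a 16-page per-commit cap, the file should end up
// within roughly 10% of its live size, plus a small fixed allowance.
const assumedPageSize = 4096
g.Expect(finalSize).To(BeNumerically("<=", liveBytes+liveBytes/10+16*assumedPageSize))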