Skip to content

Commit 06bb81b

Browse files
authored
libsql wal s3 backend (#1554)
* introduce checkpointer * fmt * impl IoBuf(Mut) for BytesMut * add tokio_util dep * implement FileStreamBody wrapper stream arounf File, to send as payload to s3 * introduce SegmentKey * add copy stream to FileExt compat method * add warning comment * fix imports * add read_at and read_at_async methods * update Backend trait to return segmetn index * introduce S3Backend * add s3 keys formatters * implement Backend for S3Backend * fix imports * remove old implementation * add basic s3 test * remove unused implementation of FileExt for Vec<u8> * add net abstraction * feature gate s3 backend * s3 backend with custom connector * fmt * expose WalRegistry::get_async * expose seal_current method * implement get_durable_frame_no * expose convenient constructor for AsyncStorage * implement Clone for LibsqlWalManager * pass config to Storage methods * add unhandled storage variant * cargo bucket already exist error * return optional segment * remove net abstraction the connector can be passed to the SdkConfig * add basic shell to interact with libsql-wal * fmt * do not publish libsql wal
1 parent e2d12b0 commit 06bb81b

File tree

22 files changed

+1190
-228
lines changed

22 files changed

+1190
-228
lines changed

Cargo.lock

Lines changed: 100 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ rusqlite = { package = "libsql-rusqlite", path = "vendored/rusqlite", version =
4343
"limits",
4444
"hooks",
4545
] }
46+
hyper = { version = "0.14" }
47+
tower = { version = "0.4.13" }
4648

4749
# Config for 'cargo dist'
4850
[workspace.metadata.dist]

libsql-server/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ fallible-iterator = "0.3.0"
3030
futures = "0.3.25"
3131
futures-core = "0.3"
3232
hmac = "0.12"
33-
hyper = { version = "0.14.23", features = ["http2"] }
33+
hyper = { workspace = true, features = ["http2"] }
3434
hyper-rustls = { git = "https://github.com/rustls/hyper-rustls.git", rev = "163b3f5" }
3535
hyper-tungstenite = "0.11"
3636
itertools = "0.10.5"
@@ -74,7 +74,7 @@ tokio-tungstenite = "0.20"
7474
tokio-util = { version = "0.7.8", features = ["io", "io-util"] }
7575
tonic = { version = "0.11", features = ["tls"] }
7676
tonic-web = "0.11"
77-
tower = { version = "0.4.13", features = ["make"] }
77+
tower = { workspace = true, features = ["make"] }
7878
tower-http = { version = "0.3.5", features = ["compression-full", "cors", "trace"] }
7979
tracing = "0.1.37"
8080
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
@@ -97,7 +97,7 @@ arbitrary = { version = "1.3.0", features = ["derive_arbitrary"] }
9797
aws-config = "0.55"
9898
aws-sdk-s3 = "0.28"
9999
env_logger = "0.10"
100-
hyper = { version = "0.14", features = ["client"] }
100+
hyper = { workspace = true, features = ["client"] }
101101
insta = { version = "1.26.0", features = ["json"] }
102102
libsql = { path = "../libsql/"}
103103
libsql-client = { version = "0.6.5", default-features = false, features = ["reqwest_backend"] }

libsql-server/src/schema/db.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@ pub(super) fn get_next_pending_migration_tasks_batch(
181181
status: MigrationTaskStatus,
182182
limit: usize,
183183
) -> Result<Vec<MigrationTask>, Error> {
184+
dbg!(job_id);
185+
dbg!(status, status as u64);
184186
let txn = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
185187
let tasks = txn
186188
.prepare(
@@ -190,6 +192,7 @@ pub(super) fn get_next_pending_migration_tasks_batch(
190192
LIMIT ?",
191193
)?
192194
.query_map((job_id, status as u64, limit), |row| {
195+
dbg!(row);
193196
let task_id = row.get::<_, i64>(0)?;
194197
let namespace = NamespaceName::from_string(row.get::<_, String>(1)?).unwrap();
195198
let status = MigrationTaskStatus::from_int(row.get::<_, u64>(2)?);
@@ -205,6 +208,7 @@ pub(super) fn get_next_pending_migration_tasks_batch(
205208
.collect::<Result<Vec<_>, Error>>()?;
206209

207210
for task in tasks.iter() {
211+
dbg!(task);
208212
txn.execute("INSERT INTO enqueued_tasks VALUES (?)", [task.task_id])?;
209213
}
210214

libsql-server/src/schema/scheduler.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,7 @@ impl Scheduler {
299299
let Some(ref job) = self.current_job else {
300300
return Ok(());
301301
};
302+
dbg!(job);
302303
if self.current_batch.is_empty()
303304
&& matches!(
304305
*job.status(),
@@ -310,7 +311,7 @@ impl Scheduler {
310311
const MAX_BATCH_SIZE: usize = 50;
311312
// get a batch of enqueued tasks
312313
let job_id = job.job_id();
313-
self.current_batch = match *job.status() {
314+
self.current_batch = match dbg!(*job.status()) {
314315
MigrationJobStatus::WaitingDryRun => {
315316
with_conn_async(self.migration_db.clone(), move |conn| {
316317
get_next_pending_migration_tasks_batch(

libsql-wal/Cargo.toml

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
name = "libsql-wal"
33
version = "0.1.0"
44
edition = "2021"
5+
publish = false
56

67
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
78

@@ -30,8 +31,16 @@ uuid = { version = "1.8.0", features = ["v4"] }
3031
walkdir = "2.5.0"
3132
zerocopy = { version = "0.7.32", features = ["derive", "alloc"] }
3233

33-
aws-config = { version = "1", features = ["behavior-version-latest"] }
34-
aws-sdk-s3 = "1"
34+
aws-config = { version = "1", optional = true, features = ["behavior-version-latest"] }
35+
aws-sdk-s3 = { version = "1", optional = true }
36+
http-body = "1.0.0"
37+
tokio-util = "0.7.11"
38+
hyper = { workspace = true, optional = true, features = ["client", "http2"] }
39+
aws-smithy-runtime = { version = "1.6.2", optional = true, features = ["connector-hyper-0-14-x", "client"]}
40+
clap = { version = "4.5.9", optional = true, features = ["derive"] }
41+
inquire = { version = "0.7.5", optional = true }
42+
tracing-subscriber = { version = "0.3.18", optional = true }
43+
aws-credential-types = { version = "1.2.0", optional = true }
3544

3645
[dev-dependencies]
3746
criterion = "0.5.1"
@@ -51,3 +60,24 @@ tokio = { version = "*", features = ["test-util"] }
5160
[[bench]]
5261
name = "benchmarks"
5362
harness = false
63+
64+
[features]
65+
default = ["s3", "shell-bin"]
66+
s3 = [
67+
"dep:hyper",
68+
"dep:aws-smithy-runtime",
69+
"dep:aws-sdk-s3",
70+
"dep:aws-config",
71+
"dep:aws-credential-types",
72+
]
73+
shell-bin = [
74+
"dep:clap",
75+
"dep:inquire",
76+
"s3",
77+
"dep:tracing-subscriber",
78+
]
79+
80+
[[bin]]
81+
name = "shell"
82+
path = "src/bins/shell/main.rs"
83+
required-features = ["shell-bin"]

0 commit comments

Comments
 (0)