Commit 6f6da06
committed
bottomless: add xz compression option
Transplanted from libsql/sqld#780
1 parent 18cf448 commit 6f6da06

File tree

4 files changed

+94
-32
lines changed

4 files changed

+94
-32
lines changed

libsql-server/bottomless/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"
1010

1111
[dependencies]
1212
anyhow = "1.0.66"
13-
async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
13+
async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
1414
aws-config = { version = "0.55" }
1515
aws-sdk-s3 = { version = "0.28" }
1616
bytes = "1"

libsql-server/bottomless/src/backup.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,11 @@ impl WalCopier {
116116
wal.copy_frames(&mut gzip, len).await?;
117117
gzip.shutdown().await?;
118118
}
119+
CompressionKind::Xz => {
120+
let mut xz = async_compression::tokio::write::XzEncoder::new(&mut out);
121+
wal.copy_frames(&mut xz, len).await?;
122+
xz.shutdown().await?;
123+
}
119124
}
120125
if tracing::enabled!(tracing::Level::DEBUG) {
121126
let elapsed = Instant::now() - period_start;

libsql-server/bottomless/src/read.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::replicator::CompressionKind;
22
use crate::wal::WalFrameHeader;
33
use anyhow::Result;
4-
use async_compression::tokio::bufread::GzipDecoder;
4+
use async_compression::tokio::bufread::{GzipDecoder, XzDecoder};
55
use aws_sdk_s3::primitives::ByteStream;
66
use std::io::ErrorKind;
77
use std::pin::Pin;
@@ -32,6 +32,10 @@ impl BatchReader {
3232
let gzip = GzipDecoder::new(reader);
3333
Box::pin(gzip)
3434
}
35+
CompressionKind::Xz => {
36+
let xz = XzDecoder::new(reader);
37+
Box::pin(xz)
38+
}
3539
},
3640
}
3741
}

libsql-server/bottomless/src/replicator.rs

Lines changed: 83 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
55
use crate::wal::WalFileReader;
66
use anyhow::{anyhow, bail};
77
use arc_swap::ArcSwapOption;
8-
use async_compression::tokio::write::GzipEncoder;
8+
use async_compression::tokio::write::{GzipEncoder, XzEncoder};
99
use aws_sdk_s3::config::{Credentials, Region};
1010
use aws_sdk_s3::error::SdkError;
1111
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -171,7 +171,7 @@ impl Options {
171171
let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
172172
let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
173173
let max_frames_per_batch =
174-
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
174+
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
175175
let s3_upload_max_parallelism =
176176
env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
177177
let restore_transaction_page_swap_after =
@@ -653,7 +653,7 @@ impl Replicator {
653653
CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
654654
CompressionKind::Gzip => {
655655
let mut reader = File::open(db_path).await?;
656-
let gzip_path = Self::db_gzip_path(db_path);
656+
let gzip_path = Self::db_compressed_path(db_path, "gz");
657657
let compressed_file = OpenOptions::new()
658658
.create(true)
659659
.write(true)
@@ -671,13 +671,33 @@ impl Replicator {
671671
);
672672
Ok(ByteStream::from_path(gzip_path).await?)
673673
}
674+
CompressionKind::Xz => {
675+
let mut reader = File::open(db_path).await?;
676+
let xz_path = Self::db_compressed_path(db_path, "xz");
677+
let compressed_file = OpenOptions::new()
678+
.create(true)
679+
.write(true)
680+
.read(true)
681+
.truncate(true)
682+
.open(&xz_path)
683+
.await?;
684+
let mut writer = XzEncoder::new(compressed_file);
685+
let size = tokio::io::copy(&mut reader, &mut writer).await?;
686+
writer.shutdown().await?;
687+
tracing::debug!(
688+
"Compressed database file ({} bytes) into `{}`",
689+
size,
690+
xz_path.display()
691+
);
692+
Ok(ByteStream::from_path(xz_path).await?)
693+
}
674694
}
675695
}
676696

677-
fn db_gzip_path(db_path: &Path) -> PathBuf {
678-
let mut gzip_path = db_path.to_path_buf();
679-
gzip_path.pop();
680-
gzip_path.join("db.gz")
697+
fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
698+
let mut compressed_path: PathBuf = db_path.to_path_buf();
699+
compressed_path.pop();
700+
compressed_path.join(format!("db.{suffix}"))
681701
}
682702

683703
fn restore_db_path(&self) -> PathBuf {
@@ -816,9 +836,10 @@ impl Replicator {
816836
let _ = snapshot_notifier.send(Ok(Some(generation)));
817837
let elapsed = Instant::now() - start;
818838
tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
819-
// cleanup gzip database snapshot if exists
820-
let gzip_path = Self::db_gzip_path(&db_path);
821-
let _ = tokio::fs::remove_file(gzip_path).await;
839+
// cleanup gzip/xz database snapshot if exists
840+
for suffix in &["gz", "xz"] {
841+
let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
842+
}
822843
});
823844
let elapsed = Instant::now() - start_ts;
824845
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
@@ -1160,31 +1181,58 @@ impl Replicator {
11601181
}
11611182

11621183
async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
1163-
let main_db_path = match self.use_compression {
1164-
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
1165-
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
1184+
let algos_to_try = match self.use_compression {
1185+
CompressionKind::None => &[
1186+
CompressionKind::None,
1187+
CompressionKind::Xz,
1188+
CompressionKind::Gzip,
1189+
],
1190+
CompressionKind::Gzip => &[
1191+
CompressionKind::Gzip,
1192+
CompressionKind::Xz,
1193+
CompressionKind::None,
1194+
],
1195+
CompressionKind::Xz => &[
1196+
CompressionKind::Xz,
1197+
CompressionKind::Gzip,
1198+
CompressionKind::None,
1199+
],
11661200
};
11671201

1168-
if let Ok(db_file) = self.get_object(main_db_path).send().await {
1169-
let mut body_reader = db_file.body.into_async_read();
1170-
let db_size = match self.use_compression {
1171-
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
1172-
CompressionKind::Gzip => {
1173-
let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
1174-
tokio::io::BufReader::new(body_reader),
1175-
);
1176-
tokio::io::copy(&mut decompress_reader, db).await?
1177-
}
1202+
for algo in algos_to_try {
1203+
let main_db_path = match algo {
1204+
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
1205+
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
1206+
CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
11781207
};
1179-
db.flush().await?;
1208+
if let Ok(db_file) = self.get_object(main_db_path).send().await {
1209+
let mut body_reader = db_file.body.into_async_read();
1210+
let db_size = match algo {
1211+
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
1212+
CompressionKind::Gzip => {
1213+
let mut decompress_reader =
1214+
async_compression::tokio::bufread::GzipDecoder::new(
1215+
tokio::io::BufReader::new(body_reader),
1216+
);
1217+
tokio::io::copy(&mut decompress_reader, db).await?
1218+
}
1219+
CompressionKind::Xz => {
1220+
let mut decompress_reader =
1221+
async_compression::tokio::bufread::XzDecoder::new(
1222+
tokio::io::BufReader::new(body_reader),
1223+
);
1224+
tokio::io::copy(&mut decompress_reader, db).await?
1225+
}
1226+
};
1227+
db.flush().await?;
11801228

1181-
let page_size = Self::read_page_size(db).await?;
1182-
self.set_page_size(page_size)?;
1183-
tracing::info!("Restored the main database file ({} bytes)", db_size);
1184-
Ok(true)
1185-
} else {
1186-
Ok(false)
1229+
let page_size = Self::read_page_size(db).await?;
1230+
self.set_page_size(page_size)?;
1231+
tracing::info!("Restored the main database file ({} bytes)", db_size);
1232+
return Ok(true);
1233+
}
11871234
}
1235+
Ok(false)
11881236
}
11891237

11901238
async fn restore_wal(
@@ -1235,6 +1283,7 @@ impl Replicator {
12351283
Some(result) => result,
12361284
None => {
12371285
if !key.ends_with(".gz")
1286+
&& !key.ends_with(".xz")
12381287
&& !key.ends_with(".db")
12391288
&& !key.ends_with(".meta")
12401289
&& !key.ends_with(".dep")
@@ -1423,6 +1472,7 @@ impl Replicator {
14231472
let str = fpath.to_str()?;
14241473
if str.ends_with(".db")
14251474
| str.ends_with(".gz")
1475+
| str.ends_with(".xz")
14261476
| str.ends_with(".raw")
14271477
| str.ends_with(".meta")
14281478
| str.ends_with(".dep")
@@ -1670,13 +1720,15 @@ pub enum CompressionKind {
16701720
#[default]
16711721
None,
16721722
Gzip,
1723+
Xz,
16731724
}
16741725

16751726
impl CompressionKind {
16761727
pub fn parse(kind: &str) -> std::result::Result<Self, &str> {
16771728
match kind {
16781729
"gz" | "gzip" => Ok(CompressionKind::Gzip),
16791730
"raw" | "" => Ok(CompressionKind::None),
1731+
"xz" => Ok(CompressionKind::Xz),
16801732
other => Err(other),
16811733
}
16821734
}
@@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
16871739
match self {
16881740
CompressionKind::None => write!(f, "raw"),
16891741
CompressionKind::Gzip => write!(f, "gz"),
1742+
CompressionKind::Xz => write!(f, "xz"),
16901743
}
16911744
}
16921745
}

0 commit comments

Comments
 (0)