bottomless: add zstd compression option #468

Merged · 5 commits · Oct 18, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion bottomless/Cargo.toml
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"

[dependencies]
anyhow = "1.0.66"
async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
async-compression = { version = "0.4.4", features = ["tokio", "gzip", "zstd"] }
aws-config = { version = "0.55" }
aws-sdk-s3 = { version = "0.28" }
bytes = "1"
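
The dependency bump to async-compression 0.4 and the added `zstd` feature are what expose the tokio-flavoured `ZstdEncoder`/`ZstdDecoder` used in the rest of this diff. A minimal roundtrip sketch of that API, not part of the PR, assuming only the crate features enabled above:

use async_compression::tokio::{bufread::ZstdDecoder, write::ZstdEncoder};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

// Illustrative only: compress into an in-memory buffer, then decompress it back.
async fn zstd_roundtrip(input: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut encoder = ZstdEncoder::new(Vec::new()); // any AsyncWrite works here
    encoder.write_all(input).await?;
    encoder.shutdown().await?;                      // flushes the final zstd frame
    let compressed = encoder.into_inner();

    let mut decoder = ZstdDecoder::new(&compressed[..]); // any AsyncBufRead works here
    let mut output = Vec::new();
    decoder.read_to_end(&mut output).await?;
    Ok(output)
}
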
5 changes: 5 additions & 0 deletions bottomless/src/backup.rs
@@ -116,6 +116,11 @@ impl WalCopier {
wal.copy_frames(&mut gzip, len).await?;
gzip.shutdown().await?;
}
CompressionKind::Zstd => {
let mut zstd = async_compression::tokio::write::ZstdEncoder::new(&mut out);
wal.copy_frames(&mut zstd, len).await?;
zstd.shutdown().await?;
}
}
if tracing::enabled!(tracing::Level::DEBUG) {
let elapsed = Instant::now() - period_start;
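
The new arm mirrors the existing gzip arm: `ZstdEncoder` is just another `AsyncWrite`, so `copy_frames` streams into it unchanged and `shutdown()` finalizes the compressed frame before the batch file is handed to the uploader. A hedged sketch of the same dispatch with a generic reader/writer pair standing in for the WAL copier (the helper itself is hypothetical, not from this PR):

use crate::replicator::CompressionKind;
use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};

// Illustrative sketch of the pattern used above; the helper name is hypothetical.
async fn copy_compressed<R, W>(kind: CompressionKind, mut src: R, mut out: W) -> std::io::Result<u64>
where
    R: AsyncRead + Unpin,
    W: AsyncWrite + Unpin,
{
    match kind {
        CompressionKind::None => tokio::io::copy(&mut src, &mut out).await,
        CompressionKind::Gzip => {
            let mut gzip = GzipEncoder::new(&mut out);
            let n = tokio::io::copy(&mut src, &mut gzip).await?;
            gzip.shutdown().await?; // write the gzip trailer
            Ok(n)
        }
        CompressionKind::Zstd => {
            let mut zstd = ZstdEncoder::new(&mut out);
            let n = tokio::io::copy(&mut src, &mut zstd).await?;
            zstd.shutdown().await?; // write the zstd frame epilogue
            Ok(n)
        }
    }
}
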
6 changes: 5 additions & 1 deletion bottomless/src/read.rs
@@ -1,7 +1,7 @@
use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use aws_sdk_s3::primitives::ByteStream;
use std::io::ErrorKind;
use std::pin::Pin;
@@ -32,6 +32,10 @@ impl BatchReader {
let gzip = GzipDecoder::new(reader);
Box::pin(gzip)
}
CompressionKind::Zstd => {
let zstd = ZstdDecoder::new(reader);
Box::pin(zstd)
}
},
}
}
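
On the read path the decoder is chosen the same way and erased behind a boxed `AsyncRead`, so the frame parsing in `BatchReader` stays identical for raw, gzip and zstd batches. A hedged sketch of that selection (the exact field and type details in `BatchReader` may differ):

use crate::replicator::CompressionKind;
use async_compression::tokio::bufread::{GzipDecoder, ZstdDecoder};
use std::pin::Pin;
use tokio::io::{AsyncBufRead, AsyncRead};

// Illustrative only: wrap an already-buffered reader in the right decoder.
fn boxed_frame_reader<R>(kind: CompressionKind, reader: R) -> Pin<Box<dyn AsyncRead + Send>>
where
    R: AsyncBufRead + Send + 'static,
{
    match kind {
        CompressionKind::None => Box::pin(reader),
        CompressionKind::Gzip => Box::pin(GzipDecoder::new(reader)),
        CompressionKind::Zstd => Box::pin(ZstdDecoder::new(reader)),
    }
}
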
115 changes: 84 additions & 31 deletions bottomless/src/replicator.rs
@@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
use crate::wal::WalFileReader;
use anyhow::{anyhow, bail};
use arc_swap::ArcSwapOption;
use async_compression::tokio::write::GzipEncoder;
use async_compression::tokio::write::{GzipEncoder, ZstdEncoder};
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -171,15 +171,15 @@ impl Options {
let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
let max_frames_per_batch =
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
let s3_upload_max_parallelism =
env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
let restore_transaction_page_swap_after =
env_var_or("LIBSQL_BOTTOMLESS_RESTORE_TXN_SWAP_THRESHOLD", 1000).parse::<u32>()?;
let restore_transaction_cache_fpath =
env_var_or("LIBSQL_BOTTOMLESS_RESTORE_TXN_FILE", ".bottomless.restore");
let use_compression =
CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "gz"))
CompressionKind::parse(&env_var_or("LIBSQL_BOTTOMLESS_COMPRESSION", "zstd"))
.map_err(|e| anyhow!("unknown compression kind: {}", e))?;
let verify_crc = match env_var_or("LIBSQL_BOTTOMLESS_VERIFY_CRC", true)
.to_lowercase()
@@ -653,7 +653,7 @@ impl Replicator {
CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
CompressionKind::Gzip => {
let mut reader = File::open(db_path).await?;
let gzip_path = Self::db_gzip_path(db_path);
let gzip_path = Self::db_compressed_path(db_path, "gz");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
@@ -671,13 +671,33 @@
);
Ok(ByteStream::from_path(gzip_path).await?)
}
CompressionKind::Zstd => {
let mut reader = File::open(db_path).await?;
let zstd_path = Self::db_compressed_path(db_path, "zstd");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.truncate(true)
.open(&zstd_path)
.await?;
let mut writer = ZstdEncoder::new(compressed_file);
let size = tokio::io::copy(&mut reader, &mut writer).await?;
writer.shutdown().await?;
tracing::debug!(
"Compressed database file ({} bytes) into `{}`",
size,
zstd_path.display()
);
Ok(ByteStream::from_path(zstd_path).await?)
}
}
}

fn db_gzip_path(db_path: &Path) -> PathBuf {
let mut gzip_path = db_path.to_path_buf();
gzip_path.pop();
gzip_path.join("db.gz")
fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
let mut compressed_path: PathBuf = db_path.to_path_buf();
compressed_path.pop();
compressed_path.join(format!("db.{suffix}"))
}

fn restore_db_path(&self) -> PathBuf {
@@ -816,9 +836,10 @@
let _ = snapshot_notifier.send(Ok(Some(generation)));
let elapsed = Instant::now() - start;
tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
// cleanup gzip database snapshot if exists
let gzip_path = Self::db_gzip_path(&db_path);
let _ = tokio::fs::remove_file(gzip_path).await;
// cleanup gzip/zstd database snapshot if exists
for suffix in &["gz", "zstd"] {
let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
}
});
let elapsed = Instant::now() - start_ts;
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
@@ -1160,31 +1181,58 @@
}

async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
let main_db_path = match self.use_compression {
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
let algos_to_try = match self.use_compression {
CompressionKind::None => &[
CompressionKind::None,
CompressionKind::Zstd,
CompressionKind::Gzip,
],
CompressionKind::Gzip => &[
CompressionKind::Gzip,
CompressionKind::Zstd,
CompressionKind::None,
],
CompressionKind::Zstd => &[
CompressionKind::Zstd,
CompressionKind::Gzip,
CompressionKind::None,
],
};

if let Ok(db_file) = self.get_object(main_db_path).send().await {
let mut body_reader = db_file.body.into_async_read();
let db_size = match self.use_compression {
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
for algo in algos_to_try {
let main_db_path = match algo {
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
CompressionKind::Zstd => format!("{}-{}/db.zstd", self.db_name, generation),
};
db.flush().await?;
if let Ok(db_file) = self.get_object(main_db_path).send().await {
let mut body_reader = db_file.body.into_async_read();
let db_size = match algo {
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader =
async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
CompressionKind::Zstd => {
let mut decompress_reader =
async_compression::tokio::bufread::ZstdDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
};
db.flush().await?;

let page_size = Self::read_page_size(db).await?;
self.set_page_size(page_size)?;
tracing::info!("Restored the main database file ({} bytes)", db_size);
Ok(true)
} else {
Ok(false)
let page_size = Self::read_page_size(db).await?;
self.set_page_size(page_size)?;
tracing::info!("Restored the main database file ({} bytes)", db_size);
return Ok(true);
}
}
Ok(false)
}
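
Restore no longer assumes the snapshot on S3 was written with the currently configured algorithm: for each generation it probes `db.zstd`, `db.gz` and `db.db`, trying the configured kind first and falling back to the others, which keeps old gzip snapshots restorable after the default switches to zstd. A small illustrative check of that ordering (helper and test are hypothetical, not part of this diff):

use crate::replicator::CompressionKind;

// Hypothetical helper mirroring the lookup order in restore_from_snapshot.
fn snapshot_keys(db_name: &str, generation: &str, preferred: CompressionKind) -> Vec<String> {
    let order = match preferred {
        CompressionKind::Zstd => ["db.zstd", "db.gz", "db.db"],
        CompressionKind::Gzip => ["db.gz", "db.zstd", "db.db"],
        CompressionKind::None => ["db.db", "db.zstd", "db.gz"],
    };
    order
        .iter()
        .map(|object| format!("{db_name}-{generation}/{object}"))
        .collect()
}

#[test]
fn preferred_kind_is_probed_first() {
    let keys = snapshot_keys("db", "<generation-uuid>", CompressionKind::Zstd);
    assert_eq!(keys[0], "db-<generation-uuid>/db.zstd");
}
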

async fn restore_wal(
@@ -1235,6 +1283,7 @@ impl Replicator {
Some(result) => result,
None => {
if !key.ends_with(".gz")
&& !key.ends_with(".zstd")
&& !key.ends_with(".db")
&& !key.ends_with(".meta")
&& !key.ends_with(".dep")
@@ -1423,6 +1472,7 @@
let str = fpath.to_str()?;
if str.ends_with(".db")
| str.ends_with(".gz")
| str.ends_with(".zstd")
| str.ends_with(".raw")
| str.ends_with(".meta")
| str.ends_with(".dep")
@@ -1670,13 +1720,15 @@ pub enum CompressionKind {
#[default]
None,
Gzip,
Zstd,
}

impl CompressionKind {
pub fn parse(kind: &str) -> std::result::Result<Self, &str> {
match kind {
"gz" | "gzip" => Ok(CompressionKind::Gzip),
"raw" | "" => Ok(CompressionKind::None),
"zstd" => Ok(CompressionKind::Zstd),
other => Err(other),
}
}
@@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
match self {
CompressionKind::None => write!(f, "raw"),
CompressionKind::Gzip => write!(f, "gz"),
CompressionKind::Zstd => write!(f, "zstd"),
}
}
}
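
Note that the default for `LIBSQL_BOTTOMLESS_COMPRESSION` changes from `gz` to `zstd` (and the default `LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES` from 500 to 10000), so deployments that need to stay on gzip must now set the variable explicitly. A hedged sketch of how the accepted values map to `CompressionKind`, based only on the `parse` and `Display` impls above (the test itself is illustrative, not from this PR):

// Illustrative only; not a test included in this PR.
#[test]
fn compression_kind_values() {
    assert!(matches!(CompressionKind::parse("zstd"), Ok(CompressionKind::Zstd)));
    assert!(matches!(CompressionKind::parse("gz"), Ok(CompressionKind::Gzip)));
    assert!(matches!(CompressionKind::parse("gzip"), Ok(CompressionKind::Gzip)));
    assert!(matches!(CompressionKind::parse("raw"), Ok(CompressionKind::None)));
    assert!(matches!(CompressionKind::parse(""), Ok(CompressionKind::None)));
    assert!(CompressionKind::parse("lz4").is_err());

    assert_eq!(CompressionKind::Zstd.to_string(), "zstd");
    assert_eq!(CompressionKind::Gzip.to_string(), "gz");
    assert_eq!(CompressionKind::None.to_string(), "raw");
}
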