Skip to content

Commit a6c79cb

Browse files
committed
dataflow-state: Undelete compaction progress monitor
We figured out that this code was broken due to a bug in the notify crate on MacOS. Change-Id: I32b64ae57df92ef6ad01d0f59e420a47e4d60495 Reviewed-on: https://gerrit.readyset.name/c/readyset/+/8157 Tested-by: Buildkite CI Reviewed-by: Jason Brown <[email protected]>
1 parent db7f758 commit a6c79cb

File tree

1 file changed

+128
-1
lines changed
  • dataflow-state/src/persistent_state

1 file changed

+128
-1
lines changed

dataflow-state/src/persistent_state/mod.rs

Lines changed: 128 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ mod recorded;
6666

6767
use std::borrow::Cow;
6868
use std::cmp::Ordering;
69-
use std::io;
69+
use std::io::{self, Read};
7070
use std::path::PathBuf;
7171
use std::str::FromStr;
7272
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
@@ -1276,6 +1276,128 @@ impl IndexParams {
12761276
}
12771277
}
12781278

1279+
// Getting the current compaction progress is as easy as getting the property value
1280+
// for `rocksdb.num-files-at-level<N>` NOT.
1281+
// Essentially we have to implement a huge hack here, since the only way I could find
1282+
// to get accurate progress stats is from reading the DB LOG directly. This is very
1283+
// fragile, as it depends on the LOG format not changing, and if it does the report
1284+
// will be less accurate or not work at all. This is however not critical.
1285+
fn compaction_progress_watcher(table_name: &str, db: &DB) -> anyhow::Result<impl notify::Watcher> {
1286+
use std::fs::File;
1287+
use std::io::{Seek, SeekFrom};
1288+
1289+
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
1290+
1291+
// We open the LOG file, skip to the end, and begin watching for change events
1292+
// on it in order to get the latest log entries as they come
1293+
let log_path = db.path().join("LOG");
1294+
let (tx, rx) = std::sync::mpsc::channel();
1295+
1296+
// note: the Config is ignored in `RecommendedWatcher` :meh:
1297+
let mut log_watcher = RecommendedWatcher::new(tx, Config::default())?;
1298+
let table = table_name.to_owned();
1299+
// Row count, but without a lock
1300+
let pk_cf = db.cf_handle(PK_CF).unwrap();
1301+
let row_count = db
1302+
.property_int_value_cf(pk_cf, "rocksdb.estimate-num-keys")
1303+
.unwrap()
1304+
.unwrap() as usize;
1305+
let mut log_file = File::options().read(true).open(&log_path)?;
1306+
log_file.seek(SeekFrom::End(0))?;
1307+
1308+
log_watcher.watch(log_path.as_ref(), RecursiveMode::NonRecursive)?;
1309+
1310+
let mut monitor = move || -> anyhow::Result<()> {
1311+
const REPORT_INTERVAL: Duration = Duration::from_secs(1);
1312+
let mut compaction_started = false;
1313+
let mut buf = String::new();
1314+
let mut first_stage_keys = 0;
1315+
let mut second_stage_keys = 0;
1316+
let mut last_report = Instant::now();
1317+
1318+
// The thread will stop once the notifier drops
1319+
while rx.recv().is_ok() {
1320+
// When we get notified about changes to LOG, we read its latest contents
1321+
log_file.read_to_string(&mut buf)?;
1322+
for line in buf.lines() {
1323+
if line.contains("compaction_started") && line.contains("ManualCompaction") {
1324+
compaction_started = true;
1325+
}
1326+
if !compaction_started {
1327+
continue;
1328+
}
1329+
// As far as I can tell compaction has four stages, first files are created for
1330+
// the appropriate keys, then are indexed, then moved to the
1331+
// correct level (zero cost in case of manual compaction),
1332+
// finally old files are deleted. The final two stages are almost immediate so
1333+
// we don't care about logging them. We only going to log
1334+
// progress for the first two stages.
1335+
1336+
// In the first stage we have log entries of the form `Generated table #53:
1337+
// 3314046 keys, 268436084 bytes` we will be looking for the
1338+
// number of keys in the table, it seems when we have all of the keys processed
1339+
// is when first stage is done.
1340+
if line.contains("Generated table") {
1341+
// Look for number of keys
1342+
let mut fields = line.split(' ').peekable();
1343+
while let Some(f) = fields.next() {
1344+
if fields.peek() == Some(&"keys,") {
1345+
first_stage_keys += f.parse().unwrap_or(0);
1346+
break;
1347+
}
1348+
}
1349+
}
1350+
// In the second stage we have log entries of the form
1351+
// `Number of Keys per prefix Histogram: Count: 1313702 Average: 1.0000 StdDev:
1352+
// 0.00` Here we are looking for the Count to figure out the
1353+
// number of keys processed in this stage
1354+
if line.contains("Number of Keys per prefix Histogram") {
1355+
// Look for number of keys
1356+
let mut fields = line.split(' ').peekable();
1357+
while let Some(f) = fields.next() {
1358+
if f == "Count:" {
1359+
let count_per_hist =
1360+
fields.next().and_then(|f| f.parse().ok()).unwrap_or(0);
1361+
let avg_per_hist =
1362+
fields.nth(1).and_then(|f| f.parse().ok()).unwrap_or(0f64);
1363+
second_stage_keys += (count_per_hist as f64 * avg_per_hist) as u64;
1364+
break;
1365+
}
1366+
}
1367+
}
1368+
1369+
if last_report.elapsed() > REPORT_INTERVAL {
1370+
let first_stage = format!(
1371+
"{:.2}%",
1372+
(first_stage_keys as f64 / row_count as f64) * 100.0
1373+
);
1374+
let second_stage = format!(
1375+
"{:.2}%",
1376+
(second_stage_keys as f64 / row_count as f64) * 100.0
1377+
);
1378+
info!(%table, %first_stage, %second_stage, "Compaction");
1379+
last_report = Instant::now();
1380+
}
1381+
}
1382+
buf.clear();
1383+
}
1384+
1385+
Ok(())
1386+
};
1387+
1388+
let table = table_name.to_owned();
1389+
1390+
let s = std::thread::Builder::new();
1391+
s.name("Compaction Monitor".to_string())
1392+
.spawn_wrapper(move || {
1393+
if let Err(err) = monitor() {
1394+
warn!(%err, %table, "Compaction monitor error");
1395+
}
1396+
})?;
1397+
1398+
Ok(log_watcher)
1399+
}
1400+
12791401
fn compact_cf(table: &str, db: &DB, index: &PersistentIndex, opts: &CompactOptions) {
12801402
let cf = match db.cf_handle(&index.column_family) {
12811403
Some(cf) => cf,
@@ -1285,6 +1407,11 @@ fn compact_cf(table: &str, db: &DB, index: &PersistentIndex, opts: &CompactOptio
12851407
}
12861408
};
12871409

1410+
let _log_watcher = compaction_progress_watcher(table, db);
1411+
if let Err(error) = &_log_watcher {
1412+
warn!(%error, %table, "Could not start compaction monitor");
1413+
}
1414+
12881415
info!(%table, cf = %index.column_family, "Compaction starting");
12891416
db.compact_range_cf_opt(cf, Option::<&[u8]>::None, Option::<&[u8]>::None, opts);
12901417
info!(%table, cf = %index.column_family, "Compaction finished");

0 commit comments

Comments
 (0)