Skip to content

Commit 94bd76a

Browse files
authored
Less logging (#81)
* reduce logs by a ton * don't try to send a request if there was no statements to send * relax 'no good candidates' errors
1 parent 3e46d71 commit 94bd76a

File tree

9 files changed

+48
-46
lines changed

9 files changed

+48
-46
lines changed

crates/corro-agent/src/agent.rs

+26-31
Original file line numberDiff line numberDiff line change
@@ -957,7 +957,7 @@ async fn clear_overwritten_versions(agent: Agent) {
957957
interval = COMPACT_BOOKED_INTERVAL;
958958
}
959959

960-
info!("starting compaction...");
960+
info!("Starting compaction...");
961961
let start = Instant::now();
962962

963963
let mut to_check: BTreeMap<i64, (ActorId, i64)> = BTreeMap::new();
@@ -991,7 +991,7 @@ async fn clear_overwritten_versions(agent: Agent) {
991991
}
992992
}
993993

994-
info!("got actors and their versions");
994+
debug!("got actors and their versions");
995995

996996
let cleared_versions: BTreeSet<i64> = {
997997
match pool.read().await {
@@ -1030,7 +1030,7 @@ async fn clear_overwritten_versions(agent: Agent) {
10301030
let mut inserted = 0;
10311031

10321032
for (actor_id, to_clear) in to_clear_by_actor {
1033-
info!(%actor_id, "clearing actor {} versions", to_clear.len());
1033+
info!(%actor_id, "Clearing {} versions", to_clear.len());
10341034
let booked = {
10351035
bookie
10361036
.write(format!("to_clear_get_booked:{}", actor_id.as_simple()))
@@ -1063,7 +1063,7 @@ async fn clear_overwritten_versions(agent: Agent) {
10631063
}
10641064

10651065
info!(
1066-
"compaction done, cleared {} db bookkeeping table rows in {:?}",
1066+
"Compaction done, cleared {} DB bookkeeping table rows in {:?}",
10671067
deleted - inserted,
10681068
start.elapsed()
10691069
);
@@ -1226,7 +1226,7 @@ fn find_cleared_db_versions(tx: &Transaction) -> rusqlite::Result<BTreeSet<i64>>
12261226
.collect::<rusqlite::Result<_>>()?;
12271227

12281228
info!(
1229-
"aggregated {} db versions to clear in {:?}",
1229+
"Aggregated {} DB versions to clear in {:?}",
12301230
cleared_db_versions.len(),
12311231
start.elapsed()
12321232
);
@@ -1629,7 +1629,7 @@ async fn process_fully_buffered_changes(
16291629

16301630
let tx = conn.transaction()?;
16311631

1632-
info!(%actor_id, version, "processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})");
1632+
info!(%actor_id, version, "Processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})");
16331633

16341634
let max_db_version: Option<i64> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?")?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional()?;
16351635

@@ -1649,9 +1649,9 @@ async fn process_fully_buffered_changes(
16491649
"#,
16501650
)?
16511651
.execute(params![max_db_version, actor_id.as_bytes(), version])?;
1652-
info!(%actor_id, version, "inserted {count} rows from buffered into crsql_changes in {:?}", start.elapsed());
1652+
info!(%actor_id, version, "Inserted {count} rows from buffered into crsql_changes in {:?}", start.elapsed());
16531653
} else {
1654-
info!(%actor_id, version, "no buffered rows, skipped insertion into crsql_changes");
1654+
info!(%actor_id, version, "No buffered rows, skipped insertion into crsql_changes");
16551655
}
16561656

16571657
clear_buffered_meta(&tx, actor_id, version..=version)?;
@@ -1660,7 +1660,7 @@ async fn process_fully_buffered_changes(
16601660
.prepare_cached("SELECT crsql_rows_impacted()")?
16611661
.query_row((), |row| row.get(0))?;
16621662

1663-
info!(%actor_id, version, "rows impacted by buffered changes insertion: {rows_impacted}");
1663+
debug!(%actor_id, version, "rows impacted by buffered changes insertion: {rows_impacted}");
16641664

16651665
let known_version = if rows_impacted > 0 {
16661666
let db_version: i64 =
@@ -1687,7 +1687,7 @@ async fn process_fully_buffered_changes(
16871687
":ts": ts
16881688
})?;
16891689

1690-
info!(%actor_id, version, "inserted bookkeeping row after buffered insert");
1690+
debug!(%actor_id, version, "inserted bookkeeping row after buffered insert");
16911691

16921692
Some(KnownDbVersion::Current {
16931693
db_version,
@@ -1697,7 +1697,7 @@ async fn process_fully_buffered_changes(
16971697
} else {
16981698
store_empty_changeset(&tx, actor_id, version..=version)?;
16991699

1700-
info!(%actor_id, version, "inserted CLEARED bookkeeping row after buffered insert");
1700+
debug!(%actor_id, version, "inserted CLEARED bookkeeping row after buffered insert");
17011701
Some(KnownDbVersion::Cleared)
17021702
};
17031703

@@ -1735,7 +1735,7 @@ pub async fn process_multiple_changes(
17351735
agent: &Agent,
17361736
changes: Vec<(ChangeV1, ChangeSource)>,
17371737
) -> Result<(), ChangeError> {
1738-
info!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _)| cmp::max(change.len(), 1)).sum::<usize>());
1738+
debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _)| cmp::max(change.len(), 1)).sum::<usize>());
17391739

17401740
let bookie = agent.bookie();
17411741

@@ -1911,7 +1911,7 @@ pub async fn process_multiple_changes(
19111911

19121912
tx.commit()?;
19131913

1914-
info!("committed {count} changes in {:?}", start.elapsed());
1914+
debug!("committed {count} changes in {:?}", start.elapsed());
19151915

19161916
for (actor_id, knowns) in knowns {
19171917
let booked = {
@@ -1934,7 +1934,7 @@ pub async fn process_multiple_changes(
19341934
let version = *versions.start();
19351935
if gaps_count == 0 {
19361936
// if we have no gaps, then we can schedule applying all these changes.
1937-
info!(%actor_id, version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}");
1937+
debug!(%actor_id, version, "we now have all versions, notifying for background jobber to insert buffered changes! seqs: {seqs:?}, expected full seqs: {full_seqs_range:?}");
19381938
let tx_apply = agent.tx_apply().clone();
19391939
tokio::spawn(async move {
19401940
if let Err(e) = tx_apply.send((actor_id, version)).await {
@@ -2082,7 +2082,7 @@ fn process_complete_version(
20822082

20832083
let max_db_version = changes.iter().map(|c| c.db_version).max().unwrap_or(0);
20842084

2085-
info!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}, changes len: {len}, max db version: {max_db_version}");
2085+
debug!(%actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}, changes len: {len}, max db version: {max_db_version}");
20862086

20872087
debug_assert!(len <= (seqs.end() - seqs.start() + 1) as usize);
20882088

@@ -2258,8 +2258,6 @@ pub enum SyncClientError {
22582258
Io(#[from] std::io::Error),
22592259
#[error(transparent)]
22602260
Pool(#[from] SqlitePoolError),
2261-
#[error("no good candidates found")]
2262-
NoGoodCandidate,
22632261
#[error("could not decode message: {0}")]
22642262
Decoded(#[from] SyncMessageDecodeError),
22652263
#[error("could not encode message: {0}")]
@@ -2321,7 +2319,7 @@ async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncCli
23212319
};
23222320

23232321
if candidates.is_empty() {
2324-
return Err(SyncClientError::NoGoodCandidate);
2322+
return Ok(());
23252323
}
23262324

23272325
debug!("found {} candidates to synchronize with", candidates.len());
@@ -2351,7 +2349,7 @@ async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncCli
23512349
};
23522350

23532351
if chosen.is_empty() {
2354-
return Err(SyncClientError::NoGoodCandidate);
2352+
return Ok(());
23552353
}
23562354

23572355
let start = Instant::now();
@@ -2418,7 +2416,7 @@ async fn handle_changes(
24182416
count = 0;
24192417
}
24202418

2421-
info!("draining changes receiver...");
2419+
info!("Draining changes receiver...");
24222420

24232421
// drain!
24242422
while let Ok((change, src)) = rx_changes.try_recv() {
@@ -2487,7 +2485,7 @@ async fn write_empties_loop(
24872485
}
24882486

24892487
if !empties.is_empty() {
2490-
info!("inserting last unprocessed empties before shut down");
2488+
info!("Inserting last unprocessed empties before shut down");
24912489
if let Err(e) = process_completed_empties(&agent, &mut empties).await {
24922490
error!("could not process empties: {e}");
24932491
}
@@ -2552,9 +2550,7 @@ async fn sync_loop(
25522550
}
25532551
tripwire::Outcome::Completed(res) => {
25542552
if let Err(e) = res {
2555-
if !matches!(e, SyncClientError::NoGoodCandidate) {
2556-
error!("could not sync: {e}");
2557-
}
2553+
error!("could not sync: {e}");
25582554
// keep syncing until we successfully sync
25592555
continue;
25602556
}
@@ -2565,13 +2561,13 @@ async fn sync_loop(
25652561
.reset(tokio::time::Instant::now() + sync_backoff.next().unwrap());
25662562
}
25672563
Branch::BackgroundApply { actor_id, version } => {
2568-
info!(%actor_id, version, "picked up background apply of buffered changes");
2564+
debug!(%actor_id, version, "picked up background apply of buffered changes");
25692565
match process_fully_buffered_changes(&agent, actor_id, version).await {
25702566
Ok(false) => {
25712567
warn!(%actor_id, version, "did not apply buffered changes");
25722568
}
25732569
Ok(true) => {
2574-
info!(%actor_id, version, "succesfully applied buffered changes");
2570+
debug!(%actor_id, version, "succesfully applied buffered changes");
25752571
}
25762572
Err(e) => {
25772573
error!(%actor_id, version, "could not apply fully buffered changes: {e}");
@@ -2587,7 +2583,7 @@ async fn process_completed_empties(
25872583
agent: &Agent,
25882584
empties: &mut BTreeMap<ActorId, RangeInclusiveSet<i64>>,
25892585
) -> eyre::Result<()> {
2590-
info!(
2586+
debug!(
25912587
"processing empty versions (count: {})",
25922588
empties.values().map(RangeInclusiveSet::len).sum::<usize>()
25932589
);
@@ -2614,7 +2610,7 @@ async fn process_completed_empties(
26142610
}
26152611
}
26162612

2617-
info!(
2613+
debug!(
26182614
"upserted {inserted} empty version ranges in {:?}",
26192615
start.elapsed()
26202616
);
@@ -3125,11 +3121,11 @@ pub mod tests {
31253121

31263122
let actual_count: i64 =
31273123
conn.query_row("SELECT count(*) FROM crsql_changes;", (), |row| row.get(0))?;
3128-
info!("actual count: {actual_count}");
3124+
debug!("actual count: {actual_count}");
31293125

31303126
let bookie = ta.agent.bookie();
31313127

3132-
info!(
3128+
debug!(
31333129
"last version: {:?}",
31343130
bookie
31353131
.write("test")
@@ -3144,7 +3140,6 @@ pub mod tests {
31443140
let needed = sync.need_len();
31453141

31463142
debug!("generated sync: {sync:?}");
3147-
info!("needed: {needed}");
31483143

31493144
v.push((counts.values().sum::<i64>(), needed));
31503145
}

crates/corro-agent/src/api/peer.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ fn handle_known_version(
451451
Some(known) => match known {
452452
KnownVersion::Partial(PartialVersion { seqs, .. }) => {
453453
if seqs != &partial_seqs {
454-
info!(%actor_id, version, "different partial sequences, updating! range_needed: {range_needed:?}");
454+
warn!(%actor_id, version, "different partial sequences, updating! range_needed: {range_needed:?}");
455455
partial_seqs = seqs.clone();
456456
if let Some(new_start_seq) = last_sent_seq.take() {
457457
range_needed =
@@ -474,7 +474,7 @@ fn handle_known_version(
474474
};
475475

476476
if let Some(known) = maybe_current_version {
477-
info!(%actor_id, version, "switched from partial to current version");
477+
warn!(%actor_id, version, "switched from partial to current version");
478478

479479
// drop write lock
480480
drop(bw);
@@ -1053,7 +1053,7 @@ pub async fn parallel_sync(
10531053

10541054
trace!(%actor_id, "needs: {needs:?}");
10551055

1056-
info!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need {
1056+
debug!(%actor_id, %addr, "needs len: {}", needs.values().map(|needs| needs.iter().map(|need| match need {
10571057
SyncNeedV1::Full {versions} => (versions.end() - versions.start()) as usize + 1,
10581058
SyncNeedV1::Partial {..} => 0,
10591059
}).sum::<usize>()).sum::<usize>());
@@ -1204,7 +1204,7 @@ pub async fn parallel_sync(
12041204
if let Err(e) = tx.finish().instrument(info_span!("quic_finish")).await {
12051205
warn!("could not finish stream while sending sync requests: {e}");
12061206
}
1207-
info!(%server_actor_id, %addr, "done trying to sync w/ actor after {:?}", start.elapsed());
1207+
debug!(%server_actor_id, %addr, "done trying to sync w/ actor after {:?}", start.elapsed());
12081208
continue;
12091209
}
12101210

@@ -1260,7 +1260,7 @@ pub async fn parallel_sync(
12601260
}
12611261
}
12621262

1263-
info!(%actor_id, %count, "done reading sync messages");
1263+
debug!(%actor_id, %count, "done reading sync messages");
12641264

12651265
Ok(count)
12661266
}.instrument(info_span!("read_sync_requests_responses", %actor_id))
@@ -1289,7 +1289,7 @@ pub async fn serve_sync(
12891289
opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&trace_ctx));
12901290
tracing::Span::current().set_parent(context);
12911291

1292-
info!(actor_id = %their_actor_id, self_actor_id = %agent.actor_id(), "received sync request");
1292+
debug!(actor_id = %their_actor_id, self_actor_id = %agent.actor_id(), "received sync request");
12931293
let mut codec = LengthDelimitedCodec::new();
12941294
let mut send_buf = BytesMut::new();
12951295
let mut encode_buf = BytesMut::new();
@@ -1383,7 +1383,7 @@ pub async fn serve_sync(
13831383
stopped_res = write.stopped() => {
13841384
match stopped_res {
13851385
Ok(code) => {
1386-
info!(actor_id = %their_actor_id, "send stream was stopped by peer, code: {code}");
1386+
debug!(actor_id = %their_actor_id, "send stream was stopped by peer, code: {code}");
13871387
},
13881388
Err(e) => {
13891389
warn!(actor_id = %their_actor_id, "error waiting for stop from stream: {e}");

crates/corro-agent/src/api/public/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ async fn execute_schema(agent: &Agent, statements: Vec<String>) -> eyre::Result<
654654
tx.execute("DELETE FROM __corro_schema WHERE tbl_name = ?", [tbl_name])?;
655655

656656
let n = tx.execute("INSERT INTO __corro_schema SELECT tbl_name, type, name, sql, 'api' AS source FROM sqlite_schema WHERE tbl_name = ? AND type IN ('table', 'index') AND name IS NOT NULL AND sql IS NOT NULL", [tbl_name])?;
657-
info!("updated {n} rows in __corro_schema for table {tbl_name}");
657+
info!("Updated {n} rows in __corro_schema for table {tbl_name}");
658658
}
659659

660660
tx.commit()?;

crates/corro-agent/src/api/public/pubsub.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pub async fn process_sub_channel(
137137
biased;
138138
Some(query_evt) = evt_rx.recv() => query_evt,
139139
_ = deadline_check => {
140-
info!("all subscribers for {id} are gone and didn't come back within {MAX_UNSUB_TIME:?}");
140+
info!("All subscribers for {id} are gone and didn't come back within {MAX_UNSUB_TIME:?}");
141141
break;
142142
},
143143
_ = subs_check.tick() => {

crates/corro-client/src/lib.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ impl CorrosionApiClient {
185185
pub async fn schema_from_paths<P: AsRef<Path>>(
186186
&self,
187187
schema_paths: &[P],
188-
) -> Result<ExecResponse, Error> {
188+
) -> Result<Option<ExecResponse>, Error> {
189189
let mut statements = vec![];
190190

191191
for schema_path in schema_paths.iter() {
@@ -261,7 +261,11 @@ impl CorrosionApiClient {
261261
}
262262
}
263263

264-
self.schema(&statements).await
264+
if statements.is_empty() {
265+
return Ok(None);
266+
}
267+
268+
Ok(Some(self.schema(&statements).await?))
265269
}
266270
}
267271

crates/corro-types/src/agent.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ impl SplitPool {
303303
wait_conn_drop(tx).await
304304
}
305305

306-
info!("write loop done, draining...");
306+
info!("Write loop done, draining...");
307307

308308
// keep processing priority messages
309309
// NOTE: using `recv` would wait indefinitely, this loop only waits until all

crates/corro-types/src/pubsub.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -689,7 +689,7 @@ impl Matcher {
689689

690690
match res {
691691
Ok(deleted) => info!(
692-
"deleted {deleted} old changes row for subscription {}",
692+
"Deleted {deleted} old changes row for subscription {}",
693693
self.id.as_simple()
694694
),
695695
Err(e) => {

crates/corrosion/src/command/agent.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,12 @@ pub async fn run(config: Config, config_path: &Utf8PathBuf) -> eyre::Result<()>
4343
.schema_from_paths(config.db.schema_paths.as_slice())
4444
.await
4545
{
46-
Ok(res) => {
46+
Ok(Some(res)) => {
4747
info!("Applied schema in {}s", res.time);
4848
}
49+
Ok(None) => {
50+
info!("No schema files to apply, skipping.");
51+
}
4952
Err(e) => {
5053
error!("could not apply schema: {e}");
5154
}

crates/corrosion/src/main.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ async fn process_cli(cli: Cli) -> eyre::Result<()> {
202202
)?;
203203
}
204204

205-
info!("successfully cleaned for restoration and backed up database to {path}");
205+
info!("Successfully cleaned for restoration and backed up database to {path}");
206206
}
207207
Command::Restore {
208208
path,

0 commit comments

Comments
 (0)