8 changes: 8 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/corro-agent/Cargo.toml
@@ -38,6 +38,7 @@ serde_json = { workspace = true }
spawn = { path = "../spawn" }
speedy = { workspace = true }
sqlite3-parser = { workspace = true }
sqlite-pool = { path = "../sqlite-pool" }
strum = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
5 changes: 3 additions & 2 deletions crates/corro-agent/src/agent/handlers.rs
@@ -741,6 +741,7 @@ pub async fn handle_changes(
) {
let max_changes_chunk: usize = agent.config().perf.apply_queue_len;
let max_queue_len: usize = agent.config().perf.processing_queue_len;
let tx_timeout: Duration = Duration::from_secs(agent.config().perf.sql_tx_timeout as u64);
let mut queue: VecDeque<(ChangeV1, ChangeSource, Instant)> = VecDeque::new();
let mut buf = vec![];
let mut buf_cost = 0;
@@ -781,7 +782,7 @@ pub async fn handle_changes(
let changes = std::mem::take(&mut buf);
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone()));
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone(), tx_timeout));
counter!("corro.agent.changes.batch.spawned").increment(1);

buf_cost -= tmp_cost;
@@ -818,7 +819,7 @@ pub async fn handle_changes(
let changes: Vec<_> = queue.drain(..).collect();
let agent = agent.clone();
let bookie = bookie.clone();
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone()));
join_set.spawn(process_multiple_changes(agent, bookie, changes.clone(), tx_timeout));
counter!("corro.agent.changes.batch.spawned").increment(1);
buf_cost = 0;
}
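The handlers.rs hunks above read `perf.sql_tx_timeout` once, convert it to a `Duration`, and pass it into every spawned `process_multiple_changes` batch. A minimal sketch of that spawn pattern follows; only the config field name and the `Duration::from_secs` conversion come from the diff, while the batch type, function names, and surrounding scaffolding are placeholders for illustration.

```rust
use std::time::Duration;
use tokio::task::JoinSet;

// Stand-in for the real batch payload, Vec<(ChangeV1, ChangeSource, Instant)>.
type Batch = Vec<u64>;

async fn handle_batches(sql_tx_timeout: u64, batches: Vec<Batch>) {
    // Mirrors Duration::from_secs(agent.config().perf.sql_tx_timeout as u64).
    let tx_timeout = Duration::from_secs(sql_tx_timeout);
    let mut join_set = JoinSet::new();

    for changes in batches {
        // Duration is Copy, so each spawned task carries its own timeout value.
        join_set.spawn(process_batch(changes, tx_timeout));
    }

    // Drain the set so task results (and panics) are observed.
    while join_set.join_next().await.is_some() {}
}

async fn process_batch(changes: Batch, tx_timeout: Duration) {
    // Placeholder for process_multiple_changes(agent, bookie, changes, tx_timeout).
    let _ = (changes, tx_timeout);
}
```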
32 changes: 22 additions & 10 deletions crates/corro-agent/src/agent/tests.rs
@@ -27,7 +27,7 @@ use crate::{
agent::process_multiple_changes,
api::{
peer::parallel_sync,
public::{api_v1_db_schema, api_v1_transactions},
public::{api_v1_db_schema, api_v1_transactions, TransactionParams},
},
transport::Transport,
};
@@ -791,6 +791,8 @@ async fn test_clear_empty_versions() -> eyre::Result<()> {
let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let ta2 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;

let tx_timeout = Duration::from_secs(60);

let (rtt_tx, _rtt_rx) = mpsc::channel(1024);
let ta2_transport = Transport::new(&ta2.agent.config().gossip, rtt_tx.clone()).await?;
// setup the schema, for both nodes
@@ -813,7 +815,7 @@ async fn test_clear_empty_versions() -> eyre::Result<()> {
insert_rows(ta1.agent.clone(), 1, 50).await;
// send them all
let rows = get_rows(ta1.agent.clone(), vec![(Version(1)..=Version(50), None)]).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;

// overwrite different version ranges
insert_rows(ta1.agent.clone(), 1, 5).await;
@@ -831,7 +833,7 @@ async fn test_clear_empty_versions() -> eyre::Result<()> {
],
)
.await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;
check_bookie_versions(
ta2.clone(),
ta1.agent.actor_id(),
@@ -936,6 +938,9 @@ async fn process_failed_changes() -> eyre::Result<()> {
for i in 1..=5_i64 {
let (status_code, _) = api_v1_transactions(
Extension(ta2.agent.clone()),
axum::extract::Query(TransactionParams {
timeout: None,
}),
axum::Json(vec![Statement::WithParams(
"INSERT OR REPLACE INTO tests (id,text) VALUES (?,?)".into(),
vec![i.into(), "service-text".into()],
@@ -988,7 +993,7 @@ async fn process_failed_changes() -> eyre::Result<()> {

rows.append(&mut good_changes);

let res = process_multiple_changes(ta1.agent.clone(), ta1.bookie.clone(), rows).await;
let res = process_multiple_changes(ta1.agent.clone(), ta1.bookie.clone(), rows, Duration::from_secs(60)).await;

assert!(res.is_ok());

@@ -1033,6 +1038,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {
let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();
let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let ta2 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let tx_timeout = Duration::from_secs(60);

// setup the schema, for both nodes
let (status_code, _body) = api_v1_db_schema(
@@ -1055,7 +1061,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {

// sent 1-5
let rows = get_rows(ta1.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;
// check ta2 bookie
check_bookie_versions(
ta2.clone(),
@@ -1069,7 +1075,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {

// sent: 1-5, 9-10
let rows = get_rows(ta1.agent.clone(), vec![(Version(9)..=Version(10), None)]).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;
// check for gap 6-8
check_bookie_versions(
ta2.clone(),
@@ -1092,7 +1098,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {
],
)
.await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;
// check for gap 11-14 and 17-19
check_bookie_versions(
ta2.clone(),
@@ -1116,7 +1122,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {
],
)
.await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;

check_bookie_versions(
ta2.clone(),
@@ -1139,7 +1145,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {
],
)
.await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;
check_bookie_versions(
ta2.clone(),
ta1.agent.actor_id(),
@@ -1164,7 +1170,7 @@ async fn test_process_multiple_changes() -> eyre::Result<()> {
],
)
.await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows).await?;
process_multiple_changes(ta2.agent.clone(), ta2.bookie.clone(), rows, tx_timeout).await?;
check_bookie_versions(
ta2.clone(),
ta1.agent.actor_id(),
@@ -1333,6 +1339,7 @@ async fn insert_rows(agent: Agent, start: i64, n: i64) {
for i in start..=n {
let (status_code, _) = api_v1_transactions(
Extension(agent.clone()),
axum::extract::Query(TransactionParams{timeout: None}),
axum::Json(vec![Statement::WithParams(
"INSERT OR REPLACE INTO tests3 (id,text,text2, num, num2) VALUES (?,?,?,?,?)"
.into(),
@@ -2176,6 +2183,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();
let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let ta2 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let tx_timeout = Duration::from_secs(60);

// setup the schema, for both nodes
let (status_code, _body) = api_v1_db_schema(
@@ -2196,6 +2204,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

let (status_code, body) = api_v1_transactions(
Extension(ta1.agent.clone()),
axum::extract::Query(TransactionParams{timeout: None}),
axum::Json(vec![Statement::WithParams(
"insert into tests (id, text) values (?,?)".into(),
vec!["service-id".into(), "service-name".into()],
@@ -2261,11 +2270,13 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
ChangeSource::Broadcast,
Instant::now(),
)],
tx_timeout,
)
.await?;

let (status_code, body) = api_v1_transactions(
Extension(ta1.agent.clone()),
axum::extract::Query(TransactionParams{timeout: None}),
axum::Json(vec![Statement::WithParams(
"insert or replace into tests (id, text) values (?,?)".into(),
vec!["service-id".into(), "service-name-overwrite".into()],
@@ -2328,6 +2339,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
ChangeSource::Broadcast,
Instant::now(),
)],
tx_timeout,
)
.await?;

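The test updates above reflect a new parameter on `api_v1_transactions`: callers now pass `axum::extract::Query(TransactionParams { timeout: None })` alongside the JSON body. Below is a sketch of that extractor pattern; the struct shape mirrors `TransactionParams { timeout: Option<_> }` from the diff, but the route path, the unit (seconds), and the handler body are assumptions for illustration rather than the actual corrosion API.

```rust
use axum::{extract::Query, routing::post, Json, Router};
use serde::Deserialize;

// Mirrors TransactionParams { timeout: Option<_> }; the field's exact type is assumed.
#[derive(Debug, Default, Deserialize)]
struct TxParams {
    // None means "fall back to the server's configured default timeout".
    timeout: Option<u64>,
}

// Hypothetical handler: a request like POST /v1/transactions?timeout=5 would cap the
// transaction at five seconds, while omitting the parameter keeps the default.
async fn transactions(Query(params): Query<TxParams>, Json(stmts): Json<Vec<String>>) -> String {
    let timeout_secs = params.timeout.unwrap_or(60);
    format!("executing {} statements with a {timeout_secs}s timeout", stmts.len())
}

fn router() -> Router {
    Router::new().route("/v1/transactions", post(transactions))
}
```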
37 changes: 25 additions & 12 deletions crates/corro-agent/src/agent/util.rs
@@ -29,17 +29,19 @@ use corro_types::{
pubsub::SubsManager,
updates::{match_changes, match_changes_from_db_version},
};

use std::{
cmp,
collections::{BTreeMap, HashSet},
convert::Infallible,
net::SocketAddr,
ops::RangeInclusive,
ops::{Deref, RangeInclusive},
sync::{atomic::AtomicI64, Arc},
time::{Duration, Instant},
};

use crate::api::public::update::api_v1_updates;
use sqlite_pool::{Committable, InterruptibleTransaction};
use axum::{
error_handling::HandleErrorLayer,
extract::DefaultBodyLimit,
@@ -53,7 +55,7 @@ use futures::FutureExt;
use hyper::{server::conn::AddrIncoming, StatusCode};
use metrics::{counter, histogram};
use rangemap::{RangeInclusiveMap, RangeInclusiveSet};
use rusqlite::{named_params, params, Connection, OptionalExtension, Savepoint, Transaction};
use rusqlite::{named_params, params, Connection, OptionalExtension};
use spawn::spawn_counted;
use tokio::{net::TcpListener, task::block_in_place};
use tower::{limit::ConcurrencyLimitLayer, load_shed::LoadShedLayer};
@@ -398,11 +400,12 @@ pub async fn apply_fully_buffered_changes_loop(
) {
info!("Starting apply_fully_buffered_changes loop");

let tx_timeout: Duration = Duration::from_secs(agent.config().perf.sql_tx_timeout as u64);
while let Outcome::Completed(Some((actor_id, version))) =
rx_apply.recv().preemptible(&mut tripwire).await
{
debug!(%actor_id, %version, "picked up background apply of buffered changes");
match process_fully_buffered_changes(&agent, &bookie, actor_id, version).await {
match process_fully_buffered_changes(&agent, &bookie, actor_id, version, tx_timeout).await {
Ok(false) => {
warn!(%actor_id, %version, "did not apply buffered changes");
}
@@ -423,6 +426,8 @@ pub async fn clear_buffered_meta_loop(
agent: Agent,
mut rx_partials: CorroReceiver<(ActorId, RangeInclusive<Version>)>,
) {
let tx_timeout: Duration = Duration::from_secs(agent.config().perf.sql_tx_timeout as u64);

while let Some((actor_id, versions)) = rx_partials.recv().await {
let pool = agent.pool().clone();
let self_actor_id = agent.actor_id();
@@ -432,7 +437,7 @@
let mut conn = pool.write_low().await?;

block_in_place(|| {
let tx = conn.immediate_transaction()?;
let tx = InterruptibleTransaction::new(conn.immediate_transaction()?, Some(tx_timeout), "clear_buffered_meta");

// TODO: delete buffered changes from deleted sequences only (maybe, it's kind of hard and may not be necessary)

@@ -475,9 +480,9 @@ }
}

#[tracing::instrument(skip_all, err)]
pub fn process_single_version(
pub fn process_single_version<T: Deref<Target = rusqlite::Connection> + Committable>(
agent: &Agent,
tx: &mut Transaction,
tx: &mut InterruptibleTransaction<T>,
last_db_version: Option<CrsqlDbVersion>,
change: ChangeV1,
) -> rusqlite::Result<(KnownDbVersion, Changeset)> {
@@ -535,9 +540,11 @@ pub async fn process_fully_buffered_changes(
bookie: &Bookie,
actor_id: ActorId,
version: Version,
tx_timeout: Duration,
) -> Result<bool, ChangeError> {
let db_version = {
let mut conn = agent.pool().write_normal().await?;

debug!(%actor_id, %version, "acquired write (normal) connection to process fully buffered changes");

let booked = {
@@ -573,14 +580,16 @@ }
}
};

let tx = conn
let base_tx = conn
.immediate_transaction()
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: Some(actor_id),
version: Some(version),
})?;

let tx = InterruptibleTransaction::new(base_tx, Some(tx_timeout), "process_buffered_changes");

info!(%actor_id, %version, "Processing buffered changes to crsql_changes (actor: {actor_id}, version: {version}, last_seq: {last_seq})");

let max_db_version: Option<Option<CrsqlDbVersion>> = tx.prepare_cached("SELECT MAX(db_version) FROM __corro_buffered_changes WHERE site_id = ? AND version = ?").map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?.query_row(params![actor_id.as_bytes(), version], |row| row.get(0)).optional().map_err(|source| ChangeError::Rusqlite{source, actor_id: Some(actor_id), version: Some(version)})?;
@@ -753,12 +762,14 @@ pub async fn process_multiple_changes(
agent: Agent,
bookie: Bookie,
changes: Vec<(ChangeV1, ChangeSource, Instant)>,
tx_timeout: Duration,
) -> Result<(), ChangeError> {
let start = Instant::now();
counter!("corro.agent.changes.processing.started").increment(changes.len() as u64);
debug!(self_actor_id = %agent.actor_id(), "processing multiple changes, len: {}", changes.iter().map(|(change, _, _)| cmp::max(change.len(), 1)).sum::<usize>());

const PROCESSING_WARN_THRESHOLD: Duration = Duration::from_secs(5);

let mut seen = HashSet::new();
let mut unknown_changes: BTreeMap<_, Vec<_>> = BTreeMap::new();
for (change, src, queued_at) in changes {
@@ -804,14 +815,15 @@

let changesets = block_in_place(|| {
let start = Instant::now();
let mut tx = conn
let tx = conn
.immediate_transaction()
.map_err(|source| ChangeError::Rusqlite {
source,
actor_id: None,
version: None,
})?;

let mut tx = InterruptibleTransaction::new(tx, Some(tx_timeout), "process_multiple_changes");
let mut knowns: BTreeMap<ActorId, Vec<_>> = BTreeMap::new();
let mut changesets = vec![];

@@ -1039,6 +1051,7 @@ pub async fn process_multiple_changes(
actor_id: None,
version: None,
})?;

let elapsed = sub_start.elapsed();
if elapsed >= PROCESSING_WARN_THRESHOLD {
warn!("process_multiple_changes: commiting transaction took too long - {elapsed:?}");
@@ -1126,8 +1139,8 @@ }
}

#[tracing::instrument(skip(sp, parts), err)]
pub fn process_incomplete_version(
sp: &Savepoint,
pub fn process_incomplete_version<T: Deref<Target = rusqlite::Connection> + Committable>(
sp: &InterruptibleTransaction<T>,
actor_id: ActorId,
parts: &ChangesetParts,
) -> rusqlite::Result<KnownDbVersion> {
@@ -1254,9 +1267,9 @@ pub fn process_incomplete_version(
}

#[tracing::instrument(skip(agent, sp, last_db_version, parts), err)]
pub fn process_complete_version(
pub fn process_complete_version<T: Deref<Target = rusqlite::Connection> + Committable>(
agent: Agent,
sp: &Savepoint,
sp: &InterruptibleTransaction<T>,
actor_id: ActorId,
last_db_version: Option<CrsqlDbVersion>,
versions: RangeInclusive<Version>,
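In util.rs, every write transaction is now wrapped in `sqlite_pool::InterruptibleTransaction`, built with `InterruptibleTransaction::new(tx, Some(tx_timeout), label)`, and the `process_*` helpers become generic over `T: Deref<Target = rusqlite::Connection> + Committable` so they accept the wrapper in place of a bare `Transaction` or `Savepoint`. The sketch below shows one way such a timeout can work, using rusqlite's real `get_interrupt_handle()` / `interrupt()` APIs and a watchdog thread; it illustrates the mechanism under those assumptions and is not the actual sqlite-pool implementation.

```rust
use std::{sync::mpsc, thread, time::Duration};

use rusqlite::{Connection, Result, Transaction};

// Illustrative stand-in for an interruptible transaction: a watchdog thread calls
// sqlite3_interrupt (via rusqlite's InterruptHandle) if the transaction is still
// open when the timeout elapses, aborting whatever statement is currently running.
struct TimedTx<'conn> {
    tx: Transaction<'conn>,
    disarm: mpsc::Sender<()>,
}

impl<'conn> TimedTx<'conn> {
    fn new(conn: &'conn mut Connection, timeout: Duration) -> Result<Self> {
        // InterruptHandle is Send + 'static, so it can move into the watchdog thread.
        let handle = conn.get_interrupt_handle();
        let (disarm, disarm_rx) = mpsc::channel::<()>();
        thread::spawn(move || {
            // Interrupt only on a genuine timeout; a disconnect means the transaction
            // was committed or dropped, so the watchdog stands down.
            if matches!(
                disarm_rx.recv_timeout(timeout),
                Err(mpsc::RecvTimeoutError::Timeout)
            ) {
                handle.interrupt();
            }
        });
        let tx = conn.transaction()?;
        Ok(Self { tx, disarm })
    }

    fn commit(self) -> Result<()> {
        let res = self.tx.commit();
        let _ = self.disarm.send(()); // disarm the watchdog whether or not commit succeeded
        res
    }
}

// Deref to Connection is what lets helpers bounded on Deref<Target = Connection>
// (like the updated process_* functions) run statements against the wrapper.
impl<'conn> std::ops::Deref for TimedTx<'conn> {
    type Target = Connection;
    fn deref(&self) -> &Connection {
        &self.tx
    }
}
```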