Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use corro_types::{
actor::{Actor, ActorId},
agent::{
find_overwritten_versions, Agent, Bookie, ChangeError, CurrentVersion, KnownDbVersion,
PartialVersion, PoolError,
PartialVersion,
},
api::TableName,
base::{CrsqlDbVersion, CrsqlSeq, Version},
Expand Down Expand Up @@ -55,7 +55,7 @@ use metrics::{counter, histogram};
use rangemap::{RangeInclusiveMap, RangeInclusiveSet};
use rusqlite::{named_params, params, Connection, OptionalExtension, Savepoint, Transaction};
use spawn::spawn_counted;
use tokio::{net::TcpListener, task::block_in_place, time::timeout};
use tokio::{net::TcpListener, task::block_in_place};
use tower::{limit::ConcurrencyLimitLayer, load_shed::LoadShedLayer};
use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, trace, warn};
Expand Down Expand Up @@ -800,9 +800,7 @@ pub async fn process_multiple_changes(
warn!("process_multiple_changes: removing duplicates took too long - {elapsed:?}");
}

let mut conn = timeout(Duration::from_secs(5 * 60), agent.pool().write_normal())
.await
.map_err(PoolError::from)??;
let mut conn = agent.pool().write_normal().await?;

let changesets = block_in_place(|| {
let start = Instant::now();
Expand Down Expand Up @@ -1121,7 +1119,7 @@ pub async fn process_multiple_changes(
match_changes(agent.updates_manager(), changeset.changes(), db_version);
}

histogram!("corro.agent.changes.processing.time.seconds").record(start.elapsed());
histogram!("corro.agent.changes.processing.time.seconds", "source" => "remote").record(start.elapsed());
histogram!("corro.agent.changes.processing.chunk_size").record(change_chunk_size as f64);

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use corro_types::{
sqlite::SqlitePoolError,
};
use hyper::StatusCode;
use metrics::histogram;
use rusqlite::{params_from_iter, ToSql, Transaction};
use spawn::spawn_counted;
use tokio::{
Expand Down Expand Up @@ -76,6 +77,7 @@ where
})?;

let elapsed = start.elapsed();
histogram!("corro.agent.changes.processing.time.seconds", "source" => "local").record(start.elapsed());

match insert_info {
None => Ok((ret, None, elapsed)),
Expand Down
56 changes: 44 additions & 12 deletions crates/corro-types/src/agent.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{
cmp,
collections::{btree_map, BTreeMap, HashMap, HashSet},
fmt, io,
fmt,
future::Future,
io,
net::SocketAddr,
ops::{Deref, DerefMut, RangeInclusive},
path::{Path, PathBuf},
Expand All @@ -21,15 +23,18 @@ use parking_lot::RwLock;
use rangemap::RangeInclusiveSet;
use rusqlite::{named_params, Connection, OptionalExtension, Transaction};
use serde::{Deserialize, Serialize};
use tokio::{sync::{
AcquireError, OwnedRwLockWriteGuard as OwnedTokioRwLockWriteGuard, OwnedSemaphorePermit,
RwLock as TokioRwLock, RwLockReadGuard as TokioRwLockReadGuard,
RwLockWriteGuard as TokioRwLockWriteGuard,
}, time::error::Elapsed};
use tokio::{
runtime::Handle,
sync::{oneshot, Semaphore},
};
use tokio::{
sync::{
AcquireError, OwnedRwLockWriteGuard as OwnedTokioRwLockWriteGuard, OwnedSemaphorePermit,
RwLock as TokioRwLock, RwLockReadGuard as TokioRwLockReadGuard,
RwLockWriteGuard as TokioRwLockWriteGuard,
},
time::timeout,
};
use tokio_util::sync::{CancellationToken, DropGuard};
use tracing::{debug, error, trace, warn};
use tripwire::Tripwire;
Expand Down Expand Up @@ -532,8 +537,8 @@ pub enum PoolError {
CallbackClosed,
#[error("could not acquire write permit")]
Permit(#[from] AcquireError),
#[error("timed out waiting to acquire write connection")]
TimedOut(#[from] Elapsed),
#[error("timed out acquiring write permit while {op:?}")]
TimedOut { op: String },
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -671,6 +676,9 @@ impl SplitPool {
gauge!("corro.sqlite.pool.write.connections").set(write_state.size as f64);
gauge!("corro.sqlite.pool.write.connections.available").set(write_state.available as f64);
gauge!("corro.sqlite.pool.write.connections.waiting").set(write_state.waiting as f64);

let available_permit = self.0.write_sema.available_permits();
gauge!("corro.sqlite.write.permits.available").set(available_permit as f64);
}

// get a read-only connection
Expand Down Expand Up @@ -721,15 +729,30 @@ impl SplitPool {
queue: &'static str,
) -> Result<WriteConn, PoolError> {
let (tx, rx) = oneshot::channel();
chan.send(tx).await.map_err(|_| PoolError::QueueClosed)?;
let max_timeout = Duration::from_secs(5 * 60);

timeout_fut("tx to oneshot channel", max_timeout, chan.send(tx))
.await?
.map_err(|_| PoolError::QueueClosed)?;

let start = Instant::now();
let token = rx.await.map_err(|_| PoolError::CallbackClosed)?;

let token = timeout_fut("rx from oneshot channel", max_timeout, rx)
.await?
.map_err(|_| PoolError::CallbackClosed)?;

histogram!("corro.sqlite.pool.queue.seconds", "queue" => queue)
.record(start.elapsed().as_secs_f64());
let conn = self.0.write.get().await?;
let conn = timeout_fut("acquiring write conn", max_timeout, self.0.write.get()).await??;

let start = Instant::now();
let _permit = self.0.write_sema.clone().acquire_owned().await?;
let _permit = timeout_fut(
"acquiring write semaphore",
max_timeout,
self.0.write_sema.clone().acquire_owned(),
)
.await??;

histogram!("corro.sqlite.write_permit.acquisition.seconds")
.record(start.elapsed().as_secs_f64());

Expand All @@ -752,6 +775,15 @@ async fn wait_conn_drop(tx: oneshot::Sender<CancellationToken>) {
cancel.cancelled().await
}

/// Awaits `fut`, giving up once `duration` has elapsed.
///
/// * `op` - short label for the operation being awaited; it is copied into
///   the error so a timeout can be traced back to the step that stalled.
/// * `duration` - maximum time to wait for `fut` to complete.
/// * `fut` - the future to drive.
///
/// Returns `Ok` with the future's output when it completes in time, and
/// `Err(PoolError::TimedOut { op })` when the deadline passes first. The
/// future's own output is passed through untouched, so a future that yields a
/// `Result` leaves the caller with a nested result to unwrap.
async fn timeout_fut<T, F>(op: &'static str, duration: Duration, fut: F) -> Result<T, PoolError>
where
    F: Future<Output = T>,
{
    match timeout(duration, fut).await {
        Ok(output) => Ok(output),
        // The elapsed marker carries no extra detail; report the operation label instead.
        Err(_) => Err(PoolError::TimedOut { op: op.to_string() }),
    }
}

pub struct WriteConn {
conn: sqlite_pool::Connection<CrConn>,
_drop_guard: DropGuard,
Expand Down
Loading