Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/corro-admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@ tracing = { workspace = true }
tripwire = { path = "../tripwire" }
rangemap = { workspace = true }
uuid = { workspace = true }
bytes = { workspace = true }
156 changes: 153 additions & 3 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
use std::{
fmt::Display,
net::SocketAddr,
time::{Duration, Instant},
};

use bytes::BytesMut;
use camino::Utf8PathBuf;
use corro_agent::{
api::peer::{
encode_write_bipayload_msg,
follow::{read_follow_msg, FollowMessage, FollowMessageV1},
},
transport::Transport,
};
use corro_types::{
actor::{ActorId, ClusterId},
agent::{Agent, BookedVersions, Bookie, LockKind, LockMeta, LockState},
api::SqliteValueRef,
base::{CrsqlDbVersion, CrsqlSeq, Version},
broadcast::{FocaCmd, FocaInput, Timestamp},
broadcast::{BiPayload, Changeset, FocaCmd, FocaInput, Timestamp},
pubsub::unpack_columns,
sqlite::SqlitePoolError,
sync::generate_sync,
updates::Handle,
Expand All @@ -25,7 +36,7 @@ use tokio::{
task::block_in_place,
};
use tokio_serde::{formats::Json, Framed};
use tokio_util::codec::LengthDelimitedCodec;
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
use tracing::{debug, error, info, warn};
use tripwire::Tripwire;
use uuid::Uuid;
Expand All @@ -45,6 +56,7 @@ pub struct AdminConfig {
pub fn start_server(
agent: Agent,
bookie: Bookie,
transport: Transport,
config: AdminConfig,
mut tripwire: Tripwire,
) -> Result<(), AdminError> {
Expand Down Expand Up @@ -78,8 +90,9 @@ pub fn start_server(
let agent = agent.clone();
let bookie = bookie.clone();
let config = config.clone();
let transport = transport.clone();
async move {
if let Err(e) = handle_conn(agent, &bookie, config, stream).await {
if let Err(e) = handle_conn(agent, &bookie, &transport, config, stream).await {
error!("could not handle admin connection: {e}");
}
}
Expand All @@ -99,6 +112,16 @@ pub enum Command {
Cluster(ClusterCommand),
Actor(ActorCommand),
Subs(SubsCommand),
Debug(DebugCommand),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum DebugCommand {
Follow {
peer_addr: SocketAddr,
from: Option<u64>,
local_only: bool,
},
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -291,6 +314,7 @@ async fn collapse_gaps(
async fn handle_conn(
agent: Agent,
bookie: &Bookie,
transport: &Transport,
_config: AdminConfig,
stream: UnixStream,
) -> Result<(), AdminError> {
Expand Down Expand Up @@ -598,6 +622,132 @@ async fn handle_conn(
}
};
}
Command::Debug(DebugCommand::Follow {
peer_addr,
from,
local_only,
}) => match transport.open_bi(peer_addr).await {
Ok((mut tx, recv)) => {
let mut codec = LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec();
let mut encoding_buf = BytesMut::new();
let mut buf = BytesMut::new();

if let Err(e) = encode_write_bipayload_msg(
&mut codec,
&mut encoding_buf,
&mut buf,
BiPayload::V1 {
data: corro_types::broadcast::BiPayloadV1::Follow {
from: from.map(CrsqlDbVersion),
local_only,
},
cluster_id: agent.cluster_id(),
},
&mut tx,
)
.await
{
send_error(
&mut stream,
format!("could not send follow payload to {peer_addr}: {e}"),
)
.await;
continue;
}

let mut framed = FramedRead::new(
recv,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
);

'msg: loop {
match read_follow_msg(&mut framed).await {
Ok(None) => {
send_success(&mut stream).await;
break;
}
Err(e) => {
send_error(
&mut stream,
format!("error receiving follow message: {e}"),
)
.await;
break;
}
Ok(Some(msg)) => {
match msg {
FollowMessage::V1(FollowMessageV1::Change(change)) => {
let actor_id = change.actor_id;
match change.changeset {
Changeset::Full {
version,
changes,
ts,
..
} => {
if let Err(e) = stream
.send(Response::Json(serde_json::json!({
"actor_id": actor_id,
"type": "full",
"version": version,
"ts": ts.to_string(),
})))
.await
{
warn!("could not send to steam, breaking ({e})");
break;
}

for change in changes {
if let Err(e) = stream.send(
Response::Json(
serde_json::json!({
"table": change.table,
"pk": unpack_columns(&change.pk).unwrap().iter().map(SqliteValueRef::to_owned).collect::<Vec<_>>(),
"cid": change.cid,
"val": change.val,
"col_version": change.col_version,
"db_version": change.db_version,
"seq": change.seq,
"site_id": ActorId::from_bytes(change.site_id),
"cl": change.cl,
}),
),
)
.await {
warn!("could not send to steam, breaking ({e})");
break 'msg;
}
}
}
changeset => {
send_log(
&mut stream,
LogLevel::Warn,
format!("unknown change type received: {changeset:?}"),
)
.await;
}
}
}
}
}
}
}
}
Err(e) => {
send_error(
&mut stream,
format!("could not open bi-directional stream with {peer_addr}: {e}"),
)
.await;
continue;
}
},
},
Ok(None) => {
debug!("done with admin conn");
Expand Down
61 changes: 37 additions & 24 deletions crates/corro-agent/src/agent/bi.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::api::peer::serve_sync;
use crate::api::peer::{follow::serve_follow, serve_sync};
use corro_types::{
agent::{Agent, Bookie},
broadcast::{BiPayload, BiPayloadV1},
Expand Down Expand Up @@ -56,7 +56,12 @@ pub fn spawn_bipayload_handler(
let agent = agent.clone();
let bookie = bookie.clone();
async move {
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::builder().max_frame_length(100 * 1_024 * 1_024).new_codec());
let mut framed = FramedRead::new(
rx,
LengthDelimitedCodec::builder()
.max_frame_length(100 * 1_024 * 1_024)
.new_codec(),
);

loop {
match timeout(Duration::from_secs(5), StreamExt::next(&mut framed)).await {
Expand All @@ -72,30 +77,38 @@ pub fn spawn_bipayload_handler(
match BiPayload::read_from_buffer(&b) {
Ok(payload) => {
match payload {
BiPayload::V1 {
data:
BiPayloadV1::SyncStart {
actor_id,
trace_ctx,
},
cluster_id,
} => {
trace!(
"framed read buffer len: {}",
framed.read_buffer().len()
);
BiPayload::V1 { data, cluster_id } => match data {
BiPayloadV1::SyncStart {
actor_id,
trace_ctx,
} => {
trace!(
"framed read buffer len: {}",
framed.read_buffer().len()
);

// println!("got sync state: {state:?}");
if let Err(e) = serve_sync(
&agent, &bookie, actor_id, trace_ctx,
cluster_id, framed, tx,
)
.await
{
warn!("could not complete receiving sync: {e}");
// println!("got sync state: {state:?}");
if let Err(e) = serve_sync(
&agent, &bookie, actor_id, trace_ctx,
cluster_id, framed, tx,
)
.await
{
warn!("could not complete receiving sync: {e}");
}
break;
}
break;
}
BiPayloadV1::Follow { from, local_only } => {
if let Err(e) = serve_follow(
&agent, from, local_only, tx,
)
.await
{
warn!("could not complete follow: {e}");
}
break;
}
},
}
}

Expand Down
9 changes: 7 additions & 2 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
metrics, setup, util, AgentOptions,
},
broadcast::runtime_loop,
transport::Transport,
};
use corro_types::{
actor::ActorId,
Expand All @@ -26,12 +27,16 @@ use tripwire::Tripwire;
///
/// First initialise `AgentOptions` state via `setup()`, then spawn a
/// new task that runs the main agent state machine
pub async fn start_with_config(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Bookie)> {
pub async fn start_with_config(
conf: Config,
tripwire: Tripwire,
) -> eyre::Result<(Agent, Bookie, Transport)> {
let (agent, opts) = setup(conf.clone(), tripwire.clone()).await?;
let transport = opts.transport.clone();

let bookie = run(agent.clone(), opts, conf.perf).await?;

Ok((agent, bookie))
Ok((agent, bookie, transport))
}

async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Result<Bookie> {
Expand Down
Loading
Loading