Skip to content

Commit 5a723cd

Browse files
committed
merged follow
2 parents c4e082d + ee0e1a9 commit 5a723cd

File tree

24 files changed

+1970
-307
lines changed

24 files changed

+1970
-307
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/corro-admin/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,3 +21,4 @@ tracing = { workspace = true }
2121
tripwire = { path = "../tripwire" }
2222
rangemap = { workspace = true }
2323
uuid = { workspace = true }
24+
bytes = { workspace = true }

crates/corro-admin/src/lib.rs

Lines changed: 154 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
1-
use std::{fmt::Display, time::Duration};
1+
use std::{fmt::Display, net::SocketAddr, time::Duration};
22

3+
use bytes::BytesMut;
34
use camino::Utf8PathBuf;
5+
use corro_agent::{
6+
api::peer::{
7+
encode_write_bipayload_msg,
8+
follow::{read_follow_msg, FollowMessage, FollowMessageV1},
9+
},
10+
transport::Transport,
11+
};
412
use corro_types::{
513
actor::{ActorId, ClusterId},
614
agent::{Agent, Bookie, LockKind, LockMeta, LockState},
15+
api::SqliteValueRef,
716
base::{CrsqlDbVersion, CrsqlSeq, CrsqlSiteVersion},
8-
broadcast::{FocaCmd, FocaInput},
17+
broadcast::{BiPayload, Changeset, FocaCmd, FocaInput},
18+
pubsub::unpack_columns,
919
sqlite::SqlitePoolError,
1020
sync::generate_sync,
21+
updates::Handle,
1122
};
1223
use futures::{SinkExt, TryStreamExt};
1324
use rusqlite::{named_params, OptionalExtension};
@@ -21,7 +32,7 @@ use tokio::{
2132
task::block_in_place,
2233
};
2334
use tokio_serde::{formats::Json, Framed};
24-
use tokio_util::codec::LengthDelimitedCodec;
35+
use tokio_util::codec::{FramedRead, LengthDelimitedCodec};
2536
use tracing::{debug, error, info, warn};
2637
use tripwire::Tripwire;
2738
use uuid::Uuid;
@@ -41,6 +52,7 @@ pub struct AdminConfig {
4152
pub fn start_server(
4253
agent: Agent,
4354
bookie: Bookie,
55+
transport: Transport,
4456
config: AdminConfig,
4557
mut tripwire: Tripwire,
4658
) -> Result<(), AdminError> {
@@ -74,8 +86,9 @@ pub fn start_server(
7486
let agent = agent.clone();
7587
let bookie = bookie.clone();
7688
let config = config.clone();
89+
let transport = transport.clone();
7790
async move {
78-
if let Err(e) = handle_conn(agent, &bookie, config, stream).await {
91+
if let Err(e) = handle_conn(agent, &bookie, &transport, config, stream).await {
7992
error!("could not handle admin connection: {e}");
8093
}
8194
}
@@ -95,6 +108,16 @@ pub enum Command {
95108
Cluster(ClusterCommand),
96109
Actor(ActorCommand),
97110
Subs(SubsCommand),
111+
Debug(DebugCommand),
112+
}
113+
114+
#[derive(Debug, Clone, Serialize, Deserialize)]
115+
pub enum DebugCommand {
116+
Follow {
117+
peer_addr: SocketAddr,
118+
from: Option<u64>,
119+
local_only: bool,
120+
},
98121
}
99122

100123
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -193,6 +216,7 @@ impl From<LockMeta> for LockMetaElapsed {
193216
async fn handle_conn(
194217
agent: Agent,
195218
bookie: &Bookie,
219+
transport: &Transport,
196220
_config: AdminConfig,
197221
stream: UnixStream,
198222
) -> Result<(), AdminError> {
@@ -475,6 +499,132 @@ async fn handle_conn(
475499
}
476500
};
477501
}
502+
Command::Debug(DebugCommand::Follow {
503+
peer_addr,
504+
from,
505+
local_only,
506+
}) => match transport.open_bi(peer_addr).await {
507+
Ok((mut tx, recv)) => {
508+
let mut codec = LengthDelimitedCodec::builder()
509+
.max_frame_length(100 * 1_024 * 1_024)
510+
.new_codec();
511+
let mut encoding_buf = BytesMut::new();
512+
let mut buf = BytesMut::new();
513+
514+
if let Err(e) = encode_write_bipayload_msg(
515+
&mut codec,
516+
&mut encoding_buf,
517+
&mut buf,
518+
BiPayload::V1 {
519+
data: corro_types::broadcast::BiPayloadV1::Follow {
520+
from: from.map(CrsqlDbVersion),
521+
local_only,
522+
},
523+
cluster_id: agent.cluster_id(),
524+
},
525+
&mut tx,
526+
)
527+
.await
528+
{
529+
send_error(
530+
&mut stream,
531+
format!("could not send follow payload to {peer_addr}: {e}"),
532+
)
533+
.await;
534+
continue;
535+
}
536+
537+
let mut framed = FramedRead::new(
538+
recv,
539+
LengthDelimitedCodec::builder()
540+
.max_frame_length(100 * 1_024 * 1_024)
541+
.new_codec(),
542+
);
543+
544+
'msg: loop {
545+
match read_follow_msg(&mut framed).await {
546+
Ok(None) => {
547+
send_success(&mut stream).await;
548+
break;
549+
}
550+
Err(e) => {
551+
send_error(
552+
&mut stream,
553+
format!("error receiving follow message: {e}"),
554+
)
555+
.await;
556+
break;
557+
}
558+
Ok(Some(msg)) => {
559+
match msg {
560+
FollowMessage::V1(FollowMessageV1::Change(change)) => {
561+
let actor_id = change.actor_id;
562+
match change.changeset {
563+
Changeset::Full {
564+
version,
565+
changes,
566+
ts,
567+
..
568+
} => {
569+
if let Err(e) = stream
570+
.send(Response::Json(serde_json::json!({
571+
"actor_id": actor_id,
572+
"type": "full",
573+
"version": version,
574+
"ts": ts.to_string(),
575+
})))
576+
.await
577+
{
578+
warn!("could not send to steam, breaking ({e})");
579+
break;
580+
}
581+
582+
for change in changes {
583+
if let Err(e) = stream.send(
584+
Response::Json(
585+
serde_json::json!({
586+
"table": change.table,
587+
"pk": unpack_columns(&change.pk).unwrap().iter().map(SqliteValueRef::to_owned).collect::<Vec<_>>(),
588+
"cid": change.cid,
589+
"val": change.val,
590+
"col_version": change.col_version,
591+
"db_version": change.db_version,
592+
"seq": change.seq,
593+
"site_id": ActorId::from_bytes(change.site_id),
594+
"cl": change.cl,
595+
}),
596+
),
597+
)
598+
.await {
599+
warn!("could not send to steam, breaking ({e})");
600+
break 'msg;
601+
}
602+
}
603+
}
604+
changeset => {
605+
send_log(
606+
&mut stream,
607+
LogLevel::Warn,
608+
format!("unknown change type received: {changeset:?}"),
609+
)
610+
.await;
611+
}
612+
}
613+
}
614+
}
615+
}
616+
}
617+
}
618+
}
619+
Err(e) => {
620+
send_error(
621+
&mut stream,
622+
format!("could not open bi-directional stream with {peer_addr}: {e}"),
623+
)
624+
.await;
625+
continue;
626+
}
627+
},
478628
},
479629
Ok(None) => {
480630
debug!("done with admin conn");

crates/corro-agent/src/agent/bi.rs

Lines changed: 37 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::api::peer::serve_sync;
1+
use crate::api::peer::{follow::serve_follow, serve_sync};
22
use corro_types::{
33
agent::{Agent, Bookie},
44
broadcast::{BiPayload, BiPayloadV1},
@@ -56,7 +56,12 @@ pub fn spawn_bipayload_handler(
5656
let agent = agent.clone();
5757
let bookie = bookie.clone();
5858
async move {
59-
let mut framed = FramedRead::new(rx, LengthDelimitedCodec::builder().max_frame_length(100 * 1_024 * 1_024).new_codec());
59+
let mut framed = FramedRead::new(
60+
rx,
61+
LengthDelimitedCodec::builder()
62+
.max_frame_length(100 * 1_024 * 1_024)
63+
.new_codec(),
64+
);
6065

6166
loop {
6267
match timeout(Duration::from_secs(5), StreamExt::next(&mut framed)).await {
@@ -72,30 +77,38 @@ pub fn spawn_bipayload_handler(
7277
match BiPayload::read_from_buffer(&b) {
7378
Ok(payload) => {
7479
match payload {
75-
BiPayload::V1 {
76-
data:
77-
BiPayloadV1::SyncStart {
78-
actor_id,
79-
trace_ctx,
80-
},
81-
cluster_id,
82-
} => {
83-
trace!(
84-
"framed read buffer len: {}",
85-
framed.read_buffer().len()
86-
);
80+
BiPayload::V1 { data, cluster_id } => match data {
81+
BiPayloadV1::SyncStart {
82+
actor_id,
83+
trace_ctx,
84+
} => {
85+
trace!(
86+
"framed read buffer len: {}",
87+
framed.read_buffer().len()
88+
);
8789

88-
// println!("got sync state: {state:?}");
89-
if let Err(e) = serve_sync(
90-
&agent, &bookie, actor_id, trace_ctx,
91-
cluster_id, framed, tx,
92-
)
93-
.await
94-
{
95-
warn!("could not complete receiving sync: {e}");
90+
// println!("got sync state: {state:?}");
91+
if let Err(e) = serve_sync(
92+
&agent, &bookie, actor_id, trace_ctx,
93+
cluster_id, framed, tx,
94+
)
95+
.await
96+
{
97+
warn!("could not complete receiving sync: {e}");
98+
}
99+
break;
96100
}
97-
break;
98-
}
101+
BiPayloadV1::Follow { from, local_only } => {
102+
if let Err(e) = serve_follow(
103+
&agent, from, local_only, tx,
104+
)
105+
.await
106+
{
107+
warn!("could not complete follow: {e}");
108+
}
109+
break;
110+
}
111+
},
99112
}
100113
}
101114

crates/corro-agent/src/agent/run_root.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
metrics, setup, util, AgentOptions,
99
},
1010
broadcast::runtime_loop,
11+
transport::Transport,
1112
};
1213
use corro_types::{
1314
actor::ActorId,
@@ -26,12 +27,16 @@ use tripwire::Tripwire;
2627
///
2728
/// First initialise `AgentOptions` state via `setup()`, then spawn a
2829
/// new task that runs the main agent state machine
29-
pub async fn start_with_config(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Bookie)> {
30+
pub async fn start_with_config(
31+
conf: Config,
32+
tripwire: Tripwire,
33+
) -> eyre::Result<(Agent, Bookie, Transport)> {
3034
let (agent, opts) = setup(conf.clone(), tripwire.clone()).await?;
35+
let transport = opts.transport.clone();
3136

3237
let bookie = run(agent.clone(), opts, conf.perf).await?;
3338

34-
Ok((agent, bookie))
39+
Ok((agent, bookie, transport))
3540
}
3641

3742
async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Result<Bookie> {
@@ -49,6 +54,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
4954
rx_foca,
5055
subs_manager,
5156
subs_bcast_cache,
57+
updates_bcast_cache,
5258
rtt_rx,
5359
} = opts;
5460

@@ -96,6 +102,7 @@ async fn run(agent: Agent, opts: AgentOptions, pconf: PerfConfig) -> eyre::Resul
96102
&agent,
97103
&tripwire,
98104
subs_bcast_cache,
105+
updates_bcast_cache,
99106
&subs_manager,
100107
api_listeners,
101108
)

0 commit comments

Comments
 (0)