Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,8 +1067,10 @@ mod tests {
use super::*;
use axum::{http::StatusCode, Extension, Json};
use corro_tests::TEST_SCHEMA;
use corro_types::api::{Change, ColumnName, TableName};
use corro_types::{base::CrsqlDbVersion, base::Version, config::Config, pubsub::pack_columns};
use corro_types::api::{ColumnName, TableName};
use corro_types::{
base::CrsqlDbVersion, base::Version, change::Change, config::Config, pubsub::pack_columns,
};
use rusqlite::Connection;
use std::sync::Arc;
use tokio::sync::Semaphore;
Expand Down
68 changes: 43 additions & 25 deletions crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use corro_types::change::Change;
use corro_types::{
actor::ActorId,
agent::migrate,
api::{row_to_change, ExecResponse, ExecResult, Statement},
api::{ExecResponse, ExecResult, Statement},
base::{CrsqlDbVersion, CrsqlSeq, Version},
broadcast::{ChangeSource, ChangeV1, Changeset},
change::store_empty_changeset,
Expand All @@ -47,6 +47,7 @@ use corro_types::{
use corro_types::{
agent::Agent,
api::{ColumnName, TableName},
change::row_to_change,
pubsub::pack_columns,
};

Expand Down Expand Up @@ -943,7 +944,8 @@ async fn process_failed_changes() -> eyre::Result<()> {
.await;
assert_eq!(status_code, StatusCode::OK);
}
let mut good_changes = get_rows(ta2.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;
let mut good_changes =
get_rows(ta2.agent.clone(), vec![(Version(1)..=Version(5), None)]).await?;

let change6 = Change {
table: TableName("tests".into()),
Expand All @@ -969,22 +971,20 @@ async fn process_failed_changes() -> eyre::Result<()> {
cl: 1,
};

let mut rows = vec![
(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(1),
changes: vec![change6.clone(), bad_change],
seqs: CrsqlSeq(0)..=CrsqlSeq(1),
last_seq: CrsqlSeq(1),
ts: Default::default(),
},
let mut rows = vec![(
ChangeV1 {
actor_id,
changeset: Changeset::Full {
version: Version(1),
changes: vec![change6.clone(), bad_change],
seqs: CrsqlSeq(0)..=CrsqlSeq(1),
last_seq: CrsqlSeq(1),
ts: Default::default(),
},
ChangeSource::Sync,
Instant::now(),
)
];
},
ChangeSource::Sync,
Instant::now(),
)];

rows.append(&mut good_changes);

Expand All @@ -997,7 +997,10 @@ async fn process_failed_changes() -> eyre::Result<()> {

for i in 1..=5_i64 {
let pk = pack_columns(&[i.into()])?;
let crsql_dbv = conn.prepare_cached(r#"SELECT db_version from crsql_changes where "table" = "tests" and pk = ?"#)?
let crsql_dbv = conn
.prepare_cached(
r#"SELECT db_version from crsql_changes where "table" = "tests" and pk = ?"#,
)?
.query_row([pk], |row| row.get::<_, CrsqlDbVersion>(0))?;

let booked_dbv = conn.prepare_cached("SELECT db_version from __corro_bookkeeping where start_version = ? and actor_id = ?")?
Expand Down Expand Up @@ -2204,7 +2207,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

let version = body.0.version.unwrap();

assert_eq!(version, Version(1));
assert_eq!(version, 1);

let conn = ta1.agent.pool().read().await?;

Expand All @@ -2221,7 +2224,12 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

assert_eq!(
bk,
vec![(ta1.agent.actor_id(), version, None, CrsqlDbVersion(1))]
vec![(
ta1.agent.actor_id(),
Version(version),
None,
CrsqlDbVersion(1)
)]
);

let mut changes = vec![];
Expand All @@ -2243,7 +2251,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
ChangeV1 {
actor_id: ta1.agent.actor_id(),
changeset: Changeset::Full {
version,
version: Version(version),
changes,
seqs: CrsqlSeq(0)..=last_seq,
last_seq,
Expand All @@ -2269,7 +2277,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {

let version = body.0.version.unwrap();

assert_eq!(version, Version(2));
assert_eq!(version, 2);

let bk: Vec<(ActorId, Version, Option<Version>, Option<CrsqlDbVersion>)> = conn
.prepare(
Expand All @@ -2284,7 +2292,12 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
bk,
vec![
(ta1.agent.actor_id(), Version(1), Some(Version(1)), None),
(ta1.agent.actor_id(), version, None, Some(CrsqlDbVersion(2)))
(
ta1.agent.actor_id(),
Version(version),
None,
Some(CrsqlDbVersion(2))
)
]
);

Expand All @@ -2305,7 +2318,7 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
ChangeV1 {
actor_id: ta1.agent.actor_id(),
changeset: Changeset::Full {
version,
version: Version(version),
changes,
seqs: CrsqlSeq(0)..=last_seq,
last_seq,
Expand Down Expand Up @@ -2338,7 +2351,12 @@ async fn test_automatic_bookkeeping_clearing() -> eyre::Result<()> {
None,
Some(CrsqlDbVersion(1))
),
(ta1.agent.actor_id(), version, None, Some(CrsqlDbVersion(2)))
(
ta1.agent.actor_id(),
Version(version),
None,
Some(CrsqlDbVersion(2))
)
]
);

Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1766,7 +1766,7 @@ mod tests {
assert_eq!(status_code, StatusCode::OK);

let version = body.0.version.unwrap();
assert_eq!(version, Version(i));
assert_eq!(version, i);
}

let dir = tempfile::tempdir()?;
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/api/public/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub async fn api_v1_transactions(
axum::Json(ExecResponse {
results,
time: elapsed.as_secs_f64(),
version,
version: version.map(Into::into),
}),
)
}
Expand Down
3 changes: 2 additions & 1 deletion crates/corro-agent/src/api/public/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,9 +883,10 @@ async fn forward_bytes_to_body_sender(
mod tests {
use corro_types::actor::ActorId;
use corro_types::api::NotifyEvent;
use corro_types::api::{Change, ColumnName, TableName};
use corro_types::api::{ColumnName, TableName};
use corro_types::base::{CrsqlDbVersion, CrsqlSeq, Version};
use corro_types::broadcast::{ChangeSource, ChangeV1, Changeset};
use corro_types::change::Change;
use corro_types::pubsub::pack_columns;
use corro_types::{
api::{ChangeId, RowId},
Expand Down
1 change: 0 additions & 1 deletion crates/corro-api-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,3 @@ speedy = { workspace = true }
strum = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
corro-base-types = { path = "../corro-base-types" }
50 changes: 2 additions & 48 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ use std::{
};

use compact_str::CompactString;
use corro_base_types::{CrsqlDbVersion, CrsqlSeq, Version};
use rusqlite::{
types::{FromSql, FromSqlError, ToSqlOutput, Value, ValueRef},
Row, ToSql,
ToSql,
};
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
Expand Down Expand Up @@ -220,7 +219,7 @@ impl From<&str> for Statement {
pub struct ExecResponse {
pub results: Vec<ExecResult>,
pub time: f64,
pub version: Option<Version>,
pub version: Option<u64>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand All @@ -241,51 +240,6 @@ pub struct TableStatResponse {
pub invalid_tables: Vec<String>,
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, Readable, Writable, PartialEq)]
pub struct Change {
pub table: TableName,
pub pk: Vec<u8>,
pub cid: ColumnName,
pub val: SqliteValue,
pub col_version: i64,
pub db_version: CrsqlDbVersion,
pub seq: CrsqlSeq,
pub site_id: [u8; 16],
pub cl: i64,
}

impl Change {
// this is an ESTIMATE, it should give a rough idea of how many bytes will
// be required on the wire
pub fn estimated_byte_size(&self) -> usize {
self.table.len() + self.pk.len() + self.cid.len() + self.val.estimated_byte_size() +
// col_version
8 +
// db_version
8 +
// seq
8 +
// site_id
16 +
// cl
8
}
}

pub fn row_to_change(row: &Row) -> Result<Change, rusqlite::Error> {
Ok(Change {
table: row.get(0)?,
pk: row.get(1)?,
cid: row.get(2)?,
val: row.get(3)?,
col_version: row.get(4)?,
db_version: row.get(5)?,
seq: row.get(6)?,
site_id: row.get(7)?,
cl: row.get(8)?,
})
}

#[derive(Debug, Copy, Clone, PartialEq)]
pub struct SqliteValueRef<'a>(pub ValueRef<'a>);

Expand Down
6 changes: 6 additions & 0 deletions crates/corro-base-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ use speedy::{Context, Readable, Writable};
#[serde(transparent)]
pub struct Version(pub u64);

impl From<Version> for u64 {
fn from(v: Version) -> Self {
v.0
}
}

impl Step for Version {
fn steps_between(start: &Self, end: &Self) -> Option<usize> {
u64::steps_between(&start.0, &end.0)
Expand Down
3 changes: 1 addition & 2 deletions crates/corro-types/src/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use std::{
};

use bytes::{Bytes, BytesMut};
use corro_api_types::{row_to_change, Change};
use foca::{Identity, Member, Notification, Runtime, Timer};
use itertools::Itertools;
use metrics::counter;
Expand All @@ -28,7 +27,7 @@ use crate::{
actor::{Actor, ActorId, ClusterId},
agent::Agent,
base::{CrsqlDbVersion, CrsqlSeq, Version},
change::{ChunkedChanges, MAX_CHANGES_BYTE_SIZE},
change::{row_to_change, Change, ChunkedChanges, MAX_CHANGES_BYTE_SIZE},
channel::CorroSender,
sqlite::SqlitePoolError,
sync::SyncTraceContextV1,
Expand Down
51 changes: 49 additions & 2 deletions crates/corro-types/src/change.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{iter::Peekable, ops::RangeInclusive, time::Instant};

pub use corro_api_types::{row_to_change, Change, SqliteValue};
pub use corro_api_types::SqliteValue;
use corro_api_types::{ColumnName, TableName};
use corro_base_types::{CrsqlDbVersion, Version};
use rangemap::RangeInclusiveSet;
use rusqlite::{named_params, params, Connection};
use rusqlite::{named_params, params, Connection, Row};
use speedy::{Readable, Writable};
use tracing::{debug, trace, warn};

use crate::{
Expand All @@ -13,6 +15,51 @@ use crate::{
broadcast::Timestamp,
};

#[derive(Debug, Default, Clone, Readable, Writable, PartialEq)]
pub struct Change {
pub table: TableName,
pub pk: Vec<u8>,
pub cid: ColumnName,
pub val: SqliteValue,
pub col_version: i64,
pub db_version: CrsqlDbVersion,
pub seq: CrsqlSeq,
pub site_id: [u8; 16],
pub cl: i64,
}

impl Change {
// this is an ESTIMATE, it should give a rough idea of how many bytes will
// be required on the wire
pub fn estimated_byte_size(&self) -> usize {
self.table.len() + self.pk.len() + self.cid.len() + self.val.estimated_byte_size() +
// col_version
8 +
// db_version
8 +
// seq
8 +
// site_id
16 +
// cl
8
}
}

pub fn row_to_change(row: &Row) -> Result<Change, rusqlite::Error> {
Ok(Change {
table: row.get(0)?,
pk: row.get(1)?,
cid: row.get(2)?,
val: row.get(3)?,
col_version: row.get(4)?,
db_version: row.get(5)?,
seq: row.get(6)?,
site_id: row.get(7)?,
cl: row.get(8)?,
})
}

pub struct ChunkedChanges<I: Iterator> {
iter: Peekable<I>,
changes: Vec<Change>,
Expand Down
Loading
Loading