Skip to content

Commit 5269b13

Browse files
Fix remaining sync verification regressions
1 parent b804837 commit 5269b13

3 files changed

Lines changed: 16 additions & 19 deletions

File tree

crates/contextdb-server/src/sync_client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -770,7 +770,7 @@ mod tests {
770770
let id = Uuid::new_v4();
771771
let mut values = HashMap::new();
772772
values.insert("id".to_string(), Value::Uuid(id));
773-
values.insert("data".to_string(), Value::Text("x".repeat(60 * 1024)));
773+
values.insert("data".to_string(), Value::Text("x".repeat(100 * 1024)));
774774
rows.push(RowChange {
775775
table: "observations".to_string(),
776776
natural_key: NaturalKey {
@@ -797,7 +797,7 @@ mod tests {
797797

798798
assert!(
799799
batches.len() >= 2,
800-
"10 rows * ~60KB each must split into at least 2 batches, got {}",
800+
"10 rows * ~100KB each must split into at least 2 batches, got {}",
801801
batches.len()
802802
);
803803
let total_rows: usize = batches.iter().map(|b| b.rows.len()).sum();

crates/contextdb-server/src/sync_server.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,17 +69,15 @@ impl SyncServer {
6969
loop {
7070
tokio::select! {
7171
maybe_msg = push_sub.next() => {
72-
if let Some(msg) = maybe_msg {
73-
if let Err(e) = self.handle_push(&client, msg).await {
74-
tracing::error!(error = %e, "handle_push failed");
75-
}
72+
if let Some(msg) = maybe_msg
73+
&& let Err(e) = self.handle_push(&client, msg).await {
74+
tracing::error!(error = %e, "handle_push failed");
7675
}
7776
}
7877
maybe_msg = pull_sub.next() => {
79-
if let Some(msg) = maybe_msg {
80-
if let Err(e) = self.handle_pull(&client, msg).await {
81-
tracing::error!(error = %e, "handle_pull failed");
82-
}
78+
if let Some(msg) = maybe_msg
79+
&& let Err(e) = self.handle_pull(&client, msg).await {
80+
tracing::error!(error = %e, "handle_pull failed");
8381
}
8482
}
8583
_ = cleanup_interval.tick() => {

crates/contextdb-server/tests/sync_integration.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1042,6 +1042,7 @@ async fn a15_concurrent_push_and_pull() {
10421042
use contextdb_core::Value;
10431043
use uuid::Uuid;
10441044

1045+
let nats = start_nats().await;
10451046
let server_db = Arc::new(Database::open_memory());
10461047
let edge_db = Arc::new(Database::open_memory());
10471048
let empty = HashMap::new();
@@ -1063,20 +1064,18 @@ async fn a15_concurrent_push_and_pull() {
10631064
.execute("INSERT INTO t (id, v) VALUES ($id, $v)", &p)
10641065
.unwrap();
10651066

1066-
// Register local server
10671067
let policies = ConflictPolicies::uniform(ConflictPolicy::InsertIfNotExists);
1068-
let _server = SyncServer::new(
1068+
let server = Arc::new(SyncServer::new(
10691069
server_db.clone(),
1070-
"nats://localhost:19999",
1070+
&nats.nats_url,
10711071
"concurrent-client-test",
10721072
policies.clone(),
1073-
);
1073+
));
1074+
let server_handle = server.clone();
1075+
tokio::spawn(async move { server_handle.run().await });
1076+
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
10741077

1075-
let client = SyncClient::new(
1076-
edge_db.clone(),
1077-
"nats://localhost:19999",
1078-
"concurrent-client-test",
1079-
);
1078+
let client = SyncClient::new(edge_db.clone(), &nats.nats_url, "concurrent-client-test");
10801079

10811080
// Insert data on edge (for push to send)
10821081
let edge_id = Uuid::new_v4();

0 commit comments

Comments
 (0)