@@ -84,14 +84,15 @@ const COMPACT_BOOKED_INTERVAL: Duration = Duration::from_secs(300);
84
84
const ANNOUNCE_INTERVAL : Duration = Duration :: from_secs ( 300 ) ;
85
85
86
86
pub struct AgentOptions {
87
- actor_id : ActorId ,
88
- gossip_server_endpoint : quinn:: Endpoint ,
89
- transport : Transport ,
90
- api_listener : TcpListener ,
91
- rx_bcast : Receiver < BroadcastInput > ,
92
- rx_apply : Receiver < ( ActorId , i64 ) > ,
93
- rtt_rx : Receiver < ( SocketAddr , Duration ) > ,
94
- tripwire : Tripwire ,
87
+ pub actor_id : ActorId ,
88
+ pub gossip_server_endpoint : quinn:: Endpoint ,
89
+ pub transport : Transport ,
90
+ pub api_listener : TcpListener ,
91
+ pub rx_bcast : Receiver < BroadcastInput > ,
92
+ pub rx_apply : Receiver < ( ActorId , i64 ) > ,
93
+ pub rx_empty : Receiver < ( ActorId , RangeInclusive < i64 > ) > ,
94
+ pub rtt_rx : Receiver < ( SocketAddr , Duration ) > ,
95
+ pub tripwire : Tripwire ,
95
96
}
96
97
97
98
pub async fn setup ( conf : Config , tripwire : Tripwire ) -> eyre:: Result < ( Agent , AgentOptions ) > {
@@ -261,13 +262,16 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
261
262
262
263
let ( tx_bcast, rx_bcast) = channel ( 10240 ) ;
263
264
265
+ let ( tx_empty, rx_empty) = channel ( 10240 ) ;
266
+
264
267
let opts = AgentOptions {
265
268
actor_id,
266
269
gossip_server_endpoint,
267
270
transport,
268
271
api_listener,
269
272
rx_bcast,
270
273
rx_apply,
274
+ rx_empty,
271
275
rtt_rx,
272
276
tripwire : tripwire. clone ( ) ,
273
277
} ;
@@ -283,6 +287,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
283
287
bookie,
284
288
tx_bcast,
285
289
tx_apply,
290
+ tx_empty,
286
291
schema : RwLock :: new ( schema) ,
287
292
tripwire,
288
293
} ) ;
@@ -307,6 +312,7 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
307
312
mut tripwire,
308
313
rx_bcast,
309
314
rx_apply,
315
+ rx_empty,
310
316
rtt_rx,
311
317
} = opts;
312
318
@@ -893,8 +899,14 @@ pub async fn run(agent: Agent, opts: AgentOptions) -> eyre::Result<()> {
893
899
) ;
894
900
895
901
spawn_counted (
896
- sync_loop ( agent. clone ( ) , transport. clone ( ) , rx_apply, tripwire. clone ( ) )
897
- . inspect ( |_| info ! ( "corrosion agent sync loop is done" ) ) ,
902
+ sync_loop (
903
+ agent. clone ( ) ,
904
+ transport. clone ( ) ,
905
+ rx_apply,
906
+ rx_empty,
907
+ tripwire. clone ( ) ,
908
+ )
909
+ . inspect ( |_| info ! ( "corrosion agent sync loop is done" ) ) ,
898
910
) ;
899
911
900
912
let mut db_cleanup_interval = tokio:: time:: interval ( Duration :: from_secs ( 60 * 15 ) ) ;
@@ -987,7 +999,15 @@ async fn clear_overwritten_versions(agent: Agent) {
987
999
{
988
1000
let booked = bookie. read ( ) . await ;
989
1001
for ( actor_id, booked) in booked. iter ( ) {
990
- let versions = booked. read ( ) . await . current_versions ( ) ;
1002
+ let versions = {
1003
+ match timeout ( Duration :: from_secs ( 1 ) , booked. read ( ) ) . await {
1004
+ Ok ( booked) => booked. current_versions ( ) ,
1005
+ Err ( _) => {
1006
+ info ! ( %actor_id, "timed out acquiring read lock on bookkeeping, skipping for now" ) ;
1007
+ continue ;
1008
+ }
1009
+ }
1010
+ } ;
991
1011
if versions. is_empty ( ) {
992
1012
continue ;
993
1013
}
@@ -1081,7 +1101,7 @@ async fn clear_overwritten_versions(agent: Agent) {
1081
1101
db_version = NULL,
1082
1102
last_seq = NULL,
1083
1103
ts = NULL
1084
- WHERE end_version != excluded.end_version
1104
+ WHERE end_version < excluded.end_version
1085
1105
" ,
1086
1106
) ?
1087
1107
. execute ( params ! [ actor_id, range. start( ) , range. end( ) ] ) ?;
@@ -1271,12 +1291,16 @@ fn find_cleared_db_versions(tx: &Transaction) -> rusqlite::Result<BTreeSet<i64>>
1271
1291
. query_map ( [ ] , |row| row. get ( 0 ) ) ?
1272
1292
. collect :: < Result < BTreeSet < String > , _ > > ( ) ?;
1273
1293
1294
+ if tables. is_empty ( ) {
1295
+ return Ok ( BTreeSet :: new ( ) ) ;
1296
+ }
1297
+
1274
1298
let to_clear_query = format ! (
1275
1299
"SELECT DISTINCT(db_version) FROM __corro_bookkeeping WHERE db_version IS NOT NULL
1276
1300
EXCEPT SELECT db_version FROM ({});" ,
1277
1301
tables
1278
1302
. iter( )
1279
- . map( |table| format!( "SELECT DISTINCT(__crsql_db_version) AS db_version FROM {table}" ) )
1303
+ . map( |table| format!( "SELECT DISTINCT(db_version) FROM {table}" ) )
1280
1304
. collect:: <Vec <_>>( )
1281
1305
. join( " UNION " )
1282
1306
) ;
@@ -1585,7 +1609,12 @@ fn store_empty_changeset(
1585
1609
"
1586
1610
INSERT INTO __corro_bookkeeping (actor_id, start_version, end_version, db_version, ts)
1587
1611
VALUES (?, ?, ?, ?, ?)
1588
- ON CONFLICT (actor_id, start_version) DO NOTHING;
1612
+ ON CONFLICT (actor_id, start_version) DO UPDATE SET
1613
+ end_version = excluded.end_version,
1614
+ db_version = NULL,
1615
+ last_seq = NULL,
1616
+ ts = NULL
1617
+ WHERE end_version < excluded.end_version;
1589
1618
" ,
1590
1619
) ?
1591
1620
. execute ( params ! [
@@ -1830,17 +1859,36 @@ pub async fn process_multiple_changes(
1830
1859
continue ;
1831
1860
}
1832
1861
1833
- let tx = conn . transaction ( ) ? ;
1834
-
1835
- let ( known , changeset ) = match process_single_version ( & tx , change ) {
1836
- Ok ( res ) => res ,
1837
- Err ( e ) => {
1838
- error ! ( %actor_id , ?versions , "could not process single change: {e}" ) ;
1839
- continue ;
1862
+ // optimizing this, insert later!
1863
+ let ( known , changeset ) = if change . is_complete ( ) && change . is_empty ( ) {
1864
+ if let Err ( e ) = agent
1865
+ . tx_empty ( )
1866
+ . blocking_send ( ( actor_id , change . versions ( ) ) )
1867
+ {
1868
+ error ! ( "could not send empty changed versions into channel: {e}" ) ;
1840
1869
}
1841
- } ;
1870
+ // insert into in-memory bookkeeping right away
1871
+ booked_write. insert_many ( change. versions ( ) , KnownDbVersion :: Cleared ) ;
1872
+ (
1873
+ KnownDbVersion :: Cleared ,
1874
+ Changeset :: Empty {
1875
+ versions : change. versions ( ) ,
1876
+ } ,
1877
+ )
1878
+ } else {
1879
+ let tx = conn. transaction ( ) ?;
1842
1880
1843
- tx. commit ( ) ?;
1881
+ let ( known, changeset) = match process_single_version ( & tx, change) {
1882
+ Ok ( res) => res,
1883
+ Err ( e) => {
1884
+ error ! ( %actor_id, ?versions, "could not process single change: {e}" ) ;
1885
+ continue ;
1886
+ }
1887
+ } ;
1888
+
1889
+ tx. commit ( ) ?;
1890
+ ( known, changeset)
1891
+ } ;
1844
1892
1845
1893
seen. insert ( versions. clone ( ) , known. clone ( ) ) ;
1846
1894
@@ -1976,23 +2024,15 @@ fn process_complete_version(
1976
2024
tx : & Transaction ,
1977
2025
actor_id : ActorId ,
1978
2026
versions : RangeInclusive < i64 > ,
1979
- parts : Option < ChangesetParts > ,
2027
+ parts : ChangesetParts ,
1980
2028
) -> rusqlite:: Result < ( KnownDbVersion , Changeset ) > {
1981
2029
let ChangesetParts {
1982
2030
version,
1983
2031
changes,
1984
2032
seqs,
1985
2033
last_seq,
1986
2034
ts,
1987
- } = match parts {
1988
- None => {
1989
- store_empty_changeset ( tx, actor_id, versions. clone ( ) ) ?;
1990
- info ! ( %actor_id, ?versions, "cleared empty versions range" ) ;
1991
- // booked_write.insert_many(versions.clone(), KnownDbVersion::Cleared);
1992
- return Ok ( ( KnownDbVersion :: Cleared , Changeset :: Empty { versions } ) ) ;
1993
- }
1994
- Some ( parts) => parts,
1995
- } ;
2035
+ } = parts;
1996
2036
1997
2037
info ! ( %actor_id, version, "complete change, applying right away! seqs: {seqs:?}, last_seq: {last_seq}" ) ;
1998
2038
@@ -2099,7 +2139,14 @@ fn process_single_version(
2099
2139
let versions = changeset. versions ( ) ;
2100
2140
2101
2141
let ( known, changeset) = if changeset. is_complete ( ) {
2102
- process_complete_version ( tx, actor_id, versions, changeset. into_parts ( ) ) ?
2142
+ process_complete_version (
2143
+ tx,
2144
+ actor_id,
2145
+ versions,
2146
+ changeset
2147
+ . into_parts ( )
2148
+ . expect ( "no changeset parts, this shouldn't be happening!" ) ,
2149
+ ) ?
2103
2150
} else {
2104
2151
let parts = changeset. into_parts ( ) . unwrap ( ) ;
2105
2152
let known = process_incomplete_version ( tx, actor_id, & parts) ?;
@@ -2285,10 +2332,13 @@ async fn handle_sync(agent: &Agent, transport: &Transport) -> Result<(), SyncCli
2285
2332
Ok ( ( ) )
2286
2333
}
2287
2334
2335
+ const CHECK_EMPTIES_TO_INSERT_AFTER : Duration = Duration :: from_secs ( 120 ) ;
2336
+
2288
2337
async fn sync_loop (
2289
2338
agent : Agent ,
2290
2339
transport : Transport ,
2291
2340
mut rx_apply : Receiver < ( ActorId , i64 ) > ,
2341
+ mut rx_empty : Receiver < ( ActorId , RangeInclusive < i64 > ) > ,
2292
2342
mut tripwire : Tripwire ,
2293
2343
) {
2294
2344
let mut sync_backoff = backoff:: Backoff :: new ( 0 )
@@ -2297,6 +2347,62 @@ async fn sync_loop(
2297
2347
let next_sync_at = tokio:: time:: sleep ( sync_backoff. next ( ) . unwrap ( ) ) ;
2298
2348
tokio:: pin!( next_sync_at) ;
2299
2349
2350
+ spawn_counted ( {
2351
+ let mut tripwire = tripwire. clone ( ) ;
2352
+ let agent = agent. clone ( ) ;
2353
+ async move {
2354
+ let mut inserted_empties = 0 ;
2355
+ let mut empties: BTreeMap < ActorId , Vec < RangeInclusive < i64 > > > = BTreeMap :: new ( ) ;
2356
+
2357
+ let next_empties_check = tokio:: time:: sleep ( CHECK_EMPTIES_TO_INSERT_AFTER ) ;
2358
+ tokio:: pin!( next_empties_check) ;
2359
+
2360
+ loop {
2361
+ tokio:: select! {
2362
+ maybe_empty = rx_empty. recv( ) => match maybe_empty {
2363
+ Some ( ( actor_id, versions) ) => {
2364
+ empties. entry( actor_id) . or_default( ) . push( versions) ;
2365
+ inserted_empties += 1 ;
2366
+
2367
+ if inserted_empties < 1000 {
2368
+ continue ;
2369
+ }
2370
+ } ,
2371
+ None => {
2372
+ debug!( "empties queue is done" ) ;
2373
+ break ;
2374
+ }
2375
+ } ,
2376
+ _ = & mut next_empties_check => {
2377
+ next_empties_check. as_mut( ) . reset( tokio:: time:: Instant :: now( ) + CHECK_EMPTIES_TO_INSERT_AFTER ) ;
2378
+ if empties. is_empty( ) {
2379
+ continue ;
2380
+ }
2381
+ } ,
2382
+ _ = & mut tripwire => break
2383
+ }
2384
+
2385
+ inserted_empties = 0 ;
2386
+
2387
+ if let Err ( e) = process_completed_empties ( & agent, & mut empties) . await {
2388
+ error ! ( "could not process empties: {e}" ) ;
2389
+ }
2390
+ }
2391
+ info ! ( "Draining empty versions to process..." ) ;
2392
+ // drain empties channel
2393
+ while let Ok ( ( actor_id, versions) ) = rx_empty. try_recv ( ) {
2394
+ empties. entry ( actor_id) . or_default ( ) . push ( versions) ;
2395
+ }
2396
+
2397
+ if !empties. is_empty ( ) {
2398
+ info ! ( "inserting last unprocessed empties before shut down" ) ;
2399
+ if let Err ( e) = process_completed_empties ( & agent, & mut empties) . await {
2400
+ error ! ( "could not process empties: {e}" ) ;
2401
+ }
2402
+ }
2403
+ }
2404
+ } ) ;
2405
+
2300
2406
loop {
2301
2407
enum Branch {
2302
2408
Tick ,
@@ -2362,6 +2468,33 @@ async fn sync_loop(
2362
2468
}
2363
2469
}
2364
2470
2471
+ async fn process_completed_empties (
2472
+ agent : & Agent ,
2473
+ empties : & mut BTreeMap < ActorId , Vec < RangeInclusive < i64 > > > ,
2474
+ ) -> eyre:: Result < ( ) > {
2475
+ let mut conn = agent. pool ( ) . write_normal ( ) . await ?;
2476
+
2477
+ block_in_place ( || {
2478
+ let tx = conn. transaction ( ) ?;
2479
+ while let Some ( ( actor_id, empties) ) = empties. pop_first ( ) {
2480
+ let booked = agent. bookie ( ) . for_actor_blocking ( actor_id) ;
2481
+ let bookedw = booked. blocking_write ( ) ;
2482
+
2483
+ for ( range, _) in empties
2484
+ . iter ( )
2485
+ . filter_map ( |range| bookedw. get_key_value ( range. start ( ) ) )
2486
+ . dedup ( )
2487
+ {
2488
+ store_empty_changeset ( & tx, actor_id, range. clone ( ) ) ?;
2489
+ }
2490
+ }
2491
+
2492
+ tx. commit ( ) ?;
2493
+
2494
+ Ok ( ( ) )
2495
+ } )
2496
+ }
2497
+
2365
2498
pub fn migrate ( conn : & mut Connection ) -> rusqlite:: Result < ( ) > {
2366
2499
let migrations: Vec < Box < dyn Migration > > = vec ! [
2367
2500
Box :: new( init_migration as fn ( & Transaction ) -> rusqlite:: Result <( ) >) ,
@@ -2916,10 +3049,10 @@ pub mod tests {
2916
3049
2917
3050
conn. execute_batch (
2918
3051
"
2919
- CREATE TABLE foo (a INTEGER PRIMARY KEY, b INTEGER);
3052
+ CREATE TABLE foo (a INTEGER NOT NULL PRIMARY KEY, b INTEGER);
2920
3053
SELECT crsql_as_crr('foo');
2921
3054
2922
- CREATE TABLE foo2 (a INTEGER PRIMARY KEY, b INTEGER);
3055
+ CREATE TABLE foo2 (a INTEGER NOT NULL PRIMARY KEY, b INTEGER);
2923
3056
SELECT crsql_as_crr('foo2');
2924
3057
" ,
2925
3058
) ?;
@@ -2950,7 +3083,7 @@ pub mod tests {
2950
3083
}
2951
3084
2952
3085
{
2953
- let mut prepped = conn. prepare ( "SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo2__crsql_clock UNION SELECT DISTINCT(__crsql_db_version) AS db_version FROM foo__crsql_clock;" ) ?;
3086
+ let mut prepped = conn. prepare ( "SELECT DISTINCT(db_version) FROM foo2__crsql_clock UNION SELECT DISTINCT(db_version) FROM foo__crsql_clock;" ) ?;
2954
3087
let mut rows = prepped. query ( [ ] ) ?;
2955
3088
2956
3089
while let Ok ( Some ( row) ) = rows. next ( ) {
0 commit comments