@@ -203,6 +203,7 @@ fd_topo_initialize( config_t * config ) {
   ulong verify_tile_cnt = config->layout.verify_tile_count;
   ulong bank_tile_cnt   = config->layout.bank_tile_count;
   ulong exec_tile_cnt   = config->layout.exec_tile_count;
+  ulong writer_tile_cnt = config->layout.writer_tile_count;

   int enable_rpc = ( config->rpc.port != 0 );

@@ -236,6 +237,8 @@ fd_topo_initialize( config_t * config ) {
   fd_topob_wksp( topo, "sign_gossip" );

   fd_topob_wksp( topo, "replay_exec" );
+  fd_topob_wksp( topo, "replay_wtr" );
+  fd_topob_wksp( topo, "exec_writer" );

   fd_topob_wksp( topo, "voter_sign" );
   fd_topob_wksp( topo, "sign_voter" );
@@ -282,6 +285,7 @@ fd_topo_initialize( config_t * config ) {
   fd_topob_wksp( topo, "replay" );
   fd_topob_wksp( topo, "runtime_pub" );
   fd_topob_wksp( topo, "exec" );
+  fd_topob_wksp( topo, "writer" );
   fd_topob_wksp( topo, "bhole" );
   fd_topob_wksp( topo, "bstore" );
   fd_topob_wksp( topo, "tcache" );
@@ -294,6 +298,7 @@ fd_topo_initialize( config_t * config ) {
   fd_topob_wksp( topo, "restart" );
   fd_topob_wksp( topo, "exec_spad" );
   fd_topob_wksp( topo, "exec_fseq" );
+  fd_topob_wksp( topo, "writer_fseq" );

   if( enable_rpc ) fd_topob_wksp( topo, "rpcsrv" );

@@ -319,7 +324,20 @@ fd_topo_initialize( config_t * config ) {
   /**/                 fd_topob_link( topo, "sign_gossip",  "sign_gossip",  128UL, 64UL, 1UL );
   /* TODO: The MTU is currently relatively arbitrary and needs to be resized to the size of the largest
      message that is outbound from the replay to exec. */
-  FOR(exec_tile_cnt)   fd_topob_link( topo, "replay_exec",  "replay_exec",  128UL, 10240UL, exec_tile_cnt );
+  FOR(exec_tile_cnt)   fd_topob_link( topo, "replay_exec",  "replay_exec",  128UL, 10240UL, exec_tile_cnt );
+  FOR(writer_tile_cnt) fd_topob_link( topo, "replay_wtr",   "replay_wtr",   128UL, FD_REPLAY_WRITER_MTU, 1UL );
+  /* Assuming the number of writer tiles is sufficient to keep up with
+     the number of exec tiles, under equilibrium, we should have at least
+     enough link space to buffer worst case input shuffling done by the
+     stem.  That is, when a link is so unlucky that the stem RNG decides
+     to process every other link except this one, for all writer tiles.
+     This would be fd_ulong_pow2_up( exec_tile_cnt*writer_tile_cnt+1UL ).
+
+     This is all assuming we have true pipelining between exec and writer
+     tiles.  Right now, we don't.  So in reality there can be at most 1
+     in-flight transaction per exec tile, and hence a depth of 1 is in
+     theory sufficient for each exec_writer link. */
+  FOR(exec_tile_cnt)   fd_topob_link( topo, "exec_writer",  "exec_writer",  128UL, FD_EXEC_WRITER_MTU, 1UL );

   /**/                 fd_topob_link( topo, "gossip_verif", "gossip_verif", config->tiles.verify.receive_buffer_size, FD_TPU_MTU, 1UL );
   /**/                 fd_topob_link( topo, "gossip_eqvoc", "gossip_eqvoc", 128UL, FD_TPU_MTU, 1UL );
@@ -412,6 +430,7 @@ fd_topo_initialize( config_t * config ) {

   /**/                 fd_topob_tile( topo, "replay",  "replay",  "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
   FOR(exec_tile_cnt)   fd_topob_tile( topo, "exec",    "exec",    "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
+  FOR(writer_tile_cnt) fd_topob_tile( topo, "writer",  "writer",  "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
   /**/                 fd_topob_tile( topo, "batch",   "batch",   "metric_in", tile_to_cpu[ topo->tile_cnt ], 0, 0 );
   /* TODO: not launching the restart tile if in_wen_restart is false */
   //if( FD_UNLIKELY( config->tiles.restart.in_wen_restart ) ) {
@@ -425,7 +444,6 @@ fd_topo_initialize( config_t * config ) {
   fd_topo_tile_t * repair_tile = &topo->tiles[ fd_topo_find_tile( topo, "repair", 0UL ) ];
   fd_topo_tile_t * batch_tile  = &topo->tiles[ fd_topo_find_tile( topo, "batch",  0UL ) ];
   fd_topo_tile_t * pack_tile   = &topo->tiles[ fd_topo_find_tile( topo, "pack",   0UL ) ];
-  fd_topo_tile_t * exec_tile   = &topo->tiles[ fd_topo_find_tile( topo, "exec",   0UL ) ];

   /* Create a shared blockstore to be used by store and replay. */
   fd_topo_obj_t * blockstore_obj = setup_topo_blockstore( topo,
@@ -448,8 +466,9 @@ fd_topo_initialize( config_t * config ) {
   fd_topo_obj_t * runtime_pub_obj = setup_topo_runtime_pub( topo, "runtime_pub" );
   fd_topob_tile_uses( topo, replay_tile, runtime_pub_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
   fd_topob_tile_uses( topo, batch_tile,  runtime_pub_obj, FD_SHMEM_JOIN_MODE_READ_ONLY  );
-  fd_topob_tile_uses( topo, pack_tile,   runtime_pub_obj, FD_SHMEM_JOIN_MODE_READ_ONLY  );
-  fd_topob_tile_uses( topo, exec_tile,   runtime_pub_obj, FD_SHMEM_JOIN_MODE_READ_ONLY  );
+  fd_topob_tile_uses( topo, pack_tile,   runtime_pub_obj, FD_SHMEM_JOIN_MODE_READ_ONLY  );
+  FOR(exec_tile_cnt)   fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "exec",   i ) ], runtime_pub_obj, FD_SHMEM_JOIN_MODE_READ_ONLY  );
+  FOR(writer_tile_cnt) fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "writer", i ) ], runtime_pub_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
   FD_TEST( fd_pod_insertf_ulong( topo->props, runtime_pub_obj->id, "runtime_pub" ) );

   /* Create a txncache to be used by replay. */
@@ -470,15 +489,26 @@ fd_topo_initialize( config_t * config ) {
   for( ulong i=0UL; i<exec_tile_cnt; i++ ) {
     fd_topo_obj_t * exec_spad_obj = fd_topob_obj( topo, "exec_spad", "exec_spad" );
     fd_topob_tile_uses( topo, replay_tile, exec_spad_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
-    fd_topob_tile_uses( topo, exec_tile, exec_spad_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
+    fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "exec", i ) ], exec_spad_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
+    for( ulong j=0UL; j<writer_tile_cnt; j++ ) {
+      /* For txn_ctx. */
+      fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "writer", j ) ], exec_spad_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
+    }
     FD_TEST( fd_pod_insertf_ulong( topo->props, exec_spad_obj->id, "exec_spad.%lu", i ) );
   }

   for( ulong i=0UL; i<exec_tile_cnt; i++ ) {
-    fd_topo_obj_t * exec_spad_obj = fd_topob_obj( topo, "fseq", "exec_fseq" );
-    fd_topob_tile_uses( topo, exec_tile, exec_spad_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
-    fd_topob_tile_uses( topo, replay_tile, exec_spad_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
-    FD_TEST( fd_pod_insertf_ulong( topo->props, exec_spad_obj->id, "exec_fseq.%lu", i ) );
+    fd_topo_obj_t * exec_fseq_obj = fd_topob_obj( topo, "fseq", "exec_fseq" );
+    fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "exec", i ) ], exec_fseq_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
+    fd_topob_tile_uses( topo, replay_tile, exec_fseq_obj, FD_SHMEM_JOIN_MODE_READ_ONLY );
+    FD_TEST( fd_pod_insertf_ulong( topo->props, exec_fseq_obj->id, "exec_fseq.%lu", i ) );
+  }
+
+  for( ulong i=0UL; i<writer_tile_cnt; i++ ) {
+    fd_topo_obj_t * writer_fseq_obj = fd_topob_obj( topo, "fseq", "writer_fseq" );
+    fd_topob_tile_uses( topo, &topo->tiles[ fd_topo_find_tile( topo, "writer", i ) ], writer_fseq_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
+    fd_topob_tile_uses( topo, replay_tile, writer_fseq_obj, FD_SHMEM_JOIN_MODE_READ_WRITE );
+    FD_TEST( fd_pod_insertf_ulong( topo->props, writer_fseq_obj->id, "writer_fseq.%lu", i ) );
   }

   /* There's another special fseq that's used to communicate the shred
@@ -607,10 +637,16 @@ fd_topo_initialize( config_t * config ) {
   /**/                 fd_topob_tile_in ( topo, "replay",  0UL, "metric_in", "batch_replay", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
   /**/                 fd_topob_tile_out( topo, "replay",  0UL, "replay_voter", 0UL );
   FOR(bank_tile_cnt)   fd_topob_tile_out( topo, "replay",  0UL, "replay_poh",  i );
-  FOR(exec_tile_cnt)   fd_topob_tile_out( topo, "replay",  0UL, "replay_exec", i ); /* TODO check order in fd_replay.c macros*/
-
+  FOR(exec_tile_cnt)   fd_topob_tile_out( topo, "replay",  0UL, "replay_exec", i ); /* TODO check order in fd_replay.c macros*/
+  FOR(writer_tile_cnt) fd_topob_tile_out( topo, "replay",  0UL, "replay_wtr",  i );

-  FOR(exec_tile_cnt)   fd_topob_tile_in ( topo, "exec",    i,   "metric_in", "replay_exec", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+  FOR(exec_tile_cnt)   fd_topob_tile_in ( topo, "exec",    i,   "metric_in", "replay_exec", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+  FOR(exec_tile_cnt)   fd_topob_tile_out( topo, "exec",    i,   "exec_writer", i );
+  /* All writer tiles read from all exec tiles.  Each exec tile has a
+     single out link, over which all the writer tiles round-robin. */
+  FOR(writer_tile_cnt) for( ulong j=0UL; j<exec_tile_cnt; j++ )
+    fd_topob_tile_in( topo, "writer", i, "metric_in", "exec_writer", j, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
+  FOR(writer_tile_cnt) fd_topob_tile_in ( topo, "writer",  i,   "metric_in", "replay_wtr",  i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );

   /**/ fd_topob_tile_in( topo, "sender",  0UL, "metric_in", "stake_out",    0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
   /**/ fd_topob_tile_in( topo, "sender",  0UL, "metric_in", "gossip_voter", 0UL, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
@@ -816,7 +852,8 @@ fd_topo_initialize( config_t * config ) {
       strncpy( tile->replay.status_cache, config->tiles.replay.status_cache, sizeof(tile->replay.status_cache) );
       strncpy( tile->replay.cluster_version, config->tiles.replay.cluster_version, sizeof(tile->replay.cluster_version) );
       tile->replay.bank_tile_count = config->layout.bank_tile_count;
-      tile->replay.exec_tile_count = config->layout.exec_tile_count;
+      tile->replay.exec_tile_count   = config->layout.exec_tile_count;
+      tile->replay.writer_tile_count = config->layout.writer_tile_count;
       strncpy( tile->replay.tower_checkpt, config->tiles.replay.tower_checkpt, sizeof(tile->replay.tower_checkpt) );

       /* not specified by [tiles.replay] */
@@ -885,6 +922,8 @@ fd_topo_initialize( config_t * config ) {

     } else if( FD_UNLIKELY( !strcmp( tile->name, "exec" ) ) ) {
       strncpy( tile->exec.funk_file, config->tiles.replay.funk_file, sizeof(tile->exec.funk_file) );
+    } else if( FD_UNLIKELY( !strcmp( tile->name, "writer" ) ) ) {
+      strncpy( tile->writer.funk_file, config->tiles.replay.funk_file, sizeof(tile->writer.funk_file) );
     } else if( FD_UNLIKELY( !strcmp( tile->name, "rstart" ) ) ) {
       tile->restart.in_wen_restart = config->tiles.restart.in_wen_restart;
       strncpy( tile->restart.funk_file, config->tiles.replay.funk_file, sizeof(tile->replay.funk_file) );