Skip to content

Commit 6770793

Browse files
committed
shred: notify replay tile of completed shreds
1 parent 7c6531b commit 6770793

File tree

5 files changed

+125
-14
lines changed

5 files changed

+125
-14
lines changed

src/app/fdctl/run/tiles/fd_replay.c

+58-1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
#define PACK_IN_IDX (1UL)
6060
#define GOSSIP_IN_IDX (2UL)
6161
#define BATCH_IN_IDX (3UL)
62+
#define SHRED_IN_IDX (4UL)
6263

6364
#define STAKE_OUT_IDX (0UL)
6465
#define NOTIF_OUT_IDX (1UL)
@@ -120,6 +121,11 @@ struct fd_replay_tile_ctx {
120121
ulong batch_in_chunk0;
121122
ulong batch_in_wmark;
122123

124+
// Shred tile input
125+
fd_wksp_t * shred_in_mem;
126+
ulong shred_in_chunk0;
127+
ulong shred_in_wmark;
128+
123129
// Notification output defs
124130
fd_frag_meta_t * notif_out_mcache;
125131
ulong * notif_out_sync;
@@ -443,6 +449,40 @@ publish_stake_weights( fd_replay_tile_ctx_t * ctx,
443449
}
444450
}
445451

452+
static int
453+
before_frag( fd_replay_tile_ctx_t * ctx,
454+
ulong in_idx,
455+
ulong seq,
456+
ulong sig ) {
457+
(void)ctx;
458+
(void)seq;
459+
460+
if( in_idx == SHRED_IN_IDX ){
461+
/* NOTE: If we filter on every fec set thats sent to us from the shred
462+
tile, there is going to be contention if we query the block map every
463+
shred in order to filter them. */
464+
465+
ulong slot = fd_disco_shred_sig_slot( sig );
466+
uchar block_flags = 0;
467+
int err = FD_MAP_ERR_AGAIN;
468+
while( err == FD_MAP_ERR_AGAIN ){
469+
fd_block_map_query_t quer[1] = { 0 };
470+
err = fd_block_map_query_try( ctx->blockstore->block_map, &slot, NULL, quer, 0 );
471+
fd_block_meta_t * block_map_entry = fd_block_map_query_ele( quer );
472+
if( FD_UNLIKELY( err == FD_MAP_ERR_AGAIN ) ) continue;
473+
if( FD_UNLIKELY( err == FD_MAP_ERR_KEY )) break;
474+
block_flags = block_map_entry->flags;
475+
err = fd_block_map_query_test( quer );
476+
}
477+
478+
if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_PROCESSED ) ) ) {
479+
FD_LOG_INFO(( "before_frag filtering: block already processed - slot: %lu", slot ));
480+
return -1; /* skip this frag */
481+
}
482+
}
483+
return 0;
484+
}
485+
446486
static void
447487
during_frag( fd_replay_tile_ctx_t * ctx,
448488
ulong in_idx,
@@ -481,6 +521,17 @@ during_frag( fd_replay_tile_ctx_t * ctx,
481521
fd_memcpy( dst_poh, src, sz * sizeof(fd_txn_p_t) );
482522

483523
FD_LOG_INFO(( "other microblock - slot: %lu, parent_slot: %lu, txn_cnt: %lu", ctx->curr_slot, ctx->parent_slot, sz ));
524+
} else if ( in_idx == SHRED_IN_IDX) {
525+
if( FD_UNLIKELY( sz != 0 ) ) {
526+
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->shred_in_chunk0, ctx->shred_in_wmark ));
527+
}
528+
ulong slot = fd_disco_shred_sig_slot( sig );
529+
ulong parent_off = fd_disco_shred_sig_parent_off( sig );
530+
ulong flags = fd_disco_shred_sig_flags( sig );
531+
532+
FD_LOG_INFO(( "Notified of complete FEC set from the shred tile: slot: %lu, flags: %lu, parent_slot: %lu", slot, flags, slot - parent_off ));
533+
ctx->skip_frag = 1;
534+
return;
484535
} else if( in_idx == PACK_IN_IDX ) {
485536
if( FD_UNLIKELY( chunk<ctx->pack_in_chunk0 || chunk>ctx->pack_in_wmark || sz>USHORT_MAX ) ) {
486537
FD_LOG_ERR(( "chunk %lu %lu corrupt, not in range [%lu,%lu]", chunk, sz, ctx->pack_in_chunk0, ctx->pack_in_wmark ));
@@ -557,7 +608,7 @@ during_frag( fd_replay_tile_ctx_t * ctx,
557608
}
558609

559610
if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_PROCESSED ) ) ) {
560-
FD_LOG_WARNING(( "block already processed - slot: %lu", ctx->curr_slot ));
611+
FD_LOG_WARNING(( "block already processed - slot: %lu, from %lu", ctx->curr_slot, in_idx ));
561612
ctx->skip_frag = 1;
562613
}
563614
if( FD_UNLIKELY( fd_uchar_extract_bit( block_flags, FD_BLOCK_FLAG_DEADBLOCK ) ) ) {
@@ -3001,6 +3052,11 @@ unprivileged_init( fd_topo_t * topo,
30013052
ctx->batch_in_chunk0 = fd_dcache_compact_chunk0( ctx->batch_in_mem, batch_in_link->dcache );
30023053
ctx->batch_in_wmark = fd_dcache_compact_wmark( ctx->batch_in_mem, batch_in_link->dcache, batch_in_link->mtu );
30033054

3055+
fd_topo_link_t * shred_in_link = &topo->links[ tile->in_link_id[ SHRED_IN_IDX ] ];
3056+
ctx->shred_in_mem = topo->workspaces[ topo->objs[ shred_in_link->dcache_obj_id ].wksp_id ].wksp;
3057+
ctx->shred_in_chunk0 = fd_dcache_compact_chunk0( ctx->shred_in_mem, shred_in_link->dcache );
3058+
ctx->shred_in_wmark = fd_dcache_compact_wmark( ctx->shred_in_mem, shred_in_link->dcache, shred_in_link->mtu );
3059+
30043060
fd_topo_link_t * notif_out = &topo->links[ tile->out_link_id[ NOTIF_OUT_IDX ] ];
30053061
ctx->notif_out_mcache = notif_out->mcache;
30063062
ctx->notif_out_sync = fd_mcache_seq_laddr( ctx->notif_out_mcache );
@@ -3130,6 +3186,7 @@ metrics_write( fd_replay_tile_ctx_t * ctx ) {
31303186

31313187
#define STEM_CALLBACK_DURING_HOUSEKEEPING during_housekeeping
31323188
#define STEM_CALLBACK_AFTER_CREDIT after_credit
3189+
#define STEM_CALLBACK_BEFORE_FRAG before_frag
31333190
#define STEM_CALLBACK_DURING_FRAG during_frag
31343191
#define STEM_CALLBACK_AFTER_FRAG after_frag
31353192
#define STEM_CALLBACK_METRICS_WRITE metrics_write

src/app/fdctl/run/tiles/fd_shred.c

+45-12
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
#define STORE_OUT_IDX 0
9292
#define NET_OUT_IDX 1
9393
#define SIGN_OUT_IDX 2
94+
#define MULTI_OUT_IDX 3
9495

9596
#define MAX_SLOTS_PER_EPOCH 432000UL
9697

@@ -180,6 +181,11 @@ typedef struct {
180181
ulong store_out_wmark;
181182
ulong store_out_chunk;
182183

184+
fd_wksp_t * replay_out_mem;
185+
ulong replay_out_chunk0;
186+
ulong replay_out_wmark;
187+
ulong replay_out_chunk;
188+
183189
struct {
184190
fd_histf_t contact_info_cnt[ 1 ];
185191
fd_histf_t batch_sz[ 1 ];
@@ -523,6 +529,7 @@ send_shred( fd_shred_ctx_t * ctx,
523529
ctx->net_out_chunk = fd_dcache_compact_next( ctx->net_out_chunk, pkt_sz, ctx->net_out_chunk0, ctx->net_out_wmark );
524530
}
525531

532+
526533
static void
527534
after_frag( fd_shred_ctx_t * ctx,
528535
ulong in_idx,
@@ -554,11 +561,12 @@ after_frag( fd_shred_ctx_t * ctx,
554561
const ulong fanout = 200UL;
555562
fd_shred_dest_idx_t _dests[ 200*(FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX) ];
556563

557-
if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
558-
uchar * shred_buffer = ctx->shred_buffer;
559-
ulong shred_buffer_sz = ctx->shred_buffer_sz;
564+
uchar * shred_buffer = ctx->shred_buffer;
565+
ulong shred_buffer_sz = ctx->shred_buffer_sz;
560566

561-
fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz );
567+
fd_shred_t const * shred = fd_shred_parse( shred_buffer, shred_buffer_sz );
568+
569+
if( FD_LIKELY( ctx->in_kind[ in_idx ]==IN_KIND_NET ) ) {
562570
if( FD_UNLIKELY( !shred ) ) { ctx->metrics->shred_processing_result[ 1 ]++; return; }
563571

564572
fd_epoch_leaders_t const * lsched = fd_stake_ci_get_lsched_for_slot( ctx->stake_ci, shred->slot );
@@ -633,14 +641,19 @@ after_frag( fd_shred_ctx_t * ctx,
633641
/* Send to the blockstore, skipping any empty shred34_t s. */
634642
ulong new_sig = ctx->in_kind[ in_idx ]!=IN_KIND_NET; /* sig==0 means the store tile will do extra checks */
635643
ulong tspub = fd_frag_meta_ts_comp( fd_tickcount() );
636-
fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+0UL ), sz0, 0UL, ctx->tsorig, tspub );
644+
fd_stem_publish( stem, STORE_OUT_IDX, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+0UL ), sz0, 0UL, ctx->tsorig, tspub );
637645
if( FD_UNLIKELY( s34[ 1 ].shred_cnt ) )
638-
fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+1UL ), sz1, 0UL, ctx->tsorig, tspub );
639-
fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+2UL), sz2, 0UL, ctx->tsorig, tspub );
646+
fd_stem_publish( stem, STORE_OUT_IDX, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+1UL ), sz1, 0UL, ctx->tsorig, tspub );
647+
fd_stem_publish( stem, STORE_OUT_IDX, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+2UL), sz2, 0UL, ctx->tsorig, tspub );
640648
if( FD_UNLIKELY( s34[ 3 ].shred_cnt ) )
641-
fd_stem_publish( stem, 0UL, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+3UL ), sz3, 0UL, ctx->tsorig, tspub );
642-
643-
649+
fd_stem_publish( stem, STORE_OUT_IDX, new_sig, fd_laddr_to_chunk( ctx->store_out_mem, s34+3UL ), sz3, 0UL, ctx->tsorig, tspub );
650+
651+
/* Send to replay tile */
652+
#if FD_HAS_NO_AGAVE
653+
fd_shred_t const * last_data_shred = (fd_shred_t const *)set->data_shreds[ set->data_shred_cnt-1 ];
654+
ulong replay_sig = fd_disco_shred_sig( shred->slot, last_data_shred->data.parent_off, last_data_shred->data.flags, 1 );
655+
fd_stem_publish( stem, MULTI_OUT_IDX, replay_sig, ctx->replay_out_chunk, 0, 0UL, ctx->tsorig, tspub );
656+
#endif
644657
/* Compute all the destinations for all the new shreds */
645658

646659
fd_shred_t const * new_shreds[ FD_REEDSOL_DATA_SHREDS_MAX+FD_REEDSOL_PARITY_SHREDS_MAX ];
@@ -697,11 +710,18 @@ unprivileged_init( fd_topo_t * topo,
697710
fd_topo_tile_t * tile ) {
698711
void * scratch = fd_topo_obj_laddr( topo, tile->tile_obj_id );
699712

700-
if( FD_UNLIKELY( tile->out_cnt!=3UL ||
713+
#if FD_HAS_NO_AGAVE
714+
int check_multi_out_link = strcmp( topo->links[ tile->out_link_id[ MULTI_OUT_IDX ] ].name, "shred_multi" );
715+
#else
716+
int check_multi_out_link = 0;
717+
#endif
718+
719+
if( FD_UNLIKELY( tile->out_cnt<3UL ||
701720
(strcmp( topo->links[ tile->out_link_id[ STORE_OUT_IDX ] ].name, "shred_store" ) &&
702721
strcmp( topo->links[ tile->out_link_id[ STORE_OUT_IDX ] ].name, "shred_storei" ) ) ||
703722
strcmp( topo->links[ tile->out_link_id[ NET_OUT_IDX ] ].name, "shred_net" ) ||
704-
strcmp( topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ].name, "shred_sign" ) ) )
723+
strcmp( topo->links[ tile->out_link_id[ SIGN_OUT_IDX ] ].name, "shred_sign" ) ||
724+
check_multi_out_link ) )
705725
FD_LOG_ERR(( "shred tile has none or unexpected output links %lu %s %s",
706726
tile->out_cnt, topo->links[ tile->out_link_id[ 0 ] ].name, topo->links[ tile->out_link_id[ 1 ] ].name ));
707727

@@ -859,6 +879,19 @@ unprivileged_init( fd_topo_t * topo,
859879
ctx->store_out_wmark = fd_dcache_compact_wmark ( ctx->store_out_mem, store_out->dcache, store_out->mtu );
860880
ctx->store_out_chunk = ctx->store_out_chunk0;
861881

882+
#if FD_HAS_NO_AGAVE
883+
fd_topo_link_t * replay_out = &topo->links[ tile->out_link_id[ MULTI_OUT_IDX ] ];
884+
ctx->replay_out_mem = topo->workspaces[ topo->objs[ replay_out->dcache_obj_id ].wksp_id ].wksp;
885+
ctx->replay_out_chunk0 = fd_dcache_compact_chunk0( ctx->replay_out_mem, replay_out->dcache );
886+
ctx->replay_out_wmark = fd_dcache_compact_wmark ( ctx->replay_out_mem, replay_out->dcache, replay_out->mtu );
887+
ctx->replay_out_chunk = ctx->replay_out_chunk0;
888+
#else
889+
ctx->replay_out_mem = NULL;
890+
ctx->replay_out_chunk0 = 0;
891+
ctx->replay_out_wmark = 0;
892+
ctx->replay_out_chunk = 0;
893+
#endif
894+
862895
ctx->poh_in_expect_seq = 0UL;
863896

864897
ctx->shredder_fec_set_idx = 0UL;

src/app/fdctl/run/tiles/fd_store_int.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ fd_store_tile_slot_prepare( fd_store_tile_ctx_t * ctx,
451451
ulong repair_req_sz = repair_req_cnt * sizeof(fd_repair_request_t);
452452
FD_TEST( repair_req_sz<=USHORT_MAX );
453453
fd_mcache_publish( ctx->repair_req_out_mcache, ctx->repair_req_out_depth, ctx->repair_req_out_seq, repair_req_sig, ctx->repair_req_out_chunk,
454-
repair_req_sz, 0UL, tsorig, tspub );
454+
repair_req_sz, 0UL, tsorig, tspub );
455455
ctx->repair_req_out_seq = fd_seq_inc( ctx->repair_req_out_seq, 1UL );
456456
ctx->repair_req_out_chunk = fd_dcache_compact_next( ctx->repair_req_out_chunk, repair_req_sz, ctx->repair_req_out_chunk0, ctx->repair_req_out_wmark );
457457
}

src/app/fdctl/run/topos/fd_firedancer.c

+4
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ fd_topo_initialize( config_t * config ) {
9797

9898
fd_topob_wksp( topo, "shred_sign" );
9999
fd_topob_wksp( topo, "sign_shred" );
100+
fd_topob_wksp( topo, "shred_multi" );
100101

101102
fd_topob_wksp( topo, "gossip_sign" );
102103
fd_topob_wksp( topo, "sign_gossip" );
@@ -175,6 +176,7 @@ fd_topo_initialize( config_t * config ) {
175176
FOR(shred_tile_cnt) fd_topob_link( topo, "shred_sign", "shred_sign", 128UL, 32UL, 1UL );
176177
FOR(shred_tile_cnt) fd_topob_link( topo, "sign_shred", "sign_shred", 128UL, 64UL, 1UL );
177178
179+
/**/ fd_topob_link( topo, "shred_multi", "shred_multi", 128UL, sizeof(ulong), 1UL );
178180
/**/ fd_topob_link( topo, "gossip_sign", "gossip_sign", 128UL, 2048UL, 1UL );
179181
/**/ fd_topob_link( topo, "sign_gossip", "sign_gossip", 128UL, 64UL, 1UL );
180182
/**/ fd_topob_link( topo, "gossip_repla", "gossip_repla", 128UL, 4UL + 128UL + 8192UL, 1UL );
@@ -408,6 +410,7 @@ fd_topo_initialize( config_t * config ) {
408410
/**/ fd_topob_tile_in( topo, "shred", i, "metric_in", "sign_shred", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_UNPOLLED );
409411
/**/ fd_topob_tile_out( topo, "sign", 0UL, "sign_shred", i );
410412
}
413+
FOR(shred_tile_cnt) fd_topob_tile_out( topo, "shred", i, "shred_multi", i );
411414

412415
FOR(net_tile_cnt) fd_topob_tile_out( topo, "net", i, "net_gossip", i );
413416
FOR(net_tile_cnt) fd_topob_tile_in( topo, "gossip", 0UL, "metric_in", "net_gossip", i, FD_TOPOB_UNRELIABLE, FD_TOPOB_POLLED ); /* No reliable consumers of networking fragments, may be dropped or overrun */
@@ -439,6 +442,7 @@ fd_topo_initialize( config_t * config ) {
439442
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_voter", 0UL );
440443
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_gossi", 0UL );
441444
/**/ fd_topob_tile_out( topo, "replay", 0UL, "replay_store", 0UL );
445+
/**/ fd_topob_tile_in( topo, "replay", 0UL, "metric_in", "shred_multi", 0UL, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );
442446
FOR(bank_tile_cnt) fd_topob_tile_out( topo, "replay", 0UL, "replay_poh", i );
443447
FOR(exec_tile_cnt) fd_topob_tile_out( topo, "replay", 0UL, "replay_exec", i ); /* TODO check order in fd_replay.c macros*/
444448
FOR(exec_tile_cnt) fd_topob_tile_in( topo, "exec", i, "metric_in", "replay_exec", i, FD_TOPOB_RELIABLE, FD_TOPOB_POLLED );

src/disco/fd_disco_base.h

+17
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,23 @@ fd_disco_replay_sig( ulong slot,
121121
FD_FN_CONST static inline ulong fd_disco_replay_sig_flags( ulong sig ) { return (sig & 0xFFUL); }
122122
FD_FN_CONST static inline ulong fd_disco_replay_sig_slot( ulong sig ) { return (sig >> 8); }
123123

124+
FD_FN_CONST static inline ulong
125+
fd_disco_shred_sig( ulong slot,
126+
ushort parent_off,
127+
uchar shred_flags,
128+
int is_turbine ) {
129+
/* lower 32 bits of the slot
130+
full 16 bits of parent offset
131+
full 8 bits of data shred flags
132+
8 bits of extra flags */
133+
return (slot << 32) | ((ulong)parent_off << 16) | ((ulong) shred_flags << 8) | ( is_turbine ? 1UL : 0UL );
134+
}
135+
136+
/* Returns sig shifted right by 32 bits, i.e. the slot field of a
   shred signature. */
FD_FN_CONST static inline ulong
fd_disco_shred_sig_slot( ulong sig ) {
  return sig >> 32;
}
137+
/* Returns bits 31..16 of sig, i.e. the parent offset field of a shred
   signature. */
FD_FN_CONST static inline ushort
fd_disco_shred_sig_parent_off( ulong sig ) {
  ulong shifted = sig >> 16;
  return (ushort)shifted; /* the narrowing cast discards the slot bits */
}
138+
/* Returns bits 15..8 of sig, i.e. the data shred flags field of a
   shred signature. */
FD_FN_CONST static inline uchar
fd_disco_shred_sig_flags( ulong sig ) {
  ulong shifted = sig >> 8;
  return (uchar)shifted; /* the narrowing cast discards the higher fields */
}
139+
/* Returns 1 if bit 0 of sig (the is_turbine flag) is set, 0
   otherwise. */
FD_FN_CONST static inline int
fd_disco_shred_sig_is_turbine( ulong sig ) {
  return ( sig & 1UL ) ? 1 : 0;
}
140+
124141
FD_FN_PURE static inline ulong
125142
fd_disco_compact_chunk0( void * wksp ) {
126143
return (((struct fd_wksp_private *)wksp)->gaddr_lo) >> FD_CHUNK_LG_SZ;

0 commit comments

Comments
 (0)