@@ -272,22 +272,29 @@ impl ConsensusObserver {
272
272
) ;
273
273
274
274
if self . pipeline_enabled ( ) {
275
- for block in ordered_block. blocks ( ) {
276
- let commit_callback = self . active_observer_state . create_commit_callback (
277
- self . ordered_block_store . clone ( ) ,
278
- self . block_payload_store . clone ( ) ,
279
- ) ;
280
- if let Some ( futs) = self . get_parent_pipeline_futs ( block) {
281
- self . pipeline_builder ( ) . build ( block, futs, commit_callback) ;
282
- } else {
283
- warn ! (
275
+ let block = ordered_block. first_block ( ) ;
276
+ let mut parent_fut = if let Some ( futs) = self . get_parent_pipeline_futs ( & block) {
277
+ Some ( futs)
278
+ } else {
279
+ warn ! (
284
280
LogSchema :: new( LogEntry :: ConsensusObserver ) . message( & format!(
285
281
"Parent block's pipeline futures for ordered block is missing! Ignoring: {:?}" ,
286
282
ordered_block. proof_block_info( )
287
283
) )
288
284
) ;
289
- return ;
290
- }
285
+ return ;
286
+ } ;
287
+ for block in ordered_block. blocks ( ) {
288
+ let commit_callback = self . active_observer_state . create_commit_callback (
289
+ self . ordered_block_store . clone ( ) ,
290
+ self . block_payload_store . clone ( ) ,
291
+ ) ;
292
+ self . pipeline_builder ( ) . build (
293
+ block,
294
+ parent_fut. take ( ) . expect ( "future should be set" ) ,
295
+ commit_callback,
296
+ ) ;
297
+ parent_fut = Some ( block. pipeline_futs ( ) . expect ( "pipeline futures just built" ) ) ;
291
298
}
292
299
}
293
300
// Create the commit callback (to be called after the execution pipeline)
0 commit comments