Skip to content

Commit 16e9349

Browse files
committed
lint
1 parent bbed07c commit 16e9349

File tree

4 files changed

+88
-52
lines changed

4 files changed

+88
-52
lines changed

consensus/src/epoch_manager.rs

+23-22
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,16 @@
44

55
use crate::{
66
block_storage::{
7+
tracing::{observe_block, BlockStage},
78
BlockStore,
8-
tracing::{BlockStage, observe_block},
99
},
1010
counters,
1111
dag::{DagBootstrapper, DagCommitSigner, StorageAdapter},
12-
error::{DbError, error_kind},
12+
error::{error_kind, DbError},
1313
liveness::{
1414
cached_proposer_election::CachedProposerElection,
1515
leader_reputation::{
16-
AptosDBBackend, extract_epoch_to_proposers, LeaderReputation,
16+
extract_epoch_to_proposers, AptosDBBackend, LeaderReputation,
1717
ProposerAndVoterHeuristic, ReputationHeuristic,
1818
},
1919
proposal_generator::{
@@ -28,15 +28,12 @@ use crate::{
2828
metrics_safety_rules::MetricsSafetyRules,
2929
monitor,
3030
network::{
31-
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest,
32-
IncomingDAGRequest,
33-
IncomingRandGenRequest,
34-
IncomingRpcRequest,
35-
NetworkReceivers, NetworkSender,
31+
IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingDAGRequest,
32+
IncomingRandGenRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender,
3633
},
3734
network_interface::{ConsensusMsg, ConsensusNetworkClient},
3835
payload_client::{
39-
mixed::MixedPayloadClient, PayloadClient, user::quorum_store_client::QuorumStoreClient,
36+
mixed::MixedPayloadClient, user::quorum_store_client::QuorumStoreClient, PayloadClient,
4037
},
4138
payload_manager::PayloadManager,
4239
persistent_liveness_storage::{LedgerRecoveryData, PersistentLivenessStorage, RecoveryData},
@@ -54,7 +51,7 @@ use crate::{
5451
round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent},
5552
util::time_service::TimeService,
5653
};
57-
use anyhow::{anyhow, bail, Context, ensure};
54+
use anyhow::{anyhow, bail, ensure, Context};
5855
use aptos_bounded_executor::BoundedExecutor;
5956
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
6057
use aptos_config::config::{
@@ -66,7 +63,7 @@ use aptos_consensus_types::{
6663
epoch_retrieval::EpochRetrievalRequest,
6764
};
6865
use aptos_dkg::{
69-
pvss::{Player, traits::Transcript},
66+
pvss::{traits::Transcript, Player},
7067
weighted_vuf::traits::WeightedVUF,
7168
};
7269
use aptos_event_notifications::ReconfigNotificationListener;
@@ -79,27 +76,27 @@ use aptos_safety_rules::SafetyRulesManager;
7976
use aptos_secure_storage::{KVStorage, Storage};
8077
use aptos_types::{
8178
account_address::AccountAddress,
82-
dkg::{DefaultDKG, DKGState, DKGTrait, real_dkg::maybe_dk_from_bls_sk},
79+
dkg::{real_dkg::maybe_dk_from_bls_sk, DKGState, DKGTrait, DefaultDKG},
8380
epoch_change::EpochChangeProof,
8481
epoch_state::EpochState,
8582
on_chain_config::{
86-
Features, LeaderReputationType, OnChainConfigPayload, OnChainConfigProvider,
83+
FeatureFlag, Features, LeaderReputationType, OnChainConfigPayload, OnChainConfigProvider,
8784
OnChainConsensusConfig, OnChainExecutionConfig, ProposerElectionType, ValidatorSet,
8885
},
89-
randomness::{RandKeys, WVUF, WvufPP},
86+
randomness::{RandKeys, WvufPP, WVUF},
9087
};
9188
use aptos_validator_transaction_pool::VTxnPoolState;
9289
use fail::fail_point;
9390
use futures::{
9491
channel::{
9592
mpsc,
96-
mpsc::{Sender, unbounded, UnboundedSender},
93+
mpsc::{unbounded, Sender, UnboundedSender},
9794
oneshot,
9895
},
9996
SinkExt, StreamExt,
10097
};
10198
use itertools::Itertools;
102-
use rand::{prelude::StdRng, SeedableRng, thread_rng};
99+
use rand::{prelude::StdRng, thread_rng, SeedableRng};
103100
use std::{
104101
cmp::Ordering,
105102
collections::HashMap,
@@ -108,7 +105,6 @@ use std::{
108105
sync::Arc,
109106
time::Duration,
110107
};
111-
use aptos_types::on_chain_config::FeatureFlag;
112108

113109
/// Range of rounds (window) that we might be calling proposer election
114110
/// functions with at any given time, in addition to the proposer history length.
@@ -341,9 +337,9 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
341337
LeaderReputationType::ProposerAndVoter(proposer_and_voter_config)
342338
| LeaderReputationType::ProposerAndVoterV2(proposer_and_voter_config) => {
343339
let proposer_window_size = proposers.len()
344-
*proposer_and_voter_config.proposer_window_num_validators_multiplier;
340+
* proposer_and_voter_config.proposer_window_num_validators_multiplier;
345341
let voter_window_size = proposers.len()
346-
*proposer_and_voter_config.voter_window_num_validators_multiplier;
342+
* proposer_and_voter_config.voter_window_num_validators_multiplier;
347343
let heuristic: Box<dyn ReputationHeuristic> =
348344
Box::new(ProposerAndVoterHeuristic::new(
349345
self.author,
@@ -820,7 +816,8 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
820816

821817
let safety_rules_container = Arc::new(Mutex::new(safety_rules));
822818

823-
self.rand_manager_msg_tx = self.execution_client
819+
self.rand_manager_msg_tx = self
820+
.execution_client
824821
.start_epoch(
825822
epoch_state.clone(),
826823
safety_rules_container.clone(),
@@ -1027,7 +1024,10 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
10271024
epoch: payload.epoch(),
10281025
verifier: (&validator_set).into(),
10291026
});
1030-
debug!(epoch = epoch_state.epoch, "EpochManager::star_new_epoch() starting.");
1027+
debug!(
1028+
epoch = epoch_state.epoch,
1029+
"EpochManager::star_new_epoch() starting."
1030+
);
10311031

10321032
self.epoch_state = Some(epoch_state.clone());
10331033

@@ -1185,7 +1185,8 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
11851185
"decoupled execution must be enabled"
11861186
);
11871187

1188-
self.rand_manager_msg_tx = self.execution_client
1188+
self.rand_manager_msg_tx = self
1189+
.execution_client
11891190
.start_epoch(
11901191
epoch_state.clone(),
11911192
commit_signer,

consensus/src/lib.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -64,18 +64,17 @@ mod txn_hash_and_authenticator_deduper;
6464

6565
use aptos_config::config::SecureBackend;
6666
use aptos_consensus_types::common::Author;
67+
use aptos_global_constants::CONSENSUS_KEY;
6768
use aptos_metrics_core::IntGauge;
69+
use aptos_secure_storage::{KVStorage, Storage};
70+
use aptos_types::validator_signer::ValidatorSigner;
6871
pub use consensusdb::create_checkpoint;
6972
/// Required by the smoke tests
7073
pub use consensusdb::CONSENSUS_DB_NAME;
7174
pub use quorum_store::quorum_store_db::QUORUM_STORE_DB_NAME;
7275
#[cfg(feature = "fuzzing")]
7376
pub use round_manager::round_manager_fuzzing;
7477
use std::sync::Arc;
75-
use aptos_global_constants::CONSENSUS_KEY;
76-
use aptos_secure_storage::Storage;
77-
use aptos_types::validator_signer::ValidatorSigner;
78-
use aptos_secure_storage::KVStorage;
7978

8079
struct IntGaugeGuard {
8180
gauge: IntGauge,

consensus/src/pipeline/execution_client.rs

+59-22
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,33 @@
22
// Parts of the project are originally copyright © Meta Platforms, Inc.
33
// SPDX-License-Identifier: Apache-2.0
44

5-
use crate::{counters, error::StateSyncError, network::{IncomingCommitRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient}, new_signer_from_storage, payload_manager::PayloadManager, pipeline::{
6-
buffer_manager::{OrderedBlocks, ResetAck, ResetRequest, ResetSignal},
7-
decoupled_execution_utils::prepare_phases_and_buffer_manager,
8-
errors::Error,
9-
signing_phase::CommitSignerProvider,
10-
}, state_computer::ExecutionProxy, state_replication::{StateComputer, StateComputerCommitCallBackType}, transaction_deduper::create_transaction_deduper, transaction_shuffler::create_transaction_shuffler};
5+
use crate::{
6+
counters,
7+
error::StateSyncError,
8+
network::{IncomingCommitRequest, IncomingRandGenRequest, NetworkSender},
9+
network_interface::{ConsensusMsg, ConsensusNetworkClient},
10+
new_signer_from_storage,
11+
payload_manager::PayloadManager,
12+
pipeline::{
13+
buffer_manager::{OrderedBlocks, ResetAck, ResetRequest, ResetSignal},
14+
decoupled_execution_utils::prepare_phases_and_buffer_manager,
15+
errors::Error,
16+
signing_phase::CommitSignerProvider,
17+
},
18+
rand::rand_gen::{
19+
rand_manager::RandManager,
20+
storage::interface::RandStorage,
21+
types::{AugmentedData, RandConfig, Share},
22+
},
23+
state_computer::ExecutionProxy,
24+
state_replication::{StateComputer, StateComputerCommitCallBackType},
25+
transaction_deduper::create_transaction_deduper,
26+
transaction_shuffler::create_transaction_shuffler,
27+
};
1128
use anyhow::Result;
1229
use aptos_bounded_executor::BoundedExecutor;
1330
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
31+
use aptos_config::config::ConsensusConfig;
1432
use aptos_consensus_types::{common::Author, pipelined_block::PipelinedBlock};
1533
use aptos_executor_types::ExecutorResult;
1634
use aptos_infallible::RwLock;
@@ -29,11 +47,6 @@ use futures::{
2947
use futures_channel::mpsc::unbounded;
3048
use move_core_types::account_address::AccountAddress;
3149
use std::sync::Arc;
32-
use aptos_config::config::ConsensusConfig;
33-
use crate::network::IncomingRandGenRequest;
34-
use crate::rand::rand_gen::rand_manager::RandManager;
35-
use crate::rand::rand_gen::storage::interface::RandStorage;
36-
use crate::rand::rand_gen::types::{AugmentedData, RandConfig, Share};
3750

3851
#[async_trait::async_trait]
3952
pub trait TExecutionClient: Send + Sync {
@@ -102,7 +115,12 @@ impl BufferManagerHandle {
102115
self.reset_tx_to_rand_manager = reset_tx_to_rand_manager;
103116
}
104117

105-
pub fn reset(&mut self) -> (Option<UnboundedSender<ResetRequest>>, Option<UnboundedSender<ResetRequest>>) {
118+
pub fn reset(
119+
&mut self,
120+
) -> (
121+
Option<UnboundedSender<ResetRequest>>,
122+
Option<UnboundedSender<ResetRequest>>,
123+
) {
106124
let reset_tx_to_rand_manager = self.reset_tx_to_rand_manager.take();
107125
let reset_tx_to_buffer_manager = self.reset_tx_to_buffer_manager.take();
108126
self.execute_tx = None;
@@ -203,13 +221,23 @@ impl ExecutionProxyClient {
203221
self.bounded_executor.clone(),
204222
));
205223

206-
(ordered_block_tx, rand_ready_block_rx, Some(reset_tx_to_rand_manager), Some(rand_msg_tx))
224+
(
225+
ordered_block_tx,
226+
rand_ready_block_rx,
227+
Some(reset_tx_to_rand_manager),
228+
Some(rand_msg_tx),
229+
)
207230
} else {
208231
let (ordered_block_tx, ordered_block_rx) = unbounded();
209232
(ordered_block_tx, ordered_block_rx, None, None)
210233
};
211234

212-
self.handle.write().init(execution_ready_block_tx, commit_msg_tx, reset_buffer_manager_tx, maybe_reset_tx_to_rand_manager);
235+
self.handle.write().init(
236+
execution_ready_block_tx,
237+
commit_msg_tx,
238+
reset_buffer_manager_tx,
239+
maybe_reset_tx_to_rand_manager,
240+
);
213241

214242
let (
215243
execution_schedule_phase,
@@ -250,7 +278,11 @@ impl TExecutionClient for ExecutionProxyClient {
250278
features: &Features,
251279
rand_config: Option<RandConfig>,
252280
) -> Option<aptos_channel::Sender<AccountAddress, IncomingRandGenRequest>> {
253-
let maybe_rand_msg_tx = self.spawn_decoupled_execution(commit_signer_provider, epoch_state.clone(), rand_config);
281+
let maybe_rand_msg_tx = self.spawn_decoupled_execution(
282+
commit_signer_provider,
283+
epoch_state.clone(),
284+
rand_config,
285+
);
254286

255287
let transaction_shuffler =
256288
create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type());
@@ -339,15 +371,20 @@ impl TExecutionClient for ExecutionProxyClient {
339371

340372
let (reset_tx_to_rand_manager, reset_tx_to_buffer_manager) = {
341373
let handle = self.handle.read();
342-
(handle.reset_tx_to_rand_manager.clone(), handle.reset_tx_to_buffer_manager.clone())
374+
(
375+
handle.reset_tx_to_rand_manager.clone(),
376+
handle.reset_tx_to_buffer_manager.clone(),
377+
)
343378
};
344379

345380
if let Some(mut reset_tx) = reset_tx_to_rand_manager {
346381
let (ack_tx, ack_rx) = oneshot::channel::<ResetAck>();
347-
reset_tx.send(ResetRequest {
348-
tx: ack_tx,
349-
signal: ResetSignal::TargetRound(target.commit_info().round()),
350-
}).await
382+
reset_tx
383+
.send(ResetRequest {
384+
tx: ack_tx,
385+
signal: ResetSignal::TargetRound(target.commit_info().round()),
386+
})
387+
.await
351388
.map_err(|_| Error::RandResetDropped)?;
352389
ack_rx.await.map_err(|_| Error::RandResetDropped)?;
353390
}
@@ -383,8 +420,8 @@ impl TExecutionClient for ExecutionProxyClient {
383420
tx: ack_tx,
384421
signal: ResetSignal::Stop,
385422
})
386-
.await
387-
.expect("[EpochManager] Fail to drop rand manager");
423+
.await
424+
.expect("[EpochManager] Fail to drop rand manager");
388425
ack_rx
389426
.await
390427
.expect("[EpochManager] Fail to drop rand manager");

consensus/src/test_utils/mock_execution_client.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@
44

55
use crate::{
66
error::StateSyncError,
7-
network::IncomingCommitRequest,
7+
network::{IncomingCommitRequest, IncomingRandGenRequest},
88
payload_manager::PayloadManager,
99
pipeline::{
1010
buffer_manager::OrderedBlocks, execution_client::TExecutionClient,
1111
signing_phase::CommitSignerProvider,
1212
},
13+
rand::rand_gen::types::RandConfig,
1314
state_replication::StateComputerCommitCallBackType,
1415
test_utils::mock_storage::MockStorage,
1516
};
1617
use anyhow::{format_err, Result};
18+
use aptos_channels::aptos_channel;
1719
use aptos_consensus_types::{common::Payload, pipelined_block::PipelinedBlock};
1820
use aptos_crypto::HashValue;
1921
use aptos_executor_types::ExecutorResult;
@@ -29,9 +31,6 @@ use futures::{channel::mpsc, SinkExt};
2931
use futures_channel::mpsc::UnboundedSender;
3032
use move_core_types::account_address::AccountAddress;
3133
use std::{collections::HashMap, sync::Arc};
32-
use aptos_channels::aptos_channel;
33-
use crate::network::IncomingRandGenRequest;
34-
use crate::rand::rand_gen::types::RandConfig;
3534

3635
pub struct MockExecutionClient {
3736
state_sync_client: mpsc::UnboundedSender<Vec<SignedTransaction>>,

0 commit comments

Comments
 (0)