Skip to content

Commit 33529b3

Browse files
authored
Separated disseminnation layer: working, but without the reputation system (#1)
1 parent 90bc228 commit 33529b3

13 files changed

+566
-306
lines changed

raikou/Cargo.toml

+3
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ rust-version.workspace = true
1212
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
1313

1414
[dependencies]
15+
aptos-consensus = { workspace = true }
16+
aptos-consensus-types = { workspace = true }
17+
aptos-crypto = { workspace = true }
1518
bitvec = { workspace = true }
1619
defaultmap = "0.5.0" # Since version 0.6.0, the maps are !Send and !Sync
1720
env_logger = { workspace = true }

raikou/src/delays.rs

+15-18
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{framework::NodeId, multichain};
1+
use crate::framework::NodeId;
22
use rand::{rngs::SmallRng, thread_rng, Rng, SeedableRng};
33
use rand_distr::Distribution;
44
use std::{
@@ -8,24 +8,21 @@ use std::{
88
sync::Arc,
99
};
1010

11-
pub trait DelayFunction<M>: Fn(NodeId, NodeId, &M) -> f64 + Clone + Send + Sync + 'static {}
11+
pub trait DelayFunction: Fn(NodeId, NodeId) -> f64 + Clone + Send + Sync + 'static {}
1212

13-
impl<M, F> DelayFunction<M> for F where
14-
F: Fn(NodeId, NodeId, &M) -> f64 + Clone + Send + Sync + 'static
15-
{
16-
}
13+
impl<F> DelayFunction for F where F: Fn(NodeId, NodeId) -> f64 + Clone + Send + Sync + 'static {}
1714

18-
pub fn uniformly_random_delay<M>(
15+
pub fn uniformly_random_delay(
1916
distr: impl Distribution<f64> + Send + Sync + Copy + 'static,
20-
) -> impl DelayFunction<M> {
21-
move |_from, _to, _msg| thread_rng().sample(distr)
17+
) -> impl DelayFunction {
18+
move |_from, _to| thread_rng().sample(distr)
2219
}
2320

24-
pub fn spacial_delay_2d<M>(
21+
pub fn spacial_delay_2d(
2522
max_distr: impl Distribution<f64> + Send + Sync + Copy + 'static,
26-
) -> impl DelayFunction<M> {
23+
) -> impl DelayFunction {
2724
let sqrt2 = f64::sqrt(2.);
28-
move |from: NodeId, to, _msg| {
25+
move |from: NodeId, to| {
2926
let from_coordinate = coordinate_2d_from_hash(from);
3027
let to_coordinate = coordinate_2d_from_hash(to);
3128
distance_2d(from_coordinate, to_coordinate) / sqrt2 * thread_rng().sample(max_distr)
@@ -35,12 +32,12 @@ pub fn spacial_delay_2d<M>(
3532
/// `base_distr` is sampled once per pair of nodes.
3633
/// `mul_noise_distr` and `add_noise_distr` are sampled for each message.
3734
/// The delay is computed as `base * mul_noise + add_noise`.
38-
pub fn heterogeneous_symmetric_delay<M>(
35+
pub fn heterogeneous_symmetric_delay(
3936
link_base_distr: impl Distribution<f64> + Send + Sync + Copy + 'static,
4037
mul_noise_distr: impl Distribution<f64> + Send + Sync + Copy + 'static,
4138
add_noise_distr: impl Distribution<f64> + Send + Sync + Copy + 'static,
42-
) -> impl DelayFunction<M> {
43-
move |from: NodeId, to: NodeId, _msg| {
39+
) -> impl DelayFunction {
40+
move |from: NodeId, to: NodeId| {
4441
let mut base_seed = [0; 16];
4542
base_seed[..8].copy_from_slice(&hash((min(from, to), max(from, to))).to_le_bytes());
4643
let mut base_rng = SmallRng::from_seed(base_seed);
@@ -52,11 +49,11 @@ pub fn heterogeneous_symmetric_delay<M>(
5249
}
5350
}
5451

55-
pub fn clustered_delay<M>(
52+
pub fn clustered_delay(
5653
within_cluster_distr: impl Distribution<f64> + Send + Sync + Copy + 'static,
5754
between_cluster_distr: impl Distribution<f64> + Send + Sync + Copy + 'static,
5855
clusters: Vec<Vec<NodeId>>,
59-
) -> impl DelayFunction<M> {
56+
) -> impl DelayFunction {
6057
// Perform a sanity check that no node is missing or present in multiple clusters.
6158
let max_id = clusters.iter().flatten().max().unwrap();
6259
let n_nodes = clusters.iter().map(|cluster| cluster.len()).sum::<usize>();
@@ -72,7 +69,7 @@ pub fn clustered_delay<M>(
7269
.collect(),
7370
);
7471

75-
move |from: NodeId, to: NodeId, _msg| {
72+
move |from: NodeId, to: NodeId| {
7673
let from_cluster = clusters.get(&from).unwrap();
7774
let to_cluster = clusters.get(&to).unwrap();
7875
if from_cluster == to_cluster {

raikou/src/framework/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ where
2121
}
2222

2323
pub trait Protocol: Send + Sync {
24-
type Message: Send + Sync;
24+
type Message: Clone + Send + Sync;
2525
type TimerEvent: Send + Sync;
2626

2727
fn start_handler<Ctx>(&mut self, ctx: &mut Ctx) -> impl Future<Output = ()> + Send
@@ -50,7 +50,7 @@ pub trait Protocol: Send + Sync {
5050
Ctx: ContextFor<Self>;
5151

5252
fn run_ctx<Ctx>(
53-
protocol: &tokio::sync::Mutex<Self>,
53+
protocol: Arc<tokio::sync::Mutex<Self>>,
5454
ctx: &mut Ctx,
5555
) -> impl Future<Output = ()> + Send
5656
where
@@ -81,7 +81,7 @@ pub trait Protocol: Send + Sync {
8181
}
8282

8383
fn run<NS, TS>(
84-
protocol: &tokio::sync::Mutex<Self>,
84+
protocol: Arc<tokio::sync::Mutex<Self>>,
8585
node_id: NodeId,
8686
network_service: NS,
8787
timer: TS,

raikou/src/framework/timer.rs

+10
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ impl<E: Send + Sync> TimerService for LocalTimerService<E> {
4444
}
4545
}
4646

47+
#[derive(Clone)]
4748
pub struct InjectedTimerService<U, I> {
4849
underlying: U,
4950
injection: I,
@@ -53,6 +54,15 @@ pub trait TimerInjection<E>: Fn(Duration, E) -> (Duration, E) + Send + Sync + 's
5354

5455
impl<I, E> TimerInjection<E> for I where I: Fn(Duration, E) -> (Duration, E) + Send + Sync + 'static {}
5556

57+
pub fn clock_skew_injection<E>(clock_speed: f64) -> impl TimerInjection<E> {
58+
move |duration, event| {
59+
(
60+
Duration::from_secs_f64(duration.as_secs_f64() / clock_speed),
61+
event,
62+
)
63+
}
64+
}
65+
5666
impl<U, I> InjectedTimerService<U, I>
5767
where
5868
U: TimerService,

raikou/src/main.rs

+51-29
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
use crate::{
2-
delays::{heterogeneous_symmetric_delay, spacial_delay_2d, DelayFunction},
2+
delays::{heterogeneous_symmetric_delay, DelayFunction},
33
framework::{
44
network::{InjectedLocalNetwork, Network, NetworkInjection},
5-
timer::InjectedTimerService,
5+
timer::{clock_skew_injection, InjectedTimerService},
66
NodeId, Protocol,
77
},
88
leader_schedule::round_robin,
99
multichain::{Config, MultiChainBft},
10+
raikou::{dissemination, dissemination::fake::FakeDisseminationLayer},
1011
};
1112
use rand::{seq::SliceRandom, thread_rng, Rng};
1213
use std::{collections::BTreeMap, iter, sync::Arc, time::Duration};
@@ -29,7 +30,7 @@ const JOLTEON_TIMEOUT: u32 = 3; // in Deltas
2930

3031
// TODO: generalize this to any protocol
3132
fn multichain_network_injection(
32-
delay_function: impl DelayFunction<multichain::Message>,
33+
delay_function: impl DelayFunction,
3334
crashes: Vec<(NodeId, Slot)>,
3435
) -> impl NetworkInjection<multichain::Message> {
3536
let crashes = Arc::new(BTreeMap::from_iter(crashes));
@@ -52,29 +53,29 @@ fn multichain_network_injection(
5253
}
5354
}
5455

55-
let delay = f64::max(delay_function(from, to, &message), 0.);
56+
let delay = f64::max(delay_function(from, to), 0.);
5657
tokio::time::sleep(Duration::from_secs_f64(delay)).await;
5758
Some(message)
5859
}
5960
}
6061
}
6162
fn network_injection<M: Send>(
62-
delay_function: impl DelayFunction<M>,
63+
delay_function: impl DelayFunction,
6364
// crashes: Vec<(NodeId, Instant)>,
6465
) -> impl NetworkInjection<M> {
6566
move |from, to, message| {
6667
let delay_function = delay_function.clone();
6768

6869
async move {
69-
let delay = f64::max(delay_function(from, to, &message), 0.);
70+
let delay = f64::max(delay_function(from, to), 0.);
7071
tokio::time::sleep(Duration::from_secs_f64(delay)).await;
7172
Some(message)
7273
}
7374
}
7475
}
7576

7677
async fn test_multichain(
77-
delay_function: impl DelayFunction<multichain::Message>,
78+
delay_function: impl DelayFunction,
7879
n_nodes: usize,
7980
slots_per_delta: Slot,
8081
delta: f64,
@@ -158,7 +159,7 @@ async fn test_multichain(
158159
// network_service.clear_inbox().await;
159160

160161
// println!("Spawning node {node_id}");
161-
let node = tokio::sync::Mutex::new(MultiChainBft::new_node(
162+
let node = Arc::new(tokio::sync::Mutex::new(MultiChainBft::new_node(
162163
node_id,
163164
config,
164165
start_time,
@@ -169,10 +170,10 @@ async fn test_multichain(
169170
batch_commit_time: batch_commit_time_sender,
170171
indirectly_committed_slots: indirectly_committed_slots_sender,
171172
},
172-
));
173+
)));
173174

174175
semaphore.add_permits(1);
175-
Protocol::run(&node, node_id, network_service, timer).await
176+
Protocol::run(node, node_id, network_service, timer).await
176177
}));
177178
}
178179

@@ -235,7 +236,7 @@ async fn test_multichain(
235236
}
236237

237238
async fn test_multichain_with_random_crashes(
238-
delay_function: impl DelayFunction<multichain::Message>,
239+
delay_function: impl DelayFunction,
239240
n_nodes: usize,
240241
slots_per_delta: Slot,
241242
delta: f64,
@@ -268,7 +269,7 @@ async fn test_multichain_with_random_crashes(
268269
}
269270

270271
async fn test_multichain_with_consecutive_faulty_leaders_in_a_chain(
271-
delay_function: impl DelayFunction<multichain::Message>,
272+
delay_function: impl DelayFunction,
272273
n_nodes: usize,
273274
slots_per_delta: Slot,
274275
delta: f64,
@@ -298,7 +299,7 @@ async fn test_multichain_with_consecutive_faulty_leaders_in_a_chain(
298299
}
299300

300301
async fn test_jolteon(
301-
delay_function: impl DelayFunction<jolteon::Message<()>>,
302+
delay_function: impl DelayFunction,
302303
n_nodes: usize,
303304
delta: f64,
304305
warmup_duration_in_delta: u32,
@@ -377,16 +378,16 @@ async fn test_jolteon(
377378
let (_, txns_receiver) = mpsc::channel::<()>(100);
378379

379380
// println!("Spawning node {node_id}");
380-
let node = tokio::sync::Mutex::new(jolteon::JolteonNode::new(
381+
let node = Arc::new(tokio::sync::Mutex::new(jolteon::JolteonNode::new(
381382
node_id,
382383
config,
383384
txns_receiver,
384385
start_time,
385386
node_id == monitored_node,
386-
));
387+
)));
387388

388389
semaphore.add_permits(1);
389-
Protocol::run(&node, node_id, network_service, timer).await
390+
Protocol::run(node, node_id, network_service, timer).await
390391
}));
391392
}
392393

@@ -449,7 +450,7 @@ async fn test_jolteon(
449450
}
450451

451452
async fn test_jolteon_with_fast_qs(
452-
delay_function: impl DelayFunction<jolteon_fast_qs::Message<()>>,
453+
delay_function: impl DelayFunction,
453454
n_nodes: usize,
454455
delta: f64,
455456
warmup_duration_in_delta: u32,
@@ -532,7 +533,7 @@ async fn test_jolteon_with_fast_qs(
532533
let next_txn = || ();
533534

534535
// println!("Spawning node {node_id}");
535-
let node = tokio::sync::Mutex::new(jolteon_fast_qs::JolteonNode::new(
536+
let node = Arc::new(tokio::sync::Mutex::new(jolteon_fast_qs::JolteonNode::new(
536537
node_id,
537538
config,
538539
next_txn,
@@ -544,10 +545,10 @@ async fn test_jolteon_with_fast_qs(
544545
batch_commit_time: batch_commit_time_sender,
545546
// indirectly_committed_slots: indirectly_committed_slots_sender,
546547
},
547-
));
548+
)));
548549

549550
semaphore.add_permits(1);
550-
Protocol::run(&node, node_id, network_service, timer).await
551+
Protocol::run(node, node_id, network_service, timer).await
551552
}));
552553
}
553554

@@ -612,7 +613,7 @@ async fn test_jolteon_with_fast_qs(
612613
}
613614

614615
async fn test_raikou(
615-
delay_function: impl DelayFunction<raikou::Message>,
616+
delay_function: impl DelayFunction + Clone,
616617
n_nodes: usize,
617618
delta: f64,
618619
spawn_period_in_delta: u32,
@@ -631,6 +632,8 @@ async fn test_raikou(
631632
rand_distr::Uniform::new(1. * delta, spawn_period_in_delta as f64 * delta);
632633
let clock_speed_distr = rand_distr::Normal::new(1., 0.01).unwrap();
633634

635+
let mut diss_network =
636+
InjectedLocalNetwork::new(n_nodes, network_injection(delay_function.clone()));
634637
let mut network = InjectedLocalNetwork::new(n_nodes, network_injection(delay_function));
635638

636639
let f = (n_nodes - 1) / 3;
@@ -663,15 +666,14 @@ async fn test_raikou(
663666
let start_time = Instant::now();
664667
for node_id in 0..n_nodes {
665668
let config = config.clone();
669+
let diss_network_service = diss_network.service(node_id);
666670
let network_service = network.service(node_id);
667671

668672
let clock_speed = { thread_rng().sample(clock_speed_distr) };
669-
let timer = InjectedTimerService::local(move |duration, event| {
670-
(
671-
Duration::from_secs_f64(duration.as_secs_f64() / clock_speed),
672-
event,
673-
)
674-
});
673+
674+
// introduce artificial clock skew.
675+
let diss_timer = InjectedTimerService::local(clock_skew_injection(clock_speed));
676+
let timer = InjectedTimerService::local(clock_skew_injection(clock_speed));
675677

676678
// let propose_time_sender = Some(propose_time.new_sender());
677679
// let enter_time_sender = if node_id == monitored_node {
@@ -698,10 +700,23 @@ async fn test_raikou(
698700
// // Before starting the node, "drop" all messages sent to it during the spawn delay.
699701
// network_service.clear_inbox().await;
700702

703+
let txns_iter = iter::repeat_with(|| vec![]);
704+
705+
let dissemination = FakeDisseminationLayer::new(
706+
node_id,
707+
dissemination::fake::Config {
708+
n_nodes,
709+
ac_quorum: 2 * f + 1,
710+
batch_interval: Duration::from_secs_f64(delta * 0.1),
711+
},
712+
txns_iter,
713+
);
714+
701715
// println!("Spawning node {node_id}");
702-
let node = tokio::sync::Mutex::new(raikou::RaikouNode::new(
716+
let node = Arc::new(tokio::sync::Mutex::new(raikou::RaikouNode::new(
703717
node_id,
704718
config,
719+
dissemination.clone(),
705720
start_time,
706721
node_id == monitored_node,
707722
raikou::Metrics {
@@ -710,10 +725,17 @@ async fn test_raikou(
710725
batch_commit_time: batch_commit_time_sender,
711726
// indirectly_committed_slots: indirectly_committed_slots_sender,
712727
},
728+
)));
729+
730+
spawn(Protocol::run(
731+
dissemination.protocol(),
732+
node_id,
733+
diss_network_service,
734+
diss_timer,
713735
));
736+
spawn(Protocol::run(node, node_id, network_service, timer));
714737

715738
semaphore.add_permits(1);
716-
Protocol::run(&node, node_id, network_service, timer).await
717739
}));
718740
}
719741

0 commit comments

Comments
 (0)