Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge main to randomnet #12171

Merged
merged 40 commits into from
Feb 22, 2024
Merged
Changes from 1 commit
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
0d29ccd
Fix `iss`-related bug in Groth16 path & refactor (#12017)
alinush Feb 19, 2024
f92a197
[aptosvm] Simplify VM flows (#11888)
georgemitenkov Feb 20, 2024
d4fdb8f
[Compiler V2] Critical edge elimination (#11894)
fEst1ck Feb 20, 2024
9d802b8
[consensus configs] reduce sending block size from 2500 to 1900 (#12091)
bchocho Feb 20, 2024
6b3219e
[Indexer-grpc] Add profiling support. (#12034)
grao1991 Feb 20, 2024
67f372a
Minor aggregator cleanup (#12013)
vusirikala Feb 20, 2024
d771cec
[move] rotate_authentication_key_call should not modify OriginatingAd…
davidiw Feb 21, 2024
6ce7638
[Data Streaming Service] Add dynamic prefetching support
JoshLind Feb 8, 2024
47a1f66
[Data Streaming Service] Add dynamic prefetching unit tests.
JoshLind Feb 8, 2024
ea17e7e
[Data Streaming Service] Update existing integration tests.
JoshLind Feb 8, 2024
8d24ec7
[State Sync] Add backpressure to fast sync receiver.
JoshLind Feb 8, 2024
bc50dae
Update perf baseline for gas charging coverage improvements (reducing…
igor-aptos Feb 21, 2024
9972ea0
Reduce latency of cloning network sender using Arc pointers (#12103)
vusirikala Feb 21, 2024
3bef23f
adopt AIP-61 terminology for consistency (#12123)
alinush Feb 21, 2024
055683a
[Consensus] Remove non-decoupled execution and refactor for cleaner i…
sitalkedia Feb 21, 2024
5256800
fix jwk key logging (#12090)
zjma Feb 21, 2024
122f51f
remove spurious error lines (#12137)
igor-aptos Feb 21, 2024
6c7e9d4
randomness #1: types update from randomnet (#12106)
zjma Feb 21, 2024
76d8532
All validators broadcast commit vote messages (#12059)
vusirikala Feb 21, 2024
f325f52
[vm] Resource access control: runtime engine (#10544)
wrwg Feb 21, 2024
e229929
ObjectCodeDeployment API cleanup update (#12133)
igor-aptos Feb 21, 2024
ff49622
ObjectCodeDeployment API cleanup update (#12141)
igor-aptos Feb 21, 2024
b934829
[Compiler-v2] porting more V1 unit tests to V2 (#12085)
rahxephon89 Feb 22, 2024
d69d3c1
Enable the max object nesting check (#12129)
junkil-park Feb 22, 2024
72906ab
Resolved the warning for unused variable (#12157)
junkil-park Feb 22, 2024
8d7b337
Merge remote-tracking branch 'origin/main' into randomnet
zjma Feb 22, 2024
108b54c
update
zjma Feb 22, 2024
2fb99db
update
zjma Feb 22, 2024
bb1ffbb
update
zjma Feb 22, 2024
e9d1f6c
update
zjma Feb 22, 2024
dc74bb7
update
zjma Feb 22, 2024
22761c5
update
zjma Feb 22, 2024
843d2bc
update
zjma Feb 22, 2024
761c983
update
zjma Feb 22, 2024
86db77f
update
zjma Feb 22, 2024
60aa5a1
update
zjma Feb 22, 2024
bd55100
update
zjma Feb 22, 2024
ae1a9d8
update
zjma Feb 22, 2024
bbed07c
update
zjma Feb 22, 2024
f677b1a
Squashed commit of the following:
zjma Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[State Sync] Add backpressure to fast sync receiver.
  • Loading branch information
JoshLind committed Feb 21, 2024
commit 8d24ec79c623a1f4f90286ccd54bb67d0b36058e
10 changes: 4 additions & 6 deletions config/src/config/state_sync_config.rs
Original file line number Diff line number Diff line change
@@ -136,7 +136,7 @@ impl Default for StateSyncDriverConfig {
max_connection_deadline_secs: 10,
max_consecutive_stream_notifications: 10,
max_num_stream_timeouts: 12,
max_pending_data_chunks: 100,
max_pending_data_chunks: 50,
max_pending_mempool_notifications: 100,
max_stream_wait_time_ms: 5000,
num_versions_to_skip_snapshot_sync: 100_000_000, // At 5k TPS, this allows a node to fail for about 6 hours.
@@ -219,9 +219,7 @@ pub struct DataStreamingServiceConfig {
/// Maximum number of concurrent data client requests (per stream) for state keys/values.
pub max_concurrent_state_requests: u64,

/// Maximum channel sizes for each data stream listener. If messages are not
/// consumed, they will be dropped (oldest messages first). The remaining
/// messages will be retrieved using FIFO ordering.
/// Maximum channel sizes for each data stream listener (per stream).
pub max_data_stream_channel_sizes: u64,

/// Maximum number of notification ID to response context mappings held in
@@ -256,7 +254,7 @@ impl Default for DataStreamingServiceConfig {
global_summary_refresh_interval_ms: 50,
max_concurrent_requests: MAX_CONCURRENT_REQUESTS,
max_concurrent_state_requests: MAX_CONCURRENT_STATE_REQUESTS,
max_data_stream_channel_sizes: 300,
max_data_stream_channel_sizes: 50,
max_notification_id_mappings: 300,
max_num_consecutive_subscriptions: 40, // At ~4 blocks per second, this should last 10 seconds
max_pending_requests: 50,
@@ -297,7 +295,7 @@ impl Default for DynamicPrefetchingConfig {
Self {
enable_dynamic_prefetching: true,
initial_prefetching_value: 3,
max_prefetching_value: 50,
max_prefetching_value: 30,
min_prefetching_value: 3,
prefetching_value_increase: 1,
prefetching_value_decrease: 2,
7 changes: 5 additions & 2 deletions state-sync/data-streaming-service/src/data_stream.rs
Original file line number Diff line number Diff line change
@@ -393,8 +393,11 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
pending_client_response
}

// TODO(joshlind): this function shouldn't be blocking when trying to send! If there are
// multiple streams, a single blocked stream could cause them all to block.
// TODO(joshlind): this function shouldn't be blocking when trying to send.
// If there are multiple streams, a single blocked stream could cause them
// all to block. This is acceptable for now (because there is only ever
// a single stream in use by the driver) but it should be fixed if we want
// to generalize this for multiple streams.
async fn send_data_notification(
&mut self,
data_notification: DataNotification,
1 change: 1 addition & 0 deletions state-sync/state-sync-driver/src/bootstrapper.rs
Original file line number Diff line number Diff line change
@@ -1030,6 +1030,7 @@ impl<
if let Err(error) = self
.storage_synchronizer
.save_state_values(notification_id, state_value_chunk_with_proof)
.await
{
self.reset_active_stream(Some(NotificationAndFeedback::new(
notification_id,
9 changes: 6 additions & 3 deletions state-sync/state-sync-driver/src/storage_synchronizer.rs
Original file line number Diff line number Diff line change
@@ -96,7 +96,7 @@ pub trait StorageSynchronizerInterface {
///
/// Note: this requires that `initialize_state_synchronizer` has been
/// called.
fn save_state_values(
async fn save_state_values(
&mut self,
notification_id: NotificationId,
state_value_chunk_with_proof: StateValueChunkWithProof,
@@ -403,17 +403,20 @@ impl<
load_pending_data_chunks(self.pending_data_chunks.clone()) > 0
}

fn save_state_values(
async fn save_state_values(
&mut self,
notification_id: NotificationId,
state_value_chunk_with_proof: StateValueChunkWithProof,
) -> Result<(), Error> {
// Get the snapshot notifier and create the storage data chunk
let state_snapshot_notifier = self.state_snapshot_notifier.as_mut().ok_or_else(|| {
Error::UnexpectedError("The state snapshot receiver has not been initialized!".into())
})?;
let storage_data_chunk =
StorageDataChunk::States(notification_id, state_value_chunk_with_proof);
if let Err(error) = state_snapshot_notifier.try_send(storage_data_chunk) {

// Notify the snapshot receiver of the storage data chunk
if let Err(error) = state_snapshot_notifier.send(storage_data_chunk).await {
Err(Error::UnexpectedError(format!(
"Failed to send storage data chunk to state snapshot listener: {:?}",
error
2 changes: 1 addition & 1 deletion state-sync/state-sync-driver/src/tests/mocks.rs
Original file line number Diff line number Diff line change
@@ -475,7 +475,7 @@ mock! {

fn pending_storage_data(&self) -> bool;

fn save_state_values(
async fn save_state_values(
&mut self,
notification_id: NotificationId,
state_value_chunk_with_proof: StateValueChunkWithProof,
13 changes: 10 additions & 3 deletions state-sync/state-sync-driver/src/tests/storage_synchronizer.rs
Original file line number Diff line number Diff line change
@@ -750,9 +750,11 @@ async fn test_save_states_completion() {
// Save multiple state chunks (including the last chunk)
storage_synchronizer
.save_state_values(0, create_state_value_chunk_with_proof(false))
.await
.unwrap();
storage_synchronizer
.save_state_values(1, create_state_value_chunk_with_proof(true))
.await
.unwrap();

// Verify we get a commit notification
@@ -808,6 +810,7 @@ async fn test_save_states_dropped_error_listener() {
let notification_id = 0;
storage_synchronizer
.save_state_values(notification_id, create_state_value_chunk_with_proof(true))
.await
.unwrap();

// The handler should panic as the commit listener was dropped
@@ -849,13 +852,14 @@ async fn test_save_states_invalid_chunk() {
let notification_id = 0;
storage_synchronizer
.save_state_values(notification_id, create_state_value_chunk_with_proof(false))
.await
.unwrap();
verify_error_notification(&mut error_listener, notification_id).await;
}

#[test]
#[tokio::test]
#[should_panic]
fn test_save_states_without_initialize() {
async fn test_save_states_without_initialize() {
// Create the storage synchronizer
let (_, _, _, _, _, mut storage_synchronizer, _) = create_storage_synchronizer(
create_mock_executor(),
@@ -864,7 +868,10 @@ fn test_save_states_without_initialize() {

// Attempting to save the states should panic as the state
// synchronizer was not initialized!
let _ = storage_synchronizer.save_state_values(0, create_state_value_chunk_with_proof(false));
storage_synchronizer
.save_state_values(0, create_state_value_chunk_with_proof(false))
.await
.unwrap();
}

/// Creates a storage synchronizer for testing
Loading