merge main to randomnet #12171

Merged
merged 40 commits into randomnet from main on Feb 22, 2024

Changes from 1 commit

Commits (40)
0d29ccd  Fix `iss`-related bug in Groth16 path & refactor (#12017) (alinush, Feb 19, 2024)
f92a197  [aptosvm] Simplify VM flows (#11888) (georgemitenkov, Feb 20, 2024)
d4fdb8f  [Compiler V2] Critical edge elimination (#11894) (fEst1ck, Feb 20, 2024)
9d802b8  [consensus configs] reduce sending block size from 2500 to 1900 (#12091) (bchocho, Feb 20, 2024)
6b3219e  [Indexer-grpc] Add profiling support. (#12034) (grao1991, Feb 20, 2024)
67f372a  Minor aggregator cleanup (#12013) (vusirikala, Feb 20, 2024)
d771cec  [move] rotate_authentication_key_call should not modify OriginatingAd… (davidiw, Feb 21, 2024)
6ce7638  [Data Streaming Service] Add dynamic prefetching support (JoshLind, Feb 8, 2024)
47a1f66  [Data Streaming Service] Add dynamic prefetching unit tests. (JoshLind, Feb 8, 2024)
ea17e7e  [Data Streaming Service] Update existing integration tests. (JoshLind, Feb 8, 2024)
8d24ec7  [State Sync] Add backpressure to fast sync receiver. (JoshLind, Feb 8, 2024)
bc50dae  Update perf baseline for gas charging coverage improvements (reducing… (igor-aptos, Feb 21, 2024)
9972ea0  Reduce latency of cloning network sender using Arc pointers (#12103) (vusirikala, Feb 21, 2024)
3bef23f  adopt AIP-61 terminology for consistency (#12123) (alinush, Feb 21, 2024)
055683a  [Consensus] Remove non-decoupled execution and refactor for cleaner i… (sitalkedia, Feb 21, 2024)
5256800  fix jwk key logging (#12090) (zjma, Feb 21, 2024)
122f51f  remove spurious error lines (#12137) (igor-aptos, Feb 21, 2024)
6c7e9d4  randomness #1: types update from randomnet (#12106) (zjma, Feb 21, 2024)
76d8532  All validators broadcast commit vote messages (#12059) (vusirikala, Feb 21, 2024)
f325f52  [vm] Resource access control: runtime engine (#10544) (wrwg, Feb 21, 2024)
e229929  ObjectCodeDeployment API cleanup update (#12133) (igor-aptos, Feb 21, 2024)
ff49622  ObjectCodeDeployment API cleanup update (#12141) (igor-aptos, Feb 21, 2024)
b934829  [Compiler-v2] porting more V1 unit tests to V2 (#12085) (rahxephon89, Feb 22, 2024)
d69d3c1  Enable the max object nesting check (#12129) (junkil-park, Feb 22, 2024)
72906ab  Resolved the warning for unused variable (#12157) (junkil-park, Feb 22, 2024)
8d7b337  Merge remote-tracking branch 'origin/main' into randomnet (zjma, Feb 22, 2024)
108b54c  update (zjma, Feb 22, 2024)
2fb99db  update (zjma, Feb 22, 2024)
bb1ffbb  update (zjma, Feb 22, 2024)
e9d1f6c  update (zjma, Feb 22, 2024)
dc74bb7  update (zjma, Feb 22, 2024)
22761c5  update (zjma, Feb 22, 2024)
843d2bc  update (zjma, Feb 22, 2024)
761c983  update (zjma, Feb 22, 2024)
86db77f  update (zjma, Feb 22, 2024)
60aa5a1  update (zjma, Feb 22, 2024)
bd55100  update (zjma, Feb 22, 2024)
ae1a9d8  update (zjma, Feb 22, 2024)
bbed07c  update (zjma, Feb 22, 2024)
f677b1a  Squashed commit of the following: (zjma, Feb 22, 2024)
[Data Streaming Service] Add dynamic prefetching support
JoshLind committed Feb 21, 2024
commit 6ce763870143241e45e84f81bd172781f46508a1
43 changes: 43 additions & 0 deletions config/src/config/state_sync_config.rs
@@ -204,6 +204,9 @@ impl Default for StorageServiceConfig {
#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DataStreamingServiceConfig {
/// The dynamic prefetching config for the data streaming service
pub dynamic_prefetching: DynamicPrefetchingConfig,

/// Whether or not to enable data subscription streaming.
pub enable_subscription_streaming: bool,

@@ -248,6 +251,7 @@ pub struct DataStreamingServiceConfig {
impl Default for DataStreamingServiceConfig {
fn default() -> Self {
Self {
dynamic_prefetching: DynamicPrefetchingConfig::default(),
enable_subscription_streaming: false,
global_summary_refresh_interval_ms: 50,
max_concurrent_requests: MAX_CONCURRENT_REQUESTS,
@@ -263,6 +267,45 @@ impl Default for DataStreamingServiceConfig {
}
}

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct DynamicPrefetchingConfig {
/// Whether or not to enable dynamic prefetching
pub enable_dynamic_prefetching: bool,

/// The initial number of concurrent prefetching requests
pub initial_prefetching_value: u64,

/// The maximum number of concurrent prefetching requests
pub max_prefetching_value: u64,

/// The minimum number of concurrent prefetching requests
pub min_prefetching_value: u64,

/// The amount by which to increase the concurrent prefetching value (i.e., on a successful response)
pub prefetching_value_increase: u64,

/// The amount by which to decrease the concurrent prefetching value (i.e., on a timeout)
pub prefetching_value_decrease: u64,

/// The duration by which to freeze the prefetching value on a timeout
pub timeout_freeze_duration_secs: u64,
}

impl Default for DynamicPrefetchingConfig {
fn default() -> Self {
Self {
enable_dynamic_prefetching: true,
initial_prefetching_value: 3,
max_prefetching_value: 50,
min_prefetching_value: 3,
prefetching_value_increase: 1,
prefetching_value_decrease: 2,
timeout_freeze_duration_secs: 30,
}
}
}

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default, deny_unknown_fields)]
pub struct AptosDataPollerConfig {
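
Since the new struct derives Default and is deserialized with serde's `default` attribute, partial overrides pick up the defaults shown above. A minimal sketch of that behavior (the override value here is hypothetical, not a recommendation):

use aptos_config::config::DynamicPrefetchingConfig;

fn main() {
    // Override a single field; every other field keeps the default shown above.
    let config = DynamicPrefetchingConfig {
        max_prefetching_value: 100, // hypothetical operator override
        ..Default::default()
    };

    assert_eq!(config.initial_prefetching_value, 3); // default preserved
    assert_eq!(config.max_prefetching_value, 100); // override applied
}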
5 changes: 5 additions & 0 deletions state-sync/data-streaming-service/src/data_notification.rs
@@ -88,6 +88,11 @@ impl DataClientRequest {
}
}

/// Returns true iff the request is a new data request
pub fn is_new_data_request(&self) -> bool {
self.is_optimistic_fetch_request() || self.is_subscription_request()
}

/// Returns true iff the request is an optimistic fetch request
pub fn is_optimistic_fetch_request(&self) -> bool {
matches!(self, DataClientRequest::NewTransactionsWithProof(_))
176 changes: 90 additions & 86 deletions state-sync/data-streaming-service/src/data_stream.rs
@@ -13,6 +13,7 @@ use crate::{
TransactionOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest,
TransactionsWithProofRequest,
},
dynamic_prefetching::DynamicPrefetchingState,
error::Error,
logging::{LogEntry, LogEvent, LogSchema},
metrics,
@@ -119,6 +120,9 @@ pub struct DataStream<T> {

// The time service to track elapsed time (e.g., during stream lag checks)
time_service: TimeService,

// The dynamic prefetching state (if enabled)
dynamic_prefetching_state: DynamicPrefetchingState,
}

impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
@@ -141,6 +145,10 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
// Create a new stream engine
let stream_engine = StreamEngine::new(data_stream_config, stream_request, advertised_data)?;

// Create the dynamic prefetching state
let dynamic_prefetching_state =
DynamicPrefetchingState::new(data_stream_config, time_service.clone());

// Create a new data stream
let data_stream = Self {
data_client_config,
@@ -159,6 +167,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
send_failure: false,
subscription_stream_lag: None,
time_service,
dynamic_prefetching_state,
};

Ok((data_stream, data_stream_listener))
@@ -255,17 +264,6 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
Ok(())
}

/// Returns the maximum number of concurrent requests that can be executing
/// at any given time.
fn get_max_concurrent_requests(&self) -> u64 {
match self.stream_engine {
StreamEngine::StateStreamEngine(_) => {
self.streaming_service_config.max_concurrent_state_requests
},
_ => self.streaming_service_config.max_concurrent_requests,
}
}

/// Creates and sends a batch of aptos data client requests to the network
fn create_and_send_client_requests(
&mut self,
@@ -285,7 +283,8 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
// Otherwise, calculate the max number of requests to send based on
// the max concurrent requests and the number of pending request slots.
let remaining_concurrent_requests = self
.get_max_concurrent_requests()
.dynamic_prefetching_state
.get_max_concurrent_requests(&self.stream_engine)
.saturating_sub(num_in_flight_requests);
let remaining_request_slots = max_pending_requests.saturating_sub(num_pending_requests);
min(remaining_concurrent_requests, remaining_request_slots)
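
To make the arithmetic above concrete, a small self-contained sketch with hypothetical numbers (std only, not the real DataStream state):

fn main() {
    // Hypothetical snapshot of the stream's request accounting.
    let max_concurrent_requests: u64 = 10; // from the prefetching state
    let num_in_flight_requests: u64 = 4;
    let max_pending_requests: u64 = 30;
    let num_pending_requests: u64 = 27;

    // Mirrors the calculation in create_and_send_client_requests.
    let remaining_concurrent = max_concurrent_requests.saturating_sub(num_in_flight_requests);
    let remaining_slots = max_pending_requests.saturating_sub(num_pending_requests);
    let requests_to_send = remaining_concurrent.min(remaining_slots);

    assert_eq!(requests_to_send, 3); // bounded by the pending-request slots
}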
@@ -453,85 +452,89 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
return Ok(()); // There's nothing left to do
}

-        // Process any ready data responses
-        for _ in 0..self.get_num_pending_data_requests()? {
-            if let Some(pending_response) = self.pop_pending_response_queue()? {
-                // Get the client request and response information
-                let maybe_client_response = pending_response.lock().client_response.take();
-                let client_response = maybe_client_response.ok_or_else(|| {
-                    Error::UnexpectedErrorEncountered("The client response should be ready!".into())
-                })?;
-                let client_request = &pending_response.lock().client_request.clone();
-
-                // Process the client response
-                match client_response {
-                    Ok(client_response) => {
-                        // Sanity check and process the response
-                        if sanity_check_client_response_type(client_request, &client_response) {
-                            // The response is valid, send the data notification to the client
-                            let client_response_payload = client_response.payload.clone();
-                            self.send_data_notification_to_client(client_request, client_response)
-                                .await?;
-
-                            // If the response wasn't enough to satisfy the original request (e.g.,
-                            // it was truncated), missing data should be requested.
-                            match self
-                                .request_missing_data(client_request, &client_response_payload)
-                            {
-                                Ok(missing_data_requested) => {
-                                    if missing_data_requested {
-                                        break; // We're now head of line blocked on the missing data
-                                    }
-                                },
-                                Err(error) => {
-                                    warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
-                                        .stream_id(self.data_stream_id)
-                                        .event(LogEvent::Error)
-                                        .error(&error)
-                                        .message(
-                                            "Failed to determine if missing data was requested!"
-                                        ));
-                                },
-                            }
-
-                            // If the request was a subscription request and the subscription
-                            // stream is lagging behind the data advertisements, the stream
-                            // engine should be notified (e.g., so that it can catch up).
-                            if client_request.is_subscription_request() {
-                                if let Err(error) = self.check_subscription_stream_lag(
-                                    &global_data_summary,
-                                    &client_response_payload,
-                                ) {
-                                    self.notify_new_data_request_error(client_request, error)?;
-                                    break; // We're now head of line blocked on the failed stream
-                                }
-                            }
-                        } else {
-                            // The sanity check failed
-                            self.handle_sanity_check_failure(
-                                client_request,
-                                &client_response.context,
-                            )?;
-                            break; // We're now head of line blocked on the failed request
-                        }
-                    },
-                    Err(error) => {
-                        // Handle the error depending on the request type
-                        if client_request.is_subscription_request()
-                            || client_request.is_optimistic_fetch_request()
-                        {
-                            // The request was for new data. We should notify the
-                            // stream engine and clear the requests queue.
-                            self.notify_new_data_request_error(client_request, error)?;
-                        } else {
-                            // Otherwise, we should handle the error and simply retry
-                            self.handle_data_client_error(client_request, &error)?;
-                        }
-                        break; // We're now head of line blocked on the failed request
-                    },
-                }
-            } else {
-                break; // The first response hasn't arrived yet
-            }
-        }
+        // Continuously process any ready data responses
+        while let Some(pending_response) = self.pop_pending_response_queue()? {
+            // Get the client request and response information
+            let maybe_client_response = pending_response.lock().client_response.take();
+            let client_response = maybe_client_response.ok_or_else(|| {
+                Error::UnexpectedErrorEncountered("The client response should be ready!".into())
+            })?;
+            let client_request = &pending_response.lock().client_request.clone();
+
+            // Process the client response
+            match client_response {
+                Ok(client_response) => {
+                    // Sanity check and process the response
+                    if sanity_check_client_response_type(client_request, &client_response) {
+                        // If the response wasn't enough to satisfy the original request (e.g.,
+                        // it was truncated), missing data should be requested.
+                        let mut head_of_line_blocked = false;
+                        match self.request_missing_data(client_request, &client_response.payload) {
+                            Ok(missing_data_requested) => {
+                                if missing_data_requested {
+                                    head_of_line_blocked = true; // We're now head of line blocked on the missing data
+                                }
+                            },
+                            Err(error) => {
+                                warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
+                                    .stream_id(self.data_stream_id)
+                                    .event(LogEvent::Error)
+                                    .error(&error)
+                                    .message("Failed to determine if missing data was requested!"));
+                            },
+                        }
+
+                        // If the request was a subscription request and the subscription
+                        // stream is lagging behind the data advertisements, the stream
+                        // engine should be notified (e.g., so that it can catch up).
+                        if client_request.is_subscription_request() {
+                            if let Err(error) = self.check_subscription_stream_lag(
+                                &global_data_summary,
+                                &client_response.payload,
+                            ) {
+                                self.notify_new_data_request_error(client_request, error)?;
+                                head_of_line_blocked = true; // We're now head of line blocked on the failed stream
+                            }
+                        }
+
+                        // The response is valid, send the data notification to the client
+                        self.send_data_notification_to_client(client_request, client_response)
+                            .await?;
+
+                        // If the request is for specific data, increase the prefetching limit.
+                        // Note: we don't increase the limit for new data requests because
+                        // those don't invoke the prefetcher (as we're already up-to-date).
+                        if !client_request.is_new_data_request() {
+                            self.dynamic_prefetching_state
+                                .increase_max_concurrent_requests();
+                        }
+
+                        // If we're head of line blocked, we should return early
+                        if head_of_line_blocked {
+                            break;
+                        }
+                    } else {
+                        // The sanity check failed
+                        self.handle_sanity_check_failure(client_request, &client_response.context)?;
+                        break; // We're now head of line blocked on the failed request
+                    }
+                },
+                Err(error) => {
+                    // Handle the error depending on the request type
+                    if client_request.is_new_data_request() {
+                        // The request was for new data. We should notify the
+                        // stream engine and clear the requests queue.
+                        self.notify_new_data_request_error(client_request, error)?;
+                    } else {
+                        // Decrease the prefetching limit on an error
+                        self.dynamic_prefetching_state
+                            .decrease_max_concurrent_requests();
+
+                        // Handle the error and simply retry
+                        self.handle_data_client_error(client_request, &error)?;
+                    }
+                    break; // We're now head of line blocked on the failed request
+                },
+            }
+        }

@@ -705,6 +708,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
data_client_request: &DataClientRequest,
data_client_error: &aptos_data_client::error::Error,
) -> Result<(), Error> {
// Log the error
warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
.stream_id(self.data_stream_id)
.event(LogEvent::Error)
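
The reworked loop above boils down to: drain ready responses in order, and stop at the first response that leaves the stream head-of-line blocked. A stripped-down model of just that control flow, with hypothetical stand-in types rather than the real DataStream machinery:

// Hypothetical stand-ins for the stream's pending responses.
enum Response {
    Valid { missing_data: bool },
    Failed,
}

/// Drains responses in order; returns how many were consumed before
/// the stream became head-of-line blocked.
fn process_responses(queue: &[Response]) -> usize {
    let mut processed = 0;
    for response in queue {
        processed += 1;
        match response {
            // A valid response that needs follow-up data blocks the stream.
            Response::Valid { missing_data } => {
                if *missing_data {
                    break;
                }
            },
            // A failed response always blocks the stream.
            Response::Failed => break,
        }
    }
    processed
}

fn main() {
    let queue = [
        Response::Valid { missing_data: false },
        Response::Valid { missing_data: true }, // head-of-line blocked here
        Response::Valid { missing_data: false }, // not reached
    ];
    assert_eq!(process_responses(&queue), 2);
}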
151 changes: 151 additions & 0 deletions state-sync/data-streaming-service/src/dynamic_prefetching.rs
@@ -0,0 +1,151 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{metrics, stream_engine::StreamEngine};
use aptos_config::config::{DataStreamingServiceConfig, DynamicPrefetchingConfig};
use aptos_time_service::{TimeService, TimeServiceTrait};
use std::{
cmp::{max, min},
time::{Duration, Instant},
};

/// A simple container for the dynamic prefetching state
#[derive(Debug)]
pub struct DynamicPrefetchingState {
// The data streaming service config
streaming_service_config: DataStreamingServiceConfig,

// The instant the last timeout occurred (if any)
last_timeout_instant: Option<Instant>,

// The maximum number of concurrent requests that can be executing at any given time
max_dynamic_concurrent_requests: u64,

// The time service to track elapsed time (e.g., for timeout freeze checks)
time_service: TimeService,
}

impl DynamicPrefetchingState {
pub fn new(
data_streaming_service_config: DataStreamingServiceConfig,
time_service: TimeService,
) -> Self {
// Get the initial prefetching value from the config
let max_dynamic_concurrent_requests = data_streaming_service_config
.dynamic_prefetching
.initial_prefetching_value;

// Create and return the new dynamic prefetching state
Self {
streaming_service_config: data_streaming_service_config,
last_timeout_instant: None,
max_dynamic_concurrent_requests,
time_service,
}
}

/// A simple helper function that returns the dynamic prefetching config
fn get_dynamic_prefetching_config(&self) -> &DynamicPrefetchingConfig {
&self.streaming_service_config.dynamic_prefetching
}

/// Returns true iff dynamic prefetching is enabled
fn is_dynamic_prefetching_enabled(&self) -> bool {
self.get_dynamic_prefetching_config()
.enable_dynamic_prefetching
}

/// Returns true iff the prefetching value is currently frozen (i.e.,
/// to avoid overly increasing the value near saturation). Freezing
/// occurs after a timeout and lasts for a configured duration.
fn is_prefetching_value_frozen(&self) -> bool {
match self.last_timeout_instant {
Some(last_failure_time) => {
// Get the time since the last failure and max freeze duration
let time_since_last_failure =
self.time_service.now().duration_since(last_failure_time);
let max_freeze_duration = Duration::from_secs(
self.get_dynamic_prefetching_config()
.timeout_freeze_duration_secs,
);

// Check if the time since the last failure is less than the freeze duration
time_since_last_failure < max_freeze_duration
},
None => false, // No failures have occurred
}
}

/// Returns the maximum number of concurrent requests that can be executing
/// at any given time. Depending on whether dynamic prefetching is enabled,
/// this value will be dynamic or static (i.e., config defined).
pub fn get_max_concurrent_requests(&self, stream_engine: &StreamEngine) -> u64 {
// If dynamic prefetching is disabled, use the static values defined
// in the config. Otherwise, get the current dynamic max value.
let max_concurrent_requests = if !self.is_dynamic_prefetching_enabled() {
match stream_engine {
StreamEngine::StateStreamEngine(_) => {
// Use the configured max for state value requests
self.streaming_service_config.max_concurrent_state_requests
},
_ => {
// Use the configured max for all other requests
self.streaming_service_config.max_concurrent_requests
},
}
} else {
// Otherwise, return the current max value
self.max_dynamic_concurrent_requests
};

// Update the metrics for the max concurrent requests
metrics::set_max_concurrent_requests(max_concurrent_requests);

max_concurrent_requests
}

/// Increases the maximum number of concurrent requests that should be executing.
/// This is typically called after a successful response is received.
pub fn increase_max_concurrent_requests(&mut self) {
// If dynamic prefetching is disabled, or the value is currently frozen, do nothing
if !self.is_dynamic_prefetching_enabled() || self.is_prefetching_value_frozen() {
return;
}

// Otherwise, get and increase the current max
let dynamic_prefetching_config = self.get_dynamic_prefetching_config();
let amount_to_increase = dynamic_prefetching_config.prefetching_value_increase;
let max_dynamic_concurrent_requests = self
.max_dynamic_concurrent_requests
.saturating_add(amount_to_increase);

// Bound the value by the configured maximum
let max_prefetching_value = dynamic_prefetching_config.max_prefetching_value;
self.max_dynamic_concurrent_requests =
min(max_dynamic_concurrent_requests, max_prefetching_value);
}

/// Decreases the maximum number of concurrent requests that should be executing.
/// This is typically called after a timeout is received.
pub fn decrease_max_concurrent_requests(&mut self) {
// If dynamic prefetching is disabled, do nothing
if !self.is_dynamic_prefetching_enabled() {
return;
}

// Update the last failure time
self.last_timeout_instant = Some(self.time_service.now());

// Get and decrease the current max
let dynamic_prefetching_config = self.get_dynamic_prefetching_config();
let amount_to_decrease = dynamic_prefetching_config.prefetching_value_decrease;
let max_dynamic_concurrent_requests = self
.max_dynamic_concurrent_requests
.saturating_sub(amount_to_decrease);

// Bound the value by the configured minimum
let min_prefetching_value = dynamic_prefetching_config.min_prefetching_value;
self.max_dynamic_concurrent_requests =
max(max_dynamic_concurrent_requests, min_prefetching_value);
}
}
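
With the default config above, the state behaves like an additive-increase/additive-decrease controller: plus 1 per success (capped at 50), minus 2 per timeout (floored at 3), with increases frozen for 30 seconds after a timeout. A self-contained model of just that behavior (hypothetical names; not the real DynamicPrefetchingState):

use std::time::{Duration, Instant};

// Hypothetical model of the prefetcher's adjustment logic, with the
// default config values above baked in as constants.
struct PrefetchModel {
    value: u64,
    last_timeout: Option<Instant>,
}

impl PrefetchModel {
    // On a successful response: increase by 1, capped at 50, unless the
    // value is frozen (within 30s of the last timeout).
    fn on_success(&mut self) {
        if let Some(t) = self.last_timeout {
            if t.elapsed() < Duration::from_secs(30) {
                return; // frozen
            }
        }
        self.value = (self.value + 1).min(50);
    }

    // On a timeout: decrease by 2, floored at 3, and start the freeze window.
    fn on_timeout(&mut self) {
        self.last_timeout = Some(Instant::now());
        self.value = self.value.saturating_sub(2).max(3);
    }
}

fn main() {
    let mut model = PrefetchModel { value: 3, last_timeout: None };
    for _ in 0..10 {
        model.on_success();
    }
    assert_eq!(model.value, 13); // 3 + 10 successful responses

    model.on_timeout();
    assert_eq!(model.value, 11); // 13 - 2

    model.on_success();
    assert_eq!(model.value, 11); // frozen for 30s after the timeout
}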
1 change: 1 addition & 0 deletions state-sync/data-streaming-service/src/lib.rs
@@ -6,6 +6,7 @@

pub mod data_notification;
pub mod data_stream;
mod dynamic_prefetching;
pub mod error;
mod logging;
mod metrics;
14 changes: 14 additions & 0 deletions state-sync/data-streaming-service/src/metrics.rs
@@ -134,6 +134,15 @@ pub static RETRIED_DATA_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

/// Gauge for the maximum number of concurrent prefetching requests
pub static MAX_CONCURRENT_PREFETCHING_REQUESTS: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"aptos_data_streaming_service_max_concurrent_prefetching_requests",
"The maximum number of concurrent prefetching requests",
)
.unwrap()
});

/// Counter for the number of pending data responses
pub static PENDING_DATA_RESPONSES: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
@@ -252,6 +261,11 @@ pub fn set_active_data_streams(value: usize) {
ACTIVE_DATA_STREAMS.set(value as i64);
}

/// Sets the maximum number of concurrent requests
pub fn set_max_concurrent_requests(value: u64) {
MAX_CONCURRENT_PREFETCHING_REQUESTS.set(value as i64);
}

/// Sets the number of complete pending data responses
pub fn set_complete_pending_data_responses(value: u64) {
COMPLETE_PENDING_DATA_RESPONSES.set(value as i64);