[Data Streaming Service] Add support for dynamic prefetching #11951

Merged · 4 commits · Feb 21, 2024
51 changes: 46 additions & 5 deletions config/src/config/state_sync_config.rs
@@ -136,7 +136,7 @@ impl Default for StateSyncDriverConfig {
             max_connection_deadline_secs: 10,
             max_consecutive_stream_notifications: 10,
             max_num_stream_timeouts: 12,
-            max_pending_data_chunks: 100,
+            max_pending_data_chunks: 50,
             max_pending_mempool_notifications: 100,
             max_stream_wait_time_ms: 5000,
             num_versions_to_skip_snapshot_sync: 100_000_000, // At 5k TPS, this allows a node to fail for about 6 hours.
@@ -204,6 +204,9 @@ impl Default for StorageServiceConfig {
 #[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
 #[serde(default, deny_unknown_fields)]
 pub struct DataStreamingServiceConfig {
+    /// The dynamic prefetching config for the data streaming service
+    pub dynamic_prefetching: DynamicPrefetchingConfig,
+
     /// Whether or not to enable data subscription streaming.
     pub enable_subscription_streaming: bool,
 
@@ -216,9 +219,7 @@ pub struct DataStreamingServiceConfig {
     /// Maximum number of concurrent data client requests (per stream) for state keys/values.
     pub max_concurrent_state_requests: u64,
 
-    /// Maximum channel sizes for each data stream listener. If messages are not
-    /// consumed, they will be dropped (oldest messages first). The remaining
-    /// messages will be retrieved using FIFO ordering.
+    /// Maximum channel sizes for each data stream listener (per stream).
     pub max_data_stream_channel_sizes: u64,
 
     /// Maximum number of notification ID to response context mappings held in
@@ -248,11 +249,12 @@ pub struct DataStreamingServiceConfig {
 impl Default for DataStreamingServiceConfig {
     fn default() -> Self {
         Self {
+            dynamic_prefetching: DynamicPrefetchingConfig::default(),
             enable_subscription_streaming: false,
             global_summary_refresh_interval_ms: 50,
             max_concurrent_requests: MAX_CONCURRENT_REQUESTS,
             max_concurrent_state_requests: MAX_CONCURRENT_STATE_REQUESTS,
-            max_data_stream_channel_sizes: 300,
+            max_data_stream_channel_sizes: 50,
             max_notification_id_mappings: 300,
             max_num_consecutive_subscriptions: 40, // At ~4 blocks per second, this should last 10 seconds
             max_pending_requests: 50,
@@ -263,6 +265,45 @@ impl Default for DataStreamingServiceConfig {
     }
 }
 
+#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[serde(default, deny_unknown_fields)]
+pub struct DynamicPrefetchingConfig {
+    /// Whether or not to enable dynamic prefetching
+    pub enable_dynamic_prefetching: bool,
+
+    /// The initial number of concurrent prefetching requests
+    pub initial_prefetching_value: u64,
+
+    /// The maximum number of concurrent prefetching requests
+    pub max_prefetching_value: u64,
+
+    /// The minimum number of concurrent prefetching requests
+    pub min_prefetching_value: u64,
+
+    /// The amount by which to increase the concurrent prefetching value (i.e., on a successful response)
+    pub prefetching_value_increase: u64,
+
+    /// The amount by which to decrease the concurrent prefetching value (i.e., on a timeout)
+    pub prefetching_value_decrease: u64,
+
+    /// The duration by which to freeze the prefetching value on a timeout
+    pub timeout_freeze_duration_secs: u64,
+}
+
+impl Default for DynamicPrefetchingConfig {
+    fn default() -> Self {
+        Self {
+            enable_dynamic_prefetching: true,
+            initial_prefetching_value: 3,
+            max_prefetching_value: 30,
+            min_prefetching_value: 3,
+            prefetching_value_increase: 1,
+            prefetching_value_decrease: 2,
+            timeout_freeze_duration_secs: 30,
+        }
+    }
+}
+
 #[derive(Clone, Copy, Debug, Deserialize, PartialEq, Eq, Serialize)]
 #[serde(default, deny_unknown_fields)]
 pub struct AptosDataPollerConfig {
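Note: the `DynamicPrefetchingState` that consumes this config is defined in the new `dynamic_prefetching` module (imported by `data_stream.rs` below, though its source file is not included in the portion of the diff shown here). The sketch below is a minimal illustration of how the config values plausibly fit together: additive increase on each successful response, a larger decrease plus a temporary freeze on each timeout, clamped to the configured bounds. The internals, the use of `std::time::Instant` in place of the crate's `TimeService`, and the `static_fallback` parameter are simplifications for illustration, not the actual implementation.

```rust
use std::time::{Duration, Instant};

/// A simplified stand-in for the config shown in this diff
#[derive(Clone, Copy)]
pub struct DynamicPrefetchingConfig {
    pub enable_dynamic_prefetching: bool,
    pub initial_prefetching_value: u64,
    pub max_prefetching_value: u64,
    pub min_prefetching_value: u64,
    pub prefetching_value_increase: u64,
    pub prefetching_value_decrease: u64,
    pub timeout_freeze_duration_secs: u64,
}

/// Tracks the current prefetching window (illustrative internals)
pub struct DynamicPrefetchingState {
    config: DynamicPrefetchingConfig,
    current_value: u64,
    frozen_until: Option<Instant>, // No increases until this deadline passes
}

impl DynamicPrefetchingState {
    pub fn new(config: DynamicPrefetchingConfig) -> Self {
        Self {
            config,
            current_value: config.initial_prefetching_value,
            frozen_until: None,
        }
    }

    /// On a successful response: additively grow the window (unless frozen)
    pub fn increase_max_concurrent_requests(&mut self) {
        if self.frozen_until.map_or(false, |t| Instant::now() < t) {
            return; // Still frozen after a recent timeout
        }
        self.current_value = self
            .current_value
            .saturating_add(self.config.prefetching_value_increase)
            .min(self.config.max_prefetching_value);
    }

    /// On an error or timeout: shrink the window and freeze further increases
    pub fn decrease_max_concurrent_requests(&mut self) {
        self.current_value = self
            .current_value
            .saturating_sub(self.config.prefetching_value_decrease)
            .max(self.config.min_prefetching_value);
        self.frozen_until =
            Some(Instant::now() + Duration::from_secs(self.config.timeout_freeze_duration_secs));
    }

    /// The limit used when sending new client requests
    pub fn get_max_concurrent_requests(&self, static_fallback: u64) -> u64 {
        if self.config.enable_dynamic_prefetching {
            self.current_value
        } else {
            static_fallback // e.g., max_concurrent_requests from the static config
        }
    }
}
```

With the defaults above, a healthy stream ramps from 3 to the cap of 30 concurrent requests after 27 consecutive successes, while a single timeout immediately shrinks the window by 2 and blocks further growth for 30 seconds, mirroring TCP-style congestion control (probe slowly, back off quickly).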
5 changes: 5 additions & 0 deletions state-sync/data-streaming-service/src/data_notification.rs
@@ -88,6 +88,11 @@ impl DataClientRequest {
         }
     }
 
+    /// Returns true iff the request is a new data request
+    pub fn is_new_data_request(&self) -> bool {
+        self.is_optimistic_fetch_request() || self.is_subscription_request()
+    }
+
     /// Returns true iff the request is an optimistic fetch request
     pub fn is_optimistic_fetch_request(&self) -> bool {
         matches!(self, DataClientRequest::NewTransactionsWithProof(_))
183 changes: 95 additions & 88 deletions state-sync/data-streaming-service/src/data_stream.rs
@@ -13,6 +13,7 @@ use crate::{
         TransactionOutputsWithProofRequest, TransactionsOrOutputsWithProofRequest,
         TransactionsWithProofRequest,
     },
+    dynamic_prefetching::DynamicPrefetchingState,
     error::Error,
     logging::{LogEntry, LogEvent, LogSchema},
     metrics,
@@ -119,6 +120,9 @@ pub struct DataStream<T> {

     // The time service to track elapsed time (e.g., during stream lag checks)
     time_service: TimeService,
+
+    // The dynamic prefetching state (if enabled)
+    dynamic_prefetching_state: DynamicPrefetchingState,
 }
 
 impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
@@ -141,6 +145,10 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         // Create a new stream engine
         let stream_engine = StreamEngine::new(data_stream_config, stream_request, advertised_data)?;
 
+        // Create the dynamic prefetching state
+        let dynamic_prefetching_state =
+            DynamicPrefetchingState::new(data_stream_config, time_service.clone());
+
         // Create a new data stream
         let data_stream = Self {
             data_client_config,
@@ -159,6 +167,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
             send_failure: false,
             subscription_stream_lag: None,
             time_service,
+            dynamic_prefetching_state,
         };
 
         Ok((data_stream, data_stream_listener))
@@ -255,17 +264,6 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         Ok(())
     }
 
-    /// Returns the maximum number of concurrent requests that can be executing
-    /// at any given time.
-    fn get_max_concurrent_requests(&self) -> u64 {
-        match self.stream_engine {
-            StreamEngine::StateStreamEngine(_) => {
-                self.streaming_service_config.max_concurrent_state_requests
-            },
-            _ => self.streaming_service_config.max_concurrent_requests,
-        }
-    }
-
     /// Creates and sends a batch of aptos data client requests to the network
     fn create_and_send_client_requests(
         &mut self,
@@ -285,7 +283,8 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         // Otherwise, calculate the max number of requests to send based on
         // the max concurrent requests and the number of pending request slots.
         let remaining_concurrent_requests = self
-            .get_max_concurrent_requests()
+            .dynamic_prefetching_state
+            .get_max_concurrent_requests(&self.stream_engine)
             .saturating_sub(num_in_flight_requests);
         let remaining_request_slots = max_pending_requests.saturating_sub(num_pending_requests);
         min(remaining_concurrent_requests, remaining_request_slots)
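To make the arithmetic above concrete, here is a worked example with hypothetical values (the variable names mirror the surrounding code; the numbers are invented):

```rust
fn main() {
    let max_concurrent_requests: u64 = 10; // From the dynamic prefetching state
    let num_in_flight_requests: u64 = 4;
    let max_pending_requests: u64 = 50;
    let num_pending_requests: u64 = 47;

    // saturating_sub avoids underflow if the dynamic limit was recently
    // lowered below the number of requests already in flight
    let remaining_concurrent_requests =
        max_concurrent_requests.saturating_sub(num_in_flight_requests); // 6
    let remaining_request_slots =
        max_pending_requests.saturating_sub(num_pending_requests); // 3

    // The batch size is bounded by whichever budget is tighter
    assert_eq!(remaining_concurrent_requests.min(remaining_request_slots), 3);
}
```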
@@ -394,8 +393,11 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         pending_client_response
     }
 
-    // TODO(joshlind): this function shouldn't be blocking when trying to send! If there are
-    // multiple streams, a single blocked stream could cause them all to block.
+    // TODO(joshlind): this function shouldn't be blocking when trying to send.
+    // If there are multiple streams, a single blocked stream could cause them
+    // all to block. This is acceptable for now (because there is only ever
+    // a single stream in use by the driver) but it should be fixed if we want
+    // to generalize this for multiple streams.
     async fn send_data_notification(
         &mut self,
         data_notification: DataNotification,
@@ -453,85 +455,89 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
             return Ok(()); // There's nothing left to do
         }
 
-        // Process any ready data responses
-        for _ in 0..self.get_num_pending_data_requests()? {
-            if let Some(pending_response) = self.pop_pending_response_queue()? {
-                // Get the client request and response information
-                let maybe_client_response = pending_response.lock().client_response.take();
-                let client_response = maybe_client_response.ok_or_else(|| {
-                    Error::UnexpectedErrorEncountered("The client response should be ready!".into())
-                })?;
-                let client_request = &pending_response.lock().client_request.clone();
-
-                // Process the client response
-                match client_response {
-                    Ok(client_response) => {
-                        // Sanity check and process the response
-                        if sanity_check_client_response_type(client_request, &client_response) {
-                            // The response is valid, send the data notification to the client
-                            let client_response_payload = client_response.payload.clone();
-                            self.send_data_notification_to_client(client_request, client_response)
-                                .await?;
-
-                            // If the response wasn't enough to satisfy the original request (e.g.,
-                            // it was truncated), missing data should be requested.
-                            match self
-                                .request_missing_data(client_request, &client_response_payload)
-                            {
-                                Ok(missing_data_requested) => {
-                                    if missing_data_requested {
-                                        break; // We're now head of line blocked on the missing data
-                                    }
-                                },
-                                Err(error) => {
-                                    warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
-                                        .stream_id(self.data_stream_id)
-                                        .event(LogEvent::Error)
-                                        .error(&error)
-                                        .message(
-                                            "Failed to determine if missing data was requested!"
-                                        ));
-                                },
-                            }
-
-                            // If the request was a subscription request and the subscription
-                            // stream is lagging behind the data advertisements, the stream
-                            // engine should be notified (e.g., so that it can catch up).
-                            if client_request.is_subscription_request() {
-                                if let Err(error) = self.check_subscription_stream_lag(
-                                    &global_data_summary,
-                                    &client_response_payload,
-                                ) {
-                                    self.notify_new_data_request_error(client_request, error)?;
-                                    break; // We're now head of line blocked on the failed stream
-                                }
-                            }
-                        } else {
-                            // The sanity check failed
-                            self.handle_sanity_check_failure(
-                                client_request,
-                                &client_response.context,
-                            )?;
-                            break; // We're now head of line blocked on the failed request
-                        }
-                    },
-                    Err(error) => {
-                        // Handle the error depending on the request type
-                        if client_request.is_subscription_request()
-                            || client_request.is_optimistic_fetch_request()
-                        {
-                            // The request was for new data. We should notify the
-                            // stream engine and clear the requests queue.
-                            self.notify_new_data_request_error(client_request, error)?;
-                        } else {
-                            // Otherwise, we should handle the error and simply retry
-                            self.handle_data_client_error(client_request, &error)?;
-                        }
-                        break; // We're now head of line blocked on the failed request
-                    },
-                }
-            } else {
-                break; // The first response hasn't arrived yet
-            }
-        }
+        // Continuously process any ready data responses
+        while let Some(pending_response) = self.pop_pending_response_queue()? {
+            // Get the client request and response information
+            let maybe_client_response = pending_response.lock().client_response.take();
+            let client_response = maybe_client_response.ok_or_else(|| {
+                Error::UnexpectedErrorEncountered("The client response should be ready!".into())
+            })?;
+            let client_request = &pending_response.lock().client_request.clone();
+
+            // Process the client response
+            match client_response {
+                Ok(client_response) => {
+                    // Sanity check and process the response
+                    if sanity_check_client_response_type(client_request, &client_response) {
+                        // If the response wasn't enough to satisfy the original request (e.g.,
+                        // it was truncated), missing data should be requested.
+                        let mut head_of_line_blocked = false;
+                        match self.request_missing_data(client_request, &client_response.payload) {
+                            Ok(missing_data_requested) => {
+                                if missing_data_requested {
+                                    head_of_line_blocked = true; // We're now head of line blocked on the missing data
+                                }
+                            },
+                            Err(error) => {
+                                warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
+                                    .stream_id(self.data_stream_id)
+                                    .event(LogEvent::Error)
+                                    .error(&error)
+                                    .message("Failed to determine if missing data was requested!"));
+                            },
+                        }
+
+                        // If the request was a subscription request and the subscription
+                        // stream is lagging behind the data advertisements, the stream
+                        // engine should be notified (e.g., so that it can catch up).
+                        if client_request.is_subscription_request() {
+                            if let Err(error) = self.check_subscription_stream_lag(
+                                &global_data_summary,
+                                &client_response.payload,
+                            ) {
+                                self.notify_new_data_request_error(client_request, error)?;
+                                head_of_line_blocked = true; // We're now head of line blocked on the failed stream
+                            }
+                        }
+
+                        // The response is valid, send the data notification to the client
+                        self.send_data_notification_to_client(client_request, client_response)
+                            .await?;
+
+                        // If the request is for specific data, increase the prefetching limit.
+                        // Note: we don't increase the limit for new data requests because
+                        // those don't invoke the prefetcher (as we're already up-to-date).
+                        if !client_request.is_new_data_request() {
+                            self.dynamic_prefetching_state
+                                .increase_max_concurrent_requests();
+                        }
+
+                        // If we're head of line blocked, we should return early
+                        if head_of_line_blocked {
+                            break;
+                        }
+                    } else {
+                        // The sanity check failed
+                        self.handle_sanity_check_failure(client_request, &client_response.context)?;
+                        break; // We're now head of line blocked on the failed request
+                    }
+                },
+                Err(error) => {
+                    // Handle the error depending on the request type
+                    if client_request.is_new_data_request() {
+                        // The request was for new data. We should notify the
+                        // stream engine and clear the requests queue.
+                        self.notify_new_data_request_error(client_request, error)?;
+                    } else {
+                        // Decrease the prefetching limit on an error
+                        self.dynamic_prefetching_state
+                            .decrease_max_concurrent_requests();
+
+                        // Handle the error and simply retry
+                        self.handle_data_client_error(client_request, &error)?;
+                    }
+                    break; // We're now head of line blocked on the failed request
+                },
+            }
+        }
@@ -705,6 +711,7 @@ impl<T: AptosDataClientInterface + Send + Clone + 'static> DataStream<T> {
         data_client_request: &DataClientRequest,
         data_client_error: &aptos_data_client::error::Error,
     ) -> Result<(), Error> {
+        // Log the error
         warn!(LogSchema::new(LogEntry::ReceivedDataResponse)
             .stream_id(self.data_stream_id)
             .event(LogEvent::Error)