From 0578c27a579d46e3a7e113ce0bcb95f323a9de8b Mon Sep 17 00:00:00 2001
From: Guoteng Rao <3603304+grao1991@users.noreply.github.com>
Date: Tue, 21 Jan 2025 23:38:10 +0000
Subject: [PATCH] [Indexer-Grpc-V2] Add HistoricalDataService.

---
 Cargo.lock                                    |   8 +
 .../indexer-grpc-data-service-v2/Cargo.toml   |  10 +
 .../src/config.rs                             | 179 +++++++++++++++-
 .../src/connection_manager.rs                 |   4 +
 .../src/historical_data_service.rs            | 172 +++++++++++++++
 .../indexer-grpc-data-service-v2/src/lib.rs   |   4 +
 .../src/service.rs                            | 197 ++++++++++++++++++
 .../indexer-grpc-data-service-v2/src/test.rs  |   5 +
 8 files changed, 578 insertions(+), 1 deletion(-)
 create mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/historical_data_service.rs
 create mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/service.rs
 create mode 100644 ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/test.rs

diff --git a/Cargo.lock b/Cargo.lock
index 23bf1f76b92c9..c0dbf9d66fdf6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2171,18 +2171,26 @@ name = "aptos-indexer-grpc-data-service-v2"
 version = "1.0.0"
 dependencies = [
  "anyhow",
+ "aptos-config",
  "aptos-indexer-grpc-server-framework",
  "aptos-indexer-grpc-utils",
  "aptos-protos 1.3.1",
  "async-trait",
  "clap 4.5.21",
  "dashmap",
+ "futures",
  "jemallocator",
+ "once_cell",
  "rand 0.7.3",
  "serde",
+ "serde_json",
  "tokio",
+ "tokio-scoped",
+ "tokio-stream",
  "tonic 0.12.3",
+ "tonic-reflection",
  "tracing",
+ "uuid",
 ]
 
 [[package]]
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml
index 392351bbf39d0..b99906a22f78a 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/Cargo.toml
@@ -20,11 +20,21 @@ aptos-protos = { workspace = true }
 async-trait = { workspace = true }
 clap = { workspace = true }
 dashmap = { workspace = true }
+futures = { workspace = true }
+once_cell = { workspace = true }
 rand = { workspace = true }
 serde = { workspace = true }
 tokio = { workspace = true }
+tokio-scoped = { workspace = true }
+tokio-stream = { workspace = true }
 tonic = { workspace = true }
+tonic-reflection = { workspace = true }
 tracing = { workspace = true }
+uuid = { workspace = true }
+
+[dev-dependencies]
+aptos-config = { workspace = true }
+serde_json = { workspace = true }
 
 [target.'cfg(unix)'.dependencies]
 jemallocator = { workspace = true }
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs
index 93db4301b658d..d554494eda711 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/config.rs
@@ -1,19 +1,196 @@
 // Copyright © Aptos Foundation
 // SPDX-License-Identifier: Apache-2.0
 
+use crate::{
+    connection_manager::ConnectionManager,
+    historical_data_service::HistoricalDataService,
+    service::{DataServiceWrapper, DataServiceWrapperWrapper},
+};
 use anyhow::Result;
 use aptos_indexer_grpc_server_framework::RunnableConfig;
+use aptos_indexer_grpc_utils::config::IndexerGrpcFileStoreConfig;
+use aptos_protos::{
+    indexer::v1::FILE_DESCRIPTOR_SET as INDEXER_V1_FILE_DESCRIPTOR_SET,
+    transaction::v1::FILE_DESCRIPTOR_SET as TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET,
+    util::timestamp::FILE_DESCRIPTOR_SET as UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET,
+};
+use once_cell::sync::OnceCell;
 use serde::{Deserialize, Serialize};
+use std::{net::SocketAddr, sync::Arc};
+use tokio::task::JoinHandle;
+use tonic::{codec::CompressionEncoding, transport::Server};
+use tracing::info;
+
+pub(crate) static HISTORICAL_DATA_SERVICE: OnceCell<HistoricalDataService> = OnceCell::new();
 
 pub(crate) const MAX_MESSAGE_SIZE: usize = 256 * (1 << 20);
 
+// HTTP2 ping interval and timeout.
+// This can help the server garbage collect dead connections.
+// tonic server: https://docs.rs/tonic/latest/tonic/transport/server/struct.Server.html#method.http2_keepalive_interval
+const HTTP2_PING_INTERVAL_DURATION: std::time::Duration = std::time::Duration::from_secs(60);
+const HTTP2_PING_TIMEOUT_DURATION: std::time::Duration = std::time::Duration::from_secs(10);
+
+const DEFAULT_MAX_RESPONSE_CHANNEL_SIZE: usize = 5;
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct TlsConfig {
+    pub cert_path: String,
+    pub key_path: String,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct ServiceConfig {
+    /// The address to listen on.
+    pub(crate) listen_address: SocketAddr,
+    pub(crate) tls_config: Option<TlsConfig>,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize)]
+#[serde(deny_unknown_fields)]
+pub struct HistoricalDataServiceConfig {
+    pub enabled: bool,
+    pub file_store_config: IndexerGrpcFileStoreConfig,
+}
+
 #[derive(Clone, Debug, Deserialize, Serialize)]
 #[serde(deny_unknown_fields)]
-pub struct IndexerGrpcDataServiceConfig {}
+pub struct IndexerGrpcDataServiceConfig {
+    pub(crate) chain_id: u64,
+    pub(crate) service_config: ServiceConfig,
+    pub(crate) historical_data_service_config: HistoricalDataServiceConfig,
+    pub(crate) grpc_manager_addresses: Vec<String>,
+    pub(crate) self_advertised_address: String,
+    #[serde(default = "IndexerGrpcDataServiceConfig::default_data_service_response_channel_size")]
+    pub data_service_response_channel_size: usize,
+}
+
+impl IndexerGrpcDataServiceConfig {
+    const fn default_data_service_response_channel_size() -> usize {
+        DEFAULT_MAX_RESPONSE_CHANNEL_SIZE
+    }
+
+    async fn create_historical_data_service(
+        &self,
+        tasks: &mut Vec<JoinHandle<Result<()>>>,
+    ) -> Option<DataServiceWrapper> {
+        if !self.historical_data_service_config.enabled {
+            return None;
+        }
+        let connection_manager = Arc::new(
+            ConnectionManager::new(
+                self.chain_id,
+                self.grpc_manager_addresses.clone(),
+                self.self_advertised_address.clone(),
+                /*is_live_data_service=*/ false,
+            )
+            .await,
+        );
+        let (handler_tx, handler_rx) = tokio::sync::mpsc::channel(10);
+        let service = DataServiceWrapper::new(
+            connection_manager.clone(),
+            handler_tx,
+            self.data_service_response_channel_size,
+            /*is_live_data_service=*/ false,
+        );
+
+        let connection_manager_clone = connection_manager.clone();
+        tasks.push(tokio::task::spawn(async move {
+            connection_manager_clone.start().await;
+            Ok(())
+        }));
+
+        let chain_id = self.chain_id;
+        let config = self.historical_data_service_config.clone();
+        tasks.push(tokio::task::spawn_blocking(move || {
+            HISTORICAL_DATA_SERVICE
+                .get_or_init(|| HistoricalDataService::new(chain_id, config, connection_manager))
+                .run(handler_rx);
+            Ok(())
+        }));
+
+        Some(service)
+    }
+}
 
 #[async_trait::async_trait]
 impl RunnableConfig for IndexerGrpcDataServiceConfig {
     async fn run(&self) -> Result<()> {
+        let reflection_service = tonic_reflection::server::Builder::configure()
+            // Note: It is critical that the file descriptor set is registered for every
+            // file that the top level API proto depends on recursively. If you don't,
+            // compilation will still succeed but reflection will fail at runtime.
+            //
+            // TODO: Add a test for this / something in build.rs, this is a big footgun.
+            .register_encoded_file_descriptor_set(INDEXER_V1_FILE_DESCRIPTOR_SET)
+            .register_encoded_file_descriptor_set(TRANSACTION_V1_TESTING_FILE_DESCRIPTOR_SET)
+            .register_encoded_file_descriptor_set(UTIL_TIMESTAMP_FILE_DESCRIPTOR_SET)
+            .build_v1alpha()
+            .map_err(|e| anyhow::anyhow!("Failed to build reflection service: {}", e))?
+            .send_compressed(CompressionEncoding::Zstd)
+            .accept_compressed(CompressionEncoding::Zstd)
+            .accept_compressed(CompressionEncoding::Gzip);
+
+        let mut tasks = vec![];
+
+        // TODO(grao): Implement.
+        let live_data_service = None;
+
+        let historical_data_service = self.create_historical_data_service(&mut tasks).await;
+
+        let wrapper = Arc::new(DataServiceWrapperWrapper::new(
+            live_data_service,
+            historical_data_service,
+        ));
+        let wrapper_service_raw =
+            aptos_protos::indexer::v1::raw_data_server::RawDataServer::from_arc(wrapper.clone())
+                .send_compressed(CompressionEncoding::Zstd)
+                .accept_compressed(CompressionEncoding::Zstd)
+                .accept_compressed(CompressionEncoding::Gzip)
+                .max_decoding_message_size(MAX_MESSAGE_SIZE)
+                .max_encoding_message_size(MAX_MESSAGE_SIZE);
+        let wrapper_service =
+            aptos_protos::indexer::v1::data_service_server::DataServiceServer::from_arc(wrapper)
+                .send_compressed(CompressionEncoding::Zstd)
+                .accept_compressed(CompressionEncoding::Zstd)
+                .accept_compressed(CompressionEncoding::Gzip)
+                .max_decoding_message_size(MAX_MESSAGE_SIZE)
+                .max_encoding_message_size(MAX_MESSAGE_SIZE);
+
+        let listen_address = self.service_config.listen_address;
+        let mut server_builder = Server::builder()
+            .http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION))
+            .http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION));
+        if let Some(config) = &self.service_config.tls_config {
+            let cert = tokio::fs::read(config.cert_path.clone()).await?;
+            let key = tokio::fs::read(config.key_path.clone()).await?;
+            let identity = tonic::transport::Identity::from_pem(cert, key);
+            server_builder = server_builder
+                .tls_config(tonic::transport::ServerTlsConfig::new().identity(identity))?;
+            info!(
+                grpc_address = listen_address.to_string().as_str(),
+                "[Data Service] Starting gRPC server with TLS."
+            );
+        } else {
+            info!(
+                grpc_address = listen_address.to_string().as_str(),
+                "[Data Service] Starting gRPC server without TLS."
+ ); + } + + tasks.push(tokio::spawn(async move { + server_builder + .add_service(wrapper_service) + .add_service(wrapper_service_raw) + .add_service(reflection_service) + .serve(listen_address) + .await + .map_err(|e| anyhow::anyhow!(e)) + })); + + futures::future::try_join_all(tasks).await?; Ok(()) } diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs index 75a7bc4eaaaa5..ac0168d42f0ac 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/connection_manager.rs @@ -161,6 +161,10 @@ impl ConnectionManager { } } + pub(crate) fn chain_id(&self) -> u64 { + self.chain_id + } + pub(crate) fn get_grpc_manager_client_for_request(&self) -> GrpcManagerClient { let mut rng = thread_rng(); self.grpc_manager_connections diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/historical_data_service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/historical_data_service.rs new file mode 100644 index 0000000000000..81c57b71863f0 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/historical_data_service.rs @@ -0,0 +1,172 @@ +// Copyright (c) Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{config::HistoricalDataServiceConfig, connection_manager::ConnectionManager}; +use aptos_indexer_grpc_utils::file_store_operator_v2::file_store_reader::FileStoreReader; +use aptos_protos::indexer::v1::{GetTransactionsRequest, TransactionsResponse}; +use futures::executor::block_on; +use std::{ + sync::Arc, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; +use tonic::{Request, Status}; +use tracing::info; +use uuid::Uuid; + +const DEFAULT_MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 10000; + +pub struct HistoricalDataService { + chain_id: u64, + connection_manager: Arc, + file_store_reader: Arc, +} + +impl HistoricalDataService { + pub fn new( + chain_id: u64, + config: HistoricalDataServiceConfig, + connection_manager: Arc, + ) -> Self { + let file_store = block_on(config.file_store_config.create_filestore()); + let file_store_reader = Arc::new(block_on(FileStoreReader::new(chain_id, file_store))); + Self { + chain_id, + connection_manager: connection_manager.clone(), + file_store_reader, + } + } + + pub fn run( + &self, + mut handler_rx: Receiver<( + Request, + Sender>, + )>, + ) { + info!("Running HistoricalDataService..."); + tokio_scoped::scope(|scope| { + while let Some((request, response_sender)) = handler_rx.blocking_recv() { + // TODO(grao): Store request metadata. + let request = request.into_inner(); + // TODO(grao): We probably should have a more stable id from the client side. 
+                let id = Uuid::new_v4().to_string();
+                info!("Received request: {request:?}.");
+
+                if request.starting_version.is_none() {
+                    let err = Err(Status::invalid_argument("Must provide starting_version."));
+                    info!("Client error: {err:?}.");
+                    let _ = response_sender.blocking_send(err);
+                    continue;
+                }
+                let starting_version = request.starting_version.unwrap();
+
+                let max_num_transactions_per_batch = if let Some(batch_size) = request.batch_size {
+                    batch_size as usize
+                } else {
+                    DEFAULT_MAX_NUM_TRANSACTIONS_PER_BATCH
+                };
+
+                let ending_version = request
+                    .transactions_count
+                    .map(|count| starting_version + count);
+
+                scope.spawn(async move {
+                    self.start_streaming(
+                        id,
+                        starting_version,
+                        ending_version,
+                        max_num_transactions_per_batch,
+                        response_sender,
+                    )
+                    .await
+                });
+            }
+        });
+    }
+
+    async fn start_streaming(
+        &self,
+        id: String,
+        starting_version: u64,
+        ending_version: Option<u64>,
+        max_num_transactions_per_batch: usize,
+        response_sender: tokio::sync::mpsc::Sender<Result<TransactionsResponse, Status>>,
+    ) {
+        info!(stream_id = id, "Start streaming, starting_version: {starting_version}, ending_version: {ending_version:?}.");
+        self.connection_manager
+            .insert_active_stream(&id, starting_version, ending_version);
+        let mut next_version = starting_version;
+        let ending_version = ending_version.unwrap_or(u64::MAX);
+        let mut size_bytes = 0;
+        'out: loop {
+            self.connection_manager
+                .update_stream_progress(&id, next_version, size_bytes);
+            if next_version >= ending_version {
+                break;
+            }
+
+            if !self.file_store_reader.can_serve(next_version).await {
+                info!(stream_id = id, "next_version {next_version} is larger than or equal to the file store version, terminating the stream.");
+                break;
+            }
+
+            // TODO(grao): Pick a better channel size here, and consider doing parallel fetching
+            // inside the `get_transaction_batch` call based on the channel size.
+            let (tx, mut rx) = channel(1);
+
+            let file_store_reader = self.file_store_reader.clone();
+            tokio::spawn(async move {
+                file_store_reader
+                    .get_transaction_batch(
+                        next_version,
+                        /*retries=*/ 3,
+                        /*max_files=*/ None,
+                        tx,
+                    )
+                    .await;
+            });
+
+            let mut close_to_latest = false;
+            while let Some((transactions, batch_size_bytes)) = rx.recv().await {
+                next_version += transactions.len() as u64;
+                size_bytes += batch_size_bytes as u64;
+                let timestamp = transactions.first().unwrap().timestamp.clone().unwrap();
+                let timestamp_since_epoch =
+                    Duration::new(timestamp.seconds as u64, timestamp.nanos as u32);
+                let now_since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
+                let delta = now_since_epoch.saturating_sub(timestamp_since_epoch);
+
+                if delta < Duration::from_secs(60) {
+                    close_to_latest = true;
+                }
+                let responses = transactions
+                    .chunks(max_num_transactions_per_batch)
+                    .map(|chunk| TransactionsResponse {
+                        transactions: chunk.to_vec(),
+                        chain_id: Some(self.chain_id),
+                    });
+                for response in responses {
+                    if response_sender.send(Ok(response)).await.is_err() {
+                        // NOTE: We are not recalculating the version and size_bytes for the stream
+                        // progress since nobody cares about the accuracy if the client has dropped
+                        // the connection.
+                        info!(stream_id = id, "Client dropped.");
+                        break 'out;
+                    }
+                }
+            }
+            if close_to_latest {
+                info!(
+                    stream_id = id,
+                    "Stream is approaching the latest transactions, terminating."
+                );
+                break;
+            }
+        }
+
+        self.connection_manager
+            .update_stream_progress(&id, next_version, size_bytes);
+        self.connection_manager.remove_active_stream(&id);
+    }
+}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs
index 3add1bd47242e..d94f27926231a 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/lib.rs
@@ -4,3 +4,7 @@
 pub mod config;
 #[allow(dead_code)]
 mod connection_manager;
+mod historical_data_service;
+mod service;
+#[cfg(test)]
+mod test;
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/service.rs
new file mode 100644
index 0000000000000..9154ce9f33968
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/service.rs
@@ -0,0 +1,197 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::connection_manager::ConnectionManager;
+use anyhow::Result;
+use aptos_indexer_grpc_utils::timestamp_now_proto;
+use aptos_protos::indexer::v1::{
+    data_service_server::DataService, ping_data_service_response::Info, raw_data_server::RawData,
+    GetTransactionsRequest, HistoricalDataServiceInfo, LiveDataServiceInfo, PingDataServiceRequest,
+    PingDataServiceResponse, StreamInfo, TransactionsResponse,
+};
+use futures::{Stream, StreamExt};
+use std::{pin::Pin, sync::Arc};
+use tokio::sync::mpsc::{channel, Sender};
+use tokio_stream::wrappers::ReceiverStream;
+use tonic::{Request, Response, Status};
+
+type ResponseStream = Pin<Box<dyn Stream<Item = Result<TransactionsResponse, Status>> + Send>>;
+
+// Note: We still allow starting both services together, so people don't have to rely on
+// GrpcManager for routing, and it also makes it easier to run in a testing environment.
+pub struct DataServiceWrapperWrapper {
+    live_data_service: Option<DataServiceWrapper>,
+    historical_data_service: Option<DataServiceWrapper>,
+}
+
+impl DataServiceWrapperWrapper {
+    pub fn new(
+        live_data_service: Option<DataServiceWrapper>,
+        historical_data_service: Option<DataServiceWrapper>,
+    ) -> Self {
+        Self {
+            live_data_service,
+            historical_data_service,
+        }
+    }
+}
+
+#[tonic::async_trait]
+impl DataService for DataServiceWrapperWrapper {
+    type GetTransactionsStream = ResponseStream;
+
+    async fn get_transactions(
+        &self,
+        req: Request<GetTransactionsRequest>,
+    ) -> Result<Response<Self::GetTransactionsStream>, Status> {
+        if let Some(live_data_service) = self.live_data_service.as_ref() {
+            if let Some(historical_data_service) = self.historical_data_service.as_ref() {
+                let request = req.into_inner();
+                let mut stream = live_data_service
+                    .get_transactions(Request::new(request.clone()))
+                    .await?
+                    .into_inner();
+                let peekable = std::pin::pin!(stream.as_mut().peekable());
+                if let Some(Ok(_)) = peekable.peek().await {
+                    return live_data_service
+                        .get_transactions(Request::new(request))
+                        .await;
+                }
+
+                historical_data_service
+                    .get_transactions(Request::new(request))
+                    .await
+            } else {
+                live_data_service.get_transactions(req).await
+            }
+        } else if let Some(historical_data_service) = self.historical_data_service.as_ref() {
+            historical_data_service.get_transactions(req).await
+        } else {
+            unreachable!("Must have at least one of the data services enabled.");
+        }
+    }
+
+    async fn ping(
+        &self,
+        req: Request<PingDataServiceRequest>,
+    ) -> Result<Response<PingDataServiceResponse>, Status> {
+        let request = req.get_ref();
+        if request.ping_live_data_service {
+            if let Some(live_data_service) = self.live_data_service.as_ref() {
+                live_data_service.ping(req).await
+            } else {
+                Err(Status::not_found("LiveDataService is not enabled."))
+            }
+        } else if let Some(historical_data_service) = self.historical_data_service.as_ref() {
+            historical_data_service.ping(req).await
+        } else {
+            Err(Status::not_found("HistoricalDataService is not enabled."))
+        }
+    }
+}
+
+#[tonic::async_trait]
+impl RawData for DataServiceWrapperWrapper {
+    type GetTransactionsStream = ResponseStream;
+
+    async fn get_transactions(
+        &self,
+        req: Request<GetTransactionsRequest>,
+    ) -> Result<Response<Self::GetTransactionsStream>, Status> {
+        DataService::get_transactions(self, req).await
+    }
+}
+
+pub struct DataServiceWrapper {
+    connection_manager: Arc<ConnectionManager>,
+    handler_tx: Sender<(
+        Request<GetTransactionsRequest>,
+        Sender<Result<TransactionsResponse, Status>>,
+    )>,
+    pub data_service_response_channel_size: usize,
+    is_live_data_service: bool,
+}
+
+impl DataServiceWrapper {
+    pub fn new(
+        connection_manager: Arc<ConnectionManager>,
+        handler_tx: Sender<(
+            Request<GetTransactionsRequest>,
+            Sender<Result<TransactionsResponse, Status>>,
+        )>,
+        data_service_response_channel_size: usize,
+        is_live_data_service: bool,
+    ) -> Self {
+        Self {
+            connection_manager,
+            handler_tx,
+            data_service_response_channel_size,
+            is_live_data_service,
+        }
+    }
+}
+
+#[tonic::async_trait]
+impl DataService for DataServiceWrapper {
+    type GetTransactionsStream = ResponseStream;
+
+    async fn get_transactions(
+        &self,
+        req: Request<GetTransactionsRequest>,
+    ) -> Result<Response<Self::GetTransactionsStream>, Status> {
+        let (tx, rx) = channel(self.data_service_response_channel_size);
+        self.handler_tx.send((req, tx)).await.unwrap();
+
+        let output_stream = ReceiverStream::new(rx);
+        let response = Response::new(Box::pin(output_stream) as Self::GetTransactionsStream);
+
+        Ok(response)
+    }
+
+    async fn ping(
+        &self,
+        req: Request<PingDataServiceRequest>,
+    ) -> Result<Response<PingDataServiceResponse>, Status> {
+        let request = req.into_inner();
+        if request.ping_live_data_service != self.is_live_data_service {
+            if request.ping_live_data_service {
+                return Err(Status::not_found("LiveDataService is not enabled."));
+            } else {
+                return Err(Status::not_found("HistoricalDataService is not enabled."));
+            }
+        }
+
+        let known_latest_version = request.known_latest_version();
+        self.connection_manager
+            .update_known_latest_version(known_latest_version);
+        let stream_info = StreamInfo {
+            active_streams: self.connection_manager.get_active_streams(),
+        };
+
+        let response = if self.is_live_data_service {
+            let info = LiveDataServiceInfo {
+                chain_id: self.connection_manager.chain_id(),
+                timestamp: Some(timestamp_now_proto()),
+                known_latest_version: Some(known_latest_version),
+                stream_info: Some(stream_info),
+                // TODO(grao): Populate min_servable_version.
+                min_servable_version: None,
+            };
+            PingDataServiceResponse {
+                info: Some(Info::LiveDataServiceInfo(info)),
+            }
+        } else {
+            let info = HistoricalDataServiceInfo {
+                chain_id: self.connection_manager.chain_id(),
+                timestamp: Some(timestamp_now_proto()),
+                known_latest_version: Some(known_latest_version),
+                stream_info: Some(stream_info),
+            };
+            PingDataServiceResponse {
+                info: Some(Info::HistoricalDataServiceInfo(info)),
+            }
+        };
+
+        Ok(Response::new(response))
+    }
+}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/test.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/test.rs
new file mode 100644
index 0000000000000..04e94693150cf
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/test.rs
@@ -0,0 +1,5 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 16)]
+async fn test() {}
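
Reviewer note (not part of the patch): a minimal sketch of how the new IndexerGrpcDataServiceConfig surface could be exercised, for example from the currently empty test module above. The field names come from the structs in config.rs; the shape of `file_store_config` is an assumption about how IndexerGrpcFileStoreConfig in aptos-indexer-grpc-utils deserializes, and the addresses, paths, and chain id are placeholders.

// Sketch only; assumes IndexerGrpcFileStoreConfig is tagged by `file_store_type`
// with a `LocalFileStore` variant. Adjust to the actual enum shape if it differs.
use crate::config::IndexerGrpcDataServiceConfig;

#[test]
fn parse_example_config() {
    let config_json = serde_json::json!({
        "chain_id": 1,
        "service_config": {
            "listen_address": "0.0.0.0:50052"
        },
        "historical_data_service_config": {
            "enabled": true,
            // Assumed shape; see IndexerGrpcFileStoreConfig for the real variants.
            "file_store_config": {
                "file_store_type": "LocalFileStore",
                "local_file_store_path": "/tmp/file_store"
            }
        },
        "grpc_manager_addresses": ["http://localhost:50051"],
        "self_advertised_address": "http://localhost:50052"
    });
    let config: IndexerGrpcDataServiceConfig =
        serde_json::from_value(config_json).expect("example config should deserialize");
    assert!(config.historical_data_service_config.enabled);
    // data_service_response_channel_size falls back to its serde default (5) when omitted.
    assert_eq!(config.data_service_response_channel_size, 5);
}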
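
A second sketch (also not part of the patch) of how a consumer might stream historical transactions from this service. It assumes aptos-protos generates a `data_service_client::DataServiceClient` alongside the `data_service_server` registered above and that tonic's transport feature is enabled; the endpoint and version numbers are placeholders.

// Sketch only; endpoint, versions, and batch sizes are illustrative.
use aptos_protos::indexer::v1::{data_service_client::DataServiceClient, GetTransactionsRequest};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client = DataServiceClient::connect("http://localhost:50052").await?;

    // starting_version is required; the historical service rejects requests without it.
    let request = GetTransactionsRequest {
        starting_version: Some(1_000_000),
        transactions_count: Some(10_000),
        batch_size: Some(1_000),
        ..Default::default()
    };

    let mut stream = client.get_transactions(request).await?.into_inner();
    while let Some(response) = stream.message().await? {
        println!(
            "chain_id={:?}, received {} transactions",
            response.chain_id,
            response.transactions.len()
        );
    }
    Ok(())
}

Per the streaming loop above, the server ends the stream once it catches up to data within roughly a minute of now (or once the file store can no longer serve the next version), so a long-running consumer should expect the stream to terminate and reconnect to the live path at that point.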