From beb5cd842196c30f37adafdb8306607fd89cd76a Mon Sep 17 00:00:00 2001
From: Guoteng Rao <3603304+grao1991@users.noreply.github.com>
Date: Fri, 10 Jan 2025 00:05:19 +0000
Subject: [PATCH] [Indexer-Grpc-V2] Add FileStoreUploader.

---

Notes (text between `---` and the diffstat is not part of the commit message):
    * Adds `FileStoreUploader` in a new file. `new` writes the initial
      `FileStoreMetadata` when the file store is not initialized and asserts
      that the stored `num_transactions_per_folder` equals
      `NUM_TXNS_PER_FOLDER`. `start` first calls `recover` to rebuild the
      in-memory `BatchMetadata` of the unfinished batch, then runs two scoped
      tasks: one pulls transactions from the `DataManager` cache into
      `FileStoreOperatorV2`, the other receives dumped files over a channel
      and uploads them via `do_upload`.
    * `do_upload` always writes the data file; batch metadata and file store
      metadata writes are rate limited by the writer's
      `max_update_frequency()`, except that the end of a batch always writes
      batch metadata (sleeping until the rate limit allows it).
    * `GrpcManager` owns the uploader behind a `tokio::sync::Mutex` and only
      starts it when the new `is_master` config flag is set.

 .../indexer-grpc-manager/Cargo.toml           |   1 +
 .../indexer-grpc-manager/src/config.rs        |   1 +
 .../src/file_store_uploader.rs                | 225 ++++++++++++++++++
 .../indexer-grpc-manager/src/grpc_manager.rs  |  33 +++
 .../indexer-grpc-manager/src/lib.rs           |   1 +
 .../indexer-grpc-manager/src/test.rs          |   1 +
 6 files changed, 262 insertions(+)
 create mode 100644 ecosystem/indexer-grpc/indexer-grpc-manager/src/file_store_uploader.rs

diff --git a/ecosystem/indexer-grpc/indexer-grpc-manager/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-manager/Cargo.toml
index d310ddbf6c7f5..03f42f2ba7a0a 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-manager/Cargo.toml
+++ b/ecosystem/indexer-grpc/indexer-grpc-manager/Cargo.toml
@@ -24,6 +24,7 @@ futures = { workspace = true }
 prost = { workspace = true }
 rand = { workspace = true }
 serde = { workspace = true }
+serde_json = { workspace = true }
 tokio = { workspace = true }
 tokio-scoped = { workspace = true }
 tonic = { workspace = true }
diff --git a/ecosystem/indexer-grpc/indexer-grpc-manager/src/config.rs b/ecosystem/indexer-grpc/indexer-grpc-manager/src/config.rs
index af87bc7109818..d913cb12f674f 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-manager/src/config.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-manager/src/config.rs
@@ -35,6 +35,7 @@ pub struct IndexerGrpcManagerConfig {
     pub(crate) self_advertised_address: GrpcAddress,
     pub(crate) grpc_manager_addresses: Vec<GrpcAddress>,
     pub(crate) fullnode_addresses: Vec<GrpcAddress>,
+    pub(crate) is_master: bool,
 }
 
 const fn default_cache_config() -> CacheConfig {
diff --git a/ecosystem/indexer-grpc/indexer-grpc-manager/src/file_store_uploader.rs b/ecosystem/indexer-grpc/indexer-grpc-manager/src/file_store_uploader.rs
new file mode 100644
index 0000000000000..eca96bde1a174
--- /dev/null
+++ b/ecosystem/indexer-grpc/indexer-grpc-manager/src/file_store_uploader.rs
@@ -0,0 +1,225 @@
+// Copyright © Aptos Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+use crate::data_manager::DataManager;
+use anyhow::Result;
+use aptos_indexer_grpc_utils::{
+    compression_util::{FileEntry, StorageFormat},
+    config::IndexerGrpcFileStoreConfig,
+    file_store_operator_v2::{
+        common::{BatchMetadata, FileStoreMetadata, IFileStore, METADATA_FILE_NAME},
+        file_store_operator::FileStoreOperatorV2,
+        file_store_reader::FileStoreReader,
+    },
+};
+use aptos_protos::transaction::v1::Transaction;
+use std::{path::PathBuf, sync::Arc, time::Duration};
+use tokio::{sync::mpsc::channel, time::Instant};
+use tracing::info;
+
+const NUM_TXNS_PER_FOLDER: u64 = 100000;
+const MAX_SIZE_PER_FILE: usize = 20 * (1 << 20);
+const MAX_NUM_FOLDERS_TO_CHECK_FOR_RECOVERY: usize = 5;
+
+pub(crate) struct FileStoreUploader {
+    chain_id: u64,
+    reader: FileStoreReader,
+    // TODO(grao): Change to IFileStoreReader when the trait_upcasting feature is in stable Rust.
+    writer: Arc<dyn IFileStore>,
+
+    last_batch_metadata_update_time: Option<Instant>,
+    last_metadata_update_time: Instant,
+}
+
+impl FileStoreUploader {
+    pub(crate) async fn new(
+        chain_id: u64,
+        file_store_config: IndexerGrpcFileStoreConfig,
+    ) -> Result<Self> {
+        let file_store = file_store_config.create_filestore().await;
+        if !file_store.is_initialized().await {
+            info!(
+                chain_id = chain_id,
+                "FileStore is not initialized, initializing..."
+            );
+            info!("Transactions per folder: {NUM_TXNS_PER_FOLDER}.");
+            let metadata = FileStoreMetadata {
+                chain_id,
+                num_transactions_per_folder: NUM_TXNS_PER_FOLDER,
+                version: 0,
+            };
+            let raw_data = serde_json::to_vec(&metadata).unwrap();
+            file_store
+                .save_raw_file(PathBuf::from(METADATA_FILE_NAME), raw_data)
+                .await
+                .unwrap_or_else(|e| panic!("Failed to initialize FileStore: {e:?}."));
+        }
+
+        let reader = FileStoreReader::new(chain_id, file_store.clone()).await;
+        // NOTE: We cannot change NUM_TXNS_PER_FOLDER without backfilling the data, put a check
+        // here to make sure we don't change it accidentally.
+        assert_eq!(
+            reader
+                .get_file_store_metadata()
+                .await
+                .unwrap()
+                .num_transactions_per_folder,
+            NUM_TXNS_PER_FOLDER
+        );
+
+        Ok(Self {
+            chain_id,
+            reader,
+            writer: file_store,
+            last_batch_metadata_update_time: None,
+            last_metadata_update_time: Instant::now(),
+        })
+    }
+
+    /// Recovers the batch metadata in memory buffer for the unfinished batch from file store.
+    async fn recover(&self) -> Result<(u64, BatchMetadata)> {
+        let mut version = self
+            .reader
+            .get_latest_version()
+            .await
+            .expect("Latest version must exist.");
+        let mut num_folders_checked = 0;
+        let mut buffered_batch_metadata_to_recover = BatchMetadata::default();
+        while let Some(batch_metadata) = self.reader.get_batch_metadata(version).await {
+            let batch_last_version = batch_metadata.files.last().unwrap().last_version;
+            version = batch_last_version;
+            if version % NUM_TXNS_PER_FOLDER != 0 {
+                buffered_batch_metadata_to_recover = batch_metadata;
+                break;
+            }
+            num_folders_checked += 1;
+            if num_folders_checked >= MAX_NUM_FOLDERS_TO_CHECK_FOR_RECOVERY {
+                panic!(
+                    "File store metadata is way behind batch metadata, data might be corrupted."
+                );
+            }
+        }
+
+        self.update_file_store_metadata(version).await?;
+
+        Ok((version, buffered_batch_metadata_to_recover))
+    }
+
+    pub(crate) async fn start(&mut self, data_manager: Arc<DataManager>) -> Result<()> {
+        let (version, batch_metadata) = self.recover().await?;
+
+        let mut file_store_operator = FileStoreOperatorV2::new(
+            MAX_SIZE_PER_FILE,
+            NUM_TXNS_PER_FOLDER,
+            version,
+            batch_metadata,
+        )
+        .await;
+        tokio_scoped::scope(|s| {
+            let (tx, mut rx) = channel(5);
+            s.spawn(async move {
+                while let Some((transactions, batch_metadata, end_batch)) = rx.recv().await {
+                    self.do_upload(transactions, batch_metadata, end_batch)
+                        .await
+                        .unwrap();
+                }
+            });
+            s.spawn(async move {
+                loop {
+                    let transactions = data_manager
+                        .get_transactions_from_cache(
+                            file_store_operator.version(),
+                            MAX_SIZE_PER_FILE,
+                            /*update_file_store_version=*/ true,
+                        )
+                        .await;
+                    let len = transactions.len();
+                    for transaction in transactions {
+                        file_store_operator
+                            .buffer_and_maybe_dump_transactions_to_file(transaction, tx.clone())
+                            .await
+                            .unwrap();
+                    }
+                    if len == 0 {
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    }
+                }
+            });
+        });
+
+        Ok(())
+    }
+
+    async fn do_upload(
+        &mut self,
+        transactions: Vec<Transaction>,
+        batch_metadata: BatchMetadata,
+        end_batch: bool,
+    ) -> Result<()> {
+        let first_version = transactions.first().unwrap().version;
+        let last_version = transactions.last().unwrap().version;
+        let data_file =
+            FileEntry::from_transactions(transactions, StorageFormat::Lz4CompressedProto);
+        let path = self.reader.get_path_for_version(first_version);
+
+        info!("Dumping transactions [{first_version}, {last_version}] to file {path:?}.");
+
+        self.writer
+            .save_raw_file(path, data_file.into_inner())
+            .await?;
+
+        let mut update_batch_metadata = false;
+        let max_update_frequency = self.writer.max_update_frequency();
+        if self.last_batch_metadata_update_time.is_none()
+            || Instant::now() - self.last_batch_metadata_update_time.unwrap()
+                >= max_update_frequency
+        {
+            update_batch_metadata = true;
+        } else if end_batch {
+            update_batch_metadata = true;
+            tokio::time::sleep_until(
+                self.last_batch_metadata_update_time.unwrap() + max_update_frequency,
+            )
+            .await;
+        }
+
+        if !update_batch_metadata {
+            return Ok(());
+        }
+
+        let batch_metadata_path = self.reader.get_path_for_batch_metadata(first_version);
+        self.writer
+            .save_raw_file(
+                batch_metadata_path,
+                serde_json::to_vec(&batch_metadata).map_err(anyhow::Error::msg)?,
+            )
+            .await?;
+
+        if end_batch {
+            self.last_batch_metadata_update_time = None;
+        } else {
+            self.last_batch_metadata_update_time = Some(Instant::now());
+        }
+
+        if Instant::now() - self.last_metadata_update_time >= max_update_frequency {
+            self.update_file_store_metadata(last_version + 1).await?;
+            self.last_metadata_update_time = Instant::now();
+        }
+
+        Ok(())
+    }
+
+    /// Updates the file store metadata.
+    async fn update_file_store_metadata(&self, version: u64) -> Result<()> {
+        let metadata = FileStoreMetadata {
+            chain_id: self.chain_id,
+            num_transactions_per_folder: NUM_TXNS_PER_FOLDER,
+            version,
+        };
+
+        let raw_data = serde_json::to_vec(&metadata).map_err(anyhow::Error::msg)?;
+        self.writer
+            .save_raw_file(PathBuf::from(METADATA_FILE_NAME), raw_data)
+            .await
+    }
+}
diff --git a/ecosystem/indexer-grpc/indexer-grpc-manager/src/grpc_manager.rs b/ecosystem/indexer-grpc/indexer-grpc-manager/src/grpc_manager.rs
index 8b7026c26fe84..3e6865d6b9e13 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-manager/src/grpc_manager.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-manager/src/grpc_manager.rs
@@ -4,12 +4,14 @@
 use crate::{
     config::{IndexerGrpcManagerConfig, ServiceConfig},
     data_manager::DataManager,
+    file_store_uploader::FileStoreUploader,
     metadata_manager::MetadataManager,
     service::GrpcManagerService,
 };
 use anyhow::Result;
 use aptos_protos::indexer::v1::grpc_manager_server::GrpcManagerServer;
 use std::{sync::Arc, time::Duration};
+use tokio::sync::Mutex;
 use tonic::{codec::CompressionEncoding, transport::Server};
 use tracing::info;
 
@@ -18,13 +20,30 @@ const HTTP2_PING_TIMEOUT_DURATION: Duration = Duration::from_secs(10);
 
 pub(crate) struct GrpcManager {
     chain_id: u64,
+    file_store_uploader: Mutex<FileStoreUploader>,
     metadata_manager: Arc<MetadataManager>,
     data_manager: Arc<DataManager>,
+    is_master: bool,
 }
 
 impl GrpcManager {
     pub(crate) async fn new(config: &IndexerGrpcManagerConfig) -> Self {
         let chain_id = config.chain_id;
+        let file_store_uploader = Mutex::new(
+            FileStoreUploader::new(chain_id, config.file_store_config.clone())
+                .await
+                .unwrap_or_else(|e| {
+                    panic!(
+                        "Failed to create filestore uploader, config: {:?}, error: {e:?}",
+                        config.file_store_config
+                    )
+                }),
+        );
+
+        info!(
+            chain_id = chain_id,
+            "FilestoreUploader is created, config: {:?}.", config.file_store_config
+        );
 
         let metadata_manager = Arc::new(MetadataManager::new(
             chain_id,
@@ -54,8 +73,10 @@ impl GrpcManager {
 
         Self {
             chain_id,
+            file_store_uploader,
             metadata_manager,
             data_manager,
+            is_master: config.is_master,
         }
     }
 
@@ -77,6 +98,18 @@ impl GrpcManager {
                 self.metadata_manager.start().await.unwrap();
             });
             s.spawn(async move { self.data_manager.start().await });
+            if self.is_master {
+                s.spawn(async move {
+                    self.file_store_uploader
+                        .lock()
+                        .await
+                        .start(self.data_manager.clone())
+                        .await
+                        .unwrap();
+                });
+            } else {
+                // TODO(grao): Start a task to periodically update the file store version.
+            }
             s.spawn(async move {
                 info!("Starting GrpcManager at {}.", service_config.listen_address);
                 server.serve(service_config.listen_address).await.unwrap();
diff --git a/ecosystem/indexer-grpc/indexer-grpc-manager/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-manager/src/lib.rs
index ac85b6b1c7460..53e78d9a93baf 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-manager/src/lib.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-manager/src/lib.rs
@@ -3,6 +3,7 @@
 
 pub mod config;
 mod data_manager;
+mod file_store_uploader;
 mod grpc_manager;
 mod metadata_manager;
 mod service;
diff --git a/ecosystem/indexer-grpc/indexer-grpc-manager/src/test.rs b/ecosystem/indexer-grpc/indexer-grpc-manager/src/test.rs
index 8d2e782456454..90f28e46cf515 100644
--- a/ecosystem/indexer-grpc/indexer-grpc-manager/src/test.rs
+++ b/ecosystem/indexer-grpc/indexer-grpc-manager/src/test.rs
@@ -39,6 +39,7 @@ async fn test_run() {
         self_advertised_address: self_address.clone(),
         grpc_manager_addresses: vec![self_address],
         fullnode_addresses: vec![],
+        is_master: true,
     };
 
     let task = tokio::spawn(async move {