From 42f5a779eada739e117a9d4873e838afb98ccb9f Mon Sep 17 00:00:00 2001 From: Guoteng Rao <3603304+grao1991@users.noreply.github.com> Date: Tue, 21 Jan 2025 08:23:03 +0000 Subject: [PATCH] [Indexer-Grpc-V2] Add FileStoreOperatorV2. --- .../src/compression_util.rs | 7 - .../src/file_store_operator_v2/common.rs | 53 +++++ .../file_store_operator.rs | 84 +++++++ .../file_store_reader.rs | 217 ++++++++++++++++++ .../src/file_store_operator_v2/gcs.rs | 140 +++++++++++ .../src/file_store_operator_v2/local.rs | 74 ++++++ .../src/file_store_operator_v2/mod.rs | 8 + .../indexer-grpc-utils/src/lib.rs | 1 + 8 files changed, 577 insertions(+), 7 deletions(-) create mode 100644 ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/common.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_operator.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_reader.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/gcs.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/local.rs create mode 100644 ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/mod.rs diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs index 07f528e6df124..f6f3a4bccc2a4 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/compression_util.rs @@ -197,13 +197,6 @@ impl FileEntry { .first() .expect("Cannot build empty file") .version; - let transactions_count = transactions.len(); - if transactions_count % FILE_ENTRY_TRANSACTION_COUNT as usize != 0 { - panic!("The number of transactions to upload has to be a multiple of FILE_ENTRY_TRANSACTION_COUNT.") - } - if starting_version % FILE_ENTRY_TRANSACTION_COUNT != 0 { - panic!("Starting version has 
to be a multiple of FILE_ENTRY_TRANSACTION_COUNT.") - } match storage_format { StorageFormat::Lz4CompressedProto => { let t = TransactionsInStorage { diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/common.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/common.rs new file mode 100644 index 0000000000000..1ac8b409b9e38 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/common.rs @@ -0,0 +1,53 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use anyhow::Result; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; +use tokio::time::Duration; + +pub const METADATA_FILE_NAME: &str = "metadata.json"; + +#[derive(Serialize, Deserialize, Debug)] +pub struct FileStoreMetadata { + pub chain_id: u64, + pub num_transactions_per_folder: u64, + pub version: u64, +} + +#[derive(Serialize, Deserialize, Default, Clone)] +pub struct FileMetadata { + // [first_version, last_version) + pub first_version: u64, + pub last_version: u64, + + pub size_bytes: usize, +} + +#[derive(Serialize, Deserialize, Default, Clone)] +pub struct BatchMetadata { + pub files: Vec, +} + +#[async_trait::async_trait] +pub trait IFileStoreReader: Sync + Send { + /// The tag of the store, for logging. + fn tag(&self) -> &str; + + /// Returns true if the file store is initialized (non-empty). 
+ async fn is_initialized(&self) -> bool; + + async fn get_raw_file(&self, file_path: PathBuf) -> Result>>; +} + +#[async_trait::async_trait] +pub trait IFileStoreWriter: Sync + Send { + async fn save_raw_file(&self, file_path: PathBuf, data: Vec) -> Result<()>; + + fn max_update_frequency(&self) -> Duration; +} + +#[async_trait::async_trait] +pub trait IFileStore: IFileStoreReader + IFileStoreWriter {} + +impl IFileStore for T where T: IFileStoreReader + IFileStoreWriter {} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_operator.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_operator.rs new file mode 100644 index 0000000000000..21031901edc2c --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_operator.rs @@ -0,0 +1,84 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::file_store_operator_v2::common::{BatchMetadata, FileMetadata}; +use anyhow::Result; +use aptos_protos::transaction::v1::Transaction; +use prost::Message; +use tokio::sync::mpsc::Sender; + +pub struct FileStoreOperatorV2 { + max_size_per_file: usize, + num_txns_per_folder: u64, + + buffer: Vec, + buffer_size_in_bytes: usize, + buffer_batch_metadata: BatchMetadata, + version: u64, +} + +impl FileStoreOperatorV2 { + pub async fn new( + max_size_per_file: usize, + num_txns_per_folder: u64, + version: u64, + batch_metadata: BatchMetadata, + ) -> Self { + Self { + max_size_per_file, + num_txns_per_folder, + buffer: vec![], + buffer_size_in_bytes: 0, + buffer_batch_metadata: batch_metadata, + version, + } + } + + pub fn version(&self) -> u64 { + self.version + } + + /// Buffers a transaction, if the size of the buffer exceeds the threshold, or the transaction + /// is the last one in the batch, dump the buffer to the file store. 
+ pub async fn buffer_and_maybe_dump_transactions_to_file( + &mut self, + transaction: Transaction, + tx: Sender<(Vec, BatchMetadata, bool)>, + ) -> Result<()> { + let end_batch = (transaction.version + 1) % self.num_txns_per_folder == 0; + let size_bytes = transaction.encoded_len(); + self.buffer.push(transaction); + self.buffer_size_in_bytes += size_bytes; + self.version += 1; + if self.buffer_size_in_bytes >= self.max_size_per_file || end_batch { + self.dump_transactions_to_file(end_batch, tx).await?; + } + + Ok(()) + } + + async fn dump_transactions_to_file( + &mut self, + end_batch: bool, + tx: Sender<(Vec, BatchMetadata, bool)>, + ) -> Result<()> { + let transactions = std::mem::take(&mut self.buffer); + let first_version = transactions.first().unwrap().version; + self.buffer_batch_metadata.files.push(FileMetadata { + first_version, + last_version: first_version + transactions.len() as u64, + size_bytes: self.buffer_size_in_bytes, + }); + self.buffer_size_in_bytes = 0; + + tx.send((transactions, self.buffer_batch_metadata.clone(), end_batch)) + .await + .map_err(anyhow::Error::msg)?; + + if end_batch { + self.buffer_batch_metadata = BatchMetadata::default(); + } + + Ok(()) + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_reader.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_reader.rs new file mode 100644 index 0000000000000..bf10a6b5e4302 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_reader.rs @@ -0,0 +1,217 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::{ + compression_util::{FileEntry, StorageFormat}, + file_store_operator_v2::common::{ + BatchMetadata, FileStoreMetadata, IFileStore, METADATA_FILE_NAME, + }, +}; +use anyhow::Result; +use aptos_protos::transaction::v1::Transaction; +use prost::Message; +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicU64, Ordering}, + 
Arc, + }, +}; +use tokio::sync::mpsc::Sender; +use tracing::{error, trace}; + +pub struct FileStoreReader { + chain_id: u64, + // TODO(grao): Change to IFileStoreReader when the trait_upcasting feature is in stable Rust. + reader: Arc, + num_transactions_per_folder: u64, + cached_file_store_version: AtomicU64, +} + +impl FileStoreReader { + pub async fn new(chain_id: u64, reader: Arc) -> Self { + assert!(reader.is_initialized().await); + + let mut myself = Self { + chain_id, + reader, + num_transactions_per_folder: 0, + cached_file_store_version: AtomicU64::new(0), + }; + + let metadata = Self::get_file_store_metadata(&myself) + .await + .expect("Failed to fetch num_transactions_per_folder."); + + assert!(chain_id == metadata.chain_id); + + myself.num_transactions_per_folder = metadata.num_transactions_per_folder; + + myself + } + + /// Returns the file path for the given version. Requires the version to be the first version + /// in the file. + pub fn get_path_for_version(&self, version: u64) -> PathBuf { + let mut buf = self.get_folder_name(version); + buf.push(format!("{}", version)); + buf + } + + /// Returns the metadata file path for the given version. + pub fn get_path_for_batch_metadata(&self, version: u64) -> PathBuf { + let folder = self.get_folder_name(version); + let mut batch_metadata_path = PathBuf::new(); + batch_metadata_path.push(folder); + batch_metadata_path.push(METADATA_FILE_NAME); + batch_metadata_path + } + + /// Returns transactions starting from the version, up to the end of the batch. Only + /// `max_files` will be read if provided. + pub async fn get_transaction_batch( + &self, + version: u64, + retries: u8, + max_files: Option, + tx: Sender<(Vec, usize)>, + ) { + trace!( + "Getting transactions from file store, version: {version}, max_files: {max_files:?}." + ); + let batch_metadata = self.get_batch_metadata(version).await; + if batch_metadata.is_none() { + // TODO(grao): This is unexpected, should only happen when data is corrupted. 
Consider + // make it panic!. + error!("Failed to get the batch metadata, unable to serve the request."); + return; + } + + let batch_metadata = batch_metadata.unwrap(); + + let mut file_index = None; + for (i, file_metadata) in batch_metadata.files.iter().enumerate().rev() { + let file_first_version = file_metadata.first_version; + if file_first_version <= version { + file_index = Some(i); + break; + } + } + + let file_index = + file_index.unwrap_or_else(|| panic!("Must find file_index for version: {version}.")); + let mut end_file_index = batch_metadata.files.len(); + if let Some(max_files) = max_files { + end_file_index = end_file_index.min(file_index.saturating_add(max_files)); + } + + for i in file_index..end_file_index { + let current_version = batch_metadata.files[i].first_version; + let mut size_bytes = batch_metadata.files[i].size_bytes; + let transactions = self + .get_transaction_file_at_version(current_version, retries) + .await; + if let Ok(mut transactions) = transactions { + let num_to_skip = version.saturating_sub(current_version) as usize; + let result = if num_to_skip > 0 { + let transactions_to_return = transactions.split_off(num_to_skip); + for transaction in transactions { + size_bytes -= transaction.encoded_len(); + } + (transactions_to_return, size_bytes) + } else { + (transactions, size_bytes) + }; + trace!("Got {} transactions from file store to send, size: {size_bytes}, first_version: {:?}", result.0.len(), result.0.first().map(|t| t.version)); + if tx.send(result).await.is_err() { + break; + } + } else { + error!("Got error from file store: {:?}.", transactions); + break; + } + } + } + + /// Returns file store metadata, or None if not found. 
+ pub async fn get_file_store_metadata(&self) -> Option { + self.reader + .get_raw_file(PathBuf::from(METADATA_FILE_NAME)) + .await + .expect("Failed to get file store metadata.") + .map(|data| serde_json::from_slice(&data).expect("Metadata JSON is invalid.")) + } + + /// Returns the batch metadata for the batch that includes the provided version, or None if not + /// found. + pub async fn get_batch_metadata(&self, version: u64) -> Option { + self.reader + .get_raw_file(self.get_path_for_batch_metadata(version)) + .await + .expect("Failed to get batch metadata.") + .map(|data| serde_json::from_slice(&data).expect("Batch metadata JSON is invalid.")) + } + + /// Returns the latest_version (next_version) that is going to be processed by file store, or + /// None if the metadata file doesn't exist. + pub async fn get_latest_version(&self) -> Option { + let metadata = self.get_file_store_metadata().await; + let latest_version = metadata.map(|metadata| { + if metadata.chain_id != self.chain_id { + panic!("Wrong chain_id."); + } + metadata.version + }); + + if let Some(version) = latest_version { + self.cached_file_store_version + .fetch_max(version, Ordering::SeqCst); + } + + latest_version + } + + /// Returns true iff the transaction at version can be served (i.e. less than file store + /// version). 
+ pub async fn can_serve(&self, version: u64) -> bool { + if self.cached_file_store_version.load(Ordering::SeqCst) > version { + return true; + } + + self.get_latest_version().await.unwrap() > version + } + + fn get_folder_name(&self, version: u64) -> PathBuf { + let mut buf = PathBuf::new(); + buf.push(format!("{}", version / self.num_transactions_per_folder)); + buf + } + + async fn get_transaction_file_at_version( + &self, + version: u64, + retries: u8, + ) -> Result> { + let mut retries = retries; + let bytes = loop { + let path = self.get_path_for_version(version); + match self.reader.get_raw_file(path.clone()).await { + Ok(bytes) => break bytes.unwrap_or_else(|| panic!("File should exist: {path:?}.")), + Err(err) => { + if retries == 0 { + return Err(err); + } + retries -= 1; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + }, + } + }; + + let transactions_in_storage = tokio::task::spawn_blocking(move || { + FileEntry::new(bytes, StorageFormat::Lz4CompressedProto).into_transactions_in_storage() + }) + .await?; + + Ok(transactions_in_storage.transactions) + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/gcs.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/gcs.rs new file mode 100644 index 0000000000000..3722b07cd2dbc --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/gcs.rs @@ -0,0 +1,140 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::file_store_operator_v2::common::{IFileStoreReader, IFileStoreWriter}; +use anyhow::{bail, Result}; +use cloud_storage::{Bucket, ListRequest, Object}; +use futures::StreamExt; +use std::{env, path::PathBuf}; +use tokio::time::Duration; +use tracing::{info, trace}; + +const JSON_FILE_TYPE: &str = "application/json"; +// The environment variable to set the service account path. 
+const SERVICE_ACCOUNT_ENV_VAR: &str = "SERVICE_ACCOUNT"; + +pub struct GcsFileStore { + bucket_name: String, + bucket_sub_dir: Option, +} + +impl GcsFileStore { + pub async fn new( + bucket_name: String, + bucket_sub_dir: Option, + service_account_path: String, + ) -> Self { + env::set_var(SERVICE_ACCOUNT_ENV_VAR, service_account_path); + + info!( + bucket_name = bucket_name, + "Verifying the bucket exists for GcsFileStore." + ); + + Bucket::read(&bucket_name) + .await + .expect("Failed to read bucket."); + + info!( + bucket_name = bucket_name, + "Bucket exists, GcsFileStore is created." + ); + Self { + bucket_name, + bucket_sub_dir, + } + } + + fn get_path(&self, file_path: PathBuf) -> String { + if let Some(sub_dir) = &self.bucket_sub_dir { + let mut path = sub_dir.clone(); + path.push(file_path); + path.to_string_lossy().into_owned() + } else { + file_path.to_string_lossy().into_owned() + } + } +} + +#[async_trait::async_trait] +impl IFileStoreReader for GcsFileStore { + fn tag(&self) -> &str { + "GCS" + } + + async fn is_initialized(&self) -> bool { + let request = ListRequest { + max_results: Some(1), + prefix: self + .bucket_sub_dir + .clone() + .map(|p| p.to_string_lossy().into_owned()), + ..Default::default() + }; + + let response = Object::list(&self.bucket_name, request) + .await + .unwrap_or_else(|e| { + panic!( + "Failed to list bucket. 
Bucket name: {}, sub_dir: {:?}, error: {e:?}.", + self.bucket_name, self.bucket_sub_dir + ) + }) + .boxed() + .next() + .await + .expect("Expect response.") + .unwrap_or_else(|e| panic!("Got error in response: {e:?}.")); + + !response.prefixes.is_empty() || !response.items.is_empty() + } + + async fn get_raw_file(&self, file_path: PathBuf) -> Result>> { + let path = self.get_path(file_path); + trace!( + "Downloading object at {}/{}.", + self.bucket_name, + path.as_str() + ); + match Object::download(&self.bucket_name, path.as_str()).await { + Ok(file) => Ok(Some(file)), + Err(cloud_storage::Error::Other(err)) => { + if err.contains("No such object: ") { + Ok(None) + } else { + bail!("[Indexer File] Error happens when downloading file at {path:?}. {err}",); + } + }, + Err(err) => { + bail!("[Indexer File] Error happens when downloading file at {path:?}. {err}"); + }, + } + } +} + +#[async_trait::async_trait] +impl IFileStoreWriter for GcsFileStore { + async fn save_raw_file(&self, file_path: PathBuf, data: Vec) -> Result<()> { + let path = self.get_path(file_path); + trace!( + "Uploading object to {}/{}.", + self.bucket_name, + path.as_str() + ); + Object::create( + self.bucket_name.as_str(), + data, + path.as_str(), + JSON_FILE_TYPE, + ) + .await + .map_err(anyhow::Error::msg)?; + + Ok(()) + } + + fn max_update_frequency(&self) -> Duration { + // NOTE: GCS has rate limiting on per object update rate at once per second. 
+ Duration::from_secs_f32(1.5) + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/local.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/local.rs new file mode 100644 index 0000000000000..b1f96087b88b5 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/local.rs @@ -0,0 +1,74 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +use crate::file_store_operator_v2::common::{IFileStoreReader, IFileStoreWriter}; +use anyhow::{bail, Result}; +use std::path::PathBuf; +use tokio::time::Duration; +use tracing::info; + +#[derive(Clone)] +pub struct LocalFileStore { + path: PathBuf, +} + +impl LocalFileStore { + pub fn new(path: PathBuf) -> Self { + info!( + path = path.to_str().unwrap(), + "Verifying the path exists for LocalFileStore." + ); + if !path.exists() { + panic!("LocalFileStore path does not exist."); + } + Self { path } + } +} + +#[async_trait::async_trait] +impl IFileStoreReader for LocalFileStore { + fn tag(&self) -> &str { + "LOCAL" + } + + async fn is_initialized(&self) -> bool { + tokio::fs::read_dir(&self.path) + .await + .unwrap() + .next_entry() + .await + .unwrap() + .is_some() + } + + async fn get_raw_file(&self, file_path: PathBuf) -> Result>> { + let file_path = self.path.join(file_path); + match tokio::fs::read(&file_path).await { + Ok(file) => Ok(Some(file)), + Err(err) => { + if err.kind() == std::io::ErrorKind::NotFound { + Ok(None) + } else { + bail!("[Indexer File] Error happens when getting file at {file_path:?}. 
{err}"); + } + }, + } + } +} + +#[async_trait::async_trait] +impl IFileStoreWriter for LocalFileStore { + async fn save_raw_file(&self, file_path: PathBuf, data: Vec) -> Result<()> { + let file_path = self.path.join(file_path); + if let Some(parent) = file_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + tokio::fs::write(file_path, data) + .await + .map_err(anyhow::Error::msg) + } + + fn max_update_frequency(&self) -> Duration { + Duration::from_secs(0) + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/mod.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/mod.rs new file mode 100644 index 0000000000000..abef23cc20f92 --- /dev/null +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/mod.rs @@ -0,0 +1,8 @@ +// Copyright © Aptos Foundation +// SPDX-License-Identifier: Apache-2.0 + +pub mod common; +pub mod file_store_operator; +pub mod file_store_reader; +pub mod gcs; +pub mod local; diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs index 8ae21f30420dd..a067f25d8c32e 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs @@ -7,6 +7,7 @@ pub mod config; pub mod constants; pub mod counters; pub mod file_store_operator; +pub mod file_store_operator_v2; pub mod in_memory_cache; pub mod types;