[Indexer-Grpc-V2] Add FileStoreUploader. #15724

Merged · 1 commit · Jan 23, 2025
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/Cargo.toml
@@ -24,6 +24,7 @@ futures = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
tokio-scoped = { workspace = true }
tonic = { workspace = true }
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/config.rs
@@ -35,6 +35,7 @@ pub struct IndexerGrpcManagerConfig {
pub(crate) self_advertised_address: GrpcAddress,
pub(crate) grpc_manager_addresses: Vec<GrpcAddress>,
pub(crate) fullnode_addresses: Vec<GrpcAddress>,
pub(crate) is_master: bool,
}

const fn default_cache_config() -> CacheConfig {
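For orientation, a deployment config enabling the new flag might look like the sketch below. This is a hypothetical snippet: `is_master` is the only field introduced by this PR, the field names mirror the serde struct above, and the address formats and other values are assumptions not shown in this hunk.

```yaml
# Hypothetical snippet; values are illustrative placeholders.
chain_id: 1
self_advertised_address: "127.0.0.1:50052"
grpc_manager_addresses:
  - "127.0.0.1:50052"
fullnode_addresses:
  - "127.0.0.1:50051"
is_master: true
```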
225 changes: 225 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/file_store_uploader.rs
@@ -0,0 +1,225 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::data_manager::DataManager;
use anyhow::Result;
use aptos_indexer_grpc_utils::{
compression_util::{FileEntry, StorageFormat},
config::IndexerGrpcFileStoreConfig,
file_store_operator_v2::{
common::{BatchMetadata, FileStoreMetadata, IFileStore, METADATA_FILE_NAME},
file_store_operator::FileStoreOperatorV2,
file_store_reader::FileStoreReader,
},
};
use aptos_protos::transaction::v1::Transaction;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::{sync::mpsc::channel, time::Instant};
use tracing::info;

const NUM_TXNS_PER_FOLDER: u64 = 100000;
const MAX_SIZE_PER_FILE: usize = 20 * (1 << 20);
const MAX_NUM_FOLDERS_TO_CHECK_FOR_RECOVERY: usize = 5;

pub(crate) struct FileStoreUploader {
chain_id: u64,
reader: FileStoreReader,
// TODO(grao): Change to IFileStoreReader when the trait_upcasting feature is in stable Rust.
writer: Arc<dyn IFileStore>,

last_batch_metadata_update_time: Option<Instant>,
last_metadata_update_time: Instant,
}

impl FileStoreUploader {
pub(crate) async fn new(
chain_id: u64,
file_store_config: IndexerGrpcFileStoreConfig,
) -> Result<Self> {
let file_store = file_store_config.create_filestore().await;
if !file_store.is_initialized().await {
info!(
chain_id = chain_id,
"FileStore is not initialized, initializing..."
);
info!("Transactions per folder: {NUM_TXNS_PER_FOLDER}.");
let metadata = FileStoreMetadata {
chain_id,
num_transactions_per_folder: NUM_TXNS_PER_FOLDER,
version: 0,
};
let raw_data = serde_json::to_vec(&metadata).unwrap();
file_store
.save_raw_file(PathBuf::from(METADATA_FILE_NAME), raw_data)
.await
.unwrap_or_else(|e| panic!("Failed to initialize FileStore: {e:?}."));
}

let reader = FileStoreReader::new(chain_id, file_store.clone()).await;
// NOTE: We cannot change NUM_TXNS_PER_FOLDER without backfilling the data, so add a check
// here to make sure we don't change it accidentally.
assert_eq!(
reader
.get_file_store_metadata()
.await
.unwrap()
.num_transactions_per_folder,
NUM_TXNS_PER_FOLDER
);

Ok(Self {
chain_id,
reader,
writer: file_store,
last_batch_metadata_update_time: None,
last_metadata_update_time: Instant::now(),
})
}

/// Recovers the in-memory batch metadata buffer for the unfinished batch from the file store.
async fn recover(&self) -> Result<(u64, BatchMetadata)> {
Contributor: nit: add some comments.

Contributor Author: Done

let mut version = self
.reader
.get_latest_version()
.await
.expect("Latest version must exist.");
let mut num_folders_checked = 0;
let mut buffered_batch_metadata_to_recover = BatchMetadata::default();
while let Some(batch_metadata) = self.reader.get_batch_metadata(version).await {
let batch_last_version = batch_metadata.files.last().unwrap().last_version;
version = batch_last_version;
if version % NUM_TXNS_PER_FOLDER != 0 {
buffered_batch_metadata_to_recover = batch_metadata;
break;
}
num_folders_checked += 1;
if num_folders_checked >= MAX_NUM_FOLDERS_TO_CHECK_FOR_RECOVERY {
panic!(
"File store metadata is way behind batch metadata, data might be corrupted."
);
}
}

self.update_file_store_metadata(version).await?;

Ok((version, buffered_batch_metadata_to_recover))
}

pub(crate) async fn start(&mut self, data_manager: Arc<DataManager>) -> Result<()> {
let (version, batch_metadata) = self.recover().await?;

let mut file_store_operator = FileStoreOperatorV2::new(
MAX_SIZE_PER_FILE,
NUM_TXNS_PER_FOLDER,
version,
batch_metadata,
)
.await;
tokio_scoped::scope(|s| {
Contributor: it looks like tokio_scoped is no longer maintained; maybe async_scoped?
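For illustration, a minimal sketch of the suggested alternative, assuming the async-scoped crate with its Tokio feature enabled; this is not part of the PR, and the function name is made up:

```rust
// Sketch only: assumes async-scoped with the "use-tokio" feature.
use async_scoped::TokioScope;

fn run_workers() {
    // Like tokio_scoped::scope, scope_and_block drives every task
    // spawned inside the scope to completion before returning. It uses
    // block_in_place internally, so it must be called from within a
    // multi-threaded Tokio runtime.
    TokioScope::scope_and_block(|s| {
        s.spawn(async {
            // The producer/consumer loops from start() would go here.
        });
    });
}
```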

let (tx, mut rx) = channel(5);
s.spawn(async move {
while let Some((transactions, batch_metadata, end_batch)) = rx.recv().await {
self.do_upload(transactions, batch_metadata, end_batch)
.await
.unwrap();
}
});
s.spawn(async move {
loop {
let transactions = data_manager
.get_transactions_from_cache(
file_store_operator.version(),
MAX_SIZE_PER_FILE,
/*update_file_store_version=*/ true,
)
.await;
let len = transactions.len();
for transaction in transactions {
file_store_operator
.buffer_and_maybe_dump_transactions_to_file(transaction, tx.clone())
.await
.unwrap();
}
if len == 0 {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
});
});

Ok(())
}
Comment on lines +108 to +151

Graphite Reviewer: The start() method spawns infinite tasks within tokio_scoped::scope but returns immediately, which causes the tasks to be terminated prematurely. Since these tasks need to run continuously, they should either:

  1. Use tokio::spawn instead of scoped tasks, or
  2. Have the method wait for a shutdown signal before returning

This will ensure the background processing continues running as intended rather than being cleaned up when the scope exits.
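A minimal sketch of the second option the reviewer describes, assuming a tokio watch channel as the shutdown signal; the names are illustrative and not part of the PR:

```rust
// Sketch only: an unscoped task plus an explicit shutdown signal.
use tokio::sync::watch;

async fn start_with_shutdown(mut shutdown_rx: watch::Receiver<bool>) {
    // The upload loop runs detached instead of inside a scope.
    let worker = tokio::spawn(async move {
        loop {
            // The do_upload() driving logic would go here.
            tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        }
    });

    // Park until shutdown is signalled, then stop the worker.
    let _ = shutdown_rx.changed().await;
    worker.abort();
}
```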


async fn do_upload(
&mut self,
transactions: Vec<Transaction>,
batch_metadata: BatchMetadata,
end_batch: bool,
) -> Result<()> {
let first_version = transactions.first().unwrap().version;
let last_version = transactions.last().unwrap().version;
let data_file =
FileEntry::from_transactions(transactions, StorageFormat::Lz4CompressedProto);
let path = self.reader.get_path_for_version(first_version);

info!("Dumping transactions [{first_version}, {last_version}] to file {path:?}.");

self.writer
.save_raw_file(path, data_file.into_inner())
.await?;

let mut update_batch_metadata = false;
let max_update_frequency = self.writer.max_update_frequency();
if self.last_batch_metadata_update_time.is_none()
|| Instant::now() - self.last_batch_metadata_update_time.unwrap()
>= max_update_frequency
{
update_batch_metadata = true;
} else if end_batch {
update_batch_metadata = true;
tokio::time::sleep_until(
self.last_batch_metadata_update_time.unwrap() + max_update_frequency,
)
.await;
}

if !update_batch_metadata {
return Ok(());
}

let batch_metadata_path = self.reader.get_path_for_batch_metadata(first_version);
self.writer
.save_raw_file(
batch_metadata_path,
serde_json::to_vec(&batch_metadata).map_err(anyhow::Error::msg)?,
)
.await?;

if end_batch {
self.last_batch_metadata_update_time = None;
} else {
self.last_batch_metadata_update_time = Some(Instant::now());
}

if Instant::now() - self.last_metadata_update_time >= max_update_frequency {
self.update_file_store_metadata(last_version + 1).await?;
self.last_metadata_update_time = Instant::now();
}

Ok(())
}

/// Updates the file store metadata.
async fn update_file_store_metadata(&self, version: u64) -> Result<()> {
let metadata = FileStoreMetadata {
chain_id: self.chain_id,
num_transactions_per_folder: NUM_TXNS_PER_FOLDER,
version,
};

let raw_data = serde_json::to_vec(&metadata).map_err(anyhow::Error::msg)?;
self.writer
.save_raw_file(PathBuf::from(METADATA_FILE_NAME), raw_data)
.await
}
}
33 changes: 33 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/grpc_manager.rs
@@ -4,12 +4,14 @@
use crate::{
config::{IndexerGrpcManagerConfig, ServiceConfig},
data_manager::DataManager,
file_store_uploader::FileStoreUploader,
metadata_manager::MetadataManager,
service::GrpcManagerService,
};
use anyhow::Result;
use aptos_protos::indexer::v1::grpc_manager_server::GrpcManagerServer;
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tonic::{codec::CompressionEncoding, transport::Server};
use tracing::info;

@@ -18,13 +20,30 @@ const HTTP2_PING_TIMEOUT_DURATION: Duration = Duration::from_secs(10);

pub(crate) struct GrpcManager {
chain_id: u64,
file_store_uploader: Mutex<FileStoreUploader>,
metadata_manager: Arc<MetadataManager>,
data_manager: Arc<DataManager>,
is_master: bool,
}

impl GrpcManager {
pub(crate) async fn new(config: &IndexerGrpcManagerConfig) -> Self {
let chain_id = config.chain_id;
let file_store_uploader = Mutex::new(
FileStoreUploader::new(chain_id, config.file_store_config.clone())
.await
.unwrap_or_else(|e| {
panic!(
"Failed to create filestore uploader, config: {:?}, error: {e:?}",
config.file_store_config
)
}),
);

info!(
chain_id = chain_id,
"FilestoreUploader is created, config: {:?}.", config.file_store_config
);

let metadata_manager = Arc::new(MetadataManager::new(
chain_id,
@@ -54,8 +73,10 @@ impl GrpcManager {

Self {
chain_id,
file_store_uploader,
metadata_manager,
data_manager,
is_master: config.is_master,
}
}

@@ -77,6 +98,18 @@ impl GrpcManager {
self.metadata_manager.start().await.unwrap();
});
s.spawn(async move { self.data_manager.start().await });
if self.is_master {
s.spawn(async move {
self.file_store_uploader
.lock()
.await
.start(self.data_manager.clone())
.await
.unwrap();
});
} else {
// TODO(grao): Start a task to periodically update the file store version.
}
s.spawn(async move {
info!("Starting GrpcManager at {}.", service_config.listen_address);
server.serve(service_config.listen_address).await.unwrap();
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/lib.rs
@@ -3,6 +3,7 @@

pub mod config;
mod data_manager;
mod file_store_uploader;
mod grpc_manager;
mod metadata_manager;
mod service;
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-manager/src/test.rs
@@ -39,6 +39,7 @@ async fn test_run() {
self_advertised_address: self_address.clone(),
grpc_manager_addresses: vec![self_address],
fullnode_addresses: vec![],
is_master: true,
};

let task = tokio::spawn(async move {