Pass instant from aptosdb for calculating latency metric #15678

Merged: 1 commit, Jan 24, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

(Generated file; diff not shown.)

2 changes: 1 addition & 1 deletion api/src/tests/transactions_test.rs
@@ -496,7 +496,7 @@ async fn test_get_transaction_by_hash() {
async fn test_get_transaction_by_hash_with_delayed_internal_indexer() {
let mut context = new_test_context_with_sharding_and_delayed_internal_indexer(
current_function_name!(),
Some(1),
Some(2),
);

let mut account = context.gen_account();
10 changes: 8 additions & 2 deletions api/test-context/src/test_context.rs
@@ -56,7 +56,13 @@ use bytes::Bytes;
use hyper::{HeaderMap, Response};
use rand::SeedableRng;
use serde_json::{json, Value};
use std::{boxed::Box, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{
boxed::Box,
net::SocketAddr,
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::watch::channel;
use warp::{http::header::CONTENT_TYPE, Filter, Rejection, Reply};
use warp_reverse_proxy::reverse_proxy_filter;
@@ -132,7 +138,7 @@ pub fn new_test_context_inner(
let (root_key, genesis, genesis_waypoint, validators) = builder.build(&mut rng).unwrap();
let (validator_identity, _, _, _) = validators[0].get_key_objects(None).unwrap();
let validator_owner = validator_identity.account_address.unwrap();
let (sender, recver) = channel::<Version>(0);
let (sender, recver) = channel::<(Instant, Version)>((Instant::now(), 0 as Version));
let (db, db_rw) = if use_db_with_indexer {
let mut aptos_db = AptosDB::new_for_test_with_indexer(
&tmp_dir,
6 changes: 4 additions & 2 deletions aptos-node/src/services.rs
@@ -32,7 +32,9 @@ use aptos_peer_monitoring_service_server::{
use aptos_peer_monitoring_service_types::PeerMonitoringServiceMessage;
use aptos_storage_interface::{DbReader, DbReaderWriter};
use aptos_time_service::TimeService;
use aptos_types::{chain_id::ChainId, indexer::indexer_db_reader::IndexerReader};
use aptos_types::{
chain_id::ChainId, indexer::indexer_db_reader::IndexerReader, transaction::Version,
};
use aptos_validator_transaction_pool::VTxnPoolState;
use futures::channel::{mpsc, mpsc::Sender, oneshot};
use std::{sync::Arc, time::Instant};
@@ -51,7 +53,7 @@ pub fn bootstrap_api_and_indexer(
db_rw: DbReaderWriter,
chain_id: ChainId,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<u64>>,
update_receiver: Option<WatchReceiver<(Instant, Version)>>,
api_port_tx: Option<oneshot::Sender<u16>>,
indexer_grpc_port_tx: Option<oneshot::Sender<u16>>,
) -> anyhow::Result<(
7 changes: 3 additions & 4 deletions aptos-node/src/storage.rs
@@ -20,7 +20,6 @@ use tokio::{
runtime::Runtime,
sync::watch::{channel, Receiver as WatchReceiver},
};

pub(crate) fn maybe_apply_genesis(
db_rw: &DbReaderWriter,
node_config: &NodeConfig,
@@ -51,11 +50,11 @@ pub(crate) fn bootstrap_db(
DbReaderWriter,
Option<Runtime>,
Option<InternalIndexerDB>,
Option<WatchReceiver<u64>>,
Option<WatchReceiver<(Instant, Version)>>,
)> {
let internal_indexer_db = InternalIndexerDBService::get_indexer_db(node_config);
let (update_sender, update_receiver) = if internal_indexer_db.is_some() {
let (sender, receiver) = channel::<u64>(0);
let (sender, receiver) = channel::<(Instant, Version)>((Instant::now(), 0 as Version));
(Some(sender), Some(receiver))
} else {
(None, None)
@@ -177,7 +176,7 @@ pub fn initialize_database_and_checkpoints(
Option<Runtime>,
Waypoint,
Option<InternalIndexerDB>,
Option<WatchReceiver<Version>>,
Option<WatchReceiver<(Instant, Version)>>,
)> {
// If required, create RocksDB checkpoints and change the working directory.
// This is test-only.
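For context, the watch channel created in bootstrap_db above now carries a (commit Instant, Version) pair instead of a bare version number. A minimal, self-contained sketch of this pattern, not the PR's code: the Version alias and the task setup below are assumptions for illustration.

// Sketch: the producer stamps the commit time, the consumer measures the lag.
use std::time::{Duration, Instant};
use tokio::sync::watch;

type Version = u64; // stand-in for aptos_types::transaction::Version

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel::<(Instant, Version)>((Instant::now(), 0));

    // Writer side: in AptosDB this send happens right after a commit.
    let writer = tokio::spawn(async move {
        for version in 1..=3u64 {
            let _ = tx.send((Instant::now(), version));
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        // Dropping `tx` here makes `rx.changed()` return Err, ending the reader.
    });

    // Reader side: wake on updates (watch keeps only the latest value)
    // and compute commit-to-observation latency.
    while rx.changed().await.is_ok() {
        let (committed_at, version) = *rx.borrow();
        println!("version {version}: {} ms behind", committed_at.elapsed().as_millis());
    }

    let _ = writer.await;
}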
2 changes: 2 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/Cargo.toml
@@ -21,6 +21,7 @@ aptos-indexer-grpc-fullnode = { workspace = true }
aptos-indexer-grpc-utils = { workspace = true }
aptos-logger = { workspace = true }
aptos-mempool = { workspace = true }
aptos-metrics-core = { workspace = true }
aptos-runtimes = { workspace = true }
aptos-storage-interface = { workspace = true }
aptos-types = { workspace = true }
@@ -29,6 +30,7 @@ futures = { workspace = true }
google-cloud-storage = { workspace = true }
hyper = { workspace = true }
itertools = { workspace = true }
once_cell = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tar = { workspace = true }
ecosystem/indexer-grpc/indexer-grpc-table-info/src/internal_indexer_db_service.rs
@@ -1,6 +1,7 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::metrics::INDEXER_DB_LATENCY;
use anyhow::Result;
use aptos_config::config::{internal_indexer_db_config::InternalIndexerDBConfig, NodeConfig};
use aptos_db_indexer::{
@@ -9,13 +10,12 @@ use aptos_db_indexer::{
indexer_reader::IndexerReaders,
};
use aptos_indexer_grpc_utils::counters::{log_grpc_step, IndexerGrpcStep};
use aptos_logger::info;
use aptos_storage_interface::DbReader;
use aptos_types::{indexer::indexer_db_reader::IndexerReader, transaction::Version};
use std::{
path::{Path, PathBuf},
sync::Arc,
time::Duration,
time::Instant,
};
use tokio::{runtime::Handle, sync::watch::Receiver as WatchReceiver};

@@ -24,14 +24,14 @@ const INTERNAL_INDEXER_DB: &str = "internal_indexer_db";

pub struct InternalIndexerDBService {
pub db_indexer: Arc<DBIndexer>,
pub update_receiver: WatchReceiver<Version>,
pub update_receiver: WatchReceiver<(Instant, Version)>,
}

impl InternalIndexerDBService {
pub fn new(
db_reader: Arc<dyn DbReader>,
internal_indexer_db: InternalIndexerDB,
update_receiver: WatchReceiver<Version>,
update_receiver: WatchReceiver<(Instant, Version)>,
) -> Self {
let internal_db_indexer = Arc::new(DBIndexer::new(internal_indexer_db, db_reader));
Self {
@@ -166,31 +166,30 @@ impl InternalIndexerDBService {

pub async fn run(&mut self, node_config: &NodeConfig) -> Result<()> {
let mut start_version = self.get_start_version(node_config).await?;
let mut target_version = self.db_indexer.main_db_reader.ensure_synced_version()?;
Contributor: this will error out with an empty db (before genesis is put in), is that intentional?
Contributor Author: yes, the internal indexer is supposed to start only after the main db has been bootstrapped.
Contributor: okay.

let mut step_timer = std::time::Instant::now();

loop {
let start_time: std::time::Instant = std::time::Instant::now();
let next_version = self.db_indexer.process_a_batch(start_version)?;

if next_version == start_version {
if let Ok(recv_res) =
tokio::time::timeout(Duration::from_millis(100), self.update_receiver.changed())
.await
{
if recv_res.is_err() {
info!("update sender is dropped");
return Ok(());
}
if target_version <= start_version {
match self.update_receiver.changed().await {
Ok(_) => {
(step_timer, target_version) = *self.update_receiver.borrow();
},
Err(e) => {
panic!("Failed to get update from update_receiver: {}", e);
},
Comment on lines +178 to +180
Contributor: This is how this thread knows the main db has quit, right? In that case we should return Ok instead of panicking here?
And I realized that even in the previous logic, this thread doesn't get a chance to quit until it hits the target version. That can be an issue the first time the indexer is enabled. (Granted, we usually quit the main db by killing the whole process, but it would be better to handle this case gracefully, if it's not too complicated.)
Contributor Author:
1. The DBIndexer holds an Arc of the main db, so I expect this error should never be caused by the main db quitting (the main db quits after the db indexer is dropped). Thus, I think it should panic if this unexpected error occurs.
2. If the process is killed while the indexer is in the middle of processing a large amount of data, the thread is terminated the same way as other components reading the main db, e.g. the API reading transactions through the storage interface when the node is killed.
Contributor: Ugh, in that case the indexer loop never quits? That should be an issue in tests?
Shall we implement graceful quitting (separately)? @grao1991?
Contributor: I don't disagree.
Contributor Author (areshand, Jan 15, 2025): there is a method called "run_with_end_version" for testing.

}
continue;
};
}
let next_version = self.db_indexer.process(start_version, target_version)?;
INDEXER_DB_LATENCY.set(step_timer.elapsed().as_millis() as i64);
log_grpc_step(
SERVICE_TYPE,
IndexerGrpcStep::InternalIndexerDBProcessed,
Some(start_version as i64),
Some(next_version as i64),
None,
None,
Some(start_time.elapsed().as_secs_f64()),
Some(step_timer.elapsed().as_secs_f64()),
Contributor: Same issue as yesterday: before it's caught up, you already log the latency, which is not accurate?
Contributor Author: Each loop is now blocked on a notification of a write to the main db, whereas previously each loop only processed one batch of the pending updates, so this should reflect the latency.
Contributor: I see.

None,
Some((next_version - start_version) as i64),
None,
@@ -205,18 +204,14 @@ impl InternalIndexerDBService {
node_config: &NodeConfig,
end_version: Option<Version>,
) -> Result<()> {
let mut start_version = self.get_start_version(node_config).await?;
while start_version <= end_version.unwrap_or(std::u64::MAX) {
let next_version = self.db_indexer.process_a_batch(start_version)?;
if next_version == start_version {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
continue;
}
start_version = next_version;
let start_version = self.get_start_version(node_config).await?;
let end_version = end_version.unwrap_or(std::u64::MAX);
let mut next_version = start_version;
while next_version < end_version {
next_version = self.db_indexer.process(start_version, end_version)?;
// We shouldn't stop the internal indexer so that internal indexer can catch up with the main DB
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
// We should never stop the internal indexer
tokio::time::sleep(std::time::Duration::from_secs(100)).await;

Ok(())
}
}
@@ -230,7 +225,7 @@ impl MockInternalIndexerDBService {
pub fn new_for_test(
db_reader: Arc<dyn DbReader>,
node_config: &NodeConfig,
update_receiver: WatchReceiver<Version>,
update_receiver: WatchReceiver<(Instant, Version)>,
end_version: Option<Version>,
) -> Self {
if !node_config
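On the graceful-shutdown question raised in the review thread above, a hypothetical variant of the catch-up loop that returns Ok(()) when the sender is dropped instead of panicking. This is a sketch under assumed types (DbIndexerLike stands in for the real DBIndexer, anyhow is assumed for error handling); it is not what the PR merges.

use std::time::Instant;
use tokio::sync::watch::Receiver;

type Version = u64;

// Stand-in for the subset of the DBIndexer API used by the loop.
trait DbIndexerLike {
    fn process(&self, start: Version, end: Version) -> anyhow::Result<Version>;
}

async fn run_until_sender_dropped(
    indexer: &impl DbIndexerLike,
    mut update_receiver: Receiver<(Instant, Version)>,
    mut start_version: Version,
) -> anyhow::Result<()> {
    let (mut step_timer, mut target_version) = *update_receiver.borrow();
    loop {
        if target_version <= start_version {
            // Exit cleanly instead of panicking once the main DB drops its Sender.
            if update_receiver.changed().await.is_err() {
                return Ok(());
            }
            (step_timer, target_version) = *update_receiver.borrow();
            continue;
        }
        start_version = indexer.process(start_version, target_version)?;
        // INDEXER_DB_LATENCY.set(step_timer.elapsed().as_millis() as i64) would go here.
        let _latency_ms = step_timer.elapsed().as_millis();
    }
}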
1 change: 1 addition & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/lib.rs
@@ -3,6 +3,7 @@

pub mod backup_restore;
pub mod internal_indexer_db_service;
pub mod metrics;
pub mod runtime;
pub mod table_info_service;

13 changes: 13 additions & 0 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/metrics.rs
@@ -0,0 +1,13 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use aptos_metrics_core::{register_int_gauge, IntGauge};
use once_cell::sync::Lazy;

pub static INDEXER_DB_LATENCY: Lazy<IntGauge> = Lazy::new(|| {
register_int_gauge!(
"aptos_internal_indexer_latency",
"The latency between main db update and data written to indexer db"
)
.unwrap()
});
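For reference, this gauge is fed from the Instant that travels over the watch channel. The sketch below restates the gauge definition from the diff and adds a hypothetical helper showing the set call used in the service loop:

use aptos_metrics_core::{register_int_gauge, IntGauge};
use once_cell::sync::Lazy;
use std::time::Instant;

pub static INDEXER_DB_LATENCY: Lazy<IntGauge> = Lazy::new(|| {
    register_int_gauge!(
        "aptos_internal_indexer_latency",
        "The latency between main db update and data written to indexer db"
    )
    .unwrap()
});

// Hypothetical helper: called once per catch-up iteration with the Instant
// that the main DB stamped at commit time.
pub fn record_indexer_lag(committed_at: Instant) {
    // An IntGauge stores an absolute value, so each call overwrites the last reading.
    INDEXER_DB_LATENCY.set(committed_at.elapsed().as_millis() as i64);
}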
4 changes: 2 additions & 2 deletions ecosystem/indexer-grpc/indexer-grpc-table-info/src/runtime.rs
@@ -15,7 +15,7 @@ use aptos_db_indexer::{
use aptos_mempool::MempoolClientSender;
use aptos_storage_interface::DbReaderWriter;
use aptos_types::{chain_id::ChainId, transaction::Version};
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tokio::{runtime::Runtime, sync::watch::Receiver as WatchReceiver};

const INDEX_ASYNC_V2_DB_NAME: &str = "index_indexer_async_v2_db";
@@ -24,7 +24,7 @@ pub fn bootstrap_internal_indexer_db(
config: &NodeConfig,
db_rw: DbReaderWriter,
internal_indexer_db: Option<InternalIndexerDB>,
update_receiver: Option<WatchReceiver<Version>>,
update_receiver: Option<WatchReceiver<(Instant, Version)>>,
) -> Option<(Runtime, Arc<DBIndexer>)> {
if !config.indexer_db_config.is_internal_indexer_db_enabled() || internal_indexer_db.is_none() {
return None;
8 changes: 4 additions & 4 deletions execution/executor/tests/internal_indexer_test.rs
@@ -152,10 +152,10 @@ fn test_db_indexer_data() {
// assert the data matches the expected data
let version = internal_indexer_db.get_persisted_version().unwrap();
assert_eq!(version, None);
let mut start_version = version.map_or(0, |v| v + 1);
while start_version < total_version {
start_version = db_indexer.process_a_batch(start_version).unwrap();
}
let start_version = version.map_or(0, |v| v + 1);
db_indexer
.process_a_batch(start_version, total_version)
.unwrap();
// wait for the commit to finish
thread::sleep(Duration::from_millis(100));
// indexer has process all the transactions
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/include/aptosdb_writer.rs
@@ -593,7 +593,7 @@ impl AptosDB {
LATEST_TXN_VERSION.set(version as i64);
if let Some(update_sender) = &self.update_subscriber {
update_sender.send(
version
(Instant::now(), version)
).map_err(| err | {
AptosDbError::Other(format!("Failed to send update to subscriber: {}", err))
})?;
7 changes: 5 additions & 2 deletions storage/aptosdb/src/db/mod.rs
@@ -100,7 +100,7 @@ pub struct AptosDB {
commit_lock: std::sync::Mutex<()>,
indexer: Option<Indexer>,
skip_index_and_usage: bool,
update_subscriber: Option<Sender<Version>>,
update_subscriber: Option<Sender<(Instant, Version)>>,
}

// DbReader implementations and private functions used by them.
@@ -186,7 +186,10 @@ impl AptosDB {
Ok((ledger_db, state_merkle_db, state_kv_db))
}

pub fn add_version_update_subscriber(&mut self, sender: Sender<Version>) -> Result<()> {
pub fn add_version_update_subscriber(
&mut self,
sender: Sender<(Instant, Version)>,
) -> Result<()> {
self.update_subscriber = Some(sender);
Ok(())
}
5 changes: 2 additions & 3 deletions storage/aptosdb/src/fast_sync_storage_wrapper.rs
@@ -16,9 +16,8 @@ use aptos_types::{
transaction::{TransactionOutputListWithProof, Version},
};
use either::Either;
use std::sync::Arc;
use std::{sync::Arc, time::Instant};
use tokio::sync::watch::Sender;

pub const SECONDARY_DB_DIR: &str = "fast_sync_secondary";

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
@@ -44,7 +43,7 @@ impl FastSyncStorageWrapper {
pub fn initialize_dbs(
config: &NodeConfig,
internal_indexer_db: Option<InternalIndexerDB>,
update_sender: Option<Sender<Version>>,
update_sender: Option<Sender<(Instant, Version)>>,
) -> Result<Either<AptosDB, Self>> {
let mut db_main = AptosDB::open(
config.storage.get_dir_paths(),
25 changes: 20 additions & 5 deletions storage/indexer/src/db_indexer.rs
@@ -378,8 +378,8 @@ impl DBIndexer {
Ok(zipped)
}

fn get_num_of_transactions(&self, version: Version) -> Result<u64> {
let highest_version = self.main_db_reader.ensure_synced_version()?;
fn get_num_of_transactions(&self, version: Version, end_version: Version) -> Result<u64> {
let highest_version = min(self.main_db_reader.ensure_synced_version()?, end_version);
if version > highest_version {
// In case main db is not synced yet or recreated
return Ok(0);
@@ -392,10 +392,25 @@ impl DBIndexer {
Ok(num_of_transaction)
}

pub fn process_a_batch(&self, start_version: Version) -> Result<Version> {
let _timer = TIMER.with_label_values(&["process_a_batch"]).start_timer();
/// Process all transactions from `start_version` to `end_version`. Left inclusive, right exclusive.
pub fn process(&self, start_version: Version, end_version: Version) -> Result<Version> {
let mut version = start_version;
let num_transactions = self.get_num_of_transactions(version)?;
while version < end_version {
let next_version = self.process_a_batch(version, end_version)?;
if next_version == version {
break;
}
version = next_version;
}
Ok(version)
}

/// Process a batch of transactions that is within the range of `start_version` to `end_version`. Left inclusive, right exclusive.
pub fn process_a_batch(&self, start_version: Version, end_version: Version) -> Result<Version> {
let _timer: aptos_metrics_core::HistogramTimer =
TIMER.with_label_values(&["process_a_batch"]).start_timer();
let mut version = start_version;
let num_transactions = self.get_num_of_transactions(version, end_version)?;
// This promises num_transactions should be readable from main db
let mut db_iter = self.get_main_db_iter(version, num_transactions)?;
let mut batch = SchemaBatch::new();
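As a closing illustration of the half-open contract documented on process above (versions in [start_version, end_version) are indexed, the right bound excluded), here is a toy driver, not the real batching logic:

// Toy model of a left-inclusive, right-exclusive version walk.
fn process_range(start_version: u64, end_version: u64, mut index_one: impl FnMut(u64)) -> u64 {
    let mut version = start_version;
    while version < end_version {
        index_one(version);
        version += 1;
    }
    version // next version to resume from (== end_version when fully caught up)
}

fn main() {
    let mut seen = Vec::new();
    let next = process_range(5, 8, |v| seen.push(v));
    assert_eq!(seen, vec![5, 6, 7]); // 8 is excluded: right-exclusive bound
    assert_eq!(next, 8);
}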