Commit b62431c

[Indexer-Grpc-V2] Several fixes/improvements.

1 parent 5359cde · commit b62431c

15 files changed: +993 -620 lines changed

ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/historical_data_service.rs
+57 -23

@@ -7,7 +7,7 @@ use crate::{
     metrics::{COUNTER, TIMER},
 };
 use aptos_indexer_grpc_utils::file_store_operator_v2::file_store_reader::FileStoreReader;
-use aptos_protos::indexer::v1::{GetTransactionsRequest, TransactionsResponse};
+use aptos_protos::indexer::v1::{GetTransactionsRequest, ProcessedRange, TransactionsResponse};
 use aptos_transaction_filter::BooleanTransactionFilter;
 use futures::executor::block_on;
 use std::{
@@ -167,14 +167,21 @@ impl HistoricalDataService {
                 /*retries=*/ 3,
                 /*max_files=*/ None,
                 filter,
+                Some(ending_version),
                 tx,
             )
             .await;
         });
 
         let mut close_to_latest = false;
-        while let Some((transactions, batch_size_bytes, timestamp)) = rx.recv().await {
-            next_version += transactions.len() as u64;
+        while let Some((
+            transactions,
+            batch_size_bytes,
+            timestamp,
+            (first_processed_version, last_processed_version),
+        )) = rx.recv().await
+        {
+            next_version = last_processed_version + 1;
             size_bytes += batch_size_bytes as u64;
             let timestamp_since_epoch =
                 Duration::new(timestamp.seconds as u64, timestamp.nanos as u32);
@@ -186,28 +193,55 @@ impl HistoricalDataService {
                 close_to_latest = true;
             }
 
-            if !transactions.is_empty() {
-                let responses =
-                    transactions
-                        .chunks(max_num_transactions_per_batch)
-                        .map(|chunk| TransactionsResponse {
+            let responses = if !transactions.is_empty() {
+                let mut current_version = first_processed_version;
+                let mut responses: Vec<_> = transactions
+                    .chunks(max_num_transactions_per_batch)
+                    .map(|chunk| {
+                        let first_version = current_version;
+                        let last_version = chunk.last().unwrap().version;
+                        current_version = last_version + 1;
+                        TransactionsResponse {
                             transactions: chunk.to_vec(),
                             chain_id: Some(self.chain_id),
-                        });
-                for response in responses {
-                    let _timer = TIMER
-                        .with_label_values(&["historical_data_service_send_batch"])
-                        .start_timer();
-                    if response_sender.send(Ok(response)).await.is_err() {
-                        // NOTE: We are not recalculating the version and size_bytes for the stream
-                        // progress since nobody cares about the accurate if client has dropped the
-                        // connection.
-                        info!(stream_id = id, "Client dropped.");
-                        COUNTER
-                            .with_label_values(&["historical_data_service_client_dropped"])
-                            .inc();
-                        break 'out;
-                    }
+                            processed_range: Some(ProcessedRange {
+                                first_version,
+                                last_version,
+                            }),
+                        }
+                    })
+                    .collect();
+                responses
+                    .last_mut()
+                    .unwrap()
+                    .processed_range
+                    .unwrap()
+                    .last_version = last_processed_version;
+                responses
+            } else {
+                vec![TransactionsResponse {
+                    transactions: vec![],
+                    chain_id: Some(self.chain_id),
+                    processed_range: Some(ProcessedRange {
+                        first_version: first_processed_version,
+                        last_version: last_processed_version,
+                    }),
+                }]
+            };
+
+            for response in responses {
+                let _timer = TIMER
+                    .with_label_values(&["historical_data_service_send_batch"])
+                    .start_timer();
+                if response_sender.send(Ok(response)).await.is_err() {
+                    // NOTE: We are not recalculating the version and size_bytes for the stream
+                    // progress since nobody cares about the accurate if client has dropped the
+                    // connection.
+                    info!(stream_id = id, "Client dropped.");
+                    COUNTER
+                        .with_label_values(&["historical_data_service_client_dropped"])
+                        .inc();
+                    break 'out;
                 }
             }
         }

ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/live_data_service/in_memory_cache.rs
+2 -2

@@ -44,7 +44,7 @@ impl<'a> InMemoryCache<'a> {
         max_num_transactions_per_batch: usize,
         max_bytes_per_batch: usize,
         filter: &Option<BooleanTransactionFilter>,
-    ) -> Option<(Vec<Transaction>, usize)> {
+    ) -> Option<(Vec<Transaction>, usize, u64)> {
         let _timer = TIMER.with_label_values(&["cache_get_data"]).start_timer();
 
         while starting_version >= self.data_manager.read().await.end_version {
@@ -97,7 +97,7 @@ impl<'a> InMemoryCache<'a> {
                 }
             }
             trace!("Data was sent from cache, last version: {}.", version - 1);
-            return Some((result, total_bytes));
+            return Some((result, total_bytes, version - 1));
         }
     }
 }

ecosystem/indexer-grpc/indexer-grpc-data-service-v2/src/live_data_service/mod.rs
+8 -4

@@ -12,7 +12,7 @@ use crate::{
     live_data_service::in_memory_cache::InMemoryCache,
     metrics::{COUNTER, TIMER},
 };
-use aptos_protos::indexer::v1::{GetTransactionsRequest, TransactionsResponse};
+use aptos_protos::indexer::v1::{GetTransactionsRequest, ProcessedRange, TransactionsResponse};
 use aptos_transaction_filter::BooleanTransactionFilter;
 use std::{sync::Arc, time::Duration};
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -175,7 +175,7 @@ impl<'a> LiveDataService<'a> {
                 continue;
             }
 
-            if let Some((transactions, batch_size_bytes)) = self
+            if let Some((transactions, batch_size_bytes, last_processed_version)) = self
                 .in_memory_cache
                 .get_data(
                     next_version,
@@ -189,12 +189,16 @@ impl<'a> LiveDataService<'a> {
                 let _timer = TIMER
                     .with_label_values(&["live_data_service_send_batch"])
                     .start_timer();
-                next_version += transactions.len() as u64;
-                size_bytes += batch_size_bytes as u64;
                 let response = TransactionsResponse {
                     transactions,
                     chain_id: Some(self.chain_id),
+                    processed_range: Some(ProcessedRange {
+                        first_version: next_version,
+                        last_version: last_processed_version,
+                    }),
                 };
+                next_version = last_processed_version + 1;
+                size_bytes += batch_size_bytes as u64;
                 if response_sender.send(Ok(response)).await.is_err() {
                     info!(stream_id = id, "Client dropped.");
                     COUNTER

ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs
+1

@@ -699,6 +699,7 @@ fn get_transactions_responses_builder(
         .map(|chunk| TransactionsResponse {
             chain_id: Some(chain_id as u64),
             transactions: chunk,
+            processed_range: None,
         })
         .collect();
     (responses, num_stripped)

ecosystem/indexer-grpc/indexer-grpc-fullnode/src/localnet_data_service.rs
+1

@@ -105,6 +105,7 @@ impl RawData for LocalnetDataService {
                     },
                     _ => panic!("Unexpected response type."),
                 },
+                processed_range: None,
             });
             match external_service_tx.send(response).await {
                 Ok(_) => {},

ecosystem/indexer-grpc/indexer-grpc-manager/src/data_manager.rs
+4 -3

@@ -296,14 +296,15 @@ impl DataManager {
                 /*retries=*/ 3,
                 /*max_files=*/ Some(1),
                 /*filter=*/ None,
+                /*ending_version=*/ None,
                 tx,
             )
             .await;
 
-        if let Some((transactions, _, _)) = rx.recv().await {
+        if let Some((transactions, _, _, range)) = rx.recv().await {
             debug!(
-                "Transactions returned from filestore: [{start_version}, {}).",
-                transactions.last().unwrap().version
+                "Transactions returned from filestore: [{}, {}].",
+                range.0, range.1
             );
             let first_version = transactions.first().unwrap().version;
             ensure!(

ecosystem/indexer-grpc/indexer-grpc-manager/src/service.rs
+2

@@ -140,6 +140,8 @@ impl GrpcManager for GrpcManagerService {
         Ok(Response::new(TransactionsResponse {
             transactions,
             chain_id: Some(self.chain_id),
+            // Not used.
+            processed_range: None,
         }))
     }

ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator_v2/file_store_reader.rs
+10 -2

@@ -76,7 +76,8 @@ impl FileStoreReader {
         retries: u8,
         max_files: Option<usize>,
         filter: Option<BooleanTransactionFilter>,
-        tx: Sender<(Vec<Transaction>, usize, Timestamp)>,
+        ending_version: Option<u64>,
+        tx: Sender<(Vec<Transaction>, usize, Timestamp, (u64, u64))>,
     ) {
         trace!(
             "Getting transactions from file store, version: {version}, max_files: {max_files:?}."
@@ -118,13 +119,20 @@ impl FileStoreReader {
             if num_to_skip > 0 {
                 transactions = transactions.split_off(num_to_skip);
             }
+            let processed_range = (
+                transactions.first().unwrap().version,
+                transactions.last().unwrap().version,
+            );
+            if let Some(ending_version) = ending_version {
+                transactions.retain(|t| t.version < ending_version);
+            }
             if let Some(ref filter) = filter {
                 transactions.retain(|t| filter.matches(t));
             }
             let size_bytes = transactions.iter().map(|t| t.encoded_len()).sum();
             trace!("Got {} transactions from file store to send, size: {size_bytes}, first_version: {:?}", transactions.len(), transactions.first().map(|t| t.version));
             if tx
-                .send((transactions, size_bytes, timestamp))
+                .send((transactions, size_bytes, timestamp, processed_range))
                 .await
                 .is_err()
             {
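Note the ordering above: processed_range is captured from the raw batch before the ending_version and filter trims, so the receiver learns the full span that was read even when trailing or filtered transactions are dropped. A small illustrative sketch of that ordering, using plain u64 versions and an even-number check as stand-ins for the real Transaction and BooleanTransactionFilter types:

    // Illustrative only: capture the processed range before trimming, as the
    // reader above does, so the caller still learns the full scanned span.
    fn trim(mut versions: Vec<u64>, ending_version: Option<u64>) -> (Vec<u64>, (u64, u64)) {
        // Range of what was read from the file store, before any trimming.
        let processed_range = (*versions.first().unwrap(), *versions.last().unwrap());
        if let Some(ending_version) = ending_version {
            versions.retain(|v| *v < ending_version);
        }
        // Stand-in for the transaction filter.
        versions.retain(|v| *v % 2 == 0);
        (versions, processed_range)
    }

    fn main() {
        let (kept, range) = trim((100..110).collect(), Some(108));
        assert_eq!(kept, vec![100, 102, 104, 106]);
        // The scanned range is still reported as [100, 109].
        assert_eq!(range, (100, 109));
    }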

ecosystem/indexer-grpc/transaction-filter/src/traits.rs
+1 -1

@@ -47,7 +47,7 @@ where
 
     #[inline]
     fn matches_vec(&self, items: &[T]) -> bool {
-        items.iter().all(|item| self.matches(item))
+        items.iter().any(|item| self.matches(item))
     }
 
     #[inline]
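This flips matches_vec from requiring every item to match to requiring at least one match. A tiny sketch of the semantic difference, with an even-number predicate standing in for a real filter:

    // Illustrative only: old all-items vs. new any-item semantics for a batch.
    fn matches(item: &u64) -> bool {
        *item % 2 == 0
    }

    fn main() {
        let items = [1u64, 2, 3];
        // Old behavior: true only if every item matches (false here).
        assert!(!items.iter().all(matches));
        // New behavior: true if at least one item matches (true here).
        assert!(items.iter().any(matches));
    }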

protos/proto/aptos/indexer/v1/raw_data.proto
+7

@@ -32,13 +32,20 @@ message GetTransactionsRequest {
   optional BooleanTransactionFilter transaction_filter = 4;
 }
 
+message ProcessedRange {
+  uint64 first_version = 1;
+  uint64 last_version = 2;
+}
+
 // TransactionsResponse is a batch of transactions.
 message TransactionsResponse {
   // Required; transactions data.
   repeated aptos.transaction.v1.Transaction transactions = 1;
 
   // Required; chain id.
   optional uint64 chain_id = 2 [jstype = JS_STRING];
+
+  optional ProcessedRange processed_range = 3;
 }
 
 service RawData {
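A possible client-side use of the new field, as a hedged sketch only: prefer processed_range.last_version to advance the request cursor, and fall back to the last transaction's version when the field is absent (for example, a server without this change). The fallback policy is an assumption, not something this commit prescribes; the types are the prost-generated ones from aptos_protos.

    // Hedged client-side sketch, not part of this commit: pick the next version
    // to request from a TransactionsResponse. Field names follow the proto above.
    use aptos_protos::indexer::v1::TransactionsResponse;

    fn next_version_to_request(response: &TransactionsResponse) -> Option<u64> {
        if let Some(range) = &response.processed_range {
            // Preferred: the server reports the inclusive range it processed,
            // even when filtering left the transactions list empty.
            Some(range.last_version + 1)
        } else {
            // Fallback (assumption): last transaction in the batch, if any.
            response.transactions.last().map(|t| t.version + 1)
        }
    }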

protos/python/aptos_protos/aptos/indexer/v1/raw_data_pb2.py
+7 -5
(Generated file; diff not rendered.)

protos/python/aptos_protos/aptos/indexer/v1/raw_data_pb2.pyi
+14 -1

@@ -53,18 +53,31 @@ class GetTransactionsRequest(_message.Message):
         ] = ...,
     ) -> None: ...
 
+class ProcessedRange(_message.Message):
+    __slots__ = ["first_version", "last_version"]
+    FIRST_VERSION_FIELD_NUMBER: _ClassVar[int]
+    LAST_VERSION_FIELD_NUMBER: _ClassVar[int]
+    first_version: int
+    last_version: int
+    def __init__(
+        self, first_version: _Optional[int] = ..., last_version: _Optional[int] = ...
+    ) -> None: ...
+
 class TransactionsResponse(_message.Message):
-    __slots__ = ["transactions", "chain_id"]
+    __slots__ = ["transactions", "chain_id", "processed_range"]
     TRANSACTIONS_FIELD_NUMBER: _ClassVar[int]
     CHAIN_ID_FIELD_NUMBER: _ClassVar[int]
+    PROCESSED_RANGE_FIELD_NUMBER: _ClassVar[int]
     transactions: _containers.RepeatedCompositeFieldContainer[
         _transaction_pb2.Transaction
     ]
     chain_id: int
+    processed_range: ProcessedRange
     def __init__(
         self,
         transactions: _Optional[
             _Iterable[_Union[_transaction_pb2.Transaction, _Mapping]]
         ] = ...,
         chain_id: _Optional[int] = ...,
+        processed_range: _Optional[_Union[ProcessedRange, _Mapping]] = ...,
     ) -> None: ...
