-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Indexer-Grpc-V2] Add GrpcManagerService. #15726
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,34 @@ | ||
// Copyright © Aptos Foundation | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::grpc_manager::GrpcManager; | ||
use anyhow::Result; | ||
use aptos_indexer_grpc_server_framework::RunnableConfig; | ||
use serde::{Deserialize, Serialize}; | ||
use std::net::SocketAddr; | ||
use tokio::sync::OnceCell; | ||
|
||
static GRPC_MANAGER: OnceCell<GrpcManager> = OnceCell::const_new(); | ||
|
||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
pub(crate) struct ServiceConfig { | ||
pub(crate) listen_address: SocketAddr, | ||
} | ||
|
||
#[derive(Clone, Debug, Deserialize, Serialize)] | ||
#[serde(deny_unknown_fields)] | ||
pub struct IndexerGrpcManagerConfig {} | ||
pub struct IndexerGrpcManagerConfig { | ||
pub(crate) chain_id: u64, | ||
pub(crate) service_config: ServiceConfig, | ||
} | ||
Comment on lines
+20
to
+23
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
These fields need to be added to maintain consistency between the config struct and its usage. The current implementation will fail to compile since these fields are required by the manager initialization. Spotted by Graphite Reviewer |
||
|
||
#[async_trait::async_trait] | ||
impl RunnableConfig for IndexerGrpcManagerConfig { | ||
async fn run(&self) -> Result<()> { | ||
Ok(()) | ||
GRPC_MANAGER | ||
.get_or_init(|| async { GrpcManager::new(self).await }) | ||
.await | ||
.start(&self.service_config) | ||
} | ||
|
||
fn get_server_name(&self) -> String { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
// Copyright © Aptos Foundation | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::{ | ||
config::{IndexerGrpcManagerConfig, ServiceConfig}, | ||
service::GrpcManagerService, | ||
}; | ||
use anyhow::Result; | ||
use aptos_protos::indexer::v1::grpc_manager_server::GrpcManagerServer; | ||
use std::time::Duration; | ||
use tonic::{codec::CompressionEncoding, transport::Server}; | ||
use tracing::info; | ||
|
||
const HTTP2_PING_INTERVAL_DURATION: Duration = Duration::from_secs(60); | ||
const HTTP2_PING_TIMEOUT_DURATION: Duration = Duration::from_secs(10); | ||
|
||
pub(crate) struct GrpcManager { | ||
chain_id: u64, | ||
} | ||
|
||
impl GrpcManager { | ||
pub(crate) async fn new(config: &IndexerGrpcManagerConfig) -> Self { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why does this need to be async? |
||
let chain_id = config.chain_id; | ||
|
||
Self { chain_id } | ||
} | ||
|
||
pub(crate) fn start(&self, service_config: &ServiceConfig) -> Result<()> { | ||
let service = GrpcManagerServer::new(GrpcManagerService::new(self.chain_id)) | ||
.send_compressed(CompressionEncoding::Zstd) | ||
.accept_compressed(CompressionEncoding::Zstd); | ||
let server = Server::builder() | ||
.http2_keepalive_interval(Some(HTTP2_PING_INTERVAL_DURATION)) | ||
.http2_keepalive_timeout(Some(HTTP2_PING_TIMEOUT_DURATION)) | ||
.add_service(service); | ||
|
||
tokio_scoped::scope(|s| { | ||
s.spawn(async move { | ||
info!("Starting GrpcManager at {}.", service_config.listen_address); | ||
server.serve(service_config.listen_address).await.unwrap(); | ||
}); | ||
}); | ||
Comment on lines
+37
to
+42
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Using Spotted by Graphite Reviewer |
||
|
||
Ok(()) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,3 +2,7 @@ | |
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
pub mod config; | ||
mod grpc_manager; | ||
mod service; | ||
#[cfg(test)] | ||
mod test; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
// Copyright (c) Aptos Foundation | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use aptos_protos::indexer::v1::{ | ||
grpc_manager_server::GrpcManager, service_info::Info, GetDataServiceForRequestRequest, | ||
GetDataServiceForRequestResponse, GetTransactionsRequest, HeartbeatRequest, HeartbeatResponse, | ||
TransactionsResponse, | ||
}; | ||
use tonic::{Request, Response, Status}; | ||
|
||
pub struct GrpcManagerService { | ||
chain_id: u64, | ||
} | ||
|
||
impl GrpcManagerService { | ||
pub(crate) fn new(chain_id: u64) -> Self { | ||
Self { chain_id } | ||
} | ||
|
||
async fn handle_heartbeat( | ||
&self, | ||
_address: String, | ||
_info: Info, | ||
) -> anyhow::Result<Response<HeartbeatResponse>> { | ||
// TODO(grao): Implement. | ||
todo!() | ||
} | ||
|
||
fn pick_live_data_service(&self, _starting_version: u64) -> Option<String> { | ||
// TODO(grao): Implement. | ||
todo!() | ||
} | ||
|
||
async fn pick_historical_data_service(&self, _starting_version: u64) -> Option<String> { | ||
// TODO(grao): Implement. | ||
todo!() | ||
} | ||
} | ||
|
||
#[tonic::async_trait] | ||
impl GrpcManager for GrpcManagerService { | ||
async fn heartbeat( | ||
&self, | ||
request: Request<HeartbeatRequest>, | ||
) -> Result<Response<HeartbeatResponse>, Status> { | ||
let request = request.into_inner(); | ||
if let Some(service_info) = request.service_info { | ||
if let Some(address) = service_info.address { | ||
if let Some(info) = service_info.info { | ||
return self | ||
.handle_heartbeat(address, info) | ||
.await | ||
.map_err(|e| Status::internal(format!("Error handling heartbeat: {e}"))); | ||
} | ||
} | ||
} | ||
|
||
Err(Status::invalid_argument("Bad request.")) | ||
} | ||
|
||
async fn get_transactions( | ||
&self, | ||
request: Request<GetTransactionsRequest>, | ||
) -> Result<Response<TransactionsResponse>, Status> { | ||
let _request = request.into_inner(); | ||
let transactions = vec![]; | ||
// TODO(grao): Implement. | ||
Comment on lines
+65
to
+67
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this method is not yet implemented, returning an empty vector could silently mask errors for clients. Consider returning Spotted by Graphite Reviewer |
||
|
||
Ok(Response::new(TransactionsResponse { | ||
transactions, | ||
chain_id: Some(self.chain_id), | ||
})) | ||
} | ||
|
||
async fn get_data_service_for_request( | ||
&self, | ||
request: Request<GetDataServiceForRequestRequest>, | ||
) -> Result<Response<GetDataServiceForRequestResponse>, Status> { | ||
let request = request.into_inner(); | ||
|
||
if request.user_request.is_none() { | ||
return Err(Status::invalid_argument("Bad request.")); | ||
} | ||
|
||
let user_request = request.user_request.unwrap(); | ||
if user_request.starting_version.is_none() { | ||
return Err(Status::invalid_argument("Bad request.")); | ||
} | ||
|
||
let starting_version = user_request.starting_version(); | ||
|
||
let data_service_address = | ||
// TODO(grao): Use a simple strategy for now. Consider to make it smarter in the | ||
// future. | ||
if let Some(address) = self.pick_live_data_service(starting_version) { | ||
address | ||
} else if let Some(address) = self.pick_historical_data_service(starting_version).await { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
address | ||
} else { | ||
return Err(Status::internal( | ||
"Cannot find a data service instance to serve the provided request.", | ||
)); | ||
}; | ||
|
||
Ok(Response::new(GetDataServiceForRequestResponse { | ||
data_service_address, | ||
})) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
// Copyright © Aptos Foundation | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
use crate::config::{IndexerGrpcManagerConfig, ServiceConfig}; | ||
use aptos_config::utils::get_available_port; | ||
use aptos_indexer_grpc_server_framework::RunnableConfig; | ||
use std::time::Duration; | ||
|
||
#[tokio::test(flavor = "multi_thread", worker_threads = 16)] | ||
async fn test_run() { | ||
let port = get_available_port(); | ||
let listen_address = format!("127.0.0.1:{port}").parse().unwrap(); | ||
let config = IndexerGrpcManagerConfig { | ||
chain_id: 0, | ||
service_config: ServiceConfig { listen_address }, | ||
}; | ||
|
||
let task = tokio::spawn(async move { | ||
config.run().await.unwrap(); | ||
}); | ||
|
||
tokio::time::sleep(Duration::from_secs(10)).await; | ||
|
||
assert!(!task.is_finished()); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to have a custom health check endpoint so that k8s can kick-in to restart the pod if error happens? or the design is service can restart itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Will come up with something later.