Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Indexer-Grpc-V2] Add protos. #15701

Merged
merged 1 commit into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use aptos_logger::{error, info};
use aptos_moving_average::MovingAverage;
use aptos_protos::internal::fullnode::v1::{
fullnode_data_server::FullnodeData, stream_status::StatusType, transactions_from_node_response,
GetTransactionsFromNodeRequest, StreamStatus, TransactionsFromNodeResponse,
GetTransactionsFromNodeRequest, PingFullnodeRequest, PingFullnodeResponse, StreamStatus,
TransactionsFromNodeResponse,
};
use futures::Stream;
use std::pin::Pin;
Expand Down Expand Up @@ -156,6 +157,13 @@ impl FullnodeData for FullnodeDataService {
Box::pin(output_stream) as Self::GetTransactionsFromNodeStream
))
}

async fn ping(
&self,
_request: Request<PingFullnodeRequest>,
) -> Result<Response<PingFullnodeResponse>, Status> {
unimplemented!()
}
Comment on lines +161 to +166
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using unimplemented!() will cause a panic at runtime. Consider returning a proper gRPC response instead:

Ok(Response::new(PingFullnodeResponse { info: None }))

This maintains the gRPC contract while indicating the endpoint is not yet implemented.

Spotted by Graphite Reviewer

Is this helpful? React 👍 or 👎 to let us know.

}

pub fn get_status(
Expand Down
112 changes: 112 additions & 0 deletions protos/proto/aptos/indexer/v1/grpc.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

syntax = "proto3";

package aptos.indexer.v1;

import "aptos/indexer/v1/raw_data.proto";
import "aptos/transaction/v1/transaction.proto";
import "aptos/util/timestamp/timestamp.proto";

message StreamProgressSampleProto {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i feel Proto here is a little redundant; either remove or use unit something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use here because in the rust code I have a corresponding struct called StreamProgressSample. Just want to have a different name here.

optional aptos.util.timestamp.Timestamp timestamp = 1;
uint64 version = 2;
uint64 size_bytes = 3;
}

message StreamProgress {
repeated StreamProgressSampleProto samples = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the intent behind collecting multiple samples here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calculate tps, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sync'ed offline; for monitoring purpose.

}

message ActiveStream {
optional string id = 1;
optional aptos.util.timestamp.Timestamp start_time = 2;
uint64 start_version = 3;
optional uint64 end_version = 4;

optional StreamProgress progress = 5;
}

message StreamInfo {
repeated ActiveStream active_streams = 1;
}

message LiveDataServiceInfo {
optional uint64 chain_id = 1;
optional aptos.util.timestamp.Timestamp timestamp = 2;
optional uint64 known_latest_version = 3;
optional StreamInfo stream_info = 4;
// If not present, it means the data service is not available to serve anything yet.
optional uint64 min_servable_version = 5;
}

message HistoricalDataServiceInfo {
optional uint64 chain_id = 1;
optional aptos.util.timestamp.Timestamp timestamp = 2;
optional uint64 known_latest_version = 3;
optional StreamInfo stream_info = 4;
}

message FullnodeInfo {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should include validation here? like chain id?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Done.

optional uint64 chain_id = 1;
optional aptos.util.timestamp.Timestamp timestamp = 2;
optional uint64 known_latest_version = 3;
}

message GrpcManagerInfo {
optional uint64 chain_id = 1;
optional aptos.util.timestamp.Timestamp timestamp = 2;
optional uint64 known_latest_version = 3;
optional string master_address = 4;
}

message ServiceInfo {
optional string address = 1;
oneof info {
LiveDataServiceInfo live_data_service_info = 2;
HistoricalDataServiceInfo historical_data_service_info = 3;
FullnodeInfo fullnode_info = 4;
GrpcManagerInfo grpc_manager_info = 5;
}
}

message HeartbeatRequest {
optional ServiceInfo service_info = 1;
}

message HeartbeatResponse {
optional uint64 known_latest_version = 1;
}

message PingDataServiceRequest {
optional uint64 known_latest_version = 1;
// `true` for live data service, `false` for historical data service.
bool ping_live_data_service = 2;
}

message PingDataServiceResponse {
oneof info {
LiveDataServiceInfo live_data_service_info = 1;
HistoricalDataServiceInfo historical_data_service_info = 2;
}
}

message GetDataServiceForRequestRequest {
optional GetTransactionsRequest user_request = 1;
}

message GetDataServiceForRequestResponse {
string data_service_address = 1;
}

service GrpcManager {
rpc Heartbeat(HeartbeatRequest) returns (HeartbeatResponse);
rpc GetTransactions(GetTransactionsRequest) returns (TransactionsResponse);
rpc GetDataServiceForRequest(GetDataServiceForRequestRequest) returns (GetDataServiceForRequestResponse);
}

service DataService {
rpc Ping(PingDataServiceRequest) returns (PingDataServiceResponse);
rpc GetTransactions(GetTransactionsRequest) returns (stream TransactionsResponse);
}
9 changes: 9 additions & 0 deletions protos/proto/aptos/internal/fullnode/v1/fullnode_data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ syntax = "proto3";
package aptos.internal.fullnode.v1;

import "aptos/transaction/v1/transaction.proto";
import "aptos/indexer/v1/grpc.proto";

// Transaction data is transferred via 1 stream with batches until terminated.
// One stream consists:
Expand Down Expand Up @@ -52,6 +53,14 @@ message TransactionsFromNodeResponse {
uint32 chain_id = 3;
}

message PingFullnodeRequest {
}

message PingFullnodeResponse {
optional aptos.indexer.v1.FullnodeInfo info = 1;
}

service FullnodeData {
rpc Ping(PingFullnodeRequest) returns (PingFullnodeResponse);
rpc GetTransactionsFromNode(GetTransactionsFromNodeRequest) returns (stream TransactionsFromNodeResponse);
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ from typing import Mapping as _Mapping
from typing import Optional as _Optional
from typing import Union as _Union

from aptos.indexer.v1 import grpc_pb2 as _grpc_pb2
from aptos.transaction.v1 import transaction_pb2 as _transaction_pb2
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
Expand Down Expand Up @@ -75,3 +76,15 @@ class TransactionsFromNodeResponse(_message.Message):
data: _Optional[_Union[TransactionsOutput, _Mapping]] = ...,
chain_id: _Optional[int] = ...,
) -> None: ...

class PingFullnodeRequest(_message.Message):
__slots__ = []
def __init__(self) -> None: ...

class PingFullnodeResponse(_message.Message):
__slots__ = ["info"]
INFO_FIELD_NUMBER: _ClassVar[int]
info: _grpc_pb2.FullnodeInfo
def __init__(
self, info: _Optional[_Union[_grpc_pb2.FullnodeInfo, _Mapping]] = ...
) -> None: ...
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def __init__(self, channel):
Args:
channel: A grpc.Channel.
"""
self.Ping = channel.unary_unary(
"/aptos.internal.fullnode.v1.FullnodeData/Ping",
request_serializer=aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.PingFullnodeRequest.SerializeToString,
response_deserializer=aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.PingFullnodeResponse.FromString,
)
self.GetTransactionsFromNode = channel.unary_stream(
"/aptos.internal.fullnode.v1.FullnodeData/GetTransactionsFromNode",
request_serializer=aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.GetTransactionsFromNodeRequest.SerializeToString,
Expand All @@ -25,6 +30,12 @@ def __init__(self, channel):
class FullnodeDataServicer(object):
"""Missing associated documentation comment in .proto file."""

def Ping(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details("Method not implemented!")
raise NotImplementedError("Method not implemented!")

def GetTransactionsFromNode(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
Expand All @@ -34,6 +45,11 @@ def GetTransactionsFromNode(self, request, context):

def add_FullnodeDataServicer_to_server(servicer, server):
rpc_method_handlers = {
"Ping": grpc.unary_unary_rpc_method_handler(
servicer.Ping,
request_deserializer=aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.PingFullnodeRequest.FromString,
response_serializer=aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.PingFullnodeResponse.SerializeToString,
),
"GetTransactionsFromNode": grpc.unary_stream_rpc_method_handler(
servicer.GetTransactionsFromNode,
request_deserializer=aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.GetTransactionsFromNodeRequest.FromString,
Expand All @@ -50,6 +66,35 @@ def add_FullnodeDataServicer_to_server(servicer, server):
class FullnodeData(object):
"""Missing associated documentation comment in .proto file."""

@staticmethod
def Ping(
request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None,
):
return grpc.experimental.unary_unary(
request,
target,
"/aptos.internal.fullnode.v1.FullnodeData/Ping",
aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.PingFullnodeRequest.SerializeToString,
aptos_dot_internal_dot_fullnode_dot_v1_dot_fullnode__data__pb2.PingFullnodeResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
)

@staticmethod
def GetTransactionsFromNode(
request,
Expand Down
Loading
Loading