Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Better RPC prometheus metrics. #9358

Merged
18 commits merged into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 53 additions & 2 deletions client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod middleware;

use jsonrpc_core::{IoHandlerExtension, MetaIoHandler};
use log::error;
use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64};
use pubsub::PubSubMetadata;
use std::io;

Expand All @@ -42,7 +43,7 @@ const HTTP_THREADS: usize = 4;
pub type RpcHandler<T> = pubsub::PubSubHandler<T, RpcMiddleware>;

pub use self::inner::*;
pub use middleware::{RpcMetrics, RpcMiddleware};
pub use middleware::{method_names, RpcMetrics, RpcMiddleware};

/// Construct rpc `IoHandler`
pub fn rpc_handler<M: PubSubMetadata>(
Expand Down Expand Up @@ -70,6 +71,43 @@ pub fn rpc_handler<M: PubSubMetadata>(
io
}

/// RPC server-specific prometheus metrics.
#[derive(Debug, Clone)]
pub struct ServerMetrics {
/// Number of sessions opened.
session_opened: Option<Counter<U64>>,
/// Number of sessions closed.
session_closed: Option<Counter<U64>>,
}

impl ServerMetrics {
/// Create new WebSocket RPC server metrics.
pub fn new(registry: Option<&Registry>) -> Result<Self, PrometheusError> {
registry
.map(|r| {
Ok(Self {
session_opened: register(
Counter::new(
"rpc_sessions_opened",
"Number of persistent RPC sessions opened",
)?,
r,
)?
.into(),
session_closed: register(
Counter::new(
"rpc_sessions_closed",
"Number of persistent RPC sessions closed",
)?,
r,
)?
.into(),
})
})
.unwrap_or_else(|| Ok(Self { session_opened: None, session_closed: None }))
}
}

#[cfg(not(target_os = "unknown"))]
mod inner {
use super::*;
Expand All @@ -81,6 +119,16 @@ mod inner {
/// Type alias for ws server
pub type WsServer = ws::Server;

impl ws::SessionStats for ServerMetrics {
fn open_session(&self, _id: ws::SessionId) {
self.session_opened.as_ref().map(|m| m.inc());
}

fn close_session(&self, _id: ws::SessionId) {
self.session_closed.as_ref().map(|m| m.inc());
}
}

/// Start HTTP server listening on given address.
///
/// **Note**: Only available if `not(target_os = "unknown")`.
Expand Down Expand Up @@ -110,6 +158,7 @@ mod inner {
pub fn start_ipc<M: pubsub::PubSubMetadata + Default>(
addr: &str,
io: RpcHandler<M>,
server_metrics: ServerMetrics,
) -> io::Result<ipc::Server> {
let builder = ipc::ServerBuilder::new(io);
#[cfg(target_os = "unix")]
Expand All @@ -118,7 +167,7 @@ mod inner {
security_attributes.set_mode(0o600)?;
security_attributes
});
builder.start(addr)
builder.session_stats(server_metrics).start(addr)
}

/// Start WS server listening on given address.
Expand All @@ -132,6 +181,7 @@ mod inner {
cors: Option<&Vec<String>>,
io: RpcHandler<M>,
maybe_max_payload_mb: Option<usize>,
server_metrics: ServerMetrics,
) -> io::Result<ws::Server> {
let rpc_max_payload = maybe_max_payload_mb
.map(|mb| mb.saturating_mul(MEGABYTE))
Expand All @@ -143,6 +193,7 @@ mod inner {
.max_connections(max_connections.unwrap_or(WS_MAX_CONNECTIONS))
.allowed_origins(map_cors(cors))
.allowed_hosts(hosts_filtering(cors.is_some()))
.session_stats(server_metrics)
.start(addr)
.map_err(|err| match err {
ws::Error::Io(io) => io,
Expand Down
208 changes: 181 additions & 27 deletions client/rpc-servers/src/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,41 +18,104 @@

//! Middleware for RPC requests.

use jsonrpc_core::{
FutureOutput, FutureResponse, Metadata, Middleware as RequestMiddleware, Request, Response,
use std::collections::HashSet;

use jsonrpc_core::{FutureOutput, FutureResponse, Metadata, Middleware as RequestMiddleware};
use prometheus_endpoint::{
register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64,
};
use prometheus_endpoint::{register, CounterVec, Opts, PrometheusError, Registry, U64};

use futures::{future::Either, Future};
use pubsub::PubSubMetadata;

use crate::RpcHandler;

/// Metrics for RPC middleware
#[derive(Debug, Clone)]
pub struct RpcMetrics {
rpc_calls: Option<CounterVec<U64>>,
requests_started: CounterVec<U64>,
requests_finished: CounterVec<U64>,
calls_time: HistogramVec,
calls_started: CounterVec<U64>,
calls_finished: CounterVec<U64>,
}

impl RpcMetrics {
/// Create an instance of metrics
pub fn new(metrics_registry: Option<&Registry>) -> Result<Self, PrometheusError> {
Ok(Self {
rpc_calls: metrics_registry
.map(|r| {
register(
CounterVec::new(
Opts::new("rpc_calls_total", "Number of rpc calls received"),
&["protocol"],
)?,
r,
)
})
.transpose()?,
})
pub fn new(metrics_registry: Option<&Registry>) -> Result<Option<Self>, PrometheusError> {
if let Some(r) = metrics_registry {
Ok(Some(Self {
requests_started: register(
CounterVec::new(
Opts::new(
"rpc_requests_started",
"Number of RPC requests (not calls) received by the server.",
),
&["protocol"],
)?,
r,
)?,
requests_finished: register(
CounterVec::new(
Opts::new(
"rpc_requests_finished",
"Number of RPC requests (not calls) processed by the server.",
),
&["protocol"],
)?,
r,
)?,
calls_time: register(
HistogramVec::new(
HistogramOpts::new(
"rpc_calls_time",
"Total time [μs] of processed RPC calls",
),
&["protocol", "method"],
)?,
r,
)?,
calls_started: register(
CounterVec::new(
Opts::new(
"rpc_calls_started",
"Number of received RPC calls (unique un-batched requests)",
),
&["protocol", "method"],
)?,
r,
)?,
calls_finished: register(
CounterVec::new(
Opts::new(
"rpc_calls_finished",
"Number of processed RPC calls (unique un-batched requests)",
),
&["protocol", "method", "is_error"],
)?,
r,
)?,
}))
} else {
Ok(None)
}
}
}

/// Instantiates a dummy `IoHandler` given a builder function to extract supported method names.
pub fn method_names<F, M>(gen_handler: F) -> HashSet<String>
where
F: FnOnce(RpcMiddleware) -> RpcHandler<M>,
M: PubSubMetadata,
{
let io = gen_handler(RpcMiddleware::new(None, HashSet::new(), "dummy"));
io.iter().map(|x| x.0.clone()).collect()
}

/// Middleware for RPC calls
pub struct RpcMiddleware {
metrics: RpcMetrics,
metrics: Option<RpcMetrics>,
known_rpc_method_names: HashSet<String>,
transport_label: String,
}

Expand All @@ -61,24 +124,115 @@ impl RpcMiddleware {
///
/// - `metrics`: Will be used to report statistics.
/// - `transport_label`: The label that is used when reporting the statistics.
pub fn new(metrics: RpcMetrics, transport_label: &str) -> Self {
RpcMiddleware { metrics, transport_label: String::from(transport_label) }
pub fn new(
metrics: Option<RpcMetrics>,
known_rpc_method_names: HashSet<String>,
transport_label: &str,
) -> Self {
RpcMiddleware {
metrics,
known_rpc_method_names,
transport_label: String::from(transport_label),
}
}
}

impl<M: Metadata> RequestMiddleware<M> for RpcMiddleware {
type Future = FutureResponse;
type CallFuture = FutureOutput;

fn on_request<F, X>(&self, request: Request, meta: M, next: F) -> Either<FutureResponse, X>
fn on_request<F, X>(
&self,
request: jsonrpc_core::Request,
meta: M,
next: F,
) -> Either<Self::Future, X>
where
F: Fn(jsonrpc_core::Request, M) -> X + Send + Sync,
X: Future<Item = Option<jsonrpc_core::Response>, Error = ()> + Send + 'static,
{
let metrics = self.metrics.clone();
let transport_label = self.transport_label.clone();
if let Some(ref metrics) = metrics {
metrics.requests_started.with_label_values(&[transport_label.as_str()]).inc();
}
Either::A(Box::new(next(request, meta).then(move |r| {
if let Some(ref metrics) = metrics {
metrics.requests_finished.with_label_values(&[transport_label.as_str()]).inc();
}
r
})))
}

fn on_call<F, X>(
&self,
call: jsonrpc_core::Call,
meta: M,
next: F,
) -> Either<Self::CallFuture, X>
where
F: Fn(Request, M) -> X + Send + Sync,
X: Future<Item = Option<Response>, Error = ()> + Send + 'static,
F: Fn(jsonrpc_core::Call, M) -> X + Send + Sync,
X: Future<Item = Option<jsonrpc_core::Output>, Error = ()> + Send + 'static,
{
if let Some(ref rpc_calls) = self.metrics.rpc_calls {
rpc_calls.with_label_values(&[self.transport_label.as_str()]).inc();
#[cfg(not(target_os = "unknown"))]
let start = std::time::Instant::now();
let name = call_name(&call, &self.known_rpc_method_names).to_owned();
let metrics = self.metrics.clone();
let transport_label = self.transport_label.clone();
log::trace!(target: "rpc_metrics", "[{}] {} call: {:?}", transport_label, name, &call);
if let Some(ref metrics) = metrics {
metrics
.calls_started
.with_label_values(&[transport_label.as_str(), name.as_str()])
.inc();
}
Either::A(Box::new(next(call, meta).then(move |r| {
#[cfg(not(target_os = "unknown"))]
let micros = start.elapsed().as_micros();
// seems that std::time is not implemented for browser target
#[cfg(target_os = "unknown")]
let micros = 1;
if let Some(ref metrics) = metrics {
metrics
.calls_time
.with_label_values(&[transport_label.as_str(), name.as_str()])
.observe(micros as _);
metrics
.calls_finished
.with_label_values(&[
transport_label.as_str(),
name.as_str(),
format!("{}", is_success(&r)).as_str(),
])
.inc();
}
log::debug!(target: "rpc_metrics", "[{}] {} call took {} μs", transport_label, name,
micros);
r
})))
}
}

fn call_name<'a>(call: &'a jsonrpc_core::Call, known_methods: &HashSet<String>) -> &'a str {
// To prevent bloating metric with all invalid method names we filter them out here.
let only_known = |method: &'a String| {
if known_methods.contains(method) {
method.as_str()
} else {
"invalid method"
}
};

match call {
jsonrpc_core::Call::Invalid { .. } => "invalid call",
jsonrpc_core::Call::MethodCall(ref call) => only_known(&call.method),
jsonrpc_core::Call::Notification(ref notification) => only_known(&notification.method),
}
}

Either::B(next(request, meta))
fn is_success(output: &Result<Option<jsonrpc_core::Output>, ()>) -> bool {
match output {
Ok(Some(jsonrpc_core::Output::Success(..))) => true,
_ => false,
}
}
7 changes: 5 additions & 2 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -648,12 +648,15 @@ where
)
};
let rpc_metrics = sc_rpc_server::RpcMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone())?;
let server_metrics = sc_rpc_server::ServerMetrics::new(config.prometheus_registry())?;
let rpc = start_rpc_servers(&config, gen_handler, rpc_metrics.clone(), server_metrics)?;
// This is used internally, so don't restrict access to unsafe RPC
let known_rpc_method_names =
sc_rpc_server::method_names(|m| gen_handler(sc_rpc::DenyUnsafe::No, m));
let rpc_handlers = RpcHandlers(Arc::new(
gen_handler(
sc_rpc::DenyUnsafe::No,
sc_rpc_server::RpcMiddleware::new(rpc_metrics, "inbrowser"),
sc_rpc_server::RpcMiddleware::new(rpc_metrics, known_rpc_method_names, "inbrowser"),
)
.into(),
));
Expand Down
Loading