diff --git a/Cargo.lock b/Cargo.lock index 5dddea40cb15f..2ee5b0581171f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,5 +1,7 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. +version = 3 + [[package]] name = "Inflector" version = "0.11.4" @@ -3971,6 +3973,7 @@ dependencies = [ "frame-system", "futures 0.3.12", "hex-literal", + "libp2p-wasm-ext", "log", "nix", "node-executor", @@ -7778,8 +7781,6 @@ dependencies = [ "tokio 0.2.25", "tracing", "tracing-futures", - "tracing-log", - "tracing-subscriber", "wasm-timer", ] @@ -7865,10 +7866,8 @@ dependencies = [ "rand 0.7.3", "serde", "serde_json", - "sp-utils", "take_mut", - "tracing", - "tracing-subscriber", + "thiserror", "void", "wasm-timer", ] @@ -7886,7 +7885,6 @@ dependencies = [ "parking_lot 0.11.1", "regex", "rustc-hash", - "sc-telemetry", "sc-tracing-proc-macro", "serde", "serde_json", @@ -9263,7 +9261,6 @@ dependencies = [ "sc-informant", "sc-network", "sc-service", - "sc-telemetry", "sc-tracing", "sp-database", "wasm-bindgen", diff --git a/bin/node-template/node/src/service.rs b/bin/node-template/node/src/service.rs index 4ea54dc8174a0..197a495b438b2 100644 --- a/bin/node-template/node/src/service.rs +++ b/bin/node-template/node/src/service.rs @@ -12,7 +12,7 @@ use sp_consensus_aura::sr25519::AuthorityPair as AuraPair; use sc_consensus_aura::{ImportQueueParams, StartAuraParams, SlotProportion}; use sc_finality_grandpa::SharedVoterState; use sc_keystore::LocalKeystore; -use sc_telemetry::TelemetrySpan; +use sc_telemetry::{Telemetry, TelemetryWorker}; // Our native executor instance. native_executor_instance!( @@ -38,6 +38,7 @@ pub fn new_partial(config: &Configuration) -> Result, sc_finality_grandpa::LinkHalf, + Option, ) >, ServiceError> { if config.keystore_remote.is_some() { @@ -46,10 +47,28 @@ pub fn new_partial(config: &Configuration) -> Result Result<_, sc_telemetry::Error> { + let worker = TelemetryWorker::new(16)?; + let telemetry = worker.handle().new_telemetry(endpoints); + Ok((worker, telemetry)) + }) + .transpose()?; + let (client, backend, keystore_container, task_manager) = - sc_service::new_full_parts::(&config)?; + sc_service::new_full_parts::( + &config, + telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()), + )?; let client = Arc::new(client); + let telemetry = telemetry + .map(|(worker, telemetry)| { + task_manager.spawn_handle().spawn("telemetry", worker.run()); + telemetry + }); + let select_chain = sc_consensus::LongestChain::new(backend.clone()); let transaction_pool = sc_transaction_pool::BasicPool::new_full( @@ -61,7 +80,10 @@ pub fn new_partial(config: &Configuration) -> Result), select_chain.clone(), + client.clone(), + &(client.clone() as Arc<_>), + select_chain.clone(), + telemetry.as_ref().map(|x| x.handle()), )?; let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new( @@ -79,6 +101,7 @@ pub fn new_partial(config: &Configuration) -> Result Result Result select_chain, transaction_pool, inherent_data_providers, - other: (block_import, grandpa_link), + other: (block_import, grandpa_link, mut telemetry), } = new_partial(&config)?; if let Some(url) = &config.keystore_remote { @@ -167,10 +190,7 @@ pub fn new_full(mut config: Configuration) -> Result }) }; - let telemetry_span = TelemetrySpan::new(); - let _telemetry_span_entered = telemetry_span.enter(); - - let (_rpc_handlers, telemetry_connection_notifier) = sc_service::spawn_tasks( + let _rpc_handlers = sc_service::spawn_tasks( sc_service::SpawnTasksParams { network: network.clone(), client: client.clone(), @@ -184,7 +204,7 @@ pub fn new_full(mut config: Configuration) -> Result network_status_sinks, system_rpc_tx, config, - telemetry_span: Some(telemetry_span.clone()), + telemetry: telemetry.as_mut(), }, )?; @@ -194,6 +214,7 @@ pub fn new_full(mut config: Configuration) -> Result client.clone(), transaction_pool, prometheus_registry.as_ref(), + telemetry.as_ref().map(|x| x.handle()), ); let can_author_with = @@ -213,6 +234,7 @@ pub fn new_full(mut config: Configuration) -> Result can_author_with, sync_oracle: network.clone(), block_proposal_slot_portion: SlotProportion::new(2f32 / 3f32), + telemetry: telemetry.as_ref().map(|x| x.handle()), }, )?; @@ -237,6 +259,7 @@ pub fn new_full(mut config: Configuration) -> Result observer_enabled: false, keystore, is_authority: role.is_authority(), + telemetry: telemetry.as_ref().map(|x| x.handle()), }; if enable_grandpa { @@ -250,10 +273,10 @@ pub fn new_full(mut config: Configuration) -> Result config: grandpa_config, link: grandpa_link, network, - telemetry_on_connect: telemetry_connection_notifier.map(|x| x.on_connect_stream()), voting_rule: sc_finality_grandpa::VotingRulesBuilder::default().build(), prometheus_registry, shared_voter_state: SharedVoterState::empty(), + telemetry: telemetry.as_ref().map(|x| x.handle()), }; // the GRANDPA voter task is considered infallible, i.e. @@ -270,8 +293,26 @@ pub fn new_full(mut config: Configuration) -> Result /// Builds a new service for a light client. pub fn new_light(mut config: Configuration) -> Result { + let telemetry = config.telemetry_endpoints.clone() + .filter(|x| !x.is_empty()) + .map(|endpoints| -> Result<_, sc_telemetry::Error> { + let worker = TelemetryWorker::new(16)?; + let telemetry = worker.handle().new_telemetry(endpoints); + Ok((worker, telemetry)) + }) + .transpose()?; + let (client, backend, keystore_container, mut task_manager, on_demand) = - sc_service::new_light_parts::(&config)?; + sc_service::new_light_parts::( + &config, + telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()), + )?; + + let mut telemetry = telemetry + .map(|(worker, telemetry)| { + task_manager.spawn_handle().spawn("telemetry", worker.run()); + telemetry + }); config.network.extra_sets.push(sc_finality_grandpa::grandpa_peers_set_config()); @@ -289,6 +330,7 @@ pub fn new_light(mut config: Configuration) -> Result client.clone(), &(client.clone() as Arc<_>), select_chain.clone(), + telemetry.as_ref().map(|x| x.handle()), )?; let aura_block_import = sc_consensus_aura::AuraBlockImport::<_, _, _, AuraPair>::new( @@ -307,6 +349,7 @@ pub fn new_light(mut config: Configuration) -> Result slot_duration: sc_consensus_aura::slot_duration(&*client)?, registry: config.prometheus_registry(), check_for_equivocation: Default::default(), + telemetry: telemetry.as_ref().map(|x| x.handle()), }, )?; @@ -327,9 +370,6 @@ pub fn new_light(mut config: Configuration) -> Result ); } - let telemetry_span = TelemetrySpan::new(); - let _telemetry_span_entered = telemetry_span.enter(); - sc_service::spawn_tasks(sc_service::SpawnTasksParams { remote_blockchain: Some(backend.remote_blockchain()), transaction_pool, @@ -343,7 +383,7 @@ pub fn new_light(mut config: Configuration) -> Result network, network_status_sinks, system_rpc_tx, - telemetry_span: Some(telemetry_span.clone()), + telemetry: telemetry.as_mut(), })?; network_starter.start_network(); diff --git a/bin/node/bench/src/construct.rs b/bin/node/bench/src/construct.rs index b64ffec641c22..8469ec62893b5 100644 --- a/bin/node/bench/src/construct.rs +++ b/bin/node/bench/src/construct.rs @@ -151,6 +151,7 @@ impl core::Benchmark for ConstructionBenchmark { context.client.clone(), self.transactions.clone().into(), None, + None, ); let inherent_data_providers = sp_inherents::InherentDataProviders::new(); inherent_data_providers diff --git a/bin/node/cli/Cargo.toml b/bin/node/cli/Cargo.toml index ba226629ae7f3..ebba2095e6be3 100644 --- a/bin/node/cli/Cargo.toml +++ b/bin/node/cli/Cargo.toml @@ -105,6 +105,7 @@ try-runtime-cli = { version = "0.9.0", optional = true, path = "../../../utils/f wasm-bindgen = { version = "0.2.57", optional = true } wasm-bindgen-futures = { version = "0.4.18", optional = true } browser-utils = { package = "substrate-browser-utils", path = "../../../utils/browser", optional = true, version = "0.9.0"} +libp2p-wasm-ext = { version = "0.27", features = ["websocket"], optional = true } [target.'cfg(target_arch="x86_64")'.dependencies] node-executor = { version = "2.0.0", path = "../executor", features = [ "wasmtime" ] } @@ -148,6 +149,7 @@ browser = [ "browser-utils", "wasm-bindgen", "wasm-bindgen-futures", + "libp2p-wasm-ext", ] cli = [ "node-executor/wasmi-errno", diff --git a/bin/node/cli/src/browser.rs b/bin/node/cli/src/browser.rs index 6c0a2f10d95e5..49ac309d42abc 100644 --- a/bin/node/cli/src/browser.rs +++ b/bin/node/cli/src/browser.rs @@ -21,7 +21,7 @@ use log::info; use wasm_bindgen::prelude::*; use browser_utils::{ Client, - browser_configuration, init_logging_and_telemetry, set_console_error_panic_hook, + browser_configuration, init_logging, set_console_error_panic_hook, }; /// Starts the client. @@ -37,18 +37,14 @@ async fn start_inner( log_directives: String, ) -> Result> { set_console_error_panic_hook(); - let telemetry_worker = init_logging_and_telemetry(&log_directives)?; + init_logging(&log_directives)?; let chain_spec = match chain_spec { Some(chain_spec) => ChainSpec::from_json_bytes(chain_spec.as_bytes().to_vec()) .map_err(|e| format!("{:?}", e))?, None => crate::chain_spec::development_config(), }; - let telemetry_handle = telemetry_worker.handle(); - let config = browser_configuration( - chain_spec, - Some(telemetry_handle), - ).await?; + let config = browser_configuration(chain_spec).await?; info!("Substrate browser node"); info!("✌️ version {}", config.impl_version); @@ -60,10 +56,8 @@ async fn start_inner( // Create the service. This is the most heavy initialization step. let (task_manager, rpc_handlers) = crate::service::new_light_base(config) - .map(|(components, rpc_handlers, _, _, _, _)| (components, rpc_handlers)) + .map(|(components, rpc_handlers, _, _, _)| (components, rpc_handlers)) .map_err(|e| format!("{:?}", e))?; - task_manager.spawn_handle().spawn("telemetry", telemetry_worker.run()); - Ok(browser_utils::start_client(task_manager, rpc_handlers)) } diff --git a/bin/node/cli/src/chain_spec.rs b/bin/node/cli/src/chain_spec.rs index 43383bb3c3a96..ae1418981f167 100644 --- a/bin/node/cli/src/chain_spec.rs +++ b/bin/node/cli/src/chain_spec.rs @@ -443,7 +443,7 @@ pub(crate) mod tests { Ok(sc_service_test::TestNetComponents::new(task_manager, client, network, transaction_pool)) }, |config| { - let (keep_alive, _, _, client, network, transaction_pool) = new_light_base(config)?; + let (keep_alive, _, client, network, transaction_pool) = new_light_base(config)?; Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) } ); diff --git a/bin/node/cli/src/service.rs b/bin/node/cli/src/service.rs index 92f30a72577d6..1351782315be7 100644 --- a/bin/node/cli/src/service.rs +++ b/bin/node/cli/src/service.rs @@ -33,7 +33,7 @@ use sp_runtime::traits::Block as BlockT; use futures::prelude::*; use sc_client_api::{ExecutorProvider, RemoteBackend}; use node_executor::Executor; -use sc_telemetry::{TelemetryConnectionNotifier, TelemetrySpan}; +use sc_telemetry::{Telemetry, TelemetryWorker}; use sc_consensus_babe::SlotProportion; type FullClient = sc_service::TFullClient; @@ -43,7 +43,9 @@ type FullGrandpaBlockImport = grandpa::GrandpaBlockImport; type LightClient = sc_service::TLightClient; -pub fn new_partial(config: &Configuration) -> Result Result, sc_transaction_pool::FullPool, @@ -58,12 +60,31 @@ pub fn new_partial(config: &Configuration) -> Result, ), grandpa::SharedVoterState, + Option, ) >, ServiceError> { + let telemetry = config.telemetry_endpoints.clone() + .filter(|x| !x.is_empty()) + .map(|endpoints| -> Result<_, sc_telemetry::Error> { + let worker = TelemetryWorker::new(16)?; + let telemetry = worker.handle().new_telemetry(endpoints); + Ok((worker, telemetry)) + }) + .transpose()?; + let (client, backend, keystore_container, task_manager) = - sc_service::new_full_parts::(&config)?; + sc_service::new_full_parts::( + &config, + telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()), + )?; let client = Arc::new(client); + let telemetry = telemetry + .map(|(worker, telemetry)| { + task_manager.spawn_handle().spawn("telemetry", worker.run()); + telemetry + }); + let select_chain = sc_consensus::LongestChain::new(backend.clone()); let transaction_pool = sc_transaction_pool::BasicPool::new_full( @@ -75,7 +96,10 @@ pub fn new_partial(config: &Configuration) -> Result), select_chain.clone(), + client.clone(), + &(client.clone() as Arc<_>), + select_chain.clone(), + telemetry.as_ref().map(|x| x.handle()), )?; let justification_import = grandpa_block_import.clone(); @@ -97,6 +121,7 @@ pub fn new_partial(config: &Configuration) -> Result Result Result { +pub fn new_full( + config: Configuration, +) -> Result { new_full_base(config, |_, _| ()).map(|NewFullBase { task_manager, .. }| { task_manager }) } -pub fn new_light_base(mut config: Configuration) -> Result<( - TaskManager, RpcHandlers, Option, Arc, +pub fn new_light_base( + mut config: Configuration, +) -> Result<( + TaskManager, + RpcHandlers, + Arc, Arc::Hash>>, Arc>> ), ServiceError> { + let telemetry = config.telemetry_endpoints.clone() + .filter(|x| !x.is_empty()) + .map(|endpoints| -> Result<_, sc_telemetry::Error> { + #[cfg(feature = "browser")] + let transport = Some( + sc_telemetry::ExtTransport::new(libp2p_wasm_ext::ffi::websocket_transport()) + ); + #[cfg(not(feature = "browser"))] + let transport = None; + + let worker = TelemetryWorker::with_transport(16, transport)?; + let telemetry = worker.handle().new_telemetry(endpoints); + Ok((worker, telemetry)) + }) + .transpose()?; + let (client, backend, keystore_container, mut task_manager, on_demand) = - sc_service::new_light_parts::(&config)?; + sc_service::new_light_parts::( + &config, + telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()), + )?; + + let mut telemetry = telemetry + .map(|(worker, telemetry)| { + task_manager.spawn_handle().spawn("telemetry", worker.run()); + telemetry + }); config.network.extra_sets.push(grandpa::grandpa_peers_set_config()); @@ -393,6 +448,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<( client.clone(), &(client.clone() as Arc<_>), select_chain.clone(), + telemetry.as_ref().map(|x| x.handle()), )?; let justification_import = grandpa_block_import.clone(); @@ -414,6 +470,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<( &task_manager.spawn_essential_handle(), config.prometheus_registry(), sp_consensus::NeverCanAuthor, + telemetry.as_ref().map(|x| x.handle()), )?; let (network, network_status_sinks, system_rpc_tx, network_starter) = @@ -443,10 +500,7 @@ pub fn new_light_base(mut config: Configuration) -> Result<( let rpc_extensions = node_rpc::create_light(light_deps); - let telemetry_span = TelemetrySpan::new(); - let _telemetry_span_entered = telemetry_span.enter(); - - let (rpc_handlers, telemetry_connection_notifier) = + let rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams { on_demand: Some(on_demand), remote_blockchain: Some(backend.remote_blockchain()), @@ -457,13 +511,12 @@ pub fn new_light_base(mut config: Configuration) -> Result<( config, backend, network_status_sinks, system_rpc_tx, network: network.clone(), task_manager: &mut task_manager, - telemetry_span: Some(telemetry_span.clone()), + telemetry: telemetry.as_mut(), })?; Ok(( task_manager, rpc_handlers, - telemetry_connection_notifier, client, network, transaction_pool, @@ -471,8 +524,10 @@ pub fn new_light_base(mut config: Configuration) -> Result<( } /// Builds a new service for a light client. -pub fn new_light(config: Configuration) -> Result { - new_light_base(config).map(|(task_manager, _, _, _, _, _)| { +pub fn new_light( + config: Configuration, +) -> Result { + new_light_base(config).map(|(task_manager, _, _, _, _)| { task_manager }) } @@ -553,7 +608,7 @@ mod tests { Ok((node, (inherent_data_providers, setup_handles.unwrap()))) }, |config| { - let (keep_alive, _, _, client, network, transaction_pool) = new_light_base(config)?; + let (keep_alive, _, client, network, transaction_pool) = new_light_base(config)?; Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) }, |service, &mut (ref inherent_data_providers, (ref mut block_import, ref babe_link))| { @@ -580,6 +635,7 @@ mod tests { service.client(), service.transaction_pool(), None, + None, ); let epoch_descriptor = babe_link.epoch_changes().lock().epoch_descriptor_for_child_of( @@ -708,7 +764,7 @@ mod tests { Ok(sc_service_test::TestNetComponents::new(task_manager, client, network, transaction_pool)) }, |config| { - let (keep_alive, _, _, client, network, transaction_pool) = new_light_base(config)?; + let (keep_alive, _, client, network, transaction_pool) = new_light_base(config)?; Ok(sc_service_test::TestNetComponents::new(keep_alive, client, network, transaction_pool)) }, vec![ diff --git a/bin/node/inspect/src/command.rs b/bin/node/inspect/src/command.rs index a1a9c947a561b..9c14a71375f5f 100644 --- a/bin/node/inspect/src/command.rs +++ b/bin/node/inspect/src/command.rs @@ -34,7 +34,7 @@ impl InspectCmd { RA: Send + Sync + 'static, EX: NativeExecutionDispatch + 'static, { - let client = new_full_client::(&config)?; + let client = new_full_client::(&config, None)?; let inspect = Inspector::::new(client); match &self.command { diff --git a/bin/node/testing/src/bench.rs b/bin/node/testing/src/bench.rs index 668284101bee3..cc6d7587dd517 100644 --- a/bin/node/testing/src/bench.rs +++ b/bin/node/testing/src/bench.rs @@ -427,6 +427,7 @@ impl BenchDb { ExecutionExtensions::new(profile.into_execution_strategies(), None, None), Box::new(task_executor.clone()), None, + None, Default::default(), ).expect("Should not fail"); diff --git a/client/basic-authorship/src/basic_authorship.rs b/client/basic-authorship/src/basic_authorship.rs index 0c5bb7abefa5b..93ee4fc1445de 100644 --- a/client/basic-authorship/src/basic_authorship.rs +++ b/client/basic-authorship/src/basic_authorship.rs @@ -32,7 +32,7 @@ use sp_runtime::{ traits::{Block as BlockT, Hash as HashT, Header as HeaderT, DigestFor, BlakeTwo256}, }; use sp_transaction_pool::{TransactionPool, InPoolTransaction}; -use sc_telemetry::{telemetry, CONSENSUS_INFO}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO}; use sc_block_builder::{BlockBuilderApi, BlockBuilderProvider}; use sp_api::{ProvideRuntimeApi, ApiExt}; use futures::{future, future::{Future, FutureExt}, channel::oneshot, select}; @@ -60,9 +60,10 @@ pub struct ProposerFactory { transaction_pool: Arc, /// Prometheus Link, metrics: PrometheusMetrics, + max_block_size: usize, + telemetry: Option, /// phantom member to pin the `Backend`/`ProofRecording` type. _phantom: PhantomData<(B, PR)>, - max_block_size: usize, } impl ProposerFactory { @@ -74,14 +75,16 @@ impl ProposerFactory { client: Arc, transaction_pool: Arc, prometheus: Option<&PrometheusRegistry>, + telemetry: Option, ) -> Self { ProposerFactory { spawn_handle: Box::new(spawn_handle), - client, transaction_pool, metrics: PrometheusMetrics::new(prometheus), - _phantom: PhantomData, max_block_size: DEFAULT_MAX_BLOCK_SIZE, + telemetry, + client, + _phantom: PhantomData, } } } @@ -95,14 +98,16 @@ impl ProposerFactory { client: Arc, transaction_pool: Arc, prometheus: Option<&PrometheusRegistry>, + telemetry: Option, ) -> Self { ProposerFactory { spawn_handle: Box::new(spawn_handle), client, transaction_pool, metrics: PrometheusMetrics::new(prometheus), - _phantom: PhantomData, max_block_size: DEFAULT_MAX_BLOCK_SIZE, + telemetry, + _phantom: PhantomData, } } } @@ -147,8 +152,9 @@ impl ProposerFactory transaction_pool: self.transaction_pool.clone(), now, metrics: self.metrics.clone(), - _phantom: PhantomData, max_block_size: self.max_block_size, + telemetry: self.telemetry.clone(), + _phantom: PhantomData, }; proposer @@ -189,8 +195,9 @@ pub struct Proposer { transaction_pool: Arc, now: Box time::Instant + Send + Sync>, metrics: PrometheusMetrics, - _phantom: PhantomData<(B, PR)>, max_block_size: usize, + telemetry: Option, + _phantom: PhantomData<(B, PR)>, } impl sp_consensus::Proposer for @@ -371,7 +378,10 @@ impl Proposer .collect::>() .join(", ") ); - telemetry!(CONSENSUS_INFO; "prepared_block_for_proposing"; + telemetry!( + self.telemetry; + CONSENSUS_INFO; + "prepared_block_for_proposing"; "number" => ?block.header().number(), "hash" => ?::Hash::from(block.header().hash()), ); @@ -461,6 +471,7 @@ mod tests { client.clone(), txpool.clone(), None, + None, ); let cell = Mutex::new((false, time::Instant::now())); @@ -508,6 +519,7 @@ mod tests { client.clone(), txpool.clone(), None, + None, ); let cell = Mutex::new((false, time::Instant::now())); @@ -564,6 +576,7 @@ mod tests { client.clone(), txpool.clone(), None, + None, ); let proposer = proposer_factory.init_with_now( @@ -639,6 +652,7 @@ mod tests { client.clone(), txpool.clone(), None, + None, ); let mut propose_block = | client: &TestClient, diff --git a/client/basic-authorship/src/lib.rs b/client/basic-authorship/src/lib.rs index ccf73cc93f197..acaf85db76336 100644 --- a/client/basic-authorship/src/lib.rs +++ b/client/basic-authorship/src/lib.rs @@ -45,6 +45,7 @@ //! client.clone(), //! txpool.clone(), //! None, +//! None, //! ); //! //! // From this factory, we create a `Proposer`. diff --git a/client/cli/src/arg_enums.rs b/client/cli/src/arg_enums.rs index 4b1f197cf3ea5..aeb3eeacc6f2c 100644 --- a/client/cli/src/arg_enums.rs +++ b/client/cli/src/arg_enums.rs @@ -64,7 +64,6 @@ arg_enum! { #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum TracingReceiver { Log, - Telemetry, } } @@ -72,7 +71,6 @@ impl Into for TracingReceiver { fn into(self) -> sc_tracing::TracingReceiver { match self { TracingReceiver::Log => sc_tracing::TracingReceiver::Log, - TracingReceiver::Telemetry => sc_tracing::TracingReceiver::Telemetry, } } } diff --git a/client/cli/src/config.rs b/client/cli/src/config.rs index 748e3b1012695..289d6dc7cc39f 100644 --- a/client/cli/src/config.rs +++ b/client/cli/src/config.rs @@ -33,7 +33,6 @@ use sc_service::config::{ TaskExecutor, TelemetryEndpoints, TransactionPoolOptions, WasmExecutionMethod, }; use sc_service::{ChainSpec, TracingReceiver, KeepBlocks, TransactionStorageMode}; -use sc_telemetry::TelemetryHandle; use sc_tracing::logging::LoggerBuilder; use std::net::SocketAddr; use std::path::PathBuf; @@ -470,7 +469,6 @@ pub trait CliConfiguration: Sized { &self, cli: &C, task_executor: TaskExecutor, - telemetry_handle: Option, ) -> Result { let is_dev = self.is_dev()?; let chain_id = self.chain_id(is_dev)?; @@ -488,12 +486,7 @@ pub trait CliConfiguration: Sized { let max_runtime_instances = self.max_runtime_instances()?.unwrap_or(8); let is_validator = role.is_authority(); let (keystore_remote, keystore) = self.keystore_config(&config_dir)?; - let telemetry_endpoints = telemetry_handle - .as_ref() - .and_then(|_| self.telemetry_endpoints(&chain_spec).transpose()) - .transpose()? - // Don't initialise telemetry if `telemetry_endpoints` == Some([]) - .filter(|x| !x.is_empty()); + let telemetry_endpoints = self.telemetry_endpoints(&chain_spec)?; let unsafe_pruning = self .import_params() @@ -548,7 +541,6 @@ pub trait CliConfiguration: Sized { role, base_path: Some(base_path), informant_output_format: Default::default(), - telemetry_handle, }) } @@ -579,16 +571,12 @@ pub trait CliConfiguration: Sized { /// 1. Sets the panic handler /// 2. Initializes the logger /// 3. Raises the FD limit - fn init(&self) -> Result { + fn init(&self) -> Result<()> { sp_panic_handler::set(&C::support_url(), &C::impl_version()); let mut logger = LoggerBuilder::new(self.log_filters()?); logger.with_log_reloading(!self.is_log_filter_reloading_disabled()?); - if let Some(transport) = self.telemetry_external_transport()? { - logger.with_transport(transport); - } - if let Some(tracing_targets) = self.tracing_targets()? { let tracing_receiver = self.tracing_receiver()?; logger.with_profiling(tracing_receiver, tracing_targets); @@ -598,7 +586,7 @@ pub trait CliConfiguration: Sized { logger.with_colors(false); } - let telemetry_worker = logger.init()?; + logger.init()?; if let Some(new_limit) = fdlimit::raise_fd_limit() { if new_limit < RECOMMENDED_OPEN_FILE_DESCRIPTOR_LIMIT { @@ -610,7 +598,7 @@ pub trait CliConfiguration: Sized { } } - Ok(telemetry_worker) + Ok(()) } } diff --git a/client/cli/src/lib.rs b/client/cli/src/lib.rs index 602c53272ea59..f81c5160ca82b 100644 --- a/client/cli/src/lib.rs +++ b/client/cli/src/lib.rs @@ -37,7 +37,6 @@ pub use params::*; pub use runner::*; pub use sc_service::{ChainSpec, Role}; use sc_service::{Configuration, TaskExecutor}; -use sc_telemetry::TelemetryHandle; pub use sc_tracing::logging::LoggerBuilder; pub use sp_version::RuntimeVersion; use std::io::Write; @@ -214,16 +213,15 @@ pub trait SubstrateCli: Sized { &self, command: &T, task_executor: TaskExecutor, - telemetry_handle: Option, ) -> error::Result { - command.create_configuration(self, task_executor, telemetry_handle) + command.create_configuration(self, task_executor) } /// Create a runner for the command provided in argument. This will create a Configuration and /// a tokio runtime fn create_runner(&self, command: &T) -> error::Result> { - let telemetry_worker = command.init::()?; - Runner::new(self, command, telemetry_worker) + command.init::()?; + Runner::new(self, command) } /// Native runtime version. diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index 61a7fe9b01454..b512588a204c8 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -25,7 +25,6 @@ use futures::select; use futures::{future, future::FutureExt, Future}; use log::info; use sc_service::{Configuration, TaskType, TaskManager}; -use sc_telemetry::{TelemetryHandle, TelemetryWorker}; use sp_utils::metrics::{TOKIO_THREADS_ALIVE, TOKIO_THREADS_TOTAL}; use std::marker::PhantomData; use sc_service::Error as ServiceError; @@ -115,7 +114,6 @@ where pub struct Runner { config: Configuration, tokio_runtime: tokio::runtime::Runtime, - telemetry_worker: TelemetryWorker, phantom: PhantomData, } @@ -124,7 +122,6 @@ impl Runner { pub fn new( cli: &C, command: &T, - telemetry_worker: TelemetryWorker, ) -> Result> { let tokio_runtime = build_runtime()?; let runtime_handle = tokio_runtime.handle().clone(); @@ -138,16 +135,12 @@ impl Runner { } }; - let telemetry_handle = telemetry_worker.handle(); - Ok(Runner { config: command.create_configuration( cli, task_executor.into(), - Some(telemetry_handle), )?, tokio_runtime, - telemetry_worker, phantom: PhantomData, }) } @@ -197,7 +190,6 @@ impl Runner { { self.print_node_infos(); let mut task_manager = self.tokio_runtime.block_on(initialize(self.config))?; - task_manager.spawn_handle().spawn("telemetry_worker", self.telemetry_worker.run()); let res = self.tokio_runtime.block_on(main(task_manager.future().fuse())); self.tokio_runtime.block_on(task_manager.clean_shutdown()); Ok(res?) @@ -236,11 +228,4 @@ impl Runner { pub fn config_mut(&mut self) -> &mut Configuration { &mut self.config } - - /// Get a new [`TelemetryHandle`]. - /// - /// This is used when you want to register with the [`TelemetryWorker`]. - pub fn telemetry_handle(&self) -> TelemetryHandle { - self.telemetry_worker.handle() - } } diff --git a/client/consensus/aura/src/import_queue.rs b/client/consensus/aura/src/import_queue.rs index 638931477a99c..d3ed2bea3e115 100644 --- a/client/consensus/aura/src/import_queue.rs +++ b/client/consensus/aura/src/import_queue.rs @@ -45,7 +45,7 @@ use sp_api::ProvideRuntimeApi; use sp_core::crypto::Pair; use sp_inherents::{InherentDataProviders, InherentData}; use sp_timestamp::InherentError as TIError; -use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_TRACE, CONSENSUS_DEBUG, CONSENSUS_INFO}; use sc_consensus_slots::{CheckedHeader, SlotCompatible, check_equivocation}; use sp_consensus_slots::Slot; use sp_api::ApiExt; @@ -129,6 +129,7 @@ pub struct AuraVerifier { inherent_data_providers: InherentDataProviders, can_author_with: CAW, check_for_equivocation: CheckForEquivocation, + telemetry: Option, } impl AuraVerifier { @@ -137,12 +138,14 @@ impl AuraVerifier { inherent_data_providers: InherentDataProviders, can_author_with: CAW, check_for_equivocation: CheckForEquivocation, + telemetry: Option, ) -> Self { Self { client, inherent_data_providers, can_author_with, check_for_equivocation, + telemetry, phantom: PhantomData, } } @@ -197,7 +200,10 @@ impl AuraVerifier where "halting for block {} seconds in the future", diff ); - telemetry!(CONSENSUS_INFO; "aura.halting_for_future_block"; + telemetry!( + self.telemetry; + CONSENSUS_INFO; + "aura.halting_for_future_block"; "diff" => ?diff ); thread::sleep(Duration::from_secs(diff)); @@ -287,7 +293,12 @@ impl Verifier for AuraVerifier where } trace!(target: "aura", "Checked {:?}; importing.", pre_header); - telemetry!(CONSENSUS_TRACE; "aura.checked_and_importing"; "pre_header" => ?pre_header); + telemetry!( + self.telemetry; + CONSENSUS_TRACE; + "aura.checked_and_importing"; + "pre_header" => ?pre_header, + ); // Look for an authorities-change log. let maybe_keys = pre_header.digest() @@ -314,8 +325,13 @@ impl Verifier for AuraVerifier where } CheckedHeader::Deferred(a, b) => { debug!(target: "aura", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); - telemetry!(CONSENSUS_DEBUG; "aura.header_too_far_in_future"; - "hash" => ?hash, "a" => ?a, "b" => ?b + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "aura.header_too_far_in_future"; + "hash" => ?hash, + "a" => ?a, + "b" => ?b, ); Err(format!("Header {:?} rejected: too far in the future", hash)) } @@ -485,6 +501,8 @@ pub struct ImportQueueParams<'a, Block, I, C, S, CAW> { pub check_for_equivocation: CheckForEquivocation, /// The duration of one slot. pub slot_duration: SlotDuration, + /// Telemetry instance used to report telemetry metrics. + pub telemetry: Option, } /// Start an import queue for the Aura consensus algorithm. @@ -499,6 +517,7 @@ pub fn import_queue<'a, P, Block, I, C, S, CAW>( can_author_with, check_for_equivocation, slot_duration, + telemetry, }: ImportQueueParams<'a, Block, I, C, S, CAW> ) -> Result, sp_consensus::Error> where Block: BlockT, @@ -530,6 +549,7 @@ pub fn import_queue<'a, P, Block, I, C, S, CAW>( inherent_data_providers, can_author_with, check_for_equivocation, + telemetry, ); Ok(BasicQueue::new( diff --git a/client/consensus/aura/src/lib.rs b/client/consensus/aura/src/lib.rs index 12ce0e1697139..bdeb4f15f322f 100644 --- a/client/consensus/aura/src/lib.rs +++ b/client/consensus/aura/src/lib.rs @@ -56,6 +56,7 @@ use sp_keystore::{SyncCryptoStorePtr, SyncCryptoStore}; use sp_inherents::{InherentDataProviders, InherentData}; use sp_timestamp::{TimestampInherentData, InherentType as TimestampInherent}; use sc_consensus_slots::{SlotInfo, SlotCompatible, StorageChanges, BackoffAuthoringBlocksStrategy}; +use sc_telemetry::TelemetryHandle; use sp_consensus_slots::Slot; mod import_queue; @@ -149,6 +150,8 @@ pub struct StartAuraParams { /// slot. However, the proposing can still take longer when there is some lenience factor applied, /// because there were no blocks produced for some slots. pub block_proposal_slot_portion: SlotProportion, + /// Telemetry instance used to report telemetry metrics. + pub telemetry: Option, } /// Start the aura worker. The returned future should be run in a futures executor. @@ -166,6 +169,7 @@ pub fn start_aura( keystore, can_author_with, block_proposal_slot_portion, + telemetry, }: StartAuraParams, ) -> Result, sp_consensus::Error> where B: BlockT, @@ -184,13 +188,14 @@ pub fn start_aura( BS: BackoffAuthoringBlocksStrategy> + Send + 'static, { let worker = AuraWorker { - client, + client: client.clone(), block_import: Arc::new(Mutex::new(block_import)), env, keystore, sync_oracle: sync_oracle.clone(), force_authoring, backoff_authoring_blocks, + telemetry, _key_type: PhantomData::

, block_proposal_slot_portion, }; @@ -218,6 +223,7 @@ struct AuraWorker { force_authoring: bool, backoff_authoring_blocks: Option, block_proposal_slot_portion: SlotProportion, + telemetry: Option, _key_type: PhantomData

, } @@ -371,6 +377,10 @@ where })) } + fn telemetry(&self) -> Option { + self.telemetry.clone() + } + fn proposing_remaining_duration( &self, head: &B::Header, @@ -595,6 +605,7 @@ mod tests { inherent_data_providers, AlwaysCanAuthor, CheckForEquivocation::Yes, + None, ) }, PeersClient::Light(_, _) => unreachable!("No (yet) tests for light client + Aura"), @@ -670,6 +681,7 @@ mod tests { keystore, can_author_with: sp_consensus::AlwaysCanAuthor, block_proposal_slot_portion: SlotProportion::new(0.5), + telemetry: None, }).expect("Starts aura")); } @@ -729,6 +741,7 @@ mod tests { sync_oracle: DummyOracle.clone(), force_authoring: false, backoff_authoring_blocks: Some(BackoffAuthoringOnFinalizedHeadLagging::default()), + telemetry: None, _key_type: PhantomData::, block_proposal_slot_portion: SlotProportion::new(0.5), }; @@ -777,6 +790,7 @@ mod tests { sync_oracle: DummyOracle.clone(), force_authoring: false, backoff_authoring_blocks: Option::<()>::None, + telemetry: None, _key_type: PhantomData::, block_proposal_slot_portion: SlotProportion::new(0.5), }; diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs index 861f82c0090a5..1ea38820c965c 100644 --- a/client/consensus/babe/src/lib.rs +++ b/client/consensus/babe/src/lib.rs @@ -90,7 +90,7 @@ use sp_runtime::{ use sp_api::{ProvideRuntimeApi, NumberFor}; use parking_lot::Mutex; use sp_inherents::{InherentDataProviders, InherentData}; -use sc_telemetry::{telemetry, CONSENSUS_TRACE, CONSENSUS_DEBUG}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_TRACE, CONSENSUS_DEBUG}; use sp_consensus::{ BlockImport, Environment, Proposer, BlockCheckParams, ForkChoiceStrategy, BlockImportParams, BlockOrigin, Error as ConsensusError, @@ -402,6 +402,9 @@ pub struct BabeParams { /// slot. However, the proposing can still take longer when there is some lenience factor applied, /// because there were no blocks produced for some slots. pub block_proposal_slot_portion: SlotProportion, + + /// Handle use to report telemetries. + pub telemetry: Option, } /// Start the babe worker. @@ -418,13 +421,15 @@ pub fn start_babe(BabeParams { babe_link, can_author_with, block_proposal_slot_portion, + telemetry, }: BabeParams) -> Result< BabeWorker, sp_consensus::Error, > where B: BlockT, C: ProvideRuntimeApi + ProvideCache + ProvideUncles + BlockchainEvents - + HeaderBackend + HeaderMetadata + Send + Sync + 'static, + + HeaderBackend + HeaderMetadata + + Send + Sync + 'static, C::Api: BabeApi, SC: SelectChain + 'static, E: Environment + Send + Sync + 'static, @@ -453,6 +458,7 @@ pub fn start_babe(BabeParams { slot_notification_sinks: slot_notification_sinks.clone(), config: config.clone(), block_proposal_slot_portion, + telemetry, }; register_babe_inherent_data_provider(&inherent_data_providers, config.slot_duration())?; @@ -609,6 +615,7 @@ struct BabeSlotWorker { slot_notification_sinks: SlotNotificationSinks, config: Config, block_proposal_slot_portion: SlotProportion, + telemetry: Option, } impl sc_consensus_slots::SimpleSlotWorker @@ -799,6 +806,10 @@ where })) } + fn telemetry(&self) -> Option { + self.telemetry.clone() + } + fn proposing_remaining_duration( &self, parent_head: &B::Header, @@ -947,6 +958,7 @@ pub struct BabeVerifier { epoch_changes: SharedEpochChanges, time_source: TimeSource, can_author_with: CAW, + telemetry: Option, } impl BabeVerifier @@ -1174,6 +1186,7 @@ where trace!(target: "babe", "Checked {:?}; importing.", pre_header); telemetry!( + self.telemetry; CONSENSUS_TRACE; "babe.checked_and_importing"; "pre_header" => ?pre_header); @@ -1192,7 +1205,10 @@ where } CheckedHeader::Deferred(a, b) => { debug!(target: "babe", "Checking {:?} failed; {:?}, {:?}.", hash, a, b); - telemetry!(CONSENSUS_DEBUG; "babe.header_too_far_in_future"; + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "babe.header_too_far_in_future"; "hash" => ?hash, "a" => ?a, "b" => ?b ); Err(Error::::TooFarInFuture(hash).into()) @@ -1599,11 +1615,13 @@ pub fn import_queue( spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, can_author_with: CAW, + telemetry: Option, ) -> ClientResult> where Inner: BlockImport> + Send + Sync + 'static, - Client: ProvideRuntimeApi + ProvideCache + Send + Sync + AuxStore + 'static, - Client: HeaderBackend + HeaderMetadata, + Client: ProvideRuntimeApi + ProvideCache + HeaderBackend + + HeaderMetadata + AuxStore + + Send + Sync + 'static, Client::Api: BlockBuilderApi + BabeApi + ApiExt, SelectChain: sp_consensus::SelectChain + 'static, CAW: CanAuthorWith + Send + Sync + 'static, @@ -1611,13 +1629,14 @@ pub fn import_queue( register_babe_inherent_data_provider(&inherent_data_providers, babe_link.config.slot_duration)?; let verifier = BabeVerifier { - client, select_chain, inherent_data_providers, config: babe_link.config, epoch_changes: babe_link.epoch_changes, time_source: babe_link.time_source, can_author_with, + telemetry, + client, }; Ok(BasicQueue::new( diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs index 9ffffc37fd3bf..d3e51b020326c 100644 --- a/client/consensus/babe/src/tests.rs +++ b/client/consensus/babe/src/tests.rs @@ -320,6 +320,7 @@ impl TestNetFactory for BabeTestNet { epoch_changes: data.link.epoch_changes.clone(), time_source: data.link.time_source.clone(), can_author_with: AlwaysCanAuthor, + telemetry: None, }, mutator: MUTATOR.with(|m| m.borrow().clone()), } @@ -432,6 +433,7 @@ fn run_one_test( keystore, can_author_with: sp_consensus::AlwaysCanAuthor, block_proposal_slot_portion: SlotProportion::new(0.5), + telemetry: None, }).expect("Starts babe")); } futures::executor::block_on(future::select( diff --git a/client/consensus/manual-seal/src/lib.rs b/client/consensus/manual-seal/src/lib.rs index 320f196c1052c..64de70939503c 100644 --- a/client/consensus/manual-seal/src/lib.rs +++ b/client/consensus/manual-seal/src/lib.rs @@ -300,6 +300,7 @@ mod tests { client.clone(), pool.clone(), None, + None, ); // this test checks that blocks are created as soon as transactions are imported into the pool. let (sender, receiver) = futures::channel::oneshot::channel(); @@ -371,6 +372,7 @@ mod tests { client.clone(), pool.clone(), None, + None, ); // this test checks that blocks are created as soon as an engine command is sent over the stream. let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024); @@ -446,6 +448,7 @@ mod tests { client.clone(), pool.clone(), None, + None, ); // this test checks that blocks are created as soon as an engine command is sent over the stream. let (mut sink, commands_stream) = futures::channel::mpsc::channel(1024); diff --git a/client/consensus/slots/src/lib.rs b/client/consensus/slots/src/lib.rs index 1b40ac102d5d7..037402260c0d3 100644 --- a/client/consensus/slots/src/lib.rs +++ b/client/consensus/slots/src/lib.rs @@ -47,7 +47,7 @@ use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, Header, HashFor, NumberFor} }; -use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_WARN, CONSENSUS_INFO}; /// The changes that need to applied to the storage to create the state for a block. /// @@ -180,6 +180,9 @@ pub trait SimpleSlotWorker { /// Returns a `Proposer` to author on top of the given block. fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer; + /// Returns a [`TelemetryHandle`] if any. + fn telemetry(&self) -> Option; + /// Remaining duration for proposing. fn proposing_remaining_duration( &self, @@ -197,6 +200,7 @@ pub trait SimpleSlotWorker { >::Proposal: Unpin + Send + 'static, { let (timestamp, slot) = (slot_info.timestamp, slot_info.slot); + let telemetry = self.telemetry(); let proposing_remaining_duration = self.proposing_remaining_duration(&chain_head, &slot_info); @@ -219,7 +223,9 @@ pub trait SimpleSlotWorker { warn!("Unable to fetch epoch data at block {:?}: {:?}", chain_head.hash(), err); telemetry!( - CONSENSUS_WARN; "slots.unable_fetching_authorities"; + telemetry; + CONSENSUS_WARN; + "slots.unable_fetching_authorities"; "slot" => ?chain_head.hash(), "err" => ?err, ); @@ -238,6 +244,7 @@ pub trait SimpleSlotWorker { { debug!(target: self.logging_target(), "Skipping proposal slot. Waiting for the network."); telemetry!( + telemetry; CONSENSUS_DEBUG; "slots.skipping_proposal_slot"; "authorities_len" => authorities_len, @@ -263,24 +270,29 @@ pub trait SimpleSlotWorker { ); telemetry!( + telemetry; CONSENSUS_DEBUG; "slots.starting_authorship"; "slot_num" => *slot, "timestamp" => timestamp, ); - let awaiting_proposer = self.proposer(&chain_head).map_err(move |err| { - warn!("Unable to author block in slot {:?}: {:?}", slot, err); + let awaiting_proposer = { + let telemetry = telemetry.clone(); + self.proposer(&chain_head).map_err(move |err| { + warn!("Unable to author block in slot {:?}: {:?}", slot, err); - telemetry!( - CONSENSUS_WARN; - "slots.unable_authoring_block"; - "slot" => *slot, - "err" => ?err - ); + telemetry!( + telemetry; + CONSENSUS_WARN; + "slots.unable_authoring_block"; + "slot" => *slot, + "err" => ?err + ); - err - }); + err + }) + }; let logs = self.pre_digest_data(slot, &claim); @@ -295,7 +307,8 @@ pub trait SimpleSlotWorker { proposing_remaining_duration.mul_f32(0.98), ).map_err(|e| sp_consensus::Error::ClientImport(format!("{:?}", e)))); - let proposal_work = + let proposal_work = { + let telemetry = telemetry.clone(); futures::future::select(proposing, proposing_remaining).map(move |v| match v { Either::Left((b, _)) => b.map(|b| (b, claim)), Either::Right(_) => { @@ -307,6 +320,7 @@ pub trait SimpleSlotWorker { #[cfg(build_type="debug")] info!("👉 Recompile your node in `--release` mode to mitigate this problem."); telemetry!( + telemetry; CONSENSUS_INFO; "slots.discarding_proposal_took_too_long"; "slot" => *slot, @@ -314,7 +328,8 @@ pub trait SimpleSlotWorker { Err(sp_consensus::Error::ClientImport("Timeout in the Slots proposer".into())) }, - }); + }) + }; let block_import_params_maker = self.block_import_params(); let block_import = self.block_import(); @@ -343,7 +358,10 @@ pub trait SimpleSlotWorker { header_hash, ); - telemetry!(CONSENSUS_INFO; "slots.pre_sealed_block"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "slots.pre_sealed_block"; "header_num" => ?header_num, "hash_now" => ?block_import_params.post_hash(), "hash_previously" => ?header_hash, @@ -359,7 +377,9 @@ pub trait SimpleSlotWorker { ); telemetry!( - CONSENSUS_WARN; "slots.err_with_block_built_on"; + telemetry; + CONSENSUS_WARN; + "slots.err_with_block_built_on"; "hash" => ?parent_hash, "err" => ?err, ); @@ -449,7 +469,8 @@ where Either::Right(future::ready(Ok(()))) } else { Either::Left( - worker.on_slot(chain_head, slot_info).then(|_| future::ready(Ok(()))) + worker.on_slot(chain_head, slot_info) + .then(|_| future::ready(Ok(()))) ) } }).then(|res| { diff --git a/client/finality-grandpa/src/authorities.rs b/client/finality-grandpa/src/authorities.rs index 11d3d4ba691da..1854a33d29f1f 100644 --- a/client/finality-grandpa/src/authorities.rs +++ b/client/finality-grandpa/src/authorities.rs @@ -23,7 +23,7 @@ use parking_lot::RwLock; use finality_grandpa::voter_set::VoterSet; use parity_scale_codec::{Encode, Decode}; use log::debug; -use sc_telemetry::{telemetry, CONSENSUS_INFO}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO}; use sp_finality_grandpa::{AuthorityId, AuthorityList}; use std::cmp::Ord; @@ -43,8 +43,8 @@ pub enum Error { #[display(fmt = "Multiple pending forced authority set changes are not allowed.")] MultiplePendingForcedAuthoritySetChanges, #[display( - fmt = "A pending forced authority set change could not be applied since it must be applied after \ - the pending standard change at #{}", + fmt = "A pending forced authority set change could not be applied since it must be applied \ + after the pending standard change at #{}", _0 )] ForcedAuthoritySetChangeDependencyUnsatisfied(N), @@ -278,9 +278,13 @@ where let hash = pending.canon_hash.clone(); let number = pending.canon_height.clone(); - debug!(target: "afg", "Inserting potential standard set change signaled at block {:?} \ - (delayed by {:?} blocks).", - (&number, &hash), pending.delay); + debug!( + target: "afg", + "Inserting potential standard set change signaled at block {:?} (delayed by {:?} + blocks).", + (&number, &hash), + pending.delay, + ); self.pending_standard_changes.import( hash, @@ -289,8 +293,10 @@ where is_descendent_of, )?; - debug!(target: "afg", "There are now {} alternatives for the next pending standard change (roots), \ - and a total of {} pending standard changes (across all forks).", + debug!( + target: "afg", + "There are now {} alternatives for the next pending standard change (roots), and a + total of {} pending standard changes (across all forks).", self.pending_standard_changes.roots().count(), self.pending_standard_changes.iter().count(), ); @@ -326,9 +332,12 @@ where )) .unwrap_or_else(|i| i); - debug!(target: "afg", "Inserting potential forced set change at block {:?} \ - (delayed by {:?} blocks).", - (&pending.canon_height, &pending.canon_hash), pending.delay); + debug!( + target: "afg", + "Inserting potential forced set change at block {:?} (delayed by {:?} blocks).", + (&pending.canon_height, &pending.canon_hash), + pending.delay, + ); self.pending_forced_changes.insert(idx, pending); @@ -409,6 +418,7 @@ where best_number: N, is_descendent_of: &F, initial_sync: bool, + telemetry: Option, ) -> Result, Error> where F: Fn(&H, &H) -> Result, @@ -461,6 +471,7 @@ where ); telemetry!( + telemetry; CONSENSUS_INFO; "afg.applying_forced_authority_set_change"; "block" => ?change.canon_height @@ -505,6 +516,7 @@ where finalized_number: N, is_descendent_of: &F, initial_sync: bool, + telemetry: Option<&TelemetryHandle>, ) -> Result, Error> where F: Fn(&H, &H) -> Result, @@ -544,7 +556,10 @@ where "👴 Applying authority set change scheduled at block #{:?}", change.canon_height, ); - telemetry!(CONSENSUS_INFO; "afg.applying_scheduled_authority_set_change"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.applying_scheduled_authority_set_change"; "block" => ?change.canon_height ); @@ -894,6 +909,7 @@ mod tests { _ => unreachable!(), }), false, + None, ).unwrap(); assert!(status.changed); @@ -913,6 +929,7 @@ mod tests { _ => unreachable!(), }), false, + None, ).unwrap(); assert!(status.changed); @@ -971,7 +988,7 @@ mod tests { // trying to finalize past `change_c` without finalizing `change_a` first assert!(matches!( - authorities.apply_standard_changes("hash_d", 40, &is_descendent_of, false), + authorities.apply_standard_changes("hash_d", 40, &is_descendent_of, false, None), Err(Error::ForkTree(fork_tree::Error::UnfinalizedAncestor)) )); assert_eq!(authorities.authority_set_changes, AuthoritySetChanges::empty()); @@ -981,6 +998,7 @@ mod tests { 15, &is_descendent_of, false, + None, ).unwrap(); assert!(status.changed); @@ -996,6 +1014,7 @@ mod tests { 40, &is_descendent_of, false, + None, ).unwrap(); assert!(status.changed); @@ -1138,7 +1157,7 @@ mod tests { // too early and there's no forced changes to apply. assert!( authorities - .apply_forced_changes("hash_a10", 10, &static_is_descendent_of(true), false) + .apply_forced_changes("hash_a10", 10, &static_is_descendent_of(true), false, None) .unwrap() .is_none() ); @@ -1146,7 +1165,7 @@ mod tests { // too late. assert!( authorities - .apply_forced_changes("hash_a16", 16, &is_descendent_of_a, false) + .apply_forced_changes("hash_a16", 16, &is_descendent_of_a, false, None) .unwrap() .is_none() ); @@ -1154,7 +1173,7 @@ mod tests { // on time -- chooses the right change for this fork. assert_eq!( authorities - .apply_forced_changes("hash_a15", 15, &is_descendent_of_a, false) + .apply_forced_changes("hash_a15", 15, &is_descendent_of_a, false, None) .unwrap() .unwrap(), ( @@ -1202,7 +1221,7 @@ mod tests { // it should be enacted at the same block that signaled it assert!( authorities - .apply_forced_changes("hash_a", 5, &static_is_descendent_of(false), false) + .apply_forced_changes("hash_a", 5, &static_is_descendent_of(false), false, None) .unwrap() .is_some() ); @@ -1269,27 +1288,27 @@ mod tests { // the forced change cannot be applied since the pending changes it depends on // have not been applied yet. assert!(matches!( - authorities.apply_forced_changes("hash_d45", 45, &static_is_descendent_of(true), false), + authorities.apply_forced_changes("hash_d45", 45, &static_is_descendent_of(true), false, None), Err(Error::ForcedAuthoritySetChangeDependencyUnsatisfied(15)) )); assert_eq!(authorities.authority_set_changes, AuthoritySetChanges::empty()); // we apply the first pending standard change at #15 authorities - .apply_standard_changes("hash_a15", 15, &static_is_descendent_of(true), false) + .apply_standard_changes("hash_a15", 15, &static_is_descendent_of(true), false, None) .unwrap(); assert_eq!(authorities.authority_set_changes, AuthoritySetChanges(vec![(0, 15)])); // but the forced change still depends on the next standard change assert!(matches!( - authorities.apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false), + authorities.apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false, None), Err(Error::ForcedAuthoritySetChangeDependencyUnsatisfied(20)) )); assert_eq!(authorities.authority_set_changes, AuthoritySetChanges(vec![(0, 15)])); // we apply the pending standard change at #20 authorities - .apply_standard_changes("hash_b", 20, &static_is_descendent_of(true), false) + .apply_standard_changes("hash_b", 20, &static_is_descendent_of(true), false, None) .unwrap(); assert_eq!(authorities.authority_set_changes, AuthoritySetChanges(vec![(0, 15), (1, 20)])); @@ -1298,7 +1317,7 @@ mod tests { // at #35. subsequent forced changes on the same branch must be kept assert_eq!( authorities - .apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false) + .apply_forced_changes("hash_d", 45, &static_is_descendent_of(true), false, None) .unwrap() .unwrap(), ( @@ -1395,7 +1414,7 @@ mod tests { // we apply the change at A0 which should prune it and the fork at B authorities - .apply_standard_changes("hash_a0", 5, &is_descendent_of, false) + .apply_standard_changes("hash_a0", 5, &is_descendent_of, false, None) .unwrap(); // the next change is now at A1 (#10) @@ -1583,14 +1602,14 @@ mod tests { // applying the standard change at A should not prune anything // other then the change that was applied authorities - .apply_standard_changes("A", 5, &is_descendent_of, false) + .apply_standard_changes("A", 5, &is_descendent_of, false, None) .unwrap(); assert_eq!(authorities.pending_changes().count(), 6); // same for B authorities - .apply_standard_changes("B", 10, &is_descendent_of, false) + .apply_standard_changes("B", 10, &is_descendent_of, false, None) .unwrap(); assert_eq!(authorities.pending_changes().count(), 5); @@ -1599,7 +1618,7 @@ mod tests { // finalizing C2 should clear all forced changes authorities - .apply_standard_changes("C2", 15, &is_descendent_of, false) + .apply_standard_changes("C2", 15, &is_descendent_of, false, None) .unwrap(); assert_eq!(authorities.pending_forced_changes.len(), 0); @@ -1607,7 +1626,7 @@ mod tests { // finalizing C0 should clear all forced changes but D let mut authorities = authorities2; authorities - .apply_standard_changes("C0", 15, &is_descendent_of, false) + .apply_standard_changes("C0", 15, &is_descendent_of, false, None) .unwrap(); assert_eq!(authorities.pending_forced_changes.len(), 1); diff --git a/client/finality-grandpa/src/aux_schema.rs b/client/finality-grandpa/src/aux_schema.rs index 1ce3c7999f24c..43c45b9f10ae1 100644 --- a/client/finality-grandpa/src/aux_schema.rs +++ b/client/finality-grandpa/src/aux_schema.rs @@ -137,7 +137,7 @@ struct V2AuthoritySet { } pub(crate) fn load_decode( - backend: &B, + backend: &B, key: &[u8] ) -> ClientResult> { match backend.get_aux(key)? { diff --git a/client/finality-grandpa/src/communication/gossip.rs b/client/finality-grandpa/src/communication/gossip.rs index 9f5582e5cea6b..a6c51f7eeee72 100644 --- a/client/finality-grandpa/src/communication/gossip.rs +++ b/client/finality-grandpa/src/communication/gossip.rs @@ -90,7 +90,7 @@ use sc_network::{ObservedRole, PeerId, ReputationChange}; use parity_scale_codec::{Encode, Decode}; use sp_finality_grandpa::AuthorityId; -use sc_telemetry::{telemetry, CONSENSUS_DEBUG}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG}; use log::{trace, debug}; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use prometheus_endpoint::{CounterVec, Opts, PrometheusError, register, Registry, U64}; @@ -744,7 +744,7 @@ impl Inner { fn note_set(&mut self, set_id: SetId, authorities: Vec) -> MaybeMessage { { let local_view = match self.local_view { - ref mut x @ None => x.get_or_insert(LocalView::new( + ref mut x @ None => x.get_or_insert(LocalView::new( set_id, Round(1), )), @@ -828,7 +828,12 @@ impl Inner { // ensure authority is part of the set. if !self.authorities.contains(&full.message.id) { debug!(target: "afg", "Message from unknown voter: {}", full.message.id); - telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id); + telemetry!( + self.config.telemetry; + CONSENSUS_DEBUG; + "afg.bad_msg_signature"; + "signature" => ?full.message.id, + ); return Action::Discard(cost::UNKNOWN_VOTER); } @@ -840,7 +845,12 @@ impl Inner { full.set_id.0, ) { debug!(target: "afg", "Bad message signature {}", full.message.id); - telemetry!(CONSENSUS_DEBUG; "afg.bad_msg_signature"; "signature" => ?full.message.id); + telemetry!( + self.config.telemetry; + CONSENSUS_DEBUG; + "afg.bad_msg_signature"; + "signature" => ?full.message.id, + ); return Action::Discard(cost::BAD_SIGNATURE); } @@ -866,7 +876,10 @@ impl Inner { if full.message.precommits.len() != full.message.auth_data.len() || full.message.precommits.is_empty() { debug!(target: "afg", "Malformed compact commit"); - telemetry!(CONSENSUS_DEBUG; "afg.malformed_compact_commit"; + telemetry!( + self.config.telemetry; + CONSENSUS_DEBUG; + "afg.malformed_compact_commit"; "precommits_len" => ?full.message.precommits.len(), "auth_data_len" => ?full.message.auth_data.len(), "precommits_is_empty" => ?full.message.precommits.is_empty(), @@ -1277,6 +1290,7 @@ pub(super) struct GossipValidator { set_state: environment::SharedVoterSetState, report_sender: TracingUnboundedSender, metrics: Option, + telemetry: Option, } impl GossipValidator { @@ -1287,6 +1301,7 @@ impl GossipValidator { config: crate::Config, set_state: environment::SharedVoterSetState, prometheus_registry: Option<&Registry>, + telemetry: Option, ) -> (GossipValidator, TracingUnboundedReceiver) { let metrics = match prometheus_registry.map(Metrics::register) { Some(Ok(metrics)) => Some(metrics), @@ -1303,6 +1318,7 @@ impl GossipValidator { set_state, report_sender: tx, metrics, + telemetry, }; (val, rx) @@ -1411,7 +1427,12 @@ impl GossipValidator { Err(e) => { message_name = None; debug!(target: "afg", "Error decoding message: {}", e); - telemetry!(CONSENSUS_DEBUG; "afg.err_decoding_msg"; "" => ""); + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "afg.err_decoding_msg"; + "" => "", + ); let len = std::cmp::min(i32::max_value() as usize, data.len()) as i32; Action::Discard(Misbehavior::UndecodablePacket(len).cost()) @@ -1630,6 +1651,7 @@ mod tests { name: None, is_authority: true, observer_enabled: true, + telemetry: None, } } @@ -1797,6 +1819,7 @@ mod tests { config(), voter_set_state(), None, + None, ); let set_id = 1; @@ -1833,6 +1856,7 @@ mod tests { config(), voter_set_state(), None, + None, ); let set_id = 1; let auth = AuthorityId::from_slice(&[1u8; 32]); @@ -1878,6 +1902,7 @@ mod tests { config(), voter_set_state(), None, + None, ); let set_id = 1; @@ -1947,6 +1972,7 @@ mod tests { config(), set_state.clone(), None, + None, ); let set_id = 1; @@ -2002,6 +2028,7 @@ mod tests { config(), set_state.clone(), None, + None, ); // the validator starts at set id 2 @@ -2082,6 +2109,7 @@ mod tests { config(), voter_set_state(), None, + None, ); // the validator starts at set id 1. @@ -2156,6 +2184,7 @@ mod tests { config, voter_set_state(), None, + None, ); // the validator starts at set id 1. @@ -2190,6 +2219,7 @@ mod tests { config(), voter_set_state(), None, + None, ); // the validator starts at set id 1. @@ -2250,6 +2280,7 @@ mod tests { config, voter_set_state(), None, + None, ); // the validator starts at set id 1. @@ -2289,6 +2320,7 @@ mod tests { config(), voter_set_state(), None, + None, ); // the validator starts at set id 1. @@ -2322,6 +2354,7 @@ mod tests { config, voter_set_state(), None, + None, ); // the validator start at set id 0 @@ -2401,6 +2434,7 @@ mod tests { config(), voter_set_state(), None, + None, ); // the validator start at set id 0 @@ -2441,6 +2475,7 @@ mod tests { config, voter_set_state(), None, + None, ); // the validator start at set id 0 @@ -2490,7 +2525,7 @@ mod tests { #[test] fn only_gossip_commits_to_peers_on_same_set() { - let (val, _) = GossipValidator::::new(config(), voter_set_state(), None); + let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); // the validator start at set id 1 val.note_set(SetId(1), Vec::new(), |_, _| {}); @@ -2568,7 +2603,7 @@ mod tests { #[test] fn expire_commits_from_older_rounds() { - let (val, _) = GossipValidator::::new(config(), voter_set_state(), None); + let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); let commit = |round, set_id, target_number| { let commit = finality_grandpa::CompactCommit { @@ -2619,7 +2654,7 @@ mod tests { #[test] fn allow_noting_different_authorities_for_same_set() { - let (val, _) = GossipValidator::::new(config(), voter_set_state(), None); + let (val, _) = GossipValidator::::new(config(), voter_set_state(), None, None); let a1 = vec![AuthorityId::from_slice(&[0; 32])]; val.note_set(SetId(1), a1.clone(), |_, _| {}); diff --git a/client/finality-grandpa/src/communication/mod.rs b/client/finality-grandpa/src/communication/mod.rs index d502741465d23..0d287cc96e535 100644 --- a/client/finality-grandpa/src/communication/mod.rs +++ b/client/finality-grandpa/src/communication/mod.rs @@ -42,7 +42,7 @@ use sc_network::{NetworkService, ReputationChange}; use sc_network_gossip::{GossipEngine, Network as GossipNetwork}; use parity_scale_codec::{Encode, Decode}; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT, NumberFor}; -use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO}; use crate::{ CatchUp, Commit, CommunicationIn, CommunicationOutH, @@ -192,6 +192,8 @@ pub(crate) struct NetworkBridge> { // just an `UnboundedReceiver`, one could also switch to a multi-producer-*multi*-consumer // channel implementation. gossip_validator_report_stream: Arc>>, + + telemetry: Option, } impl> Unpin for NetworkBridge {} @@ -206,11 +208,13 @@ impl> NetworkBridge { config: crate::Config, set_state: crate::environment::SharedVoterSetState, prometheus_registry: Option<&Registry>, + telemetry: Option, ) -> Self { let (validator, report_stream) = GossipValidator::new( config, set_state.clone(), prometheus_registry, + telemetry.clone(), ); let validator = Arc::new(validator); @@ -268,6 +272,7 @@ impl> NetworkBridge { neighbor_sender: neighbor_packet_sender, neighbor_packet_worker: Arc::new(Mutex::new(neighbor_packet_worker)), gossip_validator_report_stream: Arc::new(Mutex::new(report_stream)), + telemetry, } } @@ -320,6 +325,7 @@ impl> NetworkBridge { }); let topic = round_topic::(round.0, set_id.0); + let telemetry = self.telemetry.clone(); let incoming = self.gossip_engine.lock().messages_for(topic) .filter_map(move |notification| { let decoded = GossipMessage::::decode(&mut ¬ification.message[..]); @@ -339,21 +345,30 @@ impl> NetworkBridge { if voters.len().get() <= TELEMETRY_VOTERS_LIMIT { match &msg.message.message { PrimaryPropose(propose) => { - telemetry!(CONSENSUS_INFO; "afg.received_propose"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.received_propose"; "voter" => ?format!("{}", msg.message.id), "target_number" => ?propose.target_number, "target_hash" => ?propose.target_hash, ); }, Prevote(prevote) => { - telemetry!(CONSENSUS_INFO; "afg.received_prevote"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.received_prevote"; "voter" => ?format!("{}", msg.message.id), "target_number" => ?prevote.target_number, "target_hash" => ?prevote.target_hash, ); }, Precommit(precommit) => { - telemetry!(CONSENSUS_INFO; "afg.received_precommit"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.received_precommit"; "voter" => ?format!("{}", msg.message.id), "target_number" => ?precommit.target_number, "target_hash" => ?precommit.target_hash, @@ -379,6 +394,7 @@ impl> NetworkBridge { network: self.gossip_engine.clone(), sender: tx, has_voted, + telemetry: self.telemetry.clone(), }; // Combine incoming votes from external GRANDPA nodes with outgoing @@ -412,6 +428,7 @@ impl> NetworkBridge { voters, self.validator.clone(), self.neighbor_sender.clone(), + self.telemetry.clone(), ); let outgoing = CommitsOut::::new( @@ -420,6 +437,7 @@ impl> NetworkBridge { is_voter, self.validator.clone(), self.neighbor_sender.clone(), + self.telemetry.clone(), ); let outgoing = outgoing.with(|out| { @@ -491,72 +509,80 @@ fn incoming_global( voters: Arc>, gossip_validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, + telemetry: Option, ) -> impl Stream> { - let process_commit = move | - msg: FullCommitMessage, - mut notification: sc_network_gossip::TopicNotification, - gossip_engine: &Arc>>, - gossip_validator: &Arc>, - voters: &VoterSet, - | { - if voters.len().get() <= TELEMETRY_VOTERS_LIMIT { - let precommits_signed_by: Vec = - msg.message.auth_data.iter().map(move |(_, a)| { - format!("{}", a) - }).collect(); - - telemetry!(CONSENSUS_INFO; "afg.received_commit"; - "contains_precommits_signed_by" => ?precommits_signed_by, - "target_number" => ?msg.message.target_number.clone(), - "target_hash" => ?msg.message.target_hash.clone(), - ); - } - - if let Err(cost) = check_compact_commit::( - &msg.message, - voters, - msg.round, - msg.set_id, - ) { - if let Some(who) = notification.sender { - gossip_engine.lock().report(who, cost); + let process_commit = { + let telemetry = telemetry.clone(); + move | + msg: FullCommitMessage, + mut notification: sc_network_gossip::TopicNotification, + gossip_engine: &Arc>>, + gossip_validator: &Arc>, + voters: &VoterSet, + | { + if voters.len().get() <= TELEMETRY_VOTERS_LIMIT { + let precommits_signed_by: Vec = + msg.message.auth_data.iter().map(move |(_, a)| { + format!("{}", a) + }).collect(); + + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.received_commit"; + "contains_precommits_signed_by" => ?precommits_signed_by, + "target_number" => ?msg.message.target_number.clone(), + "target_hash" => ?msg.message.target_hash.clone(), + ); } - return None; - } - - let round = msg.round; - let set_id = msg.set_id; - let commit = msg.message; - let finalized_number = commit.target_number; - let gossip_validator = gossip_validator.clone(); - let gossip_engine = gossip_engine.clone(); - let neighbor_sender = neighbor_sender.clone(); - let cb = move |outcome| match outcome { - voter::CommitProcessingOutcome::Good(_) => { - // if it checks out, gossip it. not accounting for - // any discrepancy between the actual ghost and the claimed - // finalized number. - gossip_validator.note_commit_finalized( - round, - set_id, - finalized_number, - |to, neighbor| neighbor_sender.send(to, neighbor), - ); + if let Err(cost) = check_compact_commit::( + &msg.message, + voters, + msg.round, + msg.set_id, + telemetry.as_ref(), + ) { + if let Some(who) = notification.sender { + gossip_engine.lock().report(who, cost); + } - gossip_engine.lock().gossip_message(topic, notification.message.clone(), false); + return None; } - voter::CommitProcessingOutcome::Bad(_) => { - // report peer and do not gossip. - if let Some(who) = notification.sender.take() { - gossip_engine.lock().report(who, cost::INVALID_COMMIT); + + let round = msg.round; + let set_id = msg.set_id; + let commit = msg.message; + let finalized_number = commit.target_number; + let gossip_validator = gossip_validator.clone(); + let gossip_engine = gossip_engine.clone(); + let neighbor_sender = neighbor_sender.clone(); + let cb = move |outcome| match outcome { + voter::CommitProcessingOutcome::Good(_) => { + // if it checks out, gossip it. not accounting for + // any discrepancy between the actual ghost and the claimed + // finalized number. + gossip_validator.note_commit_finalized( + round, + set_id, + finalized_number, + |to, neighbor| neighbor_sender.send(to, neighbor), + ); + + gossip_engine.lock().gossip_message(topic, notification.message.clone(), false); } - } - }; + voter::CommitProcessingOutcome::Bad(_) => { + // report peer and do not gossip. + if let Some(who) = notification.sender.take() { + gossip_engine.lock().report(who, cost::INVALID_COMMIT); + } + } + }; - let cb = voter::Callback::Work(Box::new(cb)); + let cb = voter::Callback::Work(Box::new(cb)); - Some(voter::CommunicationIn::Commit(round.0, commit, cb)) + Some(voter::CommunicationIn::Commit(round.0, commit, cb)) + } }; let process_catch_up = move | @@ -573,6 +599,7 @@ fn incoming_global( &msg.message, voters, msg.set_id, + telemetry.clone(), ) { if let Some(who) = notification.sender { gossip_engine.lock().report(who, cost); @@ -629,6 +656,7 @@ impl> Clone for NetworkBridge { neighbor_sender: self.neighbor_sender.clone(), neighbor_packet_worker: self.neighbor_packet_worker.clone(), gossip_validator_report_stream: self.gossip_validator_report_stream.clone(), + telemetry: self.telemetry.clone(), } } } @@ -655,6 +683,7 @@ pub(crate) struct OutgoingMessages { sender: mpsc::Sender>, network: Arc>>, has_voted: HasVoted, + telemetry: Option, } impl Unpin for OutgoingMessages {} @@ -717,7 +746,9 @@ impl Sink> for OutgoingMessages ); telemetry!( - CONSENSUS_DEBUG; "afg.announcing_blocks_to_voted_peers"; + self.telemetry; + CONSENSUS_DEBUG; + "afg.announcing_blocks_to_voted_peers"; "block" => ?target_hash, "round" => ?self.round, "set_id" => ?self.set_id, ); @@ -756,6 +787,7 @@ fn check_compact_commit( voters: &VoterSet, round: Round, set_id: SetId, + telemetry: Option<&TelemetryHandle>, ) -> Result<(), ReputationChange> { // 4f + 1 = equivocations from f voters. let f = voters.total_weight() - voters.threshold(); @@ -797,7 +829,12 @@ fn check_compact_commit( &mut buf, ) { debug!(target: "afg", "Bad commit message signature {}", id); - telemetry!(CONSENSUS_DEBUG; "afg.bad_commit_msg_signature"; "id" => ?id); + telemetry!( + telemetry; + CONSENSUS_DEBUG; + "afg.bad_commit_msg_signature"; + "id" => ?id, + ); let cost = Misbehavior::BadCommitMessage { signatures_checked: i as i32, blocks_loaded: 0, @@ -817,6 +854,7 @@ fn check_catch_up( msg: &CatchUp, voters: &VoterSet, set_id: SetId, + telemetry: Option, ) -> Result<(), ReputationChange> { // 4f + 1 = equivocations from f voters. let f = voters.total_weight() - voters.threshold(); @@ -867,6 +905,7 @@ fn check_catch_up( set_id: SetIdNumber, mut signatures_checked: usize, buf: &mut Vec, + telemetry: Option, ) -> Result where B: BlockT, I: Iterator, &'a AuthorityId, &'a AuthoritySignature)>, @@ -885,7 +924,12 @@ fn check_catch_up( buf, ) { debug!(target: "afg", "Bad catch up message signature {}", id); - telemetry!(CONSENSUS_DEBUG; "afg.bad_catch_up_msg_signature"; "id" => ?id); + telemetry!( + telemetry; + CONSENSUS_DEBUG; + "afg.bad_catch_up_msg_signature"; + "id" => ?id, + ); let cost = Misbehavior::BadCatchUpMessage { signatures_checked: signatures_checked as i32, @@ -909,6 +953,7 @@ fn check_catch_up( set_id.0, 0, &mut buf, + telemetry.clone(), )?; // check signatures on all contained precommits. @@ -920,6 +965,7 @@ fn check_catch_up( set_id.0, signatures_checked, &mut buf, + telemetry, )?; Ok(()) @@ -932,6 +978,7 @@ struct CommitsOut { is_voter: bool, gossip_validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, + telemetry: Option, } impl CommitsOut { @@ -942,6 +989,7 @@ impl CommitsOut { is_voter: bool, gossip_validator: Arc>, neighbor_sender: periodic::NeighborPacketSender, + telemetry: Option, ) -> Self { CommitsOut { network, @@ -949,6 +997,7 @@ impl CommitsOut { is_voter, gossip_validator, neighbor_sender, + telemetry, } } } @@ -968,8 +1017,12 @@ impl Sink<(RoundNumber, Commit)> for CommitsOut { let (round, commit) = input; let round = Round(round); - telemetry!(CONSENSUS_DEBUG; "afg.commit_issued"; - "target_number" => ?commit.target_number, "target_hash" => ?commit.target_hash, + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "afg.commit_issued"; + "target_number" => ?commit.target_number, + "target_hash" => ?commit.target_hash, ); let (precommits, auth_data) = commit.precommits.into_iter() .map(|signed| (signed.precommit, (signed.signature, signed.id))) diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index 4abea991cec37..dc37a1615f415 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -139,6 +139,7 @@ fn config() -> crate::Config { name: None, is_authority: true, observer_enabled: true, + telemetry: None, } } @@ -189,6 +190,7 @@ pub(crate) fn make_test_network() -> ( config(), voter_set_state(), None, + None, ); ( diff --git a/client/finality-grandpa/src/environment.rs b/client/finality-grandpa/src/environment.rs index 7925a674c2983..5bb525549b18c 100644 --- a/client/finality-grandpa/src/environment.rs +++ b/client/finality-grandpa/src/environment.rs @@ -39,7 +39,7 @@ use sp_runtime::generic::BlockId; use sp_runtime::traits::{ Block as BlockT, Header as HeaderT, NumberFor, Zero, }; -use sc_telemetry::{telemetry, CONSENSUS_DEBUG, CONSENSUS_INFO}; +use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_INFO}; use crate::{ local_authority_id, CommandOrError, Commit, Config, Error, NewAuthoritySet, Precommit, Prevote, @@ -445,6 +445,7 @@ pub(crate) struct Environment, SC, pub(crate) voting_rule: VR, pub(crate) metrics: Option, pub(crate) justification_sender: Option>, + pub(crate) telemetry: Option, pub(crate) _phantom: PhantomData, } @@ -891,7 +892,10 @@ where }; let report_prevote_metrics = |prevote: &Prevote| { - telemetry!(CONSENSUS_DEBUG; "afg.prevote_issued"; + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "afg.prevote_issued"; "round" => round, "target_number" => ?prevote.target_number, "target_hash" => ?prevote.target_hash, @@ -950,7 +954,10 @@ where }; let report_precommit_metrics = |precommit: &Precommit| { - telemetry!(CONSENSUS_DEBUG; "afg.precommit_issued"; + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "afg.precommit_issued"; "round" => round, "target_number" => ?precommit.target_number, "target_hash" => ?precommit.target_hash, @@ -1146,6 +1153,7 @@ where (round, commit).into(), false, self.justification_sender.as_ref(), + self.telemetry.clone(), ) } @@ -1210,6 +1218,7 @@ pub(crate) fn finalize_block( justification_or_commit: JustificationOrCommit, initial_sync: bool, justification_sender: Option<&GrandpaJustificationSender>, + telemetry: Option, ) -> Result<(), CommandOrError>> where Block: BlockT, @@ -1245,6 +1254,7 @@ where number, &is_descendent_of::(&*client, None), initial_sync, + None, ).map_err(|e| Error::Safety(e.to_string()))?; // send a justification notification if a sender exists and in case of error log it. @@ -1320,7 +1330,10 @@ where warn!(target: "afg", "Error applying finality to block {:?}: {:?}", (hash, number), e); e })?; - telemetry!(CONSENSUS_INFO; "afg.finalized_blocks_up_to"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.finalized_blocks_up_to"; "number" => ?number, "hash" => ?hash, ); @@ -1340,7 +1353,10 @@ where ); } - telemetry!(CONSENSUS_INFO; "afg.generating_new_authority_set"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.generating_new_authority_set"; "number" => ?canon_number, "hash" => ?canon_hash, "authorities" => ?set_ref.to_vec(), "set_id" => ?new_id, diff --git a/client/finality-grandpa/src/finality_proof.rs b/client/finality-grandpa/src/finality_proof.rs index c88faa2498928..b79b3190739d6 100644 --- a/client/finality-grandpa/src/finality_proof.rs +++ b/client/finality-grandpa/src/finality_proof.rs @@ -247,9 +247,6 @@ where .map_err(|_| ClientError::JustificationDecode)?; justification.verify(current_set_id, ¤t_authorities)?; - use sc_telemetry::{telemetry, CONSENSUS_INFO}; - telemetry!(CONSENSUS_INFO; "afg.finality_proof_ok"; - "finalized_header_hash" => ?proof.block); Ok(proof) } diff --git a/client/finality-grandpa/src/import.rs b/client/finality-grandpa/src/import.rs index 2c86319dd54af..22d7b7fd5bcc8 100644 --- a/client/finality-grandpa/src/import.rs +++ b/client/finality-grandpa/src/import.rs @@ -24,6 +24,7 @@ use parking_lot::RwLockWriteGuard; use sp_blockchain::{BlockStatus, well_known_cache_keys}; use sc_client_api::{backend::Backend, utils::is_descendent_of}; +use sc_telemetry::TelemetryHandle; use sp_utils::mpsc::TracingUnboundedSender; use sp_api::TransactionFor; @@ -62,6 +63,7 @@ pub struct GrandpaBlockImport { send_voter_commands: TracingUnboundedSender>>, authority_set_hard_forks: HashMap>>, justification_sender: GrandpaJustificationSender, + telemetry: Option, _phantom: PhantomData, } @@ -76,6 +78,7 @@ impl Clone for send_voter_commands: self.send_voter_commands.clone(), authority_set_hard_forks: self.authority_set_hard_forks.clone(), justification_sender: self.justification_sender.clone(), + telemetry: self.telemetry.clone(), _phantom: PhantomData, } } @@ -338,7 +341,13 @@ where let applied_changes = { let forced_change_set = guard .as_mut() - .apply_forced_changes(hash, number, &is_descendent_of, initial_sync) + .apply_forced_changes( + hash, + number, + &is_descendent_of, + initial_sync, + self.telemetry.clone(), + ) .map_err(|e| ConsensusError::ClientImport(e.to_string())) .map_err(ConsensusError::from)?; @@ -355,8 +364,11 @@ where let canon_hash = self.inner.header(BlockId::Number(canon_number)) .map_err(|e| ConsensusError::ClientImport(e.to_string()))? - .expect("the given block number is less or equal than the current best finalized number; \ - current best finalized number must exist in chain; qed.") + .expect( + "the given block number is less or equal than the current best + finalized number; current best finalized number must exist in + chain; qed." + ) .hash(); NewAuthoritySet { @@ -557,6 +569,7 @@ impl GrandpaBlockImport>>, authority_set_hard_forks: Vec<(SetId, PendingChange>)>, justification_sender: GrandpaJustificationSender, + telemetry: Option, ) -> GrandpaBlockImport { // check for and apply any forced authority set hard fork that applies // to the *current* authority set. @@ -600,6 +613,7 @@ impl GrandpaBlockImport, /// The keystore that manages the keys of this node. pub keystore: Option, + /// TelemetryHandle instance. + pub telemetry: Option, } impl Config { @@ -451,6 +453,7 @@ pub struct LinkHalf { voter_commands_rx: TracingUnboundedReceiver>>, justification_sender: GrandpaJustificationSender, justification_stream: GrandpaJustificationStream, + telemetry: Option, } impl LinkHalf { @@ -501,6 +504,7 @@ pub fn block_import( client: Arc, genesis_authorities_provider: &dyn GenesisAuthoritySetProvider, select_chain: SC, + telemetry: Option, ) -> Result< ( GrandpaBlockImport, @@ -518,6 +522,7 @@ where genesis_authorities_provider, select_chain, Default::default(), + telemetry, ) } @@ -531,6 +536,7 @@ pub fn block_import_with_authority_set_hard_forks genesis_authorities_provider: &dyn GenesisAuthoritySetProvider, select_chain: SC, authority_set_hard_forks: Vec<(SetId, (Block::Hash, NumberFor), AuthorityList)>, + telemetry: Option, ) -> Result< ( GrandpaBlockImport, @@ -550,13 +556,19 @@ where &*client, genesis_hash, >::zero(), - || { - let authorities = genesis_authorities_provider.get()?; - telemetry!(CONSENSUS_DEBUG; "afg.loading_authorities"; - "authorities_len" => ?authorities.len() - ); - Ok(authorities) - } + { + let telemetry = telemetry.clone(); + move || { + let authorities = genesis_authorities_provider.get()?; + telemetry!( + telemetry; + CONSENSUS_DEBUG; + "afg.loading_authorities"; + "authorities_len" => ?authorities.len() + ); + Ok(authorities) + } + }, )?; let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command"); @@ -590,6 +602,7 @@ where voter_commands_tx, authority_set_hard_forks, justification_sender.clone(), + telemetry.clone(), ), LinkHalf { client, @@ -598,6 +611,7 @@ where voter_commands_rx, justification_sender, justification_stream, + telemetry, }, )) } @@ -660,14 +674,14 @@ pub struct GrandpaParams { /// `sc_network` crate, it is assumed that the Grandpa notifications protocol has been passed /// to the configuration of the networking. See [`grandpa_peers_set_config`]. pub network: N, - /// If supplied, can be used to hook on telemetry connection established events. - pub telemetry_on_connect: Option>, /// A voting rule used to potentially restrict target votes. pub voting_rule: VR, /// The prometheus metrics registry. pub prometheus_registry: Option, /// The voter state is exposed at an RPC endpoint. pub shared_voter_state: SharedVoterState, + /// TelemetryHandle instance. + pub telemetry: Option, } /// Returns the configuration value to put in @@ -706,10 +720,10 @@ where mut config, link, network, - telemetry_on_connect, voting_rule, prometheus_registry, shared_voter_state, + telemetry, } = grandpa_params; // NOTE: we have recently removed `run_grandpa_observer` from the public @@ -725,6 +739,7 @@ where voter_commands_rx, justification_sender, justification_stream: _, + telemetry: _, } = link; let network = NetworkBridge::new( @@ -732,11 +747,16 @@ where config.clone(), persistent_data.set_state.clone(), prometheus_registry.as_ref(), + telemetry.clone(), ); let conf = config.clone(); - let telemetry_task = if let Some(telemetry_on_connect) = telemetry_on_connect { + let telemetry_task = if let Some(telemetry_on_connect) = telemetry + .as_ref() + .map(|x| x.on_connect_stream()) + { let authorities = persistent_data.authority_set.clone(); + let telemetry = telemetry.clone(); let events = telemetry_on_connect .for_each(move |_| { let current_authorities = authorities.current_authorities(); @@ -751,10 +771,13 @@ where let authorities = serde_json::to_string(&authorities).expect( "authorities is always at least an empty vector; \ - elements are always of type string", + elements are always of type string", ); - telemetry!(CONSENSUS_INFO; "afg.authority_set"; + telemetry!( + telemetry; + CONSENSUS_INFO; + "afg.authority_set"; "authority_id" => authority_id.to_string(), "authority_set_id" => ?set_id, "authorities" => authorities, @@ -778,6 +801,7 @@ where prometheus_registry, shared_voter_state, justification_sender, + telemetry, ); let voter_work = voter_work.map(|res| match res { @@ -816,7 +840,7 @@ struct VoterWork, SC, VR> { env: Arc>, voter_commands_rx: TracingUnboundedReceiver>>, network: NetworkBridge, - + telemetry: Option, /// Prometheus metrics. metrics: Option, } @@ -843,6 +867,7 @@ where prometheus_registry: Option, shared_voter_state: SharedVoterState, justification_sender: GrandpaJustificationSender, + telemetry: Option, ) -> Self { let metrics = match prometheus_registry.as_ref().map(Metrics::register) { Some(Ok(metrics)) => Some(metrics), @@ -866,6 +891,7 @@ where voter_set_state: persistent_data.set_state, metrics: metrics.as_ref().map(|m| m.environment.clone()), justification_sender: Some(justification_sender), + telemetry: telemetry.clone(), _phantom: PhantomData, }); @@ -877,6 +903,7 @@ where env, voter_commands_rx, network, + telemetry, metrics, }; work.rebuild_voter(); @@ -892,7 +919,10 @@ where let authority_id = local_authority_id(&self.env.voters, self.env.config.keystore.as_ref()) .unwrap_or_default(); - telemetry!(CONSENSUS_DEBUG; "afg.starting_new_voter"; + telemetry!( + self.telemetry; + CONSENSUS_DEBUG; + "afg.starting_new_voter"; "name" => ?self.env.config.name(), "set_id" => ?self.env.set_id, "authority_id" => authority_id.to_string(), @@ -911,7 +941,10 @@ where "authorities is always at least an empty vector; elements are always of type string", ); - telemetry!(CONSENSUS_INFO; "afg.authority_set"; + telemetry!( + self.telemetry; + CONSENSUS_INFO; + "afg.authority_set"; "number" => ?chain_info.finalized_number, "hash" => ?chain_info.finalized_hash, "authority_id" => authority_id.to_string(), @@ -971,7 +1004,10 @@ where let voters: Vec = new.authorities.iter().map(move |(a, _)| { format!("{}", a) }).collect(); - telemetry!(CONSENSUS_INFO; "afg.voter_command_change_authorities"; + telemetry!( + self.telemetry; + CONSENSUS_INFO; + "afg.voter_command_change_authorities"; "number" => ?new.canon_number, "hash" => ?new.canon_hash, "voters" => ?voters, @@ -992,10 +1028,11 @@ where })?; let voters = Arc::new(VoterSet::new(new.authorities.into_iter()) - .expect("new authorities come from pending change; \ - pending change comes from `AuthoritySet`; \ - `AuthoritySet` validates authorities is non-empty and weights are non-zero; \ - qed." + .expect( + "new authorities come from pending change; \ + pending change comes from `AuthoritySet`; \ + `AuthoritySet` validates authorities is non-empty and weights are non-zero; \ + qed." ) ); @@ -1011,6 +1048,7 @@ where voting_rule: self.env.voting_rule.clone(), metrics: self.env.metrics.clone(), justification_sender: self.env.justification_sender.clone(), + telemetry: self.telemetry.clone(), _phantom: PhantomData, }); diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index 3054a9df61c56..c0eab15e4f455 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -29,6 +29,7 @@ use log::{debug, info, warn}; use sp_keystore::SyncCryptoStorePtr; use sp_consensus::SelectChain; use sc_client_api::backend::Backend; +use sc_telemetry::TelemetryHandle; use sp_utils::mpsc::TracingUnboundedReceiver; use sp_runtime::traits::{NumberFor, Block as BlockT}; use sp_blockchain::HeaderMetadata; @@ -67,6 +68,7 @@ fn grandpa_observer( last_finalized_number: NumberFor, commits: S, note_round: F, + telemetry: Option, ) -> impl Future>>> where NumberFor: BlockNumberOps, @@ -121,6 +123,7 @@ where (round, commit).into(), false, justification_sender.as_ref(), + telemetry.clone(), ) { Ok(_) => {}, Err(e) => return future::err(e), @@ -172,7 +175,8 @@ where persistent_data, voter_commands_rx, justification_sender, - .. + justification_stream: _, + telemetry, } = link; let network = NetworkBridge::new( @@ -180,15 +184,17 @@ where config.clone(), persistent_data.set_state.clone(), None, + telemetry.clone(), ); let observer_work = ObserverWork::new( - client, + client.clone(), network, persistent_data, config.keystore, voter_commands_rx, Some(justification_sender), + telemetry.clone(), ); let observer_work = observer_work @@ -210,6 +216,7 @@ struct ObserverWork> { keystore: Option, voter_commands_rx: TracingUnboundedReceiver>>, justification_sender: Option>, + telemetry: Option, _phantom: PhantomData, } @@ -228,6 +235,7 @@ where keystore: Option, voter_commands_rx: TracingUnboundedReceiver>>, justification_sender: Option>, + telemetry: Option, ) -> Self { let mut work = ObserverWork { @@ -240,6 +248,7 @@ where keystore: keystore.clone(), voter_commands_rx, justification_sender, + telemetry, _phantom: PhantomData, }; work.rebuild_observer(); @@ -289,6 +298,7 @@ where last_finalized_number, global_in, note_round, + self.telemetry.clone(), ); self.observer = Box::pin(observer); @@ -429,6 +439,7 @@ mod tests { None, voter_command_rx, None, + None, ); // Trigger a reputation change through the gossip validator. diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index 921b49db61c25..6824a8ed04273 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -120,6 +120,7 @@ impl TestNetFactory for GrandpaTestNet { client.clone(), &self.test_config, LongestChain::new(backend.clone()), + None, ).expect("Could not create block import for fresh peer."); let justification_import = Box::new(import.clone()); ( @@ -252,13 +253,14 @@ fn initialize_grandpa( name: Some(format!("peer#{}", peer_id)), is_authority: true, observer_enabled: true, + telemetry: None, }, link, network: net_service, - telemetry_on_connect: None, voting_rule: (), prometheus_registry: None, shared_voter_state: SharedVoterState::empty(), + telemetry: None, }; let voter = run_grandpa_voter(grandpa_params).expect("all in order with client and network"); @@ -395,13 +397,14 @@ fn finalize_3_voters_1_full_observer() { name: Some(format!("peer#{}", peer_id)), is_authority: true, observer_enabled: true, + telemetry: None, }, link: link, network: net_service, - telemetry_on_connect: None, voting_rule: (), prometheus_registry: None, shared_voter_state: SharedVoterState::empty(), + telemetry: None, }; run_grandpa_voter(grandpa_params).expect("all in order with client and network") @@ -488,13 +491,14 @@ fn transition_3_voters_twice_1_full_observer() { name: Some(format!("peer#{}", peer_id)), is_authority: true, observer_enabled: true, + telemetry: None, }, link, network: net_service, - telemetry_on_connect: None, voting_rule: (), prometheus_registry: None, shared_voter_state: SharedVoterState::empty(), + telemetry: None, }; voters.push(run_grandpa_voter(grandpa_params).expect("all in order with client and network")); @@ -921,6 +925,7 @@ fn voter_persists_its_votes() { name: Some(format!("peer#{}", 1)), is_authority: true, observer_enabled: true, + telemetry: None, }; let set_state = { @@ -939,6 +944,7 @@ fn voter_persists_its_votes() { config.clone(), set_state, None, + None, ) }; @@ -964,13 +970,14 @@ fn voter_persists_its_votes() { name: Some(format!("peer#{}", 0)), is_authority: true, observer_enabled: true, + telemetry: None, }, link, network: net_service, - telemetry_on_connect: None, voting_rule: VotingRulesBuilder::default().build(), prometheus_registry: None, shared_voter_state: SharedVoterState::empty(), + telemetry: None, }; run_grandpa_voter(grandpa_params).expect("all in order with client and network") @@ -1006,13 +1013,14 @@ fn voter_persists_its_votes() { name: Some(format!("peer#{}", 0)), is_authority: true, observer_enabled: true, + telemetry: None, }, link, network: net_service, - telemetry_on_connect: None, voting_rule: VotingRulesBuilder::default().build(), prometheus_registry: None, shared_voter_state: SharedVoterState::empty(), + telemetry: None, }; run_grandpa_voter(grandpa_params) @@ -1165,6 +1173,7 @@ fn finalize_3_voters_1_light_observer() { name: Some("observer".to_string()), is_authority: false, observer_enabled: true, + telemetry: None, }, net.peers[3].data.lock().take().expect("link initialized at startup; qed"), net.peers[3].network_service().clone(), @@ -1206,13 +1215,14 @@ fn voter_catches_up_to_latest_round_when_behind() { name: Some(format!("peer#{}", peer_id)), is_authority: true, observer_enabled: true, + telemetry: None, }, link, network: net.lock().peer(peer_id).network_service().clone(), - telemetry_on_connect: None, voting_rule: (), prometheus_registry: None, shared_voter_state: SharedVoterState::empty(), + telemetry: None, }; Box::pin(run_grandpa_voter(grandpa_params).expect("all in order with client and network")) @@ -1328,6 +1338,7 @@ where name: None, is_authority: true, observer_enabled: true, + telemetry: None, }; let network = NetworkBridge::new( @@ -1335,6 +1346,7 @@ where config.clone(), set_state.clone(), None, + None, ); Environment { @@ -1349,6 +1361,7 @@ where voting_rule, metrics: None, justification_sender: None, + telemetry: None, _phantom: PhantomData, } } diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index c6119695ace71..d402564be727c 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -91,5 +91,3 @@ grandpa = { version = "0.9.0", package = "sc-finality-grandpa", path = "../final grandpa-primitives = { version = "3.0.0", package = "sp-finality-grandpa", path = "../../primitives/finality-grandpa" } tokio = { version = "0.2.25", default-features = false } async-std = { version = "1.6.5", default-features = false } -tracing-subscriber = "0.2.15" -tracing-log = "0.1.1" diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 8a5f63ab7b1d7..2c8557a5456e6 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -55,8 +55,8 @@ use wasm_timer::SystemTime; use sc_telemetry::{ telemetry, ConnectionMessage, - TelemetryConnectionNotifier, - TelemetrySpan, + Telemetry, + TelemetryHandle, SUBSTRATE_INFO, }; use sp_transaction_pool::MaintainedTransactionPool; @@ -213,17 +213,17 @@ pub type TLightClientWithBackend = Client< >; trait AsCryptoStoreRef { - fn keystore_ref(&self) -> Arc; - fn sync_keystore_ref(&self) -> Arc; + fn keystore_ref(&self) -> Arc; + fn sync_keystore_ref(&self) -> Arc; } impl AsCryptoStoreRef for Arc where T: CryptoStore + SyncCryptoStore + 'static { - fn keystore_ref(&self) -> Arc { - self.clone() - } - fn sync_keystore_ref(&self) -> Arc { - self.clone() - } + fn keystore_ref(&self) -> Arc { + self.clone() + } + fn sync_keystore_ref(&self) -> Arc { + self.clone() + } } /// Construct and hold different layers of Keystore wrappers @@ -291,16 +291,18 @@ impl KeystoreContainer { /// Creates a new full client for the given config. pub fn new_full_client( config: &Configuration, + telemetry: Option, ) -> Result, Error> where TBl: BlockT, TExecDisp: NativeExecutionDispatch + 'static, { - new_full_parts(config).map(|parts| parts.0) + new_full_parts(config, telemetry).map(|parts| parts.0) } /// Create the initial parts of a full node. pub fn new_full_parts( config: &Configuration, + telemetry: Option, ) -> Result, Error> where TBl: BlockT, TExecDisp: NativeExecutionDispatch + 'static, @@ -356,6 +358,7 @@ pub fn new_full_parts( extensions, Box::new(task_manager.spawn_handle()), config.prometheus_config.as_ref().map(|config| config.registry.clone()), + telemetry, ClientConfig { offchain_worker_enabled : config.offchain_worker.enabled, offchain_indexing_api: config.offchain_worker.indexing_enabled, @@ -377,6 +380,7 @@ pub fn new_full_parts( /// Create the initial parts of a light node. pub fn new_light_parts( config: &Configuration, + telemetry: Option, ) -> Result, Error> where TBl: BlockT, TExecDisp: NativeExecutionDispatch + 'static, @@ -421,6 +425,7 @@ pub fn new_light_parts( executor, Box::new(task_manager.spawn_handle()), config.prometheus_config.as_ref().map(|config| config.registry.clone()), + telemetry, )?); Ok((client, backend, keystore_container, task_manager, on_demand)) @@ -447,6 +452,7 @@ pub fn new_client( execution_extensions: ExecutionExtensions, spawn_handle: Box, prometheus_registry: Option, + telemetry: Option, config: ClientConfig, ) -> Result< crate::client::Client< @@ -470,6 +476,7 @@ pub fn new_client( bad_blocks, execution_extensions, prometheus_registry, + telemetry, config, )?) } @@ -501,10 +508,8 @@ pub struct SpawnTasksParams<'a, TBl: BlockT, TCl, TExPool, TRpc, Backend> { pub network_status_sinks: NetworkStatusSinks, /// A Sender for RPC requests. pub system_rpc_tx: TracingUnboundedSender>, - /// Telemetry span. - /// - /// This span needs to be entered **before** calling [`spawn_tasks()`]. - pub telemetry_span: Option, + /// Telemetry instance for this node. + pub telemetry: Option<&'a mut Telemetry>, } /// Build a shared offchain workers instance. @@ -541,13 +546,12 @@ pub fn build_offchain_workers( /// Spawn the tasks that are required to run a node. pub fn spawn_tasks( params: SpawnTasksParams, -) -> Result<(RpcHandlers, Option), Error> +) -> Result where TCl: ProvideRuntimeApi + HeaderMetadata + Chain + BlockBackend + BlockIdTo + ProofProvider + HeaderBackend + BlockchainEvents + ExecutorProvider + UsageProvider + - StorageProvider + CallApiAt + - Send + 'static, + StorageProvider + CallApiAt + Send + 'static, >::Api: sp_api::Metadata + sc_offchain::OffchainWorkerApi + @@ -573,7 +577,7 @@ pub fn spawn_tasks( network, network_status_sinks, system_rpc_tx, - telemetry_span, + telemetry, } = params; let chain_info = client.usage_info().chain; @@ -584,12 +588,16 @@ pub fn spawn_tasks( config.dev_key_seed.clone().map(|s| vec![s]).unwrap_or_default(), ).map_err(|e| Error::Application(Box::new(e)))?; - let telemetry_connection_notifier = init_telemetry( - &mut config, - telemetry_span, - network.clone(), - client.clone(), - ); + let telemetry = telemetry + .map(|telemetry| { + init_telemetry( + &mut config, + network.clone(), + client.clone(), + telemetry, + ) + }) + .transpose()?; info!("📦 Highest known block at #{}", chain_info.best_number); @@ -603,7 +611,11 @@ pub fn spawn_tasks( spawn_handle.spawn( "on-transaction-imported", - transaction_notifications(transaction_pool.clone(), network.clone()), + transaction_notifications( + transaction_pool.clone(), + network.clone(), + telemetry.clone(), + ), ); // Prometheus metrics. @@ -611,7 +623,7 @@ pub fn spawn_tasks( config.prometheus_config.clone() { // Set static metrics. - let metrics = MetricsService::with_prometheus(®istry, &config)?; + let metrics = MetricsService::with_prometheus(telemetry.clone(), ®istry, &config)?; spawn_handle.spawn( "prometheus-endpoint", prometheus_endpoint::init_prometheus(port, registry).map(drop) @@ -619,7 +631,7 @@ pub fn spawn_tasks( metrics } else { - MetricsService::new() + MetricsService::new(telemetry.clone()) }; // Periodically updated metrics and telemetry updates. @@ -659,12 +671,13 @@ pub fn spawn_tasks( task_manager.keep_alive((config.base_path, rpc, rpc_handlers.clone())); - Ok((rpc_handlers, telemetry_connection_notifier)) + Ok(rpc_handlers) } async fn transaction_notifications( transaction_pool: Arc, network: Arc::Hash>>, + telemetry: Option, ) where TBl: BlockT, @@ -676,9 +689,12 @@ async fn transaction_notifications( .for_each(move |hash| { network.propagate_transaction(hash); let status = transaction_pool.status(); - telemetry!(SUBSTRATE_INFO; "txpool.import"; + telemetry!( + telemetry; + SUBSTRATE_INFO; + "txpool.import"; "ready" => status.ready, - "future" => status.future + "future" => status.future, ); ready(()) }) @@ -687,12 +703,10 @@ async fn transaction_notifications( fn init_telemetry>( config: &mut Configuration, - telemetry_span: Option, network: Arc::Hash>>, client: Arc, -) -> Option { - let telemetry_span = telemetry_span?; - let endpoints = config.telemetry_endpoints.clone()?; + telemetry: &mut Telemetry, +) -> sc_telemetry::Result { let genesis_hash = client.block_hash(Zero::zero()).ok().flatten().unwrap_or_default(); let connection_message = ConnectionMessage { name: config.network.node_name.to_owned(), @@ -708,13 +722,9 @@ fn init_telemetry>( network_id: network.local_peer_id().to_base58(), }; - config.telemetry_handle - .as_mut() - .map(|handle| handle.start_telemetry( - telemetry_span, - endpoints, - connection_message, - )) + telemetry.start_telemetry(connection_message)?; + + Ok(telemetry.handle()) } fn gen_handler( diff --git a/client/service/src/client/call_executor.rs b/client/service/src/client/call_executor.rs index 8c7ca645b0ffe..176c68096e97c 100644 --- a/client/service/src/client/call_executor.rs +++ b/client/service/src/client/call_executor.rs @@ -368,6 +368,7 @@ mod tests { None, Box::new(TaskExecutor::new()), None, + None, Default::default(), ).expect("Creates a client"); diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 6e9fdea0925f7..07e8e005fa1a8 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -35,7 +35,11 @@ use sp_core::{ }; #[cfg(feature="test-helpers")] use sp_keystore::SyncCryptoStorePtr; -use sc_telemetry::{telemetry, SUBSTRATE_INFO}; +use sc_telemetry::{ + telemetry, + TelemetryHandle, + SUBSTRATE_INFO, +}; use sp_runtime::{ Justification, BuildStorage, generic::{BlockId, SignedBlock, DigestItem}, @@ -115,6 +119,7 @@ pub struct Client where Block: BlockT { block_rules: BlockRules, execution_extensions: ExecutionExtensions, config: ClientConfig, + telemetry: Option, _phantom: PhantomData, } @@ -152,6 +157,7 @@ pub fn new_in_mem( genesis_storage: &S, keystore: Option, prometheus_registry: Option, + telemetry: Option, spawn_handle: Box, config: ClientConfig, ) -> sp_blockchain::Result( keystore, spawn_handle, prometheus_registry, + telemetry, config, ) } @@ -196,6 +203,7 @@ pub fn new_with_backend( keystore: Option, spawn_handle: Box, prometheus_registry: Option, + telemetry: Option, config: ClientConfig, ) -> sp_blockchain::Result, Block, RA>> where @@ -218,6 +226,7 @@ pub fn new_with_backend( Default::default(), extensions, prometheus_registry, + telemetry, config, ) } @@ -298,6 +307,7 @@ impl Client where bad_blocks: BadBlocks, execution_extensions: ExecutionExtensions, prometheus_registry: Option, + telemetry: Option, config: ClientConfig, ) -> sp_blockchain::Result { if backend.blockchain().header(BlockId::Number(Zero::zero()))?.is_none() { @@ -330,6 +340,7 @@ impl Client where block_rules: BlockRules::new(fork_blocks, bad_blocks), execution_extensions, config, + telemetry, _phantom: Default::default(), }) } @@ -672,7 +683,10 @@ impl Client where if origin != BlockOrigin::NetworkInitialSync || rand::thread_rng().gen_bool(0.1) { - telemetry!(SUBSTRATE_INFO; "block.import"; + telemetry!( + self.telemetry; + SUBSTRATE_INFO; + "block.import"; "height" => height, "best" => ?hash, "origin" => ?origin @@ -989,10 +1003,13 @@ impl Client where let header = self.header(&BlockId::Hash(*last))? .expect( "Header already known to exist in DB because it is \ - indicated in the tree route; qed" + indicated in the tree route; qed" ); - telemetry!(SUBSTRATE_INFO; "notify.finalized"; + telemetry!( + self.telemetry; + SUBSTRATE_INFO; + "notify.finalized"; "height" => format!("{}", header.number()), "best" => ?last, ); @@ -1002,7 +1019,7 @@ impl Client where let header = self.header(&BlockId::Hash(finalized_hash))? .expect( "Header already known to exist in DB because it is \ - indicated in the tree route; qed" + indicated in the tree route; qed" ); let notification = FinalityNotification { @@ -1991,9 +2008,10 @@ impl backend::AuxStore for &Client } impl sp_consensus::block_validation::Chain for Client - where BE: backend::Backend, - E: CallExecutor, - B: BlockT + where + BE: backend::Backend, + E: CallExecutor, + B: BlockT, { fn block_status( &self, diff --git a/client/service/src/client/light.rs b/client/service/src/client/light.rs index 5b5c0cb0eb38f..3b29a0e1a92ca 100644 --- a/client/service/src/client/light.rs +++ b/client/service/src/client/light.rs @@ -22,6 +22,7 @@ use std::sync::Arc; use sc_executor::RuntimeInfo; use sp_core::traits::{CodeExecutor, SpawnNamed}; +use sc_telemetry::TelemetryHandle; use sp_runtime::BuildStorage; use sp_runtime::traits::{Block as BlockT, HashFor}; use sp_blockchain::Result as ClientResult; @@ -31,7 +32,6 @@ use super::{call_executor::LocalCallExecutor, client::{Client, ClientConfig}}; use sc_client_api::light::Storage as BlockchainStorage; use sc_light::{Backend, GenesisCallExecutor}; - /// Create an instance of light client. pub fn new_light( backend: Arc>>, @@ -39,6 +39,7 @@ pub fn new_light( code_executor: E, spawn_handle: Box, prometheus_registry: Option, + telemetry: Option, ) -> ClientResult< Client< Backend>, @@ -70,6 +71,7 @@ pub fn new_light( Default::default(), Default::default(), prometheus_registry, + telemetry, ClientConfig::default(), ) } diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 4f0d426bdba42..f82a877545e8c 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -96,11 +96,6 @@ pub struct Configuration { /// External WASM transport for the telemetry. If `Some`, when connection to a telemetry /// endpoint, this transport will be tried in priority before all others. pub telemetry_external_transport: Option, - /// Telemetry handle. - /// - /// This is a handle to a `TelemetryWorker` instance. It is used to initialize the telemetry for - /// a substrate node. - pub telemetry_handle: Option, /// The default number of 64KB pages to allocate for Wasm execution pub default_heap_pages: Option, /// Should offchain workers be executed. diff --git a/client/service/src/error.rs b/client/service/src/error.rs index caa54700da916..9c653219ca130 100644 --- a/client/service/src/error.rs +++ b/client/service/src/error.rs @@ -46,6 +46,9 @@ pub enum Error { #[error(transparent)] Keystore(#[from] sc_keystore::Error), + #[error(transparent)] + Telemetry(#[from] sc_telemetry::Error), + #[error("Best chain selection strategy (SelectChain) is not provided.")] SelectChainRequired, diff --git a/client/service/src/metrics.rs b/client/service/src/metrics.rs index 4fbfa4d77f08f..43e5b8eaaded7 100644 --- a/client/service/src/metrics.rs +++ b/client/service/src/metrics.rs @@ -21,7 +21,7 @@ use std::{convert::TryFrom, time::SystemTime}; use crate::{NetworkStatus, NetworkState, NetworkStatusSinks, config::Configuration}; use futures_timer::Delay; use prometheus_endpoint::{register, Gauge, U64, Registry, PrometheusError, Opts, GaugeVec}; -use sc_telemetry::{telemetry, SUBSTRATE_INFO}; +use sc_telemetry::{telemetry, TelemetryHandle, SUBSTRATE_INFO}; use sp_api::ProvideRuntimeApi; use sp_runtime::traits::{NumberFor, Block, SaturatedConversion, UniqueSaturatedInto}; use sp_transaction_pool::{PoolStatus, MaintainedTransactionPool}; @@ -112,23 +112,26 @@ pub struct MetricsService { last_update: Instant, last_total_bytes_inbound: u64, last_total_bytes_outbound: u64, + telemetry: Option, } impl MetricsService { /// Creates a `MetricsService` that only sends information /// to the telemetry. - pub fn new() -> Self { + pub fn new(telemetry: Option) -> Self { MetricsService { metrics: None, last_total_bytes_inbound: 0, last_total_bytes_outbound: 0, last_update: Instant::now(), + telemetry, } } /// Creates a `MetricsService` that sends metrics /// to prometheus alongside the telemetry. pub fn with_prometheus( + telemetry: Option, registry: &Registry, config: &Configuration, ) -> Result { @@ -149,6 +152,7 @@ impl MetricsService { last_total_bytes_inbound: 0, last_total_bytes_outbound: 0, last_update: Instant::now(), + telemetry, }) } @@ -245,6 +249,7 @@ impl MetricsService { // Update/send metrics that are always available. telemetry!( + self.telemetry; SUBSTRATE_INFO; "system.interval"; "height" => best_number, @@ -307,6 +312,7 @@ impl MetricsService { }; telemetry!( + self.telemetry; SUBSTRATE_INFO; "system.interval"; "peers" => num_peers, @@ -328,6 +334,7 @@ impl MetricsService { // Send network state information, if any. if let Some(net_state) = net_state { telemetry!( + self.telemetry; SUBSTRATE_INFO; "system.network_state"; "state" => net_state, diff --git a/client/service/src/task_manager/tests.rs b/client/service/src/task_manager/tests.rs index 762348ba9fa5d..09768a19339f2 100644 --- a/client/service/src/task_manager/tests.rs +++ b/client/service/src/task_manager/tests.rs @@ -20,14 +20,7 @@ use crate::config::TaskExecutor; use crate::task_manager::TaskManager; use futures::{future::FutureExt, pin_mut, select}; use parking_lot::Mutex; -use sc_telemetry::TelemetrySpan; -use std::{any::Any, env, sync::Arc, time::Duration}; -use tracing::{event::Event, span::Id, subscriber::Subscriber}; -use tracing_subscriber::{ - layer::{Context, SubscriberExt}, - registry::LookupSpan, - Layer, -}; +use std::{any::Any, sync::Arc, time::Duration}; #[derive(Clone, Debug)] struct DropTester(Arc>); @@ -317,94 +310,3 @@ fn ensure_task_manager_future_continues_when_childs_not_essential_task_fails() { runtime.block_on(task_manager.clean_shutdown()); assert_eq!(drop_tester, 0); } - -struct TestLayer { - spans_found: Arc>>>, -} - -impl Layer for TestLayer -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - fn on_event(&self, _: &Event<'_>, ctx: Context) { - let mut spans_found = self.spans_found.lock(); - - if spans_found.is_some() { - panic!("on_event called multiple times"); - } - - *spans_found = Some(ctx.scope().map(|x| x.id()).collect()); - } -} - -fn setup_subscriber() -> ( - impl Subscriber + for<'a> LookupSpan<'a>, - Arc>>>, -) { - let spans_found = Arc::new(Mutex::new(Default::default())); - let layer = TestLayer { - spans_found: spans_found.clone(), - }; - let subscriber = tracing_subscriber::fmt().finish().with(layer); - (subscriber, spans_found) -} - -/// This is not an actual test, it is used by the `telemetry_span_is_forwarded_to_task` test. -/// The given test will call the test executable and only execute this one test that -/// test that the telemetry span and the prefix span are forwarded correctly. This needs to be done -/// in a separate process to avoid interfering with the other tests. -#[test] -fn subprocess_telemetry_span_is_forwarded_to_task() { - if env::var("SUBPROCESS_TEST").is_err() { - return; - } - - let (subscriber, spans_found) = setup_subscriber(); - tracing_log::LogTracer::init().unwrap(); - let _sub_guard = tracing::subscriber::set_global_default(subscriber); - - let mut runtime = tokio::runtime::Runtime::new().unwrap(); - - let prefix_span = tracing::info_span!("prefix"); - let _enter_prefix_span = prefix_span.enter(); - - let telemetry_span = TelemetrySpan::new(); - let _enter_telemetry_span = telemetry_span.enter(); - - let handle = runtime.handle().clone(); - let task_executor = TaskExecutor::from(move |fut, _| handle.spawn(fut).map(|_| ())); - let task_manager = new_task_manager(task_executor); - - let (sender, receiver) = futures::channel::oneshot::channel(); - - task_manager.spawn_handle().spawn( - "log-something", - async move { - log::info!("boo!"); - sender.send(()).unwrap(); - } - .boxed(), - ); - - runtime.block_on(receiver).unwrap(); - runtime.block_on(task_manager.clean_shutdown()); - - let spans = spans_found.lock().take().unwrap(); - assert_eq!(2, spans.len()); - - assert_eq!(spans[0], prefix_span.id().unwrap()); - assert_eq!(spans[1], telemetry_span.span().id().unwrap()); -} - -#[test] -fn telemetry_span_is_forwarded_to_task() { - let executable = env::current_exe().unwrap(); - let output = std::process::Command::new(executable) - .env("SUBPROCESS_TEST", "1") - .args(&["--nocapture", "subprocess_telemetry_span_is_forwarded_to_task"]) - .output() - .unwrap(); - println!("{}", String::from_utf8(output.stdout).unwrap()); - eprintln!("{}", String::from_utf8(output.stderr).unwrap()); - assert!(output.status.success()); -} diff --git a/client/service/test/src/client/mod.rs b/client/service/test/src/client/mod.rs index 17e9ac6db1894..122782ee51ef5 100644 --- a/client/service/test/src/client/mod.rs +++ b/client/service/test/src/client/mod.rs @@ -1744,6 +1744,7 @@ fn cleans_up_closed_notification_sinks_on_block_import() { &substrate_test_runtime_client::GenesisParameters::default().genesis_storage(), None, None, + None, Box::new(TaskExecutor::new()), Default::default(), ) diff --git a/client/service/test/src/lib.rs b/client/service/test/src/lib.rs index 6c99f83d4c517..a80c53a8c21c5 100644 --- a/client/service/test/src/lib.rs +++ b/client/service/test/src/lib.rs @@ -267,7 +267,6 @@ fn node_config. + +#[allow(missing_docs)] +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("IO Error")] + IoError(#[from] std::io::Error), + #[error("This telemetry instance has already been initialized!")] + TelemetryAlreadyInitialized, + #[error("The telemetry worker has been dropped already.")] + TelemetryWorkerDropped, +} + +#[allow(missing_docs)] +pub type Result = std::result::Result; diff --git a/client/telemetry/src/layer.rs b/client/telemetry/src/layer.rs deleted file mode 100644 index 0ce3f97620da9..0000000000000 --- a/client/telemetry/src/layer.rs +++ /dev/null @@ -1,149 +0,0 @@ -// This file is part of Substrate. - -// Copyright (C) 2021 Parity Technologies (UK) Ltd. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 - -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use crate::{initialize_transport, TelemetryWorker}; -use futures::channel::mpsc; -use libp2p::wasm_ext::ExtTransport; -use parking_lot::Mutex; -use std::convert::TryInto; -use std::io; -use tracing::{Event, Id, Subscriber}; -use tracing_subscriber::{layer::Context, registry::LookupSpan, Layer}; - -/// Span name used to report the telemetry. -pub const TELEMETRY_LOG_SPAN: &str = "telemetry-logger"; - -/// `Layer` that handles the logs for telemetries. -#[derive(Debug)] -pub struct TelemetryLayer(Mutex>); - -impl TelemetryLayer { - /// Create a new [`TelemetryLayer`] and [`TelemetryWorker`]. - /// - /// The `buffer_size` defaults to 16. - /// - /// The [`ExtTransport`] is used in WASM contexts where we need some binding between the - /// networking provided by the operating system or environment and libp2p. - /// - /// > **Important**: Each individual call to `write` corresponds to one message. There is no - /// > internal buffering going on. In the context of WebSockets, each `write` - /// > must be one individual WebSockets frame. - pub fn new( - buffer_size: Option, - telemetry_external_transport: Option, - ) -> io::Result<(Self, TelemetryWorker)> { - let transport = initialize_transport(telemetry_external_transport)?; - let worker = TelemetryWorker::new(buffer_size.unwrap_or(16), transport); - let sender = worker.message_sender(); - Ok((Self(Mutex::new(sender)), worker)) - } -} - -impl Layer for TelemetryLayer -where - S: Subscriber + for<'a> LookupSpan<'a>, -{ - fn on_event(&self, event: &Event<'_>, ctx: Context) { - if event.metadata().target() != TELEMETRY_LOG_SPAN { - return; - } - - if let Some(span) = ctx.lookup_current() { - let parents = span.parents(); - - if let Some(span) = std::iter::once(span) - .chain(parents) - .find(|x| x.name() == TELEMETRY_LOG_SPAN) - { - let id = span.id(); - let mut attrs = TelemetryAttrs::new(id.clone()); - let mut vis = TelemetryAttrsVisitor(&mut attrs); - event.record(&mut vis); - - if let TelemetryAttrs { - verbosity: Some(verbosity), - json: Some(json), - .. - } = attrs - { - match self.0.lock().try_send(( - id, - verbosity - .try_into() - .expect("telemetry log message verbosity are u8; qed"), - json, - )) { - Err(err) if err.is_full() => eprintln!("Telemetry buffer overflowed!"), - _ => {} - } - } else { - // NOTE: logging in this function doesn't work - eprintln!( - "missing fields in telemetry log: {:?}. This can happen if \ - `tracing::info_span!` is (mis-)used with the telemetry target \ - directly; you should use the `telemetry!` macro.", - event, - ); - } - } - } - } -} - -#[derive(Debug)] -struct TelemetryAttrs { - verbosity: Option, - json: Option, - id: Id, -} - -impl TelemetryAttrs { - fn new(id: Id) -> Self { - Self { - verbosity: None, - json: None, - id, - } - } -} - -#[derive(Debug)] -struct TelemetryAttrsVisitor<'a>(&'a mut TelemetryAttrs); - -impl<'a> tracing::field::Visit for TelemetryAttrsVisitor<'a> { - fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) { - // noop - } - - fn record_u64(&mut self, field: &tracing::field::Field, value: u64) { - if field.name() == "verbosity" { - (*self.0).verbosity = Some(value); - } - } - - fn record_str(&mut self, field: &tracing::field::Field, value: &str) { - if field.name() == "json" { - (*self.0).json = Some(format!( - r#"{{"id":{},"ts":{:?},"payload":{}}}"#, - self.0.id.into_u64(), - chrono::Local::now().to_rfc3339().to_string(), - value, - )); - } - } -} diff --git a/client/telemetry/src/lib.rs b/client/telemetry/src/lib.rs index b398ee86de4ec..8d3b605db01a5 100644 --- a/client/telemetry/src/lib.rs +++ b/client/telemetry/src/lib.rs @@ -29,7 +29,7 @@ //! identify which substrate node is reporting the telemetry. Every task spawned using sc-service's //! `TaskManager` automatically inherit this span. //! -//! Substrate's nodes initialize/register with the [`TelemetryWorker`] using a [`TelemetryHandle`]. +//! Substrate's nodes initialize/register with the [`TelemetryWorker`] using a [`TelemetryWorkerHandle`]. //! This handle can be cloned and passed around. It uses an asynchronous channel to communicate with //! the running [`TelemetryWorker`] dedicated to registration. Registering can happen at any point //! in time during the process execution. @@ -39,61 +39,45 @@ use futures::{channel::mpsc, prelude::*}; use libp2p::Multiaddr; use log::{error, warn}; +use parking_lot::Mutex; use serde::Serialize; -use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver}; use std::collections::HashMap; -use tracing::Id; +use std::sync::{atomic, Arc}; pub use libp2p::wasm_ext::ExtTransport; +pub use log; pub use serde_json; -pub use tracing; mod endpoints; -mod layer; +mod error; mod node; mod transport; pub use endpoints::*; -pub use layer::*; +pub use error::*; use node::*; use transport::*; /// Substrate DEBUG log level. -pub const SUBSTRATE_DEBUG: u8 = 9; +pub const SUBSTRATE_DEBUG: VerbosityLevel = 9; /// Substrate INFO log level. -pub const SUBSTRATE_INFO: u8 = 0; +pub const SUBSTRATE_INFO: VerbosityLevel = 0; /// Consensus TRACE log level. -pub const CONSENSUS_TRACE: u8 = 9; +pub const CONSENSUS_TRACE: VerbosityLevel = 9; /// Consensus DEBUG log level. -pub const CONSENSUS_DEBUG: u8 = 5; +pub const CONSENSUS_DEBUG: VerbosityLevel = 5; /// Consensus WARN log level. -pub const CONSENSUS_WARN: u8 = 4; +pub const CONSENSUS_WARN: VerbosityLevel = 4; /// Consensus INFO log level. -pub const CONSENSUS_INFO: u8 = 1; +pub const CONSENSUS_INFO: VerbosityLevel = 1; -pub(crate) type TelemetryMessage = (Id, u8, String); +/// Telemetry message verbosity. +pub type VerbosityLevel = u8; -/// A handle representing a telemetry span, with the capability to enter the span if it exists. -#[derive(Debug, Clone)] -pub struct TelemetrySpan(tracing::Span); - -impl TelemetrySpan { - /// Enters this span, returning a guard that will exit the span when dropped. - pub fn enter(&self) -> tracing::span::Entered { - self.0.enter() - } - - /// Constructs a new [`TelemetrySpan`]. - pub fn new() -> Self { - Self(tracing::error_span!(TELEMETRY_LOG_SPAN)) - } - - /// Return a clone of the underlying `tracing::Span` instance. - pub fn span(&self) -> tracing::Span { - self.0.clone() - } -} +pub(crate) type Id = u64; +pub(crate) type TelemetryPayload = serde_json::Map; +pub(crate) type TelemetryMessage = (Id, VerbosityLevel, TelemetryPayload); /// Message sent when the connection (re-)establishes. #[derive(Debug, Serialize)] @@ -129,64 +113,79 @@ pub struct TelemetryWorker { message_sender: mpsc::Sender, register_receiver: mpsc::UnboundedReceiver, register_sender: mpsc::UnboundedSender, + id_counter: Arc, transport: WsTrans, } impl TelemetryWorker { - pub(crate) fn new(buffer_size: usize, transport: WsTrans) -> Self { + /// Instantiate a new [`TelemetryWorker`] which can run in background. + /// + /// Only one is needed per process. + pub fn new(buffer_size: usize) -> Result { + let transport = initialize_transport(None)?; let (message_sender, message_receiver) = mpsc::channel(buffer_size); let (register_sender, register_receiver) = mpsc::unbounded(); - Self { + Ok(Self { message_receiver, message_sender, register_receiver, register_sender, + id_counter: Arc::new(atomic::AtomicU64::new(1)), transport, - } + }) } - /// Get a new [`TelemetryHandle`]. + /// Instantiate a new [`TelemetryWorker`] which can run in background. /// - /// This is used when you want to register with the [`TelemetryWorker`]. - pub fn handle(&self) -> TelemetryHandle { - TelemetryHandle { - message_sender: self.register_sender.clone(), - } + /// Only one is needed per process. + pub fn with_transport(buffer_size: usize, transport: Option) -> Result { + let transport = initialize_transport(transport)?; + let (message_sender, message_receiver) = mpsc::channel(buffer_size); + let (register_sender, register_receiver) = mpsc::unbounded(); + + Ok(Self { + message_receiver, + message_sender, + register_receiver, + register_sender, + id_counter: Arc::new(atomic::AtomicU64::new(1)), + transport, + }) } - /// Get a clone of the channel's `Sender` used to send telemetry events. - pub(crate) fn message_sender(&self) -> mpsc::Sender { - self.message_sender.clone() + /// Get a new [`TelemetryWorkerHandle`]. + /// + /// This is used when you want to register with the [`TelemetryWorker`]. + pub fn handle(&self) -> TelemetryWorkerHandle { + TelemetryWorkerHandle { + message_sender: self.message_sender.clone(), + register_sender: self.register_sender.clone(), + id_counter: self.id_counter.clone(), + } } /// Run the telemetry worker. /// /// This should be run in a background task. - pub async fn run(self) { - let Self { - mut message_receiver, - message_sender: _, - mut register_receiver, - register_sender: _, - transport, - } = self; - - let mut node_map: HashMap> = HashMap::new(); + pub async fn run(mut self) { + let mut node_map: HashMap> = HashMap::new(); let mut node_pool: HashMap = HashMap::new(); + let mut pending_connection_notifications: Vec<_> = Vec::new(); loop { futures::select! { - message = message_receiver.next() => Self::process_message( + message = self.message_receiver.next() => Self::process_message( message, &mut node_pool, &node_map, ).await, - init_payload = register_receiver.next() => Self::process_register( + init_payload = self.register_receiver.next() => Self::process_register( init_payload, &mut node_pool, &mut node_map, - transport.clone(), + &mut pending_connection_notifications, + self.transport.clone(), ).await, } } @@ -195,7 +194,8 @@ impl TelemetryWorker { async fn process_register( input: Option, node_pool: &mut HashMap>, - node_map: &mut HashMap>, + node_map: &mut HashMap>, + pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>, transport: WsTrans, ) { let input = input.expect("the stream is never closed; qed"); @@ -212,7 +212,7 @@ impl TelemetryWorker { Ok(serde_json::Value::Object(mut value)) => { value.insert("msg".into(), "system.connected".into()); let mut obj = serde_json::Map::new(); - obj.insert("id".to_string(), id.into_u64().into()); + obj.insert("id".to_string(), id.into()); obj.insert("payload".to_string(), value.into()); Some(obj) } @@ -245,6 +245,16 @@ impl TelemetryWorker { }); node.connection_messages.extend(connection_message.clone()); + + pending_connection_notifications.retain(|(addr_b, connection_message)| { + if *addr_b == addr { + node.telemetry_connection_notifier + .push(connection_message.clone()); + false + } else { + true + } + }); } } Register::Notifier { @@ -252,15 +262,15 @@ impl TelemetryWorker { connection_notifier, } => { for addr in addresses { + // If the Node has been initialized, we directly push the connection_notifier. + // Otherwise we push it to a queue that will be consumed when the connection + // initializes, thus ensuring that the connection notifier will be sent to the + // Node when it becomes available. if let Some(node) = node_pool.get_mut(&addr) { node.telemetry_connection_notifier .push(connection_notifier.clone()); } else { - log::error!( - target: "telemetry", - "Received connection notifier for unknown node ({}). This is a bug.", - addr, - ); + pending_connection_notifications.push((addr, connection_notifier.clone())); } } } @@ -271,21 +281,31 @@ impl TelemetryWorker { async fn process_message( input: Option, node_pool: &mut HashMap>, - node_map: &HashMap>, + node_map: &HashMap>, ) { - let (id, verbosity, message) = input.expect("the stream is never closed; qed"); + let (id, verbosity, payload) = input.expect("the stream is never closed; qed"); + + let ts = chrono::Local::now().to_rfc3339().to_string(); + let mut message = serde_json::Map::new(); + message.insert("id".into(), id.into()); + message.insert("ts".into(), ts.into()); + message.insert("payload".into(), payload.into()); let nodes = if let Some(nodes) = node_map.get(&id) { nodes } else { - // This is a normal error because the telemetry span is entered before the telemetry - // is initialized so it is possible that some messages in the beginning don't get - // through. + // This is a normal error because the telemetry ID exists before the telemetry is + // initialized. log::trace!( target: "telemetry", "Received telemetry log for unknown id ({:?}): {}", id, - message, + serde_json::to_string(&message) + .unwrap_or_else(|err| format!( + "could not be serialized ({}): {:?}", + err, + message, + )), ); return; }; @@ -304,12 +324,17 @@ impl TelemetryWorker { if let Some(node) = node_pool.get_mut(&addr) { let _ = node.send(message.clone()).await; } else { - log::error!( + log::debug!( target: "telemetry", "Received message for unknown node ({}). This is a bug. \ Message sent: {}", addr, - message, + serde_json::to_string(&message) + .unwrap_or_else(|err| format!( + "could not be serialized ({}): {:?}", + err, + message, + )), ); } } @@ -318,11 +343,41 @@ impl TelemetryWorker { /// Handle to the [`TelemetryWorker`] thats allows initializing the telemetry for a Substrate node. #[derive(Debug, Clone)] -pub struct TelemetryHandle { - message_sender: mpsc::UnboundedSender, +pub struct TelemetryWorkerHandle { + message_sender: mpsc::Sender, + register_sender: mpsc::UnboundedSender, + id_counter: Arc, } -impl TelemetryHandle { +impl TelemetryWorkerHandle { + /// Instantiate a new [`Telemetry`] object. + pub fn new_telemetry(&mut self, endpoints: TelemetryEndpoints) -> Telemetry { + let addresses = endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(); + + Telemetry { + message_sender: self.message_sender.clone(), + register_sender: self.register_sender.clone(), + id: self.id_counter.fetch_add(1, atomic::Ordering::Relaxed), + connection_notifier: TelemetryConnectionNotifier { + register_sender: self.register_sender.clone(), + addresses, + }, + endpoints: Some(endpoints), + } + } +} + +/// A telemetry instance that can be used to send telemetry messages. +#[derive(Debug)] +pub struct Telemetry { + message_sender: mpsc::Sender, + register_sender: mpsc::UnboundedSender, + id: Id, + connection_notifier: TelemetryConnectionNotifier, + endpoints: Option, +} + +impl Telemetry { /// Initialize the telemetry with the endpoints provided in argument for the current substrate /// node. /// @@ -333,42 +388,67 @@ impl TelemetryHandle { /// /// The `connection_message` argument is a JSON object that is sent every time the connection /// (re-)establishes. - pub fn start_telemetry( - &mut self, - span: TelemetrySpan, - endpoints: TelemetryEndpoints, - connection_message: ConnectionMessage, - ) -> TelemetryConnectionNotifier { - let Self { message_sender } = self; - - let connection_notifier = TelemetryConnectionNotifier { - message_sender: message_sender.clone(), - addresses: endpoints.0.iter().map(|(addr, _)| addr.clone()).collect(), + pub fn start_telemetry(&mut self, connection_message: ConnectionMessage) -> Result<()> { + let endpoints = match self.endpoints.take() { + Some(x) => x, + None => return Err(Error::TelemetryAlreadyInitialized), }; - match span.0.id() { - Some(id) => { - match message_sender.unbounded_send(Register::Telemetry { - id, - endpoints, - connection_message, - }) { - Ok(()) => {} - Err(err) => error!( - target: "telemetry", - "Could not initialize telemetry: \ - the telemetry is probably already running: {}", - err, - ), - } - } - None => error!( + self.register_sender + .unbounded_send(Register::Telemetry { + id: self.id, + endpoints, + connection_message, + }) + .map_err(|_| Error::TelemetryWorkerDropped) + } + + /// Make a new clonable handle to this [`Telemetry`]. This is used for reporting telemetries. + pub fn handle(&self) -> TelemetryHandle { + TelemetryHandle { + message_sender: Arc::new(Mutex::new(self.message_sender.clone())), + id: self.id, + connection_notifier: self.connection_notifier.clone(), + } + } +} + +/// Handle to a [`Telemetry`]. +/// +/// Used to report telemetry messages. +#[derive(Debug, Clone)] +pub struct TelemetryHandle { + message_sender: Arc>>, + id: Id, + connection_notifier: TelemetryConnectionNotifier, +} + +impl TelemetryHandle { + /// Send telemetry messages. + pub fn send_telemetry(&self, verbosity: VerbosityLevel, payload: TelemetryPayload) { + match self + .message_sender + .lock() + .try_send((self.id, verbosity, payload)) + { + Ok(()) => {} + Err(err) if err.is_full() => log::trace!( target: "telemetry", - "Could not initialize telemetry: the span could not be entered", + "Telemetry channel full.", + ), + Err(_) => log::trace!( + target: "telemetry", + "Telemetry channel closed.", ), } + } - connection_notifier + /// Get event stream for telemetry connection established events. + /// + /// This function will return an error if the telemetry has already been started by + /// [`Telemetry::start_telemetry`]. + pub fn on_connect_stream(&self) -> ConnectionNotifierReceiver { + self.connection_notifier.on_connect_stream() } } @@ -376,18 +456,14 @@ impl TelemetryHandle { /// (re-)establishes. #[derive(Clone, Debug)] pub struct TelemetryConnectionNotifier { - message_sender: mpsc::UnboundedSender, + register_sender: mpsc::UnboundedSender, addresses: Vec, } impl TelemetryConnectionNotifier { - /// Get event stream for telemetry connection established events. - /// - /// This function will return an error if the telemetry has already been started by - /// [`TelemetryHandle::start_telemetry`]. - pub fn on_connect_stream(&self) -> TracingUnboundedReceiver<()> { - let (message_sender, message_receiver) = tracing_unbounded("mpsc_telemetry_on_connect"); - if let Err(err) = self.message_sender.unbounded_send(Register::Notifier { + fn on_connect_stream(&self) -> ConnectionNotifierReceiver { + let (message_sender, message_receiver) = connection_notifier_channel(); + if let Err(err) = self.register_sender.unbounded_send(Register::Notifier { addresses: self.addresses.clone(), connection_notifier: message_sender, }) { @@ -428,34 +504,34 @@ enum Register { /// # let authority_id = 42_u64; /// # let set_id = (43_u64, 44_u64); /// # let authorities = vec![45_u64]; -/// telemetry!(CONSENSUS_INFO; "afg.authority_set"; -/// "authority_id" => authority_id.to_string(), -/// "authority_set_id" => ?set_id, -/// "authorities" => authorities, +/// # let telemetry: Option = None; +/// telemetry!( +/// telemetry; // an `Option` +/// CONSENSUS_INFO; +/// "afg.authority_set"; +/// "authority_id" => authority_id.to_string(), +/// "authority_set_id" => ?set_id, +/// "authorities" => authorities, /// ); /// ``` #[macro_export(local_inner_macros)] macro_rules! telemetry { - ( $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{ - let verbosity: u8 = $verbosity; - match format_fields_to_json!($($t)*) { - Err(err) => { - $crate::tracing::error!( - target: "telemetry", - "Could not serialize value for telemetry: {}", - err, - ); - }, - Ok(mut json) => { - // NOTE: the span id will be added later in the JSON for the greater good - json.insert("msg".into(), $msg.into()); - let serialized_json = $crate::serde_json::to_string(&json) - .expect("contains only string keys; qed"); - $crate::tracing::info!(target: $crate::TELEMETRY_LOG_SPAN, - verbosity, - json = serialized_json.as_str(), - ); - }, + ( $telemetry:expr; $verbosity:expr; $msg:expr; $( $t:tt )* ) => {{ + if let Some(telemetry) = $telemetry.as_ref() { + let verbosity: $crate::VerbosityLevel = $verbosity; + match format_fields_to_json!($($t)*) { + Err(err) => { + $crate::log::debug!( + target: "telemetry", + "Could not serialize value for telemetry: {}", + err, + ); + }, + Ok(mut json) => { + json.insert("msg".into(), $msg.into()); + telemetry.send_telemetry(verbosity, json); + }, + } } }}; } diff --git a/client/telemetry/src/node.rs b/client/telemetry/src/node.rs index e47bc2f9634f7..2d1a04b00a4cd 100644 --- a/client/telemetry/src/node.rs +++ b/client/telemetry/src/node.rs @@ -16,6 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::TelemetryPayload; +use futures::channel::mpsc; use futures::prelude::*; use libp2p::core::transport::Transport; use libp2p::Multiaddr; @@ -23,7 +25,13 @@ use rand::Rng as _; use std::{fmt, mem, pin::Pin, task::Context, task::Poll, time::Duration}; use wasm_timer::Delay; -pub(crate) type ConnectionNotifierSender = sp_utils::mpsc::TracingUnboundedSender<()>; +pub(crate) type ConnectionNotifierSender = mpsc::Sender<()>; +pub(crate) type ConnectionNotifierReceiver = mpsc::Receiver<()>; + +pub(crate) fn connection_notifier_channel() -> (ConnectionNotifierSender, ConnectionNotifierReceiver) +{ + mpsc::channel(0) +} /// Handler for a single telemetry node. /// @@ -45,7 +53,7 @@ pub(crate) struct Node { /// Transport used to establish new connections. transport: TTrans, /// Messages that are sent when the connection (re-)establishes. - pub(crate) connection_messages: Vec>, + pub(crate) connection_messages: Vec, /// Notifier for when the connection (re-)establishes. pub(crate) telemetry_connection_notifier: Vec, } @@ -123,7 +131,7 @@ where pub(crate) enum Infallible {} -impl Sink for Node +impl Sink for Node where TTrans: Clone + Unpin, TTrans::Dial: Unpin, @@ -234,16 +242,28 @@ where Poll::Ready(Ok(())) } - fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, item: TelemetryPayload) -> Result<(), Self::Error> { match &mut self.socket { - NodeSocket::Connected(conn) => { - let _ = conn.sink.start_send_unpin(item.into()).expect("boo"); - } + NodeSocket::Connected(conn) => match serde_json::to_vec(&item) { + Ok(data) => { + let _ = conn.sink.start_send_unpin(data); + } + Err(err) => log::debug!( + target: "telemetry", + "Could not serialize payload: {}", + err, + ), + }, _socket => { log::trace!( target: "telemetry", "Message has been discarded: {}", - item, + serde_json::to_string(&item) + .unwrap_or_else(|err| format!( + "could not be serialized ({}): {:?}", + err, + item, + )), ); } } diff --git a/client/tracing/Cargo.toml b/client/tracing/Cargo.toml index 34aa9d9d4e7f0..a119fb48b34e2 100644 --- a/client/tracing/Cargo.toml +++ b/client/tracing/Cargo.toml @@ -30,7 +30,6 @@ tracing-core = "0.1.17" tracing-log = "0.1.1" tracing-subscriber = "0.2.15" sp-tracing = { version = "3.0.0", path = "../../primitives/tracing" } -sc-telemetry = { version = "3.0.0", path = "../telemetry" } sc-tracing-proc-macro = { version = "3.0.0", path = "./proc-macro" } [target.'cfg(target_os = "unknown")'.dependencies] diff --git a/client/tracing/src/lib.rs b/client/tracing/src/lib.rs index 2b0044a6f25b0..41947d4c0ed8e 100644 --- a/client/tracing/src/lib.rs +++ b/client/tracing/src/lib.rs @@ -24,7 +24,7 @@ //! //! See `sp-tracing` for examples on how to use tracing. //! -//! Currently we provide `Log` (default), `Telemetry` variants for `Receiver` +//! Currently we only provide `Log` (default). #![warn(missing_docs)] @@ -46,7 +46,6 @@ use tracing_subscriber::{ CurrentSpan, layer::{Layer, Context}, }; -use sc_telemetry::{telemetry, SUBSTRATE_INFO}; use sp_tracing::{WASM_NAME_KEY, WASM_TARGET_KEY, WASM_TRACE_IDENTIFIER}; #[doc(hidden)] @@ -67,8 +66,6 @@ pub struct ProfilingLayer { pub enum TracingReceiver { /// Output to logger Log, - /// Output to telemetry - Telemetry, } impl Default for TracingReceiver { @@ -214,10 +211,6 @@ impl ProfilingLayer { pub fn new(receiver: TracingReceiver, targets: &str) -> Self { match receiver { TracingReceiver::Log => Self::new_with_handler(Box::new(LogTraceHandler), targets), - TracingReceiver::Telemetry => Self::new_with_handler( - Box::new(TelemetryTraceHandler), - targets, - ), } } @@ -392,33 +385,6 @@ impl TraceHandler for LogTraceHandler { } } -/// TraceHandler for sending span data to telemetry, -/// Please see telemetry documentation for details on how to specify endpoints and -/// set the required telemetry level to activate tracing messages -pub struct TelemetryTraceHandler; - -impl TraceHandler for TelemetryTraceHandler { - fn handle_span(&self, span_datum: SpanDatum) { - telemetry!(SUBSTRATE_INFO; "tracing.profiling"; - "name" => span_datum.name, - "target" => span_datum.target, - "time" => span_datum.overall_time.as_nanos(), - "id" => span_datum.id.into_u64(), - "parent_id" => span_datum.parent_id.as_ref().map(|i| i.into_u64()), - "values" => span_datum.values - ); - } - - fn handle_event(&self, event: TraceEvent) { - telemetry!(SUBSTRATE_INFO; "tracing.event"; - "name" => event.name, - "target" => event.target, - "parent_id" => event.parent_id.as_ref().map(|i| i.into_u64()), - "values" => event.values - ); - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/client/tracing/src/logging/event_format.rs b/client/tracing/src/logging/event_format.rs index 37f9ed16ead7d..25fd2f3ba3d70 100644 --- a/client/tracing/src/logging/event_format.rs +++ b/client/tracing/src/logging/event_format.rs @@ -62,10 +62,6 @@ where S: Subscriber + for<'a> LookupSpan<'a>, N: for<'a> FormatFields<'a> + 'static, { - if event.metadata().target() == sc_telemetry::TELEMETRY_LOG_SPAN { - return Ok(()); - } - let writer = &mut MaybeColorWriter::new(self.enable_color, writer); let normalized_meta = event.normalized_metadata(); let meta = normalized_meta.as_ref().unwrap_or_else(|| event.metadata()); diff --git a/client/tracing/src/logging/mod.rs b/client/tracing/src/logging/mod.rs index 433e3ee4931c5..187b6a387f328 100644 --- a/client/tracing/src/logging/mod.rs +++ b/client/tracing/src/logging/mod.rs @@ -29,16 +29,16 @@ mod layers; pub use directives::*; pub use sc_tracing_proc_macro::*; -use sc_telemetry::{ExtTransport, TelemetryWorker}; use std::io; use tracing::Subscriber; use tracing_subscriber::{ + filter::LevelFilter, fmt::time::ChronoLocal, fmt::{ format, FormatEvent, FormatFields, Formatter, Layer as FmtLayer, MakeWriter, SubscriberBuilder, }, - layer::{self, SubscriberExt}, filter::LevelFilter, + layer::{self, SubscriberExt}, registry::LookupSpan, EnvFilter, FmtSubscriber, Layer, Registry, }; @@ -75,8 +75,6 @@ fn prepare_subscriber( directives: &str, profiling_targets: Option<&str>, force_colors: Option, - telemetry_buffer_size: Option, - telemetry_external_transport: Option, builder_hook: impl Fn( SubscriberBuilder< format::DefaultFields, @@ -85,7 +83,7 @@ fn prepare_subscriber( fn() -> std::io::Stderr, >, ) -> SubscriberBuilder, -) -> Result<(impl Subscriber + for<'a> LookupSpan<'a>, TelemetryWorker)> +) -> Result LookupSpan<'a>> where N: for<'writer> FormatFields<'writer> + 'static, E: FormatEvent + 'static, @@ -130,10 +128,9 @@ where if let Some(profiling_targets) = profiling_targets { env_filter = parse_user_directives(env_filter, profiling_targets)?; - env_filter = env_filter - .add_directive( - parse_default_directive("sc_tracing=trace").expect("provided directive is valid") - ); + env_filter = env_filter.add_directive( + parse_default_directive("sc_tracing=trace").expect("provided directive is valid"), + ); } let max_level_hint = Layer::::max_level_hint(&env_filter); @@ -164,8 +161,6 @@ where "%Y-%m-%d %H:%M:%S%.3f".to_string() }); - let (telemetry_layer, telemetry_worker) = - sc_telemetry::TelemetryLayer::new(telemetry_buffer_size, telemetry_external_transport)?; let event_format = EventFormat { timer, display_target: !simple, @@ -187,20 +182,18 @@ where #[cfg(not(target_os = "unknown"))] let builder = builder_hook(builder); - let subscriber = builder.finish().with(PrefixLayer).with(telemetry_layer); + let subscriber = builder.finish().with(PrefixLayer); #[cfg(target_os = "unknown")] let subscriber = subscriber.with(ConsoleLogLayer::new(event_format)); - Ok((subscriber, telemetry_worker)) + Ok(subscriber) } /// A builder that is used to initialize the global logger. pub struct LoggerBuilder { directives: String, profiling: Option<(crate::TracingReceiver, String)>, - telemetry_buffer_size: Option, - telemetry_external_transport: Option, log_reloading: bool, force_colors: Option, } @@ -211,8 +204,6 @@ impl LoggerBuilder { Self { directives: directives.into(), profiling: None, - telemetry_buffer_size: None, - telemetry_external_transport: None, log_reloading: true, force_colors: None, } @@ -234,18 +225,6 @@ impl LoggerBuilder { self } - /// Set a custom buffer size for the telemetry. - pub fn with_telemetry_buffer_size(&mut self, buffer_size: usize) -> &mut Self { - self.telemetry_buffer_size = Some(buffer_size); - self - } - - /// Set a custom network transport (used for the telemetry). - pub fn with_transport(&mut self, transport: ExtTransport) -> &mut Self { - self.telemetry_external_transport = Some(transport); - self - } - /// Force enable/disable colors. pub fn with_colors(&mut self, enable: bool) -> &mut Self { self.force_colors = Some(enable); @@ -255,64 +234,56 @@ impl LoggerBuilder { /// Initialize the global logger /// /// This sets various global logging and tracing instances and thus may only be called once. - pub fn init(self) -> Result { + pub fn init(self) -> Result<()> { if let Some((tracing_receiver, profiling_targets)) = self.profiling { if self.log_reloading { - let (subscriber, telemetry_worker) = prepare_subscriber( + let subscriber = prepare_subscriber( &self.directives, Some(&profiling_targets), self.force_colors, - self.telemetry_buffer_size, - self.telemetry_external_transport, |builder| enable_log_reloading!(builder), )?; let profiling = crate::ProfilingLayer::new(tracing_receiver, &profiling_targets); tracing::subscriber::set_global_default(subscriber.with(profiling))?; - Ok(telemetry_worker) + Ok(()) } else { - let (subscriber, telemetry_worker) = prepare_subscriber( + let subscriber = prepare_subscriber( &self.directives, Some(&profiling_targets), self.force_colors, - self.telemetry_buffer_size, - self.telemetry_external_transport, |builder| builder, )?; let profiling = crate::ProfilingLayer::new(tracing_receiver, &profiling_targets); tracing::subscriber::set_global_default(subscriber.with(profiling))?; - Ok(telemetry_worker) + Ok(()) } } else { if self.log_reloading { - let (subscriber, telemetry_worker) = prepare_subscriber( + let subscriber = prepare_subscriber( &self.directives, None, self.force_colors, - self.telemetry_buffer_size, - self.telemetry_external_transport, |builder| enable_log_reloading!(builder), )?; tracing::subscriber::set_global_default(subscriber)?; - Ok(telemetry_worker) + Ok(()) } else { - let (subscriber, telemetry_worker) = prepare_subscriber( + let subscriber = prepare_subscriber( &self.directives, None, self.force_colors, - self.telemetry_buffer_size, - self.telemetry_external_transport, |builder| builder, )?; tracing::subscriber::set_global_default(subscriber)?; - Ok(telemetry_worker) + Ok(()) } } } @@ -335,7 +306,8 @@ mod tests { #[test] fn test_logger_filters() { if env::var("RUN_TEST_LOGGER_FILTERS").is_ok() { - let test_directives = "afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error"; + let test_directives = + "afg=debug,sync=trace,client=warn,telemetry,something-with-dash=error"; init_logger(&test_directives); tracing::dispatcher::get_default(|dispatcher| { diff --git a/test-utils/client/src/lib.rs b/test-utils/client/src/lib.rs index cdeefccc4086e..d8cc40d5561c1 100644 --- a/test-utils/client/src/lib.rs +++ b/test-utils/client/src/lib.rs @@ -229,6 +229,7 @@ impl TestClientBuilder Result { - let transport = ExtTransport::new(ffi::websocket_transport()); - let mut logger = LoggerBuilder::new(pattern); - logger.with_transport(transport); - logger.init() +pub fn init_logging(pattern: &str) -> Result<(), sc_tracing::logging::Error> { + LoggerBuilder::new(pattern).init() } /// Create a service configuration from a chain spec. @@ -51,7 +45,6 @@ pub fn init_logging_and_telemetry( /// This configuration contains good defaults for a browser light client. pub async fn browser_configuration( chain_spec: GenericChainSpec, - telemetry_handle: Option, ) -> Result> where G: RuntimeGenesis + 'static, @@ -82,7 +75,6 @@ where async {} }).into(), telemetry_external_transport: Some(transport), - telemetry_handle, role: Role::Light, database: { info!("Opening Indexed DB database '{}'...", name);