-
Notifications
You must be signed in to change notification settings - Fork 3.7k
/
Copy pathmetrics.rs
287 lines (253 loc) · 9.86 KB
/
metrics.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
// Copyright © Aptos Foundation
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0
use aptos_metrics_core::{
exponential_buckets, histogram_opts, register_histogram_vec, register_int_counter,
register_int_counter_vec, register_int_gauge, HistogramTimer, HistogramVec, IntCounter,
IntCounterVec, IntGauge,
};
use once_cell::sync::Lazy;
use std::time::Instant;
// Subscription stream termination labels

/// Label value `"max_consecutive_requests"`.
///
/// NOTE(review): presumably passed as the `termination_reason` label of
/// `TERMINATE_SUBSCRIPTION_STREAM` when a subscription stream is terminated
/// after hitting its consecutive request limit — confirm against callers.
pub const MAX_CONSECUTIVE_REQUESTS_LABEL: &str = "max_consecutive_requests";
// Bucket upper bounds for the histogram that tracks the chunk sizes of
// data responses. The bounds are strictly increasing, from 1 to 100,000.
const DATA_RESPONSE_CHUNK_SIZE_BUCKETS: &[f64] = &[
    // Up to one hundred
    1.0, 2.0, 4.0, 5.0, 10.0, 25.0, 50.0, 75.0, 100.0,
    // Hundreds and thousands
    250.0, 500.0, 750.0, 1_000.0, 2_500.0, 5_000.0, 7_500.0,
    // Tens of thousands
    10_000.0, 12_500.0, 15_000.0, 17_500.0, 20_000.0,
    25_000.0, 30_000.0, 35_000.0, 40_000.0, 45_000.0, 50_000.0,
    // Tail
    75_000.0, 100_000.0,
];
// Latency buckets for network latencies (i.e., the defaults only go up
// to 10 seconds, but we usually require more).
//
// Declared as a slice (like the chunk size buckets) so that adding or
// removing a bucket does not require keeping a hard-coded array length
// in sync.
const NETWORK_LATENCY_BUCKETS: &[f64] = &[
    0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 7.5, 10.0, 15.0, 20.0, 30.0, 40.0, 60.0,
];
/// Gauge for the number of active data streams.
///
/// Updated through `set_active_data_streams`. The metric is registered on
/// first access and the initializer panics if registration fails.
pub static ACTIVE_DATA_STREAMS: Lazy<IntGauge> = Lazy::new(|| {
    let registration = register_int_gauge!(
        "aptos_data_streaming_service_active_data_streams",
        "Counters related to the number of active data streams",
    );
    registration.unwrap()
});
/// Counter for the number of send failures along the data stream.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static DATA_STREAM_SEND_FAILURE: Lazy<IntCounter> = Lazy::new(|| {
    let registration = register_int_counter!(
        "aptos_data_streaming_service_stream_send_failure",
        "Counters related to send failures along the data stream",
    );
    registration.unwrap()
});
/// Counter for newly created data streams, labelled by `request_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static CREATE_DATA_STREAM: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_create_data_stream",
        "Counters related to the creation of new data streams",
        &["request_type"]
    );
    registration.unwrap()
});
/// Counter for newly created subscription streams.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static CREATE_SUBSCRIPTION_STREAM: Lazy<IntCounter> = Lazy::new(|| {
    let registration = register_int_counter!(
        "aptos_data_streaming_service_create_subscription_stream",
        "Counters related to the creation of new subscription streams",
    );
    registration.unwrap()
});
/// Counter for terminated data streams, labelled by `feedback_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static TERMINATE_DATA_STREAM: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_terminate_data_stream",
        "Counters related to the termination of existing data streams",
        &["feedback_type"]
    );
    registration.unwrap()
});
/// Counter for terminated subscription streams, labelled by
/// `termination_reason`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static TERMINATE_SUBSCRIPTION_STREAM: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_terminate_subscription_stream",
        "Counters related to the termination of existing subscription streams",
        &["termination_reason"]
    );
    registration.unwrap()
});
/// Counter for errors hit while checking stream progress, labelled by
/// `error_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static CHECK_STREAM_PROGRESS_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_check_progress_error",
        "Counters related to stream progress check errors",
        &["error_type"]
    );
    registration.unwrap()
});
/// Counter for global data summary errors, labelled by `error_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static GLOBAL_DATA_SUMMARY_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_global_summary_error",
        "Counters related to global data summary errors",
        &["error_type"]
    );
    registration.unwrap()
});
/// Counter for data requests that were sent, labelled by `request_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static SENT_DATA_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_sent_data_requests",
        "Counters related to sent data requests",
        &["request_type"]
    );
    registration.unwrap()
});
/// Counter for data requests that were sent to fetch missing data, labelled
/// by `request_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static SENT_DATA_REQUESTS_FOR_MISSING_DATA: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_sent_data_requests_for_missing_data",
        "Counters related to sent data requests for missing data",
        &["request_type"]
    );
    registration.unwrap()
});
/// Counter for data requests that were retried (including the new
/// timeouts), labelled by `request_type` and `request_timeout`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static RETRIED_DATA_REQUESTS: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_retried_data_requests",
        "Counters related to retried data requests",
        &["request_type", "request_timeout"]
    );
    registration.unwrap()
});
/// Gauge for the number of max concurrent prefetching requests.
///
/// Updated through `set_max_concurrent_requests`. The metric is registered
/// on first access and the initializer panics if registration fails.
pub static MAX_CONCURRENT_PREFETCHING_REQUESTS: Lazy<IntGauge> = Lazy::new(|| {
    let registration = register_int_gauge!(
        "aptos_data_streaming_service_max_concurrent_prefetching_requests",
        "The number of max concurrent prefetching requests",
    );
    registration.unwrap()
});
/// Gauge for the number of pending data responses.
///
/// Updated through `set_pending_data_responses`. The metric is registered
/// on first access and the initializer panics if registration fails.
pub static PENDING_DATA_RESPONSES: Lazy<IntGauge> = Lazy::new(|| {
    let registration = register_int_gauge!(
        "aptos_data_streaming_service_pending_data_responses",
        "Counters related to the number of pending data responses",
    );
    registration.unwrap()
});
/// Gauge for the number of complete pending data responses.
///
/// Updated through `set_complete_pending_data_responses`. The metric is
/// registered on first access and the initializer panics if registration
/// fails.
pub static COMPLETE_PENDING_DATA_RESPONSES: Lazy<IntGauge> = Lazy::new(|| {
    let registration = register_int_gauge!(
        "aptos_data_streaming_service_complete_pending_data_responses",
        "Counters related to the number of complete pending data responses",
    );
    registration.unwrap()
});
/// Counter for data responses that were received, labelled by
/// `response_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static RECEIVED_DATA_RESPONSE: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_received_data_response",
        "Counters related to received data responses",
        &["response_type"]
    );
    registration.unwrap()
});
/// Histogram for the sizes of received data chunks, labelled by
/// `request_type` and `response_type`.
///
/// Uses `DATA_RESPONSE_CHUNK_SIZE_BUCKETS` as its bucket bounds. The metric
/// is registered on first access and the initializer panics if registration
/// fails.
pub static RECEIVED_DATA_RESPONSE_CHUNK_SIZE: Lazy<HistogramVec> = Lazy::new(|| {
    let opts = histogram_opts!(
        "aptos_data_streaming_service_received_data_chunk_sizes",
        "Counter for tracking sizes of data chunks received by the data stream",
        DATA_RESPONSE_CHUNK_SIZE_BUCKETS.to_vec()
    );
    let registration = register_histogram_vec!(opts, &["request_type", "response_type"]);
    registration.unwrap()
});
/// Counter for received response errors, labelled by `error_type`.
///
/// The metric is registered on first access and the initializer panics if
/// registration fails.
pub static RECEIVED_RESPONSE_ERROR: Lazy<IntCounterVec> = Lazy::new(|| {
    let registration = register_int_counter_vec!(
        "aptos_data_streaming_service_received_response_error",
        "Counters related to received response errors",
        &["error_type"]
    );
    registration.unwrap()
});
/// Gauge that keeps track of the subscription stream lag (versions).
///
/// Updated through `set_subscription_stream_lag`. The metric is registered
/// on first access and the initializer panics if registration fails.
pub static SUBSCRIPTION_STREAM_LAG: Lazy<IntGauge> = Lazy::new(|| {
    let registration = register_int_gauge!(
        "aptos_data_streaming_service_subscription_stream_lag",
        "Counters related to the subscription stream lag",
    );
    registration.unwrap()
});
/// Histogram for the time it takes to process a data request, labelled by
/// `request_type`.
///
/// Uses `NETWORK_LATENCY_BUCKETS` as its bucket bounds. The metric is
/// registered on first access and the initializer panics if registration
/// fails.
pub static DATA_REQUEST_PROCESSING_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
    let opts = histogram_opts!(
        "aptos_data_streaming_service_data_request_processing_latency",
        "Counters related to data request processing latencies",
        NETWORK_LATENCY_BUCKETS.to_vec()
    );
    let registration = register_histogram_vec!(opts, &["request_type"]);
    registration.unwrap()
});
/// Time it takes to send a data notification after a successful data response
pub static DATA_NOTIFICATION_SEND_LATENCY: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_data_streaming_service_data_notification_send_latency",
"Counters related to the data notification send latency",
&["label"],
exponential_buckets(/*start=*/ 1e-3, /*factor=*/ 2.0, /*count=*/ 30).unwrap(),
)
.unwrap()
});
/// Adds one to the child of `counter` selected by the single `label` value.
///
/// `counter` must have been declared with exactly one label name; per the
/// prometheus crate docs, `with_label_values` panics on a label count
/// mismatch.
pub fn increment_counter(counter: &Lazy<IntCounterVec>, label: &str) {
    let labelled_counter = counter.with_label_values(&[label]);
    labelled_counter.inc();
}
/// Adds one to the child of `counter` selected by two label values.
///
/// The values are passed in the order `first_label`, `second_label`, so they
/// must match the order in which the counter's label names were declared.
/// Per the prometheus crate docs, `with_label_values` panics on a label
/// count mismatch.
pub fn increment_counter_multiple_labels(
    counter: &Lazy<IntCounterVec>,
    first_label: &str,
    second_label: &str,
) {
    let labelled_counter = counter.with_label_values(&[first_label, second_label]);
    labelled_counter.inc();
}
/// Records the time elapsed since `start_time` (in seconds, as an `f64`)
/// into the child of `histogram` selected by `label`.
pub fn observe_duration(histogram: &Lazy<HistogramVec>, label: &str, start_time: Instant) {
    // Resolve the labelled histogram first, so the elapsed time is read as
    // late as possible (right before it is observed).
    let labelled_histogram = histogram.with_label_values(&[label]);
    let elapsed_secs = start_time.elapsed().as_secs_f64();
    labelled_histogram.observe(elapsed_secs);
}
/// Records `value` into the child of `histogram` selected by two label
/// values.
///
/// The values are passed in the order `first_label`, `second_label`, so they
/// must match the order in which the histogram's label names were declared.
/// `value` is converted to `f64` before being observed.
pub fn observe_values(
    histogram: &Lazy<HistogramVec>,
    first_label: &str,
    second_label: &str,
    value: u64,
) {
    let observation = value as f64;
    let labelled_histogram = histogram.with_label_values(&[first_label, second_label]);
    labelled_histogram.observe(observation);
}
/// Sets the number of active data streams
pub fn set_active_data_streams(value: usize) {
ACTIVE_DATA_STREAMS.set(value as i64);
}
/// Sets the number of max concurrent requests
pub fn set_max_concurrent_requests(value: u64) {
MAX_CONCURRENT_PREFETCHING_REQUESTS.set(value as i64);
}
/// Sets the number of complete pending data responses
pub fn set_complete_pending_data_responses(value: u64) {
COMPLETE_PENDING_DATA_RESPONSES.set(value as i64);
}
/// Sets the number of pending data responses
pub fn set_pending_data_responses(value: u64) {
PENDING_DATA_RESPONSES.set(value as i64);
}
/// Sets the subscription stream lag
pub fn set_subscription_stream_lag(value: u64) {
SUBSCRIPTION_STREAM_LAG.set(value as i64);
}
/// Starts a timer on the child of `histogram` selected by `label` and
/// returns it to the caller.
///
/// Per the prometheus crate docs, the returned `HistogramTimer` records the
/// elapsed time into the histogram when it is stopped or dropped.
pub fn start_timer(histogram: &Lazy<HistogramVec>, label: String) -> HistogramTimer {
    let labelled_histogram = histogram.with_label_values(&[label.as_str()]);
    labelled_histogram.start_timer()
}