diff --git a/scripts/process_logs/README.md b/scripts/process_logs/README.md
index 2cbd84ae7a..4a376f16ee 100644
--- a/scripts/process_logs/README.md
+++ b/scripts/process_logs/README.md
@@ -71,6 +71,7 @@ Contains dictionary of output log files, each with following options:
   - `<level>`: log level (DEBUG, INFO, WARNING, etc)
   - `<source>`: source file which emitted message
   - `<func>`: function which emitted message
+  - `<body>`: message body
   - `<attribute name>`: any user-defined attribute, explained later in *chains* section
@@ -97,6 +98,23 @@ of matcher, value is parameter. For submatchers that don't have parameters
 (for example, other custom matchers) value should be omitted. Custom matcher
 matches message if any of its sub-matchers matches message.
 
+Matcher names are resolved as follows:
+- check if it is one of builtin matchers (listed below)
+- check if it is a custom matcher
+- otherwise consider it a custom attribute matcher
+
+Attribute matchers check if message has given attribute, and, when value is
+provided, if attribute contains given value. For example, matcher
+```yaml
+- is_request:
+```
+checks that message has attribute `is_request`, and matcher
+```yaml
+- reqId: 42
+```
+checks that message has attribute `reqId` containing value `42`.
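+
+Attribute matchers can be combined with builtin ones just like any other
+matcher; for example, a matcher like this (attribute names are illustrative)
+requires both conditions to hold:
+```yaml
+- all:
+  - replica: master
+  - request: ordered
+```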
+
+
 
 #### Builtin matchers
 - `timestamp`: checks if message timestamp is within defined limits. Parameter
   for this matcher is dictionary with `min` and `max` values
@@ -137,17 +155,6 @@ matches message if any of its sub-matchers matches message.
 - `node`: message is actually from node, not replica
 - `master`: message is from master replica or from node
 - `backup`: message is from any of backup replicas
-- `tag`: checks if message is tagged with specified tag (tagging explained
-  later in *chains* section), for example:
-  ```yaml
-  - tag: NETWORK
-  ```
-- `attribute`: check if message has custom attribute (explained later in
-  *chains* section) with matching name and value, specified as a dictionary
-  with single key-value pair, for example:
-  ```yaml
-  - attribute: {viewNo: 2}
-  ```
 - `any`: check if message matches any submatcher, specified as a list:
   ```yaml
   - any:
@@ -262,24 +269,39 @@ commands.
 - `and`: perform action when message is matched
 - `return`: action to perform is to return from current chain to calling one
 - `drop`: action to perform is to drop message altogether
+- `track_requests`: track requests, adding multiple attributes to relevant
+  messages:
+  - `reqId`: request identifier
+  - `request`: request processing status, one of `received`,
+    `already_processed` or `ordered`
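+  For example, a minimal chain using it could look like this (the `stats`
+  timelog sink is illustrative):
+  ```yaml
+  main:
+    - track_requests:
+    - match: [request: ordered]
+    - log time: { stats: ordered }
+  ```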
 - `tag`: optionally checks if message matches some regex pattern and sets
-  custom tags and/or attributes on it. Parameter for this command is
+  custom attributes on it. Parameter for this command is
   dictionary with following keys:
   - `pattern`: pattern to match message body
-  - `tags`: list of tags to add to message on match
   - `attributes`: dictionary of custom attributes to add to message on match,
-    with keys being attribute names and values index of matching regex group.
+    with keys being attribute names and values being one of:
+    - empty: indicating tag-like attribute which doesn't have any value
+    - arbitrary string: sets this string as attribute value
+    - `group <number>`: sets matching regex group as attribute value
   For example:
   ```yaml
   - tag:
       pattern: sending (\w+), viewNo: (\d+)
-      tags: [SEND]
       attributes:
-        message_type: 1
-        view_no: 2
+        is_message:
+        message_action: send
+        message_type: group 1
+        view_no: group 2
+  ```
+  will add the following attributes to messages containing
+  `sending COMMIT, viewNo: 23`:
+  ```
+  is_message: []
+  message_action: send
+  message_type: COMMIT
+  view_no: 23
   ```
-  will add tag `SEND` to messages containing `sending COMMIT, viewNo: 23`,
-  and adds custom attributes `message_type: COMMIT` and `view_no: 23`.
+  It's possible for multiple taggers to write to the same attribute multiple
+  times; in this case the attribute will contain all written values.
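+  For example, with two taggers like these (names are illustrative):
+  ```yaml
+  - tag:
+      pattern: sending (\w+)
+      attributes:
+        processed_by: chain_a
+  - tag:
+      pattern: sending (\w+)
+      attributes:
+        processed_by: chain_b
+  ```
+  a message matching the pattern ends up with `processed_by` containing
+  both `chain_a` and `chain_b`.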
 
 ## Standard (in process_logs.yml) matchers and chains
 
diff --git a/scripts/process_logs/example_network.yml b/scripts/process_logs/example_network.yml
index 203c225423..c53f27eaef 100644
--- a/scripts/process_logs/example_network.yml
+++ b/scripts/process_logs/example_network.yml
@@ -9,7 +9,7 @@ input_logs:                 # List of input logs
 
 chains:                     # List of processing chains
   main:                     # Main processing chain
-    - match or drop: [network_message]  # Drop everything except network-related messages
+    - match or drop: network_message    # Drop everything except network-related messages
     - log_total:            # Call chain log_total
     - log_ping_pong:        # Call chain log_ping_pong
     - log_disconnect:       # Call chain log_disconnect
@@ -20,12 +20,12 @@
                             # By default return message to calling chain
 
   log_ping_pong:
-    - match: [network_ping_pong]       # Return unless message is network ping pong
+    - match: network_ping_pong         # Return unless message is network ping pong
     - log time: {general: ping_pong}   # Add message to general-ping_pong timelog sink
     - drop:                            # Drop message, since we'll no longer need it
 
   log_disconnect:
-    - match: [network_disconnect]      # Return unless message is network disconnect
+    - match: network_disconnect        # Return unless message is network disconnect
     - log time: {general: disconnect}  # Add message to general-disconnect timelog sink
     - log time: {alarms: disconnect}   # Add message to alarms-disconnect timelog sink
     - drop:                            # Drop message, since we'll no longer need it
@@ -34,7 +34,7 @@ outputs:
   logs:                     # List of output log sinks
     network:                # Network log sink
      filename: output.log  # Filename where to put messages
-      pattern: <timestamp> | <node> | <message>  # Message format
+      pattern: <timestamp> | <node> | <body>     # Message format
 
   timelogs:                 # List of timelog
     general:                # General timelog
diff --git a/scripts/process_logs/example_process.yml b/scripts/process_logs/example_process.yml
index 387f3c38c0..e23b75d277 100644
--- a/scripts/process_logs/example_process.yml
+++ b/scripts/process_logs/example_process.yml
@@ -31,7 +31,7 @@ outputs:
   logs:
     output:
       filename: output.log
-      pattern: <timestamp> | <node> | <level> | <source> | <message>
+      pattern: <timestamp> | <node> | <level> | <source> | <body>
 
   timelogs:
     monitoring:
diff --git a/scripts/process_logs/example_track_req.yml b/scripts/process_logs/example_track_req.yml
index 5c86ad5810..4c503a2866 100644
--- a/scripts/process_logs/example_track_req.yml
+++ b/scripts/process_logs/example_track_req.yml
@@ -10,27 +10,19 @@ input_logs:                 # List of input logs
 
 chains:                     # List of processing chains
   main:                     # Main processing chain
     - match: [replica: master]
-    - tag_requests:
-    - match: [tag: REQUEST]
+    - track_requests:
+    - match: request
     - log line: output
     - stat_received:
-    - stat_already_processed:
     - stat_ordered:
 
   stat_received:
-    - match: [tag: RECEIVED]
-    - log count: { stats: received }
+    - match: [request: received]
     - log time: { stats: received }
     - drop:
 
-  stat_already_processed:
-    - match: [tag: ALREADY_PROCESSED]
-    - log count: { stats: already_processed }
-    - drop:
-
   stat_ordered:
-    - match: [tag: ORDERED]
-    - log count: { stats: ordered }
+    - match: [request: ordered]
     - log time: { stats: ordered }
     - drop:
@@ -44,8 +36,3 @@
       graphs:
         received: blue
         ordered: green
-
-  counters:
-    stats:
-      format: |
-        requests: received, already_processed, ordered
diff --git a/scripts/process_logs/example_track_single_req.yml b/scripts/process_logs/example_track_single_req.yml
new file mode 100644
index 0000000000..05c463bb18
--- /dev/null
+++ b/scripts/process_logs/example_track_single_req.yml
@@ -0,0 +1,19 @@
+
+input_logs:                 # List of input logs
+  - path: .                 # Where to look for log files
+    recursive: yes          # Whether to descend into subdirectories recursively
+    pattern: (Node\d+)\.log # Log file regex pattern to look for
+    node_group: 1           # Group number that matches node identifier
+    only_timestamped: yes   # Whether to discard non-timestamped lines
+
+
+chains:                     # List of processing chains
+  main:                     # Main processing chain
+    - track_requests:
+    - match: [reqId: 1521518081149711394]
+    - log line: output
+
+outputs:
+  logs:                     # List of output log sinks
+    output:                 # Output log sink
+      filename: output_<node>.log  # Filename where to put messages
diff --git a/scripts/process_logs/example_view_change.yml b/scripts/process_logs/example_view_change.yml
index 0885596c85..95d427bce0 100644
--- a/scripts/process_logs/example_view_change.yml
+++ b/scripts/process_logs/example_view_change.yml
@@ -17,13 +17,13 @@ chains:                     # List of processing chains
     - log_view_change_done:             # Call chain log_view_change_done
 
   log_view_change_start:
-    - match: [view_change_start]        # Return unless message is view change start
+    - match: view_change_start          # Return unless message is view change start
     - log line: view_change_start_done  # Save message to view_change_start_done log sink
     - log time: {view_change: start}    # Add message to view_change-start timelog sink
     - drop:                             # Drop message, since we'll no longer need it
 
   log_view_change_done:
-    - match: [view_change_done]         # Return unless message is view change done
+    - match: view_change_done           # Return unless message is view change done
     - log line: view_change_start_done  # Save message to view_change_start_done log sink
     - log time: {view_change: done}     # Add message to view_change-done timelog sink
     - drop:                             # Drop message, since we'll no longer need it
@@ -32,10 +32,10 @@ outputs:
   logs:                                 # List of output log sinks
     view_change:                        # View change log sink
       filename: view_change.log         # Filename where to put messages
-      pattern: <timestamp> | <node> | <message>  # Message format
+      pattern: <timestamp> | <node> | <body>     # Message format
 
     view_change_start_done:             # View change start/done log sink
       filename: view_change_sd_<node>.log  # Filename where to put messages
-      pattern: <timestamp> | <replica> | <message>  # Message format
+      pattern: <timestamp> | <replica> | <body>     # Message format
 
   timelogs:                             # List of timelog
     view_change:                        # View changes timelog
diff --git a/scripts/process_logs/process_logs b/scripts/process_logs/process_logs
index 997e24d225..c6e1ac93a9 100755
--- a/scripts/process_logs/process_logs
+++ b/scripts/process_logs/process_logs
@@ -1,9 +1,10 @@
 #!/usr/bin/env python3
 
-import os, sys, re, yaml
+import os, sys, re, gzip, yaml
 from collections import namedtuple
 from datetime import datetime, timedelta
 from string import Formatter
+from ast import literal_eval
 from multiprocessing import Pool
 
 import matplotlib.pyplot as plt
@@ -79,25 +80,28 @@ def input_logs():
 # Log message
 ###########################################################################################
 
-REPLICA_NONE="-"
+REPLICA_NONE = "-"
+
 
 class LogMessage:
-    def __init__(self, message, node=None, replica=REPLICA_NONE, timestamp=None, level=None, source=None, func=None):
-        self.message = message
+    def __init__(self, body, node=None, replica=REPLICA_NONE, timestamp=None, level=None, source=None, func=None):
+        self.body = body
         self.timestamp = timestamp
         self.node = node
         self.replica = replica
         self.level = level
         self.source = source
         self.func = func
-        self.tags = set()
         self.attributes = {}
 
-    def set_tag(self, name):
-        self.tags.add(name)
-
-    def set_attribute(self, name, value):
-        self.attributes[name] = value
+    def set_attribute(self, name, value=None):
+        if value:
+            try:
+                self.attributes[name].add(value)
+            except KeyError:
+                self.attributes[name] = set([value])
+        else:
+            self.attributes.setdefault(name, set())
 
 
 _replica_matcher = re.compile("^REPLICA:\((\w+):(\d+)\)").search
@@ -118,18 +122,21 @@ def _parse_messages(f, node):
                                  minute=int(tokens[0][14:16]),
                                  second=int(tokens[0][17:19]),
                                  microsecond=int(tokens[0][20:23]) * 1000)
-        message = tokens[4]
+        body = tokens[4]
         replica = REPLICA_NONE
-        if message.startswith("REPLICA:"):
-            m = _replica_matcher(message)
+        if body.startswith("REPLICA:"):
+            m = _replica_matcher(body)
             replica = int(m.group(2))
-            message = message[m.end():]
-        yield LogMessage(message, node, replica, timestamp, tokens[1], tokens[2], tokens[3])
+            body = body[m.end():]
+        yield LogMessage(body, node, replica, timestamp, tokens[1], tokens[2], tokens[3])
 
 
 def messages_in_log(log):
     print("Processing {}...".format(log.filename))
-    with open(log.filename, "r") as f:
+    open_fn = open
+    if log.filename.endswith(".gz"):
+        open_fn = gzip.open
+    with open_fn(log.filename, "rt") as f:
         if log.rule["only_timestamped"]:
             for message in _parse_messages(f, log.node):
                 if message.timestamp is not None:
@@ -210,7 +217,7 @@ def match_message(pattern):
     m = re.compile(pattern).search
 
     def match(message):
-        return m(message.message) is not None
+        return m(message.body) is not None
 
     return match
 
@@ -225,16 +232,14 @@ def match_replica(replica):
     return lambda message: message.replica == replica
 
 
-def match_tag(tag):
-    return lambda message: tag in message.tags
-
-
 def match_attribute(params):
     name, value = kv_from_item(params)
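+    # Attribute values are stored as sets of strings (see LogMessage.set_attribute):
+    # a bare matcher only checks attribute presence, a valued one checks membership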
+    if value is None:
+        return lambda message: name in message.attributes
 
     def match(message):
         try:
-            return message.attributes[name] == value
+            return str(value) in message.attributes[name]
         except KeyError:
             return False
 
@@ -254,10 +259,6 @@ def _create_matcher(config):
         return match_message(params)
     if name == "replica":
         return match_replica(params)
-    if name == "tag":
-        return match_tag(params)
-    if name == "attribute":
-        return match_attribute(params)
     if name == "any":
         matchers = [_create_matcher(p) for p in params]
         return lambda message: any(m(message) for m in matchers)
@@ -265,12 +266,11 @@
         matchers = [_create_matcher(p) for p in params]
         return lambda message: all(m(message) for m in matchers)
 
-    try:
-        params = global_config["matchers"][name]
+    params = global_config["matchers"].get(name)
+    if params is not None:
         return _create_matcher({"any": params})
-    except KeyError:
-        print("WARNING: Unknown matcher", name)
-        return lambda m: True
+
+    return match_attribute(config)
 
 
 ###########################################################################################
@@ -286,6 +286,8 @@ def rule_process(chain):
 
 
 def rule_match(operation, condition, action, config):
+    if isinstance(config, str):
+        config = [config]
     matchers = [_create_matcher(matcher) for matcher in config]
 
     if operation == "all":
@@ -318,17 +320,20 @@ def rule_timeshift(params):
 
 def rule_tag(params):
     match = re.compile(params.get("pattern", "")).search
-    tags = params.get("tags", [])
     attributes = params.get("attributes", {})
 
     def process(message, output):
-        m = match(message.message)
+        m = match(message.body)
         if m is None:
             return
-        for tag in tags:
-            message.set_tag(tag)
-        for name, idx in attributes.items():
-            message.set_attribute(name, m.group(idx))
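+        # Attribute values in config are either empty (tag-like), literal
+        # strings, or "group <N>" references to regex capture groups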
+        for name, value in attributes.items():
+            if value is None:
+                message.set_attribute(name)
+                continue
+            if isinstance(value, str) and value.startswith("group "):
+                message.set_attribute(name, m.group(int(value[6:])))
+                continue
+            message.set_attribute(name, value)
 
     return process
 
@@ -360,6 +365,10 @@ def rule_log_count(target):
     return process
 
 
+def track_requests(message, output):
+    output.requests.process_message(message)
+
+
 def _create_rule(config):
     name, params = kv_from_item(config)
 
@@ -380,6 +389,8 @@ def _create_rule(config):
         return rule_log_line(params)
     if name == "log count":
         return rule_log_count(params)
+    if name == "track_requests":
+        return track_requests
 
     return rule_process(name)
 
@@ -439,7 +450,7 @@ class OutputLog:
     def __init__(self, config):
         self.filename = parse_format_string(config.get("filename", "output.log"))
         self.pattern = parse_format_string(
-            config.get("pattern", "<timestamp> | <node> | <level> | <source> | <message>"))
+            config.get("pattern", "<timestamp> | <node> | <level> | <source> | <body>"))
         self.log_files = {}
 
     def add_message(self, message):
@@ -619,6 +630,153 @@ class LogCounter:
         return node_counter
 
 
+###########################################################################################
+# Request tracking facilities
+###########################################################################################
+
+def _merge_timestamps(first, second):
+    if not first:
+        return second
+    if not second:
+        return first
+    return min(first, second)
+
+
+class RequestData:
+    def __init__(self):
+        self.received = None
+        self.ordered = None
+
+    def set_received(self, timestamp):
+        self.received = _merge_timestamps(self.received, timestamp)
+
+    def set_ordered(self, timestamp):
+        self.ordered = _merge_timestamps(self.ordered, timestamp)
+
+    @property
+    def is_received(self):
+        return self.received is not None
+
+    @property
+    def is_ordered(self):
+        return self.ordered is not None
+
+    @property
+    def time_to_order(self):
+        return self.ordered - self.received
+
+    def merge(self, other):
+        self.received = _merge_timestamps(self.received, other.received)
+        self.ordered = _merge_timestamps(self.ordered, other.ordered)
+
+
+class NodeRequestData:
+    def __init__(self):
+        self.requests = {}
+        self._match_req_id = re.compile(r"'reqId': (\d+)").search
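+        # Ordered request lists appear in message bodies as Python-style
+        # literals, e.g. [('SomeClient', 1521518081149711394)], so the
+        # matched list can be parsed with ast.literal_eval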
+        self._pattern_req_list = r"\[(?:\('\w+', \d+\)(?:, )?)*\]"
+        self._match_ordered_req_list = re.compile("requests ordered ({})".format(self._pattern_req_list)).search
+        self._match_discarded_req_list = re.compile("discarded ({})".format(self._pattern_req_list)).search
+
+    def process_message(self, message):
+        self._set_attributes(message)
+        if self._check_received(message):
+            return
+        if self._check_already_processed(message):
+            return
+        if self._check_batch_ordered(message):
+            return
+
+    def merge(self, other):
+        for name, request in other.requests.items():
+            self._request(name).merge(request)
+
+    def dump(self, name):
+        received = set(id for id, req in self.requests.items() if req.is_received)
+        ordered = set(id for id, req in self.requests.items() if req.is_ordered)
+        time_to_order = [self.requests[id].time_to_order.total_seconds() for id in received & ordered]
+        if time_to_order:
+            print("{}: {}/{} requests/ordered, {}/{}/{} min/avg/max seconds to process".format(
+                name, len(self.requests), len(ordered),
+                round(min(time_to_order), 2),
+                round(sum(time_to_order) / len(time_to_order), 2),
+                round(max(time_to_order), 2)))
+        else:
+            print("{}: {}/{} requests/ordered".format(name, len(self.requests), len(ordered)))
+
+    def _request(self, name):
+        try:
+            return self.requests[name]
+        except KeyError:
+            request = RequestData()
+            self.requests[name] = request
+            return request
+
+    def _parse_reqId(self, message):
+        if "'reqId':" not in message.body:
+            return
+        m = self._match_req_id(message.body)
+        if not m:
+            return
+        return m.group(1)
+
+    def _set_attributes(self, message):
+        reqId = self._parse_reqId(message)
+        if reqId:
+            message.set_attribute("reqId", reqId)
+
+    def _check_received(self, message):
+        if "received client request" not in message.body:
+            return
+        message.set_attribute("request", "received")
+        reqId = self._parse_reqId(message)
+        if reqId:
+            self._request(reqId).set_received(message.timestamp)
+        return True
+
+    def _check_already_processed(self, message):
+        if "returning REPLY from already processed REQUEST" not in message.body:
+            return
+        message.set_attribute("request", "already_processed")
+        return True
+
+    def _check_batch_ordered(self, message):
+        if "ordered batch request" not in message.body:
+            return
+        message.set_attribute("request", "ordered")
+        ordered = self._match_ordered_req_list(message.body)
+        ordered = literal_eval(ordered.group(1))
+        for _, reqId in ordered:
+            reqId = str(reqId)
+            message.set_attribute("reqId", reqId)
+            self._request(reqId).set_ordered(message.timestamp)
+        return True
+
+
+class AllRequestData:
+    def __init__(self):
+        self.nodes = {}
+
+    def process_message(self, message):
+        self._node(message.node).process_message(message)
+
+    def merge(self, other):
+        for name, node in other.nodes.items():
+            self._node(name).merge(node)
+
+    def dump(self):
+        if not self.nodes:
+            return
+        print("Requests statistics:")
+        for name, node in self.nodes.items():
+            node.dump(name)
+
+    def _node(self, node):
+        try:
+            return self.nodes[node]
+        except KeyError:
+            node_data = NodeRequestData()
+            self.nodes[node] = node_data
+            return node_data
+
+
 ###########################################################################################
 # Output data
 ###########################################################################################
@@ -629,6 +787,7 @@ class OutputData:
         self.logs = {name: OutputLog(params) for name, params in config.get("logs", {}).items()}
         self.timelogs = {name: TimeLog(params) for name, params in config.get("timelogs", {}).items()}
         self.counters = {name: LogCounter(params) for name, params in config.get("counters", {}).items()}
+        self.requests = AllRequestData()
 
     def merge(self, other):
         for name in set(self.logs) & set(other.logs):
@@ -637,8 +796,10 @@ class OutputData:
             self.timelogs[name].merge(other.timelogs[name])
         for name in set(self.counters) & set(other.counters):
             self.counters[name].merge(other.counters[name])
+        self.requests.merge(other.requests)
 
     def dump(self):
+        self.requests.dump()
         for log in self.logs.values():
             log.dump()
         for name, timelog in self.timelogs.items():
diff --git a/scripts/process_logs/process_logs.yml b/scripts/process_logs/process_logs.yml
index df650ac997..ccfd14c8f9 100644
--- a/scripts/process_logs/process_logs.yml
+++ b/scripts/process_logs/process_logs.yml
@@ -159,46 +159,19 @@ chains:
     - match: [message_sending]
     - tag:
         pattern: sending message (\w+)
-        tags: [MESSAGE, SEND]
         attributes:
-          message_type: 1
+          message: send
+          message_type: group 1
 
   tag_message_inbox:
     - match: [message_inbox]
     - tag:
         pattern: appending to nodeInbox (\w+)
-        tags: [MESSAGE, INBOX]
         attributes:
-          message_type: 1
+          message: inbox
+          message_type: group 1
 
-#########################################################################
-# Request taggers
-#########################################################################
-
-  tag_requests:
-    - tag_received_requests:
-    - tag_already_processed_requests:
-    - tag_ordered_requests:
-
-  tag_received_requests:
-    - match: [message: received client request]
-    - tag: {tags: [REQUEST, RECEIVED]}
-    - tag:
-        pattern: "'reqId': (\\d+)"
-        attributes: {reqId: 1}
-
-  tag_already_processed_requests:
-    - match: [message: returning REPLY from already processed REQUEST]
-    - tag: {tags: [REQUEST, ALREADY_PROCESSED]}
-    - tag:
-        pattern: "'reqId': (\\d+)"
-        attributes: {reqId: 1}
-
-  tag_ordered_requests:
-    - match: [message: ordered batch request]
-    - tag: {tags: [REQUEST, ORDERED]}
-
 #########################################################################
 # To be done filters
 #########################################################################