Skip to content

Commit

Permalink
Merge pull request #33 from nordic-institute/develop
Browse files Browse the repository at this point in the history
X-Road Metrics Release 1.1.0
  • Loading branch information
wisecrow authored Mar 6, 2023
2 parents b9cdbd4 + 9ed9e47 commit 4e77c9b
Show file tree
Hide file tree
Showing 58 changed files with 3,787 additions and 735 deletions.
4 changes: 4 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Contributing to X-Road Metrics

The X-Road Metrics extension project follows the same general guidelines as the core X-Road project. Please
follow [this link](https://github.com/nordic-institute/X-Road/blob/develop/CONTRIBUTING.md) to learn about them.
6 changes: 6 additions & 0 deletions anonymizer_module/etc/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ mongodb:
host: localhost
user: <FILL>
password: <FILL>
# set to True to enable secure connection
tls:
# path to CA pem file
tls-ca-file:

postgres:
host: localhost
Expand All @@ -83,6 +87,8 @@ postgres:
database-name: <FILL>
table-name: logs
buffer-size: 10000
ssl-mode:
ssl-root-cert:
readonly-users:
- <FILL>
- <FILL>
Expand Down
6 changes: 5 additions & 1 deletion anonymizer_module/opmon_anonymizer/iio/mongodbmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ def __init__(self, settings, logger):
xroad = settings['xroad']['instance']
self._logger = logger

self.client = MongoClient(self.get_mongo_uri(settings))
connect_args = {
'tls': bool(settings['mongodb'].get('tls')),
'tlsCAFile': settings['mongodb'].get('tls-ca-file'),
}
self.client = MongoClient(self.get_mongo_uri(settings), **connect_args)
self.query_db = self.client[f"query_db_{xroad}"]
self.state_db = self.client[f"anonymizer_state_{xroad}"]

Expand Down
31 changes: 17 additions & 14 deletions anonymizer_module/opmon_anonymizer/iio/postgresql_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ def __init__(self, postgres_settings, table_schema, index_columns, logger):
self._table_name = postgres_settings['table-name']
self._readonly_users = postgres_settings['readonly-users']
self._connection_string = self._get_connection_string()
self._connect_args = {
'sslmode': postgres_settings.get('ssl-mode'),
'sslrootcert': postgres_settings.get('ssl-root-cert')
}

self._field_order = [field_name for field_name, _ in table_schema]
if table_schema:
Expand All @@ -48,7 +52,7 @@ def add_data(self, data):
for datum in data:
datum['requestInDate'] = datetime.fromtimestamp(datum['requestInTs'] / 1000).strftime('%Y-%m-%d')

with pg.connect(self._connection_string) as connection:
with pg.connect(self._connection_string, **self._connect_args) as connection:
cursor = connection.cursor()
query = self._generate_insert_query(cursor, data)
cursor.execute(query)
Expand All @@ -73,7 +77,7 @@ def _generate_insert_query(self, cursor, data):

def is_alive(self):
try:
with pg.connect(self._connection_string) as connection:
with pg.connect(self._connection_string, **self._connect_args) as connection:
pass
return True

Expand All @@ -86,7 +90,7 @@ def is_alive(self):

def _ensure_table(self, schema, index_columns):
try:
with pg.connect(self._connection_string) as connection:
with pg.connect(self._connection_string, **self._connect_args) as connection:
cursor = connection.cursor()
if not self._table_exists(cursor):
self._create_table(cursor, schema, index_columns)
Expand All @@ -99,13 +103,13 @@ def _ensure_table(self, schema, index_columns):
raise

def _table_exists(self, cursor):
cursor.execute(f"""
cursor.execute("""
SELECT EXISTS (
SELECT FROM information_schema.tables
WHERE table_schema = 'public'
AND table_name = '{self._table_name}'
AND table_name = %s
);
""")
""", (cursor._table_name,))

return cursor.fetchone()[0]

Expand All @@ -121,18 +125,17 @@ def _create_table(self, cursor, schema, index_columns):

def _ensure_privileges(self):
try:
with pg.connect(self._connection_string) as connection:
with pg.connect(self._connection_string, **self._connect_args) as connection:
cursor = connection.cursor()

for readonly_user in self._readonly_users:
try:
cursor.execute("GRANT USAGE ON SCHEMA public TO {readonly_user};".format(**{
'readonly_user': readonly_user
}))
cursor.execute("GRANT SELECT ON {table_name} TO {readonly_user};".format(**{
'table_name': self._table_name,
'readonly_user': readonly_user
}))
cursor.execute(
"GRANT USAGE ON SCHEMA public TO %s;", (readonly_user,)
)
cursor.execute(
"GRANT SELECT ON %s TO %s;", (self._table_name, readonly_user)
)
except Exception:
pass # Privileges existed

Expand Down
9 changes: 5 additions & 4 deletions anonymizer_module/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,11 @@
from setuptools import setup, find_packages

requirements = [
"pymongo==3.10.1",
"pyyaml==5.3.1",
"psycopg2==2.8.6",
"python-dateutil==2.8.1",
'setuptools==67.4.0',
'pymongo==3.10.1',
'pyyaml==5.4.1',
'psycopg2==2.8.6',
'python-dateutil==2.8.1',
]

classifiers = [
Expand Down
40 changes: 34 additions & 6 deletions collector_module/etc/settings.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Default settings file for X-Road Metrics Collector-module.
# Fill in your MongoDB and X-Road configuration.
#
#
# To run collector for many different X-Road instances, you can create settings
# profiles. For example to have profiles DEV, TEST and PROD copy this file to
# settings_DEV.yaml, settings_TEST.yaml and settings_PROD.yaml.
Expand All @@ -26,11 +26,11 @@ collector:
# 1 year = 31536000
records-from-offset: 31536000
records-to-offset: 100

# Repeat query to fetch additional data only if server has returned at least as much records.
# By default servers should return 10000 records, so this value should be smaller.
repeat-min-records: 50

# How many times to repeat query if server has more records ("nextRecordsFrom" is returned by previous query).
# Set to 0 to disable query repeating.
# If this value is too low and script is executed rarely then some data may be lost.
Expand All @@ -41,6 +41,24 @@ collector:
# Only one collector instance can be running at a time.
pid-directory: /var/run/xroad-metrics/collector

# Write collected documents to files in addition to database if documents-log-directory is provided and not empty.
# Make sure this directory is writable by xroad-metrics user.
# Collector will create subdirectories "<Instance>/<Year>/<Month>/<Day>" for logs collected during that day.
# Collector will not automatically remove older logs.
# documents-log-directory: /var/lib/collector
documents-log-directory:

# If document writing to files is enabled then python RotatingFileHandler is used to write documents to log files.
# documents-log-file-size (RotatingFileHandler maxBytes parameter) sets maximum allowed file size in bytes.
# documents-log-max-files (RotatingFileHandler backupCount parameter) sets maximum count of log backup files.
# Rotated files will have a suffix ".<n>".
# Limits are applied separately for each server, because every server has its own log file.
# If log file size and count limiting is not required then set both parameters to "0" to disable log rotation.
# documents-log-file-size: 100000000
# documents-log-max-files: 100
documents-log-file-size: 0
documents-log-max-files: 0

xroad:
instance: <FILL>

Expand All @@ -49,13 +67,19 @@ xroad:
protocol: http://
host: <FILL>
timeout: 10

# Security server used to contact
security-server:
protocol: http://
host: <FILL>
timeout: 60.0

# path to client's certificate
tls-client-certificate:
# path to client's private key
tls-client-key:
# path to server's certificate
tls-server-certificate:

# X-Road service configuration used to fetch operational monitoring requests.
monitoring-client:
memberclass: <FILL>
Expand All @@ -66,11 +90,15 @@ mongodb:
host: localhost
user: <FILL>
password: <FILL>
# set to True to enable secure connection
tls:
# path to CA pem file
tls-ca-file:

logger:
name: collector
module: collector

# Possible logging levels from least to most verbose are:
# CRITICAL, FATAL, ERROR, WARNING, INFO, DEBUG
level: INFO
Expand Down
55 changes: 53 additions & 2 deletions collector_module/opmon_collector/collector_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
# THE SOFTWARE.

import json
import logging
from logging.handlers import RotatingFileHandler
import datetime
import os
import multiprocessing
import re
import uuid
Expand Down Expand Up @@ -54,6 +58,8 @@ def work(self):
try:
response = self._request_opmon_data()
self.records = self._parse_attachment(response)
if self.settings['collector'].get('documents-log-directory', ''):
self._store_records_to_file()
self._store_records_to_database()
self.batch_start = self._parse_next_records_from_response(response) or self.batch_end
self.server_m.set_next_records_timestamp(self.server_key, self.batch_start)
Expand Down Expand Up @@ -117,7 +123,15 @@ def _request_opmon_data(self):
sec_server_settings = self.settings['xroad']['security-server']
url = sec_server_settings['protocol'] + sec_server_settings['host']
timeout = sec_server_settings['timeout']
response = requests.post(url, data=body, headers=headers, timeout=timeout)
client_cert = (
sec_server_settings.get('tls-client-certificate'),
sec_server_settings.get('tls-client-key')
)
server_cert = sec_server_settings.get('tls-server-certificate')
response = requests.post(
url, data=body, headers=headers, timeout=timeout,
cert=client_cert, verify=server_cert
)
response.raise_for_status()
return response
except Exception as e:
Expand All @@ -138,7 +152,44 @@ def _parse_attachment(self, opmon_response):
self.log_warn("Cannot parse response attachment.", '')
raise e

def _store_records_to_database(self):
def _get_records_logger(self) -> logging.Logger:
host_name = re.sub('[^0-9a-zA-Z.-]+', '.', self.server_data['server'])
records_logger = logging.getLogger(host_name)

now = datetime.datetime.now()
log_path = f"{self.settings['collector']['documents-log-directory']}" \
f"/{self.settings['xroad']['instance']}/{now.year:04d}/{now.month:02d}/{now.day:02d}/"

base_filename = log_path + host_name + '.log'
rotating_host_handlers = [
handler for handler in records_logger.handlers
if isinstance(handler, RotatingFileHandler)
and handler.baseFilename == base_filename
]
if not rotating_host_handlers:
if not os.path.exists(log_path):
os.makedirs(log_path)

handler = RotatingFileHandler(
base_filename,
maxBytes=self.settings['collector'].get('documents-log-file-size') or 0,
backupCount=self.settings['collector'].get('documents-log-max-files') or 0
)

records_logger.setLevel(logging.INFO)
records_logger.addHandler(handler)
return records_logger

def _store_records_to_file(self) -> None:
if len(self.records):
self.log_info(f'Appending {len(self.records)} documents to log file.')
records_logger = self._get_records_logger()
for record in self.records:
records_logger.info(json.dumps(record, separators=(',', ':')))
else:
self.log_warn('No documents to append to log file!', '')

def _store_records_to_database(self) -> None:
if len(self.records):
self.log_info(f"Adding {len(self.records)} documents.")
try:
Expand Down
14 changes: 9 additions & 5 deletions collector_module/opmon_collector/database_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ def __init__(self, mongo_settings, xroad_instance, logger_manager):
self.db_collector_state = f'collector_state_{xroad_instance}'
self.collector_id = f'collector_{xroad_instance}'
self.logger_m = logger_manager
self.connect_args = {
'tls': bool(mongo_settings.get('tls')),
'tlsCAFile': mongo_settings.get('tls-ca-file'),
}

@staticmethod
def get_mongo_uri(mongo_settings):
Expand All @@ -51,7 +55,7 @@ def get_timestamp():

def save_server_list_to_database(self, server_list):
try:
client = pymongo.MongoClient(self.mongo_uri)
client = pymongo.MongoClient(self.mongo_uri, **self.connect_args)
db = client[self.db_collector_state]
collection = db['server_list']
data = dict()
Expand All @@ -68,7 +72,7 @@ def get_server_list_from_database(self):
Get the most recent server list from MongoDB
"""
try:
client = pymongo.MongoClient(self.mongo_uri)
client = pymongo.MongoClient(self.mongo_uri, **self.connect_args)
db = client[self.db_collector_state]
data = db['server_list'].find({'collector_id': self.collector_id}).sort([('timestamp', -1)]).limit(1)[0]
return data['server_list'], data['timestamp']
Expand All @@ -80,7 +84,7 @@ def get_next_records_timestamp(self, server_key, records_from_offset):
""" Returns next records_from pointer for the given server
"""
try:
client = pymongo.MongoClient(self.mongo_uri)
client = pymongo.MongoClient(self.mongo_uri, **self.connect_args)
db = client[self.db_collector_state]
collection = db['collector_pointer']
cur = collection.find_one({'server': server_key})
Expand All @@ -101,7 +105,7 @@ def get_next_records_timestamp(self, server_key, records_from_offset):

def set_next_records_timestamp(self, server_key, records_from):
try:
client = pymongo.MongoClient(self.mongo_uri)
client = pymongo.MongoClient(self.mongo_uri, **self.connect_args)
db = client[self.db_collector_state]
collection = db['collector_pointer']

Expand All @@ -124,7 +128,7 @@ def set_next_records_timestamp(self, server_key, records_from):

def insert_data_to_raw_messages(self, data_list):
try:
client = pymongo.MongoClient(self.mongo_uri)
client = pymongo.MongoClient(self.mongo_uri, **self.connect_args)
db = client[self.db_name]
raw_msg = db['raw_messages']
# Add timestamp to data list
Expand Down
20 changes: 20 additions & 0 deletions collector_module/opmon_collector/tests/test_collector_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,26 @@ def test_collector_worker_work_max_repeats(mock_server_manager, basic_data, mock
assert worker.status == CollectorWorker.Status.DATA_AVAILABLE


@responses.activate
@pytest.mark.parametrize('documents_log_dir, num_records_logged_to_file', [('Test', 5230), (None, 0)])
def test_collector_worker_logs_to_file(documents_log_dir, num_records_logged_to_file,
                                       mock_server_manager, basic_data, mock_response_contents, caplog):
    """Documents are mirrored to the file log only when a log directory is configured."""
    responses.add(responses.POST, 'http://x-road-ss', body=mock_response_contents[0], status=200)

    collector_settings = basic_data['settings']['collector']
    collector_settings['documents-log-directory'] = documents_log_dir
    collector_settings['repeat-limit'] = 1

    _, error = CollectorWorker(basic_data).work()
    if error is not None:
        raise error

    first_insert_call = mock_server_manager.insert_data_to_raw_messages.call_args_list[0]
    stored_records = first_insert_call[0][0]

    assert len(stored_records) == 5230
    assert len(caplog.records) == num_records_logged_to_file


def test_worker_status(mock_server_manager, basic_data):
worker = CollectorWorker(basic_data)

Expand Down
Loading

0 comments on commit 4e77c9b

Please sign in to comment.