From 1e1303863b1ba967e5a900bbe8e4e862682e562e Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Thu, 17 Oct 2024 15:56:17 -0400
Subject: [PATCH 1/2] Add benchmark tests

Adopt new error handling patterns done elsewhere

Properly parameterize the worker number
---
 Makefile                                |   3 +
 dispatcher/pool.py                      |  17 +++
 dispatcher/worker/task.py               |   3 +-
 pyproject.toml                          |   3 +
 test_work/__init__.py                   |   1 +
 test_work/math.py                       |  17 +++
 tests/benchmark/conftest.py             | 160 ++++++++++++++++++++++++
 tests/benchmark/pool/test_clear_time.py |  28 +++++
 tests/benchmark/test_full_server.py     |  13 ++
 tests/conftest.py                       |   1 -
 tests/integration/test_main.py          |   1 -
 tests/unit/test_pool.py                 |  15 +++
 tools/write_messages.py                 |   9 ++
 13 files changed, 268 insertions(+), 3 deletions(-)
 create mode 100644 test_work/__init__.py
 create mode 100644 test_work/math.py
 create mode 100644 tests/benchmark/conftest.py
 create mode 100644 tests/benchmark/pool/test_clear_time.py
 create mode 100644 tests/benchmark/test_full_server.py
 create mode 100644 tests/unit/test_pool.py

diff --git a/Makefile b/Makefile
index 2eb18f2..3756229 100644
--- a/Makefile
+++ b/Makefile
@@ -14,3 +14,6 @@ clean:
 	find . -type f -regex ".*\.py[co]$$" -delete
 	find . -type d -name "__pycache__" -delete
 	rm -rf dispatcher.egg-info/
+
+benchmark:
+	py.test tests/benchmark/ --benchmark-columns=mean,min,max,stddev,rounds
diff --git a/dispatcher/pool.py b/dispatcher/pool.py
index 60b2575..5dca408 100644
--- a/dispatcher/pool.py
+++ b/dispatcher/pool.py
@@ -2,6 +2,7 @@
 import logging
 import multiprocessing
 import os
+from types import SimpleNamespace
 
 from dispatcher.utils import DuplicateBehavior, MessageAction
 from dispatcher.worker.task import work_loop
@@ -91,6 +92,12 @@ def __init__(self, num_workers, fd_lock=None):
         self.management_lock = asyncio.Lock()
         self.fd_lock = fd_lock or asyncio.Lock()
 
+        self.events = self._create_events()
+
+    def _create_events(self):
+        "Benchmark tests have to re-create this because they use same object in different event loops"
+        return SimpleNamespace(queue_cleared=asyncio.Event())
+
     async def start_working(self, dispatcher):
         self.read_results_task = asyncio.create_task(self.read_results_forever())
         self.read_results_task.add_done_callback(dispatcher.fatal_error_callback)
@@ -281,6 +288,16 @@ async def process_finished(self, worker, message):
         worker.mark_finished_task()
         self.finished_count += 1
 
+        if self.queued_messages:
+            if not self.shutting_down:
+                requeue_message = self.queued_messages.pop()
+                await self.dispatch_task(requeue_message)
+                logger.debug('submitted work due to finishing other work')
+        else:
+            if not any(worker.current_task for worker in self.workers.values()):
+                self.events.queue_cleared.set()
+                logger.debug('work queue has been cleared')
+
     async def read_results_forever(self):
         """Perpetual task that continuously waits for task completions."""
         loop = asyncio.get_event_loop()
diff --git a/dispatcher/worker/task.py b/dispatcher/worker/task.py
index 6cfeba1..67bf2d9 100644
--- a/dispatcher/worker/task.py
+++ b/dispatcher/worker/task.py
@@ -223,7 +223,8 @@ def work_loop(worker_id, queue, finished_queue):
         result = worker.perform_work(message)
 
         # Indicate that the task is finished by putting a message in the finished_queue
-        finished_queue.put(worker.get_finished_message(result, message, time_started))
+        to_send = worker.get_finished_message(result, message, time_started)
+        finished_queue.put(to_send)
 
     finished_queue.put(worker.get_shutdown_message())
     logger.debug(f'Worker {worker_id} informed the pool manager that we have exited')
diff --git a/pyproject.toml b/pyproject.toml
index faa9314..54ce80c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -49,3 +49,6 @@ line_length = 160
 
 [project.optional-dependencies]
 pg_notify = ["psycopg[binary]"]
+
+[tool.pytest.ini_options]
+log_cli_level = "DEBUG"
diff --git a/test_work/__init__.py b/test_work/__init__.py
new file mode 100644
index 0000000..e29d9d1
--- /dev/null
+++ b/test_work/__init__.py
@@ -0,0 +1 @@
+from .math import fibonacci
\ No newline at end of file
diff --git a/test_work/math.py b/test_work/math.py
new file mode 100644
index 0000000..8243608
--- /dev/null
+++ b/test_work/math.py
@@ -0,0 +1,17 @@
+from dispatcher.publish import task
+
+
+# demo values of n in seconds
+# 30 - 0.052
+# 29 - 0.035
+# 28 - 0.024
+# 27 - 0.015
+# 26 - 0.012
+# 25 - 0.0097
+
+@task(queue='test_channel')
+def fibonacci(n):
+    if n <= 1:
+        return n
+    else:
+        return fibonacci(n-1) + fibonacci(n-2)
diff --git a/tests/benchmark/conftest.py b/tests/benchmark/conftest.py
new file mode 100644
index 0000000..ac0479a
--- /dev/null
+++ b/tests/benchmark/conftest.py
@@ -0,0 +1,160 @@
+import asyncio
+import contextlib
+import multiprocessing
+import time
+
+import pytest
+
+from dispatcher.brokers.pg_notify import get_connection
+from dispatcher.main import DispatcherMain
+
+
+class PoolServer:
+    """Before you read more, know there are 3 contexts involved.
+
+    This produces a method to be passed to pytest-benchmark.
+    That method has to be ran inside a context manager,
+    which will run (and stop) the relevant dispatcher code in a background process.
+    """
+
+    def run_benchmark_test(self, queue_in, queue_out, times):
+        print(f'submitting message to pool server {times}')
+        queue_in.put(str(times))
+        print('waiting for reply message from pool server')
+        message_in = queue_out.get()
+        print(f'finished running round with {times} messages, got: {message_in}')
+        if message_in == 'error':
+            raise Exception('Test subprocess runner exception, look back in logs')
+
+    @classmethod
+    async def run_pool(cls, queue_in, queue_out, workers, function='lambda: __import__("time").sleep(0.01)'):
+        dispatcher = DispatcherMain({"producers": {"brokers": {}}, "pool": {"max_workers": workers}})
+        pool = dispatcher.pool
+        await pool.start_working(dispatcher)
+
+        print('waiting for message to start test')
+        loop = asyncio.get_event_loop()
+        while True:
+            print('pool server listening on queue_in')
+            message = await loop.run_in_executor(None, queue_in.get)
+            print(f'pool server got message {message}')
+            if message == 'stop':
+                print('shutting down pool server')
+                pool.shutdown()
+                break
+            else:
+                times = int(message.strip())
+            print('creating cleared event task')
+            cleared_event = asyncio.create_task(pool.events.queue_cleared.wait())
+            print('creating tasks for submissions')
+            submissions = [pool.dispatch_task({'task': function, 'uuid': str(i)}) for i in range(times)]
+            print('awaiting submission task')
+            await asyncio.gather(*submissions)
+            print('waiting for cleared event')
+            await cleared_event
+            pool.events.queue_cleared.clear()
+            await loop.run_in_executor(None, queue_out.put, 'done')
+        print('exited forever loop of pool server')
+
+    @classmethod
+    def run_pool_loop(cls, queue_in, queue_out, workers, **kwargs):
+        loop = asyncio.get_event_loop()
+        try:
+            loop.run_until_complete(cls.run_pool(queue_in, queue_out, workers, **kwargs))
+        except Exception:
+            import traceback
+
+            traceback.print_exc()
+            # We are in a subprocess here, so even if we handle the exception
+            # the main process will not know and still wait forever
+            # so give them a kick on our way out
+            print('sending error message after error')
+            queue_out.put('error')
+        finally:
+            print('closing asyncio loop')
+            loop.close()
+            print('finished closing async loop')
+
+    def start_server(self, workers, **kwargs):
+        self.queue_in = multiprocessing.Queue()
+        self.queue_out = multiprocessing.Queue()
+        process = multiprocessing.Process(target=self.run_pool_loop, args=(self.queue_in, self.queue_out, workers), kwargs=kwargs)
+        process.start()
+        return process
+
+    @contextlib.contextmanager
+    def with_server(self, *args, **kwargs):
+        process = self.start_server(*args, **kwargs)
+        try:
+            yield self
+        finally:
+            self.queue_in.put('stop')
+            process.terminate()  # SIGTERM
+            # Poll to close process resources, due to race condition where it is not still running
+            for i in range(3):
+                time.sleep(0.1)
+                try:
+                    process.close()
+                    break
+                except Exception:
+                    if i == 2:
+                        raise
+
+
+# List of channels to listen on
+CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']
+
+# Database connection details
+CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"
+
+
+class FullServer(PoolServer):
+    def run_benchmark_test(self, queue_in, queue_out, times):
+        print('sending wakeup message to set new clear event')
+        queue_in.put('wake')
+        print('sending pg_notify messages')
+        function = 'lambda: __import__("time").sleep(0.01)'
+        conn = get_connection({"conninfo": CONNECTION_STRING})
+        with conn.cursor() as cur:
+            for i in range(times):
+                cur.execute(f"SELECT pg_notify('test_channel', '{function}');")
+        print('waiting for reply message from pool server')
+        message_in = queue_out.get()
+        print(f'finished running round with {times} messages, got: {message_in}')
+
+    @classmethod
+    async def run_pool(cls, queue_in, queue_out, workers):
+        dispatcher = DispatcherMain(
+            {"producers": {"brokers": {"pg_notify": {"conninfo": CONNECTION_STRING}, "channels": CHANNELS}}, "pool": {"max_workers": workers}}
+        )
+        await dispatcher.start_working()
+
+        print('waiting for message to start test')
+        loop = asyncio.get_event_loop()
+        while True:
+            print('pool server listening on queue_in')
+            message = await loop.run_in_executor(None, queue_in.get)
+            print(f'pool server got message {message}')
+            if message == 'stop':
+                print('shutting down server')
+                dispatcher.shutdown()
+                break
+            print('creating cleared event task')
+            cleared_event = asyncio.create_task(dispatcher.pool.events.queue_cleared.wait())
+            print('waiting for cleared event')
+            await cleared_event
+            dispatcher.pool.events.queue_cleared.clear()
+            await loop.run_in_executor(None, queue_out.put, 'done')
+        print('exited forever loop of pool server')
+
+
+@pytest.fixture
+def with_pool_server():
+    server_thing = PoolServer()
+    return server_thing.with_server
+
+
+@pytest.fixture
+def with_full_server():
+    server_thing = FullServer()
+    return server_thing.with_server
diff --git a/tests/benchmark/pool/test_clear_time.py b/tests/benchmark/pool/test_clear_time.py
new file mode 100644
index 0000000..4e5202e
--- /dev/null
+++ b/tests/benchmark/pool/test_clear_time.py
@@ -0,0 +1,28 @@
+import os
+import sys
+
+import pytest
+
+
+@pytest.mark.benchmark(group="by_task")
+@pytest.mark.parametrize('times', [1, 10, 100, 1000])
+def test_clear_sleep_by_task_number(benchmark, times, with_pool_server):
+    with with_pool_server(4, function='lambda: __import__("time").sleep(0.01)') as pool_server:
+        benchmark(pool_server.run_benchmark_test, pool_server.queue_in, pool_server.queue_out, times)
+
+
+@pytest.mark.benchmark(group="by_worker_sleep")
+@pytest.mark.parametrize('workers', [1, 4, 12, 24, 50, 75])
+def test_clear_sleep_by_worker_count(benchmark, workers, with_pool_server):
+    with with_pool_server(workers, function='lambda: __import__("time").sleep(0.01)') as pool_server:
+        benchmark(pool_server.run_benchmark_test, pool_server.queue_in, pool_server.queue_out, 100)
+
+
+@pytest.mark.benchmark(group="by_worker_math")
+@pytest.mark.parametrize('workers', [1, 4, 12, 24, 50, 75])
+def test_clear_math_by_worker_count(benchmark, workers, with_pool_server):
+    root_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))))
+    sys.path.append(root_dir)
+
+    with with_pool_server(workers, function='lambda: __import__("test_work.math").fibonacci(26)') as pool_server:
+        benchmark(pool_server.run_benchmark_test, pool_server.queue_in, pool_server.queue_out, 100)
diff --git a/tests/benchmark/test_full_server.py b/tests/benchmark/test_full_server.py
new file mode 100644
index 0000000..d74da90
--- /dev/null
+++ b/tests/benchmark/test_full_server.py
@@ -0,0 +1,13 @@
+import pytest
+
+
+@pytest.mark.benchmark(group="by_system")
+def test_clear_time_with_full_server(benchmark, with_full_server):
+    with with_full_server(4) as server:
+        benchmark(server.run_benchmark_test, server.queue_in, server.queue_out, 100)
+
+
+@pytest.mark.benchmark(group="by_system")
+def test_clear_time_with_only_pool(benchmark, with_pool_server):
+    with with_pool_server(4) as pool_server:
+        benchmark(pool_server.run_benchmark_test, pool_server.queue_in, pool_server.queue_out, 100)
diff --git a/tests/conftest.py b/tests/conftest.py
index d9c8ca9..c1ef2b3 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -2,7 +2,6 @@
 
 from dispatcher.main import DispatcherMain
 
-
 # List of channels to listen on
 CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']
 
diff --git a/tests/integration/test_main.py b/tests/integration/test_main.py
index 2259638..901f39d 100644
--- a/tests/integration/test_main.py
+++ b/tests/integration/test_main.py
@@ -4,7 +4,6 @@
 
 from dispatcher.brokers.pg_notify import publish_message
 
-
 # List of channels to listen on
 CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']
 
diff --git a/tests/unit/test_pool.py b/tests/unit/test_pool.py
new file mode 100644
index 0000000..95f71c0
--- /dev/null
+++ b/tests/unit/test_pool.py
@@ -0,0 +1,15 @@
+import asyncio
+
+import pytest
+
+from dispatcher.pool import WorkerPool
+
+
+@pytest.mark.asyncio
+async def test_no_op_task():
+    pool = WorkerPool(1)
+    await pool.start_working()
+    cleared_task = asyncio.create_task(pool.events.queue_cleared.wait())
+    await pool.dispatch_task('lambda: None')
+    await cleared_task
+    await pool.shutdown()
diff --git a/tools/write_messages.py b/tools/write_messages.py
index bcb21eb..22910a7 100644
--- a/tools/write_messages.py
+++ b/tools/write_messages.py
@@ -16,6 +16,12 @@
 
 from test_methods import print_hello, sleep_function, sleep_discard
 
+import os, sys
+
+sys.path.append(os.path.dirname(os.path.dirname(__file__)))
+
+from test_work.math import fibonacci
+
 # Database connection details
 CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"
 
@@ -33,6 +39,9 @@ def main():
         # Send the notification
         publish_message(channel, message, config={'conninfo': CONNECTION_STRING})
         # await send_notification(channel, message)
+
+    fibonacci.apply_async(args=[29], config={'conninfo': CONNECTION_STRING})
+
     # send more than number of workers quickly
     print('')
     print('writing 15 messages fast')

From 2a162bb772ae8a8dc81d2c1be5c13d42ebe904df Mon Sep 17 00:00:00 2001
From: AlanCoding
Date: Thu, 19 Dec 2024 14:25:10 -0500
Subject: [PATCH 2/2] Move event trigger to drain_queue method

---
 dispatcher/pool.py | 15 +++++----------
 1 file changed, 5 insertions(+), 10 deletions(-)

diff --git a/dispatcher/pool.py b/dispatcher/pool.py
index 5dca408..2739190 100644
--- a/dispatcher/pool.py
+++ b/dispatcher/pool.py
@@ -264,11 +264,16 @@ async def dispatch_task(self, message):
         self.management_event.set()  # kick manager task to start auto-scale up
 
     async def drain_queue(self):
+        work_done = False
         while requeue_message := self.get_unblocked_message():
             if (not self.get_free_worker()) or self.shutting_down:
                 return
             self.queued_messages.remove(requeue_message)
             await self.dispatch_task(requeue_message)
+            work_done = True
+
+        if work_done:
+            self.events.queue_cleared.set()
 
     async def process_finished(self, worker, message):
         uuid = message.get('uuid', '')
@@ -288,16 +293,6 @@ async def process_finished(self, worker, message):
         worker.mark_finished_task()
         self.finished_count += 1
 
-        if self.queued_messages:
-            if not self.shutting_down:
-                requeue_message = self.queued_messages.pop()
-                await self.dispatch_task(requeue_message)
-                logger.debug('submitted work due to finishing other work')
-        else:
-            if not any(worker.current_task for worker in self.workers.values()):
-                self.events.queue_cleared.set()
-                logger.debug('work queue has been cleared')
-
     async def read_results_forever(self):
         """Perpetual task that continuously waits for task completions."""
         loop = asyncio.get_event_loop()
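
Usage note (not part of the patches above): the queue_cleared event added in this series is the synchronization point the benchmark fixtures block on. Below is a minimal sketch of that wait-then-re-arm pattern, distilled from PoolServer.run_pool in tests/benchmark/conftest.py. It assumes `pool` is an already-started WorkerPool from this series; the run_round name and its task_strings argument are illustrative only.

import asyncio


async def run_round(pool, task_strings):
    # Arm the wait before submitting, so a drain that finishes quickly is not missed.
    cleared = asyncio.create_task(pool.events.queue_cleared.wait())

    # Submit one message per task string, mirroring the submission loop in the benchmark conftest.
    await asyncio.gather(*[pool.dispatch_task({'task': task, 'uuid': str(i)}) for i, task in enumerate(task_strings)])

    # With PATCH 2/2 applied, the pool sets queue_cleared from drain_queue()
    # after it has dispatched its backlog of queued messages.
    await cleared

    # The event stays set until cleared, so re-arm it before the next round,
    # just as the benchmark server does between iterations.
    pool.events.queue_cleared.clear()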