Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some early benchmark tests #4

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ clean:
find . -type f -regex ".*\.py[co]$$" -delete
find . -type d -name "__pycache__" -delete
rm -rf dispatcher.egg-info/

# Run the benchmark suite (requires pytest-benchmark); columns trimmed to the useful stats.
benchmark:
	py.test tests/benchmark/ --benchmark-columns=mean,min,max,stddev,rounds
12 changes: 12 additions & 0 deletions dispatcher/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging
import multiprocessing
import os
from types import SimpleNamespace

from dispatcher.utils import DuplicateBehavior, MessageAction
from dispatcher.worker.task import work_loop
Expand Down Expand Up @@ -91,6 +92,12 @@ def __init__(self, num_workers, fd_lock=None):
self.management_lock = asyncio.Lock()
self.fd_lock = fd_lock or asyncio.Lock()

self.events = self._create_events()

def _create_events(self):
"Benchmark tests have to re-create this because they use same object in different event loops"
return SimpleNamespace(queue_cleared=asyncio.Event())

async def start_working(self, dispatcher):
self.read_results_task = asyncio.create_task(self.read_results_forever())
self.read_results_task.add_done_callback(dispatcher.fatal_error_callback)
Expand Down Expand Up @@ -257,11 +264,16 @@ async def dispatch_task(self, message):
self.management_event.set() # kick manager task to start auto-scale up

async def drain_queue(self):
work_done = False
while requeue_message := self.get_unblocked_message():
if (not self.get_free_worker()) or self.shutting_down:
return
self.queued_messages.remove(requeue_message)
await self.dispatch_task(requeue_message)
work_done = True

if work_done:
self.events.queue_cleared.set()

async def process_finished(self, worker, message):
uuid = message.get('uuid', '<unknown>')
Expand Down
3 changes: 2 additions & 1 deletion dispatcher/worker/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ def work_loop(worker_id, queue, finished_queue):
result = worker.perform_work(message)

# Indicate that the task is finished by putting a message in the finished_queue
finished_queue.put(worker.get_finished_message(result, message, time_started))
to_send = worker.get_finished_message(result, message, time_started)
finished_queue.put(to_send)

finished_queue.put(worker.get_shutdown_message())
logger.debug(f'Worker {worker_id} informed the pool manager that we have exited')
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,6 @@ line_length = 160

[project.optional-dependencies]
pg_notify = ["psycopg[binary]"]

[tool.pytest.ini_options]
log_cli_level = "DEBUG"
1 change: 1 addition & 0 deletions test_work/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .math import fibonacci
17 changes: 17 additions & 0 deletions test_work/math.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from dispatcher.publish import task


# demo values of n in seconds
# 30 - 0.052
# 29 - 0.035
# 28 - 0.024
# 27 - 0.015
# 26 - 0.012
# 25 - 0.0097

@task(queue='test_channel')
def fibonacci(n):
    """Return the n-th Fibonacci number.

    Intentionally the naive exponential-time recursion: it is used as a
    CPU-bound benchmark workload (see the timing table above), so do not
    memoize or convert to iteration — that would invalidate the benchmarks.
    """
    if n <= 1:
        return n
    else:
        return fibonacci(n-1) + fibonacci(n-2)
160 changes: 160 additions & 0 deletions tests/benchmark/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
import asyncio
import contextlib
import multiprocessing
import time

import pytest

from dispatcher.brokers.pg_notify import get_connection
from dispatcher.main import DispatcherMain


class PoolServer:
    """Run a dispatcher WorkerPool in a background process for benchmarking.

    Before you read more, know there are 3 contexts involved:
    - the pytest process, which calls run_benchmark_test (handed to pytest-benchmark),
    - this class's background process, which runs the asyncio pool via run_pool_loop,
    - the pool's own worker subprocesses.

    run_benchmark_test must be called inside the with_server context manager,
    which starts (and stops) the relevant dispatcher code in a background process.
    """

    def run_benchmark_test(self, queue_in, queue_out, times):
        """Submit one round of `times` tasks and block until the server replies."""
        print(f'submitting message to pool server {times}')
        queue_in.put(str(times))
        print('waiting for reply message from pool server')
        message_in = queue_out.get()
        print(f'finished running round with {times} messages, got: {message_in}')
        if message_in == 'error':
            raise Exception('Test subprocess runner exception, look back in logs')

    @classmethod
    async def run_pool(cls, queue_in, queue_out, workers, function='lambda: __import__("time").sleep(0.01)'):
        """Serve benchmark rounds on queue_in until a 'stop' message arrives."""
        dispatcher = DispatcherMain({"producers": {"brokers": {}}, "pool": {"max_workers": workers}})
        pool = dispatcher.pool
        await pool.start_working(dispatcher)

        print('waiting for message to start test')
        loop = asyncio.get_event_loop()
        while True:
            print('pool server listening on queue_in')
            # queue_in.get() blocks, so run it in an executor to keep the loop responsive
            message = await loop.run_in_executor(None, queue_in.get)
            print(f'pool server got message {message}')
            if message == 'stop':
                print('shutting down pool server')
                # BUGFIX: shutdown is a coroutine (unit tests `await pool.shutdown()`);
                # without the await it was never actually executed
                await pool.shutdown()
                break
            else:
                times = int(message.strip())
                print('creating cleared event task')
                # Subscribe to queue_cleared *before* submitting, so the set() cannot be missed
                cleared_event = asyncio.create_task(pool.events.queue_cleared.wait())
                print('creating tasks for submissions')
                submissions = [pool.dispatch_task({'task': function, 'uuid': str(i)}) for i in range(times)]
                print('awaiting submission task')
                await asyncio.gather(*submissions)
                print('waiting for cleared event')
                await cleared_event
                pool.events.queue_cleared.clear()
                await loop.run_in_executor(None, queue_out.put, 'done')
        print('exited forever loop of pool server')

    @classmethod
    def run_pool_loop(cls, queue_in, queue_out, workers, **kwargs):
        """Subprocess entry point: drive run_pool to completion, then close the loop."""
        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(cls.run_pool(queue_in, queue_out, workers, **kwargs))
        except Exception:
            import traceback

            traceback.print_exc()
            # We are in a subprocess here, so even if we handle the exception
            # the main process will not know and still wait forever
            # so give them a kick on our way out
            print('sending error message after error')
            queue_out.put('error')
        finally:
            print('closing asyncio loop')
            loop.close()
            print('finished closing async loop')

    def start_server(self, workers, **kwargs):
        """Start the pool-server subprocess; the queues are kept on self for the benchmark callable."""
        self.queue_in = multiprocessing.Queue()
        self.queue_out = multiprocessing.Queue()
        process = multiprocessing.Process(target=self.run_pool_loop, args=(self.queue_in, self.queue_out, workers), kwargs=kwargs)
        process.start()
        return process

    @contextlib.contextmanager
    def with_server(self, *args, **kwargs):
        """Run the pool server for the duration of the with-block, then stop it."""
        process = self.start_server(*args, **kwargs)
        try:
            yield self
        finally:
            self.queue_in.put('stop')
            process.terminate()  # SIGTERM
            # Poll to close process resources, due to race condition where it is not still running
            for i in range(3):
                time.sleep(0.1)
                try:
                    process.close()
                    break
                except Exception:
                    if i == 2:
                        raise


# List of channels to listen on
# NOTE(review): 'test_channel2' appears twice — the third entry was probably meant
# to be 'test_channel3'; the same duplicate exists in tests/conftest.py, so confirm
# before changing either.
CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']

# Database connection details — presumably the local test Postgres on port 55777; verify against the dev setup
CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"


class FullServer(PoolServer):
    """Like PoolServer, but drives a full DispatcherMain (pg_notify producers + pool)."""

    def run_benchmark_test(self, queue_in, queue_out, times):
        """Send `times` pg_notify messages and block until the server replies."""
        print('sending wakeup message to set new clear event')
        queue_in.put('wake')
        print('sending pg_notify messages')
        function = 'lambda: __import__("time").sleep(0.01)'
        conn = get_connection({"conninfo": CONNECTION_STRING})
        try:
            with conn.cursor() as cur:
                for i in range(times):
                    cur.execute(f"SELECT pg_notify('test_channel', '{function}');")
        finally:
            # BUGFIX: previously the connection was never closed, leaking one
            # connection per benchmark round
            conn.close()
        print('waiting for reply message from pool server')
        message_in = queue_out.get()
        print(f'finished running round with {times} messages, got: {message_in}')
        if message_in == 'error':
            # Match PoolServer: surface subprocess failures instead of recording a bogus timing
            raise Exception('Test subprocess runner exception, look back in logs')

    @classmethod
    async def run_pool(cls, queue_in, queue_out, workers):
        """Serve benchmark rounds against a full DispatcherMain until 'stop'."""
        dispatcher = DispatcherMain(
            {"producers": {"brokers": {"pg_notify": {"conninfo": CONNECTION_STRING}, "channels": CHANNELS}}, "pool": {"max_workers": workers}}
        )
        await dispatcher.start_working()

        print('waiting for message to start test')
        loop = asyncio.get_event_loop()
        while True:
            print('pool server listening on queue_in')
            message = await loop.run_in_executor(None, queue_in.get)
            print(f'pool server got message {message}')
            if message == 'stop':
                print('shutting down server')
                # BUGFIX: shutdown appears to be a coroutine (pool.shutdown is awaited
                # in the unit tests) — await it so shutdown actually runs; confirm
                # DispatcherMain.shutdown's signature matches
                await dispatcher.shutdown()
                break
            print('creating cleared event task')
            cleared_event = asyncio.create_task(dispatcher.pool.events.queue_cleared.wait())
            print('waiting for cleared event')
            await cleared_event
            dispatcher.pool.events.queue_cleared.clear()
            await loop.run_in_executor(None, queue_out.put, 'done')
        print('exited forever loop of pool server')


@pytest.fixture
def with_pool_server():
    """Provide PoolServer.with_server, a context manager running a pool-only benchmark server."""
    return PoolServer().with_server


@pytest.fixture
def with_full_server():
    """Provide FullServer.with_server, a context manager running a full-dispatcher benchmark server."""
    return FullServer().with_server
28 changes: 28 additions & 0 deletions tests/benchmark/pool/test_clear_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import os
import sys

import pytest


@pytest.mark.benchmark(group="by_task")
@pytest.mark.parametrize('times', [1, 10, 100, 1000])
def test_clear_sleep_by_task_number(benchmark, times, with_pool_server):
    """Time clearing `times` 10ms-sleep tasks through a 4-worker pool."""
    sleep_task = 'lambda: __import__("time").sleep(0.01)'
    with with_pool_server(4, function=sleep_task) as server:
        benchmark(server.run_benchmark_test, server.queue_in, server.queue_out, times)


@pytest.mark.benchmark(group="by_worker_sleep")
@pytest.mark.parametrize('workers', [1, 4, 12, 24, 50, 75])
def test_clear_sleep_by_worker_count(benchmark, workers, with_pool_server):
    """Time clearing a fixed batch of 100 sleep tasks as the worker count varies."""
    sleep_task = 'lambda: __import__("time").sleep(0.01)'
    with with_pool_server(workers, function=sleep_task) as server:
        benchmark(server.run_benchmark_test, server.queue_in, server.queue_out, 100)


@pytest.mark.benchmark(group="by_worker_math")
@pytest.mark.parametrize('workers', [1, 4, 12, 24, 50, 75])
def test_clear_math_by_worker_count(benchmark, workers, with_pool_server):
    """Time clearing 100 CPU-bound fibonacci(26) tasks as the worker count varies."""
    # BUGFIX: this file lives at tests/benchmark/pool/, so the repo root (which
    # contains the test_work package) is 4 levels up — the original used 5
    # dirname calls, adding the repo root's *parent* and leaving `test_work`
    # unimportable in the forked pool processes.
    root_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
    if root_dir not in sys.path:  # avoid duplicate entries across parametrized rounds
        sys.path.append(root_dir)

    with with_pool_server(workers, function='lambda: __import__("test_work.math").fibonacci(26)') as pool_server:
        benchmark(pool_server.run_benchmark_test, pool_server.queue_in, pool_server.queue_out, 100)
13 changes: 13 additions & 0 deletions tests/benchmark/test_full_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import pytest


@pytest.mark.benchmark(group="by_system")
def test_clear_time_with_full_server(benchmark, with_full_server):
    """Benchmark 100 tasks end-to-end: pg_notify -> dispatcher -> pool."""
    with with_full_server(4) as srv:
        benchmark(srv.run_benchmark_test, srv.queue_in, srv.queue_out, 100)


@pytest.mark.benchmark(group="by_system")
def test_clear_time_with_only_pool(benchmark, with_pool_server):
    """Benchmark 100 tasks through the pool alone, for comparison with the full server."""
    with with_pool_server(4) as srv:
        benchmark(srv.run_benchmark_test, srv.queue_in, srv.queue_out, 100)
1 change: 0 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from dispatcher.main import DispatcherMain


# List of channels to listen on
CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']

Expand Down
1 change: 0 additions & 1 deletion tests/integration/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

from dispatcher.brokers.pg_notify import publish_message


# List of channels to listen on
CHANNELS = ['test_channel', 'test_channel2', 'test_channel2']

Expand Down
15 changes: 15 additions & 0 deletions tests/unit/test_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import asyncio

import pytest

from dispatcher.pool import WorkerPool


@pytest.mark.asyncio
async def test_no_op_task():
    """Dispatch a single no-op lambda through a 1-worker pool and wait for queue_cleared."""
    pool = WorkerPool(1)
    # NOTE(review): WorkerPool.start_working takes a `dispatcher` argument in
    # dispatcher/pool.py — calling it with no arguments looks like it would raise
    # TypeError; confirm the intended signature.
    await pool.start_working()
    # Subscribe to queue_cleared before dispatching so the set() cannot be missed
    cleared_task = asyncio.create_task(pool.events.queue_cleared.wait())
    # NOTE(review): other callers pass a dict ({'task': ..., 'uuid': ...}) to
    # dispatch_task; a bare string may be a legacy form — verify.
    await pool.dispatch_task('lambda: None')
    await cleared_task
    await pool.shutdown()
9 changes: 9 additions & 0 deletions tools/write_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

from test_methods import print_hello, sleep_function, sleep_discard

import os
import sys

# Make the repo root importable so the demo test_work package can be found
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from test_work.math import fibonacci

# Database connection details
CONNECTION_STRING = "dbname=dispatch_db user=dispatch password=dispatching host=localhost port=55777"

Expand All @@ -33,6 +39,9 @@ def main():
# Send the notification
publish_message(channel, message, config={'conninfo': CONNECTION_STRING})
# await send_notification(channel, message)

fibonacci.apply_async(args=[29], config={'conninfo': CONNECTION_STRING})

# send more than number of workers quickly
print('')
print('writing 15 messages fast')
Expand Down
Loading