Feature: Multiple Schedulers #212

Merged (3 commits) on Apr 24, 2020
rq_scheduler/scheduler.py (51 changes: 39 additions & 12 deletions)
@@ -1,6 +1,9 @@
 import logging
 import signal
 import time
+import os
+import socket
+from uuid import uuid4
 
 from datetime import datetime
 from itertools import repeat
@@ -18,13 +21,15 @@
 
 
 class Scheduler(object):
+    redis_scheduler_namespace_prefix = 'rq:scheduler_instance:'
     scheduler_key = 'rq:scheduler'
+    scheduler_lock_key = 'rq:scheduler_lock'
     scheduled_jobs_key = 'rq:scheduler:scheduled_jobs'
     queue_class = Queue
     job_class = Job
 
     def __init__(self, queue_name='default', queue=None, interval=60, connection=None,
-                 job_class=None, queue_class=None):
+                 job_class=None, queue_class=None, name=None):
         from rq.connections import resolve_connection
         self.connection = resolve_connection(connection)
         self._queue = queue
@@ -38,14 +43,25 @@ def __init__(self, queue_name='default', queue=None, interval=60, connection=Non
         self.job_class = backend_class(self, 'job_class', override=job_class)
         self.queue_class = backend_class(self, 'queue_class',
                                          override=queue_class)
+        self.name = name or uuid4().hex
+
+    @property
+    def key(self):
+        """Returns the scheduler's Redis hash key."""
+        return self.redis_scheduler_namespace_prefix + self.name
+
+    @property
+    def pid(self):
+        """The current process ID."""
+        return os.getpid()
 
     def register_birth(self):
         self.log.info('Registering birth')
-        if self.connection.exists(self.scheduler_key) and \
-                not self.connection.hexists(self.scheduler_key, 'death'):
-            raise ValueError("There's already an active RQ scheduler")
+        if self.connection.exists(self.key) and \
+                not self.connection.hexists(self.key, 'death'):
+            raise ValueError("There's already an active RQ scheduler named: {0!r}".format(self.name))
 
-        key = self.scheduler_key
+        key = self.key
         now = time.time()
 
         with self.connection.pipeline() as p:
@@ -61,8 +77,8 @@ def register_death(self):
         """Registers its own death."""
         self.log.info('Registering death')
         with self.connection.pipeline() as p:
-            p.hset(self.scheduler_key, 'death', time.time())
-            p.expire(self.scheduler_key, 60)
+            p.hset(self.key, 'death', time.time())
+            p.expire(self.key, 60)
             p.execute()
 
     def acquire_lock(self):
@@ -72,7 +88,7 @@
 
         This function returns True if a lock is acquired. False otherwise.
         """
-        key = '%s_lock' % self.scheduler_key
+        key = self.scheduler_lock_key
         now = time.time()
         expires = int(self._interval) + 10
         self._lock_acquired = self.connection.set(
@@ -83,10 +99,12 @@ def remove_lock(self):
         """
         Remove acquired lock.
         """
-        key = '%s_lock' % self.scheduler_key
+        key = self.scheduler_lock_key
 
         if self._lock_acquired:
             self.connection.delete(key)
+            self._lock_acquired = False
+            self.log.debug('{}: Lock Removed'.format(self.key))
 
     def _install_signal_handlers(self):
         """
@@ -387,11 +405,17 @@ def enqueue_jobs(self):
         jobs = self.get_jobs_to_queue()
         for job in jobs:
             self.enqueue_job(job)
 
-        # Refresh scheduler key's expiry
-        self.connection.expire(self.scheduler_key, int(self._interval) + 10)
         return jobs
 
+    def heartbeat(self):
+        """Refreshes the scheduler's key, typically by extending the
+        expiration time of the scheduler, effectively making this a "heartbeat"
+        so the scheduler does not expire until the timeout passes.
+        """
+        self.log.debug('{}: Sending a HeartBeat'.format(self.key))
+        self.connection.expire(self.key, int(self._interval) + 10)
+
     def run(self, burst=False):
         """
         Periodically check whether there's any job that should be put in the queue (score
@@ -404,10 +428,13 @@
         try:
             while True:
                 self.log.debug("Entering run loop")
+                self.heartbeat()
 
                 start_time = time.time()
                 if self.acquire_lock():
+                    self.log.debug('{}: Acquired Lock'.format(self.key))
                     self.enqueue_jobs()
+                    self.heartbeat()
                     self.remove_lock()
 
                 if burst:
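For context, the snippet below is not part of this PR's diff; it is a minimal usage sketch of how two named schedulers could share one Redis instance once this change lands. It assumes the constructor and key layout shown above; the instance names and the default Redis connection are illustrative only.

    from redis import Redis
    from rq_scheduler import Scheduler

    connection = Redis()

    # Each named instance registers under its own rq:scheduler_instance:<name>
    # hash, so several schedulers can be alive at once without tripping the
    # "already an active RQ scheduler" check in register_birth().
    scheduler_a = Scheduler('default', connection=connection, interval=20,
                            name='scheduler-a')
    scheduler_b = Scheduler('default', connection=connection, interval=20,
                            name='scheduler-b')

    # Only the instance that wins rq:scheduler_lock enqueues jobs on a given
    # pass; the other keeps heartbeating and takes over once the lock expires.
    # In practice each run() call lives in its own process or host:
    #   scheduler_a.run()
    #   scheduler_b.run()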
tests/test_scheduler.py (50 changes: 45 additions & 5 deletions)
@@ -41,7 +41,7 @@ def test_acquire_lock(self):
         interval so it automatically expires if scheduler is unexpectedly
         terminated.
         """
-        key = '%s_lock' % Scheduler.scheduler_key
+        key = Scheduler.scheduler_lock_key
         self.assertNotIn(key, tl(self.testconn.keys('*')))
         scheduler = Scheduler(connection=self.testconn, interval=20)
         self.assertTrue(scheduler.acquire_lock())
@@ -56,7 +56,7 @@ def test_no_two_schedulers_acquire_lock(self):
         same time. When removing the lock, only the scheduler which
         originally acquired the lock can remove the lock.
         """
-        key = '%s_lock' % Scheduler.scheduler_key
+        key = Scheduler.scheduler_lock_key
         self.assertNotIn(key, tl(self.testconn.keys('*')))
         scheduler1 = Scheduler(connection=self.testconn, interval=20)
         scheduler2 = Scheduler(connection=self.testconn, interval=20)
@@ -68,6 +68,48 @@ def test_no_two_schedulers_acquire_lock(self):
         scheduler1.remove_lock()
         self.assertNotIn(key, tl(self.testconn.keys('*')))
 
+    def test_multiple_schedulers_are_running_simultaneously(self):
+        """
+        Even though only one scheduler holds the lock and performs the scheduling,
+        multiple schedulers are still registered to take over in case the original
+        scheduler goes down.
+        """
+        lock_key = Scheduler.scheduler_lock_key
+        self.assertNotIn(lock_key, tl(self.testconn.keys('*')))
+        scheduler1 = Scheduler(connection=self.testconn, interval=20)
+        scheduler2 = Scheduler(connection=self.testconn, interval=20)
+        scheduler1.register_birth()
+        self.assertIn(scheduler1.key, tl(self.testconn.keys('*')))
+        scheduler2.register_birth()
+        self.assertIn(scheduler2.key, tl(self.testconn.keys('*')))
+        scheduler1.acquire_lock()
+        scheduler2.acquire_lock()
+        self.assertIn(scheduler1.key, tl(self.testconn.keys('*')))
+        self.assertIn(scheduler2.key, tl(self.testconn.keys('*')))
+
+    def test_lock_handover_between_multiple_schedulers(self):
+        lock_key = Scheduler.scheduler_lock_key
+        self.assertNotIn(lock_key, tl(self.testconn.keys('*')))
+        scheduler1 = Scheduler(connection=self.testconn, interval=20)
+        scheduler2 = Scheduler(connection=self.testconn, interval=20)
+        scheduler1.register_birth()
+        scheduler1.acquire_lock()
+        scheduler2.register_birth()
+        scheduler2.acquire_lock()
+        # Both schedulers are still active/registered
+        self.assertIn(scheduler1.key, tl(self.testconn.keys('*')))
+        self.assertIn(scheduler2.key, tl(self.testconn.keys('*')))
+        scheduler1.remove_lock()
+        self.assertNotIn(lock_key, tl(self.testconn.keys('*')))
+        scheduler2.acquire_lock()
+        self.assertIn(lock_key, tl(self.testconn.keys('*')))
+
+    def test_same_scheduler_cant_register_multiple_times(self):
+        scheduler1 = Scheduler(connection=self.testconn, interval=20)
+        scheduler1.register_birth()
+        self.assertIn(scheduler1.key, tl(self.testconn.keys('*')))
+        self.assertRaises(ValueError, scheduler1.register_birth)
+
     def test_create_job(self):
         """
         Ensure that jobs are created properly.
@@ -632,9 +674,7 @@ def test_small_float_interval(self):
         """
         Test that scheduler accepts 'interval' of type float, less than 1 second.
         """
-        key = Scheduler.scheduler_key
-        lock_key = '%s_lock' % Scheduler.scheduler_key
-        self.assertNotIn(key, tl(self.testconn.keys('*')))
+        lock_key = Scheduler.scheduler_lock_key
         scheduler = Scheduler(connection=self.testconn, interval=0.1) # testing interval = 0.1 second
         self.assertEqual(scheduler._interval, 0.1)
 
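As a footnote on the tests above, here is a rough sketch (an illustration, not code from this PR) of the Redis locking pattern they exercise: a single key taken with SET NX EX gives mutual exclusion between schedulers plus automatic failover once the holder stops renewing it. The key name mirrors Scheduler.scheduler_lock_key; the owner strings are made up.

    from redis import Redis

    connection = Redis()
    LOCK_KEY = 'rq:scheduler_lock'   # mirrors Scheduler.scheduler_lock_key above
    INTERVAL = 20

    def try_acquire(owner):
        # nx=True: only set the key if it does not exist yet (one holder at a time).
        # ex=...: the lock auto-expires if the holder dies before renewing it.
        return bool(connection.set(LOCK_KEY, owner, nx=True, ex=INTERVAL + 10))

    def release(owner):
        # Simplified release for illustration; the PR instead tracks ownership
        # with the _lock_acquired flag on each Scheduler instance.
        if connection.get(LOCK_KEY) == owner.encode():
            connection.delete(LOCK_KEY)

    assert try_acquire('scheduler-a')       # first scheduler wins the lock
    assert not try_acquire('scheduler-b')   # second scheduler has to wait
    release('scheduler-a')                  # explicit handover (or let it expire)
    assert try_acquire('scheduler-b')       # standby takes over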