1
1
import logging
2
2
import signal
3
3
import time
4
+ import os
5
+ import socket
6
+ from uuid import uuid4
4
7
5
8
from datetime import datetime
6
9
from itertools import repeat
18
21
19
22
20
23
class Scheduler (object ):
24
+ redis_scheduler_namespace_prefix = 'rq:scheduler_instance:'
21
25
scheduler_key = 'rq:scheduler'
26
+ scheduler_lock_key = 'rq:scheduler_lock'
22
27
scheduled_jobs_key = 'rq:scheduler:scheduled_jobs'
23
28
queue_class = Queue
24
29
job_class = Job
25
30
26
31
def __init__ (self , queue_name = 'default' , queue = None , interval = 60 , connection = None ,
27
- job_class = None , queue_class = None ):
32
+ job_class = None , queue_class = None , name = None ):
28
33
from rq .connections import resolve_connection
29
34
self .connection = resolve_connection (connection )
30
35
self ._queue = queue
@@ -38,14 +43,25 @@ def __init__(self, queue_name='default', queue=None, interval=60, connection=Non
38
43
self .job_class = backend_class (self , 'job_class' , override = job_class )
39
44
self .queue_class = backend_class (self , 'queue_class' ,
40
45
override = queue_class )
46
+ self .name = name or uuid4 ().hex
47
+
48
+ @property
49
+ def key (self ):
50
+ """Returns the schedulers Redis hash key."""
51
+ return self .redis_scheduler_namespace_prefix + self .name
52
+
53
+ @property
54
+ def pid (self ):
55
+ """The current process ID."""
56
+ return os .getpid ()
41
57
42
58
def register_birth (self ):
43
59
self .log .info ('Registering birth' )
44
- if self .connection .exists (self .scheduler_key ) and \
45
- not self .connection .hexists (self .scheduler_key , 'death' ):
46
- raise ValueError ("There's already an active RQ scheduler" )
60
+ if self .connection .exists (self .key ) and \
61
+ not self .connection .hexists (self .key , 'death' ):
62
+ raise ValueError ("There's already an active RQ scheduler named: {0!r}" . format ( self . name ) )
47
63
48
- key = self .scheduler_key
64
+ key = self .key
49
65
now = time .time ()
50
66
51
67
with self .connection .pipeline () as p :
@@ -61,8 +77,8 @@ def register_death(self):
61
77
"""Registers its own death."""
62
78
self .log .info ('Registering death' )
63
79
with self .connection .pipeline () as p :
64
- p .hset (self .scheduler_key , 'death' , time .time ())
65
- p .expire (self .scheduler_key , 60 )
80
+ p .hset (self .key , 'death' , time .time ())
81
+ p .expire (self .key , 60 )
66
82
p .execute ()
67
83
68
84
def acquire_lock (self ):
@@ -72,7 +88,7 @@ def acquire_lock(self):
72
88
73
89
This function returns True if a lock is acquired. False otherwise.
74
90
"""
75
- key = '%s_lock' % self .scheduler_key
91
+ key = self .scheduler_lock_key
76
92
now = time .time ()
77
93
expires = int (self ._interval ) + 10
78
94
self ._lock_acquired = self .connection .set (
@@ -83,10 +99,12 @@ def remove_lock(self):
83
99
"""
84
100
Remove acquired lock.
85
101
"""
86
- key = '%s_lock' % self .scheduler_key
102
+ key = self .scheduler_lock_key
87
103
88
104
if self ._lock_acquired :
89
105
self .connection .delete (key )
106
+ self ._lock_acquired = False
107
+ self .log .debug ('{}: Lock Removed' .format (self .key ))
90
108
91
109
def _install_signal_handlers (self ):
92
110
"""
@@ -397,11 +415,17 @@ def enqueue_jobs(self):
397
415
jobs = self .get_jobs_to_queue ()
398
416
for job in jobs :
399
417
self .enqueue_job (job )
400
-
401
- # Refresh scheduler key's expiry
402
- self .connection .expire (self .scheduler_key , int (self ._interval ) + 10 )
418
+
403
419
return jobs
404
420
421
+ def heartbeat (self ):
422
+ """Refreshes schedulers key, typically by extending the
423
+ expiration time of the scheduler, effectively making this a "heartbeat"
424
+ to not expire the scheduler until the timeout passes.
425
+ """
426
+ self .log .debug ('{}: Sending a HeartBeat' .format (self .key ))
427
+ self .connection .expire (self .key , int (self ._interval ) + 10 )
428
+
405
429
def run (self , burst = False ):
406
430
"""
407
431
Periodically check whether there's any job that should be put in the queue (score
@@ -414,10 +438,13 @@ def run(self, burst=False):
414
438
try :
415
439
while True :
416
440
self .log .debug ("Entering run loop" )
441
+ self .heartbeat ()
417
442
418
443
start_time = time .time ()
419
444
if self .acquire_lock ():
445
+ self .log .debug ('{}: Acquired Lock' .format (self .key ))
420
446
self .enqueue_jobs ()
447
+ self .heartbeat ()
421
448
self .remove_lock ()
422
449
423
450
if burst :
0 commit comments