-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathARedisMutex.php
executable file
·199 lines (180 loc) · 4.92 KB
/
ARedisMutex.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
<?php
/**
* Represents a redis mutex.
*
* Simple usage:
* <pre>
* $mutex = new ARedisMutex("someOperation");
* $mutex->block(); // blocks execution until the resource becomes available
* // do something
* $mutex->unlock(); // release the lock
* </pre>
*
* With events:
* <pre>
* $mutex = new ARedisMutex("someOperation");
* $mutex->afterLock = function(CEvent $event) {
* echo "Locked!\n";
* // do some processing here
* $event->sender->unlock(); // finally, unlock the mutex
* };
* $mutex->afterUnlock = function(CEvent $event) {
* echo "Unlocked!";
* }
* $mutex->block(); // triggers appropriate events when the resource becomes available
* </pre>
* @author Charles Pick
* @package packages.redis
*/
class ARedisMutex extends ARedisEntity {
/**
* The number of seconds before this mutex will automatically expire
* @var integer
*/
public $expiresAfter = 10;
/**
* The number of micro seconds to sleep for between poll requests.
* Defaults to half a second.
* @var integer
*/
public $pollDelay = 500000;
/**
* The time the mutex expires at
* @var integer
*/
protected $_expiresAt;
/**
* Attempts to lock the mutex, returns true if successful or false if the mutex is locked by another process.
* @return boolean whether the lock was successful or not
*/
public function lock() {
if (!$this->beforeLock()) {
return false;
}
$redis = $this->getConnection()->getClient();
if (!$redis->setnx($this->name, $this->getExpiresAt(true))) {
// see if this mutex has expired
$value = $redis->get($this->name);
if ($value > microtime(true)) {
return false;
}
else {
//prevent a race codition if another process finds it expired at the same time
$current = $redis->getset($this->name, $this->getExpiresAt(true));
if ($value != $current){
return false;
}
}
}
$this->afterLock();
return true;
}
/**
* Attempts to unlock the mutex, returns true if successful, or false if the mutex is in use by another process
* @return boolean whether the unlock was successful or not
*/
public function unlock() {
if (!$this->beforeUnlock()) {
return false;
}
$redis = $this->getConnection()->getClient();
$value = $redis->get($this->name);
$decimalPlaces = max(strlen(substr($value,strpos($value,"."))),strlen(substr($this->_expiresAt,strpos($this->_expiresAt,".")))) - 1;
if (bccomp($value,$this->_expiresAt,$decimalPlaces) == -1 && bccomp($value,microtime(true),$decimalPlaces) == 1) {
return false;
}
$redis->delete($this->name);
$this->afterUnlock();
return true;
}
/**
* Blocks program execution until the lock becomes available
* @return ARedisMutex $this after the lock is opened
*/
public function block() {
while($this->lock() === false) {
usleep($this->pollDelay);
}
return $this;
}
/**
* Invoked before the mutex is locked.
* The default implementation raises the onBeforeLock event
* @return boolean true if the lock should continue
*/
public function beforeLock() {
$event = new CModelEvent();
$event->sender = $this;
$this->onBeforeLock($event);
return $event->isValid;
}
/**
* Invoked after the mutex is locked.
* The default implementation raises the onAfterLock event
*/
public function afterLock() {
$event = new CEvent;
$event->sender = $this;
$this->onAfterLock($event);
}
/**
* Invoked before the mutex is unlocked.
* The default implementation raises the onBeforeUnlock event
* @return boolean true if the unlock should continue
*/
public function beforeUnlock() {
$event = new CModelEvent;
$event->sender = $this;
$this->onBeforeUnlock($event);
return $event->isValid;
}
/**
* Invoked after the mutex is unlocked.
* The default implementation raises the onAfterUnlock event
*/
public function afterUnlock() {
$event = new CEvent;
$event->sender = $this;
$this->onAfterUnlock($event);
}
/**
* Raises the onBeforeLock event
* @param CEvent $event the event to raise
*/
public function onBeforeLock($event) {
$this->raiseEvent("onBeforeLock",$event);
}
/**
* Raises the onAfterLock event
* @param CEvent $event the event to raise
*/
public function onAfterLock($event) {
$this->raiseEvent("onAfterLock",$event);
}
/**
* Raises the onBeforeUnlock event
* @param CEvent $event the event to raise
*/
public function onBeforeUnlock($event) {
$this->raiseEvent("onBeforeUnlock",$event);
}
/**
* Raises the onAfterUnlock event
* @param CEvent $event the event to raise
*/
public function onAfterUnlock($event) {
$this->raiseEvent("onAfterUnlock",$event);
}
/**
* Gets the time the mutex expires
* @param boolean $forceRecalculate whether to force recalculation or not
* @return float the time the mutex expires
*/
public function getExpiresAt($forceRecalculate = false)
{
if ($forceRecalculate || $this->_expiresAt === null) {
$this->_expiresAt = $this->expiresAfter + microtime(true);
}
return $this->_expiresAt;
}
}