Skip to content

Commit

Permalink
Merge branch 'jobBasedQueueOptions' of https://github.com/themsaid/fr…
Browse files Browse the repository at this point in the history
…amework into themsaid-jobBasedQueueOptions
  • Loading branch information
taylorotwell committed Nov 4, 2016
2 parents 0bb98dc + 5377009 commit b85d3a9
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 30 deletions.
44 changes: 44 additions & 0 deletions src/Illuminate/Queue/Jobs/Job.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ abstract class Job
*/
protected $instance;

/**
* The command instance.
*
* @var mixed
*/
protected $command;

/**
* The IoC container instance.
*
Expand Down Expand Up @@ -234,6 +241,43 @@ public function payload()
return json_decode($this->getRawBody(), true);
}

/**
* The underlying command.
*
* @return mixed
*/
public function getCommand()
{
if ($this->command) {
return $this->command;
}

$payload = $this->payload();

return $this->command = isset($payload['data']['command'])
? unserialize($payload['data']['command']) : null;
}

/**
* The number of times to attempt a job.
*
* @return int
*/
public function retries()
{
return $this->getCommand() ? $this->getCommand()->retries : null;
}

/**
* The number of seconds the job can run.
*
* @return int
*/
public function timeout()
{
return $this->getCommand() ? $this->getCommand()->timeout : null;
}

/**
* Get the name of the queue the job belongs to.
*
Expand Down
88 changes: 58 additions & 30 deletions src/Illuminate/Queue/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,14 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
$lastRestart = $this->getTimestampOfLastQueueRestart();

while (true) {
$this->registerTimeoutHandler($options);
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

if ($this->daemonShouldRun()) {
$this->runNextJob($connectionName, $queue, $options);
$this->registerTimeoutHandler($job, $options);

if ($job && $this->daemonShouldRun()) {
$this->runJob($job, $connectionName, $options);
} else {
$this->sleep($options->sleep);
}
Expand All @@ -88,15 +92,18 @@ public function daemon($connectionName, $queue, WorkerOptions $options)
/**
* Register the worker timeout handler (PHP 7.1+).
*
* @param \Illuminate\Contracts\Queue\Job|null $job
* @param WorkerOptions $options
* @return void
*/
protected function registerTimeoutHandler(WorkerOptions $options)
protected function registerTimeoutHandler($job, WorkerOptions $options)
{
if (version_compare(PHP_VERSION, '7.1.0') < 0 || ! extension_loaded('pcntl')) {
return;
}

$timeout = $job && $job->timeout() !== null ? $job->timeout() : $options->timeout;

pcntl_async_signals(true);

pcntl_signal(SIGALRM, function () {
Expand All @@ -105,7 +112,7 @@ protected function registerTimeoutHandler(WorkerOptions $options)
exit(1);
});

pcntl_alarm($options->timeout + $options->sleep);
pcntl_alarm($timeout + $options->sleep);
}

/**
Expand Down Expand Up @@ -137,27 +144,40 @@ protected function daemonShouldRun()
* @return void
*/
public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);

// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
return $this->runJob($job, $connectionName, $options);
}

$this->sleep($options->sleep);
}

/**
* Process the given job.
*
* @param \Illuminate\Contracts\Queue\Job $job
* @param string $connectionName
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*/
protected function runJob($job, $connectionName, WorkerOptions $options)
{
try {
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
return $this->process(
$connectionName, $job, $options
);

// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
// for the specified number of seconds, then keep processing jobs after sleep.
if ($job) {
return $this->process(
$connectionName, $job, $options
);
}
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
$this->exceptions->report(new FatalThrowableError($e));
}

$this->sleep($options->sleep);
}

/**
Expand All @@ -169,10 +189,16 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
*/
protected function getNextJob($connection, $queue)
{
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
try {
foreach (explode(',', $queue) as $queue) {
if (! is_null($job = $connection->pop($queue))) {
return $job;
}
}
} catch (Exception $e) {
$this->exceptions->report($e);
} catch (Throwable $e) {
$this->exceptions->report(new FatalThrowableError($e));
}
}

Expand Down Expand Up @@ -255,6 +281,8 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti
*/
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = $job->retries() !== null ? $job->retries() : $maxTries;

if ($maxTries === 0 || $job->attempts() <= $maxTries) {
return;
}
Expand All @@ -280,6 +308,8 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $
protected function markJobAsFailedIfHasExceededMaxAttempts(
$connectionName, $job, $maxTries, $e
) {
$maxTries = $job->retries() !== null ? $job->retries() : $maxTries;

if ($maxTries === 0 || $job->attempts() < $maxTries) {
return;
}
Expand All @@ -301,16 +331,14 @@ protected function failJob($connectionName, $job, $e)
return;
}

try {
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();
// If the job has failed, we will delete it, call the "failed" method and then call
// an event indicating the job has failed so it can be logged if needed. This is
// to allow every developer to better keep monitor of their failed queue jobs.
$job->delete();

$job->failed($e);
} finally {
$this->raiseFailedJobEvent($connectionName, $job, $e);
}
$job->failed($e);

$this->raiseFailedJobEvent($connectionName, $job, $e);
}

/**
Expand Down
22 changes: 22 additions & 0 deletions tests/Queue/QueueWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ public function test_job_is_failed_if_it_has_already_exceeded_max_attempts()
$this->events->shouldNotHaveReceived('fire', [Mockery::type(JobProcessed::class)]);
}

public function test_job_based_max_retries()
{
$job = new WorkerFakeJob(function ($job) {
$job->attempts++;
});
$job->attempts = 2;

$job->retries = 10;

$worker = $this->getWorker('default', ['queue' => [$job]]);
$worker->runNextJob('default', 'queue', $this->workerOptions(['maxTries' => 1]));

$this->assertFalse($job->deleted);
$this->assertNull($job->failedWith);
}

/**
* Helpers...
*/
Expand Down Expand Up @@ -223,6 +239,7 @@ class WorkerFakeJob
public $callback;
public $deleted = false;
public $releaseAfter;
public $retries;
public $attempts = 0;
public $failedWith;

Expand All @@ -243,6 +260,11 @@ public function payload()
return [];
}

public function retries()
{
return $this->retries;
}

public function delete()
{
$this->deleted = true;
Expand Down

0 comments on commit b85d3a9

Please sign in to comment.