diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 994d9e865f..c170dc47e5 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -162,10 +162,10 @@ protected function reserve($timeout) protected function moveExpired($from) { $now = time(); - if ($expired = $this->redis->zrevrangebyscore($from, $now, '-inf')) { + if ($expired = $this->redis->zrangebyscore($from, '-inf', $now)) { $this->redis->zremrangebyscore($from, '-inf', $now); foreach ($expired as $id) { - $this->redis->rpush("$this->channel.waiting", $id); + $this->redis->lpush("$this->channel.waiting", $id); } } } diff --git a/tests/app/HeavyJob.php b/tests/app/HeavyJob.php new file mode 100644 index 0000000000..170994885e --- /dev/null +++ b/tests/app/HeavyJob.php @@ -0,0 +1,34 @@ + + */ +class HeavyJob extends BaseObject implements JobInterface +{ + public $uid; + public $load; + + public function execute($queue) + { + file_put_contents($this->getFileName(), ''); + sleep($this->load); + } + + public function getFileName() + { + return Yii::getAlias("@runtime/job-{$this->uid}.lock"); + } +} diff --git a/tests/drivers/TestCase.php b/tests/drivers/TestCase.php index f79ce68e90..d2927ff42d 100644 --- a/tests/drivers/TestCase.php +++ b/tests/drivers/TestCase.php @@ -7,6 +7,7 @@ namespace tests\drivers; +use tests\app\HeavyJob; use Yii; use tests\app\SimpleJob; use yii\queue\Queue; @@ -33,6 +34,18 @@ protected function createSimpleJob() ]); } + /** + * @param int $load + * @return HeavyJob + */ + protected function createHeavyJob($load=0) + { + return new HeavyJob([ + 'uid' => uniqid(), + 'load' =>$load + ]); + } + /** * @param SimpleJob $job */ @@ -65,6 +78,36 @@ protected function assertSimpleJobLaterDone(SimpleJob $job, $delay) $this->assertGreaterThanOrEqual($time, filemtime($job->getFileName())); } + /** + * @param HeavyJob $job + * @param int $delay + * @param $initialTime + */ + protected function assertHeavyJobLaterDone(HeavyJob $job, $delay, $initialTime) + { + $time = $initialTime + $delay; + sleep($delay); + $timeout = 5000000; // 5 sec + $step = 50000; + while (!file_exists($job->getFileName()) && $timeout > 0) { + usleep($step); + $timeout -= $step; + } + $this->assertFileExists($job->getFileName()); + $this->assertGreaterThanOrEqual($time, filemtime($job->getFileName())); + } + + /** + * @param HeavyJob $job + * @param int $delay + */ + protected function assertHeavyJobDelayedFifoDone(HeavyJob $job1, HeavyJob $job2, $msg) + { + $this->assertFileExists($job1->getFileName()); + $this->assertFileExists($job2->getFileName()); + $this->assertGreaterThanOrEqual(filemtime($job1->getFileName()), filemtime($job2->getFileName()), $msg); + } + /** * @inheritdoc */ diff --git a/tests/drivers/redis/QueueTest.php b/tests/drivers/redis/QueueTest.php index 9a26052aa5..c8cefeec33 100644 --- a/tests/drivers/redis/QueueTest.php +++ b/tests/drivers/redis/QueueTest.php @@ -58,6 +58,35 @@ public function testLater() $this->assertSimpleJobLaterDone($job, 2); } + public function testHeavyJobDelayedFifo() + { + $this->startProcess(['php', 'yii', 'queue/listen', '1']); + + $delay = 1; + $load = $delay+1; + $initialTimeFirstBatch = time(); + + $job1 = $this->createHeavyJob($load); + $this->getQueue()->delay($delay)->push($job1); + + $job2 = $this->createHeavyJob($load); + $this->getQueue()->delay($delay)->push($job2); + + sleep($delay); + + $initialTimeSecondBatch = time(); + $job3 = $this->createHeavyJob($load); + $this->getQueue()->delay($delay)->push($job3); + + + $this->assertHeavyJobLaterDone($job1, $delay, $initialTimeFirstBatch); + $this->assertHeavyJobLaterDone($job2, $delay, $initialTimeFirstBatch); + $this->assertHeavyJobLaterDone($job3, $delay, $initialTimeSecondBatch); + + $this->assertHeavyJobDelayedFifoDone($job1, $job2, 'Job1 < Job2'); + $this->assertHeavyJobDelayedFifoDone($job2, $job3, 'Job2 < Job3'); + } + public function testRetry() { $this->startProcess(['php', 'yii', 'queue/listen', '1']);