From 5c5c03e3e2929529811b698fb11c569fa37cb73b Mon Sep 17 00:00:00 2001 From: MarioCamaraNeto Date: Fri, 30 Apr 2021 20:17:21 -0300 Subject: [PATCH 1/3] Change ordering LIFO to FIFO. Push in left and pop from right --- src/drivers/redis/Queue.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/drivers/redis/Queue.php b/src/drivers/redis/Queue.php index 994d9e865f..c170dc47e5 100644 --- a/src/drivers/redis/Queue.php +++ b/src/drivers/redis/Queue.php @@ -162,10 +162,10 @@ protected function reserve($timeout) protected function moveExpired($from) { $now = time(); - if ($expired = $this->redis->zrevrangebyscore($from, $now, '-inf')) { + if ($expired = $this->redis->zrangebyscore($from, '-inf', $now)) { $this->redis->zremrangebyscore($from, '-inf', $now); foreach ($expired as $id) { - $this->redis->rpush("$this->channel.waiting", $id); + $this->redis->lpush("$this->channel.waiting", $id); } } } From 2bdc5b4c0fc765524fc53ab4d184c117ac220ac7 Mon Sep 17 00:00:00 2001 From: MarioCamaraNeto Date: Fri, 30 Apr 2021 22:14:04 -0300 Subject: [PATCH 2/3] Tests --- tests/app/SimpleJob.php | 3 ++- tests/drivers/TestCase.php | 15 ++++++++++++++- tests/drivers/redis/QueueTest.php | 21 +++++++++++++++++++++ 3 files changed, 37 insertions(+), 2 deletions(-) diff --git a/tests/app/SimpleJob.php b/tests/app/SimpleJob.php index 5347436a0a..e9dad60d70 100644 --- a/tests/app/SimpleJob.php +++ b/tests/app/SimpleJob.php @@ -19,10 +19,11 @@ class SimpleJob extends BaseObject implements JobInterface { public $uid; + public $data; public function execute($queue) { - file_put_contents($this->getFileName(), ''); + file_put_contents($this->getFileName(), $this->data); } public function getFileName() diff --git a/tests/drivers/TestCase.php b/tests/drivers/TestCase.php index f79ce68e90..25aa214e04 100644 --- a/tests/drivers/TestCase.php +++ b/tests/drivers/TestCase.php @@ -26,10 +26,11 @@ abstract protected function getQueue(); /** * @return SimpleJob */ - protected function createSimpleJob() + protected function createSimpleJob($data='') { return new SimpleJob([ 'uid' => uniqid(), + 'data'=> $data ]); } @@ -65,6 +66,18 @@ protected function assertSimpleJobLaterDone(SimpleJob $job, $delay) $this->assertGreaterThanOrEqual($time, filemtime($job->getFileName())); } + /** + * @param SimpleJob $job + * @param int $delay + */ + protected function assertSimpleJobDelayedFifoDone(SimpleJob $job1, SimpleJob $job2, $msg) + { + $this->assertFileExists($job1->getFileName()); + $this->assertFileExists($job2->getFileName()); + + $this->assertGreaterThanOrEqual(file_get_contents($job1->getFileName()), file_get_contents($job2->getFileName()), $msg); + } + /** * @inheritdoc */ diff --git a/tests/drivers/redis/QueueTest.php b/tests/drivers/redis/QueueTest.php index 9a26052aa5..5faf8e554c 100644 --- a/tests/drivers/redis/QueueTest.php +++ b/tests/drivers/redis/QueueTest.php @@ -58,6 +58,27 @@ public function testLater() $this->assertSimpleJobLaterDone($job, 2); } + public function testLaterFifo() + { + $this->startProcess(['php', 'yii', 'queue/listen', '1']); + + $job1 = $this->createSimpleJob('1'); + $this->getQueue()->delay(2)->push($job1); + + $job2 = $this->createSimpleJob('2'); + $this->getQueue()->delay( 2)->push($job2); + + $job3 = $this->createSimpleJob('3'); + $this->getQueue()->delay(2)->push($job3); + + $this->assertSimpleJobLaterDone($job1, 2); + $this->assertSimpleJobLaterDone($job2, 2); + $this->assertSimpleJobLaterDone($job3, 2); + + $this->assertSimpleJobDelayedFifoDone($job1, $job2, 'Job1 < Job2'); + $this->assertSimpleJobDelayedFifoDone($job2, $job3, 'Job2 < Job3'); + } + public function testRetry() { $this->startProcess(['php', 'yii', 'queue/listen', '1']); From 8f506b84d2384faadcb1c8a62fb163660d633292 Mon Sep 17 00:00:00 2001 From: MarioCamaraNeto Date: Mon, 3 May 2021 23:13:15 -0300 Subject: [PATCH 3/3] Test FIFO with delay jobs --- tests/app/HeavyJob.php | 34 +++++++++++++++++++++++++ tests/app/SimpleJob.php | 3 +-- tests/drivers/TestCase.php | 42 ++++++++++++++++++++++++++----- tests/drivers/redis/QueueTest.php | 32 ++++++++++++++--------- 4 files changed, 91 insertions(+), 20 deletions(-) create mode 100644 tests/app/HeavyJob.php diff --git a/tests/app/HeavyJob.php b/tests/app/HeavyJob.php new file mode 100644 index 0000000000..170994885e --- /dev/null +++ b/tests/app/HeavyJob.php @@ -0,0 +1,34 @@ + + */ +class HeavyJob extends BaseObject implements JobInterface +{ + public $uid; + public $load; + + public function execute($queue) + { + file_put_contents($this->getFileName(), ''); + sleep($this->load); + } + + public function getFileName() + { + return Yii::getAlias("@runtime/job-{$this->uid}.lock"); + } +} diff --git a/tests/app/SimpleJob.php b/tests/app/SimpleJob.php index e9dad60d70..5347436a0a 100644 --- a/tests/app/SimpleJob.php +++ b/tests/app/SimpleJob.php @@ -19,11 +19,10 @@ class SimpleJob extends BaseObject implements JobInterface { public $uid; - public $data; public function execute($queue) { - file_put_contents($this->getFileName(), $this->data); + file_put_contents($this->getFileName(), ''); } public function getFileName() diff --git a/tests/drivers/TestCase.php b/tests/drivers/TestCase.php index 25aa214e04..d2927ff42d 100644 --- a/tests/drivers/TestCase.php +++ b/tests/drivers/TestCase.php @@ -7,6 +7,7 @@ namespace tests\drivers; +use tests\app\HeavyJob; use Yii; use tests\app\SimpleJob; use yii\queue\Queue; @@ -26,11 +27,22 @@ abstract protected function getQueue(); /** * @return SimpleJob */ - protected function createSimpleJob($data='') + protected function createSimpleJob() { return new SimpleJob([ 'uid' => uniqid(), - 'data'=> $data + ]); + } + + /** + * @param int $load + * @return HeavyJob + */ + protected function createHeavyJob($load=0) + { + return new HeavyJob([ + 'uid' => uniqid(), + 'load' =>$load ]); } @@ -67,15 +79,33 @@ protected function assertSimpleJobLaterDone(SimpleJob $job, $delay) } /** - * @param SimpleJob $job + * @param HeavyJob $job * @param int $delay + * @param $initialTime */ - protected function assertSimpleJobDelayedFifoDone(SimpleJob $job1, SimpleJob $job2, $msg) + protected function assertHeavyJobLaterDone(HeavyJob $job, $delay, $initialTime) + { + $time = $initialTime + $delay; + sleep($delay); + $timeout = 5000000; // 5 sec + $step = 50000; + while (!file_exists($job->getFileName()) && $timeout > 0) { + usleep($step); + $timeout -= $step; + } + $this->assertFileExists($job->getFileName()); + $this->assertGreaterThanOrEqual($time, filemtime($job->getFileName())); + } + + /** + * @param HeavyJob $job + * @param int $delay + */ + protected function assertHeavyJobDelayedFifoDone(HeavyJob $job1, HeavyJob $job2, $msg) { $this->assertFileExists($job1->getFileName()); $this->assertFileExists($job2->getFileName()); - - $this->assertGreaterThanOrEqual(file_get_contents($job1->getFileName()), file_get_contents($job2->getFileName()), $msg); + $this->assertGreaterThanOrEqual(filemtime($job1->getFileName()), filemtime($job2->getFileName()), $msg); } /** diff --git a/tests/drivers/redis/QueueTest.php b/tests/drivers/redis/QueueTest.php index 5faf8e554c..c8cefeec33 100644 --- a/tests/drivers/redis/QueueTest.php +++ b/tests/drivers/redis/QueueTest.php @@ -58,25 +58,33 @@ public function testLater() $this->assertSimpleJobLaterDone($job, 2); } - public function testLaterFifo() + public function testHeavyJobDelayedFifo() { $this->startProcess(['php', 'yii', 'queue/listen', '1']); - $job1 = $this->createSimpleJob('1'); - $this->getQueue()->delay(2)->push($job1); + $delay = 1; + $load = $delay+1; + $initialTimeFirstBatch = time(); - $job2 = $this->createSimpleJob('2'); - $this->getQueue()->delay( 2)->push($job2); + $job1 = $this->createHeavyJob($load); + $this->getQueue()->delay($delay)->push($job1); - $job3 = $this->createSimpleJob('3'); - $this->getQueue()->delay(2)->push($job3); + $job2 = $this->createHeavyJob($load); + $this->getQueue()->delay($delay)->push($job2); - $this->assertSimpleJobLaterDone($job1, 2); - $this->assertSimpleJobLaterDone($job2, 2); - $this->assertSimpleJobLaterDone($job3, 2); + sleep($delay); - $this->assertSimpleJobDelayedFifoDone($job1, $job2, 'Job1 < Job2'); - $this->assertSimpleJobDelayedFifoDone($job2, $job3, 'Job2 < Job3'); + $initialTimeSecondBatch = time(); + $job3 = $this->createHeavyJob($load); + $this->getQueue()->delay($delay)->push($job3); + + + $this->assertHeavyJobLaterDone($job1, $delay, $initialTimeFirstBatch); + $this->assertHeavyJobLaterDone($job2, $delay, $initialTimeFirstBatch); + $this->assertHeavyJobLaterDone($job3, $delay, $initialTimeSecondBatch); + + $this->assertHeavyJobDelayedFifoDone($job1, $job2, 'Job1 < Job2'); + $this->assertHeavyJobDelayedFifoDone($job2, $job3, 'Job2 < Job3'); } public function testRetry()