diff --git a/src/Jenssegers/Mongodb/Queue/MongoQueue.php b/src/Jenssegers/Mongodb/Queue/MongoQueue.php index 44249456a..369ef6b72 100644 --- a/src/Jenssegers/Mongodb/Queue/MongoQueue.php +++ b/src/Jenssegers/Mongodb/Queue/MongoQueue.php @@ -64,7 +64,7 @@ protected function getNextAvailableJobAndReserve($queue) $job = $this->database->getCollection($this->table)->findOneAndUpdate( [ 'queue' => $this->getQueue($queue), - 'reserved' => 0, + 'reserved' => ['$ne' => 1], 'available_at' => ['$lte' => Carbon::now()->getTimestamp()], ], [ @@ -72,6 +72,9 @@ protected function getNextAvailableJobAndReserve($queue) 'reserved' => 1, 'reserved_at' => Carbon::now()->getTimestamp(), ], + '$inc' => [ + 'attempts' => 1, + ], ], [ 'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER, @@ -94,24 +97,15 @@ protected function getNextAvailableJobAndReserve($queue) protected function releaseJobsThatHaveBeenReservedTooLong($queue) { $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp(); - $now = time(); $reserved = $this->database->collection($this->table) ->where('queue', $this->getQueue($queue)) - ->where(function ($query) use ($expiration, $now) { - // Check for available jobs - $query->where(function ($query) use ($now) { - $query->whereNull('reserved_at'); - $query->where('available_at', '<=', $now); - }); - - // Check for jobs that are reserved but have expired - $query->orWhere('reserved_at', '<=', $expiration); - })->get(); + ->whereNotNull('reserved_at') + ->where('reserved_at', '<=', $expiration) + ->get(); foreach ($reserved as $job) { - $attempts = $job['attempts'] + 1; - $this->releaseJob($job['_id'], $attempts); + $this->releaseJob($job['_id'], $job['attempts']); } } diff --git a/tests/QueueTest.php b/tests/QueueTest.php index 21306bfb1..66075f980 100644 --- a/tests/QueueTest.php +++ b/tests/QueueTest.php @@ -63,4 +63,23 @@ public function testFindFailJobNull(): void $this->assertNull($provider->find(1)); } + + public function testIncrementAttempts(): void + { + $job_id = Queue::push('test1', ['action' => 'QueueJobExpired'], 'test'); + $this->assertNotNull($job_id); + $job_id = Queue::push('test2', ['action' => 'QueueJobExpired'], 'test'); + $this->assertNotNull($job_id); + + $job = Queue::pop('test'); + $this->assertEquals(1, $job->attempts()); + $job->delete(); + + $others_jobs = Queue::getDatabase() + ->table(Config::get('queue.connections.database.table')) + ->get(); + + $this->assertCount(1, $others_jobs); + $this->assertEquals(0, $others_jobs[0]['attempts']); + } }