Hi,
I think there is something wrong in the MongoQueue class. Could you have a look at this?
First, this query does not seem to work properly on my server when the "reserved" field is not set, which is the case for any newly inserted job.
// Jenssegers\Mongodb\Queue\MongoQueue
// getNextAvailableJobAndReserve($queue)
// ...
$job = $this->database->getCollection($this->table)->findOneAndUpdate(
    [
        'queue' => $this->getQueue($queue),
        'reserved' => 0,
        'available_at' => ['$lte' => Carbon::now()->getTimestamp()],
    ],
    [
        '$set' => [
            'reserved' => 1,
            'reserved_at' => Carbon::now()->getTimestamp(),
        ],
    ],
    [
        'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
        'sort' => ['available_at' => 1],
    ]
);
// ...
I changed it like this and it seems to work:
// Jenssegers\Mongodb\Queue\MongoQueue
$job = $this->database->getCollection($this->table)->findOneAndUpdate(
    [
        'queue' => $this->getQueue($queue),
        //'reserved' => 0,
        'reserved' => ['$ne' => 1],
        'available_at' => ['$lte' => Carbon::now()->getTimestamp()],
    ],
    [
        '$set' => [
            'reserved' => 1,
            'reserved_at' => Carbon::now()->getTimestamp(),
        ],
    ],
    [
        'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
        'sort' => ['available_at' => 1],
    ]
);
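To illustrate why the two filters behave differently, here is a minimal sketch in plain PHP. It does not call the MongoDB driver: matchesEquals and matchesNotEquals are hypothetical helpers that only mimic how an equality filter and a $ne filter treat a missing field, and the three sample documents are made up for the example.

```php
<?php
// Simplified stand-in for MongoDB filter matching, NOT the real driver.
// It models only the two predicates discussed above.

/** Equality filter: a document that lacks the field never matches. */
function matchesEquals(array $doc, string $field, $value): bool
{
    return array_key_exists($field, $doc) && $doc[$field] === $value;
}

/** $ne filter: matches when the field is missing or holds another value. */
function matchesNotEquals(array $doc, string $field, $value): bool
{
    return !array_key_exists($field, $doc) || $doc[$field] !== $value;
}

// Hypothetical job documents.
$newJob      = ['queue' => 'default', 'available_at' => 100]; // "reserved" not set
$releasedJob = ['queue' => 'default', 'available_at' => 100, 'reserved' => 0];
$reservedJob = ['queue' => 'default', 'available_at' => 100, 'reserved' => 1];

var_dump(matchesEquals($newJob, 'reserved', 0));         // bool(false): new job is skipped
var_dump(matchesNotEquals($newJob, 'reserved', 1));      // bool(true): new job is picked up
var_dump(matchesNotEquals($releasedJob, 'reserved', 1)); // bool(true)
var_dump(matchesNotEquals($reservedJob, 'reserved', 1)); // bool(false): still excluded
```

Under these assumptions, only the $ne form selects both freshly inserted and released jobs while still excluding reserved ones.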
Most importantly, I don't understand what the "releaseJobsThatHaveBeenReservedTooLong" method is meant to do. Based on its name, I would assume that it resets jobs that have been running for too long (setting reserved to 0 and reserved_at to null) so that they can be tried once more. But in that case, why does it also select jobs that are simply ready to be processed?
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
    $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
    $now = time();
    $reserved = $this->database->collection($this->table)
        ->where('queue', $this->getQueue($queue))
        ->where(function ($query) use ($expiration, $now) {
            // Check for available jobs
            //** WHY selecting these ones? **
            $query->where(function ($query) use ($now) {
                $query->whereNull('reserved_at');
                $query->where('available_at', '<=', $now);
            });
            // Check for jobs that are reserved but have expired
            // Yes, make sense!
            $query->orWhere('reserved_at', '<=', $expiration);
        })->get();
    foreach ($reserved as $job) {
        $attempts = $job['attempts'] + 1;
        $this->releaseJob($job['_id'], $attempts);
    }
}
What happened on my server is that jobs that were ready to be processed were picked up by this method and released, which set their "reserved" field to 0 (and in turn allowed them to be selected by the getNextAvailableJobAndReserve method).
Unfortunately, under some load, these jobs were picked up many times by "releaseJobsThatHaveBeenReservedTooLong", which incremented the "attempts" field before they even had a chance to be picked up by "getNextAvailableJobAndReserve". Ultimately, they ended up as failed jobs because of too many attempts, even though they had never actually been processed!
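The effect can be reproduced with a small in-memory sketch. This is only a model of the selection logic, not the package code: originalReleasePredicate, fixedReleasePredicate and simulatePolls are hypothetical names, the 60-second retry window is an assumed value, and the job is a plain array that no worker ever reserves.

```php
<?php
// Hypothetical in-memory model of one job; it never touches MongoDB.

/** Mirrors the original selection: available jobs OR expired reservations. */
function originalReleasePredicate(array $job, int $now, int $expiration): bool
{
    $available = $job['reserved_at'] === null && $job['available_at'] <= $now;
    $expired   = $job['reserved_at'] !== null && $job['reserved_at'] <= $expiration;
    return $available || $expired;
}

/** Mirrors a stricter selection: only reserved jobs whose reservation expired. */
function fixedReleasePredicate(array $job, int $now, int $expiration): bool
{
    return $job['reserved'] === 1
        && $job['reserved_at'] !== null
        && $job['reserved_at'] <= $expiration;
}

/** Runs the release check once per poll and returns the final attempts count. */
function simulatePolls(callable $shouldRelease, int $polls): int
{
    // A job that is ready to run but has never been reserved.
    $job = ['reserved' => 0, 'reserved_at' => null, 'available_at' => 0, 'attempts' => 0];
    for ($now = 1; $now <= $polls; $now++) {
        $expiration = $now - 60; // assumed retry window of 60 seconds
        if ($shouldRelease($job, $now, $expiration)) {
            $job['attempts']++; // stands in for releaseJob($id, $attempts + 1)
        }
    }
    return $job['attempts'];
}

echo simulatePolls('originalReleasePredicate', 5), "\n"; // 5
echo simulatePolls('fixedReleasePredicate', 5), "\n";    // 0
```

In this model, the original predicate charges the job one attempt per poll even though it was never run, while the stricter predicate leaves it untouched.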
So I updated the method as follows to fix the issue:
protected function releaseJobsThatHaveBeenReservedTooLong($queue)
{
    $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
    $now = time();
    $reserved = $this->database->collection($this->table)
        ->where('queue', $this->getQueue($queue))
        ->where(function ($query) use ($expiration, $now) {
            // Check for jobs that are reserved but have expired
            $query->where('reserved', '=', 1);
            $query->where('reserved_at', '<=', $expiration);
        })->get();
    foreach ($reserved as $job) {
        $attempts = $job['attempts'] + 1;
        $this->releaseJob($job['_id'], $attempts);
    }
}
This seems to work fine for me, but I have the feeling that I may have misunderstood something. Could you have a look and tell me whether I'm correct?
Thanks!