Bug in MongoQueue class #1242

@JuCarr

@jenssegers

Hi,

I think there is something wrong in the MongoQueue class. Could you have a look at this?

First, the following query does not work properly on my server when the "reserved" field is not set, which is the case for any newly inserted job.

        // Jenssegers\Mongodb\Queue\MongoQueue
        // getNextAvailableJobAndReserve($queue)

        // ...
    
        $job = $this->database->getCollection($this->table)->findOneAndUpdate(
            [
                'queue' => $this->getQueue($queue),
                'reserved' => 0,
                'available_at' => ['$lte' => Carbon::now()->getTimestamp()],
            ],
            [
                '$set' => [
                    'reserved' => 1,
                    'reserved_at' => Carbon::now()->getTimestamp(),
                ],
            ],
            [
                'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
                'sort' => ['available_at' => 1],
            ]
        );

        // ...

I changed it like this and it seems to work:

        // Jenssegers\Mongodb\Queue\MongoQueue
        $job = $this->database->getCollection($this->table)->findOneAndUpdate(
            [
                'queue' => $this->getQueue($queue),
                //'reserved' => 0,
                'reserved'     => ['$ne' => 1],
                'available_at' => ['$lte' => Carbon::now()->getTimestamp()],
            ],
            [
                '$set' => [
                    'reserved' => 1,
                    'reserved_at' => Carbon::now()->getTimestamp(),
                ],
            ],
            [
                'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER,
                'sort' => ['available_at' => 1],
            ]
        );
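To illustrate why the two filters behave differently, here is a minimal sketch in plain PHP (no MongoDB connection). The helpers `matchesEquals` and `matchesNotEquals` are hypothetical and only mimic the documented matching rules for a single field: an equality filter on a non-null value does not match a document that lacks the field, while `$ne` does. The sample job arrays are made up for the example.

```php
<?php
// Hypothetical helpers, not part of the library: they imitate how MongoDB
// evaluates two filter shapes against one document held as a PHP array.

function matchesEquals(array $doc, string $field, $value): bool
{
    // { field: value } with a non-null value never matches a missing field.
    return array_key_exists($field, $doc) && $doc[$field] == $value;
}

function matchesNotEquals(array $doc, string $field, $value): bool
{
    // { field: { $ne: value } } also matches documents without the field.
    return !array_key_exists($field, $doc) || $doc[$field] != $value;
}

// Assumed shapes: a freshly inserted job has no "reserved" key at all.
$newJob      = ['queue' => 'default', 'available_at' => 100];
$releasedJob = ['queue' => 'default', 'available_at' => 100, 'reserved' => 0];
$reservedJob = ['queue' => 'default', 'available_at' => 100, 'reserved' => 1];

foreach (['new' => $newJob, 'released' => $releasedJob, 'reserved' => $reservedJob] as $label => $job) {
    printf(
        "%-8s  reserved=0: %s   reserved!=1: %s\n",
        $label,
        var_export(matchesEquals($job, 'reserved', 0), true),
        var_export(matchesNotEquals($job, 'reserved', 1), true)
    );
}
```

Under these assumptions, only the `$ne` form selects the new job, and both forms still skip a job that is currently reserved.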

Most importantly, I don't understand what you tried to do in the "releaseJobsThatHaveBeenReservedTooLong" method. Based on its name, I would assume that this method resets jobs that have been running for too long (setting reserved to 0 and reserved_at to null) so that they can be tried one more time. But in that case, why does it also select jobs that are simply ready to be processed?

    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
    {
        $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();
        $now = time();

        $reserved = $this->database->collection($this->table)
            ->where('queue', $this->getQueue($queue))
            ->where(function ($query) use ($expiration, $now) {
                // Check for available jobs
                // ** WHY are these selected? **
                $query->where(function ($query) use ($now) {
                    $query->whereNull('reserved_at');
                    $query->where('available_at', '<=', $now);
                });

                // Check for jobs that are reserved but have expired
                // Yes, this makes sense!
                $query->orWhere('reserved_at', '<=', $expiration);

            })->get();

        foreach ($reserved as $job) {
            $attempts = $job['attempts'] + 1;
            $this->releaseJob($job['_id'], $attempts);
        }
    }

What happened on my server is that jobs that were ready to be processed were picked up by this method and released, which set their "reserved" field to 0 (which in turn allowed them to be selected by the getNextAvailableJobAndReserve method).

Unfortunately, under some load, these jobs were picked up many times by "releaseJobsThatHaveBeenReservedTooLong", which increased their "attempts" field before they even got a chance to be picked up by "getNextAvailableJobAndReserve". Ultimately, they would end up as failed jobs because of too many attempts, even though they had never actually been processed!
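The effect can be reproduced with a small in-memory sketch. This is not the library code: `originalReleaseRule` and `simulateReleasePasses` are hypothetical helpers that restate the selection rule quoted above as a plain predicate, and the release step is assumed to reset `reserved` / `reserved_at` and bump `attempts`, as described in this issue. The timestamps and the 60-second `retryAfter` are made-up values.

```php
<?php
// Hypothetical restatement of the original selection rule as a predicate
// over an array-shaped job. A missing reserved_at is treated like null.
function originalReleaseRule(array $job, int $now, int $expiration): bool
{
    $reservedAt = $job['reserved_at'] ?? null;

    // Branch 1: "available" jobs (never reserved, already due).
    $available = $reservedAt === null && $job['available_at'] <= $now;

    // Branch 2: reserved jobs whose reservation is older than retryAfter.
    $expired = $reservedAt !== null && $reservedAt <= $expiration;

    return $available || $expired;
}

// Runs the release step several times without any worker reserving the job.
function simulateReleasePasses(array $job, int $passes, int $now, int $retryAfter): array
{
    $expiration = $now - $retryAfter;

    for ($i = 0; $i < $passes; $i++) {
        if (originalReleaseRule($job, $now, $expiration)) {
            // Assumed effect of releaseJob: reset the reservation, bump attempts.
            $job['reserved'] = 0;
            $job['reserved_at'] = null;
            $job['attempts']++;
        }
    }

    return $job;
}

// A job that is due but has never been reserved or run.
$job = ['available_at' => 1000, 'attempts' => 0];
$after = simulateReleasePasses($job, 5, 1000, 60);

echo "attempts after 5 release passes: {$after['attempts']}\n";
```

Because the released job still has a null `reserved_at` and is still due, it matches the first branch again on every pass, so its `attempts` counter grows even though no worker ever ran it.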

So I updated this method like this to fix this issue:

    protected function releaseJobsThatHaveBeenReservedTooLong($queue)
    {
        $expiration = Carbon::now()->subSeconds($this->retryAfter)->getTimestamp();

        $reserved = $this->database->collection($this->table)
            ->where('queue', $this->getQueue($queue))
            ->where(function ($query) use ($expiration) {
                // Check for jobs that are reserved but have expired
                $query->where('reserved', '=', 1);
                $query->where('reserved_at', '<=', $expiration);
            })->get();

        foreach ($reserved as $job) {
            $attempts = $job['attempts'] + 1;
            $this->releaseJob($job['_id'], $attempts);
        }
    }
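As a sanity check on the revised condition, here is a minimal sketch that restates it as a plain PHP predicate and applies it to three made-up jobs. `revisedReleaseRule` is a hypothetical helper, not a method of MongoQueue, and the timestamps and 60-second `retryAfter` are assumptions for the example.

```php
<?php
// Hypothetical restatement of the revised rule: only jobs that are actually
// reserved AND whose reservation has expired are selected for release.
function revisedReleaseRule(array $job, int $expiration): bool
{
    $reservedAt = $job['reserved_at'] ?? null;

    return ($job['reserved'] ?? null) == 1
        && $reservedAt !== null
        && $reservedAt <= $expiration;
}

$now = 1000;
$expiration = $now - 60; // assumed retryAfter of 60 seconds

// Due but never reserved: should be left alone for the reserve step.
$waiting = ['available_at' => 900, 'attempts' => 0];

// Reserved 100 seconds ago: the reservation has expired.
$stale = ['available_at' => 800, 'reserved' => 1, 'reserved_at' => 900, 'attempts' => 1];

// Reserved 10 seconds ago: still within retryAfter.
$running = ['available_at' => 800, 'reserved' => 1, 'reserved_at' => 990, 'attempts' => 1];

foreach (['waiting' => $waiting, 'stale' => $stale, 'running' => $running] as $label => $job) {
    echo $label . ': ' . (revisedReleaseRule($job, $expiration) ? 'release' : 'skip') . "\n";
}
```

With this rule, a job that is waiting to be picked up is never touched by the release step, so its `attempts` value only changes after a worker has actually reserved it.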

This seems to work fine for me but I have the feeling that I misunderstood something. Could you have a look and tell me if I'm correct?

Thanks!
