@@ -70,14 +70,17 @@ protected function getNextAvailableJobAndReserve($queue)
70
70
$ job = $ this ->database ->getCollection ($ this ->table )->findOneAndUpdate (
71
71
[
72
72
'queue ' => $ this ->getQueue ($ queue ),
73
- 'reserved ' => 0 ,
73
+ 'reserved ' => [ ' $ne ' => 1 ] ,
74
74
'available_at ' => ['$lte ' => Carbon::now ()->getTimestamp ()],
75
75
],
76
76
[
77
77
'$set ' => [
78
78
'reserved ' => 1 ,
79
79
'reserved_at ' => Carbon::now ()->getTimestamp (),
80
80
],
81
+ '$inc ' => [
82
+ 'attempts ' => 1 ,
83
+ ],
81
84
],
82
85
[
83
86
'returnDocument ' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER ,
@@ -101,24 +104,15 @@ protected function getNextAvailableJobAndReserve($queue)
101
104
protected function releaseJobsThatHaveBeenReservedTooLong ($ queue )
102
105
{
103
106
$ expiration = Carbon::now ()->subSeconds ($ this ->retryAfter )->getTimestamp ();
104
- $ now = time ();
105
107
106
108
$ reserved = $ this ->database ->collection ($ this ->table )
107
109
->where ('queue ' , $ this ->getQueue ($ queue ))
108
- ->where (function ($ query ) use ($ expiration , $ now ) {
109
- // Check for available jobs
110
- $ query ->where (function ($ query ) use ($ now ) {
111
- $ query ->whereNull ('reserved_at ' );
112
- $ query ->where ('available_at ' , '<= ' , $ now );
113
- });
114
-
115
- // Check for jobs that are reserved but have expired
116
- $ query ->orWhere ('reserved_at ' , '<= ' , $ expiration );
117
- })->get ();
110
+ ->whereNotNull ('reserved_at ' )
111
+ ->where ('reserved_at ' , '<= ' , $ expiration )
112
+ ->get ();
118
113
119
114
foreach ($ reserved as $ job ) {
120
- $ attempts = $ job ['attempts ' ] + 1 ;
121
- $ this ->releaseJob ($ job ['_id ' ], $ attempts );
115
+ $ this ->releaseJob ($ job ['_id ' ], $ job ['attempts ' ]);
122
116
}
123
117
}
124
118
@@ -145,4 +139,4 @@ public function deleteReserved($queue, $id)
145
139
{
146
140
$ this ->database ->collection ($ this ->table )->where ('_id ' , $ id )->delete ();
147
141
}
148
- }
142
+ }
0 commit comments