From 9933b78dbe0ddfd92b304d6eecf42a069cd98b82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Tue, 23 Apr 2024 22:07:35 +0200 Subject: [PATCH 01/15] PHPORM-81 implement mmongodb driver for batch --- src/Bus/MongoBatchRepository.php | 200 +++++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 src/Bus/MongoBatchRepository.php diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php new file mode 100644 index 000000000..0294708f8 --- /dev/null +++ b/src/Bus/MongoBatchRepository.php @@ -0,0 +1,200 @@ +collection->find( + $before ? ['_id' => ['$lt' => $before]] : [], + [ + 'limit' => $limit, + 'sort' => ['_id' => -1], + 'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'], + ], + ); + } + + #[Override] + public function find(string $batchId) + { + $batchId = new ObjectId($batchId); + + return $this->collection->findOne( + ['_id' => $batchId], + ['readPreference' => ReadPreference::PRIMARY], + ); + } + + #[Override] + public function store(PendingBatch $batch) + { + $this->collection->insertOne([ + 'name' => $batch->name, + 'total_jobs' => 0, + 'pending_jobs' => 0, + 'failed_jobs' => 0, + 'failed_job_ids' => '[]', + 'options' => $this->serialize($batch->options), + 'created_at' => new UTCDateTime(Carbon::now()), + 'cancelled_at' => null, + 'finished_at' => null, + ]); + } + + #[Override] + public function incrementTotalJobs(string $batchId, int $amount) + { + $batchId = new ObjectId($batchId); + $this->collection->updateOne( + ['_id' => $batchId], + [ + '$inc' => [ + 'total_jobs' => $amount, + 'pending_jobs' => $amount, + ], + '$set' => [ + 'finished_at' => null, + ], + ], + ); + } + + #[Override] + public function decrementPendingJobs(string $batchId, string $jobId) + { + $batchId = new ObjectId($batchId); + $values = $this->collection->findOneAndUpdate( + ['_id' => $batchId], + [ + '$dec' => ['pending_jobs' => 1], + ], + [ + 'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1], + ], + ); + + return new UpdatedBatchJobCounts( + $values['pending_jobs'], + $values['failed_jobs'], + ); + } + + #[Override] + public function incrementFailedJobs(string $batchId, string $jobId) + { + // TODO: Implement incrementFailedJobs() method. + } + + #[Override] + public function markAsFinished(string $batchId) + { + $batchId = new ObjectId($batchId); + $this->collection->updateOne( + ['_id' => $batchId], + ['$set' => ['finished_at' => new UTCDateTime(Carbon::now())]], + ); + } + + #[Override] + public function cancel(string $batchId) + { + $batchId = new ObjectId($batchId); + $this->collection->updateOne( + ['_id' => $batchId], + [ + '$set' => [ + 'cancelled_at' => new UTCDateTime(Carbon::now()), + 'finished_at' => new UTCDateTime(Carbon::now()), + ], + ], + ); + } + + #[Override] + public function delete(string $batchId) + { + $batchId = new ObjectId($batchId); + $this->collection->deleteOne(['_id' => $batchId]); + } + + #[Override] + public function transaction(Closure $callback) + { + // Transactions are not necessary + return $callback(); + } + + /** Update an atomic value within the batch. */ + #[Override] + public function rollBack() + { + throw new BadMethodCallException('Not implemented'); + } + + /** Mark the batch that has the given ID as finished. */ + #[Override] + public function prune(DateTimeInterface $before): int + { + $result = $this->collection->deleteMany( + ['finished_at' => ['$ne' => null, '$lt' => new UTCDateTime($before)]], + ); + + return $result->getDeletedCount(); + } + + /** Prune all the unfinished entries older than the given date. */ + public function pruneUnfinished(DateTimeInterface $before): int + { + $result = $this->collection->deleteMany( + [ + 'finished_at' => null, + 'created_at' => ['$lt' => new UTCDateTime($before)], + ], + ); + + return $result->getDeletedCount(); + } + + /** Prune all the cancelled entries older than the given date. */ + public function pruneCancelled(DateTimeInterface $before): int + { + $result = $this->collection->deleteMany( + [ + 'cancelled_at' => ['$ne' => null], + 'created_at' => ['$lt' => new UTCDateTime($before)], + ], + ); + + return $result->getDeletedCount(); + } +} From 6df56045dd23b0d6e885869aef39d47c8ca9cf05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Fri, 26 Apr 2024 17:46:10 +0200 Subject: [PATCH 02/15] wip --- src/Bus/MongoBatchRepository.php | 99 ++++++++++++++++++++++++++----- src/MongoDBBusServiceProvider.php | 53 +++++++++++++++++ 2 files changed, 136 insertions(+), 16 deletions(-) create mode 100644 src/MongoDBBusServiceProvider.php diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php index 0294708f8..cf1167731 100644 --- a/src/Bus/MongoBatchRepository.php +++ b/src/Bus/MongoBatchRepository.php @@ -3,12 +3,16 @@ namespace MongoDB\Laravel\Bus; use BadMethodCallException; +use Carbon\CarbonImmutable; use Closure; use DateTimeInterface; +use Illuminate\Bus\Batch; +use Illuminate\Bus\BatchFactory; use Illuminate\Bus\DatabaseBatchRepository; use Illuminate\Bus\PendingBatch; use Illuminate\Bus\PrunableBatchRepository; use Illuminate\Bus\UpdatedBatchJobCounts; +use Illuminate\Database\Connection; use Illuminate\Support\Carbon; use MongoDB\BSON\ObjectId; use MongoDB\BSON\UTCDateTime; @@ -16,19 +20,29 @@ use MongoDB\Laravel\Collection; use Override; +use function assert; +use function date_default_timezone_get; use function is_string; // Extending DatabaseBatchRepository is necessary so methods pruneUnfinished and pruneCancelled // are called by PruneBatchesCommand class MongoBatchRepository extends DatabaseBatchRepository implements PrunableBatchRepository { + private Collection $collection; + public function __construct( - private Collection $collection, + BatchFactory $factory, + Connection $connection, + string $collection, ) { + assert($connection instanceof \MongoDB\Laravel\Connection); + $this->collection = $connection->getCollection($collection); + + parent::__construct($factory, $connection, $collection); } #[Override] - public function get($limit = 50, $before = null) + public function get($limit = 50, $before = null): array { if (is_string($before)) { $before = new ObjectId($before); @@ -41,22 +55,36 @@ public function get($limit = 50, $before = null) 'sort' => ['_id' => -1], 'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'], ], - ); + )->toArray(); } #[Override] - public function find(string $batchId) + public function find(string $batchId): ?Batch { $batchId = new ObjectId($batchId); - return $this->collection->findOne( + $batch = $this->collection->findOne( ['_id' => $batchId], ['readPreference' => ReadPreference::PRIMARY], ); + + return $this->factory->make( + $this, + $batch['id'], + $batch['name'], + $batch['total_jobs'], + $batch['pending_jobs'], + $batch['failed_jobs'], + $batch['failed_job_ids'], + $batch['options'], + CarbonImmutable::createFromTimestamp($batch['created_at']->getTimestamp(), date_default_timezone_get()), + $batch['cancelled_at'] ? CarbonImmutable::createFromTimestamp($batch['cancelled_at']->getTimestamp(), date_default_timezone_get()) : null, + $batch['finished_at'] ? CarbonImmutable::createFromTimestamp($batch['finished_at']->getTimestamp(), date_default_timezone_get()) : null, + ); } #[Override] - public function store(PendingBatch $batch) + public function store(PendingBatch $batch): Batch { $this->collection->insertOne([ 'name' => $batch->name, @@ -72,7 +100,7 @@ public function store(PendingBatch $batch) } #[Override] - public function incrementTotalJobs(string $batchId, int $amount) + public function incrementTotalJobs(string $batchId, int $amount): void { $batchId = new ObjectId($batchId); $this->collection->updateOne( @@ -90,13 +118,14 @@ public function incrementTotalJobs(string $batchId, int $amount) } #[Override] - public function decrementPendingJobs(string $batchId, string $jobId) + public function decrementPendingJobs(string $batchId, string $jobId): UpdatedBatchJobCounts { $batchId = new ObjectId($batchId); $values = $this->collection->findOneAndUpdate( ['_id' => $batchId], [ '$dec' => ['pending_jobs' => 1], + '$pull' => ['failed_job_ids' => $jobId], ], [ 'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1], @@ -110,13 +139,28 @@ public function decrementPendingJobs(string $batchId, string $jobId) } #[Override] - public function incrementFailedJobs(string $batchId, string $jobId) + public function incrementFailedJobs(string $batchId, string $jobId): UpdatedBatchJobCounts { - // TODO: Implement incrementFailedJobs() method. + $batchId = new ObjectId($batchId); + $values = $this->collection->findOneAndUpdate( + ['_id' => $batchId], + [ + '$inc' => ['pending_jobs' => 1], + '$push' => ['failed_job_ids' => $jobId], + ], + [ + 'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1], + ], + ); + + return new UpdatedBatchJobCounts( + $values['pending_jobs'], + $values['failed_jobs'], + ); } #[Override] - public function markAsFinished(string $batchId) + public function markAsFinished(string $batchId): void { $batchId = new ObjectId($batchId); $this->collection->updateOne( @@ -126,7 +170,7 @@ public function markAsFinished(string $batchId) } #[Override] - public function cancel(string $batchId) + public function cancel(string $batchId): void { $batchId = new ObjectId($batchId); $this->collection->updateOne( @@ -141,22 +185,27 @@ public function cancel(string $batchId) } #[Override] - public function delete(string $batchId) + public function delete(string $batchId): void { $batchId = new ObjectId($batchId); $this->collection->deleteOne(['_id' => $batchId]); } + /** Execute the given Closure within a storage specific transaction. */ #[Override] - public function transaction(Closure $callback) + public function transaction(Closure $callback): mixed { // Transactions are not necessary return $callback(); } - /** Update an atomic value within the batch. */ + /** + * Rollback the last database transaction for the connection. + * + * Not implemented. + */ #[Override] - public function rollBack() + public function rollBack(): void { throw new BadMethodCallException('Not implemented'); } @@ -197,4 +246,22 @@ public function pruneCancelled(DateTimeInterface $before): int return $result->getDeletedCount(); } + + #[Override] + protected function toBatch($batch): Batch + { + return $this->factory->make( + $this, + $batch->id, + $batch->name, + $batch->total_jobs, + $batch->pending_jobs, + $batch->failed_jobs, + $batch->failed_job_ids, + $batch->options, + CarbonImmutable::createFromTimestamp($batch->created_at->getTimestamp(), date_default_timezone_get()), + $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at->getTimestamp(), date_default_timezone_get()) : null, + $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at->getTimestamp(), date_default_timezone_get()) : null, + ); + } } diff --git a/src/MongoDBBusServiceProvider.php b/src/MongoDBBusServiceProvider.php new file mode 100644 index 000000000..2e4d81cd2 --- /dev/null +++ b/src/MongoDBBusServiceProvider.php @@ -0,0 +1,53 @@ +app->singleton(MongoBatchRepository::class, function (Container $app) { + return new MongoBatchRepository( + $app->make(BatchFactory::class), + $app->make('db')->connection($app->config->get('queue.batching.database')), + $app->config->get('queue.batching.collection', 'job_batches'), + ); + }); + + /** @see BusServiceProvider::registerBatchServices() */ + $this->app->extend(BatchRepository::class, function (BatchRepository $repository, Container $app) { + $driver = $app->config->get('queue.batching.driver'); + + return $driver === 'mongodb' + ? $app->make(MongoBatchRepository::class) + : $repository; + }); + // Add database driver. + $this->app->resolving('db', function ($db) { + $db->extend('mongodb', function ($config, $name) { + $config['name'] = $name; + + return new Connection($config); + }); + }); + } + + public function provides() + { + return [ + BatchRepository::class, + MongoBatchRepository::class, + ]; + } +} From 408cf082420a4bf55d63766698abfa4de25d3e46 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Thu, 2 May 2024 11:53:33 +0200 Subject: [PATCH 03/15] First test --- src/Bus/MongoBatchRepository.php | 49 +++++++----- tests/Bus/MongoBatchRepositoryTest.php | 100 +++++++++++++++++++++++++ 2 files changed, 129 insertions(+), 20 deletions(-) create mode 100644 tests/Bus/MongoBatchRepositoryTest.php diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php index cf1167731..1b52cb496 100644 --- a/src/Bus/MongoBatchRepository.php +++ b/src/Bus/MongoBatchRepository.php @@ -2,7 +2,6 @@ namespace MongoDB\Laravel\Bus; -use BadMethodCallException; use Carbon\CarbonImmutable; use Closure; use DateTimeInterface; @@ -12,15 +11,14 @@ use Illuminate\Bus\PendingBatch; use Illuminate\Bus\PrunableBatchRepository; use Illuminate\Bus\UpdatedBatchJobCounts; -use Illuminate\Database\Connection; use Illuminate\Support\Carbon; use MongoDB\BSON\ObjectId; use MongoDB\BSON\UTCDateTime; use MongoDB\Driver\ReadPreference; use MongoDB\Laravel\Collection; +use MongoDB\Laravel\Connection; use Override; -use function assert; use function date_default_timezone_get; use function is_string; @@ -35,7 +33,6 @@ public function __construct( Connection $connection, string $collection, ) { - assert($connection instanceof \MongoDB\Laravel\Connection); $this->collection = $connection->getCollection($collection); parent::__construct($factory, $connection, $collection); @@ -65,38 +62,43 @@ public function find(string $batchId): ?Batch $batch = $this->collection->findOne( ['_id' => $batchId], - ['readPreference' => ReadPreference::PRIMARY], + [ + 'readPreference' => new ReadPreference(ReadPreference::PRIMARY), + 'typeMap' => ['root' => 'array', 'array' => 'array', 'document' => 'array'], + ], ); return $this->factory->make( $this, - $batch['id'], + $batch['_id'], $batch['name'], $batch['total_jobs'], $batch['pending_jobs'], $batch['failed_jobs'], $batch['failed_job_ids'], $batch['options'], - CarbonImmutable::createFromTimestamp($batch['created_at']->getTimestamp(), date_default_timezone_get()), - $batch['cancelled_at'] ? CarbonImmutable::createFromTimestamp($batch['cancelled_at']->getTimestamp(), date_default_timezone_get()) : null, - $batch['finished_at'] ? CarbonImmutable::createFromTimestamp($batch['finished_at']->getTimestamp(), date_default_timezone_get()) : null, + $this->toCarbon($batch['created_at']), + $this->toCarbon($batch['cancelled_at']), + $this->toCarbon($batch['finished_at']), ); } #[Override] public function store(PendingBatch $batch): Batch { - $this->collection->insertOne([ + $result = $this->collection->insertOne([ 'name' => $batch->name, 'total_jobs' => 0, 'pending_jobs' => 0, 'failed_jobs' => 0, - 'failed_job_ids' => '[]', - 'options' => $this->serialize($batch->options), + 'failed_job_ids' => [], + 'options' => $batch->options, 'created_at' => new UTCDateTime(Carbon::now()), 'cancelled_at' => null, 'finished_at' => null, ]); + + return $this->find($result->getInsertedId()); } #[Override] @@ -195,19 +197,16 @@ public function delete(string $batchId): void #[Override] public function transaction(Closure $callback): mixed { - // Transactions are not necessary - return $callback(); + return $this->connection->transaction(fn () => $callback()); } /** * Rollback the last database transaction for the connection. - * - * Not implemented. */ #[Override] public function rollBack(): void { - throw new BadMethodCallException('Not implemented'); + $this->connection->rollBack(); } /** Mark the batch that has the given ID as finished. */ @@ -259,9 +258,19 @@ protected function toBatch($batch): Batch $batch->failed_jobs, $batch->failed_job_ids, $batch->options, - CarbonImmutable::createFromTimestamp($batch->created_at->getTimestamp(), date_default_timezone_get()), - $batch->cancelled_at ? CarbonImmutable::createFromTimestamp($batch->cancelled_at->getTimestamp(), date_default_timezone_get()) : null, - $batch->finished_at ? CarbonImmutable::createFromTimestamp($batch->finished_at->getTimestamp(), date_default_timezone_get()) : null, + $this->toCarbon($batch->created_at), + $this->toCarbon($batch->cancelled_at), + $this->toCarbon($batch->finished_at), ); } + + /** @return ($date is null ? null : CarbonImmutable) */ + private function toCarbon(?UTCDateTime $date): ?CarbonImmutable + { + if ($date === null) { + return null; + } + + return CarbonImmutable::createFromTimestamp((string) $date, date_default_timezone_get()); + } } diff --git a/tests/Bus/MongoBatchRepositoryTest.php b/tests/Bus/MongoBatchRepositoryTest.php new file mode 100644 index 000000000..a54ca54f0 --- /dev/null +++ b/tests/Bus/MongoBatchRepositoryTest.php @@ -0,0 +1,100 @@ +createTestBatch($queue); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $thirdJob = function () { + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once()->with(m::on(function ($args) use ($job, $secondJob) { + return $args[0] === $job && + $args[1] === $secondJob && + $args[2] instanceof CallQueuedClosure + && is_string($args[2]->batchId); + }), '', 'test-queue'); + + $batch = $batch->add([$job, $secondJob, $thirdJob]); + + $this->assertEquals(3, $batch->totalJobs); + $this->assertEquals(3, $batch->pendingJobs); + $this->assertIsString($job->batchId); + $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt); + } + + /** @see BusBatchTest::createTestBatch() */ + private function createTestBatch($queue, $allowFailures = false) + { + $connection = DB::connection('mongodb'); + $this->assertInstanceOf(Connection::class, $connection); + + $repository = new MongoBatchRepository(new BatchFactory($queue), $connection, 'job_batches'); + + $pendingBatch = (new PendingBatch(new Container(), collect())) + ->progress(function (Batch $batch) { + $_SERVER['__progress.batch'] = $batch; + $_SERVER['__progress.count']++; + }) + ->then(function (Batch $batch) { + $_SERVER['__then.batch'] = $batch; + $_SERVER['__then.count']++; + }) + ->catch(function (Batch $batch, $e) { + $_SERVER['__catch.batch'] = $batch; + $_SERVER['__catch.exception'] = $e; + $_SERVER['__catch.count']++; + }) + ->finally(function (Batch $batch) { + $_SERVER['__finally.batch'] = $batch; + $_SERVER['__finally.count']++; + }) + ->allowFailures($allowFailures) + ->onConnection('test-connection') + ->onQueue('test-queue'); + + return $repository->store($pendingBatch); + } +} From 556cd392ea237b9c78dc637a3dd54e421a0b43ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Thu, 2 May 2024 17:01:17 +0200 Subject: [PATCH 04/15] Import Laravel tests --- src/Bus/MongoBatchRepository.php | 46 ++--- tests/Bus/Fixtures/ChainHeadJob.php | 15 ++ tests/Bus/Fixtures/SecondTestJob.php | 15 ++ tests/Bus/Fixtures/ThirdTestJob.php | 15 ++ tests/Bus/MongoBatchRepositoryTest.php | 270 ++++++++++++++++++++++++- 5 files changed, 332 insertions(+), 29 deletions(-) create mode 100644 tests/Bus/Fixtures/ChainHeadJob.php create mode 100644 tests/Bus/Fixtures/SecondTestJob.php create mode 100644 tests/Bus/Fixtures/ThirdTestJob.php diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php index 1b52cb496..40b3028a3 100644 --- a/src/Bus/MongoBatchRepository.php +++ b/src/Bus/MongoBatchRepository.php @@ -17,10 +17,13 @@ use MongoDB\Driver\ReadPreference; use MongoDB\Laravel\Collection; use MongoDB\Laravel\Connection; +use MongoDB\Operation\FindOneAndUpdate; use Override; use function date_default_timezone_get; use function is_string; +use function serialize; +use function unserialize; // Extending DatabaseBatchRepository is necessary so methods pruneUnfinished and pruneCancelled // are called by PruneBatchesCommand @@ -68,19 +71,7 @@ public function find(string $batchId): ?Batch ], ); - return $this->factory->make( - $this, - $batch['_id'], - $batch['name'], - $batch['total_jobs'], - $batch['pending_jobs'], - $batch['failed_jobs'], - $batch['failed_job_ids'], - $batch['options'], - $this->toCarbon($batch['created_at']), - $this->toCarbon($batch['cancelled_at']), - $this->toCarbon($batch['finished_at']), - ); + return $batch ? $this->toBatch($batch) : null; } #[Override] @@ -92,7 +83,8 @@ public function store(PendingBatch $batch): Batch 'pending_jobs' => 0, 'failed_jobs' => 0, 'failed_job_ids' => [], - 'options' => $batch->options, + // Serialization is required for Closures + 'options' => serialize($batch->options), 'created_at' => new UTCDateTime(Carbon::now()), 'cancelled_at' => null, 'finished_at' => null, @@ -126,11 +118,12 @@ public function decrementPendingJobs(string $batchId, string $jobId): UpdatedBat $values = $this->collection->findOneAndUpdate( ['_id' => $batchId], [ - '$dec' => ['pending_jobs' => 1], + '$inc' => ['pending_jobs' => -1], '$pull' => ['failed_job_ids' => $jobId], ], [ 'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1], + 'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER, ], ); @@ -147,11 +140,12 @@ public function incrementFailedJobs(string $batchId, string $jobId): UpdatedBatc $values = $this->collection->findOneAndUpdate( ['_id' => $batchId], [ - '$inc' => ['pending_jobs' => 1], + '$inc' => ['failed_jobs' => 1], '$push' => ['failed_job_ids' => $jobId], ], [ 'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1], + 'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER, ], ); @@ -251,16 +245,16 @@ protected function toBatch($batch): Batch { return $this->factory->make( $this, - $batch->id, - $batch->name, - $batch->total_jobs, - $batch->pending_jobs, - $batch->failed_jobs, - $batch->failed_job_ids, - $batch->options, - $this->toCarbon($batch->created_at), - $this->toCarbon($batch->cancelled_at), - $this->toCarbon($batch->finished_at), + $batch['_id'], + $batch['name'], + $batch['total_jobs'], + $batch['pending_jobs'], + $batch['failed_jobs'], + $batch['failed_job_ids'], + unserialize($batch['options']), + $this->toCarbon($batch['created_at']), + $this->toCarbon($batch['cancelled_at']), + $this->toCarbon($batch['finished_at']), ); } diff --git a/tests/Bus/Fixtures/ChainHeadJob.php b/tests/Bus/Fixtures/ChainHeadJob.php new file mode 100644 index 000000000..c964e59f9 --- /dev/null +++ b/tests/Bus/Fixtures/ChainHeadJob.php @@ -0,0 +1,15 @@ +getCollection('job_batches')->drop(); + + unset( + $_SERVER['__catch.batch'], + $_SERVER['__catch.count'], + $_SERVER['__catch.exception'], + $_SERVER['__finally.batch'], + $_SERVER['__finally.count'], + $_SERVER['__progress.batch'], + $_SERVER['__progress.count'], + $_SERVER['__then.batch'], + $_SERVER['__then.count'], + ); + + parent::tearDown(); + } /** @see BusBatchTest::test_jobs_can_be_added_to_the_batch */ public function testJobsCanBeAddedToTheBatch(): void @@ -65,8 +98,239 @@ public function testJobsCanBeAddedToTheBatch(): void $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt); } + /** @see BusBatchTest::test_successful_jobs_can_be_recorded */ + public function testSuccessfulJobsCanBeRecorded() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once(); + + $batch = $batch->add([$job, $secondJob]); + $this->assertEquals(2, $batch->pendingJobs); + + $batch->recordSuccessfulJob('test-id'); + $batch->recordSuccessfulJob('test-id'); + + $this->assertInstanceOf(Batch::class, $_SERVER['__finally.batch']); + $this->assertInstanceOf(Batch::class, $_SERVER['__progress.batch']); + $this->assertInstanceOf(Batch::class, $_SERVER['__then.batch']); + + $batch = $batch->fresh(); + $this->assertEquals(0, $batch->pendingJobs); + $this->assertTrue($batch->finished()); + $this->assertEquals(1, $_SERVER['__finally.count']); + $this->assertEquals(2, $_SERVER['__progress.count']); + $this->assertEquals(1, $_SERVER['__then.count']); + } + + /** @see BusBatchTest::test_failed_jobs_can_be_recorded_while_not_allowing_failures */ + public function testFailedJobsCanBeRecordedWhileNotAllowingFailures() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue, $allowFailures = false); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once(); + + $batch = $batch->add([$job, $secondJob]); + $this->assertEquals(2, $batch->pendingJobs); + + $batch->recordFailedJob('test-id', new RuntimeException('Something went wrong.')); + $batch->recordFailedJob('test-id', new RuntimeException('Something else went wrong.')); + + $this->assertInstanceOf(Batch::class, $_SERVER['__finally.batch']); + $this->assertFalse(isset($_SERVER['__then.batch'])); + + $batch = $batch->fresh(); + $this->assertEquals(2, $batch->pendingJobs); + $this->assertEquals(2, $batch->failedJobs); + $this->assertTrue($batch->finished()); + $this->assertTrue($batch->cancelled()); + $this->assertEquals(1, $_SERVER['__finally.count']); + $this->assertEquals(0, $_SERVER['__progress.count']); + $this->assertEquals(1, $_SERVER['__catch.count']); + $this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage()); + } + + /** @see BusBatchTest::test_failed_jobs_can_be_recorded_while_allowing_failures */ + public function testFailedJobsCanBeRecordedWhileAllowingFailures() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue, $allowFailures = true); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once(); + + $batch = $batch->add([$job, $secondJob]); + $this->assertEquals(2, $batch->pendingJobs); + + $batch->recordFailedJob('test-id', new RuntimeException('Something went wrong.')); + $batch->recordFailedJob('test-id', new RuntimeException('Something else went wrong.')); + + // While allowing failures this batch never actually completes... + $this->assertFalse(isset($_SERVER['__then.batch'])); + + $batch = $batch->fresh(); + $this->assertEquals(2, $batch->pendingJobs); + $this->assertEquals(2, $batch->failedJobs); + $this->assertFalse($batch->finished()); + $this->assertFalse($batch->cancelled()); + $this->assertEquals(1, $_SERVER['__catch.count']); + $this->assertEquals(2, $_SERVER['__progress.count']); + $this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage()); + } + + /** @see BusBatchTest::test_batch_can_be_cancelled */ + public function testBatchCanBeCancelled() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $batch->cancel(); + + $batch = $batch->fresh(); + + $this->assertTrue($batch->cancelled()); + } + + /** @see BusBatchTest::test_batch_can_be_deleted */ + public function testBatchCanBeDeleted() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $batch->delete(); + + $batch = $batch->fresh(); + + $this->assertNull($batch); + } + + /** @see BusBatchTest::test_batch_state_can_be_inspected */ + public function testBatchStateCanBeInspected() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $this->assertFalse($batch->finished()); + $batch->finishedAt = now(); + $this->assertTrue($batch->finished()); + + $batch->options['progress'] = []; + $this->assertFalse($batch->hasProgressCallbacks()); + $batch->options['progress'] = [1]; + $this->assertTrue($batch->hasProgressCallbacks()); + + $batch->options['then'] = []; + $this->assertFalse($batch->hasThenCallbacks()); + $batch->options['then'] = [1]; + $this->assertTrue($batch->hasThenCallbacks()); + + $this->assertFalse($batch->allowsFailures()); + $batch->options['allowFailures'] = true; + $this->assertTrue($batch->allowsFailures()); + + $this->assertFalse($batch->hasFailures()); + $batch->failedJobs = 1; + $this->assertTrue($batch->hasFailures()); + + $batch->options['catch'] = []; + $this->assertFalse($batch->hasCatchCallbacks()); + $batch->options['catch'] = [1]; + $this->assertTrue($batch->hasCatchCallbacks()); + + $this->assertFalse($batch->cancelled()); + $batch->cancelledAt = now(); + $this->assertTrue($batch->cancelled()); + + $this->assertIsString(json_encode($batch)); + } + + /** @see BusBatchTest:test_chain_can_be_added_to_batch: */ + public function testChainCanBeAddedToBatch() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $chainHeadJob = new ChainHeadJob(); + + $secondJob = new SecondTestJob(); + + $thirdJob = new ThirdTestJob(); + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once()->with(m::on(function ($args) use ($chainHeadJob, $secondJob, $thirdJob) { + return $args[0] === $chainHeadJob + && serialize($secondJob) === $args[0]->chained[0] + && serialize($thirdJob) === $args[0]->chained[1]; + }), '', 'test-queue'); + + $batch = $batch->add([ + [$chainHeadJob, $secondJob, $thirdJob], + ]); + + $this->assertEquals(3, $batch->totalJobs); + $this->assertEquals(3, $batch->pendingJobs); + $this->assertSame('test-queue', $chainHeadJob->chainQueue); + $this->assertIsString($chainHeadJob->batchId); + $this->assertIsString($secondJob->batchId); + $this->assertIsString($thirdJob->batchId); + $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt); + } + /** @see BusBatchTest::createTestBatch() */ - private function createTestBatch($queue, $allowFailures = false) + private function createTestBatch(Factory $queue, $allowFailures = false) { $connection = DB::connection('mongodb'); $this->assertInstanceOf(Connection::class, $connection); From f882b29d454597addf19a44832cd95febac5c008 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Thu, 2 May 2024 17:14:52 +0200 Subject: [PATCH 05/15] - --- composer.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 8c038819e..c2faf5333 100644 --- a/composer.json +++ b/composer.json @@ -62,7 +62,8 @@ "laravel": { "providers": [ "MongoDB\\Laravel\\MongoDBServiceProvider", - "MongoDB\\Laravel\\MongoDBQueueServiceProvider" + "MongoDB\\Laravel\\MongoDBQueueServiceProvider", + "MongoDB\\Laravel\\MongoDBBusServiceProvider" ] } }, From a711c4d79693238a1ba0c4e92410c6388ff51729 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Thu, 2 May 2024 19:53:19 +0200 Subject: [PATCH 06/15] Update docs and review option names --- docs/queues.txt | 94 ++++++++++++++++++++++++++--- src/MongoDBQueueServiceProvider.php | 14 ++++- src/Queue/MongoConnector.php | 20 +++++- 3 files changed, 117 insertions(+), 11 deletions(-) diff --git a/docs/queues.txt b/docs/queues.txt index 330662913..f8032314d 100644 --- a/docs/queues.txt +++ b/docs/queues.txt @@ -11,7 +11,7 @@ Queues .. meta:: :keywords: php framework, odm, code example -If you want to use MongoDB as your database backend for Laravel Queue, change +If you want to use MongoDB as your database backend for Laravel Queue, change the driver in ``config/queue.php``: .. code-block:: php @@ -20,27 +20,107 @@ the driver in ``config/queue.php``: 'database' => [ 'driver' => 'mongodb', // You can also specify your jobs specific database created on config/database.php - 'connection' => 'mongodb-job', - 'table' => 'jobs', + 'connection' => 'mongodb', + 'collection' => 'jobs', 'queue' => 'default', 'expire' => 60, ], ], -If you want to use MongoDB to handle failed jobs, change the database in +.. list-table:: + :header-rows: 1 + :widths: 25 75 + + * - Setting + - Description + + * - ``driver`` + - **Required**. Specifies the queue driver to use. Must be ``mongodb``. + + * - ``connection`` + - Uses the default connection by default. The database connection used to store jobs. It must be a ``mongodb`` connection. + + * - ``collection`` + - **Required**. Name of the MongoDB collection to store jobs to process. + + * - ``queue`` + - **Required**. Name of the queue. + + * - ``retry_after`` + - Default ``60``. Specify how many seconds the queue connection should wait before retrying a job that is being processed + +If you want to use MongoDB to handle failed jobs, change the database in ``config/queue.php``: .. code-block:: php 'failed' => [ 'driver' => 'mongodb', - // You can also specify your jobs specific database created on config/database.php - 'database' => 'mongodb-job', - 'table' => 'failed_jobs', + 'database' => 'mongodb', + 'collection' => 'failed_jobs', ], +.. list-table:: + :header-rows: 1 + :widths: 25 75 + + * - Setting + - Description + + * - ``driver`` + - **Required**. Specifies the queue driver to use. Must be ``mongodb``. + + * - ``connection`` + - Uses the default connection by default. The database connection used to store jobs. It must be a ``mongodb`` connection. + + * - ``collection`` + - Default ``failed_jobs``. Name of the MongoDB collection to store failed jobs. + + Add the service provider in ``config/app.php``: .. code-block:: php MongoDB\Laravel\MongoDBQueueServiceProvider::class, + + +Job Batching +------------ + +`Laravel's job batching https://laravel.com/docs/{+laravel-docs-version+}/queues#job-batching>`__ +feature allows you to easily execute a batch of jobs and then perform some action +when the batch of jobs has completed executing. + +With MongoDB, you don't have to create any collection before using this feature. +The collection ``job_batches`` will be created automatically to store meta +information about your job batches, such as their completion percentage + +.. code-block:: php + + 'batching' => [ + 'driver' => 'mongodb', + 'database' => 'mongodb', + 'collection' => 'job_batches', + ], + +.. list-table:: + :header-rows: 1 + :widths: 25 75 + + * - Setting + - Description + + * - ``driver`` + - **Required**. Specifies the queue driver to use. Must be ``mongodb``. + + * - ``connection`` + - Uses the default connection by default. The database connection used to store jobs. It must be a ``mongodb`` connection. + + * - ``collection`` + - Default ``job_batches``. Name of the MongoDB collection to store job batches. + +Add the service provider in ``config/app.php``: + +.. code-block:: php + + MongoDB\Laravel\MongoDBBusServiceProvider::class, diff --git a/src/MongoDBQueueServiceProvider.php b/src/MongoDBQueueServiceProvider.php index aa67f7405..ea7a06176 100644 --- a/src/MongoDBQueueServiceProvider.php +++ b/src/MongoDBQueueServiceProvider.php @@ -9,6 +9,9 @@ use MongoDB\Laravel\Queue\Failed\MongoFailedJobProvider; use function array_key_exists; +use function trigger_error; + +use const E_USER_DEPRECATED; class MongoDBQueueServiceProvider extends QueueServiceProvider { @@ -51,6 +54,15 @@ protected function registerFailedJobServices() */ protected function mongoFailedJobProvider(array $config): MongoFailedJobProvider { - return new MongoFailedJobProvider($this->app['db'], $config['database'], $config['table']); + if (! isset($config['collection']) && isset($config['table'])) { + trigger_error('Since mongodb/laravel-mongodb 4.4: Using "table" option for the queue is deprecated. Use "collection" instead.', E_USER_DEPRECATED); + $config['collection'] = $config['table']; + } + + return new MongoFailedJobProvider( + $this->app['db'], + $config['database'] ?? null, + $config['collection'] ?? 'failed_jobs', + ); } } diff --git a/src/Queue/MongoConnector.php b/src/Queue/MongoConnector.php index 4f987694a..be51d4fe1 100644 --- a/src/Queue/MongoConnector.php +++ b/src/Queue/MongoConnector.php @@ -8,6 +8,10 @@ use Illuminate\Database\ConnectionResolverInterface; use Illuminate\Queue\Connectors\ConnectorInterface; +use function trigger_error; + +use const E_USER_DEPRECATED; + class MongoConnector implements ConnectorInterface { /** @@ -32,11 +36,21 @@ public function __construct(ConnectionResolverInterface $connections) */ public function connect(array $config) { + if (! isset($config['collection']) && isset($config['table'])) { + trigger_error('Since mongodb/laravel-mongodb 4.4: Using "table" option in queue configuration is deprecated. Use "collection" instead.', E_USER_DEPRECATED); + $config['collection'] = $config['table']; + } + + if (! isset($config['retry_after']) && isset($config['expire'])) { + trigger_error('Since mongodb/laravel-mongodb 4.4: Using "expire" option in queue configuration is deprecated. Use "retry_after" instead.', E_USER_DEPRECATED); + $config['retry_after'] = $config['expire']; + } + return new MongoQueue( $this->connections->connection($config['connection'] ?? null), - $config['table'], - $config['queue'], - $config['expire'] ?? 60, + $config['collection'] ?? 'jobs', + $config['queue'] ?? 'default', + $config['retry_after'] ?? 60, ); } } From 4a3037054ba802b69c5d67e360199f433d260584 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Fri, 3 May 2024 11:16:45 +0200 Subject: [PATCH 07/15] Adjust constraint for PendingBatch::progress https://github.com/laravel/framework/releases/tag/v10.37.2 --- composer.json | 3 +++ 1 file changed, 3 insertions(+) diff --git a/composer.json b/composer.json index c2faf5333..84229b00f 100644 --- a/composer.json +++ b/composer.json @@ -41,6 +41,9 @@ "spatie/laravel-query-builder": "^5.6", "phpstan/phpstan": "^1.10" }, + "conflict": { + "illuminate/bus": "< 10.37.2" + }, "suggest": { "mongodb/builder": "Provides a fluent aggregation builder for MongoDB pipelines" }, From d2b494241b478a83ca4efab98525f8e9673c31b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Fri, 3 May 2024 14:33:33 +0200 Subject: [PATCH 08/15] Update changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ec142b46..2e9a74e9d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ All notable changes to this project will be documented in this file. ## [4.4.0] - unreleased * Support collection name prefix by @GromNaN in [#2930](https://github.com/mongodb/laravel-mongodb/pull/2930) +* Add `mongodb` driver for Batching by @GromNaN in [#2904](https://github.com/mongodb/laravel-mongodb/pull/2904) +* Rename queue option `table` to `collection` ## [4.3.0] - 2024-04-26 From 8dbdbd3e416c22e04212422915a2aa65a59a88f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Mon, 6 May 2024 20:33:33 +0200 Subject: [PATCH 09/15] Doc fix --- docs/queues.txt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/queues.txt b/docs/queues.txt index f8032314d..deb8a0829 100644 --- a/docs/queues.txt +++ b/docs/queues.txt @@ -87,13 +87,13 @@ Add the service provider in ``config/app.php``: Job Batching ------------ -`Laravel's job batching https://laravel.com/docs/{+laravel-docs-version+}/queues#job-batching>`__ -feature allows you to easily execute a batch of jobs and then perform some action -when the batch of jobs has completed executing. +`Job batching https://laravel.com/docs/{+laravel-docs-version+}/queues#job-batching>`__ +is a Laravel feature to execute batch of jobs and subsequent actions before, +after and during the execution of the jobs from the queue. With MongoDB, you don't have to create any collection before using this feature. The collection ``job_batches`` will be created automatically to store meta -information about your job batches, such as their completion percentage +information about your job batches, such as their completion percentage. .. code-block:: php From c542ed8ea8a4abb05853baba63b179b734bc66d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Mon, 6 May 2024 20:42:09 +0200 Subject: [PATCH 10/15] Add getUTCDateTime using Carbon::now --- src/Bus/MongoBatchRepository.php | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php index 40b3028a3..088a3d93c 100644 --- a/src/Bus/MongoBatchRepository.php +++ b/src/Bus/MongoBatchRepository.php @@ -20,7 +20,6 @@ use MongoDB\Operation\FindOneAndUpdate; use Override; -use function date_default_timezone_get; use function is_string; use function serialize; use function unserialize; @@ -66,6 +65,9 @@ public function find(string $batchId): ?Batch $batch = $this->collection->findOne( ['_id' => $batchId], [ + // If the select query is executed faster than the database replication takes place, + // then no batch is found. In that case an exception is thrown because jobs are added + // to a null batch. 'readPreference' => new ReadPreference(ReadPreference::PRIMARY), 'typeMap' => ['root' => 'array', 'array' => 'array', 'document' => 'array'], ], @@ -85,7 +87,7 @@ public function store(PendingBatch $batch): Batch 'failed_job_ids' => [], // Serialization is required for Closures 'options' => serialize($batch->options), - 'created_at' => new UTCDateTime(Carbon::now()), + 'created_at' => $this->getUTCDateTime(), 'cancelled_at' => null, 'finished_at' => null, ]); @@ -161,7 +163,7 @@ public function markAsFinished(string $batchId): void $batchId = new ObjectId($batchId); $this->collection->updateOne( ['_id' => $batchId], - ['$set' => ['finished_at' => new UTCDateTime(Carbon::now())]], + ['$set' => ['finished_at' => $this->getUTCDateTime()]], ); } @@ -173,8 +175,8 @@ public function cancel(string $batchId): void ['_id' => $batchId], [ '$set' => [ - 'cancelled_at' => new UTCDateTime(Carbon::now()), - 'finished_at' => new UTCDateTime(Carbon::now()), + 'cancelled_at' => $this->getUTCDateTime(), + 'finished_at' => $this->getUTCDateTime(), ], ], ); @@ -194,16 +196,14 @@ public function transaction(Closure $callback): mixed return $this->connection->transaction(fn () => $callback()); } - /** - * Rollback the last database transaction for the connection. - */ + /** Rollback the last database transaction for the connection. */ #[Override] public function rollBack(): void { $this->connection->rollBack(); } - /** Mark the batch that has the given ID as finished. */ + /** Prune the entries older than the given date. */ #[Override] public function prune(DateTimeInterface $before): int { @@ -258,6 +258,12 @@ protected function toBatch($batch): Batch ); } + private function getUTCDateTime(): UTCDateTime + { + // Using Carbon so the current time can be modified for tests + return new UTCDateTime(Carbon::now()); + } + /** @return ($date is null ? null : CarbonImmutable) */ private function toCarbon(?UTCDateTime $date): ?CarbonImmutable { @@ -265,6 +271,6 @@ private function toCarbon(?UTCDateTime $date): ?CarbonImmutable return null; } - return CarbonImmutable::createFromTimestamp((string) $date, date_default_timezone_get()); + return CarbonImmutable::createFromTimestampMsUTC((string) $date); } } From 212a8cc5eafb9a235444e3cd0e4488b4883bc318 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Mon, 20 May 2024 14:39:18 +0200 Subject: [PATCH 11/15] Wrapping callback not necessary --- src/Bus/MongoBatchRepository.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php index 088a3d93c..8c2308d29 100644 --- a/src/Bus/MongoBatchRepository.php +++ b/src/Bus/MongoBatchRepository.php @@ -193,7 +193,7 @@ public function delete(string $batchId): void #[Override] public function transaction(Closure $callback): mixed { - return $this->connection->transaction(fn () => $callback()); + return $this->connection->transaction($callback); } /** Rollback the last database transaction for the connection. */ From c02c7ca8d77a72b0ce2c5bea2b1566a86b8965b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Mon, 20 May 2024 14:55:24 +0200 Subject: [PATCH 12/15] Update phpstan baseline --- phpstan-baseline.neon | 5 +++++ src/MongoDBBusServiceProvider.php | 15 ++++----------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 99579fa0a..fdef24410 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,5 +1,10 @@ parameters: ignoreErrors: + - + message: "#^Access to an undefined property Illuminate\\\\Container\\\\Container\\:\\:\\$config\\.$#" + count: 3 + path: src/MongoDBBusServiceProvider.php + - message: "#^Method Illuminate\\\\Database\\\\Eloquent\\\\Model\\:\\:push\\(\\) invoked with 3 parameters, 0 required\\.$#" count: 2 diff --git a/src/MongoDBBusServiceProvider.php b/src/MongoDBBusServiceProvider.php index 2e4d81cd2..c77ccd118 100644 --- a/src/MongoDBBusServiceProvider.php +++ b/src/MongoDBBusServiceProvider.php @@ -29,17 +29,10 @@ public function register() $this->app->extend(BatchRepository::class, function (BatchRepository $repository, Container $app) { $driver = $app->config->get('queue.batching.driver'); - return $driver === 'mongodb' - ? $app->make(MongoBatchRepository::class) - : $repository; - }); - // Add database driver. - $this->app->resolving('db', function ($db) { - $db->extend('mongodb', function ($config, $name) { - $config['name'] = $name; - - return new Connection($config); - }); + return match ($driver) { + 'mongodb' => $app->make(MongoBatchRepository::class), + default => $repository, + }; }); } From b912ce72375f778aada117b1167221dcc5eea3e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Mon, 20 May 2024 16:41:49 +0200 Subject: [PATCH 13/15] Skip find query when storing a batch --- docs/queues.txt | 2 +- src/Bus/MongoBatchRepository.php | 8 +++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/docs/queues.txt b/docs/queues.txt index deb8a0829..81629b433 100644 --- a/docs/queues.txt +++ b/docs/queues.txt @@ -89,7 +89,7 @@ Job Batching `Job batching https://laravel.com/docs/{+laravel-docs-version+}/queues#job-batching>`__ is a Laravel feature to execute batch of jobs and subsequent actions before, -after and during the execution of the jobs from the queue. +after, and during the execution of the jobs from the queue. With MongoDB, you don't have to create any collection before using this feature. The collection ``job_batches`` will be created automatically to store meta diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php index 8c2308d29..dd0713f97 100644 --- a/src/Bus/MongoBatchRepository.php +++ b/src/Bus/MongoBatchRepository.php @@ -79,7 +79,7 @@ public function find(string $batchId): ?Batch #[Override] public function store(PendingBatch $batch): Batch { - $result = $this->collection->insertOne([ + $batch = [ 'name' => $batch->name, 'total_jobs' => 0, 'pending_jobs' => 0, @@ -90,9 +90,10 @@ public function store(PendingBatch $batch): Batch 'created_at' => $this->getUTCDateTime(), 'cancelled_at' => null, 'finished_at' => null, - ]); + ]; + $result = $this->collection->insertOne($batch); - return $this->find($result->getInsertedId()); + return $this->toBatch(['_id' => $result->getInsertedId()] + $batch); } #[Override] @@ -240,6 +241,7 @@ public function pruneCancelled(DateTimeInterface $before): int return $result->getDeletedCount(); } + /** @param array $batch */ #[Override] protected function toBatch($batch): Batch { From 5cfed4b8ec47caf7f88458d849c35686974b2a47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Mon, 20 May 2024 16:52:15 +0200 Subject: [PATCH 14/15] Update docs --- CHANGELOG.md | 1 + docs/queues.txt | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e9a74e9d..318e340e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ All notable changes to this project will be documented in this file. * Support collection name prefix by @GromNaN in [#2930](https://github.com/mongodb/laravel-mongodb/pull/2930) * Add `mongodb` driver for Batching by @GromNaN in [#2904](https://github.com/mongodb/laravel-mongodb/pull/2904) * Rename queue option `table` to `collection` +* Replace queue option `expire` with `retry_after` ## [4.3.0] - 2024-04-26 diff --git a/docs/queues.txt b/docs/queues.txt index 81629b433..0a647f0b7 100644 --- a/docs/queues.txt +++ b/docs/queues.txt @@ -23,7 +23,7 @@ the driver in ``config/queue.php``: 'connection' => 'mongodb', 'collection' => 'jobs', 'queue' => 'default', - 'expire' => 60, + 'retry_after' => 60, ], ], From d0fd299b8ef27c5d00b7348549ba511661e7a129 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Tamarelle?= Date: Tue, 21 May 2024 09:09:25 +0200 Subject: [PATCH 15/15] Apply suggestions from code review Co-authored-by: Jordan Smith <45415425+jordan-smith721@users.noreply.github.com> --- docs/queues.txt | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/docs/queues.txt b/docs/queues.txt index 0a647f0b7..ccac29ba6 100644 --- a/docs/queues.txt +++ b/docs/queues.txt @@ -38,7 +38,7 @@ the driver in ``config/queue.php``: - **Required**. Specifies the queue driver to use. Must be ``mongodb``. * - ``connection`` - - Uses the default connection by default. The database connection used to store jobs. It must be a ``mongodb`` connection. + - The database connection used to store jobs. It must be a ``mongodb`` connection. The driver uses the default connection if a connection is not specified. * - ``collection`` - **Required**. Name of the MongoDB collection to store jobs to process. @@ -47,7 +47,7 @@ the driver in ``config/queue.php``: - **Required**. Name of the queue. * - ``retry_after`` - - Default ``60``. Specify how many seconds the queue connection should wait before retrying a job that is being processed + - Specifies how many seconds the queue connection should wait before retrying a job that is being processed. Defaults to ``60``. If you want to use MongoDB to handle failed jobs, change the database in ``config/queue.php``: @@ -71,10 +71,10 @@ If you want to use MongoDB to handle failed jobs, change the database in - **Required**. Specifies the queue driver to use. Must be ``mongodb``. * - ``connection`` - - Uses the default connection by default. The database connection used to store jobs. It must be a ``mongodb`` connection. + - The database connection used to store jobs. It must be a ``mongodb`` connection. The driver uses the default connection if a connection is not specified. * - ``collection`` - - Default ``failed_jobs``. Name of the MongoDB collection to store failed jobs. + - Name of the MongoDB collection to store failed jobs. Defaults to ``failed_jobs``. Add the service provider in ``config/app.php``: @@ -87,12 +87,12 @@ Add the service provider in ``config/app.php``: Job Batching ------------ -`Job batching https://laravel.com/docs/{+laravel-docs-version+}/queues#job-batching>`__ -is a Laravel feature to execute batch of jobs and subsequent actions before, +`Job batching `__ +is a Laravel feature to execute a batch of jobs and subsequent actions before, after, and during the execution of the jobs from the queue. -With MongoDB, you don't have to create any collection before using this feature. -The collection ``job_batches`` will be created automatically to store meta +With MongoDB, you don't have to create any collection before using job batching. +The ``job_batches`` collection is created automatically to store meta information about your job batches, such as their completion percentage. .. code-block:: php @@ -114,10 +114,10 @@ information about your job batches, such as their completion percentage. - **Required**. Specifies the queue driver to use. Must be ``mongodb``. * - ``connection`` - - Uses the default connection by default. The database connection used to store jobs. It must be a ``mongodb`` connection. + - The database connection used to store jobs. It must be a ``mongodb`` connection. The driver uses the default connection if a connection is not specified. * - ``collection`` - - Default ``job_batches``. Name of the MongoDB collection to store job batches. + - Name of the MongoDB collection to store job batches. Defaults to ``job_batches``. Add the service provider in ``config/app.php``: