diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ec142b46..318e340e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. ## [4.4.0] - unreleased * Support collection name prefix by @GromNaN in [#2930](https://github.com/mongodb/laravel-mongodb/pull/2930) +* Add `mongodb` driver for Batching by @GromNaN in [#2904](https://github.com/mongodb/laravel-mongodb/pull/2904) +* Rename queue option `table` to `collection` +* Replace queue option `expire` with `retry_after` ## [4.3.0] - 2024-04-26 diff --git a/composer.json b/composer.json index 8c038819e..84229b00f 100644 --- a/composer.json +++ b/composer.json @@ -41,6 +41,9 @@ "spatie/laravel-query-builder": "^5.6", "phpstan/phpstan": "^1.10" }, + "conflict": { + "illuminate/bus": "< 10.37.2" + }, "suggest": { "mongodb/builder": "Provides a fluent aggregation builder for MongoDB pipelines" }, @@ -62,7 +65,8 @@ "laravel": { "providers": [ "MongoDB\\Laravel\\MongoDBServiceProvider", - "MongoDB\\Laravel\\MongoDBQueueServiceProvider" + "MongoDB\\Laravel\\MongoDBQueueServiceProvider", + "MongoDB\\Laravel\\MongoDBBusServiceProvider" ] } }, diff --git a/docs/queues.txt b/docs/queues.txt index 330662913..ccac29ba6 100644 --- a/docs/queues.txt +++ b/docs/queues.txt @@ -11,7 +11,7 @@ Queues .. meta:: :keywords: php framework, odm, code example -If you want to use MongoDB as your database backend for Laravel Queue, change +If you want to use MongoDB as your database backend for Laravel Queue, change the driver in ``config/queue.php``: .. code-block:: php @@ -20,27 +20,107 @@ the driver in ``config/queue.php``: 'database' => [ 'driver' => 'mongodb', // You can also specify your jobs specific database created on config/database.php - 'connection' => 'mongodb-job', - 'table' => 'jobs', + 'connection' => 'mongodb', + 'collection' => 'jobs', 'queue' => 'default', - 'expire' => 60, + 'retry_after' => 60, ], ], -If you want to use MongoDB to handle failed jobs, change the database in +.. list-table:: + :header-rows: 1 + :widths: 25 75 + + * - Setting + - Description + + * - ``driver`` + - **Required**. Specifies the queue driver to use. Must be ``mongodb``. + + * - ``connection`` + - The database connection used to store jobs. It must be a ``mongodb`` connection. The driver uses the default connection if a connection is not specified. + + * - ``collection`` + - **Required**. Name of the MongoDB collection to store jobs to process. + + * - ``queue`` + - **Required**. Name of the queue. + + * - ``retry_after`` + - Specifies how many seconds the queue connection should wait before retrying a job that is being processed. Defaults to ``60``. + +If you want to use MongoDB to handle failed jobs, change the database in ``config/queue.php``: .. code-block:: php 'failed' => [ 'driver' => 'mongodb', - // You can also specify your jobs specific database created on config/database.php - 'database' => 'mongodb-job', - 'table' => 'failed_jobs', + 'database' => 'mongodb', + 'collection' => 'failed_jobs', ], +.. list-table:: + :header-rows: 1 + :widths: 25 75 + + * - Setting + - Description + + * - ``driver`` + - **Required**. Specifies the queue driver to use. Must be ``mongodb``. + + * - ``connection`` + - The database connection used to store jobs. It must be a ``mongodb`` connection. The driver uses the default connection if a connection is not specified. + + * - ``collection`` + - Name of the MongoDB collection to store failed jobs. Defaults to ``failed_jobs``. + + Add the service provider in ``config/app.php``: .. code-block:: php MongoDB\Laravel\MongoDBQueueServiceProvider::class, + + +Job Batching +------------ + +`Job batching `__ +is a Laravel feature to execute a batch of jobs and subsequent actions before, +after, and during the execution of the jobs from the queue. + +With MongoDB, you don't have to create any collection before using job batching. +The ``job_batches`` collection is created automatically to store meta +information about your job batches, such as their completion percentage. + +.. code-block:: php + + 'batching' => [ + 'driver' => 'mongodb', + 'database' => 'mongodb', + 'collection' => 'job_batches', + ], + +.. list-table:: + :header-rows: 1 + :widths: 25 75 + + * - Setting + - Description + + * - ``driver`` + - **Required**. Specifies the queue driver to use. Must be ``mongodb``. + + * - ``connection`` + - The database connection used to store jobs. It must be a ``mongodb`` connection. The driver uses the default connection if a connection is not specified. + + * - ``collection`` + - Name of the MongoDB collection to store job batches. Defaults to ``job_batches``. + +Add the service provider in ``config/app.php``: + +.. code-block:: php + + MongoDB\Laravel\MongoDBBusServiceProvider::class, diff --git a/phpstan-baseline.neon b/phpstan-baseline.neon index 99579fa0a..fdef24410 100644 --- a/phpstan-baseline.neon +++ b/phpstan-baseline.neon @@ -1,5 +1,10 @@ parameters: ignoreErrors: + - + message: "#^Access to an undefined property Illuminate\\\\Container\\\\Container\\:\\:\\$config\\.$#" + count: 3 + path: src/MongoDBBusServiceProvider.php + - message: "#^Method Illuminate\\\\Database\\\\Eloquent\\\\Model\\:\\:push\\(\\) invoked with 3 parameters, 0 required\\.$#" count: 2 diff --git a/src/Bus/MongoBatchRepository.php b/src/Bus/MongoBatchRepository.php new file mode 100644 index 000000000..dd0713f97 --- /dev/null +++ b/src/Bus/MongoBatchRepository.php @@ -0,0 +1,278 @@ +collection = $connection->getCollection($collection); + + parent::__construct($factory, $connection, $collection); + } + + #[Override] + public function get($limit = 50, $before = null): array + { + if (is_string($before)) { + $before = new ObjectId($before); + } + + return $this->collection->find( + $before ? ['_id' => ['$lt' => $before]] : [], + [ + 'limit' => $limit, + 'sort' => ['_id' => -1], + 'typeMap' => ['root' => 'array', 'document' => 'array', 'array' => 'array'], + ], + )->toArray(); + } + + #[Override] + public function find(string $batchId): ?Batch + { + $batchId = new ObjectId($batchId); + + $batch = $this->collection->findOne( + ['_id' => $batchId], + [ + // If the select query is executed faster than the database replication takes place, + // then no batch is found. In that case an exception is thrown because jobs are added + // to a null batch. + 'readPreference' => new ReadPreference(ReadPreference::PRIMARY), + 'typeMap' => ['root' => 'array', 'array' => 'array', 'document' => 'array'], + ], + ); + + return $batch ? $this->toBatch($batch) : null; + } + + #[Override] + public function store(PendingBatch $batch): Batch + { + $batch = [ + 'name' => $batch->name, + 'total_jobs' => 0, + 'pending_jobs' => 0, + 'failed_jobs' => 0, + 'failed_job_ids' => [], + // Serialization is required for Closures + 'options' => serialize($batch->options), + 'created_at' => $this->getUTCDateTime(), + 'cancelled_at' => null, + 'finished_at' => null, + ]; + $result = $this->collection->insertOne($batch); + + return $this->toBatch(['_id' => $result->getInsertedId()] + $batch); + } + + #[Override] + public function incrementTotalJobs(string $batchId, int $amount): void + { + $batchId = new ObjectId($batchId); + $this->collection->updateOne( + ['_id' => $batchId], + [ + '$inc' => [ + 'total_jobs' => $amount, + 'pending_jobs' => $amount, + ], + '$set' => [ + 'finished_at' => null, + ], + ], + ); + } + + #[Override] + public function decrementPendingJobs(string $batchId, string $jobId): UpdatedBatchJobCounts + { + $batchId = new ObjectId($batchId); + $values = $this->collection->findOneAndUpdate( + ['_id' => $batchId], + [ + '$inc' => ['pending_jobs' => -1], + '$pull' => ['failed_job_ids' => $jobId], + ], + [ + 'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1], + 'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER, + ], + ); + + return new UpdatedBatchJobCounts( + $values['pending_jobs'], + $values['failed_jobs'], + ); + } + + #[Override] + public function incrementFailedJobs(string $batchId, string $jobId): UpdatedBatchJobCounts + { + $batchId = new ObjectId($batchId); + $values = $this->collection->findOneAndUpdate( + ['_id' => $batchId], + [ + '$inc' => ['failed_jobs' => 1], + '$push' => ['failed_job_ids' => $jobId], + ], + [ + 'projection' => ['pending_jobs' => 1, 'failed_jobs' => 1], + 'returnDocument' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER, + ], + ); + + return new UpdatedBatchJobCounts( + $values['pending_jobs'], + $values['failed_jobs'], + ); + } + + #[Override] + public function markAsFinished(string $batchId): void + { + $batchId = new ObjectId($batchId); + $this->collection->updateOne( + ['_id' => $batchId], + ['$set' => ['finished_at' => $this->getUTCDateTime()]], + ); + } + + #[Override] + public function cancel(string $batchId): void + { + $batchId = new ObjectId($batchId); + $this->collection->updateOne( + ['_id' => $batchId], + [ + '$set' => [ + 'cancelled_at' => $this->getUTCDateTime(), + 'finished_at' => $this->getUTCDateTime(), + ], + ], + ); + } + + #[Override] + public function delete(string $batchId): void + { + $batchId = new ObjectId($batchId); + $this->collection->deleteOne(['_id' => $batchId]); + } + + /** Execute the given Closure within a storage specific transaction. */ + #[Override] + public function transaction(Closure $callback): mixed + { + return $this->connection->transaction($callback); + } + + /** Rollback the last database transaction for the connection. */ + #[Override] + public function rollBack(): void + { + $this->connection->rollBack(); + } + + /** Prune the entries older than the given date. */ + #[Override] + public function prune(DateTimeInterface $before): int + { + $result = $this->collection->deleteMany( + ['finished_at' => ['$ne' => null, '$lt' => new UTCDateTime($before)]], + ); + + return $result->getDeletedCount(); + } + + /** Prune all the unfinished entries older than the given date. */ + public function pruneUnfinished(DateTimeInterface $before): int + { + $result = $this->collection->deleteMany( + [ + 'finished_at' => null, + 'created_at' => ['$lt' => new UTCDateTime($before)], + ], + ); + + return $result->getDeletedCount(); + } + + /** Prune all the cancelled entries older than the given date. */ + public function pruneCancelled(DateTimeInterface $before): int + { + $result = $this->collection->deleteMany( + [ + 'cancelled_at' => ['$ne' => null], + 'created_at' => ['$lt' => new UTCDateTime($before)], + ], + ); + + return $result->getDeletedCount(); + } + + /** @param array $batch */ + #[Override] + protected function toBatch($batch): Batch + { + return $this->factory->make( + $this, + $batch['_id'], + $batch['name'], + $batch['total_jobs'], + $batch['pending_jobs'], + $batch['failed_jobs'], + $batch['failed_job_ids'], + unserialize($batch['options']), + $this->toCarbon($batch['created_at']), + $this->toCarbon($batch['cancelled_at']), + $this->toCarbon($batch['finished_at']), + ); + } + + private function getUTCDateTime(): UTCDateTime + { + // Using Carbon so the current time can be modified for tests + return new UTCDateTime(Carbon::now()); + } + + /** @return ($date is null ? null : CarbonImmutable) */ + private function toCarbon(?UTCDateTime $date): ?CarbonImmutable + { + if ($date === null) { + return null; + } + + return CarbonImmutable::createFromTimestampMsUTC((string) $date); + } +} diff --git a/src/MongoDBBusServiceProvider.php b/src/MongoDBBusServiceProvider.php new file mode 100644 index 000000000..c77ccd118 --- /dev/null +++ b/src/MongoDBBusServiceProvider.php @@ -0,0 +1,46 @@ +app->singleton(MongoBatchRepository::class, function (Container $app) { + return new MongoBatchRepository( + $app->make(BatchFactory::class), + $app->make('db')->connection($app->config->get('queue.batching.database')), + $app->config->get('queue.batching.collection', 'job_batches'), + ); + }); + + /** @see BusServiceProvider::registerBatchServices() */ + $this->app->extend(BatchRepository::class, function (BatchRepository $repository, Container $app) { + $driver = $app->config->get('queue.batching.driver'); + + return match ($driver) { + 'mongodb' => $app->make(MongoBatchRepository::class), + default => $repository, + }; + }); + } + + public function provides() + { + return [ + BatchRepository::class, + MongoBatchRepository::class, + ]; + } +} diff --git a/src/MongoDBQueueServiceProvider.php b/src/MongoDBQueueServiceProvider.php index aa67f7405..ea7a06176 100644 --- a/src/MongoDBQueueServiceProvider.php +++ b/src/MongoDBQueueServiceProvider.php @@ -9,6 +9,9 @@ use MongoDB\Laravel\Queue\Failed\MongoFailedJobProvider; use function array_key_exists; +use function trigger_error; + +use const E_USER_DEPRECATED; class MongoDBQueueServiceProvider extends QueueServiceProvider { @@ -51,6 +54,15 @@ protected function registerFailedJobServices() */ protected function mongoFailedJobProvider(array $config): MongoFailedJobProvider { - return new MongoFailedJobProvider($this->app['db'], $config['database'], $config['table']); + if (! isset($config['collection']) && isset($config['table'])) { + trigger_error('Since mongodb/laravel-mongodb 4.4: Using "table" option for the queue is deprecated. Use "collection" instead.', E_USER_DEPRECATED); + $config['collection'] = $config['table']; + } + + return new MongoFailedJobProvider( + $this->app['db'], + $config['database'] ?? null, + $config['collection'] ?? 'failed_jobs', + ); } } diff --git a/src/Queue/MongoConnector.php b/src/Queue/MongoConnector.php index 4f987694a..be51d4fe1 100644 --- a/src/Queue/MongoConnector.php +++ b/src/Queue/MongoConnector.php @@ -8,6 +8,10 @@ use Illuminate\Database\ConnectionResolverInterface; use Illuminate\Queue\Connectors\ConnectorInterface; +use function trigger_error; + +use const E_USER_DEPRECATED; + class MongoConnector implements ConnectorInterface { /** @@ -32,11 +36,21 @@ public function __construct(ConnectionResolverInterface $connections) */ public function connect(array $config) { + if (! isset($config['collection']) && isset($config['table'])) { + trigger_error('Since mongodb/laravel-mongodb 4.4: Using "table" option in queue configuration is deprecated. Use "collection" instead.', E_USER_DEPRECATED); + $config['collection'] = $config['table']; + } + + if (! isset($config['retry_after']) && isset($config['expire'])) { + trigger_error('Since mongodb/laravel-mongodb 4.4: Using "expire" option in queue configuration is deprecated. Use "retry_after" instead.', E_USER_DEPRECATED); + $config['retry_after'] = $config['expire']; + } + return new MongoQueue( $this->connections->connection($config['connection'] ?? null), - $config['table'], - $config['queue'], - $config['expire'] ?? 60, + $config['collection'] ?? 'jobs', + $config['queue'] ?? 'default', + $config['retry_after'] ?? 60, ); } } diff --git a/tests/Bus/Fixtures/ChainHeadJob.php b/tests/Bus/Fixtures/ChainHeadJob.php new file mode 100644 index 000000000..c964e59f9 --- /dev/null +++ b/tests/Bus/Fixtures/ChainHeadJob.php @@ -0,0 +1,15 @@ +getCollection('job_batches')->drop(); + + unset( + $_SERVER['__catch.batch'], + $_SERVER['__catch.count'], + $_SERVER['__catch.exception'], + $_SERVER['__finally.batch'], + $_SERVER['__finally.count'], + $_SERVER['__progress.batch'], + $_SERVER['__progress.count'], + $_SERVER['__then.batch'], + $_SERVER['__then.count'], + ); + + parent::tearDown(); + } + + /** @see BusBatchTest::test_jobs_can_be_added_to_the_batch */ + public function testJobsCanBeAddedToTheBatch(): void + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $thirdJob = function () { + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once()->with(m::on(function ($args) use ($job, $secondJob) { + return $args[0] === $job && + $args[1] === $secondJob && + $args[2] instanceof CallQueuedClosure + && is_string($args[2]->batchId); + }), '', 'test-queue'); + + $batch = $batch->add([$job, $secondJob, $thirdJob]); + + $this->assertEquals(3, $batch->totalJobs); + $this->assertEquals(3, $batch->pendingJobs); + $this->assertIsString($job->batchId); + $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt); + } + + /** @see BusBatchTest::test_successful_jobs_can_be_recorded */ + public function testSuccessfulJobsCanBeRecorded() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once(); + + $batch = $batch->add([$job, $secondJob]); + $this->assertEquals(2, $batch->pendingJobs); + + $batch->recordSuccessfulJob('test-id'); + $batch->recordSuccessfulJob('test-id'); + + $this->assertInstanceOf(Batch::class, $_SERVER['__finally.batch']); + $this->assertInstanceOf(Batch::class, $_SERVER['__progress.batch']); + $this->assertInstanceOf(Batch::class, $_SERVER['__then.batch']); + + $batch = $batch->fresh(); + $this->assertEquals(0, $batch->pendingJobs); + $this->assertTrue($batch->finished()); + $this->assertEquals(1, $_SERVER['__finally.count']); + $this->assertEquals(2, $_SERVER['__progress.count']); + $this->assertEquals(1, $_SERVER['__then.count']); + } + + /** @see BusBatchTest::test_failed_jobs_can_be_recorded_while_not_allowing_failures */ + public function testFailedJobsCanBeRecordedWhileNotAllowingFailures() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue, $allowFailures = false); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once(); + + $batch = $batch->add([$job, $secondJob]); + $this->assertEquals(2, $batch->pendingJobs); + + $batch->recordFailedJob('test-id', new RuntimeException('Something went wrong.')); + $batch->recordFailedJob('test-id', new RuntimeException('Something else went wrong.')); + + $this->assertInstanceOf(Batch::class, $_SERVER['__finally.batch']); + $this->assertFalse(isset($_SERVER['__then.batch'])); + + $batch = $batch->fresh(); + $this->assertEquals(2, $batch->pendingJobs); + $this->assertEquals(2, $batch->failedJobs); + $this->assertTrue($batch->finished()); + $this->assertTrue($batch->cancelled()); + $this->assertEquals(1, $_SERVER['__finally.count']); + $this->assertEquals(0, $_SERVER['__progress.count']); + $this->assertEquals(1, $_SERVER['__catch.count']); + $this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage()); + } + + /** @see BusBatchTest::test_failed_jobs_can_be_recorded_while_allowing_failures */ + public function testFailedJobsCanBeRecordedWhileAllowingFailures() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue, $allowFailures = true); + + $job = new class + { + use Batchable; + }; + + $secondJob = new class + { + use Batchable; + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once(); + + $batch = $batch->add([$job, $secondJob]); + $this->assertEquals(2, $batch->pendingJobs); + + $batch->recordFailedJob('test-id', new RuntimeException('Something went wrong.')); + $batch->recordFailedJob('test-id', new RuntimeException('Something else went wrong.')); + + // While allowing failures this batch never actually completes... + $this->assertFalse(isset($_SERVER['__then.batch'])); + + $batch = $batch->fresh(); + $this->assertEquals(2, $batch->pendingJobs); + $this->assertEquals(2, $batch->failedJobs); + $this->assertFalse($batch->finished()); + $this->assertFalse($batch->cancelled()); + $this->assertEquals(1, $_SERVER['__catch.count']); + $this->assertEquals(2, $_SERVER['__progress.count']); + $this->assertSame('Something went wrong.', $_SERVER['__catch.exception']->getMessage()); + } + + /** @see BusBatchTest::test_batch_can_be_cancelled */ + public function testBatchCanBeCancelled() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $batch->cancel(); + + $batch = $batch->fresh(); + + $this->assertTrue($batch->cancelled()); + } + + /** @see BusBatchTest::test_batch_can_be_deleted */ + public function testBatchCanBeDeleted() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $batch->delete(); + + $batch = $batch->fresh(); + + $this->assertNull($batch); + } + + /** @see BusBatchTest::test_batch_state_can_be_inspected */ + public function testBatchStateCanBeInspected() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $this->assertFalse($batch->finished()); + $batch->finishedAt = now(); + $this->assertTrue($batch->finished()); + + $batch->options['progress'] = []; + $this->assertFalse($batch->hasProgressCallbacks()); + $batch->options['progress'] = [1]; + $this->assertTrue($batch->hasProgressCallbacks()); + + $batch->options['then'] = []; + $this->assertFalse($batch->hasThenCallbacks()); + $batch->options['then'] = [1]; + $this->assertTrue($batch->hasThenCallbacks()); + + $this->assertFalse($batch->allowsFailures()); + $batch->options['allowFailures'] = true; + $this->assertTrue($batch->allowsFailures()); + + $this->assertFalse($batch->hasFailures()); + $batch->failedJobs = 1; + $this->assertTrue($batch->hasFailures()); + + $batch->options['catch'] = []; + $this->assertFalse($batch->hasCatchCallbacks()); + $batch->options['catch'] = [1]; + $this->assertTrue($batch->hasCatchCallbacks()); + + $this->assertFalse($batch->cancelled()); + $batch->cancelledAt = now(); + $this->assertTrue($batch->cancelled()); + + $this->assertIsString(json_encode($batch)); + } + + /** @see BusBatchTest:test_chain_can_be_added_to_batch: */ + public function testChainCanBeAddedToBatch() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $chainHeadJob = new ChainHeadJob(); + + $secondJob = new SecondTestJob(); + + $thirdJob = new ThirdTestJob(); + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once()->with(m::on(function ($args) use ($chainHeadJob, $secondJob, $thirdJob) { + return $args[0] === $chainHeadJob + && serialize($secondJob) === $args[0]->chained[0] + && serialize($thirdJob) === $args[0]->chained[1]; + }), '', 'test-queue'); + + $batch = $batch->add([ + [$chainHeadJob, $secondJob, $thirdJob], + ]); + + $this->assertEquals(3, $batch->totalJobs); + $this->assertEquals(3, $batch->pendingJobs); + $this->assertSame('test-queue', $chainHeadJob->chainQueue); + $this->assertIsString($chainHeadJob->batchId); + $this->assertIsString($secondJob->batchId); + $this->assertIsString($thirdJob->batchId); + $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt); + } + + /** @see BusBatchTest::createTestBatch() */ + private function createTestBatch(Factory $queue, $allowFailures = false) + { + $connection = DB::connection('mongodb'); + $this->assertInstanceOf(Connection::class, $connection); + + $repository = new MongoBatchRepository(new BatchFactory($queue), $connection, 'job_batches'); + + $pendingBatch = (new PendingBatch(new Container(), collect())) + ->progress(function (Batch $batch) { + $_SERVER['__progress.batch'] = $batch; + $_SERVER['__progress.count']++; + }) + ->then(function (Batch $batch) { + $_SERVER['__then.batch'] = $batch; + $_SERVER['__then.count']++; + }) + ->catch(function (Batch $batch, $e) { + $_SERVER['__catch.batch'] = $batch; + $_SERVER['__catch.exception'] = $e; + $_SERVER['__catch.count']++; + }) + ->finally(function (Batch $batch) { + $_SERVER['__finally.batch'] = $batch; + $_SERVER['__finally.count']++; + }) + ->allowFailures($allowFailures) + ->onConnection('test-connection') + ->onQueue('test-queue'); + + return $repository->store($pendingBatch); + } +}