Skip to content

Commit c9e3912

Browse files
committed
Added batch queue support.
1 parent b6a082c commit c9e3912

File tree

2 files changed

+46
-0
lines changed

2 files changed

+46
-0
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
namespace Jenssegers\Mongodb\Bus;
4+
5+
use Closure;
6+
use Illuminate\Bus\DatabaseBatchRepository;
7+
8+
class MongoDatabaseBatchRepository extends DatabaseBatchRepository {
9+
protected function updateAtomicValues(string $batchId, Closure $callback) {
10+
return $this->connection->transaction(function () use ($batchId, $callback) {
11+
$batch = (object)$this->connection->table($this->table)->where('id', $batchId)
12+
->lockForUpdate()
13+
->first();
14+
15+
return is_null($batch) ? [] : tap($callback($batch), function ($values) use ($batchId) {
16+
$this->connection->table($this->table)->where('id', $batchId)->update($values);
17+
});
18+
});
19+
}
20+
21+
public function incrementTotalJobs(string $batchId, int $amount) {
22+
$this->connection->table($this->table)->where('id', $batchId)->update([
23+
'$set' => [
24+
'finished_at' => null
25+
],
26+
'$inc' => [
27+
'total_jobs' => $amount,
28+
'pending_jobs' => $amount
29+
]
30+
]);
31+
}
32+
33+
protected function toBatch($batch) {
34+
return parent::toBatch((object)$batch);
35+
}
36+
}

src/MongodbServiceProvider.php

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
namespace Jenssegers\Mongodb;
44

5+
use Illuminate\Bus\{BatchFactory, DatabaseBatchRepository};
56
use Illuminate\Support\ServiceProvider;
7+
use Jenssegers\Mongodb\Bus\MongoDatabaseBatchRepository;
68
use Jenssegers\Mongodb\Eloquent\Model;
79
use Jenssegers\Mongodb\Queue\MongoConnector;
810

@@ -38,5 +40,13 @@ public function register()
3840
return new MongoConnector($this->app['db']);
3941
});
4042
});
43+
44+
$this->app->extend(DatabaseBatchRepository::class, function ($command, $app) {
45+
return new MongoDatabaseBatchRepository(
46+
$app->make(BatchFactory::class),
47+
$app->make('db')->connection($app->config->get('queue.batching.database')),
48+
$app->config->get('queue.batching.table', 'job_batches')
49+
);
50+
});
4151
}
4252
}

0 commit comments

Comments
 (0)