2
2
3
3
namespace Jenssegers \Mongodb \Bus ;
4
4
5
+ use Carbon \CarbonImmutable ;
5
6
use Closure ;
6
- use Illuminate \Bus \DatabaseBatchRepository ;
7
+ use Illuminate \Bus \{DatabaseBatchRepository , PendingBatch , UpdatedBatchJobCounts };
8
+ use Illuminate \Support \Str ;
9
+ use MongoDB \Operation \FindOneAndUpdate ;
7
10
8
11
class MongoDatabaseBatchRepository extends DatabaseBatchRepository
9
12
{
@@ -20,21 +23,98 @@ protected function updateAtomicValues(string $batchId, Closure $callback)
20
23
});
21
24
}
22
25
26
+ public function decrementPendingJobs (string $ batchId , string $ jobId )
27
+ {
28
+ $ values = $ this ->connection ->table ($ this ->table )->raw (function ($ collection ) use ($ batchId , $ jobId ) {
29
+ return $ collection ->findOneAndUpdate (
30
+ ['id ' => $ batchId ],
31
+ ['$inc ' => ['pending_jobs ' => -1 ]],
32
+ ['upsert ' => true , 'returnDocument ' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER ]
33
+ );
34
+ });
35
+
36
+ return new UpdatedBatchJobCounts (
37
+ $ values ['pending_jobs ' ],
38
+ $ values ['failed_jobs ' ]
39
+ );
40
+ }
41
+
42
+ /**
43
+ * Increment the total number of failed jobs for the batch.
44
+ *
45
+ * @param string $batchId
46
+ * @param string $jobId
47
+ * @return \Illuminate\Bus\UpdatedBatchJobCounts
48
+ */
49
+ public function incrementFailedJobs (string $ batchId , string $ jobId )
50
+ {
51
+ $ values = $ this ->connection ->table ($ this ->table )->raw (function ($ collection ) use ($ batchId , $ jobId ) {
52
+ return $ collection ->findOneAndUpdate (
53
+ ['id ' => $ batchId ],
54
+ [
55
+ '$push ' => ['failed_job_ids ' => $ jobId ],
56
+ '$inc ' => ['failed_jobs ' => 1 ]
57
+ ],
58
+ ['upsert ' => true , 'returnDocument ' => FindOneAndUpdate::RETURN_DOCUMENT_AFTER ]
59
+ );
60
+ });
61
+
62
+ return new UpdatedBatchJobCounts (
63
+ $ values ['pending_jobs ' ],
64
+ $ values ['failed_jobs ' ]
65
+ );
66
+ }
67
+
23
68
public function incrementTotalJobs (string $ batchId , int $ amount )
24
69
{
25
70
$ this ->connection ->table ($ this ->table )->where ('id ' , $ batchId )->update ([
26
- '$set ' => [
27
- 'finished_at ' => null ,
28
- ],
29
- '$inc ' => [
30
- 'total_jobs ' => $ amount ,
31
- 'pending_jobs ' => $ amount ,
32
- ],
71
+ '$set ' => ['finished_at ' => null ],
72
+ '$inc ' => ['total_jobs ' => $ amount , 'pending_jobs ' => $ amount ]
73
+ ]);
74
+ }
75
+
76
+ public function cancel (string $ batchId )
77
+ {
78
+ $ this ->connection ->table ($ this ->table )->where ('id ' , $ batchId )->update ([
79
+ '$set ' => ['cancelled_at ' => time (), 'finished_at ' => time ()]
80
+ ]);
81
+ }
82
+
83
+ public function store (PendingBatch $ batch )
84
+ {
85
+ $ id = (string ) Str::orderedUuid ();
86
+
87
+ $ this ->connection ->table ($ this ->table )->insert ([
88
+ 'id ' => $ id ,
89
+ 'name ' => $ batch ->name ,
90
+ 'total_jobs ' => 0 ,
91
+ 'pending_jobs ' => 0 ,
92
+ 'failed_jobs ' => 0 ,
93
+ 'failed_job_ids ' => [],
94
+ 'options ' => $ this ->serialize ($ batch ->options ),
95
+ 'created_at ' => time (),
96
+ 'cancelled_at ' => null ,
97
+ 'finished_at ' => null ,
33
98
]);
99
+
100
+ return $ this ->find ($ id );
34
101
}
35
102
36
103
protected function toBatch ($ batch )
37
104
{
38
- return parent ::toBatch ((object ) $ batch );
105
+ $ batch = (object ) $ batch ;
106
+ return $ this ->factory ->make (
107
+ $ this ,
108
+ $ batch ->id ,
109
+ $ batch ->name ,
110
+ (int ) $ batch ->total_jobs ,
111
+ (int ) $ batch ->pending_jobs ,
112
+ (int ) $ batch ->failed_jobs ,
113
+ (array ) $ batch ->failed_job_ids ,
114
+ $ this ->unserialize ($ batch ->options ),
115
+ CarbonImmutable::createFromTimestamp ($ batch ->created_at ),
116
+ $ batch ->cancelled_at ? CarbonImmutable::createFromTimestamp ($ batch ->cancelled_at ) : $ batch ->cancelled_at ,
117
+ $ batch ->finished_at ? CarbonImmutable::createFromTimestamp ($ batch ->finished_at ) : $ batch ->finished_at
118
+ );
39
119
}
40
120
}
0 commit comments