@@ -76,7 +76,7 @@ private function executeMessage(\RdKafka\Message $message): void
76
76
try {
77
77
$ this ->config ->getConsumer ()->handle ($ message ->payload );
78
78
$ success = true ;
79
- $ this ->commit ($ message , true );
79
+ $ this ->commit ($ message , $ attempts , true );
80
80
} catch (\Throwable $ exception ) {
81
81
$ this ->logger ->error ($ message ->offset , $ attempts , $ exception );
82
82
@@ -86,7 +86,7 @@ private function executeMessage(\RdKafka\Message $message): void
86
86
} while (!$ success );
87
87
}
88
88
89
- private function commit (\RdKafka \Message $ message , bool $ success ): void
89
+ private function commit (\RdKafka \Message $ message , int $ attempts , bool $ success ): void
90
90
{
91
91
try {
92
92
if (!$ success && !is_null ($ this ->config ->getDlq ())) {
@@ -103,7 +103,8 @@ private function commit(\RdKafka\Message $message, bool $success): void
103
103
return ;
104
104
}
105
105
} catch (\Throwable $ throwable ) {
106
- $ this ->logger ->error ($ message , $ throwable , 'MESSAGE_COMMIT ' );
106
+ $ offset = property_exists ($ message , 'offset ' ) ? $ message ->offset : null ;
107
+ $ this ->logger ->error ($ offset , $ attempts , $ throwable );
107
108
if ($ throwable ->getCode () != RD_KAFKA_RESP_ERR__NO_OFFSET ) {
108
109
throw $ throwable ;
109
110
}
@@ -122,7 +123,7 @@ private function isMaxAttemptReached(\RdKafka\Message $message, int $attempts):
122
123
$ this ->config ->getMaxAttempts ()->hasMaxAttempts () &&
123
124
$ this ->config ->getMaxAttempts ()->hasReachedMaxAttempts ($ attempts )
124
125
) {
125
- $ this ->commit ($ message , false );
126
+ $ this ->commit ($ message , $ attempts , false );
126
127
return true ;
127
128
}
128
129
0 commit comments