@@ -48,8 +48,8 @@ public function __construct(Config $config)
48
48
49
49
public function consume (): void
50
50
{
51
- $ this ->consumer = new KafkaConsumer ($ this ->setConf ());
52
- $ this ->producer = new Producer ($ this ->setConf ());
51
+ $ this ->consumer = new KafkaConsumer ($ this ->setConsumerConf ());
52
+ $ this ->producer = new Producer ($ this ->setProducerConf ());
53
53
54
54
$ this ->committer = CommitterBuilder::withConsumer ($ this ->consumer )
55
55
->andRetry (new NativeSleeper (), $ this ->config ->getMaxCommitRetries ())
@@ -71,13 +71,12 @@ private function doConsume()
71
71
$ this ->handleMessage ($ message );
72
72
}
73
73
74
- private function setConf (): Conf
74
+ private function setConsumerConf (): Conf
75
75
{
76
76
$ conf = new Conf ();
77
77
$ conf ->set ('auto.offset.reset ' , 'smallest ' );
78
78
$ conf ->set ('queued.max.messages.kbytes ' , '10000 ' );
79
79
$ conf ->set ('enable.auto.commit ' , 'false ' );
80
- $ conf ->set ('compression.codec ' , 'gzip ' );
81
80
$ conf ->set ('max.poll.interval.ms ' , '86400000 ' );
82
81
$ conf ->set ('group.id ' , $ this ->config ->getGroupId ());
83
82
$ conf ->set ('bootstrap.servers ' , $ this ->config ->getBroker ());
@@ -92,6 +91,22 @@ private function setConf(): Conf
92
91
return $ conf ;
93
92
}
94
93
94
+ private function setProducerConf (): Conf
95
+ {
96
+ $ conf = new Conf ();
97
+ $ conf ->set ('compression.codec ' , 'gzip ' );
98
+ $ conf ->set ('bootstrap.servers ' , $ this ->config ->getBroker ());
99
+ $ conf ->set ('security.protocol ' , $ this ->config ->getSecurityProtocol ());
100
+
101
+ if ($ this ->config ->isPlainText () && $ this ->config ->getSasl () !== null ) {
102
+ $ conf ->set ('sasl.username ' , $ this ->config ->getSasl ()->getUsername ());
103
+ $ conf ->set ('sasl.password ' , $ this ->config ->getSasl ()->getPassword ());
104
+ $ conf ->set ('sasl.mechanisms ' , $ this ->config ->getSasl ()->getMechanisms ());
105
+ }
106
+
107
+ return $ conf ;
108
+ }
109
+
95
110
private function executeMessage (Message $ message ): void
96
111
{
97
112
try {
0 commit comments