Skip to content

Commit dc8ba02

Browse files
committed
Completed PsrLogging tests
1 parent 6ac7821 commit dc8ba02

File tree

3 files changed

+160
-32
lines changed

3 files changed

+160
-32
lines changed

src/Symfony/Bundle/FrameworkBundle/DependencyInjection/FrameworkExtension.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
use Symfony\Component\Messenger\Bridge as MessengerBridge;
116116
use Symfony\Component\Messenger\Bridge\Kafka\Callback\CallbackManager;
117117
use Symfony\Component\Messenger\Bridge\Kafka\Callback\CallbackProcessorInterface;
118+
use Symfony\Component\Messenger\Bridge\Kafka\Transport\KafkaFactory;
118119
use Symfony\Component\Messenger\Command\StatsCommand;
119120
use Symfony\Component\Messenger\EventListener\StopWorkerOnSignalsListener;
120121
use Symfony\Component\Messenger\Handler\BatchHandlerInterface;
@@ -2224,6 +2225,7 @@ private function registerMessengerConfiguration(array $config, ContainerBuilder
22242225
$container->removeDefinition('messenger.transport.beanstalkd.factory');
22252226
$container->removeDefinition('messenger.transport.kafka.factory');
22262227
$container->removeDefinition(CallbackManager::class);
2228+
$container->removeDefinition(KafkaFactory::class);
22272229
$container->removeAlias(SerializerInterface::class);
22282230
} else {
22292231
$container->getDefinition('messenger.transport.symfony_serializer')

src/Symfony/Component/Messenger/Bridge/Kafka/Callback/PsrLoggingProcessor.php

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public function consume(Message $message): void
7676
'Timed out waiting for message'
7777
),
7878
\RD_KAFKA_RESP_ERR__TRANSPORT => $this->logger->warning(
79-
'Kafka: Broker transport failure.',
79+
'Kafka Broker transport failure',
8080
),
8181
default => $this->logger->error(sprintf(
8282
'Error occurred while consuming message from Kafka: %s',
@@ -88,32 +88,40 @@ public function consume(Message $message): void
8888
public function offsetCommit(object $kafka, int $err, $partitions): void
8989
{
9090
foreach ($partitions as $partition) {
91-
$this->logger->info(sprintf(
92-
'Offset topic=%s partition=%s offset=%s code=%d successfully committed.',
93-
$partition->getTopic(),
94-
$partition->getPartition(),
95-
$partition->getOffset(),
96-
$err,
97-
));
91+
$this->logger->info(
92+
sprintf(
93+
'Offset topic=%s partition=%s offset=%s code=%d successfully committed.',
94+
$partition->getTopic(),
95+
$partition->getPartition(),
96+
$partition->getOffset(),
97+
$err,
98+
),
99+
[
100+
'topic' => $partition->getTopic(),
101+
'partition' => $partition->getPartition(),
102+
'offset' => $partition->getOffset(),
103+
'error_code' => $err,
104+
],
105+
);
98106
}
99107
}
100108

101109
public function rebalance(KafkaConsumer $kafka, int $err, $partitions): void
102110
{
103111
switch ($err) {
104112
case \RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
105-
foreach ($partitions as $topicPartition) {
113+
foreach ($partitions as $partition) {
106114
$this->logger->info(
107115
sprintf(
108116
'Rebalancing %s %s %s as the assignment changed',
109-
$topicPartition->getTopic(),
110-
$topicPartition->getPartition(),
111-
$topicPartition->getOffset(),
117+
$partition->getTopic(),
118+
$partition->getPartition(),
119+
$partition->getOffset(),
112120
),
113121
[
114-
'topic' => $topicPartition->getTopic(),
115-
'partition' => $topicPartition->getPartition(),
116-
'offset' => $topicPartition->getOffset(),
122+
'topic' => $partition->getTopic(),
123+
'partition' => $partition->getPartition(),
124+
'offset' => $partition->getOffset(),
117125
'error_code' => $err,
118126
],
119127
);
@@ -122,18 +130,18 @@ public function rebalance(KafkaConsumer $kafka, int $err, $partitions): void
122130
break;
123131

124132
case \RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
125-
foreach ($partitions as $topicPartition) {
133+
foreach ($partitions as $partition) {
126134
$this->logger->info(
127135
sprintf(
128136
'Rebalancing %s %s %s as the assignment was revoked',
129-
$topicPartition->getTopic(),
130-
$topicPartition->getPartition(),
131-
$topicPartition->getOffset(),
137+
$partition->getTopic(),
138+
$partition->getPartition(),
139+
$partition->getOffset(),
132140
),
133141
[
134-
'topic' => $topicPartition->getTopic(),
135-
'partition' => $topicPartition->getPartition(),
136-
'offset' => $topicPartition->getOffset(),
142+
'topic' => $partition->getTopic(),
143+
'partition' => $partition->getPartition(),
144+
'offset' => $partition->getOffset(),
137145
'error_code' => $err,
138146
],
139147
);
@@ -142,19 +150,19 @@ public function rebalance(KafkaConsumer $kafka, int $err, $partitions): void
142150
break;
143151

144152
default:
145-
foreach ($partitions as $topicPartition) {
153+
foreach ($partitions as $partition) {
146154
$this->logger->error(
147155
sprintf(
148156
'Rebalancing %s %s %s due to error code %d',
149-
$topicPartition->getTopic(),
150-
$topicPartition->getPartition(),
151-
$topicPartition->getOffset(),
157+
$partition->getTopic(),
158+
$partition->getPartition(),
159+
$partition->getOffset(),
152160
$err,
153161
),
154162
[
155-
'topic' => $topicPartition->getTopic(),
156-
'partition' => $topicPartition->getPartition(),
157-
'offset' => $topicPartition->getOffset(),
163+
'topic' => $partition->getTopic(),
164+
'partition' => $partition->getPartition(),
165+
'offset' => $partition->getOffset(),
158166
'error_code' => $err,
159167
],
160168
);

src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Callback/PsrLoggingProcessorTest.php

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use Psr\Log\LoggerInterface;
1818
use Psr\Log\LogLevel;
1919
use RdKafka\KafkaConsumer;
20+
use RdKafka\Message;
2021
use RdKafka\Producer;
2122
use RdKafka\TopicPartition;
2223
use Symfony\Component\Messenger\Bridge\Kafka\Callback\PsrLoggingProcessor;
@@ -84,7 +85,7 @@ public function testLog(int $level, $expectedLevel)
8485
$this->processor->log($consumer, $level, 'facility-value', 'test error message');
8586
}
8687

87-
public function testInvokeWithAssignPartitions()
88+
public function testRebalanceWithAssignPartitions()
8889
{
8990
$topic = 'topic1';
9091
$partition = 1;
@@ -112,7 +113,7 @@ public function testInvokeWithAssignPartitions()
112113
$this->processor->rebalance($consumer, \RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS, [$topicPartition]);
113114
}
114115

115-
public function testInvokeWithRevokePartitions()
116+
public function testRebalanceWithRevokePartitions()
116117
{
117118
$topic = 'topic1';
118119
$partition = 1;
@@ -136,7 +137,7 @@ public function testInvokeWithRevokePartitions()
136137
$this->processor->rebalance($consumer, \RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS, [$topicPartition]);
137138
}
138139

139-
public function testInvokeWithUnknownReason()
140+
public function testRebalanceWithUnknownReason()
140141
{
141142
$topic = 'topic1';
142143
$partition = 1;
@@ -160,4 +161,121 @@ public function testInvokeWithUnknownReason()
160161

161162
$this->processor->rebalance($consumer, $errorCode, [$topicPartition]);
162163
}
164+
165+
public function testConsumeWithNoError()
166+
{
167+
$partition = 1;
168+
$payload = 'test payload';
169+
170+
$message = new Message();
171+
$message->err = \RD_KAFKA_RESP_ERR_NO_ERROR;
172+
$message->partition = $partition;
173+
$message->payload = $payload;
174+
175+
$this->logger->expects(self::once())
176+
->method('debug')
177+
->with(
178+
sprintf(
179+
'Message consumed from Kafka on partition %s: %s',
180+
$partition,
181+
$payload,
182+
)
183+
);
184+
185+
$this->processor->consume($message);
186+
}
187+
188+
public function testConsumeWithPartitionEofError()
189+
{
190+
$partition = 1;
191+
$payload = 'test payload';
192+
193+
$message = new Message();
194+
$message->err = \RD_KAFKA_RESP_ERR__PARTITION_EOF;
195+
$message->partition = $partition;
196+
$message->payload = $payload;
197+
198+
$this->logger->expects(self::once())
199+
->method('info')
200+
->with('No more messages; Waiting for more');
201+
202+
$this->processor->consume($message);
203+
}
204+
205+
public function testConsumeWithTimedOutError()
206+
{
207+
$partition = 1;
208+
$payload = 'test payload';
209+
210+
$message = new Message();
211+
$message->err = \RD_KAFKA_RESP_ERR__TIMED_OUT;
212+
$message->partition = $partition;
213+
$message->payload = $payload;
214+
215+
$this->logger->expects(self::once())
216+
->method('debug')
217+
->with('Timed out waiting for message');
218+
219+
$this->processor->consume($message);
220+
}
221+
222+
public function testConsumeWithTransportError()
223+
{
224+
$partition = 1;
225+
$payload = 'test payload';
226+
227+
$message = new Message();
228+
$message->err = \RD_KAFKA_RESP_ERR__TRANSPORT;
229+
$message->partition = $partition;
230+
$message->payload = $payload;
231+
232+
$this->logger->expects(self::once())
233+
->method('warning')
234+
->with('Kafka Broker transport failure');
235+
236+
$this->processor->consume($message);
237+
}
238+
239+
public function testConsumeWithGenericError()
240+
{
241+
$partition = 1;
242+
$payload = 'test payload';
243+
244+
$message = new Message();
245+
$message->err = \RD_KAFKA_RESP_ERR__RESOLVE;
246+
$message->partition = $partition;
247+
$message->payload = $payload;
248+
249+
$this->logger->expects(self::once())
250+
->method('error')
251+
->with('Error occurred while consuming message from Kafka: Local: Host resolution failure');
252+
253+
$this->processor->consume($message);
254+
}
255+
256+
public function testOffsetCommit()
257+
{
258+
$topic = 'topic1';
259+
$partition = 1;
260+
$offset = 2;
261+
262+
$kafka = new \stdClass();
263+
$err = 1;
264+
265+
$this->logger->expects(self::once())
266+
->method('info')
267+
->with(
268+
'Offset topic=topic1 partition=1 offset=2 code=1 successfully committed.',
269+
[
270+
'topic' => $topic,
271+
'partition' => $partition,
272+
'offset' => $offset,
273+
'error_code' => $err,
274+
],
275+
);
276+
277+
$topicPartition = new TopicPartition($topic, $partition, $offset);
278+
279+
$this->processor->offsetCommit($kafka, $err, [$topicPartition]);
280+
}
163281
}

0 commit comments

Comments
 (0)