Skip to content

Commit f00af8a

Browse files
Cleanup. Thank You @jderusse @OskarStark
1 parent 08fe7e7 commit f00af8a

14 files changed

+19
-71
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ jobs:
162162
echo "::endgroup::"
163163
164164
- name: Run tests
165-
run: ./phpunit --group integration -vvv src/Symfony/Component/Messenger/Bridge/Kafka/Tests
165+
run: ./phpunit --group integration -v
166166
env:
167167
REDIS_HOST: localhost
168168
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'

src/Symfony/Component/Messenger/Bridge/Kafka/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Kafka Messenger
2-
==============
2+
===============
33

44
Provides Kafka integration for Symfony Messenger.
55

src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
<?php
22

3-
declare(strict_types=1);
4-
53
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
64

75
use PHPUnit\Framework\TestCase;

src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
<?php
22

3-
declare(strict_types=1);
4-
53
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
64

75
use Closure;
@@ -54,7 +52,7 @@ protected function setUp(): void
5452
$this->testStartTime = $this->testStartTime ?? new \DateTimeImmutable();
5553
}
5654

57-
public function serializerProvider()
55+
public function serializerProvider(): array
5856
{
5957
$serializer = new Serializer();
6058
$phpSerializer = new PhpSerializer();
@@ -159,7 +157,7 @@ public function createPHPSerializerDecodeClosure(SerializerInterface $serializer
159157
};
160158
}
161159

162-
private function getTopicName()
160+
private function getTopicName(): string
163161
{
164162
return self::TOPIC_NAME.'_'.$this->testStartTime->getTimestamp().'_'.$this->testIteration;
165163
}

src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportTest.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
<?php
22

3-
declare(strict_types=1);
4-
53
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
64

75
use PHPUnit\Framework\MockObject\MockObject;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaMessageStamp.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Message;
@@ -21,7 +19,6 @@
2119
*/
2220
final class KafkaMessageStamp implements NonSendableStampInterface
2321
{
24-
/** @var Message */
2522
private $message;
2623

2724
public function __construct(Message $message)

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use Psr\Log\LoggerInterface;
@@ -26,11 +24,8 @@
2624
class KafkaReceiver implements ReceiverInterface
2725
{
2826
private $logger;
29-
3027
private $serializer;
31-
3228
private $rdKafkaFactory;
33-
3429
private $properties;
3530

3631
/** @var KafkaConsumer */
@@ -39,12 +34,8 @@ class KafkaReceiver implements ReceiverInterface
3934
/** @var bool */
4035
private $subscribed = false;
4136

42-
public function __construct(
43-
LoggerInterface $logger,
44-
SerializerInterface $serializer,
45-
RdKafkaFactory $rdKafkaFactory,
46-
KafkaReceiverProperties $properties
47-
) {
37+
public function __construct(LoggerInterface $logger, SerializerInterface $serializer, RdKafkaFactory $rdKafkaFactory, KafkaReceiverProperties $properties)
38+
{
4839
$this->logger = $logger;
4940
$this->serializer = $serializer;
5041
$this->rdKafkaFactory = $rdKafkaFactory;
@@ -57,7 +48,7 @@ public function get(): iterable
5748

5849
switch ($message->err) {
5950
case \RD_KAFKA_RESP_ERR_NO_ERROR:
60-
$this->logger->info(sprintf(
51+
$this->logger->debug(sprintf(
6152
'Kafka: Message %s %s %s received ',
6253
$message->topic_name,
6354
$message->partition,
@@ -101,7 +92,7 @@ public function ack(Envelope $envelope): void
10192
if ($this->properties->isCommitAsync()) {
10293
$consumer->commitAsync($message);
10394

104-
$this->logger->info(sprintf(
95+
$this->logger->debug(sprintf(
10596
'Offset topic=%s partition=%s offset=%s to be committed asynchronously.',
10697
$message->topic_name,
10798
$message->partition,
@@ -110,7 +101,7 @@ public function ack(Envelope $envelope): void
110101
} else {
111102
$consumer->commit($message);
112103

113-
$this->logger->info(sprintf(
104+
$this->logger->debug(sprintf(
114105
'Offset topic=%s partition=%s offset=%s successfully committed.',
115106
$message->topic_name,
116107
$message->partition,
@@ -121,15 +112,15 @@ public function ack(Envelope $envelope): void
121112

122113
public function reject(Envelope $envelope): void
123114
{
124-
// Do nothing. auto commit should be set to false!
115+
// Do nothing.
125116
}
126117

127118
private function getSubscribedConsumer(): KafkaConsumer
128119
{
129120
$consumer = $this->getConsumer();
130121

131122
if (false === $this->subscribed) {
132-
$this->logger->info('Partition assignment...');
123+
$this->logger->debug(sprintf('Partition assignment for topic "%s" ...', $this->properties->getTopicName()));
133124
$consumer->subscribe([$this->properties->getTopicName()]);
134125

135126
$this->subscribed = true;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiverProperties.php

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Conf as KafkaConf;
@@ -20,24 +18,13 @@
2018
*/
2119
final class KafkaReceiverProperties
2220
{
23-
/** @var KafkaConf */
2421
private $kafkaConf;
25-
26-
/** @var string */
2722
private $topicName;
28-
29-
/** @var int */
3023
private $receiveTimeoutMs;
31-
32-
/** @var bool */
3324
private $commitAsync;
3425

35-
public function __construct(
36-
KafkaConf $kafkaConf,
37-
string $topicName,
38-
int $receiveTimeoutMs,
39-
bool $commitAsync
40-
) {
26+
public function __construct(KafkaConf $kafkaConf, string $topicName, int $receiveTimeoutMs, bool $commitAsync)
27+
{
4128
$this->kafkaConf = $kafkaConf;
4229
$this->topicName = $topicName;
4330
$this->receiveTimeoutMs = $receiveTimeoutMs;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaSender.php

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use Psr\Log\LoggerInterface;
@@ -26,11 +24,8 @@
2624
class KafkaSender implements SenderInterface
2725
{
2826
private $logger;
29-
3027
private $serializer;
31-
3228
private $rdKafkaFactory;
33-
3429
private $properties;
3530

3631
/** @var KafkaProducer */
@@ -65,7 +60,7 @@ public function send(Envelope $envelope): Envelope
6560
);
6661

6762
$code = \RD_KAFKA_RESP_ERR_NO_ERROR;
68-
for ($flushRetries = 0; $flushRetries < $this->properties->getFlushRetries() + 1; ++$flushRetries) {
63+
for ($flushRetries = 0; $flushRetries <= $this->properties->getFlushRetries(); ++$flushRetries) {
6964
$code = $producer->flush($this->properties->getFlushTimeoutMs());
7065
if (\RD_KAFKA_RESP_ERR_NO_ERROR === $code) {
7166
$this->logger->info(sprintf('Kafka message sent%s', \array_key_exists('key', $payload) ? ' with key '.$payload['key'] : ''));

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaSenderProperties.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Conf as KafkaConf;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransport.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use Psr\Log\LoggerInterface;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransportFactory.php

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,13 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

16-
use function explode;
1714
use Psr\Log\LoggerInterface;
1815
use Psr\Log\NullLogger;
19-
use const RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
20-
use const RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
2116
use RdKafka\Conf as KafkaConf;
2217
use RdKafka\KafkaConsumer;
2318
use RdKafka\TopicPartition;
24-
use function sprintf;
25-
use function str_replace;
26-
use function strpos;
2719
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2820
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
2921
use Symfony\Component\Messenger\Transport\TransportInterface;
@@ -42,9 +34,8 @@ class KafkaTransportFactory implements TransportFactoryInterface
4234

4335
private $logger;
4436

45-
public function __construct(
46-
?LoggerInterface $logger
47-
) {
37+
public function __construct(?LoggerInterface $logger)
38+
{
4839
$this->logger = $logger ?? new NullLogger();
4940
}
5041

@@ -69,7 +60,7 @@ public function createTransport(string $dsn, array $options, SerializerInterface
6960
$brokers = $this->stripProtocol($dsn);
7061
$conf->set('metadata.broker.list', implode(',', $brokers));
7162

72-
foreach (array_merge($options['topic_conf'] ?? [], $options['kafka_conf'] ?? []) as $option => $value) {
63+
foreach ($options['kafka_conf'] ?? [] as $option => $value) {
7364
$conf->set($option, $value);
7465
}
7566

@@ -112,14 +103,14 @@ private function createRebalanceCb(LoggerInterface $logger): \Closure
112103
$topicPartitions = $topicPartitions ?? [];
113104

114105
switch ($err) {
115-
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
106+
case \RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
116107
foreach ($topicPartitions as $topicPartition) {
117108
$logger->info(sprintf('Assign: %s %s %s', $topicPartition->getTopic(), $topicPartition->getPartition(), $topicPartition->getOffset()));
118109
}
119110
$kafka->assign($topicPartitions);
120111
break;
121112

122-
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
113+
case \RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
123114
foreach ($topicPartitions as $topicPartition) {
124115
$logger->info(sprintf('Assign: %s %s %s', $topicPartition->getTopic(), $topicPartition->getPartition(), $topicPartition->getOffset()));
125116
}

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/RdKafkaFactory.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Conf;

src/Symfony/Component/Messenger/Bridge/Kafka/composer.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
"license": "MIT",
77
"require": {
88
"php": ">=7.2.5",
9-
"ext-json": "*",
109
"ext-rdkafka": "^4.0",
1110
"symfony/messenger": "^5.0",
1211
"psr/log": "^1.1"

0 commit comments

Comments
 (0)