Skip to content

Commit 9aea4b6

Browse files
Cleanup. Thank You @jderusse @OskarStark
1 parent 08fe7e7 commit 9aea4b6

14 files changed

+15
-68
lines changed

.github/workflows/tests.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ jobs:
162162
echo "::endgroup::"
163163
164164
- name: Run tests
165-
run: ./phpunit --group integration -vvv src/Symfony/Component/Messenger/Bridge/Kafka/Tests
165+
run: ./phpunit --group integration -v
166166
env:
167167
REDIS_HOST: localhost
168168
REDIS_CLUSTER_HOSTS: 'localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'

src/Symfony/Component/Messenger/Bridge/Kafka/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Kafka Messenger
2-
==============
2+
===============
33

44
Provides Kafka integration for Symfony Messenger.
55

src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
<?php
22

3-
declare(strict_types=1);
4-
53
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
64

75
use PHPUnit\Framework\TestCase;

src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
<?php
22

3-
declare(strict_types=1);
4-
53
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
64

75
use Closure;
@@ -54,7 +52,7 @@ protected function setUp(): void
5452
$this->testStartTime = $this->testStartTime ?? new \DateTimeImmutable();
5553
}
5654

57-
public function serializerProvider()
55+
public function serializerProvider(): array
5856
{
5957
$serializer = new Serializer();
6058
$phpSerializer = new PhpSerializer();
@@ -159,7 +157,7 @@ public function createPHPSerializerDecodeClosure(SerializerInterface $serializer
159157
};
160158
}
161159

162-
private function getTopicName()
160+
private function getTopicName(): string
163161
{
164162
return self::TOPIC_NAME.'_'.$this->testStartTime->getTimestamp().'_'.$this->testIteration;
165163
}

src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportTest.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
<?php
22

3-
declare(strict_types=1);
4-
53
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
64

75
use PHPUnit\Framework\MockObject\MockObject;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaMessageStamp.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Message;
@@ -21,7 +19,6 @@
2119
*/
2220
final class KafkaMessageStamp implements NonSendableStampInterface
2321
{
24-
/** @var Message */
2522
private $message;
2623

2724
public function __construct(Message $message)

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use Psr\Log\LoggerInterface;
@@ -26,11 +24,8 @@
2624
class KafkaReceiver implements ReceiverInterface
2725
{
2826
private $logger;
29-
3027
private $serializer;
31-
3228
private $rdKafkaFactory;
33-
3429
private $properties;
3530

3631
/** @var KafkaConsumer */
@@ -39,12 +34,7 @@ class KafkaReceiver implements ReceiverInterface
3934
/** @var bool */
4035
private $subscribed = false;
4136

42-
public function __construct(
43-
LoggerInterface $logger,
44-
SerializerInterface $serializer,
45-
RdKafkaFactory $rdKafkaFactory,
46-
KafkaReceiverProperties $properties
47-
) {
37+
public function __construct(LoggerInterface $logger, SerializerInterface $serializer, RdKafkaFactory $rdKafkaFactory, KafkaReceiverProperties $properties) {
4838
$this->logger = $logger;
4939
$this->serializer = $serializer;
5040
$this->rdKafkaFactory = $rdKafkaFactory;
@@ -57,7 +47,7 @@ public function get(): iterable
5747

5848
switch ($message->err) {
5949
case \RD_KAFKA_RESP_ERR_NO_ERROR:
60-
$this->logger->info(sprintf(
50+
$this->logger->debug(sprintf(
6151
'Kafka: Message %s %s %s received ',
6252
$message->topic_name,
6353
$message->partition,
@@ -101,7 +91,7 @@ public function ack(Envelope $envelope): void
10191
if ($this->properties->isCommitAsync()) {
10292
$consumer->commitAsync($message);
10393

104-
$this->logger->info(sprintf(
94+
$this->logger->debug(sprintf(
10595
'Offset topic=%s partition=%s offset=%s to be committed asynchronously.',
10696
$message->topic_name,
10797
$message->partition,
@@ -110,7 +100,7 @@ public function ack(Envelope $envelope): void
110100
} else {
111101
$consumer->commit($message);
112102

113-
$this->logger->info(sprintf(
103+
$this->logger->debug(sprintf(
114104
'Offset topic=%s partition=%s offset=%s successfully committed.',
115105
$message->topic_name,
116106
$message->partition,
@@ -121,15 +111,15 @@ public function ack(Envelope $envelope): void
121111

122112
public function reject(Envelope $envelope): void
123113
{
124-
// Do nothing. auto commit should be set to false!
114+
// Do nothing.
125115
}
126116

127117
private function getSubscribedConsumer(): KafkaConsumer
128118
{
129119
$consumer = $this->getConsumer();
130120

131121
if (false === $this->subscribed) {
132-
$this->logger->info('Partition assignment...');
122+
$this->logger->debug(sprintf('Partition assignment for topic "%s" ...', $this->properties->getTopicName()));
133123
$consumer->subscribe([$this->properties->getTopicName()]);
134124

135125
$this->subscribed = true;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiverProperties.php

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Conf as KafkaConf;
@@ -20,24 +18,12 @@
2018
*/
2119
final class KafkaReceiverProperties
2220
{
23-
/** @var KafkaConf */
2421
private $kafkaConf;
25-
26-
/** @var string */
2722
private $topicName;
28-
29-
/** @var int */
3023
private $receiveTimeoutMs;
31-
32-
/** @var bool */
3324
private $commitAsync;
3425

35-
public function __construct(
36-
KafkaConf $kafkaConf,
37-
string $topicName,
38-
int $receiveTimeoutMs,
39-
bool $commitAsync
40-
) {
26+
public function __construct(KafkaConf $kafkaConf, string $topicName, int $receiveTimeoutMs, bool $commitAsync) {
4127
$this->kafkaConf = $kafkaConf;
4228
$this->topicName = $topicName;
4329
$this->receiveTimeoutMs = $receiveTimeoutMs;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaSender.php

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use Psr\Log\LoggerInterface;
@@ -26,11 +24,8 @@
2624
class KafkaSender implements SenderInterface
2725
{
2826
private $logger;
29-
3027
private $serializer;
31-
3228
private $rdKafkaFactory;
33-
3429
private $properties;
3530

3631
/** @var KafkaProducer */
@@ -65,7 +60,7 @@ public function send(Envelope $envelope): Envelope
6560
);
6661

6762
$code = \RD_KAFKA_RESP_ERR_NO_ERROR;
68-
for ($flushRetries = 0; $flushRetries < $this->properties->getFlushRetries() + 1; ++$flushRetries) {
63+
for ($flushRetries = 0; $flushRetries <= $this->properties->getFlushRetries(); ++$flushRetries) {
6964
$code = $producer->flush($this->properties->getFlushTimeoutMs());
7065
if (\RD_KAFKA_RESP_ERR_NO_ERROR === $code) {
7166
$this->logger->info(sprintf('Kafka message sent%s', \array_key_exists('key', $payload) ? ' with key '.$payload['key'] : ''));

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaSenderProperties.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Conf as KafkaConf;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransport.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use Psr\Log\LoggerInterface;

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransportFactory.php

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,21 +9,13 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

16-
use function explode;
1714
use Psr\Log\LoggerInterface;
1815
use Psr\Log\NullLogger;
19-
use const RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS;
20-
use const RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS;
2116
use RdKafka\Conf as KafkaConf;
2217
use RdKafka\KafkaConsumer;
2318
use RdKafka\TopicPartition;
24-
use function sprintf;
25-
use function str_replace;
26-
use function strpos;
2719
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
2820
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
2921
use Symfony\Component\Messenger\Transport\TransportInterface;
@@ -69,7 +61,7 @@ public function createTransport(string $dsn, array $options, SerializerInterface
6961
$brokers = $this->stripProtocol($dsn);
7062
$conf->set('metadata.broker.list', implode(',', $brokers));
7163

72-
foreach (array_merge($options['topic_conf'] ?? [], $options['kafka_conf'] ?? []) as $option => $value) {
64+
foreach ($options['kafka_conf'] ?? [] as $option => $value) {
7365
$conf->set($option, $value);
7466
}
7567

@@ -112,14 +104,14 @@ private function createRebalanceCb(LoggerInterface $logger): \Closure
112104
$topicPartitions = $topicPartitions ?? [];
113105

114106
switch ($err) {
115-
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
107+
case \RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
116108
foreach ($topicPartitions as $topicPartition) {
117109
$logger->info(sprintf('Assign: %s %s %s', $topicPartition->getTopic(), $topicPartition->getPartition(), $topicPartition->getOffset()));
118110
}
119111
$kafka->assign($topicPartitions);
120112
break;
121113

122-
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
114+
case \RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
123115
foreach ($topicPartitions as $topicPartition) {
124116
$logger->info(sprintf('Assign: %s %s %s', $topicPartition->getTopic(), $topicPartition->getPartition(), $topicPartition->getOffset()));
125117
}

src/Symfony/Component/Messenger/Bridge/Kafka/Transport/RdKafkaFactory.php

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
* file that was distributed with this source code.
1010
*/
1111

12-
declare(strict_types=1);
13-
1412
namespace Symfony\Component\Messenger\Bridge\Kafka\Transport;
1513

1614
use RdKafka\Conf;

src/Symfony/Component/Messenger/Bridge/Kafka/composer.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
"license": "MIT",
77
"require": {
88
"php": ">=7.2.5",
9-
"ext-json": "*",
109
"ext-rdkafka": "^4.0",
1110
"symfony/messenger": "^5.0",
1211
"psr/log": "^1.1"

0 commit comments

Comments
 (0)