-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Kafka Transport Bridge #51070
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 7.4
Are you sure you want to change the base?
Changes from 1 commit
fd88c3b
a44b50f
f1999fe
da6acef
668f67c
db50233
f82f7b2
cf98322
98a53bc
9622b67
6566e72
0123d2f
452c93c
49a2783
8f2906d
3fc3693
d396315
ea73ebe
05cad42
0d0c290
ca7e261
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
/Tests export-ignore | ||
/phpunit.xml.dist export-ignore | ||
/.gitattributes export-ignore | ||
/.gitignore export-ignore |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
vendor/ | ||
composer.lock | ||
phpunit.xml |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,7 @@ | ||||||
CHANGELOG | ||||||
========= | ||||||
|
||||||
6.4 | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This needs to be updated
Suggested change
|
||||||
--- | ||||||
|
||||||
* Introduced the Kafka bridge. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please follow the contributing rules for changelog entries: https://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry
Suggested change
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Bridge\Kafka\Callback; | ||
|
||
use Psr\Log\LoggerInterface; | ||
use RdKafka\KafkaConsumer; | ||
|
||
final class LoggingErrorCallback | ||
{ | ||
public function __construct( | ||
private readonly LoggerInterface $logger, | ||
) { | ||
} | ||
|
||
public function __invoke(KafkaConsumer $kafka, int $err, string $reason): void | ||
{ | ||
$this->logger->error($reason, [ | ||
'error_code' => $err, | ||
]); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Bridge\Kafka\Callback; | ||
|
||
use Psr\Log\LoggerInterface; | ||
|
||
final class LoggingLogCallback | ||
{ | ||
public function __construct( | ||
private readonly LoggerInterface $logger, | ||
) { | ||
} | ||
|
||
public function __invoke(object $kafka, int $level, string $facility, string $message): void | ||
{ | ||
$this->logger->log( | ||
$level, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is broken. you need to convert the kafka level into a |
||
$message, | ||
[ | ||
'facility' => $facility, | ||
], | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Bridge\Kafka\Callback; | ||
|
||
use Psr\Log\LoggerInterface; | ||
use RdKafka\KafkaConsumer; | ||
use RdKafka\TopicPartition; | ||
|
||
final class LoggingRebalanceCallback | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should those be |
||
{ | ||
public function __construct( | ||
private readonly LoggerInterface $logger, | ||
) { | ||
} | ||
|
||
/** | ||
* @param list<TopicPartition>|null $topicPartitions | ||
*/ | ||
public function __invoke(KafkaConsumer $kafka, ?int $err, array $topicPartitions = null): void | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why making the second argument nullable and the third argument nullable and optional ? this does not match the documented signature of the rebalance callback where those are always passed. Making them nullable means that the code inside them has to deal with |
||
{ | ||
$topicPartitions = $topicPartitions ?? []; | ||
|
||
switch ($err) { | ||
case \RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS: | ||
foreach ($topicPartitions as $topicPartition) { | ||
$this->logger->info( | ||
sprintf( | ||
'Rebalancing %s %s %s as the assignment changed', | ||
$topicPartition->getTopic(), | ||
$topicPartition->getPartition(), | ||
$topicPartition->getOffset(), | ||
), | ||
[ | ||
'topic' => $topicPartition->getTopic(), | ||
'partition' => $topicPartition->getPartition(), | ||
'offset' => $topicPartition->getOffset(), | ||
'error_code' => $err, | ||
], | ||
); | ||
} | ||
$kafka->assign($topicPartitions); | ||
break; | ||
|
||
case \RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS: | ||
foreach ($topicPartitions as $topicPartition) { | ||
$this->logger->info( | ||
sprintf( | ||
'Rebalancing %s %s %s as the assignment was revoked', | ||
$topicPartition->getTopic(), | ||
$topicPartition->getPartition(), | ||
$topicPartition->getOffset(), | ||
), | ||
[ | ||
'topic' => $topicPartition->getTopic(), | ||
'partition' => $topicPartition->getPartition(), | ||
'offset' => $topicPartition->getOffset(), | ||
'error_code' => $err, | ||
], | ||
); | ||
} | ||
$kafka->assign(null); | ||
break; | ||
|
||
default: | ||
if (\count($topicPartitions)) { | ||
foreach ($topicPartitions as $topicPartition) { | ||
$this->logger->error( | ||
sprintf( | ||
'Rebalancing %s %s %s due to error code %d', | ||
$topicPartition->getTopic(), | ||
$topicPartition->getPartition(), | ||
$topicPartition->getOffset(), | ||
$err, | ||
), | ||
[ | ||
'topic' => $topicPartition->getTopic(), | ||
'partition' => $topicPartition->getPartition(), | ||
'offset' => $topicPartition->getOffset(), | ||
'error_code' => $err, | ||
], | ||
); | ||
} | ||
} else { | ||
$this->logger->error( | ||
sprintf( | ||
'Rebalancing error code %d', | ||
$err, | ||
), | ||
[ | ||
'error_code' => $err, | ||
] | ||
); | ||
} | ||
$kafka->assign(null); | ||
break; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
Copyright (c) 2018-present Fabien Potencier | ||
|
||
Permission is hereby granted, free of charge, to any person obtaining a copy | ||
of this software and associated documentation files (the "Software"), to deal | ||
in the Software without restriction, including without limitation the rights | ||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
copies of the Software, and to permit persons to whom the Software is furnished | ||
to do so, subject to the following conditions: | ||
|
||
The above copyright notice and this permission notice shall be included in all | ||
copies or substantial portions of the Software. | ||
|
||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | ||
THE SOFTWARE. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
Kafka Messenger | ||
=============== | ||
|
||
Provides Kafka integration for Symfony Messenger. | ||
|
||
Resources | ||
--------- | ||
|
||
* [Contributing](https://symfony.com/doc/current/contributing/index.html) | ||
* [Report issues](https://github.com/symfony/symfony/issues) and | ||
[send Pull Requests](https://github.com/symfony/symfony/pulls) | ||
in the [main Symfony repository](https://github.com/symfony/symfony) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Bridge\Kafka\Stamp; | ||
|
||
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; | ||
|
||
final class KafkaMessageStamp implements NonSendableStampInterface | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should those stamps be |
||
{ | ||
public function __construct( | ||
public int $partition, | ||
public int $messageFlags, | ||
public ?string $key, | ||
) { | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Bridge\Kafka\Stamp; | ||
|
||
use RdKafka\Message; | ||
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface; | ||
|
||
final class KafkaReceivedMessageStamp implements NonSendableStampInterface | ||
{ | ||
public function __construct( | ||
public Message $message, | ||
) { | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
<?php | ||
|
||
declare(strict_types=1); | ||
|
||
/* | ||
* This file is part of the Symfony package. | ||
* | ||
* (c) Fabien Potencier <fabien@symfony.com> | ||
* | ||
* For the full copyright and license information, please view the LICENSE | ||
* file that was distributed with this source code. | ||
*/ | ||
|
||
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Callback; | ||
|
||
use PHPUnit\Framework\TestCase; | ||
use Psr\Log\LoggerInterface; | ||
use RdKafka\KafkaConsumer; | ||
use Symfony\Component\Messenger\Bridge\Kafka\Callback\LoggingErrorCallback; | ||
|
||
/** | ||
* @requires extension rdkafka | ||
*/ | ||
final class LoggingErrorCallbackTest extends TestCase | ||
{ | ||
public function testInvoke(): void | ||
{ | ||
$logger = $this->createMock(LoggerInterface::class); | ||
$logger->expects(self::once()) | ||
->method('error') | ||
->with('test error message', ['error_code' => 1]); | ||
|
||
$consumer = $this->createMock(KafkaConsumer::class); | ||
|
||
$callback = new LoggingErrorCallback($logger); | ||
$callback($consumer, 1, 'test error message'); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.