Skip to content

[Messenger] Kafka Transport Bridge #51070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: 7.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[Messenger] Added Kafka Transport Bridge
  • Loading branch information
andythorne committed Aug 14, 2023
commit fd88c3ba39a22a83c3ee9500d6f2ab83f5921ac2
4 changes: 2 additions & 2 deletions .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jobs:
ports:
- 9092:9092
env:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_CREATE_TOPICS: 'test-topic:1:1:compact'
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
Expand Down Expand Up @@ -135,7 +135,7 @@ jobs:
uses: shivammathur/setup-php@v2
with:
coverage: "none"
extensions: "json,couchbase-3.2.2,memcached,mongodb-1.12.0,redis,rdkafka,xsl,ldap,relay"
extensions: "json,couchbase-3.2.2,memcached,mongodb-1.12.0,redis,rdkafka,xsl,ldap,relay,rdkafka"
ini-values: date.timezone=UTC,memory_limit=-1,default_socket_timeout=10,session.gc_probability=0,apc.enable_cli=1,zend.assertions=1
php-version: "${{ matrix.php }}"
tools: pecl
Expand Down
12 changes: 11 additions & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
name: Unit Tests

env:
extensions: amqp,apcu,igbinary,intl,mbstring,memcached,redis,relay
extensions: amqp,apcu,igbinary,intl,mbstring,memcached,redis,relay,rdkafka

strategy:
matrix:
Expand All @@ -43,6 +43,16 @@ jobs:
with:
fetch-depth: 2

- name: Install system dependencies
run: |
echo "::group::apt-get update"
sudo apt-get update
echo "::endgroup::"

echo "::group::install tools & libraries"
sudo apt-get install librdkafka-dev
echo "::endgroup::"

- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
Expand Down
4 changes: 4 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/Tests export-ignore
/phpunit.xml.dist export-ignore
/.gitattributes export-ignore
/.gitignore export-ignore
3 changes: 3 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
vendor/
composer.lock
phpunit.xml
7 changes: 7 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CHANGELOG
=========

6.4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated

Suggested change
6.4
7.4

---

* Introduced the Kafka bridge.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please follow the contributing rules for changelog entries: https://symfony.com/doc/current/contributing/code/conventions.html#writing-a-changelog-entry

Suggested change
* Introduced the Kafka bridge.
* Introduce the Kafka bridge.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Callback;

use Psr\Log\LoggerInterface;
use RdKafka\KafkaConsumer;

final class LoggingErrorCallback
{
public function __construct(
private readonly LoggerInterface $logger,
) {
}

public function __invoke(KafkaConsumer $kafka, int $err, string $reason): void
{
$this->logger->error($reason, [
'error_code' => $err,
]);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Callback;

use Psr\Log\LoggerInterface;

final class LoggingLogCallback
{
public function __construct(
private readonly LoggerInterface $logger,
) {
}

public function __invoke(object $kafka, int $level, string $facility, string $message): void
{
$this->logger->log(
$level,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is broken. you need to convert the kafka level into a \Psr\Log\LogLevel value. Otherwise, you have no guarantee that things will work fine as the only mandatory supported values for the log method in PSR-3 are those string constants. Any other supported value for the level is implementation-defined and so you cannot rely on it when targeting only the PSR-3 interface without knowing the implementation.

$message,
[
'facility' => $facility,
],
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Callback;

use Psr\Log\LoggerInterface;
use RdKafka\KafkaConsumer;
use RdKafka\TopicPartition;

final class LoggingRebalanceCallback
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should those be @internal or no ?

{
public function __construct(
private readonly LoggerInterface $logger,
) {
}

/**
* @param list<TopicPartition>|null $topicPartitions
*/
public function __invoke(KafkaConsumer $kafka, ?int $err, array $topicPartitions = null): void
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why making the second argument nullable and the third argument nullable and optional ? this does not match the documented signature of the rebalance callback where those are always passed.

Making them nullable means that the code inside them has to deal with null values which this may not be necessary.

{
$topicPartitions = $topicPartitions ?? [];

switch ($err) {
case \RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
foreach ($topicPartitions as $topicPartition) {
$this->logger->info(
sprintf(
'Rebalancing %s %s %s as the assignment changed',
$topicPartition->getTopic(),
$topicPartition->getPartition(),
$topicPartition->getOffset(),
),
[
'topic' => $topicPartition->getTopic(),
'partition' => $topicPartition->getPartition(),
'offset' => $topicPartition->getOffset(),
'error_code' => $err,
],
);
}
$kafka->assign($topicPartitions);
break;

case \RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
foreach ($topicPartitions as $topicPartition) {
$this->logger->info(
sprintf(
'Rebalancing %s %s %s as the assignment was revoked',
$topicPartition->getTopic(),
$topicPartition->getPartition(),
$topicPartition->getOffset(),
),
[
'topic' => $topicPartition->getTopic(),
'partition' => $topicPartition->getPartition(),
'offset' => $topicPartition->getOffset(),
'error_code' => $err,
],
);
}
$kafka->assign(null);
break;

default:
if (\count($topicPartitions)) {
foreach ($topicPartitions as $topicPartition) {
$this->logger->error(
sprintf(
'Rebalancing %s %s %s due to error code %d',
$topicPartition->getTopic(),
$topicPartition->getPartition(),
$topicPartition->getOffset(),
$err,
),
[
'topic' => $topicPartition->getTopic(),
'partition' => $topicPartition->getPartition(),
'offset' => $topicPartition->getOffset(),
'error_code' => $err,
],
);
}
} else {
$this->logger->error(
sprintf(
'Rebalancing error code %d',
$err,
),
[
'error_code' => $err,
]
);
}
$kafka->assign(null);
break;
}
}
}
19 changes: 19 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright (c) 2018-present Fabien Potencier

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
12 changes: 12 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Kafka Messenger
===============

Provides Kafka integration for Symfony Messenger.

Resources
---------

* [Contributing](https://symfony.com/doc/current/contributing/index.html)
* [Report issues](https://github.com/symfony/symfony/issues) and
[send Pull Requests](https://github.com/symfony/symfony/pulls)
in the [main Symfony repository](https://github.com/symfony/symfony)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Stamp;

use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

final class KafkaMessageStamp implements NonSendableStampInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should those stamps be @internal or no ?

{
public function __construct(
public int $partition,
public int $messageFlags,
public ?string $key,
) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Stamp;

use RdKafka\Message;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

final class KafkaReceivedMessageStamp implements NonSendableStampInterface
{
public function __construct(
public Message $message,
) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?php

declare(strict_types=1);

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Callback;

use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use RdKafka\KafkaConsumer;
use Symfony\Component\Messenger\Bridge\Kafka\Callback\LoggingErrorCallback;

/**
* @requires extension rdkafka
*/
final class LoggingErrorCallbackTest extends TestCase
{
public function testInvoke(): void
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects(self::once())
->method('error')
->with('test error message', ['error_code' => 1]);

$consumer = $this->createMock(KafkaConsumer::class);

$callback = new LoggingErrorCallback($logger);
$callback($consumer, 1, 'test error message');
}
}
Loading