Skip to content

[Messenger] Kafka Transport Bridge #39712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions .github/workflows/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
version: "3.9"
services:
postgres:
image: postgres:9.6-alpine
ports:
- 5432:5432
environment:
POSTGRES_PASSWORD: 'password'
ldap:
image: bitnami/openldap
ports:
- 3389:3389
environment:
LDAP_ADMIN_USERNAME: admin
LDAP_ADMIN_PASSWORD: symfony
LDAP_ROOT: dc=symfony,dc=com
LDAP_PORT_NUMBER: 3389
LDAP_USERS: a
LDAP_PASSWORDS: a
redis:
image: redis:6.0.0
ports:
- 16379:6379
redis-cluster:
image: grokzen/redis-cluster:5.0.4
ports:
- 7000:7000
- 7001:7001
- 7002:7002
- 7003:7003
- 7004:7004
- 7005:7005
- 7006:7006
environment:
STANDALONE: 1
redis-sentinel:
image: bitnami/redis-sentinel:6.0
ports:
- 26379:26379
environment:
REDIS_MASTER_HOST: redis
REDIS_MASTER_SET: redis_sentinel
REDIS_SENTINEL_QUORUM: 1
memcached:
image: memcached:1.6.5
ports:
- 11211:11211
rabbitmq:
image: rabbitmq:3.8.3
ports:
- 5672:5672
mongodb:
image: mongo
ports:
- 27017:27017
couchbase:
image: couchbase:6.5.1
ports:
- 8091:8091
- 8092:8092
- 8093:8093
- 8094:8094
- 11210:11210
sqs:
image: asyncaws/testing-sqs
ports:
- 9494:9494
zookeeper:
image: wurstmeister/zookeeper:3.4.6
kafka:
image: wurstmeister/kafka:2.12-2.0.1
ports:
- 9092:9092
environment:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_CREATE_TOPICS: 'test-topic:1:1:compact'
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_ADVERTISED_PORT: 9092

3 changes: 2 additions & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ jobs:
ports:
- 9092:9092
env:
KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
KAFKA_CREATE_TOPICS: 'test-topic:1:1:compact'
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
Expand Down Expand Up @@ -177,6 +177,7 @@ jobs:
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages
MESSENGER_SQS_DSN: "sqs://localhost:9494/messages?sslmode=disable&poll_timeout=0.01"
MESSENGER_SQS_FIFO_QUEUE_DSN: "sqs://localhost:9494/messages.fifo?sslmode=disable&poll_timeout=0.01"
MESSENGER_KAFKA_DSN: kafka://localhost:9092
KAFKA_BROKER: 127.0.0.1:9092
POSTGRES_HOST: localhost

Expand Down
7 changes: 5 additions & 2 deletions .github/workflows/psalm.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@ jobs:
runs-on: Ubuntu-20.04

steps:
- name: Setup PHP
- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install librdkafka-dev

- name: Set up PHP
uses: shivammathur/setup-php@v2
with:
php-version: '8.1'
extensions: "json,couchbase,memcached,mongodb,redis,xsl,ldap,dom,relay"
extensions: "json,couchbase,memcached,mongodb,redis,xsl,ldap,dom,relay,rdkafka"
ini-values: "memory_limit=-1"
coverage: none

Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
name: Tests

env:
extensions: amqp,apcu,igbinary,intl,mbstring,memcached,redis
extensions: amqp,apcu,igbinary,intl,mbstring,memcached,redis,rdkafka

strategy:
matrix:
Expand All @@ -43,6 +43,9 @@ jobs:
with:
fetch-depth: 2

- name: Install system dependencies
run: sudo apt-get update && sudo apt-get install librdkafka-dev

- name: Setup PHP
uses: shivammathur/setup-php@v2
with:
Expand Down
4 changes: 4 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/Tests export-ignore
/phpunit.xml.dist export-ignore
/.gitattributes export-ignore
/.gitignore export-ignore
3 changes: 3 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
vendor/
composer.lock
phpunit.xml
7 changes: 7 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CHANGELOG
=========

6.3
---

* Add the bridge
19 changes: 19 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright (c) 2023-present Fabien Potencier

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
12 changes: 12 additions & 0 deletions src/Symfony/Component/Messenger/Bridge/Kafka/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
Kafka Messenger
===============

Provides Kafka integration for Symfony Messenger.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a DSN example (you can find some in the Notifier bridges). Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, definitely! I was actually not sure where to write the docs. There is the separate docs repo, should I create a page there as well? Thanks

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes please

Copy link
Contributor Author

@KonstantinCodes KonstantinCodes Oct 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added symfony/symfony-docs#15884 :)
Is this good enough? I saw other bridges also don't really use this README.md file

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are doing it in Notifier component for all bridges, I think its helpful to add it here too

Resources
---------

* [Contributing](https://symfony.com/doc/current/contributing/index.html)
* [Report issues](https://github.com/symfony/symfony/issues) and
[send Pull Requests](https://github.com/symfony/symfony/pulls)
in the [main Symfony repository](https://github.com/symfony/symfony)
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Fixtures;

class TestMessage
{
public function __construct(
public ?string $data = null
){
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;

use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Psr\Log\LoggerInterface;
use RdKafka\Conf as KafkaConf;
use RdKafka\Producer as KafkaProducer;
use RdKafka\ProducerTopic;
use Symfony\Component\Messenger\Bridge\Kafka\Tests\Fixtures\TestMessage;
use Symfony\Component\Messenger\Bridge\Kafka\Transport\KafkaMessageSendStamp;
use Symfony\Component\Messenger\Bridge\Kafka\Transport\KafkaSender;
use Symfony\Component\Messenger\Bridge\Kafka\Transport\RdKafkaFactory;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;

/**
* @author Konstantin Scheumann <konstantin@konstantin.codes>
*
* @requires extension rdkafka
*/
class KafkaSenderTest extends TestCase
{
/** @var MockObject|SerializerInterface */
private $serializer;

/** @var MockObject|KafkaProducer */
private $rdKafkaProducer;

/** @var MockObject|RdKafkaFactory */
private $rdKafkaFactory;

protected function setUp(): void
{
parent::setUp();

$this->serializer = $this->createMock(SerializerInterface::class);

$this->rdKafkaFactory = $this->createMock(RdKafkaFactory::class);

$this->rdKafkaProducer = $this->createMock(KafkaProducer::class);
$this->rdKafkaFactory
->method('createProducer')
->willReturn($this->rdKafkaProducer);
}

public function testConstruct()
{
$sender = new KafkaSender(
$this->createMock(LoggerInterface::class),
$this->serializer,
$this->rdKafkaFactory,
new KafkaConf(),
[]
);

static::assertInstanceOf(SenderInterface::class, $sender);
}

public function testSend()
{
$sender = new KafkaSender(
$this->createMock(LoggerInterface::class),
$this->serializer,
$this->rdKafkaFactory,
new KafkaConf(),
[
'topic_name' => 'test_topic_kafka_sender_test',
'flush_timeout' => 10000,
'flush_retries' => 10,
'conf' => [],
]
);

$this->serializer->expects(static::once())
->method('encode')
->willReturn([
'body' => '{"data":"my_test_data"}',
'headers' => [
'type' => TestMessage::class,
'Content-Type' => 'application/json',
],
]);

$mockProducerTopic = $this->createMock(ProducerTopic::class);
$this->rdKafkaProducer->expects(static::once())
->method('newTopic')
->with('test_topic_kafka_sender_test')
->willReturn($mockProducerTopic);

$mockProducerTopic->expects(static::once())
->method('producev')
->with(
5,
\RD_KAFKA_MSG_F_BLOCK,
'{"data":"my_test_data"}',
'test_key_123',
[
'type' => TestMessage::class,
'Content-Type' => 'application/json',
],
1681790400
);

$sender->send(new Envelope(
new TestMessage('my_test_data'),
[
new KafkaMessageSendStamp([
'partition' => 5,
'msgflags' => \RD_KAFKA_MSG_F_BLOCK,
'key' => 'test_key_123',
'timestamp_ms' => 1681790400,
]),
]
));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;

use PHPUnit\Framework\TestCase;
use Psr\Log\NullLogger;
use Symfony\Component\Messenger\Bridge\Kafka\Transport\KafkaTransportFactory;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;

/**
* @author Konstantin Scheumann <konstantin@konstantin.codes>
*
* @requires extension rdkafka
*/
class KafkaTransportFactoryTest extends TestCase
{
private KafkaTransportFactory $factory;
private SerializerInterface $serializer;

protected function setUp(): void
{
parent::setUp();

$this->factory = new KafkaTransportFactory(new NullLogger());
$this->serializer = $this->createMock(SerializerInterface::class);
}

public function testSupports()
{
static::assertTrue($this->factory->supports('kafka://my-local-kafka:9092', []));
static::assertTrue($this->factory->supports('kafka://prod-kafka-01:9093,prod-kafka-01:9093,prod-kafka-01:9093', []));
}

public function testCreateTransport()
{
$transport = $this->factory->createTransport(
'kafka://my-local-kafka:9092',
[
'conf' => [],
'consumer' => [
'topics' => [
'test',
],
'receive_timeout' => 10000,
'conf' => [],
],
],
$this->serializer
);

static::assertInstanceOf(TransportInterface::class, $transport);
}

public function testCreateTransportFromDsn()
{
$transport = $this->factory->createTransport(
'kafka://kafka1,kafka2:9092?consumer[topics][0]=test&consumer[receive_timeout]=10000',
[],
$this->serializer
);

static::assertInstanceOf(TransportInterface::class, $transport);
}
}
Loading