Skip to content

Commit b2f3785

Browse files
Kafka Transport Bridge
1 parent 0383b06 commit b2f3785

23 files changed

+1222
-4
lines changed

.github/workflows/docker-compose.yml

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
version: "3.9"
2+
services:
3+
postgres:
4+
image: postgres:9.6-alpine
5+
ports:
6+
- 5432:5432
7+
environment:
8+
POSTGRES_PASSWORD: 'password'
9+
ldap:
10+
image: bitnami/openldap
11+
ports:
12+
- 3389:3389
13+
environment:
14+
LDAP_ADMIN_USERNAME: admin
15+
LDAP_ADMIN_PASSWORD: symfony
16+
LDAP_ROOT: dc=symfony,dc=com
17+
LDAP_PORT_NUMBER: 3389
18+
LDAP_USERS: a
19+
LDAP_PASSWORDS: a
20+
redis:
21+
image: redis:6.0.0
22+
ports:
23+
- 16379:6379
24+
redis-cluster:
25+
image: grokzen/redis-cluster:5.0.4
26+
ports:
27+
- 7000:7000
28+
- 7001:7001
29+
- 7002:7002
30+
- 7003:7003
31+
- 7004:7004
32+
- 7005:7005
33+
- 7006:7006
34+
environment:
35+
STANDALONE: 1
36+
redis-sentinel:
37+
image: bitnami/redis-sentinel:6.0
38+
ports:
39+
- 26379:26379
40+
environment:
41+
REDIS_MASTER_HOST: redis
42+
REDIS_MASTER_SET: redis_sentinel
43+
REDIS_SENTINEL_QUORUM: 1
44+
memcached:
45+
image: memcached:1.6.5
46+
ports:
47+
- 11211:11211
48+
rabbitmq:
49+
image: rabbitmq:3.8.3
50+
ports:
51+
- 5672:5672
52+
mongodb:
53+
image: mongo
54+
ports:
55+
- 27017:27017
56+
couchbase:
57+
image: couchbase:6.5.1
58+
ports:
59+
- 8091:8091
60+
- 8092:8092
61+
- 8093:8093
62+
- 8094:8094
63+
- 11210:11210
64+
sqs:
65+
image: asyncaws/testing-sqs
66+
ports:
67+
- 9494:9494
68+
zookeeper:
69+
image: wurstmeister/zookeeper:3.4.6
70+
kafka:
71+
image: wurstmeister/kafka:2.12-2.0.1
72+
ports:
73+
- 9092:9092
74+
environment:
75+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
76+
KAFKA_CREATE_TOPICS: 'test-topic:1:1:compact'
77+
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
78+
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
79+
KAFKA_ADVERTISED_PORT: 9092
80+

.github/workflows/integration-tests.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ jobs:
9898
ports:
9999
- 9092:9092
100100
env:
101-
KAFKA_AUTO_CREATE_TOPICS_ENABLE: false
101+
KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
102102
KAFKA_CREATE_TOPICS: 'test-topic:1:1:compact'
103103
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
104104
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
@@ -177,6 +177,7 @@ jobs:
177177
MESSENGER_AMQP_DSN: amqp://localhost/%2f/messages
178178
MESSENGER_SQS_DSN: "sqs://localhost:9494/messages?sslmode=disable&poll_timeout=0.01"
179179
MESSENGER_SQS_FIFO_QUEUE_DSN: "sqs://localhost:9494/messages.fifo?sslmode=disable&poll_timeout=0.01"
180+
MESSENGER_KAFKA_DSN: kafka://localhost:9092
180181
KAFKA_BROKER: 127.0.0.1:9092
181182
POSTGRES_HOST: localhost
182183

.github/workflows/psalm.yml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ jobs:
2020
runs-on: Ubuntu-20.04
2121

2222
steps:
23-
- name: Setup PHP
23+
- name: Install system dependencies
24+
run: sudo apt-get update && sudo apt-get install librdkafka-dev
25+
26+
- name: Set up PHP
2427
uses: shivammathur/setup-php@v2
2528
with:
2629
php-version: '8.1'
27-
extensions: "json,couchbase,memcached,mongodb,redis,xsl,ldap,dom,relay"
30+
extensions: "json,couchbase,memcached,mongodb,redis,xsl,ldap,dom,relay,rdkafka"
2831
ini-values: "memory_limit=-1"
2932
coverage: none
3033

.github/workflows/unit-tests.yml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ jobs:
2121
name: Tests
2222

2323
env:
24-
extensions: amqp,apcu,igbinary,intl,mbstring,memcached,redis
24+
extensions: amqp,apcu,igbinary,intl,mbstring,memcached,redis,rdkafka
2525

2626
strategy:
2727
matrix:
@@ -43,6 +43,9 @@ jobs:
4343
with:
4444
fetch-depth: 2
4545

46+
- name: Install system dependencies
47+
run: sudo apt-get update && sudo apt-get install librdkafka-dev
48+
4649
- name: Setup PHP
4750
uses: shivammathur/setup-php@v2
4851
with:
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
/Tests export-ignore
2+
/phpunit.xml.dist export-ignore
3+
/.gitattributes export-ignore
4+
/.gitignore export-ignore
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
vendor/
2+
composer.lock
3+
phpunit.xml
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
CHANGELOG
2+
=========
3+
4+
6.3
5+
---
6+
7+
* Add the bridge
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
Copyright (c) 2023-present Fabien Potencier
2+
3+
Permission is hereby granted, free of charge, to any person obtaining a copy
4+
of this software and associated documentation files (the "Software"), to deal
5+
in the Software without restriction, including without limitation the rights
6+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7+
copies of the Software, and to permit persons to whom the Software is furnished
8+
to do so, subject to the following conditions:
9+
10+
The above copyright notice and this permission notice shall be included in all
11+
copies or substantial portions of the Software.
12+
13+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19+
THE SOFTWARE.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
Kafka Messenger
2+
===============
3+
4+
Provides Kafka integration for Symfony Messenger.
5+
6+
Resources
7+
---------
8+
9+
* [Contributing](https://symfony.com/doc/current/contributing/index.html)
10+
* [Report issues](https://github.com/symfony/symfony/issues) and
11+
[send Pull Requests](https://github.com/symfony/symfony/pulls)
12+
in the [main Symfony repository](https://github.com/symfony/symfony)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<?php
2+
3+
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Fixtures;
4+
5+
class TestMessage
6+
{
7+
public function __construct(
8+
public ?string $data = null
9+
){
10+
}
11+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
13+
14+
use PHPUnit\Framework\MockObject\MockObject;
15+
use PHPUnit\Framework\TestCase;
16+
use Psr\Log\LoggerInterface;
17+
use RdKafka\Conf as KafkaConf;
18+
use RdKafka\Producer as KafkaProducer;
19+
use RdKafka\ProducerTopic;
20+
use Symfony\Component\Messenger\Bridge\Kafka\Tests\Fixtures\TestMessage;
21+
use Symfony\Component\Messenger\Bridge\Kafka\Transport\KafkaMessageSendStamp;
22+
use Symfony\Component\Messenger\Bridge\Kafka\Transport\KafkaSender;
23+
use Symfony\Component\Messenger\Bridge\Kafka\Transport\RdKafkaFactory;
24+
use Symfony\Component\Messenger\Envelope;
25+
use Symfony\Component\Messenger\Transport\Sender\SenderInterface;
26+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
27+
28+
/**
29+
* @author Konstantin Scheumann <konstantin@konstantin.codes>
30+
*
31+
* @requires extension rdkafka
32+
*/
33+
class KafkaSenderTest extends TestCase
34+
{
35+
/** @var MockObject|SerializerInterface */
36+
private $serializer;
37+
38+
/** @var MockObject|KafkaProducer */
39+
private $rdKafkaProducer;
40+
41+
/** @var MockObject|RdKafkaFactory */
42+
private $rdKafkaFactory;
43+
44+
protected function setUp(): void
45+
{
46+
parent::setUp();
47+
48+
$this->serializer = $this->createMock(SerializerInterface::class);
49+
50+
$this->rdKafkaFactory = $this->createMock(RdKafkaFactory::class);
51+
52+
$this->rdKafkaProducer = $this->createMock(KafkaProducer::class);
53+
$this->rdKafkaFactory
54+
->method('createProducer')
55+
->willReturn($this->rdKafkaProducer);
56+
}
57+
58+
public function testConstruct()
59+
{
60+
$sender = new KafkaSender(
61+
$this->createMock(LoggerInterface::class),
62+
$this->serializer,
63+
$this->rdKafkaFactory,
64+
new KafkaConf(),
65+
[]
66+
);
67+
68+
static::assertInstanceOf(SenderInterface::class, $sender);
69+
}
70+
71+
public function testSend()
72+
{
73+
$sender = new KafkaSender(
74+
$this->createMock(LoggerInterface::class),
75+
$this->serializer,
76+
$this->rdKafkaFactory,
77+
new KafkaConf(),
78+
[
79+
'topic_name' => 'test_topic_kafka_sender_test',
80+
'flush_timeout' => 10000,
81+
'flush_retries' => 10,
82+
'conf' => [],
83+
]
84+
);
85+
86+
$this->serializer->expects(static::once())
87+
->method('encode')
88+
->willReturn([
89+
'body' => '{"data":"my_test_data"}',
90+
'headers' => [
91+
'type' => TestMessage::class,
92+
'Content-Type' => 'application/json',
93+
]
94+
]);
95+
96+
$mockProducerTopic = $this->createMock(ProducerTopic::class);
97+
$this->rdKafkaProducer->expects(static::once())
98+
->method('newTopic')
99+
->with('test_topic_kafka_sender_test')
100+
->willReturn($mockProducerTopic);
101+
102+
$mockProducerTopic->expects(static::once())
103+
->method('producev')
104+
->with(
105+
5,
106+
\RD_KAFKA_MSG_F_BLOCK,
107+
'{"data":"my_test_data"}',
108+
'test_key_123',
109+
[
110+
'type' => TestMessage::class,
111+
'Content-Type' => 'application/json',
112+
],
113+
1681790400
114+
);
115+
116+
$sender->send(new Envelope(
117+
new TestMessage('my_test_data'),
118+
[
119+
new KafkaMessageSendStamp([
120+
'partition' => 5,
121+
'msgflags' => \RD_KAFKA_MSG_F_BLOCK,
122+
'key' => 'test_key_123',
123+
'timestamp_ms' => 1681790400,
124+
])
125+
]
126+
));
127+
}
128+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Messenger\Bridge\Kafka\Tests\Transport;
13+
14+
use PHPUnit\Framework\TestCase;
15+
use Psr\Log\NullLogger;
16+
use Symfony\Component\Messenger\Bridge\Kafka\Transport\KafkaTransportFactory;
17+
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
18+
use Symfony\Component\Messenger\Transport\TransportInterface;
19+
20+
/**
21+
* @author Konstantin Scheumann <konstantin@konstantin.codes>
22+
*
23+
* @requires extension rdkafka
24+
*/
25+
class KafkaTransportFactoryTest extends TestCase
26+
{
27+
private KafkaTransportFactory $factory;
28+
private SerializerInterface $serializer;
29+
30+
protected function setUp(): void
31+
{
32+
parent::setUp();
33+
34+
$this->factory = new KafkaTransportFactory(new NullLogger());
35+
$this->serializer = $this->createMock(SerializerInterface::class);
36+
}
37+
38+
public function testSupports()
39+
{
40+
static::assertTrue($this->factory->supports('kafka://my-local-kafka:9092', []));
41+
static::assertTrue($this->factory->supports('kafka://prod-kafka-01:9093,prod-kafka-01:9093,prod-kafka-01:9093', []));
42+
}
43+
44+
public function testCreateTransport()
45+
{
46+
$transport = $this->factory->createTransport(
47+
'kafka://my-local-kafka:9092',
48+
[
49+
'conf' => [],
50+
'consumer' => [
51+
'topics' => [
52+
'test',
53+
],
54+
'receive_timeout' => 10000,
55+
'conf' => [],
56+
],
57+
],
58+
$this->serializer
59+
);
60+
61+
static::assertInstanceOf(TransportInterface::class, $transport);
62+
}
63+
64+
public function testCreateTransportFromDsn()
65+
{
66+
$transport = $this->factory->createTransport(
67+
'kafka://kafka1,kafka2:9092?consumer[topics][0]=test&consumer[receive_timeout]=10000',
68+
[],
69+
$this->serializer
70+
);
71+
72+
static::assertInstanceOf(TransportInterface::class, $transport);
73+
}
74+
}

0 commit comments

Comments
 (0)