-
-
Notifications
You must be signed in to change notification settings - Fork 9.6k
[Messenger] Kafka Transport Bridge #39712
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Messenger] Kafka Transport Bridge #39712
Conversation
c5d506c
to
38ae438
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your contribution.
I wonder how difficult it might be to test against an actual Kafka instance in our integration test suite. What do you think?
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportFactoryTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
@derrabus thank you for your review! I'll get started with the fixes tomorrow. Actual integration tests are possible. I have a test that writes and reads. I see there is a kafka instance running in travis, so that's cool! Do you think we could install the current rdkafka on travis? |
Our integration tests run on GitHub Actions. That extension should already be enabled there. |
Oh nice. What about travis though? This test run (Unit Tests) fails because of the missing rdkafka. |
There's a block |
846e72b
to
97ce904
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank for working on this.
Few comments from my side. I think you should remove some logs, and reduce the verbosity.
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Fixtures/TestMessage.php
Outdated
Show resolved
Hide resolved
public function testSupports() | ||
{ | ||
static::assertTrue($this->factory->supports('kafka://my-local-kafka:9092', [])); | ||
static::assertTrue($this->factory->supports('kafka+ssl://my-staging-kafka:9093', [])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use querystring parameter sslmode=disable
for consistency with sqs adapter and pgsql
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The TLS config is done through settings passed in the conf
array, and conforms to the underlying librdkafka format. The protocol in the DSN is only used to recognize this DSN as a Kafka-DSN.
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaMessageStamp.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransport.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransportFactory.php
Outdated
Show resolved
Hide resolved
$conf = new KafkaConf(); | ||
|
||
// Set a rebalance callback to log partition assignments (optional) | ||
$conf->setRebalanceCb($this->createRebalanceCb($this->logger)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it have to be a \Closure
couldn't it be a callable? $conf->setRebalanceCb([$this, 'rebalanceCb']);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afaik it can be a callable
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it :)
// Set a rebalance callback to log partition assignments (optional) | ||
$conf->setRebalanceCb($this->createRebalanceCb($this->logger)); | ||
|
||
$brokers = $this->stripProtocol($dsn); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dns is not only about hosts, other bdriges uses queryString to provide parameters.
ie: kafka://host:port?flushTimeout=5000
private function stripProtocol(string $dsn): array | ||
{ | ||
$brokers = []; | ||
foreach (explode(',', $dsn) as $currentBroker) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sounds fragile to me. What is somebody send kafka://host?param=option1,option2
To provide multiple hosts I would be consistent with redis DSN in cache component
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransportFactory.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiver.php
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiverProperties.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaReceiverProperties.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaSender.php
Outdated
Show resolved
Hide resolved
9aea4b6
to
f00af8a
Compare
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportIntegrationTest.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Tests/Transport/KafkaTransportTest.php
Outdated
Show resolved
Hide resolved
'key' => $message->key, | ||
'partition' => $message->partition, | ||
'offset' => $message->offset, | ||
'timestamp' => $message->timestamp, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
$this->logger->info('Kafka: Partition EOF reached. Waiting for next message ...'); | ||
break; | ||
case \RD_KAFKA_RESP_ERR__TIMED_OUT: | ||
$this->logger->debug('Kafka: Consumer timeout.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We had a lot of discussion in phprdkafka about this. It's an internal message coming from librdkafka which has a lot of different possible meanings - some are "normal", like described above (especially if partition EOF reporting is disabled via config, which it ideally shouldn't be - so it generates EOF, not this). Others might indicate a problem.
librdkafka.h described this error as:
RD_KAFKA_RESP_ERR__TIMED_OUT | Operation timed out |
---|
$this->logger->debug('Kafka: Consumer timeout.'); | ||
break; | ||
case \RD_KAFKA_RESP_ERR__TRANSPORT: | ||
$this->logger->debug('Kafka: Broker transport failure.'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above unfortunately - usually consumer can recover from this if connection can be reestablished. In this case I'd move this up to an error
level, because if this happens then something wrong is happening with the broker / communication, but as said - consumer can recover from this.
$message = $transportStamp->getMessage(); | ||
|
||
if ($this->properties->isCommitAsync()) { | ||
$consumer->commitAsync($message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using commit
(synchronous) usually drops the throughput of the implementation (afaik). From my experience people that use phprdkafka usually switch to async commits strictly because the difference in speed it has.
@nick-zh can you support / deny me on this?
$payload['body'], | ||
$payload['key'] ?? null, | ||
$payload['headers'] ?? null, | ||
$payload['timestamp_ms'] ?? null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's allowed by phprdkafka:
timestamp_ms
Timestamp that should be set for the message, if not set, the broker will set it.
In this case, it should be omitted or read from a stamp.
$conf = new KafkaConf(); | ||
|
||
// Set a rebalance callback to log partition assignments (optional) | ||
$conf->setRebalanceCb($this->createRebalanceCb($this->logger)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Afaik it can be a callable
.
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaSender.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransport.php
Outdated
Show resolved
Hide resolved
src/Symfony/Component/Messenger/Bridge/Kafka/Transport/KafkaTransportFactory.php
Outdated
Show resolved
Hide resolved
You are welcome 😃 |
9a372cc
to
0a89864
Compare
043b54f
to
b2f3785
Compare
b2f3785
to
3614ebd
Compare
FYI, a second PR has been opened on this topic: #51070. |
@derrabus Oh that's interesting! This PR is just waiting for a review at the moment. |
Although I think it'd have been better to contribute to this PR instead of opening a new (competing) one, the implementation in #51070 seems more inline with the main component design and what we have in other bridges. I'm also pretty sure some concerns addressed here helped making the other implementation better. All in all, I'm going to take the easy path and close this one in favor of #51070. Sorry about that, thanks a lot for your work on this. |
This PR integrates a simple Kafka Transport for Messenger.