Skip to content

[Messenger] Kafka Transport Bridge #39712

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed

Conversation

KonstantinCodes
Copy link
Contributor

@KonstantinCodes KonstantinCodes commented Jan 4, 2021

Q A
Branch? 6.3
Bug fix? no
New feature? yes
Deprecations? no
Tickets Fix #35549
License MIT
Doc PR symfony/symfony-docs#15884

This PR integrates a simple Kafka Transport for Messenger.

@KonstantinCodes KonstantinCodes requested a review from sroze as a code owner January 4, 2021 16:02
@KonstantinCodes KonstantinCodes changed the title Kafka Transport Bridge [Messenger] Kafka Transport Bridge Jan 4, 2021
@KonstantinCodes KonstantinCodes force-pushed the feature/kafka-messenger branch from c5d506c to 38ae438 Compare January 4, 2021 16:10
@nicolas-grekas nicolas-grekas added this to the 5.x milestone Jan 4, 2021
Copy link
Member

@derrabus derrabus left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your contribution.

I wonder how difficult it might be to test against an actual Kafka instance in our integration test suite. What do you think?

@KonstantinCodes
Copy link
Contributor Author

@derrabus thank you for your review! I'll get started with the fixes tomorrow.

Actual integration tests are possible. I have a test that writes and reads. I see there is a kafka instance running in travis, so that's cool!

Do you think we could install the current rdkafka on travis?

@derrabus
Copy link
Member

derrabus commented Jan 4, 2021

Do you think we could install the current rdkafka on travis?

Our integration tests run on GitHub Actions. That extension should already be enabled there.

@KonstantinCodes
Copy link
Contributor Author

Oh nice. What about travis though? This test run (Unit Tests) fails because of the missing rdkafka.

@derrabus
Copy link
Member

derrabus commented Jan 4, 2021

There's a block Install extra PHP extensions in .travis.yaml. You can add the extension there.

@KonstantinCodes KonstantinCodes force-pushed the feature/kafka-messenger branch 3 times, most recently from 846e72b to 97ce904 Compare January 5, 2021 10:15
Copy link
Member

@jderusse jderusse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank for working on this.

Few comments from my side. I think you should remove some logs, and reduce the verbosity.

public function testSupports()
{
static::assertTrue($this->factory->supports('kafka://my-local-kafka:9092', []));
static::assertTrue($this->factory->supports('kafka+ssl://my-staging-kafka:9093', []));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use querystring parameter sslmode=disable for consistency with sqs adapter and pgsql

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TLS config is done through settings passed in the conf array, and conforms to the underlying librdkafka format. The protocol in the DSN is only used to recognize this DSN as a Kafka-DSN.

$conf = new KafkaConf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb($this->createRebalanceCb($this->logger));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it have to be a \Closure couldn't it be a callable? $conf->setRebalanceCb([$this, 'rebalanceCb']);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik it can be a callable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it :)

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb($this->createRebalanceCb($this->logger));

$brokers = $this->stripProtocol($dsn);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dns is not only about hosts, other bdriges uses queryString to provide parameters.
ie: kafka://host:port?flushTimeout=5000

private function stripProtocol(string $dsn): array
{
$brokers = [];
foreach (explode(',', $dsn) as $currentBroker) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sounds fragile to me. What is somebody send kafka://host?param=option1,option2

To provide multiple hosts I would be consistent with redis DSN in cache component

@KonstantinCodes KonstantinCodes force-pushed the feature/kafka-messenger branch 4 times, most recently from 9aea4b6 to f00af8a Compare January 6, 2021 11:14
Comment on lines 76 to 61
'key' => $message->key,
'partition' => $message->partition,
'offset' => $message->offset,
'timestamp' => $message->timestamp,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jderusse I second the opinion that those keys are valuable and often used, especially when cross-language communication between applications is needed. As you can see values passed to serializer come from Kafka Message object, which would be otherwise unavailable to end user.

Ping @nick-zh.

$this->logger->info('Kafka: Partition EOF reached. Waiting for next message ...');
break;
case \RD_KAFKA_RESP_ERR__TIMED_OUT:
$this->logger->debug('Kafka: Consumer timeout.');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a lot of discussion in phprdkafka about this. It's an internal message coming from librdkafka which has a lot of different possible meanings - some are "normal", like described above (especially if partition EOF reporting is disabled via config, which it ideally shouldn't be - so it generates EOF, not this). Others might indicate a problem.

librdkafka.h described this error as:

https://docs.confluent.io/2.0.0/clients/librdkafka/rdkafka_8h.html#a03509bab51072c72a8dcf52337e6d5cba191a3d68aab046a25af5e861a5ce394e

RD_KAFKA_RESP_ERR__TIMED_OUT Operation timed out

$this->logger->debug('Kafka: Consumer timeout.');
break;
case \RD_KAFKA_RESP_ERR__TRANSPORT:
$this->logger->debug('Kafka: Broker transport failure.');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above unfortunately - usually consumer can recover from this if connection can be reestablished. In this case I'd move this up to an error level, because if this happens then something wrong is happening with the broker / communication, but as said - consumer can recover from this.

$message = $transportStamp->getMessage();

if ($this->properties->isCommitAsync()) {
$consumer->commitAsync($message);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using commit (synchronous) usually drops the throughput of the implementation (afaik). From my experience people that use phprdkafka usually switch to async commits strictly because the difference in speed it has.

@nick-zh can you support / deny me on this?

$payload['body'],
$payload['key'] ?? null,
$payload['headers'] ?? null,
$payload['timestamp_ms'] ?? null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's allowed by phprdkafka:

timestamp_ms
Timestamp that should be set for the message, if not set, the broker will set it.

In this case, it should be omitted or read from a stamp.

$conf = new KafkaConf();

// Set a rebalance callback to log partition assignments (optional)
$conf->setRebalanceCb($this->createRebalanceCb($this->logger));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Afaik it can be a callable.

@OskarStark
Copy link
Contributor

You are welcome 😃

@KonstantinCodes KonstantinCodes force-pushed the feature/kafka-messenger branch 8 times, most recently from 9a372cc to 0a89864 Compare December 17, 2022 08:52
@KonstantinCodes KonstantinCodes force-pushed the feature/kafka-messenger branch 6 times, most recently from 043b54f to b2f3785 Compare February 4, 2023 04:47
@derrabus
Copy link
Member

FYI, a second PR has been opened on this topic: #51070.

@KonstantinCodes
Copy link
Contributor Author

@derrabus Oh that's interesting! This PR is just waiting for a review at the moment.

@nicolas-grekas nicolas-grekas modified the milestones: 6.4, 7.1 Nov 15, 2023
@chalasr
Copy link
Member

chalasr commented Apr 6, 2024

Although I think it'd have been better to contribute to this PR instead of opening a new (competing) one, the implementation in #51070 seems more inline with the main component design and what we have in other bridges. I'm also pretty sure some concerns addressed here helped making the other implementation better. All in all, I'm going to take the easy path and close this one in favor of #51070. Sorry about that, thanks a lot for your work on this.

@chalasr chalasr closed this Apr 6, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Messenger][RFC] Kafka transport