Skip to content

Commit 733cadd

Browse files
committed
Maintain backward compatibility
1 parent 65134d7 commit 733cadd

File tree

2 files changed

+29
-1
lines changed

2 files changed

+29
-1
lines changed

pkg/rdkafka/RdKafkaContext.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,17 @@ public function purgeQueue(Queue $queue): void
138138
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
139139
}
140140

141+
public static function getLibrdKafkaVersion(): string
142+
{
143+
if (!defined('RD_KAFKA_VERSION')) {
144+
throw new \RuntimeException('RD_KAFKA_VERSION constant is not defined. Phprdkafka is probably not installed');
145+
}
146+
$major = (RD_KAFKA_VERSION & 0xFF000000) >> 24;
147+
$minor = (RD_KAFKA_VERSION & 0x00FF0000) >> 16;
148+
$patch = (RD_KAFKA_VERSION & 0x0000FF00) >> 8;
149+
return "$major.$minor.$patch";
150+
}
151+
141152
private function getProducer(): VendorProducer
142153
{
143154
if (null === $this->producer) {

pkg/rdkafka/RdKafkaProducer.php

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,24 @@ public function send(Destination $destination, Message $message): void
4242
$key = $message->getKey() ?: $destination->getKey() ?: null;
4343

4444
$topic = $this->producer->newTopic($destination->getTopicName(), $destination->getConf());
45-
$topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
45+
46+
// Note: Topic::producev method exists in phprdkafka > 3.1.0
47+
// Headers in payload are maintained for backwards compatibility with apps that might run on lower phprdkafka version
48+
if (method_exists($topic, 'producev')) {
49+
// Phprdkafka <= 3.1.0 will fail calling `producev` on librdkafka >= 1.0.0 causing segfault
50+
if (version_compare(RdKafkaContext::getLibrdKafkaVersion(), '1.0.0', '>=')
51+
&& version_compare(phpversion('rdkafka'), '3.1.0', '<=')) {
52+
trigger_error(
53+
'Phprdkafka <= 3.1.0 is incompatible with librdkafka 1.0.0 when calling `producev`',
54+
E_USER_WARNING
55+
);
56+
} else {
57+
$topic->producev($partition, 0 /* must be 0 */, $payload, $key, $message->getHeaders());
58+
return;
59+
}
60+
}
61+
62+
$topic->produce($partition, 0 /* must be 0 */, $payload, $key);
4663
}
4764

4865
/**

0 commit comments

Comments
 (0)