Skip to content

Commit b906e18

Browse files
committed
Call flush method on shutdown / context closing
This allows compatibility with phprdkafka 4.0
1 parent c20cc5a commit b906e18

File tree

8 files changed

+46
-26
lines changed

8 files changed

+46
-26
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ script:
6868
- if [ "$PHPSTAN" = true ] && [ ! -z "${PKG_PHP_CHANGED_FILES}" ]; then docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon -- ${PKG_PHP_CHANGED_FILES[@]} ; fi
6969
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
7070
- if [ "$FUNCTIONAL_TESTS" = true ]; then bin/test.sh --exclude-group=rdkafka; fi
71-
- if [ "RDKAFKA_TESTS" = true ]; then bin/test.sh --group=rdkafka; fi
71+
- if [ "$RDKAFKA_TESTS" = true ]; then bin/test.sh --group=rdkafka; fi
7272

7373
notifications:
7474
webhooks:

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
"doctrine/doctrine-bundle": "~1.2|^2",
6060
"doctrine/mongodb-odm-bundle": "^3.5|^4",
6161
"alcaeus/mongo-php-adapter": "^1.0",
62-
"kwn/php-rdkafka-stubs": "^1.0.2",
62+
"kwn/php-rdkafka-stubs": "^1.0.2 | ^2.0",
6363
"friendsofphp/php-cs-fixer": "^2"
6464
},
6565
"autoload": {

docker-compose.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ services:
9999
- '2181:2181'
100100

101101
kafka:
102-
image: 'wurstmeister/kafka:0.10.2.1'
102+
image: 'wurstmeister/kafka'
103103
ports:
104104
- '9092:9092'
105105
environment:
106106
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
107-
volumes:
108-
- '/var/run/docker.sock:/var/run/docker.sock'
107+
KAFKA_ADVERTISED_HOST_NAME: kafka
108+
KAFKA_ADVERTISED_PORT: 9092
109109

110110
google-pubsub:
111111
image: 'google/cloud-sdk:latest'

pkg/rdkafka/RdKafkaConnectionFactory.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class RdKafkaConnectionFactory implements ConnectionFactory
2828
* 'partitioner' => null, // https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-topicconf.setpartitioner.html
2929
* 'log_level' => null,
3030
* 'commit_async' => false,
31+
* 'shutdown_timeout' => -1, // https://github.com/arnaud-lb/php-rdkafka#proper-shutdown
3132
* ]
3233
*
3334
* or

pkg/rdkafka/RdKafkaContext.php

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class RdKafkaContext implements Context
3636
private $conf;
3737

3838
/**
39-
* @var Producer
39+
* @var RdKafkaProducer
4040
*/
4141
private $producer;
4242

@@ -96,7 +96,23 @@ public function createTemporaryQueue(): Queue
9696
*/
9797
public function createProducer(): Producer
9898
{
99-
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
99+
if (!isset($this->producer)) {
100+
$producer = new VendorProducer($this->getConf());
101+
102+
if (isset($this->config['log_level'])) {
103+
$producer->setLogLevel($this->config['log_level']);
104+
}
105+
106+
$this->producer = new RdKafkaProducer($producer, $this->getSerializer());
107+
108+
// Once created RdKafkaProducer can store messages internally that need to be delivered before PHP shuts
109+
// down. Otherwise, we are bound to lose messages in transit.
110+
// Note that it is generally preferable to call "close" method explicitly before shutdown starts, since
111+
// otherwise we might not have access to some objects, like database connections.
112+
register_shutdown_function([$this->producer, 'flush'], $this->config['shutdown_timeout'] ?? -1);
113+
}
114+
115+
return $this->producer;
100116
}
101117

102118
/**
@@ -139,6 +155,11 @@ public function close(): void
139155
foreach ($kafkaConsumers as $kafkaConsumer) {
140156
$kafkaConsumer->unsubscribe();
141157
}
158+
159+
// Compatibility with phprdkafka 4.0.
160+
if (isset($this->producer)) {
161+
$this->producer->flush($this->config['shutdown_timeout'] ?? -1);
162+
}
142163
}
143164

144165
public function createSubscriptionConsumer(): SubscriptionConsumer
@@ -163,19 +184,6 @@ public static function getLibrdKafkaVersion(): string
163184
return "$major.$minor.$patch";
164185
}
165186

166-
private function getProducer(): VendorProducer
167-
{
168-
if (null === $this->producer) {
169-
$this->producer = new VendorProducer($this->getConf());
170-
171-
if (isset($this->config['log_level'])) {
172-
$this->producer->setLogLevel($this->config['log_level']);
173-
}
174-
}
175-
176-
return $this->producer;
177-
}
178-
179187
private function getConf(): Conf
180188
{
181189
if (null === $this->conf) {

pkg/rdkafka/RdKafkaProducer.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,12 @@ public function getTimeToLive(): ?int
111111
{
112112
return null;
113113
}
114+
115+
public function flush(int $timeout): void
116+
{
117+
// Flush method is exposed in phprdkafka 4.0
118+
if (method_exists($this->producer, 'flush')) {
119+
$this->producer->flush($timeout);
120+
}
121+
}
114122
}

pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@ public function test()
1919

2020
$topic = $this->createTopic($context, uniqid('', true));
2121

22-
$consumer = $context->createConsumer($topic);
23-
2422
$expectedBody = __CLASS__.time();
23+
$producer = $context->createProducer();
24+
$producer->send($topic, $context->createMessage($expectedBody));
25+
26+
// Calling close causes Producer to flush (wait for messages to be delivered to Kafka)
27+
$context->close();
28+
29+
$consumer = $context->createConsumer($topic);
2530

2631
$context->createProducer()->send($topic, $context->createMessage($expectedBody));
2732

@@ -48,8 +53,6 @@ protected function createContext()
4853

4954
$context = (new RdKafkaConnectionFactory($config))->createContext();
5055

51-
sleep(3);
52-
5356
return $context;
5457
}
5558
}

pkg/rdkafka/composer.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
"license": "MIT",
88
"require": {
99
"php": "^7.1.3",
10-
"ext-rdkafka": "^3.0.3",
10+
"ext-rdkafka": "^3.0.3|^4.0",
1111
"queue-interop/queue-interop": "^0.8"
1212
},
1313
"require-dev": {
1414
"phpunit/phpunit": "~7.5",
1515
"enqueue/test": "0.10.x-dev",
1616
"enqueue/null": "0.10.x-dev",
1717
"queue-interop/queue-spec": "^0.6",
18-
"kwn/php-rdkafka-stubs": "^1.0.2"
18+
"kwn/php-rdkafka-stubs": "^1.0.2 | ^2.0"
1919
},
2020
"support": {
2121
"email": "opensource@forma-pro.com",

0 commit comments

Comments
 (0)