Skip to content

Commit 69e7755

Browse files
committed
add exit status flow
1 parent 565f0f7 commit 69e7755

File tree

6 files changed

+72
-11
lines changed

6 files changed

+72
-11
lines changed

pkg/enqueue/Consumption/Context/End.php

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,23 @@ final class End
2727
*/
2828
private $logger;
2929

30-
public function __construct(Context $context, int $startTime, int $endTime, LoggerInterface $logger)
31-
{
30+
/**
31+
* @var int
32+
*/
33+
private $exitStatus;
34+
35+
public function __construct(
36+
Context $context,
37+
int $startTime,
38+
int $endTime,
39+
LoggerInterface $logger,
40+
?int $exitStatus = null
41+
) {
3242
$this->context = $context;
3343
$this->logger = $logger;
3444
$this->startTime = $startTime;
3545
$this->endTime = $endTime;
46+
$this->exitStatus = $exitStatus;
3647
}
3748

3849
public function getContext(): Context
@@ -60,4 +71,9 @@ public function getEndTime(): int
6071
{
6172
return $this->startTime;
6273
}
74+
75+
public function getExitStatus(): ?int
76+
{
77+
return $this->exitStatus;
78+
}
6379
}

pkg/enqueue/Consumption/Context/PostConsume.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ final class PostConsume
4343
*/
4444
private $executionInterrupted;
4545

46+
/**
47+
* @var int
48+
*/
49+
private $exitStatus;
50+
4651
public function __construct(Context $context, SubscriptionConsumer $subscriptionConsumer, int $receivedMessagesCount, int $cycle, int $startTime, LoggerInterface $logger)
4752
{
4853
$this->context = $context;
@@ -85,13 +90,19 @@ public function getLogger(): LoggerInterface
8590
return $this->logger;
8691
}
8792

93+
public function getExitStatus(): ?int
94+
{
95+
return $this->exitStatus;
96+
}
97+
8898
public function isExecutionInterrupted(): bool
8999
{
90100
return $this->executionInterrupted;
91101
}
92102

93-
public function interruptExecution(): void
103+
public function interruptExecution(?int $exitStatus = null): void
94104
{
105+
$this->exitStatus = $exitStatus;
95106
$this->executionInterrupted = true;
96107
}
97108
}

pkg/enqueue/Consumption/Context/PostMessageReceived.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ final class PostMessageReceived
4545
*/
4646
private $executionInterrupted;
4747

48+
/**
49+
* @var int
50+
*/
51+
private $exitStatus;
52+
4853
public function __construct(
4954
Context $context,
5055
Consumer $consumer,
@@ -96,13 +101,19 @@ public function getResult()
96101
return $this->result;
97102
}
98103

104+
public function getExitStatus(): ?int
105+
{
106+
return $this->exitStatus;
107+
}
108+
99109
public function isExecutionInterrupted(): bool
100110
{
101111
return $this->executionInterrupted;
102112
}
103113

104-
public function interruptExecution(): void
114+
public function interruptExecution(?int $exitStatus = null): void
105115
{
116+
$this->exitStatus = $exitStatus;
106117
$this->executionInterrupted = true;
107118
}
108119
}

pkg/enqueue/Consumption/Context/PreConsume.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,11 @@ final class PreConsume
4343
*/
4444
private $executionInterrupted;
4545

46+
/**
47+
* @var int
48+
*/
49+
private $exitStatus;
50+
4651
public function __construct(Context $context, SubscriptionConsumer $subscriptionConsumer, LoggerInterface $logger, int $cycle, int $receiveTimeout, int $startTime)
4752
{
4853
$this->context = $context;
@@ -85,13 +90,19 @@ public function getStartTime(): int
8590
return $this->startTime;
8691
}
8792

93+
public function getExitStatus(): ?int
94+
{
95+
return $this->exitStatus;
96+
}
97+
8898
public function isExecutionInterrupted(): bool
8999
{
90100
return $this->executionInterrupted;
91101
}
92102

93-
public function interruptExecution(): void
103+
public function interruptExecution(?int $exitStatus = null): void
94104
{
105+
$this->exitStatus = $exitStatus;
95106
$this->executionInterrupted = true;
96107
}
97108
}

pkg/enqueue/Consumption/Context/Start.php

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@ final class Start
3838
*/
3939
private $executionInterrupted;
4040

41+
/**
42+
* @var int
43+
*/
44+
private $exitStatus;
45+
4146
/**
4247
* @param BoundProcessor[] $processors
4348
*/
@@ -105,13 +110,19 @@ public function changeBoundProcessors(array $processors): void
105110
});
106111
}
107112

113+
public function getExitStatus(): ?int
114+
{
115+
return $this->exitStatus;
116+
}
117+
108118
public function isExecutionInterrupted(): bool
109119
{
110120
return $this->executionInterrupted;
111121
}
112122

113-
public function interruptExecution(): void
123+
public function interruptExecution(?int $exitStatus = null): void
114124
{
125+
$this->exitStatus = $exitStatus;
115126
$this->executionInterrupted = true;
116127
}
117128
}

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
147147
$extension->onStart($start);
148148

149149
if ($start->isExecutionInterrupted()) {
150-
$this->onEnd($extension, $startTime);
150+
$this->onEnd($extension, $startTime, $start->getExitStatus());
151151

152152
return;
153153
}
@@ -256,7 +256,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
256256
$extension->onPreConsume($preConsume);
257257

258258
if ($preConsume->isExecutionInterrupted()) {
259-
$this->onEnd($extension, $startTime, $subscriptionConsumer);
259+
$this->onEnd($extension, $startTime, $preConsume->getExitStatus(), $subscriptionConsumer);
260260

261261
return;
262262
}
@@ -267,7 +267,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
267267
$extension->onPostConsume($postConsume);
268268

269269
if ($interruptExecution || $postConsume->isExecutionInterrupted()) {
270-
$this->onEnd($extension, $startTime, $subscriptionConsumer);
270+
$this->onEnd($extension, $startTime, $postConsume->getExitStatus(), $subscriptionConsumer);
271271

272272
return;
273273
}
@@ -286,11 +286,12 @@ public function setFallbackSubscriptionConsumer(SubscriptionConsumer $fallbackSu
286286
$this->fallbackSubscriptionConsumer = $fallbackSubscriptionConsumer;
287287
}
288288

289-
private function onEnd(ExtensionInterface $extension, int $startTime, SubscriptionConsumer $subscriptionConsumer = null): void
289+
private function onEnd(ExtensionInterface $extension, int $startTime, ?int $exitStatus = null, SubscriptionConsumer $subscriptionConsumer = null): void
290290
{
291291
$endTime = (int) (microtime(true) * 1000);
292292

293-
$extension->onEnd(new End($this->interopContext, $startTime, $endTime, $this->logger));
293+
$endContext = new End($this->interopContext, $startTime, $endTime, $this->logger, $exitStatus);
294+
$extension->onEnd($endContext);
294295

295296
if ($subscriptionConsumer) {
296297
$subscriptionConsumer->unsubscribeAll();

0 commit comments

Comments
 (0)