27
27
*/
28
28
class Connection
29
29
{
30
+ private const DEFAULT_OPTIONS = [
31
+ 'stream ' => 'messages ' ,
32
+ 'group ' => 'symfony ' ,
33
+ 'consumer ' => 'consumer ' ,
34
+ 'auto_setup ' => true ,
35
+ ];
36
+
30
37
private $ connection ;
31
38
private $ stream ;
32
39
private $ group ;
@@ -38,9 +45,10 @@ public function __construct(array $configuration, array $connectionCredentials =
38
45
$ this ->connection = $ redis ?: new \Redis ();
39
46
$ this ->connection ->connect ($ connectionCredentials ['host ' ] ?? '127.0.0.1 ' , $ connectionCredentials ['port ' ] ?? 6379 );
40
47
$ this ->connection ->setOption (\Redis::OPT_SERIALIZER , $ redisOptions ['serializer ' ] ?? \Redis::SERIALIZER_PHP );
41
- $ this ->stream = $ configuration ['stream ' ] ?? '' ?: 'messages ' ;
42
- $ this ->group = $ configuration ['group ' ] ?? '' ?: 'symfony ' ;
43
- $ this ->consumer = $ configuration ['consumer ' ] ?? '' ?: 'consumer ' ;
48
+ $ this ->stream = $ configuration ['stream ' ] ?? self ::DEFAULT_OPTIONS ['stream ' ];
49
+ $ this ->group = $ configuration ['group ' ] ?? self ::DEFAULT_OPTIONS ['group ' ];
50
+ $ this ->consumer = $ configuration ['consumer ' ] ?? self ::DEFAULT_OPTIONS ['consumer ' ];
51
+ $ this ->autoSetup = $ configuration ['auto_setup ' ] ?? self ::DEFAULT_OPTIONS ['auto_setup ' ];
44
52
}
45
53
46
54
public static function fromDsn (string $ dsn , array $ redisOptions = [], \Redis $ redis = null ): self
@@ -51,9 +59,9 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
51
59
52
60
$ pathParts = explode ('/ ' , $ parsedUrl ['path ' ] ?? '' );
53
61
54
- $ stream = $ pathParts [1 ] ?? '' ;
55
- $ group = $ pathParts [2 ] ?? '' ;
56
- $ consumer = $ pathParts [3 ] ?? '' ;
62
+ $ stream = $ pathParts [1 ] ?? null ;
63
+ $ group = $ pathParts [2 ] ?? null ;
64
+ $ consumer = $ pathParts [3 ] ?? null ;
57
65
58
66
$ connectionCredentials = [
59
67
'host ' => $ parsedUrl ['host ' ] ?? '127.0.0.1 ' ,
@@ -64,11 +72,21 @@ public static function fromDsn(string $dsn, array $redisOptions = [], \Redis $re
64
72
parse_str ($ parsedUrl ['query ' ], $ redisOptions );
65
73
}
66
74
67
- return new self (['stream ' => $ stream , 'group ' => $ group , 'consumer ' => $ consumer ], $ connectionCredentials , $ redisOptions , $ redis );
75
+ $ autoSetup = null ;
76
+ if (\array_key_exists ('auto_setup ' , $ redisOptions )) {
77
+ $ autoSetup = filter_var ($ redisOptions ['auto_setup ' ], FILTER_VALIDATE_BOOLEAN );
78
+ unset($ redisOptions ['auto_setup ' ]);
79
+ }
80
+
81
+ return new self (['stream ' => $ stream , 'group ' => $ group , 'consumer ' => $ consumer , 'auto_setup ' => $ autoSetup ], $ connectionCredentials , $ redisOptions , $ redis );
68
82
}
69
83
70
84
public function get (): ?array
71
85
{
86
+ if ($ this ->autoSetup ) {
87
+ $ this ->setup ();
88
+ }
89
+
72
90
$ messageId = '> ' ; // will receive new messages
73
91
74
92
if ($ this ->couldHavePendingMessages ) {
@@ -141,6 +159,10 @@ public function reject(string $id): void
141
159
142
160
public function add (string $ body , array $ headers ): void
143
161
{
162
+ if ($ this ->autoSetup ) {
163
+ $ this ->setup ();
164
+ }
165
+
144
166
$ e = null ;
145
167
try {
146
168
$ added = $ this ->connection ->xadd ($ this ->stream , '* ' , ['message ' => json_encode (
@@ -161,5 +183,7 @@ public function setup(): void
161
183
} catch (\RedisException $ e ) {
162
184
throw new TransportException ($ e ->getMessage (), 0 , $ e );
163
185
}
186
+
187
+ $ this ->autoSetup = false ;
164
188
}
165
189
}
0 commit comments