File tree Expand file tree Collapse file tree 1 file changed +41
-0
lines changed Expand file tree Collapse file tree 1 file changed +41
-0
lines changed Original file line number Diff line number Diff line change 6
6
7
7
class LuaScripts
8
8
{
9
+ /**
10
+ * KEYS[1] - The queue we are reading message
11
+ * KEYS[2] - The reserved queue we are moving message to
12
+ * ARGV[1] - Now timestamp
13
+ * ARGV[2] - Redelivery at timestamp
14
+ */
15
+ public static function receiveMessage ()
16
+ {
17
+ return <<<LUA
18
+ local message = redis.call('RPOP', KEYS[1])
19
+
20
+ if (not message) then
21
+ return nil
22
+ end
23
+
24
+ local jsonSuccess, json = pcall(cjson.decode, message);
25
+
26
+ if (not jsonSuccess) then
27
+ return nil
28
+ end
29
+
30
+ if (nil == json['headers']['attempts']) then
31
+ json['headers']['attempts'] = 0
32
+ end
33
+
34
+ if (0 == json['headers']['attempts'] and nil ~= json['headers']['expires_at']) then
35
+ if (tonumber(ARGV[1]) > json['headers']['expires_at']) then
36
+ return nil
37
+ end
38
+ end
39
+
40
+ json['headers']['attempts'] = json['headers']['attempts'] + 1
41
+
42
+ message = cjson.encode(json)
43
+
44
+ redis.call('ZADD', KEYS[2], tonumber(ARGV[2]), message)
45
+
46
+ return message
47
+ LUA ;
48
+ }
49
+
9
50
/**
10
51
* Get the Lua script to migrate expired messages back onto the queue.
11
52
*
You can’t perform that action at this time.
0 commit comments