changed
CHANGELOG.md
|
@@ -5,6 +5,30 @@ All notable changes to this project will be documented in this file.
|
5
5
|
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
|
6
6
|
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
|
7
7
|
|
8
|
+ ## [0.6.0] - 2019-03-18
|
9
|
+
|
10
|
+ ### Added
|
11
|
+
|
12
|
+ - [Kiq.Encoder] Wrap `Jason.encode` and attempt to sanitize terms that can't be
|
13
|
+ represented as JSON. This is primarily a safeguard against enqueuing jobs with
|
14
|
+ structs or tuples as arguments.
|
15
|
+
|
16
|
+ ### Fixed
|
17
|
+
|
18
|
+ - [Kiq.Client.Resurrection] Correct usage of SCAN to ensure all backup queues
|
19
|
+ will be found for re-enqueueing.
|
20
|
+
|
21
|
+ ### Changed
|
22
|
+
|
23
|
+ - [Kiq.Job] Keys within job arguments are no longer converted to atoms. This is
|
24
|
+ for safe execution and is in line with how Phoenix passes controller
|
25
|
+ arguments.
|
26
|
+ - [Kiq.Client] Enqueue all jobs in a single pipelined command. This moves all of
|
27
|
+ the unique, scheduling and enqueueing logic into a lua script. The script
|
28
|
+ reduces the number of round trips to redis and enabled pipelining. Overall
|
29
|
+ this improved job enqueue/execute performance from 1800 jobs/sec to slightly
|
30
|
+ more than 3700 jobs/sec.
|
31
|
+
|
8
32
|
## [0.5.0] — 2018-12-14
|
9
33
|
|
10
34
|
### Added
|
changed
hex_metadata.config
|
@@ -4,63 +4,65 @@
|
4
4
|
<<"Robust job queue compatible with Sidekiq Enterprise, powered by GenStage and Redis">>}.
|
5
5
|
{<<"elixir">>,<<"~> 1.6">>}.
|
6
6
|
{<<"files">>,
|
7
|
- [<<"lib">>,<<"lib/kiq">>,<<"lib/kiq.ex">>,<<"lib/kiq/client">>,
|
8
|
- <<"lib/kiq/client.ex">>,<<"lib/kiq/client/cleanup.ex">>,
|
9
|
- <<"lib/kiq/client/introspection.ex">>,<<"lib/kiq/client/leadership.ex">>,
|
10
|
- <<"lib/kiq/client/locking.ex">>,<<"lib/kiq/client/queueing.ex">>,
|
11
|
- <<"lib/kiq/client/resurrection.ex">>,<<"lib/kiq/client/stats.ex">>,
|
12
|
- <<"lib/kiq/config.ex">>,<<"lib/kiq/heartbeat.ex">>,
|
13
|
- <<"lib/kiq/identity.ex">>,<<"lib/kiq/job.ex">>,<<"lib/kiq/logger.ex">>,
|
14
|
- <<"lib/kiq/naming.ex">>,<<"lib/kiq/necromancer.ex">>,<<"lib/kiq/periodic">>,
|
15
|
- <<"lib/kiq/periodic.ex">>,<<"lib/kiq/periodic/crontab.ex">>,
|
16
|
- <<"lib/kiq/periodic/parser.ex">>,<<"lib/kiq/periodic/scheduler.ex">>,
|
17
|
- <<"lib/kiq/pool">>,<<"lib/kiq/pool.ex">>,<<"lib/kiq/pool/supervisor.ex">>,
|
18
|
- <<"lib/kiq/queue">>,<<"lib/kiq/queue/consumer.ex">>,
|
19
|
- <<"lib/kiq/queue/producer.ex">>,<<"lib/kiq/queue/runner.ex">>,
|
20
|
- <<"lib/kiq/queue/scheduler.ex">>,<<"lib/kiq/queue/supervisor.ex">>,
|
21
|
- <<"lib/kiq/reporter">>,<<"lib/kiq/reporter.ex">>,
|
22
|
- <<"lib/kiq/reporter/instrumenter.ex">>,<<"lib/kiq/reporter/logger.ex">>,
|
23
|
- <<"lib/kiq/reporter/producer.ex">>,<<"lib/kiq/reporter/retryer.ex">>,
|
24
|
- <<"lib/kiq/reporter/stats.ex">>,<<"lib/kiq/reporter/supervisor.ex">>,
|
25
|
- <<"lib/kiq/reporter/unlocker.ex">>,<<"lib/kiq/running_job.ex">>,
|
26
|
- <<"lib/kiq/script.ex">>,<<"lib/kiq/senator.ex">>,
|
27
|
- <<"lib/kiq/supervisor.ex">>,<<"lib/kiq/testing.ex">>,
|
28
|
- <<"lib/kiq/timestamp.ex">>,<<"lib/kiq/worker.ex">>,<<"lib/mix">>,
|
29
|
- <<"lib/mix/kiq.ex">>,<<"lib/mix/tasks">>,
|
7
|
+ [<<"lib">>,<<"lib/kiq.ex">>,<<"lib/mix">>,<<"lib/mix/tasks">>,
|
30
8
|
<<"lib/mix/tasks/kiq.gen.reporter.ex">>,
|
31
|
- <<"lib/mix/tasks/kiq.gen.supervisor.ex">>,
|
32
|
- <<"lib/mix/tasks/kiq.gen.worker.ex">>,<<"priv">>,<<"priv/scripts">>,
|
33
|
- <<"priv/scripts/dequeue.lua">>,<<"priv/scripts/deschedule.lua">>,
|
34
|
- <<"priv/scripts/reelect.lua">>,<<"priv/scripts/resign.lua">>,
|
35
|
- <<"priv/scripts/resurrect.lua">>,<<".formatter.exs">>,<<"mix.exs">>,
|
36
|
- <<"README.md">>,<<"LICENSE.txt">>,<<"CHANGELOG.md">>]}.
|
9
|
+ <<"lib/mix/tasks/kiq.gen.worker.ex">>,
|
10
|
+ <<"lib/mix/tasks/kiq.gen.supervisor.ex">>,<<"lib/mix/kiq.ex">>,
|
11
|
+ <<"lib/kiq">>,<<"lib/kiq/job.ex">>,<<"lib/kiq/supervisor.ex">>,
|
12
|
+ <<"lib/kiq/naming.ex">>,<<"lib/kiq/periodic">>,
|
13
|
+ <<"lib/kiq/periodic/crontab.ex">>,<<"lib/kiq/periodic/scheduler.ex">>,
|
14
|
+ <<"lib/kiq/periodic/parser.ex">>,<<"lib/kiq/database">>,
|
15
|
+ <<"lib/kiq/reporter.ex">>,<<"lib/kiq/client.ex">>,
|
16
|
+ <<"lib/kiq/necromancer.ex">>,<<"lib/kiq/encoder.ex">>,
|
17
|
+ <<"lib/kiq/logger.ex">>,<<"lib/kiq/worker.ex">>,<<"lib/kiq/reporter">>,
|
18
|
+ <<"lib/kiq/reporter/supervisor.ex">>,<<"lib/kiq/reporter/producer.ex">>,
|
19
|
+ <<"lib/kiq/reporter/instrumenter.ex">>,<<"lib/kiq/reporter/logger.ex">>,
|
20
|
+ <<"lib/kiq/reporter/retryer.ex">>,<<"lib/kiq/reporter/stats.ex">>,
|
21
|
+ <<"lib/kiq/reporter/unlocker.ex">>,<<"lib/kiq/running_job.ex">>,
|
22
|
+ <<"lib/kiq/config.ex">>,<<"lib/kiq/periodic.ex">>,
|
23
|
+ <<"lib/kiq/heartbeat.ex">>,<<"lib/kiq/testing.ex">>,<<"lib/kiq/pool.ex">>,
|
24
|
+ <<"lib/kiq/timestamp.ex">>,<<"lib/kiq/script.ex">>,
|
25
|
+ <<"lib/kiq/identity.ex">>,<<"lib/kiq/queue">>,
|
26
|
+ <<"lib/kiq/queue/supervisor.ex">>,<<"lib/kiq/queue/producer.ex">>,
|
27
|
+ <<"lib/kiq/queue/runner.ex">>,<<"lib/kiq/queue/scheduler.ex">>,
|
28
|
+ <<"lib/kiq/queue/consumer.ex">>,<<"lib/kiq/senator.ex">>,<<"lib/kiq/pool">>,
|
29
|
+ <<"lib/kiq/pool/supervisor.ex">>,<<"lib/kiq/client">>,
|
30
|
+ <<"lib/kiq/client/queueing.ex">>,<<"lib/kiq/client/leadership.ex">>,
|
31
|
+ <<"lib/kiq/client/cleanup.ex">>,<<"lib/kiq/client/resurrection.ex">>,
|
32
|
+ <<"lib/kiq/client/locking.ex">>,<<"lib/kiq/client/stats.ex">>,
|
33
|
+ <<"lib/kiq/client/introspection.ex">>,<<"priv">>,<<"priv/scripts">>,
|
34
|
+ <<"priv/scripts/dequeue.lua">>,<<"priv/scripts/enqueue.lua">>,
|
35
|
+ <<"priv/scripts/deschedule.lua">>,<<"priv/scripts/reelect.lua">>,
|
36
|
+ <<"priv/scripts/resign.lua">>,<<"priv/scripts/resurrect.lua">>,
|
37
|
+ <<".formatter.exs">>,<<"mix.exs">>,<<"README.md">>,<<"LICENSE.txt">>,
|
38
|
+ <<"CHANGELOG.md">>]}.
|
37
39
|
{<<"licenses">>,[<<"MIT">>]}.
|
38
40
|
{<<"links">>,[{<<"github">>,<<"https://github.com/sorentwo/kiq">>}]}.
|
39
41
|
{<<"name">>,<<"kiq">>}.
|
40
42
|
{<<"requirements">>,
|
41
|
- [[{<<"app">>,<<"redix">>},
|
42
|
- {<<"name">>,<<"redix">>},
|
43
|
+ [[{<<"app">>,<<"jason">>},
|
44
|
+ {<<"name">>,<<"jason">>},
|
43
45
|
{<<"optional">>,false},
|
44
46
|
{<<"repository">>,<<"hexpm">>},
|
45
|
- {<<"requirement">>,<<"~> 0.9">>}],
|
46
|
- [{<<"app">>,<<"nimble_parsec">>},
|
47
|
- {<<"name">>,<<"nimble_parsec">>},
|
48
|
- {<<"optional">>,false},
|
49
|
- {<<"repository">>,<<"hexpm">>},
|
50
|
- {<<"requirement">>,<<"~> 0.4.0">>}],
|
47
|
+ {<<"requirement">>,<<"~> 1.1">>}],
|
51
48
|
[{<<"app">>,<<"gen_stage">>},
|
52
49
|
{<<"name">>,<<"gen_stage">>},
|
53
50
|
{<<"optional">>,false},
|
54
51
|
{<<"repository">>,<<"hexpm">>},
|
55
52
|
{<<"requirement">>,<<"~> 0.14">>}],
|
53
|
+ [{<<"app">>,<<"nimble_parsec">>},
|
54
|
+ {<<"name">>,<<"nimble_parsec">>},
|
55
|
+ {<<"optional">>,false},
|
56
|
+ {<<"repository">>,<<"hexpm">>},
|
57
|
+ {<<"requirement">>,<<"~> 0.4.0">>}],
|
58
|
+ [{<<"app">>,<<"redix">>},
|
59
|
+ {<<"name">>,<<"redix">>},
|
60
|
+ {<<"optional">>,false},
|
61
|
+ {<<"repository">>,<<"hexpm">>},
|
62
|
+ {<<"requirement">>,<<"~> 0.9">>}],
|
56
63
|
[{<<"app">>,<<"telemetry">>},
|
57
64
|
{<<"name">>,<<"telemetry">>},
|
58
65
|
{<<"optional">>,false},
|
59
66
|
{<<"repository">>,<<"hexpm">>},
|
60
|
- {<<"requirement">>,<<"~> 0.2">>}],
|
61
|
- [{<<"app">>,<<"jason">>},
|
62
|
- {<<"name">>,<<"jason">>},
|
63
|
- {<<"optional">>,false},
|
64
|
- {<<"repository">>,<<"hexpm">>},
|
65
|
- {<<"requirement">>,<<"~> 1.1">>}]]}.
|
66
|
- {<<"version">>,<<"0.5.0">>}.
|
67
|
+ {<<"requirement">>,<<"~> 0.2">>}]]}.
|
68
|
+ {<<"version">>,<<"0.6.0">>}.
|
changed
lib/kiq.ex
|
@@ -107,6 +107,10 @@ defmodule Kiq do
|
107
107
|
Redis. This defaults to `10ms`, though it will back-off by a factor of `1.5`
|
108
108
|
if there are any connection errors.
|
109
109
|
|
110
|
+ * `:periodics` — A list of job scheduling tuples in the form `{schedule,
|
111
|
+ worker}` or `{schedule, worker, options}`. See
|
112
|
+ [Periodic Jobs](#module-periodic-jobs) for details.
|
113
|
+
|
110
114
|
* `:pool_size` — Controls the number of Redis connections available to Kiq,
|
111
115
|
defaults to 5.
|
changed
lib/kiq/client.ex
|
@@ -135,11 +135,9 @@ defmodule Kiq.Client do
|
135
135
|
defp perform_flush(%State{pool: pool, table: table, test_mode: :disabled} = state) do
|
136
136
|
try do
|
137
137
|
conn = Pool.checkout(pool)
|
138
|
+ jobs = :ets.foldl(fn {_key, val}, acc -> [val | acc] end, [], table)
|
138
139
|
|
139
|
- table
|
140
|
- |> :ets.tab2list()
|
141
|
- |> Enum.each(&Queueing.enqueue(conn, elem(&1, 1)))
|
142
|
-
|
140
|
+ :ok = Queueing.enqueue(conn, jobs)
|
143
141
|
true = :ets.delete_all_objects(table)
|
144
142
|
|
145
143
|
transition_to_success(state)
|
changed
lib/kiq/client/queueing.ex
|
@@ -1,36 +1,27 @@
|
1
1
|
defmodule Kiq.Client.Queueing do
|
2
2
|
@moduledoc false
|
3
3
|
|
4
|
- import Redix, only: [command!: 2, noreply_command!: 2, noreply_pipeline!: 2]
|
5
|
- import Kiq.Naming, only: [queue_name: 1, backup_name: 2, unlock_name: 1]
|
4
|
+ import Redix
|
5
|
+ import Kiq.Naming
|
6
6
|
|
7
7
|
alias Kiq.{Job, Script, Timestamp}
|
8
8
|
|
9
9
|
@typep conn :: GenServer.server()
|
10
|
- @typep resp :: {:ok, Job.t()}
|
11
|
-
|
12
|
- @retry_set "retry"
|
13
|
- @schedule_set "schedule"
|
14
10
|
|
15
11
|
@external_resource Script.path("dequeue")
|
16
12
|
@external_resource Script.path("deschedule")
|
13
|
+ @external_resource Script.path("enqueue")
|
17
14
|
@dequeue_sha Script.hash("dequeue")
|
15
|
+ @enqueue_sha Script.hash("enqueue")
|
18
16
|
@deschedule_sha Script.hash("deschedule")
|
19
17
|
|
20
|
- @spec enqueue(conn(), Job.t()) :: resp()
|
21
|
- def enqueue(conn, %Job{} = job) do
|
22
|
- job
|
23
|
- |> check_unique(conn)
|
24
|
- |> case do
|
25
|
- {:ok, %Job{at: at} = job} when is_float(at) ->
|
26
|
- schedule_job(job, @schedule_set, conn)
|
18
|
+ @spec enqueue(conn(), list(Job.t())) :: :ok
|
19
|
+ def enqueue(_conn, []), do: :ok
|
27
20
|
|
28
|
- {:ok, job} ->
|
29
|
- push_job(job, conn)
|
21
|
+ def enqueue(conn, jobs) when is_list(jobs) do
|
22
|
+ commands = for job <- jobs, do: enqueue_command(job)
|
30
23
|
|
31
|
- {:locked, job} ->
|
32
|
- {:ok, job}
|
33
|
- end
|
24
|
+ noreply_pipeline!(conn, commands)
|
34
25
|
end
|
35
26
|
|
36
27
|
@spec dequeue(conn(), binary(), binary(), pos_integer()) :: list(iodata())
|
|
@@ -43,54 +34,43 @@ defmodule Kiq.Client.Queueing do
|
43
34
|
|
44
35
|
@spec deschedule(conn(), binary()) :: :ok
|
45
36
|
def deschedule(conn, set) when is_binary(set) do
|
46
|
- conn
|
47
|
- |> command!(["EVALSHA", @deschedule_sha, "1", set, Timestamp.to_score()])
|
48
|
- |> Enum.map(&Job.decode/1)
|
49
|
- |> Enum.each(&push_job(&1, conn))
|
50
|
-
|
51
|
- :ok
|
37
|
+ noreply_command!(conn, ["EVALSHA", @deschedule_sha, "1", set, Timestamp.to_score()])
|
52
38
|
end
|
53
39
|
|
54
|
- @spec retry(conn(), Job.t()) :: resp()
|
55
|
- def retry(conn, %Job{retry: retry, retry_count: count} = job)
|
40
|
+ @spec retry(conn(), Job.t()) :: :ok
|
41
|
+ def retry(conn, %Job{at: at, retry: retry, retry_count: count} = job)
|
56
42
|
when is_integer(retry) or (retry == true and count > 0) do
|
57
|
- schedule_job(job, @retry_set, conn)
|
43
|
+ noreply_command!(conn, ["ZADD", "retry", Timestamp.to_score(at), Job.encode(job)])
|
58
44
|
end
|
59
45
|
|
60
46
|
# Helpers
|
61
47
|
|
62
|
- defp check_unique(%{unlocks_at: unlocks_at} = job, conn) when is_float(unlocks_at) do
|
63
|
- unlocks_in = trunc((unlocks_at - Timestamp.unix_now()) * 1_000)
|
64
|
- unlock_name = unlock_name(job.unique_token)
|
48
|
+ defp enqueue_command(%Job{queue: queue} = job) do
|
49
|
+ {job, enqueue_at} = maybe_enqueue_at(job)
|
50
|
+ {unique_key, unlocks_in} = maybe_unlocks_in(job)
|
65
51
|
|
66
|
- command = ["SET", unlock_name, to_string(unlocks_at), "PX", to_string(unlocks_in), "NX"]
|
52
|
+ eval_keys = ["EVALSHA", @enqueue_sha, "1", unique_key]
|
53
|
+ eval_args = [Job.encode(job), queue, enqueue_at, unlocks_in]
|
67
54
|
|
68
|
- case command!(conn, command) do
|
69
|
- "OK" -> {:ok, job}
|
70
|
- _res -> {:locked, job}
|
55
|
+ eval_keys ++ eval_args
|
56
|
+ end
|
57
|
+
|
58
|
+ defp maybe_enqueue_at(%Job{at: at} = job) do
|
59
|
+ if is_float(at) do
|
60
|
+ {job, Timestamp.to_score(job.at)}
|
61
|
+ else
|
62
|
+ {%{job | enqueued_at: Timestamp.unix_now()}, nil}
|
71
63
|
end
|
72
64
|
end
|
73
65
|
|
74
|
- defp check_unique(job, _client), do: {:ok, job}
|
66
|
+ defp maybe_unlocks_in(%Job{unique_token: unique_token, unlocks_at: unlocks_at}) do
|
67
|
+ if is_float(unlocks_at) do
|
68
|
+ unique_key = unlock_name(unique_token)
|
69
|
+ unlocks_in = trunc((unlocks_at - Timestamp.unix_now()) * 1_000)
|
75
70
|
|
76
|
- defp push_job(%{queue: queue} = job, conn) do
|
77
|
- job = %Job{job | enqueued_at: Timestamp.unix_now()}
|
78
|
-
|
79
|
- commands = [
|
80
|
- ["SADD", "queues", queue],
|
81
|
- ["LPUSH", queue_name(queue), Job.encode(job)]
|
82
|
- ]
|
83
|
-
|
84
|
- :ok = noreply_pipeline!(conn, commands)
|
85
|
-
|
86
|
- {:ok, job}
|
87
|
- end
|
88
|
-
|
89
|
- defp schedule_job(%Job{at: at} = job, set, conn) do
|
90
|
- score = Timestamp.to_score(at)
|
91
|
-
|
92
|
- :ok = noreply_command!(conn, ["ZADD", set, score, Job.encode(job)])
|
93
|
-
|
94
|
- {:ok, job}
|
71
|
+ {unique_key, to_string(unlocks_in)}
|
72
|
+ else
|
73
|
+ {nil, nil}
|
74
|
+ end
|
95
75
|
end
|
96
76
|
end
|
changed
lib/kiq/client/resurrection.ex
|
@@ -30,8 +30,7 @@ defmodule Kiq.Client.Resurrection do
|
30
30
|
# `SCAN` instead of `KEYS` just in case there are dozens or hundreds of
|
31
31
|
# backup queues.
|
32
32
|
defp fetch_backups(backups, cursor, conn) do
|
33
|
- case command!(conn, ["SCAN", cursor, "MATCH", "queue:backup|*"]) do
|
34
|
- [_cursor, []] -> backups
|
33
|
+ case command!(conn, ["SCAN", cursor, "MATCH", "queue:backup|*", "COUNT", "1000"]) do
|
35
34
|
["0", results] -> backups ++ results
|
36
35
|
[cursor, results] -> fetch_backups(backups ++ results, cursor, conn)
|
37
36
|
end
|
changed
lib/kiq/client/stats.ex
|
@@ -3,7 +3,7 @@ defmodule Kiq.Client.Stats do
|
3
3
|
|
4
4
|
import Redix, only: [noreply_pipeline: 2]
|
5
5
|
|
6
|
- alias Kiq.{Heartbeat, RunningJob, Timestamp}
|
6
|
+ alias Kiq.{Encoder, Heartbeat, RunningJob, Timestamp}
|
7
7
|
|
8
8
|
@typep conn :: GenServer.server()
|
9
9
|
@typep resp :: :ok | {:error, atom() | Redix.Error.t()}
|
|
@@ -14,7 +14,7 @@ defmodule Kiq.Client.Stats do
|
14
14
|
|
15
15
|
wkey = "#{key}:workers"
|
16
16
|
beat = Timestamp.unix_now() |> to_string()
|
17
|
- info = Jason.encode!(heartbeat)
|
17
|
+ info = Encoder.encode(heartbeat)
|
18
18
|
|
19
19
|
commands = [
|
20
20
|
["SADD", "processes", key],
|
changed
lib/kiq/config.ex
|
@@ -31,6 +31,7 @@ defmodule Kiq.Config do
|
31
31
|
schedulers: list(binary()),
|
32
32
|
senator_name: name(),
|
33
33
|
server?: boolean(),
|
34
|
+ stats_flush_interval: pos_integer(),
|
34
35
|
test_mode: :disabled | :sandbox
|
35
36
|
}
|
36
37
|
|
|
@@ -53,6 +54,7 @@ defmodule Kiq.Config do
|
53
54
|
schedulers: ~w(retry schedule),
|
54
55
|
senator_name: Senator,
|
55
56
|
server?: true,
|
57
|
+ stats_flush_interval: 1_000,
|
56
58
|
test_mode: :disabled
|
57
59
|
|
58
60
|
@spec new(map() | Keyword.t()) :: t()
|
added
lib/kiq/encoder.ex
|
@@ -0,0 +1,59 @@
|
1
|
+ defmodule Kiq.Encoder do
|
2
|
+ @moduledoc """
|
3
|
+ This module wraps `Jason.encode/1` to provide consistent and safe JSON encoding.
|
4
|
+
|
5
|
+ Typically this module shouldn't be used outside of Kiq, but the function is
|
6
|
+ documented for reference on how job arguments are encoded.
|
7
|
+ """
|
8
|
+
|
9
|
+ @doc """
|
10
|
+ Safely encode terms into JSON, if possible.
|
11
|
+
|
12
|
+ Some Elixir/Erlang types can't be represented as JSON. To make encoding as reliable as possible
|
13
|
+ the `encode/1` function attempts to convert incompatible terms into JSON friendly values.
|
14
|
+
|
15
|
+ The following sanitization is applied (recursively):
|
16
|
+
|
17
|
+ * `struct` - Converted to a map using `Map.from_struct/1`
|
18
|
+ * `tuple` — Converted to a list
|
19
|
+ * `pid`, `port`, `reference` — Converted to a string using inspect, i.e. "#PID<0.101.0>"
|
20
|
+ """
|
21
|
+ @spec encode(any()) :: binary()
|
22
|
+ def encode(term) do
|
23
|
+ case Jason.encode(term) do
|
24
|
+ {:ok, encoded} ->
|
25
|
+ encoded
|
26
|
+
|
27
|
+ {:error, %Protocol.UndefinedError{}} ->
|
28
|
+ term
|
29
|
+ |> sanitize()
|
30
|
+ |> Jason.encode!()
|
31
|
+ end
|
32
|
+ end
|
33
|
+
|
34
|
+ defp sanitize(%_{} = struct) do
|
35
|
+ struct
|
36
|
+ |> Map.from_struct()
|
37
|
+ |> sanitize()
|
38
|
+ end
|
39
|
+
|
40
|
+ defp sanitize(map) when is_map(map) do
|
41
|
+ for {key, val} <- map, into: %{}, do: {key, sanitize(val)}
|
42
|
+ end
|
43
|
+
|
44
|
+ defp sanitize(list) when is_list(list) do
|
45
|
+ for term <- list, do: sanitize(term)
|
46
|
+ end
|
47
|
+
|
48
|
+ defp sanitize(tuple) when is_tuple(tuple) do
|
49
|
+ tuple
|
50
|
+ |> Tuple.to_list()
|
51
|
+ |> sanitize()
|
52
|
+ end
|
53
|
+
|
54
|
+ defp sanitize(input) when is_pid(input) or is_port(input) or is_reference(input) do
|
55
|
+ inspect(input)
|
56
|
+ end
|
57
|
+
|
58
|
+ defp sanitize(term), do: term
|
59
|
+ end
|
changed
lib/kiq/job.ex
|
@@ -42,7 +42,7 @@ defmodule Kiq.Job do
|
42
42
|
[1]: https://github.com/mperham/sidekiq/wiki/Job-Format
|
43
43
|
"""
|
44
44
|
|
45
|
- alias Kiq.Timestamp
|
45
|
+ alias Kiq.{Encoder, Timestamp}
|
46
46
|
|
47
47
|
@type t :: %__MODULE__{
|
48
48
|
jid: binary(),
|
|
@@ -179,7 +179,7 @@ defmodule Kiq.Job do
|
179
179
|
def encode(%__MODULE__{} = job) do
|
180
180
|
map = to_map(job)
|
181
181
|
|
182
|
- with {:ok, encoded} <- Jason.encode(map) do
|
182
|
+ with {:ok, encoded} <- Encoder.encode(map) do
|
183
183
|
encoded
|
184
184
|
end
|
185
185
|
end
|
|
@@ -187,8 +187,7 @@ defmodule Kiq.Job do
|
187
187
|
@doc """
|
188
188
|
Decode an encoded job from JSON into a Job struct.
|
189
189
|
|
190
|
- All keys are atomized, including keys within arguments. This does _not_ use
|
191
|
- `String.to_existing_atom/1`, so be wary of encoding large maps.
|
190
|
+ All job keys are atomized except for those within arguments.
|
192
191
|
|
193
192
|
# Example
|
194
193
|
|
|
@@ -198,12 +197,14 @@ defmodule Kiq.Job do
|
198
197
|
|
199
198
|
iex> job = Kiq.Job.decode(~s({"class":"MyWorker","args":{"a":1}}))
|
200
199
|
...> Map.get(job, :args)
|
201
|
- %{a: 1}
|
200
|
+ %{"a" => 1}
|
202
201
|
"""
|
203
202
|
@spec decode(input :: binary()) :: t() | {:error, Exception.t()}
|
204
203
|
def decode(input) when is_binary(input) do
|
205
|
- with {:ok, decoded} <- Jason.decode(input, keys: :atoms) do
|
206
|
- new(decoded)
|
204
|
+ with {:ok, decoded} <- Jason.decode(input) do
|
205
|
+ decoded
|
206
|
+ |> Map.new(fn {key, val} -> {String.to_existing_atom(key), val} end)
|
207
|
+ |> new()
|
207
208
|
end
|
208
209
|
end
|
changed
lib/kiq/logger.ex
|
@@ -3,12 +3,14 @@ defmodule Kiq.Logger do
|
3
3
|
|
4
4
|
require Logger
|
5
5
|
|
6
|
+ alias Kiq.Encoder
|
7
|
+
|
6
8
|
@spec log(map()) :: :ok
|
7
9
|
def log(payload) when is_map(payload) do
|
8
10
|
Logger.info(fn ->
|
9
11
|
payload
|
10
12
|
|> Map.put(:source, "kiq")
|
11
|
- |> Jason.encode!()
|
13
|
+ |> Encoder.encode()
|
12
14
|
end)
|
13
15
|
end
|
14
16
|
end
|
changed
lib/kiq/queue/scheduler.ex
|
@@ -16,7 +16,7 @@ defmodule Kiq.Queue.Scheduler do
|
16
16
|
defmodule State do
|
17
17
|
@moduledoc false
|
18
18
|
|
19
|
- defstruct fetch_interval: 1_000, pool: nil, set: nil
|
19
|
+ defstruct [:pool, :set, fetch_interval: 1_000]
|
20
20
|
end
|
21
21
|
|
22
22
|
@spec start_link(opts :: options()) :: GenServer.on_start()
|
|
@@ -26,9 +26,12 @@ defmodule Kiq.Queue.Scheduler do
|
26
26
|
GenServer.start_link(__MODULE__, opts, name: name)
|
27
27
|
end
|
28
28
|
|
29
|
- @spec random_interval(average :: pos_integer()) :: pos_integer()
|
30
|
- def random_interval(average) do
|
31
|
- trunc(average * :rand.uniform() + average / 2)
|
29
|
+ @doc false
|
30
|
+ @spec random_interval(pos_integer(), pos_integer()) :: pos_integer()
|
31
|
+ def random_interval(average, jitter \\ 10) do
|
32
|
+ weight = 1.0 - (jitter / 2 - :rand.uniform(jitter)) / 100.0
|
33
|
+
|
34
|
+ trunc(average * weight)
|
32
35
|
end
|
33
36
|
|
34
37
|
# Callbacks
|
changed
lib/kiq/reporter/stats.ex
|
@@ -20,18 +20,17 @@ defmodule Kiq.Reporter.Stats do
|
20
20
|
@impl GenStage
|
21
21
|
def init(opts) do
|
22
22
|
{conf, opts} = Keyword.pop(opts, :config)
|
23
|
- {fint, opts} = Keyword.pop(opts, :flush_interval, 1_000)
|
24
23
|
|
25
24
|
Process.flag(:trap_exit, true)
|
26
25
|
|
27
|
- state =
|
28
|
- State
|
29
|
- |> struct!(pool: conf.pool_name, queues: conf.queues, flush_interval: fint)
|
30
|
- |> schedule_flush()
|
26
|
+ state = %State{
|
27
|
+ heartbeat: Heartbeat.new(queues: conf.queues, identity: conf.identity),
|
28
|
+ flush_interval: conf.stats_flush_interval,
|
29
|
+ pool: conf.pool_name,
|
30
|
+ queues: conf.queues
|
31
|
+ }
|
31
32
|
|
32
|
- heartbeat = Heartbeat.new(queues: state.queues, identity: conf.identity)
|
33
|
-
|
34
|
- {:consumer, %State{state | heartbeat: heartbeat}, opts}
|
33
|
+ {:consumer, schedule_flush(state), opts}
|
35
34
|
end
|
36
35
|
|
37
36
|
@impl GenStage
|
changed
lib/kiq/running_job.ex
|
@@ -1,7 +1,7 @@
|
1
1
|
defmodule Kiq.RunningJob do
|
2
2
|
@moduledoc false
|
3
3
|
|
4
|
- alias Kiq.{Job, Timestamp}
|
4
|
+ alias Kiq.{Encoder, Job, Timestamp}
|
5
5
|
|
6
6
|
@type t :: %__MODULE__{key: binary(), encoded: binary()}
|
7
7
|
|
|
@@ -11,7 +11,7 @@ defmodule Kiq.RunningJob do
|
11
11
|
def new(%Job{pid: pid, queue: queue} = job) do
|
12
12
|
details = %{queue: queue, payload: Job.to_map(job), run_at: Timestamp.unix_now()}
|
13
13
|
|
14
|
- %__MODULE__{key: format_key(pid), encoded: Jason.encode!(details)}
|
14
|
+ %__MODULE__{key: format_key(pid), encoded: Encoder.encode(details)}
|
15
15
|
end
|
16
16
|
|
17
17
|
defp format_key(pid) when is_pid(pid) or is_reference(pid) do
|
changed
lib/kiq/timestamp.ex
|
@@ -9,7 +9,7 @@ defmodule Kiq.Timestamp do
|
9
9
|
value
|
10
10
|
|> Kernel.*(1_000_000)
|
11
11
|
|> trunc()
|
12
|
- |> DateTime.from_unix!(:microseconds)
|
12
|
+ |> DateTime.from_unix!(:microsecond)
|
13
13
|
end
|
14
14
|
|
15
15
|
@doc false
|
|
@@ -24,17 +24,17 @@ defmodule Kiq.Timestamp do
|
24
24
|
@spec unix_now() :: float()
|
25
25
|
def unix_now do
|
26
26
|
DateTime.utc_now()
|
27
|
- |> DateTime.to_unix(:microseconds)
|
27
|
+ |> DateTime.to_unix(:microsecond)
|
28
28
|
|> to_float()
|
29
29
|
end
|
30
30
|
|
31
31
|
@doc false
|
32
32
|
@spec unix_in(offset :: integer(), unit :: atom()) :: float()
|
33
|
- def unix_in(offset, unit \\ :seconds) when is_integer(offset) do
|
33
|
+ def unix_in(offset, unit \\ :second) when is_integer(offset) do
|
34
34
|
NaiveDateTime.utc_now()
|
35
35
|
|> NaiveDateTime.add(offset, unit)
|
36
36
|
|> DateTime.from_naive!("Etc/UTC")
|
37
|
- |> DateTime.to_unix(:microseconds)
|
37
|
+ |> DateTime.to_unix(:microsecond)
|
38
38
|
|> to_float()
|
39
39
|
end
|
40
40
|
|
|
@@ -44,7 +44,7 @@ defmodule Kiq.Timestamp do
|
44
44
|
|
45
45
|
def to_score(%DateTime{} = time) do
|
46
46
|
time
|
47
|
- |> DateTime.to_unix(:microseconds)
|
47
|
+ |> DateTime.to_unix(:microsecond)
|
48
48
|
|> to_score()
|
49
49
|
end
|
changed
mix.exs
|
@@ -1,7 +1,7 @@
|
1
1
|
defmodule Kiq.MixProject do
|
2
2
|
use Mix.Project
|
3
3
|
|
4
|
- @version "0.5.0"
|
4
|
+ @version "0.6.0"
|
5
5
|
|
6
6
|
def project do
|
7
7
|
[
|
changed
priv/scripts/deschedule.lua
|
@@ -1,7 +1,25 @@
|
1
|
+ -- Find all jobs with a time "score" greater than the provided value and
|
2
|
+ -- enqueue then individually. Executing this atomically within a script
|
3
|
+ -- prevents any race conditions and avoids round-trip serialization of each
|
4
|
+ -- job.
|
5
|
+
|
1
6
|
local jobs = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
|
7
|
+ local count = 0
|
2
8
|
|
3
9
|
for _idx, job in ipairs(jobs) do
|
4
10
|
redis.call("zrem", KEYS[1], job)
|
11
|
+
|
12
|
+ -- It is vastly faster to parse out the queue with a `match` than to decode
|
13
|
+ -- the payload. Additionally, `cjson` will truncate large integers and
|
14
|
+ -- doesn't maintain the exact job structure.
|
15
|
+ local queue = string.match(job, '"queue":"(%w+)"')
|
16
|
+
|
17
|
+ -- The queue set key and naming convention are hard coded here for the sake
|
18
|
+ -- of simplicity.
|
19
|
+ redis.call("sadd", "queues", queue)
|
20
|
+ redis.call("lpush", "queue:" .. queue, job)
|
21
|
+
|
22
|
+ count = count + 1
|
5
23
|
end
|
6
24
|
|
7
|
- return jobs
|
25
|
+ return count
|
added
priv/scripts/enqueue.lua
|
@@ -0,0 +1,34 @@
|
1
|
+ -- Atomically enqueue or schedule jobs while optionally enforcing uniqueness.
|
2
|
+ --
|
3
|
+ -- The returned status code indicates which action was taken:
|
4
|
+ --
|
5
|
+ -- 0 - The job was skipped due to a unique lock
|
6
|
+ -- 1 - The job was enqueued for immediate execution
|
7
|
+ -- 2 - The job was scheduled for future execution
|
8
|
+
|
9
|
+ local unique_key = KEYS[1]
|
10
|
+
|
11
|
+ local job = ARGV[1]
|
12
|
+ local queue = ARGV[2]
|
13
|
+ local enqueue_at = tonumber(ARGV[3])
|
14
|
+ local unlocks_in = tonumber(ARGV[4])
|
15
|
+
|
16
|
+ local is_unlocked = true
|
17
|
+ local status = 0
|
18
|
+
|
19
|
+ if unlocks_in then
|
20
|
+ is_unlocked = redis.call("set", unique_key, unlocks_in, "px", unlocks_in, "nx")
|
21
|
+ end
|
22
|
+
|
23
|
+ if is_unlocked and enqueue_at then
|
24
|
+ redis.call("zadd", "schedule", enqueue_at, job)
|
25
|
+
|
26
|
+ status = 2
|
27
|
+ elseif is_unlocked then
|
28
|
+ redis.call("sadd", "queues", queue)
|
29
|
+ redis.call("lpush", "queue:" .. queue, job)
|
30
|
+
|
31
|
+ status = 1
|
32
|
+ end
|
33
|
+
|
34
|
+ return status
|