changed CHANGELOG.md
 
@@ -5,6 +5,30 @@ All notable changes to this project will be documented in this file.
5
5
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
6
6
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
7
7
8
+ ## [0.6.0] - 2019-03-18
9
+
10
+ ### Added
11
+
12
+ - [Kiq.Encoder] Wrap `Jason.encode` and attempt to sanitize terms that can't be
13
+ represented as JSON. This is primarily a safeguard against enqueuing jobs with
14
+ structs or tuples as arguments.
15
+
16
+ ### Fixed
17
+
18
+ - [Kiq.Client.Resurrection] Correct usage of SCAN to ensure all backup queues
19
+ will be found for re-enqueueing.
20
+
21
+ ### Changed
22
+
23
+ - [Kiq.Job] Keys within job arguments are no longer converted to atoms. This is
24
+ for safe execution and is in line with how Phoenix passes controller
25
+ arguments.
26
+ - [Kiq.Client] Enqueue all jobs in a single pipelined command. This moves all of
27
+ the unique, scheduling and enqueueing logic into a lua script. The script
28
+ reduces the number of round trips to redis and enabled pipelining. Overall
29
+ this improved job enqueue/execute performance from 1800 jobs/sec to slightly
30
+ more than 3700 jobs/sec.
31
+
8
32
## [0.5.0] — 2018-12-14
9
33
10
34
### Added
changed hex_metadata.config
 
@@ -4,63 +4,65 @@
4
4
<<"Robust job queue compatible with Sidekiq Enterprise, powered by GenStage and Redis">>}.
5
5
{<<"elixir">>,<<"~> 1.6">>}.
6
6
{<<"files">>,
7
- [<<"lib">>,<<"lib/kiq">>,<<"lib/kiq.ex">>,<<"lib/kiq/client">>,
8
- <<"lib/kiq/client.ex">>,<<"lib/kiq/client/cleanup.ex">>,
9
- <<"lib/kiq/client/introspection.ex">>,<<"lib/kiq/client/leadership.ex">>,
10
- <<"lib/kiq/client/locking.ex">>,<<"lib/kiq/client/queueing.ex">>,
11
- <<"lib/kiq/client/resurrection.ex">>,<<"lib/kiq/client/stats.ex">>,
12
- <<"lib/kiq/config.ex">>,<<"lib/kiq/heartbeat.ex">>,
13
- <<"lib/kiq/identity.ex">>,<<"lib/kiq/job.ex">>,<<"lib/kiq/logger.ex">>,
14
- <<"lib/kiq/naming.ex">>,<<"lib/kiq/necromancer.ex">>,<<"lib/kiq/periodic">>,
15
- <<"lib/kiq/periodic.ex">>,<<"lib/kiq/periodic/crontab.ex">>,
16
- <<"lib/kiq/periodic/parser.ex">>,<<"lib/kiq/periodic/scheduler.ex">>,
17
- <<"lib/kiq/pool">>,<<"lib/kiq/pool.ex">>,<<"lib/kiq/pool/supervisor.ex">>,
18
- <<"lib/kiq/queue">>,<<"lib/kiq/queue/consumer.ex">>,
19
- <<"lib/kiq/queue/producer.ex">>,<<"lib/kiq/queue/runner.ex">>,
20
- <<"lib/kiq/queue/scheduler.ex">>,<<"lib/kiq/queue/supervisor.ex">>,
21
- <<"lib/kiq/reporter">>,<<"lib/kiq/reporter.ex">>,
22
- <<"lib/kiq/reporter/instrumenter.ex">>,<<"lib/kiq/reporter/logger.ex">>,
23
- <<"lib/kiq/reporter/producer.ex">>,<<"lib/kiq/reporter/retryer.ex">>,
24
- <<"lib/kiq/reporter/stats.ex">>,<<"lib/kiq/reporter/supervisor.ex">>,
25
- <<"lib/kiq/reporter/unlocker.ex">>,<<"lib/kiq/running_job.ex">>,
26
- <<"lib/kiq/script.ex">>,<<"lib/kiq/senator.ex">>,
27
- <<"lib/kiq/supervisor.ex">>,<<"lib/kiq/testing.ex">>,
28
- <<"lib/kiq/timestamp.ex">>,<<"lib/kiq/worker.ex">>,<<"lib/mix">>,
29
- <<"lib/mix/kiq.ex">>,<<"lib/mix/tasks">>,
7
+ [<<"lib">>,<<"lib/kiq.ex">>,<<"lib/mix">>,<<"lib/mix/tasks">>,
30
8
<<"lib/mix/tasks/kiq.gen.reporter.ex">>,
31
- <<"lib/mix/tasks/kiq.gen.supervisor.ex">>,
32
- <<"lib/mix/tasks/kiq.gen.worker.ex">>,<<"priv">>,<<"priv/scripts">>,
33
- <<"priv/scripts/dequeue.lua">>,<<"priv/scripts/deschedule.lua">>,
34
- <<"priv/scripts/reelect.lua">>,<<"priv/scripts/resign.lua">>,
35
- <<"priv/scripts/resurrect.lua">>,<<".formatter.exs">>,<<"mix.exs">>,
36
- <<"README.md">>,<<"LICENSE.txt">>,<<"CHANGELOG.md">>]}.
9
+ <<"lib/mix/tasks/kiq.gen.worker.ex">>,
10
+ <<"lib/mix/tasks/kiq.gen.supervisor.ex">>,<<"lib/mix/kiq.ex">>,
11
+ <<"lib/kiq">>,<<"lib/kiq/job.ex">>,<<"lib/kiq/supervisor.ex">>,
12
+ <<"lib/kiq/naming.ex">>,<<"lib/kiq/periodic">>,
13
+ <<"lib/kiq/periodic/crontab.ex">>,<<"lib/kiq/periodic/scheduler.ex">>,
14
+ <<"lib/kiq/periodic/parser.ex">>,<<"lib/kiq/database">>,
15
+ <<"lib/kiq/reporter.ex">>,<<"lib/kiq/client.ex">>,
16
+ <<"lib/kiq/necromancer.ex">>,<<"lib/kiq/encoder.ex">>,
17
+ <<"lib/kiq/logger.ex">>,<<"lib/kiq/worker.ex">>,<<"lib/kiq/reporter">>,
18
+ <<"lib/kiq/reporter/supervisor.ex">>,<<"lib/kiq/reporter/producer.ex">>,
19
+ <<"lib/kiq/reporter/instrumenter.ex">>,<<"lib/kiq/reporter/logger.ex">>,
20
+ <<"lib/kiq/reporter/retryer.ex">>,<<"lib/kiq/reporter/stats.ex">>,
21
+ <<"lib/kiq/reporter/unlocker.ex">>,<<"lib/kiq/running_job.ex">>,
22
+ <<"lib/kiq/config.ex">>,<<"lib/kiq/periodic.ex">>,
23
+ <<"lib/kiq/heartbeat.ex">>,<<"lib/kiq/testing.ex">>,<<"lib/kiq/pool.ex">>,
24
+ <<"lib/kiq/timestamp.ex">>,<<"lib/kiq/script.ex">>,
25
+ <<"lib/kiq/identity.ex">>,<<"lib/kiq/queue">>,
26
+ <<"lib/kiq/queue/supervisor.ex">>,<<"lib/kiq/queue/producer.ex">>,
27
+ <<"lib/kiq/queue/runner.ex">>,<<"lib/kiq/queue/scheduler.ex">>,
28
+ <<"lib/kiq/queue/consumer.ex">>,<<"lib/kiq/senator.ex">>,<<"lib/kiq/pool">>,
29
+ <<"lib/kiq/pool/supervisor.ex">>,<<"lib/kiq/client">>,
30
+ <<"lib/kiq/client/queueing.ex">>,<<"lib/kiq/client/leadership.ex">>,
31
+ <<"lib/kiq/client/cleanup.ex">>,<<"lib/kiq/client/resurrection.ex">>,
32
+ <<"lib/kiq/client/locking.ex">>,<<"lib/kiq/client/stats.ex">>,
33
+ <<"lib/kiq/client/introspection.ex">>,<<"priv">>,<<"priv/scripts">>,
34
+ <<"priv/scripts/dequeue.lua">>,<<"priv/scripts/enqueue.lua">>,
35
+ <<"priv/scripts/deschedule.lua">>,<<"priv/scripts/reelect.lua">>,
36
+ <<"priv/scripts/resign.lua">>,<<"priv/scripts/resurrect.lua">>,
37
+ <<".formatter.exs">>,<<"mix.exs">>,<<"README.md">>,<<"LICENSE.txt">>,
38
+ <<"CHANGELOG.md">>]}.
37
39
{<<"licenses">>,[<<"MIT">>]}.
38
40
{<<"links">>,[{<<"github">>,<<"https://github.com/sorentwo/kiq">>}]}.
39
41
{<<"name">>,<<"kiq">>}.
40
42
{<<"requirements">>,
41
- [[{<<"app">>,<<"redix">>},
42
- {<<"name">>,<<"redix">>},
43
+ [[{<<"app">>,<<"jason">>},
44
+ {<<"name">>,<<"jason">>},
43
45
{<<"optional">>,false},
44
46
{<<"repository">>,<<"hexpm">>},
45
- {<<"requirement">>,<<"~> 0.9">>}],
46
- [{<<"app">>,<<"nimble_parsec">>},
47
- {<<"name">>,<<"nimble_parsec">>},
48
- {<<"optional">>,false},
49
- {<<"repository">>,<<"hexpm">>},
50
- {<<"requirement">>,<<"~> 0.4.0">>}],
47
+ {<<"requirement">>,<<"~> 1.1">>}],
51
48
[{<<"app">>,<<"gen_stage">>},
52
49
{<<"name">>,<<"gen_stage">>},
53
50
{<<"optional">>,false},
54
51
{<<"repository">>,<<"hexpm">>},
55
52
{<<"requirement">>,<<"~> 0.14">>}],
53
+ [{<<"app">>,<<"nimble_parsec">>},
54
+ {<<"name">>,<<"nimble_parsec">>},
55
+ {<<"optional">>,false},
56
+ {<<"repository">>,<<"hexpm">>},
57
+ {<<"requirement">>,<<"~> 0.4.0">>}],
58
+ [{<<"app">>,<<"redix">>},
59
+ {<<"name">>,<<"redix">>},
60
+ {<<"optional">>,false},
61
+ {<<"repository">>,<<"hexpm">>},
62
+ {<<"requirement">>,<<"~> 0.9">>}],
56
63
[{<<"app">>,<<"telemetry">>},
57
64
{<<"name">>,<<"telemetry">>},
58
65
{<<"optional">>,false},
59
66
{<<"repository">>,<<"hexpm">>},
60
- {<<"requirement">>,<<"~> 0.2">>}],
61
- [{<<"app">>,<<"jason">>},
62
- {<<"name">>,<<"jason">>},
63
- {<<"optional">>,false},
64
- {<<"repository">>,<<"hexpm">>},
65
- {<<"requirement">>,<<"~> 1.1">>}]]}.
66
- {<<"version">>,<<"0.5.0">>}.
67
+ {<<"requirement">>,<<"~> 0.2">>}]]}.
68
+ {<<"version">>,<<"0.6.0">>}.
changed lib/kiq.ex
 
@@ -107,6 +107,10 @@ defmodule Kiq do
107
107
Redis. This defaults to `10ms`, though it will back-off by a factor of `1.5`
108
108
if there are any connection errors.
109
109
110
+ * `:periodics` — A list of job scheduling tuples in the form `{schedule,
111
+ worker}` or `{schedule, worker, options}`. See
112
+ [Periodic Jobs](#module-periodic-jobs) for details.
113
+
110
114
* `:pool_size` — Controls the number of Redis connections available to Kiq,
111
115
defaults to 5.
changed lib/kiq/client.ex
 
@@ -135,11 +135,9 @@ defmodule Kiq.Client do
135
135
defp perform_flush(%State{pool: pool, table: table, test_mode: :disabled} = state) do
136
136
try do
137
137
conn = Pool.checkout(pool)
138
+ jobs = :ets.foldl(fn {_key, val}, acc -> [val | acc] end, [], table)
138
139
139
- table
140
- |> :ets.tab2list()
141
- |> Enum.each(&Queueing.enqueue(conn, elem(&1, 1)))
142
-
140
+ :ok = Queueing.enqueue(conn, jobs)
143
141
true = :ets.delete_all_objects(table)
144
142
145
143
transition_to_success(state)
changed lib/kiq/client/queueing.ex
 
@@ -1,36 +1,27 @@
1
1
defmodule Kiq.Client.Queueing do
2
2
@moduledoc false
3
3
4
- import Redix, only: [command!: 2, noreply_command!: 2, noreply_pipeline!: 2]
5
- import Kiq.Naming, only: [queue_name: 1, backup_name: 2, unlock_name: 1]
4
+ import Redix
5
+ import Kiq.Naming
6
6
7
7
alias Kiq.{Job, Script, Timestamp}
8
8
9
9
@typep conn :: GenServer.server()
10
- @typep resp :: {:ok, Job.t()}
11
-
12
- @retry_set "retry"
13
- @schedule_set "schedule"
14
10
15
11
@external_resource Script.path("dequeue")
16
12
@external_resource Script.path("deschedule")
13
+ @external_resource Script.path("enqueue")
17
14
@dequeue_sha Script.hash("dequeue")
15
+ @enqueue_sha Script.hash("enqueue")
18
16
@deschedule_sha Script.hash("deschedule")
19
17
20
- @spec enqueue(conn(), Job.t()) :: resp()
21
- def enqueue(conn, %Job{} = job) do
22
- job
23
- |> check_unique(conn)
24
- |> case do
25
- {:ok, %Job{at: at} = job} when is_float(at) ->
26
- schedule_job(job, @schedule_set, conn)
18
+ @spec enqueue(conn(), list(Job.t())) :: :ok
19
+ def enqueue(_conn, []), do: :ok
27
20
28
- {:ok, job} ->
29
- push_job(job, conn)
21
+ def enqueue(conn, jobs) when is_list(jobs) do
22
+ commands = for job <- jobs, do: enqueue_command(job)
30
23
31
- {:locked, job} ->
32
- {:ok, job}
33
- end
24
+ noreply_pipeline!(conn, commands)
34
25
end
35
26
36
27
@spec dequeue(conn(), binary(), binary(), pos_integer()) :: list(iodata())
 
@@ -43,54 +34,43 @@ defmodule Kiq.Client.Queueing do
43
34
44
35
@spec deschedule(conn(), binary()) :: :ok
45
36
def deschedule(conn, set) when is_binary(set) do
46
- conn
47
- |> command!(["EVALSHA", @deschedule_sha, "1", set, Timestamp.to_score()])
48
- |> Enum.map(&Job.decode/1)
49
- |> Enum.each(&push_job(&1, conn))
50
-
51
- :ok
37
+ noreply_command!(conn, ["EVALSHA", @deschedule_sha, "1", set, Timestamp.to_score()])
52
38
end
53
39
54
- @spec retry(conn(), Job.t()) :: resp()
55
- def retry(conn, %Job{retry: retry, retry_count: count} = job)
40
+ @spec retry(conn(), Job.t()) :: :ok
41
+ def retry(conn, %Job{at: at, retry: retry, retry_count: count} = job)
56
42
when is_integer(retry) or (retry == true and count > 0) do
57
- schedule_job(job, @retry_set, conn)
43
+ noreply_command!(conn, ["ZADD", "retry", Timestamp.to_score(at), Job.encode(job)])
58
44
end
59
45
60
46
# Helpers
61
47
62
- defp check_unique(%{unlocks_at: unlocks_at} = job, conn) when is_float(unlocks_at) do
63
- unlocks_in = trunc((unlocks_at - Timestamp.unix_now()) * 1_000)
64
- unlock_name = unlock_name(job.unique_token)
48
+ defp enqueue_command(%Job{queue: queue} = job) do
49
+ {job, enqueue_at} = maybe_enqueue_at(job)
50
+ {unique_key, unlocks_in} = maybe_unlocks_in(job)
65
51
66
- command = ["SET", unlock_name, to_string(unlocks_at), "PX", to_string(unlocks_in), "NX"]
52
+ eval_keys = ["EVALSHA", @enqueue_sha, "1", unique_key]
53
+ eval_args = [Job.encode(job), queue, enqueue_at, unlocks_in]
67
54
68
- case command!(conn, command) do
69
- "OK" -> {:ok, job}
70
- _res -> {:locked, job}
55
+ eval_keys ++ eval_args
56
+ end
57
+
58
+ defp maybe_enqueue_at(%Job{at: at} = job) do
59
+ if is_float(at) do
60
+ {job, Timestamp.to_score(job.at)}
61
+ else
62
+ {%{job | enqueued_at: Timestamp.unix_now()}, nil}
71
63
end
72
64
end
73
65
74
- defp check_unique(job, _client), do: {:ok, job}
66
+ defp maybe_unlocks_in(%Job{unique_token: unique_token, unlocks_at: unlocks_at}) do
67
+ if is_float(unlocks_at) do
68
+ unique_key = unlock_name(unique_token)
69
+ unlocks_in = trunc((unlocks_at - Timestamp.unix_now()) * 1_000)
75
70
76
- defp push_job(%{queue: queue} = job, conn) do
77
- job = %Job{job | enqueued_at: Timestamp.unix_now()}
78
-
79
- commands = [
80
- ["SADD", "queues", queue],
81
- ["LPUSH", queue_name(queue), Job.encode(job)]
82
- ]
83
-
84
- :ok = noreply_pipeline!(conn, commands)
85
-
86
- {:ok, job}
87
- end
88
-
89
- defp schedule_job(%Job{at: at} = job, set, conn) do
90
- score = Timestamp.to_score(at)
91
-
92
- :ok = noreply_command!(conn, ["ZADD", set, score, Job.encode(job)])
93
-
94
- {:ok, job}
71
+ {unique_key, to_string(unlocks_in)}
72
+ else
73
+ {nil, nil}
74
+ end
95
75
end
96
76
end
changed lib/kiq/client/resurrection.ex
 
@@ -30,8 +30,7 @@ defmodule Kiq.Client.Resurrection do
30
30
# `SCAN` instead of `KEYS` just in case there are dozens or hundreds of
31
31
# backup queues.
32
32
defp fetch_backups(backups, cursor, conn) do
33
- case command!(conn, ["SCAN", cursor, "MATCH", "queue:backup|*"]) do
34
- [_cursor, []] -> backups
33
+ case command!(conn, ["SCAN", cursor, "MATCH", "queue:backup|*", "COUNT", "1000"]) do
35
34
["0", results] -> backups ++ results
36
35
[cursor, results] -> fetch_backups(backups ++ results, cursor, conn)
37
36
end
changed lib/kiq/client/stats.ex
 
@@ -3,7 +3,7 @@ defmodule Kiq.Client.Stats do
3
3
4
4
import Redix, only: [noreply_pipeline: 2]
5
5
6
- alias Kiq.{Heartbeat, RunningJob, Timestamp}
6
+ alias Kiq.{Encoder, Heartbeat, RunningJob, Timestamp}
7
7
8
8
@typep conn :: GenServer.server()
9
9
@typep resp :: :ok | {:error, atom() | Redix.Error.t()}
 
@@ -14,7 +14,7 @@ defmodule Kiq.Client.Stats do
14
14
15
15
wkey = "#{key}:workers"
16
16
beat = Timestamp.unix_now() |> to_string()
17
- info = Jason.encode!(heartbeat)
17
+ info = Encoder.encode(heartbeat)
18
18
19
19
commands = [
20
20
["SADD", "processes", key],
changed lib/kiq/config.ex
 
@@ -31,6 +31,7 @@ defmodule Kiq.Config do
31
31
schedulers: list(binary()),
32
32
senator_name: name(),
33
33
server?: boolean(),
34
+ stats_flush_interval: pos_integer(),
34
35
test_mode: :disabled | :sandbox
35
36
}
36
37
 
@@ -53,6 +54,7 @@ defmodule Kiq.Config do
53
54
schedulers: ~w(retry schedule),
54
55
senator_name: Senator,
55
56
server?: true,
57
+ stats_flush_interval: 1_000,
56
58
test_mode: :disabled
57
59
58
60
@spec new(map() | Keyword.t()) :: t()
added lib/kiq/encoder.ex
 
@@ -0,0 +1,59 @@
1
+ defmodule Kiq.Encoder do
2
+ @moduledoc """
3
+ This module wraps `Jason.encode/1` to provide consistent and safe JSON encoding.
4
+
5
+ Typically this module shouldn't be used outside of Kiq, but the function is
6
+ documented for reference on how job arguments are encoded.
7
+ """
8
+
9
+ @doc """
10
+ Safely encode terms into JSON, if possible.
11
+
12
+ Some Elixir/Erlang types can't be represented as JSON. To make encoding as reliable as possible
13
+ the `encode/1` function attempts to convert incompatible terms into JSON friendly values.
14
+
15
+ The following sanitization is applied (recursively):
16
+
17
+ * `struct` - Converted to a map using `Map.from_struct/1`
18
+ * `tuple` — Converted to a list
19
+ * `pid`, `port`, `reference` — Converted to a string using inspect, i.e. "#PID<0.101.0>"
20
+ """
21
+ @spec encode(any()) :: binary()
22
+ def encode(term) do
23
+ case Jason.encode(term) do
24
+ {:ok, encoded} ->
25
+ encoded
26
+
27
+ {:error, %Protocol.UndefinedError{}} ->
28
+ term
29
+ |> sanitize()
30
+ |> Jason.encode!()
31
+ end
32
+ end
33
+
34
+ defp sanitize(%_{} = struct) do
35
+ struct
36
+ |> Map.from_struct()
37
+ |> sanitize()
38
+ end
39
+
40
+ defp sanitize(map) when is_map(map) do
41
+ for {key, val} <- map, into: %{}, do: {key, sanitize(val)}
42
+ end
43
+
44
+ defp sanitize(list) when is_list(list) do
45
+ for term <- list, do: sanitize(term)
46
+ end
47
+
48
+ defp sanitize(tuple) when is_tuple(tuple) do
49
+ tuple
50
+ |> Tuple.to_list()
51
+ |> sanitize()
52
+ end
53
+
54
+ defp sanitize(input) when is_pid(input) or is_port(input) or is_reference(input) do
55
+ inspect(input)
56
+ end
57
+
58
+ defp sanitize(term), do: term
59
+ end
changed lib/kiq/job.ex
 
@@ -42,7 +42,7 @@ defmodule Kiq.Job do
42
42
[1]: https://github.com/mperham/sidekiq/wiki/Job-Format
43
43
"""
44
44
45
- alias Kiq.Timestamp
45
+ alias Kiq.{Encoder, Timestamp}
46
46
47
47
@type t :: %__MODULE__{
48
48
jid: binary(),
 
@@ -179,7 +179,7 @@ defmodule Kiq.Job do
179
179
def encode(%__MODULE__{} = job) do
180
180
map = to_map(job)
181
181
182
- with {:ok, encoded} <- Jason.encode(map) do
182
+ with {:ok, encoded} <- Encoder.encode(map) do
183
183
encoded
184
184
end
185
185
end
 
@@ -187,8 +187,7 @@ defmodule Kiq.Job do
187
187
@doc """
188
188
Decode an encoded job from JSON into a Job struct.
189
189
190
- All keys are atomized, including keys within arguments. This does _not_ use
191
- `String.to_existing_atom/1`, so be wary of encoding large maps.
190
+ All job keys are atomized except for those within arguments.
192
191
193
192
# Example
194
193
 
@@ -198,12 +197,14 @@ defmodule Kiq.Job do
198
197
199
198
iex> job = Kiq.Job.decode(~s({"class":"MyWorker","args":{"a":1}}))
200
199
...> Map.get(job, :args)
201
- %{a: 1}
200
+ %{"a" => 1}
202
201
"""
203
202
@spec decode(input :: binary()) :: t() | {:error, Exception.t()}
204
203
def decode(input) when is_binary(input) do
205
- with {:ok, decoded} <- Jason.decode(input, keys: :atoms) do
206
- new(decoded)
204
+ with {:ok, decoded} <- Jason.decode(input) do
205
+ decoded
206
+ |> Map.new(fn {key, val} -> {String.to_existing_atom(key), val} end)
207
+ |> new()
207
208
end
208
209
end
changed lib/kiq/logger.ex
 
@@ -3,12 +3,14 @@ defmodule Kiq.Logger do
3
3
4
4
require Logger
5
5
6
+ alias Kiq.Encoder
7
+
6
8
@spec log(map()) :: :ok
7
9
def log(payload) when is_map(payload) do
8
10
Logger.info(fn ->
9
11
payload
10
12
|> Map.put(:source, "kiq")
11
- |> Jason.encode!()
13
+ |> Encoder.encode()
12
14
end)
13
15
end
14
16
end
changed lib/kiq/queue/scheduler.ex
 
@@ -16,7 +16,7 @@ defmodule Kiq.Queue.Scheduler do
16
16
defmodule State do
17
17
@moduledoc false
18
18
19
- defstruct fetch_interval: 1_000, pool: nil, set: nil
19
+ defstruct [:pool, :set, fetch_interval: 1_000]
20
20
end
21
21
22
22
@spec start_link(opts :: options()) :: GenServer.on_start()
 
@@ -26,9 +26,12 @@ defmodule Kiq.Queue.Scheduler do
26
26
GenServer.start_link(__MODULE__, opts, name: name)
27
27
end
28
28
29
- @spec random_interval(average :: pos_integer()) :: pos_integer()
30
- def random_interval(average) do
31
- trunc(average * :rand.uniform() + average / 2)
29
+ @doc false
30
+ @spec random_interval(pos_integer(), pos_integer()) :: pos_integer()
31
+ def random_interval(average, jitter \\ 10) do
32
+ weight = 1.0 - (jitter / 2 - :rand.uniform(jitter)) / 100.0
33
+
34
+ trunc(average * weight)
32
35
end
33
36
34
37
# Callbacks
changed lib/kiq/reporter/stats.ex
 
@@ -20,18 +20,17 @@ defmodule Kiq.Reporter.Stats do
20
20
@impl GenStage
21
21
def init(opts) do
22
22
{conf, opts} = Keyword.pop(opts, :config)
23
- {fint, opts} = Keyword.pop(opts, :flush_interval, 1_000)
24
23
25
24
Process.flag(:trap_exit, true)
26
25
27
- state =
28
- State
29
- |> struct!(pool: conf.pool_name, queues: conf.queues, flush_interval: fint)
30
- |> schedule_flush()
26
+ state = %State{
27
+ heartbeat: Heartbeat.new(queues: conf.queues, identity: conf.identity),
28
+ flush_interval: conf.stats_flush_interval,
29
+ pool: conf.pool_name,
30
+ queues: conf.queues
31
+ }
31
32
32
- heartbeat = Heartbeat.new(queues: state.queues, identity: conf.identity)
33
-
34
- {:consumer, %State{state | heartbeat: heartbeat}, opts}
33
+ {:consumer, schedule_flush(state), opts}
35
34
end
36
35
37
36
@impl GenStage
changed lib/kiq/running_job.ex
 
@@ -1,7 +1,7 @@
1
1
defmodule Kiq.RunningJob do
2
2
@moduledoc false
3
3
4
- alias Kiq.{Job, Timestamp}
4
+ alias Kiq.{Encoder, Job, Timestamp}
5
5
6
6
@type t :: %__MODULE__{key: binary(), encoded: binary()}
7
7
 
@@ -11,7 +11,7 @@ defmodule Kiq.RunningJob do
11
11
def new(%Job{pid: pid, queue: queue} = job) do
12
12
details = %{queue: queue, payload: Job.to_map(job), run_at: Timestamp.unix_now()}
13
13
14
- %__MODULE__{key: format_key(pid), encoded: Jason.encode!(details)}
14
+ %__MODULE__{key: format_key(pid), encoded: Encoder.encode(details)}
15
15
end
16
16
17
17
defp format_key(pid) when is_pid(pid) or is_reference(pid) do
changed lib/kiq/timestamp.ex
 
@@ -9,7 +9,7 @@ defmodule Kiq.Timestamp do
9
9
value
10
10
|> Kernel.*(1_000_000)
11
11
|> trunc()
12
- |> DateTime.from_unix!(:microseconds)
12
+ |> DateTime.from_unix!(:microsecond)
13
13
end
14
14
15
15
@doc false
 
@@ -24,17 +24,17 @@ defmodule Kiq.Timestamp do
24
24
@spec unix_now() :: float()
25
25
def unix_now do
26
26
DateTime.utc_now()
27
- |> DateTime.to_unix(:microseconds)
27
+ |> DateTime.to_unix(:microsecond)
28
28
|> to_float()
29
29
end
30
30
31
31
@doc false
32
32
@spec unix_in(offset :: integer(), unit :: atom()) :: float()
33
- def unix_in(offset, unit \\ :seconds) when is_integer(offset) do
33
+ def unix_in(offset, unit \\ :second) when is_integer(offset) do
34
34
NaiveDateTime.utc_now()
35
35
|> NaiveDateTime.add(offset, unit)
36
36
|> DateTime.from_naive!("Etc/UTC")
37
- |> DateTime.to_unix(:microseconds)
37
+ |> DateTime.to_unix(:microsecond)
38
38
|> to_float()
39
39
end
40
40
 
@@ -44,7 +44,7 @@ defmodule Kiq.Timestamp do
44
44
45
45
def to_score(%DateTime{} = time) do
46
46
time
47
- |> DateTime.to_unix(:microseconds)
47
+ |> DateTime.to_unix(:microsecond)
48
48
|> to_score()
49
49
end
changed mix.exs
 
@@ -1,7 +1,7 @@
1
1
defmodule Kiq.MixProject do
2
2
use Mix.Project
3
3
4
- @version "0.5.0"
4
+ @version "0.6.0"
5
5
6
6
def project do
7
7
[
changed priv/scripts/deschedule.lua
 
@@ -1,7 +1,25 @@
1
+ -- Find all jobs with a time "score" greater than the provided value and
2
+ -- enqueue then individually. Executing this atomically within a script
3
+ -- prevents any race conditions and avoids round-trip serialization of each
4
+ -- job.
5
+
1
6
local jobs = redis.call("zrangebyscore", KEYS[1], "0", ARGV[1])
7
+ local count = 0
2
8
3
9
for _idx, job in ipairs(jobs) do
4
10
redis.call("zrem", KEYS[1], job)
11
+
12
+ -- It is vastly faster to parse out the queue with a `match` than to decode
13
+ -- the payload. Additionally, `cjson` will truncate large integers and
14
+ -- doesn't maintain the exact job structure.
15
+ local queue = string.match(job, '"queue":"(%w+)"')
16
+
17
+ -- The queue set key and naming convention are hard coded here for the sake
18
+ -- of simplicity.
19
+ redis.call("sadd", "queues", queue)
20
+ redis.call("lpush", "queue:" .. queue, job)
21
+
22
+ count = count + 1
5
23
end
6
24
7
- return jobs
25
+ return count
added priv/scripts/enqueue.lua
 
@@ -0,0 +1,34 @@
1
+ -- Atomically enqueue or schedule jobs while optionally enforcing uniqueness.
2
+ --
3
+ -- The returned status code indicates which action was taken:
4
+ --
5
+ -- 0 - The job was skipped due to a unique lock
6
+ -- 1 - The job was enqueued for immediate execution
7
+ -- 2 - The job was scheduled for future execution
8
+
9
+ local unique_key = KEYS[1]
10
+
11
+ local job = ARGV[1]
12
+ local queue = ARGV[2]
13
+ local enqueue_at = tonumber(ARGV[3])
14
+ local unlocks_in = tonumber(ARGV[4])
15
+
16
+ local is_unlocked = true
17
+ local status = 0
18
+
19
+ if unlocks_in then
20
+ is_unlocked = redis.call("set", unique_key, unlocks_in, "px", unlocks_in, "nx")
21
+ end
22
+
23
+ if is_unlocked and enqueue_at then
24
+ redis.call("zadd", "schedule", enqueue_at, job)
25
+
26
+ status = 2
27
+ elseif is_unlocked then
28
+ redis.call("sadd", "queues", queue)
29
+ redis.call("lpush", "queue:" .. queue, job)
30
+
31
+ status = 1
32
+ end
33
+
34
+ return status