|
-- Open the transaction that wraps this script.
BEGIN;

-- pgcrypto is installed into schema public; own_tasks calls public.gen_random_uuid()
-- to mint performance tokens.
CREATE EXTENSION IF NOT EXISTS pgcrypto
    WITH SCHEMA public
    CASCADE;
| 4 | + |
-- Internal lifecycle state of a task.  Labels, in declaration order:
--   pending      waiting for an actor to perform it (new, or being retried)
--   in-progress  currently being performed by an actor
--   aborted      an actor aborted it with a message; it will not be reissued
--   completed    an actor completed it with a message; it will not be reissued
CREATE TYPE task_status_code_value AS ENUM ('pending', 'in-progress', 'aborted', 'completed');
| 11 | + |
-- One row per task.
CREATE TABLE IF NOT EXISTS tasks (
    -- Task identifier (a nanoid).
    id VARCHAR NOT NULL PRIMARY KEY,

    -- Name (type) of the action to perform.
    action VARCHAR NOT NULL,
    -- Data used by the action.
    body TEXT,
    -- Status text defined by the action and visible to it.
    status TEXT,

    -- Internal status code, used by parade to issue tasks.
    status_code task_status_code_value NOT NULL DEFAULT 'pending',

    -- Number of attempts actors have made on this task; own_tasks adds 1 each time
    -- it hands the task out.
    num_tries INTEGER NOT NULL DEFAULT 0,
    -- Failure counter.  In this file it is only incremented by return_task, on the
    -- tasks signalled by an aborted task.
    num_failures INTEGER NOT NULL DEFAULT 0,
    -- Upper bound on num_tries; NULL means own_tasks applies no limit.
    max_tries INTEGER,

    -- Number of tasks that must signal this task; NULL means can_allocate_task does
    -- not wait for any signal.
    total_dependencies INTEGER,
    -- Number of tasks that have already signalled this task.
    num_signals INTEGER NOT NULL DEFAULT 0,

    -- ID of the performing actor if in-progress.
    actor_id VARCHAR,
    -- Offer this task to other actors once action_deadline has elapsed.
    action_deadline TIMESTAMPTZ,
    -- Token minted by own_tasks when the task is handed out; extend_task_deadline and
    -- return_task only touch the task when given the matching token.
    performance_token UUID,

    -- BUG(ariels): add REFERENCES dependency to each of the to_signal_after
    --     tasks.  Or at least add triggers that perform ON DELETE
    --     CASCADE.
    -- IDs of the tasks to signal after performing this task.
    to_signal_after VARCHAR[],
    -- (If non-NULL) name of a channel to NOTIFY when this task ends.
    notify_channel_after VARCHAR
);
| 38 | + |
-- Returns true if a task with this status code, deadline and signal counts can be
-- allocated to an actor, i.e. when both of these hold:
--   * it is 'pending', or it is 'in-progress' and its deadline has already passed;
--   * it has no dependencies (total_dependencies IS NULL) or every dependency has
--     signalled (num_signals = total_dependencies).
-- An 'in-progress' task with a NULL deadline yields NULL or false, never true, so a
-- WHERE clause does not select it.  `id' is accepted but not used by the body.
--
-- Declared STABLE rather than IMMUTABLE: the result depends on NOW(), which is only
-- constant within a single transaction, so calls must not be folded into constants
-- at plan time.
CREATE OR REPLACE FUNCTION can_allocate_task(id VARCHAR, code task_status_code_value, deadline TIMESTAMPTZ, num_signals INTEGER, total_dependencies INTEGER)
RETURNS BOOLEAN
LANGUAGE sql STABLE AS $$
    SELECT (code = 'pending' OR (code = 'in-progress' AND deadline < NOW())) AND
        (total_dependencies IS NULL OR num_signals = total_dependencies)
$$;
| 47 | + |
-- Claims up to `max_tasks' allocatable tasks whose action is one of `actions' on behalf
-- of actor `owner_id'.  Every claimed task becomes 'in-progress', has num_tries bumped
-- by one, receives a fresh performance token, and gets a deadline `max_duration' from
-- now (no deadline at all if max_duration is NULL).  Tasks that have used up max_tries
-- or whose rows are locked by another transaction are passed over.
--
-- Returns id, performance token, num_failures, action and body of each claimed task.
-- Both the id and the token must be returned to complete the task successfully.
CREATE OR REPLACE FUNCTION own_tasks(
    max_tasks INTEGER, actions VARCHAR ARRAY, owner_id VARCHAR, max_duration INTERVAL
)
RETURNS TABLE(task_id VARCHAR, token UUID, num_failures INTEGER, action VARCHAR, body TEXT)
LANGUAGE sql VOLATILE AS $$
    -- Pick and lock a random selection of claimable tasks.
    WITH candidates AS (
        SELECT t.id
        FROM tasks AS t
        WHERE t.action = ANY(actions)
            AND (t.max_tries IS NULL OR t.num_tries < t.max_tries)
            AND can_allocate_task(t.id, t.status_code, t.action_deadline, t.num_signals, t.total_dependencies)
            -- maybe: AND not_before <= NOW()
        -- maybe: ORDER BY priority (eventually)
        ORDER BY random()
        LIMIT max_tasks
        FOR UPDATE SKIP LOCKED
    )
    UPDATE tasks AS claimed
    SET actor_id = owner_id,
        status_code = 'in-progress',
        num_tries = claimed.num_tries + 1,
        performance_token = public.gen_random_uuid(),
        action_deadline = NOW() + max_duration  -- NULL if max_duration IS NULL
    WHERE claimed.id IN (SELECT candidates.id FROM candidates)
    RETURNING claimed.id, claimed.performance_token, claimed.num_failures, claimed.action, claimed.body
$$;
| 75 | + |
-- Extends ownership of task `task_id', if it is still locked with performance token
-- `token', by moving its deadline to `max_duration' from now (a NULL max_duration
-- clears the deadline).
--
-- Returns TRUE if the task was updated and FALSE if no task matches both id and token.
-- The UPDATE lives in a CTE so that the no-match case yields an explicit FALSE: a bare
-- `UPDATE ... RETURNING true' produces no row then, which makes the function return
-- NULL.  A data-modifying CTE always runs to completion, whatever the outer query reads.
CREATE OR REPLACE FUNCTION extend_task_deadline(
    task_id VARCHAR, token UUID, max_duration INTERVAL
) RETURNS BOOLEAN
LANGUAGE sql VOLATILE AS $$
    WITH extended AS (
        UPDATE tasks
        SET action_deadline = NOW() + max_duration
        WHERE id = task_id AND performance_token = token
        RETURNING id
    )
    SELECT EXISTS (SELECT 1 FROM extended);
$$;
| 87 | + |
-- True once task row `r' has used up its tries, i.e. num_tries has reached max_tries.
-- Yields NULL (not false) when max_tries is NULL.
CREATE OR REPLACE FUNCTION no_more_tries(r tasks)
RETURNS BOOLEAN
LANGUAGE sql IMMUTABLE AS $$
    SELECT r.num_tries >= r.max_tries
$$;
| 92 | + |
-- Returns an owned task id that was locked with token, storing `result_status' as its
-- status text and releasing the actor and token.  `result_status_code' picks the outcome:
--   'aborted', 'completed'  the task ends with that status code;
--   'pending'               the task goes back to 'pending' for another try, unless
--                           no_more_tries says it has none left, in which case it ends
--                           as 'aborted'.
-- Any other status code raises an exception.
--
-- When the task ends, every task listed in its to_signal_after gets num_signals + 1
-- (and num_failures + 1 if this task ended as 'aborted'), and notify_channel_after, if
-- set, is notified.  Nothing is signalled or notified when the task merely goes back to
-- 'pending'.
--
-- Returns the number of tasks updated: 1 on success, 0 if no task matches both id and
-- token.  It is an error to return a task with the wrong token; that can happen if the
-- deadline expired and the task was given to another actor.
CREATE OR REPLACE FUNCTION return_task(
    task_id VARCHAR, token UUID, result_status TEXT, result_status_code task_status_code_value
) RETURNS INTEGER
LANGUAGE plpgsql AS $$
DECLARE
    num_updated INTEGER;
    channel VARCHAR;
    to_signal VARCHAR ARRAY;
    -- Status code the task actually ended up with; differs from result_status_code
    -- when a 'pending' return runs out of tries and becomes 'aborted'.
    final_status_code task_status_code_value;
BEGIN
    CASE result_status_code
    WHEN 'aborted', 'completed' THEN
        UPDATE tasks
        SET status = result_status,
            status_code = result_status_code,
            actor_id = NULL,
            performance_token = NULL
        WHERE id = task_id AND performance_token = token
        RETURNING notify_channel_after, to_signal_after, status_code
        INTO channel, to_signal, final_status_code;
    WHEN 'pending' THEN
        -- Only hand back the channel and signal list if the task is finished for good.
        UPDATE tasks
        SET status = result_status,
            status_code = (CASE WHEN no_more_tries(tasks) THEN 'aborted' ELSE 'pending' END)::task_status_code_value,
            actor_id = NULL,
            performance_token = NULL
        WHERE id = task_id AND performance_token = token
        RETURNING (CASE WHEN no_more_tries(tasks) THEN notify_channel_after ELSE NULL END),
            (CASE WHEN no_more_tries(tasks) THEN to_signal_after ELSE NULL END),
            status_code
        INTO channel, to_signal, final_status_code;
    ELSE
        -- Report the rejected status *code*, not the free-form status text.
        RAISE EXCEPTION 'cannot return task to status %', result_status_code;
    END CASE;

    -- Row count of the UPDATE executed by the CASE above.
    GET DIAGNOSTICS num_updated := ROW_COUNT;

    -- to_signal is NULL (so nothing matches) when no row was updated or the task is
    -- being retried.  Failures are counted from the status the task really ended with.
    UPDATE tasks
    SET num_signals = num_signals + 1,
        num_failures = num_failures + CASE WHEN final_status_code = 'aborted'::task_status_code_value THEN 1 ELSE 0 END
    WHERE id = ANY(to_signal);

    IF channel IS NOT NULL THEN
        PERFORM pg_notify(channel, NULL);
    END IF;

    RETURN num_updated;
END;
$$;
| 144 | + |
-- (Utility for the delete_tasks function.)  Detaches task `task_id' from every task
-- listed in its to_signal_after: each of those has total_dependencies reduced by 1 if
-- `task_id' is not finished, or left as is if `task_id' is already 'aborted' or
-- 'completed'.  Returns the ids of the touched tasks whose total_dependencies now
-- equals num_signals, i.e. tasks with no remaining dependencies.
CREATE OR REPLACE FUNCTION remove_task_dependencies(task_id VARCHAR)
RETURNS SETOF VARCHAR
LANGUAGE sql VOLATILE AS $$
    -- One row per task to touch, with the amount to subtract from its dependency count.
    WITH updates AS (
        SELECT
            UNNEST(src.to_signal_after) AS effect_id,
            CASE WHEN src.status_code IN ('aborted', 'completed') THEN 0 ELSE 1 END AS delta
        FROM tasks AS src
        WHERE src.id = task_id
    ),
    -- Apply the decrement and report the resulting counters.
    touched AS (
        UPDATE tasks AS dep
        SET total_dependencies = dep.total_dependencies - updates.delta
        FROM updates
        WHERE dep.id = updates.effect_id
        RETURNING dep.id, dep.total_dependencies, dep.num_signals
    )
    SELECT touched.id
    FROM touched
    WHERE touched.total_dependencies = touched.num_signals;
$$;
| 164 | + |
-- Marks that delete_tasks puts on the rows of its work table:
--   new          queued, not processed yet
--   in-progress  being processed in the current pass
--   done         processed in an earlier pass
CREATE TYPE tasks_recurse_value AS ENUM (
    'new',
    'in-progress',
    'done'
);
| 166 | + |
-- Deletes from tasks every task whose id appears in column id of table `task_id_name'
-- (with columns id (an ID) and mark (a tasks_recurse_value); presumably a temporary
-- table).  Each such task is first detached from its dependent tasks with
-- remove_task_dependencies, and any dependent left with no further dependencies is
-- added to the table and handled the same way (effectively recursively).
--
-- Table `task_id_name' is the work queue for the operation: rows move from 'new' to
-- 'in-progress' to 'done', and passes repeat until one adds no new row.  The table is
-- NOT emptied afterwards; it ends up holding the ids of all deleted tasks.
--
-- Returns nothing.  No abort marking is performed -- make sure to abort the task first!
CREATE OR REPLACE FUNCTION delete_tasks(task_id_name TEXT) RETURNS VOID LANGUAGE plpgsql AS $$
DECLARE
    num_inserted INTEGER;  -- rows queued by the current pass
BEGIN
    LOOP
        -- Start a pass over everything queued since the previous one.
        EXECUTE format($Q$
            UPDATE %1$I SET mark='in-progress' WHERE mark='new'
        $Q$, task_id_name);
        -- Detach this pass's tasks from their dependents and queue every dependent that
        -- is left with no remaining dependencies.  The column list makes the insert
        -- independent of the work table's column order.
        EXECUTE format($Q$
            WITH new_to_delete AS (
                SELECT remove_task_dependencies(id) id FROM %1$I WHERE mark='in-progress'
            )
            INSERT INTO %1$I (id, mark) SELECT id, 'new' FROM new_to_delete
        $Q$, task_id_name);
        GET DIAGNOSTICS num_inserted = ROW_COUNT;
        -- Nothing new was queued, so the set of tasks to delete is complete.
        EXIT WHEN num_inserted=0;
        EXECUTE format($Q$
            UPDATE %1$I SET mark='done' WHERE mark='in-progress'
        $Q$, task_id_name);
    END LOOP;
    -- Delete everything that was collected, whatever its mark.
    EXECUTE format($Q$
        DELETE FROM tasks WHERE id IN (SELECT id FROM %1$I)
    $Q$, task_id_name);
END;
$$;
| 200 | + |
-- Commit the transaction opened at the top of this script.
COMMIT;
0 commit comments