Skip to content

Commit 77d2473

Browse files
author
Vladimir Ershov
committed
cancel job added
1 parent 0b1a087 commit 77d2473

File tree

3 files changed

+44
-6
lines changed

3 files changed

+44
-6
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ CREATE TABLE at_jobs_submitted(
2222
resubmit_limit bigint default 100,
2323
postpone interval,
2424
max_run_time interval,
25+
canceled boolean default false,
2526
submit_time timestamp with time zone default now()
2627
);
2728
CREATE INDEX at_jobs_submitted_node_at_idx on at_jobs_submitted (node, at);
@@ -161,6 +162,31 @@ CREATE FUNCTION resubmit(run_after interval default NULL)
161162
AS 'MODULE_PATHNAME', 'resubmit'
162163
LANGUAGE C IMMUTABLE;
163164

165+
CREATE FUNCTION cancel_job(job_id bigint) RETURNS boolean AS
166+
$BODY$
167+
DECLARE
168+
s_count int;
169+
BEGIN
170+
EXECUTE 'SELECT count(*) FROM at_jobs_submitted WHERE owner = session_user AND id = $1' INTO s_count USING job_id;
171+
IF s_count > 0 THEN
172+
UPDATE at_jobs_submitted SET canceled = true WHERE "id" = job_id;
173+
WITH moved_rows AS (DELETE from ONLY at_jobs_submitted a WHERE a.id = job_id RETURNING a.*) INSERT INTO at_jobs_done SELECT *, NULL as start_time, false as status, 'job was canceled' as reason FROM moved_rows;
174+
RETURN true;
175+
ELSE
176+
EXECUTE 'SELECT count(*) FROM at_jobs_process WHERE owner = session_user AND id = $1' INTO s_count USING job_id;
177+
IF s_count > 0 THEN
178+
UPDATE at_jobs_process SET canceled = true WHERE "id" = job_id;
179+
RETURN true;
180+
END IF;
181+
END IF;
182+
183+
RETURN false;
184+
END
185+
$BODY$
186+
LANGUAGE plpgsql
187+
SECURITY DEFINER set search_path FROM CURRENT;
188+
189+
164190
CREATE FUNCTION submit_job(
165191
query text,
166192
params text[] default NULL,
@@ -1283,7 +1309,7 @@ CREATE VIEW job_status AS
12831309
id, node, name, comments, at as run_after,
12841310
do_sql as query, params, depends_on, executor as run_as, attempt,
12851311
resubmit_limit, postpone as max_wait_interval,
1286-
max_run_time as max_duration, submit_time,
1312+
max_run_time as max_duration, submit_time, canceled,
12871313
start_time, status as is_success, reason as error, done_time,
12881314
'done'::job_at_status_t status
12891315
FROM schedule.at_jobs_done where owner = session_user
@@ -1292,7 +1318,7 @@ CREATE VIEW job_status AS
12921318
id, node, name, comments, at as run_after,
12931319
do_sql as query, params, depends_on, executor as run_as, attempt,
12941320
resubmit_limit, postpone as max_wait_interval,
1295-
max_run_time as max_duration, submit_time, start_time,
1321+
max_run_time as max_duration, submit_time, canceled, start_time,
12961322
NULL as is_success, NULL as error, NULL as done_time,
12971323
'processing'::job_at_status_t status
12981324
FROM ONLY schedule.at_jobs_process where owner = session_user
@@ -1301,7 +1327,7 @@ CREATE VIEW job_status AS
13011327
id, node, name, comments, at as run_after,
13021328
do_sql as query, params, depends_on, executor as run_as, attempt,
13031329
resubmit_limit, postpone as max_wait_interval,
1304-
max_run_time as max_duration, submit_time,
1330+
max_run_time as max_duration, submit_time, canceled,
13051331
NULL as start_time, NULL as is_success, NULL as error,
13061332
NULL as done_time,
13071333
'submitted'::job_at_status_t status

src/scheduler_job.c

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ job_t *_at_get_jobs_to_do(char *nodename, int *n, int *is_error, int limit)
167167
Oid argtypes[2] = { TEXTOID, INT4OID };
168168
Datum values[2];
169169
/* const char *get_job_sql = "select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted where at <= 'now' and (last_start_available is NULL OR last_start_available > 'now') AND node = $1 order by at, submit_time limit $2"; */
170-
const char *get_job_sql = "select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 order by at, submit_time limit $2";
170+
const char *get_job_sql = "select id, at, last_start_available, max_run_time, executor from ONLY at_jobs_submitted s where ((not exists ( select * from ONLY at_jobs_submitted s2 where s2.id = any(s.depends_on)) AND not exists ( select * from ONLY at_jobs_process p where p.id = any(s.depends_on)) AND s.depends_on is NOT NULL and s.at IS NULL) OR ( s.at IS NOT NULL AND at <= 'now' and (last_start_available is NULL OR last_start_available > 'now'))) and node = $1 and not canceled order by at, submit_time limit $2";
171171

172172
*is_error = *n = 0;
173173
START_SPI_SNAP();
@@ -366,9 +366,17 @@ int resubmit_at_job(job_t *j, TimestampTz next)
366366
int ret;
367367
const char *sql = "WITH moved_rows AS (DELETE from ONLY at_jobs_process a WHERE a.id = $1 RETURNING a.*) INSERT INTO at_jobs_submitted SELECT id, node, name, comments, $2, do_sql, params, depends_on, executor, owner, last_start_available, attempt +1 , resubmit_limit, postpone, max_run_time, submit_time FROM moved_rows";
368368

369+
369370
values[0] = Int32GetDatum(j->cron_id);
370371
values[1] = TimestampTzGetDatum(next);
371-
ret = SPI_execute_with_args(sql, 2, argtypes, values, NULL, false, 0);
372+
if(select_count_with_args("SELECT count(*) FROM at_jobs_process WHERE NOT canceled and id = $1", 1, argtypes, values, NULL))
373+
{
374+
ret = SPI_execute_with_args(sql, 2, argtypes, values, NULL, false, 0);
375+
}
376+
else
377+
{
378+
return -2;
379+
}
372380

373381
return ret > 0 ? 1: ret;
374382
}

src/scheduler_manager.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1081,7 +1081,11 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10811081
{
10821082
if(item->job->type == AtJob)
10831083
{
1084-
resubmit_at_job(item->job, shm_data->next_time);
1084+
if(resubmit_at_job(item->job, shm_data->next_time) == -2)
1085+
{
1086+
set_job_error(item->job, "was canceled while processing");
1087+
move_job_to_log(item->job, false, true);
1088+
}
10851089
}
10861090
else
10871091
{

0 commit comments

Comments
 (0)