Skip to content

Commit a4f01da

Browse files
author
Vladimir Ershov
committed
very beginning
1 parent abc1274 commit a4f01da

File tree

7 files changed

+191
-86
lines changed

7 files changed

+191
-86
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ OBJS = src/pgpro_scheduler.o src/cron_string.o src/sched_manager_poll.o \
55
src/scheduler_executor.o \
66
$(WIN32RES)
77
EXTENSION = pgpro_scheduler
8-
DATA = pgpro_scheduler--1.0.sql
8+
DATA = pgpro_scheduler--2.0.sql
99
REGRESS = install_pgpro_scheduler cron_string
1010
#REGRESS_OPTS = --user=postgres
1111
EXTRA_REGRESS_OPTS=--temp-config=$(top_srcdir)/$(subdir)/conf.add

pgpro_scheduler--1.0.sql renamed to pgpro_scheduler--2.0.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,29 @@
33
CREATE SCHEMA IF NOT EXISTS schedule;
44

55
CREATE TYPE schedule.job_status AS ENUM ('working', 'done', 'error');
6+
CREATE TYPE schedule.at_job_status AS ENUM ('submitted', 'working', 'done', 'error');
7+
8+
CREATE TABLE schedule.at_jobs(
9+
id SERIAL PRIMARY KEY,
10+
node text,
11+
name text,
12+
comments text,
13+
at timestamp with time zone default now(),
14+
do_sql text[],
15+
same_transaction boolean DEFAULT false,
16+
onrollback_statement text,
17+
executor text,
18+
owner text,
19+
postpone interval,
20+
max_run_time interval,
21+
max_instances integer default 1,
22+
status at_job_status default 'submited',
23+
submit_time timestamp with time zone default now(),
24+
started timestamp with time zone,
25+
finished timestamp with time zone,
26+
reason text
27+
);
28+
CREATE INDEX at_jobs_status_node_at_idx on schedule.at (status, node, at);
629

730
CREATE TABLE schedule.cron(
831
id SERIAL PRIMARY KEY,

pgpro_scheduler.control

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
comment = 'Postgres Pro Scheduler'
2-
default_version = '1.0'
2+
default_version = '2.0'
33
module_pathname = '$libdir/pgpro_scheduler'
44
relocatable = true

src/scheduler_job.c

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111
#include "memutils.h"
1212
#include "port.h"
1313

14-
job_t *init_scheduler_job(job_t *j)
14+
job_t *init_scheduler_job(job_t *j, unsigned char type)
1515
{
1616
if(j == NULL) j = worker_alloc(sizeof(job_t));
1717
memset(j, 0, sizeof(job_t));
1818
j->is_active = false;
19+
j->type = type;
1920

2021
return j;
2122
}
@@ -61,7 +62,41 @@ job_t *get_jobs_to_do(char *nodename, int *n, int *is_error)
6162
return jobs;
6263
}
6364

64-
job_t *get_expired_jobs(char *nodename, int *n, int *is_error)
65+
job_t *get_expired_at_jobs(char *nodename, int *n, int *is_error)
66+
{
67+
StringInfoData sql;
68+
job_t *jobs = NULL;
69+
int ret, got, i;
70+
71+
*n = *is_error = 0;
72+
initStringInfo(&sql);
73+
appendStringInfo(&sql, "select at, last_start_available, id from at_jobs where last_start_available < 'now' and status = 'submitted' and node = '%s'", nodename);
74+
ret = SPI_execute(sql.data, true, 0);
75+
if(ret == SPI_OK_SELECT)
76+
{
77+
got = SPI_processed;
78+
if(got > 0)
79+
{
80+
*n = got;
81+
jobs = worker_alloc(sizeof(job_t) * got);
82+
for(i=0; i < got; i++)
83+
{
84+
init_scheduler_job(&(jobs[i]), 2);
85+
jobs[i].start_at = get_timestamp_from_spi(i, 1, 0);
86+
jobs[i].last_start_avail = get_timestamp_from_spi(i, 2, 0);
87+
jobs[i].cron_id = get_int_from_spi(i, 3, 0);
88+
jobs[i].node = _copy_string(nodename);
89+
}
90+
}
91+
}
92+
else
93+
{
94+
*is_error = 1;
95+
}
96+
return jobs;
97+
}
98+
99+
job_t *get_expired_cron_jobs(char *nodename, int *n, int *is_error)
65100
{
66101
StringInfoData sql;
67102
job_t *jobs = NULL;
@@ -80,7 +115,7 @@ job_t *get_expired_jobs(char *nodename, int *n, int *is_error)
80115
jobs = worker_alloc(sizeof(job_t) * got);
81116
for(i=0; i < got; i++)
82117
{
83-
init_scheduler_job(&(jobs[i]));
118+
init_scheduler_job(&(jobs[i]), 1);
84119
jobs[i].start_at = get_timestamp_from_spi(i, 1, 0);
85120
jobs[i].last_start_avail = get_timestamp_from_spi(i, 2, 0);
86121
jobs[i].cron_id = get_int_from_spi(i, 3, 0);

src/scheduler_job.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
#include "port.h"
1212

1313
typedef struct {
14+
unsigned char type; /* 1 - cron job, 2 - at job */
1415
int cron_id;
1516
TimestampTz start_at;
1617
char *node;
@@ -30,7 +31,7 @@ typedef struct {
3031
} job_t;
3132

3233
job_t *init_scheduler_job(job_t *j);
33-
job_t *get_expired_jobs(char *nodename, int *n, int *is_error);
34+
job_t *get_expired_cron_jobs(char *nodename, int *n, int *is_error);
3435
job_t *get_jobs_to_do(char *nodename, int *n, int *is_error);
3536
job_t *set_job_error(job_t *j, const char *fmt, ...) pg_attribute_printf(2, 3);
3637
int move_job_to_log(job_t *j, bool status);

0 commit comments

Comments
 (0)