Skip to content

Commit 16b5a14

Browse files
author
Vladimir Ershov
committed
do not die
1 parent 1bc1681 commit 16b5a14

File tree

6 files changed

+296
-126
lines changed

6 files changed

+296
-126
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -28,21 +28,28 @@ CREATE TABLE at_jobs_submitted(
2828
CREATE INDEX ON at_jobs_submitted(at,submit_time);
2929
CREATE INDEX ON at_jobs_submitted (last_start_available, node);
3030

31-
CREATE TABLE at_jobs_process(
32-
start_time timestamp with time zone default now()
33-
) INHERITS (at_jobs_submitted);
34-
35-
ALTER TABLE at_jobs_process ADD primary key (id);
36-
CREATE INDEX at_jobs_process_node_at_idx on at_jobs_process (node, at);
37-
38-
CREATE TABLE at_jobs_done(
39-
status boolean,
40-
reason text,
41-
done_time timestamp with time zone default now()
42-
) INHERITS (at_jobs_process);
43-
44-
ALTER TABLE at_jobs_done ADD primary key (id);
45-
CREATE INDEX at_jobs_done_node_at_idx on at_jobs_done (node, at);
31+
-- CREATE TABLE at_jobs_process(
32+
-- start_time timestamp with time zone default now()
33+
-- ) INHERITS (at_jobs_submitted);
34+
35+
CREATE TABLE at_jobs_process (like at_jobs_submitted including all);
36+
ALTER TABLE at_jobs_process ADD start_time timestamp with time zone default now();
37+
38+
-- ALTER TABLE at_jobs_process ADD primary key (id);
39+
CREATE INDEX at_jobs_process_node_at_idx on at_jobs_process (node, at);
40+
41+
-- CREATE TABLE at_jobs_done(
42+
-- status boolean,
43+
-- reason text,
44+
-- done_time timestamp with time zone default now()
45+
-- ) INHERITS (at_jobs_process);
46+
CREATE TABLE at_jobs_done (like at_jobs_process including all);
47+
ALTER TABLE at_jobs_done ADD status boolean;
48+
ALTER TABLE at_jobs_done ADD reason text;
49+
ALTER TABLE at_jobs_done ADD done_time timestamp with time zone default now();
50+
51+
--ALTER TABLE at_jobs_done ADD primary key (id);
52+
-- CREATE INDEX at_jobs_done_node_at_idx on at_jobs_done (node, at);
4653

4754
CREATE TABLE cron(
4855
id SERIAL PRIMARY KEY,

src/pgpro_scheduler.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ char *scheduler_nodename = NULL;
4747
char *scheduler_transaction_state = NULL;
4848
int scheduler_max_workers = 2;
4949
int scheduler_at_max_workers = 2;
50+
int scheduler_worker_job_limit = 1;
5051
bool scheduler_service_enabled = false;
5152
char *scheduler_schema = NULL;
5253
/* Custom GUC done */
@@ -497,6 +498,20 @@ void _PG_init(void)
497498
NULL,
498499
NULL
499500
);
501+
DefineCustomIntVariable(
502+
"schedule.worker_job_limit",
503+
"How much job can worker serve before shutdown",
504+
NULL,
505+
&scheduler_worker_job_limit,
506+
1,
507+
1,
508+
20000,
509+
PGC_SUSET,
510+
0,
511+
NULL,
512+
NULL,
513+
NULL
514+
);
500515
pg_scheduler_startup();
501516
}
502517

src/scheduler_executor.c

Lines changed: 71 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,16 @@ handle_sigterm(SIGNAL_ARGS)
5656
errno = save_errno;
5757
}
5858

59+
int read_worker_job_limit(void)
60+
{
61+
const char *opt;
62+
int var;
63+
64+
opt = GetConfigOption("schedule.worker_job_limit", false, false);
65+
if(opt == NULL) return 1;
66+
var = atoi(opt);
67+
return var;
68+
}
5969

6070
void executor_worker_main(Datum arg)
6171
{
@@ -64,6 +74,9 @@ void executor_worker_main(Datum arg)
6474
int result;
6575
int64 jobs_done = 0;
6676
int64 worker_jobs_limit = 1;
77+
int rc = 0;
78+
schd_executor_status_t status;
79+
PGPROC *parent;
6780

6881
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler_executor");
6982
seg = dsm_attach(DatumGetInt32(arg));
@@ -72,6 +85,7 @@ void executor_worker_main(Datum arg)
7285
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7386
errmsg("executor unable to map dynamic shared memory segment")));
7487
shared = dsm_segment_address(seg);
88+
parent = BackendPidGetProc(MyBgworkerEntry->bgw_notify_pid);
7589

7690
if(shared->status != SchdExecutorInit)
7791
{
@@ -84,38 +98,77 @@ void executor_worker_main(Datum arg)
8498
pgstat_report_activity(STATE_RUNNING, "initialize");
8599
init_worker_mem_ctx("ExecutorMemoryContext");
86100
BackgroundWorkerInitializeConnection(shared->database, NULL);
87-
/* TODO check latch, wait signals, die */
101+
worker_jobs_limit = read_worker_job_limit();
102+
103+
pqsignal(SIGTERM, handle_sigterm);
104+
pqsignal(SIGHUP, worker_spi_sighup);
105+
BackgroundWorkerUnblockSignals();
106+
88107
while(1)
89108
{
90-
result = do_one_job(shared);
91-
if(result < 0)
109+
/* we need it if idle worker recieve SIGHUP an realize that it done
110+
too mach */
111+
status = SchdExecutorLimitReached;
112+
113+
if(got_sighup)
114+
{
115+
got_sighup = false;
116+
ProcessConfigFile(PGC_SIGHUP);
117+
worker_jobs_limit = read_worker_job_limit();
118+
}
119+
result = do_one_job(shared, &status);
120+
if(result > 0)
121+
{
122+
if(++jobs_done >= worker_jobs_limit)
123+
{
124+
shared->worker_exit = true;
125+
shared->status = status;
126+
break;
127+
}
128+
else
129+
{
130+
shared->status = status;
131+
}
132+
SetLatch(&parent->procLatch);
133+
}
134+
else if(result < 0)
92135
{
93136
delete_worker_mem_ctx();
94137
dsm_detach(seg);
95138
proc_exit(0);
96139
}
97-
if(++jobs_done >= worker_jobs_limit) break;
98-
}
99140

100-
shared->worker_exit = true;
141+
pgstat_report_activity(STATE_IDLE, "waiting for a job");
142+
rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0L);
143+
ResetLatch(MyLatch);
144+
if(rc && rc & WL_POSTMASTER_DEATH) break;
145+
}
101146

102147
delete_worker_mem_ctx();
103148
dsm_detach(seg);
104149
proc_exit(0);
105150
}
106151

107-
int do_one_job(schd_executor_share_t *shared)
152+
int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
108153
{
109154
executor_error_t EE;
110155
char *error = NULL;
111-
schd_executor_status_t status;
112156
int i;
113157
job_t *job;
114158
int ret;
115159

116160
EE.n = 0;
117161
EE.errors = NULL;
118-
status = shared->status = SchdExecutorWork;
162+
if(shared->new_job)
163+
{
164+
shared->new_job = false;
165+
}
166+
else
167+
{
168+
return 0;
169+
}
170+
171+
*status = shared->status = SchdExecutorWork;
119172
shared->message[0] = 0;
120173

121174
pgstat_report_activity(STATE_RUNNING, "initialize job");
@@ -125,8 +178,8 @@ int do_one_job(schd_executor_share_t *shared)
125178
if(shared->message[0] == 0)
126179
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
127180
"Cannot retrive job information");
128-
shared->status = SchdExecutorError;
129181
shared->worker_exit = true;
182+
*status = shared->status = SchdExecutorError;
130183

131184
return -1;
132185
}
@@ -146,14 +199,11 @@ int do_one_job(schd_executor_share_t *shared)
146199
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
147200
"Cannot set session auth: unknown error");
148201
}
202+
*status = shared->worker_exit = true;
149203
shared->status = SchdExecutorError;
150-
shared->worker_exit = true;
151204
return -2;
152205
}
153206

154-
pqsignal(SIGTERM, handle_sigterm);
155-
BackgroundWorkerUnblockSignals();
156-
157207
pgstat_report_activity(STATE_RUNNING, "process job");
158208
CHECK_FOR_INTERRUPTS();
159209
/* rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 0);
@@ -183,7 +233,7 @@ int do_one_job(schd_executor_share_t *shared)
183233
if(ret < 0)
184234
{
185235
/* success = false; */
186-
status = SchdExecutorError;
236+
*status = SchdExecutorError;
187237
if(error)
188238
{
189239
push_executor_error(&EE, "error in command #%d: %s",
@@ -209,7 +259,7 @@ int do_one_job(schd_executor_share_t *shared)
209259
}
210260
}
211261
}
212-
if(status != SchdExecutorError)
262+
if(*status != SchdExecutorError)
213263
{
214264
if(job->same_transaction)
215265
{
@@ -219,8 +269,8 @@ int do_one_job(schd_executor_share_t *shared)
219269
{
220270
if(job->attempt >= job->resubmit_limit)
221271
{
222-
status = SchdExecutorError;
223-
#ifdef HAVE_INT64
272+
*status = SchdExecutorError;
273+
#ifdef HAVE_LONG_INT_64
224274
push_executor_error(&EE, "Cannot resubmit: limit reached (%ld)", job->resubmit_limit);
225275
#else
226276
push_executor_error(&EE, "Cannot resubmit: limit reached (%lld)", job->resubmit_limit);
@@ -229,12 +279,12 @@ int do_one_job(schd_executor_share_t *shared)
229279
}
230280
else
231281
{
232-
status = SchdExecutorResubmit;
282+
*status = SchdExecutorResubmit;
233283
}
234284
}
235285
else
236286
{
237-
status = SchdExecutorDone;
287+
*status = SchdExecutorDone;
238288
}
239289

240290
SetConfigOption("schedule.transaction_state", "success", PGC_INTERNAL, PGC_S_SESSION);
@@ -255,8 +305,7 @@ int do_one_job(schd_executor_share_t *shared)
255305
{
256306
set_shared_message(shared, &EE);
257307
}
258-
shared->status = status;
259-
if(status == SchdExecutorResubmit)
308+
if(*status == SchdExecutorResubmit)
260309
{
261310
shared->next_time = timestamp_add_seconds(0, resubmit_current_job);
262311
resubmit_current_job = 0;

src/scheduler_executor.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ typedef enum {
1212
SchdExecutorWork,
1313
SchdExecutorDone,
1414
SchdExecutorResubmit,
15-
SchdExecutorError
15+
SchdExecutorError,
16+
SchdExecutorLimitReached
1617
} schd_executor_status_t;
1718

1819
typedef struct {
20+
bool new_job;
21+
1922
char database[PGPRO_SCHEDULER_DBNAME_MAX];
2023
char nodename[PGPRO_SCHEDULER_NODENAME_MAX];
2124
char user[NAMEDATALEN];
@@ -50,7 +53,8 @@ int executor_onrollback(job_t *job, executor_error_t *ee);
5053
void set_pg_var(bool resulti, executor_error_t *ee);
5154
int push_executor_error(executor_error_t *e, char *fmt, ...) pg_attribute_printf(2, 3);
5255
int set_session_authorization(char *username, char **error);
53-
int do_one_job(schd_executor_share_t *shared);
56+
int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status);
57+
int read_worker_job_limit(void);
5458

5559
extern Datum get_self_id(PG_FUNCTION_ARGS);
5660
extern Datum resubmit(PG_FUNCTION_ARGS);

0 commit comments

Comments
 (0)