Skip to content

Commit 17e9358

Browse files
author
Vladimir Ershov
committed
parallel workers
1 parent 267ed06 commit 17e9358

8 files changed

+129
-55
lines changed

pgpro_scheduler--2.0.sql

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,15 @@ CREATE TABLE at_jobs_submitted(
2828
CREATE INDEX ON at_jobs_submitted(at,submit_time);
2929
CREATE INDEX ON at_jobs_submitted (last_start_available, node);
3030

31-
-- CREATE TABLE at_jobs_process(
32-
-- start_time timestamp with time zone default now()
33-
-- ) INHERITS (at_jobs_submitted);
34-
3531
CREATE TABLE at_jobs_process (like at_jobs_submitted including all);
3632
ALTER TABLE at_jobs_process ADD start_time timestamp with time zone default now();
37-
38-
-- ALTER TABLE at_jobs_process ADD primary key (id);
3933
CREATE INDEX at_jobs_process_node_at_idx on at_jobs_process (node, at);
4034

41-
-- CREATE TABLE at_jobs_done(
42-
-- status boolean,
43-
-- reason text,
44-
-- done_time timestamp with time zone default now()
45-
-- ) INHERITS (at_jobs_process);
4635
CREATE TABLE at_jobs_done (like at_jobs_process including all);
4736
ALTER TABLE at_jobs_done ADD status boolean;
4837
ALTER TABLE at_jobs_done ADD reason text;
4938
ALTER TABLE at_jobs_done ADD done_time timestamp with time zone default now();
5039

51-
--ALTER TABLE at_jobs_done ADD primary key (id);
52-
-- CREATE INDEX at_jobs_done_node_at_idx on at_jobs_done (node, at);
5340

5441
CREATE TABLE cron(
5542
id SERIAL PRIMARY KEY,

src/pgpro_scheduler.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ char *scheduler_databases = NULL;
4646
char *scheduler_nodename = NULL;
4747
char *scheduler_transaction_state = NULL;
4848
int scheduler_max_workers = 2;
49-
int scheduler_at_max_workers = 2;
49+
int scheduler_max_parallel_workers = 2;
5050
int scheduler_worker_job_limit = 1;
5151
bool scheduler_service_enabled = false;
5252
char *scheduler_schema = NULL;
@@ -496,10 +496,10 @@ void _PG_init(void)
496496
NULL
497497
);
498498
DefineCustomIntVariable(
499-
"schedule.at_max_workers",
499+
"schedule.max_parallel_workers",
500500
"How much workers can serve at jobs on one database",
501501
NULL,
502-
&scheduler_at_max_workers,
502+
&scheduler_max_parallel_workers,
503503
2,
504504
1,
505505
100,

src/scheduler_executor.c

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include "catalog/pg_authid.h"
1616
#include "utils/syscache.h"
1717
#include "access/htup_details.h"
18+
#include "utils/timeout.h"
1819

1920
#include "pgstat.h"
2021
#include "fmgr.h"
@@ -316,6 +317,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
316317
}
317318
destroy_job(job, 1);
318319

320+
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);
321+
ResetAllOptions();
322+
319323
return 1;
320324
}
321325

@@ -607,15 +611,13 @@ resubmit(PG_FUNCTION_ARGS)
607611

608612
void at_executor_worker_main(Datum arg)
609613
{
610-
schd_executor_share_t *shared;
614+
schd_executor_share_state_t *shared;
611615
dsm_segment *seg;
612616
int result;
613617
int rc = 0;
614618
schd_executor_status_t status;
615619
bool lets_sleep = false;
616620
/* PGPROC *parent; */
617-
double begin, elapsed;
618-
struct timeval tv;
619621

620622
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler_executor");
621623
seg = dsm_attach(DatumGetInt32(arg));
@@ -632,6 +634,7 @@ void at_executor_worker_main(Datum arg)
632634
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
633635
errmsg("executor corrupted dynamic shared memory segment")));
634636
}
637+
shared->start_at = GetCurrentTimestamp();
635638

636639
SetConfigOption("application_name", "pgp-s at executor", PGC_USERSET, PGC_S_SESSION);
637640
pgstat_report_activity(STATE_RUNNING, "initialize");
@@ -644,18 +647,14 @@ void at_executor_worker_main(Datum arg)
644647

645648
while(1)
646649
{
650+
if(shared->stop_worker) break;
647651
if(got_sighup)
648652
{
649653
got_sighup = false;
650654
ProcessConfigFile(PGC_SIGHUP);
651655
}
652656
CHECK_FOR_INTERRUPTS();
653-
gettimeofday(&tv, NULL);
654-
begin = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000;
655657
result = process_one_job(shared, &status);
656-
gettimeofday(&tv, NULL);
657-
elapsed = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000 - begin;
658-
elog(LOG, "job done %d = %f", result, elapsed);
659658

660659
if(result == 0)
661660
{
@@ -680,42 +679,35 @@ void at_executor_worker_main(Datum arg)
680679
}
681680
}
682681

682+
if(shared->stop_worker)
683+
{
684+
elog(LOG, "at worker stopped by parent signal");
685+
}
686+
683687
delete_worker_mem_ctx();
684688
dsm_detach(seg);
685689
proc_exit(0);
686690
}
687691

688-
int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
692+
int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t *status)
689693
{
690694
char *error = NULL;
691695
job_t *job;
692696
int ret;
693697
char buff[512];
694-
double begin, elapsed;
695-
struct timeval tv;
696698

697699
*status = shared->status = SchdExecutorWork;
698-
shared->message[0] = 0;
699700

700701
pgstat_report_activity(STATE_RUNNING, "initialize job");
701702
START_SPI_SNAP();
702703

703-
gettimeofday(&tv, NULL);
704-
begin = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000;
705-
706704
job = get_next_at_job_with_lock(shared->nodename, &error);
707705

708-
gettimeofday(&tv, NULL);
709-
elapsed = ((double)tv.tv_sec)*1000 + ((double)tv.tv_usec)/1000 - begin;
710-
elog(LOG, "got jobs = %f", elapsed);
711-
712706
if(!job)
713707
{
714708
if(error)
715709
{
716710
shared->status = SchdExecutorIdling;
717-
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
718-
"Cannot get job: %s", error);
719711
elog(LOG, "AT EXECUTOR: ERROR: %s", error);
720712
pfree(error);
721713
ABORT_SPI_SNAP();
@@ -734,15 +726,11 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
734726
if(error)
735727
{
736728
set_at_job_done(job, error, 0);
737-
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
738-
"Cannot set session auth: %s", error);
739729
pfree(error);
740730
}
741731
else
742732
{
743733
set_at_job_done(job, "Unknown set session auth error", 0);
744-
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
745-
"Cannot set session auth: unknown error");
746734
}
747735
shared->status = SchdExecutorIdling;
748736
STOP_SPI_SNAP();
@@ -761,6 +749,7 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
761749
sprintf(buff, "%lld", job->timelimit * 1000);
762750
#endif
763751
SetConfigOption("statement_timeout", buff, PGC_SUSET, PGC_S_OVERRIDE);
752+
enable_timeout_after(STATEMENT_TIMEOUT, StatementTimeout);
764753
}
765754

766755
if(job->sql_params_n > 0)
@@ -771,6 +760,10 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
771760
{
772761
ret = execute_spi(job->dosql[0], &error);
773762
}
763+
if(job->timelimit)
764+
{
765+
disable_timeout(STATEMENT_TIMEOUT, false);
766+
}
774767
ResetAllOptions();
775768
SetConfigOption("enable_seqscan", "off", PGC_USERSET, PGC_S_SESSION);
776769
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);

src/scheduler_executor.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ typedef struct {
4141
bool worker_exit;
4242
} schd_executor_share_t;
4343

44+
typedef struct {
45+
char database[PGPRO_SCHEDULER_DBNAME_MAX];
46+
char nodename[PGPRO_SCHEDULER_NODENAME_MAX];
47+
TimestampTz start_at;
48+
49+
schd_executor_status_t status;
50+
51+
bool stop_worker;
52+
} schd_executor_share_state_t;
53+
4454
typedef struct {
4555
int n;
4656
char **errors;
@@ -57,7 +67,7 @@ int set_session_authorization(char *username, char **error);
5767
int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status);
5868
int read_worker_job_limit(void);
5969
void at_executor_worker_main(Datum arg);
60-
int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *status);
70+
int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t *status);
6171
Oid set_session_authorization_by_name(char *rolename, char **error);
6272

6373
extern Datum get_self_id(PG_FUNCTION_ARGS);

src/scheduler_job.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,7 @@ int set_at_job_done(job_t *job, char *error, int64 resubmit)
454454

455455
ret = SPI_execute_with_args(sql, n, argtypes, values, nulls, false, 0);
456456

457+
457458
set_schema(oldpath, false);
458459
pfree(oldpath);
459460
if(this_error) pfree(this_error);

0 commit comments

Comments
 (0)