Skip to content

Commit 1bc1681

Browse files
author
Vladimir Ershov
committed
a step toward reusable workers
1 parent 8e8475a commit 1bc1681

File tree

4 files changed

+138
-56
lines changed

4 files changed

+138
-56
lines changed

src/scheduler_executor.c

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -61,17 +61,9 @@ void executor_worker_main(Datum arg)
6161
{
6262
schd_executor_share_t *shared;
6363
dsm_segment *seg;
64-
job_t *job;
65-
int i;
66-
executor_error_t EE;
67-
int ret;
68-
char *error = NULL;
69-
/* bool use_pg_vars = true; */
70-
/* bool success = true; */
71-
schd_executor_status_t status;
72-
73-
EE.n = 0;
74-
EE.errors = NULL;
64+
int result;
65+
int64 jobs_done = 0;
66+
int64 worker_jobs_limit = 1;
7567

7668
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler_executor");
7769
seg = dsm_attach(DatumGetInt32(arg));
@@ -87,13 +79,44 @@ void executor_worker_main(Datum arg)
8779
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
8880
errmsg("executor corrupted dynamic shared memory segment")));
8981
}
90-
status = shared->status = SchdExecutorWork;
91-
shared->message[0] = 0;
9282

9383
SetConfigOption("application_name", "pgp-s executor", PGC_USERSET, PGC_S_SESSION);
9484
pgstat_report_activity(STATE_RUNNING, "initialize");
9585
init_worker_mem_ctx("ExecutorMemoryContext");
9686
BackgroundWorkerInitializeConnection(shared->database, NULL);
87+
/* TODO check latch, wait signals, die */
88+
while(1)
89+
{
90+
result = do_one_job(shared);
91+
if(result < 0)
92+
{
93+
delete_worker_mem_ctx();
94+
dsm_detach(seg);
95+
proc_exit(0);
96+
}
97+
if(++jobs_done >= worker_jobs_limit) break;
98+
}
99+
100+
shared->worker_exit = true;
101+
102+
delete_worker_mem_ctx();
103+
dsm_detach(seg);
104+
proc_exit(0);
105+
}
106+
107+
int do_one_job(schd_executor_share_t *shared)
108+
{
109+
executor_error_t EE;
110+
char *error = NULL;
111+
schd_executor_status_t status;
112+
int i;
113+
job_t *job;
114+
int ret;
115+
116+
EE.n = 0;
117+
EE.errors = NULL;
118+
status = shared->status = SchdExecutorWork;
119+
shared->message[0] = 0;
97120

98121
pgstat_report_activity(STATE_RUNNING, "initialize job");
99122
job = initializeExecutorJob(shared);
@@ -103,9 +126,9 @@ void executor_worker_main(Datum arg)
103126
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
104127
"Cannot retrive job information");
105128
shared->status = SchdExecutorError;
106-
delete_worker_mem_ctx();
107-
dsm_detach(seg);
108-
proc_exit(0);
129+
shared->worker_exit = true;
130+
131+
return -1;
109132
}
110133
current_job_id = job->cron_id;
111134
pgstat_report_activity(STATE_RUNNING, "job initialized");
@@ -124,9 +147,8 @@ void executor_worker_main(Datum arg)
124147
"Cannot set session auth: unknown error");
125148
}
126149
shared->status = SchdExecutorError;
127-
delete_worker_mem_ctx();
128-
dsm_detach(seg);
129-
proc_exit(0);
150+
shared->worker_exit = true;
151+
return -2;
130152
}
131153

132154
pqsignal(SIGTERM, handle_sigterm);
@@ -198,7 +220,11 @@ void executor_worker_main(Datum arg)
198220
if(job->attempt >= job->resubmit_limit)
199221
{
200222
status = SchdExecutorError;
223+
#ifdef HAVE_INT64
201224
push_executor_error(&EE, "Cannot resubmit: limit reached (%ld)", job->resubmit_limit);
225+
#else
226+
push_executor_error(&EE, "Cannot resubmit: limit reached (%lld)", job->resubmit_limit);
227+
#endif
202228
resubmit_current_job = 0;
203229
}
204230
else
@@ -215,11 +241,6 @@ void executor_worker_main(Datum arg)
215241
}
216242
if(job->next_time_statement)
217243
{
218-
/* if(use_pg_vars)
219-
{
220-
set_pg_var(success, &EE);
221-
}
222-
*/
223244
shared->next_time = get_next_excution_time(job->next_time_statement, &EE);
224245
if(shared->next_time == 0)
225246
{
@@ -240,11 +261,11 @@ void executor_worker_main(Datum arg)
240261
shared->next_time = timestamp_add_seconds(0, resubmit_current_job);
241262
resubmit_current_job = 0;
242263
}
264+
destroy_job(job, 1);
243265

244-
delete_worker_mem_ctx();
245-
dsm_detach(seg);
246-
proc_exit(0);
266+
return 1;
247267
}
268+
248269

249270
int set_session_authorization(char *username, char **error)
250271
{

src/scheduler_executor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ typedef struct {
3333

3434
bool set_invalid;
3535
char set_invalid_reason[PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX];
36+
37+
bool worker_exit;
3638
} schd_executor_share_t;
3739

3840
typedef struct {
@@ -48,6 +50,7 @@ int executor_onrollback(job_t *job, executor_error_t *ee);
4850
void set_pg_var(bool resulti, executor_error_t *ee);
4951
int push_executor_error(executor_error_t *e, char *fmt, ...) pg_attribute_printf(2, 3);
5052
int set_session_authorization(char *username, char **error);
53+
int do_one_job(schd_executor_share_t *shared);
5154

5255
extern Datum get_self_id(PG_FUNCTION_ARGS);
5356
extern Datum resubmit(PG_FUNCTION_ARGS);

src/scheduler_manager.c

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,9 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
680680
scheduler_manager_pool_t *p;
681681
scheduler_manager_slot_t *item;
682682
int ret;
683+
int idx;
684+
schd_executor_share_t *sdata;
685+
PGPROC *worker;
683686

684687
p = job->type == CronJob ? &(ctx->cron) : &(ctx->at);
685688
if(p->free == 0)
@@ -691,27 +694,52 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
691694
set_cron_job_started(job): set_at_job_started(job);
692695
if(ret)
693696
{
694-
item = worker_alloc(sizeof(scheduler_manager_slot_t));
695-
item->job = worker_alloc(sizeof(job_t));
696-
memcpy(item->job, job, sizeof(job_t));
697+
idx = p->len - p->free; /* next free slot */
697698

698-
item->started = GetCurrentTimestamp();
699-
item->wait_worker_to_die = false;
700-
item->stop_it = job->timelimit ?
699+
if(p->slots[idx] && p->slots[idx]->is_free)
700+
{
701+
item = p->slots[idx];
702+
item->job = worker_alloc(sizeof(job_t));
703+
memcpy(item->job, job, sizeof(job_t));
704+
item->started = GetCurrentTimestamp();
705+
item->wait_worker_to_die = false;
706+
item->stop_it = job->timelimit ?
701707
timestamp_add_seconds(0, job->timelimit): 0;
708+
sdata = dsm_segment_address(item->shared);
702709

703-
if(launch_executor_worker(ctx, item) == 0)
704-
{
705-
pfree(item->job);
706-
pfree(item);
707-
return 0;
710+
init_executor_shared_data(sdata, ctx, item->job);
711+
worker = BackendPidGetProc(item->pid);
712+
if(worker)
713+
{
714+
item->is_free = false;
715+
SetLatch(&worker->procLatch);
716+
}
717+
else
718+
{
719+
return 0;
720+
}
708721
}
722+
else
723+
{
724+
/* need to launch new worker to process job */
725+
item = worker_alloc(sizeof(scheduler_manager_slot_t));
726+
item->job = worker_alloc(sizeof(job_t));
727+
memcpy(item->job, job, sizeof(job_t));
728+
729+
item->started = item->worker_started = GetCurrentTimestamp();
730+
item->wait_worker_to_die = false;
731+
item->stop_it = job->timelimit ?
732+
timestamp_add_seconds(0, job->timelimit): 0;
709733

710-
/* rrr = rand() % 30;
711-
elog(LOG, " -- set timeout in %d sec", rrr);
712-
item->stop_it = timestamp_add_seconds(0, rrr); */
713-
714-
p->slots[p->len - (p->free--)] = item;
734+
if(launch_executor_worker(ctx, item) == 0)
735+
{
736+
pfree(item->job);
737+
pfree(item);
738+
return 0;
739+
}
740+
p->slots[idx] = item;
741+
}
742+
p->free--;
715743
job->cron_id = -1; /* job copied to slot - no need to be destroyed */
716744

717745
return 1;
@@ -739,17 +767,7 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
739767
item->shared = seg;
740768
shm_data = dsm_segment_address(item->shared);
741769

742-
shm_data->status = SchdExecutorInit;
743-
memcpy(shm_data->database, ctx->database, strlen(ctx->database));
744-
memcpy(shm_data->nodename, ctx->nodename, strlen(ctx->nodename));
745-
memcpy(shm_data->user, item->job->executor, NAMEDATALEN);
746-
shm_data->cron_id = item->job->cron_id;
747-
shm_data->start_at = item->job->start_at;
748-
shm_data->type = item->job->type;
749-
shm_data->message[0] = 0;
750-
shm_data->next_time = 0;
751-
shm_data->set_invalid = false;
752-
shm_data->set_invalid_reason[0] = 0;
770+
init_executor_shared_data(shm_data, ctx, item->job);
753771

754772
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
755773
BGWORKER_BACKEND_DATABASE_CONNECTION;
@@ -783,6 +801,21 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
783801
return item->pid;
784802
}
785803

804+
/*
 * init_executor_shared_data
 *
 * Fill the shared control block that the manager hands to an executor
 * worker with the description of the job to run.  It is called both when a
 * fresh worker is launched (launch_executor_worker) and when an already
 * running worker is handed a new job (set_job_on_free_slot), so every
 * per-job field is reset here rather than relying on the previous contents
 * of the segment.
 *
 *   data - shared block to initialise
 *   ctx  - manager context; supplies the database and node names
 *   job  - job being scheduled; supplies executor user, id, start time, type
 */
void init_executor_shared_data(schd_executor_share_t *data, scheduler_manager_ctx_t *ctx, job_t *job)
{
	data->status = SchdExecutorInit;

	/*
	 * The manager reads worker_exit to decide whether the slot can be kept
	 * for reuse, so it must start out false for every job instead of
	 * holding whatever happened to be in the segment.
	 */
	data->worker_exit = false;

	/*
	 * Copy strlen + 1 bytes so the terminating NUL travels with the string:
	 * the worker passes data->database straight to
	 * BackgroundWorkerInitializeConnection() as a C string.
	 */
	memcpy(data->database, ctx->database, strlen(ctx->database) + 1);
	memcpy(data->nodename, ctx->nodename, strlen(ctx->nodename) + 1);

	/*
	 * NOTE(review): this reads a full NAMEDATALEN bytes from job->executor;
	 * assumes the source buffer is at least that large - confirm against
	 * the job_t definition.
	 */
	memcpy(data->user, job->executor, NAMEDATALEN);

	data->cron_id = job->cron_id;
	data->start_at = job->start_at;
	data->type = job->type;

	/* Clear everything the worker reports back for the previous job. */
	data->message[0] = 0;
	data->next_time = 0;
	data->set_invalid = false;
	data->set_invalid_reason[0] = 0;
}
818+
786819
int scheduler_start_jobs(scheduler_manager_ctx_t *ctx, task_type_t type)
787820
{
788821
int interval = 20;
@@ -955,12 +988,14 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
955988
{
956989
toremove[nremove].pos = i;
957990
toremove[nremove].reason = RmWaitWorker;
991+
toremove[nremove].vanish_item = true;
958992
nremove++;
959993
}
960994
else if(item->stop_it && item->stop_it < GetCurrentTimestamp())
961995
{
962996
toremove[nremove].pos = i;
963997
toremove[nremove].reason = RmTimeout;
998+
toremove[nremove].vanish_item = true;
964999
nremove++;
9651000
}
9661001
else
@@ -970,12 +1005,14 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
9701005
{
9711006
toremove[nremove].pos = i;
9721007
toremove[nremove].reason = shm_data->status == SchdExecutorDone ? RmDone: RmError;
1008+
toremove[nremove].vanish_item = shm_data->worker_exit;
9731009
nremove++;
9741010
}
9751011
else if(shm_data->status == SchdExecutorResubmit)
9761012
{
9771013
toremove[nremove].pos = i;
9781014
toremove[nremove].reason = RmDoneResubmit;
1015+
toremove[nremove].vanish_item = shm_data->worker_exit;
9791016
nremove++;
9801017
}
9811018
}
@@ -1100,13 +1137,28 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
11001137
STOP_SPI_SNAP();
11011138

11021139
last = p->len - p->free - 1;
1103-
destroy_slot_item(item);
1140+
if(toremove[i].vanish_item)
1141+
{
1142+
destroy_slot_item(item);
1143+
}
1144+
else
1145+
{
1146+
item->is_free = true;
1147+
}
11041148

11051149
if(toremove[i].pos != last)
11061150
{
11071151
_pdebug("--- move from %d to %d", last, toremove[i].pos);
11081152
p->slots[toremove[i].pos] = p->slots[last];
1109-
p->slots[last] = NULL;
1153+
if(toremove[i].vanish_item)
1154+
{
1155+
p->slots[last] = NULL;
1156+
}
1157+
else
1158+
{
1159+
p->slots[last] = item;
1160+
}
1161+
11101162
for(j=i+1; j < nremove; j++)
11111163
{
11121164
if(toremove[j].pos == last)

src/scheduler_manager.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include "utils/memutils.h"
1010
#include "bit_array.h"
1111
#include "scheduler_job.h"
12+
#include "scheduler_executor.h"
1213

1314
#define CEO_MIN_POS 0
1415
#define CEO_HRS_POS 1
@@ -35,9 +36,13 @@ typedef enum {
3536
/*
 * One entry in the list of slots that scheduler_check_slots() has decided
 * to release during the current pass.
 */
typedef struct {
3637
int pos; /* index of the slot inside the pool's slots array */
3738
schd_remove_reason_t reason; /* why the slot is released (RmDone, RmError, RmTimeout, ...) */
39+
bool vanish_item; /* true: destroy the slot item; false: keep it and mark it is_free for reuse */
3840
} scheduler_rm_item_t;
3941

4042
typedef struct {
43+
TimestampTz worker_started;
44+
bool is_free;
45+
4146
TimestampTz started;
4247
TimestampTz stop_it;
4348

@@ -110,5 +115,6 @@ int set_at_job_started(job_t *job);
110115
int init_manager_pool(scheduler_manager_pool_t *p, int N);
111116
int refresh_manager_pool(const char *database, const char *name, scheduler_manager_pool_t *p, int N);
112117
void destroy_scheduler_manager_pool(scheduler_manager_pool_t *p);
118+
void init_executor_shared_data(schd_executor_share_t *data, scheduler_manager_ctx_t *ctx, job_t *job);
113119

114120
#endif

0 commit comments

Comments
 (0)