Skip to content

Commit 963efe8

Browse files
author
Vladimir Ershov
committed
Merge commit '4ab60603d6e350bd036a7d172d5a3c01fa43157a' into PGPROEE9_6_scheduler
2 parents a69f3b3 + 4ab6060 commit 963efe8

File tree

10 files changed

+298
-91
lines changed

10 files changed

+298
-91
lines changed

contrib/pgpro_scheduler/pgpro_scheduler--2.0.sql

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@ SET search_path TO schedule;
55

66
CREATE TYPE job_status_t AS ENUM ('working', 'done', 'error');
77
CREATE TYPE job_at_status_t AS ENUM ('submitted', 'processing', 'done');
8+
CREATE SEQUENCE schedule.at_jobs_submitted_id_seq;
89

910
CREATE TABLE at_jobs_submitted(
10-
id SERIAL PRIMARY KEY,
11+
id int PRIMARY KEY,
1112
node text,
1213
name text,
1314
comments text,
@@ -37,6 +38,8 @@ ALTER TABLE at_jobs_done ADD status boolean;
3738
ALTER TABLE at_jobs_done ADD reason text;
3839
ALTER TABLE at_jobs_done ADD done_time timestamp with time zone default now();
3940

41+
ALTER TABLE at_jobs_submitted ALTER id SET default nextval('schedule.at_jobs_submitted_id_seq');
42+
4043

4144
CREATE TABLE cron(
4245
id SERIAL PRIMARY KEY,

contrib/pgpro_scheduler/src/char_array.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ char_array_t *makeCharArray(void)
2525

2626
char_array_t *sortCharArray(char_array_t *a)
2727
{
28+
if(a->n <= 1) return a;
2829
qsort(a->data, a->n, sizeof(char *), __sort_char_string);
2930

3031
return a;

contrib/pgpro_scheduler/src/memutils.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,6 @@ void *worker_alloc(Size size)
2929

3030
void delete_worker_mem_ctx(void)
3131
{
32+
MemoryContextSwitchTo(TopMemoryContext);
3233
MemoryContextDelete(SchedulerWorkerContext);
3334
}

contrib/pgpro_scheduler/src/pgpro_scheduler.c

Lines changed: 33 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ char_array_t *readBasesToCheck(void)
212212
pgstat_report_activity(STATE_RUNNING, "read configuration");
213213
result = makeCharArray();
214214

215-
value = GetConfigOption("schedule.database", 1, 0);
215+
value = GetConfigOption("schedule.database", true, false);
216216
if(!value || strlen(value) == 0)
217217
{
218218
return result;
@@ -254,6 +254,7 @@ char_array_t *readBasesToCheck(void)
254254
pfree(clean_value);
255255
if(names->n == 0)
256256
{
257+
destroyCharArray(names);
257258
return result;
258259
}
259260

@@ -264,45 +265,39 @@ char_array_t *readBasesToCheck(void)
264265
appendStringInfo(&sql, "'%s'", names->data[i]);
265266
if(i + 1 != names->n) appendStringInfo(&sql, ",");
266267
}
268+
destroyCharArray(names);
267269
appendStringInfo(&sql, ")");
268-
SetCurrentStatementStartTimestamp();
269-
StartTransactionCommand();
270-
SPI_connect();
271-
PushActiveSnapshot(GetTransactionSnapshot());
270+
271+
START_SPI_SNAP();
272272

273273
ret = SPI_execute(sql.data, true, 0);
274274
if (ret != SPI_OK_SELECT)
275275
{
276-
SPI_finish();
277-
PopActiveSnapshot();
278-
CommitTransactionCommand();
276+
STOP_SPI_SNAP();
277+
elog(ERROR, "cannot select from pg_database");
279278
}
280-
destroyCharArray(names);
281279
processed = SPI_processed;
282280
if(processed == 0)
283281
{
284-
SPI_finish();
285-
PopActiveSnapshot();
286-
CommitTransactionCommand();
282+
STOP_SPI_SNAP();
287283
return result;
288284
}
289285
for(i=0; i < processed; i++)
290286
{
291287
clean_value = SPI_getvalue(SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1);
292288
pushCharArray(result, clean_value);
293289
}
294-
SPI_finish();
295-
PopActiveSnapshot();
296-
CommitTransactionCommand();
290+
STOP_SPI_SNAP();
297291
sortCharArray(result);
292+
298293
return result;
299294
}
300295

301296
void parent_scheduler_main(Datum arg)
302297
{
303298
int rc = 0, i;
304299
char_array_t *names = NULL;
305-
schd_managers_poll_t *poll;
300+
schd_managers_poll_t *pool;
306301
schd_manager_share_t *shared;
307302
bool refresh = false;
308303

@@ -319,10 +314,10 @@ void parent_scheduler_main(Datum arg)
319314

320315
BackgroundWorkerInitializeConnection("postgres", NULL);
321316
names = readBasesToCheck();
322-
poll = initSchedulerManagerPool(names);
317+
pool = initSchedulerManagerPool(names);
323318
destroyCharArray(names);
324319

325-
set_supervisor_pgstatus(poll);
320+
set_supervisor_pgstatus(pool);
326321

327322
while(!got_sigterm)
328323
{
@@ -334,62 +329,62 @@ void parent_scheduler_main(Datum arg)
334329
ProcessConfigFile(PGC_SIGHUP);
335330
refresh = false;
336331
names = NULL;
337-
if(is_scheduler_enabled() != poll->enabled)
332+
if(is_scheduler_enabled() != pool->enabled)
338333
{
339-
if(poll->enabled)
334+
if(pool->enabled)
340335
{
341-
poll->enabled = false;
342-
stopAllManagers(poll);
343-
set_supervisor_pgstatus(poll);
336+
pool->enabled = false;
337+
stopAllManagers(pool);
338+
set_supervisor_pgstatus(pool);
344339
}
345340
else
346341
{
347342
refresh = true;
348-
poll->enabled = true;
343+
pool->enabled = true;
349344
names = readBasesToCheck();
350345
}
351346
}
352-
else if(poll->enabled)
347+
else if(pool->enabled)
353348
{
354349
names = readBasesToCheck();
355-
if(isBaseListChanged(names, poll)) refresh = true;
350+
if(isBaseListChanged(names, pool)) refresh = true;
356351
else destroyCharArray(names);
357352
}
358353

359354
if(refresh)
360355
{
361-
refreshManagers(names, poll);
362-
set_supervisor_pgstatus(poll);
356+
refreshManagers(names, pool);
357+
set_supervisor_pgstatus(pool);
363358
destroyCharArray(names);
364359
}
365360
}
366361
else
367362
{
368-
for(i=0; i < poll->n; i++)
363+
for(i=0; i < pool->n; i++)
369364
{
370-
shared = dsm_segment_address(poll->workers[i]->shared);
365+
shared = dsm_segment_address(pool->workers[i]->shared);
371366

372367
if(shared->setbychild)
373368
{
374-
/* elog(LOG, "got status change from: %s", poll->workers[i]->dbname); */
369+
/* elog(LOG, "got status change from: %s", pool->workers[i]->dbname); */
375370
shared->setbychild = false;
376371
if(shared->status == SchdManagerConnected)
377372
{
378-
poll->workers[i]->connected = true;
373+
pool->workers[i]->connected = true;
379374
}
380375
else if(shared->status == SchdManagerQuit)
381376
{
382-
removeManagerFromPoll(poll, poll->workers[i]->dbname, 1, true);
383-
set_supervisor_pgstatus(poll);
377+
removeManagerFromPoll(pool, pool->workers[i]->dbname, 1, true);
378+
set_supervisor_pgstatus(pool);
384379
}
385380
else if(shared->status == SchdManagerDie)
386381
{
387-
removeManagerFromPoll(poll, poll->workers[i]->dbname, 1, false);
388-
set_supervisor_pgstatus(poll);
382+
removeManagerFromPoll(pool, pool->workers[i]->dbname, 1, false);
383+
set_supervisor_pgstatus(pool);
389384
}
390385
else
391386
{
392-
elog(WARNING, "manager: %s set strange status: %d", poll->workers[i]->dbname, shared->status);
387+
elog(WARNING, "manager: %s set strange status: %d", pool->workers[i]->dbname, shared->status);
393388
}
394389
}
395390
}
@@ -399,7 +394,7 @@ void parent_scheduler_main(Datum arg)
399394
CHECK_FOR_INTERRUPTS();
400395
ResetLatch(MyLatch);
401396
}
402-
stopAllManagers(poll);
397+
stopAllManagers(pool);
403398
delete_worker_mem_ctx();
404399

405400
proc_exit(0);

contrib/pgpro_scheduler/src/sched_manager_poll.c

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ char *supervisor_state(schd_managers_poll_t *poll)
4343

4444
if(!poll->enabled)
4545
{
46-
status = palloc(sizeof(char) * 9);
46+
status = worker_alloc(sizeof(char) * 9);
4747
memcpy(status, "disabled", 8);
4848
status[8] = 0;
4949
return status;
@@ -52,13 +52,13 @@ char *supervisor_state(schd_managers_poll_t *poll)
5252
len = dbnames ? strlen(dbnames): 0;
5353
if(len == 0)
5454
{
55-
status = palloc(sizeof(char)*26);
55+
status = worker_alloc(sizeof(char)*26);
5656
memcpy(status, "waiting databases to set", 25);
5757
status[25] = 0;
5858
}
5959
else
6060
{
61-
status = palloc(sizeof(char) * (len + 10));
61+
status = worker_alloc(sizeof(char) * (len + 10));
6262
memcpy(status, "work on: ", 9);
6363
memcpy(status+9, dbnames, len);
6464
status[len+9] = 0;
@@ -82,7 +82,7 @@ char *poll_dbnames(schd_managers_poll_t *poll)
8282
if(i < (poll->n - 1))
8383
appendStringInfo(&string, ", ");
8484
}
85-
out = palloc(sizeof(char) * (string.len + 1));
85+
out = worker_alloc(sizeof(char) * (string.len + 1));
8686
memcpy(out, string.data, string.len);
8787
out[string.len] = 0;
8888
pfree(string.data);
@@ -297,29 +297,18 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
297297
schd_manager_t *man;
298298
schd_manager_share_t *shm_data;
299299
Size segsize;
300-
/* shm_toc_estimator e;
301-
shm_toc *toc; */
302300
dsm_segment *seg;
303301

304-
/* shm_toc_initialize_estimator(&e);
305-
shm_toc_estimate_chunk(&e, sizeof(schd_manager_share_t));
306-
shm_toc_estimate_keys(&e, 1);
307-
segsize = shm_toc_estimate(&e); */
308302
segsize = (Size)sizeof(schd_manager_share_t);
309303

310304
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
311305
seg = dsm_create(segsize, 0);
312306

313-
man = palloc(sizeof(schd_manager_t));
314-
man->dbname = palloc(sizeof(char *) * (strlen(name) + 1));
307+
man = worker_alloc(sizeof(schd_manager_t));
308+
man->dbname = worker_alloc(sizeof(char *) * (strlen(name) + 1));
315309
man->connected = false;
316310
memcpy(man->dbname, name, strlen(name) + 1);
317311
man->shared = seg;
318-
/* toc = shm_toc_create(PGPRO_SHM_TOC_MAGIC, dsm_segment_address(man->shared),
319-
segsize);
320-
321-
shm_data = shm_toc_allocate(toc, sizeof(schd_manager_share_t));
322-
shm_toc_insert(toc, 0, shm_data); */
323312
shm_data = dsm_segment_address(man->shared);
324313

325314
shm_data->setbyparent = true;
@@ -331,7 +320,7 @@ int addManagerToPoll(schd_managers_poll_t *poll, char *name, int sort)
331320
pos = poll->n++;
332321
poll->workers = poll->workers ?
333322
repalloc(poll->workers, sizeof(schd_manager_t *) * poll->n):
334-
palloc(sizeof(schd_manager_t *));
323+
worker_alloc(sizeof(schd_manager_t *));
335324
poll->workers[pos] = man;
336325
if(sort) _sortPollManagers(poll);
337326

0 commit comments

Comments
 (0)