Skip to content

Commit 4c27c2d

Browse files
author
Vladimir Ershov
committed
Merge commit 'ba22bba356045511f495d21691d19e44a9bc7d0e' into PGPROEE9_6_scheduler
Conflicts: contrib/pgpro_scheduler/src/pgpro_scheduler.c contrib/pgpro_scheduler/src/scheduler_job.c contrib/pgpro_scheduler/src/scheduler_manager.c contrib/pgpro_scheduler/src/scheduler_spi_utils.c
2 parents 30dc23f + ba22bba commit 4c27c2d

11 files changed

+548
-341
lines changed

contrib/pgpro_scheduler/src/memutils.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44

55
MemoryContext SchedulerWorkerContext = NULL;
66

7+
MemoryContext init_mem_ctx(const char *name)
8+
{
9+
return AllocSetContextCreate(TopMemoryContext, name,
10+
ALLOCSET_DEFAULT_MINSIZE,
11+
ALLOCSET_DEFAULT_INITSIZE,
12+
ALLOCSET_DEFAULT_MAXSIZE);
13+
}
14+
715
MemoryContext init_worker_mem_ctx(const char *name)
816
{
917
AssertState(SchedulerWorkerContext == NULL);
@@ -31,4 +39,5 @@ void delete_worker_mem_ctx(void)
3139
{
3240
MemoryContextSwitchTo(TopMemoryContext);
3341
MemoryContextDelete(SchedulerWorkerContext);
42+
SchedulerWorkerContext = NULL;
3443
}

contrib/pgpro_scheduler/src/memutils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
extern MemoryContext SchedulerWorkerContext;
88

99
MemoryContext init_worker_mem_ctx(const char *name);
10+
MemoryContext init_mem_ctx(const char *name);
1011
MemoryContext switch_to_worker_context(void);
1112
void *worker_alloc(Size size);
1213
void delete_worker_mem_ctx(void);

contrib/pgpro_scheduler/src/pgpro_scheduler.c

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ char *make_date_from_timestamp(TimestampTz ts, bool hires)
119119
fsec_t fsec;
120120
const char *tzn;
121121

122-
timestamp2tm(ts, &tz, &dt, &fsec, &tzn, NULL );
122+
timestamp2tm(ts, &tz, &dt, &fsec, &tzn, NULL );
123123
sprintf(str, "%04d-%02d-%02d %02d:%02d:%02d", dt.tm_year , dt.tm_mon,
124124
dt.tm_mday, dt.tm_hour, dt.tm_min, dt.tm_sec);
125125
if(!hires) str[16] = 0;
@@ -171,14 +171,14 @@ char *set_schema(const char *name, bool get_old)
171171
bool free_name = false;
172172

173173
if(get_old)
174-
current = _copy_string((char *)GetConfigOption("search_path", true, false));
174+
current = _mcopy_string(NULL, (char *)GetConfigOption("search_path", true, false));
175175
if(name)
176176
{
177177
schema_name = (char *)name;
178178
}
179179
else
180180
{
181-
schema_name = _copy_string((char *)GetConfigOption("schedule.schema", true, false));
181+
schema_name = _mcopy_string(NULL, (char *)GetConfigOption("schedule.schema", true, false));
182182
free_name = true;
183183
}
184184
SetConfigOption("search_path", schema_name, PGC_USERSET, PGC_S_SESSION);
@@ -264,7 +264,7 @@ char_array_t *readBasesToCheck(void)
264264
{
265265
appendStringInfo(&sql, "'%s'", names->data[i]);
266266
if(i + 1 != names->n) appendStringInfo(&sql, ",");
267-
}
267+
}
268268
destroyCharArray(names);
269269
appendStringInfo(&sql, ")");
270270

@@ -304,7 +304,7 @@ void parent_scheduler_main(Datum arg)
304304
CurrentResourceOwner = ResourceOwnerCreate(NULL, "pgpro_scheduler");
305305

306306
init_worker_mem_ctx("Parent scheduler context");
307-
elog(LOG, "Start PostgresPro scheduler.");
307+
elog(LOG, "Start PostgresPro scheduler.");
308308

309309
SetConfigOption("application_name", "pgp-s supervisor", PGC_USERSET, PGC_S_SESSION);
310310
pgstat_report_activity(STATE_RUNNING, "Initialize");
@@ -358,7 +358,7 @@ void parent_scheduler_main(Datum arg)
358358
destroyCharArray(names);
359359
}
360360
}
361-
else
361+
else
362362
{
363363
for(i=0; i < pool->n; i++)
364364
{
@@ -417,7 +417,7 @@ pg_scheduler_startup(void)
417417
memcpy(worker.bgw_library_name, "pgpro_scheduler", 16);
418418
memcpy(worker.bgw_name, "pgpro scheduler", 16);
419419

420-
RegisterBackgroundWorker(&worker);
420+
RegisterBackgroundWorker(&worker);
421421
}
422422

423423
void _PG_init(void)
@@ -468,7 +468,7 @@ void _PG_init(void)
468468
"schedule.transaction_state",
469469
"State of scheduler executor transaction",
470470
"If not under scheduler executor process the variable has no mean and has a value = 'undefined', possible values: progress, success, failure",
471-
&scheduler_transaction_state ,
471+
&scheduler_transaction_state ,
472472
"undefined",
473473
PGC_INTERNAL,
474474
0,
@@ -483,7 +483,7 @@ void _PG_init(void)
483483
&scheduler_max_workers,
484484
2,
485485
1,
486-
100,
486+
1000,
487487
PGC_SUSET,
488488
0,
489489
NULL,
@@ -497,7 +497,7 @@ void _PG_init(void)
497497
&scheduler_max_parallel_workers,
498498
2,
499499
1,
500-
100,
500+
1000,
501501
PGC_SUSET,
502502
0,
503503
NULL,
@@ -542,14 +542,14 @@ cron_string_to_json_text(PG_FUNCTION_ARGS)
542542
text *text_p;
543543
int len;
544544
char *error = NULL;
545-
545+
546546
if(PG_ARGISNULL(0))
547-
{
547+
{
548548
PG_RETURN_NULL();
549549
}
550550
source = PG_GETARG_CSTRING(0);
551551
jsonText = parse_crontab_to_json_text(source);
552-
552+
553553
if(jsonText)
554554
{
555555
len = strlen(jsonText);

contrib/pgpro_scheduler/src/scheduler_executor.c

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,13 @@ void executor_worker_main(Datum arg)
139139
}
140140
else if(result < 0)
141141
{
142+
if(result == -100)
143+
{
144+
snprintf(shared->message, PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX,
145+
"Cannot allocate memory");
146+
shared->worker_exit = true;
147+
shared->status = SchdExecutorError;
148+
}
142149
delete_worker_mem_ctx();
143150
dsm_detach(seg);
144151
proc_exit(0);
@@ -159,9 +166,11 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
159166
{
160167
executor_error_t EE;
161168
char *error = NULL;
162-
int i;
169+
int i, ret;
163170
job_t *job;
164171
spi_response_t *r;
172+
MemoryContext old, mem;
173+
char buffer[1024];
165174

166175
EE.n = 0;
167176
EE.errors = NULL;
@@ -174,6 +183,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
174183
return 0;
175184
}
176185

186+
mem = init_mem_ctx("executor");
187+
old = MemoryContextSwitchTo(mem);
188+
177189
*status = shared->status = SchdExecutorWork;
178190
shared->message[0] = 0;
179191

@@ -187,6 +199,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
187199
shared->worker_exit = true;
188200
*status = shared->status = SchdExecutorError;
189201

202+
MemoryContextSwitchTo(old);
203+
MemoryContextDelete(mem);
204+
190205
return -1;
191206
}
192207
current_job_id = job->cron_id;
@@ -207,6 +222,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
207222
}
208223
*status = shared->worker_exit = true;
209224
shared->status = SchdExecutorError;
225+
MemoryContextSwitchTo(old);
226+
MemoryContextDelete(mem);
210227
return -2;
211228
}
212229

@@ -230,30 +247,35 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
230247
}
231248
if(job->type == AtJob && i == 0 && job->sql_params_n > 0)
232249
{
233-
r = execute_spi_params_prepared(job->dosql[i], job->sql_params_n, job->sql_params);
250+
r = execute_spi_params_prepared(mem, job->dosql[i], job->sql_params_n, job->sql_params);
234251
}
235252
else
236253
{
237-
r = execute_spi(job->dosql[i]);
254+
r = execute_spi(mem, job->dosql[i]);
238255
}
256+
snprintf(buffer, 1024, "finalize: %s", job->dosql[i]);
257+
if(!r) return -100; /* cannot allocate memory */
258+
pgstat_report_activity(STATE_RUNNING, buffer);
239259
if(r->retval < 0)
240260
{
241261
/* success = false; */
242262
*status = SchdExecutorError;
243263
if(r->error)
244264
{
245-
push_executor_error(&EE, "error in command #%d: %s",
265+
ret = push_executor_error(&EE, "error in command #%d: %s",
246266
i+1, r->error);
247267
}
248268
else
249269
{
250-
push_executor_error(&EE, "error in command #%d: code: %d",
270+
ret = push_executor_error(&EE, "error in command #%d: code: %d",
251271
i+1, r->retval);
252272
}
273+
if(ret < 0) return -100; /* cannot alloc memory */
253274
destroy_spi_data(r);
254275
ABORT_SPI_SNAP();
255276
SetConfigOption("schedule.transaction_state", "failure", PGC_INTERNAL, PGC_S_SESSION);
256-
executor_onrollback(job, &EE);
277+
if(executor_onrollback(mem, job, &EE) == -14000)
278+
return -100; /* cannot alloc memory */
257279

258280
break;
259281
}
@@ -321,6 +343,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
321343

322344
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);
323345
ResetAllOptions();
346+
MemoryContextSwitchTo(old);
347+
MemoryContextDelete(mem);
324348

325349
return 1;
326350
}
@@ -336,23 +360,24 @@ int set_session_authorization(char *username, char **error)
336360
int rv;
337361
char *sql = "select oid, rolsuper from pg_catalog.pg_roles where rolname = $1";
338362
char buff[1024];
363+
MemoryContext mem = CurrentMemoryContext;
339364

340365
values[0] = CStringGetTextDatum(username);
341366
START_SPI_SNAP();
342-
r = execute_spi_sql_with_args(sql, 1, types, values, NULL);
367+
r = execute_spi_sql_with_args(mem, sql, 1, types, values, NULL);
343368

344369
if(r->retval < 0)
345370
{
346371
rv = r->retval;
347-
*error = _copy_string(r->error);
372+
*error = _mcopy_string(mem, r->error);
348373
destroy_spi_data(r);
349374
return rv;
350375
}
351376
if(r->n_rows == 0)
352377
{
353378
STOP_SPI_SNAP();
354379
sprintf(buff, "Cannot find user with name: %s", username);
355-
*error = _copy_string(buff);
380+
*error = _mcopy_string(mem, buff);
356381
destroy_spi_data(r);
357382

358383
return -200;
@@ -415,7 +440,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
415440

416441
START_SPI_SNAP();
417442
pgstat_report_activity(STATE_RUNNING, "culc next time execution time");
418-
r = execute_spi(sql);
443+
r = execute_spi(CurrentMemoryContext, sql);
419444
if(r->retval < 0)
420445
{
421446
if(r->error)
@@ -460,7 +485,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
460485
return ts;
461486
}
462487

463-
int executor_onrollback(job_t *job, executor_error_t *ee)
488+
int executor_onrollback(MemoryContext mem, job_t *job, executor_error_t *ee)
464489
{
465490
int rv;
466491
spi_response_t *r;
@@ -469,16 +494,18 @@ int executor_onrollback(job_t *job, executor_error_t *ee)
469494
pgstat_report_activity(STATE_RUNNING, "execure onrollback");
470495

471496
START_SPI_SNAP();
472-
r = execute_spi(job->onrollback);
497+
r = execute_spi(mem, job->onrollback);
473498
if(r->retval < 0)
474499
{
475500
if(r->error)
476501
{
477-
push_executor_error(ee, "onrollback error: %s", r->error);
502+
if(push_executor_error(ee, "onrollback error: %s", r->error) < 0)
503+
return -14000;
478504
}
479505
else
480506
{
481-
push_executor_error(ee, "onrollback error: unknown: %d", r->retval);
507+
if(push_executor_error(ee, "onrollback error: unknown: %d", r->retval) < 0)
508+
return -14000;
482509
}
483510
ABORT_SPI_SNAP();
484511
}
@@ -502,7 +529,7 @@ void set_pg_var(bool result, executor_error_t *ee)
502529

503530
vals[0] = PointerGetDatum(cstring_to_text(result ? "success": "failure"));
504531

505-
r = execute_spi_sql_with_args(sql, 1, argtypes, vals, NULL);
532+
r = execute_spi_sql_with_args(NULL, sql, 1, argtypes, vals, NULL);
506533
if(r->retval < 0)
507534
{
508535
if(r->error)
@@ -571,6 +598,10 @@ int push_executor_error(executor_error_t *e, char *fmt, ...)
571598
{
572599
e->errors = repalloc(e->errors, sizeof(char *) * (e->n+1));
573600
}
601+
if(e->errors == NULL)
602+
{
603+
return -1;
604+
}
574605
e->errors[e->n] = worker_alloc(sizeof(char)*(len + 1));
575606
memcpy(e->errors[e->n], buf, len+1);
576607

@@ -712,14 +743,16 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
712743
int set_ret;
713744
char buff[512];
714745
spi_response_t *r;
746+
MemoryContext old;
747+
MemoryContext mem = init_mem_ctx("at job processor");
748+
old = MemoryContextSwitchTo(mem);
715749

716750
*status = shared->status = SchdExecutorWork;
717751

718752
pgstat_report_activity(STATE_RUNNING, "initialize at job");
719753
START_SPI_SNAP();
720754

721-
/* job = get_next_at_job_with_lock(shared->nodename, &error); */
722-
job = get_at_job_for_process(shared->nodename, &error);
755+
job = get_at_job_for_process(mem, shared->nodename, &error);
723756
if(!job)
724757
{
725758
if(error)
@@ -765,7 +798,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
765798
return -1;
766799
}
767800
STOP_SPI_SNAP();
768-
elog(LOG, "JOB MOVED TO DONE");
801+
MemoryContextSwitchTo(old);
802+
MemoryContextDelete(mem);
769803
return 1;
770804
}
771805

@@ -780,11 +814,11 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
780814

781815
if(job->sql_params_n > 0)
782816
{
783-
r = execute_spi_params_prepared(job->dosql[0], job->sql_params_n, job->sql_params);
817+
r = execute_spi_params_prepared(mem, job->dosql[0], job->sql_params_n, job->sql_params);
784818
}
785819
else
786820
{
787-
r = execute_spi(job->dosql[0]);
821+
r = execute_spi(mem, job->dosql[0]);
788822
}
789823
if(job->timelimit)
790824
{
@@ -819,6 +853,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
819853
if(set_ret > 0)
820854
{
821855
STOP_SPI_SNAP();
856+
MemoryContextSwitchTo(old);
857+
MemoryContextDelete(mem);
822858
return 1;
823859
}
824860
if(set_error)
@@ -831,6 +867,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
831867
elog(LOG, "AT_EXECUTOR ERROR: set log: unknown error");
832868
}
833869
ABORT_SPI_SNAP();
870+
MemoryContextSwitchTo(old);
871+
MemoryContextDelete(mem);
834872

835873
return -1;
836874
}
@@ -846,7 +884,7 @@ Oid set_session_authorization_by_name(char *rolename, char **error)
846884
if(!HeapTupleIsValid(roleTup))
847885
{
848886
snprintf(buffer, 512, "There is no user name: %s", rolename);
849-
*error = _copy_string(buffer);
887+
*error = _mcopy_string(NULL, buffer);
850888
return InvalidOid;
851889
}
852890
rform = (Form_pg_authid) GETSTRUCT(roleTup);

0 commit comments

Comments
 (0)