Skip to content

Commit e9aa92c

Browse files
author
Vladimir Ershov
committed
memory leak fix
1 parent d414013 commit e9aa92c

10 files changed

+261
-177
lines changed

src/memutils.c

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44

55
MemoryContext SchedulerWorkerContext = NULL;
66

7+
MemoryContext init_mem_ctx(const char *name)
8+
{
9+
return AllocSetContextCreate(TopMemoryContext, name,
10+
ALLOCSET_DEFAULT_MINSIZE,
11+
ALLOCSET_DEFAULT_INITSIZE,
12+
ALLOCSET_DEFAULT_MAXSIZE);
13+
}
14+
715
MemoryContext init_worker_mem_ctx(const char *name)
816
{
917
AssertState(SchedulerWorkerContext == NULL);

src/memutils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
extern MemoryContext SchedulerWorkerContext;
88

99
MemoryContext init_worker_mem_ctx(const char *name);
10+
MemoryContext init_mem_ctx(const char *name);
1011
MemoryContext switch_to_worker_context(void);
1112
void *worker_alloc(Size size);
1213
void delete_worker_mem_ctx(void);

src/pgpro_scheduler.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,14 +171,14 @@ char *set_schema(const char *name, bool get_old)
171171
bool free_name = false;
172172

173173
if(get_old)
174-
current = _copy_string((char *)GetConfigOption("search_path", true, false));
174+
current = _mcopy_string(NULL, (char *)GetConfigOption("search_path", true, false));
175175
if(name)
176176
{
177177
schema_name = (char *)name;
178178
}
179179
else
180180
{
181-
schema_name = _copy_string((char *)GetConfigOption("schedule.schema", true, false));
181+
schema_name = _mcopy_string(NULL, (char *)GetConfigOption("schedule.schema", true, false));
182182
free_name = true;
183183
}
184184
SetConfigOption("search_path", schema_name, PGC_USERSET, PGC_S_SESSION);
@@ -483,7 +483,7 @@ void _PG_init(void)
483483
&scheduler_max_workers,
484484
2,
485485
1,
486-
100,
486+
1000,
487487
PGC_SUSET,
488488
0,
489489
NULL,
@@ -497,7 +497,7 @@ void _PG_init(void)
497497
&scheduler_max_parallel_workers,
498498
2,
499499
1,
500-
100,
500+
1000,
501501
PGC_SUSET,
502502
0,
503503
NULL,

src/scheduler_executor.c

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
162162
int i;
163163
job_t *job;
164164
spi_response_t *r;
165+
MemoryContext old;
166+
MemoryContext mem = init_mem_ctx("executor");
167+
168+
old = MemoryContextSwitchTo(mem);
165169

166170
EE.n = 0;
167171
EE.errors = NULL;
@@ -230,11 +234,11 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
230234
}
231235
if(job->type == AtJob && i == 0 && job->sql_params_n > 0)
232236
{
233-
r = execute_spi_params_prepared(job->dosql[i], job->sql_params_n, job->sql_params);
237+
r = execute_spi_params_prepared(mem, job->dosql[i], job->sql_params_n, job->sql_params);
234238
}
235239
else
236240
{
237-
r = execute_spi(job->dosql[i]);
241+
r = execute_spi(mem, job->dosql[i]);
238242
}
239243
if(r->retval < 0)
240244
{
@@ -321,6 +325,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
321325

322326
SetSessionAuthorization(BOOTSTRAP_SUPERUSERID, true);
323327
ResetAllOptions();
328+
MemoryContextSwitchTo(old);
329+
MemoryContextDelete(mem);
324330

325331
return 1;
326332
}
@@ -336,23 +342,24 @@ int set_session_authorization(char *username, char **error)
336342
int rv;
337343
char *sql = "select oid, rolsuper from pg_catalog.pg_roles where rolname = $1";
338344
char buff[1024];
345+
MemoryContext mem = CurrentMemoryContext;
339346

340347
values[0] = CStringGetTextDatum(username);
341348
START_SPI_SNAP();
342-
r = execute_spi_sql_with_args(sql, 1, types, values, NULL);
349+
r = execute_spi_sql_with_args(mem, sql, 1, types, values, NULL);
343350

344351
if(r->retval < 0)
345352
{
346353
rv = r->retval;
347-
*error = _copy_string(r->error);
354+
*error = _mcopy_string(mem, r->error);
348355
destroy_spi_data(r);
349356
return rv;
350357
}
351358
if(r->n_rows == 0)
352359
{
353360
STOP_SPI_SNAP();
354361
sprintf(buff, "Cannot find user with name: %s", username);
355-
*error = _copy_string(buff);
362+
*error = _mcopy_string(mem, buff);
356363
destroy_spi_data(r);
357364

358365
return -200;
@@ -415,7 +422,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
415422

416423
START_SPI_SNAP();
417424
pgstat_report_activity(STATE_RUNNING, "culc next time execution time");
418-
r = execute_spi(sql);
425+
r = execute_spi(CurrentMemoryContext, sql);
419426
if(r->retval < 0)
420427
{
421428
if(r->error)
@@ -469,7 +476,7 @@ int executor_onrollback(job_t *job, executor_error_t *ee)
469476
pgstat_report_activity(STATE_RUNNING, "execure onrollback");
470477

471478
START_SPI_SNAP();
472-
r = execute_spi(job->onrollback);
479+
r = execute_spi(CurrentMemoryContext, job->onrollback);
473480
if(r->retval < 0)
474481
{
475482
if(r->error)
@@ -502,7 +509,7 @@ void set_pg_var(bool result, executor_error_t *ee)
502509

503510
vals[0] = PointerGetDatum(cstring_to_text(result ? "success": "failure"));
504511

505-
r = execute_spi_sql_with_args(sql, 1, argtypes, vals, NULL);
512+
r = execute_spi_sql_with_args(NULL, sql, 1, argtypes, vals, NULL);
506513
if(r->retval < 0)
507514
{
508515
if(r->error)
@@ -712,14 +719,16 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
712719
int set_ret;
713720
char buff[512];
714721
spi_response_t *r;
722+
MemoryContext old;
723+
MemoryContext mem = init_mem_ctx("at job processor");
724+
old = MemoryContextSwitchTo(mem);
715725

716726
*status = shared->status = SchdExecutorWork;
717727

718728
pgstat_report_activity(STATE_RUNNING, "initialize at job");
719729
START_SPI_SNAP();
720730

721-
/* job = get_next_at_job_with_lock(shared->nodename, &error); */
722-
job = get_at_job_for_process(shared->nodename, &error);
731+
job = get_at_job_for_process(mem, shared->nodename, &error);
723732
if(!job)
724733
{
725734
if(error)
@@ -765,7 +774,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
765774
return -1;
766775
}
767776
STOP_SPI_SNAP();
768-
elog(LOG, "JOB MOVED TO DONE");
777+
MemoryContextSwitchTo(old);
778+
MemoryContextDelete(mem);
769779
return 1;
770780
}
771781

@@ -780,11 +790,11 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
780790

781791
if(job->sql_params_n > 0)
782792
{
783-
r = execute_spi_params_prepared(job->dosql[0], job->sql_params_n, job->sql_params);
793+
r = execute_spi_params_prepared(mem, job->dosql[0], job->sql_params_n, job->sql_params);
784794
}
785795
else
786796
{
787-
r = execute_spi(job->dosql[0]);
797+
r = execute_spi(mem, job->dosql[0]);
788798
}
789799
if(job->timelimit)
790800
{
@@ -819,6 +829,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
819829
if(set_ret > 0)
820830
{
821831
STOP_SPI_SNAP();
832+
MemoryContextSwitchTo(old);
833+
MemoryContextDelete(mem);
822834
return 1;
823835
}
824836
if(set_error)
@@ -831,6 +843,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
831843
elog(LOG, "AT_EXECUTOR ERROR: set log: unknown error");
832844
}
833845
ABORT_SPI_SNAP();
846+
MemoryContextSwitchTo(old);
847+
MemoryContextDelete(mem);
834848

835849
return -1;
836850
}
@@ -846,7 +860,7 @@ Oid set_session_authorization_by_name(char *rolename, char **error)
846860
if(!HeapTupleIsValid(roleTup))
847861
{
848862
snprintf(buffer, 512, "There is no user name: %s", rolename);
849-
*error = _copy_string(buffer);
863+
*error = _mcopy_string(NULL, buffer);
850864
return InvalidOid;
851865
}
852866
rform = (Form_pg_authid) GETSTRUCT(roleTup);

0 commit comments

Comments
 (0)