Skip to content

Commit 8926ec5

Browse files
author
Vladimir Ershov
committed
Merge commit 'ced1d588f41e96557e6b7e111342e24878ca41c1' into PGPROEE9_6_scheduler
2 parents 40dd6e0 + ced1d58 commit 8926ec5

24 files changed

+792
-206
lines changed

contrib/pgpro_scheduler/pgpro_scheduler--2.1.sql

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ BEGIN
370370
END IF;
371371

372372
IF node IS NULL THEN
373-
node := 'master';
373+
node := nodename();
374374
END IF;
375375

376376
IF run_as IS NOT NULL AND run_as <> session_user THEN
@@ -482,7 +482,7 @@ CREATE FUNCTION _possible_args() RETURNS jsonb AS
482482
$BODY$
483483
BEGIN
484484
RETURN jsonb_build_object(
485-
'node', 'node name (default: master)',
485+
'node', 'node name (default: ' || nodename() || ')',
486486
'name', 'job name',
487487
'comments', 'some comments on job',
488488
'cron', 'cron string rule',
@@ -717,7 +717,7 @@ BEGIN
717717
cron := _get_cron_from_attrs(params, NULL);
718718
commands := _get_commands_from_attrs(params);
719719
executor := _get_executor_from_attrs(params);
720-
node := 'master';
720+
node := nodename();
721721
mi := 1;
722722

723723
IF params?'start_date' THEN
@@ -1426,6 +1426,11 @@ CREATE FUNCTION cron2jsontext(CSTRING)
14261426
AS 'MODULE_PATHNAME', 'cron_string_to_json_text'
14271427
LANGUAGE C IMMUTABLE;
14281428

1429+
CREATE FUNCTION nodename()
1430+
RETURNS text
1431+
AS 'MODULE_PATHNAME', 'nodename'
1432+
LANGUAGE C IMMUTABLE;
1433+
14291434
--------------
14301435
-- TRIGGERS --
14311436
--------------

contrib/pgpro_scheduler/src/cron_string.c

Lines changed: 42 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ char *_cps_append_string(char *str, char *to_add)
4949
{
5050
int end = str ? strlen(str): 0;
5151
int len = strlen(to_add);
52-
53-
str = realloc(str, end + len + 1);
52+
53+
str = str? repalloc(str, end + len + 1): palloc(end + len + 1);
5454
memcpy(str+end, to_add, len);
5555
str[end+len] = 0;
5656

@@ -99,9 +99,9 @@ void _cps_clean_charchar(char **data, int len)
9999

100100
for(i=0; i < len; i++)
101101
{
102-
free(data[i]);
102+
pfree(data[i]);
103103
}
104-
free(data);
104+
pfree(data);
105105
}
106106

107107
char **_cps_split_by(char sep, char *src, int *N)
@@ -115,15 +115,15 @@ char **_cps_split_by(char sep, char *src, int *N)
115115
{
116116
if(src[i] == sep) n++;
117117
}
118-
res = malloc(sizeof(char *)*n);
118+
res = palloc(sizeof(char *)*n);
119119
*N = n;
120120

121121
for(i=0; i < len; i++)
122122
{
123123
if(src[i] == sep)
124124
{
125125
tmp[ti++] = 0;
126-
res[ri] = (char *)malloc(sizeof(char)*(ti));
126+
res[ri] = (char *)palloc(sizeof(char)*(ti));
127127
memcpy(res[ri++], tmp, ti);
128128
ti = 0;
129129
}
@@ -133,7 +133,7 @@ char **_cps_split_by(char sep, char *src, int *N)
133133
}
134134
}
135135
tmp[ti++] = 0;
136-
res[ri] = malloc(sizeof(char)*(ti+1));
136+
res[ri] = palloc(sizeof(char)*(ti+1));
137137
memcpy(res[ri], tmp, ti+1);
138138

139139
return res;
@@ -154,7 +154,7 @@ int *_cps_parse_range(char *src, int start, int end, int *len)
154154

155155
*len = 0;
156156

157-
values = malloc(sizeof(char)*range_len);
157+
values = palloc(sizeof(char)*range_len);
158158
memset(values, 0, sizeof(char)*range_len);
159159

160160
parts = _cps_split_by(',', src, &nparts);
@@ -284,7 +284,7 @@ int *_cps_parse_range(char *src, int start, int end, int *len)
284284

285285
if(*len > 0)
286286
{
287-
ptr = malloc(sizeof(int)*(*len));
287+
ptr = palloc(sizeof(int)*(*len));
288288
}
289289
else
290290
{
@@ -301,7 +301,7 @@ int *_cps_parse_range(char *src, int start, int end, int *len)
301301
ptr = NULL;
302302
ni = 0;
303303
}
304-
free(values);
304+
pfree(values);
305305
_cps_clean_charchar(parts, nparts);
306306

307307
return ptr;
@@ -311,8 +311,8 @@ char *_cps_subst_str(char *str, char **subst_array, int subst_len, int step)
311311
{
312312
int len = strlen(str);
313313
char *new_str = NULL;
314-
int *candidats = (int *)malloc(sizeof(int)*subst_len);
315-
int *new_cands = (int *)malloc(sizeof(int)*subst_len);
314+
int *candidats = (int *)palloc(sizeof(int)*subst_len);
315+
int *new_cands = (int *)palloc(sizeof(int)*subst_len);
316316
int cands_num = 0;
317317
int new_cands_num = 0;
318318
int this_clen;
@@ -359,12 +359,15 @@ char *_cps_subst_str(char *str, char **subst_array, int subst_len, int step)
359359
candidats[j] = j;
360360
}
361361
matches_count++;
362-
matches = realloc(matches, (sizeof(match_ent_t) * matches_count));
362+
matches =
363+
matches ?
364+
repalloc(matches, (sizeof(match_ent_t) * matches_count)):
365+
palloc(sizeof(match_ent_t) * matches_count);
363366
matches[matches_count-1].start = i - cand_step;
364367
matches[matches_count-1].end = i;
365368

366369
matches[matches_count-1].subst_len = sprintf(buff, "%d", has_match + step);
367-
matches[matches_count-1].subst = malloc(sizeof(matches[matches_count-1].subst_len));
370+
matches[matches_count-1].subst = palloc(sizeof(matches[matches_count-1].subst_len));
368371
memcpy(matches[matches_count-1].subst, buff, matches[matches_count-1].subst_len);
369372

370373
cands_num = subst_len;
@@ -394,8 +397,8 @@ char *_cps_subst_str(char *str, char **subst_array, int subst_len, int step)
394397
new_cands_num = 0;
395398
}
396399
}
397-
free(candidats);
398-
free(new_cands);
400+
pfree(candidats);
401+
pfree(new_cands);
399402

400403
if(matches_count > 0)
401404
{
@@ -404,7 +407,7 @@ char *_cps_subst_str(char *str, char **subst_array, int subst_len, int step)
404407
new_len += (matches[i].end - matches[i].start + 1) - matches[i].subst_len ;
405408
}
406409
new_len = len - new_len;
407-
new_str = malloc(sizeof(char) * new_len+1);
410+
new_str = palloc(sizeof(char) * new_len+1);
408411

409412
for(i=0; i < matches_count; i++)
410413
{
@@ -415,16 +418,16 @@ char *_cps_subst_str(char *str, char **subst_array, int subst_len, int step)
415418
new_curr += matches[i].subst_len;
416419
curr = matches[i].end+1;
417420

418-
free(matches[i].subst);
421+
pfree(matches[i].subst);
419422
}
420423
if(curr < (len-1))
421424
{
422425
memcpy(&new_str[new_curr], &(str[curr]), len - curr);
423426
new_curr += (len - curr);
424427
}
425428
new_str[new_curr] = 0;
426-
free(matches);
427-
free(str);
429+
pfree(matches);
430+
pfree(str);
428431
return new_str;
429432
}
430433

@@ -433,13 +436,13 @@ char *_cps_subst_str(char *str, char **subst_array, int subst_len, int step)
433436

434437
void destroyCronEnt(cron_ent_t *e)
435438
{
436-
if(e->mins) free(e->mins);
437-
if(e->hour) free(e->hour);
438-
if(e->day) free(e->day);
439-
if(e->month) free(e->month);
440-
if(e->wdays) free(e->wdays);
439+
if(e->mins) pfree(e->mins);
440+
if(e->hour) pfree(e->hour);
441+
if(e->day) pfree(e->day);
442+
if(e->month) pfree(e->month);
443+
if(e->wdays) pfree(e->wdays);
441444

442-
free(e);
445+
pfree(e);
443446
}
444447

445448
cron_ent_t *parse_crontab(char *cron_str)
@@ -453,8 +456,8 @@ cron_ent_t *parse_crontab(char *cron_str)
453456

454457
if(N == 7 && strcmp(cron_str, "@reboot") == 0)
455458
{
456-
CE = (cron_ent_t *)malloc(sizeof(cron_ent_t));
457-
memset((void *)CE, 0, sizeof(cron_ent_t));
459+
CE = (cron_ent_t *)palloc(sizeof(cron_ent_t));
460+
memset((void *)CE, 0, sizeof(cron_ent_t));
458461
CE->onstart = 1;
459462

460463
return CE;
@@ -469,7 +472,7 @@ cron_ent_t *parse_crontab(char *cron_str)
469472
else
470473
{
471474
tmp[ti++] = 0;
472-
entrs[en] = malloc(sizeof(char) * ti);
475+
entrs[en] = palloc(sizeof(char) * ti);
473476
memcpy(entrs[en], tmp, ti);
474477
ti = 0;
475478
en++;
@@ -478,22 +481,21 @@ cron_ent_t *parse_crontab(char *cron_str)
478481
if(ti)
479482
{
480483
tmp[ti++] = 0;
481-
entrs[en] = malloc(sizeof(char) * ti);
484+
entrs[en] = palloc(sizeof(char) * ti);
482485
memcpy(entrs[en], tmp, ti);
483486
}
484487
if(en != 4)
485488
{
486489
_cps_set_error(55, "cron string wrong format");
487490
for(i=0; i < 5; i++)
488491
{
489-
if(entrs[i]) free(entrs[i]);
492+
if(entrs[i]) pfree(entrs[i]);
490493
}
491494
return NULL;
492495
}
493496

494-
CE = (cron_ent_t *)malloc(sizeof(cron_ent_t));
495-
memset((void *)CE, 0, sizeof(cron_ent_t));
496-
497+
CE = (cron_ent_t *)palloc(sizeof(cron_ent_t));
498+
memset((void *)CE, 0, sizeof(cron_ent_t));
497499

498500
entrs[3] = _cps_subst_str(entrs[3], cps_month_subst_data, 12, 1);
499501
entrs[4] = _cps_subst_str(entrs[4], cps_wday_subst_data, 7, 0);
@@ -506,7 +508,7 @@ cron_ent_t *parse_crontab(char *cron_str)
506508

507509
for(i=0; i < 5; i++)
508510
{
509-
if(entrs[i]) free(entrs[i]);
511+
if(entrs[i]) pfree(entrs[i]);
510512
}
511513

512514
if(CE->wdays) return CE;
@@ -537,31 +539,31 @@ char *parse_crontab_to_json_text(char *cron_str)
537539
out = _cps_append_string(out, "\"minutes\": ");
538540
tmp_out = _cps_make_array(cron->mins, cron->mins_len);
539541
out = _cps_append_string(out, tmp_out);
540-
free(tmp_out);
542+
pfree(tmp_out);
541543
out = _cps_append_string(out, ", ");
542544

543545
out = _cps_append_string(out, "\"hours\": ");
544546
tmp_out = _cps_make_array(cron->hour, cron->hour_len);
545547
out = _cps_append_string(out, tmp_out);
546-
free(tmp_out);
548+
pfree(tmp_out);
547549
out = _cps_append_string(out, ", ");
548550

549551
out = _cps_append_string(out, "\"days\": ");
550552
tmp_out = _cps_make_array(cron->day, cron->day_len);
551553
out = _cps_append_string(out, tmp_out);
552-
free(tmp_out);
554+
pfree(tmp_out);
553555
out = _cps_append_string(out, ", ");
554556

555557
out = _cps_append_string(out, "\"months\": ");
556558
tmp_out = _cps_make_array(cron->month, cron->month_len);
557559
out = _cps_append_string(out, tmp_out);
558-
free(tmp_out);
560+
pfree(tmp_out);
559561
out = _cps_append_string(out, ", ");
560562

561563
out = _cps_append_string(out, "\"wdays\": ");
562564
tmp_out = _cps_make_array(cron->wdays, cron->wdays_len);
563565
out = _cps_append_string(out, tmp_out);
564-
free(tmp_out);
566+
pfree(tmp_out);
565567
}
566568
out = _cps_append_string(out, "}");
567569

contrib/pgpro_scheduler/src/memutils.c

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ MemoryContext init_mem_ctx(const char *name)
1212
ALLOCSET_DEFAULT_MAXSIZE);
1313
}
1414

15+
bool is_worker_context_initialized(void)
16+
{
17+
return SchedulerWorkerContext == NULL ? false: true;
18+
}
19+
1520
MemoryContext init_worker_mem_ctx(const char *name)
1621
{
1722
AssertState(SchedulerWorkerContext == NULL);
@@ -29,12 +34,33 @@ MemoryContext switch_to_worker_context(void)
2934
return MemoryContextSwitchTo(SchedulerWorkerContext);
3035
}
3136

37+
void check_scheduler_context(void)
38+
{
39+
if(MemoryContextIsValid(SchedulerWorkerContext))
40+
{
41+
elog(LOG, "scheduler context: ok");
42+
}
43+
else
44+
{
45+
elog(LOG, "scheduler context: broken");
46+
}
47+
}
48+
3249
void *worker_alloc(Size size)
3350
{
3451
AssertState(SchedulerWorkerContext != NULL);
3552
return MemoryContextAlloc(SchedulerWorkerContext, size);
3653
}
3754

55+
void drop_worker_context(void)
56+
{
57+
if(SchedulerWorkerContext)
58+
{
59+
MemoryContextDelete(SchedulerWorkerContext);
60+
SchedulerWorkerContext = NULL;
61+
}
62+
}
63+
3864
void delete_worker_mem_ctx(MemoryContext old)
3965
{
4066
if(!old) old = TopMemoryContext;
@@ -43,3 +69,35 @@ void delete_worker_mem_ctx(MemoryContext old)
4369
MemoryContextDelete(SchedulerWorkerContext);
4470
SchedulerWorkerContext = NULL;
4571
}
72+
73+
char *_mcopy_string(MemoryContext ctx, char *str)
74+
{
75+
int len = strlen(str);
76+
char *cpy;
77+
78+
if(!ctx) ctx = SchedulerWorkerContext;
79+
80+
cpy = MemoryContextAlloc(ctx, sizeof(char) * (len+1));
81+
if(!cpy) return NULL;
82+
83+
memcpy(cpy, str, len);
84+
cpy[len] = 0;
85+
86+
return cpy;
87+
}
88+
89+
char *my_copy_string(char *str)
90+
{
91+
int len = strlen(str);
92+
char *cpy;
93+
94+
cpy = palloc(sizeof(char) * (len+1));
95+
if(!cpy) return NULL;
96+
97+
memcpy(cpy, str, len);
98+
cpy[len] = 0;
99+
100+
return cpy;
101+
}
102+
103+

contrib/pgpro_scheduler/src/memutils.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,4 +12,11 @@ MemoryContext switch_to_worker_context(void);
1212
void *worker_alloc(Size size);
1313
void delete_worker_mem_ctx(MemoryContext toswitch);
1414

15+
char *_mcopy_string(MemoryContext ctx, char *str);
16+
char *my_copy_string(char *str);
17+
void check_scheduler_context(void);
18+
19+
bool is_worker_context_initialized(void);
20+
void drop_worker_context(void);
21+
1522
#endif

0 commit comments

Comments
 (0)