@@ -139,6 +139,13 @@ void executor_worker_main(Datum arg)
139
139
}
140
140
else if (result < 0 )
141
141
{
142
+ if (result == -100 )
143
+ {
144
+ snprintf (shared -> message , PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
145
+ "Cannot allocate memory" );
146
+ shared -> worker_exit = true;
147
+ shared -> status = SchdExecutorError ;
148
+ }
142
149
delete_worker_mem_ctx ();
143
150
dsm_detach (seg );
144
151
proc_exit (0 );
@@ -159,9 +166,11 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
159
166
{
160
167
executor_error_t EE ;
161
168
char * error = NULL ;
162
- int i ;
169
+ int i , ret ;
163
170
job_t * job ;
164
171
spi_response_t * r ;
172
+ MemoryContext old , mem ;
173
+ char buffer [1024 ];
165
174
166
175
EE .n = 0 ;
167
176
EE .errors = NULL ;
@@ -174,6 +183,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
174
183
return 0 ;
175
184
}
176
185
186
+ mem = init_mem_ctx ("executor" );
187
+ old = MemoryContextSwitchTo (mem );
188
+
177
189
* status = shared -> status = SchdExecutorWork ;
178
190
shared -> message [0 ] = 0 ;
179
191
@@ -187,6 +199,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
187
199
shared -> worker_exit = true;
188
200
* status = shared -> status = SchdExecutorError ;
189
201
202
+ MemoryContextSwitchTo (old );
203
+ MemoryContextDelete (mem );
204
+
190
205
return -1 ;
191
206
}
192
207
current_job_id = job -> cron_id ;
@@ -207,6 +222,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
207
222
}
208
223
* status = shared -> worker_exit = true;
209
224
shared -> status = SchdExecutorError ;
225
+ MemoryContextSwitchTo (old );
226
+ MemoryContextDelete (mem );
210
227
return -2 ;
211
228
}
212
229
@@ -230,30 +247,35 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
230
247
}
231
248
if (job -> type == AtJob && i == 0 && job -> sql_params_n > 0 )
232
249
{
233
- r = execute_spi_params_prepared (job -> dosql [i ], job -> sql_params_n , job -> sql_params );
250
+ r = execute_spi_params_prepared (mem , job -> dosql [i ], job -> sql_params_n , job -> sql_params );
234
251
}
235
252
else
236
253
{
237
- r = execute_spi (job -> dosql [i ]);
254
+ r = execute_spi (mem , job -> dosql [i ]);
238
255
}
256
+ snprintf (buffer , 1024 , "finalize: %s" , job -> dosql [i ]);
257
+ if (!r ) return -100 ; /* cannot allocate memory */
258
+ pgstat_report_activity (STATE_RUNNING , buffer );
239
259
if (r -> retval < 0 )
240
260
{
241
261
/* success = false; */
242
262
* status = SchdExecutorError ;
243
263
if (r -> error )
244
264
{
245
- push_executor_error (& EE , "error in command #%d: %s" ,
265
+ ret = push_executor_error (& EE , "error in command #%d: %s" ,
246
266
i + 1 , r -> error );
247
267
}
248
268
else
249
269
{
250
- push_executor_error (& EE , "error in command #%d: code: %d" ,
270
+ ret = push_executor_error (& EE , "error in command #%d: code: %d" ,
251
271
i + 1 , r -> retval );
252
272
}
273
+ if (ret < 0 ) return -100 ; /* cannot alloc memory */
253
274
destroy_spi_data (r );
254
275
ABORT_SPI_SNAP ();
255
276
SetConfigOption ("schedule.transaction_state" , "failure" , PGC_INTERNAL , PGC_S_SESSION );
256
- executor_onrollback (job , & EE );
277
+ if (executor_onrollback (mem , job , & EE ) == -14000 )
278
+ return -100 ; /* cannot alloc memory */
257
279
258
280
break ;
259
281
}
@@ -321,6 +343,8 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
321
343
322
344
SetSessionAuthorization (BOOTSTRAP_SUPERUSERID , true);
323
345
ResetAllOptions ();
346
+ MemoryContextSwitchTo (old );
347
+ MemoryContextDelete (mem );
324
348
325
349
return 1 ;
326
350
}
@@ -336,23 +360,24 @@ int set_session_authorization(char *username, char **error)
336
360
int rv ;
337
361
char * sql = "select oid, rolsuper from pg_catalog.pg_roles where rolname = $1" ;
338
362
char buff [1024 ];
363
+ MemoryContext mem = CurrentMemoryContext ;
339
364
340
365
values [0 ] = CStringGetTextDatum (username );
341
366
START_SPI_SNAP ();
342
- r = execute_spi_sql_with_args (sql , 1 , types , values , NULL );
367
+ r = execute_spi_sql_with_args (mem , sql , 1 , types , values , NULL );
343
368
344
369
if (r -> retval < 0 )
345
370
{
346
371
rv = r -> retval ;
347
- * error = _copy_string ( r -> error );
372
+ * error = _mcopy_string ( mem , r -> error );
348
373
destroy_spi_data (r );
349
374
return rv ;
350
375
}
351
376
if (r -> n_rows == 0 )
352
377
{
353
378
STOP_SPI_SNAP ();
354
379
sprintf (buff , "Cannot find user with name: %s" , username );
355
- * error = _copy_string ( buff );
380
+ * error = _mcopy_string ( mem , buff );
356
381
destroy_spi_data (r );
357
382
358
383
return -200 ;
@@ -415,7 +440,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
415
440
416
441
START_SPI_SNAP ();
417
442
pgstat_report_activity (STATE_RUNNING , "culc next time execution time" );
418
- r = execute_spi (sql );
443
+ r = execute_spi (CurrentMemoryContext , sql );
419
444
if (r -> retval < 0 )
420
445
{
421
446
if (r -> error )
@@ -460,7 +485,7 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
460
485
return ts ;
461
486
}
462
487
463
- int executor_onrollback (job_t * job , executor_error_t * ee )
488
+ int executor_onrollback (MemoryContext mem , job_t * job , executor_error_t * ee )
464
489
{
465
490
int rv ;
466
491
spi_response_t * r ;
@@ -469,16 +494,18 @@ int executor_onrollback(job_t *job, executor_error_t *ee)
469
494
pgstat_report_activity (STATE_RUNNING , "execure onrollback" );
470
495
471
496
START_SPI_SNAP ();
472
- r = execute_spi (job -> onrollback );
497
+ r = execute_spi (mem , job -> onrollback );
473
498
if (r -> retval < 0 )
474
499
{
475
500
if (r -> error )
476
501
{
477
- push_executor_error (ee , "onrollback error: %s" , r -> error );
502
+ if (push_executor_error (ee , "onrollback error: %s" , r -> error ) < 0 )
503
+ return -14000 ;
478
504
}
479
505
else
480
506
{
481
- push_executor_error (ee , "onrollback error: unknown: %d" , r -> retval );
507
+ if (push_executor_error (ee , "onrollback error: unknown: %d" , r -> retval ) < 0 )
508
+ return -14000 ;
482
509
}
483
510
ABORT_SPI_SNAP ();
484
511
}
@@ -502,7 +529,7 @@ void set_pg_var(bool result, executor_error_t *ee)
502
529
503
530
vals [0 ] = PointerGetDatum (cstring_to_text (result ? "success" : "failure" ));
504
531
505
- r = execute_spi_sql_with_args (sql , 1 , argtypes , vals , NULL );
532
+ r = execute_spi_sql_with_args (NULL , sql , 1 , argtypes , vals , NULL );
506
533
if (r -> retval < 0 )
507
534
{
508
535
if (r -> error )
@@ -571,6 +598,10 @@ int push_executor_error(executor_error_t *e, char *fmt, ...)
571
598
{
572
599
e -> errors = repalloc (e -> errors , sizeof (char * ) * (e -> n + 1 ));
573
600
}
601
+ if (e -> errors == NULL )
602
+ {
603
+ return -1 ;
604
+ }
574
605
e -> errors [e -> n ] = worker_alloc (sizeof (char )* (len + 1 ));
575
606
memcpy (e -> errors [e -> n ], buf , len + 1 );
576
607
@@ -712,14 +743,16 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
712
743
int set_ret ;
713
744
char buff [512 ];
714
745
spi_response_t * r ;
746
+ MemoryContext old ;
747
+ MemoryContext mem = init_mem_ctx ("at job processor" );
748
+ old = MemoryContextSwitchTo (mem );
715
749
716
750
* status = shared -> status = SchdExecutorWork ;
717
751
718
752
pgstat_report_activity (STATE_RUNNING , "initialize at job" );
719
753
START_SPI_SNAP ();
720
754
721
- /* job = get_next_at_job_with_lock(shared->nodename, &error); */
722
- job = get_at_job_for_process (shared -> nodename , & error );
755
+ job = get_at_job_for_process (mem , shared -> nodename , & error );
723
756
if (!job )
724
757
{
725
758
if (error )
@@ -765,7 +798,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
765
798
return -1 ;
766
799
}
767
800
STOP_SPI_SNAP ();
768
- elog (LOG , "JOB MOVED TO DONE" );
801
+ MemoryContextSwitchTo (old );
802
+ MemoryContextDelete (mem );
769
803
return 1 ;
770
804
}
771
805
@@ -780,11 +814,11 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
780
814
781
815
if (job -> sql_params_n > 0 )
782
816
{
783
- r = execute_spi_params_prepared (job -> dosql [0 ], job -> sql_params_n , job -> sql_params );
817
+ r = execute_spi_params_prepared (mem , job -> dosql [0 ], job -> sql_params_n , job -> sql_params );
784
818
}
785
819
else
786
820
{
787
- r = execute_spi (job -> dosql [0 ]);
821
+ r = execute_spi (mem , job -> dosql [0 ]);
788
822
}
789
823
if (job -> timelimit )
790
824
{
@@ -819,6 +853,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
819
853
if (set_ret > 0 )
820
854
{
821
855
STOP_SPI_SNAP ();
856
+ MemoryContextSwitchTo (old );
857
+ MemoryContextDelete (mem );
822
858
return 1 ;
823
859
}
824
860
if (set_error )
@@ -831,6 +867,8 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
831
867
elog (LOG , "AT_EXECUTOR ERROR: set log: unknown error" );
832
868
}
833
869
ABORT_SPI_SNAP ();
870
+ MemoryContextSwitchTo (old );
871
+ MemoryContextDelete (mem );
834
872
835
873
return -1 ;
836
874
}
@@ -846,7 +884,7 @@ Oid set_session_authorization_by_name(char *rolename, char **error)
846
884
if (!HeapTupleIsValid (roleTup ))
847
885
{
848
886
snprintf (buffer , 512 , "There is no user name: %s" , rolename );
849
- * error = _copy_string ( buffer );
887
+ * error = _mcopy_string ( NULL , buffer );
850
888
return InvalidOid ;
851
889
}
852
890
rform = (Form_pg_authid ) GETSTRUCT (roleTup );
0 commit comments