15
15
#include "catalog/pg_authid.h"
16
16
#include "utils/syscache.h"
17
17
#include "access/htup_details.h"
18
+ #include "utils/timeout.h"
18
19
19
20
#include "pgstat.h"
20
21
#include "fmgr.h"
@@ -316,6 +317,9 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
316
317
}
317
318
destroy_job (job , 1 );
318
319
320
+ SetSessionAuthorization (BOOTSTRAP_SUPERUSERID , true);
321
+ ResetAllOptions ();
322
+
319
323
return 1 ;
320
324
}
321
325
@@ -607,15 +611,13 @@ resubmit(PG_FUNCTION_ARGS)
607
611
608
612
void at_executor_worker_main (Datum arg )
609
613
{
610
- schd_executor_share_t * shared ;
614
+ schd_executor_share_state_t * shared ;
611
615
dsm_segment * seg ;
612
616
int result ;
613
617
int rc = 0 ;
614
618
schd_executor_status_t status ;
615
619
bool lets_sleep = false;
616
620
/* PGPROC *parent; */
617
- double begin , elapsed ;
618
- struct timeval tv ;
619
621
620
622
CurrentResourceOwner = ResourceOwnerCreate (NULL , "pgpro_scheduler_executor" );
621
623
seg = dsm_attach (DatumGetInt32 (arg ));
@@ -632,6 +634,7 @@ void at_executor_worker_main(Datum arg)
632
634
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
633
635
errmsg ("executor corrupted dynamic shared memory segment" )));
634
636
}
637
+ shared -> start_at = GetCurrentTimestamp ();
635
638
636
639
SetConfigOption ("application_name" , "pgp-s at executor" , PGC_USERSET , PGC_S_SESSION );
637
640
pgstat_report_activity (STATE_RUNNING , "initialize" );
@@ -644,18 +647,14 @@ void at_executor_worker_main(Datum arg)
644
647
645
648
while (1 )
646
649
{
650
+ if (shared -> stop_worker ) break ;
647
651
if (got_sighup )
648
652
{
649
653
got_sighup = false;
650
654
ProcessConfigFile (PGC_SIGHUP );
651
655
}
652
656
CHECK_FOR_INTERRUPTS ();
653
- gettimeofday (& tv , NULL );
654
- begin = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 ;
655
657
result = process_one_job (shared , & status );
656
- gettimeofday (& tv , NULL );
657
- elapsed = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 - begin ;
658
- elog (LOG , "job done %d = %f" , result , elapsed );
659
658
660
659
if (result == 0 )
661
660
{
@@ -680,42 +679,35 @@ void at_executor_worker_main(Datum arg)
680
679
}
681
680
}
682
681
682
+ if (shared -> stop_worker )
683
+ {
684
+ elog (LOG , "at worker stopped by parent signal" );
685
+ }
686
+
683
687
delete_worker_mem_ctx ();
684
688
dsm_detach (seg );
685
689
proc_exit (0 );
686
690
}
687
691
688
- int process_one_job (schd_executor_share_t * shared , schd_executor_status_t * status )
692
+ int process_one_job (schd_executor_share_state_t * shared , schd_executor_status_t * status )
689
693
{
690
694
char * error = NULL ;
691
695
job_t * job ;
692
696
int ret ;
693
697
char buff [512 ];
694
- double begin , elapsed ;
695
- struct timeval tv ;
696
698
697
699
* status = shared -> status = SchdExecutorWork ;
698
- shared -> message [0 ] = 0 ;
699
700
700
701
pgstat_report_activity (STATE_RUNNING , "initialize job" );
701
702
START_SPI_SNAP ();
702
703
703
- gettimeofday (& tv , NULL );
704
- begin = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 ;
705
-
706
704
job = get_next_at_job_with_lock (shared -> nodename , & error );
707
705
708
- gettimeofday (& tv , NULL );
709
- elapsed = ((double )tv .tv_sec )* 1000 + ((double )tv .tv_usec )/1000 - begin ;
710
- elog (LOG , "got jobs = %f" , elapsed );
711
-
712
706
if (!job )
713
707
{
714
708
if (error )
715
709
{
716
710
shared -> status = SchdExecutorIdling ;
717
- snprintf (shared -> message , PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
718
- "Cannot get job: %s" , error );
719
711
elog (LOG , "AT EXECUTOR: ERROR: %s" , error );
720
712
pfree (error );
721
713
ABORT_SPI_SNAP ();
@@ -734,15 +726,11 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
734
726
if (error )
735
727
{
736
728
set_at_job_done (job , error , 0 );
737
- snprintf (shared -> message , PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
738
- "Cannot set session auth: %s" , error );
739
729
pfree (error );
740
730
}
741
731
else
742
732
{
743
733
set_at_job_done (job , "Unknown set session auth error" , 0 );
744
- snprintf (shared -> message , PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
745
- "Cannot set session auth: unknown error" );
746
734
}
747
735
shared -> status = SchdExecutorIdling ;
748
736
STOP_SPI_SNAP ();
@@ -761,6 +749,7 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
761
749
sprintf (buff , "%lld" , job -> timelimit * 1000 );
762
750
#endif
763
751
SetConfigOption ("statement_timeout" , buff , PGC_SUSET , PGC_S_OVERRIDE );
752
+ enable_timeout_after (STATEMENT_TIMEOUT , StatementTimeout );
764
753
}
765
754
766
755
if (job -> sql_params_n > 0 )
@@ -771,6 +760,10 @@ int process_one_job(schd_executor_share_t *shared, schd_executor_status_t *statu
771
760
{
772
761
ret = execute_spi (job -> dosql [0 ], & error );
773
762
}
763
+ if (job -> timelimit )
764
+ {
765
+ disable_timeout (STATEMENT_TIMEOUT , false);
766
+ }
774
767
ResetAllOptions ();
775
768
SetConfigOption ("enable_seqscan" , "off" , PGC_USERSET , PGC_S_SESSION );
776
769
SetSessionAuthorization (BOOTSTRAP_SUPERUSERID , true);
0 commit comments