@@ -680,6 +680,9 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
680
680
scheduler_manager_pool_t * p ;
681
681
scheduler_manager_slot_t * item ;
682
682
int ret ;
683
+ int idx ;
684
+ schd_executor_share_t * sdata ;
685
+ PGPROC * worker ;
683
686
684
687
p = job -> type == CronJob ? & (ctx -> cron ) : & (ctx -> at );
685
688
if (p -> free == 0 )
@@ -691,27 +694,52 @@ int set_job_on_free_slot(scheduler_manager_ctx_t *ctx, job_t *job)
691
694
set_cron_job_started (job ): set_at_job_started (job );
692
695
if (ret )
693
696
{
694
- item = worker_alloc (sizeof (scheduler_manager_slot_t ));
695
- item -> job = worker_alloc (sizeof (job_t ));
696
- memcpy (item -> job , job , sizeof (job_t ));
697
+ idx = p -> len - p -> free ; /* next free slot */
697
698
698
- item -> started = GetCurrentTimestamp ();
699
- item -> wait_worker_to_die = false;
700
- item -> stop_it = job -> timelimit ?
699
+ if (p -> slots [idx ] && p -> slots [idx ]-> is_free )
700
+ {
701
+ item = p -> slots [idx ];
702
+ item -> job = worker_alloc (sizeof (job_t ));
703
+ memcpy (item -> job , job , sizeof (job_t ));
704
+ item -> started = GetCurrentTimestamp ();
705
+ item -> wait_worker_to_die = false;
706
+ item -> stop_it = job -> timelimit ?
701
707
timestamp_add_seconds (0 , job -> timelimit ): 0 ;
708
+ sdata = dsm_segment_address (item -> shared );
702
709
703
- if (launch_executor_worker (ctx , item ) == 0 )
704
- {
705
- pfree (item -> job );
706
- pfree (item );
707
- return 0 ;
710
+ init_executor_shared_data (sdata , ctx , item -> job );
711
+ worker = BackendPidGetProc (item -> pid );
712
+ if (worker )
713
+ {
714
+ item -> is_free = false;
715
+ SetLatch (& worker -> procLatch );
716
+ }
717
+ else
718
+ {
719
+ return 0 ;
720
+ }
708
721
}
722
+ else
723
+ {
724
+ /* need to launch new worker to process job */
725
+ item = worker_alloc (sizeof (scheduler_manager_slot_t ));
726
+ item -> job = worker_alloc (sizeof (job_t ));
727
+ memcpy (item -> job , job , sizeof (job_t ));
728
+
729
+ item -> started = item -> worker_started = GetCurrentTimestamp ();
730
+ item -> wait_worker_to_die = false;
731
+ item -> stop_it = job -> timelimit ?
732
+ timestamp_add_seconds (0 , job -> timelimit ): 0 ;
709
733
710
- /* rrr = rand() % 30;
711
- elog(LOG, " -- set timeout in %d sec", rrr);
712
- item->stop_it = timestamp_add_seconds(0, rrr); */
713
-
714
- p -> slots [p -> len - (p -> free -- )] = item ;
734
+ if (launch_executor_worker (ctx , item ) == 0 )
735
+ {
736
+ pfree (item -> job );
737
+ pfree (item );
738
+ return 0 ;
739
+ }
740
+ p -> slots [idx ] = item ;
741
+ }
742
+ p -> free -- ;
715
743
job -> cron_id = -1 ; /* job copied to slot - no need to be destroyed */
716
744
717
745
return 1 ;
@@ -739,17 +767,7 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
739
767
item -> shared = seg ;
740
768
shm_data = dsm_segment_address (item -> shared );
741
769
742
- shm_data -> status = SchdExecutorInit ;
743
- memcpy (shm_data -> database , ctx -> database , strlen (ctx -> database ));
744
- memcpy (shm_data -> nodename , ctx -> nodename , strlen (ctx -> nodename ));
745
- memcpy (shm_data -> user , item -> job -> executor , NAMEDATALEN );
746
- shm_data -> cron_id = item -> job -> cron_id ;
747
- shm_data -> start_at = item -> job -> start_at ;
748
- shm_data -> type = item -> job -> type ;
749
- shm_data -> message [0 ] = 0 ;
750
- shm_data -> next_time = 0 ;
751
- shm_data -> set_invalid = false;
752
- shm_data -> set_invalid_reason [0 ] = 0 ;
770
+ init_executor_shared_data (shm_data , ctx , item -> job );
753
771
754
772
worker .bgw_flags = BGWORKER_SHMEM_ACCESS |
755
773
BGWORKER_BACKEND_DATABASE_CONNECTION ;
@@ -783,6 +801,21 @@ int launch_executor_worker(scheduler_manager_ctx_t *ctx, scheduler_manager_slot_
783
801
return item -> pid ;
784
802
}
785
803
804
+ void init_executor_shared_data (schd_executor_share_t * data , scheduler_manager_ctx_t * ctx , job_t * job )
805
+ {
806
+ data -> status = SchdExecutorInit ;
807
+ memcpy (data -> database , ctx -> database , strlen (ctx -> database ));
808
+ memcpy (data -> nodename , ctx -> nodename , strlen (ctx -> nodename ));
809
+ memcpy (data -> user , job -> executor , NAMEDATALEN );
810
+ data -> cron_id = job -> cron_id ;
811
+ data -> start_at = job -> start_at ;
812
+ data -> type = job -> type ;
813
+ data -> message [0 ] = 0 ;
814
+ data -> next_time = 0 ;
815
+ data -> set_invalid = false;
816
+ data -> set_invalid_reason [0 ] = 0 ;
817
+ }
818
+
786
819
int scheduler_start_jobs (scheduler_manager_ctx_t * ctx , task_type_t type )
787
820
{
788
821
int interval = 20 ;
@@ -955,12 +988,14 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
955
988
{
956
989
toremove [nremove ].pos = i ;
957
990
toremove [nremove ].reason = RmWaitWorker ;
991
+ toremove [nremove ].vanish_item = true;
958
992
nremove ++ ;
959
993
}
960
994
else if (item -> stop_it && item -> stop_it < GetCurrentTimestamp ())
961
995
{
962
996
toremove [nremove ].pos = i ;
963
997
toremove [nremove ].reason = RmTimeout ;
998
+ toremove [nremove ].vanish_item = true;
964
999
nremove ++ ;
965
1000
}
966
1001
else
@@ -970,12 +1005,14 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
970
1005
{
971
1006
toremove [nremove ].pos = i ;
972
1007
toremove [nremove ].reason = shm_data -> status == SchdExecutorDone ? RmDone : RmError ;
1008
+ toremove [nremove ].vanish_item = shm_data -> worker_exit ;
973
1009
nremove ++ ;
974
1010
}
975
1011
else if (shm_data -> status == SchdExecutorResubmit )
976
1012
{
977
1013
toremove [nremove ].pos = i ;
978
1014
toremove [nremove ].reason = RmDoneResubmit ;
1015
+ toremove [nremove ].vanish_item = shm_data -> worker_exit ;
979
1016
nremove ++ ;
980
1017
}
981
1018
}
@@ -1100,13 +1137,28 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
1100
1137
STOP_SPI_SNAP ();
1101
1138
1102
1139
last = p -> len - p -> free - 1 ;
1103
- destroy_slot_item (item );
1140
+ if (toremove [i ].vanish_item )
1141
+ {
1142
+ destroy_slot_item (item );
1143
+ }
1144
+ else
1145
+ {
1146
+ item -> is_free = true;
1147
+ }
1104
1148
1105
1149
if (toremove [i ].pos != last )
1106
1150
{
1107
1151
_pdebug ("--- move from %d to %d" , last , toremove [i ].pos );
1108
1152
p -> slots [toremove [i ].pos ] = p -> slots [last ];
1109
- p -> slots [last ] = NULL ;
1153
+ if (toremove [i ].vanish_item )
1154
+ {
1155
+ p -> slots [last ] = NULL ;
1156
+ }
1157
+ else
1158
+ {
1159
+ p -> slots [last ] = item ;
1160
+ }
1161
+
1110
1162
for (j = i + 1 ; j < nremove ; j ++ )
1111
1163
{
1112
1164
if (toremove [j ].pos == last )
0 commit comments