@@ -56,6 +56,16 @@ handle_sigterm(SIGNAL_ARGS)
56
56
errno = save_errno ;
57
57
}
58
58
59
+ int read_worker_job_limit (void )
60
+ {
61
+ const char * opt ;
62
+ int var ;
63
+
64
+ opt = GetConfigOption ("schedule.worker_job_limit" , false, false);
65
+ if (opt == NULL ) return 1 ;
66
+ var = atoi (opt );
67
+ return var ;
68
+ }
59
69
60
70
void executor_worker_main (Datum arg )
61
71
{
@@ -64,6 +74,9 @@ void executor_worker_main(Datum arg)
64
74
int result ;
65
75
int64 jobs_done = 0 ;
66
76
int64 worker_jobs_limit = 1 ;
77
+ int rc = 0 ;
78
+ schd_executor_status_t status ;
79
+ PGPROC * parent ;
67
80
68
81
CurrentResourceOwner = ResourceOwnerCreate (NULL , "pgpro_scheduler_executor" );
69
82
seg = dsm_attach (DatumGetInt32 (arg ));
@@ -72,6 +85,7 @@ void executor_worker_main(Datum arg)
72
85
(errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
73
86
errmsg ("executor unable to map dynamic shared memory segment" )));
74
87
shared = dsm_segment_address (seg );
88
+ parent = BackendPidGetProc (MyBgworkerEntry -> bgw_notify_pid );
75
89
76
90
if (shared -> status != SchdExecutorInit )
77
91
{
@@ -84,38 +98,77 @@ void executor_worker_main(Datum arg)
84
98
pgstat_report_activity (STATE_RUNNING , "initialize" );
85
99
init_worker_mem_ctx ("ExecutorMemoryContext" );
86
100
BackgroundWorkerInitializeConnection (shared -> database , NULL );
87
- /* TODO check latch, wait signals, die */
101
+ worker_jobs_limit = read_worker_job_limit ();
102
+
103
+ pqsignal (SIGTERM , handle_sigterm );
104
+ pqsignal (SIGHUP , worker_spi_sighup );
105
+ BackgroundWorkerUnblockSignals ();
106
+
88
107
while (1 )
89
108
{
90
- result = do_one_job (shared );
91
- if (result < 0 )
109
+ /* we need it if idle worker recieve SIGHUP an realize that it done
110
+ too mach */
111
+ status = SchdExecutorLimitReached ;
112
+
113
+ if (got_sighup )
114
+ {
115
+ got_sighup = false;
116
+ ProcessConfigFile (PGC_SIGHUP );
117
+ worker_jobs_limit = read_worker_job_limit ();
118
+ }
119
+ result = do_one_job (shared , & status );
120
+ if (result > 0 )
121
+ {
122
+ if (++ jobs_done >= worker_jobs_limit )
123
+ {
124
+ shared -> worker_exit = true;
125
+ shared -> status = status ;
126
+ break ;
127
+ }
128
+ else
129
+ {
130
+ shared -> status = status ;
131
+ }
132
+ SetLatch (& parent -> procLatch );
133
+ }
134
+ else if (result < 0 )
92
135
{
93
136
delete_worker_mem_ctx ();
94
137
dsm_detach (seg );
95
138
proc_exit (0 );
96
139
}
97
- if (++ jobs_done >= worker_jobs_limit ) break ;
98
- }
99
140
100
- shared -> worker_exit = true;
141
+ pgstat_report_activity (STATE_IDLE , "waiting for a job" );
142
+ rc = WaitLatch (MyLatch , WL_LATCH_SET | WL_POSTMASTER_DEATH , 0L );
143
+ ResetLatch (MyLatch );
144
+ if (rc && rc & WL_POSTMASTER_DEATH ) break ;
145
+ }
101
146
102
147
delete_worker_mem_ctx ();
103
148
dsm_detach (seg );
104
149
proc_exit (0 );
105
150
}
106
151
107
- int do_one_job (schd_executor_share_t * shared )
152
+ int do_one_job (schd_executor_share_t * shared , schd_executor_status_t * status )
108
153
{
109
154
executor_error_t EE ;
110
155
char * error = NULL ;
111
- schd_executor_status_t status ;
112
156
int i ;
113
157
job_t * job ;
114
158
int ret ;
115
159
116
160
EE .n = 0 ;
117
161
EE .errors = NULL ;
118
- status = shared -> status = SchdExecutorWork ;
162
+ if (shared -> new_job )
163
+ {
164
+ shared -> new_job = false;
165
+ }
166
+ else
167
+ {
168
+ return 0 ;
169
+ }
170
+
171
+ * status = shared -> status = SchdExecutorWork ;
119
172
shared -> message [0 ] = 0 ;
120
173
121
174
pgstat_report_activity (STATE_RUNNING , "initialize job" );
@@ -125,8 +178,8 @@ int do_one_job(schd_executor_share_t *shared)
125
178
if (shared -> message [0 ] == 0 )
126
179
snprintf (shared -> message , PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
127
180
"Cannot retrive job information" );
128
- shared -> status = SchdExecutorError ;
129
181
shared -> worker_exit = true;
182
+ * status = shared -> status = SchdExecutorError ;
130
183
131
184
return -1 ;
132
185
}
@@ -146,14 +199,11 @@ int do_one_job(schd_executor_share_t *shared)
146
199
snprintf (shared -> message , PGPRO_SCHEDULER_EXECUTOR_MESSAGE_MAX ,
147
200
"Cannot set session auth: unknown error" );
148
201
}
202
+ * status = shared -> worker_exit = true;
149
203
shared -> status = SchdExecutorError ;
150
- shared -> worker_exit = true;
151
204
return -2 ;
152
205
}
153
206
154
- pqsignal (SIGTERM , handle_sigterm );
155
- BackgroundWorkerUnblockSignals ();
156
-
157
207
pgstat_report_activity (STATE_RUNNING , "process job" );
158
208
CHECK_FOR_INTERRUPTS ();
159
209
/* rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, 0);
@@ -183,7 +233,7 @@ int do_one_job(schd_executor_share_t *shared)
183
233
if (ret < 0 )
184
234
{
185
235
/* success = false; */
186
- status = SchdExecutorError ;
236
+ * status = SchdExecutorError ;
187
237
if (error )
188
238
{
189
239
push_executor_error (& EE , "error in command #%d: %s" ,
@@ -209,7 +259,7 @@ int do_one_job(schd_executor_share_t *shared)
209
259
}
210
260
}
211
261
}
212
- if (status != SchdExecutorError )
262
+ if (* status != SchdExecutorError )
213
263
{
214
264
if (job -> same_transaction )
215
265
{
@@ -219,8 +269,8 @@ int do_one_job(schd_executor_share_t *shared)
219
269
{
220
270
if (job -> attempt >= job -> resubmit_limit )
221
271
{
222
- status = SchdExecutorError ;
223
- #ifdef HAVE_INT64
272
+ * status = SchdExecutorError ;
273
+ #ifdef HAVE_LONG_INT_64
224
274
push_executor_error (& EE , "Cannot resubmit: limit reached (%ld)" , job -> resubmit_limit );
225
275
#else
226
276
push_executor_error (& EE , "Cannot resubmit: limit reached (%lld)" , job -> resubmit_limit );
@@ -229,12 +279,12 @@ int do_one_job(schd_executor_share_t *shared)
229
279
}
230
280
else
231
281
{
232
- status = SchdExecutorResubmit ;
282
+ * status = SchdExecutorResubmit ;
233
283
}
234
284
}
235
285
else
236
286
{
237
- status = SchdExecutorDone ;
287
+ * status = SchdExecutorDone ;
238
288
}
239
289
240
290
SetConfigOption ("schedule.transaction_state" , "success" , PGC_INTERNAL , PGC_S_SESSION );
@@ -255,8 +305,7 @@ int do_one_job(schd_executor_share_t *shared)
255
305
{
256
306
set_shared_message (shared , & EE );
257
307
}
258
- shared -> status = status ;
259
- if (status == SchdExecutorResubmit )
308
+ if (* status == SchdExecutorResubmit )
260
309
{
261
310
shared -> next_time = timestamp_add_seconds (0 , resubmit_current_job );
262
311
resubmit_current_job = 0 ;
0 commit comments