Skip to content

Commit ced1d58

Browse files
author
Vladimir Ershov
committed
proper termination
1 parent 1ab15cf commit ced1d58

File tree

5 files changed

+184
-3
lines changed

5 files changed

+184
-3
lines changed

src/scheduler_executor.c

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,10 @@ handle_sigterm(SIGNAL_ARGS)
6565
}
6666

6767
errno = save_errno;
68-
proc_exit(0);
68+
/*
69+
* Do not need to exit at once
70+
* CHECK_FOR_INTERRUPTS will do cleanup and exits
71+
*/
6972
}
7073

7174
int read_worker_job_limit(void)
@@ -133,7 +136,9 @@ void executor_worker_main(Datum arg)
133136
ProcessConfigFile(PGC_SIGHUP);
134137
worker_jobs_limit = read_worker_job_limit();
135138
}
139+
CHECK_FOR_INTERRUPTS();
136140
result = do_one_job(shared, &status);
141+
CHECK_FOR_INTERRUPTS();
137142
if(result > 0)
138143
{
139144
if(++jobs_done >= worker_jobs_limit)
@@ -207,6 +212,7 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
207212

208213
pgstat_report_activity(STATE_RUNNING, "initialize job");
209214
job = initializeExecutorJob(shared);
215+
CHECK_FOR_INTERRUPTS();
210216
if(!job)
211217
{
212218
if(shared->message[0] == 0)
@@ -332,6 +338,7 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
332338

333339
SetConfigOption("schedule.transaction_state", "success", PGC_INTERNAL, PGC_S_SESSION);
334340
}
341+
CHECK_FOR_INTERRUPTS();
335342
if(job->next_time_statement)
336343
{
337344
shared->next_time = get_next_excution_time(job->next_time_statement, &EE);

src/scheduler_manager.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1076,6 +1076,8 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10761076
TimestampTz next_time;
10771077
char *next_time_str;
10781078
char *error;
1079+
schd_remove_reason_t die_reason = 0;
1080+
BgwHandleStatus status;
10791081

10801082
if(p->free == p->len) return 0;
10811083
busy = p->len - p->free;
@@ -1084,6 +1086,7 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
10841086
for(i = 0; i < busy; i++)
10851087
{
10861088
item = p->slots[i];
1089+
10871090
if(item->wait_worker_to_die)
10881091
{
10891092
toremove[nremove].pos = i;
@@ -1122,6 +1125,27 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
11221125
toremove[nremove].vanish_item = true;
11231126
nremove++;
11241127
}
1128+
else
1129+
{
1130+
die_reason = 0;
1131+
status = GetBackgroundWorkerPid(item->handler, &tmppid);
1132+
if(status == BGWH_STOPPED)
1133+
{
1134+
die_reason = RmExited;
1135+
}
1136+
else if(status == BGWH_POSTMASTER_DIED)
1137+
{
1138+
die_reason = RmDied;
1139+
}
1140+
1141+
if(die_reason)
1142+
{
1143+
toremove[nremove].pos = i;
1144+
toremove[nremove].reason = die_reason;
1145+
toremove[nremove].vanish_item = true;
1146+
nremove++;
1147+
}
1148+
}
11251149
}
11261150
}
11271151
if(nremove)
@@ -1180,6 +1204,10 @@ int scheduler_check_slots(scheduler_manager_ctx_t *ctx, scheduler_manager_pool_t
11801204
set_job_error(ctx->mem_ctx, item->job, "unknown error occured" );
11811205
}
11821206
}
1207+
else if(toremove[i].reason == RmExited || toremove[i].reason == RmDied)
1208+
{
1209+
set_job_error(ctx->mem_ctx, item->job, "Executor died unexpectedly (%d)", toremove[i].reason);
1210+
}
11831211
else if(toremove[i].reason == RmFreeSlot)
11841212
{
11851213
/* Just free slot - worker exited cause it achived max job

src/scheduler_manager.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ typedef enum {
3333
RmWaitWorker,
3434
RmError,
3535
RmDoneResubmit,
36-
RmDone
36+
RmDone,
37+
RmExited,
38+
RmDied
3739
} schd_remove_reason_t;
3840

3941
typedef struct {

test/perl/runtest.pl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@
9191
"DROP ROLE IF EXISTS tester",
9292
"CREATE ROLE tester",
9393
"GRANT INSERT ON test_results TO tester",
94+
"CREATE TABLE task_info (pid integer, name text, vanished timestamp, finished boolean default false)",
95+
"GRANT ALL ON task_info TO tester",
9496
);
9597
map { __do_sql($dbh, $_) } @sql2;
9698
$dbh->disconnect();
@@ -109,7 +111,7 @@
109111
);
110112
my $harness = TAP::Harness->new( \%args );
111113
my @tests = glob( 't/*.t' );
112-
#@tests = ('t/jobMaxRunTime.t');
114+
#@tests = ('t/terminateBackend.t');
113115
$harness->runtests(@tests);
114116

115117

test/perl/t/terminateBackend.t

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
#!/usr/bin/perl
2+
use strict;
3+
no warnings;
4+
use Test::More tests => 15;
5+
use DBI;
6+
use Getopt::Long;
7+
use Data::Dumper;
8+
9+
my $dbh = require 't/_connect.pl';
10+
ok($dbh->err == 0, 'connect') or BAIL_OUT($DBI::errstr);
11+
12+
my $query = "DELETE FROM test_results;";
13+
$dbh->do($query);
14+
ok($dbh->err == 0, 'clean up test_results') or BAIL_OUT($DBI::errstr);
15+
16+
my $query = "DELETE FROM task_info;";
17+
$dbh->do($query);
18+
ok($dbh->err == 0, 'clean up task_info') or BAIL_OUT($DBI::errstr);
19+
20+
one_task_do('sleeper', 'select pg_sleep(200)');
21+
one_task_do('writer',
22+
'DO
23+
$do$
24+
BEGIN
25+
WHILE true LOOP
26+
INSERT INTO test_results (commentary) SELECT md5(random()::text) from generate_series(1, 1000000) s(i);
27+
DELETE FROM test_results;
28+
END LOOP;
29+
END
30+
$do$', qq[, 'max_run_time', '120 seconds'] );
31+
32+
33+
$dbh->disconnect();
34+
done_testing();
35+
36+
sub one_task_do
37+
{
38+
my $name = shift;
39+
my $sql_part = shift;
40+
my $add_to_task = shift;
41+
42+
$add_to_task ||= '';
43+
44+
$query = "SELECT schedule.create_job(
45+
jsonb_build_object(
46+
'name', '$name' ,
47+
'cron', '* * * * *',
48+
'commands', jsonb_build_array(
49+
'insert into task_info values ( pg_backend_pid(), ''$name'')',
50+
'$sql_part',
51+
'update task_info set finished = true where pid = pg_backend_pid()'
52+
),
53+
'max_instances', 1$add_to_task
54+
)
55+
)";
56+
my $sth = $dbh->prepare($query);
57+
ok($sth->execute(), "create $name task") or BAIL_OUT($DBI::errstr);
58+
59+
my $job_id = $sth->fetchrow_array();
60+
$sth->finish();
61+
62+
my $pid = wait_for_task_to_begin($dbh, $name, 120);
63+
64+
ok($pid > 0, 'find '.$name.' task started') or BAIL_OUT("failed to await task '$name' for 120s");
65+
66+
$query = "SELECT pg_terminate_backend(?)";
67+
$sth = $dbh->prepare($query);
68+
$sth->bind_param(1, $pid);
69+
ok($sth->execute(), "terminate $name job") or BAIL_OUT($DBI::errstr);
70+
$sth->finish();
71+
72+
$sth = $dbh->prepare('UPDATE task_info SET vanished = now() where pid = ?');
73+
ok($sth->execute($pid), "set $name task vanished") or BAIL_OUT(print $DBI::errstr);
74+
$sth->finish;
75+
76+
ok(find_job_exited($dbh, $job_id), "find $name exit job") or BAIL_OUT("Cannot find job $job_id exited");
77+
78+
$sth = $dbh->prepare('SELECT schedule.deactivate_job(?)');
79+
ok($sth->execute($job_id), "deactivate $name job") or BAIL_OUT("Cannot deactivate $name job");
80+
81+
}
82+
83+
sub wait_for_task_to_begin
84+
{
85+
my $db = shift;
86+
my $name = shift;
87+
my $how_long = shift;
88+
89+
my $iter = $how_long;
90+
my $sth1 = $db->prepare('SELECT pid from task_info where name = ? and vanished is null and finished = false limit 1');
91+
92+
while($iter-- > 0)
93+
{
94+
if($sth1->execute($name))
95+
{
96+
my $pid = $sth1->fetchrow_array() and $sth1->finish();
97+
return $pid if $pid;
98+
}
99+
else
100+
{
101+
die $DBI::errstr;
102+
}
103+
sleep(1);
104+
}
105+
return 0;
106+
}
107+
108+
sub find_job_exited
109+
{
110+
my $d = shift;
111+
my $id = shift;
112+
113+
my $n = 20;
114+
115+
while($n-- > 0)
116+
{
117+
my $s = $d->prepare('SELECT message from schedule.log where cron = ? and status = false');
118+
if($s->execute($id))
119+
{
120+
my @data = $s->fetchrow_array() and $s->finish();
121+
if(scalar(@data))
122+
{
123+
if($data[0] =~ /^Executor died unexpectedly/)
124+
{
125+
return 1;
126+
}
127+
else
128+
{
129+
return 0;
130+
}
131+
}
132+
}
133+
else
134+
{
135+
die $DBI::errstr;
136+
}
137+
sleep(1);
138+
}
139+
140+
return 0;
141+
}
142+

0 commit comments

Comments
 (0)