Skip to content

Commit 34f66c0

Browse files
committed
Improve test coverage for LISTEN/NOTIFY.
Back-patch commit b10f40b into older branches. This adds reporting of NOTIFY messages to isolationtester.c, and extends the async-notify test to include direct tests of basic NOTIFY functionality. This provides useful infrastructure for testing a bug fix I'm about to back-patch, and there seems no good reason not to have better tests of LISTEN/NOTIFY in the back branches. The commit's survived long enough in HEAD to make it unlikely that it will cause problems. Back-patch as far as 9.6. isolationtester.c changed too much in 9.6 to make it sane to try to fix older branches this way, and I don't really want to back-patch those changes too. Discussion: https://postgr.es/m/31304.1564246011@sss.pgh.pa.us
1 parent 8b46acf commit 34f66c0

File tree

3 files changed

+211
-50
lines changed

3 files changed

+211
-50
lines changed
Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,102 @@
11
Parsed test spec with 2 sessions
22

3-
starting permutation: listen begin check notify check
4-
step listen: LISTEN a;
5-
step begin: BEGIN;
6-
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
3+
starting permutation: listenc notify1 notify2 notify3 notifyf
4+
step listenc: LISTEN c1; LISTEN c2;
5+
step notify1: NOTIFY c1;
6+
notifier: NOTIFY "c1" with payload "" from notifier
7+
step notify2: NOTIFY c2, 'payload';
8+
notifier: NOTIFY "c2" with payload "payload" from notifier
9+
step notify3: NOTIFY c3, 'payload3';
10+
step notifyf: SELECT pg_notify('c2', NULL);
11+
pg_notify
12+
13+
14+
notifier: NOTIFY "c2" with payload "" from notifier
15+
16+
starting permutation: listenc notifyd1 notifyd2 notifys1
17+
step listenc: LISTEN c1; LISTEN c2;
18+
step notifyd1: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload';
19+
notifier: NOTIFY "c2" with payload "payload" from notifier
20+
notifier: NOTIFY "c1" with payload "" from notifier
21+
step notifyd2: NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2';
22+
notifier: NOTIFY "c1" with payload "" from notifier
23+
notifier: NOTIFY "c1" with payload "p1" from notifier
24+
notifier: NOTIFY "c1" with payload "p2" from notifier
25+
step notifys1:
26+
BEGIN;
27+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
28+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
29+
SAVEPOINT s1;
30+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
31+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
32+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
33+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
34+
RELEASE SAVEPOINT s1;
35+
SAVEPOINT s2;
36+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
37+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
38+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
39+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
40+
ROLLBACK TO SAVEPOINT s2;
41+
COMMIT;
42+
43+
notifier: NOTIFY "c1" with payload "payload" from notifier
44+
notifier: NOTIFY "c2" with payload "payload" from notifier
45+
notifier: NOTIFY "c1" with payload "payload" from notifier
46+
notifier: NOTIFY "c2" with payload "payload" from notifier
47+
notifier: NOTIFY "c1" with payload "payloads" from notifier
48+
notifier: NOTIFY "c2" with payload "payloads" from notifier
49+
50+
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
51+
step llisten: LISTEN c1; LISTEN c2;
52+
step notify1: NOTIFY c1;
53+
step notify2: NOTIFY c2, 'payload';
54+
step notify3: NOTIFY c3, 'payload3';
55+
step notifyf: SELECT pg_notify('c2', NULL);
56+
pg_notify
57+
58+
59+
step lcheck: SELECT 1 AS x;
60+
x
61+
62+
1
63+
listener: NOTIFY "c1" with payload "" from notifier
64+
listener: NOTIFY "c2" with payload "payload" from notifier
65+
listener: NOTIFY "c2" with payload "" from notifier
66+
67+
starting permutation: listenc llisten notify1 notify2 notify3 notifyf lcheck
68+
step listenc: LISTEN c1; LISTEN c2;
69+
step llisten: LISTEN c1; LISTEN c2;
70+
step notify1: NOTIFY c1;
71+
notifier: NOTIFY "c1" with payload "" from notifier
72+
step notify2: NOTIFY c2, 'payload';
73+
notifier: NOTIFY "c2" with payload "payload" from notifier
74+
step notify3: NOTIFY c3, 'payload3';
75+
step notifyf: SELECT pg_notify('c2', NULL);
76+
pg_notify
77+
78+
79+
notifier: NOTIFY "c2" with payload "" from notifier
80+
step lcheck: SELECT 1 AS x;
81+
x
82+
83+
1
84+
listener: NOTIFY "c1" with payload "" from notifier
85+
listener: NOTIFY "c2" with payload "payload" from notifier
86+
listener: NOTIFY "c2" with payload "" from notifier
87+
88+
starting permutation: llisten lbegin usage bignotify usage
89+
step llisten: LISTEN c1; LISTEN c2;
90+
step lbegin: BEGIN;
91+
step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
792
nonzero
893

994
f
10-
step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
95+
step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s;
1196
count
1297

1398
1000
14-
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
99+
step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
15100
nonzero
16101

17102
t

src/test/isolation/isolationtester.c

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323

2424
/*
2525
* conns[0] is the global setup, teardown, and watchdog connection. Additional
26-
* connections represent spec-defined sessions.
26+
* connections represent spec-defined sessions. We also track the backend
27+
* PID, in numeric and string formats, for each connection.
2728
*/
2829
static PGconn **conns = NULL;
29-
static const char **backend_pids = NULL;
30+
static int *backend_pids = NULL;
31+
static const char **backend_pid_strs = NULL;
3032
static int nconns = 0;
3133

3234
/* In dry run only output permutations to be run by the tester. */
@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);
4143

4244
#define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */
4345
#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
44-
static bool try_complete_step(Step *step, int flags);
46+
static bool try_complete_step(TestSpec *testspec, Step *step, int flags);
4547

4648
static int step_qsort_cmp(const void *a, const void *b);
4749
static int step_bsearch_cmp(const void *a, const void *b);
@@ -160,8 +162,10 @@ main(int argc, char **argv)
160162
* extra for lock wait detection and global work.
161163
*/
162164
nconns = 1 + testspec->nsessions;
163-
conns = calloc(nconns, sizeof(PGconn *));
164-
backend_pids = calloc(nconns, sizeof(*backend_pids));
165+
conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *));
166+
backend_pids = pg_malloc0(nconns * sizeof(*backend_pids));
167+
backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs));
168+
165169
for (i = 0; i < nconns; i++)
166170
{
167171
conns[i] = PQconnectdb(conninfo);
@@ -187,26 +191,9 @@ main(int argc, char **argv)
187191
blackholeNoticeProcessor,
188192
NULL);
189193

190-
/* Get the backend pid for lock wait checking. */
191-
res = PQexec(conns[i], "SELECT pg_backend_pid()");
192-
if (PQresultStatus(res) == PGRES_TUPLES_OK)
193-
{
194-
if (PQntuples(res) == 1 && PQnfields(res) == 1)
195-
backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0));
196-
else
197-
{
198-
fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column",
199-
PQntuples(res), PQnfields(res));
200-
exit_nicely();
201-
}
202-
}
203-
else
204-
{
205-
fprintf(stderr, "backend pid query failed: %s",
206-
PQerrorMessage(conns[i]));
207-
exit_nicely();
208-
}
209-
PQclear(res);
194+
/* Save each connection's backend PID for subsequent use. */
195+
backend_pids[i] = PQbackendPID(conns[i]);
196+
backend_pid_strs[i] = psprintf("%d", backend_pids[i]);
210197
}
211198

212199
/* Set the session index fields in steps. */
@@ -231,9 +218,9 @@ main(int argc, char **argv)
231218
appendPQExpBufferStr(&wait_query,
232219
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
233220
/* The spec syntax requires at least one session; assume that here. */
234-
appendPQExpBufferStr(&wait_query, backend_pids[1]);
221+
appendPQExpBufferStr(&wait_query, backend_pid_strs[1]);
235222
for (i = 2; i < nconns; i++)
236-
appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]);
223+
appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]);
237224
appendPQExpBufferStr(&wait_query, "}')");
238225

239226
res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL);
@@ -552,7 +539,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
552539
oldstep = waiting[w];
553540

554541
/* Wait for previous step on this connection. */
555-
try_complete_step(oldstep, STEP_RETRY);
542+
try_complete_step(testspec, oldstep, STEP_RETRY);
556543

557544
/* Remove that step from the waiting[] array. */
558545
if (w + 1 < nwaiting)
@@ -574,7 +561,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
574561
nerrorstep = 0;
575562
while (w < nwaiting)
576563
{
577-
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
564+
if (try_complete_step(testspec, waiting[w],
565+
STEP_NONBLOCK | STEP_RETRY))
578566
{
579567
/* Still blocked on a lock, leave it alone. */
580568
w++;
@@ -603,14 +591,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
603591
}
604592

605593
/* Try to complete this step without blocking. */
606-
mustwait = try_complete_step(step, STEP_NONBLOCK);
594+
mustwait = try_complete_step(testspec, step, STEP_NONBLOCK);
607595

608596
/* Check for completion of any steps that were previously waiting. */
609597
w = 0;
610598
nerrorstep = 0;
611599
while (w < nwaiting)
612600
{
613-
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
601+
if (try_complete_step(testspec, waiting[w],
602+
STEP_NONBLOCK | STEP_RETRY))
614603
w++;
615604
else
616605
{
@@ -633,7 +622,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
633622
/* Wait for any remaining queries. */
634623
for (w = 0; w < nwaiting; ++w)
635624
{
636-
try_complete_step(waiting[w], STEP_RETRY);
625+
try_complete_step(testspec, waiting[w], STEP_RETRY);
637626
report_error_message(waiting[w]);
638627
}
639628

@@ -696,7 +685,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
696685
* a lock, returns true. Otherwise, returns false.
697686
*/
698687
static bool
699-
try_complete_step(Step *step, int flags)
688+
try_complete_step(TestSpec *testspec, Step *step, int flags)
700689
{
701690
PGconn *conn = conns[1 + step->session];
702691
fd_set read_set;
@@ -705,6 +694,7 @@ try_complete_step(Step *step, int flags)
705694
int sock = PQsocket(conn);
706695
int ret;
707696
PGresult *res;
697+
PGnotify *notify;
708698
bool canceled = false;
709699

710700
if (sock < 0)
@@ -741,7 +731,7 @@ try_complete_step(Step *step, int flags)
741731
bool waiting;
742732

743733
res = PQexecPrepared(conns[0], PREP_WAITING, 1,
744-
&backend_pids[step->session + 1],
734+
&backend_pid_strs[step->session + 1],
745735
NULL, NULL, 0);
746736
if (PQresultStatus(res) != PGRES_TUPLES_OK ||
747737
PQntuples(res) != 1)
@@ -883,6 +873,36 @@ try_complete_step(Step *step, int flags)
883873
PQclear(res);
884874
}
885875

876+
/* Report any available NOTIFY messages, too */
877+
PQconsumeInput(conn);
878+
while ((notify = PQnotifies(conn)) != NULL)
879+
{
880+
/* Try to identify which session it came from */
881+
const char *sendername = NULL;
882+
char pidstring[32];
883+
int i;
884+
885+
for (i = 0; i < testspec->nsessions; i++)
886+
{
887+
if (notify->be_pid == backend_pids[i + 1])
888+
{
889+
sendername = testspec->sessions[i]->name;
890+
break;
891+
}
892+
}
893+
if (sendername == NULL)
894+
{
895+
/* Doesn't seem to be any test session, so show the hard way */
896+
snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
897+
sendername = pidstring;
898+
}
899+
printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
900+
testspec->sessions[step->session]->name,
901+
notify->relname, notify->extra, sendername);
902+
PQfreemem(notify);
903+
PQconsumeInput(conn);
904+
}
905+
886906
return false;
887907
}
888908

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,70 @@
1-
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
2-
# after submitting notifications while another connection is listening for
3-
# those notifications and waiting inside an active transaction.
1+
# Tests for LISTEN/NOTIFY
42

5-
session "listener"
6-
step "listen" { LISTEN a; }
7-
step "begin" { BEGIN; }
8-
teardown { ROLLBACK; UNLISTEN *; }
3+
# Most of these tests use only the "notifier" session and hence exercise only
4+
# self-notifies, which are convenient because they minimize timing concerns.
5+
# Note we assume that each step is delivered to the backend as a single Query
6+
# message so it will run as one transaction.
97

108
session "notifier"
11-
step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
12-
step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
9+
step "listenc" { LISTEN c1; LISTEN c2; }
10+
step "notify1" { NOTIFY c1; }
11+
step "notify2" { NOTIFY c2, 'payload'; }
12+
step "notify3" { NOTIFY c3, 'payload3'; } # not listening to c3
13+
step "notifyf" { SELECT pg_notify('c2', NULL); }
14+
step "notifyd1" { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; }
15+
step "notifyd2" { NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; }
16+
step "notifys1" {
17+
BEGIN;
18+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
19+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
20+
SAVEPOINT s1;
21+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
22+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
23+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
24+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
25+
RELEASE SAVEPOINT s1;
26+
SAVEPOINT s2;
27+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
28+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
29+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
30+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
31+
ROLLBACK TO SAVEPOINT s2;
32+
COMMIT;
33+
}
34+
step "usage" { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
35+
step "bignotify" { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
36+
teardown { UNLISTEN *; }
37+
38+
# The listener session is used for cross-backend notify checks.
39+
40+
session "listener"
41+
step "llisten" { LISTEN c1; LISTEN c2; }
42+
step "lcheck" { SELECT 1 AS x; }
43+
step "lbegin" { BEGIN; }
44+
teardown { UNLISTEN *; }
45+
46+
47+
# Trivial cases.
48+
permutation "listenc" "notify1" "notify2" "notify3" "notifyf"
49+
50+
# Check simple and less-simple deduplication.
51+
permutation "listenc" "notifyd1" "notifyd2" "notifys1"
52+
53+
# Cross-backend notification delivery. We use a "select 1" to force the
54+
# listener session to check for notifies. In principle we could just wait
55+
# for delivery, but that would require extra support in isolationtester
56+
# and might have portability-of-timing issues.
57+
permutation "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
58+
59+
# Again, with local delivery too.
60+
permutation "listenc" "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
61+
62+
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
63+
# after submitting notifications while another connection is listening for
64+
# those notifications and waiting inside an active transaction. We have to
65+
# fill a page of the notify SLRU to make this happen, which is a good deal
66+
# of traffic. To not bloat the expected output, we intentionally don't
67+
# commit the listener's transaction, so that it never reports these events.
68+
# Hence, this should be the last test in this script.
1369

14-
permutation "listen" "begin" "check" "notify" "check"
70+
permutation "llisten" "lbegin" "usage" "bignotify" "usage"

0 commit comments

Comments
 (0)