Skip to content

Commit b10f40b

Browse files
committed
Improve test coverage for LISTEN/NOTIFY.
We had no actual end-to-end test of NOTIFY message delivery. In the core async.sql regression test, testing this is problematic because psql traditionally prints the PID of the sending backend, making the output unstable. We also have an isolation test script, but it likewise failed to prove that delivery worked, because isolationtester.c had no provisions for detecting/reporting NOTIFY messages. Hence, add such provisions to isolationtester.c, and extend async-notify.spec to include direct tests of basic NOTIFY functionality. I also added tests showing that NOTIFY de-duplicates messages normally, but not across subtransaction boundaries. (That's the historical behavior since we introduced subtransactions, though perhaps we ought to change it.) Patch by me, with suggestions/review by Andres Freund. Discussion: https://postgr.es/m/31304.1564246011@sss.pgh.pa.us
1 parent 44460d7 commit b10f40b

File tree

3 files changed

+210
-50
lines changed

3 files changed

+210
-50
lines changed
Lines changed: 91 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,102 @@
11
Parsed test spec with 2 sessions
22

3-
starting permutation: listen begin check notify check
4-
step listen: LISTEN a;
5-
step begin: BEGIN;
6-
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
3+
starting permutation: listenc notify1 notify2 notify3 notifyf
4+
step listenc: LISTEN c1; LISTEN c2;
5+
step notify1: NOTIFY c1;
6+
notifier: NOTIFY "c1" with payload "" from notifier
7+
step notify2: NOTIFY c2, 'payload';
8+
notifier: NOTIFY "c2" with payload "payload" from notifier
9+
step notify3: NOTIFY c3, 'payload3';
10+
step notifyf: SELECT pg_notify('c2', NULL);
11+
pg_notify
12+
13+
14+
notifier: NOTIFY "c2" with payload "" from notifier
15+
16+
starting permutation: listenc notifyd1 notifyd2 notifys1
17+
step listenc: LISTEN c1; LISTEN c2;
18+
step notifyd1: NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload';
19+
notifier: NOTIFY "c2" with payload "payload" from notifier
20+
notifier: NOTIFY "c1" with payload "" from notifier
21+
step notifyd2: NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2';
22+
notifier: NOTIFY "c1" with payload "" from notifier
23+
notifier: NOTIFY "c1" with payload "p1" from notifier
24+
notifier: NOTIFY "c1" with payload "p2" from notifier
25+
step notifys1:
26+
BEGIN;
27+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
28+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
29+
SAVEPOINT s1;
30+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
31+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
32+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
33+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
34+
RELEASE SAVEPOINT s1;
35+
SAVEPOINT s2;
36+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
37+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
38+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
39+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
40+
ROLLBACK TO SAVEPOINT s2;
41+
COMMIT;
42+
43+
notifier: NOTIFY "c1" with payload "payload" from notifier
44+
notifier: NOTIFY "c2" with payload "payload" from notifier
45+
notifier: NOTIFY "c1" with payload "payload" from notifier
46+
notifier: NOTIFY "c2" with payload "payload" from notifier
47+
notifier: NOTIFY "c1" with payload "payloads" from notifier
48+
notifier: NOTIFY "c2" with payload "payloads" from notifier
49+
50+
starting permutation: llisten notify1 notify2 notify3 notifyf lcheck
51+
step llisten: LISTEN c1; LISTEN c2;
52+
step notify1: NOTIFY c1;
53+
step notify2: NOTIFY c2, 'payload';
54+
step notify3: NOTIFY c3, 'payload3';
55+
step notifyf: SELECT pg_notify('c2', NULL);
56+
pg_notify
57+
58+
59+
step lcheck: SELECT 1 AS x;
60+
x
61+
62+
1
63+
listener: NOTIFY "c1" with payload "" from notifier
64+
listener: NOTIFY "c2" with payload "payload" from notifier
65+
listener: NOTIFY "c2" with payload "" from notifier
66+
67+
starting permutation: listenc llisten notify1 notify2 notify3 notifyf lcheck
68+
step listenc: LISTEN c1; LISTEN c2;
69+
step llisten: LISTEN c1; LISTEN c2;
70+
step notify1: NOTIFY c1;
71+
notifier: NOTIFY "c1" with payload "" from notifier
72+
step notify2: NOTIFY c2, 'payload';
73+
notifier: NOTIFY "c2" with payload "payload" from notifier
74+
step notify3: NOTIFY c3, 'payload3';
75+
step notifyf: SELECT pg_notify('c2', NULL);
76+
pg_notify
77+
78+
79+
notifier: NOTIFY "c2" with payload "" from notifier
80+
step lcheck: SELECT 1 AS x;
81+
x
82+
83+
1
84+
listener: NOTIFY "c1" with payload "" from notifier
85+
listener: NOTIFY "c2" with payload "payload" from notifier
86+
listener: NOTIFY "c2" with payload "" from notifier
87+
88+
starting permutation: llisten lbegin usage bignotify usage
89+
step llisten: LISTEN c1; LISTEN c2;
90+
step lbegin: BEGIN;
91+
step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
792
nonzero
893

994
f
10-
step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
95+
step bignotify: SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s;
1196
count
1297

1398
1000
14-
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
99+
step usage: SELECT pg_notification_queue_usage() > 0 AS nonzero;
15100
nonzero
16101

17102
t

src/test/isolation/isolationtester.c

Lines changed: 53 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323

2424
/*
2525
* conns[0] is the global setup, teardown, and watchdog connection. Additional
26-
* connections represent spec-defined sessions.
26+
* connections represent spec-defined sessions. We also track the backend
27+
* PID, in numeric and string formats, for each connection.
2728
*/
2829
static PGconn **conns = NULL;
29-
static const char **backend_pids = NULL;
30+
static int *backend_pids = NULL;
31+
static const char **backend_pid_strs = NULL;
3032
static int nconns = 0;
3133

3234
/* In dry run only output permutations to be run by the tester. */
@@ -41,7 +43,7 @@ static void run_permutation(TestSpec *testspec, int nsteps, Step **steps);
4143

4244
#define STEP_NONBLOCK 0x1 /* return 0 as soon as cmd waits for a lock */
4345
#define STEP_RETRY 0x2 /* this is a retry of a previously-waiting cmd */
44-
static bool try_complete_step(Step *step, int flags);
46+
static bool try_complete_step(TestSpec *testspec, Step *step, int flags);
4547

4648
static int step_qsort_cmp(const void *a, const void *b);
4749
static int step_bsearch_cmp(const void *a, const void *b);
@@ -159,9 +161,11 @@ main(int argc, char **argv)
159161
* extra for lock wait detection and global work.
160162
*/
161163
nconns = 1 + testspec->nsessions;
162-
conns = calloc(nconns, sizeof(PGconn *));
164+
conns = (PGconn **) pg_malloc0(nconns * sizeof(PGconn *));
165+
backend_pids = pg_malloc0(nconns * sizeof(*backend_pids));
166+
backend_pid_strs = pg_malloc0(nconns * sizeof(*backend_pid_strs));
163167
atexit(disconnect_atexit);
164-
backend_pids = calloc(nconns, sizeof(*backend_pids));
168+
165169
for (i = 0; i < nconns; i++)
166170
{
167171
conns[i] = PQconnectdb(conninfo);
@@ -187,26 +191,9 @@ main(int argc, char **argv)
187191
blackholeNoticeProcessor,
188192
NULL);
189193

190-
/* Get the backend pid for lock wait checking. */
191-
res = PQexec(conns[i], "SELECT pg_catalog.pg_backend_pid()");
192-
if (PQresultStatus(res) == PGRES_TUPLES_OK)
193-
{
194-
if (PQntuples(res) == 1 && PQnfields(res) == 1)
195-
backend_pids[i] = pg_strdup(PQgetvalue(res, 0, 0));
196-
else
197-
{
198-
fprintf(stderr, "backend pid query returned %d rows and %d columns, expected 1 row and 1 column",
199-
PQntuples(res), PQnfields(res));
200-
exit(1);
201-
}
202-
}
203-
else
204-
{
205-
fprintf(stderr, "backend pid query failed: %s",
206-
PQerrorMessage(conns[i]));
207-
exit(1);
208-
}
209-
PQclear(res);
194+
/* Save each connection's backend PID for subsequent use. */
195+
backend_pids[i] = PQbackendPID(conns[i]);
196+
backend_pid_strs[i] = psprintf("%d", backend_pids[i]);
210197
}
211198

212199
/* Set the session index fields in steps. */
@@ -231,9 +218,9 @@ main(int argc, char **argv)
231218
appendPQExpBufferStr(&wait_query,
232219
"SELECT pg_catalog.pg_isolation_test_session_is_blocked($1, '{");
233220
/* The spec syntax requires at least one session; assume that here. */
234-
appendPQExpBufferStr(&wait_query, backend_pids[1]);
221+
appendPQExpBufferStr(&wait_query, backend_pid_strs[1]);
235222
for (i = 2; i < nconns; i++)
236-
appendPQExpBuffer(&wait_query, ",%s", backend_pids[i]);
223+
appendPQExpBuffer(&wait_query, ",%s", backend_pid_strs[i]);
237224
appendPQExpBufferStr(&wait_query, "}')");
238225

239226
res = PQprepare(conns[0], PREP_WAITING, wait_query.data, 0, NULL);
@@ -549,7 +536,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
549536
oldstep = waiting[w];
550537

551538
/* Wait for previous step on this connection. */
552-
try_complete_step(oldstep, STEP_RETRY);
539+
try_complete_step(testspec, oldstep, STEP_RETRY);
553540

554541
/* Remove that step from the waiting[] array. */
555542
if (w + 1 < nwaiting)
@@ -571,7 +558,8 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
571558
nerrorstep = 0;
572559
while (w < nwaiting)
573560
{
574-
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
561+
if (try_complete_step(testspec, waiting[w],
562+
STEP_NONBLOCK | STEP_RETRY))
575563
{
576564
/* Still blocked on a lock, leave it alone. */
577565
w++;
@@ -600,14 +588,15 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
600588
}
601589

602590
/* Try to complete this step without blocking. */
603-
mustwait = try_complete_step(step, STEP_NONBLOCK);
591+
mustwait = try_complete_step(testspec, step, STEP_NONBLOCK);
604592

605593
/* Check for completion of any steps that were previously waiting. */
606594
w = 0;
607595
nerrorstep = 0;
608596
while (w < nwaiting)
609597
{
610-
if (try_complete_step(waiting[w], STEP_NONBLOCK | STEP_RETRY))
598+
if (try_complete_step(testspec, waiting[w],
599+
STEP_NONBLOCK | STEP_RETRY))
611600
w++;
612601
else
613602
{
@@ -630,7 +619,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
630619
/* Wait for any remaining queries. */
631620
for (w = 0; w < nwaiting; ++w)
632621
{
633-
try_complete_step(waiting[w], STEP_RETRY);
622+
try_complete_step(testspec, waiting[w], STEP_RETRY);
634623
report_error_message(waiting[w]);
635624
}
636625

@@ -693,7 +682,7 @@ run_permutation(TestSpec *testspec, int nsteps, Step **steps)
693682
* a lock, returns true. Otherwise, returns false.
694683
*/
695684
static bool
696-
try_complete_step(Step *step, int flags)
685+
try_complete_step(TestSpec *testspec, Step *step, int flags)
697686
{
698687
PGconn *conn = conns[1 + step->session];
699688
fd_set read_set;
@@ -702,6 +691,7 @@ try_complete_step(Step *step, int flags)
702691
int sock = PQsocket(conn);
703692
int ret;
704693
PGresult *res;
694+
PGnotify *notify;
705695
bool canceled = false;
706696

707697
if (sock < 0)
@@ -738,7 +728,7 @@ try_complete_step(Step *step, int flags)
738728
bool waiting;
739729

740730
res = PQexecPrepared(conns[0], PREP_WAITING, 1,
741-
&backend_pids[step->session + 1],
731+
&backend_pid_strs[step->session + 1],
742732
NULL, NULL, 0);
743733
if (PQresultStatus(res) != PGRES_TUPLES_OK ||
744734
PQntuples(res) != 1)
@@ -880,6 +870,35 @@ try_complete_step(Step *step, int flags)
880870
PQclear(res);
881871
}
882872

873+
/* Report any available NOTIFY messages, too */
874+
PQconsumeInput(conn);
875+
while ((notify = PQnotifies(conn)) != NULL)
876+
{
877+
/* Try to identify which session it came from */
878+
const char *sendername = NULL;
879+
char pidstring[32];
880+
881+
for (int i = 0; i < testspec->nsessions; i++)
882+
{
883+
if (notify->be_pid == backend_pids[i + 1])
884+
{
885+
sendername = testspec->sessions[i]->name;
886+
break;
887+
}
888+
}
889+
if (sendername == NULL)
890+
{
891+
/* Doesn't seem to be any test session, so show the hard way */
892+
snprintf(pidstring, sizeof(pidstring), "PID %d", notify->be_pid);
893+
sendername = pidstring;
894+
}
895+
printf("%s: NOTIFY \"%s\" with payload \"%s\" from %s\n",
896+
testspec->sessions[step->session]->name,
897+
notify->relname, notify->extra, sendername);
898+
PQfreemem(notify);
899+
PQconsumeInput(conn);
900+
}
901+
883902
return false;
884903
}
885904

Lines changed: 66 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,70 @@
1-
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
2-
# after submitting notifications while another connection is listening for
3-
# those notifications and waiting inside an active transaction.
1+
# Tests for LISTEN/NOTIFY
42

5-
session "listener"
6-
step "listen" { LISTEN a; }
7-
step "begin" { BEGIN; }
8-
teardown { ROLLBACK; UNLISTEN *; }
3+
# Most of these tests use only the "notifier" session and hence exercise only
4+
# self-notifies, which are convenient because they minimize timing concerns.
5+
# Note we assume that each step is delivered to the backend as a single Query
6+
# message so it will run as one transaction.
97

108
session "notifier"
11-
step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
12-
step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
9+
step "listenc" { LISTEN c1; LISTEN c2; }
10+
step "notify1" { NOTIFY c1; }
11+
step "notify2" { NOTIFY c2, 'payload'; }
12+
step "notify3" { NOTIFY c3, 'payload3'; } # not listening to c3
13+
step "notifyf" { SELECT pg_notify('c2', NULL); }
14+
step "notifyd1" { NOTIFY c2, 'payload'; NOTIFY c1; NOTIFY "c2", 'payload'; }
15+
step "notifyd2" { NOTIFY c1; NOTIFY c1; NOTIFY c1, 'p1'; NOTIFY c1, 'p2'; }
16+
step "notifys1" {
17+
BEGIN;
18+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
19+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
20+
SAVEPOINT s1;
21+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
22+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
23+
NOTIFY c1, 'payload'; NOTIFY "c2", 'payload';
24+
NOTIFY c1, 'payloads'; NOTIFY "c2", 'payloads';
25+
RELEASE SAVEPOINT s1;
26+
SAVEPOINT s2;
27+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
28+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
29+
NOTIFY c1, 'rpayload'; NOTIFY "c2", 'rpayload';
30+
NOTIFY c1, 'rpayloads'; NOTIFY "c2", 'rpayloads';
31+
ROLLBACK TO SAVEPOINT s2;
32+
COMMIT;
33+
}
34+
step "usage" { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
35+
step "bignotify" { SELECT count(pg_notify('c1', s::text)) FROM generate_series(1, 1000) s; }
36+
teardown { UNLISTEN *; }
37+
38+
# The listener session is used for cross-backend notify checks.
39+
40+
session "listener"
41+
step "llisten" { LISTEN c1; LISTEN c2; }
42+
step "lcheck" { SELECT 1 AS x; }
43+
step "lbegin" { BEGIN; }
44+
teardown { UNLISTEN *; }
45+
46+
47+
# Trivial cases.
48+
permutation "listenc" "notify1" "notify2" "notify3" "notifyf"
49+
50+
# Check simple and less-simple deduplication.
51+
permutation "listenc" "notifyd1" "notifyd2" "notifys1"
52+
53+
# Cross-backend notification delivery. We use a "select 1" to force the
54+
# listener session to check for notifies. In principle we could just wait
55+
# for delivery, but that would require extra support in isolationtester
56+
# and might have portability-of-timing issues.
57+
permutation "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
58+
59+
# Again, with local delivery too.
60+
permutation "listenc" "llisten" "notify1" "notify2" "notify3" "notifyf" "lcheck"
61+
62+
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
63+
# after submitting notifications while another connection is listening for
64+
# those notifications and waiting inside an active transaction. We have to
65+
# fill a page of the notify SLRU to make this happen, which is a good deal
66+
# of traffic. To not bloat the expected output, we intentionally don't
67+
# commit the listener's transaction, so that it never reports these events.
68+
# Hence, this should be the last test in this script.
1369

14-
permutation "listen" "begin" "check" "notify" "check"
70+
permutation "llisten" "lbegin" "usage" "bignotify" "usage"

0 commit comments

Comments
 (0)