Skip to content

Commit a04bb65

Browse files
committed
Add new function pg_notification_queue_usage.
This tells you what fraction of NOTIFY's queue is currently filled. Brendan Jurd, reviewed by Merlin Moncure and Gurjeet Singh. A few further tweaks by me.
1 parent 43d89a2 commit a04bb65

File tree

10 files changed

+110
-14
lines changed

10 files changed

+110
-14
lines changed

doc/src/sgml/func.sgml

+17-2
Original file line numberDiff line numberDiff line change
@@ -14805,6 +14805,12 @@ SELECT * FROM pg_ls_dir('.') WITH ORDINALITY AS t(ls,n);
1480514805
<entry>channel names that the session is currently listening on</entry>
1480614806
</row>
1480714807

14808+
<row>
14809+
<entry><literal><function>pg_notification_queue_usage()</function></literal></entry>
14810+
<entry><type>double</type></entry>
14811+
<entry>fraction of the asynchronous notification queue currently occupied (0-1)</entry>
14812+
</row>
14813+
1480814814
<row>
1480914815
<entry><literal><function>pg_my_temp_schema()</function></literal></entry>
1481014816
<entry><type>oid</type></entry>
@@ -14945,10 +14951,19 @@ SET search_path TO <replaceable>schema</> <optional>, <replaceable>schema</>, ..
1494514951
<primary>pg_listening_channels</primary>
1494614952
</indexterm>
1494714953

14954+
<indexterm>
14955+
<primary>pg_notification_queue_usage</primary>
14956+
</indexterm>
14957+
1494814958
<para>
1494914959
<function>pg_listening_channels</function> returns a set of names of
14950-
channels that the current session is listening to. See <xref
14951-
linkend="sql-listen"> for more information.
14960+
asynchronous notification channels that the current session is listening
14961+
to. <function>pg_notification_queue_usage</function> returns the
14962+
fraction of the total available space for notifications currently
14963+
occupied by notifications that are waiting to be processed, as a
14964+
<type>double</type> in the range 0-1.
14965+
See <xref linkend="sql-listen"> and <xref linkend="sql-notify">
14966+
for more information.
1495214967
</para>
1495314968

1495414969
<indexterm>

doc/src/sgml/ref/notify.sgml

+5
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,11 @@ NOTIFY <replaceable class="PARAMETER">channel</replaceable> [ , <replaceable cla
165165
cleanup. In this case you should make sure that this session ends its
166166
current transaction so that cleanup can proceed.
167167
</para>
168+
<para>
169+
The function <function>pg_notification_queue_usage</function> returns the
170+
fraction of the queue that is currently occupied by pending notifications.
171+
See <xref linkend="functions-info"> for more information.
172+
</para>
168173
<para>
169174
A transaction that has executed <command>NOTIFY</command> cannot be
170175
prepared for two-phase commit.

src/backend/commands/async.c

+39-11
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ static bool asyncQueueIsFull(void);
371371
static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength);
372372
static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe);
373373
static ListCell *asyncQueueAddEntries(ListCell *nextNotify);
374+
static double asyncQueueUsage(void);
374375
static void asyncQueueFillWarning(void);
375376
static bool SignalBackends(void);
376377
static void asyncQueueReadAllNotifications(void);
@@ -1362,35 +1363,62 @@ asyncQueueAddEntries(ListCell *nextNotify)
13621363
}
13631364

13641365
/*
1365-
* Check whether the queue is at least half full, and emit a warning if so.
1366-
*
1367-
* This is unlikely given the size of the queue, but possible.
1368-
* The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
1366+
* SQL function to return the fraction of the notification queue currently
1367+
* occupied.
1368+
*/
1369+
Datum
1370+
pg_notification_queue_usage(PG_FUNCTION_ARGS)
1371+
{
1372+
double usage;
1373+
1374+
LWLockAcquire(AsyncQueueLock, LW_SHARED);
1375+
usage = asyncQueueUsage();
1376+
LWLockRelease(AsyncQueueLock);
1377+
1378+
PG_RETURN_FLOAT8(usage);
1379+
}
1380+
1381+
/*
1382+
* Return the fraction of the queue that is currently occupied.
13691383
*
1370-
* Caller must hold exclusive AsyncQueueLock.
1384+
* The caller must hold AysncQueueLock in (at least) shared mode.
13711385
*/
1372-
static void
1373-
asyncQueueFillWarning(void)
1386+
static double
1387+
asyncQueueUsage(void)
13741388
{
13751389
int headPage = QUEUE_POS_PAGE(QUEUE_HEAD);
13761390
int tailPage = QUEUE_POS_PAGE(QUEUE_TAIL);
13771391
int occupied;
1378-
double fillDegree;
1379-
TimestampTz t;
13801392

13811393
occupied = headPage - tailPage;
13821394

13831395
if (occupied == 0)
1384-
return; /* fast exit for common case */
1396+
return (double) 0; /* fast exit for common case */
13851397

13861398
if (occupied < 0)
13871399
{
13881400
/* head has wrapped around, tail not yet */
13891401
occupied += QUEUE_MAX_PAGE + 1;
13901402
}
13911403

1392-
fillDegree = (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1404+
return (double) occupied / (double) ((QUEUE_MAX_PAGE + 1) / 2);
1405+
}
1406+
1407+
/*
1408+
* Check whether the queue is at least half full, and emit a warning if so.
1409+
*
1410+
* This is unlikely given the size of the queue, but possible.
1411+
* The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
1412+
*
1413+
* Caller must hold exclusive AsyncQueueLock.
1414+
*/
1415+
static void
1416+
asyncQueueFillWarning(void)
1417+
{
1418+
double fillDegree;
1419+
TimestampTz t;
13931420

1421+
fillDegree = asyncQueueUsage();
13941422
if (fillDegree < 0.5)
13951423
return;
13961424

src/include/catalog/catversion.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/* yyyymmddN */
56-
#define CATALOG_VERSION_NO 201507021
56+
#define CATALOG_VERSION_NO 201507171
5757

5858
#endif

src/include/catalog/pg_proc.h

+4
Original file line numberDiff line numberDiff line change
@@ -4046,10 +4046,14 @@ DATA(insert OID = 2856 ( pg_timezone_names PGNSP PGUID 12 1 1000 0 0 f f f f t
40464046
DESCR("get the available time zone names");
40474047
DATA(insert OID = 2730 ( pg_get_triggerdef PGNSP PGUID 12 1 0 0 0 f f f f t f s 2 0 25 "26 16" _null_ _null_ _null_ _null_ _null_ pg_get_triggerdef_ext _null_ _null_ _null_ ));
40484048
DESCR("trigger description with pretty-print option");
4049+
4050+
/* asynchronous notifications */
40494051
DATA(insert OID = 3035 ( pg_listening_channels PGNSP PGUID 12 1 10 0 0 f f f f t t s 0 0 25 "" _null_ _null_ _null_ _null_ _null_ pg_listening_channels _null_ _null_ _null_ ));
40504052
DESCR("get the channels that the current backend listens to");
40514053
DATA(insert OID = 3036 ( pg_notify PGNSP PGUID 12 1 0 0 0 f f f f f f v 2 0 2278 "25 25" _null_ _null_ _null_ _null_ _null_ pg_notify _null_ _null_ _null_ ));
40524054
DESCR("send a notification event");
4055+
DATA(insert OID = 3296 ( pg_notification_queue_usage PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 701 "" _null_ _null_ _null_ _null_ _null_ pg_notification_queue_usage _null_ _null_ _null_ ));
4056+
DESCR("get the fraction of the asynchronous notification queue currently in use");
40534057

40544058
/* non-persistent series generator */
40554059
DATA(insert OID = 1066 ( generate_series PGNSP PGUID 12 1 1000 0 0 f f f f t t i 3 0 23 "23 23 23" _null_ _null_ _null_ _null_ _null_ generate_series_step_int4 _null_ _null_ _null_ ));

src/include/commands/async.h

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ extern void Async_UnlistenAll(void);
3737
/* notify-related SQL functions */
3838
extern Datum pg_listening_channels(PG_FUNCTION_ARGS);
3939
extern Datum pg_notify(PG_FUNCTION_ARGS);
40+
extern Datum pg_notification_queue_usage(PG_FUNCTION_ARGS);
4041

4142
/* perform (or cancel) outbound notify processing at transaction commit */
4243
extern void PreCommit_Notify(void);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
Parsed test spec with 2 sessions
2+
3+
starting permutation: listen begin check notify check
4+
step listen: LISTEN a;
5+
step begin: BEGIN;
6+
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
7+
nonzero
8+
9+
f
10+
step notify: SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s;
11+
count
12+
13+
1000
14+
step check: SELECT pg_notification_queue_usage() > 0 AS nonzero;
15+
nonzero
16+
17+
t
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Verify that pg_notification_queue_usage correctly reports a non-zero result,
2+
# after submitting notifications while another connection is listening for
3+
# those notifications and waiting inside an active transaction.
4+
5+
session "listener"
6+
step "listen" { LISTEN a; }
7+
step "begin" { BEGIN; }
8+
teardown { ROLLBACK; }
9+
10+
session "notifier"
11+
step "check" { SELECT pg_notification_queue_usage() > 0 AS nonzero; }
12+
step "notify" { SELECT count(pg_notify('a', s::text)) FROM generate_series(1, 1000) s; }
13+
14+
permutation "listen" "begin" "check" "notify" "check"

src/test/regress/expected/async.out

+8
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,11 @@ NOTIFY notify_async2;
3232
LISTEN notify_async2;
3333
UNLISTEN notify_async2;
3434
UNLISTEN *;
35+
-- Should return zero while there are no pending notifications.
36+
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
37+
SELECT pg_notification_queue_usage();
38+
pg_notification_queue_usage
39+
-----------------------------
40+
0
41+
(1 row)
42+

src/test/regress/sql/async.sql

+4
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,7 @@ NOTIFY notify_async2;
1717
LISTEN notify_async2;
1818
UNLISTEN notify_async2;
1919
UNLISTEN *;
20+
21+
-- Should return zero while there are no pending notifications.
22+
-- src/test/isolation/specs/async-notify.spec tests for actual usage.
23+
SELECT pg_notification_queue_usage();

0 commit comments

Comments
 (0)