Skip to content

Commit aad86c5

Browse files
committed
Improve LISTEN startup time when there are many unread notifications.
If some existing listener is far behind, incoming new listener sessions would start from that session's read pointer and then need to advance over many already-committed notification messages, which they have no interest in. This was expensive in itself and also thrashed the pg_notify SLRU buffers a lot more than necessary. We can improve matters considerably in typical scenarios, without much added cost, by starting from the furthest-ahead read pointer, not the furthest-behind one. We do have to consider only sessions in our own database when doing this, which requires an extra field in the data structure, but that's a pretty small cost. Back-patch to 9.0 where the current LISTEN/NOTIFY logic was introduced. Matt Newell, slightly adjusted by me
1 parent f60b2e2 commit aad86c5

File tree

1 file changed

+45
-5
lines changed

1 file changed

+45
-5
lines changed

src/backend/commands/async.c

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -199,12 +199,19 @@ typedef struct QueuePosition
199199
(x).page != (y).page ? (y) : \
200200
(x).offset < (y).offset ? (x) : (y))
201201

202+
/* choose logically larger QueuePosition */
203+
#define QUEUE_POS_MAX(x,y) \
204+
(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
205+
(x).page != (y).page ? (x) : \
206+
(x).offset > (y).offset ? (x) : (y))
207+
202208
/*
203209
* Struct describing a listening backend's status
204210
*/
205211
typedef struct QueueBackendStatus
206212
{
207213
int32 pid; /* either a PID or InvalidPid */
214+
Oid dboid; /* backend's database OID, or InvalidOid */
208215
QueuePosition pos; /* backend has read queue up to here */
209216
} QueueBackendStatus;
210217

@@ -223,6 +230,7 @@ typedef struct QueueBackendStatus
223230
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
224231
* of other backends and also change the head and tail pointers.
225232
*
233+
* AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
226234
* In order to avoid deadlocks, whenever we need both locks, we always first
227235
* get AsyncQueueLock and then AsyncCtlLock.
228236
*
@@ -233,8 +241,8 @@ typedef struct QueueBackendStatus
233241
typedef struct AsyncQueueControl
234242
{
235243
QueuePosition head; /* head points to the next free location */
236-
QueuePosition tail; /* the global tail is equivalent to the tail
237-
* of the "slowest" backend */
244+
QueuePosition tail; /* the global tail is equivalent to the pos of
245+
* the "slowest" backend */
238246
TimestampTz lastQueueFillWarn; /* time of last queue-full msg */
239247
QueueBackendStatus backend[1]; /* actually of length MaxBackends+1 */
240248
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
@@ -245,6 +253,7 @@ static AsyncQueueControl *asyncQueueControl;
245253
#define QUEUE_HEAD (asyncQueueControl->head)
246254
#define QUEUE_TAIL (asyncQueueControl->tail)
247255
#define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid)
256+
#define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid)
248257
#define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos)
249258

250259
/*
@@ -463,6 +472,7 @@ AsyncShmemInit(void)
463472
for (i = 0; i <= MaxBackends; i++)
464473
{
465474
QUEUE_BACKEND_PID(i) = InvalidPid;
475+
QUEUE_BACKEND_DBOID(i) = InvalidOid;
466476
SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0);
467477
}
468478
}
@@ -908,6 +918,10 @@ AtCommit_Notify(void)
908918
static void
909919
Exec_ListenPreCommit(void)
910920
{
921+
QueuePosition head;
922+
QueuePosition max;
923+
int i;
924+
911925
/*
912926
* Nothing to do if we are already listening to something, nor if we
913927
* already ran this routine in this transaction.
@@ -935,10 +949,34 @@ Exec_ListenPreCommit(void)
935949
* over already-committed notifications. This ensures we cannot miss any
936950
* not-yet-committed notifications. We might get a few more but that
937951
* doesn't hurt.
952+
*
953+
* In some scenarios there might be a lot of committed notifications that
954+
* have not yet been pruned away (because some backend is being lazy about
955+
* reading them). To reduce our startup time, we can look at other
956+
* backends and adopt the maximum "pos" pointer of any backend that's in
957+
* our database; any notifications it's already advanced over are surely
958+
* committed and need not be re-examined by us. (We must consider only
959+
* backends connected to our DB, because others will not have bothered to
960+
* check committed-ness of notifications in our DB.) But we only bother
961+
* with that if there's more than a page worth of notifications
962+
* outstanding, otherwise scanning all the other backends isn't worth it.
963+
*
964+
* We need exclusive lock here so we can look at other backends' entries.
938965
*/
939-
LWLockAcquire(AsyncQueueLock, LW_SHARED);
940-
QUEUE_BACKEND_POS(MyBackendId) = QUEUE_TAIL;
966+
LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
967+
head = QUEUE_HEAD;
968+
max = QUEUE_TAIL;
969+
if (QUEUE_POS_PAGE(max) != QUEUE_POS_PAGE(head))
970+
{
971+
for (i = 1; i <= MaxBackends; i++)
972+
{
973+
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
974+
max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i));
975+
}
976+
}
977+
QUEUE_BACKEND_POS(MyBackendId) = max;
941978
QUEUE_BACKEND_PID(MyBackendId) = MyProcPid;
979+
QUEUE_BACKEND_DBOID(MyBackendId) = MyDatabaseId;
942980
LWLockRelease(AsyncQueueLock);
943981

944982
/* Now we are listed in the global array, so remember we're listening */
@@ -954,7 +992,8 @@ Exec_ListenPreCommit(void)
954992
*
955993
* This will also advance the global tail pointer if possible.
956994
*/
957-
asyncQueueReadAllNotifications();
995+
if (!QUEUE_POS_EQUAL(max, head))
996+
asyncQueueReadAllNotifications();
958997
}
959998

960999
/*
@@ -1157,6 +1196,7 @@ asyncQueueUnregister(void)
11571196
QUEUE_POS_EQUAL(QUEUE_BACKEND_POS(MyBackendId), QUEUE_TAIL);
11581197
/* ... then mark it invalid */
11591198
QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
1199+
QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
11601200
LWLockRelease(AsyncQueueLock);
11611201

11621202
/* mark ourselves as no longer listed in the global array */

0 commit comments

Comments
 (0)