@@ -199,12 +199,19 @@ typedef struct QueuePosition
199
199
(x).page != (y).page ? (y) : \
200
200
(x).offset < (y).offset ? (x) : (y))
201
201
/*
 * QUEUE_POS_MAX(x, y): choose the logically larger QueuePosition.
 *
 * Pages are ordered with asyncQueuePagePrecedes(): when x's page precedes
 * y's page the result is y; when the pages differ but x's does not precede
 * y's, the result is x.  For positions on the same page, the one with the
 * greater offset is chosen, and y is returned if the two are equal.
 *
 * Both arguments are evaluated more than once, so they must be free of
 * side effects.  There must be no whitespace between the macro name and
 * its parameter list, or the macro would become object-like.
 */
#define QUEUE_POS_MAX(x,y) \
	(asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
	 (x).page != (y).page ? (x) : \
	 (x).offset > (y).offset ? (x) : (y))
207
+
202
208
/*
203
209
* Struct describing a listening backend's status
204
210
*/
205
211
typedef struct QueueBackendStatus
206
212
{
207
213
int32 pid ; /* either a PID or InvalidPid */
214
+ Oid dboid ; /* backend's database OID, or InvalidOid */
208
215
QueuePosition pos ; /* backend has read queue up to here */
209
216
} QueueBackendStatus ;
210
217
@@ -223,6 +230,7 @@ typedef struct QueueBackendStatus
223
230
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
224
231
* of other backends and also change the head and tail pointers.
225
232
*
233
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
226
234
* In order to avoid deadlocks, whenever we need both locks, we always first
227
235
* get AsyncQueueLock and then AsyncCtlLock.
228
236
*
@@ -233,8 +241,8 @@ typedef struct QueueBackendStatus
233
241
typedef struct AsyncQueueControl
234
242
{
235
243
QueuePosition head ; /* head points to the next free location */
236
- QueuePosition tail ; /* the global tail is equivalent to the tail
237
- * of the "slowest" backend */
244
+ QueuePosition tail ; /* the global tail is equivalent to the pos of
245
+ * the "slowest" backend */
238
246
TimestampTz lastQueueFillWarn ; /* time of last queue-full msg */
239
247
QueueBackendStatus backend [1 ]; /* actually of length MaxBackends+1 */
240
248
/* DO NOT ADD FURTHER STRUCT MEMBERS HERE */
@@ -245,6 +253,7 @@ static AsyncQueueControl *asyncQueueControl;
/*
 * Shorthand accessors for the shared queue control structure.
 *
 * QUEUE_HEAD and QUEUE_TAIL name the global head and tail positions; the
 * QUEUE_BACKEND_* macros select one field of the status entry of backend
 * slot i.  All of them expand to lvalues, so they can be assigned to as
 * well as read.  The parameter list must follow the macro name with no
 * intervening whitespace, or the macro would be object-like.
 */
#define QUEUE_HEAD					(asyncQueueControl->head)
#define QUEUE_TAIL					(asyncQueueControl->tail)
#define QUEUE_BACKEND_PID(i)		(asyncQueueControl->backend[i].pid)
#define QUEUE_BACKEND_DBOID(i)		(asyncQueueControl->backend[i].dboid)
#define QUEUE_BACKEND_POS(i)		(asyncQueueControl->backend[i].pos)
249
258
250
259
/*
@@ -463,6 +472,7 @@ AsyncShmemInit(void)
463
472
for (i = 0 ; i <= MaxBackends ; i ++ )
464
473
{
465
474
QUEUE_BACKEND_PID (i ) = InvalidPid ;
475
+ QUEUE_BACKEND_DBOID (i ) = InvalidOid ;
466
476
SET_QUEUE_POS (QUEUE_BACKEND_POS (i ), 0 , 0 );
467
477
}
468
478
}
@@ -908,6 +918,10 @@ AtCommit_Notify(void)
908
918
static void
909
919
Exec_ListenPreCommit (void )
910
920
{
921
+ QueuePosition head ;
922
+ QueuePosition max ;
923
+ int i ;
924
+
911
925
/*
912
926
* Nothing to do if we are already listening to something, nor if we
913
927
* already ran this routine in this transaction.
@@ -935,10 +949,34 @@ Exec_ListenPreCommit(void)
935
949
* over already-committed notifications. This ensures we cannot miss any
936
950
* not-yet-committed notifications. We might get a few more but that
937
951
* doesn't hurt.
952
+ *
953
+ * In some scenarios there might be a lot of committed notifications that
954
+ * have not yet been pruned away (because some backend is being lazy about
955
+ * reading them). To reduce our startup time, we can look at other
956
+ * backends and adopt the maximum "pos" pointer of any backend that's in
957
+ * our database; any notifications it's already advanced over are surely
958
+ * committed and need not be re-examined by us. (We must consider only
959
+ * backends connected to our DB, because others will not have bothered to
960
+ * check committed-ness of notifications in our DB.) But we only bother
961
+ * with that if there's more than a page worth of notifications
962
+ * outstanding, otherwise scanning all the other backends isn't worth it.
963
+ *
964
+ * We need exclusive lock here so we can look at other backends' entries.
938
965
*/
939
- LWLockAcquire (AsyncQueueLock , LW_SHARED );
940
- QUEUE_BACKEND_POS (MyBackendId ) = QUEUE_TAIL ;
966
+ LWLockAcquire (AsyncQueueLock , LW_EXCLUSIVE );
967
+ head = QUEUE_HEAD ;
968
+ max = QUEUE_TAIL ;
969
+ if (QUEUE_POS_PAGE (max ) != QUEUE_POS_PAGE (head ))
970
+ {
971
+ for (i = 1 ; i <= MaxBackends ; i ++ )
972
+ {
973
+ if (QUEUE_BACKEND_DBOID (i ) == MyDatabaseId )
974
+ max = QUEUE_POS_MAX (max , QUEUE_BACKEND_POS (i ));
975
+ }
976
+ }
977
+ QUEUE_BACKEND_POS (MyBackendId ) = max ;
941
978
QUEUE_BACKEND_PID (MyBackendId ) = MyProcPid ;
979
+ QUEUE_BACKEND_DBOID (MyBackendId ) = MyDatabaseId ;
942
980
LWLockRelease (AsyncQueueLock );
943
981
944
982
/* Now we are listed in the global array, so remember we're listening */
@@ -954,7 +992,8 @@ Exec_ListenPreCommit(void)
954
992
*
955
993
* This will also advance the global tail pointer if possible.
956
994
*/
957
- asyncQueueReadAllNotifications ();
995
+ if (!QUEUE_POS_EQUAL (max , head ))
996
+ asyncQueueReadAllNotifications ();
958
997
}
959
998
960
999
/*
@@ -1157,6 +1196,7 @@ asyncQueueUnregister(void)
1157
1196
QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ), QUEUE_TAIL );
1158
1197
/* ... then mark it invalid */
1159
1198
QUEUE_BACKEND_PID (MyBackendId ) = InvalidPid ;
1199
+ QUEUE_BACKEND_DBOID (MyBackendId ) = InvalidOid ;
1160
1200
LWLockRelease (AsyncQueueLock );
1161
1201
1162
1202
/* mark ourselves as no longer listed in the global array */
0 commit comments