@@ -202,12 +202,19 @@ typedef struct QueuePosition
202
202
(x).page != (y).page ? (y) : \
203
203
(x).offset < (y).offset ? (x) : (y))
204
204
205
+ /* choose logically larger QueuePosition */
206
+ #define QUEUE_POS_MAX (x ,y ) \
207
+ (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \
208
+ (x).page != (y).page ? (x) : \
209
+ (x).offset > (y).offset ? (x) : (y))
210
+
205
211
/*
206
212
* Struct describing a listening backend's status
207
213
*/
208
214
typedef struct QueueBackendStatus
209
215
{
210
216
int32 pid ; /* either a PID or InvalidPid */
217
+ Oid dboid ; /* backend's database OID, or InvalidOid */
211
218
QueuePosition pos ; /* backend has read queue up to here */
212
219
} QueueBackendStatus ;
213
220
@@ -224,6 +231,7 @@ typedef struct QueueBackendStatus
224
231
* When holding the lock in EXCLUSIVE mode, backends can inspect the entries
225
232
* of other backends and also change the head and tail pointers.
226
233
*
234
+ * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
227
235
* In order to avoid deadlocks, whenever we need both locks, we always first
228
236
* get AsyncQueueLock and then AsyncCtlLock.
229
237
*
@@ -234,8 +242,8 @@ typedef struct QueueBackendStatus
234
242
typedef struct AsyncQueueControl
235
243
{
236
244
QueuePosition head ; /* head points to the next free location */
237
- QueuePosition tail ; /* the global tail is equivalent to the tail
238
- * of the "slowest" backend */
245
+ QueuePosition tail ; /* the global tail is equivalent to the pos of
246
+ * the "slowest" backend */
239
247
TimestampTz lastQueueFillWarn ; /* time of last queue-full msg */
240
248
QueueBackendStatus backend [FLEXIBLE_ARRAY_MEMBER ];
241
249
/* backend[0] is not used; used entries are from [1] to [MaxBackends] */
@@ -246,6 +254,7 @@ static AsyncQueueControl *asyncQueueControl;
246
254
#define QUEUE_HEAD (asyncQueueControl->head)
247
255
#define QUEUE_TAIL (asyncQueueControl->tail)
248
256
#define QUEUE_BACKEND_PID (i ) (asyncQueueControl->backend[i].pid)
257
+ #define QUEUE_BACKEND_DBOID (i ) (asyncQueueControl->backend[i].dboid)
249
258
#define QUEUE_BACKEND_POS (i ) (asyncQueueControl->backend[i].pos)
250
259
251
260
/*
@@ -459,6 +468,7 @@ AsyncShmemInit(void)
459
468
for (i = 0 ; i <= MaxBackends ; i ++ )
460
469
{
461
470
QUEUE_BACKEND_PID (i ) = InvalidPid ;
471
+ QUEUE_BACKEND_DBOID (i ) = InvalidOid ;
462
472
SET_QUEUE_POS (QUEUE_BACKEND_POS (i ), 0 , 0 );
463
473
}
464
474
}
@@ -905,6 +915,10 @@ AtCommit_Notify(void)
905
915
static void
906
916
Exec_ListenPreCommit (void )
907
917
{
918
+ QueuePosition head ;
919
+ QueuePosition max ;
920
+ int i ;
921
+
908
922
/*
909
923
* Nothing to do if we are already listening to something, nor if we
910
924
* already ran this routine in this transaction.
@@ -932,10 +946,34 @@ Exec_ListenPreCommit(void)
932
946
* over already-committed notifications. This ensures we cannot miss any
933
947
* not-yet-committed notifications. We might get a few more but that
934
948
* doesn't hurt.
949
+ *
950
+ * In some scenarios there might be a lot of committed notifications that
951
+ * have not yet been pruned away (because some backend is being lazy about
952
+ * reading them). To reduce our startup time, we can look at other
953
+ * backends and adopt the maximum "pos" pointer of any backend that's in
954
+ * our database; any notifications it's already advanced over are surely
955
+ * committed and need not be re-examined by us. (We must consider only
956
+ * backends connected to our DB, because others will not have bothered to
957
+ * check committed-ness of notifications in our DB.) But we only bother
958
+ * with that if there's more than a page worth of notifications
959
+ * outstanding, otherwise scanning all the other backends isn't worth it.
960
+ *
961
+ * We need exclusive lock here so we can look at other backends' entries.
935
962
*/
936
- LWLockAcquire (AsyncQueueLock , LW_SHARED );
937
- QUEUE_BACKEND_POS (MyBackendId ) = QUEUE_TAIL ;
963
+ LWLockAcquire (AsyncQueueLock , LW_EXCLUSIVE );
964
+ head = QUEUE_HEAD ;
965
+ max = QUEUE_TAIL ;
966
+ if (QUEUE_POS_PAGE (max ) != QUEUE_POS_PAGE (head ))
967
+ {
968
+ for (i = 1 ; i <= MaxBackends ; i ++ )
969
+ {
970
+ if (QUEUE_BACKEND_DBOID (i ) == MyDatabaseId )
971
+ max = QUEUE_POS_MAX (max , QUEUE_BACKEND_POS (i ));
972
+ }
973
+ }
974
+ QUEUE_BACKEND_POS (MyBackendId ) = max ;
938
975
QUEUE_BACKEND_PID (MyBackendId ) = MyProcPid ;
976
+ QUEUE_BACKEND_DBOID (MyBackendId ) = MyDatabaseId ;
939
977
LWLockRelease (AsyncQueueLock );
940
978
941
979
/* Now we are listed in the global array, so remember we're listening */
@@ -951,7 +989,8 @@ Exec_ListenPreCommit(void)
951
989
*
952
990
* This will also advance the global tail pointer if possible.
953
991
*/
954
- asyncQueueReadAllNotifications ();
992
+ if (!QUEUE_POS_EQUAL (max , head ))
993
+ asyncQueueReadAllNotifications ();
955
994
}
956
995
957
996
/*
@@ -1154,6 +1193,7 @@ asyncQueueUnregister(void)
1154
1193
QUEUE_POS_EQUAL (QUEUE_BACKEND_POS (MyBackendId ), QUEUE_TAIL );
1155
1194
/* ... then mark it invalid */
1156
1195
QUEUE_BACKEND_PID (MyBackendId ) = InvalidPid ;
1196
+ QUEUE_BACKEND_DBOID (MyBackendId ) = InvalidOid ;
1157
1197
LWLockRelease (AsyncQueueLock );
1158
1198
1159
1199
/* mark ourselves as no longer listed in the global array */
0 commit comments