68
68
* CommitTransaction() which will then do the actual transaction commit.
69
69
*
70
70
* After commit we are called another time (AtCommit_Notify()). Here we
71
- * make the actual updates to the effective listen state (listenChannels).
72
- *
73
- * Finally, after we are out of the transaction altogether, we check if
74
- * we need to signal listening backends. In SignalBackends() we scan the
75
- * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
76
- * to every listening backend (we don't know which backend is listening on
77
- * which channel so we must signal them all). We can exclude backends that
78
- * are already up to date, though, and we can also exclude backends that
79
- * are in other databases (unless they are way behind and should be kicked
80
- * to make them advance their pointers). We don't bother with a
81
- * self-signal either, but just process the queue directly.
71
+ * make any actual updates to the effective listen state (listenChannels).
72
+ * Then we signal any backends that may be interested in our messages
73
+ * (including our own backend, if listening). This is done by
74
+ * SignalBackends(), which scans the list of listening backends and sends a
75
+ * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
76
+ * know which backend is listening on which channel so we must signal them
77
+ * all). We can exclude backends that are already up to date, though, and
78
+ * we can also exclude backends that are in other databases (unless they
79
+ * are way behind and should be kicked to make them advance their
80
+ * pointers).
81
+ *
82
+ * Finally, after we are out of the transaction altogether and about to go
83
+ * idle, we scan the queue for messages that need to be sent to our
84
+ * frontend (which might be notifies from other backends, or self-notifies
85
+ * from our own). This step is not part of the CommitTransaction sequence
86
+ * for two important reasons. First, we could get errors while sending
87
+ * data to our frontend, and it's really bad for errors to happen in
88
+ * post-commit cleanup. Second, in cases where a procedure issues commits
89
+ * within a single frontend command, we don't want to send notifies to our
90
+ * frontend until the command is done; but notifies to other backends
91
+ * should go out immediately after each commit.
82
92
*
83
93
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
84
94
* sets the process's latch, which triggers the event to be processed
@@ -429,11 +439,8 @@ static bool unlistenExitRegistered = false;
429
439
/* True if we're currently registered as a listener in asyncQueueControl */
430
440
static bool amRegisteredListener = false;
431
441
432
- /* has this backend sent notifications in the current transaction? */
433
- static bool backendHasSentNotifications = false;
434
-
435
442
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
436
- static bool backendTryAdvanceTail = false;
443
+ static bool tryAdvanceTail = false;
437
444
438
445
/* GUC parameter */
439
446
bool Trace_notify = false;
@@ -462,7 +469,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
462
469
char * page_buffer ,
463
470
Snapshot snapshot );
464
471
static void asyncQueueAdvanceTail (void );
465
- static void ProcessIncomingNotify (void );
472
+ static void ProcessIncomingNotify (bool flush );
466
473
static bool AsyncExistsPendingNotify (Notification * n );
467
474
static void AddEventToPendingNotifies (Notification * n );
468
475
static uint32 notification_hash (const void * key , Size keysize );
@@ -954,8 +961,6 @@ PreCommit_Notify(void)
954
961
AccessExclusiveLock );
955
962
956
963
/* Now push the notifications into the queue */
957
- backendHasSentNotifications = true;
958
-
959
964
nextNotify = list_head (pendingNotifies -> events );
960
965
while (nextNotify != NULL )
961
966
{
@@ -980,6 +985,8 @@ PreCommit_Notify(void)
980
985
nextNotify = asyncQueueAddEntries (nextNotify );
981
986
LWLockRelease (NotifyQueueLock );
982
987
}
988
+
989
+ /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
983
990
}
984
991
}
985
992
@@ -989,6 +996,11 @@ PreCommit_Notify(void)
989
996
* This is called at transaction commit, after committing to clog.
990
997
*
991
998
* Update listenChannels and clear transaction-local state.
999
+ *
1000
+ * If we issued any notifications in the transaction, send signals to
1001
+ * listening backends (possibly including ourselves) to process them.
1002
+ * Also, if we filled enough queue pages with new notifies, try to
1003
+ * advance the queue tail pointer.
992
1004
*/
993
1005
void
994
1006
AtCommit_Notify (void )
@@ -1031,6 +1043,29 @@ AtCommit_Notify(void)
1031
1043
if (amRegisteredListener && listenChannels == NIL )
1032
1044
asyncQueueUnregister ();
1033
1045
1046
+ /*
1047
+ * Send signals to listening backends. We need do this only if there are
1048
+ * pending notifies, which were previously added to the shared queue by
1049
+ * PreCommit_Notify().
1050
+ */
1051
+ if (pendingNotifies != NULL )
1052
+ SignalBackends ();
1053
+
1054
+ /*
1055
+ * If it's time to try to advance the global tail pointer, do that.
1056
+ *
1057
+ * (It might seem odd to do this in the sender, when more than likely the
1058
+ * listeners won't yet have read the messages we just sent. However,
1059
+ * there's less contention if only the sender does it, and there is little
1060
+ * need for urgency in advancing the global tail. So this typically will
1061
+ * be clearing out messages that were sent some time ago.)
1062
+ */
1063
+ if (tryAdvanceTail )
1064
+ {
1065
+ tryAdvanceTail = false;
1066
+ asyncQueueAdvanceTail ();
1067
+ }
1068
+
1034
1069
/* And clean up */
1035
1070
ClearPendingActionsAndNotifies ();
1036
1071
}
@@ -1204,82 +1239,17 @@ Exec_UnlistenAllCommit(void)
1204
1239
}
1205
1240
1206
1241
/*
1207
- * ProcessCompletedNotifies --- send out signals and self-notifies
1208
- *
1209
- * This is called from postgres.c just before going idle at the completion
1210
- * of a transaction. If we issued any notifications in the just-completed
1211
- * transaction, send signals to other backends to process them, and also
1212
- * process the queue ourselves to send messages to our own frontend.
1213
- * Also, if we filled enough queue pages with new notifies, try to advance
1214
- * the queue tail pointer.
1215
- *
1216
- * The reason that this is not done in AtCommit_Notify is that there is
1217
- * a nonzero chance of errors here (for example, encoding conversion errors
1218
- * while trying to format messages to our frontend). An error during
1219
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
1220
- * to ensure that a transaction's self-notifies are delivered to the frontend
1221
- * before it gets the terminating ReadyForQuery message.
1222
- *
1223
- * Note that we send signals and process the queue even if the transaction
1224
- * eventually aborted. This is because we need to clean out whatever got
1225
- * added to the queue.
1226
- *
1227
- * NOTE: we are outside of any transaction here.
1242
+ * ProcessCompletedNotifies --- nowadays this does nothing
1243
+ *
1244
+ * This routine used to send signals and handle self-notifies,
1245
+ * but that functionality has been moved elsewhere.
1246
+ * We'd delete it entirely, except that the documentation used to instruct
1247
+ * background-worker authors to call it. To avoid an ABI break in stable
1248
+ * branches, keep it as a no-op routine.
1228
1249
*/
1229
1250
void
1230
1251
ProcessCompletedNotifies (void )
1231
1252
{
1232
- MemoryContext caller_context ;
1233
-
1234
- /* Nothing to do if we didn't send any notifications */
1235
- if (!backendHasSentNotifications )
1236
- return ;
1237
-
1238
- /*
1239
- * We reset the flag immediately; otherwise, if any sort of error occurs
1240
- * below, we'd be locked up in an infinite loop, because control will come
1241
- * right back here after error cleanup.
1242
- */
1243
- backendHasSentNotifications = false;
1244
-
1245
- /*
1246
- * We must preserve the caller's memory context (probably MessageContext)
1247
- * across the transaction we do here.
1248
- */
1249
- caller_context = CurrentMemoryContext ;
1250
-
1251
- if (Trace_notify )
1252
- elog (DEBUG1 , "ProcessCompletedNotifies" );
1253
-
1254
- /*
1255
- * We must run asyncQueueReadAllNotifications inside a transaction, else
1256
- * bad things happen if it gets an error.
1257
- */
1258
- StartTransactionCommand ();
1259
-
1260
- /* Send signals to other backends */
1261
- SignalBackends ();
1262
-
1263
- if (listenChannels != NIL )
1264
- {
1265
- /* Read the queue ourselves, and send relevant stuff to the frontend */
1266
- asyncQueueReadAllNotifications ();
1267
- }
1268
-
1269
- /*
1270
- * If it's time to try to advance the global tail pointer, do that.
1271
- */
1272
- if (backendTryAdvanceTail )
1273
- {
1274
- backendTryAdvanceTail = false;
1275
- asyncQueueAdvanceTail ();
1276
- }
1277
-
1278
- CommitTransactionCommand ();
1279
-
1280
- MemoryContextSwitchTo (caller_context );
1281
-
1282
- /* We don't need pq_flush() here since postgres.c will do one shortly */
1283
1253
}
1284
1254
1285
1255
/*
@@ -1547,7 +1517,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
1547
1517
* pointer (we don't want to actually do that right here).
1548
1518
*/
1549
1519
if (QUEUE_POS_PAGE (queue_head ) % QUEUE_CLEANUP_DELAY == 0 )
1550
- backendTryAdvanceTail = true;
1520
+ tryAdvanceTail = true;
1551
1521
1552
1522
/* And exit the loop */
1553
1523
break ;
@@ -1662,8 +1632,6 @@ asyncQueueFillWarning(void)
1662
1632
/*
1663
1633
* Send signals to listening backends.
1664
1634
*
1665
- * We never signal our own process; that should be handled by our caller.
1666
- *
1667
1635
* Normally we signal only backends in our own database, since only those
1668
1636
* backends could be interested in notifies we send. However, if there's
1669
1637
* notify traffic in our database but no traffic in another database that
@@ -1672,6 +1640,9 @@ asyncQueueFillWarning(void)
1672
1640
* advance their queue position pointers, allowing the global tail to advance.
1673
1641
*
1674
1642
* Since we know the BackendId and the Pid the signaling is quite cheap.
1643
+ *
1644
+ * This is called during CommitTransaction(), so it's important for it
1645
+ * to have very low probability of failure.
1675
1646
*/
1676
1647
static void
1677
1648
SignalBackends (void )
@@ -1686,8 +1657,7 @@ SignalBackends(void)
1686
1657
* list of target PIDs.
1687
1658
*
1688
1659
* XXX in principle these pallocs could fail, which would be bad. Maybe
1689
- * preallocate the arrays? But in practice this is only run in trivial
1690
- * transactions, so there should surely be space available.
1660
+ * preallocate the arrays? They're not that large, though.
1691
1661
*/
1692
1662
pids = (int32 * ) palloc (MaxBackends * sizeof (int32 ));
1693
1663
ids = (BackendId * ) palloc (MaxBackends * sizeof (BackendId ));
@@ -1700,8 +1670,6 @@ SignalBackends(void)
1700
1670
QueuePosition pos ;
1701
1671
1702
1672
Assert (pid != InvalidPid );
1703
- if (pid == MyProcPid )
1704
- continue ; /* never signal self */
1705
1673
pos = QUEUE_BACKEND_POS (i );
1706
1674
if (QUEUE_BACKEND_DBOID (i ) == MyDatabaseId )
1707
1675
{
@@ -1734,6 +1702,16 @@ SignalBackends(void)
1734
1702
{
1735
1703
int32 pid = pids [i ];
1736
1704
1705
+ /*
1706
+ * If we are signaling our own process, no need to involve the kernel;
1707
+ * just set the flag directly.
1708
+ */
1709
+ if (pid == MyProcPid )
1710
+ {
1711
+ notifyInterruptPending = true;
1712
+ continue ;
1713
+ }
1714
+
1737
1715
/*
1738
1716
* Note: assuming things aren't broken, a signal failure here could
1739
1717
* only occur if the target backend exited since we released
@@ -1914,15 +1892,20 @@ HandleNotifyInterrupt(void)
1914
1892
* via the process's latch, and this routine will get called.
1915
1893
* If we are truly idle (ie, *not* inside a transaction block),
1916
1894
* process the incoming notifies.
1895
+ *
1896
+ * If "flush" is true, force any frontend messages out immediately.
1897
+ * This can be false when being called at the end of a frontend command,
1898
+ * since we'll flush after sending ReadyForQuery.
1917
1899
*/
1918
1900
void
1919
- ProcessNotifyInterrupt (void )
1901
+ ProcessNotifyInterrupt (bool flush )
1920
1902
{
1921
1903
if (IsTransactionOrTransactionBlock ())
1922
1904
return ; /* not really idle */
1923
1905
1906
+ /* Loop in case another signal arrives while sending messages */
1924
1907
while (notifyInterruptPending )
1925
- ProcessIncomingNotify ();
1908
+ ProcessIncomingNotify (flush );
1926
1909
}
1927
1910
1928
1911
@@ -2185,6 +2168,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
2185
2168
/*
2186
2169
* Advance the shared queue tail variable to the minimum of all the
2187
2170
* per-backend tail pointers. Truncate pg_notify space if possible.
2171
+ *
2172
+ * This is (usually) called during CommitTransaction(), so it's important for
2173
+ * it to have very low probability of failure.
2188
2174
*/
2189
2175
static void
2190
2176
asyncQueueAdvanceTail (void )
@@ -2258,17 +2244,16 @@ asyncQueueAdvanceTail(void)
2258
2244
/*
2259
2245
* ProcessIncomingNotify
2260
2246
*
2261
- * Deal with arriving NOTIFYs from other backends as soon as it's safe to
2262
- * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
2263
- * signal handler, but isn't anymore .
2247
+ * Scan the queue for arriving notifications and report them to the front
2248
+ * end. The notifications might be from other sessions, or our own;
2249
+ * there's no need to distinguish here .
2264
2250
*
2265
- * Scan the queue for arriving notifications and report them to my front
2266
- * end.
2251
+ * If "flush" is true, force any frontend messages out immediately.
2267
2252
*
2268
2253
* NOTE: since we are outside any transaction, we must create our own.
2269
2254
*/
2270
2255
static void
2271
- ProcessIncomingNotify (void )
2256
+ ProcessIncomingNotify (bool flush )
2272
2257
{
2273
2258
/* We *must* reset the flag */
2274
2259
notifyInterruptPending = false;
@@ -2293,9 +2278,11 @@ ProcessIncomingNotify(void)
2293
2278
CommitTransactionCommand ();
2294
2279
2295
2280
/*
2296
- * Must flush the notify messages to ensure frontend gets them promptly.
2281
+ * If this isn't an end-of-command case, we must flush the notify messages
2282
+ * to ensure frontend gets them promptly.
2297
2283
*/
2298
- pq_flush ();
2284
+ if (flush )
2285
+ pq_flush ();
2299
2286
2300
2287
set_ps_display ("idle" );
2301
2288
@@ -2321,9 +2308,9 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
2321
2308
pq_endmessage (& buf );
2322
2309
2323
2310
/*
2324
- * NOTE: we do not do pq_flush() here. For a self-notify, it will
2325
- * happen at the end of the transaction, and for incoming notifies
2326
- * ProcessIncomingNotify will do it after finding all the notifies .
2311
+ * NOTE: we do not do pq_flush() here. Some level of caller will
2312
+ * handle it later, allowing this message to be combined into a packet
2313
+ * with other ones .
2327
2314
*/
2328
2315
}
2329
2316
else
0 commit comments