68
68
* CommitTransaction() which will then do the actual transaction commit.
69
69
*
70
70
* After commit we are called another time (AtCommit_Notify()). Here we
71
- * make the actual updates to the effective listen state (listenChannels).
72
- *
73
- * Finally, after we are out of the transaction altogether, we check if
74
- * we need to signal listening backends. In SignalBackends() we scan the
75
- * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
76
- * to every listening backend (we don't know which backend is listening on
77
- * which channel so we must signal them all). We can exclude backends that
78
- * are already up to date, though, and we can also exclude backends that
79
- * are in other databases (unless they are way behind and should be kicked
80
- * to make them advance their pointers). We don't bother with a
81
- * self-signal either, but just process the queue directly.
71
+ * make any actual updates to the effective listen state (listenChannels).
72
+ * Then we signal any backends that may be interested in our messages
73
+ * (including our own backend, if listening). This is done by
74
+ * SignalBackends(), which scans the list of listening backends and sends a
75
+ * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
76
+ * know which backend is listening on which channel so we must signal them
77
+ * all). We can exclude backends that are already up to date, though, and
78
+ * we can also exclude backends that are in other databases (unless they
79
+ * are way behind and should be kicked to make them advance their
80
+ * pointers).
81
+ *
82
+ * Finally, after we are out of the transaction altogether and about to go
83
+ * idle, we scan the queue for messages that need to be sent to our
84
+ * frontend (which might be notifies from other backends, or self-notifies
85
+ * from our own). This step is not part of the CommitTransaction sequence
86
+ * for two important reasons. First, we could get errors while sending
87
+ * data to our frontend, and it's really bad for errors to happen in
88
+ * post-commit cleanup. Second, in cases where a procedure issues commits
89
+ * within a single frontend command, we don't want to send notifies to our
90
+ * frontend until the command is done; but notifies to other backends
91
+ * should go out immediately after each commit.
82
92
*
83
93
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
84
94
* sets the process's latch, which triggers the event to be processed
@@ -426,11 +436,8 @@ static bool unlistenExitRegistered = false;
426
436
/* True if we're currently registered as a listener in asyncQueueControl */
427
437
static bool amRegisteredListener = false;
428
438
429
- /* has this backend sent notifications in the current transaction? */
430
- static bool backendHasSentNotifications = false;
431
-
432
439
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
433
- static bool backendTryAdvanceTail = false;
440
+ static bool tryAdvanceTail = false;
434
441
435
442
/* GUC parameter */
436
443
bool Trace_notify = false;
@@ -459,7 +466,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
459
466
char * page_buffer ,
460
467
Snapshot snapshot );
461
468
static void asyncQueueAdvanceTail (void );
462
- static void ProcessIncomingNotify (void );
469
+ static void ProcessIncomingNotify (bool flush );
463
470
static bool AsyncExistsPendingNotify (Notification * n );
464
471
static void AddEventToPendingNotifies (Notification * n );
465
472
static uint32 notification_hash (const void * key , Size keysize );
@@ -950,8 +957,6 @@ PreCommit_Notify(void)
950
957
AccessExclusiveLock );
951
958
952
959
/* Now push the notifications into the queue */
953
- backendHasSentNotifications = true;
954
-
955
960
nextNotify = list_head (pendingNotifies -> events );
956
961
while (nextNotify != NULL )
957
962
{
@@ -976,6 +981,8 @@ PreCommit_Notify(void)
976
981
nextNotify = asyncQueueAddEntries (nextNotify );
977
982
LWLockRelease (NotifyQueueLock );
978
983
}
984
+
985
+ /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
979
986
}
980
987
}
981
988
@@ -985,6 +992,11 @@ PreCommit_Notify(void)
985
992
* This is called at transaction commit, after committing to clog.
986
993
*
987
994
* Update listenChannels and clear transaction-local state.
995
+ *
996
+ * If we issued any notifications in the transaction, send signals to
997
+ * listening backends (possibly including ourselves) to process them.
998
+ * Also, if we filled enough queue pages with new notifies, try to
999
+ * advance the queue tail pointer.
988
1000
*/
989
1001
void
990
1002
AtCommit_Notify (void )
@@ -1027,6 +1039,29 @@ AtCommit_Notify(void)
1027
1039
if (amRegisteredListener && listenChannels == NIL )
1028
1040
asyncQueueUnregister ();
1029
1041
1042
+ /*
1043
+ * Send signals to listening backends. We need do this only if there are
1044
+ * pending notifies, which were previously added to the shared queue by
1045
+ * PreCommit_Notify().
1046
+ */
1047
+ if (pendingNotifies != NULL )
1048
+ SignalBackends ();
1049
+
1050
+ /*
1051
+ * If it's time to try to advance the global tail pointer, do that.
1052
+ *
1053
+ * (It might seem odd to do this in the sender, when more than likely the
1054
+ * listeners won't yet have read the messages we just sent. However,
1055
+ * there's less contention if only the sender does it, and there is little
1056
+ * need for urgency in advancing the global tail. So this typically will
1057
+ * be clearing out messages that were sent some time ago.)
1058
+ */
1059
+ if (tryAdvanceTail )
1060
+ {
1061
+ tryAdvanceTail = false;
1062
+ asyncQueueAdvanceTail ();
1063
+ }
1064
+
1030
1065
/* And clean up */
1031
1066
ClearPendingActionsAndNotifies ();
1032
1067
}
@@ -1199,85 +1234,6 @@ Exec_UnlistenAllCommit(void)
1199
1234
listenChannels = NIL ;
1200
1235
}
1201
1236
1202
- /*
1203
- * ProcessCompletedNotifies --- send out signals and self-notifies
1204
- *
1205
- * This is called from postgres.c just before going idle at the completion
1206
- * of a transaction. If we issued any notifications in the just-completed
1207
- * transaction, send signals to other backends to process them, and also
1208
- * process the queue ourselves to send messages to our own frontend.
1209
- * Also, if we filled enough queue pages with new notifies, try to advance
1210
- * the queue tail pointer.
1211
- *
1212
- * The reason that this is not done in AtCommit_Notify is that there is
1213
- * a nonzero chance of errors here (for example, encoding conversion errors
1214
- * while trying to format messages to our frontend). An error during
1215
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
1216
- * to ensure that a transaction's self-notifies are delivered to the frontend
1217
- * before it gets the terminating ReadyForQuery message.
1218
- *
1219
- * Note that we send signals and process the queue even if the transaction
1220
- * eventually aborted. This is because we need to clean out whatever got
1221
- * added to the queue.
1222
- *
1223
- * NOTE: we are outside of any transaction here.
1224
- */
1225
- void
1226
- ProcessCompletedNotifies (void )
1227
- {
1228
- MemoryContext caller_context ;
1229
-
1230
- /* Nothing to do if we didn't send any notifications */
1231
- if (!backendHasSentNotifications )
1232
- return ;
1233
-
1234
- /*
1235
- * We reset the flag immediately; otherwise, if any sort of error occurs
1236
- * below, we'd be locked up in an infinite loop, because control will come
1237
- * right back here after error cleanup.
1238
- */
1239
- backendHasSentNotifications = false;
1240
-
1241
- /*
1242
- * We must preserve the caller's memory context (probably MessageContext)
1243
- * across the transaction we do here.
1244
- */
1245
- caller_context = CurrentMemoryContext ;
1246
-
1247
- if (Trace_notify )
1248
- elog (DEBUG1 , "ProcessCompletedNotifies" );
1249
-
1250
- /*
1251
- * We must run asyncQueueReadAllNotifications inside a transaction, else
1252
- * bad things happen if it gets an error.
1253
- */
1254
- StartTransactionCommand ();
1255
-
1256
- /* Send signals to other backends */
1257
- SignalBackends ();
1258
-
1259
- if (listenChannels != NIL )
1260
- {
1261
- /* Read the queue ourselves, and send relevant stuff to the frontend */
1262
- asyncQueueReadAllNotifications ();
1263
- }
1264
-
1265
- /*
1266
- * If it's time to try to advance the global tail pointer, do that.
1267
- */
1268
- if (backendTryAdvanceTail )
1269
- {
1270
- backendTryAdvanceTail = false;
1271
- asyncQueueAdvanceTail ();
1272
- }
1273
-
1274
- CommitTransactionCommand ();
1275
-
1276
- MemoryContextSwitchTo (caller_context );
1277
-
1278
- /* We don't need pq_flush() here since postgres.c will do one shortly */
1279
- }
1280
-
1281
1237
/*
1282
1238
* Test whether we are actively listening on the given channel name.
1283
1239
*
@@ -1543,7 +1499,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
1543
1499
* pointer (we don't want to actually do that right here).
1544
1500
*/
1545
1501
if (QUEUE_POS_PAGE (queue_head ) % QUEUE_CLEANUP_DELAY == 0 )
1546
- backendTryAdvanceTail = true;
1502
+ tryAdvanceTail = true;
1547
1503
1548
1504
/* And exit the loop */
1549
1505
break ;
@@ -1658,8 +1614,6 @@ asyncQueueFillWarning(void)
1658
1614
/*
1659
1615
* Send signals to listening backends.
1660
1616
*
1661
- * We never signal our own process; that should be handled by our caller.
1662
- *
1663
1617
* Normally we signal only backends in our own database, since only those
1664
1618
* backends could be interested in notifies we send. However, if there's
1665
1619
* notify traffic in our database but no traffic in another database that
@@ -1668,6 +1622,9 @@ asyncQueueFillWarning(void)
1668
1622
* advance their queue position pointers, allowing the global tail to advance.
1669
1623
*
1670
1624
* Since we know the BackendId and the Pid the signaling is quite cheap.
1625
+ *
1626
+ * This is called during CommitTransaction(), so it's important for it
1627
+ * to have very low probability of failure.
1671
1628
*/
1672
1629
static void
1673
1630
SignalBackends (void )
@@ -1682,8 +1639,7 @@ SignalBackends(void)
1682
1639
* list of target PIDs.
1683
1640
*
1684
1641
* XXX in principle these pallocs could fail, which would be bad. Maybe
1685
- * preallocate the arrays? But in practice this is only run in trivial
1686
- * transactions, so there should surely be space available.
1642
+ * preallocate the arrays? They're not that large, though.
1687
1643
*/
1688
1644
pids = (int32 * ) palloc (MaxBackends * sizeof (int32 ));
1689
1645
ids = (BackendId * ) palloc (MaxBackends * sizeof (BackendId ));
@@ -1696,8 +1652,6 @@ SignalBackends(void)
1696
1652
QueuePosition pos ;
1697
1653
1698
1654
Assert (pid != InvalidPid );
1699
- if (pid == MyProcPid )
1700
- continue ; /* never signal self */
1701
1655
pos = QUEUE_BACKEND_POS (i );
1702
1656
if (QUEUE_BACKEND_DBOID (i ) == MyDatabaseId )
1703
1657
{
@@ -1730,6 +1684,16 @@ SignalBackends(void)
1730
1684
{
1731
1685
int32 pid = pids [i ];
1732
1686
1687
+ /*
1688
+ * If we are signaling our own process, no need to involve the kernel;
1689
+ * just set the flag directly.
1690
+ */
1691
+ if (pid == MyProcPid )
1692
+ {
1693
+ notifyInterruptPending = true;
1694
+ continue ;
1695
+ }
1696
+
1733
1697
/*
1734
1698
* Note: assuming things aren't broken, a signal failure here could
1735
1699
* only occur if the target backend exited since we released
@@ -1910,15 +1874,20 @@ HandleNotifyInterrupt(void)
1910
1874
* via the process's latch, and this routine will get called.
1911
1875
* If we are truly idle (ie, *not* inside a transaction block),
1912
1876
* process the incoming notifies.
1877
+ *
1878
+ * If "flush" is true, force any frontend messages out immediately.
1879
+ * This can be false when being called at the end of a frontend command,
1880
+ * since we'll flush after sending ReadyForQuery.
1913
1881
*/
1914
1882
void
1915
- ProcessNotifyInterrupt (void )
1883
+ ProcessNotifyInterrupt (bool flush )
1916
1884
{
1917
1885
if (IsTransactionOrTransactionBlock ())
1918
1886
return ; /* not really idle */
1919
1887
1888
+ /* Loop in case another signal arrives while sending messages */
1920
1889
while (notifyInterruptPending )
1921
- ProcessIncomingNotify ();
1890
+ ProcessIncomingNotify (flush );
1922
1891
}
1923
1892
1924
1893
@@ -2180,6 +2149,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
2180
2149
/*
2181
2150
* Advance the shared queue tail variable to the minimum of all the
2182
2151
* per-backend tail pointers. Truncate pg_notify space if possible.
2152
+ *
2153
+ * This is (usually) called during CommitTransaction(), so it's important for
2154
+ * it to have very low probability of failure.
2183
2155
*/
2184
2156
static void
2185
2157
asyncQueueAdvanceTail (void )
@@ -2253,17 +2225,16 @@ asyncQueueAdvanceTail(void)
2253
2225
/*
2254
2226
* ProcessIncomingNotify
2255
2227
*
2256
- * Deal with arriving NOTIFYs from other backends as soon as it's safe to
2257
- * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
2258
- * signal handler, but isn't anymore .
2228
+ * Scan the queue for arriving notifications and report them to the front
2229
+ * end. The notifications might be from other sessions, or our own;
2230
+ * there's no need to distinguish here .
2259
2231
*
2260
- * Scan the queue for arriving notifications and report them to my front
2261
- * end.
2232
+ * If "flush" is true, force any frontend messages out immediately.
2262
2233
*
2263
2234
* NOTE: since we are outside any transaction, we must create our own.
2264
2235
*/
2265
2236
static void
2266
- ProcessIncomingNotify (void )
2237
+ ProcessIncomingNotify (bool flush )
2267
2238
{
2268
2239
/* We *must* reset the flag */
2269
2240
notifyInterruptPending = false;
@@ -2288,9 +2259,11 @@ ProcessIncomingNotify(void)
2288
2259
CommitTransactionCommand ();
2289
2260
2290
2261
/*
2291
- * Must flush the notify messages to ensure frontend gets them promptly.
2262
+ * If this isn't an end-of-command case, we must flush the notify messages
2263
+ * to ensure frontend gets them promptly.
2292
2264
*/
2293
- pq_flush ();
2265
+ if (flush )
2266
+ pq_flush ();
2294
2267
2295
2268
set_ps_display ("idle" );
2296
2269
@@ -2315,9 +2288,9 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
2315
2288
pq_endmessage (& buf );
2316
2289
2317
2290
/*
2318
- * NOTE: we do not do pq_flush() here. For a self-notify, it will
2319
- * happen at the end of the transaction, and for incoming notifies
2320
- * ProcessIncomingNotify will do it after finding all the notifies .
2291
+ * NOTE: we do not do pq_flush() here. Some level of caller will
2292
+ * handle it later, allowing this message to be combined into a packet
2293
+ * with other ones .
2321
2294
*/
2322
2295
}
2323
2296
else
0 commit comments