Skip to content

Commit 2e4eae8

Browse files
committed
Send NOTIFY signals during CommitTransaction.
Formerly, we sent signals for outgoing NOTIFY messages within ProcessCompletedNotifies, which was also responsible for sending relevant ones of those messages to our connected client. It therefore had to run during the main-loop processing that occurs just before going idle. This arrangement had two big disadvantages: * Now that procedures allow intra-command COMMITs, it would be useful to send NOTIFYs to other sessions immediately at COMMIT (though, for reasons of wire-protocol stability, we still shouldn't forward them to our client until end of command). * Background processes such as replication workers would not send NOTIFYs at all, since they never execute the client communication loop. We've had requests to allow triggers running in replication workers to send NOTIFYs, so that's a problem. To fix these things, move transmission of outgoing NOTIFY signals into AtCommit_Notify, where it will happen during CommitTransaction. Also move the possible call of asyncQueueAdvanceTail there, to ensure we don't bloat the async SLRU if a background worker sends many NOTIFYs with no one listening. We can also drop the call of asyncQueueReadAllNotifications, allowing ProcessCompletedNotifies to go away entirely. That's because commit 7900269 added a call of ProcessNotifyInterrupt adjacent to PostgresMain's call of ProcessCompletedNotifies, and that does its own call of asyncQueueReadAllNotifications, meaning that we were uselessly doing two such calls (inside two separate transactions) whenever inbound notify signals coincided with an outbound notify. We need only set notifyInterruptPending to ensure that ProcessNotifyInterrupt runs, and we're done. The existing documentation suggests that custom background workers should call ProcessCompletedNotifies if they want to send NOTIFY messages. To avoid an ABI break in the back branches, reduce it to an empty routine rather than removing it entirely. Removal will occur in v15. Although the problems mentioned above have existed for awhile, I don't feel comfortable back-patching this any further than v13. There was quite a bit of churn in adjacent code between 12 and 13. At minimum we'd have to also backpatch 51004c7, and a good deal of other adjustment would also be needed, so the benefit-to-risk ratio doesn't look attractive. Per bug #15293 from Michael Powers (and similar gripes from others). Artur Zakirov and Tom Lane Discussion: https://postgr.es/m/153243441449.1404.2274116228506175596@wrigleys.postgresql.org
1 parent e8638d7 commit 2e4eae8

File tree

5 files changed

+115
-137
lines changed

5 files changed

+115
-137
lines changed

doc/src/sgml/bgworker.sgml

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -280,15 +280,13 @@ typedef struct BackgroundWorker
280280
</para>
281281

282282
<para>
283-
If a background worker sends asynchronous notifications with the
284-
<command>NOTIFY</command> command via the Server Programming Interface
285-
(<acronym>SPI</acronym>), it should call
286-
<function>ProcessCompletedNotifies</function> explicitly after committing
287-
the enclosing transaction so that any notifications can be delivered. If a
288-
background worker registers to receive asynchronous notifications with
289-
the <command>LISTEN</command> through <acronym>SPI</acronym>, the worker
290-
will log those notifications, but there is no programmatic way for the
291-
worker to intercept and respond to those notifications.
283+
Background workers can send asynchronous notification messages, either by
284+
using the <command>NOTIFY</command> command via <acronym>SPI</acronym>,
285+
or directly via <function>Async_Notify()</function>. Such notifications
286+
will be sent at transaction commit.
287+
Background workers should not register to receive asynchronous
288+
notifications with the <command>LISTEN</command> command, as there is no
289+
infrastructure for a worker to consume such notifications.
292290
</para>
293291

294292
<para>

src/backend/access/transam/xact.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2269,7 +2269,17 @@ CommitTransaction(void)
22692269
*/
22702270
smgrDoPendingDeletes(true);
22712271

2272+
/*
2273+
* Send out notification signals to other backends (and do other
2274+
* post-commit NOTIFY cleanup). This must not happen until after our
2275+
* transaction is fully done from the viewpoint of other backends.
2276+
*/
22722277
AtCommit_Notify();
2278+
2279+
/*
2280+
* Everything after this should be purely internal-to-this-backend
2281+
* cleanup.
2282+
*/
22732283
AtEOXact_GUC(true, 1);
22742284
AtEOXact_SPI(true);
22752285
AtEOXact_Enum();

src/backend/commands/async.c

Lines changed: 90 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -68,17 +68,27 @@
6868
* CommitTransaction() which will then do the actual transaction commit.
6969
*
7070
* After commit we are called another time (AtCommit_Notify()). Here we
71-
* make the actual updates to the effective listen state (listenChannels).
72-
*
73-
* Finally, after we are out of the transaction altogether, we check if
74-
* we need to signal listening backends. In SignalBackends() we scan the
75-
* list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
76-
* to every listening backend (we don't know which backend is listening on
77-
* which channel so we must signal them all). We can exclude backends that
78-
* are already up to date, though, and we can also exclude backends that
79-
* are in other databases (unless they are way behind and should be kicked
80-
* to make them advance their pointers). We don't bother with a
81-
* self-signal either, but just process the queue directly.
71+
* make any actual updates to the effective listen state (listenChannels).
72+
* Then we signal any backends that may be interested in our messages
73+
* (including our own backend, if listening). This is done by
74+
* SignalBackends(), which scans the list of listening backends and sends a
75+
* PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
76+
* know which backend is listening on which channel so we must signal them
77+
* all). We can exclude backends that are already up to date, though, and
78+
* we can also exclude backends that are in other databases (unless they
79+
* are way behind and should be kicked to make them advance their
80+
* pointers).
81+
*
82+
* Finally, after we are out of the transaction altogether and about to go
83+
* idle, we scan the queue for messages that need to be sent to our
84+
* frontend (which might be notifies from other backends, or self-notifies
85+
* from our own). This step is not part of the CommitTransaction sequence
86+
* for two important reasons. First, we could get errors while sending
87+
* data to our frontend, and it's really bad for errors to happen in
88+
* post-commit cleanup. Second, in cases where a procedure issues commits
89+
* within a single frontend command, we don't want to send notifies to our
90+
* frontend until the command is done; but notifies to other backends
91+
* should go out immediately after each commit.
8292
*
8393
* 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
8494
* sets the process's latch, which triggers the event to be processed
@@ -426,11 +436,8 @@ static bool unlistenExitRegistered = false;
426436
/* True if we're currently registered as a listener in asyncQueueControl */
427437
static bool amRegisteredListener = false;
428438

429-
/* has this backend sent notifications in the current transaction? */
430-
static bool backendHasSentNotifications = false;
431-
432439
/* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
433-
static bool backendTryAdvanceTail = false;
440+
static bool tryAdvanceTail = false;
434441

435442
/* GUC parameter */
436443
bool Trace_notify = false;
@@ -459,7 +466,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
459466
char *page_buffer,
460467
Snapshot snapshot);
461468
static void asyncQueueAdvanceTail(void);
462-
static void ProcessIncomingNotify(void);
469+
static void ProcessIncomingNotify(bool flush);
463470
static bool AsyncExistsPendingNotify(Notification *n);
464471
static void AddEventToPendingNotifies(Notification *n);
465472
static uint32 notification_hash(const void *key, Size keysize);
@@ -950,8 +957,6 @@ PreCommit_Notify(void)
950957
AccessExclusiveLock);
951958

952959
/* Now push the notifications into the queue */
953-
backendHasSentNotifications = true;
954-
955960
nextNotify = list_head(pendingNotifies->events);
956961
while (nextNotify != NULL)
957962
{
@@ -976,6 +981,8 @@ PreCommit_Notify(void)
976981
nextNotify = asyncQueueAddEntries(nextNotify);
977982
LWLockRelease(NotifyQueueLock);
978983
}
984+
985+
/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
979986
}
980987
}
981988

@@ -985,6 +992,11 @@ PreCommit_Notify(void)
985992
* This is called at transaction commit, after committing to clog.
986993
*
987994
* Update listenChannels and clear transaction-local state.
995+
*
996+
* If we issued any notifications in the transaction, send signals to
997+
* listening backends (possibly including ourselves) to process them.
998+
* Also, if we filled enough queue pages with new notifies, try to
999+
* advance the queue tail pointer.
9881000
*/
9891001
void
9901002
AtCommit_Notify(void)
@@ -1027,6 +1039,29 @@ AtCommit_Notify(void)
10271039
if (amRegisteredListener && listenChannels == NIL)
10281040
asyncQueueUnregister();
10291041

1042+
/*
1043+
* Send signals to listening backends. We need do this only if there are
1044+
* pending notifies, which were previously added to the shared queue by
1045+
* PreCommit_Notify().
1046+
*/
1047+
if (pendingNotifies != NULL)
1048+
SignalBackends();
1049+
1050+
/*
1051+
* If it's time to try to advance the global tail pointer, do that.
1052+
*
1053+
* (It might seem odd to do this in the sender, when more than likely the
1054+
* listeners won't yet have read the messages we just sent. However,
1055+
* there's less contention if only the sender does it, and there is little
1056+
* need for urgency in advancing the global tail. So this typically will
1057+
* be clearing out messages that were sent some time ago.)
1058+
*/
1059+
if (tryAdvanceTail)
1060+
{
1061+
tryAdvanceTail = false;
1062+
asyncQueueAdvanceTail();
1063+
}
1064+
10301065
/* And clean up */
10311066
ClearPendingActionsAndNotifies();
10321067
}
@@ -1199,85 +1234,6 @@ Exec_UnlistenAllCommit(void)
11991234
listenChannels = NIL;
12001235
}
12011236

1202-
/*
1203-
* ProcessCompletedNotifies --- send out signals and self-notifies
1204-
*
1205-
* This is called from postgres.c just before going idle at the completion
1206-
* of a transaction. If we issued any notifications in the just-completed
1207-
* transaction, send signals to other backends to process them, and also
1208-
* process the queue ourselves to send messages to our own frontend.
1209-
* Also, if we filled enough queue pages with new notifies, try to advance
1210-
* the queue tail pointer.
1211-
*
1212-
* The reason that this is not done in AtCommit_Notify is that there is
1213-
* a nonzero chance of errors here (for example, encoding conversion errors
1214-
* while trying to format messages to our frontend). An error during
1215-
* AtCommit_Notify would be a PANIC condition. The timing is also arranged
1216-
* to ensure that a transaction's self-notifies are delivered to the frontend
1217-
* before it gets the terminating ReadyForQuery message.
1218-
*
1219-
* Note that we send signals and process the queue even if the transaction
1220-
* eventually aborted. This is because we need to clean out whatever got
1221-
* added to the queue.
1222-
*
1223-
* NOTE: we are outside of any transaction here.
1224-
*/
1225-
void
1226-
ProcessCompletedNotifies(void)
1227-
{
1228-
MemoryContext caller_context;
1229-
1230-
/* Nothing to do if we didn't send any notifications */
1231-
if (!backendHasSentNotifications)
1232-
return;
1233-
1234-
/*
1235-
* We reset the flag immediately; otherwise, if any sort of error occurs
1236-
* below, we'd be locked up in an infinite loop, because control will come
1237-
* right back here after error cleanup.
1238-
*/
1239-
backendHasSentNotifications = false;
1240-
1241-
/*
1242-
* We must preserve the caller's memory context (probably MessageContext)
1243-
* across the transaction we do here.
1244-
*/
1245-
caller_context = CurrentMemoryContext;
1246-
1247-
if (Trace_notify)
1248-
elog(DEBUG1, "ProcessCompletedNotifies");
1249-
1250-
/*
1251-
* We must run asyncQueueReadAllNotifications inside a transaction, else
1252-
* bad things happen if it gets an error.
1253-
*/
1254-
StartTransactionCommand();
1255-
1256-
/* Send signals to other backends */
1257-
SignalBackends();
1258-
1259-
if (listenChannels != NIL)
1260-
{
1261-
/* Read the queue ourselves, and send relevant stuff to the frontend */
1262-
asyncQueueReadAllNotifications();
1263-
}
1264-
1265-
/*
1266-
* If it's time to try to advance the global tail pointer, do that.
1267-
*/
1268-
if (backendTryAdvanceTail)
1269-
{
1270-
backendTryAdvanceTail = false;
1271-
asyncQueueAdvanceTail();
1272-
}
1273-
1274-
CommitTransactionCommand();
1275-
1276-
MemoryContextSwitchTo(caller_context);
1277-
1278-
/* We don't need pq_flush() here since postgres.c will do one shortly */
1279-
}
1280-
12811237
/*
12821238
* Test whether we are actively listening on the given channel name.
12831239
*
@@ -1543,7 +1499,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
15431499
* pointer (we don't want to actually do that right here).
15441500
*/
15451501
if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
1546-
backendTryAdvanceTail = true;
1502+
tryAdvanceTail = true;
15471503

15481504
/* And exit the loop */
15491505
break;
@@ -1658,8 +1614,6 @@ asyncQueueFillWarning(void)
16581614
/*
16591615
* Send signals to listening backends.
16601616
*
1661-
* We never signal our own process; that should be handled by our caller.
1662-
*
16631617
* Normally we signal only backends in our own database, since only those
16641618
* backends could be interested in notifies we send. However, if there's
16651619
* notify traffic in our database but no traffic in another database that
@@ -1668,6 +1622,9 @@ asyncQueueFillWarning(void)
16681622
* advance their queue position pointers, allowing the global tail to advance.
16691623
*
16701624
* Since we know the BackendId and the Pid the signaling is quite cheap.
1625+
*
1626+
* This is called during CommitTransaction(), so it's important for it
1627+
* to have very low probability of failure.
16711628
*/
16721629
static void
16731630
SignalBackends(void)
@@ -1682,8 +1639,7 @@ SignalBackends(void)
16821639
* list of target PIDs.
16831640
*
16841641
* XXX in principle these pallocs could fail, which would be bad. Maybe
1685-
* preallocate the arrays? But in practice this is only run in trivial
1686-
* transactions, so there should surely be space available.
1642+
* preallocate the arrays? They're not that large, though.
16871643
*/
16881644
pids = (int32 *) palloc(MaxBackends * sizeof(int32));
16891645
ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
@@ -1696,8 +1652,6 @@ SignalBackends(void)
16961652
QueuePosition pos;
16971653

16981654
Assert(pid != InvalidPid);
1699-
if (pid == MyProcPid)
1700-
continue; /* never signal self */
17011655
pos = QUEUE_BACKEND_POS(i);
17021656
if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
17031657
{
@@ -1730,6 +1684,16 @@ SignalBackends(void)
17301684
{
17311685
int32 pid = pids[i];
17321686

1687+
/*
1688+
* If we are signaling our own process, no need to involve the kernel;
1689+
* just set the flag directly.
1690+
*/
1691+
if (pid == MyProcPid)
1692+
{
1693+
notifyInterruptPending = true;
1694+
continue;
1695+
}
1696+
17331697
/*
17341698
* Note: assuming things aren't broken, a signal failure here could
17351699
* only occur if the target backend exited since we released
@@ -1910,15 +1874,20 @@ HandleNotifyInterrupt(void)
19101874
* via the process's latch, and this routine will get called.
19111875
* If we are truly idle (ie, *not* inside a transaction block),
19121876
* process the incoming notifies.
1877+
*
1878+
* If "flush" is true, force any frontend messages out immediately.
1879+
* This can be false when being called at the end of a frontend command,
1880+
* since we'll flush after sending ReadyForQuery.
19131881
*/
19141882
void
1915-
ProcessNotifyInterrupt(void)
1883+
ProcessNotifyInterrupt(bool flush)
19161884
{
19171885
if (IsTransactionOrTransactionBlock())
19181886
return; /* not really idle */
19191887

1888+
/* Loop in case another signal arrives while sending messages */
19201889
while (notifyInterruptPending)
1921-
ProcessIncomingNotify();
1890+
ProcessIncomingNotify(flush);
19221891
}
19231892

19241893

@@ -2180,6 +2149,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
21802149
/*
21812150
* Advance the shared queue tail variable to the minimum of all the
21822151
* per-backend tail pointers. Truncate pg_notify space if possible.
2152+
*
2153+
* This is (usually) called during CommitTransaction(), so it's important for
2154+
* it to have very low probability of failure.
21832155
*/
21842156
static void
21852157
asyncQueueAdvanceTail(void)
@@ -2253,17 +2225,16 @@ asyncQueueAdvanceTail(void)
22532225
/*
22542226
* ProcessIncomingNotify
22552227
*
2256-
* Deal with arriving NOTIFYs from other backends as soon as it's safe to
2257-
* do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
2258-
* signal handler, but isn't anymore.
2228+
* Scan the queue for arriving notifications and report them to the front
2229+
* end. The notifications might be from other sessions, or our own;
2230+
* there's no need to distinguish here.
22592231
*
2260-
* Scan the queue for arriving notifications and report them to my front
2261-
* end.
2232+
* If "flush" is true, force any frontend messages out immediately.
22622233
*
22632234
* NOTE: since we are outside any transaction, we must create our own.
22642235
*/
22652236
static void
2266-
ProcessIncomingNotify(void)
2237+
ProcessIncomingNotify(bool flush)
22672238
{
22682239
/* We *must* reset the flag */
22692240
notifyInterruptPending = false;
@@ -2288,9 +2259,11 @@ ProcessIncomingNotify(void)
22882259
CommitTransactionCommand();
22892260

22902261
/*
2291-
* Must flush the notify messages to ensure frontend gets them promptly.
2262+
* If this isn't an end-of-command case, we must flush the notify messages
2263+
* to ensure frontend gets them promptly.
22922264
*/
2293-
pq_flush();
2265+
if (flush)
2266+
pq_flush();
22942267

22952268
set_ps_display("idle");
22962269

@@ -2315,9 +2288,9 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
23152288
pq_endmessage(&buf);
23162289

23172290
/*
2318-
* NOTE: we do not do pq_flush() here. For a self-notify, it will
2319-
* happen at the end of the transaction, and for incoming notifies
2320-
* ProcessIncomingNotify will do it after finding all the notifies.
2291+
* NOTE: we do not do pq_flush() here. Some level of caller will
2292+
* handle it later, allowing this message to be combined into a packet
2293+
* with other ones.
23212294
*/
23222295
}
23232296
else

0 commit comments

Comments
 (0)