Commit 63f2877

Send NOTIFY signals during CommitTransaction.

Formerly, we sent signals for outgoing NOTIFY messages within ProcessCompletedNotifies, which was also responsible for sending relevant ones of those messages to our connected client. It therefore had to run during the main-loop processing that occurs just before going idle. This arrangement had two big disadvantages:

* Now that procedures allow intra-command COMMITs, it would be useful to send NOTIFYs to other sessions immediately at COMMIT (though, for reasons of wire-protocol stability, we still shouldn't forward them to our client until end of command).

* Background processes such as replication workers would not send NOTIFYs at all, since they never execute the client communication loop. We've had requests to allow triggers running in replication workers to send NOTIFYs, so that's a problem.

To fix these things, move transmission of outgoing NOTIFY signals into AtCommit_Notify, where it will happen during CommitTransaction. Also move the possible call of asyncQueueAdvanceTail there, to ensure we don't bloat the async SLRU if a background worker sends many NOTIFYs with no one listening.

We can also drop the call of asyncQueueReadAllNotifications, allowing ProcessCompletedNotifies to go away entirely. That's because commit 7900269 added a call of ProcessNotifyInterrupt adjacent to PostgresMain's call of ProcessCompletedNotifies, and that does its own call of asyncQueueReadAllNotifications, meaning that we were uselessly doing two such calls (inside two separate transactions) whenever inbound notify signals coincided with an outbound notify. We need only set notifyInterruptPending to ensure that ProcessNotifyInterrupt runs, and we're done.

The existing documentation suggests that custom background workers should call ProcessCompletedNotifies if they want to send NOTIFY messages. To avoid an ABI break in the back branches, reduce it to an empty routine rather than removing it entirely. Removal will occur in v15.

Although the problems mentioned above have existed for awhile, I don't feel comfortable back-patching this any further than v13. There was quite a bit of churn in adjacent code between 12 and 13. At minimum we'd have to also backpatch 51004c7, and a good deal of other adjustment would also be needed, so the benefit-to-risk ratio doesn't look attractive.

Per bug #15293 from Michael Powers (and similar gripes from others).

Artur Zakirov and Tom Lane

Discussion: https://postgr.es/m/153243441449.1404.2274116228506175596@wrigleys.postgresql.org

1 parent c49e6f9 commit 63f2877
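
The ordering described in the commit message can be illustrated with a toy, self-contained C model. This is not PostgreSQL code: `ToySession`, `toy_commit`, and `toy_end_of_command` are hypothetical names, and the model only tracks counters. It is meant to show one idea under those assumptions: signals to other sessions go out at each commit, while delivery to the session's own client waits until the end of the command.

```c
#include <stdbool.h>

/* Hypothetical toy state; none of these names exist in PostgreSQL. */
typedef struct ToySession
{
	int		signals_to_others;		/* commits that signaled listening sessions */
	int		queued_for_client;		/* self-notifies not yet forwarded to our client */
	int		delivered_to_client;	/* messages actually forwarded to our client */
	bool	listening;				/* is this session itself a listener? */
} ToySession;

/* Models COMMIT: listening sessions are signaled right away. */
static void
toy_commit(ToySession *s, int notifies_in_xact)
{
	if (notifies_in_xact <= 0)
		return;					/* nothing was queued, so nobody is signaled */
	s->signals_to_others++;
	if (s->listening)
		s->queued_for_client += notifies_in_xact;
}

/* Models end of command: only now are queued messages forwarded to the client. */
static void
toy_end_of_command(ToySession *s)
{
	s->delivered_to_client += s->queued_for_client;
	s->queued_for_client = 0;
}
```

In this model, a procedure that commits three times inside one client command would call `toy_commit` three times and `toy_end_of_command` once, so the client sees nothing until the command finishes even though other sessions were signaled at each commit.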

5 files changed

+122
-129
lines changed

doc/src/sgml/bgworker.sgml

Lines changed: 7 additions & 9 deletions
@@ -284,15 +284,13 @@ typedef struct BackgroundWorker
   </para>
 
   <para>
-   If a background worker sends asynchronous notifications with the
-   <command>NOTIFY</command> command via the Server Programming Interface
-   (<acronym>SPI</acronym>), it should call
-   <function>ProcessCompletedNotifies</function> explicitly after committing
-   the enclosing transaction so that any notifications can be delivered. If a
-   background worker registers to receive asynchronous notifications with
-   the <command>LISTEN</command> through <acronym>SPI</acronym>, the worker
-   will log those notifications, but there is no programmatic way for the
-   worker to intercept and respond to those notifications.
+   Background workers can send asynchronous notification messages, either by
+   using the <command>NOTIFY</command> command via <acronym>SPI</acronym>,
+   or directly via <function>Async_Notify()</function>. Such notifications
+   will be sent at transaction commit.
+   Background workers should not register to receive asynchronous
+   notifications with the <command>LISTEN</command> command, as there is no
+   infrastructure for a worker to consume such notifications.
   </para>
 
   <para>

src/backend/access/transam/xact.c

Lines changed: 10 additions & 0 deletions
@@ -2242,7 +2242,17 @@ CommitTransaction(void)
 	 */
 	smgrDoPendingDeletes(true);
 
+	/*
+	 * Send out notification signals to other backends (and do other
+	 * post-commit NOTIFY cleanup). This must not happen until after our
+	 * transaction is fully done from the viewpoint of other backends.
+	 */
 	AtCommit_Notify();
+
+	/*
+	 * Everything after this should be purely internal-to-this-backend
+	 * cleanup.
+	 */
 	AtEOXact_GUC(true, 1);
 	AtEOXact_SPI(true);
 	AtEOXact_Enum();

src/backend/commands/async.c

Lines changed: 97 additions & 110 deletions
@@ -68,17 +68,27 @@
  * CommitTransaction() which will then do the actual transaction commit.
  *
  * After commit we are called another time (AtCommit_Notify()). Here we
- * make the actual updates to the effective listen state (listenChannels).
- *
- * Finally, after we are out of the transaction altogether, we check if
- * we need to signal listening backends. In SignalBackends() we scan the
- * list of listening backends and send a PROCSIG_NOTIFY_INTERRUPT signal
- * to every listening backend (we don't know which backend is listening on
- * which channel so we must signal them all). We can exclude backends that
- * are already up to date, though, and we can also exclude backends that
- * are in other databases (unless they are way behind and should be kicked
- * to make them advance their pointers). We don't bother with a
- * self-signal either, but just process the queue directly.
+ * make any actual updates to the effective listen state (listenChannels).
+ * Then we signal any backends that may be interested in our messages
+ * (including our own backend, if listening). This is done by
+ * SignalBackends(), which scans the list of listening backends and sends a
+ * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't
+ * know which backend is listening on which channel so we must signal them
+ * all). We can exclude backends that are already up to date, though, and
+ * we can also exclude backends that are in other databases (unless they
+ * are way behind and should be kicked to make them advance their
+ * pointers).
+ *
+ * Finally, after we are out of the transaction altogether and about to go
+ * idle, we scan the queue for messages that need to be sent to our
+ * frontend (which might be notifies from other backends, or self-notifies
+ * from our own). This step is not part of the CommitTransaction sequence
+ * for two important reasons. First, we could get errors while sending
+ * data to our frontend, and it's really bad for errors to happen in
+ * post-commit cleanup. Second, in cases where a procedure issues commits
+ * within a single frontend command, we don't want to send notifies to our
+ * frontend until the command is done; but notifies to other backends
+ * should go out immediately after each commit.
  *
  * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler
  * sets the process's latch, which triggers the event to be processed
@@ -429,11 +439,8 @@ static bool unlistenExitRegistered = false;
 /* True if we're currently registered as a listener in asyncQueueControl */
 static bool amRegisteredListener = false;
 
-/* has this backend sent notifications in the current transaction? */
-static bool backendHasSentNotifications = false;
-
 /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */
-static bool backendTryAdvanceTail = false;
+static bool tryAdvanceTail = false;
 
 /* GUC parameter */
 bool Trace_notify = false;
@@ -462,7 +469,7 @@ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
 										 char *page_buffer,
 										 Snapshot snapshot);
 static void asyncQueueAdvanceTail(void);
-static void ProcessIncomingNotify(void);
+static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
 static void AddEventToPendingNotifies(Notification *n);
 static uint32 notification_hash(const void *key, Size keysize);
@@ -954,8 +961,6 @@ PreCommit_Notify(void)
 								   AccessExclusiveLock);
 
 		/* Now push the notifications into the queue */
-		backendHasSentNotifications = true;
-
 		nextNotify = list_head(pendingNotifies->events);
 		while (nextNotify != NULL)
 		{
@@ -980,6 +985,8 @@ PreCommit_Notify(void)
 			nextNotify = asyncQueueAddEntries(nextNotify);
 			LWLockRelease(NotifyQueueLock);
 		}
+
+		/* Note that we don't clear pendingNotifies; AtCommit_Notify will. */
 	}
 }
 

@@ -989,6 +996,11 @@ PreCommit_Notify(void)
  * This is called at transaction commit, after committing to clog.
  *
  * Update listenChannels and clear transaction-local state.
+ *
+ * If we issued any notifications in the transaction, send signals to
+ * listening backends (possibly including ourselves) to process them.
+ * Also, if we filled enough queue pages with new notifies, try to
+ * advance the queue tail pointer.
  */
 void
 AtCommit_Notify(void)
@@ -1031,6 +1043,29 @@ AtCommit_Notify(void)
 	if (amRegisteredListener && listenChannels == NIL)
 		asyncQueueUnregister();
 
+	/*
+	 * Send signals to listening backends. We need do this only if there are
+	 * pending notifies, which were previously added to the shared queue by
+	 * PreCommit_Notify().
+	 */
+	if (pendingNotifies != NULL)
+		SignalBackends();
+
+	/*
+	 * If it's time to try to advance the global tail pointer, do that.
+	 *
+	 * (It might seem odd to do this in the sender, when more than likely the
+	 * listeners won't yet have read the messages we just sent. However,
+	 * there's less contention if only the sender does it, and there is little
+	 * need for urgency in advancing the global tail. So this typically will
+	 * be clearing out messages that were sent some time ago.)
+	 */
+	if (tryAdvanceTail)
+	{
+		tryAdvanceTail = false;
+		asyncQueueAdvanceTail();
+	}
+
 	/* And clean up */
 	ClearPendingActionsAndNotifies();
 }
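
The deferred tail-advance flag used above can be sketched in isolation. The following is a toy model, not the PostgreSQL implementation: every `toy_` name is hypothetical, and the value of `TOY_CLEANUP_DELAY` is an assumption chosen for illustration. It shows the pattern only: the code that fills queue pages merely records that cleanup is due, and the commit path performs it at most once per request.

```c
#include <stdbool.h>

#define TOY_CLEANUP_DELAY 4		/* assumed value, for illustration only */

static bool toy_try_advance_tail = false;
static int	toy_tail_advances = 0;

/* Called when the queue head moves onto a new page (cf. asyncQueueAddEntries). */
static void
toy_head_reached_page(int page)
{
	if (page % TOY_CLEANUP_DELAY == 0)
		toy_try_advance_tail = true;	/* remember, but do no work yet */
}

/* Called at commit (cf. AtCommit_Notify): do the deferred work at most once. */
static void
toy_at_commit(void)
{
	if (toy_try_advance_tail)
	{
		toy_try_advance_tail = false;
		toy_tail_advances++;			/* stands in for asyncQueueAdvanceTail() */
	}
}
```

Clearing the flag before doing the work mirrors the diff, where `tryAdvanceTail` is reset ahead of the call to `asyncQueueAdvanceTail()`.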
@@ -1204,82 +1239,17 @@ Exec_UnlistenAllCommit(void)
 }
 
 /*
- * ProcessCompletedNotifies --- send out signals and self-notifies
- *
- * This is called from postgres.c just before going idle at the completion
- * of a transaction. If we issued any notifications in the just-completed
- * transaction, send signals to other backends to process them, and also
- * process the queue ourselves to send messages to our own frontend.
- * Also, if we filled enough queue pages with new notifies, try to advance
- * the queue tail pointer.
- *
- * The reason that this is not done in AtCommit_Notify is that there is
- * a nonzero chance of errors here (for example, encoding conversion errors
- * while trying to format messages to our frontend). An error during
- * AtCommit_Notify would be a PANIC condition. The timing is also arranged
- * to ensure that a transaction's self-notifies are delivered to the frontend
- * before it gets the terminating ReadyForQuery message.
- *
- * Note that we send signals and process the queue even if the transaction
- * eventually aborted. This is because we need to clean out whatever got
- * added to the queue.
- *
- * NOTE: we are outside of any transaction here.
+ * ProcessCompletedNotifies --- nowadays this does nothing
+ *
+ * This routine used to send signals and handle self-notifies,
+ * but that functionality has been moved elsewhere.
+ * We'd delete it entirely, except that the documentation used to instruct
+ * background-worker authors to call it. To avoid an ABI break in stable
+ * branches, keep it as a no-op routine.
  */
 void
 ProcessCompletedNotifies(void)
 {
-	MemoryContext caller_context;
-
-	/* Nothing to do if we didn't send any notifications */
-	if (!backendHasSentNotifications)
-		return;
-
-	/*
-	 * We reset the flag immediately; otherwise, if any sort of error occurs
-	 * below, we'd be locked up in an infinite loop, because control will come
-	 * right back here after error cleanup.
-	 */
-	backendHasSentNotifications = false;
-
-	/*
-	 * We must preserve the caller's memory context (probably MessageContext)
-	 * across the transaction we do here.
-	 */
-	caller_context = CurrentMemoryContext;
-
-	if (Trace_notify)
-		elog(DEBUG1, "ProcessCompletedNotifies");
-
-	/*
-	 * We must run asyncQueueReadAllNotifications inside a transaction, else
-	 * bad things happen if it gets an error.
-	 */
-	StartTransactionCommand();
-
-	/* Send signals to other backends */
-	SignalBackends();
-
-	if (listenChannels != NIL)
-	{
-		/* Read the queue ourselves, and send relevant stuff to the frontend */
-		asyncQueueReadAllNotifications();
-	}
-
-	/*
-	 * If it's time to try to advance the global tail pointer, do that.
-	 */
-	if (backendTryAdvanceTail)
-	{
-		backendTryAdvanceTail = false;
-		asyncQueueAdvanceTail();
-	}
-
-	CommitTransactionCommand();
-
-	MemoryContextSwitchTo(caller_context);
-
-	/* We don't need pq_flush() here since postgres.c will do one shortly */
 }
 
 /*
/*
@@ -1547,7 +1517,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
 			 * pointer (we don't want to actually do that right here).
 			 */
 			if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0)
-				backendTryAdvanceTail = true;
+				tryAdvanceTail = true;
 
 			/* And exit the loop */
 			break;
@@ -1662,8 +1632,6 @@ asyncQueueFillWarning(void)
 /*
  * Send signals to listening backends.
  *
- * We never signal our own process; that should be handled by our caller.
- *
  * Normally we signal only backends in our own database, since only those
  * backends could be interested in notifies we send. However, if there's
  * notify traffic in our database but no traffic in another database that
@@ -1672,6 +1640,9 @@ asyncQueueFillWarning(void)
  * advance their queue position pointers, allowing the global tail to advance.
  *
  * Since we know the BackendId and the Pid the signaling is quite cheap.
+ *
+ * This is called during CommitTransaction(), so it's important for it
+ * to have very low probability of failure.
  */
 static void
 SignalBackends(void)
@@ -1686,8 +1657,7 @@ SignalBackends(void)
 	 * list of target PIDs.
 	 *
 	 * XXX in principle these pallocs could fail, which would be bad. Maybe
-	 * preallocate the arrays? But in practice this is only run in trivial
-	 * transactions, so there should surely be space available.
+	 * preallocate the arrays? They're not that large, though.
 	 */
 	pids = (int32 *) palloc(MaxBackends * sizeof(int32));
 	ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
@@ -1700,8 +1670,6 @@ SignalBackends(void)
 		QueuePosition pos;
 
 		Assert(pid != InvalidPid);
-		if (pid == MyProcPid)
-			continue;			/* never signal self */
 		pos = QUEUE_BACKEND_POS(i);
 		if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId)
 		{
@@ -1734,6 +1702,16 @@ SignalBackends(void)
 	{
 		int32 pid = pids[i];
 
+		/*
+		 * If we are signaling our own process, no need to involve the kernel;
+		 * just set the flag directly.
+		 */
+		if (pid == MyProcPid)
+		{
+			notifyInterruptPending = true;
+			continue;
+		}
+
 		/*
 		 * Note: assuming things aren't broken, a signal failure here could
 		 * only occur if the target backend exited since we released
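
The self-signal shortcut in this hunk can be modeled on its own. The sketch below is a simplified stand-in rather than the real `SignalBackends()`: `toy_signal_targets` and `toy_notify_interrupt_pending` are hypothetical names, and real signal delivery is replaced by a counter so the example stays self-contained.

```c
#include <stdbool.h>
#include <stddef.h>

static bool toy_notify_interrupt_pending = false;

/*
 * Walk the collected target PIDs. Our own PID is handled by setting a flag;
 * every other PID would receive a real signal, which we only count here.
 */
static int
toy_signal_targets(const int *pids, size_t count, int my_pid)
{
	int		kernel_signals = 0;

	for (size_t i = 0; i < count; i++)
	{
		if (pids[i] == my_pid)
		{
			/* no need to involve the kernel for ourselves */
			toy_notify_interrupt_pending = true;
			continue;
		}
		kernel_signals++;
	}
	return kernel_signals;
}
```

The design point is that the sending backend no longer reads the queue itself at commit. It only marks work as pending, and the normal interrupt-processing path picks it up later.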
@@ -1914,15 +1892,20 @@ HandleNotifyInterrupt(void)
  * via the process's latch, and this routine will get called.
  * If we are truly idle (ie, *not* inside a transaction block),
  * process the incoming notifies.
+ *
+ * If "flush" is true, force any frontend messages out immediately.
+ * This can be false when being called at the end of a frontend command,
+ * since we'll flush after sending ReadyForQuery.
  */
 void
-ProcessNotifyInterrupt(void)
+ProcessNotifyInterrupt(bool flush)
 {
 	if (IsTransactionOrTransactionBlock())
 		return;					/* not really idle */
 
+	/* Loop in case another signal arrives while sending messages */
 	while (notifyInterruptPending)
-		ProcessIncomingNotify();
+		ProcessIncomingNotify(flush);
 }
 
 
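
The reason for looping on the pending flag, and the effect of the new `flush` argument, can be seen in a small simulation. This is a sketch under stated assumptions: all `toy_` names are hypothetical, a counter stands in for `pq_flush()`, and "a signal arriving during processing" is simulated by a countdown rather than a real signal handler.

```c
#include <stdbool.h>

static bool toy_pending = false;
static int	toy_late_arrivals = 0;	/* signals that will arrive mid-processing */
static int	toy_scans = 0;
static int	toy_flushes = 0;

/* One pass: reset the flag first, then scan; a new signal may re-set the flag. */
static void
toy_process_incoming(bool flush)
{
	toy_pending = false;
	toy_scans++;
	if (toy_late_arrivals > 0)
	{
		toy_late_arrivals--;
		toy_pending = true;		/* simulated signal handler ran during the scan */
	}
	if (flush)
		toy_flushes++;			/* stands in for pq_flush() */
}

/* Keep looping until a pass completes with no new signal pending. */
static void
toy_process_interrupt(bool flush)
{
	while (toy_pending)
		toy_process_incoming(flush);
}
```

Because the flag is cleared before the scan, a signal that arrives mid-scan is never lost: it sets the flag again and triggers another pass. Passing `false` for `flush` skips the per-pass flush, which matches the end-of-command case where the caller flushes after ReadyForQuery.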
@@ -2185,6 +2168,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
 /*
  * Advance the shared queue tail variable to the minimum of all the
  * per-backend tail pointers. Truncate pg_notify space if possible.
+ *
+ * This is (usually) called during CommitTransaction(), so it's important for
+ * it to have very low probability of failure.
  */
 static void
 asyncQueueAdvanceTail(void)
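
The header comment describes the new tail as the minimum of the per-backend positions. A minimal sketch of that computation follows. Positions are flattened to plain integers here, whereas the real queue positions are page/offset pairs, and starting the minimum at the queue head when nobody is listening is an assumption of this sketch. `toy_min_position` is a hypothetical name.

```c
#include <stddef.h>

/*
 * Return the smallest listener position. `head` is used as the starting
 * value, so with no listeners the tail can catch up to the head.
 */
static long
toy_min_position(const long *positions, size_t count, long head)
{
	long	min = head;

	for (size_t i = 0; i < count; i++)
	{
		if (positions[i] < min)
			min = positions[i];
	}
	return min;
}
```

A single slow listener holds the tail back for everyone, which is why the surrounding comments mention kicking backends that are far behind.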
@@ -2258,17 +2244,16 @@ asyncQueueAdvanceTail(void)
 /*
  * ProcessIncomingNotify
  *
- * Deal with arriving NOTIFYs from other backends as soon as it's safe to
- * do so. This used to be called from the PROCSIG_NOTIFY_INTERRUPT
- * signal handler, but isn't anymore.
+ * Scan the queue for arriving notifications and report them to the front
+ * end. The notifications might be from other sessions, or our own;
+ * there's no need to distinguish here.
  *
- * Scan the queue for arriving notifications and report them to my front
- * end.
+ * If "flush" is true, force any frontend messages out immediately.
  *
  * NOTE: since we are outside any transaction, we must create our own.
  */
 static void
-ProcessIncomingNotify(void)
+ProcessIncomingNotify(bool flush)
 {
 	/* We *must* reset the flag */
 	notifyInterruptPending = false;
@@ -2293,9 +2278,11 @@ ProcessIncomingNotify(void)
 	CommitTransactionCommand();
 
 	/*
-	 * Must flush the notify messages to ensure frontend gets them promptly.
+	 * If this isn't an end-of-command case, we must flush the notify messages
+	 * to ensure frontend gets them promptly.
 	 */
-	pq_flush();
+	if (flush)
+		pq_flush();
 
 	set_ps_display("idle");
 
@@ -2321,9 +2308,9 @@ NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid)
 		pq_endmessage(&buf);
 
 		/*
-		 * NOTE: we do not do pq_flush() here. For a self-notify, it will
-		 * happen at the end of the transaction, and for incoming notifies
-		 * ProcessIncomingNotify will do it after finding all the notifies.
+		 * NOTE: we do not do pq_flush() here. Some level of caller will
+		 * handle it later, allowing this message to be combined into a packet
+		 * with other ones.
 		 */
 	}
 	else
