Skip to content

Commit 173b56f

Browse files
committed
Add flush option to pg_logical_emit_message()
Since its introduction, LogLogicalMessage() (via the SQL interface pg_logical_emit_message()) has never included a call to XLogFlush(), causing it to potentially lose messages on a crash when used in non-transactional mode. This has come up to me as a problem while playing with ideas to design a test suite for what has become 039_end_of_wal.pl introduced in bae868c by Thomas Munro, because there are no direct ways to force a WAL flush via SQL. The default is false, to not flush messages and influence existing use-cases where this function could be used. If set to true, the message emitted is flushed before returning back to the caller, making the message durable on crash. This new option has no effect when using pg_logical_emit_message() in transactional mode, as the record's flush is guaranteed by the WAL record generated by the transaction committed. Two queries of test_decoding are tweaked to cover the new code path for the flush. Bump catalog version. Author: Michael Paquier Reviewed-by: Andres Freund, Amit Kapila, Fujii Masao, Tung Nguyen, Tomas Vondra Discussion: https://postgr.es/m/ZNsdThSe2qgsfs7R@paquier.xyz
1 parent 19fa977 commit 173b56f

File tree

9 files changed

+51
-13
lines changed

9 files changed

+51
-13
lines changed

contrib/test_decoding/expected/messages.out

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
66
init
77
(1 row)
88

9-
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
9+
-- These two cover the path for the flush variant.
10+
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
1011
?column?
1112
----------
1213
msg1
1314
(1 row)
1415

15-
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
16+
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
1617
?column?
1718
----------
1819
msg2

contrib/test_decoding/sql/messages.sql

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ SET synchronous_commit = on;
33

44
SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
55

6-
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1');
7-
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2');
6+
-- These two cover the path for the flush variant.
7+
SELECT 'msg1' FROM pg_logical_emit_message(true, 'test', 'msg1', true);
8+
SELECT 'msg2' FROM pg_logical_emit_message(false, 'test', 'msg2', true);
89

910
BEGIN;
1011
SELECT 'msg3' FROM pg_logical_emit_message(true, 'test', 'msg3');

doc/src/sgml/func.sgml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27740,11 +27740,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2774027740
<indexterm>
2774127741
<primary>pg_logical_emit_message</primary>
2774227742
</indexterm>
27743-
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> )
27743+
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
2774427744
<returnvalue>pg_lsn</returnvalue>
2774527745
</para>
2774627746
<para role="func_signature">
27747-
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> )
27747+
<function>pg_logical_emit_message</function> ( <parameter>transactional</parameter> <type>boolean</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type> [, <parameter>flush</parameter> <type>boolean</type> <literal>DEFAULT</literal> <literal>false</literal>] )
2774827748
<returnvalue>pg_lsn</returnvalue>
2774927749
</para>
2775027750
<para>
@@ -27758,6 +27758,11 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
2775827758
recognize messages that are interesting for them.
2775927759
The <parameter>content</parameter> parameter is the content of the
2776027760
message, given either in text or binary form.
27761+
The <parameter>flush</parameter> parameter (default set to
27762+
<literal>false</literal>) controls if the message is immediately
27763+
flushed to WAL or not. <parameter>flush</parameter> has no effect
27764+
with <parameter>transactional</parameter>, as the message's WAL
27765+
record is flushed along with its transaction.
2776127766
</para></entry>
2776227767
</row>
2776327768
</tbody>

src/backend/catalog/system_functions.sql

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,26 @@ LANGUAGE INTERNAL
446446
VOLATILE ROWS 1000 COST 1000
447447
AS 'pg_logical_slot_peek_binary_changes';
448448

449+
CREATE OR REPLACE FUNCTION pg_logical_emit_message(
450+
transactional boolean,
451+
prefix text,
452+
message text,
453+
flush boolean DEFAULT false)
454+
RETURNS pg_lsn
455+
LANGUAGE INTERNAL
456+
STRICT VOLATILE
457+
AS 'pg_logical_emit_message_text';
458+
459+
CREATE OR REPLACE FUNCTION pg_logical_emit_message(
460+
transactional boolean,
461+
prefix text,
462+
message bytea,
463+
flush boolean DEFAULT false)
464+
RETURNS pg_lsn
465+
LANGUAGE INTERNAL
466+
STRICT VOLATILE
467+
AS 'pg_logical_emit_message_bytea';
468+
449469
CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot(
450470
IN slot_name name, IN immediately_reserve boolean DEFAULT false,
451471
IN temporary boolean DEFAULT false,

src/backend/replication/logical/logicalfuncs.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,10 +362,11 @@ pg_logical_emit_message_bytea(PG_FUNCTION_ARGS)
362362
bool transactional = PG_GETARG_BOOL(0);
363363
char *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
364364
bytea *data = PG_GETARG_BYTEA_PP(2);
365+
bool flush = PG_GETARG_BOOL(3);
365366
XLogRecPtr lsn;
366367

367368
lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
368-
transactional);
369+
transactional, flush);
369370
PG_RETURN_LSN(lsn);
370371
}
371372

src/backend/replication/logical/message.c

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,10 @@
4444
*/
4545
XLogRecPtr
4646
LogLogicalMessage(const char *prefix, const char *message, size_t size,
47-
bool transactional)
47+
bool transactional, bool flush)
4848
{
4949
xl_logical_message xlrec;
50+
XLogRecPtr lsn;
5051

5152
/*
5253
* Force xid to be allocated if we're emitting a transactional message.
@@ -71,7 +72,15 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
7172
/* allow origin filtering */
7273
XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
7374

74-
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
75+
lsn = XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
76+
77+
/*
78+
* Make sure that the message hits disk before leaving if emitting a
79+
* non-transactional message when flush is requested.
80+
*/
81+
if (!transactional && flush)
82+
XLogFlush(lsn);
83+
return lsn;
7584
}
7685

7786
/*

src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,6 @@
5757
*/
5858

5959
/* yyyymmddN */
60-
#define CATALOG_VERSION_NO 202310161
60+
#define CATALOG_VERSION_NO 202310181
6161

6262
#endif

src/include/catalog/pg_proc.dat

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11167,11 +11167,11 @@
1116711167
prosrc => 'pg_replication_slot_advance' },
1116811168
{ oid => '3577', descr => 'emit a textual logical decoding message',
1116911169
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
11170-
prorettype => 'pg_lsn', proargtypes => 'bool text text',
11170+
prorettype => 'pg_lsn', proargtypes => 'bool text text bool',
1117111171
prosrc => 'pg_logical_emit_message_text' },
1117211172
{ oid => '3578', descr => 'emit a binary logical decoding message',
1117311173
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
11174-
prorettype => 'pg_lsn', proargtypes => 'bool text bytea',
11174+
prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
1117511175
prosrc => 'pg_logical_emit_message_bytea' },
1117611176

1117711177
# event triggers

src/include/replication/message.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ typedef struct xl_logical_message
3030
#define SizeOfLogicalMessage (offsetof(xl_logical_message, message))
3131

3232
extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
33-
size_t size, bool transactional);
33+
size_t size, bool transactional,
34+
bool flush);
3435

3536
/* RMGR API */
3637
#define XLOG_LOGICAL_MESSAGE 0x00

0 commit comments

Comments
 (0)