Skip to content

Commit ac4645c

Browse files
author
Amit Kapila
committed
Allow pgoutput to send logical decoding messages.
The output plugin accepts a new parameter (messages) that controls if logical decoding messages are written into the replication stream. It is useful for those clients that use pgoutput as an output plugin and needs to process messages that were written by pg_logical_emit_message(). Although logical streaming replication protocol supports logical decoding messages now, logical replication does not use this feature yet. Author: David Pirotte, Euler Taveira Reviewed-by: Euler Taveira, Andres Freund, Ashutosh Bapat, Amit Kapila Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com
1 parent 531737d commit ac4645c

File tree

7 files changed

+312
-0
lines changed

7 files changed

+312
-0
lines changed

doc/src/sgml/protocol.sgml

+76
Original file line numberDiff line numberDiff line change
@@ -6433,6 +6433,82 @@ Begin
64336433
</listitem>
64346434
</varlistentry>
64356435

6436+
<varlistentry>
6437+
<term>
6438+
Message
6439+
</term>
6440+
<listitem>
6441+
<para>
6442+
6443+
<variablelist>
6444+
<varlistentry>
6445+
<term>
6446+
Byte1('M')
6447+
</term>
6448+
<listitem>
6449+
<para>
6450+
Identifies the message as a logical decoding message.
6451+
</para>
6452+
</listitem>
6453+
</varlistentry>
6454+
<varlistentry>
6455+
<term>
6456+
Int32
6457+
</term>
6458+
<listitem>
6459+
<para>
6460+
Xid of the transaction. The XID is sent only when user has
6461+
requested streaming of in-progress transactions.
6462+
</para>
6463+
</listitem>
6464+
</varlistentry>
6465+
<varlistentry>
6466+
<term>
6467+
Int8
6468+
</term>
6469+
<listitem>
6470+
<para>
6471+
Flags; Either 0 for no flags or 1 if the logical decoding
6472+
message is transactional.
6473+
</para>
6474+
</listitem>
6475+
</varlistentry>
6476+
<varlistentry>
6477+
<term>
6478+
Int64
6479+
</term>
6480+
<listitem>
6481+
<para>
6482+
The LSN of the logical decoding message.
6483+
</para>
6484+
</listitem>
6485+
</varlistentry>
6486+
<varlistentry>
6487+
<term>
6488+
String
6489+
</term>
6490+
<listitem>
6491+
<para>
6492+
The prefix of the logical decoding message.
6493+
</para>
6494+
</listitem>
6495+
</varlistentry>
6496+
<varlistentry>
6497+
<term>
6498+
Byte<replaceable>n</replaceable>
6499+
</term>
6500+
<listitem>
6501+
<para>
6502+
The content of the logical decoding message.
6503+
</para>
6504+
</listitem>
6505+
</varlistentry>
6506+
6507+
</variablelist>
6508+
</para>
6509+
</listitem>
6510+
</varlistentry>
6511+
64366512
<varlistentry>
64376513
<term>
64386514
Commit

src/backend/replication/logical/proto.c

+28
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
*/
2626
#define LOGICALREP_IS_REPLICA_IDENTITY 1
2727

28+
#define MESSAGE_TRANSACTIONAL (1<<0)
2829
#define TRUNCATE_CASCADE (1<<0)
2930
#define TRUNCATE_RESTART_SEQS (1<<1)
3031

@@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in,
361362
return relids;
362363
}
363364

365+
/*
366+
* Write MESSAGE to stream
367+
*/
368+
void
369+
logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
370+
bool transactional, const char *prefix, Size sz,
371+
const char *message)
372+
{
373+
uint8 flags = 0;
374+
375+
pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
376+
377+
/* encode and send message flags */
378+
if (transactional)
379+
flags |= MESSAGE_TRANSACTIONAL;
380+
381+
/* transaction ID (if not valid, we're not streaming) */
382+
if (TransactionIdIsValid(xid))
383+
pq_sendint32(out, xid);
384+
385+
pq_sendint8(out, flags);
386+
pq_sendint64(out, lsn);
387+
pq_sendstring(out, prefix);
388+
pq_sendint32(out, sz);
389+
pq_sendbytes(out, message, sz);
390+
}
391+
364392
/*
365393
* Write relation description to the output stream.
366394
*/

src/backend/replication/logical/worker.c

+9
Original file line numberDiff line numberDiff line change
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
19391939
apply_handle_origin(s);
19401940
return;
19411941

1942+
case LOGICAL_REP_MSG_MESSAGE:
1943+
1944+
/*
1945+
* Logical replication does not use generic logical messages yet.
1946+
* Although, it could be used by other applications that use this
1947+
* output plugin.
1948+
*/
1949+
return;
1950+
19421951
case LOGICAL_REP_MSG_STREAM_START:
19431952
apply_handle_stream_start(s);
19441953
return;

src/backend/replication/pgoutput/pgoutput.c

+47
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
4545
static void pgoutput_truncate(LogicalDecodingContext *ctx,
4646
ReorderBufferTXN *txn, int nrelations, Relation relations[],
4747
ReorderBufferChange *change);
48+
static void pgoutput_message(LogicalDecodingContext *ctx,
49+
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
50+
bool transactional, const char *prefix,
51+
Size sz, const char *message);
4852
static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
4953
RepOriginId origin_id);
5054
static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
142146
cb->begin_cb = pgoutput_begin_txn;
143147
cb->change_cb = pgoutput_change;
144148
cb->truncate_cb = pgoutput_truncate;
149+
cb->message_cb = pgoutput_message;
145150
cb->commit_cb = pgoutput_commit_txn;
146151
cb->filter_by_origin_cb = pgoutput_origin_filter;
147152
cb->shutdown_cb = pgoutput_shutdown;
@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
152157
cb->stream_abort_cb = pgoutput_stream_abort;
153158
cb->stream_commit_cb = pgoutput_stream_commit;
154159
cb->stream_change_cb = pgoutput_change;
160+
cb->stream_message_cb = pgoutput_message;
155161
cb->stream_truncate_cb = pgoutput_truncate;
156162
}
157163

@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
162168
bool protocol_version_given = false;
163169
bool publication_names_given = false;
164170
bool binary_option_given = false;
171+
bool messages_option_given = false;
165172
bool streaming_given = false;
166173

167174
data->binary = false;
168175
data->streaming = false;
176+
data->messages = false;
169177

170178
foreach(lc, options)
171179
{
@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
221229

222230
data->binary = defGetBoolean(defel);
223231
}
232+
else if (strcmp(defel->defname, "messages") == 0)
233+
{
234+
if (messages_option_given)
235+
ereport(ERROR,
236+
(errcode(ERRCODE_SYNTAX_ERROR),
237+
errmsg("conflicting or redundant options")));
238+
messages_option_given = true;
239+
240+
data->messages = defGetBoolean(defel);
241+
}
224242
else if (strcmp(defel->defname, "streaming") == 0)
225243
{
226244
if (streaming_given)
@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
689707
MemoryContextReset(data->context);
690708
}
691709

710+
static void
711+
pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
712+
XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
713+
const char *message)
714+
{
715+
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
716+
TransactionId xid = InvalidTransactionId;
717+
718+
if (!data->messages)
719+
return;
720+
721+
/*
722+
* Remember the xid for the message in streaming mode. See
723+
* pgoutput_change.
724+
*/
725+
if (in_streaming)
726+
xid = txn->xid;
727+
728+
OutputPluginPrepareWrite(ctx, true);
729+
logicalrep_write_message(ctx->out,
730+
xid,
731+
message_lsn,
732+
transactional,
733+
prefix,
734+
sz,
735+
message);
736+
OutputPluginWrite(ctx, true);
737+
}
738+
692739
/*
693740
* Currently we always forward.
694741
*/

src/include/replication/logicalproto.h

+3
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
5454
LOGICAL_REP_MSG_TRUNCATE = 'T',
5555
LOGICAL_REP_MSG_RELATION = 'R',
5656
LOGICAL_REP_MSG_TYPE = 'Y',
57+
LOGICAL_REP_MSG_MESSAGE = 'M',
5758
LOGICAL_REP_MSG_STREAM_START = 'S',
5859
LOGICAL_REP_MSG_STREAM_END = 'E',
5960
LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
151152
bool cascade, bool restart_seqs);
152153
extern List *logicalrep_read_truncate(StringInfo in,
153154
bool *cascade, bool *restart_seqs);
155+
extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
156+
bool transactional, const char *prefix, Size sz, const char *message);
154157
extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
155158
Relation rel);
156159
extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);

src/include/replication/pgoutput.h

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ typedef struct PGOutputData
2626
List *publications;
2727
bool binary;
2828
bool streaming;
29+
bool messages;
2930
} PGOutputData;
3031

3132
#endif /* PGOUTPUT_H */

0 commit comments

Comments
 (0)