Skip to content

Commit 4648243

Browse files
author
Amit Kapila
committed
Add support for streaming to built-in logical replication.
To add support for streaming of in-progress transactions into the built-in logical replication, we need to do three things: * Extend the logical replication protocol, so identify in-progress transactions, and allow adding additional bits of information (e.g. XID of subtransactions). * Modify the output plugin (pgoutput) to implement the new stream API callbacks, by leveraging the extended replication protocol. * Modify the replication apply worker, to properly handle streamed in-progress transaction by spilling the data to disk and then replaying them on commit. We however must explicitly disable streaming replication during replication slot creation, even if the plugin supports it. We don't need to replicate the changes accumulated during this phase, and moreover we don't have a replication connection open so we don't have where to send the data anyway. Author: Tomas Vondra, Dilip Kumar and Amit Kapila Reviewed-by: Amit Kapila, Kuntal Ghosh and Ajin Cherian Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
1 parent 66f1630 commit 4648243

File tree

23 files changed

+1766
-74
lines changed

23 files changed

+1766
-74
lines changed

doc/src/sgml/monitoring.sgml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,6 +1509,22 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
15091509
<entry><literal>WALWrite</literal></entry>
15101510
<entry>Waiting for a write to a WAL file.</entry>
15111511
</row>
1512+
<row>
1513+
<entry><literal>LogicalChangesRead</literal></entry>
1514+
<entry>Waiting for a read from a logical changes file.</entry>
1515+
</row>
1516+
<row>
1517+
<entry><literal>LogicalChangesWrite</literal></entry>
1518+
<entry>Waiting for a write to a logical changes file.</entry>
1519+
</row>
1520+
<row>
1521+
<entry><literal>LogicalSubxactRead</literal></entry>
1522+
<entry>Waiting for a read from a logical subxact file.</entry>
1523+
</row>
1524+
<row>
1525+
<entry><literal>LogicalSubxactWrite</literal></entry>
1526+
<entry>Waiting for a write to a logical subxact file.</entry>
1527+
</row>
15121528
</tbody>
15131529
</tgroup>
15141530
</table>

doc/src/sgml/ref/alter_subscription.sgml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,9 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
165165
<xref linkend="sql-createsubscription"/>. See there for more
166166
information. The parameters that can be altered
167167
are <literal>slot_name</literal>,
168-
<literal>synchronous_commit</literal>, and
169-
<literal>binary</literal>.
168+
<literal>synchronous_commit</literal>,
169+
<literal>binary</literal>, and
170+
<literal>streaming</literal>.
170171
</para>
171172
</listitem>
172173
</varlistentry>

doc/src/sgml/ref/create_subscription.sgml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,17 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
228228
</para>
229229
</listitem>
230230
</varlistentry>
231+
<varlistentry>
232+
<term><literal>streaming</literal> (<type>boolean</type>)</term>
233+
<listitem>
234+
<para>
235+
Specifies whether streaming of in-progress transactions should
236+
be enabled for this subscription. By default, all transactions
237+
are fully decoded on the publisher, and only then sent to the
238+
subscriber as a whole.
239+
</para>
240+
</listitem>
241+
</varlistentry>
231242
</variablelist></para>
232243
</listitem>
233244
</varlistentry>

src/backend/catalog/pg_subscription.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ GetSubscription(Oid subid, bool missing_ok)
6666
sub->owner = subform->subowner;
6767
sub->enabled = subform->subenabled;
6868
sub->binary = subform->subbinary;
69+
sub->stream = subform->substream;
6970

7071
/* Get conninfo */
7172
datum = SysCacheGetAttr(SUBSCRIPTIONOID,

src/backend/catalog/system_views.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1128,7 +1128,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
11281128

11291129
-- All columns of pg_subscription except subconninfo are readable.
11301130
REVOKE ALL ON pg_subscription FROM public;
1131-
GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, subslotname, subpublications)
1131+
GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications)
11321132
ON pg_subscription TO public;
11331133

11341134

src/backend/commands/subscriptioncmds.c

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ parse_subscription_options(List *options,
6363
bool *copy_data,
6464
char **synchronous_commit,
6565
bool *refresh,
66-
bool *binary_given, bool *binary)
66+
bool *binary_given, bool *binary,
67+
bool *streaming_given, bool *streaming)
6768
{
6869
ListCell *lc;
6970
bool connect_given = false;
@@ -99,6 +100,11 @@ parse_subscription_options(List *options,
99100
*binary_given = false;
100101
*binary = false;
101102
}
103+
if (streaming)
104+
{
105+
*streaming_given = false;
106+
*streaming = false;
107+
}
102108

103109
/* Parse options */
104110
foreach(lc, options)
@@ -194,6 +200,16 @@ parse_subscription_options(List *options,
194200
*binary_given = true;
195201
*binary = defGetBoolean(defel);
196202
}
203+
else if (strcmp(defel->defname, "streaming") == 0 && streaming)
204+
{
205+
if (*streaming_given)
206+
ereport(ERROR,
207+
(errcode(ERRCODE_SYNTAX_ERROR),
208+
errmsg("conflicting or redundant options")));
209+
210+
*streaming_given = true;
211+
*streaming = defGetBoolean(defel);
212+
}
197213
else
198214
ereport(ERROR,
199215
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -337,6 +353,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
337353
bool enabled_given;
338354
bool enabled;
339355
bool copy_data;
356+
bool streaming;
357+
bool streaming_given;
340358
char *synchronous_commit;
341359
char *conninfo;
342360
char *slotname;
@@ -360,7 +378,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
360378
&copy_data,
361379
&synchronous_commit,
362380
NULL, /* no "refresh" */
363-
&binary_given, &binary);
381+
&binary_given, &binary,
382+
&streaming_given, &streaming);
364383

365384
/*
366385
* Since creating a replication slot is not transactional, rolling back
@@ -427,6 +446,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
427446
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
428447
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled);
429448
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary);
449+
values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming);
430450
values[Anum_pg_subscription_subconninfo - 1] =
431451
CStringGetTextDatum(conninfo);
432452
if (slotname)
@@ -698,6 +718,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
698718
char *synchronous_commit;
699719
bool binary_given;
700720
bool binary;
721+
bool streaming_given;
722+
bool streaming;
701723

702724
parse_subscription_options(stmt->options,
703725
NULL, /* no "connect" */
@@ -707,7 +729,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
707729
NULL, /* no "copy_data" */
708730
&synchronous_commit,
709731
NULL, /* no "refresh" */
710-
&binary_given, &binary);
732+
&binary_given, &binary,
733+
&streaming_given, &streaming);
711734

712735
if (slotname_given)
713736
{
@@ -739,6 +762,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
739762
replaces[Anum_pg_subscription_subbinary - 1] = true;
740763
}
741764

765+
if (streaming_given)
766+
{
767+
values[Anum_pg_subscription_substream - 1] =
768+
BoolGetDatum(streaming);
769+
replaces[Anum_pg_subscription_substream - 1] = true;
770+
}
771+
742772
update_tuple = true;
743773
break;
744774
}
@@ -756,7 +786,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
756786
NULL, /* no "copy_data" */
757787
NULL, /* no "synchronous_commit" */
758788
NULL, /* no "refresh" */
759-
NULL, NULL); /* no "binary" */
789+
NULL, NULL, /* no "binary" */
790+
NULL, NULL); /* no streaming */
760791
Assert(enabled_given);
761792

762793
if (!sub->slotname && enabled)
@@ -800,8 +831,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
800831
&copy_data,
801832
NULL, /* no "synchronous_commit" */
802833
&refresh,
803-
NULL, NULL); /* no "binary" */
804-
834+
NULL, NULL, /* no "binary" */
835+
NULL, NULL); /* no "streaming" */
805836
values[Anum_pg_subscription_subpublications - 1] =
806837
publicationListToArray(stmt->publication);
807838
replaces[Anum_pg_subscription_subpublications - 1] = true;
@@ -843,7 +874,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt)
843874
&copy_data,
844875
NULL, /* no "synchronous_commit" */
845876
NULL, /* no "refresh" */
846-
NULL, NULL); /* no "binary" */
877+
NULL, NULL, /* no "binary" */
878+
NULL, NULL); /* no "streaming" */
847879

848880
AlterSubscription_refresh(sub, copy_data);
849881

src/backend/postmaster/pgstat.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4141,6 +4141,18 @@ pgstat_get_wait_io(WaitEventIO w)
41414141
case WAIT_EVENT_WAL_WRITE:
41424142
event_name = "WALWrite";
41434143
break;
4144+
case WAIT_EVENT_LOGICAL_CHANGES_READ:
4145+
event_name = "LogicalChangesRead";
4146+
break;
4147+
case WAIT_EVENT_LOGICAL_CHANGES_WRITE:
4148+
event_name = "LogicalChangesWrite";
4149+
break;
4150+
case WAIT_EVENT_LOGICAL_SUBXACT_READ:
4151+
event_name = "LogicalSubxactRead";
4152+
break;
4153+
case WAIT_EVENT_LOGICAL_SUBXACT_WRITE:
4154+
event_name = "LogicalSubxactWrite";
4155+
break;
41444156

41454157
/* no default case, so that compiler will warn */
41464158
}

src/backend/replication/libpqwalreceiver/libpqwalreceiver.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn,
425425
appendStringInfo(&cmd, "proto_version '%u'",
426426
options->proto.logical.proto_version);
427427

428+
if (options->proto.logical.streaming &&
429+
PQserverVersion(conn->streamConn) >= 140000)
430+
appendStringInfo(&cmd, ", streaming 'on'");
431+
428432
pubnames = options->proto.logical.publication_names;
429433
pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
430434
if (!pubnames_str)

0 commit comments

Comments
 (0)