Skip to content

Commit 91f9861

Browse files
author
Amit Kapila
committed
Refactor to make common functions in proto.c and worker.c.
This is a non-functional change only to refactor code to extract some replication logic into static functions. This is done as preparation for the 2PC streaming patch which also shares this common logic. Author: Peter Smith Reviewed-By: Amit Kapila Discussion: https://postgr.es/m/CAHut+PuiSA8AiLcE2N5StzSKs46SQEP_vDOUD5fX2XCVtfZ7mQ@mail.gmail.com
1 parent 454ae15 commit 91f9861

File tree

2 files changed

+95
-48
lines changed

2 files changed

+95
-48
lines changed

src/backend/replication/logical/proto.c

+32-10
Original file line numberDiff line numberDiff line change
@@ -145,22 +145,23 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
145145
}
146146

147147
/*
148-
* Write PREPARE to the output stream.
148+
* The core functionality for logicalrep_write_prepare.
149149
*/
150-
void
151-
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
152-
XLogRecPtr prepare_lsn)
150+
static void
151+
logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
152+
ReorderBufferTXN *txn, XLogRecPtr prepare_lsn)
153153
{
154154
uint8 flags = 0;
155155

156-
pq_sendbyte(out, LOGICAL_REP_MSG_PREPARE);
156+
pq_sendbyte(out, type);
157157

158158
/*
159159
* This should only ever happen for two-phase commit transactions, in
160160
* which case we expect to have a valid GID.
161161
*/
162162
Assert(txn->gid != NULL);
163163
Assert(rbtxn_prepared(txn));
164+
Assert(TransactionIdIsValid(txn->xid));
164165

165166
/* send the flags field */
166167
pq_sendbyte(out, flags);
@@ -176,31 +177,52 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
176177
}
177178

178179
/*
179-
* Read transaction PREPARE from the stream.
180+
* Write PREPARE to the output stream.
180181
*/
181182
void
182-
logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
183+
logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
184+
XLogRecPtr prepare_lsn)
185+
{
186+
logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_PREPARE,
187+
txn, prepare_lsn);
188+
}
189+
190+
/*
191+
* The core functionality for logicalrep_read_prepare.
192+
*/
193+
static void
194+
logicalrep_read_prepare_common(StringInfo in, char *msgtype,
195+
LogicalRepPreparedTxnData *prepare_data)
183196
{
184197
/* read flags */
185198
uint8 flags = pq_getmsgbyte(in);
186199

187200
if (flags != 0)
188-
elog(ERROR, "unrecognized flags %u in prepare message", flags);
201+
elog(ERROR, "unrecognized flags %u in %s message", flags, msgtype);
189202

190203
/* read fields */
191204
prepare_data->prepare_lsn = pq_getmsgint64(in);
192205
if (prepare_data->prepare_lsn == InvalidXLogRecPtr)
193-
elog(ERROR, "prepare_lsn is not set in prepare message");
206+
elog(ERROR, "prepare_lsn is not set in %s message", msgtype);
194207
prepare_data->end_lsn = pq_getmsgint64(in);
195208
if (prepare_data->end_lsn == InvalidXLogRecPtr)
196-
elog(ERROR, "end_lsn is not set in prepare message");
209+
elog(ERROR, "end_lsn is not set in %s message", msgtype);
197210
prepare_data->prepare_time = pq_getmsgint64(in);
198211
prepare_data->xid = pq_getmsgint(in, 4);
199212

200213
/* read gid (copy it into a pre-allocated buffer) */
201214
strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
202215
}
203216

217+
/*
218+
* Read transaction PREPARE from the stream.
219+
*/
220+
void
221+
logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
222+
{
223+
logicalrep_read_prepare_common(in, "prepare", prepare_data);
224+
}
225+
204226
/*
205227
* Write COMMIT PREPARED to the output stream.
206228
*/

src/backend/replication/logical/worker.c

+63-38
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,8 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata,
333333
/* Compute GID for two_phase transactions */
334334
static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid);
335335

336+
/* Common streaming function to apply all the spooled messages */
337+
static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
336338

337339
/*
338340
* Should this worker apply changes for given relation.
@@ -884,14 +886,47 @@ apply_handle_begin_prepare(StringInfo s)
884886
pgstat_report_activity(STATE_RUNNING, NULL);
885887
}
886888

889+
/*
890+
* Common function to prepare the GID.
891+
*/
892+
static void
893+
apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data)
894+
{
895+
char gid[GIDSIZE];
896+
897+
/*
898+
* Compute unique GID for two_phase transactions. We don't use GID of
899+
* prepared transaction sent by server as that can lead to deadlock when
900+
* we have multiple subscriptions from same node point to publications on
901+
* the same node. See comments atop worker.c
902+
*/
903+
TwoPhaseTransactionGid(MySubscription->oid, prepare_data->xid,
904+
gid, sizeof(gid));
905+
906+
/*
907+
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
908+
* called within the PrepareTransactionBlock below.
909+
*/
910+
BeginTransactionBlock();
911+
CommitTransactionCommand(); /* Completes the preceding Begin command. */
912+
913+
/*
914+
* Update origin state so we can restart streaming from correct position
915+
* in case of crash.
916+
*/
917+
replorigin_session_origin_lsn = prepare_data->end_lsn;
918+
replorigin_session_origin_timestamp = prepare_data->prepare_time;
919+
920+
PrepareTransactionBlock(gid);
921+
}
922+
887923
/*
888924
* Handle PREPARE message.
889925
*/
890926
static void
891927
apply_handle_prepare(StringInfo s)
892928
{
893929
LogicalRepPreparedTxnData prepare_data;
894-
char gid[GIDSIZE];
895930

896931
logicalrep_read_prepare(s, &prepare_data);
897932

@@ -902,15 +937,6 @@ apply_handle_prepare(StringInfo s)
902937
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
903938
LSN_FORMAT_ARGS(remote_final_lsn))));
904939

905-
/*
906-
* Compute unique GID for two_phase transactions. We don't use GID of
907-
* prepared transaction sent by server as that can lead to deadlock when
908-
* we have multiple subscriptions from same node point to publications on
909-
* the same node. See comments atop worker.c
910-
*/
911-
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
912-
gid, sizeof(gid));
913-
914940
/*
915941
* Unlike commit, here, we always prepare the transaction even though no
916942
* change has happened in this transaction. It is done this way because at
@@ -923,21 +949,8 @@ apply_handle_prepare(StringInfo s)
923949
*/
924950
begin_replication_step();
925951

926-
/*
927-
* BeginTransactionBlock is necessary to balance the EndTransactionBlock
928-
* called within the PrepareTransactionBlock below.
929-
*/
930-
BeginTransactionBlock();
931-
CommitTransactionCommand(); /* Completes the preceding Begin command. */
932-
933-
/*
934-
* Update origin state so we can restart streaming from correct position
935-
* in case of crash.
936-
*/
937-
replorigin_session_origin_lsn = prepare_data.end_lsn;
938-
replorigin_session_origin_timestamp = prepare_data.prepare_time;
952+
apply_handle_prepare_internal(&prepare_data);
939953

940-
PrepareTransactionBlock(gid);
941954
end_replication_step();
942955
CommitTransactionCommand();
943956
pgstat_report_stat(false);
@@ -1256,30 +1269,19 @@ apply_handle_stream_abort(StringInfo s)
12561269
}
12571270

12581271
/*
1259-
* Handle STREAM COMMIT message.
1272+
* Common spoolfile processing.
12601273
*/
12611274
static void
1262-
apply_handle_stream_commit(StringInfo s)
1275+
apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
12631276
{
1264-
TransactionId xid;
12651277
StringInfoData s2;
12661278
int nchanges;
12671279
char path[MAXPGPATH];
12681280
char *buffer = NULL;
1269-
LogicalRepCommitData commit_data;
12701281
StreamXidHash *ent;
12711282
MemoryContext oldcxt;
12721283
BufFile *fd;
12731284

1274-
if (in_streamed_transaction)
1275-
ereport(ERROR,
1276-
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1277-
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1278-
1279-
xid = logicalrep_read_stream_commit(s, &commit_data);
1280-
1281-
elog(DEBUG1, "received commit for streamed transaction %u", xid);
1282-
12831285
/* Make sure we have an open transaction */
12841286
begin_replication_step();
12851287

@@ -1311,7 +1313,7 @@ apply_handle_stream_commit(StringInfo s)
13111313

13121314
MemoryContextSwitchTo(oldcxt);
13131315

1314-
remote_final_lsn = commit_data.commit_lsn;
1316+
remote_final_lsn = lsn;
13151317

13161318
/*
13171319
* Make sure the handle apply_dispatch methods are aware we're in a remote
@@ -1390,6 +1392,29 @@ apply_handle_stream_commit(StringInfo s)
13901392
elog(DEBUG1, "replayed %d (all) changes from file \"%s\"",
13911393
nchanges, path);
13921394

1395+
return;
1396+
}
1397+
1398+
/*
1399+
* Handle STREAM COMMIT message.
1400+
*/
1401+
static void
1402+
apply_handle_stream_commit(StringInfo s)
1403+
{
1404+
TransactionId xid;
1405+
LogicalRepCommitData commit_data;
1406+
1407+
if (in_streamed_transaction)
1408+
ereport(ERROR,
1409+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1410+
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
1411+
1412+
xid = logicalrep_read_stream_commit(s, &commit_data);
1413+
1414+
elog(DEBUG1, "received commit for streamed transaction %u", xid);
1415+
1416+
apply_spooled_messages(xid, commit_data.commit_lsn);
1417+
13931418
apply_handle_commit_internal(s, &commit_data);
13941419

13951420
/* unlink the files with serialized changes and subxact info */

0 commit comments

Comments
 (0)