Skip to content

Commit fe6a20c

Browse files
committed
Don't use Asserts to check for violations of replication protocol.
Using an Assert to check the validity of incoming messages is an extremely poor decision. In a debug build, it should not be that easy for a broken or malicious remote client to crash the logrep worker. The consequences could be even worse in non-debug builds, which will fail to make such checks at all, leading to who-knows-what misbehavior. Hence, promote every Assert that could possibly be triggered by wrong or out-of-order replication messages to a full test-and-ereport. To avoid bloating the set of messages the translation team has to cope with, establish a policy that replication protocol violation error reports don't need to be translated. Hence, all the new messages here use errmsg_internal(). A couple of old messages are changed likewise for consistency. Along the way, fix some non-idiomatic or outright wrong uses of hash_search(). Most of these mistakes are new with the "streaming replication" patch (commit 4648243), but a couple go back a long way. Back-patch as appropriate. Discussion: https://postgr.es/m/1719083.1623351052@sss.pgh.pa.us
1 parent c3652f9 commit fe6a20c

File tree

2 files changed

+85
-35
lines changed

2 files changed

+85
-35
lines changed

src/backend/replication/logical/reorderbuffer.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1703,7 +1703,7 @@ ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
17031703
ent = (ReorderBufferTupleCidEnt *)
17041704
hash_search(txn->tuplecid_hash,
17051705
(void *) &key,
1706-
HASH_ENTER | HASH_FIND,
1706+
HASH_ENTER,
17071707
&found);
17081708
if (!found)
17091709
{

src/backend/replication/logical/worker.c

Lines changed: 84 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ bool in_remote_transaction = false;
177177
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
178178

179179
/* fields valid only when processing streamed transaction */
180-
bool in_streamed_transaction = false;
180+
static bool in_streamed_transaction = false;
181181

182182
static TransactionId stream_xid = InvalidTransactionId;
183183

@@ -345,7 +345,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
345345
*/
346346
xid = pq_getmsgint(s, 4);
347347

348-
Assert(TransactionIdIsValid(xid));
348+
if (!TransactionIdIsValid(xid))
349+
ereport(ERROR,
350+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
351+
errmsg_internal("invalid transaction ID in streamed replication transaction")));
349352

350353
/* Add the new subxact to the array (unless already there). */
351354
subxact_info_add(xid);
@@ -785,7 +788,12 @@ apply_handle_commit(StringInfo s)
785788

786789
logicalrep_read_commit(s, &commit_data);
787790

788-
Assert(commit_data.commit_lsn == remote_final_lsn);
791+
if (commit_data.commit_lsn != remote_final_lsn)
792+
ereport(ERROR,
793+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
794+
errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
795+
LSN_FORMAT_ARGS(commit_data.commit_lsn),
796+
LSN_FORMAT_ARGS(remote_final_lsn))));
789797

790798
apply_handle_commit_internal(s, &commit_data);
791799

@@ -812,7 +820,7 @@ apply_handle_origin(StringInfo s)
812820
(IsTransactionState() && !am_tablesync_worker())))
813821
ereport(ERROR,
814822
(errcode(ERRCODE_PROTOCOL_VIOLATION),
815-
errmsg("ORIGIN message sent out of order")));
823+
errmsg_internal("ORIGIN message sent out of order")));
816824
}
817825

818826
/*
@@ -824,7 +832,10 @@ apply_handle_stream_start(StringInfo s)
824832
bool first_segment;
825833
HASHCTL hash_ctl;
826834

827-
Assert(!in_streamed_transaction);
835+
if (in_streamed_transaction)
836+
ereport(ERROR,
837+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
838+
errmsg_internal("duplicate STREAM START message")));
828839

829840
/*
830841
* Start a transaction on stream start, this transaction will be committed
@@ -841,6 +852,11 @@ apply_handle_stream_start(StringInfo s)
841852
/* extract XID of the top-level transaction */
842853
stream_xid = logicalrep_read_stream_start(s, &first_segment);
843854

855+
if (!TransactionIdIsValid(stream_xid))
856+
ereport(ERROR,
857+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
858+
errmsg_internal("invalid transaction ID in streamed replication transaction")));
859+
844860
/*
845861
* Initialize the xidhash table if we haven't yet. This will be used for
846862
* the entire duration of the apply worker so create it in permanent
@@ -873,7 +889,10 @@ apply_handle_stream_start(StringInfo s)
873889
static void
874890
apply_handle_stream_stop(StringInfo s)
875891
{
876-
Assert(in_streamed_transaction);
892+
if (!in_streamed_transaction)
893+
ereport(ERROR,
894+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
895+
errmsg_internal("STREAM STOP message without STREAM START")));
877896

878897
/*
879898
* Close the file with serialized changes, and serialize information about
@@ -905,7 +924,10 @@ apply_handle_stream_abort(StringInfo s)
905924
TransactionId xid;
906925
TransactionId subxid;
907926

908-
Assert(!in_streamed_transaction);
927+
if (in_streamed_transaction)
928+
ereport(ERROR,
929+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
930+
errmsg_internal("STREAM ABORT message without STREAM STOP")));
909931

910932
logicalrep_read_stream_abort(s, &xid, &subxid);
911933

@@ -932,7 +954,6 @@ apply_handle_stream_abort(StringInfo s)
932954
* performed rollback to savepoint for one of the earlier
933955
* sub-transaction.
934956
*/
935-
936957
int64 i;
937958
int64 subidx;
938959
BufFile *fd;
@@ -967,13 +988,15 @@ apply_handle_stream_abort(StringInfo s)
967988
return;
968989
}
969990

970-
Assert((subidx >= 0) && (subidx < subxact_data.nsubxacts));
971-
972991
ent = (StreamXidHash *) hash_search(xidhash,
973992
(void *) &xid,
974993
HASH_FIND,
975-
&found);
976-
Assert(found);
994+
NULL);
995+
if (!ent)
996+
ereport(ERROR,
997+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
998+
errmsg_internal("transaction %u not found in stream XID hash table",
999+
xid)));
9771000

9781001
/* open the changes file */
9791002
changes_filename(path, MyLogicalRepWorker->subid, xid);
@@ -1006,13 +1029,15 @@ apply_handle_stream_commit(StringInfo s)
10061029
int nchanges;
10071030
char path[MAXPGPATH];
10081031
char *buffer = NULL;
1009-
bool found;
10101032
LogicalRepCommitData commit_data;
10111033
StreamXidHash *ent;
10121034
MemoryContext oldcxt;
10131035
BufFile *fd;
10141036

1015-
Assert(!in_streamed_transaction);
1037+
if (in_streamed_transaction)
1038+
ereport(ERROR,
1039+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1040+
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
10161041

10171042
xid = logicalrep_read_stream_commit(s, &commit_data);
10181043

@@ -1031,11 +1056,17 @@ apply_handle_stream_commit(StringInfo s)
10311056
/* open the spool file for the committed transaction */
10321057
changes_filename(path, MyLogicalRepWorker->subid, xid);
10331058
elog(DEBUG1, "replaying changes from file \"%s\"", path);
1059+
10341060
ent = (StreamXidHash *) hash_search(xidhash,
10351061
(void *) &xid,
10361062
HASH_FIND,
1037-
&found);
1038-
Assert(found);
1063+
NULL);
1064+
if (!ent)
1065+
ereport(ERROR,
1066+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
1067+
errmsg_internal("transaction %u not found in stream XID hash table",
1068+
xid)));
1069+
10391070
fd = BufFileOpenShared(ent->stream_fileset, path, O_RDONLY);
10401071

10411072
buffer = palloc(BLCKSZ);
@@ -1080,7 +1111,9 @@ apply_handle_stream_commit(StringInfo s)
10801111
errmsg("could not read from streaming transaction's changes file \"%s\": %m",
10811112
path)));
10821113

1083-
Assert(len > 0);
1114+
if (len <= 0)
1115+
elog(ERROR, "incorrect length %d in streaming transaction's changes file \"%s\"",
1116+
len, path);
10841117

10851118
/* make sure we have sufficiently large buffer */
10861119
buffer = repalloc(buffer, len);
@@ -1108,7 +1141,7 @@ apply_handle_stream_commit(StringInfo s)
11081141
nchanges++;
11091142

11101143
if (nchanges % 1000 == 0)
1111-
elog(DEBUG1, "replayed %d changes from file '%s'",
1144+
elog(DEBUG1, "replayed %d changes from file \"%s\"",
11121145
nchanges, path);
11131146
}
11141147

@@ -2053,7 +2086,8 @@ apply_dispatch(StringInfo s)
20532086

20542087
ereport(ERROR,
20552088
(errcode(ERRCODE_PROTOCOL_VIOLATION),
2056-
errmsg("invalid logical replication message type \"%c\"", action)));
2089+
errmsg_internal("invalid logical replication message type \"%c\"",
2090+
action)));
20572091
}
20582092

20592093
/*
@@ -2589,20 +2623,19 @@ static void
25892623
subxact_info_write(Oid subid, TransactionId xid)
25902624
{
25912625
char path[MAXPGPATH];
2592-
bool found;
25932626
Size len;
25942627
StreamXidHash *ent;
25952628
BufFile *fd;
25962629

25972630
Assert(TransactionIdIsValid(xid));
25982631

2599-
/* find the xid entry in the xidhash */
2632+
/* Find the xid entry in the xidhash */
26002633
ent = (StreamXidHash *) hash_search(xidhash,
26012634
(void *) &xid,
26022635
HASH_FIND,
2603-
&found);
2604-
/* we must found the entry for its top transaction by this time */
2605-
Assert(found);
2636+
NULL);
2637+
/* By this time we must have created the transaction entry */
2638+
Assert(ent);
26062639

26072640
/*
26082641
* If there is no subtransaction then nothing to do, but if already have
@@ -2667,13 +2700,11 @@ static void
26672700
subxact_info_read(Oid subid, TransactionId xid)
26682701
{
26692702
char path[MAXPGPATH];
2670-
bool found;
26712703
Size len;
26722704
BufFile *fd;
26732705
StreamXidHash *ent;
26742706
MemoryContext oldctx;
26752707

2676-
Assert(TransactionIdIsValid(xid));
26772708
Assert(!subxact_data.subxacts);
26782709
Assert(subxact_data.nsubxacts == 0);
26792710
Assert(subxact_data.nsubxacts_max == 0);
@@ -2682,7 +2713,12 @@ subxact_info_read(Oid subid, TransactionId xid)
26822713
ent = (StreamXidHash *) hash_search(xidhash,
26832714
(void *) &xid,
26842715
HASH_FIND,
2685-
&found);
2716+
NULL);
2717+
if (!ent)
2718+
ereport(ERROR,
2719+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
2720+
errmsg_internal("transaction %u not found in stream XID hash table",
2721+
xid)));
26862722

26872723
/*
26882724
* If subxact_fileset is not valid that mean we don't have any subxact
@@ -2836,14 +2872,17 @@ stream_cleanup_files(Oid subid, TransactionId xid)
28362872
{
28372873
char path[MAXPGPATH];
28382874
StreamXidHash *ent;
2839-
bool found = false;
28402875

2841-
/* By this time we must have created the transaction entry */
2876+
/* Find the xid entry in the xidhash */
28422877
ent = (StreamXidHash *) hash_search(xidhash,
28432878
(void *) &xid,
28442879
HASH_FIND,
2845-
&found);
2846-
Assert(found);
2880+
NULL);
2881+
if (!ent)
2882+
ereport(ERROR,
2883+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
2884+
errmsg_internal("transaction %u not found in stream XID hash table",
2885+
xid)));
28472886

28482887
/* Delete the change file and release the stream fileset memory */
28492888
changes_filename(path, subid, xid);
@@ -2893,9 +2932,9 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
28932932
/* create or find the xid entry in the xidhash */
28942933
ent = (StreamXidHash *) hash_search(xidhash,
28952934
(void *) &xid,
2896-
HASH_ENTER | HASH_FIND,
2935+
HASH_ENTER,
28972936
&found);
2898-
Assert(first_segment || found);
2937+
28992938
changes_filename(path, subid, xid);
29002939
elog(DEBUG1, "opening file \"%s\" for streamed changes", path);
29012940

@@ -2915,6 +2954,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
29152954
MemoryContext savectx;
29162955
SharedFileSet *fileset;
29172956

2957+
if (found)
2958+
ereport(ERROR,
2959+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
2960+
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
2961+
29182962
/*
29192963
* We need to maintain shared fileset across multiple stream
29202964
* start/stop calls. So, need to allocate it in a persistent context.
@@ -2934,6 +2978,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
29342978
}
29352979
else
29362980
{
2981+
if (!found)
2982+
ereport(ERROR,
2983+
(errcode(ERRCODE_PROTOCOL_VIOLATION),
2984+
errmsg_internal("incorrect first-segment flag for streamed replication transaction")));
2985+
29372986
/*
29382987
* Open the file and seek to the end of the file because we always
29392988
* append the changes file.
@@ -3140,7 +3189,8 @@ ApplyWorkerMain(Datum main_arg)
31403189
*/
31413190
if (!myslotname)
31423191
ereport(ERROR,
3143-
(errmsg("subscription has no replication slot set")));
3192+
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
3193+
errmsg("subscription has no replication slot set")));
31443194

31453195
/* Setup replication origin tracking. */
31463196
StartTransactionCommand();

0 commit comments

Comments
 (0)