@@ -177,7 +177,7 @@ bool in_remote_transaction = false;
177
177
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr ;
178
178
179
179
/* fields valid only when processing streamed transaction */
180
- bool in_streamed_transaction = false;
180
+ static bool in_streamed_transaction = false;
181
181
182
182
static TransactionId stream_xid = InvalidTransactionId ;
183
183
@@ -345,7 +345,10 @@ handle_streamed_transaction(LogicalRepMsgType action, StringInfo s)
345
345
*/
346
346
xid = pq_getmsgint (s , 4 );
347
347
348
- Assert (TransactionIdIsValid (xid ));
348
+ if (!TransactionIdIsValid (xid ))
349
+ ereport (ERROR ,
350
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
351
+ errmsg_internal ("invalid transaction ID in streamed replication transaction" )));
349
352
350
353
/* Add the new subxact to the array (unless already there). */
351
354
subxact_info_add (xid );
@@ -785,7 +788,12 @@ apply_handle_commit(StringInfo s)
785
788
786
789
logicalrep_read_commit (s , & commit_data );
787
790
788
- Assert (commit_data .commit_lsn == remote_final_lsn );
791
+ if (commit_data .commit_lsn != remote_final_lsn )
792
+ ereport (ERROR ,
793
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
794
+ errmsg_internal ("incorrect commit LSN %X/%X in commit message (expected %X/%X)" ,
795
+ LSN_FORMAT_ARGS (commit_data .commit_lsn ),
796
+ LSN_FORMAT_ARGS (remote_final_lsn ))));
789
797
790
798
apply_handle_commit_internal (s , & commit_data );
791
799
@@ -812,7 +820,7 @@ apply_handle_origin(StringInfo s)
812
820
(IsTransactionState () && !am_tablesync_worker ())))
813
821
ereport (ERROR ,
814
822
(errcode (ERRCODE_PROTOCOL_VIOLATION ),
815
- errmsg ("ORIGIN message sent out of order" )));
823
+ errmsg_internal ("ORIGIN message sent out of order" )));
816
824
}
817
825
818
826
/*
@@ -824,7 +832,10 @@ apply_handle_stream_start(StringInfo s)
824
832
bool first_segment ;
825
833
HASHCTL hash_ctl ;
826
834
827
- Assert (!in_streamed_transaction );
835
+ if (in_streamed_transaction )
836
+ ereport (ERROR ,
837
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
838
+ errmsg_internal ("duplicate STREAM START message" )));
828
839
829
840
/*
830
841
* Start a transaction on stream start, this transaction will be committed
@@ -841,6 +852,11 @@ apply_handle_stream_start(StringInfo s)
841
852
/* extract XID of the top-level transaction */
842
853
stream_xid = logicalrep_read_stream_start (s , & first_segment );
843
854
855
+ if (!TransactionIdIsValid (stream_xid ))
856
+ ereport (ERROR ,
857
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
858
+ errmsg_internal ("invalid transaction ID in streamed replication transaction" )));
859
+
844
860
/*
845
861
* Initialize the xidhash table if we haven't yet. This will be used for
846
862
* the entire duration of the apply worker so create it in permanent
@@ -873,7 +889,10 @@ apply_handle_stream_start(StringInfo s)
873
889
static void
874
890
apply_handle_stream_stop (StringInfo s )
875
891
{
876
- Assert (in_streamed_transaction );
892
+ if (!in_streamed_transaction )
893
+ ereport (ERROR ,
894
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
895
+ errmsg_internal ("STREAM STOP message without STREAM START" )));
877
896
878
897
/*
879
898
* Close the file with serialized changes, and serialize information about
@@ -905,7 +924,10 @@ apply_handle_stream_abort(StringInfo s)
905
924
TransactionId xid ;
906
925
TransactionId subxid ;
907
926
908
- Assert (!in_streamed_transaction );
927
+ if (in_streamed_transaction )
928
+ ereport (ERROR ,
929
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
930
+ errmsg_internal ("STREAM ABORT message without STREAM STOP" )));
909
931
910
932
logicalrep_read_stream_abort (s , & xid , & subxid );
911
933
@@ -932,7 +954,6 @@ apply_handle_stream_abort(StringInfo s)
932
954
* performed rollback to savepoint for one of the earlier
933
955
* sub-transaction.
934
956
*/
935
-
936
957
int64 i ;
937
958
int64 subidx ;
938
959
BufFile * fd ;
@@ -967,13 +988,15 @@ apply_handle_stream_abort(StringInfo s)
967
988
return ;
968
989
}
969
990
970
- Assert ((subidx >= 0 ) && (subidx < subxact_data .nsubxacts ));
971
-
972
991
ent = (StreamXidHash * ) hash_search (xidhash ,
973
992
(void * ) & xid ,
974
993
HASH_FIND ,
975
- & found );
976
- Assert (found );
994
+ NULL );
995
+ if (!ent )
996
+ ereport (ERROR ,
997
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
998
+ errmsg_internal ("transaction %u not found in stream XID hash table" ,
999
+ xid )));
977
1000
978
1001
/* open the changes file */
979
1002
changes_filename (path , MyLogicalRepWorker -> subid , xid );
@@ -1006,13 +1029,15 @@ apply_handle_stream_commit(StringInfo s)
1006
1029
int nchanges ;
1007
1030
char path [MAXPGPATH ];
1008
1031
char * buffer = NULL ;
1009
- bool found ;
1010
1032
LogicalRepCommitData commit_data ;
1011
1033
StreamXidHash * ent ;
1012
1034
MemoryContext oldcxt ;
1013
1035
BufFile * fd ;
1014
1036
1015
- Assert (!in_streamed_transaction );
1037
+ if (in_streamed_transaction )
1038
+ ereport (ERROR ,
1039
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
1040
+ errmsg_internal ("STREAM COMMIT message without STREAM STOP" )));
1016
1041
1017
1042
xid = logicalrep_read_stream_commit (s , & commit_data );
1018
1043
@@ -1031,11 +1056,17 @@ apply_handle_stream_commit(StringInfo s)
1031
1056
/* open the spool file for the committed transaction */
1032
1057
changes_filename (path , MyLogicalRepWorker -> subid , xid );
1033
1058
elog (DEBUG1 , "replaying changes from file \"%s\"" , path );
1059
+
1034
1060
ent = (StreamXidHash * ) hash_search (xidhash ,
1035
1061
(void * ) & xid ,
1036
1062
HASH_FIND ,
1037
- & found );
1038
- Assert (found );
1063
+ NULL );
1064
+ if (!ent )
1065
+ ereport (ERROR ,
1066
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
1067
+ errmsg_internal ("transaction %u not found in stream XID hash table" ,
1068
+ xid )));
1069
+
1039
1070
fd = BufFileOpenShared (ent -> stream_fileset , path , O_RDONLY );
1040
1071
1041
1072
buffer = palloc (BLCKSZ );
@@ -1080,7 +1111,9 @@ apply_handle_stream_commit(StringInfo s)
1080
1111
errmsg ("could not read from streaming transaction's changes file \"%s\": %m" ,
1081
1112
path )));
1082
1113
1083
- Assert (len > 0 );
1114
+ if (len <= 0 )
1115
+ elog (ERROR , "incorrect length %d in streaming transaction's changes file \"%s\"" ,
1116
+ len , path );
1084
1117
1085
1118
/* make sure we have sufficiently large buffer */
1086
1119
buffer = repalloc (buffer , len );
@@ -1108,7 +1141,7 @@ apply_handle_stream_commit(StringInfo s)
1108
1141
nchanges ++ ;
1109
1142
1110
1143
if (nchanges % 1000 == 0 )
1111
- elog (DEBUG1 , "replayed %d changes from file '%s' " ,
1144
+ elog (DEBUG1 , "replayed %d changes from file \"%s\" " ,
1112
1145
nchanges , path );
1113
1146
}
1114
1147
@@ -2053,7 +2086,8 @@ apply_dispatch(StringInfo s)
2053
2086
2054
2087
ereport (ERROR ,
2055
2088
(errcode (ERRCODE_PROTOCOL_VIOLATION ),
2056
- errmsg ("invalid logical replication message type \"%c\"" , action )));
2089
+ errmsg_internal ("invalid logical replication message type \"%c\"" ,
2090
+ action )));
2057
2091
}
2058
2092
2059
2093
/*
@@ -2589,20 +2623,19 @@ static void
2589
2623
subxact_info_write (Oid subid , TransactionId xid )
2590
2624
{
2591
2625
char path [MAXPGPATH ];
2592
- bool found ;
2593
2626
Size len ;
2594
2627
StreamXidHash * ent ;
2595
2628
BufFile * fd ;
2596
2629
2597
2630
Assert (TransactionIdIsValid (xid ));
2598
2631
2599
- /* find the xid entry in the xidhash */
2632
+ /* Find the xid entry in the xidhash */
2600
2633
ent = (StreamXidHash * ) hash_search (xidhash ,
2601
2634
(void * ) & xid ,
2602
2635
HASH_FIND ,
2603
- & found );
2604
- /* we must found the entry for its top transaction by this time */
2605
- Assert (found );
2636
+ NULL );
2637
+ /* By this time we must have created the transaction entry */
2638
+ Assert (ent );
2606
2639
2607
2640
/*
2608
2641
* If there is no subtransaction then nothing to do, but if already have
@@ -2667,13 +2700,11 @@ static void
2667
2700
subxact_info_read (Oid subid , TransactionId xid )
2668
2701
{
2669
2702
char path [MAXPGPATH ];
2670
- bool found ;
2671
2703
Size len ;
2672
2704
BufFile * fd ;
2673
2705
StreamXidHash * ent ;
2674
2706
MemoryContext oldctx ;
2675
2707
2676
- Assert (TransactionIdIsValid (xid ));
2677
2708
Assert (!subxact_data .subxacts );
2678
2709
Assert (subxact_data .nsubxacts == 0 );
2679
2710
Assert (subxact_data .nsubxacts_max == 0 );
@@ -2682,7 +2713,12 @@ subxact_info_read(Oid subid, TransactionId xid)
2682
2713
ent = (StreamXidHash * ) hash_search (xidhash ,
2683
2714
(void * ) & xid ,
2684
2715
HASH_FIND ,
2685
- & found );
2716
+ NULL );
2717
+ if (!ent )
2718
+ ereport (ERROR ,
2719
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
2720
+ errmsg_internal ("transaction %u not found in stream XID hash table" ,
2721
+ xid )));
2686
2722
2687
2723
/*
2688
2724
* If subxact_fileset is not valid that mean we don't have any subxact
@@ -2836,14 +2872,17 @@ stream_cleanup_files(Oid subid, TransactionId xid)
2836
2872
{
2837
2873
char path [MAXPGPATH ];
2838
2874
StreamXidHash * ent ;
2839
- bool found = false;
2840
2875
2841
- /* By this time we must have created the transaction entry */
2876
+ /* Find the xid entry in the xidhash */
2842
2877
ent = (StreamXidHash * ) hash_search (xidhash ,
2843
2878
(void * ) & xid ,
2844
2879
HASH_FIND ,
2845
- & found );
2846
- Assert (found );
2880
+ NULL );
2881
+ if (!ent )
2882
+ ereport (ERROR ,
2883
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
2884
+ errmsg_internal ("transaction %u not found in stream XID hash table" ,
2885
+ xid )));
2847
2886
2848
2887
/* Delete the change file and release the stream fileset memory */
2849
2888
changes_filename (path , subid , xid );
@@ -2893,9 +2932,9 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2893
2932
/* create or find the xid entry in the xidhash */
2894
2933
ent = (StreamXidHash * ) hash_search (xidhash ,
2895
2934
(void * ) & xid ,
2896
- HASH_ENTER | HASH_FIND ,
2935
+ HASH_ENTER ,
2897
2936
& found );
2898
- Assert ( first_segment || found );
2937
+
2899
2938
changes_filename (path , subid , xid );
2900
2939
elog (DEBUG1 , "opening file \"%s\" for streamed changes" , path );
2901
2940
@@ -2915,6 +2954,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2915
2954
MemoryContext savectx ;
2916
2955
SharedFileSet * fileset ;
2917
2956
2957
+ if (found )
2958
+ ereport (ERROR ,
2959
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
2960
+ errmsg_internal ("incorrect first-segment flag for streamed replication transaction" )));
2961
+
2918
2962
/*
2919
2963
* We need to maintain shared fileset across multiple stream
2920
2964
* start/stop calls. So, need to allocate it in a persistent context.
@@ -2934,6 +2978,11 @@ stream_open_file(Oid subid, TransactionId xid, bool first_segment)
2934
2978
}
2935
2979
else
2936
2980
{
2981
+ if (!found )
2982
+ ereport (ERROR ,
2983
+ (errcode (ERRCODE_PROTOCOL_VIOLATION ),
2984
+ errmsg_internal ("incorrect first-segment flag for streamed replication transaction" )));
2985
+
2937
2986
/*
2938
2987
* Open the file and seek to the end of the file because we always
2939
2988
* append the changes file.
@@ -3140,7 +3189,8 @@ ApplyWorkerMain(Datum main_arg)
3140
3189
*/
3141
3190
if (!myslotname )
3142
3191
ereport (ERROR ,
3143
- (errmsg ("subscription has no replication slot set" )));
3192
+ (errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
3193
+ errmsg ("subscription has no replication slot set" )));
3144
3194
3145
3195
/* Setup replication origin tracking. */
3146
3196
StartTransactionCommand ();
0 commit comments