@@ -67,7 +67,7 @@ typedef struct
67
67
{
68
68
TransactionId xid ;
69
69
int count ;
70
- } ExternalTransaction ;
70
+ } LocalTransaction ;
71
71
72
72
#define DTM_SHMEM_SIZE (1024*1024)
73
73
#define DTM_HASH_SIZE 1003
@@ -77,6 +77,9 @@ void _PG_fini(void);
77
77
78
78
PG_MODULE_MAGIC ;
79
79
80
+ PG_FUNCTION_INFO_V1 (mm_start_replication );
81
+ PG_FUNCTION_INFO_V1 (mm_stop_replication );
82
+
80
83
static Snapshot DtmGetSnapshot (Snapshot snapshot );
81
84
static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
82
85
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
@@ -103,10 +106,11 @@ static void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
103
106
static void ByteBufferAppendInt32 (ByteBuffer * buf , int data );
104
107
static void ByteBufferFree (ByteBuffer * buf );
105
108
109
+ static void MMMarkTransAsLocal (TransactionId xid );
106
110
107
111
static shmem_startup_hook_type prev_shmem_startup_hook ;
108
112
static HTAB * xid_in_doubt ;
109
- static HTAB * external_trans ;
113
+ static HTAB * local_trans ;
110
114
static DtmState * dtm ;
111
115
static Snapshot CurrentTransactionSnapshot ;
112
116
@@ -128,9 +132,10 @@ static TransactionManager DtmTM = {
128
132
DtmDetectGlobalDeadLock
129
133
};
130
134
131
- static char * MultimasterConnStrs ;
132
- static int MultimasterNodeId ;
133
- static int MultimasterNodes ;
135
+ static char * MMConnStrs ;
136
+ static int MMNodeId ;
137
+ static int MMNodes ;
138
+ static bool MMDoReplication = true;
134
139
135
140
static char * DtmHost ;
136
141
static int DtmPort ;
@@ -145,8 +150,8 @@ static BackgroundWorker DtmWorker = {
145
150
};
146
151
147
152
#define XTM_TRACE (fmt , ...)
148
- // #define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
149
- #define XTM_INFO (fmt , ...)
153
+ #define XTM_INFO (fmt , ...) fprintf(stderr, fmt, ## __VA_ARGS__)
154
+ // #define XTM_INFO(fmt, ...)
150
155
151
156
static void DumpSnapshot (Snapshot s , char * name )
152
157
{
@@ -697,7 +702,7 @@ static void DtmInitialize()
697
702
dtm -> xidLock = LWLockAssign ();
698
703
dtm -> nReservedXids = 0 ;
699
704
dtm -> minXid = InvalidTransactionId ;
700
- dtm -> nNodes = MultimasterNodes ;
705
+ dtm -> nNodes = MMNodes ;
701
706
RegisterXactCallback (DtmXactCallback , NULL );
702
707
}
703
708
LWLockRelease (AddinShmemInitLock );
@@ -714,11 +719,11 @@ static void DtmInitialize()
714
719
);
715
720
716
721
info .keysize = sizeof (TransactionId );
717
- info .entrysize = sizeof (ExternalTransaction );
722
+ info .entrysize = sizeof (LocalTransaction );
718
723
info .hash = dtm_xid_hash_fn ;
719
724
info .match = dtm_xid_match_fn ;
720
- external_trans = ShmemInitHash (
721
- "external_trans " ,
725
+ local_trans = ShmemInitHash (
726
+ "local_trans " ,
722
727
DTM_HASH_SIZE , DTM_HASH_SIZE ,
723
728
& info ,
724
729
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
@@ -734,14 +739,19 @@ DtmXactCallback(XactEvent event, void *arg)
734
739
XTM_INFO ("%d: DtmXactCallbackevent=%d nextxid=%d\n" , getpid (), event , DtmNextXid );
735
740
switch (event )
736
741
{
737
- case XACT_EVENT_START :
738
- if (MyProc && MyProc -> backendId != InvalidBackendId ) {
739
- printf ("getpid=%d, MyProc=%d, MyProc->backendId=%d\n" , getpid (), MyProc -> pid , MyProc -> backendId );
740
- MultimasterBeginTransaction ();
742
+ case XACT_EVENT_START :
743
+ if (MyBackendId != InvalidBackendId && MMDoReplication ) {
744
+ printf ("getpid=%d, backendId=%d\n" , getpid (), MyBackendId );
745
+ MMBeginTransaction ();
746
+ }
747
+ break ;
748
+ case XACT_EVENT_PRE_COMMIT :
749
+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
750
+ if (!MMDoReplication && TransactionIdIsValid (GetCurrentTransactionIdIfAny ())) {
751
+ MMMarkTransAsLocal (GetCurrentTransactionIdIfAny ());
741
752
}
742
753
break ;
743
- case XACT_EVENT_COMMIT :
744
- case XACT_EVENT_ABORT :
754
+ case XACT_EVENT_ABORT :
745
755
if (TransactionIdIsValid (DtmNextXid ))
746
756
{
747
757
if (event == XACT_EVENT_COMMIT )
@@ -862,7 +872,7 @@ _PG_init(void)
862
872
"multimaster.conn_strings" ,
863
873
"Multimaster node connection strings separated by commas, i.e. 'replication=database dbname=postgres host=localhost port=5001,replication=database dbname=postgres host=localhost port=5002'" ,
864
874
NULL ,
865
- & MultimasterConnStrs ,
875
+ & MMConnStrs ,
866
876
"" ,
867
877
PGC_POSTMASTER , // context
868
878
0 , // flags,
@@ -875,7 +885,7 @@ _PG_init(void)
875
885
"multimaster.node_id" ,
876
886
"Multimaster node ID" ,
877
887
NULL ,
878
- & MultimasterNodeId ,
888
+ & MMNodeId ,
879
889
1 ,
880
890
1 ,
881
891
INT_MAX ,
@@ -886,7 +896,7 @@ _PG_init(void)
886
896
NULL
887
897
);
888
898
889
- MultimasterNodes = LogicalReplicationStartReceivers ( MultimasterConnStrs , MultimasterNodeId );
899
+ MMNodes = MMStartReceivers ( MMConnStrs , MMNodeId );
890
900
891
901
if (DtmBufferSize != 0 )
892
902
{
@@ -924,10 +934,10 @@ static void DtmShmemStartup(void)
924
934
* ***************************************************************************
925
935
*/
926
936
927
- void MultimasterBeginTransaction (void )
937
+ void MMBeginTransaction (void )
928
938
{
929
939
if (TransactionIdIsValid (DtmNextXid ))
930
- elog (ERROR , "MultimasterBeginTransaction should be called only once for global transaction" );
940
+ elog (ERROR , "MMBeginTransaction should be called only once for global transaction" );
931
941
if (dtm == NULL )
932
942
elog (ERROR , "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf" );
933
943
DtmNextXid = DtmGlobalStartTransaction (& DtmSnapshot , & dtm -> minXid );
@@ -939,10 +949,8 @@ void MultimasterBeginTransaction(void)
939
949
DtmLastSnapshot = NULL ;
940
950
}
941
951
942
- void MultimasterJoinTransaction (TransactionId xid )
952
+ void MMJoinTransaction (TransactionId xid )
943
953
{
944
- ExternalTransaction * et ;
945
-
946
954
if (TransactionIdIsValid (DtmNextXid ))
947
955
elog (ERROR , "dtm_begin/join_transaction should be called only once for global transaction" );
948
956
DtmNextXid = xid ;
@@ -955,28 +963,52 @@ void MultimasterJoinTransaction(TransactionId xid)
955
963
DtmHasGlobalSnapshot = true;
956
964
DtmLastSnapshot = NULL ;
957
965
966
+ MMMarkTransAsLocal (DtmNextXid );
967
+ }
968
+
969
+
970
+ void MMMarkTransAsLocal (TransactionId xid )
971
+ {
972
+ LocalTransaction * lt ;
973
+
974
+ Assert (TransactionIdIsValid (xid ));
958
975
LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
959
- et = hash_search (external_trans , & DtmNextXid , HASH_ENTER , NULL );
960
- et -> count = dtm -> nNodes - 1 ;
976
+ lt = hash_search (local_trans , & xid , HASH_ENTER , NULL );
977
+ lt -> count = dtm -> nNodes - 1 ;
961
978
LWLockRelease (dtm -> hashLock );
962
979
}
963
980
964
- bool MultimasterIsExternalTransaction (TransactionId xid )
981
+ bool MMIsLocalTransaction (TransactionId xid )
965
982
{
966
- ExternalTransaction * et ;
983
+ LocalTransaction * lt ;
967
984
bool result = false;
968
985
LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
969
- et = hash_search (external_trans , & xid , HASH_FIND , NULL );
970
- if (et != NULL ) {
986
+ lt = hash_search (local_trans , & xid , HASH_FIND , NULL );
987
+ if (lt != NULL ) {
971
988
result = true;
972
- if (-- et -> count == 0 ) {
973
- hash_search (external_trans , & xid , HASH_REMOVE , NULL );
989
+ if (-- lt -> count == 0 ) {
990
+ hash_search (local_trans , & xid , HASH_REMOVE , NULL );
974
991
}
975
992
}
976
993
LWLockRelease (dtm -> hashLock );
977
994
return result ;
978
995
}
979
996
997
+ Datum
998
+ mm_start_replication (PG_FUNCTION_ARGS )
999
+ {
1000
+ MMDoReplication = true;
1001
+ PG_RETURN_VOID ();
1002
+ }
1003
+
1004
+ Datum
1005
+ mm_stop_replication (PG_FUNCTION_ARGS )
1006
+ {
1007
+ MMDoReplication = false;
1008
+ PG_RETURN_VOID ();
1009
+ }
1010
+
1011
+
980
1012
981
1013
982
1014
void DtmBackgroundWorker (Datum arg )
0 commit comments