44
44
#include "sockhub/sockhub.h"
45
45
46
46
#include "libdtm.h"
47
+ #include "multimaster.h"
47
48
48
49
typedef struct
49
50
{
@@ -52,6 +53,7 @@ typedef struct
52
53
TransactionId minXid ; /* XID of oldest transaction visible by any active transaction (local or global) */
53
54
TransactionId nextXid ; /* next XID for local transaction */
54
55
size_t nReservedXids ; /* number of XIDs reserved for local transactions */
56
+ int nNodes ;
55
57
} DtmState ;
56
58
57
59
typedef struct
@@ -61,19 +63,18 @@ typedef struct
61
63
int used ;
62
64
} ByteBuffer ;
63
65
66
+ typedef struct
67
+ {
68
+ TransactionId xid ;
69
+ int count ;
70
+ } ExternalTransaction ;
64
71
65
72
#define DTM_SHMEM_SIZE (1024*1024)
66
73
#define DTM_HASH_SIZE 1003
67
74
68
75
void _PG_init (void );
69
76
void _PG_fini (void );
70
77
71
- extern void LogicalReplicationStartReceivers (char * nodes , int node_id );
72
- extern void LogicalReplicationBroadcastXid (TransactonId Xid );
73
-
74
- void MultimasterBeginTransaction (void );
75
- void MultimasterJoinTransaction (TransactionId xid );
76
-
77
78
static Snapshot DtmGetSnapshot (Snapshot snapshot );
78
79
static void DtmMergeWithGlobalSnapshot (Snapshot snapshot );
79
80
static XidStatus DtmGetTransactionStatus (TransactionId xid , XLogRecPtr * lsn );
@@ -103,6 +104,7 @@ static void ByteBufferFree(ByteBuffer* buf);
103
104
104
105
static shmem_startup_hook_type prev_shmem_startup_hook ;
105
106
static HTAB * xid_in_doubt ;
107
+ static HTAB * external_trans ;
106
108
static DtmState * dtm ;
107
109
static Snapshot CurrentTransactionSnapshot ;
108
110
@@ -126,11 +128,14 @@ static TransactionManager DtmTM = {
126
128
127
129
static char * MultimasterConnStrs ;
128
130
static int MultimasterNodeId ;
131
+ static int MultimasterNodes ;
129
132
130
133
static char * DtmHost ;
131
134
static int DtmPort ;
132
135
static int DtmBufferSize ;
133
136
137
+ bool isBackgroundWorker ;
138
+
134
139
static BackgroundWorker DtmWorker = {
135
140
"DtmWorker" ,
136
141
0 , /* do not need connection to the database */
@@ -694,6 +699,7 @@ static void DtmInitialize()
694
699
dtm -> xidLock = LWLockAssign ();
695
700
dtm -> nReservedXids = 0 ;
696
701
dtm -> minXid = InvalidTransactionId ;
702
+ dtm -> nNodes = MultimasterNodes ;
697
703
RegisterXactCallback (DtmXactCallback , NULL );
698
704
}
699
705
LWLockRelease (AddinShmemInitLock );
@@ -709,6 +715,17 @@ static void DtmInitialize()
709
715
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
710
716
);
711
717
718
+ info .keysize = sizeof (TransactionId );
719
+ info .entrysize = sizeof (ExternalTransaction );
720
+ info .hash = dtm_xid_hash_fn ;
721
+ info .match = dtm_xid_match_fn ;
722
+ external_trans = ShmemInitHash (
723
+ "external_trans" ,
724
+ DTM_HASH_SIZE , DTM_HASH_SIZE ,
725
+ & info ,
726
+ HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
727
+ );
728
+
712
729
713
730
TM = & DtmTM ;
714
731
}
@@ -720,7 +737,9 @@ DtmXactCallback(XactEvent event, void *arg)
720
737
switch (event )
721
738
{
722
739
case XACT_EVENT_BEGIN :
723
- MultimasterBeginTransaction ();
740
+ if (!isBackgroundWorker ) {
741
+ MultimasterBeginTransaction ();
742
+ }
724
743
break ;
725
744
case XACT_EVENT_COMMIT :
726
745
case XACT_EVENT_ABORT :
@@ -865,7 +884,7 @@ _PG_init(void)
865
884
NULL
866
885
);
867
886
868
- LogicalReplicationStartReceivers (MultimasterConnStrs , MultimasterNodeId );
887
+ MultimasterNodes = LogicalReplicationStartReceivers (MultimasterConnStrs , MultimasterNodeId );
869
888
870
889
if (DtmBufferSize != 0 )
871
890
{
@@ -931,10 +950,8 @@ dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
931
950
932
951
void MultimasterBeginTransaction (void )
933
952
{
934
- if (TransactionIdIsValid (DtmNextXid )) {
935
- /* slave transaction */
936
- return ;
937
- }
953
+ if (TransactionIdIsValid (DtmNextXid ))
954
+ elog (ERROR , "MultimasterBeginTransaction should be called only once for global transaction" );
938
955
if (dtm == NULL )
939
956
elog (ERROR , "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf" );
940
957
DtmNextXid = DtmGlobalStartTransaction (& DtmSnapshot , & dtm -> minXid );
@@ -944,12 +961,12 @@ void MultimasterBeginTransaction(void)
944
961
945
962
DtmHasGlobalSnapshot = true;
946
963
DtmLastSnapshot = NULL ;
947
-
948
- LogicalReplicationBroadcastXid (DtmNextXid );
949
964
}
950
965
951
966
void MultimasterJoinTransaction (TransactionId xid )
952
967
{
968
+ ExternalTrans * et ;
969
+
953
970
if (TransactionIdIsValid (DtmNextXid ))
954
971
elog (ERROR , "dtm_begin/join_transaction should be called only once for global transaction" );
955
972
DtmNextXid = xid ;
@@ -961,14 +978,39 @@ void MultimasterJoinTransaction(TransactionId xid)
961
978
962
979
DtmHasGlobalSnapshot = true;
963
980
DtmLastSnapshot = NULL ;
981
+
982
+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
983
+ et = hash_search (external_trans , & DtmNextXid , HASH_ENTER , NULL );
984
+ et -> count = dtm -> nNodes - 1 ;
985
+ LWLockRelease (dtm -> hashLock );
964
986
}
965
987
988
+ bool MultimasterIsExternalTransaction (TransactionId xid )
989
+ {
990
+ ExternalTrans * et ;
991
+ bool result = false;
992
+ LWLockAcquire (dtm -> hashLock , LW_EXCLUSIVE );
993
+ et = hash_search (external_trans , & DtmNextXid , HASH_FIND , NULL );
994
+ if (et != NULL ) {
995
+ result = true;
996
+ if (-- et -> count == 0 ) {
997
+ hash_search (external_trans , & DtmNextXid , HASH_REMOVE , NULL );
998
+ }
999
+ }
1000
+ LWLockRelease (dtm -> hashLock );
1001
+ return result ;
1002
+ }
1003
+
1004
+
1005
+
966
1006
void DtmBackgroundWorker (Datum arg )
967
1007
{
968
1008
Shub shub ;
969
1009
ShubParams params ;
970
1010
char unix_sock_path [MAXPGPATH ];
971
1011
1012
+ isBackgroundWorker = true;
1013
+
972
1014
snprintf (unix_sock_path , sizeof (unix_sock_path ), "%s/p%d" , Unix_socket_directories , DtmPort );
973
1015
974
1016
ShubInitParams (& params );
0 commit comments