45
45
#include "storage/pmsignal.h"
46
46
#include "storage/proc.h"
47
47
#include "utils/syscache.h"
48
+ #include "utils/lsyscache.h"
48
49
#include "replication/walsender.h"
49
50
#include "replication/walsender_private.h"
50
51
#include "replication/slot.h"
53
54
#include "nodes/makefuncs.h"
54
55
#include "access/htup_details.h"
55
56
#include "catalog/indexing.h"
57
+ #include "catalog/namespace.h"
56
58
#include "pglogical_output/hooks.h"
57
59
58
60
#include "multimaster.h"
@@ -105,6 +107,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
105
107
PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
106
108
PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
107
109
PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
110
+ PG_FUNCTION_INFO_V1 (mtm_make_table_local );
108
111
109
112
static Snapshot MtmGetSnapshot (Snapshot snapshot );
110
113
static void MtmInitialize (void );
@@ -135,6 +138,7 @@ MtmState* Mtm;
135
138
136
139
HTAB * MtmXid2State ;
137
140
static HTAB * MtmGid2State ;
141
+ static HTAB * MtmLocalTables ;
138
142
139
143
static MtmCurrentTrans MtmTx ;
140
144
@@ -176,11 +180,12 @@ bool MtmUseRaftable;
176
180
MtmConnectionInfo * MtmConnections ;
177
181
178
182
static char * MtmConnStrs ;
179
- static int MtmQueueSize ;
180
- static int MtmWorkers ;
181
- static int MtmVacuumDelay ;
182
- static int MtmMinRecoveryLag ;
183
- static int MtmMaxRecoveryLag ;
183
+ static int MtmQueueSize ;
184
+ static int MtmWorkers ;
185
+ static int MtmVacuumDelay ;
186
+ static int MtmMinRecoveryLag ;
187
+ static int MtmMaxRecoveryLag ;
188
+ static bool MtmIgnoreTablesWithoutPk ;
184
189
185
190
static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
186
191
static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1280,6 +1285,71 @@ MtmCreateGidMap(void)
1280
1285
return htab ;
1281
1286
}
1282
1287
1288
+ static HTAB *
1289
+ MtmCreateLocalTableMap (void )
1290
+ {
1291
+ HASHCTL info ;
1292
+ HTAB * htab ;
1293
+ memset (& info , 0 , sizeof (info ));
1294
+ info .keysize = sizeof (Oid );
1295
+ htab = ShmemInitHash (
1296
+ "MtmLocalTables" ,
1297
+ MULTIMASTER_MAX_LOCAL_TABLES , MULTIMASTER_MAX_LOCAL_TABLES ,
1298
+ & info ,
1299
+ 0
1300
+ );
1301
+ return htab ;
1302
+ }
1303
+
1304
+ static void MtmMakeRelationLocal (Oid relid )
1305
+ {
1306
+ if (OidIsValid (relid )) {
1307
+ MtmLock (LW_EXCLUSIVE );
1308
+ hash_search (MtmLocalTables , & relid , HASH_ENTER , NULL );
1309
+ MtmUnlock ();
1310
+ }
1311
+ }
1312
+
1313
+
1314
+ void MtmMakeTableLocal (char * schema , char * name )
1315
+ {
1316
+ RangeVar * rv = makeRangeVar (schema , name , -1 );
1317
+ Oid relid = RangeVarGetRelid (rv , NoLock , true);
1318
+ MtmMakeRelationLocal (relid );
1319
+ }
1320
+
1321
+
1322
+ typedef struct {
1323
+ NameData schema ;
1324
+ NameData name ;
1325
+ } MtmLocalTablesTuple ;
1326
+
1327
+ static void MtmLoadLocalTables (void )
1328
+ {
1329
+ RangeVar * rv ;
1330
+ Relation rel ;
1331
+ SysScanDesc scan ;
1332
+ HeapTuple tuple ;
1333
+
1334
+ Assert (IsTransactionState ());
1335
+
1336
+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME , MULTIMASTER_LOCAL_TABLES_TABLE , -1 );
1337
+ rel = heap_openrv_extended (rv , RowExclusiveLock , true);
1338
+ if (rel != NULL ) {
1339
+ scan = systable_beginscan (rel , 0 , true, NULL , 0 , NULL );
1340
+
1341
+ while (HeapTupleIsValid (tuple = systable_getnext (scan )))
1342
+ {
1343
+ MtmLocalTablesTuple * t = (MtmLocalTablesTuple * ) GETSTRUCT (tuple );
1344
+ MtmMakeTableLocal (NameStr (t -> schema ), NameStr (t -> name ));
1345
+ }
1346
+
1347
+ systable_endscan (scan );
1348
+ heap_close (rel , RowExclusiveLock );
1349
+ }
1350
+ }
1351
+
1352
+
1283
1353
static void MtmInitialize ()
1284
1354
{
1285
1355
bool found ;
@@ -1309,6 +1379,7 @@ static void MtmInitialize()
1309
1379
Mtm -> nReceivers = 0 ;
1310
1380
Mtm -> timeShift = 0 ;
1311
1381
Mtm -> transCount = 0 ;
1382
+ Mtm -> localTablesHashLoaded = false;
1312
1383
for (i = 0 ; i < MtmNodes ; i ++ ) {
1313
1384
Mtm -> nodes [i ].oldestSnapshot = 0 ;
1314
1385
Mtm -> nodes [i ].transDelay = 0 ;
@@ -1324,6 +1395,7 @@ static void MtmInitialize()
1324
1395
}
1325
1396
MtmXid2State = MtmCreateXidMap ();
1326
1397
MtmGid2State = MtmCreateGidMap ();
1398
+ MtmLocalTables = MtmCreateLocalTableMap ();
1327
1399
MtmDoReplication = true;
1328
1400
TM = & MtmTM ;
1329
1401
LWLockRelease (AddinShmemInitLock );
@@ -1476,6 +1548,19 @@ _PG_init(void)
1476
1548
NULL
1477
1549
);
1478
1550
1551
+ DefineCustomBoolVariable (
1552
+ "multimaster.ignore_tables_without_pk" ,
1553
+ "Do not replicate tables withpout primary key" ,
1554
+ NULL ,
1555
+ & MtmIgnoreTablesWithoutPk ,
1556
+ false,
1557
+ PGC_BACKEND ,
1558
+ 0 ,
1559
+ NULL ,
1560
+ NULL ,
1561
+ NULL
1562
+ );
1563
+
1479
1564
DefineCustomIntVariable (
1480
1565
"multimaster.workers" ,
1481
1566
"Number of multimaster executor workers per node" ,
@@ -1805,11 +1890,30 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
1805
1890
return res ;
1806
1891
}
1807
1892
1893
+ static bool
1894
+ MtmReplicationRowFilterHook (struct PGLogicalRowFilterArgs * args )
1895
+ {
1896
+ bool isDistributed ;
1897
+ MtmLock (LW_SHARED );
1898
+ if (!Mtm -> localTablesHashLoaded ) {
1899
+ MtmUnlock ();
1900
+ MtmLock (LW_EXCLUSIVE );
1901
+ if (!Mtm -> localTablesHashLoaded ) {
1902
+ MtmLoadLocalTables ();
1903
+ Mtm -> localTablesHashLoaded = true;
1904
+ }
1905
+ }
1906
+ isDistributed = hash_search (MtmLocalTables , & RelationGetRelid (args -> changed_rel ), HASH_FIND , NULL ) == NULL ;
1907
+ MtmUnlock ();
1908
+ return isDistributed ;
1909
+ }
1910
+
1808
1911
void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
1809
1912
{
1810
1913
hooks -> startup_hook = MtmReplicationStartupHook ;
1811
1914
hooks -> shutdown_hook = MtmReplicationShutdownHook ;
1812
1915
hooks -> txn_filter_hook = MtmReplicationTxnFilterHook ;
1916
+ hooks -> row_filter_hook = MtmReplicationRowFilterHook ;
1813
1917
}
1814
1918
1815
1919
@@ -1936,6 +2040,52 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
1936
2040
PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
1937
2041
}
1938
2042
2043
+
2044
+ Datum mtm_make_table_local (PG_FUNCTION_ARGS )
2045
+ {
2046
+ Oid reloid = PG_GETARG_OID (1 );
2047
+ RangeVar * rv ;
2048
+ Relation rel ;
2049
+ TupleDesc tupDesc ;
2050
+ HeapTuple tup ;
2051
+ Datum values [Natts_mtm_local_tables ];
2052
+ bool nulls [Natts_mtm_local_tables ];
2053
+
2054
+ MtmMakeRelationLocal (reloid );
2055
+
2056
+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME , MULTIMASTER_LOCAL_TABLES_TABLE , -1 );
2057
+ rel = heap_openrv (rv , RowExclusiveLock );
2058
+ if (rel != NULL ) {
2059
+ char * tableName = get_rel_name (reloid );
2060
+ Oid schemaid = get_rel_namespace (reloid );
2061
+ char * schemaName = get_namespace_name (schemaid );
2062
+
2063
+ tupDesc = RelationGetDescr (rel );
2064
+
2065
+ /* Form a tuple. */
2066
+ memset (nulls , false, sizeof (nulls ));
2067
+
2068
+ values [Anum_mtm_local_tables_rel_schema - 1 ] = CStringGetTextDatum (schemaName );
2069
+ values [Anum_mtm_local_tables_rel_name - 1 ] = CStringGetTextDatum (tableName );
2070
+
2071
+ tup = heap_form_tuple (tupDesc , values , nulls );
2072
+
2073
+ /* Insert the tuple to the catalog. */
2074
+ simple_heap_insert (rel , tup );
2075
+
2076
+ /* Update the indexes. */
2077
+ CatalogUpdateIndexes (rel , tup );
2078
+
2079
+ /* Cleanup. */
2080
+ heap_freetuple (tup );
2081
+ heap_close (rel , RowExclusiveLock );
2082
+
2083
+ MtmTx .containsDML = true;
2084
+ }
2085
+ return false;
2086
+ }
2087
+
2088
+
1939
2089
/*
1940
2090
* -------------------------------------------
1941
2091
* Broadcast utulity statements
@@ -2248,10 +2398,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
2248
2398
if (estate -> es_processed != 0 && (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE )) {
2249
2399
int i ;
2250
2400
for (i = 0 ; i < estate -> es_num_result_relations ; i ++ ) {
2251
- if (RelationNeedsWAL (estate -> es_result_relations [i ].ri_RelationDesc )) {
2401
+ Relation rel = estate -> es_result_relations [i ].ri_RelationDesc ;
2402
+ if (RelationNeedsWAL (rel )) {
2252
2403
MtmTx .containsDML = true;
2253
2404
break ;
2254
2405
}
2406
+ if (MtmIgnoreTablesWithoutPk ) {
2407
+ if (!rel -> rd_indexvalid ) {
2408
+ RelationGetIndexList (rel );
2409
+ }
2410
+ MtmMakeRelationLocal (rel -> rd_replidindex );
2411
+ }
2255
2412
}
2256
2413
}
2257
2414
}
0 commit comments