Skip to content

Commit 09cf6e4

Browse files
committed
Add local tables hash
1 parent 2420bbe commit 09cf6e4

File tree

4 files changed

+183
-6
lines changed

4 files changed

+183
-6
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,11 @@ CREATE FUNCTION mtm.get_cluster_state() RETURNS mtm.cluster_state
3636
AS 'MODULE_PATHNAME','mtm_get_cluster_state'
3737
LANGUAGE C;
3838

39+
-- Registers a relation as local to this node; implemented by the C function
-- mtm_make_table_local in the extension's shared library.
CREATE FUNCTION mtm.make_table_local(relation regclass)
    RETURNS void
    AS 'MODULE_PATHNAME', 'mtm_make_table_local'
    LANGUAGE C;
42+
3943
CREATE TABLE IF NOT EXISTS mtm.ddl_log (issued timestamp with time zone not null, query text);
44+
45+
-- Persistent list of relations registered as local, identified by schema and
-- table name. The primary key prevents the same table from being listed twice.
-- (PostgreSQL does not accept a constraint name between PRIMARY KEY and the
-- column list, so the table-level constraint is left unnamed.)
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));
46+

contrib/mmts/multimaster.c

Lines changed: 163 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
#include "storage/pmsignal.h"
4646
#include "storage/proc.h"
4747
#include "utils/syscache.h"
48+
#include "utils/lsyscache.h"
4849
#include "replication/walsender.h"
4950
#include "replication/walsender_private.h"
5051
#include "replication/slot.h"
@@ -53,6 +54,7 @@
5354
#include "nodes/makefuncs.h"
5455
#include "access/htup_details.h"
5556
#include "catalog/indexing.h"
57+
#include "catalog/namespace.h"
5658
#include "pglogical_output/hooks.h"
5759

5860
#include "multimaster.h"
@@ -105,6 +107,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
105107
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
106108
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
107109
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
110+
PG_FUNCTION_INFO_V1(mtm_make_table_local);
108111

109112
static Snapshot MtmGetSnapshot(Snapshot snapshot);
110113
static void MtmInitialize(void);
@@ -135,6 +138,7 @@ MtmState* Mtm;
135138

136139
HTAB* MtmXid2State;
137140
static HTAB* MtmGid2State;
141+
static HTAB* MtmLocalTables;
138142

139143
static MtmCurrentTrans MtmTx;
140144

@@ -176,11 +180,12 @@ bool MtmUseRaftable;
176180
MtmConnectionInfo* MtmConnections;
177181

178182
static char* MtmConnStrs;
179-
static int MtmQueueSize;
180-
static int MtmWorkers;
181-
static int MtmVacuumDelay;
182-
static int MtmMinRecoveryLag;
183-
static int MtmMaxRecoveryLag;
183+
static int MtmQueueSize;
184+
static int MtmWorkers;
185+
static int MtmVacuumDelay;
186+
static int MtmMinRecoveryLag;
187+
static int MtmMaxRecoveryLag;
188+
static bool MtmIgnoreTablesWithoutPk;
184189

185190
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
186191
static ProcessUtility_hook_type PreviousProcessUtilityHook;
@@ -1280,6 +1285,71 @@ MtmCreateGidMap(void)
12801285
return htab;
12811286
}
12821287

1288+
static HTAB*
1289+
MtmCreateLocalTableMap(void)
1290+
{
1291+
HASHCTL info;
1292+
HTAB* htab;
1293+
memset(&info, 0, sizeof(info));
1294+
info.keysize = sizeof(Oid);
1295+
htab = ShmemInitHash(
1296+
"MtmLocalTables",
1297+
MULTIMASTER_MAX_LOCAL_TABLES, MULTIMASTER_MAX_LOCAL_TABLES,
1298+
&info,
1299+
0
1300+
);
1301+
return htab;
1302+
}
1303+
1304+
/*
 * Add a relation OID to the MtmLocalTables hash.
 *
 * Invalid OIDs are silently ignored.  The insertion is performed while
 * holding MtmLock in exclusive mode, which this function acquires and
 * releases itself.
 */
static void MtmMakeRelationLocal(Oid relid)
{
	if (!OidIsValid(relid)) {
		return;
	}

	MtmLock(LW_EXCLUSIVE);
	hash_search(MtmLocalTables, &relid, HASH_ENTER, NULL);
	MtmUnlock();
}
1312+
1313+
1314+
void MtmMakeTableLocal(char* schema, char* name)
1315+
{
1316+
RangeVar* rv = makeRangeVar(schema, name, -1);
1317+
Oid relid = RangeVarGetRelid(rv, NoLock, true);
1318+
MtmMakeRelationLocal(relid);
1319+
}
1320+
1321+
1322+
typedef struct {
1323+
NameData schema;
1324+
NameData name;
1325+
} MtmLocalTablesTuple;
1326+
1327+
static void MtmLoadLocalTables(void)
1328+
{
1329+
RangeVar *rv;
1330+
Relation rel;
1331+
SysScanDesc scan;
1332+
HeapTuple tuple;
1333+
1334+
Assert(IsTransactionState());
1335+
1336+
rv = makeRangeVar(MULTIMASTER_SCHEMA_NAME, MULTIMASTER_LOCAL_TABLES_TABLE, -1);
1337+
rel = heap_openrv_extended(rv, RowExclusiveLock, true);
1338+
if (rel != NULL) {
1339+
scan = systable_beginscan(rel, 0, true, NULL, 0, NULL);
1340+
1341+
while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1342+
{
1343+
MtmLocalTablesTuple *t = (MtmLocalTablesTuple*) GETSTRUCT(tuple);
1344+
MtmMakeTableLocal(NameStr(t->schema), NameStr(t->name));
1345+
}
1346+
1347+
systable_endscan(scan);
1348+
heap_close(rel, RowExclusiveLock);
1349+
}
1350+
}
1351+
1352+
12831353
static void MtmInitialize()
12841354
{
12851355
bool found;
@@ -1309,6 +1379,7 @@ static void MtmInitialize()
13091379
Mtm->nReceivers = 0;
13101380
Mtm->timeShift = 0;
13111381
Mtm->transCount = 0;
1382+
Mtm->localTablesHashLoaded = false;
13121383
for (i = 0; i < MtmNodes; i++) {
13131384
Mtm->nodes[i].oldestSnapshot = 0;
13141385
Mtm->nodes[i].transDelay = 0;
@@ -1324,6 +1395,7 @@ static void MtmInitialize()
13241395
}
13251396
MtmXid2State = MtmCreateXidMap();
13261397
MtmGid2State = MtmCreateGidMap();
1398+
MtmLocalTables = MtmCreateLocalTableMap();
13271399
MtmDoReplication = true;
13281400
TM = &MtmTM;
13291401
LWLockRelease(AddinShmemInitLock);
@@ -1476,6 +1548,19 @@ _PG_init(void)
14761548
NULL
14771549
);
14781550

1551+
DefineCustomBoolVariable(
1552+
"multimaster.ignore_tables_without_pk",
1553+
"Do not replicate tables withpout primary key",
1554+
NULL,
1555+
&MtmIgnoreTablesWithoutPk,
1556+
false,
1557+
PGC_BACKEND,
1558+
0,
1559+
NULL,
1560+
NULL,
1561+
NULL
1562+
);
1563+
14791564
DefineCustomIntVariable(
14801565
"multimaster.workers",
14811566
"Number of multimaster executor workers per node",
@@ -1805,11 +1890,30 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18051890
return res;
18061891
}
18071892

1893+
static bool
1894+
MtmReplicationRowFilterHook(struct PGLogicalRowFilterArgs* args)
1895+
{
1896+
bool isDistributed;
1897+
MtmLock(LW_SHARED);
1898+
if (!Mtm->localTablesHashLoaded) {
1899+
MtmUnlock();
1900+
MtmLock(LW_EXCLUSIVE);
1901+
if (!Mtm->localTablesHashLoaded) {
1902+
MtmLoadLocalTables();
1903+
Mtm->localTablesHashLoaded = true;
1904+
}
1905+
}
1906+
isDistributed = hash_search(MtmLocalTables, &RelationGetRelid(args->changed_rel), HASH_FIND, NULL) == NULL;
1907+
MtmUnlock();
1908+
return isDistributed;
1909+
}
1910+
18081911
void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks)
18091912
{
18101913
hooks->startup_hook = MtmReplicationStartupHook;
18111914
hooks->shutdown_hook = MtmReplicationShutdownHook;
18121915
hooks->txn_filter_hook = MtmReplicationTxnFilterHook;
1916+
hooks->row_filter_hook = MtmReplicationRowFilterHook;
18131917
}
18141918

18151919

@@ -1936,6 +2040,52 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
19362040
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(desc, values, nulls)));
19372041
}
19382042

2043+
2044+
Datum mtm_make_table_local(PG_FUNCTION_ARGS)
2045+
{
2046+
Oid reloid = PG_GETARG_OID(1);
2047+
RangeVar *rv;
2048+
Relation rel;
2049+
TupleDesc tupDesc;
2050+
HeapTuple tup;
2051+
Datum values[Natts_mtm_local_tables];
2052+
bool nulls[Natts_mtm_local_tables];
2053+
2054+
MtmMakeRelationLocal(reloid);
2055+
2056+
rv = makeRangeVar(MULTIMASTER_SCHEMA_NAME, MULTIMASTER_LOCAL_TABLES_TABLE, -1);
2057+
rel = heap_openrv(rv, RowExclusiveLock);
2058+
if (rel != NULL) {
2059+
char* tableName = get_rel_name(reloid);
2060+
Oid schemaid = get_rel_namespace(reloid);
2061+
char* schemaName = get_namespace_name(schemaid);
2062+
2063+
tupDesc = RelationGetDescr(rel);
2064+
2065+
/* Form a tuple. */
2066+
memset(nulls, false, sizeof(nulls));
2067+
2068+
values[Anum_mtm_local_tables_rel_schema - 1] = CStringGetTextDatum(schemaName);
2069+
values[Anum_mtm_local_tables_rel_name - 1] = CStringGetTextDatum(tableName);
2070+
2071+
tup = heap_form_tuple(tupDesc, values, nulls);
2072+
2073+
/* Insert the tuple to the catalog. */
2074+
simple_heap_insert(rel, tup);
2075+
2076+
/* Update the indexes. */
2077+
CatalogUpdateIndexes(rel, tup);
2078+
2079+
/* Cleanup. */
2080+
heap_freetuple(tup);
2081+
heap_close(rel, RowExclusiveLock);
2082+
2083+
MtmTx.containsDML = true;
2084+
}
2085+
return false;
2086+
}
2087+
2088+
19392089
/*
19402090
* -------------------------------------------
19412091
* Broadcast utulity statements
@@ -2248,10 +2398,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22482398
if (estate->es_processed != 0 && (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE)) {
22492399
int i;
22502400
for (i = 0; i < estate->es_num_result_relations; i++) {
2251-
if (RelationNeedsWAL(estate->es_result_relations[i].ri_RelationDesc)) {
2401+
Relation rel = estate->es_result_relations[i].ri_RelationDesc;
2402+
if (RelationNeedsWAL(rel)) {
22522403
MtmTx.containsDML = true;
22532404
break;
22542405
}
2406+
if (MtmIgnoreTablesWithoutPk) {
2407+
if (!rel->rd_indexvalid) {
2408+
RelationGetIndexList(rel);
2409+
}
2410+
MtmMakeRelationLocal(rel->rd_replidindex);
2411+
}
22552412
}
22562413
}
22572414
}

contrib/mmts/multimaster.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
#define MULTIMASTER_NAME "multimaster"
2020
#define MULTIMASTER_SCHEMA_NAME "mtm"
2121
#define MULTIMASTER_DDL_TABLE "ddl_log"
22+
#define MULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
2223
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
2324
#define MULTIMASTER_MIN_PROTO_VERSION 1
2425
#define MULTIMASTER_MAX_PROTO_VERSION 1
2526
#define MULTIMASTER_MAX_GID_SIZE 32
2627
#define MULTIMASTER_MAX_SLOT_NAME_SIZE 16
2728
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
2829
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
30+
#define MULTIMASTER_MAX_LOCAL_TABLES 256
2931
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
3032
#define MULTIMASTER_ADMIN "mtm_admin"
3133

@@ -35,6 +37,10 @@
3537
#define Anum_mtm_ddl_log_issued 1
3638
#define Anum_mtm_ddl_log_query 2
3739

40+
#define Natts_mtm_local_tables 2
41+
#define Anum_mtm_local_tables_rel_schema 1
42+
#define Anum_mtm_local_tables_rel_name 2
43+
3844
typedef uint64 csn_t; /* commit serial number */
3945
#define INVALID_CSN ((csn_t)-1)
4046

@@ -135,6 +141,7 @@ typedef struct
135141
nodemask_t nodeLockerMask; /* Mask of node IDs which WAL-senders are locking the cluster */
136142
nodemask_t reconnectMask; /* Mask of nodes connection to which has to be reestablished by sender */
137143

144+
bool localTablesHashLoaded; /* Whether data from local_tables table is loaded in shared memory hash table */
138145
int nNodes; /* Number of active nodes */
139146
int nReceivers; /* Number of initialized logical receivers (used to determine moment when Mtm intialization is completed */
140147
int nLockers; /* Number of lockers */
@@ -208,4 +215,6 @@ extern void MtmSetupReplicationHooks(struct PGLogicalHooks* hooks);
208215
extern void MtmCheckQuorum(void);
209216
extern bool MtmRecoveryCaughtUp(int nodeId, XLogRecPtr slotLSN);
210217
extern void MtmRecoveryCompleted(void);
218+
extern void MtmMakeTableLocal(char* schema, char* name);
219+
211220
#endif

contrib/mmts/pglogical_apply.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,10 @@ process_remote_insert(StringInfo s, Relation rel)
679679
if (rc != SPI_OK_UTILITY) {
680680
elog(ERROR, "Failed to execute utility statement %s", ddl);
681681
}
682+
} else if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
683+
char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
684+
char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
685+
MtmMakeTableLocal(schema, name);
682686
}
683687

684688
}

0 commit comments

Comments
 (0)