Skip to content

Commit 1ae3274

Browse files
committed
2 parents ea73b2b + df68923 commit 1ae3274

File tree

8 files changed

+310
-22
lines changed

8 files changed

+310
-22
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ CREATE FUNCTION mtm.make_table_local(relation regclass) RETURNS void
4040
AS 'MODULE_PATHNAME','mtm_make_table_local'
4141
LANGUAGE C;
4242

43+
CREATE FUNCTION mtm.dump_lock_graph() RETURNS text
44+
AS 'MODULE_PATHNAME','mtm_dump_lock_graph'
45+
LANGUAGE C;
46+
4347
CREATE TABLE IF NOT EXISTS mtm.ddl_log (issued timestamp with time zone not null, query text);
4448

4549
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));

contrib/mmts/multimaster.c

Lines changed: 101 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_get_snapshot);
108108
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
109109
PG_FUNCTION_INFO_V1(mtm_get_cluster_state);
110110
PG_FUNCTION_INFO_V1(mtm_make_table_local);
111+
PG_FUNCTION_INFO_V1(mtm_dump_lock_graph);
111112

112113
static Snapshot MtmGetSnapshot(Snapshot snapshot);
113114
static void MtmInitialize(void);
@@ -140,7 +141,7 @@ HTAB* MtmXid2State;
140141
static HTAB* MtmGid2State;
141142
static HTAB* MtmLocalTables;
142143

143-
static bool MtmIsRecoverySession;
144+
static bool MtmIsRecoverySession;
144145

145146
static MtmCurrentTrans MtmTx;
146147

@@ -199,6 +200,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
199200
ProcessUtilityContext context, ParamListInfo params,
200201
DestReceiver *dest, char *completionTag);
201202

203+
static StringInfo MtmGUCBuffer;
204+
static bool MtmGUCBufferAllocated = false;
205+
202206
/*
203207
* -------------------------------------------
204208
* Synchronize access to MTM structures.
@@ -2153,6 +2157,31 @@ Datum mtm_make_table_local(PG_FUNCTION_ARGS)
21532157
return false;
21542158
}
21552159

2160+
Datum mtm_dump_lock_graph(PG_FUNCTION_ARGS)
2161+
{
2162+
StringInfo s = makeStringInfo();
2163+
int i;
2164+
for (i = 0; i < MtmNodes; i++)
2165+
{
2166+
size_t size;
2167+
char *data = RaftableGet(psprintf("lock-graph-%d", i+1), &size, NULL, true);
2168+
if (!data) continue;
2169+
GlobalTransactionId *gtid = (GlobalTransactionId *)data;
2170+
GlobalTransactionId *last = (GlobalTransactionId *)(data + size);
2171+
appendStringInfo(s, "node-%d lock graph: ", i+1);
2172+
while (gtid != last) {
2173+
GlobalTransactionId *src = gtid++;
2174+
appendStringInfo(s, "%d:%d -> ", src->node, src->xid);
2175+
while (gtid->node != 0) {
2176+
GlobalTransactionId *dst = gtid++;
2177+
appendStringInfo(s, "%d:%d, ", dst->node, dst->xid);
2178+
}
2179+
gtid += 1;
2180+
}
2181+
appendStringInfo(s, "\n");
2182+
}
2183+
return CStringGetTextDatum(s->data);
2184+
}
21562185

21572186
/*
21582187
* -------------------------------------------
@@ -2241,6 +2270,12 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22412270
{
22422271
if (conns[i])
22432272
{
2273+
if (MtmGUCBufferAllocated && !MtmRunUtilityStmt(conns[i], MtmGUCBuffer->data, &utility_errmsg) && !ignoreError)
2274+
{
2275+
errorMsg = "Failed to set GUC variables at node %d";
2276+
failedNode = i;
2277+
break;
2278+
}
22442279
if (!MtmRunUtilityStmt(conns[i], "BEGIN TRANSACTION", &utility_errmsg) && !ignoreError)
22452280
{
22462281
errorMsg = "Failed to start transaction at node %d";
@@ -2252,7 +2287,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
22522287
if (i + 1 == MtmNodeId)
22532288
errorMsg = utility_errmsg;
22542289
else
2290+
{
2291+
elog(ERROR, utility_errmsg);
22552292
errorMsg = "Failed to run command at node %d";
2293+
}
22562294

22572295
failedNode = i;
22582296
break;
@@ -2383,7 +2421,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
23832421
ProcessUtilityContext context, ParamListInfo params,
23842422
DestReceiver *dest, char *completionTag)
23852423
{
2386-
bool skipCommand;
2424+
bool skipCommand = false;
23872425
MTM_TRACE("%d: Process utility statement %s\n", MyProcPid, queryString);
23882426
switch (nodeTag(parsetree))
23892427
{
@@ -2414,7 +2452,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24142452
case T_FetchStmt:
24152453
case T_DoStmt:
24162454
case T_CreateTableSpaceStmt:
2417-
case T_DropTableSpaceStmt:
24182455
case T_AlterTableSpaceOptionsStmt:
24192456
case T_TruncateStmt:
24202457
case T_CommentStmt: /* XXX: we could replicate these */;
@@ -2423,9 +2460,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24232460
case T_ExecuteStmt:
24242461
case T_DeallocateStmt:
24252462
case T_GrantStmt: /* XXX: we could replicate some of these these */;
2426-
case T_GrantRoleStmt:
2427-
case T_AlterDatabaseStmt:
2428-
case T_AlterDatabaseSetStmt:
2463+
//case T_GrantRoleStmt:
2464+
//case T_AlterDatabaseStmt:
2465+
//case T_AlterDatabaseSetStmt:
24292466
case T_NotifyStmt:
24302467
case T_ListenStmt:
24312468
case T_UnlistenStmt:
@@ -2434,22 +2471,46 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24342471
case T_VacuumStmt:
24352472
case T_ExplainStmt:
24362473
case T_AlterSystemStmt:
2437-
case T_VariableSetStmt:
24382474
case T_VariableShowStmt:
24392475
case T_DiscardStmt:
2440-
case T_CreateEventTrigStmt:
2441-
case T_AlterEventTrigStmt:
2442-
case T_CreateRoleStmt:
2443-
case T_AlterRoleStmt:
2444-
case T_AlterRoleSetStmt:
2445-
case T_DropRoleStmt:
2476+
//case T_CreateEventTrigStmt:
2477+
//case T_AlterEventTrigStmt:
2478+
//case T_CreateRoleStmt:
2479+
//case T_AlterRoleStmt:
2480+
//case T_AlterRoleSetStmt:
2481+
//case T_DropRoleStmt:
24462482
case T_ReassignOwnedStmt:
24472483
case T_LockStmt:
2448-
case T_ConstraintsSetStmt:
2484+
//case T_ConstraintsSetStmt:
24492485
case T_CheckPointStmt:
24502486
case T_ReindexStmt:
24512487
skipCommand = true;
24522488
break;
2489+
case T_VariableSetStmt:
2490+
{
2491+
//VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
2492+
2493+
if (!MtmGUCBufferAllocated)
2494+
{
2495+
MemoryContext oldcontext;
2496+
2497+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
2498+
MtmGUCBuffer = makeStringInfo();
2499+
MemoryContextSwitchTo(oldcontext);
2500+
MtmGUCBufferAllocated = true;
2501+
}
2502+
2503+
//appendStringInfoString(MtmGUCBuffer, "SET ");
2504+
//appendStringInfoString(MtmGUCBuffer, stmt->name);
2505+
//appendStringInfoString(MtmGUCBuffer, " TO ");
2506+
//appendStringInfoString(MtmGUCBuffer, ExtractSetVariableArgs(stmt));
2507+
//appendStringInfoString(MtmGUCBuffer, "; ");
2508+
2509+
appendStringInfoString(MtmGUCBuffer, queryString);
2510+
2511+
skipCommand = true;
2512+
}
2513+
break;
24532514
case T_CreateStmt:
24542515
{
24552516
/* Do not replicate temp tables */
@@ -2469,9 +2530,32 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
24692530
"CREATE INDEX CONCURRENTLY");
24702531

24712532
relid = RelnameGetRelid(stmt->relation->relname);
2472-
rel = heap_open(relid, ShareLock);
2473-
skipCommand = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
2474-
heap_close(rel, NoLock);
2533+
2534+
if (OidIsValid(relid))
2535+
{
2536+
rel = heap_open(relid, ShareLock);
2537+
skipCommand = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
2538+
heap_close(rel, NoLock);
2539+
}
2540+
}
2541+
break;
2542+
case T_DropStmt:
2543+
{
2544+
DropStmt *stmt = (DropStmt *) parsetree;
2545+
2546+
if (stmt->removeType == OBJECT_TABLE)
2547+
{
2548+
RangeVar *rv = makeRangeVarFromNameList(
2549+
(List *) lfirst(list_head(stmt->objects)));
2550+
Oid relid = RelnameGetRelid(rv->relname);
2551+
2552+
if (OidIsValid(relid))
2553+
{
2554+
Relation rel = heap_open(relid, ShareLock);
2555+
skipCommand = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
2556+
heap_close(rel, ShareLock);
2557+
}
2558+
}
24752559
}
24762560
break;
24772561
default:

contrib/mmts/t/002_dtmbench.pl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ sub allocate_ports
6060
listen_addresses = '$host'
6161
unix_socket_directories = ''
6262
port = $pgport
63-
max_prepared_transactions = 10
63+
max_prepared_transactions = 1000
6464
max_worker_processes = 10
6565
wal_level = logical
6666
fsync = off
@@ -103,13 +103,13 @@ sub allocate_ports
103103
push(@argv, '-n', 1000, '-a', 1000, '-w', 10, '-r', 1);
104104

105105
diag("running dtmbench -i");
106-
if (TestLib::run_log([@argv, '-i']))
106+
if (!TestLib::run_log([@argv, '-i']))
107107
{
108108
BAIL_OUT("dtmbench -i failed");
109109
}
110110

111111
diag("running dtmbench");
112-
if (TestLib::run_log(\@argv, '>', \$out))
112+
if (!TestLib::run_log(\@argv, '>', \$out))
113113
{
114114
fail("dtmbench failed");
115115
}

0 commit comments

Comments
 (0)