Skip to content

Commit 4b713ea

Browse files
committed
Merge branch 'PGPROEE9_6_MULTIMASTER' into PGPROEE9_6
2 parents 4340bce + df1db59 commit 4b713ea

File tree

2 files changed

+48
-25
lines changed

2 files changed

+48
-25
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -99,18 +99,23 @@ BEGIN
9999
FOR seq_class IN
100100
SELECT * FROM pg_class WHERE pg_class.relkind = 'S'
101101
LOOP
102-
EXECUTE 'select * from ' || seq_class.relname || ';' INTO seq_tuple;
103-
IF seq_tuple.increment_by != max_nodes THEN
104-
altered := true;
105-
RAISE NOTICE 'Altering step for sequence % to %.', seq_tuple.sequence_name, max_nodes;
106-
EXECUTE 'ALTER SEQUENCE ' || seq_class.relname || ' INCREMENT BY ' || max_nodes || ';';
107-
END IF;
108-
IF (seq_tuple.last_value % max_nodes) != node_id THEN
109-
altered := true;
110-
new_start := (seq_tuple.last_value / max_nodes + 1)*max_nodes + node_id;
111-
RAISE NOTICE 'Altering start for sequence % to %.', seq_tuple.sequence_name, new_start;
112-
EXECUTE 'ALTER SEQUENCE ' || seq_class.relname || ' RESTART WITH ' || new_start || ';';
113-
END IF;
102+
BEGIN
103+
EXECUTE 'select * from ' || seq_class.relname || ';' INTO seq_tuple;
104+
IF seq_tuple.increment_by != max_nodes THEN
105+
altered := true;
106+
RAISE NOTICE 'Altering step for sequence % to %.', seq_tuple.sequence_name, max_nodes;
107+
EXECUTE 'ALTER SEQUENCE ' || seq_class.relname || ' INCREMENT BY ' || max_nodes || ';';
108+
END IF;
109+
IF (seq_tuple.last_value % max_nodes) != node_id THEN
110+
altered := true;
111+
new_start := (seq_tuple.last_value / max_nodes + 1)*max_nodes + node_id;
112+
RAISE NOTICE 'Altering start for sequence % to %.', seq_tuple.sequence_name, new_start;
113+
EXECUTE 'ALTER SEQUENCE ' || seq_class.relname || ' RESTART WITH ' || new_start || ';';
114+
END IF;
115+
EXCEPTION
116+
WHEN OTHERS THEN
117+
RAISE NOTICE 'Failed to alter sequence %s', seq_class.relname;
118+
END;
114119
END LOOP;
115120
IF altered = false THEN
116121
RAISE NOTICE 'All found sequnces have proper params.';

contrib/mmts/multimaster.c

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include "utils/builtins.h"
4747
#include "utils/memutils.h"
4848
#include "commands/dbcommands.h"
49+
#include "commands/extension.h"
4950
#include "postmaster/autovacuum.h"
5051
#include "storage/pmsignal.h"
5152
#include "storage/proc.h"
@@ -1253,6 +1254,15 @@ Mtm2PCVoting(MtmCurrentTrans* x, MtmTransState* ts)
12531254
MTM_LOG3("%d: Result of vote: %d", MyProcPid, MtmTxnStatusMnem[ts->status]);
12541255
}
12551256

1257+
static void MtmStopTransaction(void)
1258+
{
1259+
if (MtmInsideTransaction) {
1260+
Assert(Mtm->nRunningTransactions > 0);
1261+
Mtm->nRunningTransactions -= 1;
1262+
MtmInsideTransaction = false;
1263+
}
1264+
}
1265+
12561266
static void
12571267
MtmPostPrepareTransaction(MtmCurrentTrans* x)
12581268
{
@@ -1291,13 +1301,14 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
12911301
} else {
12921302
ts->votingCompleted = true;
12931303
}
1294-
MtmUnlock();
12951304
if (x->isTwoPhase) {
12961305
if (x->status == TRANSACTION_STATUS_ABORTED) {
12971306
MTM_ELOG(WARNING, "Prepare of user's 2PC transaction %s (%llu) is aborted by DTM", x->gid, (long64)x->xid);
1298-
}
1307+
}
1308+
MtmStopTransaction();
12991309
MtmResetTransaction();
13001310
}
1311+
MtmUnlock();
13011312
}
13021313
if (Mtm->inject2PCError == 3) {
13031314
Mtm->inject2PCError = 0;
@@ -1357,10 +1368,14 @@ MtmAbortPreparedTransaction(MtmCurrentTrans* x)
13571368
tm = (MtmTransMap*)hash_search(MtmGid2State, x->gid, HASH_FIND, NULL);
13581369
if (tm == NULL) {
13591370
MTM_ELOG(WARNING, "Global transaction ID '%s' is not found", x->gid);
1360-
} else {
1361-
Assert(tm->state != NULL);
1371+
} else {
1372+
MtmTransState* ts = tm->state;
1373+
Assert(ts != NULL);
13621374
MTM_LOG1("Abort prepared transaction %s (%llu)", x->gid, (long64)x->xid);
1363-
MtmAbortTransaction(tm->state);
1375+
MtmAbortTransaction(ts);
1376+
if (ts->isTwoPhase) {
1377+
MtmDeactivateTransaction(ts);
1378+
}
13641379
}
13651380
MtmUnlock();
13661381
x->status = TRANSACTION_STATUS_ABORTED;
@@ -1381,7 +1396,7 @@ MtmLogAbortLogicalMessage(int nodeId, char const* gid)
13811396
XLogFlush(lsn);
13821397
MTM_LOG1("MtmLogAbortLogicalMessage node=%d transaction=%s lsn=%llx", nodeId, gid, lsn);
13831398
}
1384-
1399+
13851400

13861401
static void
13871402
MtmEndTransaction(MtmCurrentTrans* x, bool commit)
@@ -1392,11 +1407,7 @@ MtmEndTransaction(MtmCurrentTrans* x, bool commit)
13921407

13931408
MtmLock(LW_EXCLUSIVE);
13941409

1395-
if (MtmInsideTransaction) {
1396-
Assert(Mtm->nRunningTransactions > 0);
1397-
Mtm->nRunningTransactions -= 1;
1398-
MtmInsideTransaction = false;
1399-
}
1410+
MtmStopTransaction();
14001411

14011412
if (x->isDistributed && (x->isPrepared || x->isReplicated) && !x->isTwoPhase) {
14021413
MtmTransState* ts = NULL;
@@ -4865,7 +4876,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
48654876
bool skipCommand = false;
48664877
bool executed = false;
48674878

4868-
MTM_LOG3("%d: Process utility statement %s", MyProcPid, queryString);
4879+
MTM_LOG3("%d: Process utility statement tag=%d, context=%d, issubtrans=%d, query=%s",
4880+
MyProcPid, nodeTag(parsetree), context, IsSubTransaction(), queryString);
48694881
switch (nodeTag(parsetree))
48704882
{
48714883
case T_TransactionStmt:
@@ -5119,7 +5131,13 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
51195131
}
51205132

51215133
/* XXX: dirty. Clear on new tx */
5122-
if (!skipCommand && (context != PROCESS_UTILITY_SUBCOMMAND || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
5134+
/* Some "black magic here":( We want to avoid redundant execution of utility statement by ProcessUtilitySlow (which is done with PROCESS_UTILITY_SUBCOMMAND).
5135+
* But if we allow only PROCESS_UTILITY_TOPLEVEL context, then we will not replicated DDL inside dynamic queries in plpgsql functions (see https://jira.postgrespro.ru/browse/CORE-526).
5136+
* If we disable only PROCESS_UTILITY_SUBCOMMAND, then we will get problems with "create extension" which is executed also in PROCESS_UTILITY_QUERY context.
5137+
* So workaround at this moment is to treat extension as special case.
5138+
* TODO: try to find right solution and rewrite this dummy check.
5139+
*/
5140+
if (!skipCommand && (context == PROCESS_UTILITY_TOPLEVEL || (context == PROCESS_UTILITY_QUERY && !creating_extension) || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
51235141
MtmUtilityProcessedInXid = InvalidTransactionId;
51245142

51255143
if (!skipCommand && !MtmTx.isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId)) {

0 commit comments

Comments
 (0)