Skip to content

Commit 37a2e58

Browse files
committed
Do not set XMAX_INVALID for transaction which are in progress
2 parents 258b5f8 + 10b4690 commit 37a2e58

File tree

4 files changed

+88
-60
lines changed

4 files changed

+88
-60
lines changed

contrib/mmts/multimaster--1.0.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURN
1313
AS 'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16-
CREATE FUNCTION mtm.add_node(conn_str cstring) RETURNS void
16+
CREATE FUNCTION mtm.add_node(conn_str text) RETURNS void
1717
AS 'MODULE_PATHNAME','mtm_add_node'
1818
LANGUAGE C;
1919

contrib/mmts/multimaster.c

Lines changed: 46 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
#include "pglogical_output/hooks.h"
6363
#include "parser/analyze.h"
6464
#include "parser/parse_relation.h"
65+
#include "tcop/pquery.h"
6566

6667
#include "multimaster.h"
6768
#include "ddd.h"
@@ -150,7 +151,7 @@ static void MtmShmemStartup(void);
150151
static BgwPool* MtmPoolConstructor(void);
151152
static bool MtmRunUtilityStmt(PGconn* conn, char const* sql, char **errmsg);
152153
static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError);
153-
static bool MtmProcessDDLCommand(char const* queryString);
154+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional);
154155

155156
MtmState* Mtm;
156157

@@ -3022,7 +3023,7 @@ mtm_drop_node(PG_FUNCTION_ARGS)
30223023
Datum
30233024
mtm_add_node(PG_FUNCTION_ARGS)
30243025
{
3025-
char* connStr = PG_GETARG_CSTRING(0);
3026+
char *connStr = text_to_cstring(PG_GETARG_TEXT_PP(0));
30263027

30273028
if (Mtm->nAllNodes == MtmMaxNodes) {
30283029
elog(ERROR, "Maximal number of nodes %d is reached", MtmMaxNodes);
@@ -3729,7 +3730,7 @@ static char * MtmGucSerialize(void)
37293730
* -------------------------------------------
37303731
*/
37313732

3732-
static bool MtmProcessDDLCommand(char const* queryString)
3733+
static bool MtmProcessDDLCommand(char const* queryString, bool transactional)
37333734
{
37343735
char *queryWithContext;
37353736
char *gucContext;
@@ -3748,7 +3749,12 @@ static bool MtmProcessDDLCommand(char const* queryString)
37483749
}
37493750

37503751
MTM_LOG1("Sending utility: %s", queryWithContext);
3751-
LogLogicalMessage("G", queryWithContext, strlen(queryWithContext)+1, true);
3752+
if (transactional)
3753+
/* DDL */
3754+
LogLogicalMessage("D", queryWithContext, strlen(queryWithContext) + 1, true);
3755+
else
3756+
/* CONCURRENT DDL */
3757+
LogLogicalMessage("C", queryWithContext, strlen(queryWithContext) + 1, false);
37523758

37533759
MtmTx.containsDML = true;
37543760
return false;
@@ -3785,17 +3791,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
37853791
MTM_LOG3("%d: Process utility statement %s", MyProcPid, queryString);
37863792
switch (nodeTag(parsetree))
37873793
{
3788-
case T_IndexStmt:
3789-
{
3790-
IndexStmt* stmt = (IndexStmt*) parsetree;
3791-
if (stmt->concurrent) {
3792-
stmt->concurrent = false;
3793-
elog(WARNING, "Disable concurrent option for index creation");
3794-
}
3795-
break;
3796-
}
3797-
3798-
case T_TransactionStmt:
3794+
case T_TransactionStmt:
37993795
{
38003796
TransactionStmt *stmt = (TransactionStmt *) parsetree;
38013797
switch (stmt->kind)
@@ -3893,6 +3889,30 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
38933889
}
38943890
break;
38953891

3892+
case T_IndexStmt:
3893+
{
3894+
IndexStmt *indexStmt = (IndexStmt *) parsetree;
3895+
if (indexStmt->concurrent && !IsTransactionBlock())
3896+
{
3897+
skipCommand = true;
3898+
MtmProcessDDLCommand(queryString, false);
3899+
MtmTx.isDistributed = false;
3900+
}
3901+
}
3902+
break;
3903+
3904+
case T_DropStmt:
3905+
{
3906+
DropStmt *stmt = (DropStmt *) parsetree;
3907+
if (stmt->removeType == OBJECT_INDEX && stmt->concurrent && !IsTransactionBlock())
3908+
{
3909+
skipCommand = true;
3910+
MtmProcessDDLCommand(queryString, false);
3911+
MtmTx.isDistributed = false;
3912+
}
3913+
}
3914+
break;
3915+
38963916
/* Copy need some special care */
38973917
case T_CopyStmt:
38983918
{
@@ -3926,13 +3946,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39263946
if (!skipCommand && (context == PROCESS_UTILITY_TOPLEVEL || MtmUtilityProcessedInXid != GetCurrentTransactionId()))
39273947
MtmUtilityProcessedInXid = InvalidTransactionId;
39283948

3929-
if (context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY)
3930-
{
3931-
if (!skipCommand && !MtmTx.isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId)) {
3932-
MtmUtilityProcessedInXid = GetCurrentTransactionId();
3933-
MtmProcessDDLCommand(queryString);
3934-
executed = true;
3935-
}
3949+
if (!skipCommand && !MtmTx.isReplicated && (MtmUtilityProcessedInXid == InvalidTransactionId)) {
3950+
MtmUtilityProcessedInXid = GetCurrentTransactionId();
3951+
3952+
if (context == PROCESS_UTILITY_TOPLEVEL)
3953+
MtmProcessDDLCommand(queryString, true);
3954+
else
3955+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
3956+
3957+
executed = true;
39363958
}
39373959

39383960
if (PreviousProcessUtilityHook != NULL)
@@ -3945,19 +3967,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
39453967
standard_ProcessUtility(parsetree, queryString, context,
39463968
params, dest, completionTag);
39473969
}
3948-
39493970
if (!MtmVolksWagenMode && MtmTx.isDistributed && XactIsoLevel != XACT_REPEATABLE_READ) {
39503971
elog(ERROR, "Isolation level %s is not supported by multimaster", isoLevelStr[XactIsoLevel]);
39513972
}
3952-
3973+
39533974
if (MyXactAccessedTempRel)
39543975
{
39553976
MTM_LOG1("Xact accessed temp table, stopping replication");
39563977
MtmTx.isDistributed = false; /* Skip */
39573978
MtmTx.snapshot = INVALID_CSN;
39583979
}
39593980

3960-
if (executed)
3981+
if (executed && !skipCommand)
39613982
{
39623983
MtmFinishDDLCommand();
39633984
}

contrib/mmts/pglogical_apply.c

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
7171

7272
static bool process_remote_begin(StringInfo s);
73-
static void process_remote_transactional_message(StringInfo s);
7473
static void process_remote_message(StringInfo s);
7574
static void process_remote_commit(StringInfo s);
7675
static void process_remote_insert(StringInfo s, Relation rel);
@@ -355,35 +354,43 @@ process_remote_begin(StringInfo s)
355354
}
356355

357356
static void
358-
process_remote_transactional_message(StringInfo s)
357+
process_remote_message(StringInfo s)
359358
{
360-
int rc;
359+
char action = pq_getmsgbyte(s);
361360
int messageSize = pq_getmsgint(s, 4);
362-
char const* stmt = pq_getmsgbytes(s, messageSize);
361+
char const* messageBody = pq_getmsgbytes(s, messageSize);
363362

364-
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, stmt);
365-
SPI_connect();
366-
ActivePortal->sourceText = stmt;
367-
rc = SPI_execute(stmt, false, 0);
368-
SPI_finish();
369-
if (rc < 0)
370-
elog(ERROR, "Failed to execute utility statement %s", stmt);
363+
switch (action)
364+
{
365+
case 'C':
366+
{
367+
MTM_LOG1("%d: Executing non-tx utility statement %s", MyProcPid, messageBody);
368+
SetCurrentStatementStartTimestamp();
369+
StartTransactionCommand();
370+
/* intentional falldown to the next case */
371+
}
372+
case 'D':
373+
{
374+
int rc;
375+
376+
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, messageBody);
377+
SPI_connect();
378+
ActivePortal->sourceText = messageBody;
379+
rc = SPI_execute(messageBody, false, 0);
380+
SPI_finish();
381+
if (rc < 0)
382+
elog(ERROR, "Failed to execute utility statement %s", messageBody);
383+
break;
384+
}
385+
case 'L':
386+
{
387+
MTM_LOG3("%ld: Process deadlock message with size %d from %d", MtmGetSystemTime(), messageSize, MtmReplicationNodeId);
388+
MtmUpdateLockGraph(MtmReplicationNodeId, messageBody, messageSize);
389+
break;
390+
}
391+
}
371392

372-
//XXX: create messages for tables localization too.
373-
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
374-
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
375-
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
376-
// MtmMakeTableLocal(schema, name);
377-
// }
378-
}
379393

380-
static void
381-
process_remote_message(StringInfo s)
382-
{
383-
int messageSize = pq_getmsgint(s, 4);
384-
char const* messageBody = pq_getmsgbytes(s, messageSize);
385-
MTM_LOG3("%ld: Process deadlock message with size %d from %d", MtmGetSystemTime(), messageSize, MtmReplicationNodeId);
386-
MtmUpdateLockGraph(MtmReplicationNodeId, messageBody, messageSize);
387394
}
388395

389396
static void
@@ -1049,16 +1056,10 @@ void MtmExecutor(void* work, size_t size)
10491056
s.len = save_len;
10501057
continue;
10511058
}
1052-
case 'G':
1053-
case 'E':
1054-
{
1055-
process_remote_transactional_message(&s);
1056-
continue;
1057-
}
1058-
case 'L':
1059+
case 'M':
10591060
{
10601061
process_remote_message(&s);
1061-
break;
1062+
continue;
10621063
}
10631064
default:
10641065
elog(ERROR, "unknown action of type %c", action);

contrib/mmts/pglogical_proto.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pglogical_write_message(StringInfo out,
137137
{
138138
MTM_LOG1("Send deadlock message to node %d", MtmReplicationNodeId);
139139
}
140-
else if (*prefix == 'G')
140+
else if (*prefix == 'D')
141141
{
142142
if (MtmTransactionSnapshot(MtmCurrentXid) == INVALID_CSN)
143143
{
@@ -149,8 +149,14 @@ pglogical_write_message(StringInfo out,
149149
else if (*prefix == 'E')
150150
{
151151
DDLInProress = false;
152+
/*
153+
* we use End message only as indicator of DDL transaction finish,
154+
* so no need to send that to replicas.
155+
*/
156+
return;
152157
}
153158

159+
pq_sendbyte(out, 'M');
154160
pq_sendbyte(out, *prefix);
155161
pq_sendint(out, sz, 4);
156162
pq_sendbytes(out, message, sz);

0 commit comments

Comments
 (0)