Skip to content

Commit c431162

Browse files
committed
ignore logical messages outside of tx
1 parent 5a20755 commit c431162

File tree

2 files changed

+38
-111
lines changed

2 files changed

+38
-111
lines changed

multimaster.c

Lines changed: 25 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -3172,11 +3172,16 @@ MtmGenerateGid(char* gid)
31723172

31733173
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
31743174
{
3175-
if (MtmUtilityStmt && !MyXactAccessedTempRel)
3175+
if (MyXactAccessedTempRel)
31763176
{
3177-
MtmProcessDDLCommand(MtmUtilityStmt);
3178-
pfree(MtmUtilityStmt);
3179-
MtmUtilityStmt = NULL;
3177+
/*
3178+
* XXX: this tx anyway goes to subscribers later, but without
3179+
* surrounding begin/commit. Probably there is more clever way
3180+
* to do that.
3181+
*/
3182+
x->isDistributed = false;
3183+
x->csn = NULL;
3184+
return false;
31803185
}
31813186

31823187
if (!x->isReplicated && (x->isDistributed && x->containsDML)) {
@@ -3245,15 +3250,15 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32453250
case T_CreateTableSpaceStmt:
32463251
case T_AlterTableSpaceOptionsStmt:
32473252
case T_TruncateStmt:
3248-
case T_CommentStmt: /* XXX: we could replicate these */;
3253+
case T_CommentStmt:
32493254
case T_PrepareStmt:
32503255
case T_ExecuteStmt:
32513256
case T_DeallocateStmt:
32523257
case T_NotifyStmt:
32533258
case T_ListenStmt:
32543259
case T_UnlistenStmt:
32553260
case T_LoadStmt:
3256-
case T_ClusterStmt: /* XXX: we could replicate these */;
3261+
case T_ClusterStmt:
32573262
case T_VacuumStmt:
32583263
case T_ExplainStmt:
32593264
case T_VariableShowStmt:
@@ -3263,6 +3268,16 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32633268
case T_ReindexStmt:
32643269
skipCommand = true;
32653270
break;
3271+
3272+
/* Do not skip following unless temp object was accessed */
3273+
case T_CreateTableAsStmt:
3274+
case T_CreateStmt:
3275+
case T_ViewStmt:
3276+
case T_IndexStmt:
3277+
case T_DropStmt:
3278+
break;
3279+
3280+
/* Save GUC context for consequent DDL execution */
32663281
case T_DiscardStmt:
32673282
{
32683283
DiscardStmt *stmt = (DiscardStmt *) parsetree;
@@ -3279,8 +3294,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32793294
{
32803295
VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
32813296

3282-
// skipCommand = true;
3283-
32843297
/* Prevent SET TRANSACTION from replication */
32853298
if (stmt->kind == VAR_SET_MULTI)
32863299
skipCommand = true;
@@ -3292,88 +3305,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32923305
}
32933306
}
32943307
break;
3295-
case T_CreateTableAsStmt:
3296-
// {
3297-
// /* Do not replicate temp tables */
3298-
// CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
3299-
// skipCommand = stmt->into->rel->relpersistence == RELPERSISTENCE_TEMP ||
3300-
// (stmt->into->rel->schemaname && strcmp(stmt->into->rel->schemaname, "pg_temp") == 0);
3301-
// }
3302-
break;
3303-
case T_CreateStmt:
3304-
{
3305-
/* Do not replicate temp tables */
3306-
CreateStmt *stmt = (CreateStmt *) parsetree;
3307-
skipCommand = stmt->relation->relpersistence == RELPERSISTENCE_TEMP ||
3308-
(stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
3309-
}
3310-
break;
3311-
case T_ViewStmt:
3312-
{
3313-
ViewStmt *stmt = (ViewStmt *) parsetree;
3314-
Query *viewParse;
3315-
3316-
viewParse = parse_analyze((Node *) copyObject(stmt->query),
3317-
queryString, NULL, 0);
3318-
skipCommand = isQueryUsingTempRelation(viewParse) ||
3319-
stmt->view->relpersistence == RELPERSISTENCE_TEMP;
3320-
// ||
3321-
// (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
3322-
}
3323-
break;
3324-
case T_IndexStmt:
3325-
{
3326-
Oid relid;
3327-
Relation rel;
3328-
IndexStmt *stmt = (IndexStmt *) parsetree;
3329-
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
33303308

3331-
if (stmt->concurrent)
3332-
PreventTransactionChain(isTopLevel,
3333-
"CREATE INDEX CONCURRENTLY");
3334-
3335-
relid = RelnameGetRelid(stmt->relation->relname);
3336-
3337-
if (OidIsValid(relid))
3338-
{
3339-
rel = heap_open(relid, ShareLock);
3340-
skipCommand = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
3341-
heap_close(rel, ShareLock);
3342-
}
3343-
}
3344-
break;
3345-
case T_DropStmt:
3346-
{
3347-
DropStmt *stmt = (DropStmt *) parsetree;
3348-
3349-
if (stmt->removeType == OBJECT_TABLE)
3350-
{
3351-
RangeVar *rv = makeRangeVarFromNameList(
3352-
(List *) lfirst(list_head(stmt->objects)));
3353-
Oid relid = RelnameGetRelid(rv->relname);
3354-
3355-
if (OidIsValid(relid))
3356-
{
3357-
Relation rel = heap_open(relid, ShareLock);
3358-
skipCommand = rel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
3359-
heap_close(rel, ShareLock);
3360-
}
3361-
}
3362-
else if (stmt->removeType == OBJECT_INDEX)
3363-
{
3364-
RangeVar *rv = makeRangeVarFromNameList(
3365-
(List *) lfirst(list_head(stmt->objects)));
3366-
Oid relid = RelnameGetRelid(rv->relname);
3367-
3368-
if (OidIsValid(relid))
3369-
{
3370-
Relation irel = index_open(relid, ShareLock);
3371-
skipCommand = irel->rd_rel->relpersistence == RELPERSISTENCE_TEMP;
3372-
index_close(irel, ShareLock);
3373-
}
3374-
}
3375-
}
3376-
break;
3309+
/* Copy need some special care */
33773310
case T_CopyStmt:
33783311
{
33793312
CopyStmt *copyStatement = (CopyStmt *) parsetree;
@@ -3404,20 +3337,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
34043337
if (context == PROCESS_UTILITY_TOPLEVEL)
34053338
{
34063339
if (!skipCommand && !MtmTx.isReplicated) {
3407-
// if (MtmProcessDDLCommand(queryString)) {
3408-
// return;
3409-
// }
3410-
3411-
MemoryContext oldcontext;
3412-
3413-
if (MtmUtilityStmt)
3414-
pfree(MtmUtilityStmt);
3415-
3416-
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3417-
MtmUtilityStmt = palloc(strlen(queryString) + 1);
3418-
MemoryContextSwitchTo(oldcontext);
3419-
3420-
strncpy(MtmUtilityStmt, queryString, strlen(queryString) + 1);
3340+
if (MtmProcessDDLCommand(queryString)) {
3341+
return;
3342+
}
34213343
}
34223344
}
34233345

pglogical_apply.c

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ typedef struct TupleData
5959
bool changed[MaxTupleAttributeNumber];
6060
} TupleData;
6161

62+
static bool inside_tx = false;
63+
6264
static Relation read_rel(StringInfo s, LOCKMODE mode);
6365
static void read_tuple_parts(StringInfo s, Relation rel, TupleData *tup);
6466
static EState* create_rel_estate(Relation rel);
@@ -339,6 +341,8 @@ process_remote_begin(StringInfo s)
339341
StartTransactionCommand();
340342
MtmJoinTransaction(&gtid, snapshot);
341343

344+
inside_tx = true;
345+
342346
MTM_LOG1("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
343347
}
344348

@@ -349,9 +353,14 @@ process_remote_message(StringInfo s)
349353
int rc;
350354

351355
stmt = pq_getmsgstring(s);
352-
MTM_LOG1("utility: %s", stmt);
353-
MTM_LOG3("%d: Execute utility statement %s", MyProcPid, stmt);
354356

357+
if (!inside_tx)
358+
{
359+
MTM_LOG1("%d: Ignoring utility statement %s", MyProcPid, stmt);
360+
return;
361+
}
362+
363+
MTM_LOG1("%d: Executing utility statement %s", MyProcPid, stmt);
355364
SPI_connect();
356365
rc = SPI_execute(stmt, false, 0);
357366
SPI_finish();
@@ -631,6 +640,7 @@ process_remote_commit(StringInfo in)
631640
if (flags & PGLOGICAL_CAUGHT_UP) {
632641
MtmRecoveryCompleted();
633642
}
643+
inside_tx = false;
634644
}
635645

636646
static void
@@ -950,12 +960,7 @@ void MtmExecutor(int id, void* work, size_t size)
950960
{
951961
while (true) {
952962
char action = pq_getmsgbyte(&s);
953-
MTM_LOG3("%d: REMOTE process action %c", MyProcPid, action);
954-
#if 0
955-
if (Mtm->status == MTM_RECOVERY) {
956-
MTM_LOG1("Replay action %c[%x]", action, s.data[s.cursor]);
957-
}
958-
#endif
963+
MTM_LOG1("%d: REMOTE process action %c", MyProcPid, action);
959964
switch (action) {
960965
/* BEGIN */
961966
case 'B':

0 commit comments

Comments
 (0)