@@ -140,7 +140,7 @@ HTAB* MtmXid2State;
140
140
static HTAB * MtmGid2State ;
141
141
static HTAB * MtmLocalTables ;
142
142
143
- static bool MtmIsRecoverySession ;
143
+ static bool MtmIsRecoverySession ;
144
144
145
145
static MtmCurrentTrans MtmTx ;
146
146
@@ -199,6 +199,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
199
199
ProcessUtilityContext context , ParamListInfo params ,
200
200
DestReceiver * dest , char * completionTag );
201
201
202
+ static StringInfo MtmGUCBuffer ;
203
+ static bool MtmGUCBufferAllocated = false;
204
+
202
205
/*
203
206
* -------------------------------------------
204
207
* Synchronize access to MTM structures.
@@ -1934,7 +1937,7 @@ MtmReplicationStartupHook(struct PGLogicalStartupHookArgs* args)
1934
1937
elog (NOTICE , "Node %d start logical replication to node %d in normal mode" , MtmNodeId , MtmReplicationNodeId );
1935
1938
}
1936
1939
MtmUnlock ();
1937
- on_proc_exit (MtmOnProcExit , 0 );
1940
+ on_shmem_exit (MtmOnProcExit , 0 );
1938
1941
}
1939
1942
1940
1943
static void
@@ -2239,6 +2242,12 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2239
2242
{
2240
2243
if (conns [i ])
2241
2244
{
2245
+ if (MtmGUCBufferAllocated && !MtmRunUtilityStmt (conns [i ], MtmGUCBuffer -> data , & utility_errmsg ) && !ignoreError )
2246
+ {
2247
+ errorMsg = "Failed to set GUC variables at node %d" ;
2248
+ failedNode = i ;
2249
+ break ;
2250
+ }
2242
2251
if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" , & utility_errmsg ) && !ignoreError )
2243
2252
{
2244
2253
errorMsg = "Failed to start transaction at node %d" ;
@@ -2250,7 +2259,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2250
2259
if (i + 1 == MtmNodeId )
2251
2260
errorMsg = utility_errmsg ;
2252
2261
else
2262
+ {
2263
+ elog (ERROR , utility_errmsg );
2253
2264
errorMsg = "Failed to run command at node %d" ;
2265
+ }
2254
2266
2255
2267
failedNode = i ;
2256
2268
break ;
@@ -2381,7 +2393,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2381
2393
ProcessUtilityContext context , ParamListInfo params ,
2382
2394
DestReceiver * dest , char * completionTag )
2383
2395
{
2384
- bool skipCommand ;
2396
+ bool skipCommand = false ;
2385
2397
MTM_TRACE ("%d: Process utility statement %s\n" , MyProcPid , queryString );
2386
2398
switch (nodeTag (parsetree ))
2387
2399
{
@@ -2412,7 +2424,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2412
2424
case T_FetchStmt :
2413
2425
case T_DoStmt :
2414
2426
case T_CreateTableSpaceStmt :
2415
- case T_DropTableSpaceStmt :
2416
2427
case T_AlterTableSpaceOptionsStmt :
2417
2428
case T_TruncateStmt :
2418
2429
case T_CommentStmt : /* XXX: we could replicate these */ ;
@@ -2421,9 +2432,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2421
2432
case T_ExecuteStmt :
2422
2433
case T_DeallocateStmt :
2423
2434
case T_GrantStmt : /* XXX: we could replicate some of these these */ ;
2424
- case T_GrantRoleStmt :
2425
- case T_AlterDatabaseStmt :
2426
- case T_AlterDatabaseSetStmt :
2435
+ // case T_GrantRoleStmt:
2436
+ // case T_AlterDatabaseStmt:
2437
+ // case T_AlterDatabaseSetStmt:
2427
2438
case T_NotifyStmt :
2428
2439
case T_ListenStmt :
2429
2440
case T_UnlistenStmt :
@@ -2432,22 +2443,46 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2432
2443
case T_VacuumStmt :
2433
2444
case T_ExplainStmt :
2434
2445
case T_AlterSystemStmt :
2435
- case T_VariableSetStmt :
2436
2446
case T_VariableShowStmt :
2437
2447
case T_DiscardStmt :
2438
- case T_CreateEventTrigStmt :
2439
- case T_AlterEventTrigStmt :
2440
- case T_CreateRoleStmt :
2441
- case T_AlterRoleStmt :
2442
- case T_AlterRoleSetStmt :
2443
- case T_DropRoleStmt :
2448
+ // case T_CreateEventTrigStmt:
2449
+ // case T_AlterEventTrigStmt:
2450
+ // case T_CreateRoleStmt:
2451
+ // case T_AlterRoleStmt:
2452
+ // case T_AlterRoleSetStmt:
2453
+ // case T_DropRoleStmt:
2444
2454
case T_ReassignOwnedStmt :
2445
2455
case T_LockStmt :
2446
- case T_ConstraintsSetStmt :
2456
+ // case T_ConstraintsSetStmt:
2447
2457
case T_CheckPointStmt :
2448
2458
case T_ReindexStmt :
2449
2459
skipCommand = true;
2450
2460
break ;
2461
+ case T_VariableSetStmt :
2462
+ {
2463
+ //VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
2464
+
2465
+ if (!MtmGUCBufferAllocated )
2466
+ {
2467
+ MemoryContext oldcontext ;
2468
+
2469
+ oldcontext = MemoryContextSwitchTo (TopMemoryContext );
2470
+ MtmGUCBuffer = makeStringInfo ();
2471
+ MemoryContextSwitchTo (oldcontext );
2472
+ MtmGUCBufferAllocated = true;
2473
+ }
2474
+
2475
+ //appendStringInfoString(MtmGUCBuffer, "SET ");
2476
+ //appendStringInfoString(MtmGUCBuffer, stmt->name);
2477
+ //appendStringInfoString(MtmGUCBuffer, " TO ");
2478
+ //appendStringInfoString(MtmGUCBuffer, ExtractSetVariableArgs(stmt));
2479
+ //appendStringInfoString(MtmGUCBuffer, "; ");
2480
+
2481
+ appendStringInfoString (MtmGUCBuffer , queryString );
2482
+
2483
+ skipCommand = true;
2484
+ }
2485
+ break ;
2451
2486
case T_CreateStmt :
2452
2487
{
2453
2488
/* Do not replicate temp tables */
@@ -2467,9 +2502,32 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2467
2502
"CREATE INDEX CONCURRENTLY" );
2468
2503
2469
2504
relid = RelnameGetRelid (stmt -> relation -> relname );
2470
- rel = heap_open (relid , ShareLock );
2471
- skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2472
- heap_close (rel , NoLock );
2505
+
2506
+ if (OidIsValid (relid ))
2507
+ {
2508
+ rel = heap_open (relid , ShareLock );
2509
+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2510
+ heap_close (rel , NoLock );
2511
+ }
2512
+ }
2513
+ break ;
2514
+ case T_DropStmt :
2515
+ {
2516
+ DropStmt * stmt = (DropStmt * ) parsetree ;
2517
+
2518
+ if (stmt -> removeType == OBJECT_TABLE )
2519
+ {
2520
+ RangeVar * rv = makeRangeVarFromNameList (
2521
+ (List * ) lfirst (list_head (stmt -> objects )));
2522
+ Oid relid = RelnameGetRelid (rv -> relname );
2523
+
2524
+ if (OidIsValid (relid ))
2525
+ {
2526
+ Relation rel = heap_open (relid , ShareLock );
2527
+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2528
+ heap_close (rel , ShareLock );
2529
+ }
2530
+ }
2473
2531
}
2474
2532
break ;
2475
2533
default :
0 commit comments