@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_get_snapshot);
108
108
PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
109
109
PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
110
110
PG_FUNCTION_INFO_V1 (mtm_make_table_local );
111
+ PG_FUNCTION_INFO_V1 (mtm_dump_lock_graph );
111
112
112
113
static Snapshot MtmGetSnapshot (Snapshot snapshot );
113
114
static void MtmInitialize (void );
@@ -140,7 +141,7 @@ HTAB* MtmXid2State;
140
141
static HTAB * MtmGid2State ;
141
142
static HTAB * MtmLocalTables ;
142
143
143
- static bool MtmIsRecoverySession ;
144
+ static bool MtmIsRecoverySession ;
144
145
145
146
static MtmCurrentTrans MtmTx ;
146
147
@@ -199,6 +200,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
199
200
ProcessUtilityContext context , ParamListInfo params ,
200
201
DestReceiver * dest , char * completionTag );
201
202
203
+ static StringInfo MtmGUCBuffer ;
204
+ static bool MtmGUCBufferAllocated = false;
205
+
202
206
/*
203
207
* -------------------------------------------
204
208
* Synchronize access to MTM structures.
@@ -2153,6 +2157,31 @@ Datum mtm_make_table_local(PG_FUNCTION_ARGS)
2153
2157
return false;
2154
2158
}
2155
2159
2160
+ Datum mtm_dump_lock_graph (PG_FUNCTION_ARGS )
2161
+ {
2162
+ StringInfo s = makeStringInfo ();
2163
+ int i ;
2164
+ for (i = 0 ; i < MtmNodes ; i ++ )
2165
+ {
2166
+ size_t size ;
2167
+ char * data = RaftableGet (psprintf ("lock-graph-%d" , i + 1 ), & size , NULL , true);
2168
+ if (!data ) continue ;
2169
+ GlobalTransactionId * gtid = (GlobalTransactionId * )data ;
2170
+ GlobalTransactionId * last = (GlobalTransactionId * )(data + size );
2171
+ appendStringInfo (s , "node-%d lock graph: " , i + 1 );
2172
+ while (gtid != last ) {
2173
+ GlobalTransactionId * src = gtid ++ ;
2174
+ appendStringInfo (s , "%d:%d -> " , src -> node , src -> xid );
2175
+ while (gtid -> node != 0 ) {
2176
+ GlobalTransactionId * dst = gtid ++ ;
2177
+ appendStringInfo (s , "%d:%d, " , dst -> node , dst -> xid );
2178
+ }
2179
+ gtid += 1 ;
2180
+ }
2181
+ appendStringInfo (s , "\n" );
2182
+ }
2183
+ return CStringGetTextDatum (s -> data );
2184
+ }
2156
2185
2157
2186
/*
2158
2187
* -------------------------------------------
@@ -2241,6 +2270,12 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2241
2270
{
2242
2271
if (conns [i ])
2243
2272
{
2273
+ if (MtmGUCBufferAllocated && !MtmRunUtilityStmt (conns [i ], MtmGUCBuffer -> data , & utility_errmsg ) && !ignoreError )
2274
+ {
2275
+ errorMsg = "Failed to set GUC variables at node %d" ;
2276
+ failedNode = i ;
2277
+ break ;
2278
+ }
2244
2279
if (!MtmRunUtilityStmt (conns [i ], "BEGIN TRANSACTION" , & utility_errmsg ) && !ignoreError )
2245
2280
{
2246
2281
errorMsg = "Failed to start transaction at node %d" ;
@@ -2252,7 +2287,10 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
2252
2287
if (i + 1 == MtmNodeId )
2253
2288
errorMsg = utility_errmsg ;
2254
2289
else
2290
+ {
2291
+ elog (ERROR , utility_errmsg );
2255
2292
errorMsg = "Failed to run command at node %d" ;
2293
+ }
2256
2294
2257
2295
failedNode = i ;
2258
2296
break ;
@@ -2383,7 +2421,7 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2383
2421
ProcessUtilityContext context , ParamListInfo params ,
2384
2422
DestReceiver * dest , char * completionTag )
2385
2423
{
2386
- bool skipCommand ;
2424
+ bool skipCommand = false ;
2387
2425
MTM_TRACE ("%d: Process utility statement %s\n" , MyProcPid , queryString );
2388
2426
switch (nodeTag (parsetree ))
2389
2427
{
@@ -2414,7 +2452,6 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2414
2452
case T_FetchStmt :
2415
2453
case T_DoStmt :
2416
2454
case T_CreateTableSpaceStmt :
2417
- case T_DropTableSpaceStmt :
2418
2455
case T_AlterTableSpaceOptionsStmt :
2419
2456
case T_TruncateStmt :
2420
2457
case T_CommentStmt : /* XXX: we could replicate these */ ;
@@ -2423,9 +2460,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2423
2460
case T_ExecuteStmt :
2424
2461
case T_DeallocateStmt :
2425
2462
case T_GrantStmt : /* XXX: we could replicate some of these these */ ;
2426
- case T_GrantRoleStmt :
2427
- case T_AlterDatabaseStmt :
2428
- case T_AlterDatabaseSetStmt :
2463
+ // case T_GrantRoleStmt:
2464
+ // case T_AlterDatabaseStmt:
2465
+ // case T_AlterDatabaseSetStmt:
2429
2466
case T_NotifyStmt :
2430
2467
case T_ListenStmt :
2431
2468
case T_UnlistenStmt :
@@ -2434,22 +2471,46 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2434
2471
case T_VacuumStmt :
2435
2472
case T_ExplainStmt :
2436
2473
case T_AlterSystemStmt :
2437
- case T_VariableSetStmt :
2438
2474
case T_VariableShowStmt :
2439
2475
case T_DiscardStmt :
2440
- case T_CreateEventTrigStmt :
2441
- case T_AlterEventTrigStmt :
2442
- case T_CreateRoleStmt :
2443
- case T_AlterRoleStmt :
2444
- case T_AlterRoleSetStmt :
2445
- case T_DropRoleStmt :
2476
+ // case T_CreateEventTrigStmt:
2477
+ // case T_AlterEventTrigStmt:
2478
+ // case T_CreateRoleStmt:
2479
+ // case T_AlterRoleStmt:
2480
+ // case T_AlterRoleSetStmt:
2481
+ // case T_DropRoleStmt:
2446
2482
case T_ReassignOwnedStmt :
2447
2483
case T_LockStmt :
2448
- case T_ConstraintsSetStmt :
2484
+ // case T_ConstraintsSetStmt:
2449
2485
case T_CheckPointStmt :
2450
2486
case T_ReindexStmt :
2451
2487
skipCommand = true;
2452
2488
break ;
2489
+ case T_VariableSetStmt :
2490
+ {
2491
+ //VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
2492
+
2493
+ if (!MtmGUCBufferAllocated )
2494
+ {
2495
+ MemoryContext oldcontext ;
2496
+
2497
+ oldcontext = MemoryContextSwitchTo (TopMemoryContext );
2498
+ MtmGUCBuffer = makeStringInfo ();
2499
+ MemoryContextSwitchTo (oldcontext );
2500
+ MtmGUCBufferAllocated = true;
2501
+ }
2502
+
2503
+ //appendStringInfoString(MtmGUCBuffer, "SET ");
2504
+ //appendStringInfoString(MtmGUCBuffer, stmt->name);
2505
+ //appendStringInfoString(MtmGUCBuffer, " TO ");
2506
+ //appendStringInfoString(MtmGUCBuffer, ExtractSetVariableArgs(stmt));
2507
+ //appendStringInfoString(MtmGUCBuffer, "; ");
2508
+
2509
+ appendStringInfoString (MtmGUCBuffer , queryString );
2510
+
2511
+ skipCommand = true;
2512
+ }
2513
+ break ;
2453
2514
case T_CreateStmt :
2454
2515
{
2455
2516
/* Do not replicate temp tables */
@@ -2469,9 +2530,32 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
2469
2530
"CREATE INDEX CONCURRENTLY" );
2470
2531
2471
2532
relid = RelnameGetRelid (stmt -> relation -> relname );
2472
- rel = heap_open (relid , ShareLock );
2473
- skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2474
- heap_close (rel , NoLock );
2533
+
2534
+ if (OidIsValid (relid ))
2535
+ {
2536
+ rel = heap_open (relid , ShareLock );
2537
+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2538
+ heap_close (rel , NoLock );
2539
+ }
2540
+ }
2541
+ break ;
2542
+ case T_DropStmt :
2543
+ {
2544
+ DropStmt * stmt = (DropStmt * ) parsetree ;
2545
+
2546
+ if (stmt -> removeType == OBJECT_TABLE )
2547
+ {
2548
+ RangeVar * rv = makeRangeVarFromNameList (
2549
+ (List * ) lfirst (list_head (stmt -> objects )));
2550
+ Oid relid = RelnameGetRelid (rv -> relname );
2551
+
2552
+ if (OidIsValid (relid ))
2553
+ {
2554
+ Relation rel = heap_open (relid , ShareLock );
2555
+ skipCommand = rel -> rd_rel -> relpersistence == RELPERSISTENCE_TEMP ;
2556
+ heap_close (rel , ShareLock );
2557
+ }
2558
+ }
2475
2559
}
2476
2560
break ;
2477
2561
default :
0 commit comments