68
68
#include "parser/analyze.h"
69
69
#include "parser/parse_relation.h"
70
70
#include "parser/parse_type.h"
71
+ #include "parser/parse_func.h"
71
72
#include "catalog/pg_class.h"
72
73
#include "catalog/pg_type.h"
73
74
#include "tcop/pquery.h"
@@ -157,6 +158,7 @@ static void MtmInitializeSequence(int64* start, int64* step);
157
158
static void * MtmCreateSavepointContext (void );
158
159
static void MtmRestoreSavepointContext (void * ctx );
159
160
static void MtmReleaseSavepointContext (void * ctx );
161
+ static void MtmSetRemoteFunction (char const * list , void * extra );
160
162
161
163
static void MtmCheckClusterLock (void );
162
164
static void MtmCheckSlots (void );
@@ -183,6 +185,7 @@ MtmConnectionInfo* MtmConnections;
183
185
184
186
HTAB * MtmXid2State ;
185
187
HTAB * MtmGid2State ;
188
+ static HTAB * MtmRemoteFunctions ;
186
189
static HTAB * MtmLocalTables ;
187
190
188
191
static bool MtmIsRecoverySession ;
@@ -254,6 +257,7 @@ bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some
254
257
TransactionId MtmUtilityProcessedInXid ;
255
258
256
259
static char * MtmConnStrs ;
260
+ static char * MtmRemoteFunctionsList ;
257
261
static char * MtmClusterName ;
258
262
static int MtmQueueSize ;
259
263
static int MtmWorkers ;
@@ -2567,7 +2571,7 @@ MtmCreateLocalTableMap(void)
2567
2571
"MtmLocalTables" ,
2568
2572
MULTIMASTER_MAX_LOCAL_TABLES , MULTIMASTER_MAX_LOCAL_TABLES ,
2569
2573
& info ,
2570
- HASH_ELEM
2574
+ HASH_ELEM | HASH_BLOBS
2571
2575
);
2572
2576
return htab ;
2573
2577
}
@@ -2761,6 +2765,48 @@ MtmShmemStartup(void)
2761
2765
MtmInitialize ();
2762
2766
}
2763
2767
2768
+ static void MtmSetRemoteFunction (char const * list , void * extra )
2769
+ {
2770
+ if (MtmRemoteFunctions ) {
2771
+ hash_destroy (MtmRemoteFunctions );
2772
+ MtmRemoteFunctions = NULL ;
2773
+ }
2774
+ }
2775
+
2776
+ static void MtmInitializeRemoteFunctionsMap ()
2777
+ {
2778
+ HASHCTL info ;
2779
+ char * p , * q ;
2780
+ int n_funcs = 1 ;
2781
+ FuncCandidateList clist ;
2782
+
2783
+ for (p = MtmRemoteFunctionsList ; (q = strchr (p , ',' )) != NULL ; p = q + 1 , n_funcs ++ );
2784
+
2785
+ Assert (MtmRemoteFunctions == NULL );
2786
+
2787
+ memset (& info , 0 , sizeof (info ));
2788
+ info .entrysize = info .keysize = sizeof (Oid );
2789
+ info .hcxt = TopMemoryContext ;
2790
+ MtmRemoteFunctions = hash_create ("MtmRemoteFunctions" , n_funcs , & info ,
2791
+ HASH_ELEM | HASH_BLOBS | HASH_CONTEXT );
2792
+
2793
+ p = pstrdup (MtmRemoteFunctionsList );
2794
+ do {
2795
+ q = strchr (p , ',' );
2796
+ if (q != NULL ) {
2797
+ * q ++ = '\0' ;
2798
+ }
2799
+ clist = FuncnameGetCandidates (stringToQualifiedNameList (p ), -1 , NIL , false, false, true);
2800
+ if (clist == NULL ) {
2801
+ MTM_ELOG (ERROR , "Failed to lookup function %s" , p );
2802
+ } else if (clist -> next != NULL ) {
2803
+ MTM_ELOG (ERROR , "Ambigious function %s" , p );
2804
+ }
2805
+ hash_search (MtmRemoteFunctions , & clist -> oid , HASH_ENTER , NULL );
2806
+ p = q ;
2807
+ } while (p != NULL );
2808
+ }
2809
+
2764
2810
/*
2765
2811
* Parse node connection string.
2766
2812
* This function is called at cluster startup and while adding new cluster node
@@ -3377,6 +3423,19 @@ _PG_init(void)
3377
3423
NULL /* GucShowHook show_hook */
3378
3424
);
3379
3425
3426
+ DefineCustomStringVariable (
3427
+ "multimaster.remote_functions" ,
3428
+ "List of fnuction names which should be executed remotely at all multimaster nodes instead of executing them at master and replicating result of their work" ,
3429
+ NULL ,
3430
+ & MtmRemoteFunctionsList ,
3431
+ "lo_create,lo_unlink" ,
3432
+ PGC_USERSET , /* context */
3433
+ 0 , /* flags */
3434
+ NULL , /* GucStringCheckHook check_hook */
3435
+ MtmSetRemoteFunction , /* GucStringAssignHook assign_hook */
3436
+ NULL /* GucShowHook show_hook */
3437
+ );
3438
+
3380
3439
DefineCustomStringVariable (
3381
3440
"multimaster.cluster_name" ,
3382
3441
"Name of the cluster" ,
@@ -3867,7 +3926,7 @@ lsn_t MtmGetFlushPosition(int nodeId)
3867
3926
* Keep track of progress of WAL writer.
3868
3927
* We need to notify WAL senders at other nodes which logical records
3869
3928
* are flushed to the disk and so can survive failure. In asynchronous commit mode
3870
- * WAL is flushed by WAL writer. Current flish position can be obtained by GetFlushRecPtr().
3929
+ * WAL is flushed by WAL writer. Current flush position can be obtained by GetFlushRecPtr().
3871
3930
* So on applying new logical record we insert it in the MtmLsnMapping and compare
3872
3931
* their poistions in local WAL log with current flush position.
3873
3932
* The records which are flushed to the disk by WAL writer are removed from the list
@@ -4975,7 +5034,7 @@ char* MtmGucSerialize(void)
4975
5034
appendStringInfoString (serialized_gucs , " TO " );
4976
5035
4977
5036
/* quite a crutch */
4978
- if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' )
5037
+ if (strstr (cur_entry -> key , "_mem" ) != NULL || * (cur_entry -> value ) == '\0' || strchr ( cur_entry -> value , ',' ) != NULL )
4979
5038
{
4980
5039
appendStringInfoString (serialized_gucs , "'" );
4981
5040
appendStringInfoString (serialized_gucs , cur_entry -> value );
@@ -5006,10 +5065,7 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
5006
5065
if (transactional )
5007
5066
{
5008
5067
char * gucCtx = MtmGucSerialize ();
5009
- if (* gucCtx )
5010
- queryString = psprintf ("RESET SESSION AUTHORIZATION; reset all; %s; %s" , gucCtx , queryString );
5011
- else
5012
- queryString = psprintf ("RESET SESSION AUTHORIZATION; reset all; %s" , queryString );
5068
+ queryString = psprintf ("RESET SESSION AUTHORIZATION; reset all; %s %s" , gucCtx , queryString );
5013
5069
5014
5070
/* Transactional DDL */
5015
5071
MTM_LOG3 ("Sending DDL: %s" , queryString );
@@ -5377,29 +5433,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
5377
5433
static void
5378
5434
MtmExecutorStart (QueryDesc * queryDesc , int eflags )
5379
5435
{
5380
- bool ddl_generating_call = false;
5381
- ListCell * tlist ;
5382
-
5383
- foreach (tlist , queryDesc -> plannedstmt -> planTree -> targetlist )
5436
+ if (!MtmTx .isReplicated && ActivePortal )
5384
5437
{
5385
- TargetEntry * tle = ( TargetEntry * ) lfirst ( tlist ) ;
5438
+ ListCell * tlist ;
5386
5439
5387
- if (tle -> resname && strcmp ( tle -> resname , "lo_create" ) == 0 )
5440
+ if (! MtmRemoteFunctions )
5388
5441
{
5389
- ddl_generating_call = true;
5390
- break ;
5442
+ MtmInitializeRemoteFunctionsMap ();
5391
5443
}
5392
5444
5393
- if ( tle -> resname && strcmp ( tle -> resname , "lo_unlink" ) == 0 )
5445
+ foreach ( tlist , queryDesc -> plannedstmt -> planTree -> targetlist )
5394
5446
{
5395
- ddl_generating_call = true;
5396
- break ;
5447
+ TargetEntry * tle = (TargetEntry * ) lfirst (tlist );
5448
+ if (tle -> expr && IsA (tle -> expr , FuncExpr ))
5449
+ {
5450
+ if (hash_search (MtmRemoteFunctions , & ((FuncExpr * )tle -> expr )-> funcid , HASH_FIND , NULL ))
5451
+ {
5452
+ MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5453
+ break ;
5454
+ }
5455
+ }
5397
5456
}
5398
5457
}
5399
-
5400
- if (ddl_generating_call && !MtmTx .isReplicated )
5401
- MtmProcessDDLCommand (ActivePortal -> sourceText , true);
5402
-
5403
5458
if (PreviousExecutorStartHook != NULL )
5404
5459
PreviousExecutorStartHook (queryDesc , eflags );
5405
5460
else
0 commit comments