Skip to content

Commit 6a8d759

Browse files
committed
Support remote functions
1 parent 8b99228 commit 6a8d759

File tree

4 files changed

+100
-57
lines changed

4 files changed

+100
-57
lines changed

contrib/mmts/multimaster.c

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
#include "parser/analyze.h"
6969
#include "parser/parse_relation.h"
7070
#include "parser/parse_type.h"
71+
#include "parser/parse_func.h"
7172
#include "catalog/pg_class.h"
7273
#include "catalog/pg_type.h"
7374
#include "tcop/pquery.h"
@@ -157,6 +158,7 @@ static void MtmInitializeSequence(int64* start, int64* step);
157158
static void* MtmCreateSavepointContext(void);
158159
static void MtmRestoreSavepointContext(void* ctx);
159160
static void MtmReleaseSavepointContext(void* ctx);
161+
static void MtmSetRemoteFunction(char const* list, void* extra);
160162

161163
static void MtmCheckClusterLock(void);
162164
static void MtmCheckSlots(void);
@@ -183,6 +185,7 @@ MtmConnectionInfo* MtmConnections;
183185

184186
HTAB* MtmXid2State;
185187
HTAB* MtmGid2State;
188+
static HTAB* MtmRemoteFunctions;
186189
static HTAB* MtmLocalTables;
187190

188191
static bool MtmIsRecoverySession;
@@ -254,6 +257,7 @@ bool MtmVolksWagenMode; /* Pretend to be normal postgres. This means skip some
254257
TransactionId MtmUtilityProcessedInXid;
255258

256259
static char* MtmConnStrs;
260+
static char* MtmRemoteFunctionsList;
257261
static char* MtmClusterName;
258262
static int MtmQueueSize;
259263
static int MtmWorkers;
@@ -2567,7 +2571,7 @@ MtmCreateLocalTableMap(void)
25672571
"MtmLocalTables",
25682572
MULTIMASTER_MAX_LOCAL_TABLES, MULTIMASTER_MAX_LOCAL_TABLES,
25692573
&info,
2570-
HASH_ELEM
2574+
HASH_ELEM | HASH_BLOBS
25712575
);
25722576
return htab;
25732577
}
@@ -2761,6 +2765,48 @@ MtmShmemStartup(void)
27612765
MtmInitialize();
27622766
}
27632767

2768+
static void MtmSetRemoteFunction(char const* list, void* extra)
2769+
{
2770+
if (MtmRemoteFunctions) {
2771+
hash_destroy(MtmRemoteFunctions);
2772+
MtmRemoteFunctions = NULL;
2773+
}
2774+
}
2775+
2776+
static void MtmInitializeRemoteFunctionsMap()
2777+
{
2778+
HASHCTL info;
2779+
char* p, *q;
2780+
int n_funcs = 1;
2781+
FuncCandidateList clist;
2782+
2783+
for (p = MtmRemoteFunctionsList; (q = strchr(p, ',')) != NULL; p = q + 1, n_funcs++);
2784+
2785+
Assert(MtmRemoteFunctions == NULL);
2786+
2787+
memset(&info, 0, sizeof(info));
2788+
info.entrysize = info.keysize = sizeof(Oid);
2789+
info.hcxt = TopMemoryContext;
2790+
MtmRemoteFunctions = hash_create("MtmRemoteFunctions", n_funcs, &info,
2791+
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2792+
2793+
p = pstrdup(MtmRemoteFunctionsList);
2794+
do {
2795+
q = strchr(p, ',');
2796+
if (q != NULL) {
2797+
*q++ = '\0';
2798+
}
2799+
clist = FuncnameGetCandidates(stringToQualifiedNameList(p), -1, NIL, false, false, true);
2800+
if (clist == NULL) {
2801+
MTM_ELOG(ERROR, "Failed to lookup function %s", p);
2802+
} else if (clist->next != NULL) {
2803+
MTM_ELOG(ERROR, "Ambigious function %s", p);
2804+
}
2805+
hash_search(MtmRemoteFunctions, &clist->oid, HASH_ENTER, NULL);
2806+
p = q;
2807+
} while (p != NULL);
2808+
}
2809+
27642810
/*
27652811
* Parse node connection string.
27662812
* This function is called at cluster startup and while adding new cluster node
@@ -3377,6 +3423,19 @@ _PG_init(void)
33773423
NULL /* GucShowHook show_hook */
33783424
);
33793425

3426+
DefineCustomStringVariable(
3427+
"multimaster.remote_functions",
3428+
"List of fnuction names which should be executed remotely at all multimaster nodes instead of executing them at master and replicating result of their work",
3429+
NULL,
3430+
&MtmRemoteFunctionsList,
3431+
"lo_create,lo_unlink",
3432+
PGC_USERSET, /* context */
3433+
0, /* flags */
3434+
NULL, /* GucStringCheckHook check_hook */
3435+
MtmSetRemoteFunction, /* GucStringAssignHook assign_hook */
3436+
NULL /* GucShowHook show_hook */
3437+
);
3438+
33803439
DefineCustomStringVariable(
33813440
"multimaster.cluster_name",
33823441
"Name of the cluster",
@@ -3867,7 +3926,7 @@ lsn_t MtmGetFlushPosition(int nodeId)
38673926
* Keep track of progress of WAL writer.
38683927
* We need to notify WAL senders at other nodes which logical records
38693928
* are flushed to the disk and so can survive failure. In asynchronous commit mode
3870-
* WAL is flushed by WAL writer. Current flish position can be obtained by GetFlushRecPtr().
3929+
* WAL is flushed by WAL writer. Current flush position can be obtained by GetFlushRecPtr().
38713930
* So on applying new logical record we insert it in the MtmLsnMapping and compare
38723931
* their poistions in local WAL log with current flush position.
38733932
* The records which are flushed to the disk by WAL writer are removed from the list
@@ -4975,7 +5034,7 @@ char* MtmGucSerialize(void)
49755034
appendStringInfoString(serialized_gucs, " TO ");
49765035

49775036
/* quite a crutch */
4978-
if (strstr(cur_entry->key, "_mem") != NULL || *(cur_entry->value) == '\0')
5037+
if (strstr(cur_entry->key, "_mem") != NULL || *(cur_entry->value) == '\0' || strchr(cur_entry->value, ',') != NULL)
49795038
{
49805039
appendStringInfoString(serialized_gucs, "'");
49815040
appendStringInfoString(serialized_gucs, cur_entry->value);
@@ -5006,10 +5065,7 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
50065065
if (transactional)
50075066
{
50085067
char *gucCtx = MtmGucSerialize();
5009-
if (*gucCtx)
5010-
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s; %s", gucCtx, queryString);
5011-
else
5012-
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s", queryString);
5068+
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s", gucCtx, queryString);
50135069

50145070
/* Transactional DDL */
50155071
MTM_LOG3("Sending DDL: %s", queryString);
@@ -5377,29 +5433,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
53775433
static void
53785434
MtmExecutorStart(QueryDesc *queryDesc, int eflags)
53795435
{
5380-
bool ddl_generating_call = false;
5381-
ListCell *tlist;
5382-
5383-
foreach(tlist, queryDesc->plannedstmt->planTree->targetlist)
5436+
if (!MtmTx.isReplicated && ActivePortal)
53845437
{
5385-
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
5438+
ListCell *tlist;
53865439

5387-
if (tle->resname && strcmp(tle->resname, "lo_create") == 0)
5440+
if (!MtmRemoteFunctions)
53885441
{
5389-
ddl_generating_call = true;
5390-
break;
5442+
MtmInitializeRemoteFunctionsMap();
53915443
}
53925444

5393-
if (tle->resname && strcmp(tle->resname, "lo_unlink") == 0)
5445+
foreach(tlist, queryDesc->plannedstmt->planTree->targetlist)
53945446
{
5395-
ddl_generating_call = true;
5396-
break;
5447+
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
5448+
if (tle->expr && IsA(tle->expr, FuncExpr))
5449+
{
5450+
if (hash_search(MtmRemoteFunctions, &((FuncExpr*)tle->expr)->funcid, HASH_FIND, NULL))
5451+
{
5452+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5453+
break;
5454+
}
5455+
}
53975456
}
53985457
}
5399-
5400-
if (ddl_generating_call && !MtmTx.isReplicated)
5401-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5402-
54035458
if (PreviousExecutorStartHook != NULL)
54045459
PreviousExecutorStartHook(queryDesc, eflags);
54055460
else

contrib/mmts/multimaster.h

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,22 @@
5151
fprintf(stderr, MTM_TAG "%s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
5252
#endif
5353

54-
#define MULTIMASTER_NAME "multimaster"
55-
#define MULTIMASTER_SCHEMA_NAME "mtm"
56-
#define MULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
57-
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
58-
#define MULTIMASTER_MIN_PROTO_VERSION 1
59-
#define MULTIMASTER_MAX_PROTO_VERSION 1
60-
#define MULTIMASTER_MAX_GID_SIZE 32
61-
#define MULTIMASTER_MAX_SLOT_NAME_SIZE 16
62-
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
63-
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
64-
#define MULTIMASTER_MAX_LOCAL_TABLES 256
65-
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
66-
#define MULTIMASTER_LOCK_BUF_INIT_SIZE 4096
67-
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
68-
#define MULTIMASTER_ADMIN "mtm_admin"
69-
#define MULTIMASTER_PRECOMMITTED "precommitted"
54+
#define MULTIMASTER_NAME "multimaster"
55+
#define MULTIMASTER_SCHEMA_NAME "mtm"
56+
#define MULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
57+
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
58+
#define MULTIMASTER_MIN_PROTO_VERSION 1
59+
#define MULTIMASTER_MAX_PROTO_VERSION 1
60+
#define MULTIMASTER_MAX_GID_SIZE 32
61+
#define MULTIMASTER_MAX_SLOT_NAME_SIZE 16
62+
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
63+
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
64+
#define MULTIMASTER_MAX_LOCAL_TABLES 256
65+
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
66+
#define MULTIMASTER_LOCK_BUF_INIT_SIZE 4096
67+
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
68+
#define MULTIMASTER_ADMIN "mtm_admin"
69+
#define MULTIMASTER_PRECOMMITTED "precommitted"
7070

7171
#define MULTIMASTER_DEFAULT_ARBITER_PORT 5433
7272

contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -478,8 +478,8 @@ pglogical_receiver_main(Datum main_arg)
478478
{
479479
int64 now = feGetCurrentTimestamp();
480480

481-
/* Leave is feedback is not sent properly */
482481
MtmUpdateLsnMapping(nodeId, walEnd);
482+
/* Leave if feedback is not sent properly */
483483
if (!sendFeedback(conn, now, nodeId)) {
484484
goto OnError;
485485
}
@@ -628,7 +628,6 @@ pglogical_receiver_main(Datum main_arg)
628628
{
629629
int64 now = feGetCurrentTimestamp();
630630

631-
/* Leave is feedback is not sent properly */
632631
MtmUpdateLsnMapping(nodeId, INVALID_LSN);
633632
sendFeedback(conn, now, nodeId);
634633
}
@@ -724,4 +723,3 @@ void MtmStartReceivers(void)
724723
}
725724
}
726725
}
727-

contrib/mmts/pglogical_relid_map.c

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,19 @@ static void
2020
pglogical_relid_map_init(void)
2121
{
2222
HASHCTL ctl;
23-
int hash_flags = HASH_ELEM;
24-
2523
Assert(relid_map == NULL);
2624

2725
MemSet(&ctl, 0, sizeof(ctl));
2826
ctl.keysize = sizeof(Oid);
2927
ctl.entrysize = sizeof(PGLRelidMapEntry);
30-
31-
#if PG_VERSION_NUM >= 90500
32-
hash_flags |= HASH_BLOBS;
33-
#else
34-
ctl.hash = tag_hash;
35-
hash_flags |= HASH_FUNCTION;
36-
#endif
37-
38-
relid_map = hash_create("pglogical_relid_map", PGL_INIT_RELID_MAP_SIZE, &ctl, hash_flags);
28+
relid_map = hash_create("pglogical_relid_map", PGL_INIT_RELID_MAP_SIZE, &ctl, HASH_ELEM | HASH_BLOBS);
3929

4030
Assert(relid_map != NULL);
4131
}
4232

4333
Oid pglogical_relid_map_get(Oid relid)
4434
{
45-
if (relid_map != NULL) {
35+
if (relid_map != NULL) {
4636
PGLRelidMapEntry* entry = (PGLRelidMapEntry*)hash_search(relid_map, &relid, HASH_FIND, NULL);
4737
return entry ? entry->local_relid : InvalidOid;
4838
}
@@ -51,23 +41,23 @@ Oid pglogical_relid_map_get(Oid relid)
5141

5242
bool pglogical_relid_map_put(Oid remote_relid, Oid local_relid)
5343
{
54-
bool found;
44+
bool found;
5545
PGLRelidMapEntry* entry;
56-
if (relid_map == NULL) {
46+
if (relid_map == NULL) {
5747
pglogical_relid_map_init();
5848
}
5949
entry = hash_search(relid_map, &remote_relid, HASH_ENTER, &found);
6050
if (found) {
6151
entry->local_relid = local_relid;
62-
return false;
52+
return false;
6353
}
6454
entry->local_relid = local_relid;
6555
return true;
6656
}
6757

6858
void pglogical_relid_map_reset(void)
6959
{
70-
if (relid_map != NULL) {
60+
if (relid_map != NULL) {
7161
hash_destroy(relid_map);
7262
relid_map = NULL;
7363
}

0 commit comments

Comments
 (0)