Skip to content

Commit ff1651b

Browse files
committed
Eliminate recursion
1 parent 4e84ffc commit ff1651b

File tree

4 files changed

+131
-113
lines changed

4 files changed

+131
-113
lines changed

contrib/multimaster/decoder_raw.c

Lines changed: 20 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
4141
typedef struct
4242
{
4343
MemoryContext context;
44-
bool include_transaction;
44+
bool isExternal;
4545
} DecoderRawData;
4646

4747
static void decoder_raw_startup(LogicalDecodingContext *ctx,
@@ -79,67 +79,16 @@ decoder_raw_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
7979
ListCell *option;
8080
DecoderRawData *data;
8181

82-
data = palloc(sizeof(DecoderRawData));
82+
data = (DecoderRawData*)palloc(sizeof(DecoderRawData));
8383
data->context = AllocSetContextCreate(ctx->context,
8484
"Raw decoder context",
8585
ALLOCSET_DEFAULT_MINSIZE,
8686
ALLOCSET_DEFAULT_INITSIZE,
8787
ALLOCSET_DEFAULT_MAXSIZE);
88-
data->include_transaction = false;
89-
88+
data->isExternal = false;
9089
ctx->output_plugin_private = data;
9190

92-
/* Default output format */
9391
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
94-
95-
foreach(option, ctx->output_plugin_options)
96-
{
97-
DefElem *elem = lfirst(option);
98-
99-
Assert(elem->arg == NULL || IsA(elem->arg, String));
100-
101-
if (strcmp(elem->defname, "include_transaction") == 0)
102-
{
103-
/* if option does not provide a value, it means its value is true */
104-
if (elem->arg == NULL)
105-
data->include_transaction = true;
106-
else if (!parse_bool(strVal(elem->arg), &data->include_transaction))
107-
ereport(ERROR,
108-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
109-
errmsg("could not parse value \"%s\" for parameter \"%s\"",
110-
strVal(elem->arg), elem->defname)));
111-
}
112-
else if (strcmp(elem->defname, "output_format") == 0)
113-
{
114-
char *format = NULL;
115-
116-
if (elem->arg == NULL)
117-
ereport(ERROR,
118-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
119-
errmsg("No value specified for parameter \"%s\"",
120-
elem->defname)));
121-
122-
format = strVal(elem->arg);
123-
124-
if (strcmp(format, "textual") == 0)
125-
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
126-
else if (strcmp(format, "binary") == 0)
127-
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
128-
else
129-
ereport(ERROR,
130-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
131-
errmsg("Incorrect value \"%s\" for parameter \"%s\"",
132-
format, elem->defname)));
133-
}
134-
else
135-
{
136-
ereport(ERROR,
137-
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
138-
errmsg("option \"%s\" = \"%s\" is unknown",
139-
elem->defname,
140-
elem->arg ? strVal(elem->arg) : "(null)")));
141-
}
142-
}
14392
}
14493

14594
/* cleanup this plugin's resources */
@@ -155,16 +104,16 @@ decoder_raw_shutdown(LogicalDecodingContext *ctx)
155104
/* BEGIN callback */
156105
static void
157106
decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
158-
{
107+
{
159108
DecoderRawData *data = ctx->output_plugin_private;
160-
161-
/* Write to the plugin only if there is */
162-
if (data->include_transaction)
163-
{
164-
OutputPluginPrepareWrite(ctx, true);
165-
appendStringInfoString(ctx->out, "BEGIN;");
166-
OutputPluginWrite(ctx, true);
167-
}
109+
110+
if (MultimasterIsExternalTransaction(txn->xid)) {
111+
data->isExternal = true;
112+
} else {
113+
OutputPluginPrepareWrite(ctx, true);
114+
appendStringInfoString(ctx->out, "BEGIN %u;", txn->xid);
115+
OutputPluginWrite(ctx, true);
116+
}
168117
}
169118

170119
/* COMMIT callback */
@@ -173,14 +122,11 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
173122
XLogRecPtr commit_lsn)
174123
{
175124
DecoderRawData *data = ctx->output_plugin_private;
176-
177-
/* Write to the plugin only if there is */
178-
if (data->include_transaction)
179-
{
180-
OutputPluginPrepareWrite(ctx, true);
181-
appendStringInfoString(ctx->out, "COMMIT;");
182-
OutputPluginWrite(ctx, true);
183-
}
125+
if (!data->isExternal) {
126+
OutputPluginPrepareWrite(ctx, true);
127+
appendStringInfoString(ctx->out, "COMMIT;");
128+
OutputPluginWrite(ctx, true);
129+
}
184130
}
185131

186132
/*
@@ -530,7 +476,9 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
530476
bool is_rel_non_selective;
531477

532478
data = ctx->output_plugin_private;
533-
479+
if (data->isExternal) {
480+
return;
481+
}
534482
/* Avoid leaking memory by using and resetting our own context */
535483
old = MemoryContextSwitchTo(data->context);
536484

contrib/multimaster/multimaster.c

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include "sockhub/sockhub.h"
4545

4646
#include "libdtm.h"
47+
#include "multimaster.h"
4748

4849
typedef struct
4950
{
@@ -52,6 +53,7 @@ typedef struct
5253
TransactionId minXid; /* XID of oldest transaction visible by any active transaction (local or global) */
5354
TransactionId nextXid; /* next XID for local transaction */
5455
size_t nReservedXids; /* number of XIDs reserved for local transactions */
56+
int nNodes;
5557
} DtmState;
5658

5759
typedef struct
@@ -61,19 +63,18 @@ typedef struct
6163
int used;
6264
} ByteBuffer;
6365

66+
typedef struct
67+
{
68+
TransactionId xid;
69+
int count;
70+
} ExternalTransaction;
6471

6572
#define DTM_SHMEM_SIZE (1024*1024)
6673
#define DTM_HASH_SIZE 1003
6774

6875
void _PG_init(void);
6976
void _PG_fini(void);
7077

71-
extern void LogicalReplicationStartReceivers(char* nodes, int node_id);
72-
extern void LogicalReplicationBroadcastXid(TransactonId Xid);
73-
74-
void MultimasterBeginTransaction(void);
75-
void MultimasterJoinTransaction(TransactionId xid);
76-
7778
static Snapshot DtmGetSnapshot(Snapshot snapshot);
7879
static void DtmMergeWithGlobalSnapshot(Snapshot snapshot);
7980
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
@@ -103,6 +104,7 @@ static void ByteBufferFree(ByteBuffer* buf);
103104

104105
static shmem_startup_hook_type prev_shmem_startup_hook;
105106
static HTAB* xid_in_doubt;
107+
static HTAB* external_trans;
106108
static DtmState* dtm;
107109
static Snapshot CurrentTransactionSnapshot;
108110

@@ -126,11 +128,14 @@ static TransactionManager DtmTM = {
126128

127129
static char* MultimasterConnStrs;
128130
static int MultimasterNodeId;
131+
static int MultimasterNodes;
129132

130133
static char* DtmHost;
131134
static int DtmPort;
132135
static int DtmBufferSize;
133136

137+
bool isBackgroundWorker;
138+
134139
static BackgroundWorker DtmWorker = {
135140
"DtmWorker",
136141
0, /* do not need connection to the database */
@@ -694,6 +699,7 @@ static void DtmInitialize()
694699
dtm->xidLock = LWLockAssign();
695700
dtm->nReservedXids = 0;
696701
dtm->minXid = InvalidTransactionId;
702+
dtm->nNodes = MultimasterNodes;
697703
RegisterXactCallback(DtmXactCallback, NULL);
698704
}
699705
LWLockRelease(AddinShmemInitLock);
@@ -709,6 +715,17 @@ static void DtmInitialize()
709715
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
710716
);
711717

718+
info.keysize = sizeof(TransactionId);
719+
info.entrysize = sizeof(ExternalTransaction);
720+
info.hash = dtm_xid_hash_fn;
721+
info.match = dtm_xid_match_fn;
722+
external_trans = ShmemInitHash(
723+
"external_trans",
724+
DTM_HASH_SIZE, DTM_HASH_SIZE,
725+
&info,
726+
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
727+
);
728+
712729

713730
TM = &DtmTM;
714731
}
@@ -720,7 +737,9 @@ DtmXactCallback(XactEvent event, void *arg)
720737
switch (event)
721738
{
722739
case XACT_EVENT_BEGIN:
723-
MultimasterBeginTransaction();
740+
if (!isBackgroundWorker) {
741+
MultimasterBeginTransaction();
742+
}
724743
break;
725744
case XACT_EVENT_COMMIT:
726745
case XACT_EVENT_ABORT:
@@ -865,7 +884,7 @@ _PG_init(void)
865884
NULL
866885
);
867886

868-
LogicalReplicationStartReceivers(MultimasterConnStrs, MultimasterNodeId);
887+
MultimasterNodes = LogicalReplicationStartReceivers(MultimasterConnStrs, MultimasterNodeId);
869888

870889
if (DtmBufferSize != 0)
871890
{
@@ -931,10 +950,8 @@ dtm_get_current_snapshot_xcnt(PG_FUNCTION_ARGS)
931950

932951
void MultimasterBeginTransaction(void)
933952
{
934-
if (TransactionIdIsValid(DtmNextXid)) {
935-
/* slave transaction */
936-
return;
937-
}
953+
if (TransactionIdIsValid(DtmNextXid))
954+
elog(ERROR, "MultimasterBeginTransaction should be called only once for global transaction");
938955
if (dtm == NULL)
939956
elog(ERROR, "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf");
940957
DtmNextXid = DtmGlobalStartTransaction(&DtmSnapshot, &dtm->minXid);
@@ -944,12 +961,12 @@ void MultimasterBeginTransaction(void)
944961

945962
DtmHasGlobalSnapshot = true;
946963
DtmLastSnapshot = NULL;
947-
948-
LogicalReplicationBroadcastXid(DtmNextXid);
949964
}
950965

951966
void MultimasterJoinTransaction(TransactionId xid)
952967
{
968+
ExternalTrans* et;
969+
953970
if (TransactionIdIsValid(DtmNextXid))
954971
elog(ERROR, "dtm_begin/join_transaction should be called only once for global transaction");
955972
DtmNextXid = xid;
@@ -961,14 +978,39 @@ void MultimasterJoinTransaction(TransactionId xid)
961978

962979
DtmHasGlobalSnapshot = true;
963980
DtmLastSnapshot = NULL;
981+
982+
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
983+
et = hash_search(external_trans, &DtmNextXid, HASH_ENTER, NULL);
984+
et->count = dtm->nNodes-1;
985+
LWLockRelease(dtm->hashLock);
964986
}
965987

988+
bool MultimasterIsExternalTransaction(TransactionId xid)
989+
{
990+
ExternalTrans* et;
991+
bool result = false;
992+
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
993+
et = hash_search(external_trans, &DtmNextXid, HASH_FIND, NULL);
994+
if (et != NULL) {
995+
result = true;
996+
if (--et->count == 0) {
997+
hash_search(external_trans, &DtmNextXid, HASH_REMOVE, NULL);
998+
}
999+
}
1000+
LWLockRelease(dtm->hashLock);
1001+
return result;
1002+
}
1003+
1004+
1005+
9661006
void DtmBackgroundWorker(Datum arg)
9671007
{
9681008
Shub shub;
9691009
ShubParams params;
9701010
char unix_sock_path[MAXPGPATH];
9711011

1012+
isBackgroundWorker = true;
1013+
9721014
snprintf(unix_sock_path, sizeof(unix_sock_path), "%s/p%d", Unix_socket_directories, DtmPort);
9731015

9741016
ShubInitParams(&params);

contrib/multimaster/multimaster.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
#ifndef __MULTIMASTER_H__
2+
#define __MULTIMASTER_H__
3+
4+
extern int LogicalReplicationStartReceivers(char* nodes, int node_id);
5+
extern void MultimasterBeginTransaction(void);
6+
extern void MultimasterJoinTransaction(TransactionId xid);
7+
extern bool MultimasterIsExternalTransaction(TransactionId xid);
8+
9+
extern bool isBackgroundWorker;
10+
11+
#endif

0 commit comments

Comments
 (0)