Skip to content

Commit 23aa459

Browse files
committed
Add start/stop replication functions
1 parent efe7ec4 commit 23aa459

File tree

6 files changed

+88
-47
lines changed

6 files changed

+88
-47
lines changed

contrib/multimaster/decoder_raw.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
3939
typedef struct
4040
{
4141
MemoryContext context;
42-
bool isExternal;
42+
bool isLocal;
4343
} DecoderRawData;
4444

4545
static void decoder_raw_startup(LogicalDecodingContext *ctx,
@@ -82,7 +82,7 @@ decoder_raw_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
8282
ALLOCSET_DEFAULT_MINSIZE,
8383
ALLOCSET_DEFAULT_INITSIZE,
8484
ALLOCSET_DEFAULT_MAXSIZE);
85-
data->isExternal = false;
85+
data->isLocal = false;
8686
ctx->output_plugin_private = data;
8787

8888
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
@@ -104,8 +104,8 @@ decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
104104
{
105105
DecoderRawData *data = ctx->output_plugin_private;
106106

107-
if (MultimasterIsExternalTransaction(txn->xid)) {
108-
data->isExternal = true;
107+
if (MMIsLocalTransaction(txn->xid)) {
108+
data->isLocal = true;
109109
} else {
110110
OutputPluginPrepareWrite(ctx, true);
111111
appendStringInfo(ctx->out, "BEGIN %u;", txn->xid);
@@ -119,7 +119,7 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
119119
XLogRecPtr commit_lsn)
120120
{
121121
DecoderRawData *data = ctx->output_plugin_private;
122-
if (!data->isExternal) {
122+
if (!data->isLocal) {
123123
OutputPluginPrepareWrite(ctx, true);
124124
appendStringInfoString(ctx->out, "COMMIT;");
125125
OutputPluginWrite(ctx, true);
@@ -473,7 +473,7 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
473473
bool is_rel_non_selective;
474474

475475
data = ctx->output_plugin_private;
476-
if (data->isExternal) {
476+
if (data->isLocal) {
477477
return;
478478
}
479479
/* Avoid leaking memory by using and resetting our own context */
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,11 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION multimaster" to load this file. \quit
3+
4+
CREATE FUNCTION mm_start_replication() RETURNS void
5+
AS 'MODULE_PATHNAME','mm_start_replication'
6+
LANGUAGE C;
7+
8+
CREATE FUNCTION mm_stop_replication() RETURNS void
9+
AS 'MODULE_PATHNAME','mm_stop_replication'
10+
LANGUAGE C;
11+

contrib/multimaster/multimaster.c

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ typedef struct
6767
{
6868
TransactionId xid;
6969
int count;
70-
} ExternalTransaction;
70+
} LocalTransaction;
7171

7272
#define DTM_SHMEM_SIZE (1024*1024)
7373
#define DTM_HASH_SIZE 1003
@@ -77,6 +77,9 @@ void _PG_fini(void);
7777

7878
PG_MODULE_MAGIC;
7979

80+
PG_FUNCTION_INFO_V1(mm_start_replication);
81+
PG_FUNCTION_INFO_V1(mm_stop_replication);
82+
8083
static Snapshot DtmGetSnapshot(Snapshot snapshot);
8184
static void DtmMergeWithGlobalSnapshot(Snapshot snapshot);
8285
static XidStatus DtmGetTransactionStatus(TransactionId xid, XLogRecPtr *lsn);
@@ -103,10 +106,11 @@ static void ByteBufferAppend(ByteBuffer* buf, void* data, int len);
103106
static void ByteBufferAppendInt32(ByteBuffer* buf, int data);
104107
static void ByteBufferFree(ByteBuffer* buf);
105108

109+
static void MMMarkTransAsLocal(TransactionId xid);
106110

107111
static shmem_startup_hook_type prev_shmem_startup_hook;
108112
static HTAB* xid_in_doubt;
109-
static HTAB* external_trans;
113+
static HTAB* local_trans;
110114
static DtmState* dtm;
111115
static Snapshot CurrentTransactionSnapshot;
112116

@@ -128,9 +132,10 @@ static TransactionManager DtmTM = {
128132
DtmDetectGlobalDeadLock
129133
};
130134

131-
static char* MultimasterConnStrs;
132-
static int MultimasterNodeId;
133-
static int MultimasterNodes;
135+
static char* MMConnStrs;
136+
static int MMNodeId;
137+
static int MMNodes;
138+
static bool MMDoReplication = true;
134139

135140
static char* DtmHost;
136141
static int DtmPort;
@@ -145,8 +150,8 @@ static BackgroundWorker DtmWorker = {
145150
};
146151

147152
#define XTM_TRACE(fmt, ...)
148-
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
149-
#define XTM_INFO(fmt, ...)
153+
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
154+
//#define XTM_INFO(fmt, ...)
150155

151156
static void DumpSnapshot(Snapshot s, char *name)
152157
{
@@ -697,7 +702,7 @@ static void DtmInitialize()
697702
dtm->xidLock = LWLockAssign();
698703
dtm->nReservedXids = 0;
699704
dtm->minXid = InvalidTransactionId;
700-
dtm->nNodes = MultimasterNodes;
705+
dtm->nNodes = MMNodes;
701706
RegisterXactCallback(DtmXactCallback, NULL);
702707
}
703708
LWLockRelease(AddinShmemInitLock);
@@ -714,11 +719,11 @@ static void DtmInitialize()
714719
);
715720

716721
info.keysize = sizeof(TransactionId);
717-
info.entrysize = sizeof(ExternalTransaction);
722+
info.entrysize = sizeof(LocalTransaction);
718723
info.hash = dtm_xid_hash_fn;
719724
info.match = dtm_xid_match_fn;
720-
external_trans = ShmemInitHash(
721-
"external_trans",
725+
local_trans = ShmemInitHash(
726+
"local_trans",
722727
DTM_HASH_SIZE, DTM_HASH_SIZE,
723728
&info,
724729
HASH_ELEM | HASH_FUNCTION | HASH_COMPARE
@@ -734,14 +739,19 @@ DtmXactCallback(XactEvent event, void *arg)
734739
XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n", getpid(), event, DtmNextXid);
735740
switch (event)
736741
{
737-
case XACT_EVENT_START:
738-
if (MyProc && MyProc->backendId != InvalidBackendId) {
739-
printf("getpid=%d, MyProc=%d, MyProc->backendId=%d\n", getpid(), MyProc->pid, MyProc->backendId);
740-
MultimasterBeginTransaction();
742+
case XACT_EVENT_START:
743+
if (MyBackendId != InvalidBackendId && MMDoReplication) {
744+
printf("getpid=%d, backendId=%d\n", getpid(), MyBackendId);
745+
MMBeginTransaction();
746+
}
747+
break;
748+
case XACT_EVENT_PRE_COMMIT:
749+
case XACT_EVENT_PARALLEL_PRE_COMMIT:
750+
if (!MMDoReplication && TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
751+
MMMarkTransAsLocal(GetCurrentTransactionIdIfAny());
741752
}
742753
break;
743-
case XACT_EVENT_COMMIT:
744-
case XACT_EVENT_ABORT:
754+
case XACT_EVENT_ABORT:
745755
if (TransactionIdIsValid(DtmNextXid))
746756
{
747757
if (event == XACT_EVENT_COMMIT)
@@ -862,7 +872,7 @@ _PG_init(void)
862872
"multimaster.conn_strings",
863873
"Multimaster node connection strings separated by commas, i.e. 'replication=database dbname=postgres host=localhost port=5001,replication=database dbname=postgres host=localhost port=5002'",
864874
NULL,
865-
&MultimasterConnStrs,
875+
&MMConnStrs,
866876
"",
867877
PGC_POSTMASTER, // context
868878
0, // flags,
@@ -875,7 +885,7 @@ _PG_init(void)
875885
"multimaster.node_id",
876886
"Multimaster node ID",
877887
NULL,
878-
&MultimasterNodeId,
888+
&MMNodeId,
879889
1,
880890
1,
881891
INT_MAX,
@@ -886,7 +896,7 @@ _PG_init(void)
886896
NULL
887897
);
888898

889-
MultimasterNodes = LogicalReplicationStartReceivers(MultimasterConnStrs, MultimasterNodeId);
899+
MMNodes = MMStartReceivers(MMConnStrs, MMNodeId);
890900

891901
if (DtmBufferSize != 0)
892902
{
@@ -924,10 +934,10 @@ static void DtmShmemStartup(void)
924934
* ***************************************************************************
925935
*/
926936

927-
void MultimasterBeginTransaction(void)
937+
void MMBeginTransaction(void)
928938
{
929939
if (TransactionIdIsValid(DtmNextXid))
930-
elog(ERROR, "MultimasterBeginTransaction should be called only once for global transaction");
940+
elog(ERROR, "MMBeginTransaction should be called only once for global transaction");
931941
if (dtm == NULL)
932942
elog(ERROR, "DTM is not properly initialized, please check that pg_dtm plugin was added to shared_preload_libraries list in postgresql.conf");
933943
DtmNextXid = DtmGlobalStartTransaction(&DtmSnapshot, &dtm->minXid);
@@ -939,10 +949,8 @@ void MultimasterBeginTransaction(void)
939949
DtmLastSnapshot = NULL;
940950
}
941951

942-
void MultimasterJoinTransaction(TransactionId xid)
952+
void MMJoinTransaction(TransactionId xid)
943953
{
944-
ExternalTransaction* et;
945-
946954
if (TransactionIdIsValid(DtmNextXid))
947955
elog(ERROR, "dtm_begin/join_transaction should be called only once for global transaction");
948956
DtmNextXid = xid;
@@ -955,28 +963,52 @@ void MultimasterJoinTransaction(TransactionId xid)
955963
DtmHasGlobalSnapshot = true;
956964
DtmLastSnapshot = NULL;
957965

966+
MMMarkTransAsLocal(DtmNextXid);
967+
}
968+
969+
970+
void MMMarkTransAsLocal(TransactionId xid)
971+
{
972+
LocalTransaction* lt;
973+
974+
Assert(TransactionIdIsValid(xid));
958975
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
959-
et = hash_search(external_trans, &DtmNextXid, HASH_ENTER, NULL);
960-
et->count = dtm->nNodes-1;
976+
lt = hash_search(local_trans, &xid, HASH_ENTER, NULL);
977+
lt->count = dtm->nNodes-1;
961978
LWLockRelease(dtm->hashLock);
962979
}
963980

964-
bool MultimasterIsExternalTransaction(TransactionId xid)
981+
bool MMIsLocalTransaction(TransactionId xid)
965982
{
966-
ExternalTransaction* et;
983+
LocalTransaction* lt;
967984
bool result = false;
968985
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
969-
et = hash_search(external_trans, &xid, HASH_FIND, NULL);
970-
if (et != NULL) {
986+
lt = hash_search(local_trans, &xid, HASH_FIND, NULL);
987+
if (lt != NULL) {
971988
result = true;
972-
if (--et->count == 0) {
973-
hash_search(external_trans, &xid, HASH_REMOVE, NULL);
989+
if (--lt->count == 0) {
990+
hash_search(local_trans, &xid, HASH_REMOVE, NULL);
974991
}
975992
}
976993
LWLockRelease(dtm->hashLock);
977994
return result;
978995
}
979996

997+
Datum
998+
mm_start_replication(PG_FUNCTION_ARGS)
999+
{
1000+
MMDoReplication = true;
1001+
PG_RETURN_VOID();
1002+
}
1003+
1004+
Datum
1005+
mm_stop_replication(PG_FUNCTION_ARGS)
1006+
{
1007+
MMDoReplication = false;
1008+
PG_RETURN_VOID();
1009+
}
1010+
1011+
9801012

9811013

9821014
void DtmBackgroundWorker(Datum arg)

contrib/multimaster/multimaster.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
#ifndef __MULTIMASTER_H__
22
#define __MULTIMASTER_H__
33

4-
extern int LogicalReplicationStartReceivers(char* nodes, int node_id);
5-
extern void MultimasterBeginTransaction(void);
6-
extern void MultimasterJoinTransaction(TransactionId xid);
7-
extern bool MultimasterIsExternalTransaction(TransactionId xid);
4+
extern int MMStartReceivers(char* nodes, int node_id);
5+
extern void MMBeginTransaction(void);
6+
extern void MMJoinTransaction(TransactionId xid);
7+
extern bool MMIsLocalTransaction(TransactionId xid);
88

99
extern bool isBackgroundWorker;
1010

contrib/multimaster/receiver_raw.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -402,7 +402,7 @@ receiver_raw_main(Datum main_arg)
402402
int rc = sscanf(stmt + 6, "%u", &xid);
403403
Assert(rc == 1);
404404
Assert(!insideTrans);
405-
MultimasterJoinTransaction(xid);
405+
MMJoinTransaction(xid);
406406
insideTrans = true;
407407
} else if (strncmp(stmt, "COMMIT;", 7) == 0) {
408408
Assert(insideTrans);
@@ -526,7 +526,7 @@ receiver_raw_main(Datum main_arg)
526526
}
527527

528528

529-
int LogicalReplicationStartReceivers(char* conn_strs, int node_id)
529+
int MMStartReceivers(char* conn_strs, int node_id)
530530
{
531531
int i = 0;
532532
BackgroundWorker worker;

contrib/multimaster/sockhub/sockhub.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -255,11 +255,11 @@ static void die(int sig) {
255255
void ShubLoop(Shub* shub)
256256
{
257257
int buffer_size = shub->params->buffer_size;
258+
sigset_t sset;
258259
signal(SIGINT, die);
259260
signal(SIGQUIT, die);
260261
signal(SIGTERM, die);
261-
// signal(SIGHUP, die);
262-
sigset_t sset;
262+
/* signal(SIGHUP, die); */
263263
sigfillset(&sset);
264264
sigprocmask(SIG_UNBLOCK, &sset, NULL);
265265

0 commit comments

Comments
 (0)