Skip to content

Commit b0122db

Browse files
committed
Use pglogical plugin in multimaster
1 parent bcb5307 commit b0122db

File tree

13 files changed

+2194
-47
lines changed

13 files changed

+2194
-47
lines changed

contrib/multimaster/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o receiver_raw.o decoder_raw.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o
3-
2+
OBJS = multimaster.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o pglogical_output.o pglogical_proto.o pglogical_receiver.o
3+
#OBJS = multimaster.o receiver_raw.o decoder_raw.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o
44
EXTENSION = multimaster
55
DATA = multimaster--1.0.sql
66

contrib/multimaster/multimaster.c

Lines changed: 1 addition & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
#include "access/xlog.h"
2929
#include "storage/proc.h"
3030
#include "storage/procarray.h"
31-
#include "executor/spi.h"
3231
#include "executor/executor.h"
3332
#include "access/twophase.h"
3433
#include "utils/guc.h"
@@ -104,7 +103,6 @@ static void DtmShmemStartup(void);
104103
static void DtmBackgroundWorker(Datum arg);
105104

106105
static void MMMarkTransAsLocal(TransactionId xid);
107-
static void MMExecutor(int id, void* work, size_t size);
108106
static BgwPool* MMPoolConstructor(void);
109107

110108
static shmem_startup_hook_type prev_shmem_startup_hook;
@@ -1185,48 +1183,7 @@ MMExecutorFinish(QueryDesc *queryDesc)
11851183
{
11861184
standard_ExecutorFinish(queryDesc);
11871185
}
1188-
}
1189-
1190-
static void MMExecutor(int id, void* work, size_t size)
1191-
{
1192-
TransactionId xid = *(TransactionId*)work;
1193-
char* stmts = (char*)work + 4;
1194-
bool finished = false;
1195-
1196-
MMJoinTransaction(xid);
1197-
1198-
SetCurrentStatementStartTimestamp();
1199-
StartTransactionCommand();
1200-
SPI_connect();
1201-
PushActiveSnapshot(GetTransactionSnapshot());
1202-
1203-
PG_TRY();
1204-
{
1205-
int rc = SPI_execute(stmts, false, 0);
1206-
SPI_finish();
1207-
PopActiveSnapshot();
1208-
finished = true;
1209-
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE) {
1210-
ereport(LOG, (errmsg("Executor %d: failed to apply transaction %u",
1211-
id, xid)));
1212-
AbortCurrentTransaction();
1213-
} else {
1214-
CommitTransactionCommand();
1215-
}
1216-
}
1217-
PG_CATCH();
1218-
{
1219-
FlushErrorState();
1220-
if (!finished) {
1221-
SPI_finish();
1222-
if (ActiveSnapshotSet()) {
1223-
PopActiveSnapshot();
1224-
}
1225-
}
1226-
AbortCurrentTransaction();
1227-
}
1228-
PG_END_TRY();
1229-
}
1186+
}
12301187

12311188
extern void MMExecute(void* work, int size)
12321189
{

contrib/multimaster/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ extern void MMJoinTransaction(TransactionId xid);
1313
extern bool MMIsLocalTransaction(TransactionId xid);
1414
extern void MMReceiverStarted(void);
1515
extern void MMExecute(void* work, int size);
16+
extern void MMExecutor(int id, void* work, size_t size);
1617

1718
extern char* MMDatabaseName;
1819

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#ifndef PG_LOGICAL_CONFIG_H
2+
#define PG_LOGICAL_CONFIG_H
3+
4+
#ifndef PG_VERSION_NUM
5+
#error <postgres.h> must be included first
6+
#endif
7+
8+
inline static bool
9+
server_float4_byval(void)
10+
{
11+
#ifdef USE_FLOAT4_BYVAL
12+
return true;
13+
#else
14+
return false;
15+
#endif
16+
}
17+
18+
inline static bool
19+
server_float8_byval(void)
20+
{
21+
#ifdef USE_FLOAT8_BYVAL
22+
return true;
23+
#else
24+
return false;
25+
#endif
26+
}
27+
28+
inline static bool
29+
server_integer_datetimes(void)
30+
{
31+
#ifdef USE_INTEGER_DATETIMES
32+
return true;
33+
#else
34+
return false;
35+
#endif
36+
}
37+
38+
inline static bool
39+
server_bigendian(void)
40+
{
41+
#ifdef WORDS_BIGENDIAN
42+
return true;
43+
#else
44+
return false;
45+
#endif
46+
}
47+
48+
typedef struct List List;
49+
typedef struct PGLogicalOutputData PGLogicalOutputData;
50+
51+
extern int process_parameters(List *options, PGLogicalOutputData *data);
52+
53+
extern List * prepare_startup_message(PGLogicalOutputData *data);
54+
55+
#endif

contrib/multimaster/pglogical_hooks.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#ifndef PGLOGICAL_HOOKS_H
2+
#define PGLOGICAL_HOOKS_H
3+
4+
#include "replication/reorderbuffer.h"
5+
6+
/* public interface for hooks */
7+
#include "pglogical_output/hooks.h"
8+
9+
extern void load_hooks(PGLogicalOutputData *data);
10+
11+
extern void call_startup_hook(PGLogicalOutputData *data, List *plugin_params);
12+
13+
extern void call_shutdown_hook(PGLogicalOutputData *data);
14+
15+
extern bool call_row_filter_hook(PGLogicalOutputData *data,
16+
ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change);
17+
18+
extern bool call_txn_filter_hook(PGLogicalOutputData *data,
19+
RepOriginId txn_origin);
20+
21+
22+
#endif

0 commit comments

Comments
 (0)