Skip to content

Commit b98bc97

Browse files
committed
Uncompleted attempt to merge with pglogical plugin
1 parent b7d599e commit b98bc97

9 files changed

+701
-64
lines changed

contrib/mmts/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o ddd.o bkb.o
2+
OBJS = multimaster.o raftable.o arbiter.o bytebuf.o bgwpool.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o pglogical_relmetacache.o ddd.o bkb.o
33

44
override CPPFLAGS += -I../raftable
55

contrib/mmts/pglogical_apply.c

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -577,18 +577,44 @@ process_remote_commit(StringInfo in)
577577
}
578578

579579
static void
580-
process_remote_insert(StringInfo s, Relation rel)
580+
process_remote_insert(StringInfo s)
581581
{
582582
EState *estate;
583583
TupleData new_tuple;
584584
TupleTableSlot *newslot;
585585
TupleTableSlot *oldslot;
586586
ResultRelInfo *relinfo;
587587
ScanKey *index_keys;
588+
uint32 relid;
589+
uint8 flags;
590+
PGLogicalRelation *lr;
591+
Relation rel;
588592
char* relname = RelationGetRelationName(rel);
589593
int i;
590594

595+
/* read the flags */
596+
flags = pq_getmsgbyte(in);
597+
Assert(flags == 0);
598+
599+
/* read the relation id */
600+
relid = pq_getmsgint(in, 4);
601+
602+
action = pq_getmsgbyte(in);
603+
if (action != 'N')
604+
elog(ERROR, "expected new tuple but got %d",
605+
action);
606+
607+
rl = pglogical_relation_open(relid, RowExclusiveLock);
608+
rel = rl->rel;
609+
591610
estate = create_rel_estate(rel);
611+
econtext = GetPerTupleExprContext(estate);
612+
613+
PushActiveSnapshot(GetTransactionSnapshot());
614+
615+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
616+
fill_tuple_defaults(rel, econtext, &new_tup);
617+
592618
newslot = ExecInitExtraTupleSlot(estate);
593619
oldslot = ExecInitExtraTupleSlot(estate);
594620
ExecSetSlotDescriptor(newslot, RelationGetDescr(rel));
@@ -664,6 +690,7 @@ process_remote_insert(StringInfo s, Relation rel)
664690
ExecCloseIndices(estate->es_result_relation_info);
665691

666692
heap_close(rel, NoLock);
693+
pglogical_relation_close(rl, NoLock);
667694
ExecResetTupleTable(estate->es_tupleTable, true);
668695
FreeExecutorState(estate);
669696

contrib/mmts/pglogical_output.c

Lines changed: 89 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212
*/
1313
#include "postgres.h"
1414

15+
#include "pglogical_output/compat.h"
1516
#include "pglogical_config.h"
1617
#include "pglogical_output.h"
1718
#include "pglogical_proto.h"
1819
#include "pglogical_hooks.h"
20+
#include "pglogical_relmetacache.h"
1921

2022
#include "access/hash.h"
2123
#include "access/sysattr.h"
@@ -33,7 +35,9 @@
3335

3436
#include "replication/output_plugin.h"
3537
#include "replication/logical.h"
38+
#ifdef HAVE_REPLICATION_ORIGINS
3639
#include "replication/origin.h"
40+
#endif
3741

3842
#include "utils/builtins.h"
3943
#include "utils/catcache.h"
@@ -47,6 +51,8 @@
4751
#include "utils/syscache.h"
4852
#include "utils/typcache.h"
4953

54+
PG_MODULE_MAGIC;
55+
5056
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
5157

5258
/* These must be available to pg_dlsym() */
@@ -61,8 +67,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
6167
ReorderBufferTXN *txn, Relation rel,
6268
ReorderBufferChange *change);
6369

70+
#ifdef HAVE_REPLICATION_ORIGINS
6471
static bool pg_decode_origin_filter(LogicalDecodingContext *ctx,
6572
RepOriginId origin_id);
73+
#endif
6674

6775
static void send_startup_message(LogicalDecodingContext *ctx,
6876
PGLogicalOutputData *data, bool last_message);
@@ -79,7 +87,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7987
cb->begin_cb = pg_decode_begin_txn;
8088
cb->change_cb = pg_decode_change;
8189
cb->commit_cb = pg_decode_commit_txn;
90+
#ifdef HAVE_REPLICATION_ORIGINS
8291
cb->filter_by_origin_cb = pg_decode_origin_filter;
92+
#endif
8393
cb->shutdown_cb = pg_decode_shutdown;
8494
}
8595

@@ -99,42 +109,42 @@ check_binary_compatibility(PGLogicalOutputData *data)
99109
if (data->client_binary_sizeofdatum != 0
100110
&& data->client_binary_sizeofdatum != sizeof(Datum))
101111
{
102-
elog(DEBUG1, "Binary mode rejected: Server and client endian sizeof(Datum) mismatch");
112+
elog(DEBUG1, "Binary mode rejected: Server and client sizeof(Datum) mismatch");
103113
return false;
104114
}
105115

106116
if (data->client_binary_sizeofint != 0
107117
&& data->client_binary_sizeofint != sizeof(int))
108118
{
109-
elog(DEBUG1, "Binary mode rejected: Server and client endian sizeof(int) mismatch");
119+
elog(DEBUG1, "Binary mode rejected: Server and client sizeof(int) mismatch");
110120
return false;
111121
}
112122

113123
if (data->client_binary_sizeoflong != 0
114124
&& data->client_binary_sizeoflong != sizeof(long))
115125
{
116-
elog(DEBUG1, "Binary mode rejected: Server and client endian sizeof(long) mismatch");
126+
elog(DEBUG1, "Binary mode rejected: Server and client sizeof(long) mismatch");
117127
return false;
118128
}
119129

120130
if (data->client_binary_float4byval_set
121131
&& data->client_binary_float4byval != server_float4_byval())
122132
{
123-
elog(DEBUG1, "Binary mode rejected: Server and client endian float4byval mismatch");
133+
elog(DEBUG1, "Binary mode rejected: Server and client float4byval mismatch");
124134
return false;
125135
}
126136

127137
if (data->client_binary_float8byval_set
128138
&& data->client_binary_float8byval != server_float8_byval())
129139
{
130-
elog(DEBUG1, "Binary mode rejected: Server and client endian float8byval mismatch");
140+
elog(DEBUG1, "Binary mode rejected: Server and client float8byval mismatch");
131141
return false;
132142
}
133143

134144
if (data->client_binary_intdatetimes_set
135145
&& data->client_binary_intdatetimes != server_integer_datetimes())
136146
{
137-
elog(DEBUG1, "Binary mode rejected: Server and client endian integer datetimes mismatch");
147+
elog(DEBUG1, "Binary mode rejected: Server and client integer datetimes mismatch");
138148
return false;
139149
}
140150

@@ -148,7 +158,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
148158
{
149159
PGLogicalOutputData *data = palloc0(sizeof(PGLogicalOutputData));
150160

151-
data->context = AllocSetContextCreate(TopMemoryContext,
161+
data->context = AllocSetContextCreate(ctx->context,
152162
"pglogical conversion context",
153163
ALLOCSET_DEFAULT_MINSIZE,
154164
ALLOCSET_DEFAULT_INITSIZE,
@@ -202,17 +212,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
202212
errmsg("client sent startup parameters in format %d but we only support format 1",
203213
params_format)));
204214

205-
if (data->client_min_proto_version > PG_LOGICAL_PROTO_VERSION_NUM)
215+
if (data->client_min_proto_version > PGLOGICAL_PROTO_VERSION_NUM)
206216
ereport(ERROR,
207217
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
208218
errmsg("client sent min_proto_version=%d but we only support protocol %d or lower",
209-
data->client_min_proto_version, PG_LOGICAL_PROTO_VERSION_NUM)));
219+
data->client_min_proto_version, PGLOGICAL_PROTO_VERSION_NUM)));
210220

211-
if (data->client_max_proto_version < PG_LOGICAL_PROTO_MIN_VERSION_NUM)
221+
if (data->client_max_proto_version < PGLOGICAL_PROTO_MIN_VERSION_NUM)
212222
ereport(ERROR,
213223
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
214224
errmsg("client sent max_proto_version=%d but we only support protocol %d or higher",
215-
data->client_max_proto_version, PG_LOGICAL_PROTO_MIN_VERSION_NUM)));
225+
data->client_max_proto_version, PGLOGICAL_PROTO_MIN_VERSION_NUM)));
216226

217227
/*
218228
* Set correct protocol format.
@@ -308,42 +318,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
308318
}
309319

310320
/*
311-
* Will we forward changesets? We have to if we're on 9.4;
312-
* otherwise honour the client's request.
321+
* 9.4 lacks origins info so don't forward it.
322+
*
323+
* There's currently no knob for clients to use to suppress
324+
* this info and it's sent if it's supported and available.
313325
*/
314326
if (PG_VERSION_NUM/100 == 904)
315-
{
316-
/*
317-
* 9.4 unconditionally forwards changesets due to lack of
318-
* replication origins, and it can't ever send origin info
319-
* for the same reason.
320-
*/
321-
data->forward_changesets = true;
322327
data->forward_changeset_origins = false;
323-
324-
if (data->client_forward_changesets_set
325-
&& !data->client_forward_changesets)
326-
{
327-
ereport(DEBUG1,
328-
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
329-
errmsg("Cannot disable changeset forwarding on PostgreSQL 9.4")));
330-
}
331-
}
332-
else if (data->client_forward_changesets_set
333-
&& data->client_forward_changesets)
334-
{
335-
/* Client explicitly asked for forwarding; forward csets and origins */
336-
data->forward_changesets = true;
337-
data->forward_changeset_origins = true;
338-
}
339328
else
340-
{
341-
/* Default to not forwarding or honour client's request not to fwd */
342-
data->forward_changesets = false;
343-
data->forward_changeset_origins = false;
344-
}
329+
data->forward_changeset_origins = true;
345330

346-
if (data->hooks_setup_funcname != NIL || data->api->setup_hooks)
331+
if (data->hooks_setup_funcname != NIL)
347332
{
348333

349334
data->hooks_mctxt = AllocSetContextCreate(ctx->context,
@@ -355,6 +340,43 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
355340
load_hooks(data);
356341
call_startup_hook(data, ctx->output_plugin_options);
357342
}
343+
344+
if (data->client_relmeta_cache_size < -1)
345+
{
346+
ereport(ERROR,
347+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
348+
errmsg("relmeta_cache_size must be -1, 0, or positive")));
349+
}
350+
351+
/*
352+
* Relation metadata cache configuration.
353+
*
354+
* TODO: support fixed size cache
355+
*
356+
* Need a LRU for eviction, and need to implement a new message type for
357+
* cache purge notifications for clients. In the mean time force it to 0
358+
* (off). The client will be told via a startup param and must respect
359+
* that.
360+
*/
361+
if (data->client_relmeta_cache_size != 0
362+
&& data->client_relmeta_cache_size != -1)
363+
{
364+
ereport(INFO,
365+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
366+
errmsg("fixed size cache not supported, forced to off"),
367+
errdetail("only relmeta_cache_size=0 (off) or relmeta_cache_size=-1 (unlimited) supported")));
368+
369+
data->relmeta_cache_size = 0;
370+
}
371+
else
372+
{
373+
/* ack client request */
374+
data->relmeta_cache_size = data->client_relmeta_cache_size;
375+
}
376+
377+
/* if cache enabled, init it */
378+
if (data->relmeta_cache_size != 0)
379+
pglogical_init_relmetacache(ctx->context);
358380
}
359381
}
360382

@@ -370,12 +392,15 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
370392
if (!startup_message_sent)
371393
send_startup_message(ctx, data, false /* can't be last message */);
372394

395+
#ifdef HAVE_REPLICATION_ORIGINS
373396
/* If the record didn't originate locally, send origin info */
374397
send_replication_origin &= txn->origin_id != InvalidRepOriginId;
398+
#endif
375399

376400
OutputPluginPrepareWrite(ctx, !send_replication_origin);
377401
data->api->write_begin(ctx->out, data, txn);
378402

403+
#ifdef HAVE_REPLICATION_ORIGINS
379404
if (send_replication_origin)
380405
{
381406
char *origin;
@@ -397,6 +422,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
397422
replorigin_by_oid(txn->origin_id, true, &origin))
398423
data->api->write_origin(ctx->out, origin, txn->origin_lsn);
399424
}
425+
#endif
400426

401427
OutputPluginWrite(ctx, true);
402428
}
@@ -421,6 +447,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
421447
{
422448
PGLogicalOutputData *data = ctx->output_plugin_private;
423449
MemoryContext old;
450+
struct PGLRelMetaCacheEntry *cached_relmeta = NULL;
451+
424452

425453
/* First check the table filter */
426454
if (!call_row_filter_hook(data, txn, relation, change))
@@ -429,11 +457,18 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
429457
/* Avoid leaking memory by using and resetting our own context */
430458
old = MemoryContextSwitchTo(data->context);
431459

432-
/* TODO: add caching (send only if changed) */
433-
if (data->api->write_rel)
460+
/*
461+
* If the protocol wants to write relation information and the client
462+
* isn't known to have metadata cached for this relation already,
463+
* send relation metadata.
464+
*
465+
* TODO: track hit/miss stats
466+
*/
467+
if (data->api->write_rel != NULL &&
468+
!pglogical_cache_relmeta(data, relation, &cached_relmeta))
434469
{
435470
OutputPluginPrepareWrite(ctx, false);
436-
data->api->write_rel(ctx->out, data, relation);
471+
data->api->write_rel(ctx->out, data, relation, cached_relmeta);
437472
OutputPluginWrite(ctx, false);
438473
}
439474

@@ -477,28 +512,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
477512
MemoryContextReset(data->context);
478513
}
479514

515+
#ifdef HAVE_REPLICATION_ORIGINS
480516
/*
481517
* Decide if the whole transaction with specific origin should be filtered out.
482518
*/
483-
extern int MtmReplicationNodeId;
484-
485519
static bool
486520
pg_decode_origin_filter(LogicalDecodingContext *ctx,
487521
RepOriginId origin_id)
488522
{
489523
PGLogicalOutputData *data = ctx->output_plugin_private;
490524

491-
if (!call_txn_filter_hook(data, origin_id)) {
492-
return true;
493-
}
494-
495-
if (!data->forward_changesets && origin_id != InvalidRepOriginId) {
496-
*(int*)0 = 0;
525+
if (!call_txn_filter_hook(data, origin_id))
497526
return true;
498-
}
499527

500528
return false;
501529
}
530+
#endif
502531

503532
static void
504533
send_startup_message(LogicalDecodingContext *ctx,
@@ -532,9 +561,10 @@ static void pg_decode_shutdown(LogicalDecodingContext * ctx)
532561

533562
call_shutdown_hook(data);
534563

535-
if (data->hooks_mctxt != NULL)
536-
{
537-
MemoryContextDelete(data->hooks_mctxt);
538-
data->hooks_mctxt = NULL;
539-
}
564+
pglogical_destroy_relmetacache();
565+
566+
/*
567+
* no need to delete data->context or data->hooks_mctxt as they're children
568+
* of ctx->context which will expire on return.
569+
*/
540570
}

0 commit comments

Comments
 (0)