Skip to content

Commit 5c4854e

Browse files
committed
Debug pglogical_output
1 parent 00e0519 commit 5c4854e

File tree

6 files changed

+74
-61
lines changed

6 files changed

+74
-61
lines changed

contrib/multimaster/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
MODULE_big = multimaster
2-
OBJS = multimaster.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o
2+
OBJS = multimaster.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o pglogical_output.o pglogical_proto.o pglogical_receiver.o pglogical_apply.o pglogical_hooks.o pglogical_config.o
33
#OBJS = multimaster.o pglogical_receiver.o decoder_raw.o libdtm.o bytebuf.o bgwpool.o sockhub/sockhub.o
44
EXTENSION = multimaster
55
DATA = multimaster--1.0.sql

contrib/multimaster/pglogical_apply.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -751,6 +751,7 @@ process_remote_delete(StringInfo s, Relation rel)
751751
CommandCounterIncrement();
752752
}
753753

754+
static MemoryContext ApplyContext;
754755

755756
void MMExecutor(int id, void* work, size_t size)
756757
{
@@ -761,6 +762,15 @@ void MMExecutor(int id, void* work, size_t size)
761762
s.len = size;
762763
s.maxlen = -1;
763764

765+
if (ApplyContext == NULL) {
766+
ApplyContext = AllocSetContextCreate(TopMemoryContext,
767+
"MessageContext",
768+
ALLOCSET_DEFAULT_MINSIZE,
769+
ALLOCSET_DEFAULT_INITSIZE,
770+
ALLOCSET_DEFAULT_MAXSIZE);
771+
}
772+
MemoryContextSwitchTo(ApplyContext);
773+
764774
PG_TRY();
765775
{
766776
while (true) {
@@ -802,5 +812,7 @@ void MMExecutor(int id, void* work, size_t size)
802812
AbortCurrentTransaction();
803813
}
804814
PG_END_TRY();
815+
816+
MemoryContextResetAndDeleteChildren(ApplyContext);
805817
}
806818

contrib/multimaster/pglogical_output.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -433,7 +433,7 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
433433
if (data->api->write_rel)
434434
{
435435
OutputPluginPrepareWrite(ctx, false);
436-
data->api->write_rel(ctx->out, relation);
436+
data->api->write_rel(ctx->out, data, relation);
437437
OutputPluginWrite(ctx, false);
438438
}
439439

contrib/multimaster/pglogical_proto.c

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ typedef struct PGLogicalProtoMM
4141
bool isLocal;
4242
} PGLogicalProtoMM;
4343

44-
static void pglogical_write_rel(StringInfo out, Relation rel);
44+
static void pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel);
4545

4646
static void pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
4747
ReorderBufferTXN *txn);
@@ -67,29 +67,32 @@ static char decide_datum_transfer(Form_pg_attribute att,
6767
* Write relation description to the output stream.
6868
*/
6969
static void
70-
pglogical_write_rel(StringInfo out, Relation rel)
70+
pglogical_write_rel(StringInfo out, PGLogicalOutputData *data, Relation rel)
7171
{
72-
const char *nspname;
73-
uint8 nspnamelen;
74-
const char *relname;
75-
uint8 relnamelen;
76-
77-
pq_sendbyte(out, 'R'); /* sending RELATION */
78-
79-
nspname = get_namespace_name(rel->rd_rel->relnamespace);
80-
if (nspname == NULL)
81-
elog(ERROR, "cache lookup failed for namespace %u",
82-
rel->rd_rel->relnamespace);
83-
nspnamelen = strlen(nspname) + 1;
84-
85-
relname = NameStr(rel->rd_rel->relname);
86-
relnamelen = strlen(relname) + 1;
87-
88-
pq_sendbyte(out, nspnamelen); /* schema name length */
89-
pq_sendbytes(out, nspname, nspnamelen);
90-
91-
pq_sendbyte(out, relnamelen); /* table name length */
92-
pq_sendbytes(out, relname, relnamelen);
72+
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
73+
if (!mm->isLocal) {
74+
const char *nspname;
75+
uint8 nspnamelen;
76+
const char *relname;
77+
uint8 relnamelen;
78+
79+
pq_sendbyte(out, 'R'); /* sending RELATION */
80+
81+
nspname = get_namespace_name(rel->rd_rel->relnamespace);
82+
if (nspname == NULL)
83+
elog(ERROR, "cache lookup failed for namespace %u",
84+
rel->rd_rel->relnamespace);
85+
nspnamelen = strlen(nspname) + 1;
86+
87+
relname = NameStr(rel->rd_rel->relname);
88+
relnamelen = strlen(relname) + 1;
89+
90+
pq_sendbyte(out, nspnamelen); /* schema name length */
91+
pq_sendbytes(out, nspname, nspnamelen);
92+
93+
pq_sendbyte(out, relnamelen); /* table name length */
94+
pq_sendbytes(out, relname, relnamelen);
95+
}
9396
}
9497

9598
/*

contrib/multimaster/pglogical_proto.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#ifndef PG_LOGICAL_PROTO_H
1414
#define PG_LOGICAL_PROTO_H
1515

16-
typedef void (*pglogical_write_rel_fn)(StringInfo out, Relation rel);
16+
typedef void (*pglogical_write_rel_fn)(StringInfo out, PGLogicalOutputData *data, Relation rel);
1717

1818
typedef void (*pglogical_write_begin_fn)(StringInfo out, PGLogicalOutputData *data,
1919
ReorderBufferTXN *txn);

contrib/multimaster/pglogical_receiver.c

Lines changed: 33 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ pglogical_receiver_main(Datum main_arg)
244244
resetPQExpBuffer(query);
245245

246246
/* Start logical replication at specified position */
247-
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL 0/0 ",
247+
appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL 0/0 (\"startup_params_format\" '1', \"max_proto_version\" '1', \"min_proto_version\" '1')",
248248
args->receiver_slot);
249249
res = PQexec(conn, query->data);
250250
if (PQresultStatus(res) != PGRES_COPY_BOTH)
@@ -376,51 +376,49 @@ pglogical_receiver_main(Datum main_arg)
376376
walEnd = fe_recvint64(&copybuf[hdr_len]);
377377
hdr_len += 8; /* WALEnd */
378378
hdr_len += 8; /* sendTime */
379-
if (rc < hdr_len + 1)
380-
{
381-
ereport(LOG, (errmsg("%s: Streaming header too small",
382-
worker_proc)));
383-
proc_exit(1);
384-
}
385379

386-
stmt = copybuf + hdr_len;
380+
/*ereport(LOG, (errmsg("%s: receive message %c length %d", worker_proc, copybuf[hdr_len], rc - hdr_len)));*/
381+
382+
Assert(rc >= hdr_len);
383+
384+
if (rc > hdr_len)
385+
{
386+
stmt = copybuf + hdr_len;
387387

388388
#ifdef USE_PGLOGICAL_OUTPUT
389-
ByteBufferAppend(&buf, stmt, rc - hdr_len);
390-
if (stmt[0] == 'C')
391-
{
392-
MMExecute(buf.data, buf.used);
393-
ByteBufferReset(&buf);
394-
}
389+
ByteBufferAppend(&buf, stmt, rc - hdr_len);
390+
if (stmt[0] == 'C') /* commit */
391+
{
392+
MMExecute(buf.data, buf.used);
393+
ByteBufferReset(&buf);
394+
}
395395
#else
396-
if (strncmp(stmt, "BEGIN ", 6) == 0) {
397-
TransactionId xid;
398-
int rc = sscanf(stmt + 6, "%u", &xid);
399-
Assert(rc == 1);
400-
ByteBufferAppendInt32(&buf, xid);
401-
Assert(!insideTrans);
402-
insideTrans = true;
403-
} else if (strncmp(stmt, "COMMIT;", 7) == 0) {
404-
Assert(insideTrans);
405-
Assert(buf.used > 4);
406-
buf.data[buf.used-1] = '\0'; /* replace last ';' with '\0' to make string zero terminated */
407-
MMExecute(buf.data, buf.used);
408-
ByteBufferReset(&buf);
409-
insideTrans = false;
410-
} else {
411-
Assert(insideTrans);
412-
ByteBufferAppend(&buf, stmt, rc - hdr_len/*strlen(stmt)*/);
413-
}
396+
if (strncmp(stmt, "BEGIN ", 6) == 0) {
397+
TransactionId xid;
398+
int rc = sscanf(stmt + 6, "%u", &xid);
399+
Assert(rc == 1);
400+
ByteBufferAppendInt32(&buf, xid);
401+
Assert(!insideTrans);
402+
insideTrans = true;
403+
} else if (strncmp(stmt, "COMMIT;", 7) == 0) {
404+
Assert(insideTrans);
405+
Assert(buf.used > 4);
406+
buf.data[buf.used-1] = '\0'; /* replace last ';' with '\0' to make string zero terminated */
407+
MMExecute(buf.data, buf.used);
408+
ByteBufferReset(&buf);
409+
insideTrans = false;
410+
} else {
411+
Assert(insideTrans);
412+
ByteBufferAppend(&buf, stmt, rc - hdr_len/*strlen(stmt)*/);
413+
}
414414
#endif
415+
}
415416
/* Update written position */
416417
output_written_lsn = Max(walEnd, output_written_lsn);
417418
output_fsync_lsn = output_written_lsn;
418419
output_applied_lsn = output_written_lsn;
419420
}
420421

421-
/* Finish process */
422-
pgstat_report_activity(STATE_IDLE, NULL);
423-
424422
/* No data, move to next loop */
425423
if (rc == 0)
426424
{

0 commit comments

Comments
 (0)