Skip to content

Commit 6296fba

Browse files
committed
fix pglogical protocol between decoder and receiver
1 parent f682eef commit 6296fba

File tree

2 files changed

+25
-98
lines changed

2 files changed

+25
-98
lines changed

contrib/multimaster/pglogical_proto.c

Lines changed: 5 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,7 @@ pglogical_write_insert(StringInfo out, PGLogicalOutputData *data,
131131
{
132132
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
133133
if (!mm->isLocal) {
134-
uint8 flags = 0;
135-
136134
pq_sendbyte(out, 'I'); /* action INSERT */
137-
138-
/* send the flags field */
139-
pq_sendbyte(out, flags);
140-
141-
/* use Oid as relation identifier */
142-
pq_sendint(out, RelationGetRelid(rel), 4);
143-
144-
pq_sendbyte(out, 'N'); /* new tuple follows */
145135
pglogical_write_tuple(out, data, rel, newtuple);
146136
}
147137
}
@@ -155,15 +145,7 @@ pglogical_write_update(StringInfo out, PGLogicalOutputData *data,
155145
{
156146
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
157147
if (!mm->isLocal) {
158-
uint8 flags = 0;
159148
pq_sendbyte(out, 'U'); /* action UPDATE */
160-
161-
/* send the flags field */
162-
pq_sendbyte(out, flags);
163-
164-
/* use Oid as relation identifier */
165-
pq_sendint(out, RelationGetRelid(rel), 4);
166-
167149
/* FIXME support whole tuple (O tuple type) */
168150
if (oldtuple != NULL)
169151
{
@@ -184,18 +166,7 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
184166
{
185167
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
186168
if (!mm->isLocal) {
187-
uint8 flags = 0;
188-
189169
pq_sendbyte(out, 'D'); /* action DELETE */
190-
191-
/* send the flags field */
192-
pq_sendbyte(out, flags);
193-
194-
/* use Oid as relation identifier */
195-
pq_sendint(out, RelationGetRelid(rel), 4);
196-
197-
/* FIXME support whole tuple (O tuple type) */
198-
pq_sendbyte(out, 'K'); /* old key follows */
199170
pglogical_write_tuple(out, data, rel, oldtuple);
200171
}
201172
}
@@ -207,20 +178,6 @@ pglogical_write_delete(StringInfo out, PGLogicalOutputData *data,
207178
static void
208179
write_startup_message(StringInfo out, List *msg)
209180
{
210-
#if 0
211-
ListCell *lc;
212-
213-
pq_sendbyte(out, 'S'); /* message type field */
214-
pq_sendbyte(out, 1); /* startup message version */
215-
foreach (lc, msg)
216-
{
217-
DefElem *param = (DefElem*)lfirst(lc);
218-
Assert(IsA(param->arg, String) && strVal(param->arg) != NULL);
219-
/* null-terminated key and value pairs, in client_encoding */
220-
pq_sendstring(out, param->defname);
221-
pq_sendstring(out, strVal(param->arg));
222-
}
223-
#endif
224181
}
225182

226183
/*
@@ -289,11 +246,10 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
289246
transfer_type = decide_datum_transfer(att, typclass,
290247
data->allow_internal_basetypes,
291248
data->allow_binary_basetypes);
292-
249+
pq_sendbyte(out, transfer_type);
293250
switch (transfer_type)
294251
{
295-
case 'i':
296-
pq_sendbyte(out, 'i'); /* internal-format binary data follows */
252+
case 'b': /* internal-format binary data follows */
297253

298254
/* pass by value */
299255
if (att->attbyval)
@@ -338,13 +294,11 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
338294

339295
break;
340296

341-
case 'b':
297+
case 's': /* binary send/recv data follows */
342298
{
343299
bytea *outputbytes;
344300
int len;
345301

346-
pq_sendbyte(out, 'b'); /* binary send/recv data follows */
347-
348302
outputbytes = OidSendFunctionCall(typclass->typsend,
349303
values[i]);
350304

@@ -360,8 +314,6 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
360314
char *outputstr;
361315
int len;
362316

363-
pq_sendbyte(out, 't'); /* 'text' data follows */
364-
365317
outputstr = OidOutputFunctionCall(typclass->typoutput,
366318
values[i]);
367319
len = strlen(outputstr) + 1;
@@ -391,7 +343,7 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
391343
att->atttypid < FirstNormalObjectId &&
392344
typclass->typelem == InvalidOid)
393345
{
394-
return 'i';
346+
return 'b';
395347
}
396348
/*
397349
* Use send/recv, if allowed, if the type is plain or builtin.
@@ -404,7 +356,7 @@ decide_datum_transfer(Form_pg_attribute att, Form_pg_type typclass,
404356
(att->atttypid < FirstNormalObjectId || typclass->typtype != 'c') &&
405357
(att->atttypid < FirstNormalObjectId || typclass->typelem == InvalidOid))
406358
{
407-
return 'b';
359+
return 's';
408360
}
409361

410362
return 't';

contrib/multimaster/pglogical_receiver.c

Lines changed: 20 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
6666

6767
static void process_remote_begin(StringInfo s);
6868
static void process_remote_commit(StringInfo s);
69-
static void process_remote_insert(StringInfo s);
70-
static void process_remote_update(StringInfo s);
71-
static void process_remote_delete(StringInfo s);
69+
static void process_remote_insert(StringInfo s, Relation rel);
70+
static void process_remote_update(StringInfo s, Relation rel);
71+
static void process_remote_delete(StringInfo s, Relation rel);
7272

7373
/*
7474
* Search the index 'idxrel' for a tuple identified by 'skey' in 'rel'.
@@ -346,7 +346,7 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
346346
memset(tup->isnull, 1, sizeof(tup->isnull));
347347
memset(tup->changed, 1, sizeof(tup->changed));
348348

349-
rnatts = pq_getmsgint(s, 4);
349+
rnatts = pq_getmsgint(s, 2);
350350

351351
if (desc->natts != rnatts)
352352
elog(ERROR, "tuple natts mismatch, %u vs %u", desc->natts, rnatts);
@@ -370,7 +370,6 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
370370
tup->isnull[i] = true;
371371
tup->changed[i] = false;
372372
tup->values[i] = 0xdeadbeef; /* make bad usage more obvious */
373-
374373
break;
375374

376375
case 'b': /* binary format */
@@ -463,25 +462,16 @@ process_remote_commit(StringInfo s)
463462
}
464463

465464
static void
466-
process_remote_insert(StringInfo s)
465+
process_remote_insert(StringInfo s, Relation rel)
467466
{
468-
char action;
469467
EState *estate;
470468
TupleData new_tuple;
471469
TupleTableSlot *newslot;
472470
TupleTableSlot *oldslot;
473-
Relation rel;
474471
ResultRelInfo *relinfo;
475472
ScanKey *index_keys;
476473
int i;
477474

478-
rel = read_rel(s, RowExclusiveLock);
479-
480-
action = pq_getmsgbyte(s);
481-
if (action != 'N')
482-
elog(ERROR, "expected new tuple but got %d",
483-
action);
484-
485475
estate = create_rel_estate(rel);
486476
newslot = ExecInitExtraTupleSlot(estate);
487477
oldslot = ExecInitExtraTupleSlot(estate);
@@ -565,7 +555,7 @@ process_remote_insert(StringInfo s)
565555
}
566556

567557
static void
568-
process_remote_update(StringInfo s)
558+
process_remote_update(StringInfo s, Relation rel)
569559
{
570560
char action;
571561
EState *estate;
@@ -576,13 +566,10 @@ process_remote_update(StringInfo s)
576566
TupleData old_tuple;
577567
TupleData new_tuple;
578568
Oid idxoid;
579-
Relation rel;
580569
Relation idxrel;
581570
ScanKeyData skey[INDEX_MAX_KEYS];
582571
HeapTuple remote_tuple = NULL;
583572

584-
rel = read_rel(s, RowExclusiveLock);
585-
586573
action = pq_getmsgbyte(s);
587574

588575
/* old key present, identifying key changed */
@@ -690,32 +677,16 @@ process_remote_update(StringInfo s)
690677
}
691678

692679
static void
693-
process_remote_delete(StringInfo s)
680+
process_remote_delete(StringInfo s, Relation rel)
694681
{
695-
char action;
696682
EState *estate;
697683
TupleData oldtup;
698684
TupleTableSlot *oldslot;
699685
Oid idxoid;
700-
Relation rel;
701686
Relation idxrel;
702687
ScanKeyData skey[INDEX_MAX_KEYS];
703688
bool found_old;
704689

705-
rel = read_rel(s, RowExclusiveLock);
706-
707-
action = pq_getmsgbyte(s);
708-
709-
if (action != 'K' && action != 'E')
710-
elog(ERROR, "expected action K or E got %c", action);
711-
712-
if (action == 'E')
713-
{
714-
elog(WARNING, "got delete without pkey");
715-
heap_close(rel, NoLock);
716-
return;
717-
}
718-
719690
estate = create_rel_estate(rel);
720691
oldslot = ExecInitExtraTupleSlot(estate);
721692
ExecSetSlotDescriptor(oldslot, RelationGetDescr(rel));
@@ -784,6 +755,7 @@ process_remote_delete(StringInfo s)
784755
void MMExecutor(int id, void* work, size_t size)
785756
{
786757
StringInfoData s;
758+
Relation rel = NULL;
787759
initStringInfo(&s);
788760
s.data = work;
789761
s.len = size;
@@ -796,26 +768,29 @@ void MMExecutor(int id, void* work, size_t size)
796768

797769
switch (action) {
798770
/* BEGIN */
799-
case 'B':
771+
case 'B':
800772
process_remote_begin(&s);
801773
continue;
802774
/* COMMIT */
803-
case 'C':
775+
case 'C':
804776
process_remote_commit(&s);
805777
break;
806778
/* INSERT */
807-
case 'I':
808-
process_remote_insert(&s);
779+
case 'I':
780+
process_remote_insert(&s, rel);
809781
continue;
810782
/* UPDATE */
811-
case 'U':
812-
process_remote_update(&s);
783+
case 'U':
784+
process_remote_update(&s, rel);
813785
continue;
814786
/* DELETE */
815-
case 'D':
816-
process_remote_delete(&s);
787+
case 'D':
788+
process_remote_delete(&s, rel);
789+
continue;
790+
case 'R':
791+
rel = read_rel(&s, RowExclusiveLock);
817792
continue;
818-
default:
793+
default:
819794
elog(ERROR, "unknown action of type %c", action);
820795
}
821796
break;

0 commit comments

Comments
 (0)