Skip to content

Commit 13a25d7

Browse files
committed
Rewrite handshake procedure
1 parent f6d500c commit 13a25d7

File tree

4 files changed

+12
-11
lines changed

4 files changed

+12
-11
lines changed

contrib/mmts/arbiter.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
396396
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
397397
MtmHandshakeMessage req;
398398
MtmArbiterMessage resp;
399-
int sd;
399+
int rc, sd;
400400
timestamp_t start = MtmGetSystemTime();
401401
char const* host = Mtm->nodes[node].con.hostName;
402402

@@ -476,7 +476,10 @@ static int MtmConnectSocket(int node, int port, int timeout)
476476
close(sd);
477477
goto Retry;
478478
}
479-
if (MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout) != 1 || MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
479+
while ((rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout)) == 0) {
480+
elog(WARNING, "Arbiter waiting response for handshake message from %s:%d: rc=%d, error=%d", host, port, rc, errno);
481+
}
482+
if (rc != 1 || MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
480483
elog(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: errno=%d", host, port, errno);
481484
close(sd);
482485
goto Retry;

contrib/mmts/pglogical_output.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -376,8 +376,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
376376
/* If the record didn't originate locally, send origin info */
377377
send_replication_origin &= txn->origin_id != InvalidRepOriginId;
378378

379-
OutputPluginPrepareWrite(ctx, !send_replication_origin);
380379
if (data->api) {
380+
OutputPluginPrepareWrite(ctx, !send_replication_origin);
381381
data->api->write_begin(ctx->out, data, txn);
382382

383383
if (send_replication_origin)
@@ -401,8 +401,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
401401
replorigin_by_oid(txn->origin_id, true, &origin))
402402
data->api->write_origin(ctx->out, origin, txn->origin_lsn);
403403
}
404+
OutputPluginWrite(ctx, true);
404405
}
405-
OutputPluginWrite(ctx, true);
406406
}
407407

408408
/*
@@ -414,11 +414,11 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
414414
{
415415
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
416416

417-
OutputPluginPrepareWrite(ctx, true);
418417
if (data->api) {
418+
OutputPluginPrepareWrite(ctx, true);
419419
data->api->write_commit(ctx->out, data, txn, commit_lsn);
420+
OutputPluginWrite(ctx, true);
420421
}
421-
OutputPluginWrite(ctx, true);
422422
}
423423

424424
void
@@ -522,11 +522,11 @@ send_startup_message(LogicalDecodingContext *ctx,
522522
* not.
523523
*/
524524

525-
OutputPluginPrepareWrite(ctx, last_message);
526525
if (data->api) {
526+
OutputPluginPrepareWrite(ctx, last_message);
527527
data->api->write_startup_message(ctx->out, msg);
528+
OutputPluginWrite(ctx, last_message);
528529
}
529-
OutputPluginWrite(ctx, last_message);
530530

531531
pfree(msg);
532532

contrib/mmts/tests2/docker-entrypoint.sh

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,6 @@ if [ "$1" = 'postgres' ]; then
8080
checkpoint_timeout = 30
8181
log_autovacuum_min_duration = 0
8282
83-
raftable.id = $NODE_ID
84-
raftable.peers = '$RAFT_PEERS'
85-
8683
multimaster.workers = 4
8784
multimaster.use_raftable = true
8885
multimaster.queue_size=52857600

contrib/mmts/tests2/lib/bank_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def exec_tx(self, name, tx_block):
151151
def check_total(self):
152152

153153
def tx(conn, cur):
154+
conn.commit()
154155
cur.execute('select sum(amount) from bank_test')
155156
res = cur.fetchone()
156157
total = res[0]

0 commit comments

Comments
 (0)