Skip to content

Commit d0e0458

Browse files
knizhnikkelvich
authored andcommitted
Continue work on MMTS
1 parent ba48376 commit d0e0458

File tree

9 files changed

+58
-35
lines changed

9 files changed

+58
-35
lines changed

arbiter.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ static void MtmAppendBuffer(MessageCode code, MtmBuffer* txBuffer, TransactionId
334334
MtmWriteSocket(sockets[node], buf->data, buf->used*sizeof(MtmCommitMessage));
335335
buf->used = 0;
336336
}
337-
DTM_TRACE("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
337+
MTM_TRACE("Send message %s CSN=%ld to node %d from node %d for global transaction %d/local transaction %d\n",
338338
messageText[code], ts->csn, node, MtmNodeId, ts->gtid.xid, ts->xid);
339339
buf->data[buf->used].code = code;
340340
buf->data[buf->used].dxid = xid;

decoder_raw.c

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,11 @@ decoder_raw_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
108108
Assert(lastXid != txn->xid);
109109
lastXid = txn->xid;
110110
if (MMIsLocalTransaction(txn->xid)) {
111-
XTM_INFO("Skip local transaction %u\n", txn->xid);
111+
MTM_INFO("Skip local transaction %u\n", txn->xid);
112112
data->isLocal = true;
113113
} else {
114114
OutputPluginPrepareWrite(ctx, true);
115-
XTM_INFO("Send transaction %u to replica\n", txn->xid);
115+
MTM_INFO("Send transaction %u to replica\n", txn->xid);
116116
appendStringInfo(ctx->out, "BEGIN %u;", txn->xid);
117117
OutputPluginWrite(ctx, true);
118118
data->isLocal = false;
@@ -126,12 +126,12 @@ decoder_raw_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
126126
{
127127
DecoderRawData *data = ctx->output_plugin_private;
128128
if (!data->isLocal) {
129-
XTM_INFO("Send commit of transaction %u to replica\n", txn->xid);
129+
MTM_INFO("Send commit of transaction %u to replica\n", txn->xid);
130130
OutputPluginPrepareWrite(ctx, true);
131131
appendStringInfoString(ctx->out, "COMMIT;");
132132
OutputPluginWrite(ctx, true);
133133
} else {
134-
XTM_INFO("Skip commit of transaction %u\n", txn->xid);
134+
MTM_INFO("Skip commit of transaction %u\n", txn->xid);
135135
}
136136
}
137137

@@ -483,10 +483,10 @@ decoder_raw_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
483483

484484
data = ctx->output_plugin_private;
485485
if (data->isLocal) {
486-
XTM_INFO("Skip action %d in transaction %u\n", change->action, txn->xid);
486+
MTM_INFO("Skip action %d in transaction %u\n", change->action, txn->xid);
487487
return;
488488
}
489-
XTM_INFO("Send action %d in transaction %u to replica\n", change->action, txn->xid);
489+
MTM_INFO("Send action %d in transaction %u to replica\n", change->action, txn->xid);
490490

491491
/* Avoid leaking memory by using and resetting our own context */
492492
old = MemoryContextSwitchTo(data->context);

multimaster.c

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ static char const* MtmGetName(void)
194194
Snapshot MtmGetSnapshot(Snapshot snapshot)
195195
{
196196
snapshot = PgGetSnapshotData(snapshot);
197-
RecentGlobalDataXmin = RecentGlobalXmin = MtmAdjustOldestXid(RecentGlobalDataXmin);
197+
RecentGlobalDataXmin = RecentGlobalXmin = dtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
198198
return snapshot;
199199
}
200200

@@ -499,10 +499,8 @@ void MtmSendNotificationMessage(MtmTransState* ts)
499499
ts->nextVoting = votingList;
500500
dtm->votingTransactions = ts;
501501
SpinLockRelease(&dtm->votingSpinlock);
502-
MTM_TRACE("Register commit message\n");
503502
if (votingList == NULL) {
504503
/* singal semaphore only once for the whole list */
505-
MTM_TRACE("Signal semaphore\n");
506504
PGSemaphoreUnlock(&dtm->votingSemaphore);
507505
}
508506
}
@@ -519,9 +517,11 @@ MtmCommitTransaction(TransactionId xid, int nsubxids, TransactionId *subxids)
519517
MtmTransactionListAppend(ts);
520518
MtmAddSubtransactions(ts, subxids, nsubxids);
521519

520+
MtmVoteForTransaction(ts);
521+
522522
LWLockRelease(dtm->hashLock);
523523

524-
MtmVoteForTransaction(ts);
524+
MTM_TRACE("%d: MtmCommitTransaction status=%d\n", getpid(), ts->status);
525525

526526
return ts->status == TRANSACTION_STATUS_COMMITTED;
527527
}
@@ -530,15 +530,32 @@ static void
530530
MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status)
531531
{
532532
MtmTransState* ts;
533+
MtmCurrentTrans* x = &dtmTx;
534+
bool found;
533535

534536
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
535-
ts = hash_search(xid2state, &xid, HASH_FIND, NULL);
536-
if (ts != NULL) {
537+
ts = hash_search(xid2state, &xid, HASH_ENTER, &found);
538+
if (!found) {
537539
ts->status = status;
538-
MtmAdjustSubtransactions(ts);
539-
if (dtmTx.isReplicated) {
540-
MtmSendNotificationMessage(ts);
540+
ts->csn = MtmAssignCSN();
541+
ts->procno = MyProc->pgprocno;
542+
ts->snapshot = INVALID_CSN;
543+
if (!TransactionIdIsValid(x->gtid.xid))
544+
{
545+
ts->gtid.xid = x->xid;
546+
ts->gtid.node = MtmNodeId;
547+
} else {
548+
ts->gtid = x->gtid;
541549
}
550+
MtmTransactionListAppend(ts);
551+
MtmAddSubtransactions(ts, subxids, nsubxids);
552+
}
553+
ts->status = status;
554+
MtmAdjustSubtransactions(ts);
555+
556+
if (dtmTx.isReplicated) {
557+
ts->gtid = x->gtid;
558+
MtmSendNotificationMessage(ts);
542559
}
543560
LWLockRelease(dtm->hashLock);
544561
}
@@ -548,13 +565,13 @@ MtmFinishTransaction(TransactionId xid, int nsubxids, TransactionId *subxids, Xi
548565
static void
549566
MtmSetTransactionStatus(TransactionId xid, int nsubxids, TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
550567
{
551-
MTM_TRACE("%d: MtmSetTransactionStatus %u = %u\n", getpid(), xid, status);
568+
MTM_TRACE("%d: MtmSetTransactionStatus %u(%u) = %u, isDistributed=%d\n", getpid(), xid, dtmTx.xid, status, dtmTx.isDistributed);
552569
if (xid == dtmTx.xid && dtmTx.isDistributed)
553570
{
554571
if (status == TRANSACTION_STATUS_ABORTED || !dtmTx.containsDML)
555572
{
556573
MtmFinishTransaction(xid, nsubxids, subxids, status);
557-
MTM_TRACE("Abort transaction %d\n", xid);
574+
MTM_TRACE("Abort transaction %d, status=%d, DML=%d\n", xid, status, dtmTx.containsDML);
558575
}
559576
else
560577
{
@@ -1006,10 +1023,14 @@ MtmVoteForTransaction(MtmTransState* ts)
10061023
MtmSendNotificationMessage(ts); /* send READY message to coordinator */
10071024
}
10081025

1009-
MTM_TRACE("Node %d waiting latch...\n", MtmNodeId);
1010-
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
1011-
ResetLatch(&MyProc->procLatch);
1012-
MTM_TRACE("Node %d receives response...\n", MtmNodeId);
1026+
MTM_TRACE("%d: Node %d waiting latch...\n", getpid(), MtmNodeId);
1027+
while (ts->status != TRANSACTION_STATUS_COMMITTED && ts->status != TRANSACTION_STATUS_ABORTED) {
1028+
LWLockRelease(dtm->hashLock);
1029+
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
1030+
ResetLatch(&MyProc->procLatch);
1031+
LWLockAcquire(dtm->hashLock, LW_SHARED);
1032+
}
1033+
MTM_TRACE("%d: Node %d receives response...\n", getpid(), MtmNodeId);
10131034
}
10141035

10151036
HTAB* MtmCreateHash(void)

multimaster.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44
#include "bytebuf.h"
55
#include "bgwpool.h"
66

7-
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
7+
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
88
//#define MTM_TRACE(fmt, ...)
9+
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
910
#define MTM_TUPLE_TRACE(fmt, ...)
1011

1112
#define BIT_SET(mask, bit) ((mask) & ((int64)1 << (bit)))

pglogical_apply.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
#include <unistd.h>
12
#include "postgres.h"
23

34
#include "funcapi.h"
@@ -333,7 +334,8 @@ process_remote_begin(StringInfo s)
333334
SetCurrentStatementStartTimestamp();
334335
StartTransactionCommand();
335336
MtmJoinTransaction(&gtid, snapshot);
336-
fprintf(stderr, "REMOTE begin node=%d xid=%d snapshot=%ld\n", gtid.node, gtid.xid, snapshot);
337+
338+
MTM_TRACE("REMOTE begin node=%d xid=%d snapshot=%ld\n", gtid.node, gtid.xid, snapshot);
337339
}
338340

339341
static void
@@ -815,6 +817,7 @@ void MtmExecutor(int id, void* work, size_t size)
815817
PG_CATCH();
816818
{
817819
FlushErrorState();
820+
MTM_TRACE("%d: REMOTE abort transaction %d\n", getpid(), GetCurrentTransactionId());
818821
AbortCurrentTransaction();
819822
}
820823
PG_END_TRY();

pglogical_proto.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
105105
{
106106
PGLogicalProtoMM* mm = (PGLogicalProtoMM*)data->api;
107107
csn_t csn = MtmTransactionSnapshot(txn->xid);
108-
fprintf(stderr, "pglogical_write_begin %d CSN=%ld\n", txn->xid, csn);
108+
MTM_TRACE("pglogical_write_begin %d CSN=%ld\n", txn->xid, csn);
109109
if (csn == INVALID_CSN) {
110110
mm->isLocal = true;
111111
} else {

pglogical_receiver.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ pglogical_receiver_main(Datum main_arg)
216216
BackgroundWorkerUnblockSignals();
217217

218218
/* Connect to a database */
219-
BackgroundWorkerInitializeConnection(MMDatabaseName, NULL);
219+
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL);
220220

221221
/* Establish connection to remote server */
222222
conn = PQconnectdb(args->receiver_conn_string);
@@ -260,7 +260,7 @@ pglogical_receiver_main(Datum main_arg)
260260
PQclear(res);
261261
resetPQExpBuffer(query);
262262

263-
MMReceiverStarted();
263+
MtmReceiverStarted();
264264
ByteBufferAlloc(&buf);
265265

266266
while (!got_sigterm)
@@ -392,7 +392,7 @@ pglogical_receiver_main(Datum main_arg)
392392
ByteBufferAppend(&buf, stmt, rc - hdr_len);
393393
if (stmt[0] == 'C') /* commit */
394394
{
395-
MMExecute(buf.data, buf.used);
395+
MtmExecute(buf.data, buf.used);
396396
ByteBufferReset(&buf);
397397
}
398398
#else
@@ -511,7 +511,7 @@ pglogical_receiver_main(Datum main_arg)
511511
}
512512

513513

514-
int MMStartReceivers(char* conns, int node_id)
514+
int MtmStartReceivers(char* conns, int node_id)
515515
{
516516
int i = 0;
517517
BackgroundWorker worker;
@@ -530,17 +530,17 @@ int MMStartReceivers(char* conns, int node_id)
530530
}
531531
if (++i != node_id) {
532532
ReceiverArgs* ctx = (ReceiverArgs*)malloc(sizeof(ReceiverArgs));
533-
if (MMDatabaseName == NULL) {
533+
if (MtmDatabaseName == NULL) {
534534
char* dbname = strstr(conn_str, "dbname=");
535535
char* eon;
536536
int len;
537537
Assert(dbname != NULL);
538538
dbname += 7;
539539
eon = strchr(dbname, ' ');
540540
len = eon - dbname;
541-
MMDatabaseName = (char*)malloc(len + 1);
542-
memcpy(MMDatabaseName, dbname, len);
543-
MMDatabaseName[len] = '\0';
541+
MtmDatabaseName = (char*)malloc(len + 1);
542+
memcpy(MtmDatabaseName, dbname, len);
543+
MtmDatabaseName[len] = '\0';
544544
}
545545
ctx->receiver_conn_string = psprintf("replication=database %.*s", (int)(p - conn_str), conn_str);
546546
sprintf(ctx->receiver_slot, "mm_slot_%d", node_id);

tests/dtmbench

-9.36 KB
Binary file not shown.

tests/dtmbench.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,6 @@ void initializeDatabase()
184184
nontransaction txn(conn);
185185
exec(txn, "drop extension if exists multimaster");
186186
exec(txn, "create extension multimaster");
187-
txn.commit();
188187
}
189188
printf("extension created\n");
190189

@@ -193,7 +192,6 @@ void initializeDatabase()
193192
nontransaction txn(conn);
194193
exec(txn, "drop table if exists t");
195194
exec(txn, "create table t(u int primary key, v int)");
196-
txn.commit();
197195
}
198196
printf("table t created\n");
199197
printf("inserting stuff into t\n");

0 commit comments

Comments
 (0)