Skip to content

Commit ec05e8b

Browse files
committed
Multimaster update
1 parent 5b3fc9c commit ec05e8b

File tree

6 files changed

+30
-51
lines changed

6 files changed

+30
-51
lines changed

contrib/multimaster/multimaster.c

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ typedef struct
7474
int count;
7575
} LocalTransaction;
7676

77-
#define DTM_SHMEM_SIZE (1024*1024)
77+
#define DTM_SHMEM_SIZE (64*1024*1024)
7878
#define DTM_HASH_SIZE 1003
7979

8080
void _PG_init(void);
@@ -665,20 +665,21 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
665665
hash_search(xid_in_doubt, &DtmNextXid, HASH_ENTER, NULL);
666666
LWLockRelease(dtm->hashLock);
667667
if (DtmGlobalSetTransStatus(xid, status, true) != status) {
668+
XTM_INFO("Commit of transaction %d is rejected by arbiter\n", xid);
668669
DtmNextXid = InvalidTransactionId;
669670
DtmLastSnapshot = NULL;
670671
MMIsDistributedTrans = false;
671672
MarkAsAborted();
672673
END_CRIT_SECTION();
673-
elog(ERROR, "Transaction commit rejected by XTM");
674+
elog(ERROR, "Commit of transaction %d is rejected by DTM", xid);
674675
} else {
675676
XTM_INFO("Commit transaction %d\n", xid);
676677
}
677678
}
678679
}
679680
else
680681
{
681-
XTM_INFO("Set transaction %u status in local CLOG" , xid);
682+
XTM_INFO("Set transaction %u status in local CLOG\n" , xid);
682683
}
683684
}
684685
else if (status != TRANSACTION_STATUS_ABORTED)
@@ -765,11 +766,14 @@ DtmXactCallback(XactEvent event, void *arg)
765766
break;
766767
case XACT_EVENT_PRE_COMMIT:
767768
case XACT_EVENT_PARALLEL_PRE_COMMIT:
768-
if (!MMIsDistributedTrans && TransactionIdIsValid(DtmNextXid)) {
769-
XTM_INFO("%d: Will ignore transaction %u\n", getpid(), DtmNextXid);
770-
MMMarkTransAsLocal(DtmNextXid);
769+
{
770+
TransactionId xid = GetCurrentTransactionIdIfAny();
771+
if (!MMIsDistributedTrans && TransactionIdIsValid(xid)) {
772+
XTM_INFO("%d: Will ignore transaction %u\n", getpid(), xid);
773+
MMMarkTransAsLocal(xid);
771774
}
772775
break;
776+
}
773777
case XACT_EVENT_COMMIT:
774778
case XACT_EVENT_ABORT:
775779
if (TransactionIdIsValid(DtmNextXid))
@@ -1137,8 +1141,10 @@ bool DtmDetectGlobalDeadLock(PGPROC* proc)
11371141
XTM_INFO("%d: wait graph end\n", getpid());
11381142
hasDeadlock = DtmGlobalDetectDeadLock(PostPortNumber, pgxact->xid, buf.data, buf.used);
11391143
ByteBufferFree(&buf);
1140-
XTM_INFO("%d: deadlock detected for %u\n", getpid(), pgxact->xid);
1141-
elog(WARNING, "Deadlock detected for transaction %u", pgxact->xid);
1144+
XTM_INFO("%d: deadlock %sdetected for transaction %u\n", getpid(), hasDeadlock ? "": "not ", pgxact->xid);
1145+
if (hasDeadlock) {
1146+
elog(WARNING, "Deadlock detected for transaction %u", pgxact->xid);
1147+
}
11421148
}
11431149
return hasDeadlock;
11441150
}

contrib/multimaster/multimaster.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
#define __MULTIMASTER_H__
33

44
#define XTM_TRACE(fmt, ...)
5-
//#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
6-
#define XTM_INFO(fmt, ...)
5+
#define XTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
6+
//#define XTM_INFO(fmt, ...)
77

88
extern int MMStartReceivers(char* nodes, int node_id);
99
extern void MMBeginTransaction(void);

contrib/multimaster/receiver_raw.c

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -93,17 +93,6 @@ sendFeedback(PGconn *conn, int64 now)
9393
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
9494
int len = 0;
9595

96-
ereport(LOG, (errmsg("%s: confirming write up to %X/%X, "
97-
"flush to %X/%X (slot custom_slot), "
98-
"applied to %X/%X",
99-
worker_proc,
100-
(uint32) (output_written_lsn >> 32),
101-
(uint32) output_written_lsn,
102-
(uint32) (output_fsync_lsn >> 32),
103-
(uint32) output_fsync_lsn,
104-
(uint32) (output_applied_lsn >> 32),
105-
(uint32) output_applied_lsn)));
106-
10796
replybuf[len] = 'r';
10897
len += 1;
10998
fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
@@ -315,7 +304,7 @@ receiver_raw_main(Datum main_arg)
315304
*/
316305
while (true)
317306
{
318-
XLogRecPtr walEnd, walStart;
307+
XLogRecPtr walEnd;
319308
char* stmt;
320309

321310
rc = PQgetCopyData(conn, &copybuf, 1);
@@ -345,11 +334,6 @@ receiver_raw_main(Datum main_arg)
345334
* considered as sent to this receiver.
346335
*/
347336
walEnd = fe_recvint64(&copybuf[pos]);
348-
ereport(LOG, (errmsg("%s: keepalive message from server, "
349-
"walEnd %X/%X, ",
350-
worker_proc,
351-
(uint32) (walEnd >> 32),
352-
(uint32) walEnd)));
353337
pos += 8; /* read walEnd */
354338
pos += 8; /* skip sendTime */
355339
if (rc < pos + 1)
@@ -389,7 +373,7 @@ receiver_raw_main(Datum main_arg)
389373

390374
/* Now fetch the data */
391375
hdr_len = 1; /* msgtype 'w' */
392-
walStart = fe_recvint64(&copybuf[hdr_len]);
376+
fe_recvint64(&copybuf[hdr_len]);
393377
hdr_len += 8; /* dataStart */
394378
walEnd = fe_recvint64(&copybuf[hdr_len]);
395379
hdr_len += 8; /* WALEnd */
@@ -401,15 +385,6 @@ receiver_raw_main(Datum main_arg)
401385
proc_exit(1);
402386
}
403387

404-
/* Log some useful information */
405-
ereport(LOG, (errmsg("%s: received from server, walStart %X/%X, "
406-
"and walEnd %X/%X",
407-
worker_proc,
408-
(uint32) (walStart >> 32),
409-
(uint32) walStart,
410-
(uint32) (walEnd >> 32),
411-
(uint32) walEnd)));
412-
413388
/* Apply change to database */
414389
stmt = copybuf + hdr_len;
415390
pgstat_report_activity(STATE_RUNNING, stmt);
@@ -427,6 +402,7 @@ receiver_raw_main(Datum main_arg)
427402
PushActiveSnapshot(GetTransactionSnapshot());
428403
insideTrans = true;
429404
rollbackTransaction = false;
405+
XTM_INFO("%s: Receive transaction %u\n", worker_proc, xid);
430406
} else if (strncmp(stmt, "COMMIT;", 7) == 0) {
431407
Assert(insideTrans);
432408
insideTrans = false;
@@ -438,11 +414,15 @@ receiver_raw_main(Datum main_arg)
438414
} else {
439415
PG_TRY();
440416
{
417+
XTM_INFO("%s: Start commit of transaction %u\n", worker_proc, xid);
441418
CommitTransactionCommand();
419+
XTM_INFO("%s: Complete commit of transaction %u\n", worker_proc, xid);
442420
}
443421
PG_CATCH();
444422
{
423+
FlushErrorState();
445424
elog(WARNING, "%s: Commit of transaction %u is failed", worker_proc, xid);
425+
AbortCurrentTransaction();
446426
}
447427
PG_END_TRY();
448428
}
@@ -452,21 +432,14 @@ receiver_raw_main(Datum main_arg)
452432
PG_TRY();
453433
{
454434
rc = SPI_execute(stmt, false, 0);
455-
if (rc == SPI_OK_INSERT)
456-
ereport(LOG, (errmsg("%s: INSERT received correctly: %s",
457-
worker_proc, stmt)));
458-
else if (rc == SPI_OK_UPDATE)
459-
ereport(LOG, (errmsg("%s: UPDATE received correctly: %s",
460-
worker_proc, stmt)));
461-
else if (rc == SPI_OK_DELETE)
462-
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
463-
worker_proc, stmt)));
464-
else
465-
ereport(WARNING, (errmsg("%s: Error when applying change: %s",
435+
if (rc != SPI_OK_INSERT && rc != SPI_OK_UPDATE && rc != SPI_OK_DELETE) {
436+
ereport(LOG, (errmsg("%s: Error when applying change: %s",
466437
worker_proc, stmt)));
438+
}
467439
}
468440
PG_CATCH();
469441
{
442+
FlushErrorState();
470443
elog(WARNING, "%s: %s failed in transaction %u", worker_proc, stmt, xid);
471444
rollbackTransaction = true;
472445
}

contrib/multimaster/tests/dtmbench

49.8 KB
Binary file not shown.

contrib/multimaster/tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ int main (int argc, char* argv[])
253253
cfg.nReaders,
254254
cfg.nWriters,
255255
nAborts,
256-
(int)(nAborts*100/cfg.nIterations),
256+
(int)(nAborts*100/nWrites),
257257
cfg.nAccounts,
258258
cfg.nIterations,
259259
cfg.connections.size()

src/backend/access/transam/xact.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5611,6 +5611,6 @@ xact_redo(XLogReaderState *record)
56115611

56125612
void MarkAsAborted()
56135613
{
5614-
CurrentTransactionState->state = TRANS_ABORT;
5615-
CurrentTransactionState->blockState = TBLOCK_ABORT_PENDING;
5614+
CurrentTransactionState->state = TRANS_INPROGRESS;
5615+
CurrentTransactionState->blockState = TBLOCK_STARTED;
56165616
}

0 commit comments

Comments
 (0)