Skip to content

Commit 25fe3cc

Browse files
committed
Update multimaster
1 parent 33e709e commit 25fe3cc

File tree

5 files changed

+37
-27
lines changed

5 files changed

+37
-27
lines changed

contrib/multimaster/dtmd/src/main.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ static void ondisconnect(client_t client) {
143143
}
144144
} else {
145145
shout(
146-
"[%d] DISCONNECT: transaction %u not found O_o\n",
146+
"[%d] DISCONNECT: transaction %u of disconnected client is not found\n",
147147
CLIENT_ID(client), CLIENT_XID(client)
148148
);
149149
}
@@ -392,11 +392,12 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
392392
client,
393393
"VOTE: transaction failed to abort O_o"
394394
);
395+
#if 0
395396
shout(
396397
"[%d] VOTE: abort xid %u\n",
397398
CLIENT_ID(client), xid
398399
);
399-
400+
#endif
400401
notify_listeners(t, NEGATIVE);
401402
free_transaction(t);
402403
client_message_shortcut(client, RES_TRANSACTION_ABORTED);

contrib/multimaster/multimaster.c

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,6 @@ static shmem_startup_hook_type prev_shmem_startup_hook;
118118
static HTAB* xid_in_doubt;
119119
static HTAB* local_trans;
120120
static DtmState* dtm;
121-
static Snapshot CurrentTransactionSnapshot;
122121

123122
static TransactionId DtmNextXid;
124123
static SnapshotData DtmSnapshot = { HeapTupleSatisfiesMVCC };
@@ -609,8 +608,9 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
609608
{
610609
if (TransactionIdIsValid(DtmNextXid) && snapshot != &CatalogSnapshotData)
611610
{
612-
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot->curcid))
611+
if (!DtmHasGlobalSnapshot && (snapshot != DtmLastSnapshot || DtmCurcid != snapshot->curcid)) {
613612
DtmGlobalGetSnapshot(DtmNextXid, &DtmSnapshot, &dtm->minXid);
613+
}
614614
DtmCurcid = snapshot->curcid;
615615
DtmLastSnapshot = snapshot;
616616
DtmMergeWithGlobalSnapshot(snapshot);
@@ -628,7 +628,6 @@ static Snapshot DtmGetSnapshot(Snapshot snapshot)
628628
snapshot = PgGetSnapshotData(snapshot);
629629
}
630630
DtmUpdateRecentXmin(snapshot);
631-
CurrentTransactionSnapshot = snapshot;
632631
return snapshot;
633632
}
634633

@@ -651,7 +650,6 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
651650
{
652651
if (TransactionIdIsValid(DtmNextXid))
653652
{
654-
CurrentTransactionSnapshot = NULL;
655653
if (status == TRANSACTION_STATUS_ABORTED || !MMIsDistributedTrans)
656654
{
657655
PgTransactionIdSetTreeStatus(xid, nsubxids, subxids, status, lsn);
@@ -662,7 +660,7 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
662660
else
663661
{
664662
XTM_INFO("Begin commit transaction %d\n", xid);
665-
/* Mark transaction as on-doubt in xid_in_doubt hash table */
663+
/* Mark transaction as in-doubt in xid_in_doubt hash table */
666664
LWLockAcquire(dtm->hashLock, LW_EXCLUSIVE);
667665
hash_search(xid_in_doubt, &DtmNextXid, HASH_ENTER, NULL);
668666
LWLockRelease(dtm->hashLock);
@@ -673,20 +671,22 @@ static void DtmSetTransactionStatus(TransactionId xid, int nsubxids, Transaction
673671
MarkAsAborted();
674672
END_CRIT_SECTION();
675673
elog(ERROR, "Transaction commit rejected by XTM");
674+
} else {
675+
XTM_INFO("Commit transaction %d\n", xid);
676676
}
677-
XTM_INFO("Commit transaction %d\n", xid);
678677
}
679678
}
680679
else
681680
{
682681
XTM_INFO("Set transaction %u status in local CLOG" , xid);
683682
}
684683
}
685-
else
684+
else if (status != TRANSACTION_STATUS_ABORTED)
686685
{
687686
XidStatus gs;
688687
gs = DtmGlobalGetTransStatus(xid, false);
689688
if (gs != TRANSACTION_STATUS_UNKNOWN) {
689+
Assert(gs != TRANSACTION_STATUS_IN_PROGRESS);
690690
status = gs;
691691
}
692692
}
@@ -753,23 +753,21 @@ static void DtmInitialize()
753753
static void
754754
DtmXactCallback(XactEvent event, void *arg)
755755
{
756-
XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n", getpid(), event, DtmNextXid);
756+
//XTM_INFO("%d: DtmXactCallbackevent=%d nextxid=%d\n", getpid(), event, DtmNextXid);
757757
switch (event)
758758
{
759759
case XACT_EVENT_START:
760-
XTM_INFO("%d: normal=%d, initialized=%d, replication=%d, bgw=%d, vacuum=%d\n",
761-
getpid(), IsNormalProcessingMode(), dtm->initialized, MMDoReplication, IsBackgroundWorker, IsAutoVacuumWorkerProcess());
760+
//XTM_INFO("%d: normal=%d, initialized=%d, replication=%d, bgw=%d, vacuum=%d\n",
761+
// getpid(), IsNormalProcessingMode(), dtm->initialized, MMDoReplication, IsBackgroundWorker, IsAutoVacuumWorkerProcess());
762762
if (IsNormalProcessingMode() && dtm->initialized && MMDoReplication && !IsBackgroundWorker && !IsAutoVacuumWorkerProcess()) {
763763
MMBeginTransaction();
764764
}
765765
break;
766766
case XACT_EVENT_PRE_COMMIT:
767767
case XACT_EVENT_PARALLEL_PRE_COMMIT:
768-
if (!MMIsDistributedTrans && TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
769-
XTM_INFO("%d: Will ignore transaction %u\n", getpid(), GetCurrentTransactionIdIfAny());
770-
MMMarkTransAsLocal(GetCurrentTransactionIdIfAny());
771-
} else {
772-
XTM_INFO("%d: Transaction %u will be replicated\n", getpid(), GetCurrentTransactionIdIfAny());
768+
if (!MMIsDistributedTrans && TransactionIdIsValid(DtmNextXid)) {
769+
XTM_INFO("%d: Will ignore transaction %u\n", getpid(), DtmNextXid);
770+
MMMarkTransAsLocal(DtmNextXid);
773771
}
774772
break;
775773
case XACT_EVENT_COMMIT:

contrib/multimaster/receiver_raw.c

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,11 @@ receiver_raw_main(Datum main_arg)
417417
Assert(!insideTrans);
418418
SetCurrentStatementStartTimestamp();
419419
MMJoinTransaction(xid);
420+
420421
StartTransactionCommand();
422+
BeginTransactionBlock();
423+
CommitTransactionCommand();
424+
421425
SPI_connect();
422426
PushActiveSnapshot(GetTransactionSnapshot());
423427
insideTrans = true;
@@ -427,11 +431,19 @@ receiver_raw_main(Datum main_arg)
427431
insideTrans = false;
428432
SPI_finish();
429433
PopActiveSnapshot();
434+
StartTransactionCommand();
430435
if (rollbackTransaction) {
431-
AbortCurrentTransaction();
432-
} else {
436+
UserAbortTransactionBlock();
437+
}
438+
PG_TRY();
439+
{
433440
CommitTransactionCommand();
434441
}
442+
PG_CATCH();
443+
{
444+
elog(WARNING, "%s: Current transaction is aborted at receiver", worker_name);
445+
}
446+
PG_END_TRY();
435447
} else if (!rollbackTransaction) {
436448
Assert(insideTrans);
437449
/* Execute query */
@@ -448,11 +460,12 @@ receiver_raw_main(Datum main_arg)
448460
ereport(LOG, (errmsg("%s: DELETE received correctly: %s",
449461
worker_name, stmt)));
450462
else
451-
ereport(LOG, (errmsg("%s: Error when applying change: %s",
463+
ereport(WARNING, (errmsg("%s: Error when applying change: %s",
452464
worker_name, stmt)));
453465
}
454466
PG_CATCH();
455467
{
468+
elog(WARNING, "%s: %s failed at receiver", worker_name, stmt);
456469
rollbackTransaction = true;
457470
}
458471
PG_END_TRY();
@@ -595,8 +608,4 @@ int MMStartReceivers(char* conns, int node_id)
595608
worker.bgw_main_arg = (Datum)ctx;
596609
RegisterBackgroundWorker(&worker);
597610
}
598-
conn_str = p + 1;
599-
}
600-
601-
return i;
602-
}
611+
con

contrib/multimaster/tests/dtmbench

208 Bytes
Binary file not shown.

contrib/multimaster/tests/dtmbench.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ int main (int argc, char* argv[])
210210

211211
if (initialize) {
212212
initializeDatabase();
213-
printf("%d account inserted\n", cfg.nAccounts);
213+
printf("%d accounts inserted\n", cfg.nAccounts);
214214
}
215215

216216
time_t start = getCurrentTime();
@@ -246,12 +246,14 @@ int main (int argc, char* argv[])
246246

247247
printf(
248248
"{\"update_tps\":%f, \"read_tps\":%f,"
249-
" \"readers\":%d, \"writers\":%d,"
249+
" \"readers\":%d, \"writers\":%d, \"aborts\":%ld, \"abort_percent\": %d,"
250250
" \"accounts\":%d, \"iterations\":%d, \"hosts\":%ld}\n",
251251
(double)(nWrites*USEC)/elapsed,
252252
(double)(nReads*USEC)/elapsed,
253253
cfg.nReaders,
254254
cfg.nWriters,
255+
nAborts,
256+
(int)(nAborts*100/cfg.nWriters),
255257
cfg.nAccounts,
256258
cfg.nIterations,
257259
cfg.connections.size()

0 commit comments

Comments
 (0)