Skip to content

Commit eb21867

Browse files
committed
Wrap standalone statements with transaction block
2 parents 7da0e33 + 02ed44b commit eb21867

File tree

9 files changed

+49
-30
lines changed

9 files changed

+49
-30
lines changed

contrib/mmts/arbiter.c

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -243,30 +243,34 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
243243
if (!MtmResolveHostByName(host, addrs, &n_addrs)) {
244244
elog(ERROR, "Arbiter failed to resolve host '%s' by name", host);
245245
}
246-
Retry:
247-
sd = socket(AF_INET, SOCK_STREAM, 0);
248-
if (sd < 0) {
249-
elog(ERROR, "Arbiter failed to create socket: %d", errno);
250-
}
246+
247+
Retry:
248+
251249
while (1) {
252250
int rc = -1;
251+
252+
sd = socket(AF_INET, SOCK_STREAM, 0);
253+
if (sd < 0) {
254+
elog(ERROR, "Arbiter failed to create socket: %d", errno);
255+
}
253256
for (i = 0; i < n_addrs; ++i) {
254257
memcpy(&sock_inet.sin_addr, &addrs[i], sizeof sock_inet.sin_addr);
255258
do {
256259
rc = connect(sd, (struct sockaddr*)&sock_inet, sizeof(sock_inet));
257260
} while (rc < 0 && errno == EINTR);
258-
261+
259262
if (rc >= 0 || errno == EINPROGRESS) {
260263
break;
261264
}
262265
}
263266
if (rc < 0) {
264267
if ((errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS) || max_attempts == 0) {
265-
elog(WARNING, "Arbiter failed to connect to %s:%d: %d", host, port, errno);
268+
elog(WARNING, "Arbiter failed to connect to %s:%d: error=%d", host, port, errno);
266269
return -1;
267270
} else {
268271
max_attempts -= 1;
269-
MtmSleep(MtmConnectTimeout);
272+
elog(WARNING, "Arbiter trying to connect to %s:%d: error=%d", host, port, errno);
273+
MtmSleep(5*MtmConnectTimeout);
270274
}
271275
continue;
272276
} else {
@@ -601,7 +605,7 @@ static void MtmTransReceiver(Datum arg)
601605
} while (n < 0 && errno == EINTR);
602606
} while (n < 0 && MtmRecovery());
603607

604-
if (rc < 0) {
608+
if (n < 0) {
605609
elog(ERROR, "Arbiter failed to select sockets: %d", errno);
606610
}
607611
for (i = 0; i < nNodes; i++) {

contrib/mmts/multimaster.c

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ typedef struct {
6464
bool isReplicated; /* transaction on replica */
6565
bool isDistributed; /* transaction performed INSERT/UPDATE/DELETE and has to be replicated to other nodes */
6666
bool isPrepared; /* transaction is perpared at first stage of 2PC */
67+
bool isTransactionBlock; /* is transaction block */
6768
bool containsDML; /* transaction contains DML statements */
6869
XidStatus status; /* transaction status */
6970
csn_t snapshot; /* transaction snaphsot */
@@ -590,7 +591,7 @@ MtmXactCallback(XactEvent event, void *arg)
590591
MtmEndTransaction(&MtmTx, false);
591592
break;
592593
case XACT_EVENT_COMMIT_COMMAND:
593-
if (!IsTransactionBlock()) {
594+
if (!MtmTx.isTransactionBlock) {
594595
MtmTwoPhaseCommit(&MtmTx);
595596
}
596597
break;
@@ -629,6 +630,7 @@ MtmBeginTransaction(MtmCurrentTrans* x)
629630
x->isReplicated = false;
630631
x->isDistributed = MtmIsUserTransaction();
631632
x->isPrepared = false;
633+
x->isTransactionBlock = IsTransactionBlock();
632634
if (x->isDistributed && Mtm->status != MTM_ONLINE) {
633635
/* reject all user's transactions at offline cluster */
634636
MtmUnlock();
@@ -1930,19 +1932,20 @@ MtmGenerateGid(char* gid)
19301932

19311933
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
19321934
{
1933-
if (x->isDistributed && x->containsDML) {
1935+
if (!x->isReplicated && (x->isDistributed && x->containsDML)) {
19341936
MtmGenerateGid(x->gid);
1935-
if (!IsTransactionBlock()) {
1936-
elog(WARNING, "Start transaction block for %d", x->xid);
1937+
if (!x->isTransactionBlock) {
1938+
elog(WARNING, "Start transaction block for %s", x->gid);
19371939
BeginTransactionBlock();
1940+
x->isTransactionBlock = true;
19381941
CommitTransactionCommand();
19391942
StartTransactionCommand();
19401943
}
19411944
if (!PrepareTransactionBlock(x->gid))
19421945
{
19431946
elog(WARNING, "Failed to prepare transaction %s", x->gid);
19441947
/* ??? Should we do explicit rollback */
1945-
} else {
1948+
} else {
19461949
CommitTransactionCommand();
19471950
StartTransactionCommand();
19481951
if (MtmGetCurrentTransactionStatus() == TRANSACTION_STATUS_ABORTED) {
@@ -1970,8 +1973,11 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
19701973
TransactionStmt *stmt = (TransactionStmt *) parsetree;
19711974
switch (stmt->kind)
19721975
{
1976+
case TRANS_STMT_BEGIN:
1977+
MtmTx.isTransactionBlock = true;
1978+
break;
19731979
case TRANS_STMT_COMMIT:
1974-
if (MtmTwoPhaseCommit(&MtmTx)) {
1980+
if (MtmTwoPhaseCommit(&MtmTx)) {
19751981
return;
19761982
}
19771983
break;
@@ -2036,9 +2042,6 @@ MtmExecutorFinish(QueryDesc *queryDesc)
20362042
}
20372043
}
20382044
}
2039-
if (MtmTx.isDistributed && MtmTx.containsDML && !IsTransactionBlock()) {
2040-
MtmTwoPhaseCommit(&MtmTx);
2041-
}
20422045
}
20432046
if (PreviousExecutorFinishHook != NULL)
20442047
{

contrib/mmts/pglogical_apply.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -480,13 +480,15 @@ MtmBeginSession(void)
480480
}
481481

482482
static void
483-
MtmEndSession(void)
483+
MtmEndSession(bool unlock)
484484
{
485485
if (replorigin_session_origin != InvalidRepOriginId) {
486486
MTM_TRACE("%d: Begin reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
487487
replorigin_session_origin = InvalidRepOriginId;
488488
replorigin_session_reset();
489-
MtmUnlockNode(MtmReplicationNode);
489+
if (unlock) {
490+
MtmUnlockNode(MtmReplicationNode);
491+
}
490492
MTM_TRACE("%d: End reset replorigin session: %d\n", MyProcPid, replorigin_session_origin);
491493
}
492494
}
@@ -568,7 +570,7 @@ process_remote_commit(StringInfo in)
568570
default:
569571
Assert(false);
570572
}
571-
MtmEndSession();
573+
MtmEndSession(true);
572574
}
573575

574576
static void
@@ -935,7 +937,7 @@ void MtmExecutor(int id, void* work, size_t size)
935937
EmitErrorReport();
936938
FlushErrorState();
937939
MTM_TRACE("%d: REMOTE begin abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
938-
MtmEndSession();
940+
MtmEndSession(false);
939941
AbortCurrentTransaction();
940942
MTM_TRACE("%d: REMOTE end abort transaction %d\n", MyProcPid, MtmGetCurrentTransactionId());
941943
}

contrib/mmts/tests/dtmbench.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void* reader(void* arg)
129129
result r = txn.exec("select sum(v) from t");
130130
int64_t sum = r[0][0].as(int64_t());
131131
if (sum != prevSum) {
132-
// r = txn.exec("select mtm_get_snapshot()");
132+
r = txn.exec("select mtm_get_snapshot()");
133133
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
134134
prevSum = sum;
135135
}

contrib/mmts/tests/pg_hba.conf

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,7 @@ host all all ::1/128 trust
9090
# replication privilege.
9191
local replication knizhnik trust
9292
host replication knizhnik 127.0.0.1/32 trust
93+
local replication stas trust
94+
host replication stas ::1/128 trust
95+
host replication stas 127.0.0.1/32 trust
9396
#host replication knizhnik ::1/128 trust

contrib/mmts/tests/postgresql.conf.mm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,4 +625,4 @@
625625
# Add settings for extensions here
626626

627627
multimaster.workers=8
628-
multimaster.queue_size=1073741824
628+
multimaster.queue_size=104857600 # 100mb

contrib/mmts/tests/reinit-mm.sh

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ export PATH=~/code/postgres_cluster/install/bin/:$PATH
33
ulimit -c unlimited
44
pkill -9 postgres
55
pkill -9 arbiter
6+
7+
cd ~/code/postgres_cluster/contrib/mmts/
8+
make install
9+
cd ~/code/postgres_cluster/contrib/mmts/tests
10+
11+
612
rm -fr node? *.log dtm
713
mkdir dtm
814
conn_str=""
@@ -28,9 +34,10 @@ do
2834
echo "multimaster.conn_strings = '$conn_str'" >> node$i/postgresql.conf
2935
echo "multimaster.node_id = $i" >> node$i/postgresql.conf
3036
cp pg_hba.conf node$i
31-
pg_ctl -D node$i -l node$i.log start
37+
pg_ctl -w -D node$i -l node$i.log start
3238
done
3339

34-
sleep 5
40+
# sleep 5
41+
# psql -c "create extension multimaster;" postgres
3542

3643
echo Done

src/backend/access/transam/twophase.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,9 +1248,9 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12481248

12491249
hdr = (TwoPhaseFileHeader *) xlrec;
12501250
bufptr = xlrec + MAXALIGN(sizeof(TwoPhaseFileHeader));
1251-
bufptr += MAXALIGN(hdr->gidlen);
12521251

12531252
strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
1253+
bufptr += MAXALIGN(hdr->gidlen);
12541254

12551255
parsed->twophase_xid = hdr->xid;
12561256
parsed->dbId = hdr->database;
@@ -1269,8 +1269,6 @@ ParsePrepareRecord(uint8 info, char *xlrec, xl_xact_parsed_prepare *parsed)
12691269

12701270
parsed->msgs = (SharedInvalidationMessage *) bufptr;
12711271
bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage));
1272-
1273-
// strncpy(parsed->twophase_gid, bufptr, hdr->gidlen);
12741272
}
12751273

12761274

src/backend/utils/cache/inval.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,8 +603,10 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
603603
else if (msg->rm.dbId == MyDatabaseId)
604604
InvalidateCatalogSnapshot();
605605
}
606-
else
606+
else {
607+
*(int*) 0 = 0;
607608
elog(FATAL, "unrecognized SI message ID: %d", msg->id);
609+
}
608610
}
609611

610612
/*

0 commit comments

Comments
 (0)