Skip to content

Commit 0aa73d2

Browse files
committed
Merge branch 'master' into more_tests
2 parents d26d915 + 0dd74d5 commit 0aa73d2

File tree

11 files changed

+245
-209
lines changed

11 files changed

+245
-209
lines changed

contrib/mmts/Cluster.pm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,7 @@ sub stop
233233
}
234234
}
235235
}
236-
236+
sleep(2);
237237
return $ok;
238238
}
239239

contrib/mmts/arbiter.c

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -898,14 +898,13 @@ static void MtmReceiver(Datum arg)
898898
msg->status = TRANSACTION_STATUS_ABORTED;
899899
} else {
900900
msg->status = tm->state->status;
901-
msg->csn = tm->state->csn;
901+
msg->csn = tm->state->csn;
902902
MTM_LOG1("Send response %d for transaction %s to node %d", msg->status, msg->gid, msg->node);
903903
}
904904
msg->disabledNodeMask = Mtm->disabledNodeMask;
905905
msg->connectivityMask = Mtm->connectivityMask;
906906
msg->oldestSnapshot = Mtm->nodes[MtmNodeId-1].oldestSnapshot;
907907
msg->code = MSG_POLL_STATUS;
908-
msg->csn = ts->csn;
909908
MtmSendMessage(msg);
910909
continue;
911910
case MSG_POLL_STATUS:
@@ -918,11 +917,11 @@ static void MtmReceiver(Datum arg)
918917
BIT_SET(ts->votedMask, node-1);
919918
if (ts->status == TRANSACTION_STATUS_UNKNOWN) {
920919
if (msg->status == TRANSACTION_STATUS_IN_PROGRESS || msg->status == TRANSACTION_STATUS_ABORTED) {
921-
elog(LOG, "Abort transaction %s because it is in state %d at node %d",
922-
msg->gid, ts->status, node);
920+
elog(LOG, "Abort prepared transaction %s because it is in state %s at node %d",
921+
msg->gid, MtmNodeStatusMnem[msg->status], node);
923922
MtmFinishPreparedTransaction(ts, false);
924923
}
925-
else if (msg->status == TRANSACTION_STATUS_COMMITTED || msg->status == TRANSACTION_STATUS_UNKNOWN)
924+
else if (msg->status == TRANSACTION_STATUS_COMMITTED || msg->status == TRANSACTION_STATUS_UNKNOWN)
926925
{
927926
if (msg->csn > ts->csn) {
928927
ts->csn = msg->csn;
@@ -933,17 +932,17 @@ static void MtmReceiver(Datum arg)
933932
MtmFinishPreparedTransaction(ts, true);
934933
}
935934
} else {
936-
elog(LOG, "Receive response %d for transaction %s for node %d, votedMask=%llx, participantsMask=%llx",
937-
msg->status, msg->gid, node, (long long) ts->votedMask, (long long) (ts->participantsMask & ~Mtm->disabledNodeMask));
935+
elog(LOG, "Receive response %s for transaction %s for node %d, votedMask %llx, participantsMask %llx",
936+
MtmNodeStatusMnem[msg->status], msg->gid, node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask));
938937
continue;
939938
}
940939
} else if (ts->status == TRANSACTION_STATUS_ABORTED && msg->status == TRANSACTION_STATUS_COMMITTED) {
941940
elog(WARNING, "Transaction %s is aborted at node %d but committed at node %d", msg->gid, MtmNodeId, node);
942941
} else if (msg->status == TRANSACTION_STATUS_ABORTED && ts->status == TRANSACTION_STATUS_COMMITTED) {
943942
elog(WARNING, "Transaction %s is committed at node %d but aborted at node %d", msg->gid, MtmNodeId, node);
944943
} else {
945-
elog(LOG, "Receive response %d for transaction %s status %d for node %d, votedMask=%llx, participantsMask=%llx",
946-
msg->status, msg->gid, ts->status, node, (long long) ts->votedMask, (long long) (ts->participantsMask & ~Mtm->disabledNodeMask) );
944+
elog(LOG, "Receive response %s for transaction %s status %s for node %d, votedMask %llx, participantsMask %llx",
945+
MtmNodeStatusMnem[msg->status], msg->gid, MtmNodeStatusMnem[ts->status], node, (long long)ts->votedMask, (long long)(ts->participantsMask & ~Mtm->disabledNodeMask) );
947946
}
948947
}
949948
continue;
@@ -983,8 +982,8 @@ static void MtmReceiver(Datum arg)
983982
if ((~msg->disabledNodeMask & Mtm->disabledNodeMask) != 0) {
984983
/* Coordinator's disabled mask is wider than of this node: so reject such transaction to avoid
985984
commit on smaller subset of nodes */
986-
elog(WARNING, "Coordinator of distributed transaction see less nodes than node %d: %lx instead of %lx",
987-
node, (long) Mtm->disabledNodeMask, (long) msg->disabledNodeMask);
985+
elog(WARNING, "Coordinator of distributed transaction see less nodes than node %d: %llx instead of %llx",
986+
node, (long long) Mtm->disabledNodeMask, (long long) msg->disabledNodeMask);
988987
MtmAbortTransaction(ts);
989988
}
990989
if ((ts->participantsMask & ~Mtm->disabledNodeMask & ~ts->votedMask) == 0) {
@@ -993,7 +992,7 @@ static void MtmReceiver(Datum arg)
993992
MtmWakeUpBackend(ts);
994993
} else {
995994
Assert(ts->status == TRANSACTION_STATUS_IN_PROGRESS);
996-
MTM_LOG1("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
995+
MTM_LOG2("Transaction %s is prepared (status=%d participants=%lx disabled=%lx, voted=%lx)",
997996
ts->gid, ts->status, ts->participantsMask, Mtm->disabledNodeMask, ts->votedMask);
998997
ts->isPrepared = true;
999998
if (ts->isTwoPhase) {

contrib/mmts/bgwpool.c

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "postmaster/bgworker.h"
66
#include "storage/s_lock.h"
77
#include "storage/spin.h"
8+
#include "storage/proc.h"
89
#include "storage/pg_sema.h"
910
#include "storage/shmem.h"
1011
#include "datatype/timestamp.h"
@@ -16,23 +17,41 @@
1617
bool MtmIsLogicalReceiver;
1718
int MtmMaxWorkers;
1819

20+
static BgwPool* pool;
21+
22+
static void BgwShutdownWorker(int sig)
23+
{
24+
BgwPoolStop(pool);
25+
}
26+
1927
static void BgwPoolMainLoop(BgwPool* pool)
2028
{
2129
int size;
2230
void* work;
2331
static PortalData fakePortal;
32+
sigset_t sset;
2433

2534
MtmIsLogicalReceiver = true;
2635

36+
signal(SIGINT, BgwShutdownWorker);
37+
signal(SIGQUIT, BgwShutdownWorker);
38+
signal(SIGTERM, BgwShutdownWorker);
39+
40+
sigfillset(&sset);
41+
sigprocmask(SIG_UNBLOCK, &sset, NULL);
42+
2743
BackgroundWorkerUnblockSignals();
2844
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);
2945
ActivePortal = &fakePortal;
3046
ActivePortal->status = PORTAL_ACTIVE;
3147
ActivePortal->sourceText = "";
3248

33-
while(true) {
49+
while (true) {
3450
PGSemaphoreLock(&pool->available);
3551
SpinLockAcquire(&pool->lock);
52+
if (pool->shutdown) {
53+
break;
54+
}
3655
size = *(int*)&pool->queue[pool->head];
3756
Assert(size < pool->size);
3857
work = malloc(size);
@@ -64,6 +83,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
6483
pool->lastPeakTime = 0;
6584
SpinLockRelease(&pool->lock);
6685
}
86+
SpinLockRelease(&pool->lock);
6787
}
6888

6989
void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, char const* dbuser, size_t queueSize, size_t nWorkers)
@@ -75,6 +95,7 @@ void BgwPoolInit(BgwPool* pool, BgwPoolExecutor executor, char const* dbname, c
7595
PGSemaphoreReset(&pool->available);
7696
PGSemaphoreReset(&pool->overflow);
7797
SpinLockInit(&pool->lock);
98+
pool->shutdown = false;
7899
pool->producerBlocked = false;
79100
pool->head = 0;
80101
pool->tail = 0;
@@ -167,7 +188,7 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
167188
}
168189

169190
SpinLockAcquire(&pool->lock);
170-
while (true) {
191+
while (!pool->shutdown) {
171192
if ((pool->head <= pool->tail && pool->size - pool->tail < size + 4 && pool->head < size)
172193
|| (pool->head > pool->tail && pool->head - pool->tail < size + 4))
173194
{
@@ -204,3 +225,11 @@ void BgwPoolExecute(BgwPool* pool, void* work, size_t size)
204225
SpinLockRelease(&pool->lock);
205226
}
206227

228+
void BgwPoolStop(BgwPool* pool)
229+
{
230+
SpinLockAcquire(&pool->lock);
231+
pool->shutdown = true;
232+
SpinLockRelease(&pool->lock);
233+
PGSemaphoreUnlock(&pool->available);
234+
PGSemaphoreUnlock(&pool->overflow);
235+
}

contrib/mmts/bgwpool.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ typedef struct
3434
time_t lastPeakTime;
3535
timestamp_t lastDynamicWorkerStartTime;
3636
bool producerBlocked;
37+
bool shutdown;
3738
char dbname[MAX_DBNAME_LEN];
3839
char dbuser[MAX_DBUSER_LEN];
3940
char* queue;
@@ -51,4 +52,5 @@ extern size_t BgwPoolGetQueueSize(BgwPool* pool);
5152

5253
extern timestamp_t BgwGetLastPeekTime(BgwPool* pool);
5354

55+
extern void BgwPoolStop(BgwPool* pool);
5456
#endif

0 commit comments

Comments
 (0)