Skip to content

Commit 4c799f7

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 2e264d1 + 93d9f9e commit 4c799f7

File tree

8 files changed

+236
-54
lines changed

8 files changed

+236
-54
lines changed

contrib/mmts/check-recovery.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
rm -rf /home/knizhnik/postgres_cluster/contrib/mmts/tmp_check/log
1+
rm -rf /home/knizhnik/postgres_cluster/contrib/mmts/tmp_check
22
TESTDIR='/home/knizhnik/postgres_cluster/contrib/mmts' PATH="/home/knizhnik/postgres_cluster/tmp_install/home/knizhnik/postgres_cluster/dist/bin:$PATH" LD_LIBRARY_PATH="/home/knizhnik/postgres_cluster/tmp_install/home/knizhnik/postgres_cluster/dist/lib:$LD_LIBRARY_PATH" PGPORT='65432' PG_REGRESS='/home/knizhnik/postgres_cluster/contrib/mmts/../../src/test/regress/pg_regress' prove -I ../../src/test/perl/ --verbose t/001_basic_recovery.pl

contrib/mmts/multimaster--1.0.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ CREATE FUNCTION mtm.dump_lock_graph() RETURNS text
4444
AS 'MODULE_PATHNAME','mtm_dump_lock_graph'
4545
LANGUAGE C;
4646

47+
CREATE FUNCTION mtm.poll_node(nodeId integer, noWait boolean default FALSE) RETURNS boolean
48+
AS 'MODULE_PATHNAME','mtm_poll_node'
49+
LANGUAGE C;
50+
4751
CREATE TABLE IF NOT EXISTS mtm.ddl_log (issued timestamp with time zone not null, query text);
4852

4953
CREATE TABLE IF NOT EXISTS mtm.local_tables(rel_schema text, rel_name text, primary key(rel_schema, rel_name));

contrib/mmts/multimaster.c

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ PG_MODULE_MAGIC;
103103
PG_FUNCTION_INFO_V1(mtm_start_replication);
104104
PG_FUNCTION_INFO_V1(mtm_stop_replication);
105105
PG_FUNCTION_INFO_V1(mtm_drop_node);
106+
PG_FUNCTION_INFO_V1(mtm_poll_node);
106107
PG_FUNCTION_INFO_V1(mtm_recover_node);
107108
PG_FUNCTION_INFO_V1(mtm_get_snapshot);
108109
PG_FUNCTION_INFO_V1(mtm_get_nodes_state);
@@ -181,6 +182,7 @@ int MtmKeepaliveTimeout;
181182
int MtmReconnectAttempts;
182183
int MtmNodeDisableDelay;
183184
bool MtmUseRaftable;
185+
bool MtmUseDtm;
184186
MtmConnectionInfo* MtmConnections;
185187

186188
static char* MtmConnStrs;
@@ -339,7 +341,7 @@ TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum)
339341
}
340342

341343
bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
342-
{
344+
{
343345
#if TRACE_SLEEP_TIME
344346
static timestamp_t firstReportTime;
345347
static timestamp_t prevReportTime;
@@ -349,6 +351,10 @@ bool MtmXidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
349351
timestamp_t delay = MIN_WAIT_TIMEOUT;
350352
Assert(xid != InvalidTransactionId);
351353

354+
if (!MtmUseDtm) {
355+
return PgXidInMVCCSnapshot(xid, snapshot);
356+
}
357+
352358
MtmLock(LW_SHARED);
353359

354360
#if TRACE_SLEEP_TIME
@@ -512,13 +518,19 @@ MtmAdjustOldestXid(TransactionId xid)
512518
hash_search(MtmXid2State, &prev->xid, HASH_REMOVE, NULL);
513519
}
514520
}
515-
}
516-
if (prev != NULL) {
517-
Mtm->transListHead = prev;
518-
Mtm->oldestXid = xid = prev->xid;
519-
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
520-
xid = Mtm->oldestXid;
521-
}
521+
}
522+
if (MtmUseDtm) {
523+
if (prev != NULL) {
524+
Mtm->transListHead = prev;
525+
Mtm->oldestXid = xid = prev->xid;
526+
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
527+
xid = Mtm->oldestXid;
528+
}
529+
} else {
530+
if (prev != NULL) {
531+
Mtm->transListHead = prev;
532+
}
533+
}
522534
MtmUnlock();
523535
}
524536
return xid;
@@ -753,6 +765,8 @@ MtmPrePrepareTransaction(MtmCurrentTrans* x)
753765

754766
}
755767

768+
static time_t maxWakeupTime;
769+
756770
static void
757771
MtmPostPrepareTransaction(MtmCurrentTrans* x)
758772
{
@@ -768,18 +782,23 @@ MtmPostPrepareTransaction(MtmCurrentTrans* x)
768782
tm->state = ts;
769783
ts->votingCompleted = true;
770784
if (Mtm->status != MTM_RECOVERY) {
771-
MtmSendNotificationMessage(ts, MSG_READY); /* send notification to coordinator */
785+
MtmSendNotificationMessage(ts, MtmUseDtm ? MSG_READY : MSG_PREPARED); /* send notification to coordinator */
772786
} else {
773787
ts->status = TRANSACTION_STATUS_UNKNOWN;
774788
}
775789
MtmUnlock();
776790
MtmResetTransaction(x);
777791
} else {
792+
time_t wakeupTime;
778793
/* wait votes from all nodes */
779794
while (!ts->votingCompleted) {
780795
MtmUnlock();
781796
WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
782797
ResetLatch(&MyProc->procLatch);
798+
wakeupTime = MtmGetCurrentTime() - ts->wakeupTime;
799+
if (wakeupTime > maxWakeupTime) {
800+
maxWakeupTime = wakeupTime;
801+
}
783802
MtmLock(LW_SHARED);
784803
}
785804
x->status = ts->status;
@@ -972,6 +991,7 @@ void MtmWakeUpBackend(MtmTransState* ts)
972991
{
973992
MTM_LOG3("Wakeup backed procno=%d, pid=%d", ts->procno, ProcGlobal->allProcs[ts->procno].pid);
974993
ts->votingCompleted = true;
994+
ts->wakeupTime = MtmGetCurrentTime();
975995
SetLatch(&ProcGlobal->allProcs[ts->procno].procLatch);
976996
}
977997

@@ -1651,6 +1671,19 @@ _PG_init(void)
16511671
NULL
16521672
);
16531673

1674+
DefineCustomBoolVariable(
1675+
"multimaster.use_dtm",
1676+
"Use distributed transaction manager",
1677+
NULL,
1678+
&MtmUseDtm,
1679+
true,
1680+
PGC_BACKEND,
1681+
0,
1682+
NULL,
1683+
NULL,
1684+
NULL
1685+
);
1686+
16541687
DefineCustomIntVariable(
16551688
"multimaster.workers",
16561689
"Number of multimaster executor workers per node",
@@ -2069,6 +2102,27 @@ mtm_drop_node(PG_FUNCTION_ARGS)
20692102
PG_RETURN_VOID();
20702103
}
20712104

2105+
Datum
2106+
mtm_poll_node(PG_FUNCTION_ARGS)
2107+
{
2108+
int nodeId = PG_GETARG_INT32(0);
2109+
bool nowait = PG_GETARG_BOOL(1);
2110+
bool online = true;
2111+
while (BIT_CHECK(Mtm->disabledNodeMask, nodeId-1)) {
2112+
if (nowait) {
2113+
online = false;
2114+
break;
2115+
} else {
2116+
MtmSleep(STATUS_POLL_DELAY);
2117+
}
2118+
}
2119+
if (!nowait) {
2120+
/* Just wait some time until logical repication channels will be reestablished */
2121+
MtmSleep(MtmNodeDisableDelay);
2122+
}
2123+
PG_RETURN_BOOL(online);
2124+
}
2125+
20722126
Datum
20732127
mtm_recover_node(PG_FUNCTION_ARGS)
20742128
{

contrib/mmts/multimaster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
#include "pglogical_output/hooks.h"
99

10-
#define DEBUG_LEVEL 1
10+
#define DEBUG_LEVEL 0
1111

1212
#if DEBUG_LEVEL == 0
1313
#define MTM_LOG1(fmt, ...) elog(LOG, fmt, ## __VA_ARGS__)
@@ -135,6 +135,7 @@ typedef struct MtmTransState
135135
int procno; /* pgprocno of transaction coordinator waiting for responses from replicas,
136136
used to notify coordinator by arbiter */
137137
int nSubxids; /* Number of subtransanctions */
138+
time_t wakeupTime;
138139
MtmMessageCode cmd; /* Notification message to be sent */
139140
struct MtmTransState* nextVoting; /* Next element in L1-list of voting transactions. */
140141
struct MtmTransState* next; /* Next element in L1 list of all finished transaction present in xid2state hash */
@@ -191,6 +192,7 @@ extern int MtmConnectTimeout;
191192
extern int MtmReconnectAttempts;
192193
extern int MtmKeepaliveTimeout;
193194
extern int MtmNodeDisableDelay;
195+
extern bool MtmUseDtm;
194196
extern HTAB* MtmXid2State;
195197

196198
extern MtmConnectionInfo* MtmConnections;

contrib/mmts/t/001_basic_recovery.pl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@
6060
diag("starting node 2");
6161
$cluster->{nodes}->[2]->start;
6262
diag("sleeping 10");
63-
sleep(10); # XXX: here we can poll
63+
#sleep(10); # XXX: here we can poll
64+
$cluster->psql(0, 'postgres', "select mtm.poll_node(3);");
6465
diag("inserting 3");
6566
$cluster->psql(0, 'postgres', "insert into t values(3, 30);");
6667
diag("selecting");

contrib/mmts/tests/deploy_layouts/cluster.yml

Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,5 @@
11
---
22

3-
- hosts: nodes[0]
4-
5-
roles:
6-
- role: postgres
7-
pg_port: 15432
8-
pg_repo: https://github.com/postgrespro/postgres_cluster.git
9-
pg_version_tag: master
10-
pg_destroy_and_init: true
11-
12-
tasks:
13-
- name: build sockhub
14-
shell: "make clean && make -j 4"
15-
args:
16-
chdir: "~/pg_cluster/src/contrib/arbiter/sockhub"
17-
18-
- name: build dtm
19-
shell: "make clean && make -j 4"
20-
args:
21-
chdir: "~/pg_cluster/src/contrib/arbiter"
22-
# when: dtm_sources.changed
23-
24-
- name: kill arbiter
25-
shell: kill -9 `cat ~/pg_cluster/dtm_data/arbiter.pid` || true
26-
27-
- name: ensure datadir for dtm exists
28-
shell: "rm -rf ~/pg_cluster/dtm_data && mkdir ~/pg_cluster/dtm_data"
29-
30-
- name: start dtm
31-
shell: >
32-
nohup ~/pg_cluster/src/contrib/arbiter/bin/arbiter
33-
-d ~/pg_cluster/dtm_data -r 0.0.0.0:5431 -i 0 -l ~/pg_cluster/dtm_data/log &
34-
35-
- name: wait until dtm is available
36-
wait_for: port=5431 delay=1
37-
38-
393
- hosts: nodes[1]:nodes[2]:nodes[3]
404
# accelerate: true
415

@@ -61,10 +25,10 @@
6125
set_fact:
6226
connections: "{{ connstrs.results | map(attribute='ansible_facts.connstr') | join(', ') }}"
6327

64-
- name: build sockhub
65-
shell: "make clean && make -j 4"
28+
- name: build raftable
29+
shell: "make clean && make -j {{makejobs}} install"
6630
args:
67-
chdir: "{{pg_src}}/contrib/arbiter/sockhub"
31+
chdir: "{{pg_src}}/contrib/raftable"
6832

6933
- name: build multimaster
7034
shell: "make clean && make -j {{makejobs}} install"
@@ -81,13 +45,17 @@
8145
- "max_wal_senders = 10"
8246
- "wal_sender_timeout = 0"
8347
- "max_replication_slots = 10"
48+
- "max_connections = 200"
8449
- "max_worker_processes = 100"
85-
- "shared_preload_libraries = 'multimaster'"
86-
- "multimaster.arbiters = '{{groups['nodes'][0]}}:5431'"
50+
- "shared_preload_libraries = 'raftable,multimaster'"
8751
- "multimaster.conn_strings = '{{connections}}'"
8852
- "multimaster.node_id = {{ node_id }}"
53+
- "multimaster.buffer_size = 65536"
8954
- "multimaster.queue_size = 1073741824"
55+
- "multimaster.arbiter_port = 5600"
56+
- "multimaster.vacuum_delay = 1"
9057
- "multimaster.workers = 32"
58+
- "multimaster.use_dtm = 0"
9159

9260
- name: restart postgrespro
9361
command: "{{pg_dst}}/bin/pg_ctl restart -w -D {{pg_datadir}} -l {{pg_datadir}}/pg.log"

0 commit comments

Comments
 (0)