Skip to content

Commit 2e1ee94

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 72d16a6 + ae5f741 commit 2e1ee94

File tree

9 files changed

+487
-7
lines changed

9 files changed

+487
-7
lines changed

contrib/mmts/multimaster.c

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -342,14 +342,14 @@ csn_t MtmTransactionSnapshot(TransactionId xid)
342342
Snapshot MtmGetSnapshot(Snapshot snapshot)
343343
{
344344
snapshot = PgGetSnapshotData(snapshot);
345-
RecentGlobalDataXmin = RecentGlobalXmin = MtmAdjustOldestXid(RecentGlobalDataXmin);
345+
RecentGlobalDataXmin = RecentGlobalXmin = Mtm->oldestXid;//MtmAdjustOldestXid(RecentGlobalDataXmin);
346346
return snapshot;
347347
}
348348

349349

350350
TransactionId MtmGetOldestXmin(Relation rel, bool ignoreVacuum)
351351
{
352-
TransactionId xmin = PgGetOldestXmin(NULL, ignoreVacuum); /* consider all backends */
352+
TransactionId xmin = PgGetOldestXmin(NULL, false); /* consider all backends */
353353
xmin = MtmAdjustOldestXid(xmin);
354354
return xmin;
355355
}
@@ -505,6 +505,8 @@ MtmAdjustOldestXid(TransactionId xid)
505505
MtmTransState *ts, *prev = NULL;
506506
int i;
507507

508+
return FirstNormalTransactionId;
509+
508510
MtmLock(LW_EXCLUSIVE);
509511
ts = (MtmTransState*)hash_search(MtmXid2State, &xid, HASH_FIND, NULL);
510512
if (ts != NULL && ts->status == TRANSACTION_STATUS_COMMITTED) {
@@ -538,7 +540,8 @@ MtmAdjustOldestXid(TransactionId xid)
538540
if (prev != NULL) {
539541
Mtm->transListHead = prev;
540542
Mtm->oldestXid = xid = prev->xid;
541-
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
543+
} else {
544+
Assert(TransactionIdPrecedesOrEqual(Mtm->oldestXid, xid));
542545
xid = Mtm->oldestXid;
543546
}
544547
} else {

contrib/mmts/multimaster.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#define MULTIMASTER_ADMIN "mtm_admin"
4848

4949
#define USEC 1000000
50+
#define MB (1024*1024)
5051

5152
#define USEC_TO_MSEC(t) ((t)/1000)
5253
#define MSEC_TO_USEC(t) ((t)*1000)

contrib/mmts/pglogical_receiver.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ pglogical_receiver_main(Datum main_arg)
449449
{
450450
stmt = copybuf + hdr_len;
451451

452-
if (buf.used >= MtmTransSpillThreshold) {
452+
if (buf.used >= MtmTransSpillThreshold*MB) {
453453
if (spill_file < 0) {
454454
int file_id;
455455
spill_file = MtmCreateSpillFile(nodeId, &file_id);

contrib/mmts/t/002_cross.pl

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
use strict;
2+
use warnings;
3+
4+
use Cluster;
5+
use TestLib;
6+
use Test::More tests => 1;
7+
use IPC::Run qw(start finish);
8+
use Cwd;
9+
10+
my $nnodes = 2;
11+
my $nclients = 10;
12+
my $nkeys = $nnodes * $nclients;
13+
my $cluster = new Cluster($nnodes);
14+
15+
$cluster->init();
16+
$cluster->configure();
17+
$cluster->start();
18+
19+
my ($rc, $in, $out, $err);
20+
21+
diag("sleeping 10");
22+
sleep(10);
23+
24+
diag("preparing the tables");
25+
if ($cluster->psql(0, 'postgres', "create table t (k int primary key, v int)"))
26+
{
27+
BAIL_OUT('failed to create t');
28+
}
29+
30+
if ($cluster->psql(0, 'postgres', "insert into t (select generate_series(0, $nkeys - 1), 0)"))
31+
{
32+
BAIL_OUT('failed to fill t');
33+
}
34+
35+
sub appender
36+
{
37+
my ($appender_id, $clients, $seconds, $node, $inref, $outref) = @_;
38+
39+
my @argv = (
40+
'pgbench',
41+
'-n',
42+
-c => $clients,
43+
-j => $clients,
44+
-T => $seconds,
45+
-h => $node->host(),
46+
-p => $node->port(),
47+
-D => "appender_id=$appender_id",
48+
-D => "clients=$clients",
49+
-f => 'tests/appender.pgb',
50+
'postgres',
51+
);
52+
53+
diag("running[" . getcwd() . "]: " . join(' ', @argv));
54+
55+
return start(\@argv, $inref, $outref);
56+
}
57+
58+
sub state_dump
59+
{
60+
my $state = shift;
61+
62+
diag("<<<<<");
63+
while (my ($key, $value) = each(%{$state}))
64+
{
65+
diag("$key -> $value");
66+
}
67+
diag(">>>>>");
68+
}
69+
70+
sub state_leq
71+
{
72+
my ($a, $b) = @_;
73+
74+
while (my ($key, $value) = each(%{$a}))
75+
{
76+
if (!exists($b->{$key}))
77+
{
78+
diag("b has no key $key\n");
79+
return 0;
80+
}
81+
82+
if ($b->{$key} < $value)
83+
{
84+
diag($b->{$key} . " < $value\n");
85+
return 0;
86+
}
87+
}
88+
89+
return 1;
90+
}
91+
92+
sub parse_state
93+
{
94+
my $str = shift;
95+
my $state = {};
96+
97+
while ($str =~ /(\d+)\|(\d+)/g)
98+
{
99+
$state->{$1} = $2;
100+
}
101+
102+
return $state;
103+
}
104+
105+
diag("starting appenders");
106+
diag("starting benches");
107+
$in = '';
108+
$out = '';
109+
my @appenders = ();
110+
my $appender_id = 0;
111+
my $seconds = 30;
112+
foreach my $node (@{$cluster->{nodes}})
113+
{
114+
push(@appenders, appender($appender_id, $nclients, $seconds, $node, \$in, \$out));
115+
$appender_id++;
116+
}
117+
118+
my $selects = 0;
119+
my $anomalies = 0;
120+
my $started = time();
121+
my $node_id = 0;
122+
my $state_a = undef;
123+
my $state_b = undef;
124+
my $out_a = '';
125+
my $out_b = '';
126+
while (time() - $started < $seconds)
127+
{
128+
$node_id = ($node_id + 1) % $nnodes;
129+
$state_a = $state_b;
130+
$out_a = $out_b;
131+
($rc, $out, $err) = $cluster->psql($node_id, 'postgres', "select * from t;");
132+
$selects++;
133+
$state_b = parse_state($out);
134+
$out_b = $out;
135+
if (defined $state_a)
136+
{
137+
if (!state_leq($state_a, $state_b) && !state_leq($state_a, $state_b))
138+
{
139+
diag("cross anomaly detected:\n===a\n$out_a\n+++b\n$out_b\n---\n");
140+
$anomalies++;
141+
}
142+
}
143+
}
144+
145+
diag("finishing benches");
146+
foreach my $appender (@appenders)
147+
{
148+
finish($appender) || BAIL_OUT("pgbench exited with $?");
149+
}
150+
151+
is($anomalies, 0, "no cross anomalies after $selects selects");

contrib/mmts/tests/appender.pgb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
\set key :appender_id * :clients + :client_id
2+
begin;
3+
update t set v = v + 1 where k = :key;
4+
commit;

0 commit comments

Comments
 (0)