Skip to content

Commit 603a97a

Browse files
committed
Merge branch 'xtm' of gitlab.postgrespro.ru:pgpro-dev/postgrespro into xtm
2 parents bf89fd4 + 862d0e0 commit 603a97a

28 files changed

+477
-1832
lines changed

contrib/pg_dtm/dtmd/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
CC=gcc
2-
CFLAGS=-g -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
2+
CFLAGS=-g -O2 -Wall -Iinclude -D_LARGEFILE64_SOURCE # -DDEBUG
33
LIBUV_PREFIX=$(HOME)/libuv-build
44
LIBUV_CFLAGS=-I"$(LIBUV_PREFIX)/include" -L"$(LIBUV_PREFIX)/lib"
55
LIBUV_LDFLAGS=-luv -pthread -lrt

contrib/pg_dtm/libdtm.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -301,7 +301,7 @@ void DtmInitSnapshot(Snapshot snapshot)
301301
* we are in recovery, see later comments.
302302
*/
303303
snapshot->xip = (TransactionId *)
304-
malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
304+
malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
305305
if (snapshot->xip == NULL)
306306
ereport(ERROR,
307307
(errcode(ERRCODE_OUT_OF_MEMORY),

contrib/pg_dtm/pg_dtm.c

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -205,22 +205,39 @@ static void DtmMergeWithGlobalSnapshot(Snapshot dst)
205205
if (src->xmin < dst->xmin) {
206206
dst->xmin = src->xmin;
207207
}
208-
209-
n = dst->xcnt;
210-
Assert(src->xcnt + n <= GetMaxSnapshotXidCount());
211-
memcpy(dst->xip + n, src->xip, src->xcnt*sizeof(TransactionId));
212-
n += src->xcnt;
213-
214-
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
215-
xid = InvalidTransactionId;
216-
217-
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
218-
if (dst->xip[i] != xid) {
219-
dst->xip[j++] = xid = dst->xip[i];
220-
}
221-
}
222-
dst->xcnt = j;
223-
208+
Assert(src->subxcnt == 0);
209+
210+
if (src->xcnt + dst->subxcnt + dst->xcnt <= GetMaxSnapshotXidCount()) {
211+
Assert(dst->subxcnt == 0);
212+
memcpy(dst->xip + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
213+
n = dst->xcnt + src->xcnt;
214+
215+
qsort(dst->xip, n, sizeof(TransactionId), xidComparator);
216+
xid = InvalidTransactionId;
217+
218+
for (i = 0, j = 0; i < n && dst->xip[i] < dst->xmax; i++) {
219+
if (dst->xip[i] != xid) {
220+
dst->xip[j++] = xid = dst->xip[i];
221+
}
222+
}
223+
dst->xcnt = j;
224+
} else {
225+
Assert(src->xcnt + dst->subxcnt + dst->xcnt <= GetMaxSnapshotSubxidCount());
226+
memcpy(dst->subxip + dst->subxcnt, dst->xip, dst->xcnt*sizeof(TransactionId));
227+
memcpy(dst->subxip + dst->subxcnt + dst->xcnt, src->xip, src->xcnt*sizeof(TransactionId));
228+
n = dst->xcnt + dst->subxcnt + src->xcnt;
229+
230+
qsort(dst->subxip, n, sizeof(TransactionId), xidComparator);
231+
xid = InvalidTransactionId;
232+
233+
for (i = 0, j = 0; i < n && dst->subxip[i] < dst->xmax; i++) {
234+
if (dst->subxip[i] != xid) {
235+
dst->subxip[j++] = xid = dst->subxip[i];
236+
}
237+
}
238+
dst->subxcnt = j;
239+
dst->xcnt = 0;
240+
}
224241
DumpSnapshot(dst, "merged");
225242
}
226243

contrib/pg_dtm/sockhub/sockhub.c

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,23 @@ static void reconnect(Shub* shub)
151151
}
152152
}
153153

154+
static void notify_disconnect(Shub* shub, int chan)
155+
{
156+
ShubMessageHdr* hdr;
157+
hdr = (ShubMessageHdr*)&shub->in_buffer[shub->in_buffer_used];
158+
hdr->size = 0;
159+
hdr->chan = chan;
160+
hdr->code = MSG_DISCONNECT;
161+
shub->in_buffer_used += sizeof(ShubMessageHdr);
162+
if (shub->in_buffer_used + sizeof(ShubMessageHdr) > shub->params->buffer_size) {
163+
while (!write_socket(shub->output, shub->in_buffer, shub->in_buffer_used)) {
164+
shub->params->error_handler("Failed to write to inet socket", SHUB_RECOVERABLE_ERROR);
165+
reconnect(shub);
166+
}
167+
shub->in_buffer_used = 0;
168+
}
169+
}
170+
154171
static void recovery(Shub* shub)
155172
{
156173
int i, max_fd;
@@ -162,6 +179,9 @@ static void recovery(Shub* shub)
162179
FD_ZERO(&tryset);
163180
FD_SET(i, &tryset);
164181
if (select(i+1, &tryset, NULL, NULL, &tm) < 0) {
182+
if (i != shub->input && i != shub->output) {
183+
notify_disconnect(shub, i);
184+
}
165185
close_socket(shub, i);
166186
}
167187
}
@@ -259,6 +279,7 @@ void ShubLoop(Shub* shub)
259279
if (!write_socket(chan, (char*)hdr, n)) {
260280
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
261281
close_socket(shub, chan);
282+
notify_disconnect(shub, chan);
262283
chan = -1;
263284
}
264285
if (n != hdr->size + sizeof(ShubMessageHdr)) {
@@ -274,6 +295,7 @@ void ShubLoop(Shub* shub)
274295
if (chan >= 0 && !write_socket(chan, shub->out_buffer, n)) {
275296
shub->params->error_handler("Failed to write to local socket", SHUB_RECOVERABLE_ERROR);
276297
close_socket(shub, chan);
298+
notify_disconnect(shub, chan);
277299
chan = -1;
278300
}
279301
tail -= n;
@@ -295,6 +317,7 @@ void ShubLoop(Shub* shub)
295317
if (available < sizeof(ShubMessageHdr)) {
296318
shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
297319
close_socket(shub, i);
320+
notify_disconnect(shub, i);
298321
} else {
299322
int pos = 0;
300323
/* loop through all fetched messages */
@@ -333,6 +356,7 @@ void ShubLoop(Shub* shub)
333356
if (hdr != NULL) { /* if message header is not yet sent to the server... */
334357
/* ... then skip this message */
335358
shub->in_buffer_used = (char*)hdr - shub->in_buffer;
359+
notify_disconnect(shub, chan);
336360
break;
337361
} else { /* if message was partly sent to the server, we can not skip it, so we have to send garbage to the server */
338362
chan = -1; /* do not try to read rest of body of this message */
@@ -351,6 +375,10 @@ void ShubLoop(Shub* shub)
351375
shub->in_buffer_used = 0;
352376
}
353377
} while (size != 0); /* repeat until all message body is received */
378+
379+
if (chan < 0) {
380+
notify_disconnect(shub, i);
381+
}
354382

355383
pos = available;
356384
break;

contrib/pg_dtm/sockhub/sockhub.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ typedef struct {
99
unsigned int chan; /* local socket: set by SockHUB */
1010
} ShubMessageHdr;
1111

12+
enum ShubMessageCodes
13+
{
14+
MSG_DISCONNECT,
15+
MSG_FIRST_USER_CODE /* all codes >= 1 are user defined */
16+
};
17+
1218
typedef enum
1319
{
1420
SHUB_FATAL_ERROR,

contrib/pg_dtm/sockhub/start-clients.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
n_clients=100
1+
n_clients=200
22
n_iters=100000
33
./sockhub -h $1 -p 5001 -f /tmp/p5002 &
44
for ((i=0;i<n_clients;i++))
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
---
2+
- hosts: master-workers
3+
roles:
4+
- role: postgrespro
5+
deploy_dtm: true
6+
7+
- hosts: workers
8+
roles:
9+
- role: postgrespro
10+
pg_port: 15432
11+
deploy_postgres: true
12+
pg_dtm_enable: true
13+
pg_dtm_host: "{{groups['master-workers'][0]}}"
14+
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
---
2+
- hosts: master-workers
3+
roles:
4+
- role: postgrespro
5+
deploy_postgres: true
6+
ppg_usedtm: false
7+
8+
- hosts: workers
9+
roles:
10+
- role: postgrespro
11+
deploy_postgres: true
12+
ppg_usedtm: false
13+

contrib/pg_dtm/tests/deploy_layouts/cluster_pg_shard.yml

Whitespace-only changes.
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
---
2+
- hosts: dtm
3+
gather_facts: no
4+
roles:
5+
- role: postgrespro
6+
deploy_dtm: true
7+
8+
- hosts: master-workers
9+
gather_facts: no
10+
roles:
11+
12+
- role: postgrespro
13+
deploy_postgres: true
14+
deploy_pg_shard: true
15+
ppg_usedtm: true
16+
ppg_dtmhost: "blade8" #!!!
17+
ppg:
18+
version: xtm_pgshard
19+
src: /home/stas/postgrespro
20+
dst: /home/stas/postgrespro-build
21+
log: /home/stas/ppg.log
22+
datadir: /home/stas/postgrespro-data
23+
usedtm: true
24+
node_id: 1
25+
ppg_configfile:
26+
# we should so delete/reinit on playbook, after that drop rexeps here
27+
- line: "shared_buffers = 1024MB" # 1/4 RAM
28+
regexp: "^shared_buffers "
29+
- line: "wal_keep_segments = 128"
30+
regexp: "^wal_keep_segments "
31+
- line: "fsync = off"
32+
regexp: "^fsync "
33+
- line: "autovacuum = off"
34+
regexp: "^autovacuum "
35+
- line: "listen_addresses = '*'"
36+
regexp: "^listen_addresses "
37+
- line: "port = 5432"
38+
regexp: "^port "
39+
40+
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
-----BEGIN RSA PRIVATE KEY-----
2+
MIIEpAIBAAKCAQEAx2a2Rhzv9lQvufzq3UGgLhdwQJL3eyBTaJ+5HNWrmLHrXsti
3+
QDKFSPhLavZzfIodZUYA0KHjk4RIfXwCpdEPGiOoiK0Kcrr1mSEASuSfITea9/i9
4+
Q+0TR9xQErmjK9JKRqJcKD7G/ow/pszc29Lr97/lEk7eZ91ISveS50P9zefCFDNJ
5+
Hdl38Qr2TjV5cubbhU9sFjeD8Ql7ronzOAx9UmZWVnluT1bgknn4PqkBqVxT+caz
6+
to4MiibyGO8MlGSnK+W/0RYt8pxwgGivgIrwRzZSputwGhzTpJj2OlvbY7CJ7ybq
7+
0kgEMUvf52b6acVcaFWxwYIfcSM/WQzMZD3j+QIDAQABAoIBAA2jBqAw5nBQPZtY
8+
oq9TB6NfUvv3kRlIkqu7EKvaKQweTtyCyEtfqSHeaTn9dNR1laERcojSKYAzMcnz
9+
KzMv4vqEqcf/HZHPG3DHgv+cNw3MgbqntjnpAsKNXKSLMY+TqCTAgdD2yiEqckf0
10+
Br655A1jgEUKQ8eSqJkH6XILglYMfp0oEocGnWQtF5dDgtOz7ELBMSGf5NgLP1hR
11+
32VzSY9iuffPIjUtSfoUtlwmhOQWgP0X1EA+gFjjk+o6gUWicfWkq3BLfj85n2Rg
12+
9SdUg+XIXhEaMq0faWrn+ZfcSBpukR8P06CyujnDYdtjxC5byAhvCLzGOgmxDVDE
13+
tFIX/wECgYEA6Dn4VkFqtmD4UI7CuJWYlSMkiy+LWchlBfmI+ET/T3cEy8zrXp6g
14+
Hv5F6TkOSEnrATHFgMvgbA2z0BOB9Njp8aDFW/M1QbVLcKNhLdFT+soAqC4BCacF
15+
ooZyM+AlDJKjRCJKnx9mhAfV7Bq7rph9ZPVjpRsS7DEUXLWSBHdpxvECgYEA29B7
16+
Kqi7ufpl0tmox/3vHpdRYdNO7CIC94CdYkGbHXuizly2RVzk9JBHro76ge8vO/UU
17+
qADL+sQxhCjt3R847uJdrB9IKPirNar5e3/TgS5fXsPWpnW2hLgsq75KdVR/ay0b
18+
Mxu6YS0jfx8rCPuqEmVnpO+akCCyEbarfznrPYkCgYByyFx3/eOJ+8ogvz99zutZ
19+
I8KnTTu6h/NvDZTm6eur0m87cLER1qPcTeAuU7Y8gtS2hWxSqfLmVat/+HRumlyf
20+
wtBqD39OWBbOuOKKrAGvXvMOLZbzt2twlrWR8IM/gKdUQQLTPckFD35sMhZ8SQEJ
21+
ysSS0hv7RJME8/YVYRSWcQKBgQCnK1FQKwzBrpWbmh7LIeqlmCyzOhGucVtSQUTw
22+
Abbm4Cz7xfR0oeYZvFRXg7Mt9+ozLfrsndaDOovx13K2lNUmj47vpMarKhqC8SlU
23+
6+y9NLghCM0IwULygmKupkRYIM/agW5LGw5OcxaoydpftY0s+mOtQu+IJuVlpUed
24+
tT2bCQKBgQDHXhA1+y1bpaxAahqDKGq7fiwjBSIgJiHcN46ku1Dm+rFBJAhERDC0
25+
4uyAcap1Z+FFMp3U8qkY1ut4HsBtDWT4QiPyglDLs38/OTPuf9Q6R91AEoKGaS/V
26+
d76uZD3elYG0CqOK+9eE1eqbALDFnJ7KRLtVRD8nYtgSc/lIAAh1Xw==
27+
-----END RSA PRIVATE KEY-----
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
---
2+
3+
- name: ensure we have checked out libuv-1.7.5
4+
git: repo=https://github.com/libuv/libuv.git
5+
dest={{libuv.src}}
6+
version={{libuv.version}}
7+
update=no
8+
9+
- name: build libuv
10+
shell: ./autogen.sh && ./configure --disable-shared --prefix={{libuv.dst}} && make && make install
11+
args:
12+
chdir: "{{libuv.src}}"
13+
creates: "{{libuv.dst}}/lib/libuv.a"
14+
15+
- name: compile dtmd
16+
shell: make "LIBUV_PREFIX={{libuv.dst}}"
17+
args:
18+
chdir: "{{pg_src}}/contrib/pg_dtm/dtmd"
19+
creates: "{{pg_src}}/contrib/pg_dtm/dtmd/bin/dtmd"
20+
21+
- name: install dtmd
22+
command: cp "{{pg_src}}/contrib/pg_dtm/dtmd/bin/dtmd" "{{dtmd.dst}}"
23+
args:
24+
creates: "{{dtmd.dst}}"
25+
26+
- name: ensure datadir for dtm exists
27+
file: dest={{dtmd.datadir}} state=directory
28+
29+
# FIXME: use dtmd'd pid file
30+
- name: kill dtmd
31+
shell: killall -q dtmd || true
32+
33+
- name: start dtm
34+
shell: nohup {{dtmd.dst}} -d {{dtmd.datadir}} -a 0.0.0.0 -p {{dtmd.port}} > {{dtmd.log}} &
35+
36+
- name: wait until dtm is available
37+
wait_for: port=5431 delay=1
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
---
2+
3+
- name: ensure dependencies (Debian)
4+
apt: pkg={{item}} state=installed
5+
with_items:
6+
- git
7+
- automake
8+
- libtool
9+
- build-essential
10+
- bison
11+
- flex
12+
- libreadline-dev
13+
when: ansible_os_family == "Debian"
14+
sudo: yes
15+
16+
- name: ensure dependencies (RedHat)
17+
yum: name="@Development tools" state=present
18+
when: ansible_os_family == "RedHat"
19+
sudo: yes
20+
21+
- name: ensure dependencies (RedHat)
22+
yum: name={{item}} state=installed
23+
with_items:
24+
- git
25+
- automake
26+
- libtool
27+
- bison
28+
- flex
29+
- readline-devel
30+
when: ansible_os_family == "RedHat"
31+
sudo: yes
32+
33+
- name: setup the private key for postgrespro git access
34+
copy: dest=.ssh/ppg-deploy src=ppg-deploy.key mode=0600
35+
36+
- name: ensure we have checked out postgrespro (xtm)
37+
git: repo=git@gitlab.postgrespro.ru:pgpro-dev/postgrespro.git
38+
dest={{pg_src}}
39+
version={{pg_version}}
40+
force=yes
41+
update=yes
42+
key_file=.ssh/ppg-deploy
43+
accept_hostkey=yes
44+
depth=1
45+
register: pg_sources
46+
47+
- name: remove binaries if sources have changed
48+
file: dest={{item}} state=absent
49+
with_items:
50+
- "{{pg_dst}}"
51+
- "{{dtmd.dst}}"
52+
when: pg_sources.changed
53+
54+
- include: postgres.yml
55+
when: deploy_postgres
56+
57+
- include: dtm.yml
58+
when: deploy_dtm
59+
60+
- include: pg_shard.yml
61+
when: deploy_pg_shard

0 commit comments

Comments
 (0)