Skip to content

Commit 2162563

Browse files
committed
add DbId to abort records
1 parent 75e6e6b commit 2162563

File tree

9 files changed

+162
-67
lines changed

9 files changed

+162
-67
lines changed

contrib/pglogical/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ OBJS = pglogical_apply.o pglogical_conflict.o pglogical_manager.o \
1515
PG_CPPFLAGS = -I$(libpq_srcdir) -I$(top_srcdir)/contrib/pglogical_output
1616
SHLIB_LINK = $(libpq)
1717

18-
REGRESS = preseed infofuncs init_fail init preseed_check basic extended twophase toasted replication_set add_table matview bidirectional primary_key foreign_key functions copy drop
18+
REGRESS = preseed infofuncs init_fail init preseed_check basic extended toasted replication_set add_table matview bidirectional primary_key foreign_key functions copy twophase drop
1919

2020
# In-tree builds only
2121
subdir = contrib/pglogical

contrib/pglogical/expected/twophase.out

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,26 @@ SELECT * FROM pglogical_regress_variables()
33
\gset
44
\c :provider_dsn
55
SELECT pglogical.replicate_ddl_command($$
6-
CREATE TABLE public.test2pc_tbl(id serial primary key, value int);
6+
CREATE TABLE public.test2pc_tbl(id serial primary key, value int);
77
$$);
88
replicate_ddl_command
99
-----------------------
1010
t
1111
(1 row)
1212

13-
SELECT * FROM pglogical.replication_set_add_all_tables('default', '{public}');
14-
replication_set_add_all_tables
15-
--------------------------------
13+
-- SELECT * FROM pglogical.replication_set_add_all_tables('default', '{public}');
14+
SELECT * FROM pglogical.replication_set_add_table('default', 'test2pc_tbl');
15+
replication_set_add_table
16+
---------------------------
1617
t
1718
(1 row)
1819

20+
SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
21+
pg_xlog_wait_remote_apply
22+
---------------------------
23+
24+
(1 row)
25+
1926
-- Check that prapeared state is visible on slave and data available after commit
2027
BEGIN;
2128
INSERT INTO test2pc_tbl VALUES (1, 10);
@@ -28,10 +35,11 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
2835

2936
\c :subscriber_dsn
3037
SELECT gid, owner, database FROM pg_prepared_xacts;
31-
gid | owner | database
32-
-----+-------+--------------------
33-
tx1 | stas | contrib_regression
34-
(1 row)
38+
gid | owner | database
39+
---------------------+-------+--------------------
40+
tx1 | stas | contrib_regression
41+
test_subscriber:tx1 | stas | postgres
42+
(2 rows)
3543

3644
SELECT * FROM test2pc_tbl;
3745
id | value
@@ -71,10 +79,11 @@ SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
7179

7280
\c :subscriber_dsn
7381
SELECT gid, owner, database FROM pg_prepared_xacts;
74-
gid | owner | database
75-
-----+-------+--------------------
76-
tx2 | stas | contrib_regression
77-
(1 row)
82+
gid | owner | database
83+
---------------------+-------+--------------------
84+
tx2 | stas | contrib_regression
85+
test_subscriber:tx2 | stas | postgres
86+
(2 rows)
7887

7988
SELECT * FROM test2pc_tbl;
8089
id | value

contrib/pglogical/pglogical_apply.c

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,10 @@ handle_commit(StringInfo s)
148148
TimestampTz commit_time;
149149
uint8 flags;
150150
const char *gid;
151+
char *local_gid;
151152
PGLFlushPosition *flushpos;
152153
bool flush = true;
154+
PGLogicalLocalNode *mynode;
153155

154156
pglogical_read_commit(s, &commit_lsn, &end_lsn, &commit_time, &flags, &gid);
155157

@@ -170,31 +172,53 @@ handle_commit(StringInfo s)
170172
CommitTransactionCommand();
171173
StartTransactionCommand();
172174

175+
mynode = get_local_node(false);
176+
local_gid = palloc0(GIDSIZE);
177+
strncpy(local_gid, mynode->node->name, GIDSIZE);
178+
strncat(local_gid, ":", GIDSIZE);
179+
strncat(local_gid, gid, GIDSIZE);
180+
173181
/* PREPARE itself */
174-
PrepareTransactionBlock(gid);
182+
PrepareTransactionBlock(local_gid);
175183
CommitTransactionCommand();
176184
break;
177185
}
178186
case PGLOGICAL_COMMIT_PREPARED:
179187
{
180188
StartTransactionCommand();
181-
FinishPreparedTransaction(gid, true);
189+
190+
mynode = get_local_node(false);
191+
local_gid = palloc0(GIDSIZE);
192+
strncpy(local_gid, mynode->node->name, GIDSIZE);
193+
strncat(local_gid, ":", GIDSIZE);
194+
strncat(local_gid, gid, GIDSIZE);
195+
196+
FinishPreparedTransaction(local_gid, true);
182197
CommitTransactionCommand();
183198

184199
/* There were no BEGIN stmt for COMMIT PREPARED */
185200
replorigin_session_origin_timestamp = commit_time;
186201
replorigin_session_origin_lsn = commit_lsn;
202+
187203
break;
188204
}
189205
case PGLOGICAL_ABORT_PREPARED:
190206
{
191207
StartTransactionCommand();
192-
FinishPreparedTransaction(gid, false);
208+
209+
mynode = get_local_node(false);
210+
local_gid = palloc(GIDSIZE);
211+
strncpy(local_gid, mynode->node->name, GIDSIZE);
212+
strncat(local_gid, ":", GIDSIZE);
213+
strncat(local_gid, gid, GIDSIZE);
214+
215+
FinishPreparedTransaction(local_gid, false);
193216
CommitTransactionCommand();
194217

195218
/* There were no BEGIN stmt for ROLLBACK PREPARED */
196219
replorigin_session_origin_timestamp = commit_time;
197220
replorigin_session_origin_lsn = commit_lsn;
221+
198222
break;
199223
}
200224
default:

contrib/pglogical/sql/twophase.sql

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@ SELECT * FROM pglogical_regress_variables()
44

55
\c :provider_dsn
66
SELECT pglogical.replicate_ddl_command($$
7-
CREATE TABLE public.test2pc_tbl(id serial primary key, value int);
7+
CREATE TABLE public.test2pc_tbl(id serial primary key, value int);
88
$$);
9-
SELECT * FROM pglogical.replication_set_add_all_tables('default', '{public}');
10-
9+
-- SELECT * FROM pglogical.replication_set_add_all_tables('default', '{public}');
10+
SELECT * FROM pglogical.replication_set_add_table('default', 'test2pc_tbl');
11+
SELECT pg_xlog_wait_remote_apply(pg_current_xlog_location(), 0);
1112

1213
-- Check that prapeared state is visible on slave and data available after commit
1314
BEGIN;

reinit.sh

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -75,49 +75,75 @@ cd ../..
7575

7676
pkill -9 postgres
7777
reinit_master
78-
# reinit_master2
78+
reinit_master2
7979

80-
# ./install/bin/psql <<SQL
81-
# CREATE EXTENSION pglogical;
82-
# SELECT pglogical.create_node(
83-
# node_name := 'provider1',
84-
# dsn := 'port=5432 dbname=stas'
85-
# );
86-
# SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
87-
# SQL
88-
89-
# ./install/bin/psql -p 5433 <<SQL
90-
# CREATE EXTENSION pglogical;
91-
# SELECT pglogical.create_node(
92-
# node_name := 'subscriber1',
93-
# dsn := 'port=5433 dbname=stas'
94-
# );
95-
# SELECT pglogical.create_subscription(
96-
# subscription_name := 'subscription1',
97-
# provider_dsn := 'port=5432 dbname=stas'
98-
# );
99-
# SQL
80+
./install/bin/psql <<SQL
81+
CREATE EXTENSION pglogical;
82+
SELECT pglogical.create_node(
83+
node_name := 'provider1',
84+
dsn := 'port=5432 dbname=stas'
85+
);
86+
SELECT pglogical.replication_set_add_all_tables('default', ARRAY['public']);
87+
SQL
10088

101-
./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
89+
./install/bin/psql -p 5433 <<SQL
90+
CREATE EXTENSION pglogical;
91+
SELECT pglogical.create_node(
92+
node_name := 'subscriber1',
93+
dsn := 'port=5433 dbname=stas'
94+
);
95+
SELECT pglogical.create_subscription(
96+
subscription_name := 'subscription1',
97+
provider_dsn := 'port=5432 dbname=stas'
98+
);
99+
SQL
102100

103101
./install/bin/psql <<SQL
104-
begin;
105-
insert into t values (42);
106-
prepare transaction 'hellyeah';
107-
rollback prepared 'hellyeah';
102+
select pg_sleep(2);
103+
begin;
104+
insert into t values (42);
105+
prepare transaction 'hellyeah';
106+
select * from pg_current_xlog_location();
107+
select pg_stat_get_wal_senders();
108+
select * from pg_current_xlog_location();
109+
--select pg_sleep(3);
110+
SQL
111+
112+
./install/bin/psql -p 5433 <<SQL
113+
select * from pg_prepared_xacts;
114+
--select * from t;
108115
SQL
109116

110117
./install/bin/psql <<SQL
111-
SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
112-
NULL, NULL,
113-
'expected_encoding', 'UTF8',
114-
'min_proto_version', '1',
115-
'max_proto_version', '1',
116-
'startup_params_format', '1',
117-
'proto_format', 'json',
118-
'no_txinfo', 't');
118+
commit prepared 'hellyeah';
119+
select pg_sleep(1);
119120
SQL
120121

122+
./install/bin/psql -p 5433 <<SQL
123+
select * from pg_prepared_xacts;
124+
select * from t;
125+
SQL
126+
127+
# ./install/bin/psql -c "SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'pglogical_output');"
128+
129+
# ./install/bin/psql <<SQL
130+
# --begin;
131+
# insert into t values (42);
132+
# --prepare transaction 'hellyeah';
133+
# --commit prepared 'hellyeah';
134+
# SQL
135+
136+
# ./install/bin/psql <<SQL
137+
# SELECT * FROM pg_logical_slot_peek_changes('regression_slot',
138+
# NULL, NULL,
139+
# 'expected_encoding', 'UTF8',
140+
# 'min_proto_version', '1',
141+
# 'max_proto_version', '1',
142+
# 'startup_params_format', '1',
143+
# 'proto_format', 'json',
144+
# 'no_txinfo', 't');
145+
# SQL
146+
121147

122148

123149

src/backend/access/rmgrdesc/xactdesc.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,16 @@ ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
141141
data += sizeof(xl_xact_xinfo);
142142
}
143143

144+
if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
145+
{
146+
xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
147+
148+
parsed->dbId = xl_dbinfo->dbId;
149+
parsed->tsId = xl_dbinfo->tsId;
150+
151+
data += sizeof(xl_xact_dbinfo);
152+
}
153+
144154
if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
145155
{
146156
xl_xact_subxacts *xl_subxacts = (xl_xact_subxacts *) data;

src/backend/access/transam/xact.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5231,6 +5231,7 @@ XactLogAbortRecord(TimestampTz abort_time,
52315231
xl_xact_subxacts xl_subxacts;
52325232
xl_xact_relfilenodes xl_relfilenodes;
52335233
xl_xact_twophase xl_twophase;
5234+
xl_xact_dbinfo xl_dbinfo;
52345235

52355236
uint8 info;
52365237

@@ -5268,6 +5269,13 @@ XactLogAbortRecord(TimestampTz abort_time,
52685269
xl_twophase.gidlen = strlen(twophase_gid) + 1; /* Include '\0' */
52695270
}
52705271

5272+
if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
5273+
{
5274+
xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
5275+
xl_dbinfo.dbId = MyDatabaseId;
5276+
xl_dbinfo.tsId = MyDatabaseTableSpace;
5277+
}
5278+
52715279
if (xl_xinfo.xinfo != 0)
52725280
info |= XLOG_XACT_HAS_INFO;
52735281

@@ -5280,6 +5288,9 @@ XactLogAbortRecord(TimestampTz abort_time,
52805288
if (xl_xinfo.xinfo != 0)
52815289
XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
52825290

5291+
if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
5292+
XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
5293+
52835294
if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
52845295
{
52855296
XLogRegisterData((char *) (&xl_subxacts),
@@ -5300,7 +5311,8 @@ XactLogAbortRecord(TimestampTz abort_time,
53005311
{
53015312
XLogRegisterData((char *) (&xl_twophase), MinSizeOfXactTwophase);
53025313
XLogRegisterData((char *) twophase_gid, xl_twophase.gidlen);
5303-
}
5314+
} if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
5315+
XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
53045316

53055317
return XLogInsert(RM_XACT_ID, info);
53065318
}

src/backend/replication/logical/decode.c

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -461,16 +461,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
461461
commit_time = parsed->origin_timestamp;
462462
}
463463

464-
/*
465-
* If that is COMMIT PREPARED than send that to callbacks.
466-
*/
467-
if (TransactionIdIsValid(parsed->twophase_xid)) {
468-
strcpy(ctx->reorder->gid, parsed->twophase_gid);
469-
ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
470-
commit_time, origin_id, origin_lsn);
471-
return;
472-
}
473-
474464
/*
475465
* Process invalidation messages, even if we're not interested in the
476466
* transaction's contents, since the various caches need to always be
@@ -531,9 +521,19 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
531521
buf->origptr, buf->endptr);
532522
}
533523

534-
/* replay actions of all transaction + subtransactions in order */
535-
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
536-
commit_time, origin_id, origin_lsn);
524+
if (TransactionIdIsValid(parsed->twophase_xid)) {
525+
/*
526+
* We are processing COMMIT PREPARED and know that reorder buffer is
527+
* empty. So we can skip use shortcut for coomiting bare xact.
528+
*/
529+
strcpy(ctx->reorder->gid, parsed->twophase_gid);
530+
ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
531+
commit_time, origin_id, origin_lsn);
532+
} else {
533+
/* replay actions of all transaction + subtransactions in order */
534+
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
535+
commit_time, origin_id, origin_lsn);
536+
}
537537
}
538538

539539
static void
@@ -603,8 +603,18 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
603603
/*
604604
* If that is ROLLBACK PREPARED than send that to callbacks.
605605
*/
606+
606607
if (TransactionIdIsValid(parsed->twophase_xid)) {
608+
fprintf(stderr, "(!) db_id=%d, gid=%s\n", parsed->dbId, parsed->twophase_gid);
609+
}
610+
611+
if (TransactionIdIsValid(parsed->twophase_xid)
612+
&& (parsed->dbId == ctx->slot->data.database)) {
613+
607614
strcpy(ctx->reorder->gid, parsed->twophase_gid);
615+
616+
fprintf(stderr, "(!) aborting: db_id=%d, gid=%s\n", parsed->dbId, parsed->twophase_gid);
617+
608618
ReorderBufferCommitBareXact(ctx->reorder, xid, buf->origptr, buf->endptr,
609619
commit_time, origin_id, origin_lsn);
610620
return;

src/include/access/xact.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,9 @@ typedef struct xl_xact_parsed_prepare
314314

315315
typedef struct xl_xact_parsed_abort
316316
{
317+
Oid dbId;
318+
Oid tsId;
319+
317320
TimestampTz xact_time;
318321
uint32 xinfo;
319322

0 commit comments

Comments
 (0)