Skip to content

Commit 2674780

Browse files
committed
implicit 2pc in test_decoding
1 parent 258bbce commit 2674780

File tree

1 file changed

+228
-74
lines changed

1 file changed

+228
-74
lines changed

contrib/test_decoding/test_decoding.c

Lines changed: 228 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,18 @@
3434
#include "utils/syscache.h"
3535
#include "utils/typcache.h"
3636

37+
#include "access/xact.h"
38+
#include "miscadmin.h"
39+
#include "executor/executor.h"
40+
#include "nodes/nodes.h"
41+
#include "postmaster/autovacuum.h"
42+
#include "replication/walsender.h"
43+
#include "storage/latch.h"
44+
#include "storage/proc.h"
45+
#include "storage/ipc.h"
46+
#include "pgstat.h"
47+
#include "tcop/utility.h"
48+
3749
PG_MODULE_MAGIC;
3850

3951
/* These must be available to pg_dlsym() */
@@ -85,11 +97,211 @@ static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
8597
ReorderBufferTXN *txn,
8698
XLogRecPtr abort_lsn);
8799

100+
static void test_decoding_xact_callback(XactEvent event, void *arg);
101+
102+
static void test_decoding_process_utility(PlannedStmt *pstmt,
103+
const char *queryString, ProcessUtilityContext context,
104+
ParamListInfo params, DestReceiver *dest, char *completionTag);
105+
106+
static bool test_decoding_twophase_commit();
107+
108+
static void test_decoding_executor_finish(QueryDesc *queryDesc);
109+
110+
static ProcessUtility_hook_type PreviousProcessUtilityHook;
111+
112+
static ExecutorFinish_hook_type PreviousExecutorFinishHook;
113+
114+
static bool CurrentTxContainsDML;
115+
static bool CurrentTxContainsDDL;
116+
static bool CurrentTxNonpreparable;
88117

89118
void
90119
_PG_init(void)
91120
{
92-
/* other plugins can perform things here */
121+
PreviousExecutorFinishHook = ExecutorFinish_hook;
122+
ExecutorFinish_hook = test_decoding_executor_finish;
123+
124+
PreviousProcessUtilityHook = ProcessUtility_hook;
125+
ProcessUtility_hook = test_decoding_process_utility;
126+
127+
if (!IsUnderPostmaster)
128+
RegisterXactCallback(test_decoding_xact_callback, NULL);
129+
}
130+
131+
132+
/* ability to hook into sigle-statement transaction */
133+
static void
134+
test_decoding_xact_callback(XactEvent event, void *arg)
135+
{
136+
switch (event)
137+
{
138+
case XACT_EVENT_START:
139+
case XACT_EVENT_ABORT:
140+
CurrentTxContainsDML = false;
141+
CurrentTxContainsDDL = false;
142+
CurrentTxNonpreparable = false;
143+
break;
144+
case XACT_EVENT_COMMIT_COMMAND:
145+
if (!IsTransactionBlock())
146+
test_decoding_twophase_commit();
147+
break;
148+
default:
149+
break;
150+
}
151+
}
152+
153+
/* find out whether transaction had wrote any data or not */
154+
static void
155+
test_decoding_executor_finish(QueryDesc *queryDesc)
156+
{
157+
CmdType operation = queryDesc->operation;
158+
EState *estate = queryDesc->estate;
159+
if (estate->es_processed != 0 &&
160+
(operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE))
161+
{
162+
int i;
163+
for (i = 0; i < estate->es_num_result_relations; i++)
164+
{
165+
Relation rel = estate->es_result_relations[i].ri_RelationDesc;
166+
if (RelationNeedsWAL(rel)) {
167+
CurrentTxContainsDML = true;
168+
break;
169+
}
170+
}
171+
}
172+
173+
if (PreviousExecutorFinishHook != NULL)
174+
PreviousExecutorFinishHook(queryDesc);
175+
else
176+
standard_ExecutorFinish(queryDesc);
177+
}
178+
179+
180+
/*
181+
* Several things here:
182+
* 1) hook into commit of transaction block
183+
* 2) write logical message for DDL (default path)
184+
* 3) prevent 2pc hook for tx that can not be prepared and
185+
* send them as logical nontransactional message.
186+
*/
187+
static void
188+
test_decoding_process_utility(PlannedStmt *pstmt,
189+
const char *queryString, ProcessUtilityContext context,
190+
ParamListInfo params, DestReceiver *dest, char *completionTag)
191+
{
192+
Node *parsetree = pstmt->utilityStmt;
193+
switch (nodeTag(parsetree))
194+
{
195+
case T_TransactionStmt:
196+
{
197+
TransactionStmt *stmt = (TransactionStmt *) parsetree;
198+
switch (stmt->kind)
199+
{
200+
case TRANS_STMT_BEGIN:
201+
case TRANS_STMT_START:
202+
break;
203+
case TRANS_STMT_COMMIT:
204+
if (test_decoding_twophase_commit())
205+
return; /* do not proceed */
206+
break;
207+
case TRANS_STMT_PREPARE:
208+
case TRANS_STMT_COMMIT_PREPARED:
209+
case TRANS_STMT_ROLLBACK_PREPARED:
210+
break;
211+
default:
212+
break;
213+
}
214+
}
215+
case T_ReindexStmt:
216+
{
217+
ReindexStmt *stmt = (ReindexStmt *) parsetree;
218+
switch (stmt->kind)
219+
{
220+
case REINDEX_OBJECT_SCHEMA:
221+
case REINDEX_OBJECT_SYSTEM:
222+
case REINDEX_OBJECT_DATABASE:
223+
LogLogicalMessage("C", queryString, strlen(queryString) + 1, false);
224+
CurrentTxNonpreparable = true;
225+
default:
226+
break;
227+
}
228+
}
229+
break;
230+
case T_IndexStmt:
231+
{
232+
IndexStmt *indexStmt = (IndexStmt *) parsetree;
233+
if (indexStmt->concurrent)
234+
{
235+
LogLogicalMessage("C", queryString, strlen(queryString) + 1, false);
236+
CurrentTxNonpreparable = true;
237+
}
238+
}
239+
break;
240+
default:
241+
LogLogicalMessage("D", queryString, strlen(queryString) + 1, true);
242+
CurrentTxContainsDDL = true;
243+
break;
244+
}
245+
246+
if (PreviousProcessUtilityHook != NULL)
247+
{
248+
PreviousProcessUtilityHook(pstmt, queryString, context,
249+
params, dest, completionTag);
250+
}
251+
else
252+
{
253+
standard_ProcessUtility(pstmt, queryString, context,
254+
params, dest, completionTag);
255+
}
256+
}
257+
258+
/*
259+
* Change commit to prepare and wait on latch.
260+
* WalSender will unlock us after decoding and we can proceed.
261+
*/
262+
static bool
263+
test_decoding_twophase_commit()
264+
{
265+
int result = 0;
266+
char gid[20];
267+
268+
if (IsAutoVacuumLauncherProcess() ||
269+
!IsNormalProcessingMode() ||
270+
am_walsender ||
271+
IsBackgroundWorker ||
272+
IsAutoVacuumWorkerProcess() ||
273+
IsAbortedTransactionBlockState() ||
274+
!(CurrentTxContainsDML || CurrentTxContainsDDL) ||
275+
CurrentTxNonpreparable )
276+
return false;
277+
278+
snprintf(gid, sizeof(gid), "test_decoding:%d", MyProc->pgprocno);
279+
280+
if (!IsTransactionBlock())
281+
{
282+
BeginTransactionBlock();
283+
CommitTransactionCommand();
284+
StartTransactionCommand();
285+
}
286+
if (!PrepareTransactionBlock(gid))
287+
{
288+
fprintf(stderr, "Can't prepare transaction '%s'\n", gid);
289+
}
290+
CommitTransactionCommand();
291+
292+
result = WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0,
293+
WAIT_EVENT_REPLICATION_SLOT_SYNC);
294+
295+
if (result & WL_POSTMASTER_DEATH)
296+
proc_exit(1);
297+
298+
if (result & WL_LATCH_SET)
299+
ResetLatch(&MyProc->procLatch);
300+
301+
302+
StartTransactionCommand();
303+
FinishPreparedTransaction(gid, true);
304+
return true;
93305
}
94306

95307
/* specify output plugin callbacks */
@@ -297,74 +509,11 @@ static bool
297509
pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
298510
char *gid)
299511
{
300-
TestDecodingData *data = ctx->output_plugin_private;
301-
302-
/* treat all transaction as one-phase */
303-
if (!data->twophase_decoding)
512+
/* decode only tx that are prepared by our hook */
513+
if (strncmp(gid, "test_decoding:", 14) == 0)
514+
return false;
515+
else
304516
return true;
305-
306-
/*
307-
* Two-phase transactions that accessed catalog require special
308-
* treatment.
309-
*
310-
* Right now we don't have a safe way to decode catalog changes made in
311-
* prepared transaction that was already aborted by the time of
312-
* decoding.
313-
*
314-
* That kind of problem arises only when we are trying to
315-
* retrospectively decode aborted transactions with catalog changes -
316-
* including if a transaction aborts while we're decoding it. If one
317-
* wants to code distributed commit based on prepare decoding then
318-
* commits/aborts will happen strictly after decoding will be
319-
* completed, so it is possible to skip any checks/locks here.
320-
*
321-
* We'll also get stuck trying to acquire locks on catalog relations
322-
* we need for decoding if the prepared xact holds a strong lock on
323-
* one of them and we also need to decode row changes.
324-
*/
325-
if (txn->has_catalog_changes)
326-
{
327-
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
328-
329-
if (TransactionIdIsInProgress(txn->xid))
330-
{
331-
/*
332-
* For the sake of simplicity, by default we just
333-
* ignore in-progress prepared transactions with catalog
334-
* changes in this extension. If they abort during
335-
* decoding then tuples we need to decode them may be
336-
* overwritten while we're still decoding, causing
337-
* wrong catalog lookups.
338-
*
339-
* It is possible to move that LWLockRelease() to
340-
* pg_decode_prepare_txn() and allow decoding of
341-
* running prepared tx, but such lock will prevent any
342-
* 2pc transaction commit during decoding time. That
343-
* can be a long time in case of lots of
344-
* changes/inserts in that tx or if the downstream is
345-
* slow/unresponsive.
346-
*
347-
* (Continuing to decode without the lock is unsafe, XXX)
348-
*/
349-
LWLockRelease(TwoPhaseStateLock);
350-
return !data->twophase_decode_with_catalog_changes;
351-
}
352-
else if (TransactionIdDidAbort(txn->xid))
353-
{
354-
/*
355-
* Here we know that it is already aborted and there is
356-
* not much sense in doing something with this
357-
* transaction. Consequently ABORT PREPARED will be
358-
* suppressed.
359-
*/
360-
LWLockRelease(TwoPhaseStateLock);
361-
return true;
362-
}
363-
364-
LWLockRelease(TwoPhaseStateLock);
365-
}
366-
367-
return false;
368517
}
369518

370519

@@ -374,9 +523,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
374523
XLogRecPtr prepare_lsn)
375524
{
376525
TestDecodingData *data = ctx->output_plugin_private;
526+
int backend_procno;
377527

378-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
379-
return;
528+
// if (data->skip_empty_xacts && !data->xact_wrote_changes)
529+
// return;
380530

381531
OutputPluginPrepareWrite(ctx, true);
382532

@@ -391,6 +541,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
391541
timestamptz_to_str(txn->commit_time));
392542

393543
OutputPluginWrite(ctx, true);
544+
545+
/* Unlock backend */
546+
sscanf(txn->gid, "test_decoding:%d", &backend_procno);
547+
SetLatch(&ProcGlobal->allProcs[backend_procno].procLatch);
394548
}
395549

396550
/* COMMIT PREPARED callback */
@@ -400,8 +554,8 @@ pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
400554
{
401555
TestDecodingData *data = ctx->output_plugin_private;
402556

403-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
404-
return;
557+
// if (data->skip_empty_xacts && !data->xact_wrote_changes)
558+
// return;
405559

406560
OutputPluginPrepareWrite(ctx, true);
407561

@@ -425,8 +579,8 @@ pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
425579
{
426580
TestDecodingData *data = ctx->output_plugin_private;
427581

428-
if (data->skip_empty_xacts && !data->xact_wrote_changes)
429-
return;
582+
// if (data->skip_empty_xacts && !data->xact_wrote_changes)
583+
// return;
430584

431585
OutputPluginPrepareWrite(ctx, true);
432586

0 commit comments

Comments
 (0)