34
34
#include "utils/syscache.h"
35
35
#include "utils/typcache.h"
36
36
37
+ #include "access/xact.h"
38
+ #include "miscadmin.h"
39
+ #include "executor/executor.h"
40
+ #include "nodes/nodes.h"
41
+ #include "postmaster/autovacuum.h"
42
+ #include "replication/walsender.h"
43
+ #include "storage/latch.h"
44
+ #include "storage/proc.h"
45
+ #include "storage/ipc.h"
46
+ #include "pgstat.h"
47
+ #include "tcop/utility.h"
48
+
37
49
PG_MODULE_MAGIC ;
38
50
39
51
/* These must be available to pg_dlsym() */
@@ -85,11 +97,211 @@ static void pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx,
85
97
ReorderBufferTXN * txn ,
86
98
XLogRecPtr abort_lsn );
87
99
100
+ static void test_decoding_xact_callback (XactEvent event , void * arg );
101
+
102
+ static void test_decoding_process_utility (PlannedStmt * pstmt ,
103
+ const char * queryString , ProcessUtilityContext context ,
104
+ ParamListInfo params , DestReceiver * dest , char * completionTag );
105
+
106
+ static bool test_decoding_twophase_commit ();
107
+
108
+ static void test_decoding_executor_finish (QueryDesc * queryDesc );
109
+
110
+ static ProcessUtility_hook_type PreviousProcessUtilityHook ;
111
+
112
+ static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
113
+
114
+ static bool CurrentTxContainsDML ;
115
+ static bool CurrentTxContainsDDL ;
116
+ static bool CurrentTxNonpreparable ;
88
117
89
118
void
90
119
_PG_init (void )
91
120
{
92
- /* other plugins can perform things here */
121
+ PreviousExecutorFinishHook = ExecutorFinish_hook ;
122
+ ExecutorFinish_hook = test_decoding_executor_finish ;
123
+
124
+ PreviousProcessUtilityHook = ProcessUtility_hook ;
125
+ ProcessUtility_hook = test_decoding_process_utility ;
126
+
127
+ if (!IsUnderPostmaster )
128
+ RegisterXactCallback (test_decoding_xact_callback , NULL );
129
+ }
130
+
131
+
132
+ /* ability to hook into sigle-statement transaction */
133
+ static void
134
+ test_decoding_xact_callback (XactEvent event , void * arg )
135
+ {
136
+ switch (event )
137
+ {
138
+ case XACT_EVENT_START :
139
+ case XACT_EVENT_ABORT :
140
+ CurrentTxContainsDML = false;
141
+ CurrentTxContainsDDL = false;
142
+ CurrentTxNonpreparable = false;
143
+ break ;
144
+ case XACT_EVENT_COMMIT_COMMAND :
145
+ if (!IsTransactionBlock ())
146
+ test_decoding_twophase_commit ();
147
+ break ;
148
+ default :
149
+ break ;
150
+ }
151
+ }
152
+
153
+ /* find out whether transaction had wrote any data or not */
154
+ static void
155
+ test_decoding_executor_finish (QueryDesc * queryDesc )
156
+ {
157
+ CmdType operation = queryDesc -> operation ;
158
+ EState * estate = queryDesc -> estate ;
159
+ if (estate -> es_processed != 0 &&
160
+ (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE ))
161
+ {
162
+ int i ;
163
+ for (i = 0 ; i < estate -> es_num_result_relations ; i ++ )
164
+ {
165
+ Relation rel = estate -> es_result_relations [i ].ri_RelationDesc ;
166
+ if (RelationNeedsWAL (rel )) {
167
+ CurrentTxContainsDML = true;
168
+ break ;
169
+ }
170
+ }
171
+ }
172
+
173
+ if (PreviousExecutorFinishHook != NULL )
174
+ PreviousExecutorFinishHook (queryDesc );
175
+ else
176
+ standard_ExecutorFinish (queryDesc );
177
+ }
178
+
179
+
180
+ /*
181
+ * Several things here:
182
+ * 1) hook into commit of transaction block
183
+ * 2) write logical message for DDL (default path)
184
+ * 3) prevent 2pc hook for tx that can not be prepared and
185
+ * send them as logical nontransactional message.
186
+ */
187
+ static void
188
+ test_decoding_process_utility (PlannedStmt * pstmt ,
189
+ const char * queryString , ProcessUtilityContext context ,
190
+ ParamListInfo params , DestReceiver * dest , char * completionTag )
191
+ {
192
+ Node * parsetree = pstmt -> utilityStmt ;
193
+ switch (nodeTag (parsetree ))
194
+ {
195
+ case T_TransactionStmt :
196
+ {
197
+ TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
198
+ switch (stmt -> kind )
199
+ {
200
+ case TRANS_STMT_BEGIN :
201
+ case TRANS_STMT_START :
202
+ break ;
203
+ case TRANS_STMT_COMMIT :
204
+ if (test_decoding_twophase_commit ())
205
+ return ; /* do not proceed */
206
+ break ;
207
+ case TRANS_STMT_PREPARE :
208
+ case TRANS_STMT_COMMIT_PREPARED :
209
+ case TRANS_STMT_ROLLBACK_PREPARED :
210
+ break ;
211
+ default :
212
+ break ;
213
+ }
214
+ }
215
+ case T_ReindexStmt :
216
+ {
217
+ ReindexStmt * stmt = (ReindexStmt * ) parsetree ;
218
+ switch (stmt -> kind )
219
+ {
220
+ case REINDEX_OBJECT_SCHEMA :
221
+ case REINDEX_OBJECT_SYSTEM :
222
+ case REINDEX_OBJECT_DATABASE :
223
+ LogLogicalMessage ("C" , queryString , strlen (queryString ) + 1 , false);
224
+ CurrentTxNonpreparable = true;
225
+ default :
226
+ break ;
227
+ }
228
+ }
229
+ break ;
230
+ case T_IndexStmt :
231
+ {
232
+ IndexStmt * indexStmt = (IndexStmt * ) parsetree ;
233
+ if (indexStmt -> concurrent )
234
+ {
235
+ LogLogicalMessage ("C" , queryString , strlen (queryString ) + 1 , false);
236
+ CurrentTxNonpreparable = true;
237
+ }
238
+ }
239
+ break ;
240
+ default :
241
+ LogLogicalMessage ("D" , queryString , strlen (queryString ) + 1 , true);
242
+ CurrentTxContainsDDL = true;
243
+ break ;
244
+ }
245
+
246
+ if (PreviousProcessUtilityHook != NULL )
247
+ {
248
+ PreviousProcessUtilityHook (pstmt , queryString , context ,
249
+ params , dest , completionTag );
250
+ }
251
+ else
252
+ {
253
+ standard_ProcessUtility (pstmt , queryString , context ,
254
+ params , dest , completionTag );
255
+ }
256
+ }
257
+
258
+ /*
259
+ * Change commit to prepare and wait on latch.
260
+ * WalSender will unlock us after decoding and we can proceed.
261
+ */
262
+ static bool
263
+ test_decoding_twophase_commit ()
264
+ {
265
+ int result = 0 ;
266
+ char gid [20 ];
267
+
268
+ if (IsAutoVacuumLauncherProcess () ||
269
+ !IsNormalProcessingMode () ||
270
+ am_walsender ||
271
+ IsBackgroundWorker ||
272
+ IsAutoVacuumWorkerProcess () ||
273
+ IsAbortedTransactionBlockState () ||
274
+ !(CurrentTxContainsDML || CurrentTxContainsDDL ) ||
275
+ CurrentTxNonpreparable )
276
+ return false;
277
+
278
+ snprintf (gid , sizeof (gid ), "test_decoding:%d" , MyProc -> pgprocno );
279
+
280
+ if (!IsTransactionBlock ())
281
+ {
282
+ BeginTransactionBlock ();
283
+ CommitTransactionCommand ();
284
+ StartTransactionCommand ();
285
+ }
286
+ if (!PrepareTransactionBlock (gid ))
287
+ {
288
+ fprintf (stderr , "Can't prepare transaction '%s'\n" , gid );
289
+ }
290
+ CommitTransactionCommand ();
291
+
292
+ result = WaitLatch (& MyProc -> procLatch , WL_LATCH_SET | WL_POSTMASTER_DEATH , 0 ,
293
+ WAIT_EVENT_REPLICATION_SLOT_SYNC );
294
+
295
+ if (result & WL_POSTMASTER_DEATH )
296
+ proc_exit (1 );
297
+
298
+ if (result & WL_LATCH_SET )
299
+ ResetLatch (& MyProc -> procLatch );
300
+
301
+
302
+ StartTransactionCommand ();
303
+ FinishPreparedTransaction (gid , true);
304
+ return true;
93
305
}
94
306
95
307
/* specify output plugin callbacks */
@@ -297,74 +509,11 @@ static bool
297
509
pg_filter_prepare (LogicalDecodingContext * ctx , ReorderBufferTXN * txn ,
298
510
char * gid )
299
511
{
300
- TestDecodingData * data = ctx -> output_plugin_private ;
301
-
302
- /* treat all transaction as one-phase */
303
- if (! data -> twophase_decoding )
512
+ /* decode only tx that are prepared by our hook */
513
+ if ( strncmp ( gid , "test_decoding:" , 14 ) == 0 )
514
+ return false;
515
+ else
304
516
return true;
305
-
306
- /*
307
- * Two-phase transactions that accessed catalog require special
308
- * treatment.
309
- *
310
- * Right now we don't have a safe way to decode catalog changes made in
311
- * prepared transaction that was already aborted by the time of
312
- * decoding.
313
- *
314
- * That kind of problem arises only when we are trying to
315
- * retrospectively decode aborted transactions with catalog changes -
316
- * including if a transaction aborts while we're decoding it. If one
317
- * wants to code distributed commit based on prepare decoding then
318
- * commits/aborts will happend strictly after decoding will be
319
- * completed, so it is possible to skip any checks/locks here.
320
- *
321
- * We'll also get stuck trying to acquire locks on catalog relations
322
- * we need for decoding if the prepared xact holds a strong lock on
323
- * one of them and we also need to decode row changes.
324
- */
325
- if (txn -> has_catalog_changes )
326
- {
327
- LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
328
-
329
- if (TransactionIdIsInProgress (txn -> xid ))
330
- {
331
- /*
332
- * For the sake of simplicity, by default we just
333
- * ignore in-progess prepared transactions with catalog
334
- * changes in this extension. If they abort during
335
- * decoding then tuples we need to decode them may be
336
- * overwritten while we're still decoding, causing
337
- * wrong catalog lookups.
338
- *
339
- * It is possible to move that LWLockRelease() to
340
- * pg_decode_prepare_txn() and allow decoding of
341
- * running prepared tx, but such lock will prevent any
342
- * 2pc transaction commit during decoding time. That
343
- * can be a long time in case of lots of
344
- * changes/inserts in that tx or if the downstream is
345
- * slow/unresonsive.
346
- *
347
- * (Continuing to decode without the lock is unsafe, XXX)
348
- */
349
- LWLockRelease (TwoPhaseStateLock );
350
- return !data -> twophase_decode_with_catalog_changes ;
351
- }
352
- else if (TransactionIdDidAbort (txn -> xid ))
353
- {
354
- /*
355
- * Here we know that it is already aborted and there is
356
- * not much sense in doing something with this
357
- * transaction. Consequently ABORT PREPARED will be
358
- * suppressed.
359
- */
360
- LWLockRelease (TwoPhaseStateLock );
361
- return true;
362
- }
363
-
364
- LWLockRelease (TwoPhaseStateLock );
365
- }
366
-
367
- return false;
368
517
}
369
518
370
519
@@ -374,9 +523,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
374
523
XLogRecPtr prepare_lsn )
375
524
{
376
525
TestDecodingData * data = ctx -> output_plugin_private ;
526
+ int backend_procno ;
377
527
378
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
379
- return ;
528
+ // if (data->skip_empty_xacts && !data->xact_wrote_changes)
529
+ // return;
380
530
381
531
OutputPluginPrepareWrite (ctx , true);
382
532
@@ -391,6 +541,10 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
391
541
timestamptz_to_str (txn -> commit_time ));
392
542
393
543
OutputPluginWrite (ctx , true);
544
+
545
+ /* Unlock backend */
546
+ sscanf (txn -> gid , "test_decoding:%d" , & backend_procno );
547
+ SetLatch (& ProcGlobal -> allProcs [backend_procno ].procLatch );
394
548
}
395
549
396
550
/* COMMIT PREPARED callback */
@@ -400,8 +554,8 @@ pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn
400
554
{
401
555
TestDecodingData * data = ctx -> output_plugin_private ;
402
556
403
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
404
- return ;
557
+ // if (data->skip_empty_xacts && !data->xact_wrote_changes)
558
+ // return;
405
559
406
560
OutputPluginPrepareWrite (ctx , true);
407
561
@@ -425,8 +579,8 @@ pg_decode_abort_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
425
579
{
426
580
TestDecodingData * data = ctx -> output_plugin_private ;
427
581
428
- if (data -> skip_empty_xacts && !data -> xact_wrote_changes )
429
- return ;
582
+ // if (data->skip_empty_xacts && !data->xact_wrote_changes)
583
+ // return;
430
584
431
585
OutputPluginPrepareWrite (ctx , true);
432
586
0 commit comments