45
45
#include "storage/ipc.h"
46
46
#include "pgstat.h"
47
47
#include "tcop/utility.h"
48
+ #include "commands/portalcmds.h"
48
49
49
50
PG_MODULE_MAGIC ;
50
51
@@ -190,28 +191,32 @@ test_decoding_process_utility(PlannedStmt *pstmt,
190
191
ParamListInfo params , DestReceiver * dest , char * completionTag )
191
192
{
192
193
Node * parsetree = pstmt -> utilityStmt ;
194
+
193
195
switch (nodeTag (parsetree ))
194
196
{
195
197
case T_TransactionStmt :
196
- {
197
- TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
198
- switch (stmt -> kind )
199
198
{
200
- case TRANS_STMT_BEGIN :
201
- case TRANS_STMT_START :
202
- break ;
203
- case TRANS_STMT_COMMIT :
204
- if (test_decoding_twophase_commit ())
205
- return ; /* do not proceed */
206
- break ;
207
- case TRANS_STMT_PREPARE :
208
- case TRANS_STMT_COMMIT_PREPARED :
209
- case TRANS_STMT_ROLLBACK_PREPARED :
210
- break ;
211
- default :
212
- break ;
199
+ TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
200
+ switch (stmt -> kind )
201
+ {
202
+ case TRANS_STMT_COMMIT :
203
+ if (test_decoding_twophase_commit ())
204
+ return ; /* do not proceed */
205
+ break ;
206
+ default :
207
+ break ;
208
+ }
213
209
}
214
- }
210
+ break ;
211
+
212
+ /* cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY */
213
+ case T_NotifyStmt :
214
+ case T_ListenStmt :
215
+ case T_UnlistenStmt :
216
+ CurrentTxNonpreparable = true;
217
+ break ;
218
+
219
+ /* create/reindex/drop concurrently can not be execuled in prepared tx */
215
220
case T_ReindexStmt :
216
221
{
217
222
ReindexStmt * stmt = (ReindexStmt * ) parsetree ;
@@ -220,7 +225,6 @@ test_decoding_process_utility(PlannedStmt *pstmt,
220
225
case REINDEX_OBJECT_SCHEMA :
221
226
case REINDEX_OBJECT_SYSTEM :
222
227
case REINDEX_OBJECT_DATABASE :
223
- LogLogicalMessage ("C" , queryString , strlen (queryString ) + 1 , false);
224
228
CurrentTxNonpreparable = true;
225
229
default :
226
230
break ;
@@ -231,18 +235,36 @@ test_decoding_process_utility(PlannedStmt *pstmt,
231
235
{
232
236
IndexStmt * indexStmt = (IndexStmt * ) parsetree ;
233
237
if (indexStmt -> concurrent )
234
- {
235
- LogLogicalMessage ("C" , queryString , strlen (queryString ) + 1 , false);
236
238
CurrentTxNonpreparable = true;
237
- }
238
239
}
239
240
break ;
241
+ case T_DropStmt :
242
+ {
243
+ DropStmt * stmt = (DropStmt * ) parsetree ;
244
+ if (stmt -> removeType == OBJECT_INDEX && stmt -> concurrent )
245
+ CurrentTxNonpreparable = true;
246
+ }
247
+ break ;
248
+
249
+ /* cannot PREPARE a transaction that has created a cursor WITH HOLD */
250
+ case T_DeclareCursorStmt :
251
+ {
252
+ DeclareCursorStmt * stmt = (DeclareCursorStmt * ) parsetree ;
253
+ if (stmt -> options & CURSOR_OPT_HOLD )
254
+ CurrentTxNonpreparable = true;
255
+ }
256
+ break ;
257
+
240
258
default :
241
259
LogLogicalMessage ("D" , queryString , strlen (queryString ) + 1 , true);
242
260
CurrentTxContainsDDL = true;
243
261
break ;
244
262
}
245
263
264
+ /* Send non-transactional message then */
265
+ if (CurrentTxNonpreparable )
266
+ LogLogicalMessage ("C" , queryString , strlen (queryString ) + 1 , false);
267
+
246
268
if (PreviousProcessUtilityHook != NULL )
247
269
{
248
270
PreviousProcessUtilityHook (pstmt , queryString , context ,
0 commit comments