20
20
#include "utils/hsearch.h"
21
21
#include "utils/memutils.h"
22
22
23
+ #undef DEBUG3
24
+ #define DEBUG3 WARNING
23
25
24
26
/*
25
27
* Connection cache hash table entry
@@ -68,6 +70,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
68
70
static void check_conn_params (const char * * keywords , const char * * values );
69
71
static void configure_remote_session (PGconn * conn );
70
72
static void do_sql_command (PGconn * conn , const char * sql );
73
+ static void do_sql_send_command (PGconn * conn , const char * sql );
74
+ static void do_sql_wait_command (PGconn * conn , const char * sql );
71
75
static void begin_remote_xact (ConnCacheEntry * entry );
72
76
static void pgfdw_xact_callback (XactEvent event , void * arg );
73
77
static void pgfdw_subxact_callback (SubXactEvent event ,
@@ -358,6 +362,27 @@ do_sql_command(PGconn *conn, const char *sql)
358
362
PQclear (res );
359
363
}
360
364
365
+ static void
366
+ do_sql_send_command (PGconn * conn , const char * sql )
367
+ {
368
+ if (PQsendQuery (conn , sql ) != PGRES_COMMAND_OK ) {
369
+ PGresult * res = PQgetResult (conn );
370
+ pgfdw_report_error (ERROR , res , conn , true, sql );
371
+ PQclear (res );
372
+ }
373
+ }
374
+
375
+ static void
376
+ do_sql_wait_command (PGconn * conn , const char * sql )
377
+ {
378
+ PGresult * res ;
379
+ while ((res = PQgetResult (conn )) != NULL ) {
380
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
381
+ pgfdw_report_error (ERROR , res , conn , true, sql );
382
+ PQclear (res );
383
+ }
384
+ }
385
+
361
386
/*
362
387
* Start remote transaction or subtransaction, if needed.
363
388
*
@@ -541,16 +566,35 @@ pgfdw_xact_callback(XactEvent event, void *arg)
541
566
/* If it has an open remote transaction, try to close it */
542
567
if (entry -> xact_depth > 0 )
543
568
{
544
- elog (DEBUG3 , "closing remote transaction on connection %p" ,
545
- entry -> conn );
569
+ elog (DEBUG3 , "closing remote transaction on connection %p event %d " ,
570
+ entry -> conn , event );
546
571
547
572
switch (event )
548
573
{
549
574
case XACT_EVENT_PARALLEL_PRE_COMMIT :
550
575
case XACT_EVENT_PRE_COMMIT :
551
576
/* Commit all remote transactions during pre-commit */
552
- do_sql_command (entry -> conn , "COMMIT TRANSACTION" );
577
+ do_sql_send_command (entry -> conn , "COMMIT TRANSACTION" );
578
+ continue ;
579
+ case XACT_EVENT_PRE_PREPARE :
553
580
581
+ /*
582
+ * We disallow remote transactions that modified anything,
583
+ * since it's not very reasonable to hold them open until
584
+ * the prepared transaction is committed. For the moment,
585
+ * throw error unconditionally; later we might allow
586
+ * read-only cases. Note that the error will cause us to
587
+ * come right back here with event == XACT_EVENT_ABORT, so
588
+ * we'll clean up the connection state at that point.
589
+ */
590
+ ereport (ERROR ,
591
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
592
+ errmsg ("cannot prepare a transaction that modified remote tables" )));
593
+ break ;
594
+ case XACT_EVENT_PARALLEL_COMMIT :
595
+ case XACT_EVENT_COMMIT :
596
+ case XACT_EVENT_PREPARE :
597
+ do_sql_wait_command (entry -> conn , "COMMIT TRANSACTION" );
554
598
/*
555
599
* If there were any errors in subtransactions, and we
556
600
* made prepared statements, do a DEALLOCATE ALL to make
@@ -574,27 +618,6 @@ pgfdw_xact_callback(XactEvent event, void *arg)
574
618
entry -> have_prep_stmt = false;
575
619
entry -> have_error = false;
576
620
break ;
577
- case XACT_EVENT_PRE_PREPARE :
578
-
579
- /*
580
- * We disallow remote transactions that modified anything,
581
- * since it's not very reasonable to hold them open until
582
- * the prepared transaction is committed. For the moment,
583
- * throw error unconditionally; later we might allow
584
- * read-only cases. Note that the error will cause us to
585
- * come right back here with event == XACT_EVENT_ABORT, so
586
- * we'll clean up the connection state at that point.
587
- */
588
- ereport (ERROR ,
589
- (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
590
- errmsg ("cannot prepare a transaction that modified remote tables" )));
591
- break ;
592
- case XACT_EVENT_PARALLEL_COMMIT :
593
- case XACT_EVENT_COMMIT :
594
- case XACT_EVENT_PREPARE :
595
- /* Pre-commit should have closed the open transaction */
596
- elog (ERROR , "missed cleaning up connection during pre-commit" );
597
- break ;
598
621
case XACT_EVENT_PARALLEL_ABORT :
599
622
case XACT_EVENT_ABORT :
600
623
/* Assume we might have lost track of prepared statements */
@@ -631,21 +654,23 @@ pgfdw_xact_callback(XactEvent event, void *arg)
631
654
if (PQstatus (entry -> conn ) != CONNECTION_OK ||
632
655
PQtransactionStatus (entry -> conn ) != PQTRANS_IDLE )
633
656
{
634
- elog (DEBUG3 , "discarding connection %p" , entry -> conn );
657
+ elog (DEBUG3 , "discarding connection %p, conn status=%d, trans status=%d " , entry -> conn , PQstatus ( entry -> conn ), PQtransactionStatus ( entry -> conn ) );
635
658
PQfinish (entry -> conn );
636
659
entry -> conn = NULL ;
637
660
}
638
661
}
639
662
640
- /*
641
- * Regardless of the event type, we can now mark ourselves as out of the
642
- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
643
- * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
644
- */
645
- xact_got_connection = false;
646
-
647
- /* Also reset cursor numbering for next transaction */
648
- cursor_number = 0 ;
663
+ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT ) {
664
+ /*
665
+ * Regardless of the event type, we can now mark ourselves as out of the
666
+ * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
667
+ * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
668
+ */
669
+ xact_got_connection = false;
670
+
671
+ /* Also reset cursor numbering for next transaction */
672
+ cursor_number = 0 ;
673
+ }
649
674
}
650
675
651
676
/*
0 commit comments