@@ -64,7 +64,7 @@ static bool xact_got_connection = false;
64
64
65
65
typedef long long csn_t ;
66
66
static csn_t currentGlobalTransactionId = 0 ;
67
- static int currentLocalTransactionId = 0 ;
67
+ static int currentLocalTransactionId = 0 ;
68
68
69
69
/* prototypes of private functions */
70
70
static PGconn * connect_pg_server (ForeignServer * server , UserMapping * user );
@@ -367,8 +367,10 @@ do_sql_command(PGconn *conn, const char *sql)
367
367
static void
368
368
do_sql_send_command (PGconn * conn , const char * sql )
369
369
{
370
- if (PQsendQuery (conn , sql ) != PGRES_COMMAND_OK ) {
371
- PGresult * res = PQgetResult (conn );
370
+ if (PQsendQuery (conn , sql ) != PGRES_COMMAND_OK )
371
+ {
372
+ PGresult * res = PQgetResult (conn );
373
+
372
374
elog (WARNING , "Failed to send command %s" , sql );
373
375
pgfdw_report_error (ERROR , res , conn , true, sql );
374
376
PQclear (res );
@@ -378,8 +380,10 @@ do_sql_send_command(PGconn *conn, const char *sql)
378
380
static void
379
381
do_sql_wait_command (PGconn * conn , const char * sql )
380
382
{
381
- PGresult * res ;
382
- while ((res = PQgetResult (conn )) != NULL ) {
383
+ PGresult * res ;
384
+
385
+ while ((res = PQgetResult (conn )) != NULL )
386
+ {
383
387
if (PQresultStatus (res ) != PGRES_COMMAND_OK )
384
388
pgfdw_report_error (ERROR , res , conn , true, sql );
385
389
PQclear (res );
@@ -410,9 +414,10 @@ begin_remote_xact(ConnCacheEntry *entry)
410
414
elog (DEBUG3 , "starting remote transaction on connection %p" ,
411
415
entry -> conn );
412
416
413
- if (TransactionIdIsValid (gxid )) {
414
- char stmt [64 ];
415
- PGresult * res ;
417
+ if (TransactionIdIsValid (gxid ))
418
+ {
419
+ char stmt [64 ];
420
+ PGresult * res ;
416
421
417
422
snprintf (stmt , sizeof (stmt ), "select public.dtm_join_transaction(%d)" , gxid );
418
423
res = PQexec (entry -> conn , stmt );
@@ -425,14 +430,15 @@ begin_remote_xact(ConnCacheEntry *entry)
425
430
sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ" ;
426
431
do_sql_command (entry -> conn , sql );
427
432
entry -> xact_depth = 1 ;
428
- if (UseTsDtmTransactions )
433
+ if (UseTsDtmTransactions )
429
434
{
430
435
if (!currentGlobalTransactionId )
431
436
{
432
- PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_extend('%d.%d')" ,
433
- MyProcPid , ++ currentLocalTransactionId ));
434
- char * resp ;
435
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
437
+ PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_extend('%d.%d')" ,
438
+ MyProcPid , ++ currentLocalTransactionId ));
439
+ char * resp ;
440
+
441
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
436
442
{
437
443
pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
438
444
}
@@ -442,13 +448,16 @@ begin_remote_xact(ConnCacheEntry *entry)
442
448
pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
443
449
}
444
450
PQclear (res );
445
- } else {
446
- PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_access(%llu, '%d.%d')" , currentGlobalTransactionId , MyProcPid , currentLocalTransactionId ));
447
- if (PQresultStatus (res ) != PGRES_TUPLES_OK )
451
+ }
452
+ else
453
+ {
454
+ PGresult * res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_access(%llu, '%d.%d')" , currentGlobalTransactionId , MyProcPid , currentLocalTransactionId ));
455
+
456
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
448
457
{
449
458
pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
450
459
}
451
- PQclear (res );
460
+ PQclear (res );
452
461
}
453
462
}
454
463
}
@@ -576,13 +585,14 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
576
585
PQclear (res );
577
586
}
578
587
579
- typedef bool (* DtmCommandResultHandler )(PGresult * result , void * arg );
588
+ typedef bool (* DtmCommandResultHandler ) (PGresult * result , void * arg );
580
589
581
- static bool RunDtmStatement (char const * sql , unsigned expectedStatus , DtmCommandResultHandler handler , void * arg )
590
+ static bool
591
+ RunDtmStatement (char const * sql , unsigned expectedStatus , DtmCommandResultHandler handler , void * arg )
582
592
{
583
593
HASH_SEQ_STATUS scan ;
584
594
ConnCacheEntry * entry ;
585
- bool allOk = true;
595
+ bool allOk = true;
586
596
587
597
hash_seq_init (& scan , ConnectionHash );
588
598
while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
@@ -598,40 +608,47 @@ static bool RunDtmStatement(char const* sql, unsigned expectedStatus, DtmCommand
598
608
{
599
609
if (entry -> xact_depth > 0 )
600
610
{
601
- PGresult * result = PQgetResult (entry -> conn );
611
+ PGresult * result = PQgetResult (entry -> conn );
612
+
602
613
if (PQresultStatus (result ) != expectedStatus || (handler && !handler (result , arg )))
603
614
{
604
615
elog (WARNING , "Failed command %s: status=%d, expected status=%d" , sql , PQresultStatus (result ), expectedStatus );
605
616
pgfdw_report_error (ERROR , result , entry -> conn , true, sql );
606
617
allOk = false;
607
618
}
608
619
PQclear (result );
609
- PQgetResult (entry -> conn ); /* consume NULL result */
620
+ PQgetResult (entry -> conn ); /* consume NULL result */
610
621
}
611
622
}
612
623
return allOk ;
613
624
}
614
625
615
- static bool RunDtmCommand (char const * sql )
626
+ static bool
627
+ RunDtmCommand (char const * sql )
616
628
{
617
629
return RunDtmStatement (sql , PGRES_COMMAND_OK , NULL , NULL );
618
630
}
619
631
620
- static bool RunDtmFunction (char const * sql )
632
+ static bool
633
+ RunDtmFunction (char const * sql )
621
634
{
622
635
return RunDtmStatement (sql , PGRES_TUPLES_OK , NULL , NULL );
623
636
}
624
637
625
638
626
- static bool DtmMaxCSN (PGresult * result , void * arg )
639
+ static bool
640
+ DtmMaxCSN (PGresult * result , void * arg )
627
641
{
628
- char * resp = PQgetvalue (result , 0 , 0 );
629
- csn_t * maxCSN = (csn_t * )arg ;
630
- csn_t csn = 0 ;
642
+ char * resp = PQgetvalue (result , 0 , 0 );
643
+ csn_t * maxCSN = (csn_t * ) arg ;
644
+ csn_t csn = 0 ;
645
+
631
646
if (resp == NULL || (* resp ) == '\0' || sscanf (resp , "%lld" , & csn ) != 1 )
632
647
{
633
648
return false;
634
- } else {
649
+ }
650
+ else
651
+ {
635
652
if (* maxCSN < csn )
636
653
{
637
654
* maxCSN = csn ;
@@ -657,35 +674,36 @@ pgfdw_xact_callback(XactEvent event, void *arg)
657
674
{
658
675
switch (event )
659
676
{
660
- case XACT_EVENT_PARALLEL_PRE_COMMIT :
661
- case XACT_EVENT_PRE_COMMIT :
662
- {
663
- csn_t maxCSN = 0 ;
664
-
665
- if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" ,
666
- MyProcPid , currentLocalTransactionId )) ||
667
- !RunDtmFunction (psprintf ("SELECT public.dtm_begin_prepare('%d.%d')" ,
668
- MyProcPid , currentLocalTransactionId )) ||
669
- !RunDtmStatement (psprintf ("SELECT public.dtm_prepare('%d.%d',0)" ,
670
- MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
671
- !RunDtmFunction (psprintf ("SELECT public.dtm_end_prepare('%d.%d',%lld)" ,
672
- MyProcPid , currentLocalTransactionId , maxCSN )) ||
673
- !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" ,
674
- MyProcPid , currentLocalTransactionId )))
675
- {
676
- RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" ,
677
- MyProcPid , currentLocalTransactionId ));
678
- ereport (ERROR ,
679
- (errcode (ERRCODE_TRANSACTION_ROLLBACK ),
680
- errmsg ("transaction was aborted at one of the shards" )));
681
- break ;
682
- }
683
- return ;
684
- }
685
- default :
686
- break ;
677
+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
678
+ case XACT_EVENT_PRE_COMMIT :
679
+ {
680
+ csn_t maxCSN = 0 ;
681
+
682
+ if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" ,
683
+ MyProcPid , currentLocalTransactionId )) ||
684
+ !RunDtmFunction (psprintf ("SELECT public.dtm_begin_prepare('%d.%d')" ,
685
+ MyProcPid , currentLocalTransactionId )) ||
686
+ !RunDtmStatement (psprintf ("SELECT public.dtm_prepare('%d.%d',0)" ,
687
+ MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
688
+ !RunDtmFunction (psprintf ("SELECT public.dtm_end_prepare('%d.%d',%lld)" ,
689
+ MyProcPid , currentLocalTransactionId , maxCSN )) ||
690
+ !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" ,
691
+ MyProcPid , currentLocalTransactionId )))
692
+ {
693
+ RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" ,
694
+ MyProcPid , currentLocalTransactionId ));
695
+ ereport (ERROR ,
696
+ (errcode (ERRCODE_TRANSACTION_ROLLBACK ),
697
+ errmsg ("transaction was aborted at one of the shards" )));
698
+ break ;
699
+ }
700
+ return ;
701
+ }
702
+ default :
703
+ break ;
687
704
}
688
705
}
706
+
689
707
/*
690
708
* Scan all connection cache entries to find open remote transactions, and
691
709
* close them.
@@ -694,27 +712,28 @@ pgfdw_xact_callback(XactEvent event, void *arg)
694
712
while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
695
713
{
696
714
PGresult * res ;
697
-
715
+
698
716
/* Ignore cache entry if no open connection right now */
699
717
if (entry -> conn == NULL )
700
718
continue ;
701
-
719
+
702
720
/* If it has an open remote transaction, try to close it */
703
721
if (entry -> xact_depth > 0 )
704
722
{
705
723
elog (DEBUG3 , "closing remote transaction on connection %p event %d" ,
706
724
entry -> conn , event );
707
-
725
+
708
726
switch (event )
709
727
{
710
- case XACT_EVENT_PARALLEL_PRE_COMMIT :
711
- case XACT_EVENT_PRE_COMMIT :
712
- /* Commit all remote transactions during pre-commit */
728
+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
729
+ case XACT_EVENT_PRE_COMMIT :
730
+ /* Commit all remote transactions during pre-commit */
713
731
do_sql_send_command (entry -> conn , "COMMIT TRANSACTION" );
714
732
continue ;
715
-
716
- case XACT_EVENT_PRE_PREPARE :
717
- /*
733
+
734
+ case XACT_EVENT_PRE_PREPARE :
735
+
736
+ /*
718
737
* We disallow remote transactions that modified anything,
719
738
* since it's not very reasonable to hold them open until
720
739
* the prepared transaction is committed. For the moment,
@@ -723,18 +742,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
723
742
* come right back here with event == XACT_EVENT_ABORT, so
724
743
* we'll clean up the connection state at that point.
725
744
*/
726
- ereport (ERROR ,
745
+ ereport (ERROR ,
727
746
(errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
728
747
errmsg ("cannot prepare a transaction that modified remote tables" )));
729
748
break ;
730
749
731
- case XACT_EVENT_PARALLEL_COMMIT :
750
+ case XACT_EVENT_PARALLEL_COMMIT :
732
751
case XACT_EVENT_COMMIT :
733
- case XACT_EVENT_PREPARE :
734
- if (!currentGlobalTransactionId )
752
+ case XACT_EVENT_PREPARE :
753
+ if (!currentGlobalTransactionId )
735
754
{
736
755
do_sql_wait_command (entry -> conn , "COMMIT TRANSACTION" );
737
756
}
757
+
738
758
/*
739
759
* If there were any errors in subtransactions, and we
740
760
* made prepared statements, do a DEALLOCATE ALL to make
@@ -758,11 +778,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
758
778
entry -> have_prep_stmt = false;
759
779
entry -> have_error = false;
760
780
break ;
761
-
762
- case XACT_EVENT_PARALLEL_ABORT :
763
- case XACT_EVENT_ABORT :
764
- /* Assume we might have lost track of prepared statements */
765
- entry -> have_error = true;
781
+
782
+ case XACT_EVENT_PARALLEL_ABORT :
783
+ case XACT_EVENT_ABORT :
784
+ /* Assume we might have lost track of prepared statements */
785
+ entry -> have_error = true;
766
786
/* If we're aborting, abort all remote transactions too */
767
787
res = PQexec (entry -> conn , "ABORT TRANSACTION" );
768
788
/* Note: can't throw ERROR, it would be infinite loop */
@@ -782,11 +802,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
782
802
entry -> have_error = false;
783
803
}
784
804
break ;
785
-
786
- case XACT_EVENT_START :
787
- case XACT_EVENT_ABORT_PREPARED :
788
- case XACT_EVENT_COMMIT_PREPARED :
789
- break ;
805
+
806
+ case XACT_EVENT_START :
807
+ case XACT_EVENT_ABORT_PREPARED :
808
+ case XACT_EVENT_COMMIT_PREPARED :
809
+ break ;
790
810
}
791
811
}
792
812
/* Reset state to show we're out of a transaction */
@@ -804,11 +824,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
804
824
entry -> conn = NULL ;
805
825
}
806
826
}
807
- if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT ) {
827
+ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT )
828
+ {
808
829
/*
809
- * Regardless of the event type, we can now mark ourselves as out of the
810
- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
811
- * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
830
+ * Regardless of the event type, we can now mark ourselves as out of
831
+ * the transaction. (Note: if we are here during PRE_COMMIT or
832
+ * PRE_PREPARE, this saves a useless scan of the hashtable during
833
+ * COMMIT or PREPARE.)
812
834
*/
813
835
xact_got_connection = false;
814
836
0 commit comments