@@ -58,6 +58,7 @@ typedef struct ConnCacheEntry
58
58
/* Remaining fields are invalid when conn is NULL: */
59
59
int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
60
60
* one level of subxact open, etc */
61
+ bool xact_read_only ; /* xact r/o state */
61
62
bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
62
63
bool have_error ; /* have any subxacts aborted in this xact? */
63
64
bool changing_xact_state ; /* xact state change in process */
@@ -84,6 +85,12 @@ static unsigned int prep_stmt_number = 0;
84
85
/* tracks whether any work is needed in callback functions */
85
86
static bool xact_got_connection = false;
86
87
88
+ /*
89
+ * tracks the nesting level of the topmost read-only transaction determined
90
+ * by GetTopReadOnlyTransactionNestLevel()
91
+ */
92
+ static int top_read_only_level = 0 ;
93
+
87
94
/* custom wait event values, retrieved from shared memory */
88
95
static uint32 pgfdw_we_cleanup_result = 0 ;
89
96
static uint32 pgfdw_we_connect = 0 ;
@@ -372,6 +379,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
372
379
373
380
/* Reset all transient state fields, to be sure all are clean */
374
381
entry -> xact_depth = 0 ;
382
+ entry -> xact_read_only = false;
375
383
entry -> have_prep_stmt = false;
376
384
entry -> have_error = false;
377
385
entry -> changing_xact_state = false;
@@ -843,29 +851,81 @@ do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
843
851
* those scans. A disadvantage is that we can't provide sane emulation of
844
852
* READ COMMITTED behavior --- it would be nice if we had some other way to
845
853
* control which remote queries share a snapshot.
854
+ *
855
+ * Note also that we always start the remote transaction with the same
856
+ * read/write and deferrable properties as the local transaction, and start
857
+ * the remote subtransaction with the same read/write property as the local
858
+ * subtransaction.
846
859
*/
847
860
static void
848
861
begin_remote_xact (ConnCacheEntry * entry )
849
862
{
850
863
int curlevel = GetCurrentTransactionNestLevel ();
851
864
852
- /* Start main transaction if we haven't yet */
865
+ /*
866
+ * Set the nesting level of the topmost read-only transaction if the
867
+ * current transaction is read-only and we haven't yet. Once it's set,
868
+ * it's retained until that transaction is committed/aborted, and then
869
+ * reset (see pgfdw_xact_callback and pgfdw_subxact_callback).
870
+ */
871
+ if (XactReadOnly )
872
+ {
873
+ if (top_read_only_level == 0 )
874
+ top_read_only_level = GetTopReadOnlyTransactionNestLevel ();
875
+ Assert (top_read_only_level > 0 );
876
+ }
877
+ else
878
+ Assert (top_read_only_level == 0 );
879
+
880
+ /*
881
+ * Start main transaction if we haven't yet; otherwise, change the
882
+ * already-started remote transaction/subtransaction to read-only if the
883
+ * local transaction/subtransaction have been done so after starting them
884
+ * and we haven't yet.
885
+ */
853
886
if (entry -> xact_depth <= 0 )
854
887
{
855
- const char * sql ;
888
+ StringInfoData sql ;
889
+ bool ro = (top_read_only_level == 1 );
856
890
857
891
elog (DEBUG3 , "starting remote transaction on connection %p" ,
858
892
entry -> conn );
859
893
894
+ initStringInfo (& sql );
895
+ appendStringInfoString (& sql , "START TRANSACTION ISOLATION LEVEL " );
860
896
if (IsolationIsSerializable ())
861
- sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE" ;
897
+ appendStringInfoString ( & sql , " SERIALIZABLE") ;
862
898
else
863
- sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ" ;
899
+ appendStringInfoString (& sql , "REPEATABLE READ" );
900
+ if (ro )
901
+ appendStringInfoString (& sql , " READ ONLY" );
902
+ if (XactDeferrable )
903
+ appendStringInfoString (& sql , " DEFERRABLE" );
864
904
entry -> changing_xact_state = true;
865
- do_sql_command (entry -> conn , sql );
905
+ do_sql_command (entry -> conn , sql . data );
866
906
entry -> xact_depth = 1 ;
907
+ if (ro )
908
+ {
909
+ Assert (!entry -> xact_read_only );
910
+ entry -> xact_read_only = true;
911
+ }
867
912
entry -> changing_xact_state = false;
868
913
}
914
+ else if (!entry -> xact_read_only )
915
+ {
916
+ Assert (top_read_only_level == 0 ||
917
+ entry -> xact_depth <= top_read_only_level );
918
+ if (entry -> xact_depth == top_read_only_level )
919
+ {
920
+ entry -> changing_xact_state = true;
921
+ do_sql_command (entry -> conn , "SET transaction_read_only = on" );
922
+ entry -> xact_read_only = true;
923
+ entry -> changing_xact_state = false;
924
+ }
925
+ }
926
+ else
927
+ Assert (top_read_only_level > 0 &&
928
+ entry -> xact_depth >= top_read_only_level );
869
929
870
930
/*
871
931
* If we're in a subtransaction, stack up savepoints to match our level.
@@ -874,12 +934,21 @@ begin_remote_xact(ConnCacheEntry *entry)
874
934
*/
875
935
while (entry -> xact_depth < curlevel )
876
936
{
877
- char sql [64 ];
937
+ StringInfoData sql ;
938
+ bool ro = (entry -> xact_depth + 1 == top_read_only_level );
878
939
879
- snprintf (sql , sizeof (sql ), "SAVEPOINT s%d" , entry -> xact_depth + 1 );
940
+ initStringInfo (& sql );
941
+ appendStringInfo (& sql , "SAVEPOINT s%d" , entry -> xact_depth + 1 );
942
+ if (ro )
943
+ appendStringInfoString (& sql , "; SET transaction_read_only = on" );
880
944
entry -> changing_xact_state = true;
881
- do_sql_command (entry -> conn , sql );
945
+ do_sql_command (entry -> conn , sql . data );
882
946
entry -> xact_depth ++ ;
947
+ if (ro )
948
+ {
949
+ Assert (!entry -> xact_read_only );
950
+ entry -> xact_read_only = true;
951
+ }
883
952
entry -> changing_xact_state = false;
884
953
}
885
954
}
@@ -1174,6 +1243,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
1174
1243
1175
1244
/* Also reset cursor numbering for next transaction */
1176
1245
cursor_number = 0 ;
1246
+
1247
+ /* Likewise for top_read_only_level */
1248
+ top_read_only_level = 0 ;
1177
1249
}
1178
1250
1179
1251
/*
@@ -1272,6 +1344,10 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1272
1344
false);
1273
1345
}
1274
1346
}
1347
+
1348
+ /* If in the topmost read-only transaction, reset top_read_only_level */
1349
+ if (curlevel == top_read_only_level )
1350
+ top_read_only_level = 0 ;
1275
1351
}
1276
1352
1277
1353
/*
@@ -1374,6 +1450,9 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1374
1450
/* Reset state to show we're out of a transaction */
1375
1451
entry -> xact_depth = 0 ;
1376
1452
1453
+ /* Reset xact r/o state */
1454
+ entry -> xact_read_only = false;
1455
+
1377
1456
/*
1378
1457
* If the connection isn't in a good idle state, it is marked as
1379
1458
* invalid or keep_connections option of its server is disabled, then
@@ -1394,6 +1473,10 @@ pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1394
1473
{
1395
1474
/* Reset state to show we're out of a subtransaction */
1396
1475
entry -> xact_depth -- ;
1476
+
1477
+ /* If in the topmost read-only transaction, reset xact r/o state */
1478
+ if (entry -> xact_depth + 1 == top_read_only_level )
1479
+ entry -> xact_read_only = false;
1397
1480
}
1398
1481
}
1399
1482
0 commit comments