@@ -108,6 +108,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
108
108
#define DefaultOption ""
109
109
#define DefaultAuthtype ""
110
110
#define DefaultPassword ""
111
+ #define DefaultTargetSessionAttrs "any"
111
112
#ifdef USE_SSL
112
113
#define DefaultSSLMode "prefer"
113
114
#else
@@ -300,6 +301,11 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
300
301
"Replication" , "D" , 5 ,
301
302
offsetof(struct pg_conn , replication )},
302
303
304
+ {"target_session_attrs" , "PGTARGETSESSIONATTRS" ,
305
+ DefaultTargetSessionAttrs , NULL ,
306
+ "Target-Session-Attrs" , "" , 11 , /* sizeof("read-write") = 11 */
307
+ offsetof(struct pg_conn , target_session_attrs )},
308
+
303
309
/* Terminating entry --- MUST BE LAST */
304
310
{NULL , NULL , NULL , NULL ,
305
311
NULL , NULL , 0 }
@@ -336,6 +342,8 @@ static PGconn *makeEmptyPGconn(void);
336
342
static bool fillPGconn (PGconn * conn , PQconninfoOption * connOptions );
337
343
static void freePGconn (PGconn * conn );
338
344
static void closePGconn (PGconn * conn );
345
+ static void release_all_addrinfo (PGconn * conn );
346
+ static void sendTerminateConn (PGconn * conn );
339
347
static PQconninfoOption * conninfo_init (PQExpBuffer errorMessage );
340
348
static PQconninfoOption * parse_connection_string (const char * conninfo ,
341
349
PQExpBuffer errorMessage , bool use_defaults );
@@ -1025,6 +1033,22 @@ connectOptions2(PGconn *conn)
1025
1033
goto oom_error ;
1026
1034
}
1027
1035
1036
+ /*
1037
+ * Validate target_session_attrs option.
1038
+ */
1039
+ if (conn -> target_session_attrs )
1040
+ {
1041
+ if (strcmp (conn -> target_session_attrs , "any" ) != 0
1042
+ && strcmp (conn -> target_session_attrs , "read-write" ) != 0 )
1043
+ {
1044
+ conn -> status = CONNECTION_BAD ;
1045
+ printfPQExpBuffer (& conn -> errorMessage ,
1046
+ libpq_gettext ("invalid target_session_attrs value: \"%s\"\n" ),
1047
+ conn -> target_session_attrs );
1048
+ return false;
1049
+ }
1050
+ }
1051
+
1028
1052
/*
1029
1053
* Only if we get this far is it appropriate to try to connect. (We need a
1030
1054
* state flag, rather than just the boolean result of this function, in
@@ -1814,6 +1838,7 @@ PQconnectPoll(PGconn *conn)
1814
1838
/* Special cases: proceed without waiting. */
1815
1839
case CONNECTION_SSL_STARTUP :
1816
1840
case CONNECTION_NEEDED :
1841
+ case CONNECTION_CHECK_WRITABLE :
1817
1842
break ;
1818
1843
1819
1844
default :
@@ -2752,27 +2777,6 @@ PQconnectPoll(PGconn *conn)
2752
2777
goto error_return ;
2753
2778
}
2754
2779
2755
- /* We can release the address lists now. */
2756
- if (conn -> connhost != NULL )
2757
- {
2758
- int i ;
2759
-
2760
- for (i = 0 ; i < conn -> nconnhost ; ++ i )
2761
- {
2762
- int family = AF_UNSPEC ;
2763
-
2764
- #ifdef HAVE_UNIX_SOCKETS
2765
- if (conn -> connhost [i ].type == CHT_UNIX_SOCKET )
2766
- family = AF_UNIX ;
2767
- #endif
2768
-
2769
- pg_freeaddrinfo_all (family ,
2770
- conn -> connhost [i ].addrlist );
2771
- conn -> connhost [i ].addrlist = NULL ;
2772
- }
2773
- }
2774
- conn -> addr_cur = NULL ;
2775
-
2776
2780
/* Fire up post-connection housekeeping if needed */
2777
2781
if (PG_PROTOCOL_MAJOR (conn -> pversion ) < 3 )
2778
2782
{
@@ -2782,7 +2786,24 @@ PQconnectPoll(PGconn *conn)
2782
2786
return PGRES_POLLING_WRITING ;
2783
2787
}
2784
2788
2785
- /* Otherwise, we are open for business! */
2789
+ /*
2790
+ * If a read-write connection is required, see if we have one.
2791
+ */
2792
+ if (conn -> target_session_attrs != NULL &&
2793
+ strcmp (conn -> target_session_attrs , "read-write" ) == 0 )
2794
+ {
2795
+ conn -> status = CONNECTION_OK ;
2796
+ if (!PQsendQuery (conn ,
2797
+ "show transaction_read_only" ))
2798
+ goto error_return ;
2799
+ conn -> status = CONNECTION_CHECK_WRITABLE ;
2800
+ return PGRES_POLLING_READING ;
2801
+ }
2802
+
2803
+ /* We can release the address lists now. */
2804
+ release_all_addrinfo (conn );
2805
+
2806
+ /* We are open for business! */
2786
2807
conn -> status = CONNECTION_OK ;
2787
2808
return PGRES_POLLING_OK ;
2788
2809
}
@@ -2814,10 +2835,109 @@ PQconnectPoll(PGconn *conn)
2814
2835
goto error_return ;
2815
2836
}
2816
2837
2838
+ /*
2839
+ * If a read-write connection is requisted check for same.
2840
+ */
2841
+ if (conn -> target_session_attrs != NULL &&
2842
+ strcmp (conn -> target_session_attrs , "read-write" ) == 0 )
2843
+ {
2844
+ conn -> status = CONNECTION_OK ;
2845
+ if (!PQsendQuery (conn ,
2846
+ "show transaction_read_only" ))
2847
+ goto error_return ;
2848
+ conn -> status = CONNECTION_CHECK_WRITABLE ;
2849
+ return PGRES_POLLING_READING ;
2850
+ }
2851
+
2852
+ /* We can release the address lists now. */
2853
+ release_all_addrinfo (conn );
2854
+
2817
2855
/* We are open for business! */
2818
2856
conn -> status = CONNECTION_OK ;
2819
2857
return PGRES_POLLING_OK ;
2820
2858
2859
+ case CONNECTION_CHECK_WRITABLE :
2860
+ {
2861
+ conn -> status = CONNECTION_OK ;
2862
+ if (!PQconsumeInput (conn ))
2863
+ goto error_return ;
2864
+
2865
+ if (PQisBusy (conn ))
2866
+ {
2867
+ conn -> status = CONNECTION_CHECK_WRITABLE ;
2868
+ return PGRES_POLLING_READING ;
2869
+ }
2870
+
2871
+ res = PQgetResult (conn );
2872
+ if (res && (PQresultStatus (res ) == PGRES_TUPLES_OK ) &&
2873
+ PQntuples (res ) == 1 )
2874
+ {
2875
+ char * val ;
2876
+
2877
+ val = PQgetvalue (res , 0 , 0 );
2878
+ if (strncmp (val , "on" , 2 ) == 0 )
2879
+ {
2880
+ PQclear (res );
2881
+
2882
+ /* Not writable; close connection. */
2883
+ appendPQExpBuffer (& conn -> errorMessage ,
2884
+ libpq_gettext ("could not make a writable "
2885
+ "connection to server "
2886
+ "\"%s:%s\"\n" ),
2887
+ conn -> connhost [conn -> whichhost ].host ,
2888
+ conn -> connhost [conn -> whichhost ].port );
2889
+ conn -> status = CONNECTION_OK ;
2890
+ sendTerminateConn (conn );
2891
+ pqDropConnection (conn , true);
2892
+
2893
+ /* Skip any remaining addresses for this host. */
2894
+ conn -> addr_cur = NULL ;
2895
+ if (conn -> whichhost + 1 < conn -> nconnhost )
2896
+ {
2897
+ conn -> status = CONNECTION_NEEDED ;
2898
+ goto keep_going ;
2899
+ }
2900
+
2901
+ /* No more addresses to try. So we fail. */
2902
+ goto error_return ;
2903
+ }
2904
+ PQclear (res );
2905
+
2906
+ /* We can release the address lists now. */
2907
+ release_all_addrinfo (conn );
2908
+
2909
+ /* We are open for business! */
2910
+ conn -> status = CONNECTION_OK ;
2911
+ return PGRES_POLLING_OK ;
2912
+ }
2913
+
2914
+ /*
2915
+ * Something went wrong with "show transaction_read_only". We
2916
+ * should try next addresses.
2917
+ */
2918
+ if (res )
2919
+ PQclear (res );
2920
+ appendPQExpBuffer (& conn -> errorMessage ,
2921
+ libpq_gettext ("test \"show transaction_read_only\" failed "
2922
+ " on \"%s:%s\" \n" ),
2923
+ conn -> connhost [conn -> whichhost ].host ,
2924
+ conn -> connhost [conn -> whichhost ].port );
2925
+ conn -> status = CONNECTION_OK ;
2926
+ sendTerminateConn (conn );
2927
+ pqDropConnection (conn , true);
2928
+
2929
+ if (conn -> addr_cur -> ai_next != NULL ||
2930
+ conn -> whichhost + 1 < conn -> nconnhost )
2931
+ {
2932
+ conn -> addr_cur = conn -> addr_cur -> ai_next ;
2933
+ conn -> status = CONNECTION_NEEDED ;
2934
+ goto keep_going ;
2935
+ }
2936
+
2937
+ /* No more addresses to try. So we fail. */
2938
+ goto error_return ;
2939
+ }
2940
+
2821
2941
default :
2822
2942
appendPQExpBuffer (& conn -> errorMessage ,
2823
2943
libpq_gettext ("invalid connection state %d, "
@@ -3109,6 +3229,8 @@ freePGconn(PGconn *conn)
3109
3229
free (conn -> outBuffer );
3110
3230
if (conn -> rowBuf )
3111
3231
free (conn -> rowBuf );
3232
+ if (conn -> target_session_attrs )
3233
+ free (conn -> target_session_attrs );
3112
3234
termPQExpBuffer (& conn -> errorMessage );
3113
3235
termPQExpBuffer (& conn -> workBuffer );
3114
3236
@@ -3120,19 +3242,41 @@ freePGconn(PGconn *conn)
3120
3242
}
3121
3243
3122
3244
/*
3123
- * closePGconn
3124
- * - properly close a connection to the backend
3125
- *
3126
- * This should reset or release all transient state, but NOT the connection
3127
- * parameters. On exit, the PGconn should be in condition to start a fresh
3128
- * connection with the same parameters (see PQreset()).
3245
+ * release_all_addrinfo
3246
+ * - free addrinfo of all hostconn elements.
3129
3247
*/
3248
+
3130
3249
static void
3131
- closePGconn (PGconn * conn )
3250
+ release_all_addrinfo (PGconn * conn )
3132
3251
{
3133
- PGnotify * notify ;
3134
- pgParameterStatus * pstatus ;
3252
+ if (conn -> connhost != NULL )
3253
+ {
3254
+ int i ;
3255
+
3256
+ for (i = 0 ; i < conn -> nconnhost ; ++ i )
3257
+ {
3258
+ int family = AF_UNSPEC ;
3259
+
3260
+ #ifdef HAVE_UNIX_SOCKETS
3261
+ if (conn -> connhost [i ].type == CHT_UNIX_SOCKET )
3262
+ family = AF_UNIX ;
3263
+ #endif
3135
3264
3265
+ pg_freeaddrinfo_all (family ,
3266
+ conn -> connhost [i ].addrlist );
3267
+ conn -> connhost [i ].addrlist = NULL ;
3268
+ }
3269
+ }
3270
+ conn -> addr_cur = NULL ;
3271
+ }
3272
+
3273
+ /*
3274
+ * sendTerminateConn
3275
+ * - Send a terminate message to backend.
3276
+ */
3277
+ static void
3278
+ sendTerminateConn (PGconn * conn )
3279
+ {
3136
3280
/*
3137
3281
* Note that the protocol doesn't allow us to send Terminate messages
3138
3282
* during the startup phase.
@@ -3147,6 +3291,23 @@ closePGconn(PGconn *conn)
3147
3291
pqPutMsgEnd (conn );
3148
3292
(void ) pqFlush (conn );
3149
3293
}
3294
+ }
3295
+
3296
+ /*
3297
+ * closePGconn
3298
+ * - properly close a connection to the backend
3299
+ *
3300
+ * This should reset or release all transient state, but NOT the connection
3301
+ * parameters. On exit, the PGconn should be in condition to start a fresh
3302
+ * connection with the same parameters (see PQreset()).
3303
+ */
3304
+ static void
3305
+ closePGconn (PGconn * conn )
3306
+ {
3307
+ PGnotify * notify ;
3308
+ pgParameterStatus * pstatus ;
3309
+
3310
+ sendTerminateConn (conn );
3150
3311
3151
3312
/*
3152
3313
* Must reset the blocking status so a possible reconnect will work.
@@ -3165,25 +3326,8 @@ closePGconn(PGconn *conn)
3165
3326
conn -> asyncStatus = PGASYNC_IDLE ;
3166
3327
pqClearAsyncResult (conn ); /* deallocate result */
3167
3328
resetPQExpBuffer (& conn -> errorMessage );
3168
- if (conn -> connhost != NULL )
3169
- {
3170
- int i ;
3171
-
3172
- for (i = 0 ; i < conn -> nconnhost ; ++ i )
3173
- {
3174
- int family = AF_UNSPEC ;
3175
-
3176
- #ifdef HAVE_UNIX_SOCKETS
3177
- if (conn -> connhost [i ].type == CHT_UNIX_SOCKET )
3178
- family = AF_UNIX ;
3179
- #endif
3329
+ release_all_addrinfo (conn );
3180
3330
3181
- pg_freeaddrinfo_all (family ,
3182
- conn -> connhost [i ].addrlist );
3183
- conn -> connhost [i ].addrlist = NULL ;
3184
- }
3185
- }
3186
- conn -> addr_cur = NULL ;
3187
3331
notify = conn -> notifyHead ;
3188
3332
while (notify != NULL )
3189
3333
{
0 commit comments