Skip to content

Commit 721f7bd

Browse files
committed
libpq: Add target_session_attrs parameter.
Commit 274bb2b made it possible to specify multiple IPs in a connection string, but that's not good enough for the case where you have a read-write master and a bunch of read-only standbys and want to connect to whichever server is the master at the current time. This commit allows that, by making it possible to specify target_session_attrs=read-write as a connection parameter. There was extensive discussion of the best name for the connection parameter and its values as well as the best way to distinguish master and standbys. For now, adopt the same solution as JDBC: if the user wants a read-write connection, issue 'show transaction_read_only' and rejection the connection if the result is 'on'. In the future, we could add additional values of this new target_session_attrs parameter that issue different queries; or we might have some way of distinguishing the server type without resorting to an SQL query; but right now, we have this, and that's (hopefully) a good start. Victor Wagner and Mithun Cy. Design review by Álvaro Herrera, Catalin Iacob, Takayuki Tsunakawa, and Craig Ringer; code review by me. I changed Mithun's patch to skip all remaining IPs for a host if we reject a connection based on this new parameter, rewrote the documentation, and did some other cosmetic cleanup. Discussion: http://postgr.es/m/CAD__OuhqPRGpcsfwPHz_PDqAGkoqS1UvnUnOnAB-LBWBW=wu4A@mail.gmail.com
1 parent 4fafa57 commit 721f7bd

File tree

4 files changed

+227
-51
lines changed

4 files changed

+227
-51
lines changed

doc/src/sgml/libpq.sgml

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,7 @@ postgresql://localhost/mydb
811811
postgresql://user@localhost
812812
postgresql://user:secret@localhost
813813
postgresql://other@localhost/otherdb?connect_timeout=10&application_name=myapp
814-
postgresql://host1:123,host2:456/somedb
814+
postgresql://host1:123,host2:456/somedb?target_session_attrs=any&application_name=myapp
815815
</programlisting>
816816
Components of the hierarchical part of the <acronym>URI</acronym> can also
817817
be given as parameters. For example:
@@ -1386,6 +1386,23 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
13861386
</para>
13871387
</listitem>
13881388
</varlistentry>
1389+
1390+
<varlistentry id="libpq-connect-target-session-attrs" xreflabel="target_session_attrs">
1391+
<term><literal>target_session_attrs</literal></term>
1392+
<listitem>
1393+
<para>
1394+
If this parameter is set to <literal>read-write</literal>, only a
1395+
connection in which read-write transactions are accepted by default
1396+
is considered acceptable. The query
1397+
<literal>show transaction_read_only</literal> will be sent upon any
1398+
successful connection; if it returns <literal>on</>, the connection
1399+
will be closed. If multiple hosts were specified in the connection
1400+
string, any remaining servers will be tried just as if the connection
1401+
attempt had failed. The default value of this parameter,
1402+
<literal>any</>, regards all connections as acceptable.
1403+
</para>
1404+
</listitem>
1405+
</varlistentry>
13891406
</variablelist>
13901407
</para>
13911408
</sect2>
@@ -7069,6 +7086,16 @@ myEventProc(PGEventId evtId, void *evtInfo, void *passThrough)
70697086
linkend="libpq-connect-client-encoding"> connection parameter.
70707087
</para>
70717088
</listitem>
7089+
7090+
<listitem>
7091+
<para>
7092+
<indexterm>
7093+
<primary><envar>PGTARGETSESSIONATTRS</envar></primary>
7094+
</indexterm>
7095+
<envar>PGTARGETSESSIONATTRS</envar> behaves the same as the <xref
7096+
linkend="libpq-connect-target-session-attrs"> connection parameter.
7097+
</para>
7098+
</listitem>
70727099
</itemizedlist>
70737100
</para>
70747101

src/interfaces/libpq/fe-connect.c

Lines changed: 193 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ static int ldapServiceLookup(const char *purl, PQconninfoOption *options,
108108
#define DefaultOption ""
109109
#define DefaultAuthtype ""
110110
#define DefaultPassword ""
111+
#define DefaultTargetSessionAttrs "any"
111112
#ifdef USE_SSL
112113
#define DefaultSSLMode "prefer"
113114
#else
@@ -300,6 +301,11 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
300301
"Replication", "D", 5,
301302
offsetof(struct pg_conn, replication)},
302303

304+
{"target_session_attrs", "PGTARGETSESSIONATTRS",
305+
DefaultTargetSessionAttrs, NULL,
306+
"Target-Session-Attrs", "", 11, /* sizeof("read-write") = 11 */
307+
offsetof(struct pg_conn, target_session_attrs)},
308+
303309
/* Terminating entry --- MUST BE LAST */
304310
{NULL, NULL, NULL, NULL,
305311
NULL, NULL, 0}
@@ -336,6 +342,8 @@ static PGconn *makeEmptyPGconn(void);
336342
static bool fillPGconn(PGconn *conn, PQconninfoOption *connOptions);
337343
static void freePGconn(PGconn *conn);
338344
static void closePGconn(PGconn *conn);
345+
static void release_all_addrinfo(PGconn *conn);
346+
static void sendTerminateConn(PGconn *conn);
339347
static PQconninfoOption *conninfo_init(PQExpBuffer errorMessage);
340348
static PQconninfoOption *parse_connection_string(const char *conninfo,
341349
PQExpBuffer errorMessage, bool use_defaults);
@@ -1025,6 +1033,22 @@ connectOptions2(PGconn *conn)
10251033
goto oom_error;
10261034
}
10271035

1036+
/*
1037+
* Validate target_session_attrs option.
1038+
*/
1039+
if (conn->target_session_attrs)
1040+
{
1041+
if (strcmp(conn->target_session_attrs, "any") != 0
1042+
&& strcmp(conn->target_session_attrs, "read-write") != 0)
1043+
{
1044+
conn->status = CONNECTION_BAD;
1045+
printfPQExpBuffer(&conn->errorMessage,
1046+
libpq_gettext("invalid target_session_attrs value: \"%s\"\n"),
1047+
conn->target_session_attrs);
1048+
return false;
1049+
}
1050+
}
1051+
10281052
/*
10291053
* Only if we get this far is it appropriate to try to connect. (We need a
10301054
* state flag, rather than just the boolean result of this function, in
@@ -1814,6 +1838,7 @@ PQconnectPoll(PGconn *conn)
18141838
/* Special cases: proceed without waiting. */
18151839
case CONNECTION_SSL_STARTUP:
18161840
case CONNECTION_NEEDED:
1841+
case CONNECTION_CHECK_WRITABLE:
18171842
break;
18181843

18191844
default:
@@ -2752,27 +2777,6 @@ PQconnectPoll(PGconn *conn)
27522777
goto error_return;
27532778
}
27542779

2755-
/* We can release the address lists now. */
2756-
if (conn->connhost != NULL)
2757-
{
2758-
int i;
2759-
2760-
for (i = 0; i < conn->nconnhost; ++i)
2761-
{
2762-
int family = AF_UNSPEC;
2763-
2764-
#ifdef HAVE_UNIX_SOCKETS
2765-
if (conn->connhost[i].type == CHT_UNIX_SOCKET)
2766-
family = AF_UNIX;
2767-
#endif
2768-
2769-
pg_freeaddrinfo_all(family,
2770-
conn->connhost[i].addrlist);
2771-
conn->connhost[i].addrlist = NULL;
2772-
}
2773-
}
2774-
conn->addr_cur = NULL;
2775-
27762780
/* Fire up post-connection housekeeping if needed */
27772781
if (PG_PROTOCOL_MAJOR(conn->pversion) < 3)
27782782
{
@@ -2782,7 +2786,24 @@ PQconnectPoll(PGconn *conn)
27822786
return PGRES_POLLING_WRITING;
27832787
}
27842788

2785-
/* Otherwise, we are open for business! */
2789+
/*
2790+
* If a read-write connection is required, see if we have one.
2791+
*/
2792+
if (conn->target_session_attrs != NULL &&
2793+
strcmp(conn->target_session_attrs, "read-write") == 0)
2794+
{
2795+
conn->status = CONNECTION_OK;
2796+
if (!PQsendQuery(conn,
2797+
"show transaction_read_only"))
2798+
goto error_return;
2799+
conn->status = CONNECTION_CHECK_WRITABLE;
2800+
return PGRES_POLLING_READING;
2801+
}
2802+
2803+
/* We can release the address lists now. */
2804+
release_all_addrinfo(conn);
2805+
2806+
/* We are open for business! */
27862807
conn->status = CONNECTION_OK;
27872808
return PGRES_POLLING_OK;
27882809
}
@@ -2814,10 +2835,109 @@ PQconnectPoll(PGconn *conn)
28142835
goto error_return;
28152836
}
28162837

2838+
/*
2839+
* If a read-write connection is requisted check for same.
2840+
*/
2841+
if (conn->target_session_attrs != NULL &&
2842+
strcmp(conn->target_session_attrs, "read-write") == 0)
2843+
{
2844+
conn->status = CONNECTION_OK;
2845+
if (!PQsendQuery(conn,
2846+
"show transaction_read_only"))
2847+
goto error_return;
2848+
conn->status = CONNECTION_CHECK_WRITABLE;
2849+
return PGRES_POLLING_READING;
2850+
}
2851+
2852+
/* We can release the address lists now. */
2853+
release_all_addrinfo(conn);
2854+
28172855
/* We are open for business! */
28182856
conn->status = CONNECTION_OK;
28192857
return PGRES_POLLING_OK;
28202858

2859+
case CONNECTION_CHECK_WRITABLE:
2860+
{
2861+
conn->status = CONNECTION_OK;
2862+
if (!PQconsumeInput(conn))
2863+
goto error_return;
2864+
2865+
if (PQisBusy(conn))
2866+
{
2867+
conn->status = CONNECTION_CHECK_WRITABLE;
2868+
return PGRES_POLLING_READING;
2869+
}
2870+
2871+
res = PQgetResult(conn);
2872+
if (res && (PQresultStatus(res) == PGRES_TUPLES_OK) &&
2873+
PQntuples(res) == 1)
2874+
{
2875+
char *val;
2876+
2877+
val = PQgetvalue(res, 0, 0);
2878+
if (strncmp(val, "on", 2) == 0)
2879+
{
2880+
PQclear(res);
2881+
2882+
/* Not writable; close connection. */
2883+
appendPQExpBuffer(&conn->errorMessage,
2884+
libpq_gettext("could not make a writable "
2885+
"connection to server "
2886+
"\"%s:%s\"\n"),
2887+
conn->connhost[conn->whichhost].host,
2888+
conn->connhost[conn->whichhost].port);
2889+
conn->status = CONNECTION_OK;
2890+
sendTerminateConn(conn);
2891+
pqDropConnection(conn, true);
2892+
2893+
/* Skip any remaining addresses for this host. */
2894+
conn->addr_cur = NULL;
2895+
if (conn->whichhost + 1 < conn->nconnhost)
2896+
{
2897+
conn->status = CONNECTION_NEEDED;
2898+
goto keep_going;
2899+
}
2900+
2901+
/* No more addresses to try. So we fail. */
2902+
goto error_return;
2903+
}
2904+
PQclear(res);
2905+
2906+
/* We can release the address lists now. */
2907+
release_all_addrinfo(conn);
2908+
2909+
/* We are open for business! */
2910+
conn->status = CONNECTION_OK;
2911+
return PGRES_POLLING_OK;
2912+
}
2913+
2914+
/*
2915+
* Something went wrong with "show transaction_read_only". We
2916+
* should try next addresses.
2917+
*/
2918+
if (res)
2919+
PQclear(res);
2920+
appendPQExpBuffer(&conn->errorMessage,
2921+
libpq_gettext("test \"show transaction_read_only\" failed "
2922+
" on \"%s:%s\" \n"),
2923+
conn->connhost[conn->whichhost].host,
2924+
conn->connhost[conn->whichhost].port);
2925+
conn->status = CONNECTION_OK;
2926+
sendTerminateConn(conn);
2927+
pqDropConnection(conn, true);
2928+
2929+
if (conn->addr_cur->ai_next != NULL ||
2930+
conn->whichhost + 1 < conn->nconnhost)
2931+
{
2932+
conn->addr_cur = conn->addr_cur->ai_next;
2933+
conn->status = CONNECTION_NEEDED;
2934+
goto keep_going;
2935+
}
2936+
2937+
/* No more addresses to try. So we fail. */
2938+
goto error_return;
2939+
}
2940+
28212941
default:
28222942
appendPQExpBuffer(&conn->errorMessage,
28232943
libpq_gettext("invalid connection state %d, "
@@ -3109,6 +3229,8 @@ freePGconn(PGconn *conn)
31093229
free(conn->outBuffer);
31103230
if (conn->rowBuf)
31113231
free(conn->rowBuf);
3232+
if (conn->target_session_attrs)
3233+
free(conn->target_session_attrs);
31123234
termPQExpBuffer(&conn->errorMessage);
31133235
termPQExpBuffer(&conn->workBuffer);
31143236

@@ -3120,19 +3242,41 @@ freePGconn(PGconn *conn)
31203242
}
31213243

31223244
/*
3123-
* closePGconn
3124-
* - properly close a connection to the backend
3125-
*
3126-
* This should reset or release all transient state, but NOT the connection
3127-
* parameters. On exit, the PGconn should be in condition to start a fresh
3128-
* connection with the same parameters (see PQreset()).
3245+
* release_all_addrinfo
3246+
* - free addrinfo of all hostconn elements.
31293247
*/
3248+
31303249
static void
3131-
closePGconn(PGconn *conn)
3250+
release_all_addrinfo(PGconn *conn)
31323251
{
3133-
PGnotify *notify;
3134-
pgParameterStatus *pstatus;
3252+
if (conn->connhost != NULL)
3253+
{
3254+
int i;
3255+
3256+
for (i = 0; i < conn->nconnhost; ++i)
3257+
{
3258+
int family = AF_UNSPEC;
3259+
3260+
#ifdef HAVE_UNIX_SOCKETS
3261+
if (conn->connhost[i].type == CHT_UNIX_SOCKET)
3262+
family = AF_UNIX;
3263+
#endif
31353264

3265+
pg_freeaddrinfo_all(family,
3266+
conn->connhost[i].addrlist);
3267+
conn->connhost[i].addrlist = NULL;
3268+
}
3269+
}
3270+
conn->addr_cur = NULL;
3271+
}
3272+
3273+
/*
3274+
* sendTerminateConn
3275+
* - Send a terminate message to backend.
3276+
*/
3277+
static void
3278+
sendTerminateConn(PGconn *conn)
3279+
{
31363280
/*
31373281
* Note that the protocol doesn't allow us to send Terminate messages
31383282
* during the startup phase.
@@ -3147,6 +3291,23 @@ closePGconn(PGconn *conn)
31473291
pqPutMsgEnd(conn);
31483292
(void) pqFlush(conn);
31493293
}
3294+
}
3295+
3296+
/*
3297+
* closePGconn
3298+
* - properly close a connection to the backend
3299+
*
3300+
* This should reset or release all transient state, but NOT the connection
3301+
* parameters. On exit, the PGconn should be in condition to start a fresh
3302+
* connection with the same parameters (see PQreset()).
3303+
*/
3304+
static void
3305+
closePGconn(PGconn *conn)
3306+
{
3307+
PGnotify *notify;
3308+
pgParameterStatus *pstatus;
3309+
3310+
sendTerminateConn(conn);
31503311

31513312
/*
31523313
* Must reset the blocking status so a possible reconnect will work.
@@ -3165,25 +3326,8 @@ closePGconn(PGconn *conn)
31653326
conn->asyncStatus = PGASYNC_IDLE;
31663327
pqClearAsyncResult(conn); /* deallocate result */
31673328
resetPQExpBuffer(&conn->errorMessage);
3168-
if (conn->connhost != NULL)
3169-
{
3170-
int i;
3171-
3172-
for (i = 0; i < conn->nconnhost; ++i)
3173-
{
3174-
int family = AF_UNSPEC;
3175-
3176-
#ifdef HAVE_UNIX_SOCKETS
3177-
if (conn->connhost[i].type == CHT_UNIX_SOCKET)
3178-
family = AF_UNIX;
3179-
#endif
3329+
release_all_addrinfo(conn);
31803330

3181-
pg_freeaddrinfo_all(family,
3182-
conn->connhost[i].addrlist);
3183-
conn->connhost[i].addrlist = NULL;
3184-
}
3185-
}
3186-
conn->addr_cur = NULL;
31873331
notify = conn->notifyHead;
31883332
while (notify != NULL)
31893333
{

src/interfaces/libpq/libpq-fe.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ typedef enum
6262
* backend startup. */
6363
CONNECTION_SETENV, /* Negotiating environment. */
6464
CONNECTION_SSL_STARTUP, /* Negotiating SSL. */
65-
CONNECTION_NEEDED /* Internal state: connect() needed */
65+
CONNECTION_NEEDED, /* Internal state: connect() needed */
66+
CONNECTION_CHECK_WRITABLE /* Check if we could make a writable
67+
* connection. */
6668
} ConnStatusType;
6769

6870
typedef enum

0 commit comments

Comments
 (0)