Skip to content

Commit 6971141

Browse files
committed
Re-establish postgres_fdw connections after server or user mapping changes.
Previously, postgres_fdw would keep on using an existing connection even if the user did ALTER SERVER or ALTER USER MAPPING commands that should affect connection parameters. Teach it to watch for catcache invals on these catalogs and re-establish connections when the relevant catalog entries change. Per bug #14738 from Michal Lis. In passing, clean up some rather crufty decisions in commit ae9bfc5 about where fields of ConnCacheEntry should be reset. We now reset all the fields whenever we open a new connection. Kyotaro Horiguchi, reviewed by Ashutosh Bapat and myself. Back-patch to 9.3 where postgres_fdw appeared. Discussion: https://postgr.es/m/20170710113917.7727.10247@wrigleys.postgresql.org
1 parent a6088e1 commit 6971141

File tree

3 files changed

+170
-18
lines changed

3 files changed

+170
-18
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 113 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "storage/latch.h"
2323
#include "storage/proc.h"
2424
#include "utils/hsearch.h"
25+
#include "utils/inval.h"
2526
#include "utils/memutils.h"
2627
#include "utils/syscache.h"
2728
#include "utils/timestamp.h"
@@ -51,11 +52,15 @@ typedef struct ConnCacheEntry
5152
{
5253
ConnCacheKey key; /* hash key (must be first) */
5354
PGconn *conn; /* connection to foreign server, or NULL */
55+
/* Remaining fields are invalid when conn is NULL: */
5456
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
5557
* one level of subxact open, etc */
5658
bool have_prep_stmt; /* have we prepared any stmts in this xact? */
5759
bool have_error; /* have any subxacts aborted in this xact? */
5860
bool changing_xact_state; /* xact state change in process */
61+
bool invalidated; /* true if reconnect is pending */
62+
uint32 server_hashvalue; /* hash value of foreign server OID */
63+
uint32 mapping_hashvalue; /* hash value of user mapping OID */
5964
} ConnCacheEntry;
6065

6166
/*
@@ -72,6 +77,7 @@ static bool xact_got_connection = false;
7277

7378
/* prototypes of private functions */
7479
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
80+
static void disconnect_pg_server(ConnCacheEntry *entry);
7581
static void check_conn_params(const char **keywords, const char **values);
7682
static void configure_remote_session(PGconn *conn);
7783
static void do_sql_command(PGconn *conn, const char *sql);
@@ -81,6 +87,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
8187
SubTransactionId mySubid,
8288
SubTransactionId parentSubid,
8389
void *arg);
90+
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
8491
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
8592
static bool pgfdw_cancel_query(PGconn *conn);
8693
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
@@ -98,13 +105,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
98105
* will_prep_stmt must be true if caller intends to create any prepared
99106
* statements. Since those don't go away automatically at transaction end
100107
* (not even on error), we need this flag to cue manual cleanup.
101-
*
102-
* XXX Note that caching connections theoretically requires a mechanism to
103-
* detect change of FDW objects to invalidate already established connections.
104-
* We could manage that by watching for invalidation events on the relevant
105-
* syscaches. For the moment, though, it's not clear that this would really
106-
* be useful and not mere pedantry. We could not flush any active connections
107-
* mid-transaction anyway.
108108
*/
109109
PGconn *
110110
GetConnection(ForeignServer *server, UserMapping *user,
@@ -135,6 +135,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
135135
*/
136136
RegisterXactCallback(pgfdw_xact_callback, NULL);
137137
RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
138+
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
139+
pgfdw_inval_callback, (Datum) 0);
140+
CacheRegisterSyscacheCallback(USERMAPPINGOID,
141+
pgfdw_inval_callback, (Datum) 0);
138142
}
139143

140144
/* Set flag that we did GetConnection during the current transaction */
@@ -150,17 +154,27 @@ GetConnection(ForeignServer *server, UserMapping *user,
150154
entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
151155
if (!found)
152156
{
153-
/* initialize new hashtable entry (key is already filled in) */
157+
/*
158+
* We need only clear "conn" here; remaining fields will be filled
159+
* later when "conn" is set.
160+
*/
154161
entry->conn = NULL;
155-
entry->xact_depth = 0;
156-
entry->have_prep_stmt = false;
157-
entry->have_error = false;
158-
entry->changing_xact_state = false;
159162
}
160163

161164
/* Reject further use of connections which failed abort cleanup. */
162165
pgfdw_reject_incomplete_xact_state_change(entry);
163166

167+
/*
168+
* If the connection needs to be remade due to invalidation, disconnect as
169+
* soon as we're out of all transactions.
170+
*/
171+
if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
172+
{
173+
elog(DEBUG3, "closing connection %p for option changes to take effect",
174+
entry->conn);
175+
disconnect_pg_server(entry);
176+
}
177+
164178
/*
165179
* We don't check the health of cached connection here, because it would
166180
* require some overhead. Broken connection will be detected when the
@@ -170,13 +184,36 @@ GetConnection(ForeignServer *server, UserMapping *user,
170184
/*
171185
* If cache entry doesn't have a connection, we have to establish a new
172186
* connection. (If connect_pg_server throws an error, the cache entry
173-
* will be left in a valid empty state.)
187+
* will remain in a valid empty state, ie conn == NULL.)
174188
*/
175189
if (entry->conn == NULL)
176190
{
177-
entry->xact_depth = 0; /* just to be sure */
191+
Oid umoid;
192+
193+
/* Reset all transient state fields, to be sure all are clean */
194+
entry->xact_depth = 0;
178195
entry->have_prep_stmt = false;
179196
entry->have_error = false;
197+
entry->changing_xact_state = false;
198+
entry->invalidated = false;
199+
entry->server_hashvalue =
200+
GetSysCacheHashValue1(FOREIGNSERVEROID,
201+
ObjectIdGetDatum(server->serverid));
202+
/* Pre-9.6, UserMapping doesn't store its OID, so look it up again */
203+
umoid = GetSysCacheOid2(USERMAPPINGUSERSERVER,
204+
ObjectIdGetDatum(user->userid),
205+
ObjectIdGetDatum(user->serverid));
206+
if (!OidIsValid(umoid))
207+
{
208+
/* Not found for the specific user -- try PUBLIC */
209+
umoid = GetSysCacheOid2(USERMAPPINGUSERSERVER,
210+
ObjectIdGetDatum(InvalidOid),
211+
ObjectIdGetDatum(user->serverid));
212+
}
213+
entry->mapping_hashvalue =
214+
GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(umoid));
215+
216+
/* Now try to make the connection */
180217
entry->conn = connect_pg_server(server, user);
181218
elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\"",
182219
entry->conn, server->servername);
@@ -289,6 +326,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
289326
return conn;
290327
}
291328

329+
/*
330+
* Disconnect any open connection for a connection cache entry.
331+
*/
332+
static void
333+
disconnect_pg_server(ConnCacheEntry *entry)
334+
{
335+
if (entry->conn != NULL)
336+
{
337+
PQfinish(entry->conn);
338+
entry->conn = NULL;
339+
}
340+
}
341+
292342
/*
293343
* For non-superusers, insist that the connstr specify a password. This
294344
* prevents a password from being picked up from .pgpass, a service file,
@@ -787,9 +837,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
787837
entry->changing_xact_state)
788838
{
789839
elog(DEBUG3, "discarding connection %p", entry->conn);
790-
PQfinish(entry->conn);
791-
entry->conn = NULL;
792-
entry->changing_xact_state = false;
840+
disconnect_pg_server(entry);
793841
}
794842
}
795843

@@ -906,6 +954,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
906954
}
907955
}
908956

957+
/*
958+
* Connection invalidation callback function
959+
*
960+
* After a change to a pg_foreign_server or pg_user_mapping catalog entry,
961+
* mark connections depending on that entry as needing to be remade.
962+
* We can't immediately destroy them, since they might be in the midst of
963+
* a transaction, but we'll remake them at the next opportunity.
964+
*
965+
* Although most cache invalidation callbacks blow away all the related stuff
966+
* regardless of the given hashvalue, connections are expensive enough that
967+
* it's worth trying to avoid that.
968+
*
969+
* NB: We could avoid unnecessary disconnection more strictly by examining
970+
* individual option values, but it seems too much effort for the gain.
971+
*/
972+
static void
973+
pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
974+
{
975+
HASH_SEQ_STATUS scan;
976+
ConnCacheEntry *entry;
977+
978+
Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
979+
980+
/* ConnectionHash must exist already, if we're registered */
981+
hash_seq_init(&scan, ConnectionHash);
982+
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
983+
{
984+
/* Ignore invalid entries */
985+
if (entry->conn == NULL)
986+
continue;
987+
988+
/* hashvalue == 0 means a cache reset, must clear all state */
989+
if (hashvalue == 0 ||
990+
(cacheid == FOREIGNSERVEROID &&
991+
entry->server_hashvalue == hashvalue) ||
992+
(cacheid == USERMAPPINGOID &&
993+
entry->mapping_hashvalue == hashvalue))
994+
entry->invalidated = true;
995+
}
996+
}
997+
909998
/*
910999
* Raise an error if the given connection cache entry is marked as being
9111000
* in the middle of an xact state change. This should be called at which no
@@ -921,10 +1010,16 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
9211010
{
9221011
ForeignServer *server;
9231012

924-
if (!entry->changing_xact_state)
1013+
/* nothing to do for inactive entries and entries of sane state */
1014+
if (entry->conn == NULL || !entry->changing_xact_state)
9251015
return;
9261016

1017+
/* make sure this entry is inactive */
1018+
disconnect_pg_server(entry);
1019+
1020+
/* find server name to be shown in the message below */
9271021
server = GetForeignServer(entry->key.serverid);
1022+
9281023
ereport(ERROR,
9291024
(errcode(ERRCODE_CONNECTION_EXCEPTION),
9301025
errmsg("connection to server \"%s\" was lost",

contrib/postgres_fdw/expected/postgres_fdw.out

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,43 @@ ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
120120
public | ft2 | loopback | (schema_name 'S 1', table_name 'T 1') |
121121
(2 rows)
122122

123+
-- Test that alteration of server options causes reconnection
124+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work
125+
c3 | c4
126+
-------+------------------------------
127+
00001 | Fri Jan 02 00:00:00 1970 PST
128+
(1 row)
129+
130+
ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
131+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
132+
ERROR: could not connect to server "loopback"
133+
DETAIL: FATAL: database "no such database" does not exist
134+
DO $d$
135+
BEGIN
136+
EXECUTE $$ALTER SERVER loopback
137+
OPTIONS (SET dbname '$$||current_database()||$$')$$;
138+
END;
139+
$d$;
140+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
141+
c3 | c4
142+
-------+------------------------------
143+
00001 | Fri Jan 02 00:00:00 1970 PST
144+
(1 row)
145+
146+
-- Test that alteration of user mapping options causes reconnection
147+
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
148+
OPTIONS (ADD user 'no such user');
149+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
150+
ERROR: could not connect to server "loopback"
151+
DETAIL: FATAL: role "no such user" does not exist
152+
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
153+
OPTIONS (DROP user);
154+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
155+
c3 | c4
156+
-------+------------------------------
157+
00001 | Fri Jan 02 00:00:00 1970 PST
158+
(1 row)
159+
123160
-- Now we should be able to run ANALYZE.
124161
-- To exercise multiple code paths, we use local stats on ft1
125162
-- and remote-estimate mode on ft2.

contrib/postgres_fdw/sql/postgres_fdw.sql

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,26 @@ ALTER FOREIGN TABLE ft1 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
123123
ALTER FOREIGN TABLE ft2 ALTER COLUMN c1 OPTIONS (column_name 'C 1');
124124
\det+
125125

126+
-- Test that alteration of server options causes reconnection
127+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work
128+
ALTER SERVER loopback OPTIONS (SET dbname 'no such database');
129+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
130+
DO $d$
131+
BEGIN
132+
EXECUTE $$ALTER SERVER loopback
133+
OPTIONS (SET dbname '$$||current_database()||$$')$$;
134+
END;
135+
$d$;
136+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
137+
138+
-- Test that alteration of user mapping options causes reconnection
139+
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
140+
OPTIONS (ADD user 'no such user');
141+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should fail
142+
ALTER USER MAPPING FOR CURRENT_USER SERVER loopback
143+
OPTIONS (DROP user);
144+
SELECT c3, c4 FROM ft1 ORDER BY c3, c1 LIMIT 1; -- should work again
145+
126146
-- Now we should be able to run ANALYZE.
127147
-- To exercise multiple code paths, we use local stats on ft1
128148
-- and remote-estimate mode on ft2.

0 commit comments

Comments
 (0)