22
22
#include "storage/latch.h"
23
23
#include "storage/proc.h"
24
24
#include "utils/hsearch.h"
25
+ #include "utils/inval.h"
25
26
#include "utils/memutils.h"
26
27
#include "utils/syscache.h"
27
28
#include "utils/timestamp.h"
@@ -51,11 +52,15 @@ typedef struct ConnCacheEntry
51
52
{
52
53
ConnCacheKey key ; /* hash key (must be first) */
53
54
PGconn * conn ; /* connection to foreign server, or NULL */
55
+ /* Remaining fields are invalid when conn is NULL: */
54
56
int xact_depth ; /* 0 = no xact open, 1 = main xact open, 2 =
55
57
* one level of subxact open, etc */
56
58
bool have_prep_stmt ; /* have we prepared any stmts in this xact? */
57
59
bool have_error ; /* have any subxacts aborted in this xact? */
58
60
bool changing_xact_state ; /* xact state change in process */
61
+ bool invalidated ; /* true if reconnect is pending */
62
+ uint32 server_hashvalue ; /* hash value of foreign server OID */
63
+ uint32 mapping_hashvalue ; /* hash value of user mapping OID */
59
64
} ConnCacheEntry ;
60
65
61
66
/*
@@ -72,6 +77,7 @@ static bool xact_got_connection = false;
72
77
73
78
/* prototypes of private functions */
74
79
static PGconn * connect_pg_server (ForeignServer * server , UserMapping * user );
80
+ static void disconnect_pg_server (ConnCacheEntry * entry );
75
81
static void check_conn_params (const char * * keywords , const char * * values );
76
82
static void configure_remote_session (PGconn * conn );
77
83
static void do_sql_command (PGconn * conn , const char * sql );
@@ -81,6 +87,7 @@ static void pgfdw_subxact_callback(SubXactEvent event,
81
87
SubTransactionId mySubid ,
82
88
SubTransactionId parentSubid ,
83
89
void * arg );
90
+ static void pgfdw_inval_callback (Datum arg , int cacheid , uint32 hashvalue );
84
91
static void pgfdw_reject_incomplete_xact_state_change (ConnCacheEntry * entry );
85
92
static bool pgfdw_cancel_query (PGconn * conn );
86
93
static bool pgfdw_exec_cleanup_query (PGconn * conn , const char * query ,
@@ -98,13 +105,6 @@ static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
98
105
* will_prep_stmt must be true if caller intends to create any prepared
99
106
* statements. Since those don't go away automatically at transaction end
100
107
* (not even on error), we need this flag to cue manual cleanup.
101
- *
102
- * XXX Note that caching connections theoretically requires a mechanism to
103
- * detect change of FDW objects to invalidate already established connections.
104
- * We could manage that by watching for invalidation events on the relevant
105
- * syscaches. For the moment, though, it's not clear that this would really
106
- * be useful and not mere pedantry. We could not flush any active connections
107
- * mid-transaction anyway.
108
108
*/
109
109
PGconn *
110
110
GetConnection (ForeignServer * server , UserMapping * user ,
@@ -135,6 +135,10 @@ GetConnection(ForeignServer *server, UserMapping *user,
135
135
*/
136
136
RegisterXactCallback (pgfdw_xact_callback , NULL );
137
137
RegisterSubXactCallback (pgfdw_subxact_callback , NULL );
138
+ CacheRegisterSyscacheCallback (FOREIGNSERVEROID ,
139
+ pgfdw_inval_callback , (Datum ) 0 );
140
+ CacheRegisterSyscacheCallback (USERMAPPINGOID ,
141
+ pgfdw_inval_callback , (Datum ) 0 );
138
142
}
139
143
140
144
/* Set flag that we did GetConnection during the current transaction */
@@ -150,17 +154,27 @@ GetConnection(ForeignServer *server, UserMapping *user,
150
154
entry = hash_search (ConnectionHash , & key , HASH_ENTER , & found );
151
155
if (!found )
152
156
{
153
- /* initialize new hashtable entry (key is already filled in) */
157
+ /*
158
+ * We need only clear "conn" here; remaining fields will be filled
159
+ * later when "conn" is set.
160
+ */
154
161
entry -> conn = NULL ;
155
- entry -> xact_depth = 0 ;
156
- entry -> have_prep_stmt = false;
157
- entry -> have_error = false;
158
- entry -> changing_xact_state = false;
159
162
}
160
163
161
164
/* Reject further use of connections which failed abort cleanup. */
162
165
pgfdw_reject_incomplete_xact_state_change (entry );
163
166
167
+ /*
168
+ * If the connection needs to be remade due to invalidation, disconnect as
169
+ * soon as we're out of all transactions.
170
+ */
171
+ if (entry -> conn != NULL && entry -> invalidated && entry -> xact_depth == 0 )
172
+ {
173
+ elog (DEBUG3 , "closing connection %p for option changes to take effect" ,
174
+ entry -> conn );
175
+ disconnect_pg_server (entry );
176
+ }
177
+
164
178
/*
165
179
* We don't check the health of cached connection here, because it would
166
180
* require some overhead. Broken connection will be detected when the
@@ -170,13 +184,36 @@ GetConnection(ForeignServer *server, UserMapping *user,
170
184
/*
171
185
* If cache entry doesn't have a connection, we have to establish a new
172
186
* connection. (If connect_pg_server throws an error, the cache entry
173
- * will be left in a valid empty state.)
187
+ * will remain in a valid empty state, ie conn == NULL .)
174
188
*/
175
189
if (entry -> conn == NULL )
176
190
{
177
- entry -> xact_depth = 0 ; /* just to be sure */
191
+ Oid umoid ;
192
+
193
+ /* Reset all transient state fields, to be sure all are clean */
194
+ entry -> xact_depth = 0 ;
178
195
entry -> have_prep_stmt = false;
179
196
entry -> have_error = false;
197
+ entry -> changing_xact_state = false;
198
+ entry -> invalidated = false;
199
+ entry -> server_hashvalue =
200
+ GetSysCacheHashValue1 (FOREIGNSERVEROID ,
201
+ ObjectIdGetDatum (server -> serverid ));
202
+ /* Pre-9.6, UserMapping doesn't store its OID, so look it up again */
203
+ umoid = GetSysCacheOid2 (USERMAPPINGUSERSERVER ,
204
+ ObjectIdGetDatum (user -> userid ),
205
+ ObjectIdGetDatum (user -> serverid ));
206
+ if (!OidIsValid (umoid ))
207
+ {
208
+ /* Not found for the specific user -- try PUBLIC */
209
+ umoid = GetSysCacheOid2 (USERMAPPINGUSERSERVER ,
210
+ ObjectIdGetDatum (InvalidOid ),
211
+ ObjectIdGetDatum (user -> serverid ));
212
+ }
213
+ entry -> mapping_hashvalue =
214
+ GetSysCacheHashValue1 (USERMAPPINGOID , ObjectIdGetDatum (umoid ));
215
+
216
+ /* Now try to make the connection */
180
217
entry -> conn = connect_pg_server (server , user );
181
218
elog (DEBUG3 , "new postgres_fdw connection %p for server \"%s\"" ,
182
219
entry -> conn , server -> servername );
@@ -289,6 +326,19 @@ connect_pg_server(ForeignServer *server, UserMapping *user)
289
326
return conn ;
290
327
}
291
328
329
+ /*
330
+ * Disconnect any open connection for a connection cache entry.
331
+ */
332
+ static void
333
+ disconnect_pg_server (ConnCacheEntry * entry )
334
+ {
335
+ if (entry -> conn != NULL )
336
+ {
337
+ PQfinish (entry -> conn );
338
+ entry -> conn = NULL ;
339
+ }
340
+ }
341
+
292
342
/*
293
343
* For non-superusers, insist that the connstr specify a password. This
294
344
* prevents a password from being picked up from .pgpass, a service file,
@@ -787,9 +837,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
787
837
entry -> changing_xact_state )
788
838
{
789
839
elog (DEBUG3 , "discarding connection %p" , entry -> conn );
790
- PQfinish (entry -> conn );
791
- entry -> conn = NULL ;
792
- entry -> changing_xact_state = false;
840
+ disconnect_pg_server (entry );
793
841
}
794
842
}
795
843
@@ -906,6 +954,47 @@ pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
906
954
}
907
955
}
908
956
957
+ /*
958
+ * Connection invalidation callback function
959
+ *
960
+ * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
961
+ * mark connections depending on that entry as needing to be remade.
962
+ * We can't immediately destroy them, since they might be in the midst of
963
+ * a transaction, but we'll remake them at the next opportunity.
964
+ *
965
+ * Although most cache invalidation callbacks blow away all the related stuff
966
+ * regardless of the given hashvalue, connections are expensive enough that
967
+ * it's worth trying to avoid that.
968
+ *
969
+ * NB: We could avoid unnecessary disconnection more strictly by examining
970
+ * individual option values, but it seems too much effort for the gain.
971
+ */
972
+ static void
973
+ pgfdw_inval_callback (Datum arg , int cacheid , uint32 hashvalue )
974
+ {
975
+ HASH_SEQ_STATUS scan ;
976
+ ConnCacheEntry * entry ;
977
+
978
+ Assert (cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID );
979
+
980
+ /* ConnectionHash must exist already, if we're registered */
981
+ hash_seq_init (& scan , ConnectionHash );
982
+ while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
983
+ {
984
+ /* Ignore invalid entries */
985
+ if (entry -> conn == NULL )
986
+ continue ;
987
+
988
+ /* hashvalue == 0 means a cache reset, must clear all state */
989
+ if (hashvalue == 0 ||
990
+ (cacheid == FOREIGNSERVEROID &&
991
+ entry -> server_hashvalue == hashvalue ) ||
992
+ (cacheid == USERMAPPINGOID &&
993
+ entry -> mapping_hashvalue == hashvalue ))
994
+ entry -> invalidated = true;
995
+ }
996
+ }
997
+
909
998
/*
910
999
* Raise an error if the given connection cache entry is marked as being
911
1000
* in the middle of an xact state change. This should be called at which no
@@ -921,10 +1010,16 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
921
1010
{
922
1011
ForeignServer * server ;
923
1012
924
- if (!entry -> changing_xact_state )
1013
+ /* nothing to do for inactive entries and entries of sane state */
1014
+ if (entry -> conn == NULL || !entry -> changing_xact_state )
925
1015
return ;
926
1016
1017
+ /* make sure this entry is inactive */
1018
+ disconnect_pg_server (entry );
1019
+
1020
+ /* find server name to be shown in the message below */
927
1021
server = GetForeignServer (entry -> key .serverid );
1022
+
928
1023
ereport (ERROR ,
929
1024
(errcode (ERRCODE_CONNECTION_EXCEPTION ),
930
1025
errmsg ("connection to server \"%s\" was lost" ,
0 commit comments