|
| 1 | +diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c |
| 2 | +index 1a1e5b5..f52d2ba 100644 |
| 3 | +--- a/contrib/postgres_fdw/connection.c |
| 4 | ++++ b/contrib/postgres_fdw/connection.c |
| 5 | +@@ -15,11 +15,15 @@ |
| 6 | + #include "postgres_fdw.h" |
| 7 | + |
| 8 | + #include "access/xact.h" |
| 9 | ++#include "access/xtm.h" |
| 10 | ++#include "access/transam.h" |
| 11 | + #include "mb/pg_wchar.h" |
| 12 | + #include "miscadmin.h" |
| 13 | + #include "utils/hsearch.h" |
| 14 | + #include "utils/memutils.h" |
| 15 | + |
| 16 | ++#undef DEBUG3 |
| 17 | ++#define DEBUG3 WARNING |
| 18 | + |
| 19 | + /* |
| 20 | + * Connection cache hash table entry |
| 21 | +@@ -68,6 +72,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user); |
| 22 | + static void check_conn_params(const char **keywords, const char **values); |
| 23 | + static void configure_remote_session(PGconn *conn); |
| 24 | + static void do_sql_command(PGconn *conn, const char *sql); |
| 25 | ++static void do_sql_send_command(PGconn *conn, const char *sql); |
| 26 | ++static void do_sql_wait_command(PGconn *conn, const char *sql); |
| 27 | + static void begin_remote_xact(ConnCacheEntry *entry); |
| 28 | + static void pgfdw_xact_callback(XactEvent event, void *arg); |
| 29 | + static void pgfdw_subxact_callback(SubXactEvent event, |
| 30 | +@@ -358,6 +364,27 @@ do_sql_command(PGconn *conn, const char *sql) |
| 31 | + PQclear(res); |
| 32 | + } |
| 33 | + |
| 34 | ++static void |
| 35 | ++do_sql_send_command(PGconn *conn, const char *sql) |
| 36 | ++{ |
| 37 | ++ if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) { |
| 38 | ++ PGresult *res = PQgetResult(conn); |
| 39 | ++ pgfdw_report_error(ERROR, res, conn, true, sql); |
| 40 | ++ PQclear(res); |
| 41 | ++ } |
| 42 | ++} |
| 43 | ++ |
| 44 | ++static void |
| 45 | ++do_sql_wait_command(PGconn *conn, const char *sql) |
| 46 | ++{ |
| 47 | ++ PGresult *res; |
| 48 | ++ while ((res = PQgetResult(conn)) != NULL) { |
| 49 | ++ if (PQresultStatus(res) != PGRES_COMMAND_OK) |
| 50 | ++ pgfdw_report_error(ERROR, res, conn, true, sql); |
| 51 | ++ PQclear(res); |
| 52 | ++ } |
| 53 | ++} |
| 54 | ++ |
| 55 | + /* |
| 56 | + * Start remote transaction or subtransaction, if needed. |
| 57 | + * |
| 58 | +@@ -376,11 +403,21 @@ begin_remote_xact(ConnCacheEntry *entry) |
| 59 | + /* Start main transaction if we haven't yet */ |
| 60 | + if (entry->xact_depth <= 0) |
| 61 | + { |
| 62 | ++ TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId(); |
| 63 | + const char *sql; |
| 64 | + |
| 65 | + elog(DEBUG3, "starting remote transaction on connection %p", |
| 66 | + entry->conn); |
| 67 | + |
| 68 | ++ if (TransactionIdIsValid(gxid)) { |
| 69 | ++ char stmt[64]; |
| 70 | ++ PGresult *res; |
| 71 | ++ |
| 72 | ++ snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid); |
| 73 | ++ res = PQexec(entry->conn, stmt); |
| 74 | ++ PQclear(res); |
| 75 | ++ } |
| 76 | ++ |
| 77 | + if (IsolationIsSerializable()) |
| 78 | + sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"; |
| 79 | + else |
| 80 | +@@ -541,16 +578,36 @@ pgfdw_xact_callback(XactEvent event, void *arg) |
| 81 | + /* If it has an open remote transaction, try to close it */ |
| 82 | + if (entry->xact_depth > 0) |
| 83 | + { |
| 84 | +- elog(DEBUG3, "closing remote transaction on connection %p", |
| 85 | +- entry->conn); |
| 86 | ++ elog(DEBUG3, "closing remote transaction on connection %p event %d", |
| 87 | ++ entry->conn, event); |
| 88 | + |
| 89 | + switch (event) |
| 90 | + { |
| 91 | + case XACT_EVENT_PARALLEL_PRE_COMMIT: |
| 92 | + case XACT_EVENT_PRE_COMMIT: |
| 93 | + /* Commit all remote transactions during pre-commit */ |
| 94 | +- do_sql_command(entry->conn, "COMMIT TRANSACTION"); |
| 95 | ++ do_sql_send_command(entry->conn, "COMMIT TRANSACTION"); |
| 96 | ++ continue; |
| 97 | + |
| 98 | ++ case XACT_EVENT_PRE_PREPARE: |
| 99 | ++ /* |
| 100 | ++ * We disallow remote transactions that modified anything, |
| 101 | ++ * since it's not very reasonable to hold them open until |
| 102 | ++ * the prepared transaction is committed. For the moment, |
| 103 | ++ * throw error unconditionally; later we might allow |
| 104 | ++ * read-only cases. Note that the error will cause us to |
| 105 | ++ * come right back here with event == XACT_EVENT_ABORT, so |
| 106 | ++ * we'll clean up the connection state at that point. |
| 107 | ++ */ |
| 108 | ++ ereport(ERROR, |
| 109 | ++ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 110 | ++ errmsg("cannot prepare a transaction that modified remote tables"))); |
| 111 | ++ break; |
| 112 | ++ |
| 113 | ++ case XACT_EVENT_PARALLEL_COMMIT: |
| 114 | ++ case XACT_EVENT_COMMIT: |
| 115 | ++ case XACT_EVENT_PREPARE: |
| 116 | ++ do_sql_wait_command(entry->conn, "COMMIT TRANSACTION"); |
| 117 | + /* |
| 118 | + * If there were any errors in subtransactions, and we |
| 119 | + * made prepared statements, do a DEALLOCATE ALL to make |
| 120 | +@@ -574,27 +631,7 @@ pgfdw_xact_callback(XactEvent event, void *arg) |
| 121 | + entry->have_prep_stmt = false; |
| 122 | + entry->have_error = false; |
| 123 | + break; |
| 124 | +- case XACT_EVENT_PRE_PREPARE: |
| 125 | + |
| 126 | +- /* |
| 127 | +- * We disallow remote transactions that modified anything, |
| 128 | +- * since it's not very reasonable to hold them open until |
| 129 | +- * the prepared transaction is committed. For the moment, |
| 130 | +- * throw error unconditionally; later we might allow |
| 131 | +- * read-only cases. Note that the error will cause us to |
| 132 | +- * come right back here with event == XACT_EVENT_ABORT, so |
| 133 | +- * we'll clean up the connection state at that point. |
| 134 | +- */ |
| 135 | +- ereport(ERROR, |
| 136 | +- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| 137 | +- errmsg("cannot prepare a transaction that modified remote tables"))); |
| 138 | +- break; |
| 139 | +- case XACT_EVENT_PARALLEL_COMMIT: |
| 140 | +- case XACT_EVENT_COMMIT: |
| 141 | +- case XACT_EVENT_PREPARE: |
| 142 | +- /* Pre-commit should have closed the open transaction */ |
| 143 | +- elog(ERROR, "missed cleaning up connection during pre-commit"); |
| 144 | +- break; |
| 145 | + case XACT_EVENT_PARALLEL_ABORT: |
| 146 | + case XACT_EVENT_ABORT: |
| 147 | + /* Assume we might have lost track of prepared statements */ |
| 148 | +@@ -618,6 +655,12 @@ pgfdw_xact_callback(XactEvent event, void *arg) |
| 149 | + entry->have_error = false; |
| 150 | + } |
| 151 | + break; |
| 152 | ++ |
| 153 | ++ case XACT_EVENT_START: |
| 154 | ++ case XACT_EVENT_ABORT_PREPARED: |
| 155 | ++ case XACT_EVENT_COMMIT_PREPARED: |
| 156 | ++ break; |
| 157 | ++ |
| 158 | + } |
| 159 | + } |
| 160 | + |
| 161 | +@@ -631,21 +674,23 @@ pgfdw_xact_callback(XactEvent event, void *arg) |
| 162 | + if (PQstatus(entry->conn) != CONNECTION_OK || |
| 163 | + PQtransactionStatus(entry->conn) != PQTRANS_IDLE) |
| 164 | + { |
| 165 | +- elog(DEBUG3, "discarding connection %p", entry->conn); |
| 166 | ++ elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn)); |
| 167 | + PQfinish(entry->conn); |
| 168 | + entry->conn = NULL; |
| 169 | + } |
| 170 | + } |
| 171 | + |
| 172 | +- /* |
| 173 | +- * Regardless of the event type, we can now mark ourselves as out of the |
| 174 | +- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, |
| 175 | +- * this saves a useless scan of the hashtable during COMMIT or PREPARE.) |
| 176 | +- */ |
| 177 | +- xact_got_connection = false; |
| 178 | ++ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) { |
| 179 | ++ /* |
| 180 | ++ * Regardless of the event type, we can now mark ourselves as out of the |
| 181 | ++ * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE, |
| 182 | ++ * this saves a useless scan of the hashtable during COMMIT or PREPARE.) |
| 183 | ++ */ |
| 184 | ++ xact_got_connection = false; |
| 185 | + |
| 186 | +- /* Also reset cursor numbering for next transaction */ |
| 187 | +- cursor_number = 0; |
| 188 | ++ /* Also reset cursor numbering for next transaction */ |
| 189 | ++ cursor_number = 0; |
| 190 | ++ } |
| 191 | + } |
| 192 | + |
| 193 | + /* |
| 194 | +diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql |
| 195 | +index a0f0fc1..0ce8f0e 100644 |
| 196 | +--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql |
| 197 | ++++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql |
| 198 | +@@ -16,3 +16,8 @@ LANGUAGE C STRICT; |
| 199 | + CREATE FOREIGN DATA WRAPPER postgres_fdw |
| 200 | + HANDLER postgres_fdw_handler |
| 201 | + VALIDATOR postgres_fdw_validator; |
| 202 | ++ |
| 203 | ++CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring) |
| 204 | ++RETURNS void |
| 205 | ++AS 'MODULE_PATHNAME' |
| 206 | ++LANGUAGE C STRICT; |
| 207 | +diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c |
| 208 | +index 1902f1f..1925c61 100644 |
| 209 | +--- a/contrib/postgres_fdw/postgres_fdw.c |
| 210 | ++++ b/contrib/postgres_fdw/postgres_fdw.c |
| 211 | +@@ -230,6 +230,7 @@ typedef struct |
| 212 | + * SQL functions |
| 213 | + */ |
| 214 | + PG_FUNCTION_INFO_V1(postgres_fdw_handler); |
| 215 | ++PG_FUNCTION_INFO_V1(postgres_fdw_exec); |
| 216 | + |
| 217 | + /* |
| 218 | + * FDW callback routines |
| 219 | +@@ -3002,3 +3003,19 @@ conversion_error_callback(void *arg) |
| 220 | + NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname), |
| 221 | + RelationGetRelationName(errpos->rel)); |
| 222 | + } |
| 223 | ++ |
| 224 | ++Datum |
| 225 | ++postgres_fdw_exec(PG_FUNCTION_ARGS) |
| 226 | ++{ |
| 227 | ++ Oid relid = PG_GETARG_OID(0); |
| 228 | ++ char const* sql = PG_GETARG_CSTRING(1); |
| 229 | ++ Oid userid = GetUserId(); |
| 230 | ++ ForeignTable *table = GetForeignTable(relid); |
| 231 | ++ ForeignServer *server = GetForeignServer(table->serverid); |
| 232 | ++ UserMapping *user = GetUserMapping(userid, server->serverid); |
| 233 | ++ PGconn* conn = GetConnection(server, user, false); |
| 234 | ++ PGresult* res = PQexec(conn, sql); |
| 235 | ++ PQclear(res); |
| 236 | ++ ReleaseConnection(conn); |
| 237 | ++ PG_RETURN_VOID(); |
| 238 | ++} |
0 commit comments