Skip to content

Commit 9096022

Browse files
committed
Add postgres patches in this repo
1 parent 75c1393 commit 9096022

File tree

2 files changed

+690
-0
lines changed

2 files changed

+690
-0
lines changed

postgres_fdw.patch

Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
2+
index 1a1e5b5..f52d2ba 100644
3+
--- a/contrib/postgres_fdw/connection.c
4+
+++ b/contrib/postgres_fdw/connection.c
5+
@@ -15,11 +15,15 @@
6+
#include "postgres_fdw.h"
7+
8+
#include "access/xact.h"
9+
+#include "access/xtm.h"
10+
+#include "access/transam.h"
11+
#include "mb/pg_wchar.h"
12+
#include "miscadmin.h"
13+
#include "utils/hsearch.h"
14+
#include "utils/memutils.h"
15+
16+
+#undef DEBUG3
17+
+#define DEBUG3 WARNING
18+
19+
/*
20+
* Connection cache hash table entry
21+
@@ -68,6 +72,8 @@ static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
22+
static void check_conn_params(const char **keywords, const char **values);
23+
static void configure_remote_session(PGconn *conn);
24+
static void do_sql_command(PGconn *conn, const char *sql);
25+
+static void do_sql_send_command(PGconn *conn, const char *sql);
26+
+static void do_sql_wait_command(PGconn *conn, const char *sql);
27+
static void begin_remote_xact(ConnCacheEntry *entry);
28+
static void pgfdw_xact_callback(XactEvent event, void *arg);
29+
static void pgfdw_subxact_callback(SubXactEvent event,
30+
@@ -358,6 +364,27 @@ do_sql_command(PGconn *conn, const char *sql)
31+
PQclear(res);
32+
}
33+
34+
+static void
35+
+do_sql_send_command(PGconn *conn, const char *sql)
36+
+{
37+
+ if (PQsendQuery(conn, sql) != PGRES_COMMAND_OK) {
38+
+ PGresult *res = PQgetResult(conn);
39+
+ pgfdw_report_error(ERROR, res, conn, true, sql);
40+
+ PQclear(res);
41+
+ }
42+
+}
43+
+
44+
+static void
45+
+do_sql_wait_command(PGconn *conn, const char *sql)
46+
+{
47+
+ PGresult *res;
48+
+ while ((res = PQgetResult(conn)) != NULL) {
49+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
50+
+ pgfdw_report_error(ERROR, res, conn, true, sql);
51+
+ PQclear(res);
52+
+ }
53+
+}
54+
+
55+
/*
56+
* Start remote transaction or subtransaction, if needed.
57+
*
58+
@@ -376,11 +403,21 @@ begin_remote_xact(ConnCacheEntry *entry)
59+
/* Start main transaction if we haven't yet */
60+
if (entry->xact_depth <= 0)
61+
{
62+
+ TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId();
63+
const char *sql;
64+
65+
elog(DEBUG3, "starting remote transaction on connection %p",
66+
entry->conn);
67+
68+
+ if (TransactionIdIsValid(gxid)) {
69+
+ char stmt[64];
70+
+ PGresult *res;
71+
+
72+
+ snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid);
73+
+ res = PQexec(entry->conn, stmt);
74+
+ PQclear(res);
75+
+ }
76+
+
77+
if (IsolationIsSerializable())
78+
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
79+
else
80+
@@ -541,16 +578,36 @@ pgfdw_xact_callback(XactEvent event, void *arg)
81+
/* If it has an open remote transaction, try to close it */
82+
if (entry->xact_depth > 0)
83+
{
84+
- elog(DEBUG3, "closing remote transaction on connection %p",
85+
- entry->conn);
86+
+ elog(DEBUG3, "closing remote transaction on connection %p event %d",
87+
+ entry->conn, event);
88+
89+
switch (event)
90+
{
91+
case XACT_EVENT_PARALLEL_PRE_COMMIT:
92+
case XACT_EVENT_PRE_COMMIT:
93+
/* Commit all remote transactions during pre-commit */
94+
- do_sql_command(entry->conn, "COMMIT TRANSACTION");
95+
+ do_sql_send_command(entry->conn, "COMMIT TRANSACTION");
96+
+ continue;
97+
98+
+ case XACT_EVENT_PRE_PREPARE:
99+
+ /*
100+
+ * We disallow remote transactions that modified anything,
101+
+ * since it's not very reasonable to hold them open until
102+
+ * the prepared transaction is committed. For the moment,
103+
+ * throw error unconditionally; later we might allow
104+
+ * read-only cases. Note that the error will cause us to
105+
+ * come right back here with event == XACT_EVENT_ABORT, so
106+
+ * we'll clean up the connection state at that point.
107+
+ */
108+
+ ereport(ERROR,
109+
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
110+
+ errmsg("cannot prepare a transaction that modified remote tables")));
111+
+ break;
112+
+
113+
+ case XACT_EVENT_PARALLEL_COMMIT:
114+
+ case XACT_EVENT_COMMIT:
115+
+ case XACT_EVENT_PREPARE:
116+
+ do_sql_wait_command(entry->conn, "COMMIT TRANSACTION");
117+
/*
118+
* If there were any errors in subtransactions, and we
119+
* made prepared statements, do a DEALLOCATE ALL to make
120+
@@ -574,27 +631,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
121+
entry->have_prep_stmt = false;
122+
entry->have_error = false;
123+
break;
124+
- case XACT_EVENT_PRE_PREPARE:
125+
126+
- /*
127+
- * We disallow remote transactions that modified anything,
128+
- * since it's not very reasonable to hold them open until
129+
- * the prepared transaction is committed. For the moment,
130+
- * throw error unconditionally; later we might allow
131+
- * read-only cases. Note that the error will cause us to
132+
- * come right back here with event == XACT_EVENT_ABORT, so
133+
- * we'll clean up the connection state at that point.
134+
- */
135+
- ereport(ERROR,
136+
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
137+
- errmsg("cannot prepare a transaction that modified remote tables")));
138+
- break;
139+
- case XACT_EVENT_PARALLEL_COMMIT:
140+
- case XACT_EVENT_COMMIT:
141+
- case XACT_EVENT_PREPARE:
142+
- /* Pre-commit should have closed the open transaction */
143+
- elog(ERROR, "missed cleaning up connection during pre-commit");
144+
- break;
145+
case XACT_EVENT_PARALLEL_ABORT:
146+
case XACT_EVENT_ABORT:
147+
/* Assume we might have lost track of prepared statements */
148+
@@ -618,6 +655,12 @@ pgfdw_xact_callback(XactEvent event, void *arg)
149+
entry->have_error = false;
150+
}
151+
break;
152+
+
153+
+ case XACT_EVENT_START:
154+
+ case XACT_EVENT_ABORT_PREPARED:
155+
+ case XACT_EVENT_COMMIT_PREPARED:
156+
+ break;
157+
+
158+
}
159+
}
160+
161+
@@ -631,21 +674,23 @@ pgfdw_xact_callback(XactEvent event, void *arg)
162+
if (PQstatus(entry->conn) != CONNECTION_OK ||
163+
PQtransactionStatus(entry->conn) != PQTRANS_IDLE)
164+
{
165+
- elog(DEBUG3, "discarding connection %p", entry->conn);
166+
+ elog(DEBUG3, "discarding connection %p, conn status=%d, trans status=%d", entry->conn, PQstatus(entry->conn), PQtransactionStatus(entry->conn));
167+
PQfinish(entry->conn);
168+
entry->conn = NULL;
169+
}
170+
}
171+
172+
- /*
173+
- * Regardless of the event type, we can now mark ourselves as out of the
174+
- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
175+
- * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
176+
- */
177+
- xact_got_connection = false;
178+
+ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT) {
179+
+ /*
180+
+ * Regardless of the event type, we can now mark ourselves as out of the
181+
+ * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
182+
+ * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
183+
+ */
184+
+ xact_got_connection = false;
185+
186+
- /* Also reset cursor numbering for next transaction */
187+
- cursor_number = 0;
188+
+ /* Also reset cursor numbering for next transaction */
189+
+ cursor_number = 0;
190+
+ }
191+
}
192+
193+
/*
194+
diff --git a/contrib/postgres_fdw/postgres_fdw--1.0.sql b/contrib/postgres_fdw/postgres_fdw--1.0.sql
195+
index a0f0fc1..0ce8f0e 100644
196+
--- a/contrib/postgres_fdw/postgres_fdw--1.0.sql
197+
+++ b/contrib/postgres_fdw/postgres_fdw--1.0.sql
198+
@@ -16,3 +16,8 @@ LANGUAGE C STRICT;
199+
CREATE FOREIGN DATA WRAPPER postgres_fdw
200+
HANDLER postgres_fdw_handler
201+
VALIDATOR postgres_fdw_validator;
202+
+
203+
+CREATE FUNCTION postgres_fdw_exec(relid oid, sql cstring)
204+
+RETURNS void
205+
+AS 'MODULE_PATHNAME'
206+
+LANGUAGE C STRICT;
207+
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
208+
index 1902f1f..1925c61 100644
209+
--- a/contrib/postgres_fdw/postgres_fdw.c
210+
+++ b/contrib/postgres_fdw/postgres_fdw.c
211+
@@ -230,6 +230,7 @@ typedef struct
212+
* SQL functions
213+
*/
214+
PG_FUNCTION_INFO_V1(postgres_fdw_handler);
215+
+PG_FUNCTION_INFO_V1(postgres_fdw_exec);
216+
217+
/*
218+
* FDW callback routines
219+
@@ -3002,3 +3003,19 @@ conversion_error_callback(void *arg)
220+
NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
221+
RelationGetRelationName(errpos->rel));
222+
}
223+
+
224+
+Datum
225+
+postgres_fdw_exec(PG_FUNCTION_ARGS)
226+
+{
227+
+ Oid relid = PG_GETARG_OID(0);
228+
+ char const* sql = PG_GETARG_CSTRING(1);
229+
+ Oid userid = GetUserId();
230+
+ ForeignTable *table = GetForeignTable(relid);
231+
+ ForeignServer *server = GetForeignServer(table->serverid);
232+
+ UserMapping *user = GetUserMapping(userid, server->serverid);
233+
+ PGconn* conn = GetConnection(server, user, false);
234+
+ PGresult* res = PQexec(conn, sql);
235+
+ PQclear(res);
236+
+ ReleaseConnection(conn);
237+
+ PG_RETURN_VOID();
238+
+}

0 commit comments

Comments
 (0)