17
17
#include "commands/extension.h"
18
18
#include "executor/executor.h"
19
19
#include "fmgr.h"
20
+ #include "foreign/foreign.h"
20
21
#include "libpq/libpq.h"
21
22
#include "libpq-fe.h"
23
+ #include "miscadmin.h"
22
24
#include "optimizer/planner.h"
25
+ #include "pgstat.h"
26
+ #include "postgres_fdw.h"
27
+ #include "storage/latch.h"
23
28
#include "tcop/utility.h"
29
+ #include "utils/builtins.h"
24
30
#include "utils/guc.h"
31
+ #include "utils/memutils.h"
32
+
25
33
26
34
PG_MODULE_MAGIC ;
27
35
@@ -33,8 +41,7 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
33
41
34
42
static void HOOK_Utility_injection (PlannedStmt * pstmt , const char * queryString ,
35
43
ProcessUtilityContext context , ParamListInfo params ,
36
- QueryEnvironment * queryEnv , DestReceiver * dest ,
37
- char * completionTag );
44
+ DestReceiver * dest , char * completionTag );
38
45
static void HOOK_ExecStart_injection (QueryDesc * queryDesc , int eflags );
39
46
static void HOOK_ExecEnd_injection (QueryDesc * queryDesc );
40
47
@@ -44,6 +51,10 @@ char *remote_server_fdwname;
44
51
static bool ExtensionIsActivated = false;
45
52
static PGconn * conn = NULL ;
46
53
54
+ static Oid serverid = InvalidOid ;
55
+ static UserMapping * user = NULL ;
56
+
57
+
47
58
/*
48
59
* Module load/unload callback
49
60
*/
@@ -80,79 +91,14 @@ ExtensionIsActive(void)
80
91
81
92
if (
82
93
!IsTransactionState () ||
83
- !OidIsValid (get_extension_oid ("repeater " , true))
94
+ !OidIsValid (get_extension_oid ("pg_repeater " , true))
84
95
)
85
96
return false;
86
97
87
98
ExtensionIsActivated = true;
88
99
return ExtensionIsActivated ;
89
100
}
90
101
91
- #include "miscadmin.h"
92
- #include "pgstat.h"
93
- #include "storage/latch.h"
94
-
95
- #include "foreign/foreign.h"
96
- #include "postgres_fdw.h"
97
-
98
- static Oid serverid = InvalidOid ;
99
- static UserMapping * user = NULL ;
100
-
101
- static bool
102
- pgfdw_cancel_query (PGconn * conn )
103
- {
104
- PGcancel * cancel ;
105
- char errbuf [256 ];
106
- PGresult * result = NULL ;
107
-
108
- if ((cancel = PQgetCancel (conn )))
109
- {
110
- if (!PQcancel (cancel , errbuf , sizeof (errbuf )))
111
- {
112
- ereport (WARNING ,
113
- (errcode (ERRCODE_CONNECTION_FAILURE ),
114
- errmsg ("could not send cancel request: %s" ,
115
- errbuf )));
116
- PQfreeCancel (cancel );
117
- return false;
118
- }
119
-
120
- PQfreeCancel (cancel );
121
- }
122
- else
123
- elog (FATAL , "Can't get connection cancel descriptor" );
124
-
125
- PQconsumeInput (conn );
126
- PQclear (result );
127
-
128
- return true;
129
- }
130
-
131
- static void
132
- cancelQueryIfNeeded (PGconn * conn , const char * query )
133
- {
134
- Assert (conn != NULL );
135
- Assert (query != NULL );
136
-
137
- if (PQtransactionStatus (conn ) != PQTRANS_IDLE )
138
- {
139
- PGresult * res ;
140
-
141
- printf ("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n" ,
142
- PQstatus (conn ),
143
- PQtransactionStatus (conn ),
144
- PQerrorMessage (conn ));
145
-
146
- res = PQgetResult (conn );
147
-
148
- if (PQresultStatus (res ) == PGRES_FATAL_ERROR )
149
- Assert (pgfdw_cancel_query (conn ));
150
- else
151
- pgfdw_get_result (conn , query );
152
- }
153
-
154
- }
155
-
156
102
/*
157
103
* We need to send some DML queries for sync database schema to a plan execution
158
104
* at a remote instance.
@@ -162,7 +108,6 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
162
108
const char * queryString ,
163
109
ProcessUtilityContext context ,
164
110
ParamListInfo params ,
165
- QueryEnvironment * queryEnv ,
166
111
DestReceiver * dest ,
167
112
char * completionTag )
168
113
{
@@ -192,6 +137,8 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
192
137
case T_VacuumStmt :
193
138
break ;
194
139
default :
140
+ {
141
+ PGresult * res ;
195
142
if (nodeTag (parsetree ) == T_TransactionStmt )
196
143
{
197
144
TransactionStmt * stmt = (TransactionStmt * ) parsetree ;
@@ -202,26 +149,23 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
202
149
)
203
150
break ;
204
151
}
205
- if (conn )
206
- cancelQueryIfNeeded (conn , queryString );
207
152
conn = GetConnection (user , true);
208
- cancelQueryIfNeeded (conn , queryString );
209
153
Assert (conn != NULL );
210
154
211
- Assert (PQsendQuery (conn , queryString ));
155
+ res = PQexec (conn , queryString );
156
+ PQclear (res );
157
+ }
212
158
break ;
213
- };
159
+ }
214
160
}
215
161
216
162
if (next_ProcessUtility_hook )
217
163
(* next_ProcessUtility_hook ) (pstmt , queryString , context , params ,
218
- queryEnv , dest , completionTag );
164
+ dest , completionTag );
219
165
else
220
166
standard_ProcessUtility (pstmt , queryString ,
221
- context , params , queryEnv ,
167
+ context , params ,
222
168
dest , completionTag );
223
- if (conn )
224
- cancelQueryIfNeeded (conn , queryString );
225
169
}
226
170
227
171
static void
@@ -245,25 +189,47 @@ HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
245
189
{
246
190
Oid serverid ;
247
191
UserMapping * user ;
192
+ char * query ,
193
+ * query_container ,
194
+ * plan ,
195
+ * plan_container ;
196
+ int qlen , qlen1 ,
197
+ plen , plen1 ;
198
+ PGresult * res ;
248
199
249
200
serverid = get_foreign_server_oid (remote_server_fdwname , true);
250
201
Assert (OidIsValid (serverid ));
251
202
252
203
user = GetUserMapping (GetUserId (), serverid );
253
204
conn = GetConnection (user , true);
254
- cancelQueryIfNeeded (conn , queryDesc -> sourceText );
255
205
256
- if (PQsendPlan (conn , serialize_plan (queryDesc , eflags )) == 0 )
257
- pgfdw_report_error (ERROR , NULL , conn , false, queryDesc -> sourceText );
206
+ set_portable_output (true);
207
+ plan = nodeToString (queryDesc -> plannedstmt );
208
+ set_portable_output (false);
209
+ plen = b64_enc_len (plan , strlen (plan ) + 1 );
210
+ plan_container = (char * ) palloc0 (plen + 1 );
211
+ plen1 = b64_encode (plan , strlen (plan ), plan_container );
212
+ Assert (plen > plen1 );
213
+
214
+ qlen = b64_enc_len (queryDesc -> sourceText , strlen (queryDesc -> sourceText ) + 1 );
215
+ query_container = (char * ) palloc0 (qlen + 1 );
216
+ qlen1 = b64_encode (queryDesc -> sourceText , strlen (queryDesc -> sourceText ), query_container );
217
+ Assert (qlen > qlen1 );
218
+
219
+ query = palloc0 (qlen + plen + 100 );
220
+ sprintf (query , "SELECT public.pg_exec_plan('%s', '%s');" , query_container , plan_container );
221
+
222
+ res = PQexec (conn , query );
223
+ PQclear (res );
224
+ pfree (query );
225
+ pfree (query_container );
226
+ pfree (plan_container );
258
227
}
259
228
}
260
229
261
230
static void
262
231
HOOK_ExecEnd_injection (QueryDesc * queryDesc )
263
232
{
264
- if (conn )
265
- cancelQueryIfNeeded (conn , queryDesc -> sourceText );
266
-
267
233
if (prev_ExecutorEnd )
268
234
prev_ExecutorEnd (queryDesc );
269
235
else
0 commit comments