Skip to content

Commit 93cabed

Browse files
committed
Remove portals from pg_execplan, temporarily add DestLog receiver. + pgindent of code processing
1 parent b6f1dc0 commit 93cabed

File tree

1 file changed

+107
-104
lines changed

1 file changed

+107
-104
lines changed

pg_repeater.c

Lines changed: 107 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
/*-------------------------------------------------------------------------
22
*
33
* repeater.c
4-
* Simple demo for remote plan execution patch.
4+
* Simple demo for remote plan execution patch.
55
*
66
* Transfer query plan to a remote instance and wait for result.
77
* Remote instance parameters (host, port) defines by GUCs.
@@ -34,26 +34,26 @@
3434

3535
PG_MODULE_MAGIC;
3636

37-
void _PG_init(void);
37+
void _PG_init(void);
3838

39-
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
40-
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
41-
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
39+
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
40+
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
41+
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
4242

4343
static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
44-
ProcessUtilityContext context, ParamListInfo params,
45-
DestReceiver *dest, char *completionTag);
44+
ProcessUtilityContext context, ParamListInfo params,
45+
DestReceiver *dest, char *completionTag);
4646
static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
4747
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
4848

4949
/* Remote instance parameters. */
50-
char *remote_server_fdwname;
50+
char *remote_server_fdwname;
5151

52-
static bool ExtensionIsActivated = false;
53-
static PGconn *conn = NULL;
52+
static bool ExtensionIsActivated = false;
53+
static PGconn *conn = NULL;
5454

55-
static Oid serverid = InvalidOid;
56-
static UserMapping *user = NULL;
55+
static Oid serverid = InvalidOid;
56+
static UserMapping *user = NULL;
5757

5858

5959
/*
@@ -63,15 +63,15 @@ void
6363
_PG_init(void)
6464
{
6565
DefineCustomStringVariable("repeater.fdwname",
66-
"Remote host fdw name",
67-
NULL,
68-
&remote_server_fdwname,
69-
"remoteserv",
70-
PGC_SIGHUP,
71-
GUC_NOT_IN_SAMPLE,
72-
NULL,
73-
NULL,
74-
NULL);
66+
"Remote host fdw name",
67+
NULL,
68+
&remote_server_fdwname,
69+
"remoteserv",
70+
PGC_SIGHUP,
71+
GUC_NOT_IN_SAMPLE,
72+
NULL,
73+
NULL,
74+
NULL);
7575

7676
/* ProcessUtility hook */
7777
next_ProcessUtility_hook = ProcessUtility_hook;
@@ -106,22 +106,22 @@ ExtensionIsActive(void)
106106
*/
107107
static void
108108
HOOK_Utility_injection(PlannedStmt *pstmt,
109-
const char *queryString,
110-
ProcessUtilityContext context,
111-
ParamListInfo params,
112-
DestReceiver *dest,
113-
char *completionTag)
109+
const char *queryString,
110+
ProcessUtilityContext context,
111+
ParamListInfo params,
112+
DestReceiver *dest,
113+
char *completionTag)
114114
{
115-
Node *parsetree = pstmt->utilityStmt;
115+
Node *parsetree = pstmt->utilityStmt;
116116

117117
if (ExtensionIsActive() &&
118118
pstmt->canSetTag &&
119119
(context != PROCESS_UTILITY_SUBCOMMAND)
120-
)
120+
)
121121
{
122122
if (!user)
123123
{
124-
MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
124+
MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
125125

126126
serverid = get_foreign_server_oid(remote_server_fdwname, true);
127127
Assert(OidIsValid(serverid));
@@ -131,32 +131,33 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
131131
}
132132
switch (nodeTag(parsetree))
133133
{
134-
case T_CopyStmt:
135-
case T_CreateExtensionStmt:
136-
case T_ExplainStmt:
137-
case T_FetchStmt:
138-
case T_VacuumStmt:
139-
break;
140-
default:
141-
{
142-
PGresult *res;
143-
if (nodeTag(parsetree) == T_TransactionStmt)
144-
{
145-
TransactionStmt *stmt = (TransactionStmt *) parsetree;
146-
147-
if (
148-
// (stmt->kind != TRANS_STMT_ROLLBACK_TO) &&
149-
(stmt->kind != TRANS_STMT_SAVEPOINT)
150-
)
151-
break;
152-
}
153-
conn = GetConnection(user, true);
154-
Assert(conn != NULL);
155-
156-
res = PQexec(conn, queryString);
157-
PQclear(res);
158-
}
159-
break;
134+
case T_CopyStmt:
135+
case T_CreateExtensionStmt:
136+
case T_ExplainStmt:
137+
case T_FetchStmt:
138+
case T_VacuumStmt:
139+
break;
140+
default:
141+
{
142+
PGresult *res;
143+
144+
if (nodeTag(parsetree) == T_TransactionStmt)
145+
{
146+
TransactionStmt *stmt = (TransactionStmt *) parsetree;
147+
148+
if (
149+
/* (stmt->kind != TRANS_STMT_ROLLBACK_TO) && */
150+
(stmt->kind != TRANS_STMT_SAVEPOINT)
151+
)
152+
break;
153+
}
154+
conn = GetConnection(user, true);
155+
Assert(conn != NULL);
156+
157+
res = PQexec(conn, queryString);
158+
PQclear(res);
159+
}
160+
break;
160161
}
161162
}
162163

@@ -165,68 +166,70 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
165166
dest, completionTag);
166167
else
167168
standard_ProcessUtility(pstmt, queryString,
168-
context, params,
169-
dest, completionTag);
169+
context, params,
170+
dest, completionTag);
170171
}
171172

172173
static void
173174
HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
174175
{
175-
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
176+
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
176177

177178
if (prev_ExecutorStart)
178179
prev_ExecutorStart(queryDesc, eflags);
179180
else
180181
standard_ExecutorStart(queryDesc, eflags);
181182

182183
/*
183-
* This not fully correct sign for prevent passing each subquery to
184-
* the remote instance. Only for demo.
184+
* This not fully correct sign for prevent passing each subquery to the
185+
* remote instance. Only for demo.
185186
*/
186-
if (ExtensionIsActive() &&
187-
queryDesc->plannedstmt->canSetTag &&
188-
!IsParallelWorker() &&
189-
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)) &&
190-
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
191-
{
192-
Oid serverid;
193-
UserMapping *user;
194-
char *query,
195-
*query_container,
196-
*plan,
197-
*plan_container;
198-
int qlen, qlen1,
199-
plen, plen1;
200-
PGresult *res;
201-
202-
serverid = get_foreign_server_oid(remote_server_fdwname, true);
203-
Assert(OidIsValid(serverid));
204-
205-
user = GetUserMapping(GetUserId(), serverid);
206-
conn = GetConnection(user, true);
207-
208-
set_portable_output(true);
209-
plan = nodeToString(queryDesc->plannedstmt);
210-
set_portable_output(false);
211-
plen = b64_enc_len(plan, strlen(plan) + 1);
212-
plan_container = (char *) palloc0(plen+1);
213-
plen1 = b64_encode(plan, strlen(plan), plan_container);
214-
Assert(plen > plen1);
215-
216-
qlen = b64_enc_len(queryDesc->sourceText, strlen(queryDesc->sourceText) + 1);
217-
query_container = (char *) palloc0(qlen+1);
218-
qlen1 = b64_encode(queryDesc->sourceText, strlen(queryDesc->sourceText), query_container);
219-
Assert(qlen > qlen1);
220-
221-
query = palloc0(qlen + plen + 100);
222-
sprintf(query, "SELECT public.pg_exec_plan('%s', '%s');", query_container, plan_container);
223-
224-
res = PQexec(conn, query);
225-
PQclear(res);
226-
pfree(query);
227-
pfree(query_container);
228-
pfree(plan_container);
229-
}
187+
if (ExtensionIsActive() &&
188+
queryDesc->plannedstmt->canSetTag &&
189+
!IsParallelWorker() &&
190+
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)) &&
191+
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
192+
{
193+
Oid serverid;
194+
UserMapping *user;
195+
char *query,
196+
*query_container,
197+
*plan,
198+
*plan_container;
199+
int qlen,
200+
qlen1,
201+
plen,
202+
plen1;
203+
PGresult *res;
204+
205+
serverid = get_foreign_server_oid(remote_server_fdwname, true);
206+
Assert(OidIsValid(serverid));
207+
208+
user = GetUserMapping(GetUserId(), serverid);
209+
conn = GetConnection(user, true);
210+
211+
set_portable_output(true);
212+
plan = nodeToString(queryDesc->plannedstmt);
213+
set_portable_output(false);
214+
plen = b64_enc_len(plan, strlen(plan) + 1);
215+
plan_container = (char *) palloc0(plen + 1);
216+
plen1 = b64_encode(plan, strlen(plan), plan_container);
217+
Assert(plen > plen1);
218+
219+
qlen = b64_enc_len(queryDesc->sourceText, strlen(queryDesc->sourceText) + 1);
220+
query_container = (char *) palloc0(qlen + 1);
221+
qlen1 = b64_encode(queryDesc->sourceText, strlen(queryDesc->sourceText), query_container);
222+
Assert(qlen > qlen1);
223+
224+
query = palloc0(qlen + plen + 100);
225+
sprintf(query, "SELECT public.pg_exec_plan('%s', '%s');", query_container, plan_container);
226+
227+
res = PQexec(conn, query);
228+
PQclear(res);
229+
pfree(query);
230+
pfree(query_container);
231+
pfree(plan_container);
232+
}
230233
}
231234

232235
static void

0 commit comments

Comments
 (0)