Skip to content

Commit f234df2

Browse files
committed
Add repeater
1 parent dc96188 commit f234df2

File tree

5 files changed

+229
-212
lines changed

5 files changed

+229
-212
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ EXTENSION = execplan
55
EXTVERSION = 0.1
66
PGFILEDESC = "ExecPlan"
77
MODULES = execplan
8-
OBJS = exec_plan.o planwalker.o $(WIN32RES)
8+
OBJS = exec_plan.o planwalker.o repeater.o $(WIN32RES)
99

1010
PG_CPPFLAGS = -I$(libpq_srcdir)
1111
SHLIB_LINK_INTERNAL = $(libpq)

exec_plan.c

Lines changed: 26 additions & 211 deletions
Original file line numberDiff line numberDiff line change
@@ -2,228 +2,53 @@
22
#include "postgres.h"
33

44
#include "catalog/namespace.h"
5-
#include "commands/extension.h"
6-
#include "executor/execdesc.h"
7-
#include "executor/executor.h"
8-
#include "fmgr.h"
9-
#include "libpq/libpq.h"
10-
#include "libpq-fe.h"
11-
#include "nodes/params.h"
12-
#include "planwalker.h"
13-
#include "optimizer/planner.h"
5+
#include "common/base64.h"
6+
#include "nodes/nodeFuncs.h"
147
#include "storage/lmgr.h"
15-
#include "tcop/utility.h"
168
#include "utils/builtins.h"
17-
#include "utils/guc.h"
189
#include "utils/lsyscache.h"
19-
#include "utils/plancache.h"
2010
#include "utils/snapmgr.h"
2111

12+
#include "exec_plan.h"
13+
#include "planwalker.h"
14+
2215
PG_MODULE_MAGIC;
2316

2417
PG_FUNCTION_INFO_V1(pg_execute_plan);
2518

26-
void _PG_init(void);
27-
28-
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
29-
static planner_hook_type prev_planner_hook = NULL;
30-
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
31-
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
32-
33-
static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
34-
ProcessUtilityContext context, ParamListInfo params,
35-
QueryEnvironment *queryEnv, DestReceiver *dest,
36-
char *completionTag);
37-
static PlannedStmt *HOOK_Planner_injection(Query *parse, int cursorOptions,
38-
ParamListInfo boundParams);
39-
static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
40-
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
4119
static char *serialize_plan(QueryDesc *queryDesc, int eflags);
42-
static void execute_query(char *planString);
4320
static bool store_irel_name(Plan *plan, char *buffer);
4421

45-
static PGconn *conn = NULL;
46-
47-
int node_number1 = 0;
48-
49-
/*
50-
* Module load/unload callback
51-
*/
52-
void
53-
_PG_init(void)
54-
{
55-
DefineCustomIntVariable("pargres.node",
56-
"Node number in instances collaboration",
57-
NULL,
58-
&node_number1,
59-
0,
60-
0,
61-
1023,
62-
PGC_SIGHUP,
63-
GUC_NOT_IN_SAMPLE,
64-
NULL,
65-
NULL,
66-
NULL);
67-
68-
/* ProcessUtility hook */
69-
next_ProcessUtility_hook = ProcessUtility_hook;
70-
ProcessUtility_hook = HOOK_Utility_injection;
71-
72-
/* Planner hook */
73-
prev_planner_hook = planner_hook;
74-
planner_hook = HOOK_Planner_injection;
75-
76-
prev_ExecutorStart = ExecutorStart_hook;
77-
ExecutorStart_hook = HOOK_ExecStart_injection;
78-
79-
prev_ExecutorEnd = ExecutorEnd_hook;
80-
ExecutorEnd_hook = HOOK_ExecEnd_injection;
81-
}
82-
83-
static void
84-
HOOK_Utility_injection(PlannedStmt *pstmt,
85-
const char *queryString,
86-
ProcessUtilityContext context,
87-
ParamListInfo params,
88-
QueryEnvironment *queryEnv,
89-
DestReceiver *dest,
90-
char *completionTag)
91-
{
92-
Node *parsetree = pstmt->utilityStmt;
93-
94-
if ((OidIsValid(get_extension_oid("execplan", true))) &&
95-
(node_number1 == 0) &&
96-
(nodeTag(parsetree) != T_CopyStmt) &&
97-
(nodeTag(parsetree) != T_CreateExtensionStmt) &&
98-
(context != PROCESS_UTILITY_SUBCOMMAND))
99-
{
100-
char conninfo[1024];
101-
int status;
102-
103-
// elog(LOG, "Send UTILITY query %d: %s", nodeTag(parsetree), queryString);
104-
105-
/* Connect to slave and send it a query plan */
106-
sprintf(conninfo, "host=localhost port=5433%c", '\0');
107-
conn = PQconnectdb(conninfo);
108-
if (PQstatus(conn) == CONNECTION_BAD)
109-
elog(LOG, "Connection error. conninfo: %s", conninfo);
110-
111-
status = PQsendQuery(conn, queryString);
112-
if (status == 0)
113-
elog(ERROR, "Query sending error: %s", PQerrorMessage(conn));
114-
}
115-
else if (node_number1 == 0)
116-
elog(LOG, "UTILITY query without sending: %s", queryString);
117-
118-
if (next_ProcessUtility_hook)
119-
(*next_ProcessUtility_hook) (pstmt, queryString, context, params,
120-
queryEnv, dest, completionTag);
121-
else
122-
standard_ProcessUtility(pstmt, queryString,
123-
context, params, queryEnv,
124-
dest, completionTag);
125-
126-
if (conn)
127-
{
128-
PGresult *result;
129-
130-
while ((result = PQgetResult(conn)) != NULL)
131-
Assert(PQresultStatus(result) != PGRES_FATAL_ERROR);
132-
PQfinish(conn);
133-
conn = NULL;
134-
}
135-
}
136-
13722
/*
13823
* INPUT: a base64-encoded serialized plan
13924
*/
140-
static void
141-
execute_query(char *planString)
25+
void
26+
execute_query(PGconn *dest, QueryDesc *queryDesc, int eflags)
14227
{
143-
char conninfo[1024];
14428
char *SQLCommand;
14529
int status;
30+
char *serializedPlan;
31+
PGresult *result;
14632

147-
/* Connect to slave and send it a query plan */
148-
sprintf(conninfo, "host=localhost port=5433%c", '\0');
149-
conn = PQconnectdb(conninfo);
150-
if (PQstatus(conn) == CONNECTION_BAD)
151-
elog(LOG, "Connection error. conninfo: %s", conninfo);
152-
153-
SQLCommand = (char *) palloc0(strlen(planString)+100);
154-
sprintf(SQLCommand, "SELECT pg_execute_plan('%s');", planString);
155-
//elog(LOG, "query: %s", SQLCommand);
156-
status = PQsendQuery(conn, SQLCommand);
157-
if (status == 0)
158-
elog(ERROR, "Query sending error: %s", PQerrorMessage(conn));
159-
}
33+
Assert(dest != NULL);
16034

161-
static PlannedStmt *
162-
HOOK_Planner_injection(Query *parse, int cursorOptions,
163-
ParamListInfo boundParams)
164-
{
165-
PlannedStmt *pstmt;
166-
167-
conn = NULL;
168-
169-
if (prev_planner_hook)
170-
pstmt = prev_planner_hook(parse, cursorOptions, boundParams);
171-
else
172-
pstmt = standard_planner(parse, cursorOptions, boundParams);
173-
174-
if ((node_number1 > 0) || (parse->utilityStmt != NULL))
175-
return pstmt;
176-
177-
/* Extension is not initialized. */
178-
if (OidIsValid(get_extension_oid("execplan", true)))
179-
{
180-
181-
}
182-
return pstmt;
183-
}
184-
185-
static void
186-
HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
187-
{
188-
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
189-
190-
if (prev_ExecutorStart)
191-
prev_ExecutorStart(queryDesc, eflags);
192-
else
193-
standard_ExecutorStart(queryDesc, eflags);
194-
195-
if ((OidIsValid(get_extension_oid("execplan", true))) &&
196-
(node_number1 == 0) &&
197-
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)))
198-
{
199-
// elog(LOG, "Send query: %s", queryDesc->sourceText);
200-
execute_query(serialize_plan(queryDesc, eflags));
201-
}
202-
}
35+
/*
36+
* Before send of plan we need to check connection state.
37+
* If previous query was failed, we get PGRES_FATAL_ERROR.
38+
*/
39+
while ((result = PQgetResult(dest)) != NULL);
20340

204-
static void
205-
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
206-
{
207-
/* Execute before hook because it destruct memory context of exchange list */
208-
if (conn)
209-
{
210-
PGresult *result;
41+
serializedPlan = serialize_plan(queryDesc, eflags);
42+
/* Connect to slave and send it a query plan */
43+
SQLCommand = (char *) palloc0(strlen(serializedPlan)+100);
44+
sprintf(SQLCommand, "SELECT pg_execute_plan('%s');", serializedPlan);
21145

212-
while ((result = PQgetResult(conn)) != NULL)
213-
Assert(PQresultStatus(result) != PGRES_FATAL_ERROR);
214-
PQfinish(conn);
215-
conn = NULL;
216-
}
46+
status = PQsendQuery(dest, SQLCommand);
21747

218-
if (prev_ExecutorEnd)
219-
prev_ExecutorEnd(queryDesc);
220-
else
221-
standard_ExecutorEnd(queryDesc);
48+
if (status == 0)
49+
elog(ERROR, "Query sending error: %s", PQerrorMessage(dest));
22250
}
22351

224-
#include "common/base64.h"
225-
#include "nodes/nodeFuncs.h"
226-
22752
static bool
22853
compute_irels_buffer_len(Plan *plan, int *length)
22954
{
@@ -258,8 +83,6 @@ compute_irels_buffer_len(Plan *plan, int *length)
25883
return plan_tree_walker(plan, compute_irels_buffer_len, length);
25984
}
26085

261-
//#include "nodes/pg_list.h"
262-
26386
static char *
26487
serialize_plan(QueryDesc *queryDesc, int eflags)
26588
{
@@ -275,7 +98,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
27598
*econtainer,
27699
*start_address;
277100
ListCell *lc;
278-
101+
elog(LOG, "Send QUERY: %s", queryDesc->sourceText);
279102
serialized_plan = nodeToString(queryDesc->plannedstmt);
280103

281104
/*
@@ -302,11 +125,7 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
302125
* to save the relation names in serialized plan.
303126
*/
304127
compute_irels_buffer_len(queryDesc->plannedstmt->planTree, &inames_len);
305-
// plan_tree_walker(queryDesc->plannedstmt->planTree,
306-
// compute_irels_buffer_len,
307-
// &inames_len);
308-
// planstate_tree_walker((PlanState *) (queryDesc->planstate), compute_irels_buffer_len, &inames_len);
309-
//elog(LOG, "inames_len=%d", inames_len);
128+
310129
/* We use len+1 bytes for include end-of-string symbol. */
311130
splan_len = strlen(serialized_plan) + 1;
312131
qtext_len = strlen(queryDesc->sourceText) + 1;
@@ -353,9 +172,6 @@ serialize_plan(QueryDesc *queryDesc, int eflags)
353172
}
354173
}
355174
store_irel_name((Plan *) (queryDesc->plannedstmt->planTree), start_address);
356-
// plan_tree_walker((Plan *) (queryDesc->plannedstmt->planTree),
357-
// store_irel_name,
358-
// start_address);
359175

360176
start_address += inames_len;
361177
Assert((start_address - container) == tot_len);
@@ -525,7 +341,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
525341

526342
/* Restore query source text string */
527343
queryString = start_addr;
528-
//elog(LOG, "queryString: %s", queryString);
344+
elog(LOG, "Recv QUERY: %s", queryString);
529345
/* Restore instrument and flags */
530346
start_addr += strlen(queryString) + 1;
531347
instrument_options = (int *) start_addr;
@@ -544,7 +360,6 @@ pg_execute_plan(PG_FUNCTION_ARGS)
544360
{
545361
rte->relid = RelnameGetRelid(start_addr);
546362
Assert(rte->relid != InvalidOid);
547-
// elog(LOG, "Relation from decoded plan. relid=%d relname=%s", rte->relid, start_addr);
548363
start_addr += strlen(start_addr) + 1;
549364
}
550365
}
@@ -565,7 +380,7 @@ pg_execute_plan(PG_FUNCTION_ARGS)
565380
ExecutorFinish(queryDesc);
566381
ExecutorEnd(queryDesc);
567382
FreeQueryDesc(queryDesc);
568-
383+
// elog(LOG, "End of QUERY: %s", queryString);
569384
pfree(decdata);
570385
PG_RETURN_BOOL(true);
571386
}

exec_plan.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
/*
2+
* exec_plan.h
3+
*
4+
*/
5+
6+
#ifndef CONTRIB_EXECPLAN_EXEC_PLAN_H_
7+
#define CONTRIB_EXECPLAN_EXEC_PLAN_H_
8+
9+
#include "postgres.h"
10+
11+
#include "executor/executor.h"
12+
#include "libpq-fe.h"
13+
14+
void execute_query(PGconn *dest, QueryDesc *queryDesc, int eflags);
15+
16+
#endif /* CONTRIB_EXECPLAN_EXEC_PLAN_H_ */

0 commit comments

Comments
 (0)