Skip to content

Commit d2d5401

Browse files
committed
raw
1 parent 8c21867 commit d2d5401

File tree

1 file changed

+63
-72
lines changed

1 file changed

+63
-72
lines changed

repeater.c

Lines changed: 63 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
1-
/*
1+
/*-------------------------------------------------------------------------
2+
*
23
* repeater.c
4+
* Simple demo for remote plan execution patch.
35
*
6+
* Transfer query plan to a remote instance and wait for result.
7+
* Remote instance parameters (host, port) defines by GUCs.
8+
*
9+
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10+
* Portions Copyright (c) 2018-2019, Postgres Professional
11+
*-------------------------------------------------------------------------
412
*/
513

614
#include "postgres.h"
@@ -11,12 +19,9 @@
1119
#include "fmgr.h"
1220
#include "libpq/libpq.h"
1321
#include "libpq-fe.h"
14-
#include "nodes/params.h"
1522
#include "optimizer/planner.h"
1623
#include "tcop/utility.h"
1724
#include "utils/guc.h"
18-
#include "utils/memutils.h"
19-
#include "utils/plancache.h"
2025

2126
PG_MODULE_MAGIC;
2227

@@ -32,12 +37,14 @@ static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
3237
char *completionTag);
3338
static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
3439
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
35-
static int execute_query(PGconn *dest, QueryDesc *queryDesc, int eflags);
36-
3740

41+
/* Remote instance parameters. */
3842
char *repeater_host_name;
3943
int repeater_port_number;
4044

45+
static bool ExtensionIsActivated = false;
46+
static PGconn *conn = NULL;
47+
4148
/*
4249
* Module load/unload callback
4350
*/
@@ -79,8 +86,6 @@ _PG_init(void)
7986
ExecutorEnd_hook = HOOK_ExecEnd_injection;
8087
}
8188

82-
static PGconn *conn = NULL;
83-
8489
static PGconn*
8590
EstablishConnection(void)
8691
{
@@ -90,17 +95,17 @@ EstablishConnection(void)
9095
return conn;
9196

9297
/* Connect to slave and send it a query plan */
93-
sprintf(conninfo, "host=localhost port=5433%c", '\0');
98+
sprintf(conninfo, "host=%s port=%d %c", repeater_host_name, repeater_port_number, '\0');
9499
conn = PQconnectdb(conninfo);
95100

96101
if (PQstatus(conn) == CONNECTION_BAD)
97102
elog(LOG, "Connection error. conninfo: %s", conninfo);
103+
else
104+
elog(LOG, "Connection established: host=%s, port=%d", repeater_host_name, repeater_port_number);
98105

99106
return conn;
100107
}
101108

102-
static bool ExtensionIsActivated = false;
103-
104109
static bool
105110
ExtensionIsActive(void)
106111
{
@@ -117,6 +122,10 @@ ExtensionIsActive(void)
117122
return ExtensionIsActivated;
118123
}
119124

125+
/*
126+
* We need to send some DML queries for sync database schema to a plan execution
127+
* at a remote instance.
128+
*/
120129
static void
121130
HOOK_Utility_injection(PlannedStmt *pstmt,
122131
const char *queryString,
@@ -126,27 +135,35 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
126135
DestReceiver *dest,
127136
char *completionTag)
128137
{
129-
Node *parsetree = pstmt->utilityStmt;
138+
Node *parsetree = pstmt->utilityStmt;
139+
PGresult *result;
130140

141+
/*
142+
* Very non-trivial decision about transferring utility query to data nodes.
143+
* This exception list used for demonstration and let us to execute some
144+
* simple queries.
145+
*/
131146
if (ExtensionIsActive() &&
147+
pstmt->canSetTag &&
132148
(nodeTag(parsetree) != T_CopyStmt) &&
133149
(nodeTag(parsetree) != T_CreateExtensionStmt) &&
134150
(nodeTag(parsetree) != T_ExplainStmt) &&
151+
(nodeTag(parsetree) != T_FetchStmt) &&
135152
(context != PROCESS_UTILITY_SUBCOMMAND)
136153
)
137154
{
138-
PGresult *result;
139-
155+
/*
156+
* Previous query could be completed with error report at this instance.
157+
* In this case, we need to prepare connection to the remote instance.
158+
*/
140159
while ((result = PQgetResult(EstablishConnection())) != NULL);
141160

142161
if (PQsendQuery(EstablishConnection(), queryString) == 0)
143-
{
144-
elog(ERROR, "Sending UTILITY query error: %s", queryString);
145-
PQreset(conn);
146-
}
162+
elog(ERROR, "Connection error: query: %s, status=%d, errmsg=%s",
163+
queryString,
164+
PQstatus(EstablishConnection()),
165+
PQerrorMessage(EstablishConnection()));
147166
}
148-
else
149-
elog(LOG, "UTILITY query without sending: %s", queryString);
150167

151168
if (next_ProcessUtility_hook)
152169
(*next_ProcessUtility_hook) (pstmt, queryString, context, params,
@@ -156,48 +173,51 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
156173
context, params, queryEnv,
157174
dest, completionTag);
158175

176+
/*
177+
* Check end of query execution at the remote instance.
178+
*/
159179
if (conn)
160-
{
161-
PGresult *result;
162-
163180
while ((result = PQgetResult(conn)) != NULL);
164-
}
165181
}
166-
static int IsExecuted = 0;
167182

168183
static void
169184
HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
170185
{
171-
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
186+
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
187+
PGresult *result;
188+
PGconn *dest = EstablishConnection();
172189

173190
if (prev_ExecutorStart)
174191
prev_ExecutorStart(queryDesc, eflags);
175192
else
176193
standard_ExecutorStart(queryDesc, eflags);
177-
178-
IsExecuted++;
179-
180-
if (IsExecuted > 1)
181-
return;
182-
183-
if (
184-
ExtensionIsActive() &&
185-
(repeater_host_name == 0) &&
186-
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)) &&
187-
!(eflags & EXEC_FLAG_EXPLAIN_ONLY)
188-
)
194+
elog(LOG, "QUERY: %s", queryDesc->sourceText);
195+
/*
196+
* This not fully correct sign for prevent passing each subquery to
197+
* the remote instance. Only for demo.
198+
*/
199+
if (ExtensionIsActive() &&
200+
queryDesc->plannedstmt->canSetTag &&
201+
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)) &&
202+
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
189203
{
190-
elog(LOG, "Send query: %s", queryDesc->sourceText);
191-
if (execute_query(EstablishConnection(), queryDesc, eflags) == 0)
192-
PQreset(conn);
204+
/*
205+
* Prepare connection.
206+
*/
207+
while ((result = PQgetResult(dest)) != NULL);
208+
elog(LOG, "->QUERY: %s", queryDesc->sourceText);
209+
if (PQsendPlan(dest, serialize_plan(queryDesc, eflags)) == 0)
210+
/*
211+
* Report about remote execution error.
212+
*/
213+
elog(ERROR, "Connection errors during PLAN transferring: status=%d, errmsg=%s",
214+
PQstatus(dest), PQerrorMessage(dest));
193215
}
194216
}
195217

196218
static void
197219
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
198220
{
199-
IsExecuted--;
200-
/* Execute before hook because it destruct memory context of exchange list */
201221
if (conn)
202222
{
203223
PGresult *result;
@@ -210,32 +230,3 @@ HOOK_ExecEnd_injection(QueryDesc *queryDesc)
210230
else
211231
standard_ExecutorEnd(queryDesc);
212232
}
213-
214-
215-
/*
216-
* Serialize plan and send it to the destination instance
217-
*/
218-
static int
219-
execute_query(PGconn *dest, QueryDesc *queryDesc, int eflags)
220-
{
221-
PGresult *result;
222-
223-
Assert(dest != NULL);
224-
225-
/*
226-
* Before send of plan we need to check connection state.
227-
* If previous query was failed, we get PGRES_FATAL_ERROR.
228-
*/
229-
while ((result = PQgetResult(dest)) != NULL);
230-
231-
if (PQsendPlan(dest, serialize_plan(queryDesc, eflags)) == 0)
232-
{
233-
/*
234-
* Report about remote execution error and return control to caller.
235-
*/
236-
elog(ERROR, "PLAN sending error.");
237-
return 0;
238-
}
239-
240-
return 1;
241-
}

0 commit comments

Comments
 (0)