Skip to content

Commit fd7d901

Browse files
committed
Add pg_repeater and functionality for passing regression tests
1 parent 6dfba1a commit fd7d901

File tree

3 files changed

+52
-86
lines changed

3 files changed

+52
-86
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ MODULE_big = pg_repeater
44
EXTENSION = pg_repeater
55
EXTVERSION = 0.1
66
PGFILEDESC = "pg_repeater"
7-
MODULES = pg_repeater1
7+
MODULES = pg_repeater
88
OBJS = pg_repeater.o $(WIN32RES)
99

1010
fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw/

pg_repeater.c

Lines changed: 50 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,19 @@
1717
#include "commands/extension.h"
1818
#include "executor/executor.h"
1919
#include "fmgr.h"
20+
#include "foreign/foreign.h"
2021
#include "libpq/libpq.h"
2122
#include "libpq-fe.h"
23+
#include "miscadmin.h"
2224
#include "optimizer/planner.h"
25+
#include "pgstat.h"
26+
#include "postgres_fdw.h"
27+
#include "storage/latch.h"
2328
#include "tcop/utility.h"
29+
#include "utils/builtins.h"
2430
#include "utils/guc.h"
31+
#include "utils/memutils.h"
32+
2533

2634
PG_MODULE_MAGIC;
2735

@@ -33,8 +41,7 @@ static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
3341

3442
static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
3543
ProcessUtilityContext context, ParamListInfo params,
36-
QueryEnvironment *queryEnv, DestReceiver *dest,
37-
char *completionTag);
44+
DestReceiver *dest, char *completionTag);
3845
static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
3946
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
4047

@@ -44,6 +51,10 @@ char *remote_server_fdwname;
4451
static bool ExtensionIsActivated = false;
4552
static PGconn *conn = NULL;
4653

54+
static Oid serverid = InvalidOid;
55+
static UserMapping *user = NULL;
56+
57+
4758
/*
4859
* Module load/unload callback
4960
*/
@@ -80,79 +91,14 @@ ExtensionIsActive(void)
8091

8192
if (
8293
!IsTransactionState() ||
83-
!OidIsValid(get_extension_oid("repeater", true))
94+
!OidIsValid(get_extension_oid("pg_repeater", true))
8495
)
8596
return false;
8697

8798
ExtensionIsActivated = true;
8899
return ExtensionIsActivated;
89100
}
90101

91-
#include "miscadmin.h"
92-
#include "pgstat.h"
93-
#include "storage/latch.h"
94-
95-
#include "foreign/foreign.h"
96-
#include "postgres_fdw.h"
97-
98-
static Oid serverid = InvalidOid;
99-
static UserMapping *user = NULL;
100-
101-
static bool
102-
pgfdw_cancel_query(PGconn *conn)
103-
{
104-
PGcancel *cancel;
105-
char errbuf[256];
106-
PGresult *result = NULL;
107-
108-
if ((cancel = PQgetCancel(conn)))
109-
{
110-
if (!PQcancel(cancel, errbuf, sizeof(errbuf)))
111-
{
112-
ereport(WARNING,
113-
(errcode(ERRCODE_CONNECTION_FAILURE),
114-
errmsg("could not send cancel request: %s",
115-
errbuf)));
116-
PQfreeCancel(cancel);
117-
return false;
118-
}
119-
120-
PQfreeCancel(cancel);
121-
}
122-
else
123-
elog(FATAL, "Can't get connection cancel descriptor");
124-
125-
PQconsumeInput(conn);
126-
PQclear(result);
127-
128-
return true;
129-
}
130-
131-
static void
132-
cancelQueryIfNeeded(PGconn *conn, const char *query)
133-
{
134-
Assert(conn != NULL);
135-
Assert(query != NULL);
136-
137-
if (PQtransactionStatus(conn) != PQTRANS_IDLE)
138-
{
139-
PGresult *res;
140-
141-
printf("CONN status BEFORE EXEC: %d, txs: %d errmsg: %s\n",
142-
PQstatus(conn),
143-
PQtransactionStatus(conn),
144-
PQerrorMessage(conn));
145-
146-
res = PQgetResult(conn);
147-
148-
if (PQresultStatus(res) == PGRES_FATAL_ERROR)
149-
Assert(pgfdw_cancel_query(conn));
150-
else
151-
pgfdw_get_result(conn, query);
152-
}
153-
154-
}
155-
156102
/*
157103
* We need to send some DML queries for sync database schema to a plan execution
158104
* at a remote instance.
@@ -162,7 +108,6 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
162108
const char *queryString,
163109
ProcessUtilityContext context,
164110
ParamListInfo params,
165-
QueryEnvironment *queryEnv,
166111
DestReceiver *dest,
167112
char *completionTag)
168113
{
@@ -192,6 +137,8 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
192137
case T_VacuumStmt:
193138
break;
194139
default:
140+
{
141+
PGresult *res;
195142
if (nodeTag(parsetree) == T_TransactionStmt)
196143
{
197144
TransactionStmt *stmt = (TransactionStmt *) parsetree;
@@ -202,26 +149,23 @@ HOOK_Utility_injection(PlannedStmt *pstmt,
202149
)
203150
break;
204151
}
205-
if (conn)
206-
cancelQueryIfNeeded(conn, queryString);
207152
conn = GetConnection(user, true);
208-
cancelQueryIfNeeded(conn, queryString);
209153
Assert(conn != NULL);
210154

211-
Assert(PQsendQuery(conn, queryString));
155+
res = PQexec(conn, queryString);
156+
PQclear(res);
157+
}
212158
break;
213-
};
159+
}
214160
}
215161

216162
if (next_ProcessUtility_hook)
217163
(*next_ProcessUtility_hook) (pstmt, queryString, context, params,
218-
queryEnv, dest, completionTag);
164+
dest, completionTag);
219165
else
220166
standard_ProcessUtility(pstmt, queryString,
221-
context, params, queryEnv,
167+
context, params,
222168
dest, completionTag);
223-
if (conn)
224-
cancelQueryIfNeeded(conn, queryString);
225169
}
226170

227171
static void
@@ -245,25 +189,47 @@ HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
245189
{
246190
Oid serverid;
247191
UserMapping *user;
192+
char *query,
193+
*query_container,
194+
*plan,
195+
*plan_container;
196+
int qlen, qlen1,
197+
plen, plen1;
198+
PGresult *res;
248199

249200
serverid = get_foreign_server_oid(remote_server_fdwname, true);
250201
Assert(OidIsValid(serverid));
251202

252203
user = GetUserMapping(GetUserId(), serverid);
253204
conn = GetConnection(user, true);
254-
cancelQueryIfNeeded(conn, queryDesc->sourceText);
255205

256-
if (PQsendPlan(conn, serialize_plan(queryDesc, eflags)) == 0)
257-
pgfdw_report_error(ERROR, NULL, conn, false, queryDesc->sourceText);
206+
set_portable_output(true);
207+
plan = nodeToString(queryDesc->plannedstmt);
208+
set_portable_output(false);
209+
plen = b64_enc_len(plan, strlen(plan) + 1);
210+
plan_container = (char *) palloc0(plen+1);
211+
plen1 = b64_encode(plan, strlen(plan), plan_container);
212+
Assert(plen > plen1);
213+
214+
qlen = b64_enc_len(queryDesc->sourceText, strlen(queryDesc->sourceText) + 1);
215+
query_container = (char *) palloc0(qlen+1);
216+
qlen1 = b64_encode(queryDesc->sourceText, strlen(queryDesc->sourceText), query_container);
217+
Assert(qlen > qlen1);
218+
219+
query = palloc0(qlen + plen + 100);
220+
sprintf(query, "SELECT public.pg_exec_plan('%s', '%s');", query_container, plan_container);
221+
222+
res = PQexec(conn, query);
223+
PQclear(res);
224+
pfree(query);
225+
pfree(query_container);
226+
pfree(plan_container);
258227
}
259228
}
260229

261230
static void
262231
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
263232
{
264-
if (conn)
265-
cancelQueryIfNeeded(conn, queryDesc->sourceText);
266-
267233
if (prev_ExecutorEnd)
268234
prev_ExecutorEnd(queryDesc);
269235
else

pg_repeater.control

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@ comment = 'Pass raw query plan to a remote node'
33
default_version = '0.1'
44
module_pathname = '$libdir/pg_repeater'
55
relocatable = false
6-
requires = 'postgres_fdw pg_execplan'
6+
requires = 'postgres_fdw, pg_execplan'

0 commit comments

Comments
 (0)