Skip to content

Commit df9c2ca

Browse files
committed
Add 'contrib/pg_repeater/' from commit '93cabed0ce5cec20d36d1cd56e200a4a0091d55d'
git-subtree-dir: contrib/pg_repeater git-subtree-mainline: 5c7b664 git-subtree-split: 93cabed
2 parents 5c7b664 + 93cabed commit df9c2ca

File tree

7 files changed

+364
-0
lines changed

7 files changed

+364
-0
lines changed

contrib/pg_repeater/.gitignore

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Prerequisites
2+
*.d
3+
4+
# Object files
5+
*.o
6+
*.ko
7+
*.obj
8+
*.elf
9+
10+
# Linker output
11+
*.ilk
12+
*.map
13+
*.exp
14+
15+
# Precompiled Headers
16+
*.gch
17+
*.pch
18+
19+
# Libraries
20+
*.lib
21+
*.a
22+
*.la
23+
*.lo
24+
25+
# Shared objects (inc. Windows DLLs)
26+
*.dll
27+
*.so
28+
*.so.*
29+
*.dylib
30+
31+
# Executables
32+
*.exe
33+
*.out
34+
*.app
35+
*.i*86
36+
*.x86_64
37+
*.hex
38+
39+
# Debug files
40+
*.dSYM/
41+
*.su
42+
*.idb
43+
*.pdb
44+
45+
# Kernel Module Compile Results
46+
*.mod*
47+
*.cmd
48+
.tmp_versions/
49+
modules.order
50+
Module.symvers
51+
Mkfile.old
52+
dkms.conf

contrib/pg_repeater/LICENSE

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
pg_repeater is released under the PostgreSQL License, a liberal Open Source license, similar to the BSD or MIT licenses.
2+
3+
Copyright (c) 2018-2019, Postgres Professional
4+
Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
5+
Portions Copyright (c) 1994, The Regents of the University of California
6+
7+
Permission to use, copy, modify, and distribute this software and its documentation for any purpose, without fee, and without a written agreement is hereby granted, provided that the above copyright notice and this paragraph and the following two paragraphs appear in all copies.
8+
9+
IN NO EVENT SHALL POSTGRES PROFESSIONAL BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF POSTGRES PROFESSIONAL HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
10+
11+
POSTGRES PROFESSIONAL SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND POSTGRES PROFESSIONAL HAS NO OBLIGATIONS TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.

contrib/pg_repeater/Makefile

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
# contrib/pg_repeater/Makefile
2+
3+
MODULE_big = pg_repeater
4+
EXTENSION = pg_repeater
5+
EXTVERSION = 0.1
6+
PGFILEDESC = "pg_repeater"
7+
MODULES = pg_repeater
8+
OBJS = pg_repeater.o $(WIN32RES)
9+
10+
fdw_srcdir = $(top_srcdir)/contrib/postgres_fdw/
11+
execplan_srcdir = $(top_srcdir)/contrib/pg_execplan/
12+
13+
PG_CPPFLAGS = -I$(libpq_srcdir) -I$(fdw_srcdir) -L$(fdw_srcdir) -I$(execplan_srcdir) -L$(execplan_srcdir)
14+
SHLIB_LINK_INTERNAL = $(libpq)
15+
16+
DATA_built = $(EXTENSION)--$(EXTVERSION).sql
17+
18+
ifdef USE_PGXS
19+
PG_CONFIG = pg_config
20+
PGXS := $(shell $(PG_CONFIG) --pgxs)
21+
include $(PGXS)
22+
else
23+
EXTRA_INSTALL = contrib/postgres_fdw contrib/pg_execplan
24+
SHLIB_PREREQS = submake-libpq
25+
subdir = contrib/pg_repeater
26+
top_builddir = ../..
27+
include $(top_builddir)/src/Makefile.global
28+
include $(top_srcdir)/contrib/contrib-global.mk
29+
endif
30+
31+
$(EXTENSION)--$(EXTVERSION).sql: init.sql
32+
cat $^ > $@

contrib/pg_repeater/README.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# pg_repeater
2+
PostgreSQL patch & extension for UTILITY queries and query plans execution at
3+
remote instance.
4+
5+
Plan is passed by postgres_fdw connection service. It executed by pg_exec_plan()
6+
routine, introduced by pg_execplan extension.
7+
8+
This project dedicated to query execution problem in DBMS for computing systems
9+
with cluster architecture.
10+
11+
The DBMS may need to execute an identical query plan at each computing node.
12+
Today PostgreSQL can process only SQL statements. But it is not guaranteed, that
13+
the planner at each node will construct same query plan, because different
14+
statistics, relation sizes e.t.c.
15+
16+
This solution based on postgres-xl approach: plan tree is serialized by the
17+
nodeToString() routine.
18+
During serialization we transform all database object identifiers (oid) at each
19+
node field to portable representation.
20+

contrib/pg_repeater/init.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
\echo Use "CREATE EXTENSION pg_repeater" to load this file. \quit

contrib/pg_repeater/pg_repeater.c

Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* repeater.c
4+
* Simple demo for remote plan execution patch.
5+
*
6+
* Transfer query plan to a remote instance and wait for result.
7+
* Remote instance parameters (host, port) defines by GUCs.
8+
*
9+
* Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
10+
* Portions Copyright (c) 2018-2019, Postgres Professional
11+
*-------------------------------------------------------------------------
12+
*/
13+
14+
#include "postgres.h"
15+
16+
#include "access/parallel.h"
17+
#include "access/xact.h"
18+
#include "commands/extension.h"
19+
#include "executor/executor.h"
20+
#include "fmgr.h"
21+
#include "foreign/foreign.h"
22+
#include "libpq/libpq.h"
23+
#include "libpq-fe.h"
24+
#include "miscadmin.h"
25+
#include "optimizer/planner.h"
26+
#include "pgstat.h"
27+
#include "postgres_fdw.h"
28+
#include "storage/latch.h"
29+
#include "tcop/utility.h"
30+
#include "utils/builtins.h"
31+
#include "utils/guc.h"
32+
#include "utils/memutils.h"
33+
34+
35+
PG_MODULE_MAGIC;
36+
37+
void _PG_init(void);
38+
39+
static ProcessUtility_hook_type next_ProcessUtility_hook = NULL;
40+
static ExecutorStart_hook_type prev_ExecutorStart = NULL;
41+
static ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
42+
43+
static void HOOK_Utility_injection(PlannedStmt *pstmt, const char *queryString,
44+
ProcessUtilityContext context, ParamListInfo params,
45+
DestReceiver *dest, char *completionTag);
46+
static void HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags);
47+
static void HOOK_ExecEnd_injection(QueryDesc *queryDesc);
48+
49+
/* Remote instance parameters. */
50+
char *remote_server_fdwname;
51+
52+
static bool ExtensionIsActivated = false;
53+
static PGconn *conn = NULL;
54+
55+
static Oid serverid = InvalidOid;
56+
static UserMapping *user = NULL;
57+
58+
59+
/*
60+
* Module load/unload callback
61+
*/
62+
void
63+
_PG_init(void)
64+
{
65+
DefineCustomStringVariable("repeater.fdwname",
66+
"Remote host fdw name",
67+
NULL,
68+
&remote_server_fdwname,
69+
"remoteserv",
70+
PGC_SIGHUP,
71+
GUC_NOT_IN_SAMPLE,
72+
NULL,
73+
NULL,
74+
NULL);
75+
76+
/* ProcessUtility hook */
77+
next_ProcessUtility_hook = ProcessUtility_hook;
78+
ProcessUtility_hook = HOOK_Utility_injection;
79+
80+
prev_ExecutorStart = ExecutorStart_hook;
81+
ExecutorStart_hook = HOOK_ExecStart_injection;
82+
83+
prev_ExecutorEnd = ExecutorEnd_hook;
84+
ExecutorEnd_hook = HOOK_ExecEnd_injection;
85+
}
86+
87+
static bool
88+
ExtensionIsActive(void)
89+
{
90+
if (ExtensionIsActivated)
91+
return true;
92+
93+
if (
94+
!IsTransactionState() ||
95+
!OidIsValid(get_extension_oid("pg_repeater", true))
96+
)
97+
return false;
98+
99+
ExtensionIsActivated = true;
100+
return ExtensionIsActivated;
101+
}
102+
103+
/*
104+
* We need to send some DML queries for sync database schema to a plan execution
105+
* at a remote instance.
106+
*/
107+
static void
108+
HOOK_Utility_injection(PlannedStmt *pstmt,
109+
const char *queryString,
110+
ProcessUtilityContext context,
111+
ParamListInfo params,
112+
DestReceiver *dest,
113+
char *completionTag)
114+
{
115+
Node *parsetree = pstmt->utilityStmt;
116+
117+
if (ExtensionIsActive() &&
118+
pstmt->canSetTag &&
119+
(context != PROCESS_UTILITY_SUBCOMMAND)
120+
)
121+
{
122+
if (!user)
123+
{
124+
MemoryContext oldCxt = MemoryContextSwitchTo(TopMemoryContext);
125+
126+
serverid = get_foreign_server_oid(remote_server_fdwname, true);
127+
Assert(OidIsValid(serverid));
128+
129+
user = GetUserMapping(GetUserId(), serverid);
130+
MemoryContextSwitchTo(oldCxt);
131+
}
132+
switch (nodeTag(parsetree))
133+
{
134+
case T_CopyStmt:
135+
case T_CreateExtensionStmt:
136+
case T_ExplainStmt:
137+
case T_FetchStmt:
138+
case T_VacuumStmt:
139+
break;
140+
default:
141+
{
142+
PGresult *res;
143+
144+
if (nodeTag(parsetree) == T_TransactionStmt)
145+
{
146+
TransactionStmt *stmt = (TransactionStmt *) parsetree;
147+
148+
if (
149+
/* (stmt->kind != TRANS_STMT_ROLLBACK_TO) && */
150+
(stmt->kind != TRANS_STMT_SAVEPOINT)
151+
)
152+
break;
153+
}
154+
conn = GetConnection(user, true);
155+
Assert(conn != NULL);
156+
157+
res = PQexec(conn, queryString);
158+
PQclear(res);
159+
}
160+
break;
161+
}
162+
}
163+
164+
if (next_ProcessUtility_hook)
165+
(*next_ProcessUtility_hook) (pstmt, queryString, context, params,
166+
dest, completionTag);
167+
else
168+
standard_ProcessUtility(pstmt, queryString,
169+
context, params,
170+
dest, completionTag);
171+
}
172+
173+
static void
174+
HOOK_ExecStart_injection(QueryDesc *queryDesc, int eflags)
175+
{
176+
Node *parsetree = queryDesc->plannedstmt->utilityStmt;
177+
178+
if (prev_ExecutorStart)
179+
prev_ExecutorStart(queryDesc, eflags);
180+
else
181+
standard_ExecutorStart(queryDesc, eflags);
182+
183+
/*
184+
* This not fully correct sign for prevent passing each subquery to the
185+
* remote instance. Only for demo.
186+
*/
187+
if (ExtensionIsActive() &&
188+
queryDesc->plannedstmt->canSetTag &&
189+
!IsParallelWorker() &&
190+
((parsetree == NULL) || (nodeTag(parsetree) != T_CreatedbStmt)) &&
191+
!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
192+
{
193+
Oid serverid;
194+
UserMapping *user;
195+
char *query,
196+
*query_container,
197+
*plan,
198+
*plan_container;
199+
int qlen,
200+
qlen1,
201+
plen,
202+
plen1;
203+
PGresult *res;
204+
205+
serverid = get_foreign_server_oid(remote_server_fdwname, true);
206+
Assert(OidIsValid(serverid));
207+
208+
user = GetUserMapping(GetUserId(), serverid);
209+
conn = GetConnection(user, true);
210+
211+
set_portable_output(true);
212+
plan = nodeToString(queryDesc->plannedstmt);
213+
set_portable_output(false);
214+
plen = b64_enc_len(plan, strlen(plan) + 1);
215+
plan_container = (char *) palloc0(plen + 1);
216+
plen1 = b64_encode(plan, strlen(plan), plan_container);
217+
Assert(plen > plen1);
218+
219+
qlen = b64_enc_len(queryDesc->sourceText, strlen(queryDesc->sourceText) + 1);
220+
query_container = (char *) palloc0(qlen + 1);
221+
qlen1 = b64_encode(queryDesc->sourceText, strlen(queryDesc->sourceText), query_container);
222+
Assert(qlen > qlen1);
223+
224+
query = palloc0(qlen + plen + 100);
225+
sprintf(query, "SELECT public.pg_exec_plan('%s', '%s');", query_container, plan_container);
226+
227+
res = PQexec(conn, query);
228+
PQclear(res);
229+
pfree(query);
230+
pfree(query_container);
231+
pfree(plan_container);
232+
}
233+
}
234+
235+
static void
236+
HOOK_ExecEnd_injection(QueryDesc *queryDesc)
237+
{
238+
if (prev_ExecutorEnd)
239+
prev_ExecutorEnd(queryDesc);
240+
else
241+
standard_ExecutorEnd(queryDesc);
242+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
# pg_repeater extension
2+
comment = 'Pass raw query plan to a remote node'
3+
default_version = '0.1'
4+
module_pathname = '$libdir/pg_repeater'
5+
relocatable = false
6+
requires = 'postgres_fdw, pg_execplan'

0 commit comments

Comments
 (0)