Skip to content

Commit 42fd4af

Browse files
committed
Add pglogical_hooks.c
1 parent 6296fba commit 42fd4af

File tree

1 file changed

+232
-0
lines changed

1 file changed

+232
-0
lines changed

contrib/multimaster/pglogical_hooks.c

Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
#include "postgres.h"
2+
3+
#include "access/xact.h"
4+
5+
#include "catalog/pg_proc.h"
6+
#include "catalog/pg_type.h"
7+
8+
#include "replication/origin.h"
9+
10+
#include "parser/parse_func.h"
11+
12+
#include "utils/acl.h"
13+
#include "utils/lsyscache.h"
14+
15+
#include "miscadmin.h"
16+
17+
#include "pglogical_hooks.h"
18+
#include "pglogical_output.h"
19+
20+
/*
21+
* Returns Oid of the hooks function specified in funcname.
22+
*
23+
* Error is thrown if function doesn't exist or doen't return correct datatype
24+
* or is volatile.
25+
*/
26+
static Oid
27+
get_hooks_function_oid(List *funcname)
28+
{
29+
Oid funcid;
30+
Oid funcargtypes[1];
31+
32+
funcargtypes[0] = INTERNALOID;
33+
34+
/* find the the function */
35+
funcid = LookupFuncName(funcname, 1, funcargtypes, false);
36+
37+
/* Validate that the function returns void */
38+
if (get_func_rettype(funcid) != VOIDOID)
39+
{
40+
ereport(ERROR,
41+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
42+
errmsg("function %s must return void",
43+
NameListToString(funcname))));
44+
}
45+
46+
if (func_volatile(funcid) == PROVOLATILE_VOLATILE)
47+
{
48+
ereport(ERROR,
49+
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
50+
errmsg("function %s must not be VOLATILE",
51+
NameListToString(funcname))));
52+
}
53+
54+
if (pg_proc_aclcheck(funcid, GetUserId(), ACL_EXECUTE) != ACLCHECK_OK)
55+
{
56+
const char * username;
57+
#if PG_VERSION_NUM >= 90500
58+
username = GetUserNameFromId(GetUserId(), false);
59+
#else
60+
username = GetUserNameFromId(GetUserId());
61+
#endif
62+
ereport(ERROR,
63+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
64+
errmsg("current user %s does not have permission to call function %s",
65+
username, NameListToString(funcname))));
66+
}
67+
68+
return funcid;
69+
}
70+
71+
/*
72+
* If a hook setup function was specified in the startup parameters, look it up
73+
* in the catalogs, check permissions, call it, and store the resulting hook
74+
* info struct.
75+
*/
76+
void
77+
load_hooks(PGLogicalOutputData *data)
78+
{
79+
Oid hooks_func;
80+
MemoryContext old_ctxt;
81+
bool txn_started = false;
82+
83+
if (!IsTransactionState())
84+
{
85+
txn_started = true;
86+
StartTransactionCommand();
87+
}
88+
89+
if (data->hooks_setup_funcname != NIL)
90+
{
91+
hooks_func = get_hooks_function_oid(data->hooks_setup_funcname);
92+
93+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
94+
(void) OidFunctionCall1(hooks_func, PointerGetDatum(&data->hooks));
95+
MemoryContextSwitchTo(old_ctxt);
96+
97+
elog(DEBUG3, "pglogical_output: Loaded hooks from function %u. Hooks are: \n"
98+
"\tstartup_hook: %p\n"
99+
"\tshutdown_hook: %p\n"
100+
"\trow_filter_hook: %p\n"
101+
"\ttxn_filter_hook: %p\n"
102+
"\thooks_private_data: %p\n",
103+
hooks_func,
104+
data->hooks.startup_hook,
105+
data->hooks.shutdown_hook,
106+
data->hooks.row_filter_hook,
107+
data->hooks.txn_filter_hook,
108+
data->hooks.hooks_private_data);
109+
}
110+
111+
if (txn_started)
112+
CommitTransactionCommand();
113+
}
114+
115+
void
116+
call_startup_hook(PGLogicalOutputData *data, List *plugin_params)
117+
{
118+
struct PGLogicalStartupHookArgs args;
119+
MemoryContext old_ctxt;
120+
121+
if (data->hooks.startup_hook != NULL)
122+
{
123+
bool tx_started = false;
124+
125+
args.private_data = data->hooks.hooks_private_data;
126+
args.in_params = plugin_params;
127+
args.out_params = NIL;
128+
129+
elog(DEBUG3, "calling pglogical startup hook");
130+
131+
if (!IsTransactionState())
132+
{
133+
tx_started = true;
134+
StartTransactionCommand();
135+
}
136+
137+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
138+
(void) (*data->hooks.startup_hook)(&args);
139+
MemoryContextSwitchTo(old_ctxt);
140+
141+
if (tx_started)
142+
CommitTransactionCommand();
143+
144+
data->extra_startup_params = args.out_params;
145+
/* The startup hook might change the private data seg */
146+
data->hooks.hooks_private_data = args.private_data;
147+
148+
elog(DEBUG3, "called pglogical startup hook");
149+
}
150+
}
151+
152+
void
153+
call_shutdown_hook(PGLogicalOutputData *data)
154+
{
155+
struct PGLogicalShutdownHookArgs args;
156+
MemoryContext old_ctxt;
157+
158+
if (data->hooks.shutdown_hook != NULL)
159+
{
160+
args.private_data = data->hooks.hooks_private_data;
161+
162+
elog(DEBUG3, "calling pglogical shutdown hook");
163+
164+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
165+
(void) (*data->hooks.shutdown_hook)(&args);
166+
MemoryContextSwitchTo(old_ctxt);
167+
168+
data->hooks.hooks_private_data = args.private_data;
169+
170+
elog(DEBUG3, "called pglogical shutdown hook");
171+
}
172+
}
173+
174+
/*
175+
* Decide if the individual change should be filtered out by
176+
* calling a client-provided hook.
177+
*/
178+
bool
179+
call_row_filter_hook(PGLogicalOutputData *data, ReorderBufferTXN *txn,
180+
Relation rel, ReorderBufferChange *change)
181+
{
182+
struct PGLogicalRowFilterArgs hook_args;
183+
MemoryContext old_ctxt;
184+
bool ret = true;
185+
186+
if (data->hooks.row_filter_hook != NULL)
187+
{
188+
hook_args.change_type = change->action;
189+
hook_args.private_data = data->hooks.hooks_private_data;
190+
hook_args.changed_rel = rel;
191+
192+
elog(DEBUG3, "calling pglogical row filter hook");
193+
194+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
195+
ret = (*data->hooks.row_filter_hook)(&hook_args);
196+
MemoryContextSwitchTo(old_ctxt);
197+
198+
/* Filter hooks shouldn't change the private data ptr */
199+
Assert(data->hooks.hooks_private_data == hook_args.private_data);
200+
201+
elog(DEBUG3, "called pglogical row filter hook, returned %d", (int)ret);
202+
}
203+
204+
return ret;
205+
}
206+
207+
bool
208+
call_txn_filter_hook(PGLogicalOutputData *data, RepOriginId txn_origin)
209+
{
210+
struct PGLogicalTxnFilterArgs hook_args;
211+
bool ret = true;
212+
MemoryContext old_ctxt;
213+
214+
if (data->hooks.txn_filter_hook != NULL)
215+
{
216+
hook_args.private_data = data->hooks.hooks_private_data;
217+
hook_args.origin_id = txn_origin;
218+
219+
elog(DEBUG3, "calling pglogical txn filter hook");
220+
221+
old_ctxt = MemoryContextSwitchTo(data->hooks_mctxt);
222+
ret = (*data->hooks.txn_filter_hook)(&hook_args);
223+
MemoryContextSwitchTo(old_ctxt);
224+
225+
/* Filter hooks shouldn't change the private data ptr */
226+
Assert(data->hooks.hooks_private_data == hook_args.private_data);
227+
228+
elog(DEBUG3, "called pglogical txn filter hook, returned %d", (int)ret);
229+
}
230+
231+
return ret;
232+
}

0 commit comments

Comments
 (0)