Skip to content

Commit 381834c

Browse files
committed
light refactoring, introduce GUC variable 'pg_pathman.override_copy', implement COPY TO for partitioned tables
1 parent 2f5804f commit 381834c

File tree

8 files changed

+352
-17
lines changed

8 files changed

+352
-17
lines changed

src/copy_stmt_hooking.c

Lines changed: 289 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,35 @@
1+
/* ------------------------------------------------------------------------
2+
*
3+
* copy_stmt_hooking.c
4+
* Override COPY TO/FROM statement for partitioned tables
5+
*
6+
* Copyright (c) 2016, Postgres Professional
7+
* Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
8+
* Portions Copyright (c) 1994, Regents of the University of California
9+
*
10+
* ------------------------------------------------------------------------
11+
*/
12+
113
#include "copy_stmt_hooking.h"
14+
#include "init.h"
215
#include "relation_info.h"
316

17+
#include "access/htup_details.h"
18+
#include "access/sysattr.h"
19+
#include "access/xact.h"
420
#include "catalog/namespace.h"
21+
#include "catalog/pg_attribute.h"
522
#include "commands/copy.h"
23+
#include "commands/trigger.h"
24+
#include "executor/executor.h"
25+
#include "miscadmin.h"
26+
#include "nodes/makefuncs.h"
27+
#include "utils/builtins.h"
28+
#include "utils/lsyscache.h"
29+
#include "utils/rel.h"
30+
#include "utils/rls.h"
31+
32+
#include "libpq/libpq.h"
633

734

835
/*
@@ -14,6 +41,12 @@ is_pathman_related_copy(Node *parsetree)
1441
CopyStmt *copy_stmt = (CopyStmt *) parsetree;
1542
Oid partitioned_table;
1643

44+
if (!IsOverrideCopyEnabled())
45+
{
46+
elog(DEBUG1, "COPY statement hooking is disabled");
47+
return false;
48+
}
49+
1750
/* Check that it's a CopyStmt */
1851
if (!IsA(parsetree, CopyStmt))
1952
return false;
@@ -23,11 +56,266 @@ is_pathman_related_copy(Node *parsetree)
2356
return false;
2457

2558
/* TODO: select appropriate lock for COPY */
26-
partitioned_table = RangeVarGetRelid(copy_stmt->relation, NoLock, false);
59+
partitioned_table = RangeVarGetRelid(copy_stmt->relation,
60+
(copy_stmt->is_from ?
61+
RowExclusiveLock :
62+
AccessShareLock),
63+
false);
2764

2865
/* Check that relation is partitioned */
2966
if (get_pathman_relation_info(partitioned_table))
67+
{
68+
elog(DEBUG1, "Overriding default behavior for COPY (%u)", partitioned_table);
3069
return true;
70+
}
3171

3272
return false;
3373
}
74+
75+
/*
76+
* CopyGetAttnums - build an integer list of attnums to be copied
77+
*
78+
* The input attnamelist is either the user-specified column list,
79+
* or NIL if there was none (in which case we want all the non-dropped
80+
* columns).
81+
*
82+
* rel can be NULL ... it's only used for error reports.
83+
*/
84+
static List *
85+
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
86+
{
87+
List *attnums = NIL;
88+
89+
if (attnamelist == NIL)
90+
{
91+
/* Generate default column list */
92+
Form_pg_attribute *attr = tupDesc->attrs;
93+
int attr_count = tupDesc->natts;
94+
int i;
95+
96+
for (i = 0; i < attr_count; i++)
97+
{
98+
if (attr[i]->attisdropped)
99+
continue;
100+
attnums = lappend_int(attnums, i + 1);
101+
}
102+
}
103+
else
104+
{
105+
/* Validate the user-supplied list and extract attnums */
106+
ListCell *l;
107+
108+
foreach(l, attnamelist)
109+
{
110+
char *name = strVal(lfirst(l));
111+
int attnum;
112+
int i;
113+
114+
/* Lookup column name */
115+
attnum = InvalidAttrNumber;
116+
for (i = 0; i < tupDesc->natts; i++)
117+
{
118+
if (tupDesc->attrs[i]->attisdropped)
119+
continue;
120+
if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
121+
{
122+
attnum = tupDesc->attrs[i]->attnum;
123+
break;
124+
}
125+
}
126+
if (attnum == InvalidAttrNumber)
127+
{
128+
if (rel != NULL)
129+
ereport(ERROR,
130+
(errcode(ERRCODE_UNDEFINED_COLUMN),
131+
errmsg("column \"%s\" of relation \"%s\" does not exist",
132+
name, RelationGetRelationName(rel))));
133+
else
134+
ereport(ERROR,
135+
(errcode(ERRCODE_UNDEFINED_COLUMN),
136+
errmsg("column \"%s\" does not exist",
137+
name)));
138+
}
139+
/* Check for duplicates */
140+
if (list_member_int(attnums, attnum))
141+
ereport(ERROR,
142+
(errcode(ERRCODE_DUPLICATE_COLUMN),
143+
errmsg("column \"%s\" specified more than once",
144+
name)));
145+
attnums = lappend_int(attnums, attnum);
146+
}
147+
}
148+
149+
return attnums;
150+
}
151+
152+
/*
153+
* Execute COPY TO/FROM statement for a partitioned table.
154+
* NOTE: based on DoCopy() (see copy.c).
155+
*/
156+
void
157+
PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
158+
{
159+
CopyState cstate;
160+
bool is_from = stmt->is_from;
161+
bool pipe = (stmt->filename == NULL);
162+
Relation rel;
163+
Oid relid;
164+
Node *query = NULL;
165+
List *range_table = NIL;
166+
167+
/* Disallow COPY TO/FROM file or program except to superusers. */
168+
if (!pipe && !superuser())
169+
{
170+
if (stmt->is_program)
171+
ereport(ERROR,
172+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
173+
errmsg("must be superuser to COPY to or from an external program"),
174+
errhint("Anyone can COPY to stdout or from stdin. "
175+
"psql's \\copy command also works for anyone.")));
176+
else
177+
ereport(ERROR,
178+
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
179+
errmsg("must be superuser to COPY to or from a file"),
180+
errhint("Anyone can COPY to stdout or from stdin. "
181+
"psql's \\copy command also works for anyone.")));
182+
}
183+
184+
if (stmt->relation)
185+
{
186+
TupleDesc tupDesc;
187+
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
188+
List *attnums;
189+
ListCell *cur;
190+
RangeTblEntry *rte;
191+
192+
Assert(!stmt->query);
193+
194+
/* Open the relation (we've locked it in is_pathman_related_copy()) */
195+
rel = heap_openrv(stmt->relation, NoLock);
196+
197+
relid = RelationGetRelid(rel);
198+
199+
rte = makeNode(RangeTblEntry);
200+
rte->rtekind = RTE_RELATION;
201+
rte->relid = RelationGetRelid(rel);
202+
rte->relkind = rel->rd_rel->relkind;
203+
rte->requiredPerms = required_access;
204+
range_table = list_make1(rte);
205+
206+
tupDesc = RelationGetDescr(rel);
207+
attnums = CopyGetAttnums(tupDesc, rel, stmt->attlist);
208+
foreach(cur, attnums)
209+
{
210+
int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
211+
212+
if (is_from)
213+
rte->insertedCols = bms_add_member(rte->insertedCols, attno);
214+
else
215+
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
216+
}
217+
ExecCheckRTPerms(range_table, true);
218+
219+
/*
220+
* We should perform a query instead of low-level heap scan whenever:
221+
* a) table has a RLS policy;
222+
* b) table is partitioned & it's COPY FROM.
223+
*/
224+
if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED ||
225+
is_from == false) /* rewrite COPY table TO statements */
226+
{
227+
SelectStmt *select;
228+
ColumnRef *cr;
229+
ResTarget *target;
230+
RangeVar *from;
231+
232+
if (is_from)
233+
ereport(ERROR,
234+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
235+
errmsg("COPY FROM not supported with row-level security"),
236+
errhint("Use INSERT statements instead.")));
237+
238+
/* Build target list */
239+
cr = makeNode(ColumnRef);
240+
241+
if (!stmt->attlist)
242+
cr->fields = list_make1(makeNode(A_Star));
243+
else
244+
cr->fields = stmt->attlist;
245+
246+
cr->location = 1;
247+
248+
target = makeNode(ResTarget);
249+
target->name = NULL;
250+
target->indirection = NIL;
251+
target->val = (Node *) cr;
252+
target->location = 1;
253+
254+
/*
255+
* Build RangeVar for from clause, fully qualified based on the
256+
* relation which we have opened and locked.
257+
*/
258+
from = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
259+
RelationGetRelationName(rel), -1);
260+
261+
/* Build query */
262+
select = makeNode(SelectStmt);
263+
select->targetList = list_make1(target);
264+
select->fromClause = list_make1(from);
265+
266+
query = (Node *) select;
267+
268+
/*
269+
* Close the relation for now, but keep the lock on it to prevent
270+
* changes between now and when we start the query-based COPY.
271+
*
272+
* We'll reopen it later as part of the query-based COPY.
273+
*/
274+
heap_close(rel, NoLock);
275+
rel = NULL;
276+
}
277+
}
278+
else
279+
{
280+
Assert(stmt->query);
281+
282+
query = stmt->query;
283+
relid = InvalidOid;
284+
rel = NULL;
285+
}
286+
287+
/* COPY ... FROM ... */
288+
if (is_from)
289+
{
290+
/* There should be relation */
291+
Assert(rel);
292+
293+
/* check read-only transaction and parallel mode */
294+
if (XactReadOnly && !rel->rd_islocaltemp)
295+
PreventCommandIfReadOnly("PATHMAN COPY FROM");
296+
PreventCommandIfParallelMode("PATHMAN COPY FROM");
297+
298+
cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
299+
stmt->attlist, stmt->options);
300+
/* TODO: copy files to DB */
301+
heap_close(rel, NoLock);
302+
*processed = 0;
303+
EndCopyFrom(cstate);
304+
}
305+
/* COPY ... TO ... */
306+
else
307+
{
308+
CopyStmt modified_copy_stmt;
309+
310+
/* We should've created a query */
311+
Assert(query);
312+
313+
/* Copy 'stmt' and override some of the fields */
314+
modified_copy_stmt = *stmt;
315+
modified_copy_stmt.relation = NULL;
316+
modified_copy_stmt.query = query;
317+
318+
/* Call standard DoCopy using a new CopyStmt */
319+
DoCopy(&modified_copy_stmt, queryString, processed);
320+
}
321+
}

src/copy_stmt_hooking.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,13 @@
1+
/* ------------------------------------------------------------------------
2+
*
3+
* copy_stmt_hooking.h
4+
* Transaction-specific locks and other functions
5+
*
6+
* Copyright (c) 2016, Postgres Professional
7+
*
8+
* ------------------------------------------------------------------------
9+
*/
10+
111
#ifndef COPY_STMT_HOOKING_H
212
#define COPY_STMT_HOOKING_H
313

@@ -8,5 +18,6 @@
818

919

1020
bool is_pathman_related_copy(Node *parsetree);
21+
void PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed);
1122

1223
#endif

src/hooks.c

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,17 +387,22 @@ pg_pathman_enable_assign_hook(bool newval, void *extra)
387387

388388
/* Return quickly if nothing has changed */
389389
if (newval == (pg_pathman_init_state.pg_pathman_enable &&
390+
pg_pathman_init_state.auto_partition &&
391+
pg_pathman_init_state.override_copy &&
390392
pg_pathman_enable_runtimeappend &&
391393
pg_pathman_enable_runtime_merge_append &&
392394
pg_pathman_enable_partition_filter))
393395
return;
394396

397+
pg_pathman_init_state.auto_partition = newval;
398+
pg_pathman_init_state.override_copy = newval;
395399
pg_pathman_enable_runtime_merge_append = newval;
396400
pg_pathman_enable_runtimeappend = newval;
397401
pg_pathman_enable_partition_filter = newval;
398402

399403
elog(NOTICE,
400-
"RuntimeAppend, RuntimeMergeAppend and PartitionFilter nodes have been %s",
404+
"RuntimeAppend, RuntimeMergeAppend and PartitionFilter nodes "
405+
"and some other options have been %s",
401406
newval ? "enabled" : "disabled");
402407
}
403408

@@ -594,10 +599,17 @@ pathman_process_utility_hook(Node *parsetree,
594599
context, params,
595600
dest, completionTag);
596601

597-
/* Override standard COPY statements if needed */
602+
/* Override standard COPY statement if needed */
598603
if (is_pathman_related_copy(parsetree))
599604
{
600-
elog(INFO, "copy!");
605+
uint64 processed;
606+
607+
PathmanDoCopy((CopyStmt *) parsetree, queryString, &processed);
608+
if (completionTag)
609+
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
610+
"PATHMAN COPY " UINT64_FORMAT, processed);
611+
612+
return; /* don't call standard_ProcessUtility() */
601613
}
602614

603615
/* Call internal implementation */

0 commit comments

Comments
 (0)