Skip to content

Commit 4d154b4

Browse files
committed
WIP working prototype of COPY FROM for partitioned tables (PathmanCopyFrom), refactoring
1 parent 7c15a2a commit 4d154b4

File tree

3 files changed

+366
-45
lines changed

3 files changed

+366
-45
lines changed

src/copy_stmt_hooking.c

Lines changed: 246 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include "copy_stmt_hooking.h"
1414
#include "init.h"
15+
#include "partition_filter.h"
1516
#include "relation_info.h"
1617

1718
#include "access/htup_details.h"
@@ -26,12 +27,22 @@
2627
#include "nodes/makefuncs.h"
2728
#include "utils/builtins.h"
2829
#include "utils/lsyscache.h"
30+
#include "utils/memutils.h"
2931
#include "utils/rel.h"
3032
#include "utils/rls.h"
3133

3234
#include "libpq/libpq.h"
3335

3436

37+
static uint64 PathmanCopyFrom(CopyState cstate,
38+
Relation parent_rel,
39+
List *range_table,
40+
bool old_protocol);
41+
static ResultRelInfoHolder *select_partition_for_copy(const PartRelationInfo *prel,
42+
ResultPartsStorage *parts_storage,
43+
Datum value, EState *estate);
44+
45+
3546
/*
3647
* Is pg_pathman supposed to handle this COPY stmt?
3748
*/
@@ -283,6 +294,11 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
283294
/* COPY ... FROM ... */
284295
if (is_from)
285296
{
297+
bool is_old_protocol;
298+
299+
is_old_protocol = PG_PROTOCOL_MAJOR(FrontendProtocol) < 3 &&
300+
stmt->filename == NULL;
301+
286302
/* There should be relation */
287303
Assert(rel);
288304

@@ -293,9 +309,7 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
293309

294310
cstate = BeginCopyFrom(rel, stmt->filename, stmt->is_program,
295311
stmt->attlist, stmt->options);
296-
/* TODO: copy files to DB */
297-
heap_close(rel, NoLock);
298-
*processed = 0;
312+
*processed = PathmanCopyFrom(cstate, rel, range_table, is_old_protocol);
299313
EndCopyFrom(cstate);
300314
}
301315
/* COPY ... TO ... */
@@ -314,4 +328,233 @@ PathmanDoCopy(const CopyStmt *stmt, const char *queryString, uint64 *processed)
314328
/* Call standard DoCopy using a new CopyStmt */
315329
DoCopy(&modified_copy_stmt, queryString, processed);
316330
}
331+
332+
/*
333+
* Close the relation. If reading, we can release the AccessShareLock we
334+
* got; if writing, we should hold the lock until end of transaction to
335+
* ensure that updates will be committed before lock is released.
336+
*/
337+
if (rel != NULL)
338+
heap_close(rel, (is_from ? NoLock : AccessShareLock));
339+
}
340+
341+
/*
342+
* Copy FROM file to relation.
343+
*/
344+
static uint64
345+
PathmanCopyFrom(CopyState cstate, Relation parent_rel,
346+
List *range_table, bool old_protocol)
347+
{
348+
HeapTuple tuple;
349+
TupleDesc tupDesc;
350+
Datum *values;
351+
bool *nulls;
352+
353+
ResultPartsStorage parts_storage;
354+
ResultRelInfo *parent_result_rel;
355+
356+
EState *estate = CreateExecutorState(); /* for ExecConstraints() */
357+
ExprContext *econtext;
358+
TupleTableSlot *myslot;
359+
MemoryContext oldcontext = CurrentMemoryContext;
360+
361+
uint64 processed = 0;
362+
363+
364+
tupDesc = RelationGetDescr(parent_rel);
365+
366+
parent_result_rel = makeNode(ResultRelInfo);
367+
InitResultRelInfo(parent_result_rel,
368+
parent_rel,
369+
1, /* dummy rangetable index */
370+
0);
371+
ExecOpenIndices(parent_result_rel, false);
372+
373+
estate->es_result_relations = parent_result_rel;
374+
estate->es_num_result_relations = 1;
375+
estate->es_result_relation_info = parent_result_rel;
376+
estate->es_range_table = range_table;
377+
378+
/* Initialize ResultPartsStorage */
379+
init_result_parts_storage(&parts_storage, estate, false,
380+
ResultPartsStorageStandard,
381+
check_acl_for_partition, NULL);
382+
parts_storage.saved_rel_info = parent_result_rel;
383+
384+
/* Set up a tuple slot too */
385+
myslot = ExecInitExtraTupleSlot(estate);
386+
ExecSetSlotDescriptor(myslot, tupDesc);
387+
/* Triggers might need a slot as well */
388+
estate->es_trig_tuple_slot = ExecInitExtraTupleSlot(estate);
389+
390+
/* Prepare to catch AFTER triggers. */
391+
AfterTriggerBeginQuery();
392+
393+
/*
394+
* Check BEFORE STATEMENT insertion triggers. It's debatable whether we
395+
* should do this for COPY, since it's not really an "INSERT" statement as
396+
* such. However, executing these triggers maintains consistency with the
397+
* EACH ROW triggers that we already fire on COPY.
398+
*/
399+
ExecBSInsertTriggers(estate, parent_result_rel);
400+
401+
values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
402+
nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
403+
404+
econtext = GetPerTupleExprContext(estate);
405+
406+
for (;;)
407+
{
408+
TupleTableSlot *slot;
409+
bool skip_tuple;
410+
Oid tuple_oid = InvalidOid;
411+
412+
const PartRelationInfo *prel;
413+
ResultRelInfoHolder *rri_holder_child;
414+
ResultRelInfo *child_result_rel;
415+
416+
CHECK_FOR_INTERRUPTS();
417+
418+
ResetPerTupleExprContext(estate);
419+
420+
/* Fetch PartRelationInfo for parent relation */
421+
prel = get_pathman_relation_info(RelationGetRelid(parent_rel));
422+
423+
/* Switch into its memory context */
424+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
425+
426+
if (!NextCopyFrom(cstate, econtext, values, nulls, &tuple_oid))
427+
break;
428+
429+
/* Search for a matching partition */
430+
rri_holder_child = select_partition_for_copy(prel, &parts_storage,
431+
values[prel->attnum - 1],
432+
estate);
433+
child_result_rel = rri_holder_child->result_rel_info;
434+
estate->es_result_relation_info = child_result_rel;
435+
436+
/* And now we can form the input tuple. */
437+
tuple = heap_form_tuple(tupDesc, values, nulls);
438+
if (tuple_oid != InvalidOid)
439+
HeapTupleSetOid(tuple, tuple_oid);
440+
441+
/*
442+
* Constraints might reference the tableoid column, so initialize
443+
* t_tableOid before evaluating them.
444+
*/
445+
tuple->t_tableOid = RelationGetRelid(child_result_rel->ri_RelationDesc);
446+
447+
/* Triggers and stuff need to be invoked in query context. */
448+
MemoryContextSwitchTo(oldcontext);
449+
450+
/* Place tuple in tuple slot --- but slot shouldn't free it */
451+
slot = myslot;
452+
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
453+
454+
skip_tuple = false;
455+
456+
/* BEFORE ROW INSERT Triggers */
457+
if (child_result_rel->ri_TrigDesc &&
458+
child_result_rel->ri_TrigDesc->trig_insert_before_row)
459+
{
460+
slot = ExecBRInsertTriggers(estate, child_result_rel, slot);
461+
462+
if (slot == NULL) /* "do nothing" */
463+
skip_tuple = true;
464+
else /* trigger might have changed tuple */
465+
tuple = ExecMaterializeSlot(slot);
466+
}
467+
468+
/* Proceed if we still have a tuple */
469+
if (!skip_tuple)
470+
{
471+
List *recheckIndexes = NIL;
472+
473+
/* Check the constraints of the tuple */
474+
if (child_result_rel->ri_RelationDesc->rd_att->constr)
475+
ExecConstraints(child_result_rel, slot, estate);
476+
477+
/* OK, store the tuple and create index entries for it */
478+
simple_heap_insert(child_result_rel->ri_RelationDesc, tuple);
479+
480+
if (child_result_rel->ri_NumIndices > 0)
481+
recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
482+
estate, false, NULL,
483+
NIL);
484+
485+
/* AFTER ROW INSERT Triggers */
486+
ExecARInsertTriggers(estate, child_result_rel, tuple,
487+
recheckIndexes);
488+
489+
list_free(recheckIndexes);
490+
491+
/*
492+
* We count only tuples not suppressed by a BEFORE INSERT trigger;
493+
* this is the same definition used by execMain.c for counting
494+
* tuples inserted by an INSERT command.
495+
*/
496+
processed++;
497+
}
498+
}
499+
500+
MemoryContextSwitchTo(oldcontext);
501+
502+
/*
503+
* In the old protocol, tell pqcomm that we can process normal protocol
504+
* messages again.
505+
*/
506+
if (old_protocol)
507+
pq_endmsgread();
508+
509+
/* Execute AFTER STATEMENT insertion triggers */
510+
ExecASInsertTriggers(estate, parent_result_rel);
511+
512+
/* Handle queued AFTER triggers */
513+
AfterTriggerEndQuery(estate);
514+
515+
pfree(values);
516+
pfree(nulls);
517+
518+
ExecResetTupleTable(estate->es_tupleTable, false);
519+
fini_result_parts_storage(&parts_storage);
520+
521+
FreeExecutorState(estate);
522+
523+
return processed;
524+
}
525+
526+
/*
527+
* Smart wrapper for scan_result_parts_storage().
528+
*/
529+
static ResultRelInfoHolder *
530+
select_partition_for_copy(const PartRelationInfo *prel,
531+
ResultPartsStorage *parts_storage,
532+
Datum value, EState *estate)
533+
{
534+
ExprContext *econtext;
535+
ResultRelInfoHolder *rri_holder;
536+
Oid selected_partid = InvalidOid;
537+
Oid *parts;
538+
int nparts;
539+
540+
econtext = GetPerTupleExprContext(estate);
541+
542+
/* Search for matching partitions using partitioned column */
543+
parts = find_partitions_for_value(value, prel, econtext, &nparts);
544+
545+
if (nparts > 1)
546+
elog(ERROR, "PATHMAN COPY selected more than one partition");
547+
else if (nparts == 0)
548+
elog(ERROR,
549+
"There is no suitable partition for key '%s'",
550+
datum_to_cstring(value, prel->atttype));
551+
else
552+
selected_partid = parts[0];
553+
554+
/* Replace parent table with a suitable partition */
555+
MemoryContextSwitchTo(estate->es_query_cxt);
556+
rri_holder = scan_result_parts_storage(selected_partid, parts_storage);
557+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
558+
559+
return rri_holder;
317560
}

0 commit comments

Comments
 (0)