Skip to content

Commit 62f1184

Browse files
committed
implement basic FDW support for INSERTs, introduce GUC pg_pathman.insert_into_fdw, restrict FDW INSERTs for COPY FROM stmt, other fixes
1 parent d7fd9b9 commit 62f1184

File tree

5 files changed

+222
-16
lines changed

5 files changed

+222
-16
lines changed

src/copy_stmt_hooking.c

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "commands/copy.h"
2424
#include "commands/trigger.h"
2525
#include "executor/executor.h"
26+
#include "foreign/fdwapi.h"
2627
#include "miscadmin.h"
2728
#include "nodes/makefuncs.h"
2829
#include "utils/builtins.h"
@@ -39,6 +40,10 @@ static uint64 PathmanCopyFrom(CopyState cstate,
3940
List *range_table,
4041
bool old_protocol);
4142

43+
static void prepare_rri_fdw_for_copy(EState *estate,
44+
ResultRelInfoHolder *rri_holder,
45+
void *arg);
46+
4247

4348
/*
4449
* Is pg_pathman supposed to handle this COPY stmt?
@@ -63,7 +68,7 @@ is_pathman_related_copy(Node *parsetree)
6368
if (!copy_stmt->relation)
6469
return false;
6570

66-
/* TODO: select appropriate lock for COPY */
71+
/* Get partitioned table's Oid while locking it */
6772
partitioned_table = RangeVarGetRelid(copy_stmt->relation,
6873
(copy_stmt->is_from ?
6974
RowExclusiveLock :
@@ -387,7 +392,7 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
387392
/* Initialize ResultPartsStorage */
388393
init_result_parts_storage(&parts_storage, estate, false,
389394
ResultPartsStorageStandard,
390-
NULL, NULL);
395+
prepare_rri_fdw_for_copy, NULL);
391396
parts_storage.saved_rel_info = parent_result_rel;
392397

393398
/* Set up a tuple slot too */
@@ -535,3 +540,19 @@ PathmanCopyFrom(CopyState cstate, Relation parent_rel,
535540

536541
return processed;
537542
}
543+
544+
/*
545+
* COPY FROM does not support FDWs, emit ERROR.
546+
*/
547+
static void
548+
prepare_rri_fdw_for_copy(EState *estate,
549+
ResultRelInfoHolder *rri_holder,
550+
void *arg)
551+
{
552+
ResultRelInfo *rri = rri_holder->result_rel_info;
553+
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
554+
555+
if (fdw_routine != NULL)
556+
elog(ERROR, "cannot copy to foreign partition \"%s\"",
557+
get_rel_name(RelationGetRelid(rri->ri_RelationDesc)));
558+
}

src/partition_filter.c

Lines changed: 191 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@
88
* ------------------------------------------------------------------------
99
*/
1010

11-
#include "partition_filter.h"
11+
#include "init.h"
1212
#include "nodes_common.h"
13+
#include "partition_filter.h"
1314
#include "utils.h"
14-
#include "init.h"
1515

16+
#include "foreign/fdwapi.h"
17+
#include "foreign/foreign.h"
18+
#include "nodes/nodeFuncs.h"
1619
#include "utils/guc.h"
1720
#include "utils/memutils.h"
18-
#include "nodes/nodeFuncs.h"
1921
#include "utils/lsyscache.h"
2022
#include "utils/syscache.h"
2123

@@ -35,8 +37,26 @@ typedef struct
3537
bool estate_not_modified; /* did we modify EState somehow? */
3638
} estate_mod_data;
3739

40+
/*
 * Policy for INSERTs into foreign (FDW) partitions:
 * no FDWs at all \ postgres_fdw only \ any FDW.
 */
typedef enum
{
	/* INSERTs into FDWs are prohibited */
	PF_FDW_INSERT_DISABLED = 0,

	/* INSERTs into postgres_fdw are OK */
	PF_FDW_INSERT_POSTGRES = 1,

	/* INSERTs into any FDWs are OK */
	PF_FDW_INSERT_ANY_FDW = 2
} PF_insert_fdw_mode;
49+
50+
static const struct config_enum_entry pg_pathman_insert_into_fdw_options[] = {
51+
{ "disabled", PF_FDW_INSERT_DISABLED, false },
52+
{ "postgres", PF_FDW_INSERT_POSTGRES, false },
53+
{ "any_fdw", PF_FDW_INSERT_ANY_FDW, false },
54+
{ NULL, 0, false }
55+
};
56+
3857

3958
bool pg_pathman_enable_partition_filter = true;
59+
int pg_pathman_insert_into_fdw = PF_FDW_INSERT_POSTGRES;
4060

4161
CustomScanMethods partition_filter_plan_methods;
4262
CustomExecMethods partition_filter_exec_methods;
@@ -47,6 +67,9 @@ static void partition_filter_visitor(Plan *plan, void *context);
4767
static List * pfilter_build_tlist(List *tlist);
4868
static Index append_rte_to_estate(EState *estate, RangeTblEntry *rte);
4969
static int append_rri_to_estate(EState *estate, ResultRelInfo *rri);
70+
static void prepare_rri_fdw_for_insert(EState *estate,
71+
ResultRelInfoHolder *rri_holder,
72+
void *arg);
5073

5174

5275
void
@@ -74,6 +97,18 @@ init_partition_filter_static_data(void)
7497
NULL,
7598
NULL,
7699
NULL);
100+
101+
DefineCustomEnumVariable("pg_pathman.insert_into_fdw",
102+
"Allow INSERTS into FDW partitions.",
103+
NULL,
104+
&pg_pathman_insert_into_fdw,
105+
PF_FDW_INSERT_POSTGRES,
106+
pg_pathman_insert_into_fdw_options,
107+
PGC_SUSET,
108+
0,
109+
NULL,
110+
NULL,
111+
NULL);
77112
}
78113

79114

@@ -179,6 +214,7 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
179214
Index child_rte_idx;
180215
ResultRelInfo *part_result_rel_info;
181216

217+
/* Lock partition and check if it exists */
182218
LockRelationOid(partid, parts_storage->head_open_lock_mode);
183219
if(!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(partid)))
184220
{
@@ -236,18 +272,18 @@ scan_result_parts_storage(Oid partid, ResultPartsStorage *parts_storage)
236272
/* ri_ConstraintExprs will be initialized by ExecRelCheck() */
237273
part_result_rel_info->ri_ConstraintExprs = NULL;
238274

239-
/* Now fill the ResultRelInfo holder */
275+
/* Finally fill the ResultRelInfo holder */
240276
rri_holder->partid = partid;
241277
rri_holder->result_rel_info = part_result_rel_info;
242278

243-
/* Append ResultRelInfo to storage->es_alloc_result_rels */
244-
append_rri_to_estate(parts_storage->estate, part_result_rel_info);
245-
246279
/* Call on_new_rri_holder_callback() if needed */
247280
if (parts_storage->on_new_rri_holder_callback)
248281
parts_storage->on_new_rri_holder_callback(parts_storage->estate,
249282
rri_holder,
250283
parts_storage->callback_arg);
284+
285+
/* Append ResultRelInfo to storage->es_alloc_result_rels */
286+
append_rri_to_estate(parts_storage->estate, part_result_rel_info);
251287
}
252288

253289
return rri_holder;
@@ -351,7 +387,7 @@ partition_filter_begin(CustomScanState *node, EState *estate, int eflags)
351387
/* Init ResultRelInfo cache */
352388
init_result_parts_storage(&state->result_parts, estate,
353389
state->on_conflict_action != ONCONFLICT_NONE,
354-
ResultPartsStorageStandard, NULL, NULL);
390+
ResultPartsStorageStandard, prepare_rri_fdw_for_insert, NULL);
355391

356392
state->warning_triggered = false;
357393
}
@@ -499,6 +535,148 @@ select_partition_for_insert(const PartRelationInfo *prel,
499535
return rri_holder;
500536
}
501537

538+
/*
539+
* Callback to be executed on FDW partitions.
540+
*/
541+
static void
542+
prepare_rri_fdw_for_insert(EState *estate,
543+
ResultRelInfoHolder *rri_holder,
544+
void *arg)
545+
{
546+
ResultRelInfo *rri = rri_holder->result_rel_info;
547+
FdwRoutine *fdw_routine = rri->ri_FdwRoutine;
548+
Oid partid;
549+
550+
/* Nothing to do if not FDW */
551+
if (fdw_routine == NULL)
552+
return;
553+
554+
partid = RelationGetRelid(rri->ri_RelationDesc);
555+
556+
/* Perform some checks according to 'pg_pathman_insert_into_fdw' */
557+
switch (pg_pathman_insert_into_fdw)
558+
{
559+
case PF_FDW_INSERT_DISABLED:
560+
elog(ERROR, "INSERTs into FDW partitions are disabled");
561+
break;
562+
563+
case PF_FDW_INSERT_POSTGRES:
564+
{
565+
ForeignDataWrapper *fdw;
566+
ForeignServer *fserver;
567+
568+
/* Check if it's PostgreSQL FDW */
569+
fserver = GetForeignServer(GetForeignTable(partid)->serverid);
570+
fdw = GetForeignDataWrapper(fserver->fdwid);
571+
if (strcmp("postgres_fdw", fdw->fdwname) != 0)
572+
elog(ERROR, "FDWs other than postgres_fdw are restricted");
573+
}
574+
break;
575+
576+
case PF_FDW_INSERT_ANY_FDW:
577+
{
578+
ForeignDataWrapper *fdw;
579+
ForeignServer *fserver;
580+
581+
fserver = GetForeignServer(GetForeignTable(partid)->serverid);
582+
fdw = GetForeignDataWrapper(fserver->fdwid);
583+
if (strcmp("postgres_fdw", fdw->fdwname) != 0)
584+
elog(WARNING, "unrestricted FDW mode may lead to \"%s\" crashes",
585+
fdw->fdwname);
586+
}
587+
break; /* do nothing */
588+
589+
default:
590+
elog(ERROR, "Mode is not implemented yet");
591+
break;
592+
}
593+
594+
if (fdw_routine->PlanForeignModify)
595+
{
596+
RangeTblEntry *rte;
597+
ModifyTableState mtstate;
598+
List *fdw_private;
599+
Query query;
600+
PlannedStmt *plan;
601+
TupleDesc tupdesc;
602+
int i,
603+
target_attr;
604+
605+
/* Fetch RangeTblEntry for partition */
606+
rte = rt_fetch(rri->ri_RangeTableIndex, estate->es_range_table);
607+
608+
/* Fetch tuple descriptor */
609+
tupdesc = RelationGetDescr(rri->ri_RelationDesc);
610+
611+
/* Create fake Query node */
612+
memset((void *) &query, 0, sizeof(Query));
613+
NodeSetTag(&query, T_Query);
614+
615+
query.commandType = CMD_INSERT;
616+
query.querySource = QSRC_ORIGINAL;
617+
query.resultRelation = 1;
618+
query.rtable = list_make1(copyObject(rte));
619+
query.jointree = makeNode(FromExpr);
620+
621+
query.targetList = NIL;
622+
query.returningList = NIL;
623+
624+
/* Generate 'query.targetList' using 'tupdesc' */
625+
target_attr = 1;
626+
for (i = 0; i < tupdesc->natts; i++)
627+
{
628+
Form_pg_attribute attr;
629+
TargetEntry *te;
630+
Param *param;
631+
632+
attr = tupdesc->attrs[i];
633+
634+
if (attr->attisdropped)
635+
continue;
636+
637+
param = makeNode(Param);
638+
param->paramkind = PARAM_EXTERN;
639+
param->paramid = target_attr;
640+
param->paramtype = attr->atttypid;
641+
param->paramtypmod = attr->atttypmod;
642+
param->paramcollid = attr->attcollation;
643+
param->location = -1;
644+
645+
te = makeTargetEntry((Expr *) param, target_attr,
646+
pstrdup(NameStr(attr->attname)),
647+
false);
648+
649+
query.targetList = lappend(query.targetList, te);
650+
651+
target_attr++;
652+
}
653+
654+
/* Create fake ModifyTableState */
655+
memset((void *) &mtstate, 0, sizeof(ModifyTableState));
656+
NodeSetTag(&mtstate, T_ModifyTableState);
657+
mtstate.ps.state = estate;
658+
mtstate.operation = CMD_INSERT;
659+
mtstate.resultRelInfo = rri;
660+
mtstate.mt_onconflict = ONCONFLICT_NONE;
661+
662+
/* Plan fake query in for FDW access to be planned as well */
663+
elog(DEBUG1, "FDW(%u): plan fake query for fdw_private", partid);
664+
plan = standard_planner(&query, 0, NULL);
665+
666+
/* Extract fdw_private from useless plan */
667+
elog(DEBUG1, "FDW(%u): extract fdw_private", partid);
668+
fdw_private = (List *)
669+
linitial(((ModifyTable *) plan->planTree)->fdwPrivLists);
670+
671+
/* call BeginForeignModify on 'rri' */
672+
elog(DEBUG1, "FDW(%u): call BeginForeignModify on a fake INSERT node", partid);
673+
fdw_routine->BeginForeignModify(&mtstate, rri, fdw_private, 0, 0);
674+
675+
/* Report success */
676+
elog(DEBUG1, "FDW(%u): success", partid);
677+
}
678+
}
679+
502680
/*
503681
* Used by fetch_estate_mod_data() to find estate_mod_data.
504682
*/
@@ -581,7 +759,11 @@ append_rri_to_estate(EState *estate, ResultRelInfo *rri)
581759
estate->es_num_result_relations * sizeof(ResultRelInfo));
582760
}
583761

584-
/* Append ResultRelInfo to 'es_result_relations' array */
762+
/*
763+
* Append ResultRelInfo to 'es_result_relations' array.
764+
* NOTE: this is probably safe since ResultRelInfo
765+
* contains nothing but pointers to various structs.
766+
*/
585767
estate->es_result_relations[estate->es_num_result_relations] = *rri;
586768

587769
/* Update estate_mod_data */

src/partition_filter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ typedef struct
8181

8282

8383
extern bool pg_pathman_enable_partition_filter;
84+
extern int pg_pathman_insert_into_fdw;
8485

8586
extern CustomScanMethods partition_filter_plan_methods;
8687
extern CustomExecMethods partition_filter_exec_methods;

src/pg_pathman.c

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -904,8 +904,8 @@ create_partitions_internal(Oid relid, Datum value, Oid value_type)
904904
shout_if_prel_is_invalid(relid, prel, PT_RANGE);
905905

906906
/* Read max & min range values from PartRelationInfo */
907-
min_rvalue = prel->ranges[0].min;
908-
max_rvalue = prel->ranges[PrelLastChild(prel)].max;
907+
min_rvalue = PrelGetRangesArray(prel)[0].min;
908+
max_rvalue = PrelGetRangesArray(prel)[PrelLastChild(prel)].max;
909909

910910
/* Retrieve interval as TEXT from tuple */
911911
interval_text = values[Anum_pathman_config_range_interval - 1];
@@ -1222,7 +1222,7 @@ handle_binary_opexpr(WalkerContext *context, WrapperNode *result,
12221222
{
12231223
select_range_partitions(c->constvalue,
12241224
&cmp_func,
1225-
context->prel->ranges,
1225+
PrelGetRangesArray(context->prel),
12261226
PrelChildrenCount(context->prel),
12271227
strategy,
12281228
result);
@@ -1383,7 +1383,7 @@ handle_const(const Const *c, WalkerContext *context)
13831383

13841384
select_range_partitions(c->constvalue,
13851385
&tce->cmp_proc_finfo,
1386-
context->prel->ranges,
1386+
PrelGetRangesArray(context->prel),
13871387
PrelChildrenCount(context->prel),
13881388
BTEqualStrategyNumber,
13891389
result);

src/utils.c

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ lock_rows_visitor(Plan *plan, void *context)
154154
}
155155
}
156156

157-
/* NOTE: Used for debug */
157+
/*
158+
* Print Bitmapset as cstring.
159+
*/
158160
#ifdef __GNUC__
159161
__attribute__((unused))
160162
#endif

0 commit comments

Comments
 (0)