Skip to content

Commit 870d896

Browse files
committed
Refactor PgFdwModifyState creation/destruction into separate functions.
Etsuro Fujita. The larger patch series of which this is a part has been reviewed by Amit Langote, David Fetter, Maksim Milyutin, Álvaro Herrera, Stephen Frost, and me. Discussion: http://postgr.es/m/5A95487E.9050808@lab.ntt.co.jp
1 parent bcf79b5 commit 870d896

File tree

1 file changed

+169
-109
lines changed

1 file changed

+169
-109
lines changed

contrib/postgres_fdw/postgres_fdw.c

Lines changed: 169 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -376,12 +376,21 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
376376
static void create_cursor(ForeignScanState *node);
377377
static void fetch_more_data(ForeignScanState *node);
378378
static void close_cursor(PGconn *conn, unsigned int cursor_number);
379+
static PgFdwModifyState *create_foreign_modify(EState *estate,
380+
ResultRelInfo *resultRelInfo,
381+
CmdType operation,
382+
Plan *subplan,
383+
char *query,
384+
List *target_attrs,
385+
bool has_returning,
386+
List *retrieved_attrs);
379387
static void prepare_foreign_modify(PgFdwModifyState *fmstate);
380388
static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
381389
ItemPointer tupleid,
382390
TupleTableSlot *slot);
383391
static void store_returning_result(PgFdwModifyState *fmstate,
384392
TupleTableSlot *slot, PGresult *res);
393+
static void finish_foreign_modify(PgFdwModifyState *fmstate);
385394
static List *build_remote_returning(Index rtindex, Relation rel,
386395
List *returningList);
387396
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -1681,18 +1690,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
16811690
int eflags)
16821691
{
16831692
PgFdwModifyState *fmstate;
1684-
EState *estate = mtstate->ps.state;
1685-
CmdType operation = mtstate->operation;
1686-
Relation rel = resultRelInfo->ri_RelationDesc;
1687-
RangeTblEntry *rte;
1688-
Oid userid;
1689-
ForeignTable *table;
1690-
UserMapping *user;
1691-
AttrNumber n_params;
1692-
Oid typefnoid;
1693-
bool isvarlena;
1694-
ListCell *lc;
1695-
TupleDesc tupdesc = RelationGetDescr(rel);
1693+
char *query;
1694+
List *target_attrs;
1695+
bool has_returning;
1696+
List *retrieved_attrs;
16961697

16971698
/*
16981699
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
@@ -1701,82 +1702,25 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
17011702
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
17021703
return;
17031704

1704-
/* Begin constructing PgFdwModifyState. */
1705-
fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
1706-
fmstate->rel = rel;
1707-
1708-
/*
1709-
* Identify which user to do the remote access as. This should match what
1710-
* ExecCheckRTEPerms() does.
1711-
*/
1712-
rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
1713-
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
1714-
1715-
/* Get info about foreign table. */
1716-
table = GetForeignTable(RelationGetRelid(rel));
1717-
user = GetUserMapping(userid, table->serverid);
1718-
1719-
/* Open connection; report that we'll create a prepared statement. */
1720-
fmstate->conn = GetConnection(user, true);
1721-
fmstate->p_name = NULL; /* prepared statement not made yet */
1722-
17231705
/* Deconstruct fdw_private data. */
1724-
fmstate->query = strVal(list_nth(fdw_private,
1725-
FdwModifyPrivateUpdateSql));
1726-
fmstate->target_attrs = (List *) list_nth(fdw_private,
1727-
FdwModifyPrivateTargetAttnums);
1728-
fmstate->has_returning = intVal(list_nth(fdw_private,
1729-
FdwModifyPrivateHasReturning));
1730-
fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
1731-
FdwModifyPrivateRetrievedAttrs);
1732-
1733-
/* Create context for per-tuple temp workspace. */
1734-
fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
1735-
"postgres_fdw temporary data",
1736-
ALLOCSET_SMALL_SIZES);
1737-
1738-
/* Prepare for input conversion of RETURNING results. */
1739-
if (fmstate->has_returning)
1740-
fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1741-
1742-
/* Prepare for output conversion of parameters used in prepared stmt. */
1743-
n_params = list_length(fmstate->target_attrs) + 1;
1744-
fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
1745-
fmstate->p_nums = 0;
1746-
1747-
if (operation == CMD_UPDATE || operation == CMD_DELETE)
1748-
{
1749-
/* Find the ctid resjunk column in the subplan's result */
1750-
Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
1751-
1752-
fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
1753-
"ctid");
1754-
if (!AttributeNumberIsValid(fmstate->ctidAttno))
1755-
elog(ERROR, "could not find junk ctid column");
1756-
1757-
/* First transmittable parameter will be ctid */
1758-
getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
1759-
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1760-
fmstate->p_nums++;
1761-
}
1762-
1763-
if (operation == CMD_INSERT || operation == CMD_UPDATE)
1764-
{
1765-
/* Set up for remaining transmittable parameters */
1766-
foreach(lc, fmstate->target_attrs)
1767-
{
1768-
int attnum = lfirst_int(lc);
1769-
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
1770-
1771-
Assert(!attr->attisdropped);
1772-
1773-
getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
1774-
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
1775-
fmstate->p_nums++;
1776-
}
1777-
}
1778-
1779-
Assert(fmstate->p_nums <= n_params);
1706+
query = strVal(list_nth(fdw_private,
1707+
FdwModifyPrivateUpdateSql));
1708+
target_attrs = (List *) list_nth(fdw_private,
1709+
FdwModifyPrivateTargetAttnums);
1710+
has_returning = intVal(list_nth(fdw_private,
1711+
FdwModifyPrivateHasReturning));
1712+
retrieved_attrs = (List *) list_nth(fdw_private,
1713+
FdwModifyPrivateRetrievedAttrs);
1714+
1715+
/* Construct an execution state. */
1716+
fmstate = create_foreign_modify(mtstate->ps.state,
1717+
resultRelInfo,
1718+
mtstate->operation,
1719+
mtstate->mt_plans[subplan_index]->plan,
1720+
query,
1721+
target_attrs,
1722+
has_returning,
1723+
retrieved_attrs);
17801724

17811725
resultRelInfo->ri_FdwState = fmstate;
17821726
}
@@ -2011,28 +1955,8 @@ postgresEndForeignModify(EState *estate,
20111955
if (fmstate == NULL)
20121956
return;
20131957

2014-
/* If we created a prepared statement, destroy it */
2015-
if (fmstate->p_name)
2016-
{
2017-
char sql[64];
2018-
PGresult *res;
2019-
2020-
snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
2021-
2022-
/*
2023-
* We don't use a PG_TRY block here, so be careful not to throw error
2024-
* without releasing the PGresult.
2025-
*/
2026-
res = pgfdw_exec_query(fmstate->conn, sql);
2027-
if (PQresultStatus(res) != PGRES_COMMAND_OK)
2028-
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
2029-
PQclear(res);
2030-
fmstate->p_name = NULL;
2031-
}
2032-
2033-
/* Release remote connection */
2034-
ReleaseConnection(fmstate->conn);
2035-
fmstate->conn = NULL;
1958+
/* Destroy the execution state */
1959+
finish_foreign_modify(fmstate);
20361960
}
20371961

20381962
/*
@@ -3228,6 +3152,109 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
32283152
PQclear(res);
32293153
}
32303154

3155+
/*
3156+
* create_foreign_modify
3157+
* Construct an execution state of a foreign insert/update/delete
3158+
* operation
3159+
*/
3160+
static PgFdwModifyState *
3161+
create_foreign_modify(EState *estate,
3162+
ResultRelInfo *resultRelInfo,
3163+
CmdType operation,
3164+
Plan *subplan,
3165+
char *query,
3166+
List *target_attrs,
3167+
bool has_returning,
3168+
List *retrieved_attrs)
3169+
{
3170+
PgFdwModifyState *fmstate;
3171+
Relation rel = resultRelInfo->ri_RelationDesc;
3172+
TupleDesc tupdesc = RelationGetDescr(rel);
3173+
RangeTblEntry *rte;
3174+
Oid userid;
3175+
ForeignTable *table;
3176+
UserMapping *user;
3177+
AttrNumber n_params;
3178+
Oid typefnoid;
3179+
bool isvarlena;
3180+
ListCell *lc;
3181+
3182+
/* Begin constructing PgFdwModifyState. */
3183+
fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
3184+
fmstate->rel = rel;
3185+
3186+
/*
3187+
* Identify which user to do the remote access as. This should match what
3188+
* ExecCheckRTEPerms() does.
3189+
*/
3190+
rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
3191+
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
3192+
3193+
/* Get info about foreign table. */
3194+
table = GetForeignTable(RelationGetRelid(rel));
3195+
user = GetUserMapping(userid, table->serverid);
3196+
3197+
/* Open connection; report that we'll create a prepared statement. */
3198+
fmstate->conn = GetConnection(user, true);
3199+
fmstate->p_name = NULL; /* prepared statement not made yet */
3200+
3201+
/* Set up remote query information. */
3202+
fmstate->query = query;
3203+
fmstate->target_attrs = target_attrs;
3204+
fmstate->has_returning = has_returning;
3205+
fmstate->retrieved_attrs = retrieved_attrs;
3206+
3207+
/* Create context for per-tuple temp workspace. */
3208+
fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
3209+
"postgres_fdw temporary data",
3210+
ALLOCSET_SMALL_SIZES);
3211+
3212+
/* Prepare for input conversion of RETURNING results. */
3213+
if (fmstate->has_returning)
3214+
fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
3215+
3216+
/* Prepare for output conversion of parameters used in prepared stmt. */
3217+
n_params = list_length(fmstate->target_attrs) + 1;
3218+
fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
3219+
fmstate->p_nums = 0;
3220+
3221+
if (operation == CMD_UPDATE || operation == CMD_DELETE)
3222+
{
3223+
Assert(subplan != NULL);
3224+
3225+
/* Find the ctid resjunk column in the subplan's result */
3226+
fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
3227+
"ctid");
3228+
if (!AttributeNumberIsValid(fmstate->ctidAttno))
3229+
elog(ERROR, "could not find junk ctid column");
3230+
3231+
/* First transmittable parameter will be ctid */
3232+
getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
3233+
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3234+
fmstate->p_nums++;
3235+
}
3236+
3237+
if (operation == CMD_INSERT || operation == CMD_UPDATE)
3238+
{
3239+
/* Set up for remaining transmittable parameters */
3240+
foreach(lc, fmstate->target_attrs)
3241+
{
3242+
int attnum = lfirst_int(lc);
3243+
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
3244+
3245+
Assert(!attr->attisdropped);
3246+
3247+
getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
3248+
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
3249+
fmstate->p_nums++;
3250+
}
3251+
}
3252+
3253+
Assert(fmstate->p_nums <= n_params);
3254+
3255+
return fmstate;
3256+
}
3257+
32313258
/*
32323259
* prepare_foreign_modify
32333260
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
@@ -3370,6 +3397,39 @@ store_returning_result(PgFdwModifyState *fmstate,
33703397
PG_END_TRY();
33713398
}
33723399

3400+
/*
3401+
* finish_foreign_modify
3402+
* Release resources for a foreign insert/update/delete operation
3403+
*/
3404+
static void
3405+
finish_foreign_modify(PgFdwModifyState *fmstate)
3406+
{
3407+
Assert(fmstate != NULL);
3408+
3409+
/* If we created a prepared statement, destroy it */
3410+
if (fmstate->p_name)
3411+
{
3412+
char sql[64];
3413+
PGresult *res;
3414+
3415+
snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
3416+
3417+
/*
3418+
* We don't use a PG_TRY block here, so be careful not to throw error
3419+
* without releasing the PGresult.
3420+
*/
3421+
res = pgfdw_exec_query(fmstate->conn, sql);
3422+
if (PQresultStatus(res) != PGRES_COMMAND_OK)
3423+
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
3424+
PQclear(res);
3425+
fmstate->p_name = NULL;
3426+
}
3427+
3428+
/* Release remote connection */
3429+
ReleaseConnection(fmstate->conn);
3430+
fmstate->conn = NULL;
3431+
}
3432+
33733433
/*
33743434
* build_remote_returning
33753435
* Build a RETURNING targetlist of a remote query for performing an

0 commit comments

Comments
 (0)