@@ -376,12 +376,21 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
376
376
static void create_cursor (ForeignScanState * node );
377
377
static void fetch_more_data (ForeignScanState * node );
378
378
static void close_cursor (PGconn * conn , unsigned int cursor_number );
379
+ static PgFdwModifyState * create_foreign_modify (EState * estate ,
380
+ ResultRelInfo * resultRelInfo ,
381
+ CmdType operation ,
382
+ Plan * subplan ,
383
+ char * query ,
384
+ List * target_attrs ,
385
+ bool has_returning ,
386
+ List * retrieved_attrs );
379
387
static void prepare_foreign_modify (PgFdwModifyState * fmstate );
380
388
static const char * * convert_prep_stmt_params (PgFdwModifyState * fmstate ,
381
389
ItemPointer tupleid ,
382
390
TupleTableSlot * slot );
383
391
static void store_returning_result (PgFdwModifyState * fmstate ,
384
392
TupleTableSlot * slot , PGresult * res );
393
+ static void finish_foreign_modify (PgFdwModifyState * fmstate );
385
394
static List * build_remote_returning (Index rtindex , Relation rel ,
386
395
List * returningList );
387
396
static void rebuild_fdw_scan_tlist (ForeignScan * fscan , List * tlist );
@@ -1681,18 +1690,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
1681
1690
int eflags )
1682
1691
{
1683
1692
PgFdwModifyState * fmstate ;
1684
- EState * estate = mtstate -> ps .state ;
1685
- CmdType operation = mtstate -> operation ;
1686
- Relation rel = resultRelInfo -> ri_RelationDesc ;
1687
- RangeTblEntry * rte ;
1688
- Oid userid ;
1689
- ForeignTable * table ;
1690
- UserMapping * user ;
1691
- AttrNumber n_params ;
1692
- Oid typefnoid ;
1693
- bool isvarlena ;
1694
- ListCell * lc ;
1695
- TupleDesc tupdesc = RelationGetDescr (rel );
1693
+ char * query ;
1694
+ List * target_attrs ;
1695
+ bool has_returning ;
1696
+ List * retrieved_attrs ;
1696
1697
1697
1698
/*
1698
1699
* Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
@@ -1701,82 +1702,25 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
1701
1702
if (eflags & EXEC_FLAG_EXPLAIN_ONLY )
1702
1703
return ;
1703
1704
1704
- /* Begin constructing PgFdwModifyState. */
1705
- fmstate = (PgFdwModifyState * ) palloc0 (sizeof (PgFdwModifyState ));
1706
- fmstate -> rel = rel ;
1707
-
1708
- /*
1709
- * Identify which user to do the remote access as. This should match what
1710
- * ExecCheckRTEPerms() does.
1711
- */
1712
- rte = rt_fetch (resultRelInfo -> ri_RangeTableIndex , estate -> es_range_table );
1713
- userid = rte -> checkAsUser ? rte -> checkAsUser : GetUserId ();
1714
-
1715
- /* Get info about foreign table. */
1716
- table = GetForeignTable (RelationGetRelid (rel ));
1717
- user = GetUserMapping (userid , table -> serverid );
1718
-
1719
- /* Open connection; report that we'll create a prepared statement. */
1720
- fmstate -> conn = GetConnection (user , true);
1721
- fmstate -> p_name = NULL ; /* prepared statement not made yet */
1722
-
1723
1705
/* Deconstruct fdw_private data. */
1724
- fmstate -> query = strVal (list_nth (fdw_private ,
1725
- FdwModifyPrivateUpdateSql ));
1726
- fmstate -> target_attrs = (List * ) list_nth (fdw_private ,
1727
- FdwModifyPrivateTargetAttnums );
1728
- fmstate -> has_returning = intVal (list_nth (fdw_private ,
1729
- FdwModifyPrivateHasReturning ));
1730
- fmstate -> retrieved_attrs = (List * ) list_nth (fdw_private ,
1731
- FdwModifyPrivateRetrievedAttrs );
1732
-
1733
- /* Create context for per-tuple temp workspace. */
1734
- fmstate -> temp_cxt = AllocSetContextCreate (estate -> es_query_cxt ,
1735
- "postgres_fdw temporary data" ,
1736
- ALLOCSET_SMALL_SIZES );
1737
-
1738
- /* Prepare for input conversion of RETURNING results. */
1739
- if (fmstate -> has_returning )
1740
- fmstate -> attinmeta = TupleDescGetAttInMetadata (tupdesc );
1741
-
1742
- /* Prepare for output conversion of parameters used in prepared stmt. */
1743
- n_params = list_length (fmstate -> target_attrs ) + 1 ;
1744
- fmstate -> p_flinfo = (FmgrInfo * ) palloc0 (sizeof (FmgrInfo ) * n_params );
1745
- fmstate -> p_nums = 0 ;
1746
-
1747
- if (operation == CMD_UPDATE || operation == CMD_DELETE )
1748
- {
1749
- /* Find the ctid resjunk column in the subplan's result */
1750
- Plan * subplan = mtstate -> mt_plans [subplan_index ]-> plan ;
1751
-
1752
- fmstate -> ctidAttno = ExecFindJunkAttributeInTlist (subplan -> targetlist ,
1753
- "ctid" );
1754
- if (!AttributeNumberIsValid (fmstate -> ctidAttno ))
1755
- elog (ERROR , "could not find junk ctid column" );
1756
-
1757
- /* First transmittable parameter will be ctid */
1758
- getTypeOutputInfo (TIDOID , & typefnoid , & isvarlena );
1759
- fmgr_info (typefnoid , & fmstate -> p_flinfo [fmstate -> p_nums ]);
1760
- fmstate -> p_nums ++ ;
1761
- }
1762
-
1763
- if (operation == CMD_INSERT || operation == CMD_UPDATE )
1764
- {
1765
- /* Set up for remaining transmittable parameters */
1766
- foreach (lc , fmstate -> target_attrs )
1767
- {
1768
- int attnum = lfirst_int (lc );
1769
- Form_pg_attribute attr = TupleDescAttr (tupdesc , attnum - 1 );
1770
-
1771
- Assert (!attr -> attisdropped );
1772
-
1773
- getTypeOutputInfo (attr -> atttypid , & typefnoid , & isvarlena );
1774
- fmgr_info (typefnoid , & fmstate -> p_flinfo [fmstate -> p_nums ]);
1775
- fmstate -> p_nums ++ ;
1776
- }
1777
- }
1778
-
1779
- Assert (fmstate -> p_nums <= n_params );
1706
+ query = strVal (list_nth (fdw_private ,
1707
+ FdwModifyPrivateUpdateSql ));
1708
+ target_attrs = (List * ) list_nth (fdw_private ,
1709
+ FdwModifyPrivateTargetAttnums );
1710
+ has_returning = intVal (list_nth (fdw_private ,
1711
+ FdwModifyPrivateHasReturning ));
1712
+ retrieved_attrs = (List * ) list_nth (fdw_private ,
1713
+ FdwModifyPrivateRetrievedAttrs );
1714
+
1715
+ /* Construct an execution state. */
1716
+ fmstate = create_foreign_modify (mtstate -> ps .state ,
1717
+ resultRelInfo ,
1718
+ mtstate -> operation ,
1719
+ mtstate -> mt_plans [subplan_index ]-> plan ,
1720
+ query ,
1721
+ target_attrs ,
1722
+ has_returning ,
1723
+ retrieved_attrs );
1780
1724
1781
1725
resultRelInfo -> ri_FdwState = fmstate ;
1782
1726
}
@@ -2011,28 +1955,8 @@ postgresEndForeignModify(EState *estate,
2011
1955
if (fmstate == NULL )
2012
1956
return ;
2013
1957
2014
- /* If we created a prepared statement, destroy it */
2015
- if (fmstate -> p_name )
2016
- {
2017
- char sql [64 ];
2018
- PGresult * res ;
2019
-
2020
- snprintf (sql , sizeof (sql ), "DEALLOCATE %s" , fmstate -> p_name );
2021
-
2022
- /*
2023
- * We don't use a PG_TRY block here, so be careful not to throw error
2024
- * without releasing the PGresult.
2025
- */
2026
- res = pgfdw_exec_query (fmstate -> conn , sql );
2027
- if (PQresultStatus (res ) != PGRES_COMMAND_OK )
2028
- pgfdw_report_error (ERROR , res , fmstate -> conn , true, sql );
2029
- PQclear (res );
2030
- fmstate -> p_name = NULL ;
2031
- }
2032
-
2033
- /* Release remote connection */
2034
- ReleaseConnection (fmstate -> conn );
2035
- fmstate -> conn = NULL ;
1958
+ /* Destroy the execution state */
1959
+ finish_foreign_modify (fmstate );
2036
1960
}
2037
1961
2038
1962
/*
@@ -3228,6 +3152,109 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
3228
3152
PQclear (res );
3229
3153
}
3230
3154
3155
+ /*
3156
+ * create_foreign_modify
3157
+ * Construct an execution state of a foreign insert/update/delete
3158
+ * operation
3159
+ */
3160
+ static PgFdwModifyState *
3161
+ create_foreign_modify (EState * estate ,
3162
+ ResultRelInfo * resultRelInfo ,
3163
+ CmdType operation ,
3164
+ Plan * subplan ,
3165
+ char * query ,
3166
+ List * target_attrs ,
3167
+ bool has_returning ,
3168
+ List * retrieved_attrs )
3169
+ {
3170
+ PgFdwModifyState * fmstate ;
3171
+ Relation rel = resultRelInfo -> ri_RelationDesc ;
3172
+ TupleDesc tupdesc = RelationGetDescr (rel );
3173
+ RangeTblEntry * rte ;
3174
+ Oid userid ;
3175
+ ForeignTable * table ;
3176
+ UserMapping * user ;
3177
+ AttrNumber n_params ;
3178
+ Oid typefnoid ;
3179
+ bool isvarlena ;
3180
+ ListCell * lc ;
3181
+
3182
+ /* Begin constructing PgFdwModifyState. */
3183
+ fmstate = (PgFdwModifyState * ) palloc0 (sizeof (PgFdwModifyState ));
3184
+ fmstate -> rel = rel ;
3185
+
3186
+ /*
3187
+ * Identify which user to do the remote access as. This should match what
3188
+ * ExecCheckRTEPerms() does.
3189
+ */
3190
+ rte = rt_fetch (resultRelInfo -> ri_RangeTableIndex , estate -> es_range_table );
3191
+ userid = rte -> checkAsUser ? rte -> checkAsUser : GetUserId ();
3192
+
3193
+ /* Get info about foreign table. */
3194
+ table = GetForeignTable (RelationGetRelid (rel ));
3195
+ user = GetUserMapping (userid , table -> serverid );
3196
+
3197
+ /* Open connection; report that we'll create a prepared statement. */
3198
+ fmstate -> conn = GetConnection (user , true);
3199
+ fmstate -> p_name = NULL ; /* prepared statement not made yet */
3200
+
3201
+ /* Set up remote query information. */
3202
+ fmstate -> query = query ;
3203
+ fmstate -> target_attrs = target_attrs ;
3204
+ fmstate -> has_returning = has_returning ;
3205
+ fmstate -> retrieved_attrs = retrieved_attrs ;
3206
+
3207
+ /* Create context for per-tuple temp workspace. */
3208
+ fmstate -> temp_cxt = AllocSetContextCreate (estate -> es_query_cxt ,
3209
+ "postgres_fdw temporary data" ,
3210
+ ALLOCSET_SMALL_SIZES );
3211
+
3212
+ /* Prepare for input conversion of RETURNING results. */
3213
+ if (fmstate -> has_returning )
3214
+ fmstate -> attinmeta = TupleDescGetAttInMetadata (tupdesc );
3215
+
3216
+ /* Prepare for output conversion of parameters used in prepared stmt. */
3217
+ n_params = list_length (fmstate -> target_attrs ) + 1 ;
3218
+ fmstate -> p_flinfo = (FmgrInfo * ) palloc0 (sizeof (FmgrInfo ) * n_params );
3219
+ fmstate -> p_nums = 0 ;
3220
+
3221
+ if (operation == CMD_UPDATE || operation == CMD_DELETE )
3222
+ {
3223
+ Assert (subplan != NULL );
3224
+
3225
+ /* Find the ctid resjunk column in the subplan's result */
3226
+ fmstate -> ctidAttno = ExecFindJunkAttributeInTlist (subplan -> targetlist ,
3227
+ "ctid" );
3228
+ if (!AttributeNumberIsValid (fmstate -> ctidAttno ))
3229
+ elog (ERROR , "could not find junk ctid column" );
3230
+
3231
+ /* First transmittable parameter will be ctid */
3232
+ getTypeOutputInfo (TIDOID , & typefnoid , & isvarlena );
3233
+ fmgr_info (typefnoid , & fmstate -> p_flinfo [fmstate -> p_nums ]);
3234
+ fmstate -> p_nums ++ ;
3235
+ }
3236
+
3237
+ if (operation == CMD_INSERT || operation == CMD_UPDATE )
3238
+ {
3239
+ /* Set up for remaining transmittable parameters */
3240
+ foreach (lc , fmstate -> target_attrs )
3241
+ {
3242
+ int attnum = lfirst_int (lc );
3243
+ Form_pg_attribute attr = TupleDescAttr (tupdesc , attnum - 1 );
3244
+
3245
+ Assert (!attr -> attisdropped );
3246
+
3247
+ getTypeOutputInfo (attr -> atttypid , & typefnoid , & isvarlena );
3248
+ fmgr_info (typefnoid , & fmstate -> p_flinfo [fmstate -> p_nums ]);
3249
+ fmstate -> p_nums ++ ;
3250
+ }
3251
+ }
3252
+
3253
+ Assert (fmstate -> p_nums <= n_params );
3254
+
3255
+ return fmstate ;
3256
+ }
3257
+
3231
3258
/*
3232
3259
* prepare_foreign_modify
3233
3260
* Establish a prepared statement for execution of INSERT/UPDATE/DELETE
@@ -3370,6 +3397,39 @@ store_returning_result(PgFdwModifyState *fmstate,
3370
3397
PG_END_TRY ();
3371
3398
}
3372
3399
3400
+ /*
3401
+ * finish_foreign_modify
3402
+ * Release resources for a foreign insert/update/delete operation
3403
+ */
3404
+ static void
3405
+ finish_foreign_modify (PgFdwModifyState * fmstate )
3406
+ {
3407
+ Assert (fmstate != NULL );
3408
+
3409
+ /* If we created a prepared statement, destroy it */
3410
+ if (fmstate -> p_name )
3411
+ {
3412
+ char sql [64 ];
3413
+ PGresult * res ;
3414
+
3415
+ snprintf (sql , sizeof (sql ), "DEALLOCATE %s" , fmstate -> p_name );
3416
+
3417
+ /*
3418
+ * We don't use a PG_TRY block here, so be careful not to throw error
3419
+ * without releasing the PGresult.
3420
+ */
3421
+ res = pgfdw_exec_query (fmstate -> conn , sql );
3422
+ if (PQresultStatus (res ) != PGRES_COMMAND_OK )
3423
+ pgfdw_report_error (ERROR , res , fmstate -> conn , true, sql );
3424
+ PQclear (res );
3425
+ fmstate -> p_name = NULL ;
3426
+ }
3427
+
3428
+ /* Release remote connection */
3429
+ ReleaseConnection (fmstate -> conn );
3430
+ fmstate -> conn = NULL ;
3431
+ }
3432
+
3373
3433
/*
3374
3434
* build_remote_returning
3375
3435
* Build a RETURNING targetlist of a remote query for performing an
0 commit comments