Skip to content

Commit a1c4c8a

Browse files
committed
file_fdw: Add on_error and log_verbosity options to file_fdw.
In v17, the on_error and log_verbosity options were introduced for the COPY command. This commit extends support for these options to file_fdw. Setting on_error = 'ignore' for a file_fdw foreign table allows users to query it without errors, even when the input file contains malformed rows, by skipping the problematic rows. Both on_error and log_verbosity options apply to SELECT and ANALYZE operations on file_fdw foreign tables. Author: Atsushi Torikoshi Reviewed-by: Masahiko Sawada, Fujii Masao Discussion: https://postgr.es/m/ab59dad10490ea3734cf022b16c24cfd@oss.nttdata.com
1 parent e7834a1 commit a1c4c8a

File tree

4 files changed

+140
-16
lines changed

4 files changed

+140
-16
lines changed

contrib/file_fdw/expected/file_fdw.out

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,25 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
206206
SELECT * FROM agg_bad; -- ERROR
207207
ERROR: invalid input syntax for type real: "aaa"
208208
CONTEXT: COPY agg_bad, line 3, column b: "aaa"
209+
-- on_error and log_verbosity tests
210+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
211+
SELECT * FROM agg_bad;
212+
NOTICE: 1 row was skipped due to data type incompatibility
213+
a | b
214+
-----+--------
215+
100 | 99.097
216+
42 | 324.78
217+
(2 rows)
218+
219+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
220+
SELECT * FROM agg_bad;
221+
a | b
222+
-----+--------
223+
100 | 99.097
224+
42 | 324.78
225+
(2 rows)
226+
227+
ANALYZE agg_bad;
209228
-- misc query tests
210229
\t on
211230
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');

contrib/file_fdw/file_fdw.c

Lines changed: 91 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "catalog/pg_authid.h"
2323
#include "catalog/pg_foreign_table.h"
2424
#include "commands/copy.h"
25+
#include "commands/copyfrom_internal.h"
2526
#include "commands/defrem.h"
2627
#include "commands/explain.h"
2728
#include "commands/vacuum.h"
@@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = {
7475
{"null", ForeignTableRelationId},
7576
{"default", ForeignTableRelationId},
7677
{"encoding", ForeignTableRelationId},
78+
{"on_error", ForeignTableRelationId},
79+
{"log_verbosity", ForeignTableRelationId},
7780
{"force_not_null", AttributeRelationId},
7881
{"force_null", AttributeRelationId},
7982

@@ -723,38 +726,74 @@ fileIterateForeignScan(ForeignScanState *node)
723726
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
724727
EState *estate = CreateExecutorState();
725728
ExprContext *econtext;
726-
MemoryContext oldcontext;
729+
MemoryContext oldcontext = CurrentMemoryContext;
727730
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
728-
bool found;
731+
CopyFromState cstate = festate->cstate;
729732
ErrorContextCallback errcallback;
730733

731734
/* Set up callback to identify error line number. */
732735
errcallback.callback = CopyFromErrorCallback;
733-
errcallback.arg = (void *) festate->cstate;
736+
errcallback.arg = (void *) cstate;
734737
errcallback.previous = error_context_stack;
735738
error_context_stack = &errcallback;
736739

737740
/*
738-
* The protocol for loading a virtual tuple into a slot is first
739-
* ExecClearTuple, then fill the values/isnull arrays, then
740-
* ExecStoreVirtualTuple. If we don't find another row in the file, we
741-
* just skip the last step, leaving the slot empty as required.
742-
*
743741
* We pass ExprContext because there might be a use of the DEFAULT option
744742
* in COPY FROM, so we may need to evaluate default expressions.
745743
*/
746-
ExecClearTuple(slot);
747744
econtext = GetPerTupleExprContext(estate);
748745

746+
retry:
747+
749748
/*
750749
* DEFAULT expressions need to be evaluated in a per-tuple context, so
751750
* switch in case we are doing that.
752751
*/
753-
oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
754-
found = NextCopyFrom(festate->cstate, econtext,
755-
slot->tts_values, slot->tts_isnull);
756-
if (found)
752+
MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
753+
754+
/*
755+
* The protocol for loading a virtual tuple into a slot is first
756+
* ExecClearTuple, then fill the values/isnull arrays, then
757+
* ExecStoreVirtualTuple. If we don't find another row in the file, we
758+
* just skip the last step, leaving the slot empty as required.
759+
*
760+
*/
761+
ExecClearTuple(slot);
762+
763+
if (NextCopyFrom(cstate, econtext, slot->tts_values, slot->tts_isnull))
764+
{
765+
if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
766+
cstate->escontext->error_occurred)
767+
{
768+
/*
769+
* Soft error occurred, skip this tuple and just make
770+
* ErrorSaveContext ready for the next NextCopyFrom. Since we
771+
* don't set details_wanted and error_data is not to be filled,
772+
* just resetting error_occurred is enough.
773+
*/
774+
cstate->escontext->error_occurred = false;
775+
776+
/* Switch back to original memory context */
777+
MemoryContextSwitchTo(oldcontext);
778+
779+
/*
780+
* Make sure we are interruptible while repeatedly calling
781+
* NextCopyFrom() until no soft error occurs.
782+
*/
783+
CHECK_FOR_INTERRUPTS();
784+
785+
/*
786+
* Reset the per-tuple exprcontext, to clean-up after expression
787+
* evaluations etc.
788+
*/
789+
ResetPerTupleExprContext(estate);
790+
791+
/* Repeat NextCopyFrom() until no soft error occurs */
792+
goto retry;
793+
}
794+
757795
ExecStoreVirtualTuple(slot);
796+
}
758797

759798
/* Switch back to original memory context */
760799
MemoryContextSwitchTo(oldcontext);
@@ -796,8 +835,19 @@ fileEndForeignScan(ForeignScanState *node)
796835
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
797836

798837
/* if festate is NULL, we are in EXPLAIN; nothing to do */
799-
if (festate)
800-
EndCopyFrom(festate->cstate);
838+
if (!festate)
839+
return;
840+
841+
if (festate->cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
842+
festate->cstate->num_errors > 0 &&
843+
festate->cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
844+
ereport(NOTICE,
845+
errmsg_plural("%llu row was skipped due to data type incompatibility",
846+
"%llu rows were skipped due to data type incompatibility",
847+
(unsigned long long) festate->cstate->num_errors,
848+
(unsigned long long) festate->cstate->num_errors));
849+
850+
EndCopyFrom(festate->cstate);
801851
}
802852

803853
/*
@@ -1113,7 +1163,8 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
11131163
* which must have at least targrows entries.
11141164
* The actual number of rows selected is returned as the function result.
11151165
* We also count the total number of rows in the file and return it into
1116-
* *totalrows. Note that *totaldeadrows is always set to 0.
1166+
* *totalrows. Rows skipped due to on_error = 'ignore' are not included
1167+
* in this count. Note that *totaldeadrows is always set to 0.
11171168
*
11181169
* Note that the returned list of rows is not always in order by physical
11191170
* position in the file. Therefore, correlation estimates derived later
@@ -1191,6 +1242,21 @@ file_acquire_sample_rows(Relation onerel, int elevel,
11911242
if (!found)
11921243
break;
11931244

1245+
if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1246+
cstate->escontext->error_occurred)
1247+
{
1248+
/*
1249+
* Soft error occurred, skip this tuple and just make
1250+
* ErrorSaveContext ready for the next NextCopyFrom. Since we
1251+
* don't set details_wanted and error_data is not to be filled,
1252+
* just resetting error_occurred is enough.
1253+
*/
1254+
cstate->escontext->error_occurred = false;
1255+
1256+
/* Repeat NextCopyFrom() until no soft error occurs */
1257+
continue;
1258+
}
1259+
11941260
/*
11951261
* The first targrows sample rows are simply copied into the
11961262
* reservoir. Then we start replacing tuples in the sample until we
@@ -1236,6 +1302,15 @@ file_acquire_sample_rows(Relation onerel, int elevel,
12361302
/* Clean up. */
12371303
MemoryContextDelete(tupcontext);
12381304

1305+
if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE &&
1306+
cstate->num_errors > 0 &&
1307+
cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT)
1308+
ereport(NOTICE,
1309+
errmsg_plural("%llu row was skipped due to data type incompatibility",
1310+
"%llu rows were skipped due to data type incompatibility",
1311+
(unsigned long long) cstate->num_errors,
1312+
(unsigned long long) cstate->num_errors));
1313+
12391314
EndCopyFrom(cstate);
12401315

12411316
pfree(values);

contrib/file_fdw/sql/file_fdw.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,13 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
150150
-- error context report tests
151151
SELECT * FROM agg_bad; -- ERROR
152152
153+
-- on_error and log_verbosity tests
154+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
155+
SELECT * FROM agg_bad;
156+
ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'silent');
157+
SELECT * FROM agg_bad;
158+
ANALYZE agg_bad;
159+
153160
-- misc query tests
154161
\t on
155162
SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');

doc/src/sgml/file-fdw.sgml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,29 @@
126126
</listitem>
127127
</varlistentry>
128128

129+
<varlistentry>
130+
<term><literal>on_error</literal></term>
131+
132+
<listitem>
133+
<para>
134+
Specifies how to behave when encountering an error converting a column's
135+
input value into its data type,
136+
the same as <command>COPY</command>'s <literal>ON_ERROR</literal> option.
137+
</para>
138+
</listitem>
139+
</varlistentry>
140+
141+
<varlistentry>
142+
<term><literal>log_verbosity</literal></term>
143+
144+
<listitem>
145+
<para>
146+
Specifies the amount of messages emitted by <literal>file_fdw</literal>,
147+
the same as <command>COPY</command>'s <literal>LOG_VERBOSITY</literal> option.
148+
</para>
149+
</listitem>
150+
</varlistentry>
151+
129152
</variablelist>
130153

131154
<para>

0 commit comments

Comments
 (0)