|
22 | 22 | #include "catalog/pg_authid.h"
|
23 | 23 | #include "catalog/pg_foreign_table.h"
|
24 | 24 | #include "commands/copy.h"
|
| 25 | +#include "commands/copyfrom_internal.h" |
25 | 26 | #include "commands/defrem.h"
|
26 | 27 | #include "commands/explain.h"
|
27 | 28 | #include "commands/vacuum.h"
|
@@ -74,6 +75,8 @@ static const struct FileFdwOption valid_options[] = {
|
74 | 75 | {"null", ForeignTableRelationId},
|
75 | 76 | {"default", ForeignTableRelationId},
|
76 | 77 | {"encoding", ForeignTableRelationId},
|
| 78 | + {"on_error", ForeignTableRelationId}, |
| 79 | + {"log_verbosity", ForeignTableRelationId}, |
77 | 80 | {"force_not_null", AttributeRelationId},
|
78 | 81 | {"force_null", AttributeRelationId},
|
79 | 82 |
|
@@ -723,38 +726,74 @@ fileIterateForeignScan(ForeignScanState *node)
|
723 | 726 | FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
|
724 | 727 | EState *estate = CreateExecutorState();
|
725 | 728 | ExprContext *econtext;
|
726 |
| - MemoryContext oldcontext; |
| 729 | + MemoryContext oldcontext = CurrentMemoryContext; |
727 | 730 | TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
|
728 |
| - bool found; |
| 731 | + CopyFromState cstate = festate->cstate; |
729 | 732 | ErrorContextCallback errcallback;
|
730 | 733 |
|
731 | 734 | /* Set up callback to identify error line number. */
|
732 | 735 | errcallback.callback = CopyFromErrorCallback;
|
733 |
| - errcallback.arg = (void *) festate->cstate; |
| 736 | + errcallback.arg = (void *) cstate; |
734 | 737 | errcallback.previous = error_context_stack;
|
735 | 738 | error_context_stack = &errcallback;
|
736 | 739 |
|
737 | 740 | /*
|
738 |
| - * The protocol for loading a virtual tuple into a slot is first |
739 |
| - * ExecClearTuple, then fill the values/isnull arrays, then |
740 |
| - * ExecStoreVirtualTuple. If we don't find another row in the file, we |
741 |
| - * just skip the last step, leaving the slot empty as required. |
742 |
| - * |
743 | 741 | * We pass ExprContext because there might be a use of the DEFAULT option
|
744 | 742 | * in COPY FROM, so we may need to evaluate default expressions.
|
745 | 743 | */
|
746 |
| - ExecClearTuple(slot); |
747 | 744 | econtext = GetPerTupleExprContext(estate);
|
748 | 745 |
|
| 746 | +retry: |
| 747 | + |
749 | 748 | /*
|
750 | 749 | * DEFAULT expressions need to be evaluated in a per-tuple context, so
|
751 | 750 | * switch in case we are doing that.
|
752 | 751 | */
|
753 |
| - oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
754 |
| - found = NextCopyFrom(festate->cstate, econtext, |
755 |
| - slot->tts_values, slot->tts_isnull); |
756 |
| - if (found) |
| 752 | + MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); |
| 753 | + |
| 754 | + /* |
| 755 | + * The protocol for loading a virtual tuple into a slot is first |
| 756 | + * ExecClearTuple, then fill the values/isnull arrays, then |
| 757 | + * ExecStoreVirtualTuple. If we don't find another row in the file, we |
| 758 | + * just skip the last step, leaving the slot empty as required. |
| 759 | + * |
| 760 | + */ |
| 761 | + ExecClearTuple(slot); |
| 762 | + |
| 763 | + if (NextCopyFrom(cstate, econtext, slot->tts_values, slot->tts_isnull)) |
| 764 | + { |
| 765 | + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE && |
| 766 | + cstate->escontext->error_occurred) |
| 767 | + { |
| 768 | + /* |
| 769 | + * Soft error occurred, skip this tuple and just make |
| 770 | + * ErrorSaveContext ready for the next NextCopyFrom. Since we |
| 771 | + * don't set details_wanted and error_data is not to be filled, |
| 772 | + * just resetting error_occurred is enough. |
| 773 | + */ |
| 774 | + cstate->escontext->error_occurred = false; |
| 775 | + |
| 776 | + /* Switch back to original memory context */ |
| 777 | + MemoryContextSwitchTo(oldcontext); |
| 778 | + |
| 779 | + /* |
| 780 | + * Make sure we are interruptible while repeatedly calling |
| 781 | + * NextCopyFrom() until no soft error occurs. |
| 782 | + */ |
| 783 | + CHECK_FOR_INTERRUPTS(); |
| 784 | + |
| 785 | + /* |
| 786 | + * Reset the per-tuple exprcontext, to clean-up after expression |
| 787 | + * evaluations etc. |
| 788 | + */ |
| 789 | + ResetPerTupleExprContext(estate); |
| 790 | + |
| 791 | + /* Repeat NextCopyFrom() until no soft error occurs */ |
| 792 | + goto retry; |
| 793 | + } |
| 794 | + |
757 | 795 | ExecStoreVirtualTuple(slot);
|
| 796 | + } |
758 | 797 |
|
759 | 798 | /* Switch back to original memory context */
|
760 | 799 | MemoryContextSwitchTo(oldcontext);
|
@@ -796,8 +835,19 @@ fileEndForeignScan(ForeignScanState *node)
|
796 | 835 | FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
|
797 | 836 |
|
798 | 837 | /* if festate is NULL, we are in EXPLAIN; nothing to do */
|
799 |
| - if (festate) |
800 |
| - EndCopyFrom(festate->cstate); |
| 838 | + if (!festate) |
| 839 | + return; |
| 840 | + |
| 841 | + if (festate->cstate->opts.on_error == COPY_ON_ERROR_IGNORE && |
| 842 | + festate->cstate->num_errors > 0 && |
| 843 | + festate->cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT) |
| 844 | + ereport(NOTICE, |
| 845 | + errmsg_plural("%llu row was skipped due to data type incompatibility", |
| 846 | + "%llu rows were skipped due to data type incompatibility", |
| 847 | + (unsigned long long) festate->cstate->num_errors, |
| 848 | + (unsigned long long) festate->cstate->num_errors)); |
| 849 | + |
| 850 | + EndCopyFrom(festate->cstate); |
801 | 851 | }
|
802 | 852 |
|
803 | 853 | /*
|
@@ -1113,7 +1163,8 @@ estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
|
1113 | 1163 | * which must have at least targrows entries.
|
1114 | 1164 | * The actual number of rows selected is returned as the function result.
|
1115 | 1165 | * We also count the total number of rows in the file and return it into
|
1116 |
| - * *totalrows. Note that *totaldeadrows is always set to 0. |
| 1166 | + * *totalrows. Rows skipped due to on_error = 'ignore' are not included |
| 1167 | + * in this count. Note that *totaldeadrows is always set to 0. |
1117 | 1168 | *
|
1118 | 1169 | * Note that the returned list of rows is not always in order by physical
|
1119 | 1170 | * position in the file. Therefore, correlation estimates derived later
|
@@ -1191,6 +1242,21 @@ file_acquire_sample_rows(Relation onerel, int elevel,
|
1191 | 1242 | if (!found)
|
1192 | 1243 | break;
|
1193 | 1244 |
|
| 1245 | + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE && |
| 1246 | + cstate->escontext->error_occurred) |
| 1247 | + { |
| 1248 | + /* |
| 1249 | + * Soft error occurred, skip this tuple and just make |
| 1250 | + * ErrorSaveContext ready for the next NextCopyFrom. Since we |
| 1251 | + * don't set details_wanted and error_data is not to be filled, |
| 1252 | + * just resetting error_occurred is enough. |
| 1253 | + */ |
| 1254 | + cstate->escontext->error_occurred = false; |
| 1255 | + |
| 1256 | + /* Repeat NextCopyFrom() until no soft error occurs */ |
| 1257 | + continue; |
| 1258 | + } |
| 1259 | + |
1194 | 1260 | /*
|
1195 | 1261 | * The first targrows sample rows are simply copied into the
|
1196 | 1262 | * reservoir. Then we start replacing tuples in the sample until we
|
@@ -1236,6 +1302,15 @@ file_acquire_sample_rows(Relation onerel, int elevel,
|
1236 | 1302 | /* Clean up. */
|
1237 | 1303 | MemoryContextDelete(tupcontext);
|
1238 | 1304 |
|
| 1305 | + if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE && |
| 1306 | + cstate->num_errors > 0 && |
| 1307 | + cstate->opts.log_verbosity >= COPY_LOG_VERBOSITY_DEFAULT) |
| 1308 | + ereport(NOTICE, |
| 1309 | + errmsg_plural("%llu row was skipped due to data type incompatibility", |
| 1310 | + "%llu rows were skipped due to data type incompatibility", |
| 1311 | + (unsigned long long) cstate->num_errors, |
| 1312 | + (unsigned long long) cstate->num_errors)); |
| 1313 | + |
1239 | 1314 | EndCopyFrom(cstate);
|
1240 | 1315 |
|
1241 | 1316 | pfree(values);
|
|
0 commit comments