3
3
* copyfrom.c
4
4
* COPY <table> FROM file/program/client
5
5
*
6
+ * This file contains routines needed to efficiently load tuples into a
7
+ * table. That includes looking up the correct partition, firing triggers,
8
+ * calling the table AM function to insert the data, and updating indexes.
9
+ * Reading data from the input file or client and parsing it into Datums
10
+ * is handled in copyfromparse.c.
11
+ *
6
12
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7
13
* Portions Copyright (c) 1994, Regents of the University of California
8
14
*
23
29
#include "access/tableam.h"
24
30
#include "access/xact.h"
25
31
#include "access/xlog.h"
32
+ #include "catalog/namespace.h"
26
33
#include "commands/copy.h"
27
34
#include "commands/copyfrom_internal.h"
28
35
#include "commands/progress.h"
@@ -87,7 +94,7 @@ typedef struct CopyMultiInsertInfo
87
94
List * multiInsertBuffers ; /* List of tracked CopyMultiInsertBuffers */
88
95
int bufferedTuples ; /* number of tuples buffered over all buffers */
89
96
int bufferedBytes ; /* number of bytes from all buffered tuples */
90
- CopyFromState cstate ; /* Copy state for this CopyMultiInsertInfo */
97
+ CopyFromState cstate ; /* Copy state for this CopyMultiInsertInfo */
91
98
EState * estate ; /* Executor state used for COPY */
92
99
CommandId mycid ; /* Command Id used for COPY */
93
100
int ti_options ; /* table insert options */
@@ -107,7 +114,7 @@ static void ClosePipeFromProgram(CopyFromState cstate);
107
114
void
108
115
CopyFromErrorCallback (void * arg )
109
116
{
110
- CopyFromState cstate = (CopyFromState ) arg ;
117
+ CopyFromState cstate = (CopyFromState ) arg ;
111
118
char curlineno_str [32 ];
112
119
113
120
snprintf (curlineno_str , sizeof (curlineno_str ), UINT64_FORMAT ,
@@ -149,15 +156,9 @@ CopyFromErrorCallback(void *arg)
149
156
/*
150
157
* Error is relevant to a particular line.
151
158
*
152
- * If line_buf still contains the correct line, and it's already
153
- * transcoded, print it. If it's still in a foreign encoding, it's
154
- * quite likely that the error is precisely a failure to do
155
- * encoding conversion (ie, bad data). We dare not try to convert
156
- * it, and at present there's no way to regurgitate it without
157
- * conversion. So we have to punt and just report the line number.
159
+ * If line_buf still contains the correct line, print it.
158
160
*/
159
- if (cstate -> line_buf_valid &&
160
- (cstate -> line_buf_converted || !cstate -> need_transcoding ))
161
+ if (cstate -> line_buf_valid )
161
162
{
162
163
char * lineval ;
163
164
@@ -300,7 +301,7 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
300
301
MemoryContext oldcontext ;
301
302
int i ;
302
303
uint64 save_cur_lineno ;
303
- CopyFromState cstate = miinfo -> cstate ;
304
+ CopyFromState cstate = miinfo -> cstate ;
304
305
EState * estate = miinfo -> estate ;
305
306
CommandId mycid = miinfo -> mycid ;
306
307
int ti_options = miinfo -> ti_options ;
@@ -1191,7 +1192,7 @@ BeginCopyFrom(ParseState *pstate,
1191
1192
List * attnamelist ,
1192
1193
List * options )
1193
1194
{
1194
- CopyFromState cstate ;
1195
+ CopyFromState cstate ;
1195
1196
bool pipe = (filename == NULL );
1196
1197
TupleDesc tupDesc ;
1197
1198
AttrNumber num_phys_attrs ,
@@ -1229,7 +1230,7 @@ BeginCopyFrom(ParseState *pstate,
1229
1230
oldcontext = MemoryContextSwitchTo (cstate -> copycontext );
1230
1231
1231
1232
/* Extract options from the statement node tree */
1232
- ProcessCopyOptions (pstate , & cstate -> opts , true /* is_from */ , options );
1233
+ ProcessCopyOptions (pstate , & cstate -> opts , true /* is_from */ , options );
1233
1234
1234
1235
/* Process the target relation */
1235
1236
cstate -> rel = rel ;
@@ -1320,15 +1321,20 @@ BeginCopyFrom(ParseState *pstate,
1320
1321
cstate -> file_encoding = cstate -> opts .file_encoding ;
1321
1322
1322
1323
/*
1323
- * Set up encoding conversion info. Even if the file and server encodings
1324
- * are the same, we must apply pg_any_to_server() to validate data in
1325
- * multibyte encodings.
1324
+ * Look up the encoding conversion function, if transcoding is needed.
1326
1325
*/
1327
- cstate -> need_transcoding =
1328
- (cstate -> file_encoding != GetDatabaseEncoding () ||
1329
- pg_database_encoding_max_length () > 1 );
1330
- /* See Multibyte encoding comment above */
1331
- cstate -> encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY (cstate -> file_encoding );
1326
+ if (cstate -> file_encoding == GetDatabaseEncoding () ||
1327
+ cstate -> file_encoding == PG_SQL_ASCII ||
1328
+ GetDatabaseEncoding () == PG_SQL_ASCII )
1329
+ {
1330
+ cstate -> need_transcoding = false;
1331
+ }
1332
+ else
1333
+ {
1334
+ cstate -> need_transcoding = true;
1335
+ cstate -> conversion_proc = FindDefaultConversionProc (cstate -> file_encoding ,
1336
+ GetDatabaseEncoding ());
1337
+ }
1332
1338
1333
1339
cstate -> copy_src = COPY_FILE ; /* default */
1334
1340
@@ -1339,27 +1345,43 @@ BeginCopyFrom(ParseState *pstate,
1339
1345
oldcontext = MemoryContextSwitchTo (cstate -> copycontext );
1340
1346
1341
1347
/* Initialize state variables */
1342
- cstate -> reached_eof = false;
1343
1348
cstate -> eol_type = EOL_UNKNOWN ;
1344
1349
cstate -> cur_relname = RelationGetRelationName (cstate -> rel );
1345
1350
cstate -> cur_lineno = 0 ;
1346
1351
cstate -> cur_attname = NULL ;
1347
1352
cstate -> cur_attval = NULL ;
1348
1353
1349
1354
/*
1350
- * Set up variables to avoid per-attribute overhead. attribute_buf and
1351
- * raw_buf are used in both text and binary modes, but we use line_buf
1352
- * only in text mode.
1355
+ * Allocate buffers for the input pipeline.
1356
+ *
1357
+ * attribute_buf and raw_buf are used in both text and binary modes, but
1358
+ * input_buf and line_buf only in text mode.
1353
1359
*/
1354
- initStringInfo (& cstate -> attribute_buf );
1355
- cstate -> raw_buf = (char * ) palloc (RAW_BUF_SIZE + 1 );
1360
+ cstate -> raw_buf = palloc (RAW_BUF_SIZE + 1 );
1356
1361
cstate -> raw_buf_index = cstate -> raw_buf_len = 0 ;
1362
+ cstate -> raw_reached_eof = false;
1363
+
1357
1364
if (!cstate -> opts .binary )
1358
1365
{
1366
+ /*
1367
+ * If encoding conversion is needed, we need another buffer to hold
1368
+ * the converted input data. Otherwise, we can just point input_buf
1369
+ * to the same buffer as raw_buf.
1370
+ */
1371
+ if (cstate -> need_transcoding )
1372
+ {
1373
+ cstate -> input_buf = (char * ) palloc (INPUT_BUF_SIZE + 1 );
1374
+ cstate -> input_buf_index = cstate -> input_buf_len = 0 ;
1375
+ }
1376
+ else
1377
+ cstate -> input_buf = cstate -> raw_buf ;
1378
+ cstate -> input_reached_eof = false;
1379
+
1359
1380
initStringInfo (& cstate -> line_buf );
1360
- cstate -> line_buf_converted = false;
1361
1381
}
1362
1382
1383
+ initStringInfo (& cstate -> attribute_buf );
1384
+
1363
1385
/* Assign range table, we'll need it in CopyFrom. */
1364
1386
if (pstate )
1365
1387
cstate -> range_table = pstate -> p_rtable ;
@@ -1584,7 +1606,7 @@ ClosePipeFromProgram(CopyFromState cstate)
1584
1606
* should not report that as an error. Otherwise, SIGPIPE indicates a
1585
1607
* problem.
1586
1608
*/
1587
- if (!cstate -> reached_eof &&
1609
+ if (!cstate -> raw_reached_eof &&
1588
1610
wait_result_is_signal (pclose_rc , SIGPIPE ))
1589
1611
return ;
1590
1612
0 commit comments