Skip to content

Commit 9d2d457

Browse files
committed
Add support for more progress reporting in COPY
The command (TO or FROM), its type (file, pipe, program or callback), and the number of tuples excluded by a WHERE clause in COPY FROM are added to the progress reporting already available. The column "lines_processed" is renamed to "tuples_processed" to disambiguate the meaning of this column in the cases of CSV and BINARY COPY and to be more consistent with the other catalog progress views. Bump catalog version, again. Author: Matthias van de Meent Reviewed-by: Michael Paquier, Justin Pryzby, Bharath Rupireddy, Josef Šimánek, Tomas Vondra Discussion: https://postgr.es/m/CAEze2WiOcgdH4aQA8NtZq-4dgvnJzp8PohdeKchPkhMY-jWZXA@mail.gmail.com
1 parent f9264d1 commit 9d2d457

File tree

7 files changed

+134
-18
lines changed

7 files changed

+134
-18
lines changed

doc/src/sgml/monitoring.sgml

+40-5
Original file line numberDiff line numberDiff line change
@@ -6531,8 +6531,33 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
65316531
<structfield>relid</structfield> <type>oid</type>
65326532
</para>
65336533
<para>
6534-
OID of the table on which the <command>COPY</command> command is executed.
6535-
It is set to 0 if copying from a <command>SELECT</command> query.
6534+
OID of the table on which the <command>COPY</command> command is
6535+
executed. It is set to <literal>0</literal> if copying from a
6536+
<command>SELECT</command> query.
6537+
</para></entry>
6538+
</row>
6539+
6540+
<row>
6541+
<entry role="catalog_table_entry"><para role="column_definition">
6542+
<structfield>command</structfield> <type>text</type>
6543+
</para>
6544+
<para>
6545+
The command that is running: <literal>COPY FROM</literal>, or
6546+
<literal>COPY TO</literal>.
6547+
</para></entry>
6548+
</row>
6549+
6550+
<row>
6551+
<entry role="catalog_table_entry"><para role="column_definition">
6552+
<structfield>type</structfield> <type>text</type>
6553+
</para>
6554+
<para>
6555+
The io type that the data is read from or written to:
6556+
<literal>FILE</literal>, <literal>PROGRAM</literal>,
6557+
<literal>PIPE</literal> (for <command>COPY FROM STDIN</command> and
6558+
<command>COPY TO STDOUT</command>), or <literal>CALLBACK</literal>
6559+
(used for example during the initial table synchronization in
6560+
logical replication).
65366561
</para></entry>
65376562
</row>
65386563

@@ -6551,16 +6576,26 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
65516576
</para>
65526577
<para>
65536578
Size of source file for <command>COPY FROM</command> command in bytes.
6554-
It is set to 0 if not available.
6579+
It is set to <literal>0</literal> if not available.
6580+
</para></entry>
6581+
</row>
6582+
6583+
<row>
6584+
<entry role="catalog_table_entry"><para role="column_definition">
6585+
<structfield>tuples_processed</structfield> <type>bigint</type>
6586+
</para>
6587+
<para>
6588+
Number of tuples already processed by <command>COPY</command> command.
65556589
</para></entry>
65566590
</row>
65576591

65586592
<row>
65596593
<entry role="catalog_table_entry"><para role="column_definition">
6560-
<structfield>lines_processed</structfield> <type>bigint</type>
6594+
<structfield>tuples_excluded</structfield> <type>bigint</type>
65616595
</para>
65626596
<para>
6563-
Number of lines already processed by <command>COPY</command> command.
6597+
Number of tuples not processed because they were excluded by the
6598+
<command>WHERE</command> clause of the <command>COPY</command> command.
65646599
</para></entry>
65656600
</row>
65666601
</tbody>

src/backend/catalog/system_views.sql

+10-1
Original file line numberDiff line numberDiff line change
@@ -1129,9 +1129,18 @@ CREATE VIEW pg_stat_progress_copy AS
11291129
SELECT
11301130
S.pid AS pid, S.datid AS datid, D.datname AS datname,
11311131
S.relid AS relid,
1132+
CASE S.param5 WHEN 1 THEN 'COPY FROM'
1133+
WHEN 2 THEN 'COPY TO'
1134+
END AS command,
1135+
CASE S.param6 WHEN 1 THEN 'FILE'
1136+
WHEN 2 THEN 'PROGRAM'
1137+
WHEN 3 THEN 'PIPE'
1138+
WHEN 4 THEN 'CALLBACK'
1139+
END AS "type",
11321140
S.param1 AS bytes_processed,
11331141
S.param2 AS bytes_total,
1134-
S.param3 AS lines_processed
1142+
S.param3 AS tuples_processed,
1143+
S.param4 AS tuples_excluded
11351144
FROM pg_stat_get_progress_info('COPY') AS S
11361145
LEFT JOIN pg_database D ON S.datid = D.oid;
11371146

src/backend/commands/copyfrom.c

+30-4
Original file line numberDiff line numberDiff line change
@@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
539539
BulkInsertState bistate = NULL;
540540
CopyInsertMethod insertMethod;
541541
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
542-
uint64 processed = 0;
542+
int64 processed = 0;
543+
int64 excluded = 0;
543544
bool has_before_insert_row_trig;
544545
bool has_instead_insert_row_trig;
545546
bool leafpart_use_multi_insert = false;
@@ -869,7 +870,15 @@ CopyFrom(CopyFromState cstate)
869870
econtext->ecxt_scantuple = myslot;
870871
/* Skip items that don't match COPY's WHERE clause */
871872
if (!ExecQual(cstate->qualexpr, econtext))
873+
{
874+
/*
875+
* Report that this tuple was filtered out by the WHERE
876+
* clause.
877+
*/
878+
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
879+
++excluded);
872880
continue;
881+
}
873882
}
874883

875884
/* Determine the partition to insert the tuple into */
@@ -1104,10 +1113,11 @@ CopyFrom(CopyFromState cstate)
11041113
/*
11051114
* We count only tuples not suppressed by a BEFORE INSERT trigger
11061115
* or FDW; this is the same definition used by nodeModifyTable.c
1107-
* for counting tuples inserted by an INSERT command. Update
1116+
* for counting tuples inserted by an INSERT command. Update
11081117
* progress of the COPY command as well.
11091118
*/
1110-
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
1119+
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1120+
++processed);
11111121
}
11121122
}
11131123

@@ -1193,6 +1203,16 @@ BeginCopyFrom(ParseState *pstate,
11931203
ExprState **defexprs;
11941204
MemoryContext oldcontext;
11951205
bool volatile_defexprs;
1206+
const int progress_cols[] = {
1207+
PROGRESS_COPY_COMMAND,
1208+
PROGRESS_COPY_TYPE,
1209+
PROGRESS_COPY_BYTES_TOTAL
1210+
};
1211+
int64 progress_vals[] = {
1212+
PROGRESS_COPY_COMMAND_FROM,
1213+
0,
1214+
0
1215+
};
11961216

11971217
/* Allocate workspace and zero all fields */
11981218
cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
@@ -1430,11 +1450,13 @@ BeginCopyFrom(ParseState *pstate,
14301450

14311451
if (data_source_cb)
14321452
{
1453+
progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
14331454
cstate->copy_src = COPY_CALLBACK;
14341455
cstate->data_source_cb = data_source_cb;
14351456
}
14361457
else if (pipe)
14371458
{
1459+
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
14381460
Assert(!is_program); /* the grammar does not allow this */
14391461
if (whereToSendOutput == DestRemote)
14401462
ReceiveCopyBegin(cstate);
@@ -1447,6 +1469,7 @@ BeginCopyFrom(ParseState *pstate,
14471469

14481470
if (cstate->is_program)
14491471
{
1472+
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
14501473
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
14511474
if (cstate->copy_file == NULL)
14521475
ereport(ERROR,
@@ -1458,6 +1481,7 @@ BeginCopyFrom(ParseState *pstate,
14581481
{
14591482
struct stat st;
14601483

1484+
progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
14611485
cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
14621486
if (cstate->copy_file == NULL)
14631487
{
@@ -1484,10 +1508,12 @@ BeginCopyFrom(ParseState *pstate,
14841508
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
14851509
errmsg("\"%s\" is a directory", cstate->filename)));
14861510

1487-
pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
1511+
progress_vals[2] = st.st_size;
14881512
}
14891513
}
14901514

1515+
pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1516+
14911517
if (cstate->opts.binary)
14921518
{
14931519
/* Read and verify binary header */

src/backend/commands/copyto.c

+24-4
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate,
353353
TupleDesc tupDesc;
354354
int num_phys_attrs;
355355
MemoryContext oldcontext;
356+
const int progress_cols[] = {
357+
PROGRESS_COPY_COMMAND,
358+
PROGRESS_COPY_TYPE
359+
};
360+
int64 progress_vals[] = {
361+
PROGRESS_COPY_COMMAND_TO,
362+
0
363+
};
356364

357365
if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
358366
{
@@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate,
659667

660668
if (pipe)
661669
{
670+
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
671+
662672
Assert(!is_program); /* the grammar does not allow this */
663673
if (whereToSendOutput != DestRemote)
664674
cstate->copy_file = stdout;
@@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate,
670680

671681
if (is_program)
672682
{
683+
progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
673684
cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
674685
if (cstate->copy_file == NULL)
675686
ereport(ERROR,
@@ -682,6 +693,8 @@ BeginCopyTo(ParseState *pstate,
682693
mode_t oumask; /* Pre-existing umask value */
683694
struct stat st;
684695

696+
progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
697+
685698
/*
686699
* Prevent write to relative path ... too easy to shoot oneself in
687700
* the foot by overwriting a database file ...
@@ -731,6 +744,8 @@ BeginCopyTo(ParseState *pstate,
731744
/* initialize progress */
732745
pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
733746
cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
747+
pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
748+
734749
cstate->bytes_processed = 0;
735750

736751
MemoryContextSwitchTo(oldcontext);
@@ -881,8 +896,12 @@ DoCopyTo(CopyToState cstate)
881896
/* Format and send the data */
882897
CopyOneRowTo(cstate, slot);
883898

884-
/* Increment amount of processed tuples and update the progress */
885-
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
899+
/*
900+
* Increment the number of processed tuples, and report the
901+
* progress.
902+
*/
903+
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
904+
++processed);
886905
}
887906

888907
ExecDropSingleTupleTableSlot(slot);
@@ -1251,8 +1270,9 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
12511270
/* Send the data */
12521271
CopyOneRowTo(cstate, slot);
12531272

1254-
/* Increment amount of processed tuples and update the progress */
1255-
pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
1273+
/* Increment the number of processed tuples, and report the progress */
1274+
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1275+
++myState->processed);
12561276

12571277
return true;
12581278
}

src/include/catalog/catversion.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/* yyyymmddN */
56-
#define CATALOG_VERSION_NO 202103091
56+
#define CATALOG_VERSION_NO 202103092
5757

5858
#endif

src/include/commands/progress.h

+15-2
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,22 @@
133133
#define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE 4
134134
#define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL 5
135135

136-
/* Commands of PROGRESS_COPY */
136+
/* Progress parameters for PROGRESS_COPY */
137137
#define PROGRESS_COPY_BYTES_PROCESSED 0
138138
#define PROGRESS_COPY_BYTES_TOTAL 1
139-
#define PROGRESS_COPY_LINES_PROCESSED 2
139+
#define PROGRESS_COPY_TUPLES_PROCESSED 2
140+
#define PROGRESS_COPY_TUPLES_EXCLUDED 3
141+
#define PROGRESS_COPY_COMMAND 4
142+
#define PROGRESS_COPY_TYPE 5
143+
144+
/* Commands of COPY (as advertised via PROGRESS_COPY_COMMAND) */
145+
#define PROGRESS_COPY_COMMAND_FROM 1
146+
#define PROGRESS_COPY_COMMAND_TO 2
147+
148+
/* Types of COPY commands (as advertised via PROGRESS_COPY_TYPE) */
149+
#define PROGRESS_COPY_TYPE_FILE 1
150+
#define PROGRESS_COPY_TYPE_PROGRAM 2
151+
#define PROGRESS_COPY_TYPE_PIPE 3
152+
#define PROGRESS_COPY_TYPE_CALLBACK 4
140153

141154
#endif

src/test/regress/expected/rules.out

+14-1
Original file line numberDiff line numberDiff line change
@@ -1950,9 +1950,22 @@ pg_stat_progress_copy| SELECT s.pid,
19501950
s.datid,
19511951
d.datname,
19521952
s.relid,
1953+
CASE s.param5
1954+
WHEN 1 THEN 'COPY FROM'::text
1955+
WHEN 2 THEN 'COPY TO'::text
1956+
ELSE NULL::text
1957+
END AS command,
1958+
CASE s.param6
1959+
WHEN 1 THEN 'FILE'::text
1960+
WHEN 2 THEN 'PROGRAM'::text
1961+
WHEN 3 THEN 'PIPE'::text
1962+
WHEN 4 THEN 'CALLBACK'::text
1963+
ELSE NULL::text
1964+
END AS type,
19531965
s.param1 AS bytes_processed,
19541966
s.param2 AS bytes_total,
1955-
s.param3 AS lines_processed
1967+
s.param3 AS tuples_processed,
1968+
s.param4 AS tuples_excluded
19561969
FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
19571970
LEFT JOIN pg_database d ON ((s.datid = d.oid)));
19581971
pg_stat_progress_create_index| SELECT s.pid,

0 commit comments

Comments
 (0)