Skip to content

Commit 9fcdf2c

Browse files
committed
Add support for COPY TO callback functions
This is useful as a way for extensions to process COPY TO rows in the way they see fit (say auditing, analytics, backend, etc.) without the need to invoke an external process running as the OS user running the backend through PROGRAM that requires superuser rights. COPY FROM already provides a similar callback for logical replication. For COPY TO, the callback is triggered when we are ready to send a row in CopySendEndOfRow(), which is the same code path as when sending a row to a frontend or a pipe/file. A small test module, test_copy_callbacks, is added to provide some coverage for this facility. Author: Bilva Sanaba, Nathan Bossart Discussion: https://postgr.es/m/253C21D1-FCEB-41D9-A2AF-E6517015B7D7@amazon.com
1 parent 0e87dfe commit 9fcdf2c

File tree

14 files changed

+175
-5
lines changed

14 files changed

+175
-5
lines changed

src/backend/commands/copy.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
310310

311311
cstate = BeginCopyTo(pstate, rel, query, relid,
312312
stmt->filename, stmt->is_program,
313-
stmt->attlist, stmt->options);
313+
NULL, stmt->attlist, stmt->options);
314314
*processed = DoCopyTo(cstate); /* copy from database to file */
315315
EndCopyTo(cstate);
316316
}

src/backend/commands/copyto.c

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ typedef enum CopyDest
5151
{
5252
COPY_FILE, /* to file (or a piped program) */
5353
COPY_FRONTEND, /* to frontend */
54+
COPY_CALLBACK /* to callback function */
5455
} CopyDest;
5556

5657
/*
@@ -85,6 +86,7 @@ typedef struct CopyToStateData
8586
List *attnumlist; /* integer list of attnums to copy */
8687
char *filename; /* filename, or NULL for STDOUT */
8788
bool is_program; /* is 'filename' a program to popen? */
89+
copy_data_dest_cb data_dest_cb; /* function for writing data */
8890

8991
CopyFormatOptions opts;
9092
Node *whereClause; /* WHERE condition (or NULL) */
@@ -247,6 +249,9 @@ CopySendEndOfRow(CopyToState cstate)
247249
/* Dump the accumulated row as one CopyData message */
248250
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
249251
break;
252+
case COPY_CALLBACK:
253+
cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
254+
break;
250255
}
251256

252257
/* Update the progress */
@@ -336,6 +341,17 @@ EndCopy(CopyToState cstate)
336341

337342
/*
338343
* Setup CopyToState to read tuples from a table or a query for COPY TO.
344+
*
345+
* 'rel': Relation to be copied
346+
* 'raw_query': Query whose results are to be copied
347+
* 'queryRelId': OID of base relation to convert to a query (for RLS)
348+
* 'filename': Name of server-local file to write, NULL for STDOUT
349+
* 'is_program': true if 'filename' is program to execute
350+
* 'data_dest_cb': Callback that processes the output data
351+
* 'attnamelist': List of char *, columns to include. NIL selects all cols.
352+
* 'options': List of DefElem. See copy_opt_item in gram.y for selections.
353+
*
354+
* Returns a CopyToState, to be passed to DoCopyTo() and related functions.
339355
*/
340356
CopyToState
341357
BeginCopyTo(ParseState *pstate,
@@ -344,11 +360,12 @@ BeginCopyTo(ParseState *pstate,
344360
Oid queryRelId,
345361
const char *filename,
346362
bool is_program,
363+
copy_data_dest_cb data_dest_cb,
347364
List *attnamelist,
348365
List *options)
349366
{
350367
CopyToState cstate;
351-
bool pipe = (filename == NULL);
368+
bool pipe = (filename == NULL && data_dest_cb == NULL);
352369
TupleDesc tupDesc;
353370
int num_phys_attrs;
354371
MemoryContext oldcontext;
@@ -656,7 +673,13 @@ BeginCopyTo(ParseState *pstate,
656673

657674
cstate->copy_dest = COPY_FILE; /* default */
658675

659-
if (pipe)
676+
if (data_dest_cb)
677+
{
678+
progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
679+
cstate->copy_dest = COPY_CALLBACK;
680+
cstate->data_dest_cb = data_dest_cb;
681+
}
682+
else if (pipe)
660683
{
661684
progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
662685

@@ -765,11 +788,13 @@ EndCopyTo(CopyToState cstate)
765788

766789
/*
767790
* Copy from relation or query TO file.
791+
*
792+
* Returns the number of rows processed.
768793
*/
769794
uint64
770795
DoCopyTo(CopyToState cstate)
771796
{
772-
bool pipe = (cstate->filename == NULL);
797+
bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
773798
bool fe_copy = (pipe && whereToSendOutput == DestRemote);
774799
TupleDesc tupDesc;
775800
int num_phys_attrs;

src/include/commands/copy.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ typedef struct CopyFromStateData *CopyFromState;
6666
typedef struct CopyToStateData *CopyToState;
6767

6868
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
69+
typedef void (*copy_data_dest_cb) (void *data, int len);
6970

7071
extern void DoCopy(ParseState *pstate, const CopyStmt *stmt,
7172
int stmt_location, int stmt_len,
@@ -91,7 +92,7 @@ extern DestReceiver *CreateCopyDestReceiver(void);
9192
*/
9293
extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *raw_query,
9394
Oid queryRelId, const char *filename, bool is_program,
94-
List *attnamelist, List *options);
95+
copy_data_dest_cb data_dest_cb, List *attnamelist, List *options);
9596
extern void EndCopyTo(CopyToState cstate);
9697
extern uint64 DoCopyTo(CopyToState cstate);
9798
extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,

src/test/modules/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ SUBDIRS = \
1515
snapshot_too_old \
1616
spgist_name_ops \
1717
test_bloomfilter \
18+
test_copy_callbacks \
1819
test_ddl_deparse \
1920
test_extensions \
2021
test_ginpostinglist \

src/test/modules/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ subdir('snapshot_too_old')
99
subdir('spgist_name_ops')
1010
subdir('ssl_passphrase_callback')
1111
subdir('test_bloomfilter')
12+
subdir('test_copy_callbacks')
1213
subdir('test_ddl_deparse')
1314
subdir('test_extensions')
1415
subdir('test_ginpostinglist')
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# Generated subdirectories
2+
/log/
3+
/results/
4+
/tmp_check/
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# src/test/modules/test_copy_callbacks/Makefile
2+
3+
MODULE_big = test_copy_callbacks
4+
OBJS = \
5+
$(WIN32RES) \
6+
test_copy_callbacks.o
7+
PGFILEDESC = "test_copy_callbacks - test COPY callbacks"
8+
9+
EXTENSION = test_copy_callbacks
10+
DATA = test_copy_callbacks--1.0.sql
11+
12+
REGRESS = test_copy_callbacks
13+
14+
ifdef USE_PGXS
15+
PG_CONFIG = pg_config
16+
PGXS := $(shell $(PG_CONFIG) --pgxs)
17+
include $(PGXS)
18+
else
19+
subdir = src/test/modules/test_copy_callbacks
20+
top_builddir = ../../../..
21+
include $(top_builddir)/src/Makefile.global
22+
include $(top_srcdir)/contrib/contrib-global.mk
23+
endif
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
CREATE EXTENSION test_copy_callbacks;
2+
CREATE TABLE public.test (a INT, b INT, c INT);
3+
INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
4+
SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
5+
NOTICE: COPY TO callback called with data "1 2 3" and length 5
6+
NOTICE: COPY TO callback called with data "12 34 56" and length 8
7+
NOTICE: COPY TO callback called with data "123 456 789" and length 11
8+
NOTICE: COPY TO callback has processed 3 rows
9+
test_copy_to_callback
10+
-----------------------
11+
12+
(1 row)
13+
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
# FIXME: prevent install during main install, but not during test :/
2+
3+
test_copy_callbacks_sources = files(
4+
'test_copy_callbacks.c',
5+
)
6+
7+
if host_system == 'windows'
8+
test_copy_callbacks_sources += rc_lib_gen.process(win32ver_rc, extra_args: [
9+
'--NAME', 'test_copy_callbacks',
10+
'--FILEDESC', 'test_copy_callbacks - test COPY callbacks',])
11+
endif
12+
13+
test_copy_callbacks = shared_module('test_copy_callbacks',
14+
test_copy_callbacks_sources,
15+
kwargs: pg_mod_args,
16+
)
17+
testprep_targets += test_copy_callbacks
18+
19+
install_data(
20+
'test_copy_callbacks.control',
21+
'test_copy_callbacks--1.0.sql',
22+
kwargs: contrib_data_args,
23+
)
24+
25+
tests += {
26+
'name': 'test_copy_callbacks',
27+
'sd': meson.current_source_dir(),
28+
'bd': meson.current_build_dir(),
29+
'regress': {
30+
'sql': [
31+
'test_copy_callbacks',
32+
],
33+
},
34+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
CREATE EXTENSION test_copy_callbacks;
2+
CREATE TABLE public.test (a INT, b INT, c INT);
3+
INSERT INTO public.test VALUES (1, 2, 3), (12, 34, 56), (123, 456, 789);
4+
SELECT test_copy_to_callback('public.test'::pg_catalog.regclass);
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
/* src/test/modules/test_copy_callbacks/test_copy_callbacks--1.0.sql */
2+
3+
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
4+
\echo Use "CREATE EXTENSION test_copy_callbacks" to load this file. \quit
5+
6+
CREATE FUNCTION test_copy_to_callback(pg_catalog.regclass)
7+
RETURNS pg_catalog.void
8+
AS 'MODULE_PATHNAME' LANGUAGE C;
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*--------------------------------------------------------------------------
2+
*
3+
* test_copy_callbacks.c
4+
* Code for testing COPY callbacks.
5+
*
6+
* Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
7+
* Portions Copyright (c) 1994, Regents of the University of California
8+
*
9+
* IDENTIFICATION
10+
* src/test/modules/test_copy_callbacks/test_copy_callbacks.c
11+
*
12+
* -------------------------------------------------------------------------
13+
*/
14+
15+
#include "postgres.h"
16+
17+
#include "access/table.h"
18+
#include "commands/copy.h"
19+
#include "fmgr.h"
20+
#include "utils/rel.h"
21+
22+
PG_MODULE_MAGIC;
23+
24+
static void
25+
to_cb(void *data, int len)
26+
{
27+
ereport(NOTICE,
28+
(errmsg("COPY TO callback called with data \"%s\" and length %d",
29+
(char *) data, len)));
30+
}
31+
32+
PG_FUNCTION_INFO_V1(test_copy_to_callback);
33+
Datum
34+
test_copy_to_callback(PG_FUNCTION_ARGS)
35+
{
36+
Relation rel = table_open(PG_GETARG_OID(0), AccessShareLock);
37+
CopyToState cstate;
38+
int64 processed;
39+
40+
cstate = BeginCopyTo(NULL, rel, NULL, RelationGetRelid(rel), NULL, NULL,
41+
to_cb, NIL, NIL);
42+
processed = DoCopyTo(cstate);
43+
EndCopyTo(cstate);
44+
45+
ereport(NOTICE, (errmsg("COPY TO callback has processed %lld rows",
46+
(long long) processed)));
47+
48+
table_close(rel, NoLock);
49+
50+
PG_RETURN_VOID();
51+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
comment = 'Test code for COPY callbacks'
2+
default_version = '1.0'
3+
module_pathname = '$libdir/test_copy_callbacks'
4+
relocatable = true

src/tools/pgindent/typedefs.list

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3177,6 +3177,7 @@ compare_context
31773177
config_var_value
31783178
contain_aggs_of_level_context
31793179
convert_testexpr_context
3180+
copy_data_dest_cb
31803181
copy_data_source_cb
31813182
core_YYSTYPE
31823183
core_yy_extra_type

0 commit comments

Comments
 (0)