Skip to content

Add file_fdw support for external decompressors #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 9 commits into from
Binary file added contrib/file_fdw/data/agg.csv.gz
Binary file not shown.
Binary file added contrib/file_fdw/data/it's_ok.csv.gz
Binary file not shown.
162 changes: 137 additions & 25 deletions contrib/file_fdw/file_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ struct FileFdwOption
Oid optcontext; /* Oid of catalog in which option may appear */
};

/*
 * Assumed ratio of decompressed size to on-disk (compressed) size for files
 * that are read through a decompressor program.  This is a made-up figure,
 * not a measurement: it is used to scale the row-count estimate derived from
 * the file's size on disk, and it is reported in EXPLAIN output.
 *
 * The literal deliberately has no 'f' suffix: the variable is a double, and
 * a float literal would be rounded to single precision before being widened.
 */
static const double program_compression_ratio = 2.7708899835032;

/*
* Valid options for file_fdw.
* These options are based on the options for COPY FROM command.
Expand All @@ -68,6 +71,7 @@ static const struct FileFdwOption valid_options[] = {
{"escape", ForeignTableRelationId},
{"null", ForeignTableRelationId},
{"encoding", ForeignTableRelationId},
{"decompressor", ForeignTableRelationId},
{"force_not_null", AttributeRelationId},

/*
Expand All @@ -84,6 +88,7 @@ static const struct FileFdwOption valid_options[] = {
typedef struct FileFdwPlanState
{
char *filename; /* file to read */
bool is_program; /* whether a program is used to read the file */
List *options; /* merged COPY options, excluding filename */
BlockNumber pages; /* estimate of file's physical size */
double ntuples; /* estimate of number of rows in file */
Expand All @@ -95,6 +100,7 @@ typedef struct FileFdwPlanState
/*
 * Execution-time state for one scan of a file_fdw foreign table.  It is
 * retrieved from the scan node's fdw_state by the scan callbacks.
 */
typedef struct FileFdwExecutionState
{
char *filename; /* file to read */
char *program; /* decompressor command followed by the quoted filename, or NULL when no decompressor is set */
List *options; /* merged COPY options, excluding filename and decompressor */
CopyState cstate; /* state of reading file (or program output) */
} FileFdwExecutionState;
Expand Down Expand Up @@ -137,7 +143,7 @@ static bool fileAnalyzeForeignTable(Relation relation,
*/
static bool is_valid_option(const char *option, Oid context);
static void fileGetOptions(Oid foreigntableid,
char **filename, List **other_options);
char **filename, char **program, List **other_options);
static List *get_file_fdw_attribute_options(Oid relid);
static bool check_selective_binary_conversion(RelOptInfo *baserel,
Oid foreigntableid,
Expand Down Expand Up @@ -186,6 +192,7 @@ file_fdw_validator(PG_FUNCTION_ARGS)
List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
Oid catalog = PG_GETARG_OID(1);
char *filename = NULL;
char *decompressor = NULL;
DefElem *force_not_null = NULL;
List *other_options = NIL;
ListCell *cell;
Expand Down Expand Up @@ -243,9 +250,9 @@ file_fdw_validator(PG_FUNCTION_ARGS)
}

/*
* Separate out filename and force_not_null, since ProcessCopyOptions
* won't accept them. (force_not_null only comes in a boolean
* per-column flavor here.)
* Separate out filename, decompressor, and force_not_null, since
* ProcessCopyOptions won't accept them. (force_not_null only comes in
* a boolean per-column flavor here.)
*/
if (strcmp(def->defname, "filename") == 0)
{
Expand All @@ -255,6 +262,14 @@ file_fdw_validator(PG_FUNCTION_ARGS)
errmsg("conflicting or redundant options")));
filename = defGetString(def);
}
else if (strcmp(def->defname, "decompressor") == 0)
{
if (decompressor)
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("conflicting or redundant options")));
decompressor = defGetString(def);
}
else if (strcmp(def->defname, "force_not_null") == 0)
{
if (force_not_null)
Expand All @@ -274,13 +289,28 @@ file_fdw_validator(PG_FUNCTION_ARGS)
*/
ProcessCopyOptions(NULL, true, other_options);

/*
* Filename option is required for file_fdw foreign tables.
*/
if (catalog == ForeignTableRelationId && filename == NULL)
ereport(ERROR,
(errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
errmsg("filename is required for file_fdw foreign tables")));
if (catalog == ForeignTableRelationId)
{
/*
* Filename option is required for file_fdw foreign tables.
*/
if (filename == NULL)
ereport(ERROR,
(errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
errmsg("filename is required for file_fdw foreign tables")));


/*
* Decompressors must be both readable and executable.
*/
if (decompressor && (access(decompressor, R_OK | X_OK) != 0))
{
ereport(ERROR,
(errcode_for_file_access(),
errmsg("decompressor must be readable/executable \"%s\": %m",
decompressor)));
}
}

PG_RETURN_VOID();
}
Expand All @@ -305,12 +335,14 @@ is_valid_option(const char *option, Oid context)
/*
* Fetch the options for a file_fdw foreign table.
*
* We have to separate out "filename" from the other options because
* it must not appear in the options list passed to the core COPY code.
* We have to separate out "filename" and "decompressor" from the other options
* because they must not appear in the options passed to the core COPY code. If
* a decompressor is present, a string consisting of it concatenated to the
* escaped file name is stored at `program`.
*/
static void
fileGetOptions(Oid foreigntableid,
char **filename, List **other_options)
char **filename, char **program, List **other_options)
{
ForeignTable *table;
ForeignServer *server;
Expand All @@ -319,6 +351,9 @@ fileGetOptions(Oid foreigntableid,
ListCell *lc,
*prev;

char *decompressor;
char *write_ptr, *token, *input, *read_ptr;

/*
* Extract options from FDW objects. We ignore user mappings because
* file_fdw doesn't have any options that can be specified there.
Expand Down Expand Up @@ -352,6 +387,27 @@ fileGetOptions(Oid foreigntableid,
options = list_delete_cell(options, lc, prev);
break;
}

prev = lc;
}

/*
* Separate out the decompressor, which will be used to calculate program.
*/
decompressor = NULL;
*program = NULL;
prev = NULL;
foreach(lc, options)
{
DefElem *def = (DefElem *) lfirst(lc);

if (strcmp(def->defname, "decompressor") == 0)
{
decompressor = defGetString(def);
options = list_delete_cell(options, lc, prev);
break;
}

prev = lc;
}

Expand All @@ -362,6 +418,41 @@ fileGetOptions(Oid foreigntableid,
if (*filename == NULL)
elog(ERROR, "filename is required for file_fdw foreign tables");

/*
* Set up the decompressor if present.
*/
if (decompressor != NULL)
{
/*
* We will escape the filename by wrapping it in single quotes. To deal
* with single quotes in the name itself, we will replace all single
* quotes with the string "'\''", which is four characters long. Strings
* of only single quotes will need four times as much space, plus the
* room for the quotes, a space, and a null terminator.
*/
*program = palloc0(
(strlen(decompressor) + (4 * strlen(*filename)) + 4)
* sizeof(char));

write_ptr = stpcpy(*program, decompressor);
write_ptr = stpcpy(write_ptr, " '");

/* We're mutating filename so copy it */
input = read_ptr = pstrdup(*filename);

write_ptr = stpcpy(write_ptr, strsep(&read_ptr, "'"));

while ((token = strsep(&read_ptr, "'")) != NULL)
{
write_ptr = stpcpy(write_ptr, "'\\''");
write_ptr = stpcpy(write_ptr, token);
}

stpcpy(write_ptr, "'");

pfree(input);
}

*other_options = options;
}

Expand Down Expand Up @@ -433,14 +524,16 @@ fileGetForeignRelSize(PlannerInfo *root,
Oid foreigntableid)
{
FileFdwPlanState *fdw_private;
char *program;

/*
* Fetch options. We only need filename, and whether a decompressor
* program is in use, at this point, but we might as well get everything
* and not need to re-fetch it later in planning.
*/
fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
fileGetOptions(foreigntableid,
&fdw_private->filename, &fdw_private->options);
fileGetOptions(foreigntableid, &fdw_private->filename, &program,
&fdw_private->options);
fdw_private->is_program = (program != NULL);
baserel->fdw_private = (void *) fdw_private;

/* Estimate relation size */
Expand Down Expand Up @@ -537,14 +630,22 @@ static void
fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
{
char *filename;
char *program;
List *options;

/* Fetch options --- we only need filename and program at this point */
fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
&filename, &options);
&filename, &program, &options);

ExplainPropertyText("Foreign File", filename, es);

if (program != NULL)
{
ExplainPropertyText("Foreign Program", program, es);
ExplainPropertyFloat("Foreign Program Compression Est.",
program_compression_ratio, 4, es);
}

/* Suppress file size if we're not showing cost details */
if (es->costs)
{
Expand All @@ -565,6 +666,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
{
ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
char *filename;
char *program;
List *options;
CopyState cstate;
FileFdwExecutionState *festate;
Expand All @@ -577,7 +679,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)

/* Fetch options of foreign table */
fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
&filename, &options);
&filename, &program, &options);

/* Add any options from the plan (currently only convert_selectively) */
options = list_concat(options, plan->fdw_private);
Expand All @@ -587,8 +689,8 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
* as to match the expected ScanTupleSlot signature.
*/
cstate = BeginCopyFrom(node->ss.ss_currentRelation,
filename,
false,
(program != NULL) ? program : filename,
(program != NULL),
NIL,
options);

Expand All @@ -598,6 +700,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
*/
festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
festate->filename = filename;
festate->program = program;
festate->options = options;
festate->cstate = cstate;

Expand Down Expand Up @@ -656,12 +759,16 @@ static void
fileReScanForeignScan(ForeignScanState *node)
{
FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
char *filename_or_program;

EndCopyFrom(festate->cstate);

filename_or_program =
(festate->program != NULL) ? festate->program : festate->filename;

festate->cstate = BeginCopyFrom(node->ss.ss_currentRelation,
festate->filename,
false,
filename_or_program,
(festate->program != NULL),
NIL,
festate->options);
}
Expand Down Expand Up @@ -690,11 +797,12 @@ fileAnalyzeForeignTable(Relation relation,
BlockNumber *totalpages)
{
char *filename;
char *program;
List *options;
struct stat stat_buf;

/* Fetch options of foreign table */
fileGetOptions(RelationGetRelid(relation), &filename, &options);
fileGetOptions(RelationGetRelid(relation), &filename, &program, &options);

/*
* Get size of the file. (XXX if we fail here, would it be better to just
Expand Down Expand Up @@ -900,6 +1008,8 @@ estimate_size(PlannerInfo *root, RelOptInfo *baserel,
MAXALIGN(sizeof(HeapTupleHeaderData));
ntuples = clamp_row_est((double) stat_buf.st_size /
(double) tuple_width);
if (fdw_private->is_program)
ntuples *= program_compression_ratio;
}
fdw_private->ntuples = ntuples;

Expand Down Expand Up @@ -976,6 +1086,7 @@ file_acquire_sample_rows(Relation onerel, int elevel,
bool *nulls;
bool found;
char *filename;
char *program;
List *options;
CopyState cstate;
ErrorContextCallback errcallback;
Expand All @@ -990,12 +1101,13 @@ file_acquire_sample_rows(Relation onerel, int elevel,
nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));

/* Fetch options of foreign table */
fileGetOptions(RelationGetRelid(onerel), &filename, &options);
fileGetOptions(RelationGetRelid(onerel), &filename, &program, &options);

/*
* Create CopyState from FDW options.
*/
cstate = BeginCopyFrom(onerel, filename, false, NIL, options);
cstate = BeginCopyFrom(onerel, (program != NULL) ? program : filename,
(program != NULL), NIL, options);

/*
* Use per-tuple memory context to prevent leak of memory used to read
Expand Down
Loading