TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * file_fdw.c
4 : * foreign-data wrapper for server-side flat files (or programs).
5 : *
6 : * Copyright (c) 2010-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/file_fdw/file_fdw.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include <sys/stat.h>
16 : #include <unistd.h>
17 :
18 : #include "access/htup_details.h"
19 : #include "access/reloptions.h"
20 : #include "access/sysattr.h"
21 : #include "access/table.h"
22 : #include "catalog/pg_authid.h"
23 : #include "catalog/pg_foreign_table.h"
24 : #include "commands/copy.h"
25 : #include "commands/defrem.h"
26 : #include "commands/explain.h"
27 : #include "commands/vacuum.h"
28 : #include "foreign/fdwapi.h"
29 : #include "foreign/foreign.h"
30 : #include "miscadmin.h"
31 : #include "nodes/makefuncs.h"
32 : #include "optimizer/optimizer.h"
33 : #include "optimizer/pathnode.h"
34 : #include "optimizer/planmain.h"
35 : #include "optimizer/restrictinfo.h"
36 : #include "utils/acl.h"
37 : #include "utils/memutils.h"
38 : #include "utils/rel.h"
39 : #include "utils/sampling.h"
40 : #include "utils/varlena.h"
41 :
42 GIC 1 : PG_MODULE_MAGIC;
43 ECB :
44 : /*
45 : * Describes the valid options for objects that use this wrapper.
46 : */
/* One row of the option table: an option name paired with the catalog in which it is accepted. */
47 : struct FileFdwOption
48 : {
/* Option name; the table's terminating entry has a NULL name, which is what the scanning loops test for. */
49 : const char *optname;
50 : Oid optcontext; /* Oid of catalog in which option may appear */
51 : };
52 :
53 : /*
54 : * Valid options for file_fdw.
55 : * These options are based on the options for the COPY FROM command.
56 : * But note that force_not_null and force_null are handled as boolean options
57 : * attached to a column, not as table options.
58 : *
59 : * Note: If you are adding a new option for user mapping, you need to modify
60 : * fileGetOptions(), which currently doesn't bother to look at user mappings.
61 : */
/*
 * Table of accepted option names and the catalog each one belongs to.
 * It is walked linearly by is_valid_option() and by the validator's
 * "closest match" hint loop; both stop at the entry whose optname is NULL.
 * All table-level options are tied to ForeignTableRelationId; only
 * force_not_null and force_null are tied to AttributeRelationId.
 */
62 : static const struct FileFdwOption valid_options[] = {
63 : /* Data source options */
64 : {"filename", ForeignTableRelationId},
65 : {"program", ForeignTableRelationId},
66 :
67 : /* Format options */
68 : /* oids option is not supported */
69 : {"format", ForeignTableRelationId},
70 : {"header", ForeignTableRelationId},
71 : {"delimiter", ForeignTableRelationId},
72 : {"quote", ForeignTableRelationId},
73 : {"escape", ForeignTableRelationId},
74 : {"null", ForeignTableRelationId},
75 : {"default", ForeignTableRelationId},
76 : {"encoding", ForeignTableRelationId},
77 : {"force_not_null", AttributeRelationId},
78 : {"force_null", AttributeRelationId},
79 :
80 : /*
81 : * force_quote is not supported by file_fdw because it's for COPY TO.
82 : */
83 :
84 : /* Sentinel */
85 : {NULL, InvalidOid}
86 : };
87 :
88 : /*
89 : * FDW-specific information for RelOptInfo.fdw_private.
90 : */
/*
 * Planner-time state.  fileGetForeignRelSize() allocates one of these,
 * fills filename/is_program/options via fileGetOptions(), and stores it in
 * baserel->fdw_private; fileGetForeignPaths() reads it back from there and
 * hands it to estimate_costs().
 */
91 : typedef struct FileFdwPlanState
92 : {
93 : char *filename; /* file or program to read from */
94 : bool is_program; /* true if filename represents an OS command */
95 : List *options; /* merged COPY options, excluding filename and
96 : * is_program */
97 : BlockNumber pages; /* estimate of file's physical size */
98 : double ntuples; /* estimate of number of data rows */
99 : } FileFdwPlanState;
100 :
101 : /*
102 : * FDW-specific information for ForeignScanState.fdw_state.
103 : */
/*
 * Executor-time state.  fileBeginForeignScan() allocates and fills it and
 * stores it in node->fdw_state.  filename, is_program and options are kept
 * so that fileReScanForeignScan() can call BeginCopyFrom() again with the
 * same arguments; cstate is the handle passed to NextCopyFrom() and
 * EndCopyFrom().
 */
104 : typedef struct FileFdwExecutionState
105 : {
106 : char *filename; /* file or program to read from */
107 : bool is_program; /* true if filename represents an OS command */
108 : List *options; /* merged COPY options, excluding filename and
109 : * is_program */
110 : CopyFromState cstate; /* COPY execution state */
111 : } FileFdwExecutionState;
112 :
113 : /*
114 : * SQL functions
115 : */
116 GIC 2 : PG_FUNCTION_INFO_V1(file_fdw_handler);
117 2 : PG_FUNCTION_INFO_V1(file_fdw_validator);
118 ECB :
119 : /*
120 : * FDW callback routines
121 : */
122 : static void fileGetForeignRelSize(PlannerInfo *root,
123 : RelOptInfo *baserel,
124 : Oid foreigntableid);
125 : static void fileGetForeignPaths(PlannerInfo *root,
126 : RelOptInfo *baserel,
127 : Oid foreigntableid);
128 : static ForeignScan *fileGetForeignPlan(PlannerInfo *root,
129 : RelOptInfo *baserel,
130 : Oid foreigntableid,
131 : ForeignPath *best_path,
132 : List *tlist,
133 : List *scan_clauses,
134 : Plan *outer_plan);
135 : static void fileExplainForeignScan(ForeignScanState *node, ExplainState *es);
136 : static void fileBeginForeignScan(ForeignScanState *node, int eflags);
137 : static TupleTableSlot *fileIterateForeignScan(ForeignScanState *node);
138 : static void fileReScanForeignScan(ForeignScanState *node);
139 : static void fileEndForeignScan(ForeignScanState *node);
140 : static bool fileAnalyzeForeignTable(Relation relation,
141 : AcquireSampleRowsFunc *func,
142 : BlockNumber *totalpages);
143 : static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
144 : RangeTblEntry *rte);
145 :
146 : /*
147 : * Helper functions
148 : */
149 : static bool is_valid_option(const char *option, Oid context);
150 : static void fileGetOptions(Oid foreigntableid,
151 : char **filename,
152 : bool *is_program,
153 : List **other_options);
154 : static List *get_file_fdw_attribute_options(Oid relid);
155 : static bool check_selective_binary_conversion(RelOptInfo *baserel,
156 : Oid foreigntableid,
157 : List **columns);
158 : static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
159 : FileFdwPlanState *fdw_private);
160 : static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
161 : FileFdwPlanState *fdw_private,
162 : Cost *startup_cost, Cost *total_cost);
163 : static int file_acquire_sample_rows(Relation onerel, int elevel,
164 : HeapTuple *rows, int targrows,
165 : double *totalrows, double *totaldeadrows);
166 :
167 :
168 : /*
169 : * Foreign-data wrapper handler function: return a struct with pointers
170 : * to my callback routines.
171 : */
/*
 * Builds a fresh FdwRoutine node and returns it as a pointer Datum.
 * Only the planning, scan, EXPLAIN, ANALYZE and parallel-safety callbacks
 * listed below are assigned; no other FdwRoutine members are set here.
 */
172 : Datum
173 GIC 15 : file_fdw_handler(PG_FUNCTION_ARGS)
174 : {
175 CBC 15 : FdwRoutine *fdwroutine = makeNode(FdwRoutine);
176 :
/* Planner callbacks */
177 15 : fdwroutine->GetForeignRelSize = fileGetForeignRelSize;
178 GIC 15 : fdwroutine->GetForeignPaths = fileGetForeignPaths;
179 CBC 15 : fdwroutine->GetForeignPlan = fileGetForeignPlan;
/* EXPLAIN and executor scan callbacks */
180 15 : fdwroutine->ExplainForeignScan = fileExplainForeignScan;
181 15 : fdwroutine->BeginForeignScan = fileBeginForeignScan;
182 15 : fdwroutine->IterateForeignScan = fileIterateForeignScan;
183 15 : fdwroutine->ReScanForeignScan = fileReScanForeignScan;
184 15 : fdwroutine->EndForeignScan = fileEndForeignScan;
/* ANALYZE support and parallel-safety check */
185 15 : fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
186 15 : fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
187 ECB :
188 CBC 15 : PG_RETURN_POINTER(fdwroutine);
189 : }
190 ECB :
191 : /*
192 : * Validate the generic options given to a FOREIGN DATA WRAPPER, SERVER,
193 : * USER MAPPING or FOREIGN TABLE that uses file_fdw.
194 : *
195 : * Raise an ERROR if the option or its value is considered invalid.
196 : */
/*
 * SQL-callable validator.
 *
 * Argument 0 is the options datum, converted to a list of DefElem by
 * untransformRelOptions(); argument 1 is the Oid of the catalog the options
 * belong to.  Returns void; every rejection is reported through
 * ereport(ERROR).
 *
 * The loop sorts each option into one of four buckets: filename/program
 * (remembered in "filename"), force_not_null, force_null, or "everything
 * else", which is collected in other_options and checked afterwards by
 * ProcessCopyOptions().
 */
197 : Datum
198 GIC 52 : file_fdw_validator(PG_FUNCTION_ARGS)
199 : {
200 CBC 52 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
201 GIC 52 : Oid catalog = PG_GETARG_OID(1);
202 CBC 52 : char *filename = NULL;
203 52 : DefElem *force_not_null = NULL;
204 52 : DefElem *force_null = NULL;
205 52 : List *other_options = NIL;
206 ECB : ListCell *cell;
207 :
208 : /*
209 : * Check that only options supported by file_fdw, and allowed for the
210 : * current object type, are given.
211 : */
212 GIC 148 : foreach(cell, options_list)
213 : {
214 CBC 105 : DefElem *def = (DefElem *) lfirst(cell);
215 :
216 105 : if (!is_valid_option(def->defname, catalog))
217 : {
218 ECB : const struct FileFdwOption *opt;
219 : const char *closest_match;
220 : ClosestMatchState match_state;
221 GNC 8 : bool has_valid_options = false;
222 :
223 : /*
224 : * Unknown option specified, complain about it. Provide a hint
225 : * with a valid option that looks similar, if there is one.
226 : */
/* Only option names valid for this catalog are offered as candidates. */
227 8 : initClosestMatch(&match_state, def->defname, 4);
228 GIC 104 : for (opt = valid_options; opt->optname; opt++)
229 : {
230 96 : if (catalog == opt->optcontext)
231 : {
232 GNC 20 : has_valid_options = true;
233 20 : updateClosestMatch(&match_state, opt->optname);
234 : }
235 : }
236 ECB :
/*
 * Hint selection (nested conditional below):
 *   - catalog has valid options and a close match exists: suggest it;
 *   - catalog has valid options but nothing is close: 0, i.e. no hint;
 *   - catalog has no valid options at all: say so.
 */
237 GNC 8 : closest_match = getClosestMatch(&match_state);
238 GIC 8 : ereport(ERROR,
239 ECB : (errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
240 : errmsg("invalid option \"%s\"", def->defname),
241 : has_valid_options ? closest_match ?
242 : errhint("Perhaps you meant the option \"%s\".",
243 : closest_match) : 0 :
244 : errhint("There are no valid options in this context.")));
245 : }
246 :
247 : /*
248 : * Separate out filename, program, and column-specific options, since
249 : * ProcessCopyOptions won't accept them.
250 : */
251 GIC 97 : if (strcmp(def->defname, "filename") == 0 ||
252 85 : strcmp(def->defname, "program") == 0)
253 : {
/*
 * "filename" and "program" share the same variable, so giving both, or
 * either one twice, is rejected here as conflicting.
 */
254 12 : if (filename)
255 UIC 0 : ereport(ERROR,
256 : (errcode(ERRCODE_SYNTAX_ERROR),
257 : errmsg("conflicting or redundant options")));
258 ECB :
259 : /*
260 : * Check permissions for changing which file or program is used by
261 : * the file_fdw.
262 EUB : *
263 : * Only members of the role 'pg_read_server_files' are allowed to
264 : * set the 'filename' option of a file_fdw foreign table, while
265 : * only members of the role 'pg_execute_server_program' are
266 : * allowed to set the 'program' option. This is because we don't
267 : * want regular users to be able to control which file gets read
268 : * or which program gets executed.
269 : *
270 : * Putting this sort of permissions check in a validator is a bit
271 : * of a crock, but there doesn't seem to be any other place that
272 : * can enforce the check more cleanly.
273 : *
274 : * Note that the valid_options[] array disallows setting filename
275 : * and program at any options level other than foreign table ---
276 : * otherwise there'd still be a security hole.
277 : */
278 GIC 12 : if (strcmp(def->defname, "filename") == 0 &&
279 12 : !has_privs_of_role(GetUserId(), ROLE_PG_READ_SERVER_FILES))
280 1 : ereport(ERROR,
281 : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
282 : errmsg("permission denied to set the \"%s\" option of a file_fdw foreign table",
283 : "filename"),
284 : errdetail("Only roles with privileges of the \"%s\" role may set this option.",
285 : "pg_read_server_files")));
286 :
287 11 : if (strcmp(def->defname, "program") == 0 &&
288 LBC 0 : !has_privs_of_role(GetUserId(), ROLE_PG_EXECUTE_SERVER_PROGRAM))
289 0 : ereport(ERROR,
290 ECB : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
291 : errmsg("permission denied to set the \"%s\" option of a file_fdw foreign table",
292 : "program"),
293 : errdetail("Only roles with privileges of the \"%s\" role may set this option.",
294 : "pg_execute_server_program")));
295 :
296 GIC 11 : filename = defGetString(def);
297 : }
298 :
299 : /*
300 ECB : * force_not_null is a boolean option; after validation we can discard
301 EUB : * it - it will be retrieved later in get_file_fdw_attribute_options()
302 : */
303 GIC 85 : else if (strcmp(def->defname, "force_not_null") == 0)
304 : {
305 4 : if (force_not_null)
306 UIC 0 : ereport(ERROR,
307 : (errcode(ERRCODE_SYNTAX_ERROR),
308 : errmsg("conflicting or redundant options"),
309 ECB : errhint("Option \"force_not_null\" supplied more than once for a column.")));
310 GIC 4 : force_not_null = def;
311 : /* Don't care what the value is, as long as it's a legal boolean */
312 4 : (void) defGetBoolean(def);
313 : }
314 : /* See comments for force_not_null above */
315 81 : else if (strcmp(def->defname, "force_null") == 0)
316 ECB : {
317 GIC 4 : if (force_null)
318 LBC 0 : ereport(ERROR,
319 EUB : (errcode(ERRCODE_SYNTAX_ERROR),
320 : errmsg("conflicting or redundant options"),
321 : errhint("Option \"force_null\" supplied more than once for a column.")));
322 GIC 4 : force_null = def;
323 CBC 4 : (void) defGetBoolean(def);
324 : }
325 ECB : else
326 GIC 77 : other_options = lappend(other_options, def);
327 : }
328 ECB :
329 : /*
330 : * Now apply the core COPY code's validation logic for more checks.
331 EUB : */
/* Only the options that were not separated out above are passed on. */
332 GIC 43 : ProcessCopyOptions(NULL, NULL, true, other_options);
333 :
334 : /*
335 ECB : * Either filename or program option is required for file_fdw foreign
336 : * tables.
337 : */
/* This requirement is enforced only when validating foreign-table options. */
338 GIC 23 : if (catalog == ForeignTableRelationId && filename == NULL)
339 CBC 1 : ereport(ERROR,
340 : (errcode(ERRCODE_FDW_DYNAMIC_PARAMETER_VALUE_NEEDED),
341 : errmsg("either filename or program is required for file_fdw foreign tables")));
342 :
343 GIC 22 : PG_RETURN_VOID();
344 : }
345 ECB :
346 : /*
347 : * Check if the provided option is one of the valid options.
348 : * context is the Oid of the catalog holding the object the option is for.
349 : */
/*
 * Linear search of valid_options[].  Returns true only when an entry matches
 * both the catalog Oid and the option name (exact strcmp match); returns
 * false once the NULL-named terminating entry is reached.
 */
350 : static bool
351 CBC 105 : is_valid_option(const char *option, Oid context)
352 ECB : {
353 : const struct FileFdwOption *opt;
354 :
355 GIC 574 : for (opt = valid_options; opt->optname; opt++)
356 ECB : {
/* The same option name is only accepted for the catalog it is listed under. */
357 GIC 566 : if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
358 97 : return true;
359 : }
360 8 : return false;
361 : }
362 :
363 : /*
364 ECB : * Fetch the options for a file_fdw foreign table.
365 : *
366 : * We have to separate out filename/program from the other options because
367 : * those must not appear in the options list passed to the core COPY code.
368 : */
/*
 * Outputs:
 *   *filename      - value of the "filename" or "program" option
 *   *is_program    - true only when the value came from "program"
 *   *other_options - the merged option list with that one entry removed
 *
 * Raises an ERROR via elog() if neither option is found.
 */
369 : static void
370 CBC 67 : fileGetOptions(Oid foreigntableid,
371 ECB : char **filename, bool *is_program, List **other_options)
372 : {
373 : ForeignTable *table;
374 : ForeignServer *server;
375 : ForeignDataWrapper *wrapper;
376 : List *options;
377 : ListCell *lc;
378 :
379 : /*
380 : * Extract options from FDW objects. We ignore user mappings because
381 : * file_fdw doesn't have any options that can be specified there.
382 : *
383 : * (XXX Actually, given the current contents of valid_options[], there's
384 : * no point in examining anything except the foreign table's own options.
385 : * Simplify?)
386 : */
387 GIC 67 : table = GetForeignTable(foreigntableid);
388 67 : server = GetForeignServer(table->serverid);
389 67 : wrapper = GetForeignDataWrapper(server->fdwid);
390 :
/* Merge order: wrapper, server, table, then the per-column options. */
391 67 : options = NIL;
392 67 : options = list_concat(options, wrapper->options);
393 67 : options = list_concat(options, server->options);
394 67 : options = list_concat(options, table->options);
395 67 : options = list_concat(options, get_file_fdw_attribute_options(foreigntableid));
396 :
397 : /*
398 : * Separate out the filename or program option (we assume there is only
399 : * one).
400 ECB : */
401 CBC 67 : *filename = NULL;
402 67 : *is_program = false;
403 GIC 134 : foreach(lc, options)
404 : {
405 CBC 134 : DefElem *def = (DefElem *) lfirst(lc);
406 ECB :
/* The first match wins: the entry is removed from the list and the loop ends. */
407 CBC 134 : if (strcmp(def->defname, "filename") == 0)
408 ECB : {
409 GIC 67 : *filename = defGetString(def);
410 67 : options = foreach_delete_current(options, lc);
411 67 : break;
412 : }
413 67 : else if (strcmp(def->defname, "program") == 0)
414 ECB : {
415 LBC 0 : *filename = defGetString(def);
416 0 : *is_program = true;
417 UIC 0 : options = foreach_delete_current(options, lc);
418 LBC 0 : break;
419 : }
420 ECB : }
421 :
422 : /*
423 : * The validator should have checked that filename or program was included
424 : * in the options, but check again, just in case.
425 : */
426 CBC 67 : if (*filename == NULL)
427 UIC 0 : elog(ERROR, "either filename or program is required for file_fdw foreign tables");
428 EUB :
429 GBC 67 : *other_options = options;
430 67 : }
431 EUB :
432 : /*
433 : * Retrieve per-column generic options from pg_attribute and construct a list
434 : * of DefElems representing them.
435 : *
436 : * At the moment we only have "force_not_null", and "force_null",
437 : * which should each be combined into a single DefElem listing all such
438 : * columns, since that's what COPY expects.
439 ECB : */
440 EUB : static List *
441 GIC 67 : get_file_fdw_attribute_options(Oid relid)
442 ECB : {
443 : Relation rel;
444 : TupleDesc tupleDesc;
445 : AttrNumber natts;
446 : AttrNumber attnum;
447 GIC 67 : List *fnncolumns = NIL;
448 67 : List *fncolumns = NIL;
449 :
450 67 : List *options = NIL;
451 :
452 67 : rel = table_open(relid, AccessShareLock);
453 67 : tupleDesc = RelationGetDescr(rel);
454 CBC 67 : natts = tupleDesc->natts;
455 :
456 : /* Retrieve FDW options for all user-defined attributes. */
457 GIC 213 : for (attnum = 1; attnum <= natts; attnum++)
458 : {
459 146 : Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
460 : List *column_options;
461 ECB : ListCell *lc;
462 :
463 : /* Skip dropped attributes. */
464 GIC 146 : if (attr->attisdropped)
465 LBC 0 : continue;
466 ECB :
467 GNC 146 : column_options = GetForeignColumnOptions(relid, attnum);
468 162 : foreach(lc, column_options)
469 : {
470 CBC 16 : DefElem *def = (DefElem *) lfirst(lc);
471 :
472 16 : if (strcmp(def->defname, "force_not_null") == 0)
473 : {
474 GIC 8 : if (defGetBoolean(def))
475 : {
476 4 : char *attname = pstrdup(NameStr(attr->attname));
477 ECB :
478 GBC 4 : fnncolumns = lappend(fnncolumns, makeString(attname));
479 : }
480 ECB : }
481 CBC 8 : else if (strcmp(def->defname, "force_null") == 0)
482 : {
483 8 : if (defGetBoolean(def))
484 : {
485 4 : char *attname = pstrdup(NameStr(attr->attname));
486 :
487 4 : fncolumns = lappend(fncolumns, makeString(attname));
488 : }
489 ECB : }
490 : /* maybe in future handle other column options here */
491 : }
492 : }
493 :
494 CBC 67 : table_close(rel, AccessShareLock);
495 :
496 ECB : /*
497 : * Return DefElem only when some column(s) have force_not_null /
498 : * force_null options set
499 : */
500 CBC 67 : if (fnncolumns != NIL)
501 GIC 4 : options = lappend(options, makeDefElem("force_not_null", (Node *) fnncolumns, -1));
502 :
503 67 : if (fncolumns != NIL)
504 4 : options = lappend(options, makeDefElem("force_null", (Node *) fncolumns, -1));
505 :
506 67 : return options;
507 ECB : }
508 :
509 : /*
510 : * fileGetForeignRelSize
511 : * Obtain relation size estimates for a foreign table
512 : */
/*
 * Allocates a FileFdwPlanState, fills its filename/is_program/options fields
 * from the table's options, attaches it to baserel->fdw_private, and then
 * calls estimate_size() with it.
 */
513 : static void
514 CBC 36 : fileGetForeignRelSize(PlannerInfo *root,
515 : RelOptInfo *baserel,
516 ECB : Oid foreigntableid)
517 : {
518 : FileFdwPlanState *fdw_private;
519 :
520 : /*
521 : * Fetch options. We only need filename (or program) at this point, but
522 : * we might as well get everything and not need to re-fetch it later in
523 : * planning.
524 : */
/* palloc does not zero the struct: pages and ntuples are not initialized here. */
525 GIC 36 : fdw_private = (FileFdwPlanState *) palloc(sizeof(FileFdwPlanState));
526 36 : fileGetOptions(foreigntableid,
527 ECB : &fdw_private->filename,
528 : &fdw_private->is_program,
529 : &fdw_private->options);
530 GIC 36 : baserel->fdw_private = (void *) fdw_private;
531 :
532 : /* Estimate relation size */
533 36 : estimate_size(root, baserel, fdw_private);
534 36 : }
535 :
536 : /*
537 : * fileGetForeignPaths
538 ECB : * Create possible access paths for a scan on the foreign table
539 : *
540 : * Currently we don't support any push-down feature, so there is only one
541 : * possible access path, which simply returns all records in the order in
542 : * the data file.
543 : */
/*
 * Adds exactly one ForeignPath to baserel.  It reads the FileFdwPlanState
 * that fileGetForeignRelSize() stored in baserel->fdw_private.
 */
544 : static void
545 GIC 36 : fileGetForeignPaths(PlannerInfo *root,
546 ECB : RelOptInfo *baserel,
547 : Oid foreigntableid)
548 : {
549 GIC 36 : FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
550 : Cost startup_cost;
551 : Cost total_cost;
552 : List *columns;
553 36 : List *coptions = NIL;
554 :
555 : /* Decide whether to selectively perform binary conversion */
/* coptions stays NIL unless the check succeeds; then it holds one "convert_selectively" DefElem. */
556 36 : if (check_selective_binary_conversion(baserel,
557 : foreigntableid,
558 ECB : &columns))
559 GIC 4 : coptions = list_make1(makeDefElem("convert_selectively",
560 : (Node *) columns, -1));
561 :
562 ECB : /* Estimate costs */
563 GIC 36 : estimate_costs(root, baserel, fdw_private,
564 : &startup_cost, &total_cost);
565 :
566 ECB : /*
567 : * Create a ForeignPath node and add it as only possible path. We use the
568 : * fdw_private list of the path to carry the convert_selectively option;
569 : * it will be propagated into the fdw_private list of the Plan node.
570 : *
571 : * We don't support pushing join clauses into the quals of this path, but
572 : * it could still have required parameterization due to LATERAL refs in
573 : * its tlist.
574 : */
575 GIC 36 : add_path(baserel, (Path *)
576 CBC 36 : create_foreignscan_path(root, baserel,
577 : NULL, /* default pathtarget */
578 : baserel->rows,
579 : startup_cost,
580 : total_cost,
581 : NIL, /* no pathkeys */
582 : baserel->lateral_relids,
583 : NULL, /* no extra plan */
584 : coptions));
585 :
586 : /*
587 : * If data file was sorted, and we knew it somehow, we could insert
588 ECB : * appropriate pathkeys into the ForeignPath node to tell the planner
589 : * that.
590 : */
591 GIC 36 : }
592 :
593 : /*
594 : * fileGetForeignPlan
595 : * Create a ForeignScan plan node for scanning the foreign table
596 : */
/*
 * Returns the ForeignScan node built by make_foreignscan().  The path's
 * fdw_private list is passed through unchanged as the plan's fdw_private;
 * fileBeginForeignScan() later appends it to the COPY options.
 */
597 : static ForeignScan *
598 36 : fileGetForeignPlan(PlannerInfo *root,
599 : RelOptInfo *baserel,
600 : Oid foreigntableid,
601 : ForeignPath *best_path,
602 : List *tlist,
603 : List *scan_clauses,
604 ECB : Plan *outer_plan)
605 : {
606 GIC 36 : Index scan_relid = baserel->relid;
607 :
608 : /*
609 : * We have no native ability to evaluate restriction clauses, so we just
610 : * put all the scan_clauses into the plan node's qual list for the
611 ECB : * executor to check. So all we have to do here is strip RestrictInfo
612 : * nodes from the clauses and ignore pseudoconstants (which will be
613 : * handled elsewhere).
614 : */
615 GIC 36 : scan_clauses = extract_actual_clauses(scan_clauses, false);
616 :
617 : /* Create the ForeignScan node */
618 36 : return make_foreignscan(tlist,
619 ECB : scan_clauses,
620 : scan_relid,
621 : NIL, /* no expressions to evaluate */
622 : best_path->fdw_private,
623 : NIL, /* no custom tlist */
624 : NIL, /* no remote quals */
625 : outer_plan);
626 : }
627 :
628 : /*
629 : * fileExplainForeignScan
630 : * Produce extra output for EXPLAIN
631 : */
/*
 * Emits "Foreign Program" or "Foreign File" with the configured value.
 * "Foreign File Size" is added only when es->costs is set, the source is a
 * file, and stat() succeeds; a stat() failure is ignored and the property is
 * simply omitted.
 */
632 : static void
633 GIC 3 : fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
634 : {
635 : char *filename;
636 : bool is_program;
637 : List *options;
638 :
639 : /* Fetch options --- we only need filename and is_program at this point */
640 3 : fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
641 : &filename, &is_program, &options);
642 :
643 3 : if (is_program)
644 UIC 0 : ExplainPropertyText("Foreign Program", filename, es);
645 : else
646 CBC 3 : ExplainPropertyText("Foreign File", filename, es);
647 :
648 : /* Suppress file size if we're not showing cost details */
649 GIC 3 : if (es->costs)
650 : {
651 : struct stat stat_buf;
652 :
653 LBC 0 : if (!is_program &&
654 UIC 0 : stat(filename, &stat_buf) == 0)
655 0 : ExplainPropertyInteger("Foreign File Size", "b",
656 LBC 0 : (int64) stat_buf.st_size, es);
657 EUB : }
658 GIC 3 : }
659 ECB :
660 : /*
661 : * fileBeginForeignScan
662 : * Initiate access to the file by creating CopyState
663 : */
/*
 * Opens the data source through BeginCopyFrom() and stores a
 * FileFdwExecutionState in node->fdw_state.  When eflags contains
 * EXEC_FLAG_EXPLAIN_ONLY it returns immediately, leaving node->fdw_state
 * untouched.
 */
664 : static void
665 GIC 31 : fileBeginForeignScan(ForeignScanState *node, int eflags)
666 EUB : {
667 GBC 31 : ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
668 EUB : char *filename;
669 : bool is_program;
670 : List *options;
671 : CopyFromState cstate;
672 ECB : FileFdwExecutionState *festate;
673 :
674 : /*
675 : * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
676 : */
677 GIC 31 : if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
678 CBC 3 : return;
679 :
680 ECB : /* Fetch options of foreign table */
681 GIC 28 : fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
682 : &filename, &is_program, &options);
683 :
684 : /* Add any options from the plan (currently only convert_selectively) */
685 28 : options = list_concat(options, plan->fdw_private);
686 :
687 : /*
688 : * Create CopyFromState from FDW options. We always acquire all columns, so
689 : * as to match the expected ScanTupleSlot signature.
690 ECB : */
691 CBC 28 : cstate = BeginCopyFrom(NULL,
692 : node->ss.ss_currentRelation,
693 : NULL,
694 ECB : filename,
695 : is_program,
696 : NULL,
697 : NIL,
698 : options);
699 :
700 : /*
701 : * Save state in node->fdw_state. We must save enough information to call
702 : * BeginCopyFrom() again.
703 : */
704 CBC 27 : festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
705 GIC 27 : festate->filename = filename;
706 27 : festate->is_program = is_program;
707 27 : festate->options = options;
708 27 : festate->cstate = cstate;
709 :
710 27 : node->fdw_state = (void *) festate;
711 : }
712 :
713 : /*
714 : * fileIterateForeignScan
715 : * Read next record from the data file and store it into the
716 : * ScanTupleSlot as a virtual tuple
717 ECB : */
/*
 * Returns node->ss.ss_ScanTupleSlot.  The slot holds a virtual tuple when
 * NextCopyFrom() produced a row, and is left cleared when it did not.
 *
 * While the row is being read, CopyFromErrorCallback is pushed on
 * error_context_stack; it is popped again before returning normally.
 *
 * NOTE(review): a new EState is created with CreateExecutorState() on every
 * call and is not released in this function.  Presumably its lifetime is
 * bounded by the memory context that is current on entry -- confirm against
 * the caller.
 * NOTE(review): festate is dereferenced without a NULL check (unlike
 * fileEndForeignScan); presumably this is never called in the EXPLAIN-only
 * case -- confirm.
 */
718 : static TupleTableSlot *
719 CBC 111 : fileIterateForeignScan(ForeignScanState *node)
720 ECB : {
721 CBC 111 : FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
722 GNC 111 : EState *estate = CreateExecutorState();
723 : ExprContext *econtext;
724 : MemoryContext oldcontext;
725 GIC 111 : TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
726 ECB : bool found;
727 : ErrorContextCallback errcallback;
728 :
729 : /* Set up callback to identify error line number. */
730 GIC 111 : errcallback.callback = CopyFromErrorCallback;
731 111 : errcallback.arg = (void *) festate->cstate;
732 111 : errcallback.previous = error_context_stack;
733 111 : error_context_stack = &errcallback;
734 :
735 ECB : /*
736 : * The protocol for loading a virtual tuple into a slot is first
737 : * ExecClearTuple, then fill the values/isnull arrays, then
738 : * ExecStoreVirtualTuple. If we don't find another row in the file, we
739 : * just skip the last step, leaving the slot empty as required.
740 : *
741 : * We pass ExprContext because there might be a use of the DEFAULT option
742 : * in COPY FROM, so we may need to evaluate default expressions.
743 : */
744 GIC 111 : ExecClearTuple(slot);
/* The expression context comes from the EState created at the top of this call. */
745 GNC 111 : econtext = GetPerTupleExprContext(estate);
746 :
747 : /*
748 : * DEFAULT expressions need to be evaluated in a per-tuple context, so
749 : * switch in case we are doing that.
750 : */
751 111 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
/* NextCopyFrom writes directly into the slot's tts_values / tts_isnull arrays. */
752 111 : found = NextCopyFrom(festate->cstate, econtext,
753 ECB : slot->tts_values, slot->tts_isnull);
754 CBC 109 : if (found)
755 82 : ExecStoreVirtualTuple(slot);
756 ECB :
757 : /* Switch back to original memory context */
758 GNC 109 : MemoryContextSwitchTo(oldcontext);
759 :
760 : /* Remove error callback. */
761 GIC 109 : error_context_stack = errcallback.previous;
762 :
763 109 : return slot;
764 : }
765 :
766 : /*
767 : * fileReScanForeignScan
768 : * Rescan table, possibly with new parameters
769 : */
/*
 * Restarts the scan by closing the current COPY state and opening a new one
 * with the filename, is_program and options saved by fileBeginForeignScan().
 *
 * NOTE(review): festate is dereferenced without a NULL check (unlike
 * fileEndForeignScan); presumably a rescan is never requested in the
 * EXPLAIN-only case -- confirm.
 */
770 ECB : static void
771 CBC 3 : fileReScanForeignScan(ForeignScanState *node)
772 : {
773 GIC 3 : FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
774 :
775 3 : EndCopyFrom(festate->cstate);
776 :
/* Same argument pattern as the BeginCopyFrom() call in fileBeginForeignScan(). */
777 CBC 6 : festate->cstate = BeginCopyFrom(NULL,
778 ECB : node->ss.ss_currentRelation,
779 : NULL,
780 CBC 3 : festate->filename,
781 3 : festate->is_program,
782 : NULL,
783 : NIL,
784 ECB : festate->options);
785 GIC 3 : }
786 :
787 ECB : /*
788 : * fileEndForeignScan
789 : * Finish scanning foreign table and dispose objects used for this scan
790 : */
/*
 * Closes the COPY state if one was created.  Only EndCopyFrom() is called;
 * the FileFdwExecutionState itself is not freed here.
 */
791 : static void
792 GIC 28 : fileEndForeignScan(ForeignScanState *node)
793 : {
794 28 : FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
795 :
796 : /* if festate is NULL, we are in EXPLAIN; nothing to do */
797 CBC 28 : if (festate)
798 GIC 25 : EndCopyFrom(festate->cstate);
799 CBC 28 : }
800 :
801 ECB : /*
802 : * fileAnalyzeForeignTable
803 : * Test whether analyzing this foreign table is supported
804 : */
/*
 * Returns false (without touching *func or *totalpages) when the table reads
 * from a program.  For a file it sets *totalpages to the file size rounded up
 * to whole BLCKSZ pages (minimum 1), sets *func to file_acquire_sample_rows,
 * and returns true.  A stat() failure raises an ERROR.
 */
805 : static bool
806 LBC 0 : fileAnalyzeForeignTable(Relation relation,
807 ECB : AcquireSampleRowsFunc *func,
808 : BlockNumber *totalpages)
809 : {
810 : char *filename;
811 : bool is_program;
812 : List *options;
813 : struct stat stat_buf;
814 :
815 : /* Fetch options of foreign table */
816 UIC 0 : fileGetOptions(RelationGetRelid(relation), &filename, &is_program, &options);
817 :
818 ECB : /*
819 : * If this is a program instead of a file, just return false to skip
820 : * analyzing the table. We could run the program and collect stats on
821 : * whatever it currently returns, but it seems likely that in such cases
822 : * the output would be too volatile for the stats to be useful. Maybe
823 : * there should be an option to enable doing this?
824 : */
825 LBC 0 : if (is_program)
826 UIC 0 : return false;
827 :
828 : /*
829 : * Get size of the file. (XXX if we fail here, would it be better to just
830 : * return false to skip analyzing the table?)
831 : */
832 UBC 0 : if (stat(filename, &stat_buf) < 0)
833 UIC 0 : ereport(ERROR,
834 : (errcode_for_file_access(),
835 : errmsg("could not stat file \"%s\": %m",
836 : filename)));
837 :
838 : /*
839 : * Convert size to pages. Must return at least 1 so that we can tell
840 : * later on that pg_class.relpages is not default.
841 : */
/* Adding BLCKSZ - 1 before dividing rounds a partial last page up. */
842 UBC 0 : *totalpages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
843 UIC 0 : if (*totalpages < 1)
844 0 : *totalpages = 1;
845 :
846 0 : *func = file_acquire_sample_rows;
847 :
848 0 : return true;
849 : }
850 :
851 EUB : /*
852 : * fileIsForeignScanParallelSafe
853 : * Reading a file, or external program, in a parallel worker should work
854 : * just the same as reading it in the leader, so mark scans safe.
855 : */
/* Unconditionally returns true; none of the arguments are examined. */
856 : static bool
857 GIC 32 : fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
858 EUB : RangeTblEntry *rte)
859 : {
860 GIC 32 : return true;
861 : }
862 :
863 : /*
864 : * check_selective_binary_conversion
865 : *
866 : * Check to see if it's useful to convert only a subset of the file's columns
867 : * to binary. If so, construct a list of the column names to be converted,
868 EUB : * return that at *columns, and return true. (Note that it's possible to
869 : * determine that no columns need be converted, for instance with a COUNT(*)
870 : * query. So we can't use returning a NIL list to indicate failure.)
871 : */
/*
 * Returns true, with *columns set to a list of column-name String nodes,
 * when only a subset of the columns needs converting.  Returns false, with
 * *columns set to NIL, when the table's "format" option is "binary", when a
 * whole-row reference is present, or when the number of collected columns
 * equals the number of non-dropped columns.
 *
 * Only the foreign table's own options are consulted for "format".
 *
 * NOTE(review): numattrs counts generated columns, but generated columns are
 * never added to *columns.  For a table with a generated column the "all
 * attributes needed" equality therefore cannot hold, so the function returns
 * true even if every other column is referenced -- confirm this is intended.
 */
872 : static bool
873 GIC 36 : check_selective_binary_conversion(RelOptInfo *baserel,
874 EUB : Oid foreigntableid,
875 : List **columns)
876 : {
877 : ForeignTable *table;
878 : ListCell *lc;
879 : Relation rel;
880 : TupleDesc tupleDesc;
881 : int attidx;
882 GIC 36 : Bitmapset *attrs_used = NULL;
883 CBC 36 : bool has_wholerow = false;
884 : int numattrs;
885 : int i;
886 ECB :
887 GIC 36 : *columns = NIL; /* default result */
888 :
889 : /*
890 : * Check format of the file. If binary format, this is irrelevant.
891 : */
892 36 : table = GetForeignTable(foreigntableid);
893 36 : foreach(lc, table->options)
894 : {
895 36 : DefElem *def = (DefElem *) lfirst(lc);
896 :
/* Stop at the first "format" option; any other value than "binary" falls through. */
897 36 : if (strcmp(def->defname, "format") == 0)
898 : {
899 CBC 36 : char *format = defGetString(def);
900 :
901 GIC 36 : if (strcmp(format, "binary") == 0)
902 UIC 0 : return false;
903 GIC 36 : break;
904 : }
905 : }
906 :
907 : /* Collect all the attributes needed for joins or final output. */
908 CBC 36 : pull_varattnos((Node *) baserel->reltarget->exprs, baserel->relid,
909 ECB : &attrs_used);
910 :
911 : /* Add all the attributes used by restriction clauses. */
912 GIC 44 : foreach(lc, baserel->baserestrictinfo)
913 ECB : {
914 GIC 8 : RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
915 :
916 8 : pull_varattnos((Node *) rinfo->clause, baserel->relid,
917 : &attrs_used);
918 ECB : }
919 :
920 : /* Convert attribute numbers to column names. */
921 CBC 36 : rel = table_open(foreigntableid, AccessShareLock);
922 GIC 36 : tupleDesc = RelationGetDescr(rel);
923 ECB :
924 GNC 36 : attidx = -1;
925 115 : while ((attidx = bms_next_member(attrs_used, attidx)) >= 0)
926 ECB : {
927 : /* attidx is zero-based, attnum is the normal attribute number */
928 GNC 83 : AttrNumber attnum = attidx + FirstLowInvalidHeapAttributeNumber;
929 EUB :
/* Attribute number 0 is treated as a whole-row reference; the scan of the set stops here. */
930 CBC 83 : if (attnum == 0)
931 : {
932 GIC 4 : has_wholerow = true;
933 4 : break;
934 : }
935 ECB :
936 : /* Ignore system attributes. */
937 GIC 79 : if (attnum < 0)
938 13 : continue;
939 ECB :
940 : /* Get user attributes. */
941 CBC 66 : if (attnum > 0)
942 : {
943 66 : Form_pg_attribute attr = TupleDescAttr(tupleDesc, attnum - 1);
944 GIC 66 : char *attname = NameStr(attr->attname);
945 :
946 : /* Skip dropped attributes (probably shouldn't see any here). */
947 66 : if (attr->attisdropped)
948 LBC 0 : continue;
949 ECB :
950 : /*
951 : * Skip generated columns (COPY won't accept them in the column
952 : * list)
953 : */
954 GIC 66 : if (attr->attgenerated)
955 CBC 1 : continue;
/* The name is copied with pstrdup before the relation is closed below. */
956 GIC 65 : *columns = lappend(*columns, makeString(pstrdup(attname)));
957 ECB : }
958 : }
959 :
960 : /* Count non-dropped user attributes while we have the tupdesc. */
961 GIC 36 : numattrs = 0;
962 114 : for (i = 0; i < tupleDesc->natts; i++)
963 : {
964 CBC 78 : Form_pg_attribute attr = TupleDescAttr(tupleDesc, i);
965 ECB :
966 GIC 78 : if (attr->attisdropped)
967 UIC 0 : continue;
968 CBC 78 : numattrs++;
969 : }
970 ECB :
971 CBC 36 : table_close(rel, AccessShareLock);
972 :
973 : /* If there's a whole-row reference, fail: we need all the columns. */
/* Any names collected before the whole-row reference was seen are discarded. */
974 36 : if (has_wholerow)
975 EUB : {
976 GIC 4 : *columns = NIL;
977 4 : return false;
978 : }
979 :
980 : /* If all the user attributes are needed, fail. */
981 CBC 32 : if (numattrs == list_length(*columns))
982 ECB : {
983 CBC 28 : *columns = NIL;
984 GIC 28 : return false;
985 : }
986 :
/* A true result with an empty *columns list is possible: no column needs converting. */
987 4 : return true;
988 ECB : }
989 :
990 : /*
991 : * Estimate size of a foreign table.
992 : *
993 : * The main result is returned in baserel->rows. We also set
994 EUB : * fdw_private->pages and fdw_private->ntuples for later use in the cost
995 ECB : * calculation.
996 : */
997 : static void
998 CBC 36 : estimate_size(PlannerInfo *root, RelOptInfo *baserel,
999 : FileFdwPlanState *fdw_private)
1000 : {
1001 ECB : struct stat stat_buf;
1002 : BlockNumber pages;
1003 : double ntuples;
1004 : double nrows;
1005 :
1006 : /*
1007 : * Get size of the file. It might not be there at plan time, though, in
1008 : * which case we have to use a default estimate. We also have to fall
1009 : * back to the default if using a program as the input.
1010 : */
1011 CBC 36 : if (fdw_private->is_program || stat(fdw_private->filename, &stat_buf) < 0)
1012 UIC 0 : stat_buf.st_size = 10 * BLCKSZ;
1013 :
1014 ECB : /*
1015 : * Convert size to pages for use in I/O cost estimate later.
1016 : */
1017 GIC 36 : pages = (stat_buf.st_size + (BLCKSZ - 1)) / BLCKSZ;
1018 36 : if (pages < 1)
1019 UIC 0 : pages = 1;
1020 GIC 36 : fdw_private->pages = pages;
1021 :
1022 : /*
1023 : * Estimate the number of tuples in the file.
1024 : */
1025 CBC 36 : if (baserel->tuples >= 0 && baserel->pages > 0)
1026 UIC 0 : {
1027 : /*
1028 : * We have # of pages and # of tuples from pg_class (that is, from a
1029 : * previous ANALYZE), so compute a tuples-per-page estimate and scale
1030 : * that by the current file size.
1031 : */
1032 : double density;
1033 :
1034 0 : density = baserel->tuples / (double) baserel->pages;
1035 0 : ntuples = clamp_row_est(density * (double) pages);
1036 : }
1037 : else
1038 ECB : {
1039 EUB : /*
1040 : * Otherwise we have to fake it. We back into this estimate using the
1041 : * planner's idea of the relation width; which is bogus if not all
1042 : * columns are being read, not to mention that the text representation
1043 : * of a row probably isn't the same size as its internal
1044 ECB : * representation. Possibly we could do something better, but the
1045 : * real answer to anyone who complains is "ANALYZE" ...
1046 EUB : */
1047 ECB : int tuple_width;
1048 :
1049 GIC 36 : tuple_width = MAXALIGN(baserel->reltarget->width) +
1050 : MAXALIGN(SizeofHeapTupleHeader);
1051 36 : ntuples = clamp_row_est((double) stat_buf.st_size /
1052 CBC 36 : (double) tuple_width);
1053 EUB : }
1054 GIC 36 : fdw_private->ntuples = ntuples;
1055 :
1056 : /*
1057 : * Now estimate the number of rows returned by the scan after applying the
1058 : * baserestrictinfo quals.
1059 : */
1060 36 : nrows = ntuples *
1061 GBC 36 : clauselist_selectivity(root,
1062 EUB : baserel->baserestrictinfo,
1063 : 0,
1064 : JOIN_INNER,
1065 : NULL);
1066 :
1067 GIC 36 : nrows = clamp_row_est(nrows);
1068 :
1069 : /* Save the output-rows estimate for the planner */
1070 36 : baserel->rows = nrows;
1071 36 : }
1072 :
1073 : /*
1074 : * Estimate costs of scanning a foreign table.
1075 : *
1076 ECB : * Results are returned in *startup_cost and *total_cost.
1077 : */
1078 : static void
1079 CBC 36 : estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
1080 : FileFdwPlanState *fdw_private,
1081 ECB : Cost *startup_cost, Cost *total_cost)
1082 : {
1083 GIC 36 : BlockNumber pages = fdw_private->pages;
1084 36 : double ntuples = fdw_private->ntuples;
1085 36 : Cost run_cost = 0;
1086 : Cost cpu_per_tuple;
1087 ECB :
1088 : /*
1089 : * We estimate costs almost the same way as cost_seqscan(), thus assuming
1090 : * that I/O costs are equivalent to a regular table file of the same size.
1091 : * However, we take per-tuple CPU costs as 10x of a seqscan, to account
1092 : * for the cost of parsing records.
1093 : *
1094 : * In the case of a program source, this calculation is even more divorced
1095 : * from reality, but we have no good alternative; and it's not clear that
1096 : * the numbers we produce here matter much anyway, since there's only one
1097 : * access path for the rel.
1098 : */
1099 GIC 36 : run_cost += seq_page_cost * pages;
1100 :
1101 36 : *startup_cost = baserel->baserestrictcost.startup;
1102 36 : cpu_per_tuple = cpu_tuple_cost * 10 + baserel->baserestrictcost.per_tuple;
1103 36 : run_cost += cpu_per_tuple * ntuples;
1104 36 : *total_cost = *startup_cost + run_cost;
1105 36 : }
1106 ECB :
1107 : /*
1108 : * file_acquire_sample_rows -- acquire a random sample of rows from the table
1109 : *
1110 : * Selected rows are returned in the caller-allocated array rows[],
1111 : * which must have at least targrows entries.
1112 : * The actual number of rows selected is returned as the function result.
1113 : * We also count the total number of rows in the file and return it into
1114 : * *totalrows. Note that *totaldeadrows is always set to 0.
1115 : *
1116 : * Note that the returned list of rows is not always in order by physical
1117 : * position in the file. Therefore, correlation estimates derived later
1118 : * may be meaningless, but it's OK because we don't use the estimates
1119 : * currently (the planner only pays attention to correlation for indexscans).
1120 : */
1121 : static int
1122 UIC 0 : file_acquire_sample_rows(Relation onerel, int elevel,
1123 : HeapTuple *rows, int targrows,
1124 : double *totalrows, double *totaldeadrows)
1125 : {
1126 LBC 0 : int numrows = 0;
1127 UIC 0 : double rowstoskip = -1; /* -1 means not set yet */
1128 ECB : ReservoirStateData rstate;
1129 : TupleDesc tupDesc;
1130 : Datum *values;
1131 : bool *nulls;
1132 : bool found;
1133 : char *filename;
1134 : bool is_program;
1135 : List *options;
1136 : CopyFromState cstate;
1137 : ErrorContextCallback errcallback;
1138 UIC 0 : MemoryContext oldcontext = CurrentMemoryContext;
1139 : MemoryContext tupcontext;
1140 :
1141 0 : Assert(onerel);
1142 0 : Assert(targrows > 0);
1143 :
1144 0 : tupDesc = RelationGetDescr(onerel);
1145 0 : values = (Datum *) palloc(tupDesc->natts * sizeof(Datum));
1146 0 : nulls = (bool *) palloc(tupDesc->natts * sizeof(bool));
1147 :
1148 : /* Fetch options of foreign table */
1149 UBC 0 : fileGetOptions(RelationGetRelid(onerel), &filename, &is_program, &options);
1150 :
1151 : /*
1152 : * Create CopyState from FDW options.
1153 EUB : */
1154 UBC 0 : cstate = BeginCopyFrom(NULL, onerel, NULL, filename, is_program, NULL, NIL,
1155 : options);
1156 :
1157 : /*
1158 : * Use per-tuple memory context to prevent leak of memory used to read
1159 : * rows from the file with Copy routines.
1160 : */
1161 UIC 0 : tupcontext = AllocSetContextCreate(CurrentMemoryContext,
1162 : "file_fdw temporary context",
1163 : ALLOCSET_DEFAULT_SIZES);
1164 :
1165 EUB : /* Prepare for sampling rows */
1166 UIC 0 : reservoir_init_selection_state(&rstate, targrows);
1167 :
1168 EUB : /* Set up callback to identify error line number. */
1169 UBC 0 : errcallback.callback = CopyFromErrorCallback;
1170 UIC 0 : errcallback.arg = (void *) cstate;
1171 UBC 0 : errcallback.previous = error_context_stack;
1172 0 : error_context_stack = &errcallback;
1173 EUB :
1174 UIC 0 : *totalrows = 0;
1175 0 : *totaldeadrows = 0;
1176 EUB : for (;;)
1177 : {
1178 : /* Check for user-requested abort or sleep */
1179 UIC 0 : vacuum_delay_point();
1180 :
1181 EUB : /* Fetch next row */
1182 UIC 0 : MemoryContextReset(tupcontext);
1183 0 : MemoryContextSwitchTo(tupcontext);
1184 :
1185 0 : found = NextCopyFrom(cstate, NULL, values, nulls);
1186 :
1187 0 : MemoryContextSwitchTo(oldcontext);
1188 EUB :
1189 UIC 0 : if (!found)
1190 0 : break;
1191 :
1192 : /*
1193 EUB : * The first targrows sample rows are simply copied into the
1194 : * reservoir. Then we start replacing tuples in the sample until we
1195 : * reach the end of the relation. This algorithm is from Jeff Vitter's
1196 : * paper (see more info in commands/analyze.c).
1197 : */
1198 UBC 0 : if (numrows < targrows)
1199 EUB : {
1200 UIC 0 : rows[numrows++] = heap_form_tuple(tupDesc, values, nulls);
1201 EUB : }
1202 : else
1203 : {
1204 : /*
1205 : * t in Vitter's paper is the number of records already processed.
1206 : * If we need to compute a new S value, we must use the
1207 : * not-yet-incremented value of totalrows as t.
1208 : */
1209 UBC 0 : if (rowstoskip < 0)
1210 0 : rowstoskip = reservoir_get_next_S(&rstate, *totalrows, targrows);
1211 :
1212 0 : if (rowstoskip <= 0)
1213 : {
1214 EUB : /*
1215 : * Found a suitable tuple, so save it, replacing one old tuple
1216 : * at random
1217 : */
1218 UIC 0 : int k = (int) (targrows * sampler_random_fract(&rstate.randstate));
1219 :
1220 0 : Assert(k >= 0 && k < targrows);
1221 0 : heap_freetuple(rows[k]);
1222 0 : rows[k] = heap_form_tuple(tupDesc, values, nulls);
1223 : }
1224 :
1225 UBC 0 : rowstoskip -= 1;
1226 : }
1227 EUB :
1228 UIC 0 : *totalrows += 1;
1229 : }
1230 :
1231 : /* Remove error callback. */
1232 0 : error_context_stack = errcallback.previous;
1233 :
1234 : /* Clean up. */
1235 0 : MemoryContextDelete(tupcontext);
1236 EUB :
1237 UBC 0 : EndCopyFrom(cstate);
1238 :
1239 0 : pfree(values);
1240 UIC 0 : pfree(nulls);
1241 :
1242 : /*
1243 : * Emit some interesting relation info
1244 : */
1245 UBC 0 : ereport(elevel,
1246 : (errmsg("\"%s\": file contains %.0f rows; "
1247 EUB : "%d rows in sample",
1248 : RelationGetRelationName(onerel),
1249 : *totalrows, numrows)));
1250 :
1251 UIC 0 : return numrows;
1252 EUB : }
|