Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyto.c
4 : * COPY <table> TO file/program/client
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : * Portions Copyright (c) 1994, Regents of the University of California
8 : *
9 : *
10 : * IDENTIFICATION
11 : * src/backend/commands/copyto.c
12 : *
13 : *-------------------------------------------------------------------------
14 : */
15 : #include "postgres.h"
16 :
17 : #include <ctype.h>
18 : #include <unistd.h>
19 : #include <sys/stat.h>
20 :
21 : #include "access/heapam.h"
22 : #include "access/htup_details.h"
23 : #include "access/tableam.h"
24 : #include "access/xact.h"
25 : #include "access/xlog.h"
26 : #include "commands/copy.h"
27 : #include "commands/progress.h"
28 : #include "executor/execdesc.h"
29 : #include "executor/executor.h"
30 : #include "executor/tuptable.h"
31 : #include "libpq/libpq.h"
32 : #include "libpq/pqformat.h"
33 : #include "mb/pg_wchar.h"
34 : #include "miscadmin.h"
35 : #include "optimizer/optimizer.h"
36 : #include "pgstat.h"
37 : #include "rewrite/rewriteHandler.h"
38 : #include "storage/fd.h"
39 : #include "tcop/tcopprot.h"
40 : #include "utils/lsyscache.h"
41 : #include "utils/memutils.h"
42 : #include "utils/partcache.h"
43 : #include "utils/rel.h"
44 : #include "utils/snapmgr.h"
45 :
/*
 * The kinds of output sink a COPY TO can feed.  The low-level send code
 * switches on this value to decide how a finished row is emitted.
 */
typedef enum CopyDest
{
	COPY_FILE = 0,				/* server-side file, or pipe to a program */
	COPY_FRONTEND,				/* the connected frontend */
	COPY_CALLBACK				/* caller-supplied callback function */
} CopyDest;
56 :
57 : /*
58 : * This struct contains all the state variables used throughout a COPY TO
59 : * operation.
60 : *
61 : * Multi-byte encodings: all supported client-side encodings encode multi-byte
62 : * characters by having the first byte's high bit set. Subsequent bytes of the
63 : * character can have the high bit not set. When scanning data in such an
64 : * encoding to look for a match to a single-byte (ie ASCII) character, we must
65 : * use the full pg_encoding_mblen() machinery to skip over multibyte
66 : * characters, else we might find a false match to a trailing byte. In
67 : * supported server encodings, there is no possibility of a false match, and
68 : * it's faster to make useless comparisons to trailing bytes than it is to
69 : * invoke pg_encoding_mblen() to skip over them. encoding_embeds_ascii is true
70 : * when we have to do it the hard way.
71 : */
72 : typedef struct CopyToStateData
73 : {
74 : /* low-level state data */
75 : CopyDest copy_dest; /* type of copy source/destination */
76 : FILE *copy_file; /* used if copy_dest == COPY_FILE */
77 : StringInfo fe_msgbuf; /* used for all dests during COPY TO */
78 :
79 : int file_encoding; /* file or remote side's character encoding */
80 : bool need_transcoding; /* file encoding diff from server? */
81 : bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
82 :
83 : /* parameters from the COPY command */
84 : Relation rel; /* relation to copy to */
85 : QueryDesc *queryDesc; /* executable query to copy from */
86 : List *attnumlist; /* integer list of attnums to copy */
87 : char *filename; /* filename, or NULL for STDOUT */
88 : bool is_program; /* is 'filename' a program to popen? */
89 : copy_data_dest_cb data_dest_cb; /* function for writing data */
90 :
91 : CopyFormatOptions opts;
92 : Node *whereClause; /* WHERE condition (or NULL) */
93 :
94 : /*
95 : * Working state
96 : */
97 : MemoryContext copycontext; /* per-copy execution context */
98 :
99 : FmgrInfo *out_functions; /* lookup info for output functions */
100 : MemoryContext rowcontext; /* per-row evaluation context */
101 : uint64 bytes_processed; /* number of bytes processed so far */
102 : } CopyToStateData;
103 :
104 : /* DestReceiver for COPY (query) TO */
105 : typedef struct
106 : {
107 : DestReceiver pub; /* publicly-known function pointers */
108 : CopyToState cstate; /* CopyToStateData for the command */
109 : uint64 processed; /* # of tuples processed */
110 : } DR_copy;
111 :
/*
 * 11-byte signature that opens a binary-format COPY stream: the tag
 * "PGCOPY\n", a byte with the high bit set, "\r\n", and a zero byte.
 *
 * NOTE: there's a copy of this in copyfromparse.c
 */
static const char BinarySignature[11] = "PGCOPY\n" "\377" "\r\n" "\0";
114 :
115 :
116 : /* non-export function prototypes */
117 : static void EndCopy(CopyToState cstate);
118 : static void ClosePipeToProgram(CopyToState cstate);
119 : static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
120 : static void CopyAttributeOutText(CopyToState cstate, const char *string);
121 : static void CopyAttributeOutCSV(CopyToState cstate, const char *string,
122 : bool use_quote, bool single_attr);
123 :
124 : /* Low-level communications functions */
125 : static void SendCopyBegin(CopyToState cstate);
126 : static void SendCopyEnd(CopyToState cstate);
127 : static void CopySendData(CopyToState cstate, const void *databuf, int datasize);
128 : static void CopySendString(CopyToState cstate, const char *str);
129 : static void CopySendChar(CopyToState cstate, char c);
130 : static void CopySendEndOfRow(CopyToState cstate);
131 : static void CopySendInt32(CopyToState cstate, int32 val);
132 : static void CopySendInt16(CopyToState cstate, int16 val);
133 :
134 :
135 : /*
136 : * Send copy start/stop messages for frontend copies. These have changed
137 : * in past protocol redesigns.
138 : */
139 : static void
867 heikki.linnakangas 140 GIC 3381 : SendCopyBegin(CopyToState cstate)
141 : {
766 heikki.linnakangas 142 ECB : StringInfoData buf;
766 heikki.linnakangas 143 GIC 3381 : int natts = list_length(cstate->attnumlist);
144 3381 : int16 format = (cstate->opts.binary ? 1 : 0);
766 heikki.linnakangas 145 ECB : int i;
146 :
766 heikki.linnakangas 147 GIC 3381 : pq_beginmessage(&buf, 'H');
148 3381 : pq_sendbyte(&buf, format); /* overall format */
766 heikki.linnakangas 149 CBC 3381 : pq_sendint16(&buf, natts);
150 15705 : for (i = 0; i < natts; i++)
151 12324 : pq_sendint16(&buf, format); /* per-column formats */
152 3381 : pq_endmessage(&buf);
153 3381 : cstate->copy_dest = COPY_FRONTEND;
867 154 3381 : }
867 heikki.linnakangas 155 ECB :
156 : static void
867 heikki.linnakangas 157 GIC 3380 : SendCopyEnd(CopyToState cstate)
158 : {
766 heikki.linnakangas 159 ECB : /* Shouldn't have any unsent data */
766 heikki.linnakangas 160 GIC 3380 : Assert(cstate->fe_msgbuf->len == 0);
161 : /* Send Copy Done message */
766 heikki.linnakangas 162 CBC 3380 : pq_putemptymessage('c');
867 heikki.linnakangas 163 GIC 3380 : }
867 heikki.linnakangas 164 ECB :
/*----------
 * CopySendData appends output data to the current row buffer; the buffer is
 *		later sent to the destination (file, frontend, or callback)
 * CopySendString does the same for null-terminated strings
 * CopySendChar does the same for single characters
 * CopySendEndOfRow does the appropriate thing at end of each data row
 *	(data is not actually flushed except by CopySendEndOfRow)
 *
 * NB: no data conversion is applied by these functions
 *----------
 */
175 : static void
867 heikki.linnakangas 176 GIC 4771190 : CopySendData(CopyToState cstate, const void *databuf, int datasize)
177 : {
867 heikki.linnakangas 178 CBC 4771190 : appendBinaryStringInfo(cstate->fe_msgbuf, databuf, datasize);
867 heikki.linnakangas 179 GIC 4771190 : }
867 heikki.linnakangas 180 ECB :
181 : static void
867 heikki.linnakangas 182 GIC 258129 : CopySendString(CopyToState cstate, const char *str)
183 : {
867 heikki.linnakangas 184 CBC 258129 : appendBinaryStringInfo(cstate->fe_msgbuf, str, strlen(str));
867 heikki.linnakangas 185 GIC 258129 : }
867 heikki.linnakangas 186 ECB :
187 : static void
867 heikki.linnakangas 188 GIC 5038063 : CopySendChar(CopyToState cstate, char c)
189 : {
867 heikki.linnakangas 190 CBC 5038063 : appendStringInfoCharMacro(cstate->fe_msgbuf, c);
867 heikki.linnakangas 191 GIC 5038063 : }
867 heikki.linnakangas 192 ECB :
193 : static void
867 heikki.linnakangas 194 GIC 1712439 : CopySendEndOfRow(CopyToState cstate)
195 : {
867 heikki.linnakangas 196 CBC 1712439 : StringInfo fe_msgbuf = cstate->fe_msgbuf;
197 :
198 1712439 : switch (cstate->copy_dest)
199 : {
200 6129 : case COPY_FILE:
867 heikki.linnakangas 201 GIC 6129 : if (!cstate->opts.binary)
867 heikki.linnakangas 202 ECB : {
203 : /* Default line termination depends on platform */
204 : #ifndef WIN32
867 heikki.linnakangas 205 GIC 6117 : CopySendChar(cstate, '\n');
206 : #else
867 heikki.linnakangas 207 ECB : CopySendString(cstate, "\r\n");
208 : #endif
209 : }
210 :
867 heikki.linnakangas 211 GIC 6129 : if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
212 6129 : cstate->copy_file) != 1 ||
867 heikki.linnakangas 213 CBC 6129 : ferror(cstate->copy_file))
867 heikki.linnakangas 214 ECB : {
867 heikki.linnakangas 215 LBC 0 : if (cstate->is_program)
216 : {
867 heikki.linnakangas 217 UBC 0 : if (errno == EPIPE)
218 : {
867 heikki.linnakangas 219 EUB : /*
220 : * The pipe will be closed automatically on error at
221 : * the end of transaction, but we might get a better
222 : * error message from the subprocess' exit code than
223 : * just "Broken Pipe"
224 : */
867 heikki.linnakangas 225 UIC 0 : ClosePipeToProgram(cstate);
226 :
867 heikki.linnakangas 227 EUB : /*
228 : * If ClosePipeToProgram() didn't throw an error, the
229 : * program terminated normally, but closed the pipe
230 : * first. Restore errno, and throw an error.
231 : */
867 heikki.linnakangas 232 UIC 0 : errno = EPIPE;
233 : }
867 heikki.linnakangas 234 UBC 0 : ereport(ERROR,
235 : (errcode_for_file_access(),
867 heikki.linnakangas 236 EUB : errmsg("could not write to COPY program: %m")));
237 : }
238 : else
867 heikki.linnakangas 239 UIC 0 : ereport(ERROR,
240 : (errcode_for_file_access(),
867 heikki.linnakangas 241 EUB : errmsg("could not write to COPY file: %m")));
242 : }
867 heikki.linnakangas 243 GIC 6129 : break;
766 244 1706307 : case COPY_FRONTEND:
867 heikki.linnakangas 245 ECB : /* The FE/BE protocol uses \n as newline for all platforms */
867 heikki.linnakangas 246 CBC 1706307 : if (!cstate->opts.binary)
867 heikki.linnakangas 247 GIC 1706296 : CopySendChar(cstate, '\n');
867 heikki.linnakangas 248 ECB :
249 : /* Dump the accumulated row as one CopyData message */
867 heikki.linnakangas 250 GIC 1706307 : (void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
251 1706307 : break;
180 michael 252 GNC 3 : case COPY_CALLBACK:
253 3 : cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
254 3 : break;
867 heikki.linnakangas 255 ECB : }
256 :
823 tomas.vondra 257 : /* Update the progress */
823 tomas.vondra 258 CBC 1712439 : cstate->bytes_processed += fe_msgbuf->len;
259 1712439 : pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, cstate->bytes_processed);
260 :
867 heikki.linnakangas 261 GIC 1712439 : resetStringInfo(fe_msgbuf);
262 1712439 : }
867 heikki.linnakangas 263 ECB :
264 : /*
265 : * These functions do apply some data conversion
266 : */
267 :
268 : /*
269 : * CopySendInt32 sends an int32 in network byte order
270 : */
271 : static inline void
867 heikki.linnakangas 272 GIC 94 : CopySendInt32(CopyToState cstate, int32 val)
273 : {
274 : uint32 buf;
275 :
276 94 : buf = pg_hton32((uint32) val);
867 heikki.linnakangas 277 CBC 94 : CopySendData(cstate, &buf, sizeof(buf));
867 heikki.linnakangas 278 GIC 94 : }
279 :
280 : /*
867 heikki.linnakangas 281 ECB : * CopySendInt16 sends an int16 in network byte order
282 : */
283 : static inline void
867 heikki.linnakangas 284 GIC 23 : CopySendInt16(CopyToState cstate, int16 val)
285 : {
286 : uint16 buf;
287 :
288 23 : buf = pg_hton16((uint16) val);
867 heikki.linnakangas 289 CBC 23 : CopySendData(cstate, &buf, sizeof(buf));
867 heikki.linnakangas 290 GIC 23 : }
291 :
292 : /*
867 heikki.linnakangas 293 ECB : * Closes the pipe to an external program, checking the pclose() return code.
294 : */
295 : static void
867 heikki.linnakangas 296 UIC 0 : ClosePipeToProgram(CopyToState cstate)
297 : {
298 : int pclose_rc;
299 :
300 0 : Assert(cstate->is_program);
867 heikki.linnakangas 301 EUB :
867 heikki.linnakangas 302 UIC 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
303 0 : if (pclose_rc == -1)
304 0 : ereport(ERROR,
867 heikki.linnakangas 305 EUB : (errcode_for_file_access(),
306 : errmsg("could not close pipe to external command: %m")));
867 heikki.linnakangas 307 UBC 0 : else if (pclose_rc != 0)
867 heikki.linnakangas 308 EUB : {
867 heikki.linnakangas 309 UBC 0 : ereport(ERROR,
310 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
311 : errmsg("program \"%s\" failed",
867 heikki.linnakangas 312 EUB : cstate->filename),
313 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
314 : }
867 heikki.linnakangas 315 UIC 0 : }
316 :
317 : /*
318 : * Release resources allocated in a cstate for COPY TO/FROM.
319 : */
867 heikki.linnakangas 320 EUB : static void
867 heikki.linnakangas 321 GIC 3400 : EndCopy(CopyToState cstate)
322 : {
323 3400 : if (cstate->is_program)
324 : {
867 heikki.linnakangas 325 UIC 0 : ClosePipeToProgram(cstate);
867 heikki.linnakangas 326 ECB : }
327 : else
328 : {
867 heikki.linnakangas 329 GIC 3400 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
867 heikki.linnakangas 330 UBC 0 : ereport(ERROR,
331 : (errcode_for_file_access(),
332 : errmsg("could not close file \"%s\": %m",
333 : cstate->filename)));
867 heikki.linnakangas 334 ECB : }
867 heikki.linnakangas 335 EUB :
823 tomas.vondra 336 GIC 3400 : pgstat_progress_end_command();
337 :
867 heikki.linnakangas 338 3400 : MemoryContextDelete(cstate->copycontext);
339 3400 : pfree(cstate);
340 3400 : }
867 heikki.linnakangas 341 ECB :
342 : /*
343 : * Setup CopyToState to read tuples from a table or a query for COPY TO.
344 : *
345 : * 'rel': Relation to be copied
346 : * 'raw_query': Query whose results are to be copied
347 : * 'queryRelId': OID of base relation to convert to a query (for RLS)
348 : * 'filename': Name of server-local file to write, NULL for STDOUT
349 : * 'is_program': true if 'filename' is program to execute
350 : * 'data_dest_cb': Callback that processes the output data
351 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
352 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
353 : *
354 : * Returns a CopyToState, to be passed to DoCopyTo() and related functions.
355 : */
356 : CopyToState
867 heikki.linnakangas 357 GIC 3495 : BeginCopyTo(ParseState *pstate,
358 : Relation rel,
359 : RawStmt *raw_query,
360 : Oid queryRelId,
361 : const char *filename,
362 : bool is_program,
363 : copy_data_dest_cb data_dest_cb,
364 : List *attnamelist,
365 : List *options)
366 : {
367 : CopyToState cstate;
180 michael 368 GNC 3495 : bool pipe = (filename == NULL && data_dest_cb == NULL);
369 : TupleDesc tupDesc;
370 : int num_phys_attrs;
371 : MemoryContext oldcontext;
761 michael 372 GIC 3495 : const int progress_cols[] = {
373 : PROGRESS_COPY_COMMAND,
761 michael 374 ECB : PROGRESS_COPY_TYPE
375 : };
761 michael 376 GIC 3495 : int64 progress_vals[] = {
377 : PROGRESS_COPY_COMMAND_TO,
378 : 0
379 : };
380 :
867 heikki.linnakangas 381 3495 : if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
382 : {
383 6 : if (rel->rd_rel->relkind == RELKIND_VIEW)
384 6 : ereport(ERROR,
867 heikki.linnakangas 385 ECB : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
386 : errmsg("cannot copy from view \"%s\"",
387 : RelationGetRelationName(rel)),
388 : errhint("Try the COPY (SELECT ...) TO variant.")));
867 heikki.linnakangas 389 LBC 0 : else if (rel->rd_rel->relkind == RELKIND_MATVIEW)
867 heikki.linnakangas 390 UIC 0 : ereport(ERROR,
391 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
392 : errmsg("cannot copy from materialized view \"%s\"",
867 heikki.linnakangas 393 ECB : RelationGetRelationName(rel)),
394 : errhint("Try the COPY (SELECT ...) TO variant.")));
867 heikki.linnakangas 395 UIC 0 : else if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
396 0 : ereport(ERROR,
397 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
867 heikki.linnakangas 398 ECB : errmsg("cannot copy from foreign table \"%s\"",
399 : RelationGetRelationName(rel)),
400 : errhint("Try the COPY (SELECT ...) TO variant.")));
867 heikki.linnakangas 401 LBC 0 : else if (rel->rd_rel->relkind == RELKIND_SEQUENCE)
867 heikki.linnakangas 402 UIC 0 : ereport(ERROR,
403 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
404 : errmsg("cannot copy from sequence \"%s\"",
405 : RelationGetRelationName(rel))));
867 heikki.linnakangas 406 UBC 0 : else if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
407 0 : ereport(ERROR,
408 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
409 : errmsg("cannot copy from partitioned table \"%s\"",
410 : RelationGetRelationName(rel)),
411 : errhint("Try the COPY (SELECT ...) TO variant.")));
867 heikki.linnakangas 412 EUB : else
867 heikki.linnakangas 413 UBC 0 : ereport(ERROR,
414 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
415 : errmsg("cannot copy from non-table relation \"%s\"",
416 : RelationGetRelationName(rel))));
417 : }
867 heikki.linnakangas 418 EUB :
419 :
420 : /* Allocate workspace and zero all fields */
867 heikki.linnakangas 421 GIC 3489 : cstate = (CopyToStateData *) palloc0(sizeof(CopyToStateData));
422 :
867 heikki.linnakangas 423 EUB : /*
424 : * We allocate everything used by a cstate in a new memory context. This
425 : * avoids memory leaks during repeated use of COPY in a query.
426 : */
867 heikki.linnakangas 427 GIC 3489 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
428 : "COPY",
429 : ALLOCSET_DEFAULT_SIZES);
867 heikki.linnakangas 430 EUB :
867 heikki.linnakangas 431 GIC 3489 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
432 :
433 : /* Extract options from the statement node tree */
697 tgl 434 3489 : ProcessCopyOptions(pstate, &cstate->opts, false /* is_from */ , options);
435 :
436 : /* Process the source/target relation or query */
867 heikki.linnakangas 437 3462 : if (rel)
867 heikki.linnakangas 438 ECB : {
867 heikki.linnakangas 439 GIC 3248 : Assert(!raw_query);
440 :
441 3248 : cstate->rel = rel;
442 :
443 3248 : tupDesc = RelationGetDescr(cstate->rel);
867 heikki.linnakangas 444 ECB : }
445 : else
446 : {
447 : List *rewritten;
448 : Query *query;
449 : PlannedStmt *plan;
450 : DestReceiver *dest;
451 :
867 heikki.linnakangas 452 GIC 214 : cstate->rel = NULL;
453 :
867 heikki.linnakangas 454 ECB : /*
455 : * Run parse analysis and rewrite. Note this also acquires sufficient
456 : * locks on the source table(s).
457 : */
401 peter 458 CBC 214 : rewritten = pg_analyze_and_rewrite_fixedparams(raw_query,
459 : pstate->p_sourcetext, NULL, 0,
332 tgl 460 ECB : NULL);
461 :
462 : /* check that we got back something we can work with */
867 heikki.linnakangas 463 GIC 208 : if (rewritten == NIL)
464 : {
465 9 : ereport(ERROR,
466 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
467 : errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
468 : }
867 heikki.linnakangas 469 CBC 199 : else if (list_length(rewritten) > 1)
470 : {
471 : ListCell *lc;
472 :
473 : /* examine queries to determine which error message to issue */
867 heikki.linnakangas 474 GIC 51 : foreach(lc, rewritten)
867 heikki.linnakangas 475 ECB : {
867 heikki.linnakangas 476 GIC 42 : Query *q = lfirst_node(Query, lc);
477 :
478 42 : if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
479 9 : ereport(ERROR,
867 heikki.linnakangas 480 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
481 : errmsg("conditional DO INSTEAD rules are not supported for COPY")));
867 heikki.linnakangas 482 CBC 33 : if (q->querySource == QSRC_NON_INSTEAD_RULE)
867 heikki.linnakangas 483 GIC 9 : ereport(ERROR,
484 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
485 : errmsg("DO ALSO rules are not supported for the COPY")));
867 heikki.linnakangas 486 ECB : }
487 :
867 heikki.linnakangas 488 GIC 9 : ereport(ERROR,
489 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
490 : errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
867 heikki.linnakangas 491 ECB : }
492 :
867 heikki.linnakangas 493 CBC 172 : query = linitial_node(Query, rewritten);
494 :
867 heikki.linnakangas 495 ECB : /* The grammar allows SELECT INTO, but we don't support that */
867 heikki.linnakangas 496 CBC 172 : if (query->utilityStmt != NULL &&
867 heikki.linnakangas 497 GIC 6 : IsA(query->utilityStmt, CreateTableAsStmt))
498 6 : ereport(ERROR,
867 heikki.linnakangas 499 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
500 : errmsg("COPY (SELECT INTO) is not supported")));
501 :
867 heikki.linnakangas 502 GIC 166 : Assert(query->utilityStmt == NULL);
503 :
504 : /*
867 heikki.linnakangas 505 ECB : * Similarly the grammar doesn't enforce the presence of a RETURNING
506 : * clause, but this is required here.
507 : */
867 heikki.linnakangas 508 GIC 166 : if (query->commandType != CMD_SELECT &&
509 41 : query->returningList == NIL)
867 heikki.linnakangas 510 ECB : {
867 heikki.linnakangas 511 GIC 9 : Assert(query->commandType == CMD_INSERT ||
512 : query->commandType == CMD_UPDATE ||
867 heikki.linnakangas 513 ECB : query->commandType == CMD_DELETE);
514 :
867 heikki.linnakangas 515 CBC 9 : ereport(ERROR,
516 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
517 : errmsg("COPY query must have a RETURNING clause")));
518 : }
867 heikki.linnakangas 519 ECB :
520 : /* plan the query */
867 heikki.linnakangas 521 GIC 157 : plan = pg_plan_query(query, pstate->p_sourcetext,
522 : CURSOR_OPT_PARALLEL_OK, NULL);
523 :
524 : /*
718 peter 525 ECB : * With row-level security and a user using "COPY relation TO", we
867 heikki.linnakangas 526 : * have to convert the "COPY relation TO" to a query-based COPY (eg:
527 : * "COPY (SELECT * FROM ONLY relation) TO"), to allow the rewriter to
30 tgl 528 : * add in any RLS clauses.
529 : *
530 : * When this happens, we are passed in the relid of the originally
531 : * found relation (which we have locked). As the planner will look up
867 heikki.linnakangas 532 : * the relation again, we double-check here to make sure it found the
533 : * same one that we have locked.
534 : */
867 heikki.linnakangas 535 GIC 156 : if (queryRelId != InvalidOid)
536 : {
537 : /*
867 heikki.linnakangas 538 ECB : * Note that with RLS involved there may be multiple relations,
539 : * and while the one we need is almost certainly first, we don't
540 : * make any guarantees of that in the planner, so check the whole
541 : * list and make sure we find the original relation.
542 : */
867 heikki.linnakangas 543 GIC 27 : if (!list_member_oid(plan->relationOids, queryRelId))
867 heikki.linnakangas 544 UIC 0 : ereport(ERROR,
545 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
546 : errmsg("relation referenced by COPY statement has changed")));
547 : }
548 :
549 : /*
550 : * Use a snapshot with an updated command ID to ensure this query sees
551 : * results of any previously executed queries.
867 heikki.linnakangas 552 ECB : */
867 heikki.linnakangas 553 GIC 156 : PushCopiedSnapshot(GetActiveSnapshot());
554 156 : UpdateActiveSnapshotCommandId();
555 :
556 : /* Create dest receiver for COPY OUT */
557 156 : dest = CreateDestReceiver(DestCopyOut);
558 156 : ((DR_copy *) dest)->cstate = cstate;
559 :
867 heikki.linnakangas 560 ECB : /* Create a QueryDesc requesting no output */
867 heikki.linnakangas 561 GBC 156 : cstate->queryDesc = CreateQueryDesc(plan, pstate->p_sourcetext,
562 : GetActiveSnapshot(),
563 : InvalidSnapshot,
564 : dest, NULL, NULL, 0);
565 :
566 : /*
567 : * Call ExecutorStart to prepare the plan for execution.
568 : *
569 : * ExecutorStart computes a result tupdesc for us
867 heikki.linnakangas 570 ECB : */
867 heikki.linnakangas 571 CBC 156 : ExecutorStart(cstate->queryDesc, 0);
572 :
867 heikki.linnakangas 573 GIC 153 : tupDesc = cstate->queryDesc->tupDesc;
867 heikki.linnakangas 574 ECB : }
575 :
576 : /* Generate or convert list of attributes to process */
867 heikki.linnakangas 577 GIC 3401 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
867 heikki.linnakangas 578 ECB :
867 heikki.linnakangas 579 GIC 3401 : num_phys_attrs = tupDesc->natts;
580 :
581 : /* Convert FORCE_QUOTE name list to per-column flags, check validity */
582 3401 : cstate->opts.force_quote_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
583 3401 : if (cstate->opts.force_quote_all)
584 : {
585 : int i;
586 :
587 27 : for (i = 0; i < num_phys_attrs; i++)
867 heikki.linnakangas 588 CBC 18 : cstate->opts.force_quote_flags[i] = true;
589 : }
590 3392 : else if (cstate->opts.force_quote)
591 : {
592 : List *attnums;
593 : ListCell *cur;
867 heikki.linnakangas 594 ECB :
867 heikki.linnakangas 595 GIC 12 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_quote);
867 heikki.linnakangas 596 ECB :
867 heikki.linnakangas 597 GIC 24 : foreach(cur, attnums)
598 : {
867 heikki.linnakangas 599 CBC 12 : int attnum = lfirst_int(cur);
600 12 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
601 :
867 heikki.linnakangas 602 GIC 12 : if (!list_member_int(cstate->attnumlist, attnum))
867 heikki.linnakangas 603 UIC 0 : ereport(ERROR,
867 heikki.linnakangas 604 ECB : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
605 : errmsg("FORCE_QUOTE column \"%s\" not referenced by COPY",
606 : NameStr(attr->attname))));
867 heikki.linnakangas 607 CBC 12 : cstate->opts.force_quote_flags[attnum - 1] = true;
608 : }
609 : }
610 :
867 heikki.linnakangas 611 ECB : /* Use client encoding when ENCODING option is not specified. */
867 heikki.linnakangas 612 CBC 3401 : if (cstate->opts.file_encoding < 0)
867 heikki.linnakangas 613 GBC 3398 : cstate->file_encoding = pg_get_client_encoding();
614 : else
867 heikki.linnakangas 615 GIC 3 : cstate->file_encoding = cstate->opts.file_encoding;
616 :
867 heikki.linnakangas 617 ECB : /*
618 : * Set up encoding conversion info. Even if the file and server encodings
619 : * are the same, we must apply pg_any_to_server() to validate data in
620 : * multibyte encodings.
621 : */
867 heikki.linnakangas 622 GBC 3401 : cstate->need_transcoding =
623 6798 : (cstate->file_encoding != GetDatabaseEncoding() ||
624 3397 : pg_database_encoding_max_length() > 1);
867 heikki.linnakangas 625 EUB : /* See Multibyte encoding comment above */
867 heikki.linnakangas 626 GIC 3401 : cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
627 :
628 3401 : cstate->copy_dest = COPY_FILE; /* default */
629 :
180 michael 630 GNC 3401 : if (data_dest_cb)
631 : {
632 1 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
633 1 : cstate->copy_dest = COPY_CALLBACK;
634 1 : cstate->data_dest_cb = data_dest_cb;
635 : }
636 3400 : else if (pipe)
637 : {
761 michael 638 GIC 3381 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
639 :
867 heikki.linnakangas 640 3381 : Assert(!is_program); /* the grammar does not allow this */
867 heikki.linnakangas 641 CBC 3381 : if (whereToSendOutput != DestRemote)
867 heikki.linnakangas 642 UIC 0 : cstate->copy_file = stdout;
643 : }
644 : else
645 : {
867 heikki.linnakangas 646 GIC 19 : cstate->filename = pstrdup(filename);
867 heikki.linnakangas 647 CBC 19 : cstate->is_program = is_program;
867 heikki.linnakangas 648 EUB :
867 heikki.linnakangas 649 GIC 19 : if (is_program)
650 : {
761 michael 651 UIC 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
867 heikki.linnakangas 652 LBC 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
653 0 : if (cstate->copy_file == NULL)
867 heikki.linnakangas 654 UIC 0 : ereport(ERROR,
867 heikki.linnakangas 655 ECB : (errcode_for_file_access(),
656 : errmsg("could not execute command \"%s\": %m",
867 heikki.linnakangas 657 EUB : cstate->filename)));
658 : }
867 heikki.linnakangas 659 ECB : else
660 : {
661 : mode_t oumask; /* Pre-existing umask value */
662 : struct stat st;
663 :
761 michael 664 GIC 19 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
761 michael 665 EUB :
666 : /*
867 heikki.linnakangas 667 : * Prevent write to relative path ... too easy to shoot oneself in
668 : * the foot by overwriting a database file ...
669 : */
867 heikki.linnakangas 670 GIC 19 : if (!is_absolute_path(filename))
867 heikki.linnakangas 671 UIC 0 : ereport(ERROR,
672 : (errcode(ERRCODE_INVALID_NAME),
673 : errmsg("relative path not allowed for COPY to file")));
674 :
867 heikki.linnakangas 675 GIC 19 : oumask = umask(S_IWGRP | S_IWOTH);
867 heikki.linnakangas 676 CBC 19 : PG_TRY();
867 heikki.linnakangas 677 EUB : {
867 heikki.linnakangas 678 GIC 19 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
679 : }
867 heikki.linnakangas 680 UIC 0 : PG_FINALLY();
681 : {
867 heikki.linnakangas 682 CBC 19 : umask(oumask);
867 heikki.linnakangas 683 EUB : }
867 heikki.linnakangas 684 GIC 19 : PG_END_TRY();
685 19 : if (cstate->copy_file == NULL)
686 : {
687 : /* copy errno because ereport subfunctions might change it */
867 heikki.linnakangas 688 UIC 0 : int save_errno = errno;
689 :
867 heikki.linnakangas 690 LBC 0 : ereport(ERROR,
867 heikki.linnakangas 691 ECB : (errcode_for_file_access(),
692 : errmsg("could not open file \"%s\" for writing: %m",
693 : cstate->filename),
694 : (save_errno == ENOENT || save_errno == EACCES) ?
695 : errhint("COPY TO instructs the PostgreSQL server process to write a file. "
696 : "You may want a client-side facility such as psql's \\copy.") : 0));
697 : }
698 :
867 heikki.linnakangas 699 GIC 19 : if (fstat(fileno(cstate->copy_file), &st))
867 heikki.linnakangas 700 UIC 0 : ereport(ERROR,
701 : (errcode_for_file_access(),
702 : errmsg("could not stat file \"%s\": %m",
703 : cstate->filename)));
704 :
867 heikki.linnakangas 705 CBC 19 : if (S_ISDIR(st.st_mode))
867 heikki.linnakangas 706 UIC 0 : ereport(ERROR,
867 heikki.linnakangas 707 ECB : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
708 : errmsg("\"%s\" is a directory", cstate->filename)));
709 : }
710 : }
711 :
823 tomas.vondra 712 : /* initialize progress */
823 tomas.vondra 713 CBC 3401 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
823 tomas.vondra 714 GIC 3401 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
761 michael 715 3401 : pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
716 :
823 tomas.vondra 717 CBC 3401 : cstate->bytes_processed = 0;
823 tomas.vondra 718 ECB :
867 heikki.linnakangas 719 GIC 3401 : MemoryContextSwitchTo(oldcontext);
720 :
721 3401 : return cstate;
722 : }
723 :
724 : /*
725 : * Clean up storage and release resources for COPY TO.
867 heikki.linnakangas 726 ECB : */
727 : void
867 heikki.linnakangas 728 CBC 3400 : EndCopyTo(CopyToState cstate)
867 heikki.linnakangas 729 ECB : {
867 heikki.linnakangas 730 GIC 3400 : if (cstate->queryDesc != NULL)
731 : {
732 : /* Close down the query and free resources. */
733 153 : ExecutorFinish(cstate->queryDesc);
734 153 : ExecutorEnd(cstate->queryDesc);
867 heikki.linnakangas 735 CBC 153 : FreeQueryDesc(cstate->queryDesc);
736 153 : PopActiveSnapshot();
737 : }
867 heikki.linnakangas 738 ECB :
739 : /* Clean up storage */
867 heikki.linnakangas 740 GIC 3400 : EndCopy(cstate);
867 heikki.linnakangas 741 CBC 3400 : }
867 heikki.linnakangas 742 ECB :
743 : /*
744 : * Copy from relation or query TO file.
745 : *
746 : * Returns the number of rows processed.
747 : */
766 748 : uint64
766 heikki.linnakangas 749 GIC 3401 : DoCopyTo(CopyToState cstate)
750 : {
180 michael 751 GNC 3401 : bool pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
766 heikki.linnakangas 752 CBC 3401 : bool fe_copy = (pipe && whereToSendOutput == DestRemote);
753 : TupleDesc tupDesc;
867 heikki.linnakangas 754 ECB : int num_phys_attrs;
755 : ListCell *cur;
756 : uint64 processed;
757 :
766 heikki.linnakangas 758 GIC 3401 : if (fe_copy)
766 heikki.linnakangas 759 CBC 3381 : SendCopyBegin(cstate);
766 heikki.linnakangas 760 ECB :
867 heikki.linnakangas 761 GIC 3401 : if (cstate->rel)
762 3248 : tupDesc = RelationGetDescr(cstate->rel);
763 : else
867 heikki.linnakangas 764 CBC 153 : tupDesc = cstate->queryDesc->tupDesc;
867 heikki.linnakangas 765 GIC 3401 : num_phys_attrs = tupDesc->natts;
697 tgl 766 3401 : cstate->opts.null_print_client = cstate->opts.null_print; /* default */
867 heikki.linnakangas 767 ECB :
768 : /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
867 heikki.linnakangas 769 GIC 3401 : cstate->fe_msgbuf = makeStringInfo();
770 :
771 : /* Get info about the columns we need to process. */
772 3401 : cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
773 15828 : foreach(cur, cstate->attnumlist)
774 : {
775 12428 : int attnum = lfirst_int(cur);
867 heikki.linnakangas 776 ECB : Oid out_func_oid;
777 : bool isvarlena;
867 heikki.linnakangas 778 GIC 12428 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
779 :
867 heikki.linnakangas 780 CBC 12428 : if (cstate->opts.binary)
867 heikki.linnakangas 781 GIC 31 : getTypeBinaryOutputInfo(attr->atttypid,
782 : &out_func_oid,
783 : &isvarlena);
784 : else
785 12397 : getTypeOutputInfo(attr->atttypid,
867 heikki.linnakangas 786 ECB : &out_func_oid,
787 : &isvarlena);
867 heikki.linnakangas 788 CBC 12427 : fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
867 heikki.linnakangas 789 ECB : }
790 :
791 : /*
792 : * Create a temporary memory context that we can reset once per row to
793 : * recover palloc'd memory. This avoids any problems with leaks inside
794 : * datatype output routines, and should be faster than retail pfree's
795 : * anyway. (We don't need a whole econtext as CopyFrom does.)
796 : */
867 heikki.linnakangas 797 GIC 3400 : cstate->rowcontext = AllocSetContextCreate(CurrentMemoryContext,
798 : "COPY TO",
799 : ALLOCSET_DEFAULT_SIZES);
867 heikki.linnakangas 800 ECB :
867 heikki.linnakangas 801 CBC 3400 : if (cstate->opts.binary)
802 : {
803 : /* Generate header for a binary copy */
804 : int32 tmp;
805 :
867 heikki.linnakangas 806 ECB : /* Signature */
867 heikki.linnakangas 807 GIC 7 : CopySendData(cstate, BinarySignature, 11);
867 heikki.linnakangas 808 ECB : /* Flags field */
867 heikki.linnakangas 809 GIC 7 : tmp = 0;
867 heikki.linnakangas 810 CBC 7 : CopySendInt32(cstate, tmp);
811 : /* No header extension */
812 7 : tmp = 0;
867 heikki.linnakangas 813 GIC 7 : CopySendInt32(cstate, tmp);
814 : }
867 heikki.linnakangas 815 ECB : else
816 : {
817 : /*
818 : * For non-binary copy, we need to convert null_print to file
819 : * encoding, because it will be sent directly with CopySendString.
820 : */
867 heikki.linnakangas 821 CBC 3393 : if (cstate->need_transcoding)
822 3390 : cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
697 tgl 823 ECB : cstate->opts.null_print_len,
824 : cstate->file_encoding);
867 heikki.linnakangas 825 :
826 : /* if a header has been requested send the line */
867 heikki.linnakangas 827 GIC 3393 : if (cstate->opts.header_line)
867 heikki.linnakangas 828 ECB : {
867 heikki.linnakangas 829 GIC 9 : bool hdr_delim = false;
830 :
831 27 : foreach(cur, cstate->attnumlist)
867 heikki.linnakangas 832 ECB : {
867 heikki.linnakangas 833 GIC 18 : int attnum = lfirst_int(cur);
834 : char *colname;
835 :
836 18 : if (hdr_delim)
867 heikki.linnakangas 837 CBC 9 : CopySendChar(cstate, cstate->opts.delim[0]);
838 18 : hdr_delim = true;
839 :
840 18 : colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
867 heikki.linnakangas 841 ECB :
436 peter 842 GIC 18 : if (cstate->opts.csv_mode)
436 peter 843 CBC 12 : CopyAttributeOutCSV(cstate, colname, false,
332 tgl 844 GIC 12 : list_length(cstate->attnumlist) == 1);
845 : else
436 peter 846 CBC 6 : CopyAttributeOutText(cstate, colname);
847 : }
848 :
867 heikki.linnakangas 849 9 : CopySendEndOfRow(cstate);
850 : }
851 : }
852 :
867 heikki.linnakangas 853 GIC 3400 : if (cstate->rel)
854 : {
867 heikki.linnakangas 855 ECB : TupleTableSlot *slot;
856 : TableScanDesc scandesc;
857 :
867 heikki.linnakangas 858 GIC 3247 : scandesc = table_beginscan(cstate->rel, GetActiveSnapshot(), 0, NULL);
867 heikki.linnakangas 859 CBC 3247 : slot = table_slot_create(cstate->rel, NULL);
867 heikki.linnakangas 860 ECB :
867 heikki.linnakangas 861 GIC 3247 : processed = 0;
862 1712220 : while (table_scan_getnextslot(scandesc, ForwardScanDirection, slot))
863 : {
864 1708973 : CHECK_FOR_INTERRUPTS();
867 heikki.linnakangas 865 ECB :
866 : /* Deconstruct the tuple ... */
867 heikki.linnakangas 867 GIC 1708973 : slot_getallattrs(slot);
868 :
867 heikki.linnakangas 869 ECB : /* Format and send the data */
867 heikki.linnakangas 870 GIC 1708973 : CopyOneRowTo(cstate, slot);
871 :
761 michael 872 ECB : /*
873 : * Increment the number of processed tuples, and report the
874 : * progress.
875 : */
761 michael 876 GIC 1708973 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
761 michael 877 ECB : ++processed);
878 : }
867 heikki.linnakangas 879 :
867 heikki.linnakangas 880 CBC 3247 : ExecDropSingleTupleTableSlot(slot);
867 heikki.linnakangas 881 GIC 3247 : table_endscan(scandesc);
867 heikki.linnakangas 882 ECB : }
883 : else
884 : {
885 : /* run the plan --- the dest receiver will send tuples */
11 peter 886 GNC 153 : ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0, true);
867 heikki.linnakangas 887 GIC 153 : processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
888 : }
867 heikki.linnakangas 889 ECB :
867 heikki.linnakangas 890 GIC 3400 : if (cstate->opts.binary)
867 heikki.linnakangas 891 ECB : {
892 : /* Generate trailer for a binary copy */
867 heikki.linnakangas 893 GIC 7 : CopySendInt16(cstate, -1);
894 : /* Need to flush out the trailer */
895 7 : CopySendEndOfRow(cstate);
896 : }
867 heikki.linnakangas 897 ECB :
867 heikki.linnakangas 898 CBC 3400 : MemoryContextDelete(cstate->rowcontext);
899 :
766 900 3400 : if (fe_copy)
766 heikki.linnakangas 901 GIC 3380 : SendCopyEnd(cstate);
902 :
867 heikki.linnakangas 903 CBC 3400 : return processed;
904 : }
905 :
906 : /*
766 heikki.linnakangas 907 ECB : * Emit one row during DoCopyTo().
908 : */
867 909 : static void
867 heikki.linnakangas 910 GIC 1712423 : CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
867 heikki.linnakangas 911 ECB : {
867 heikki.linnakangas 912 CBC 1712423 : bool need_delim = false;
913 1712423 : FmgrInfo *out_functions = cstate->out_functions;
914 : MemoryContext oldcontext;
867 heikki.linnakangas 915 ECB : ListCell *cur;
916 : char *string;
917 :
867 heikki.linnakangas 918 CBC 1712423 : MemoryContextReset(cstate->rowcontext);
919 1712423 : oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
920 :
867 heikki.linnakangas 921 GIC 1712423 : if (cstate->opts.binary)
867 heikki.linnakangas 922 ECB : {
923 : /* Binary per-tuple header */
867 heikki.linnakangas 924 CBC 16 : CopySendInt16(cstate, list_length(cstate->attnumlist));
867 heikki.linnakangas 925 ECB : }
926 :
927 : /* Make sure the tuple is fully deconstructed */
867 heikki.linnakangas 928 GIC 1712423 : slot_getallattrs(slot);
929 :
930 6744349 : foreach(cur, cstate->attnumlist)
867 heikki.linnakangas 931 ECB : {
867 heikki.linnakangas 932 GIC 5031926 : int attnum = lfirst_int(cur);
867 heikki.linnakangas 933 CBC 5031926 : Datum value = slot->tts_values[attnum - 1];
867 heikki.linnakangas 934 GIC 5031926 : bool isnull = slot->tts_isnull[attnum - 1];
867 heikki.linnakangas 935 ECB :
867 heikki.linnakangas 936 CBC 5031926 : if (!cstate->opts.binary)
867 heikki.linnakangas 937 ECB : {
867 heikki.linnakangas 938 CBC 5031846 : if (need_delim)
867 heikki.linnakangas 939 GIC 3319509 : CopySendChar(cstate, cstate->opts.delim[0]);
867 heikki.linnakangas 940 CBC 5031846 : need_delim = true;
941 : }
942 :
867 heikki.linnakangas 943 GIC 5031926 : if (isnull)
944 : {
945 258009 : if (!cstate->opts.binary)
867 heikki.linnakangas 946 CBC 257994 : CopySendString(cstate, cstate->opts.null_print_client);
947 : else
948 15 : CopySendInt32(cstate, -1);
867 heikki.linnakangas 949 ECB : }
950 : else
951 : {
867 heikki.linnakangas 952 GIC 4773917 : if (!cstate->opts.binary)
953 : {
954 4773852 : string = OutputFunctionCall(&out_functions[attnum - 1],
867 heikki.linnakangas 955 ECB : value);
867 heikki.linnakangas 956 GIC 4773852 : if (cstate->opts.csv_mode)
867 heikki.linnakangas 957 CBC 285 : CopyAttributeOutCSV(cstate, string,
958 285 : cstate->opts.force_quote_flags[attnum - 1],
867 heikki.linnakangas 959 GIC 285 : list_length(cstate->attnumlist) == 1);
960 : else
961 4773567 : CopyAttributeOutText(cstate, string);
962 : }
963 : else
964 : {
965 : bytea *outputbytes;
966 :
967 65 : outputbytes = SendFunctionCall(&out_functions[attnum - 1],
968 : value);
969 65 : CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
867 heikki.linnakangas 970 CBC 65 : CopySendData(cstate, VARDATA(outputbytes),
867 heikki.linnakangas 971 GIC 65 : VARSIZE(outputbytes) - VARHDRSZ);
972 : }
973 : }
974 : }
867 heikki.linnakangas 975 ECB :
867 heikki.linnakangas 976 GIC 1712423 : CopySendEndOfRow(cstate);
867 heikki.linnakangas 977 ECB :
867 heikki.linnakangas 978 CBC 1712423 : MemoryContextSwitchTo(oldcontext);
867 heikki.linnakangas 979 GIC 1712423 : }
867 heikki.linnakangas 980 EUB :
981 : /*
982 : * Send text representation of one attribute, with conversion and escaping
983 : */
984 : #define DUMPSOFAR() \
985 : do { \
986 : if (ptr > start) \
987 : CopySendData(cstate, start, ptr - start); \
988 : } while (0)
989 :
990 : static void
436 peter 991 GIC 4773573 : CopyAttributeOutText(CopyToState cstate, const char *string)
992 : {
993 : const char *ptr;
994 : const char *start;
995 : char c;
867 heikki.linnakangas 996 CBC 4773573 : char delimc = cstate->opts.delim[0];
997 :
867 heikki.linnakangas 998 GBC 4773573 : if (cstate->need_transcoding)
999 4773573 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1000 : else
867 heikki.linnakangas 1001 UBC 0 : ptr = string;
1002 :
1003 : /*
1004 : * We have to grovel through the string searching for control characters
1005 : * and instances of the delimiter character. In most cases, though, these
1006 : * are infrequent. To avoid overhead from calling CopySendData once per
1007 : * character, we dump out all characters between escaped characters in a
1008 : * single call. The loop invariant is that the data from "start" to "ptr"
1009 : * can be sent literally, but hasn't yet been.
867 heikki.linnakangas 1010 EUB : *
1011 : * We can skip pg_encoding_mblen() overhead when encoding is safe, because
1012 : * in valid backend encodings, extra bytes of a multibyte character never
1013 : * look like ASCII. This loop is sufficiently performance-critical that
1014 : * it's worth making two copies of it to get the IS_HIGHBIT_SET() test out
1015 : * of the normal safe-encoding path.
1016 : */
867 heikki.linnakangas 1017 GBC 4773573 : if (cstate->encoding_embeds_ascii)
867 heikki.linnakangas 1018 EUB : {
867 heikki.linnakangas 1019 UBC 0 : start = ptr;
1020 0 : while ((c = *ptr) != '\0')
867 heikki.linnakangas 1021 EUB : {
867 heikki.linnakangas 1022 UBC 0 : if ((unsigned char) c < (unsigned char) 0x20)
867 heikki.linnakangas 1023 EUB : {
1024 : /*
1025 : * \r and \n must be escaped, the others are traditional. We
1026 : * prefer to dump these using the C-like notation, rather than
1027 : * a backslash and the literal character, because it makes the
1028 : * dump file a bit more proof against Microsoftish data
1029 : * mangling.
1030 : */
867 heikki.linnakangas 1031 UIC 0 : switch (c)
867 heikki.linnakangas 1032 EUB : {
867 heikki.linnakangas 1033 UBC 0 : case '\b':
867 heikki.linnakangas 1034 UIC 0 : c = 'b';
867 heikki.linnakangas 1035 UBC 0 : break;
1036 0 : case '\f':
867 heikki.linnakangas 1037 UIC 0 : c = 'f';
1038 0 : break;
867 heikki.linnakangas 1039 UBC 0 : case '\n':
1040 0 : c = 'n';
1041 0 : break;
1042 0 : case '\r':
867 heikki.linnakangas 1043 UIC 0 : c = 'r';
867 heikki.linnakangas 1044 UBC 0 : break;
867 heikki.linnakangas 1045 UIC 0 : case '\t':
867 heikki.linnakangas 1046 UBC 0 : c = 't';
1047 0 : break;
1048 0 : case '\v':
867 heikki.linnakangas 1049 UIC 0 : c = 'v';
867 heikki.linnakangas 1050 UBC 0 : break;
1051 0 : default:
1052 : /* If it's the delimiter, must backslash it */
1053 0 : if (c == delimc)
867 heikki.linnakangas 1054 UIC 0 : break;
1055 : /* All ASCII control chars are length 1 */
1056 0 : ptr++;
1057 0 : continue; /* fall to end of loop */
867 heikki.linnakangas 1058 ECB : }
1059 : /* if we get here, we need to convert the control char */
867 heikki.linnakangas 1060 UIC 0 : DUMPSOFAR();
867 heikki.linnakangas 1061 LBC 0 : CopySendChar(cstate, '\\');
867 heikki.linnakangas 1062 UIC 0 : CopySendChar(cstate, c);
1063 0 : start = ++ptr; /* do not include char in next run */
1064 : }
1065 0 : else if (c == '\\' || c == delimc)
1066 : {
1067 0 : DUMPSOFAR();
1068 0 : CopySendChar(cstate, '\\');
1069 0 : start = ptr++; /* we include char in next run */
867 heikki.linnakangas 1070 ECB : }
867 heikki.linnakangas 1071 UIC 0 : else if (IS_HIGHBIT_SET(c))
867 heikki.linnakangas 1072 UBC 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
867 heikki.linnakangas 1073 EUB : else
867 heikki.linnakangas 1074 UBC 0 : ptr++;
867 heikki.linnakangas 1075 EUB : }
1076 : }
1077 : else
867 heikki.linnakangas 1078 ECB : {
867 heikki.linnakangas 1079 CBC 4773573 : start = ptr;
1080 48545480 : while ((c = *ptr) != '\0')
867 heikki.linnakangas 1081 EUB : {
867 heikki.linnakangas 1082 GBC 43771907 : if ((unsigned char) c < (unsigned char) 0x20)
867 heikki.linnakangas 1083 EUB : {
867 heikki.linnakangas 1084 ECB : /*
1085 : * \r and \n must be escaped, the others are traditional. We
1086 : * prefer to dump these using the C-like notation, rather than
867 heikki.linnakangas 1087 EUB : * a backslash and the literal character, because it makes the
1088 : * dump file a bit more proof against Microsoftish data
1089 : * mangling.
1090 : */
867 heikki.linnakangas 1091 GIC 2267 : switch (c)
867 heikki.linnakangas 1092 EUB : {
867 heikki.linnakangas 1093 UBC 0 : case '\b':
867 heikki.linnakangas 1094 UIC 0 : c = 'b';
867 heikki.linnakangas 1095 UBC 0 : break;
1096 0 : case '\f':
867 heikki.linnakangas 1097 UIC 0 : c = 'f';
1098 0 : break;
867 heikki.linnakangas 1099 CBC 2264 : case '\n':
1100 2264 : c = 'n';
1101 2264 : break;
867 heikki.linnakangas 1102 LBC 0 : case '\r':
867 heikki.linnakangas 1103 UIC 0 : c = 'r';
867 heikki.linnakangas 1104 LBC 0 : break;
867 heikki.linnakangas 1105 GIC 3 : case '\t':
867 heikki.linnakangas 1106 CBC 3 : c = 't';
1107 3 : break;
867 heikki.linnakangas 1108 LBC 0 : case '\v':
867 heikki.linnakangas 1109 UIC 0 : c = 'v';
1110 0 : break;
867 heikki.linnakangas 1111 LBC 0 : default:
1112 : /* If it's the delimiter, must backslash it */
867 heikki.linnakangas 1113 UIC 0 : if (c == delimc)
1114 0 : break;
867 heikki.linnakangas 1115 ECB : /* All ASCII control chars are length 1 */
867 heikki.linnakangas 1116 LBC 0 : ptr++;
867 heikki.linnakangas 1117 UIC 0 : continue; /* fall to end of loop */
1118 : }
1119 : /* if we get here, we need to convert the control char */
867 heikki.linnakangas 1120 GIC 2267 : DUMPSOFAR();
1121 2267 : CopySendChar(cstate, '\\');
1122 2267 : CopySendChar(cstate, c);
867 heikki.linnakangas 1123 CBC 2267 : start = ++ptr; /* do not include char in next run */
1124 : }
867 heikki.linnakangas 1125 GIC 43769640 : else if (c == '\\' || c == delimc)
1126 : {
1127 1196 : DUMPSOFAR();
1128 1196 : CopySendChar(cstate, '\\');
867 heikki.linnakangas 1129 CBC 1196 : start = ptr++; /* we include char in next run */
867 heikki.linnakangas 1130 ECB : }
1131 : else
867 heikki.linnakangas 1132 GIC 43768444 : ptr++;
1133 : }
867 heikki.linnakangas 1134 ECB : }
1135 :
867 heikki.linnakangas 1136 GIC 4773573 : DUMPSOFAR();
867 heikki.linnakangas 1137 CBC 4773573 : }
867 heikki.linnakangas 1138 ECB :
1139 : /*
867 heikki.linnakangas 1140 EUB : * Send text representation of one attribute, with conversion and
1141 : * CSV-style escaping
1142 : */
1143 : static void
436 peter 1144 GIC 297 : CopyAttributeOutCSV(CopyToState cstate, const char *string,
867 heikki.linnakangas 1145 ECB : bool use_quote, bool single_attr)
1146 : {
1147 : const char *ptr;
1148 : const char *start;
1149 : char c;
867 heikki.linnakangas 1150 GIC 297 : char delimc = cstate->opts.delim[0];
867 heikki.linnakangas 1151 CBC 297 : char quotec = cstate->opts.quote[0];
1152 297 : char escapec = cstate->opts.escape[0];
1153 :
1154 : /* force quoting if it matches null_print (before conversion!) */
1155 297 : if (!use_quote && strcmp(string, cstate->opts.null_print) == 0)
867 heikki.linnakangas 1156 GIC 27 : use_quote = true;
867 heikki.linnakangas 1157 ECB :
867 heikki.linnakangas 1158 GIC 297 : if (cstate->need_transcoding)
867 heikki.linnakangas 1159 CBC 297 : ptr = pg_server_to_any(string, strlen(string), cstate->file_encoding);
1160 : else
867 heikki.linnakangas 1161 LBC 0 : ptr = string;
867 heikki.linnakangas 1162 ECB :
1163 : /*
1164 : * Make a preliminary pass to discover if it needs quoting
867 heikki.linnakangas 1165 EUB : */
867 heikki.linnakangas 1166 GIC 297 : if (!use_quote)
867 heikki.linnakangas 1167 ECB : {
1168 : /*
1169 : * Because '\.' can be a data value, quote it if it appears alone on a
1170 : * line so it is not interpreted as the end-of-data marker.
1171 : */
867 heikki.linnakangas 1172 CBC 204 : if (single_attr && strcmp(ptr, "\\.") == 0)
867 heikki.linnakangas 1173 GIC 3 : use_quote = true;
867 heikki.linnakangas 1174 ECB : else
1175 : {
436 peter 1176 GIC 201 : const char *tptr = ptr;
1177 :
867 heikki.linnakangas 1178 1056 : while ((c = *tptr) != '\0')
867 heikki.linnakangas 1179 ECB : {
867 heikki.linnakangas 1180 CBC 921 : if (c == delimc || c == quotec || c == '\n' || c == '\r')
1181 : {
1182 66 : use_quote = true;
867 heikki.linnakangas 1183 GIC 66 : break;
867 heikki.linnakangas 1184 ECB : }
867 heikki.linnakangas 1185 CBC 855 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
867 heikki.linnakangas 1186 LBC 0 : tptr += pg_encoding_mblen(cstate->file_encoding, tptr);
1187 : else
867 heikki.linnakangas 1188 CBC 855 : tptr++;
867 heikki.linnakangas 1189 EUB : }
1190 : }
867 heikki.linnakangas 1191 ECB : }
1192 :
867 heikki.linnakangas 1193 CBC 297 : if (use_quote)
1194 : {
1195 162 : CopySendChar(cstate, quotec);
1196 :
1197 : /*
1198 : * We adopt the same optimization strategy as in CopyAttributeOutText
1199 : */
1200 162 : start = ptr;
867 heikki.linnakangas 1201 GIC 1269 : while ((c = *ptr) != '\0')
867 heikki.linnakangas 1202 ECB : {
867 heikki.linnakangas 1203 GIC 1107 : if (c == quotec || c == escapec)
1204 : {
1205 78 : DUMPSOFAR();
1206 78 : CopySendChar(cstate, escapec);
1207 78 : start = ptr; /* we include char in next run */
867 heikki.linnakangas 1208 ECB : }
867 heikki.linnakangas 1209 GIC 1107 : if (IS_HIGHBIT_SET(c) && cstate->encoding_embeds_ascii)
867 heikki.linnakangas 1210 UIC 0 : ptr += pg_encoding_mblen(cstate->file_encoding, ptr);
867 heikki.linnakangas 1211 ECB : else
867 heikki.linnakangas 1212 GIC 1107 : ptr++;
1213 : }
1214 162 : DUMPSOFAR();
1215 :
1216 162 : CopySendChar(cstate, quotec);
867 heikki.linnakangas 1217 ECB : }
1218 : else
1219 : {
1220 : /* If it doesn't need quoting, we can just dump it as-is */
867 heikki.linnakangas 1221 GIC 135 : CopySendString(cstate, ptr);
1222 : }
867 heikki.linnakangas 1223 CBC 297 : }
1224 :
1225 : /*
867 heikki.linnakangas 1226 ECB : * copy_dest_startup --- executor startup
1227 : */
1228 : static void
867 heikki.linnakangas 1229 CBC 153 : copy_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
1230 : {
1231 : /* no-op */
867 heikki.linnakangas 1232 GIC 153 : }
1233 :
1234 : /*
1235 : * copy_dest_receive --- receive one tuple
867 heikki.linnakangas 1236 ECB : */
1237 : static bool
867 heikki.linnakangas 1238 GIC 3450 : copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
867 heikki.linnakangas 1239 ECB : {
867 heikki.linnakangas 1240 GIC 3450 : DR_copy *myState = (DR_copy *) self;
697 tgl 1241 3450 : CopyToState cstate = myState->cstate;
1242 :
1243 : /* Send the data */
867 heikki.linnakangas 1244 3450 : CopyOneRowTo(cstate, slot);
823 tomas.vondra 1245 EUB :
1246 : /* Increment the number of processed tuples, and report the progress */
761 michael 1247 GBC 3450 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1248 3450 : ++myState->processed);
1249 :
867 heikki.linnakangas 1250 GIC 3450 : return true;
1251 : }
1252 :
1253 : /*
867 heikki.linnakangas 1254 ECB : * copy_dest_shutdown --- executor end
1255 : */
1256 : static void
867 heikki.linnakangas 1257 GIC 153 : copy_dest_shutdown(DestReceiver *self)
867 heikki.linnakangas 1258 ECB : {
1259 : /* no-op */
867 heikki.linnakangas 1260 CBC 153 : }
867 heikki.linnakangas 1261 ECB :
1262 : /*
1263 : * copy_dest_destroy --- release DestReceiver object
1264 : */
1265 : static void
867 heikki.linnakangas 1266 UIC 0 : copy_dest_destroy(DestReceiver *self)
867 heikki.linnakangas 1267 ECB : {
867 heikki.linnakangas 1268 UIC 0 : pfree(self);
1269 0 : }
1270 :
1271 : /*
1272 : * CreateCopyDestReceiver -- create a suitable DestReceiver object
1273 : */
1274 : DestReceiver *
867 heikki.linnakangas 1275 GIC 156 : CreateCopyDestReceiver(void)
1276 : {
1277 156 : DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));
1278 :
1279 156 : self->pub.receiveSlot = copy_dest_receive;
1280 156 : self->pub.rStartup = copy_dest_startup;
1281 156 : self->pub.rShutdown = copy_dest_shutdown;
1282 156 : self->pub.rDestroy = copy_dest_destroy;
1283 156 : self->pub.mydest = DestCopyOut;
1284 :
1285 156 : self->cstate = NULL; /* will be set later */
1286 156 : self->processed = 0;
1287 :
1288 156 : return (DestReceiver *) self;
1289 : }
|