Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * copyfrom.c
4 : : * COPY <table> FROM file/program/client
5 : : *
6 : : * This file contains routines needed to efficiently load tuples into a
7 : : * table. That includes looking up the correct partition, firing triggers,
8 : : * calling the table AM function to insert the data, and updating indexes.
9 : : * Reading data from the input file or client and parsing it into Datums
10 : : * is handled in copyfromparse.c.
11 : : *
12 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
13 : : * Portions Copyright (c) 1994, Regents of the University of California
14 : : *
15 : : *
16 : : * IDENTIFICATION
17 : : * src/backend/commands/copyfrom.c
18 : : *
19 : : *-------------------------------------------------------------------------
20 : : */
21 : : #include "postgres.h"
22 : :
23 : : #include <ctype.h>
24 : : #include <unistd.h>
25 : : #include <sys/stat.h>
26 : :
27 : : #include "access/heapam.h"
28 : : #include "access/tableam.h"
29 : : #include "access/xact.h"
30 : : #include "catalog/namespace.h"
31 : : #include "commands/copy.h"
32 : : #include "commands/copyfrom_internal.h"
33 : : #include "commands/progress.h"
34 : : #include "commands/trigger.h"
35 : : #include "executor/execPartition.h"
36 : : #include "executor/executor.h"
37 : : #include "executor/nodeModifyTable.h"
38 : : #include "executor/tuptable.h"
39 : : #include "foreign/fdwapi.h"
40 : : #include "mb/pg_wchar.h"
41 : : #include "miscadmin.h"
42 : : #include "nodes/miscnodes.h"
43 : : #include "optimizer/optimizer.h"
44 : : #include "pgstat.h"
45 : : #include "rewrite/rewriteHandler.h"
46 : : #include "storage/fd.h"
47 : : #include "tcop/tcopprot.h"
48 : : #include "utils/lsyscache.h"
49 : : #include "utils/memutils.h"
50 : : #include "utils/portal.h"
51 : : #include "utils/rel.h"
52 : : #include "utils/snapmgr.h"
53 : :
54 : : /*
55 : : * No more than this many tuples per CopyMultiInsertBuffer
56 : : *
57 : : * Caution: Don't make this too big, as we could end up with this many
58 : : * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
59 : : * multiInsertBuffers list. Increasing this can cause quadratic growth in
60 : : * memory requirements during copies into partitioned tables with a large
61 : : * number of partitions.
62 : : */
63 : : #define MAX_BUFFERED_TUPLES 1000
64 : :
65 : : /*
66 : : * Flush buffers if there are >= this many bytes, as counted by the input
67 : : * size, of tuples stored.
68 : : */
69 : : #define MAX_BUFFERED_BYTES 65535
70 : :
71 : : /* Trim the list of buffers back down to this number after flushing */
72 : : #define MAX_PARTITION_BUFFERS 32
73 : :
74 : : /* Stores multi-insert data related to a single relation in CopyFrom. */
75 : : typedef struct CopyMultiInsertBuffer
76 : : {
77 : : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
78 : : ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
79 : : BulkInsertState bistate; /* BulkInsertState for this rel if plain
80 : : * table; NULL if foreign table */
81 : : int nused; /* number of 'slots' containing tuples */
82 : : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
83 : : * stream */
84 : : } CopyMultiInsertBuffer;
85 : :
86 : : /*
87 : : * Stores one or many CopyMultiInsertBuffers and details about the size and
88 : : * number of tuples which are stored in them. This allows multiple buffers to
89 : : * exist at once when COPYing into a partitioned table.
90 : : */
91 : : typedef struct CopyMultiInsertInfo
92 : : {
93 : : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
94 : : int bufferedTuples; /* number of tuples buffered over all buffers */
95 : : int bufferedBytes; /* number of bytes from all buffered tuples */
96 : : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
97 : : EState *estate; /* Executor state used for COPY */
98 : : CommandId mycid; /* Command Id used for COPY */
99 : : int ti_options; /* table insert options */
100 : : } CopyMultiInsertInfo;
101 : :
102 : :
103 : : /* non-export function prototypes */
104 : : static void ClosePipeFromProgram(CopyFromState cstate);
105 : :
106 : : /*
107 : : * error context callback for COPY FROM
108 : : *
109 : : * The argument for the error context must be CopyFromState.
110 : : */
111 : : void
1238 heikki.linnakangas@i 112 :CBC 140 : CopyFromErrorCallback(void *arg)
113 : : {
1109 114 : 140 : CopyFromState cstate = (CopyFromState) arg;
115 : :
549 efujita@postgresql.o 116 [ + + ]: 140 : if (cstate->relname_only)
117 : : {
118 : 22 : errcontext("COPY %s",
119 : : cstate->cur_relname);
120 : 22 : return;
121 : : }
1238 heikki.linnakangas@i 122 [ + + ]: 118 : if (cstate->opts.binary)
123 : : {
124 : : /* can't usefully display the data */
125 [ + - ]: 1 : if (cstate->cur_attname)
755 tgl@sss.pgh.pa.us 126 : 1 : errcontext("COPY %s, line %llu, column %s",
127 : : cstate->cur_relname,
128 : 1 : (unsigned long long) cstate->cur_lineno,
129 : : cstate->cur_attname);
130 : : else
755 tgl@sss.pgh.pa.us 131 :UBC 0 : errcontext("COPY %s, line %llu",
132 : : cstate->cur_relname,
133 : 0 : (unsigned long long) cstate->cur_lineno);
134 : : }
135 : : else
136 : : {
1238 heikki.linnakangas@i 137 [ + + + + ]:CBC 117 : if (cstate->cur_attname && cstate->cur_attval)
138 : 16 : {
139 : : /* error is relevant to a particular column */
140 : : char *attval;
141 : :
13 msawada@postgresql.o 142 :GNC 16 : attval = CopyLimitPrintoutLength(cstate->cur_attval);
755 tgl@sss.pgh.pa.us 143 :CBC 16 : errcontext("COPY %s, line %llu, column %s: \"%s\"",
144 : : cstate->cur_relname,
145 : 16 : (unsigned long long) cstate->cur_lineno,
146 : : cstate->cur_attname,
147 : : attval);
1238 heikki.linnakangas@i 148 : 16 : pfree(attval);
149 : : }
150 [ + + ]: 101 : else if (cstate->cur_attname)
151 : : {
152 : : /* error is relevant to a particular column, value is NULL */
755 tgl@sss.pgh.pa.us 153 : 3 : errcontext("COPY %s, line %llu, column %s: null input",
154 : : cstate->cur_relname,
155 : 3 : (unsigned long long) cstate->cur_lineno,
156 : : cstate->cur_attname);
157 : : }
158 : : else
159 : : {
160 : : /*
161 : : * Error is relevant to a particular line.
162 : : *
163 : : * If line_buf still contains the correct line, print it.
164 : : */
1109 heikki.linnakangas@i 165 [ + + ]: 98 : if (cstate->line_buf_valid)
166 : : {
167 : : char *lineval;
168 : :
13 msawada@postgresql.o 169 :GNC 92 : lineval = CopyLimitPrintoutLength(cstate->line_buf.data);
755 tgl@sss.pgh.pa.us 170 :CBC 92 : errcontext("COPY %s, line %llu: \"%s\"",
171 : : cstate->cur_relname,
172 : 92 : (unsigned long long) cstate->cur_lineno, lineval);
1238 heikki.linnakangas@i 173 : 92 : pfree(lineval);
174 : : }
175 : : else
176 : : {
755 tgl@sss.pgh.pa.us 177 : 6 : errcontext("COPY %s, line %llu",
178 : : cstate->cur_relname,
179 : 6 : (unsigned long long) cstate->cur_lineno);
180 : : }
181 : : }
182 : : }
183 : : }
184 : :
185 : : /*
186 : : * Make sure we don't print an unreasonable amount of COPY data in a message.
187 : : *
188 : : * Returns a pstrdup'd copy of the input.
189 : : */
190 : : char *
13 msawada@postgresql.o 191 :GNC 126 : CopyLimitPrintoutLength(const char *str)
192 : : {
193 : : #define MAX_COPY_DATA_DISPLAY 100
194 : :
1238 heikki.linnakangas@i 195 :CBC 126 : int slen = strlen(str);
196 : : int len;
197 : : char *res;
198 : :
199 : : /* Fast path if definitely okay */
200 [ + - ]: 126 : if (slen <= MAX_COPY_DATA_DISPLAY)
201 : 126 : return pstrdup(str);
202 : :
203 : : /* Apply encoding-dependent truncation */
1238 heikki.linnakangas@i 204 :UBC 0 : len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
205 : :
206 : : /*
207 : : * Truncate, and add "..." to show we truncated the input.
208 : : */
209 : 0 : res = (char *) palloc(len + 4);
210 : 0 : memcpy(res, str, len);
211 : 0 : strcpy(res + len, "...");
212 : :
213 : 0 : return res;
214 : : }
215 : :
216 : : /*
217 : : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
218 : : * ResultRelInfo.
219 : : */
220 : : static CopyMultiInsertBuffer *
1238 heikki.linnakangas@i 221 :CBC 693 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
222 : : {
223 : : CopyMultiInsertBuffer *buffer;
224 : :
225 : 693 : buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
226 : 693 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
227 : 693 : buffer->resultRelInfo = rri;
549 efujita@postgresql.o 228 [ + + ]: 693 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
1238 heikki.linnakangas@i 229 : 693 : buffer->nused = 0;
230 : :
231 : 693 : return buffer;
232 : : }
233 : :
234 : : /*
235 : : * Make a new buffer for this ResultRelInfo.
236 : : */
237 : : static inline void
238 : 693 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
239 : : ResultRelInfo *rri)
240 : : {
241 : : CopyMultiInsertBuffer *buffer;
242 : :
243 : 693 : buffer = CopyMultiInsertBufferInit(rri);
244 : :
245 : : /* Setup back-link so we can easily find this buffer again */
246 : 693 : rri->ri_CopyMultiInsertBuffer = buffer;
247 : : /* Record that we're tracking this buffer */
248 : 693 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
249 : 693 : }
250 : :
251 : : /*
252 : : * Initialize an already allocated CopyMultiInsertInfo.
253 : : *
254 : : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
255 : : * for that table.
256 : : */
257 : : static void
258 : 687 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
259 : : CopyFromState cstate, EState *estate, CommandId mycid,
260 : : int ti_options)
261 : : {
262 : 687 : miinfo->multiInsertBuffers = NIL;
263 : 687 : miinfo->bufferedTuples = 0;
264 : 687 : miinfo->bufferedBytes = 0;
265 : 687 : miinfo->cstate = cstate;
266 : 687 : miinfo->estate = estate;
267 : 687 : miinfo->mycid = mycid;
268 : 687 : miinfo->ti_options = ti_options;
269 : :
270 : : /*
271 : : * Only setup the buffer when not dealing with a partitioned table.
272 : : * Buffers for partitioned tables will just be setup when we need to send
273 : : * tuples their way for the first time.
274 : : */
275 [ + + ]: 687 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
276 : 650 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
277 : 687 : }
278 : :
279 : : /*
280 : : * Returns true if the buffers are full
281 : : */
282 : : static inline bool
283 : 693217 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
284 : : {
285 [ + + ]: 693217 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
286 [ + + ]: 692620 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
287 : 650 : return true;
288 : 692567 : return false;
289 : : }
290 : :
291 : : /*
292 : : * Returns true if we have no buffered tuples
293 : : */
294 : : static inline bool
295 : 633 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
296 : : {
297 : 633 : return miinfo->bufferedTuples == 0;
298 : : }
299 : :
300 : : /*
301 : : * Write the tuples stored in 'buffer' out to the table.
302 : : */
303 : : static inline void
304 : 1266 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
305 : : CopyMultiInsertBuffer *buffer,
306 : : int64 *processed)
307 : : {
1109 308 : 1266 : CopyFromState cstate = miinfo->cstate;
1238 309 : 1266 : EState *estate = miinfo->estate;
310 : 1266 : int nused = buffer->nused;
311 : 1266 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
312 : 1266 : TupleTableSlot **slots = buffer->slots;
313 : : int i;
314 : :
549 efujita@postgresql.o 315 [ + + ]: 1266 : if (resultRelInfo->ri_FdwRoutine)
316 : : {
317 : 7 : int batch_size = resultRelInfo->ri_BatchSize;
318 : 7 : int sent = 0;
319 : :
320 [ - + ]: 7 : Assert(buffer->bistate == NULL);
321 : :
322 : : /* Ensure that the FDW supports batching and it's enabled */
323 [ - + ]: 7 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
324 [ - + ]: 7 : Assert(batch_size > 1);
325 : :
326 : : /*
327 : : * We suppress error context information other than the relation name,
328 : : * if one of the operations below fails.
329 : : */
330 [ - + ]: 7 : Assert(!cstate->relname_only);
331 : 7 : cstate->relname_only = true;
332 : :
333 [ + + ]: 19 : while (sent < nused)
334 : : {
335 : 13 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
336 : 13 : int inserted = size;
337 : : TupleTableSlot **rslots;
338 : :
339 : : /* insert into foreign table: let the FDW do it */
340 : : rslots =
341 : 13 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
342 : : resultRelInfo,
343 : 13 : &slots[sent],
344 : : NULL,
345 : : &inserted);
346 : :
347 : 12 : sent += size;
348 : :
349 : : /* No need to do anything if there are no inserted rows */
350 [ + + ]: 12 : if (inserted <= 0)
351 : 2 : continue;
352 : :
353 : : /* Triggers on foreign tables should not have transition tables */
354 [ - + - - ]: 10 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
355 : : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
356 : :
357 : : /* Run AFTER ROW INSERT triggers */
358 [ - + ]: 10 : if (resultRelInfo->ri_TrigDesc != NULL &&
549 efujita@postgresql.o 359 [ # # ]:UBC 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
360 : : {
361 : 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
362 : :
363 [ # # ]: 0 : for (i = 0; i < inserted; i++)
364 : : {
365 : 0 : TupleTableSlot *slot = rslots[i];
366 : :
367 : : /*
368 : : * AFTER ROW Triggers might reference the tableoid column,
369 : : * so (re-)initialize tts_tableOid before evaluating them.
370 : : */
371 : 0 : slot->tts_tableOid = relid;
372 : :
373 : 0 : ExecARInsertTriggers(estate, resultRelInfo,
374 : : slot, NIL,
375 : : cstate->transition_capture);
376 : : }
377 : : }
378 : :
379 : : /* Update the row counter and progress of the COPY command */
549 efujita@postgresql.o 380 :CBC 10 : *processed += inserted;
381 : 10 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
382 : : *processed);
383 : : }
384 : :
385 [ + + ]: 24 : for (i = 0; i < nused; i++)
386 : 18 : ExecClearTuple(slots[i]);
387 : :
388 : : /* reset relname_only */
389 : 6 : cstate->relname_only = false;
390 : : }
391 : : else
392 : : {
393 : 1259 : CommandId mycid = miinfo->mycid;
394 : 1259 : int ti_options = miinfo->ti_options;
395 : 1259 : bool line_buf_valid = cstate->line_buf_valid;
396 : 1259 : uint64 save_cur_lineno = cstate->cur_lineno;
397 : : MemoryContext oldcontext;
398 : :
399 [ - + ]: 1259 : Assert(buffer->bistate != NULL);
400 : :
401 : : /*
402 : : * Print error context information correctly, if one of the operations
403 : : * below fails.
404 : : */
405 : 1259 : cstate->line_buf_valid = false;
406 : :
407 : : /*
408 : : * table_multi_insert may leak memory, so switch to short-lived memory
409 : : * context before calling it.
410 : : */
411 [ + - ]: 1259 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
412 : 1259 : table_multi_insert(resultRelInfo->ri_RelationDesc,
413 : : slots,
414 : : nused,
415 : : mycid,
416 : : ti_options,
417 : : buffer->bistate);
418 : 1259 : MemoryContextSwitchTo(oldcontext);
419 : :
420 [ + + ]: 694400 : for (i = 0; i < nused; i++)
421 : : {
422 : : /*
423 : : * If there are any indexes, update them for all the inserted
424 : : * tuples, and run AFTER ROW INSERT triggers.
425 : : */
3 akorotkov@postgresql 426 [ + + ]: 693147 : if (resultRelInfo->ri_NumIndices > 0)
427 : : {
428 : : List *recheckIndexes;
429 : :
549 efujita@postgresql.o 430 : 200917 : cstate->cur_lineno = buffer->linenos[i];
431 : : recheckIndexes =
432 : 200917 : ExecInsertIndexTuples(resultRelInfo,
433 : : buffer->slots[i], estate, false,
434 : : false, NULL, NIL, false);
435 : 200911 : ExecARInsertTriggers(estate, resultRelInfo,
436 : 200911 : slots[i], recheckIndexes,
437 : : cstate->transition_capture);
438 : 200911 : list_free(recheckIndexes);
439 : : }
440 : :
441 : : /*
442 : : * There's no indexes, but see if we need to run AFTER ROW INSERT
443 : : * triggers anyway.
444 : : */
445 [ + + ]: 492230 : else if (resultRelInfo->ri_TrigDesc != NULL &&
446 [ + + ]: 39 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
447 [ + + ]: 30 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
448 : : {
449 : 18 : cstate->cur_lineno = buffer->linenos[i];
450 : 18 : ExecARInsertTriggers(estate, resultRelInfo,
451 : 18 : slots[i], NIL,
452 : : cstate->transition_capture);
453 : : }
454 : :
455 : 693141 : ExecClearTuple(slots[i]);
456 : : }
457 : :
458 : : /* Update the row counter and progress of the COPY command */
459 : 1253 : *processed += nused;
460 : 1253 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
461 : : *processed);
462 : :
463 : : /* reset cur_lineno and line_buf_valid to what they were */
464 : 1253 : cstate->line_buf_valid = line_buf_valid;
465 : 1253 : cstate->cur_lineno = save_cur_lineno;
466 : : }
467 : :
468 : : /* Mark that all slots are free */
1238 heikki.linnakangas@i 469 : 1259 : buffer->nused = 0;
470 : 1259 : }
471 : :
472 : : /*
473 : : * Drop used slots and free member for this buffer.
474 : : *
475 : : * The buffer must be flushed before cleanup.
476 : : */
477 : : static inline void
478 : 618 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
479 : : CopyMultiInsertBuffer *buffer)
480 : : {
549 efujita@postgresql.o 481 : 618 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
482 : : int i;
483 : :
484 : : /* Ensure buffer was flushed */
1238 heikki.linnakangas@i 485 [ - + ]: 618 : Assert(buffer->nused == 0);
486 : :
487 : : /* Remove back-link to ourself */
549 efujita@postgresql.o 488 : 618 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
489 : :
490 [ + + ]: 618 : if (resultRelInfo->ri_FdwRoutine == NULL)
491 : : {
492 [ - + ]: 612 : Assert(buffer->bistate != NULL);
493 : 612 : FreeBulkInsertState(buffer->bistate);
494 : : }
495 : : else
496 [ - + ]: 6 : Assert(buffer->bistate == NULL);
497 : :
498 : : /* Since we only create slots on demand, just drop the non-null ones. */
1238 heikki.linnakangas@i 499 [ + + + + ]: 114210 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
500 : 113592 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
501 : :
549 efujita@postgresql.o 502 [ + + ]: 618 : if (resultRelInfo->ri_FdwRoutine == NULL)
503 : 612 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
504 : : miinfo->ti_options);
505 : :
1238 heikki.linnakangas@i 506 : 618 : pfree(buffer);
507 : 618 : }
508 : :
509 : : /*
510 : : * Write out all stored tuples in all buffers out to the tables.
511 : : *
512 : : * Once flushed we also trim the tracked buffers list down to size by removing
513 : : * the buffers created earliest first.
514 : : *
515 : : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
516 : : * used. When cleaning up old buffers we'll never remove the one for
517 : : * 'curr_rri'.
518 : : */
519 : : static inline void
549 efujita@postgresql.o 520 : 1153 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
521 : : int64 *processed)
522 : : {
523 : : ListCell *lc;
524 : :
1238 heikki.linnakangas@i 525 [ + - + + : 2412 : foreach(lc, miinfo->multiInsertBuffers)
+ + ]
526 : : {
527 : 1266 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
528 : :
549 efujita@postgresql.o 529 : 1266 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
530 : : }
531 : :
1238 heikki.linnakangas@i 532 : 1146 : miinfo->bufferedTuples = 0;
533 : 1146 : miinfo->bufferedBytes = 0;
534 : :
535 : : /*
536 : : * Trim the list of tracked buffers down if it exceeds the limit. Here we
537 : : * remove buffers starting with the ones we created first. It seems less
538 : : * likely that these older ones will be needed than the ones that were
539 : : * just created.
540 : : */
541 [ - + ]: 1146 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
542 : : {
543 : : CopyMultiInsertBuffer *buffer;
544 : :
1238 heikki.linnakangas@i 545 :UBC 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
546 : :
547 : : /*
548 : : * We never want to remove the buffer that's currently being used, so
549 : : * if we happen to find that then move it to the end of the list.
550 : : */
551 [ # # ]: 0 : if (buffer->resultRelInfo == curr_rri)
552 : : {
553 : 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
554 : 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
555 : 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
556 : : }
557 : :
558 : 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
559 : 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
560 : : }
1238 heikki.linnakangas@i 561 :CBC 1146 : }
562 : :
563 : : /*
564 : : * Cleanup allocated buffers and free memory
565 : : */
566 : : static inline void
567 : 612 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
568 : : {
569 : : ListCell *lc;
570 : :
571 [ + + + + : 1230 : foreach(lc, miinfo->multiInsertBuffers)
+ + ]
572 : 618 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
573 : :
574 : 612 : list_free(miinfo->multiInsertBuffers);
575 : 612 : }
576 : :
577 : : /*
578 : : * Get the next TupleTableSlot that the next tuple should be stored in.
579 : : *
580 : : * Callers must ensure that the buffer is not full.
581 : : *
582 : : * Note: 'miinfo' is unused but has been included for consistency with the
583 : : * other functions in this area.
584 : : */
585 : : static inline TupleTableSlot *
586 : 693891 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
587 : : ResultRelInfo *rri)
588 : : {
589 : 693891 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
590 : 693891 : int nused = buffer->nused;
591 : :
592 [ - + ]: 693891 : Assert(buffer != NULL);
593 [ - + ]: 693891 : Assert(nused < MAX_BUFFERED_TUPLES);
594 : :
595 [ + + ]: 693891 : if (buffer->slots[nused] == NULL)
596 : 113724 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
597 : 693891 : return buffer->slots[nused];
598 : : }
599 : :
600 : : /*
601 : : * Record the previously reserved TupleTableSlot that was reserved by
602 : : * CopyMultiInsertInfoNextFreeSlot as being consumed.
603 : : */
604 : : static inline void
605 : 693217 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
606 : : TupleTableSlot *slot, int tuplen, uint64 lineno)
607 : : {
608 : 693217 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
609 : :
610 [ - + ]: 693217 : Assert(buffer != NULL);
611 [ - + ]: 693217 : Assert(slot == buffer->slots[buffer->nused]);
612 : :
613 : : /* Store the line number so we can properly report any errors later */
614 : 693217 : buffer->linenos[buffer->nused] = lineno;
615 : :
616 : : /* Record this slot as being used */
617 : 693217 : buffer->nused++;
618 : :
619 : : /* Update how many tuples are stored and their size */
620 : 693217 : miinfo->bufferedTuples++;
621 : 693217 : miinfo->bufferedBytes += tuplen;
622 : 693217 : }
623 : :
624 : : /*
625 : : * Copy FROM file to relation.
626 : : */
627 : : uint64
628 : 776 : CopyFrom(CopyFromState cstate)
629 : : {
630 : : ResultRelInfo *resultRelInfo;
631 : : ResultRelInfo *target_resultRelInfo;
632 : 776 : ResultRelInfo *prevResultRelInfo = NULL;
633 : 776 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
634 : : ModifyTableState *mtstate;
635 : : ExprContext *econtext;
636 : 776 : TupleTableSlot *singleslot = NULL;
637 : 776 : MemoryContext oldcontext = CurrentMemoryContext;
638 : :
639 : 776 : PartitionTupleRouting *proute = NULL;
640 : : ErrorContextCallback errcallback;
641 : 776 : CommandId mycid = GetCurrentCommandId(true);
642 : 776 : int ti_options = 0; /* start with default options for insert */
643 : 776 : BulkInsertState bistate = NULL;
644 : : CopyInsertMethod insertMethod;
645 : 776 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
1132 michael@paquier.xyz 646 : 776 : int64 processed = 0;
647 : 776 : int64 excluded = 0;
80 msawada@postgresql.o 648 :GNC 776 : int64 skipped = 0;
649 : : bool has_before_insert_row_trig;
650 : : bool has_instead_insert_row_trig;
1238 heikki.linnakangas@i 651 :CBC 776 : bool leafpart_use_multi_insert = false;
652 : :
653 [ - + ]: 776 : Assert(cstate->rel);
654 [ - + ]: 776 : Assert(list_length(cstate->range_table) == 1);
655 : :
86 akorotkov@postgresql 656 [ + + ]:GNC 776 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
89 657 [ - + ]: 15 : Assert(cstate->escontext);
658 : :
659 : : /*
660 : : * The target must be a plain, foreign, or partitioned relation, or have
661 : : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
662 : : * allowed on views, so we only hint about them in the view case.)
663 : : */
1238 heikki.linnakangas@i 664 [ + + ]:CBC 776 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
665 [ + + ]: 80 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
666 [ + + ]: 61 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
667 [ + + ]: 9 : !(cstate->rel->trigdesc &&
668 [ - + ]: 6 : cstate->rel->trigdesc->trig_insert_instead_row))
669 : : {
670 [ + - ]: 3 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
671 [ + - ]: 3 : ereport(ERROR,
672 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
673 : : errmsg("cannot copy to view \"%s\"",
674 : : RelationGetRelationName(cstate->rel)),
675 : : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
1238 heikki.linnakangas@i 676 [ # # ]:UBC 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
677 [ # # ]: 0 : ereport(ERROR,
678 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
679 : : errmsg("cannot copy to materialized view \"%s\"",
680 : : RelationGetRelationName(cstate->rel))));
681 [ # # ]: 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
682 [ # # ]: 0 : ereport(ERROR,
683 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
684 : : errmsg("cannot copy to sequence \"%s\"",
685 : : RelationGetRelationName(cstate->rel))));
686 : : else
687 [ # # ]: 0 : ereport(ERROR,
688 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
689 : : errmsg("cannot copy to non-table relation \"%s\"",
690 : : RelationGetRelationName(cstate->rel))));
691 : : }
692 : :
693 : : /*
694 : : * If the target file is new-in-transaction, we assume that checking FSM
695 : : * for free space is a waste of time. This could possibly be wrong, but
696 : : * it's unlikely.
697 : : */
1238 heikki.linnakangas@i 698 [ + + + - :CBC 773 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
+ - + - -
+ ]
699 [ + + ]: 696 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
648 rhaas@postgresql.org 700 [ + + ]: 690 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
1238 heikki.linnakangas@i 701 : 44 : ti_options |= TABLE_INSERT_SKIP_FSM;
702 : :
703 : : /*
704 : : * Optimize if new relation storage was created in this subxact or one of
705 : : * its committed children and we won't see those rows later as part of an
706 : : * earlier scan or command. The subxact test ensures that if this subxact
707 : : * aborts then the frozen rows won't be visible after xact cleanup. Note
708 : : * that the stronger test of exactly which subtransaction created it is
709 : : * crucial for correctness of this optimization. The test for an earlier
710 : : * scan or command tolerates false negatives. FREEZE causes other sessions
711 : : * to see rows they would not see under MVCC, and a false negative merely
712 : : * spreads that anomaly to the current session.
713 : : */
714 [ + + ]: 773 : if (cstate->opts.freeze)
715 : : {
716 : : /*
717 : : * We currently disallow COPY FREEZE on partitioned tables. The
718 : : * reason for this is that we've simply not yet opened the partitions
719 : : * to determine if the optimization can be applied to them. We could
720 : : * go and open them all here, but doing so may be quite a costly
721 : : * overhead for small copies. In any case, we may just end up routing
722 : : * tuples to a small number of partitions. It seems better just to
723 : : * raise an ERROR for partitioned tables.
724 : : */
725 [ + + ]: 33 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
726 : : {
727 [ + - ]: 3 : ereport(ERROR,
728 : : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
729 : : errmsg("cannot perform COPY FREEZE on a partitioned table")));
730 : : }
731 : :
732 : : /*
733 : : * Tolerate one registration for the benefit of FirstXactSnapshot.
734 : : * Scan-bearing queries generally create at least two registrations,
735 : : * though relying on that is fragile, as is ignoring ActiveSnapshot.
736 : : * Clear CatalogSnapshot to avoid counting its registration. We'll
737 : : * still detect ongoing catalog scans, each of which separately
738 : : * registers the snapshot it uses.
739 : : */
740 : 30 : InvalidateCatalogSnapshot();
741 [ + - - + ]: 30 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
1238 heikki.linnakangas@i 742 [ # # ]:UBC 0 : ereport(ERROR,
743 : : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
744 : : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
745 : :
1238 heikki.linnakangas@i 746 [ + - + + ]:CBC 60 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
648 rhaas@postgresql.org 747 : 30 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
1238 heikki.linnakangas@i 748 [ + - ]: 9 : ereport(ERROR,
749 : : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
750 : : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
751 : :
752 : 21 : ti_options |= TABLE_INSERT_FROZEN;
753 : : }
754 : :
755 : : /*
756 : : * We need a ResultRelInfo so we can use the regular executor's
757 : : * index-entry-making machinery. (There used to be a huge amount of code
758 : : * here that basically duplicated execUtils.c ...)
759 : : */
405 tgl@sss.pgh.pa.us 760 : 761 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
1238 heikki.linnakangas@i 761 : 761 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
762 : 761 : ExecInitResultRelation(estate, resultRelInfo, 1);
763 : :
764 : : /* Verify the named relation is a valid target for INSERT */
45 dean.a.rasheed@gmail 765 :GNC 761 : CheckValidResultRel(resultRelInfo, CMD_INSERT, NIL);
766 : :
1238 heikki.linnakangas@i 767 :CBC 760 : ExecOpenIndices(resultRelInfo, false);
768 : :
769 : : /*
770 : : * Set up a ModifyTableState so we can let FDW(s) init themselves for
771 : : * foreign-table result relation(s).
772 : : */
773 : 760 : mtstate = makeNode(ModifyTableState);
774 : 760 : mtstate->ps.plan = NULL;
775 : 760 : mtstate->ps.state = estate;
776 : 760 : mtstate->operation = CMD_INSERT;
1110 tgl@sss.pgh.pa.us 777 : 760 : mtstate->mt_nrels = 1;
1238 heikki.linnakangas@i 778 : 760 : mtstate->resultRelInfo = resultRelInfo;
1161 779 : 760 : mtstate->rootResultRelInfo = resultRelInfo;
780 : :
1238 781 [ + + ]: 760 : if (resultRelInfo->ri_FdwRoutine != NULL &&
782 [ + - ]: 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
783 : 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
784 : : resultRelInfo);
785 : :
786 : : /*
787 : : * Also, if the named relation is a foreign table, determine if the FDW
788 : : * supports batch insert and determine the batch size (a FDW may support
789 : : * batching, but it may be disabled for the server/table).
790 : : *
791 : : * If the FDW does not support batching, we set the batch size to 1.
792 : : */
549 efujita@postgresql.o 793 [ + + ]: 760 : if (resultRelInfo->ri_FdwRoutine != NULL &&
794 [ + - ]: 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
795 [ + - ]: 18 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
796 : 18 : resultRelInfo->ri_BatchSize =
797 : 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
798 : : else
799 : 742 : resultRelInfo->ri_BatchSize = 1;
800 : :
801 [ - + ]: 760 : Assert(resultRelInfo->ri_BatchSize >= 1);
802 : :
803 : : /* Prepare to catch AFTER triggers. */
1238 heikki.linnakangas@i 804 : 760 : AfterTriggerBeginQuery();
805 : :
806 : : /*
807 : : * If there are any triggers with transition tables on the named relation,
808 : : * we need to be prepared to capture transition tuples.
809 : : *
810 : : * Because partition tuple routing would like to know about whether
811 : : * transition capture is active, we also set it in mtstate, which is
812 : : * passed to ExecFindPartition() below.
813 : : */
814 : 760 : cstate->transition_capture = mtstate->mt_transition_capture =
815 : 760 : MakeTransitionCaptureState(cstate->rel->trigdesc,
816 : 760 : RelationGetRelid(cstate->rel),
817 : : CMD_INSERT);
818 : :
819 : : /*
820 : : * If the named relation is a partitioned table, initialize state for
821 : : * CopyFrom tuple routing.
822 : : */
823 [ + + ]: 760 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
1104 tgl@sss.pgh.pa.us 824 : 49 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
825 : :
1238 heikki.linnakangas@i 826 [ + + ]: 760 : if (cstate->whereClause)
827 : 9 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
828 : : &mtstate->ps);
829 : :
830 : : /*
831 : : * It's generally more efficient to prepare a bunch of tuples for
832 : : * insertion, and insert them in one
833 : : * table_multi_insert()/ExecForeignBatchInsert() call, than call
834 : : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
835 : : * However, there are a number of reasons why we might not be able to do
836 : : * this. These are explained below.
837 : : */
838 [ + + ]: 760 : if (resultRelInfo->ri_TrigDesc != NULL &&
839 [ + + ]: 87 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
840 [ + + ]: 41 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
841 : : {
842 : : /*
843 : : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
844 : : * triggers on the table. Such triggers might query the table we're
845 : : * inserting into and act differently if the tuples that have already
846 : : * been processed and prepared for insertion are not there.
847 : : */
848 : 52 : insertMethod = CIM_SINGLE;
849 : : }
549 efujita@postgresql.o 850 [ + + ]: 708 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
851 [ + + ]: 14 : resultRelInfo->ri_BatchSize == 1)
852 : : {
853 : : /*
854 : : * Can't support multi-inserts to a foreign table if the FDW does not
855 : : * support batching, or it's disabled for the server or foreign table.
856 : : */
857 : 9 : insertMethod = CIM_SINGLE;
858 : : }
1238 heikki.linnakangas@i 859 [ + + + + ]: 699 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
860 [ + + ]: 13 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
861 : : {
862 : : /*
863 : : * For partitioned tables we can't support multi-inserts when there
864 : : * are any statement level insert triggers. It might be possible to
865 : : * allow partitioned tables with such triggers in the future, but for
866 : : * now, CopyMultiInsertInfoFlush expects that any after row insert and
867 : : * statement level insert triggers are on the same relation.
868 : : */
869 : 9 : insertMethod = CIM_SINGLE;
870 : : }
549 efujita@postgresql.o 871 [ + + ]: 690 : else if (cstate->volatile_defexprs)
872 : : {
873 : : /*
874 : : * Can't support multi-inserts if there are any volatile default
875 : : * expressions in the table. Similarly to the trigger case above,
876 : : * such expressions may query the table we're inserting into.
877 : : *
878 : : * Note: It does not matter if any partitions have any volatile
879 : : * default expressions as we use the defaults from the target of the
880 : : * COPY command.
881 : : */
1238 heikki.linnakangas@i 882 : 3 : insertMethod = CIM_SINGLE;
883 : : }
884 [ - + ]: 687 : else if (contain_volatile_functions(cstate->whereClause))
885 : : {
886 : : /*
887 : : * Can't support multi-inserts if there are any volatile function
888 : : * expressions in WHERE clause. Similarly to the trigger case above,
889 : : * such expressions may query the table we're inserting into.
890 : : *
891 : : * Note: the whereClause was already preprocessed in DoCopy(), so it's
892 : : * okay to use contain_volatile_functions() directly.
893 : : */
1238 heikki.linnakangas@i 894 :UBC 0 : insertMethod = CIM_SINGLE;
895 : : }
896 : : else
897 : : {
898 : : /*
899 : : * For partitioned tables, we may still be able to perform bulk
900 : : * inserts. However, the possibility of this depends on which types
901 : : * of triggers exist on the partition. We must disable bulk inserts
902 : : * if the partition is a foreign table that can't use batching or it
903 : : * has any before row insert or insert instead triggers (same as we
904 : : * checked above for the parent table). Since the partition's
905 : : * resultRelInfos are initialized only when we actually need to insert
906 : : * the first tuple into them, we must have the intermediate insert
907 : : * method of CIM_MULTI_CONDITIONAL to flag that we must later
908 : : * determine if we can use bulk-inserts for the partition being
909 : : * inserted into.
910 : : */
1238 heikki.linnakangas@i 911 [ + + ]:CBC 687 : if (proute)
912 : 37 : insertMethod = CIM_MULTI_CONDITIONAL;
913 : : else
914 : 650 : insertMethod = CIM_MULTI;
915 : :
916 : 687 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
917 : : estate, mycid, ti_options);
918 : : }
919 : :
920 : : /*
921 : : * If not using batch mode (which allocates slots as needed) set up a
922 : : * tuple slot too. When inserting into a partitioned table, we also need
923 : : * one, even if we might batch insert, to read the tuple in the root
924 : : * partition's form.
925 : : */
926 [ + + + + ]: 760 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
927 : : {
928 : 110 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
929 : : &estate->es_tupleTable);
930 : 110 : bistate = GetBulkInsertState();
931 : : }
932 : :
933 [ + + ]: 847 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
934 [ + + ]: 87 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
935 : :
936 [ + + ]: 847 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
937 [ + + ]: 87 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
938 : :
939 : : /*
940 : : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
941 : : * should do this for COPY, since it's not really an "INSERT" statement as
942 : : * such. However, executing these triggers maintains consistency with the
943 : : * EACH ROW triggers that we already fire on COPY.
944 : : */
945 : 760 : ExecBSInsertTriggers(estate, resultRelInfo);
946 : :
947 [ + + ]: 760 : econtext = GetPerTupleExprContext(estate);
948 : :
949 : : /* Set up callback to identify error line number */
950 : 760 : errcallback.callback = CopyFromErrorCallback;
951 : 760 : errcallback.arg = (void *) cstate;
952 : 760 : errcallback.previous = error_context_stack;
953 : 760 : error_context_stack = &errcallback;
954 : :
955 : : for (;;)
956 : 723441 : {
957 : : TupleTableSlot *myslot;
958 : : bool skip_tuple;
959 : :
960 [ - + ]: 724201 : CHECK_FOR_INTERRUPTS();
961 : :
962 : : /*
963 : : * Reset the per-tuple exprcontext. We do this after every tuple, to
964 : : * clean-up after expression evaluations etc.
965 : : */
966 [ + - ]: 724201 : ResetPerTupleExprContext(estate);
967 : :
968 : : /* select slot to (initially) load row into */
969 [ + + + + ]: 724201 : if (insertMethod == CIM_SINGLE || proute)
970 : : {
971 : 137436 : myslot = singleslot;
972 [ - + ]: 137436 : Assert(myslot != NULL);
973 : : }
974 : : else
975 : : {
976 [ - + ]: 586765 : Assert(resultRelInfo == target_resultRelInfo);
977 [ - + ]: 586765 : Assert(insertMethod == CIM_MULTI);
978 : :
979 : 586765 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
980 : : resultRelInfo);
981 : : }
982 : :
983 : : /*
984 : : * Switch to per-tuple context before calling NextCopyFrom, which does
985 : : * evaluate default expressions etc. and requires per-tuple context.
986 : : */
987 [ + - ]: 724201 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
988 : :
989 : 724201 : ExecClearTuple(myslot);
990 : :
991 : : /* Directly store the values/nulls array in the slot */
992 [ + + ]: 724201 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
993 : 679 : break;
994 : :
86 akorotkov@postgresql 995 [ + + ]:GNC 723458 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
89 996 [ + + ]: 33 : cstate->escontext->error_occurred)
997 : : {
998 : : /*
999 : : * Soft error occured, skip this tuple and deal with error
1000 : : * information according to ON_ERROR.
1001 : : */
86 1002 [ + - ]: 21 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
1003 : :
1004 : : /*
1005 : : * Just make ErrorSaveContext ready for the next NextCopyFrom.
1006 : : * Since we don't set details_wanted and error_data is not to
1007 : : * be filled, just resetting error_occurred is enough.
1008 : : */
89 1009 : 21 : cstate->escontext->error_occurred = false;
1010 : :
1011 : : /* Report that this tuple was skipped by the ON_ERROR clause */
80 msawada@postgresql.o 1012 : 21 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
1013 : : ++skipped);
1014 : :
89 akorotkov@postgresql 1015 : 21 : continue;
1016 : : }
1017 : :
1238 heikki.linnakangas@i 1018 :CBC 723437 : ExecStoreVirtualTuple(myslot);
1019 : :
1020 : : /*
1021 : : * Constraints and where clause might reference the tableoid column,
1022 : : * so (re-)initialize tts_tableOid before evaluating them.
1023 : : */
1024 : 723437 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
1025 : :
1026 : : /* Triggers and stuff need to be invoked in query context. */
1027 : 723437 : MemoryContextSwitchTo(oldcontext);
1028 : :
1029 [ + + ]: 723437 : if (cstate->whereClause)
1030 : : {
1031 : 33 : econtext->ecxt_scantuple = myslot;
1032 : : /* Skip items that don't match COPY's WHERE clause */
1033 [ + + ]: 33 : if (!ExecQual(cstate->qualexpr, econtext))
1034 : : {
1035 : : /*
1036 : : * Report that this tuple was filtered out by the WHERE
1037 : : * clause.
1038 : : */
1132 michael@paquier.xyz 1039 : 18 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
1040 : : ++excluded);
1238 heikki.linnakangas@i 1041 : 18 : continue;
1042 : : }
1043 : : }
1044 : :
1045 : : /* Determine the partition to insert the tuple into */
1046 [ + + ]: 723419 : if (proute)
1047 : : {
1048 : : TupleConversionMap *map;
1049 : :
1050 : : /*
1051 : : * Attempt to find a partition suitable for this tuple.
1052 : : * ExecFindPartition() will raise an error if none can be found or
1053 : : * if the found partition is not suitable for INSERTs.
1054 : : */
1055 : 137195 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1056 : : proute, myslot, estate);
1057 : :
1058 [ + + ]: 137194 : if (prevResultRelInfo != resultRelInfo)
1059 : : {
1060 : : /* Determine which triggers exist on this partition */
1061 [ + + ]: 80783 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1062 [ + + ]: 27 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1063 : :
1064 [ + + ]: 80783 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1065 [ - + ]: 27 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1066 : :
1067 : : /*
1068 : : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1069 : : * OF triggers, or if the partition is a foreign table that
1070 : : * can't use batching.
1071 : : */
1072 : 131485 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1073 [ + + ]: 50729 : !has_before_insert_row_trig &&
1074 [ + + + - ]: 182202 : !has_instead_insert_row_trig &&
549 efujita@postgresql.o 1075 [ + + ]: 50717 : (resultRelInfo->ri_FdwRoutine == NULL ||
1076 [ + + ]: 6 : resultRelInfo->ri_BatchSize > 1);
1077 : :
1078 : : /* Set the multi-insert buffer to use for this partition. */
1238 heikki.linnakangas@i 1079 [ + + ]: 80756 : if (leafpart_use_multi_insert)
1080 : : {
1081 [ + + ]: 50715 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
1082 : 43 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
1083 : : resultRelInfo);
1084 : : }
1085 [ + + ]: 30041 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
1086 [ - + ]: 14 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1087 : : {
1088 : : /*
1089 : : * Flush pending inserts if this partition can't use
1090 : : * batching, so rows are visible to triggers etc.
1091 : : */
549 efujita@postgresql.o 1092 :UBC 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1093 : : resultRelInfo,
1094 : : &processed);
1095 : : }
1096 : :
1238 heikki.linnakangas@i 1097 [ + - ]:CBC 80756 : if (bistate != NULL)
1098 : 80756 : ReleaseBulkInsertStatePin(bistate);
1099 : 80756 : prevResultRelInfo = resultRelInfo;
1100 : : }
1101 : :
1102 : : /*
1103 : : * If we're capturing transition tuples, we might need to convert
1104 : : * from the partition rowtype to root rowtype. But if there are no
1105 : : * BEFORE triggers on the partition that could change the tuple,
1106 : : * we can just remember the original unconverted tuple to avoid a
1107 : : * needless round trip conversion.
1108 : : */
1109 [ + + ]: 137194 : if (cstate->transition_capture != NULL)
1110 : 27 : cstate->transition_capture->tcs_original_insert_tuple =
1111 [ + + ]: 27 : !has_before_insert_row_trig ? myslot : NULL;
1112 : :
1113 : : /*
1114 : : * We might need to convert from the root rowtype to the partition
1115 : : * rowtype.
1116 : : */
499 alvherre@alvh.no-ip. 1117 : 137194 : map = ExecGetRootToChildMap(resultRelInfo, estate);
1238 heikki.linnakangas@i 1118 [ + + + + ]: 137194 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1119 : : {
1120 : : /* non batch insert */
1121 [ + + ]: 30068 : if (map != NULL)
1122 : : {
1123 : : TupleTableSlot *new_slot;
1124 : :
1125 : 55 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
1126 : 55 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1127 : : }
1128 : : }
1129 : : else
1130 : : {
1131 : : /*
1132 : : * Prepare to queue up tuple for later batch insert into
1133 : : * current partition.
1134 : : */
1135 : : TupleTableSlot *batchslot;
1136 : :
1137 : : /* no other path available for partitioned table */
1138 [ - + ]: 107126 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1139 : :
1140 : 107126 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1141 : : resultRelInfo);
1142 : :
1143 [ + + ]: 107126 : if (map != NULL)
1144 : 6100 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1145 : : batchslot);
1146 : : else
1147 : : {
1148 : : /*
1149 : : * This looks more expensive than it is (Believe me, I
1150 : : * optimized it away. Twice.). The input is in virtual
1151 : : * form, and we'll materialize the slot below - for most
1152 : : * slot types the copy performs the work materialization
1153 : : * would later require anyway.
1154 : : */
1155 : 101026 : ExecCopySlot(batchslot, myslot);
1156 : 101026 : myslot = batchslot;
1157 : : }
1158 : : }
1159 : :
1160 : : /* ensure that triggers etc see the right relation */
1161 : 137194 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1162 : : }
1163 : :
1164 : 723418 : skip_tuple = false;
1165 : :
1166 : : /* BEFORE ROW INSERT Triggers */
1167 [ + + ]: 723418 : if (has_before_insert_row_trig)
1168 : : {
1169 [ + + ]: 137 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1170 : 8 : skip_tuple = true; /* "do nothing" */
1171 : : }
1172 : :
1173 [ + + ]: 723418 : if (!skip_tuple)
1174 : : {
1175 : : /*
1176 : : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1177 : : * tuple. Otherwise, proceed with inserting the tuple into the
1178 : : * table or foreign table.
1179 : : */
1180 [ + + ]: 723410 : if (has_instead_insert_row_trig)
1181 : : {
1182 : 6 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1183 : : }
1184 : : else
1185 : : {
1186 : : /* Compute stored generated columns */
1187 [ + + ]: 723404 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1188 [ + + ]: 330930 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
1189 : 18 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1190 : : CMD_INSERT);
1191 : :
1192 : : /*
1193 : : * If the target is a plain table, check the constraints of
1194 : : * the tuple.
1195 : : */
1196 [ + + ]: 723404 : if (resultRelInfo->ri_FdwRoutine == NULL &&
1197 [ + + ]: 723360 : resultRelInfo->ri_RelationDesc->rd_att->constr)
1198 : 330912 : ExecConstraints(resultRelInfo, myslot, estate);
1199 : :
1200 : : /*
1201 : : * Also check the tuple against the partition constraint, if
1202 : : * there is one; except that if we got here via tuple-routing,
1203 : : * we don't need to if there's no BR trigger defined on the
1204 : : * partition.
1205 : : */
1206 [ + + + + ]: 723389 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1207 [ + + ]: 137188 : (proute == NULL || has_before_insert_row_trig))
1208 : 1154 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
1209 : :
1210 : : /* Store the slot in the multi-insert buffer, when enabled. */
1211 [ + + + + ]: 723389 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1212 : : {
1213 : : /*
1214 : : * The slot previously might point into the per-tuple
1215 : : * context. For batching it needs to be longer lived.
1216 : : */
1217 : 693217 : ExecMaterializeSlot(myslot);
1218 : :
1219 : : /* Add this tuple to the tuple buffer */
1220 : 693217 : CopyMultiInsertInfoStore(&multiInsertInfo,
1221 : : resultRelInfo, myslot,
1222 : : cstate->line_buf.len,
1223 : : cstate->cur_lineno);
1224 : :
1225 : : /*
1226 : : * If enough inserts have queued up, then flush all
1227 : : * buffers out to their tables.
1228 : : */
1229 [ + + ]: 693217 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
549 efujita@postgresql.o 1230 : 650 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1231 : : resultRelInfo,
1232 : : &processed);
1233 : :
1234 : : /*
1235 : : * We delay updating the row counter and progress of the
1236 : : * COPY command until after writing the tuples stored in
1237 : : * the buffer out to the table, as in single insert mode.
1238 : : * See CopyMultiInsertBufferFlush().
1239 : : */
1240 : 693217 : continue; /* next tuple please */
1241 : : }
1242 : : else
1243 : : {
1238 heikki.linnakangas@i 1244 : 30172 : List *recheckIndexes = NIL;
1245 : :
1246 : : /* OK, store the tuple */
1247 [ + + ]: 30172 : if (resultRelInfo->ri_FdwRoutine != NULL)
1248 : : {
1249 : 25 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1250 : : resultRelInfo,
1251 : : myslot,
1252 : : NULL);
1253 : :
1254 [ + + ]: 24 : if (myslot == NULL) /* "do nothing" */
1255 : 2 : continue; /* next tuple please */
1256 : :
1257 : : /*
1258 : : * AFTER ROW Triggers might reference the tableoid
1259 : : * column, so (re-)initialize tts_tableOid before
1260 : : * evaluating them.
1261 : : */
1262 : 22 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1263 : : }
1264 : : else
1265 : : {
1266 : : /* OK, store the tuple and create index entries for it */
1267 : 30147 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1268 : : myslot, mycid, ti_options, bistate);
1269 : :
3 akorotkov@postgresql 1270 [ - + ]: 30147 : if (resultRelInfo->ri_NumIndices > 0)
1238 heikki.linnakangas@i 1271 :UBC 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1272 : : myslot,
1273 : : estate,
1274 : : false,
1275 : : false,
1276 : : NULL,
1277 : : NIL,
1278 : : false);
1279 : : }
1280 : :
1281 : : /* AFTER ROW INSERT Triggers */
1238 heikki.linnakangas@i 1282 :CBC 30169 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
1283 : : recheckIndexes, cstate->transition_capture);
1284 : :
1285 : 30169 : list_free(recheckIndexes);
1286 : : }
1287 : : }
1288 : :
1289 : : /*
1290 : : * We count only tuples not suppressed by a BEFORE INSERT trigger
1291 : : * or FDW; this is the same definition used by nodeModifyTable.c
1292 : : * for counting tuples inserted by an INSERT command. Update
1293 : : * progress of the COPY command as well.
1294 : : */
1132 michael@paquier.xyz 1295 : 30175 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1296 : : ++processed);
1297 : : }
1298 : : }
1299 : :
1300 : : /* Flush any remaining buffered tuples */
1238 heikki.linnakangas@i 1301 [ + + ]: 679 : if (insertMethod != CIM_SINGLE)
1302 : : {
1303 [ + + ]: 619 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
549 efujita@postgresql.o 1304 : 503 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1305 : : }
1306 : :
1307 : : /* Done, clean up */
1238 heikki.linnakangas@i 1308 : 672 : error_context_stack = errcallback.previous;
1309 : :
86 akorotkov@postgresql 1310 [ + + ]:GNC 672 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
89 1311 [ + - ]: 6 : cstate->num_errors > 0)
1312 [ + - ]: 6 : ereport(NOTICE,
1313 : : errmsg_plural("%llu row was skipped due to data type incompatibility",
1314 : : "%llu rows were skipped due to data type incompatibility",
1315 : : (unsigned long long) cstate->num_errors,
1316 : : (unsigned long long) cstate->num_errors));
1317 : :
1238 heikki.linnakangas@i 1318 [ + + ]:CBC 672 : if (bistate != NULL)
1319 : 96 : FreeBulkInsertState(bistate);
1320 : :
1321 : 672 : MemoryContextSwitchTo(oldcontext);
1322 : :
1323 : : /* Execute AFTER STATEMENT insertion triggers */
1324 : 672 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1325 : :
1326 : : /* Handle queued AFTER triggers */
1327 : 672 : AfterTriggerEndQuery(estate);
1328 : :
1329 : 672 : ExecResetTupleTable(estate->es_tupleTable, false);
1330 : :
1331 : : /* Allow the FDW to shut down */
1332 [ + + ]: 672 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
1333 [ + - ]: 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
1334 : 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
1335 : : target_resultRelInfo);
1336 : :
1337 : : /* Tear down the multi-insert buffer data */
1338 [ + + ]: 672 : if (insertMethod != CIM_SINGLE)
1339 : 612 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
1340 : :
1341 : : /* Close all the partitioned tables, leaf partitions, and their indices */
1342 [ + + ]: 672 : if (proute)
1343 : 48 : ExecCleanupTupleRouting(mtstate, proute);
1344 : :
1345 : : /* Close the result relations, including any trigger target relations */
1346 : 672 : ExecCloseResultRelations(estate);
1347 : 672 : ExecCloseRangeTableRelations(estate);
1348 : :
1349 : 672 : FreeExecutorState(estate);
1350 : :
1351 : 672 : return processed;
1352 : : }
1353 : :
1354 : : /*
1355 : : * Setup to read tuples from a file for COPY FROM.
1356 : : *
1357 : : * 'rel': Used as a template for the tuples
1358 : : * 'whereClause': WHERE clause from the COPY FROM command
1359 : : * 'filename': Name of server-local file to read, NULL for STDIN
1360 : : * 'is_program': true if 'filename' is program to execute
1361 : : * 'data_source_cb': callback that provides the input data
1362 : : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1363 : : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1364 : : *
1365 : : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1366 : : */
1367 : : CopyFromState
1368 : 898 : BeginCopyFrom(ParseState *pstate,
1369 : : Relation rel,
1370 : : Node *whereClause,
1371 : : const char *filename,
1372 : : bool is_program,
1373 : : copy_data_source_cb data_source_cb,
1374 : : List *attnamelist,
1375 : : List *options)
1376 : : {
1377 : : CopyFromState cstate;
1378 : 898 : bool pipe = (filename == NULL);
1379 : : TupleDesc tupDesc;
1380 : : AttrNumber num_phys_attrs,
1381 : : num_defaults;
1382 : : FmgrInfo *in_functions;
1383 : : Oid *typioparams;
1384 : : Oid in_func_oid;
1385 : : int *defmap;
1386 : : ExprState **defexprs;
1387 : : MemoryContext oldcontext;
1388 : : bool volatile_defexprs;
1132 michael@paquier.xyz 1389 : 898 : const int progress_cols[] = {
1390 : : PROGRESS_COPY_COMMAND,
1391 : : PROGRESS_COPY_TYPE,
1392 : : PROGRESS_COPY_BYTES_TOTAL
1393 : : };
1394 : 898 : int64 progress_vals[] = {
1395 : : PROGRESS_COPY_COMMAND_FROM,
1396 : : 0,
1397 : : 0
1398 : : };
1399 : :
1400 : : /* Allocate workspace and zero all fields */
1238 heikki.linnakangas@i 1401 : 898 : cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1402 : :
1403 : : /*
1404 : : * We allocate everything used by a cstate in a new memory context. This
1405 : : * avoids memory leaks during repeated use of COPY in a query.
1406 : : */
1407 : 898 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1408 : : "COPY",
1409 : : ALLOCSET_DEFAULT_SIZES);
1410 : :
1411 : 898 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1412 : :
1413 : : /* Extract options from the statement node tree */
1109 1414 : 898 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1415 : :
1416 : : /* Process the target relation */
1238 1417 : 819 : cstate->rel = rel;
1418 : :
1419 : 819 : tupDesc = RelationGetDescr(cstate->rel);
1420 : :
1421 : : /* process common options or initialization */
1422 : :
1423 : : /* Generate or convert list of attributes to process */
1424 : 819 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1425 : :
1426 : 819 : num_phys_attrs = tupDesc->natts;
1427 : :
1428 : : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1429 : 819 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
197 andrew@dunslane.net 1430 [ + + ]:GNC 819 : if (cstate->opts.force_notnull_all)
1431 [ + - - + : 6 : MemSet(cstate->opts.force_notnull_flags, true, num_phys_attrs * sizeof(bool));
- - - - -
- ]
1432 [ + + ]: 813 : else if (cstate->opts.force_notnull)
1433 : : {
1434 : : List *attnums;
1435 : : ListCell *cur;
1436 : :
1238 heikki.linnakangas@i 1437 :CBC 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1438 : :
1439 [ + - + + : 26 : foreach(cur, attnums)
+ + ]
1440 : : {
1441 : 16 : int attnum = lfirst_int(cur);
1442 : 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1443 : :
1444 [ + + ]: 16 : if (!list_member_int(cstate->attnumlist, attnum))
1445 [ + - ]: 3 : ereport(ERROR,
1446 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1447 : : errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1448 : : NameStr(attr->attname))));
1449 : 13 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1450 : : }
1451 : : }
1452 : :
1453 : : /* Set up soft error handler for ON_ERROR */
86 akorotkov@postgresql 1454 [ + + ]:GNC 816 : if (cstate->opts.on_error != COPY_ON_ERROR_STOP)
1455 : : {
89 1456 : 15 : cstate->escontext = makeNode(ErrorSaveContext);
1457 : 15 : cstate->escontext->type = T_ErrorSaveContext;
1458 : 15 : cstate->escontext->error_occurred = false;
1459 : :
1460 : : /*
1461 : : * Currently we only support COPY_ON_ERROR_IGNORE. We'll add other
1462 : : * options later
1463 : : */
86 1464 [ + - ]: 15 : if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
89 1465 : 15 : cstate->escontext->details_wanted = false;
1466 : : }
1467 : : else
1468 : 801 : cstate->escontext = NULL;
1469 : :
1470 : : /* Convert FORCE_NULL name list to per-column flags, check validity */
1238 heikki.linnakangas@i 1471 :CBC 816 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
197 andrew@dunslane.net 1472 [ + + ]:GNC 816 : if (cstate->opts.force_null_all)
1473 [ + - - + : 6 : MemSet(cstate->opts.force_null_flags, true, num_phys_attrs * sizeof(bool));
- - - - -
- ]
1474 [ + + ]: 810 : else if (cstate->opts.force_null)
1475 : : {
1476 : : List *attnums;
1477 : : ListCell *cur;
1478 : :
1238 heikki.linnakangas@i 1479 :CBC 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1480 : :
1481 [ + - + + : 26 : foreach(cur, attnums)
+ + ]
1482 : : {
1483 : 16 : int attnum = lfirst_int(cur);
1484 : 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1485 : :
1486 [ + + ]: 16 : if (!list_member_int(cstate->attnumlist, attnum))
1487 [ + - ]: 3 : ereport(ERROR,
1488 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1489 : : errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1490 : : NameStr(attr->attname))));
1491 : 13 : cstate->opts.force_null_flags[attnum - 1] = true;
1492 : : }
1493 : : }
1494 : :
1495 : : /* Convert convert_selectively name list to per-column flags */
1496 [ + + ]: 813 : if (cstate->opts.convert_selectively)
1497 : : {
1498 : : List *attnums;
1499 : : ListCell *cur;
1500 : :
1501 : 2 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1502 : :
1503 : 2 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
1504 : :
1505 [ + - + + : 4 : foreach(cur, attnums)
+ + ]
1506 : : {
1507 : 2 : int attnum = lfirst_int(cur);
1508 : 2 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1509 : :
1510 [ - + ]: 2 : if (!list_member_int(cstate->attnumlist, attnum))
1238 heikki.linnakangas@i 1511 [ # # ]:UBC 0 : ereport(ERROR,
1512 : : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1513 : : errmsg_internal("selected column \"%s\" not referenced by COPY",
1514 : : NameStr(attr->attname))));
1238 heikki.linnakangas@i 1515 :CBC 2 : cstate->convert_select_flags[attnum - 1] = true;
1516 : : }
1517 : : }
1518 : :
1519 : : /* Use client encoding when ENCODING option is not specified. */
1520 [ + + ]: 813 : if (cstate->opts.file_encoding < 0)
1521 : 810 : cstate->file_encoding = pg_get_client_encoding();
1522 : : else
1523 : 3 : cstate->file_encoding = cstate->opts.file_encoding;
1524 : :
1525 : : /*
1526 : : * Look up encoding conversion function.
1527 : : */
1109 1528 [ + + ]: 813 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1529 [ - + - - ]: 3 : cstate->file_encoding == PG_SQL_ASCII ||
1109 heikki.linnakangas@i 1530 :UBC 0 : GetDatabaseEncoding() == PG_SQL_ASCII)
1531 : : {
1109 heikki.linnakangas@i 1532 :CBC 813 : cstate->need_transcoding = false;
1533 : : }
1534 : : else
1535 : : {
1109 heikki.linnakangas@i 1536 :UBC 0 : cstate->need_transcoding = true;
1537 : 0 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1538 : : GetDatabaseEncoding());
196 tgl@sss.pgh.pa.us 1539 [ # # ]: 0 : if (!OidIsValid(cstate->conversion_proc))
1540 [ # # ]: 0 : ereport(ERROR,
1541 : : (errcode(ERRCODE_UNDEFINED_FUNCTION),
1542 : : errmsg("default conversion function for encoding \"%s\" to \"%s\" does not exist",
1543 : : pg_encoding_to_char(cstate->file_encoding),
1544 : : pg_encoding_to_char(GetDatabaseEncoding()))));
1545 : : }
1546 : :
1238 heikki.linnakangas@i 1547 :CBC 813 : cstate->copy_src = COPY_FILE; /* default */
1548 : :
1549 : 813 : cstate->whereClause = whereClause;
1550 : :
1551 : : /* Initialize state variables */
1552 : 813 : cstate->eol_type = EOL_UNKNOWN;
1553 : 813 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1554 : 813 : cstate->cur_lineno = 0;
1555 : 813 : cstate->cur_attname = NULL;
1556 : 813 : cstate->cur_attval = NULL;
549 efujita@postgresql.o 1557 : 813 : cstate->relname_only = false;
1558 : :
1559 : : /*
1560 : : * Allocate buffers for the input pipeline.
1561 : : *
1562 : : * attribute_buf and raw_buf are used in both text and binary modes, but
1563 : : * input_buf and line_buf only in text mode.
1564 : : */
1109 heikki.linnakangas@i 1565 : 813 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
1238 1566 : 813 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
1109 1567 : 813 : cstate->raw_reached_eof = false;
1568 : :
1238 1569 [ + + ]: 813 : if (!cstate->opts.binary)
1570 : : {
1571 : : /*
1572 : : * If encoding conversion is needed, we need another buffer to hold
1573 : : * the converted input data. Otherwise, we can just point input_buf
1574 : : * to the same buffer as raw_buf.
1575 : : */
1109 1576 [ - + ]: 805 : if (cstate->need_transcoding)
1577 : : {
1109 heikki.linnakangas@i 1578 :UBC 0 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
1579 : 0 : cstate->input_buf_index = cstate->input_buf_len = 0;
1580 : : }
1581 : : else
1109 heikki.linnakangas@i 1582 :CBC 805 : cstate->input_buf = cstate->raw_buf;
1583 : 805 : cstate->input_reached_eof = false;
1584 : :
1238 1585 : 805 : initStringInfo(&cstate->line_buf);
1586 : : }
1587 : :
1109 1588 : 813 : initStringInfo(&cstate->attribute_buf);
1589 : :
1590 : : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
1238 1591 [ + + ]: 813 : if (pstate)
1592 : : {
1593 : 783 : cstate->range_table = pstate->p_rtable;
495 alvherre@alvh.no-ip. 1594 : 783 : cstate->rteperminfos = pstate->p_rteperminfos;
1595 : : }
1596 : :
1238 heikki.linnakangas@i 1597 : 813 : num_defaults = 0;
1598 : 813 : volatile_defexprs = false;
1599 : :
1600 : : /*
1601 : : * Pick up the required catalog information for each attribute in the
1602 : : * relation, including the input function, the element type (to pass to
1603 : : * the input function), and info about defaults and constraints. (Which
1604 : : * input function we use depends on text/binary format choice.)
1605 : : */
1606 : 813 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1607 : 813 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
1608 : 813 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
1609 : 813 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
1610 : :
599 drowley@postgresql.o 1611 [ + + ]: 3155 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
1612 : : {
1238 heikki.linnakangas@i 1613 : 2349 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1614 : :
1615 : : /* We don't need info for dropped attributes */
1616 [ + + ]: 2349 : if (att->attisdropped)
1617 : 62 : continue;
1618 : :
1619 : : /* Fetch the input function and typioparam info */
1620 [ + + ]: 2287 : if (cstate->opts.binary)
1621 : 31 : getTypeBinaryInputInfo(att->atttypid,
1622 : 31 : &in_func_oid, &typioparams[attnum - 1]);
1623 : : else
1624 : 2256 : getTypeInputInfo(att->atttypid,
1625 : 2256 : &in_func_oid, &typioparams[attnum - 1]);
1626 : 2286 : fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1627 : :
1628 : : /* Get default info if available */
398 andrew@dunslane.net 1629 : 2286 : defexprs[attnum - 1] = NULL;
1630 : :
1631 : : /*
1632 : : * We only need the default values for columns that do not appear in
1633 : : * the column list, unless the DEFAULT option was given. We never need
1634 : : * default values for generated columns.
1635 : : */
196 1636 [ + + ]: 2286 : if ((cstate->opts.default_print != NULL ||
1637 [ + + ]: 2223 : !list_member_int(cstate->attnumlist, attnum)) &&
1638 [ + + ]: 299 : !att->attgenerated)
1639 : : {
1238 heikki.linnakangas@i 1640 : 289 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1641 : : attnum);
1642 : :
1643 [ + + ]: 289 : if (defexpr != NULL)
1644 : : {
1645 : : /* Run the expression through planner */
1646 : 132 : defexpr = expression_planner(defexpr);
1647 : :
1648 : : /* Initialize executable expression in copycontext */
398 andrew@dunslane.net 1649 : 126 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1650 : :
1651 : : /* if NOT copied from input */
1652 : : /* use default value if one exists */
1653 [ + + ]: 126 : if (!list_member_int(cstate->attnumlist, attnum))
1654 : : {
1655 : 86 : defmap[num_defaults] = attnum - 1;
1656 : 86 : num_defaults++;
1657 : : }
1658 : :
1659 : : /*
1660 : : * If a default expression looks at the table being loaded,
1661 : : * then it could give the wrong answer when using
1662 : : * multi-insert. Since database access can be dynamic this is
1663 : : * hard to test for exactly, so we use the much wider test of
1664 : : * whether the default expression is volatile. We allow for
1665 : : * the special case of when the default expression is the
1666 : : * nextval() of a sequence which in this specific case is
1667 : : * known to be safe for use with the multi-insert
1668 : : * optimization. Hence we use this special case function
1669 : : * checker rather than the standard check for
1670 : : * contain_volatile_functions(). Note also that we already
1671 : : * ran the expression through expression_planner().
1672 : : */
1238 heikki.linnakangas@i 1673 [ + - ]: 126 : if (!volatile_defexprs)
1674 : 126 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1675 : : }
1676 : : }
1677 : : }
1678 : :
262 drowley@postgresql.o 1679 : 806 : cstate->defaults = (bool *) palloc0(tupDesc->natts * sizeof(bool));
1680 : :
1681 : : /* initialize progress */
1194 tomas.vondra@postgre 1682 :UBC 0 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
1194 tomas.vondra@postgre 1683 [ + - ]:CBC 806 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1684 : 806 : cstate->bytes_processed = 0;
1685 : :
1686 : : /* We keep those variables in cstate. */
1238 heikki.linnakangas@i 1687 : 806 : cstate->in_functions = in_functions;
1688 : 806 : cstate->typioparams = typioparams;
1689 : 806 : cstate->defmap = defmap;
1690 : 806 : cstate->defexprs = defexprs;
1691 : 806 : cstate->volatile_defexprs = volatile_defexprs;
1692 : 806 : cstate->num_defaults = num_defaults;
1693 : 806 : cstate->is_program = is_program;
1694 : :
1695 [ + + ]: 806 : if (data_source_cb)
1696 : : {
1132 michael@paquier.xyz 1697 : 167 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
1238 heikki.linnakangas@i 1698 : 167 : cstate->copy_src = COPY_CALLBACK;
1699 : 167 : cstate->data_source_cb = data_source_cb;
1700 : : }
1701 [ + + ]: 639 : else if (pipe)
1702 : : {
1132 michael@paquier.xyz 1703 : 446 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
1238 heikki.linnakangas@i 1704 [ - + ]: 446 : Assert(!is_program); /* the grammar does not allow this */
1705 [ + - ]: 446 : if (whereToSendOutput == DestRemote)
1706 : 446 : ReceiveCopyBegin(cstate);
1707 : : else
1238 heikki.linnakangas@i 1708 :UBC 0 : cstate->copy_file = stdin;
1709 : : }
1710 : : else
1711 : : {
1238 heikki.linnakangas@i 1712 :CBC 193 : cstate->filename = pstrdup(filename);
1713 : :
1714 [ - + ]: 193 : if (cstate->is_program)
1715 : : {
1132 michael@paquier.xyz 1716 :UBC 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
1238 heikki.linnakangas@i 1717 : 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1718 [ # # ]: 0 : if (cstate->copy_file == NULL)
1719 [ # # ]: 0 : ereport(ERROR,
1720 : : (errcode_for_file_access(),
1721 : : errmsg("could not execute command \"%s\": %m",
1722 : : cstate->filename)));
1723 : : }
1724 : : else
1725 : : {
1726 : : struct stat st;
1727 : :
1132 michael@paquier.xyz 1728 :CBC 193 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
1238 heikki.linnakangas@i 1729 : 193 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
1730 [ - + ]: 193 : if (cstate->copy_file == NULL)
1731 : : {
1732 : : /* copy errno because ereport subfunctions might change it */
1238 heikki.linnakangas@i 1733 :UBC 0 : int save_errno = errno;
1734 : :
1735 [ # # # # : 0 : ereport(ERROR,
# # ]
1736 : : (errcode_for_file_access(),
1737 : : errmsg("could not open file \"%s\" for reading: %m",
1738 : : cstate->filename),
1739 : : (save_errno == ENOENT || save_errno == EACCES) ?
1740 : : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1741 : : "You may want a client-side facility such as psql's \\copy.") : 0));
1742 : : }
1743 : :
1238 heikki.linnakangas@i 1744 [ - + ]:CBC 193 : if (fstat(fileno(cstate->copy_file), &st))
1238 heikki.linnakangas@i 1745 [ # # ]:UBC 0 : ereport(ERROR,
1746 : : (errcode_for_file_access(),
1747 : : errmsg("could not stat file \"%s\": %m",
1748 : : cstate->filename)));
1749 : :
1238 heikki.linnakangas@i 1750 [ - + ]:CBC 193 : if (S_ISDIR(st.st_mode))
1238 heikki.linnakangas@i 1751 [ # # ]:UBC 0 : ereport(ERROR,
1752 : : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1753 : : errmsg("\"%s\" is a directory", cstate->filename)));
1754 : :
1132 michael@paquier.xyz 1755 :CBC 193 : progress_vals[2] = st.st_size;
1756 : : }
1757 : : }
1758 : :
1759 : 806 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
1760 : :
1238 heikki.linnakangas@i 1761 [ + + ]: 806 : if (cstate->opts.binary)
1762 : : {
1763 : : /* Read and verify binary header */
1764 : 7 : ReceiveCopyBinaryHeader(cstate);
1765 : : }
1766 : :
1767 : : /* create workspace for CopyReadAttributes results */
1768 [ + + ]: 806 : if (!cstate->opts.binary)
1769 : : {
1770 : 799 : AttrNumber attr_count = list_length(cstate->attnumlist);
1771 : :
1772 : 799 : cstate->max_fields = attr_count;
1773 : 799 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
1774 : : }
1775 : :
1776 : 806 : MemoryContextSwitchTo(oldcontext);
1777 : :
1778 : 806 : return cstate;
1779 : : }
1780 : :
1781 : : /*
1782 : : * Clean up storage and release resources for COPY FROM.
1783 : : */
1784 : : void
1785 : 540 : EndCopyFrom(CopyFromState cstate)
1786 : : {
1787 : : /* No COPY FROM related resources except memory. */
1788 [ - + ]: 540 : if (cstate->is_program)
1789 : : {
1238 heikki.linnakangas@i 1790 :UBC 0 : ClosePipeFromProgram(cstate);
1791 : : }
1792 : : else
1793 : : {
1238 heikki.linnakangas@i 1794 [ + + - + ]:CBC 540 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
1238 heikki.linnakangas@i 1795 [ # # ]:UBC 0 : ereport(ERROR,
1796 : : (errcode_for_file_access(),
1797 : : errmsg("could not close file \"%s\": %m",
1798 : : cstate->filename)));
1799 : : }
1800 : :
1194 tomas.vondra@postgre 1801 :CBC 540 : pgstat_progress_end_command();
1802 : :
1238 heikki.linnakangas@i 1803 : 540 : MemoryContextDelete(cstate->copycontext);
1804 : 540 : pfree(cstate);
1805 : 540 : }
1806 : :
1807 : : /*
1808 : : * Closes the pipe from an external program, checking the pclose() return code.
1809 : : */
1810 : : static void
1238 heikki.linnakangas@i 1811 :UBC 0 : ClosePipeFromProgram(CopyFromState cstate)
1812 : : {
1813 : : int pclose_rc;
1814 : :
1815 [ # # ]: 0 : Assert(cstate->is_program);
1816 : :
1817 : 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1818 [ # # ]: 0 : if (pclose_rc == -1)
1819 [ # # ]: 0 : ereport(ERROR,
1820 : : (errcode_for_file_access(),
1821 : : errmsg("could not close pipe to external command: %m")));
1822 [ # # ]: 0 : else if (pclose_rc != 0)
1823 : : {
1824 : : /*
1825 : : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1826 : : * expectable for the called program to fail with SIGPIPE, and we
1827 : : * should not report that as an error. Otherwise, SIGPIPE indicates a
1828 : : * problem.
1829 : : */
1109 1830 [ # # # # ]: 0 : if (!cstate->raw_reached_eof &&
1238 1831 : 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1832 : 0 : return;
1833 : :
1834 [ # # ]: 0 : ereport(ERROR,
1835 : : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
1836 : : errmsg("program \"%s\" failed",
1837 : : cstate->filename),
1838 : : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1839 : : }
1840 : : }
|