Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * copyfrom.c
4 : * COPY <table> FROM file/program/client
5 : *
6 : * This file contains routines needed to efficiently load tuples into a
7 : * table. That includes looking up the correct partition, firing triggers,
8 : * calling the table AM function to insert the data, and updating indexes.
9 : * Reading data from the input file or client and parsing it into Datums
10 : * is handled in copyfromparse.c.
11 : *
12 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
13 : * Portions Copyright (c) 1994, Regents of the University of California
14 : *
15 : *
16 : * IDENTIFICATION
17 : * src/backend/commands/copyfrom.c
18 : *
19 : *-------------------------------------------------------------------------
20 : */
21 : #include "postgres.h"
22 :
23 : #include <ctype.h>
24 : #include <unistd.h>
25 : #include <sys/stat.h>
26 :
27 : #include "access/heapam.h"
28 : #include "access/htup_details.h"
29 : #include "access/tableam.h"
30 : #include "access/xact.h"
31 : #include "access/xlog.h"
32 : #include "catalog/namespace.h"
33 : #include "commands/copy.h"
34 : #include "commands/copyfrom_internal.h"
35 : #include "commands/progress.h"
36 : #include "commands/trigger.h"
37 : #include "executor/execPartition.h"
38 : #include "executor/executor.h"
39 : #include "executor/nodeModifyTable.h"
40 : #include "executor/tuptable.h"
41 : #include "foreign/fdwapi.h"
42 : #include "libpq/libpq.h"
43 : #include "libpq/pqformat.h"
44 : #include "miscadmin.h"
45 : #include "optimizer/optimizer.h"
46 : #include "pgstat.h"
47 : #include "rewrite/rewriteHandler.h"
48 : #include "storage/fd.h"
49 : #include "tcop/tcopprot.h"
50 : #include "utils/lsyscache.h"
51 : #include "utils/memutils.h"
52 : #include "utils/portal.h"
53 : #include "utils/rel.h"
54 : #include "utils/snapmgr.h"
55 :
56 : /*
57 : * No more than this many tuples per CopyMultiInsertBuffer
58 : *
59 : * Caution: Don't make this too big, as we could end up with this many
60 : * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
61 : * multiInsertBuffers list. Increasing this can cause quadratic growth in
62 : * memory requirements during copies into partitioned tables with a large
63 : * number of partitions.
64 : */
65 : #define MAX_BUFFERED_TUPLES 1000
66 :
67 : /*
68 : * Flush buffers if there are >= this many bytes, as counted by the input
69 : * size, of tuples stored.
70 : */
71 : #define MAX_BUFFERED_BYTES 65535
72 :
73 : /* Trim the list of buffers back down to this number after flushing */
74 : #define MAX_PARTITION_BUFFERS 32
75 :
76 : /* Stores multi-insert data related to a single relation in CopyFrom. */
77 : typedef struct CopyMultiInsertBuffer
78 : {
79 : TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
80 : ResultRelInfo *resultRelInfo; /* ResultRelInfo for 'relid' */
81 : BulkInsertState bistate; /* BulkInsertState for this rel if plain
82 : * table; NULL if foreign table */
83 : int nused; /* number of 'slots' containing tuples */
84 : uint64 linenos[MAX_BUFFERED_TUPLES]; /* Line # of tuple in copy
85 : * stream */
86 : } CopyMultiInsertBuffer;
87 :
88 : /*
89 : * Stores one or many CopyMultiInsertBuffers and details about the size and
90 : * number of tuples which are stored in them. This allows multiple buffers to
91 : * exist at once when COPYing into a partitioned table.
92 : */
93 : typedef struct CopyMultiInsertInfo
94 : {
95 : List *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
96 : int bufferedTuples; /* number of tuples buffered over all buffers */
97 : int bufferedBytes; /* number of bytes from all buffered tuples */
98 : CopyFromState cstate; /* Copy state for this CopyMultiInsertInfo */
99 : EState *estate; /* Executor state used for COPY */
100 : CommandId mycid; /* Command Id used for COPY */
101 : int ti_options; /* table insert options */
102 : } CopyMultiInsertInfo;
103 :
104 :
105 : /* non-export function prototypes */
106 : static char *limit_printout_length(const char *str);
107 :
108 : static void ClosePipeFromProgram(CopyFromState cstate);
109 :
110 : /*
111 : * error context callback for COPY FROM
112 : *
113 : * The argument for the error context must be CopyFromState.
114 : */
115 : void
867 heikki.linnakangas 116 GIC 106 : CopyFromErrorCallback(void *arg)
867 heikki.linnakangas 117 ECB : {
738 heikki.linnakangas 118 GIC 106 : CopyFromState cstate = (CopyFromState) arg;
867 heikki.linnakangas 119 ECB :
178 efujita 120 GNC 106 : if (cstate->relname_only)
121 : {
122 1 : errcontext("COPY %s",
123 : cstate->cur_relname);
124 1 : return;
125 : }
867 heikki.linnakangas 126 GIC 105 : if (cstate->opts.binary)
867 heikki.linnakangas 127 ECB : {
128 : /* can't usefully display the data */
867 heikki.linnakangas 129 CBC 1 : if (cstate->cur_attname)
384 tgl 130 GIC 1 : errcontext("COPY %s, line %llu, column %s",
384 tgl 131 ECB : cstate->cur_relname,
384 tgl 132 GIC 1 : (unsigned long long) cstate->cur_lineno,
867 heikki.linnakangas 133 ECB : cstate->cur_attname);
134 : else
384 tgl 135 UIC 0 : errcontext("COPY %s, line %llu",
384 tgl 136 ECB : cstate->cur_relname,
384 tgl 137 LBC 0 : (unsigned long long) cstate->cur_lineno);
138 : }
867 heikki.linnakangas 139 ECB : else
140 : {
867 heikki.linnakangas 141 GIC 104 : if (cstate->cur_attname && cstate->cur_attval)
867 heikki.linnakangas 142 GBC 10 : {
143 : /* error is relevant to a particular column */
867 heikki.linnakangas 144 EUB : char *attval;
145 :
867 heikki.linnakangas 146 GIC 10 : attval = limit_printout_length(cstate->cur_attval);
384 tgl 147 10 : errcontext("COPY %s, line %llu, column %s: \"%s\"",
384 tgl 148 ECB : cstate->cur_relname,
384 tgl 149 CBC 10 : (unsigned long long) cstate->cur_lineno,
150 : cstate->cur_attname,
151 : attval);
867 heikki.linnakangas 152 GIC 10 : pfree(attval);
867 heikki.linnakangas 153 ECB : }
867 heikki.linnakangas 154 CBC 94 : else if (cstate->cur_attname)
155 : {
867 heikki.linnakangas 156 ECB : /* error is relevant to a particular column, value is NULL */
384 tgl 157 GIC 3 : errcontext("COPY %s, line %llu, column %s: null input",
158 : cstate->cur_relname,
384 tgl 159 CBC 3 : (unsigned long long) cstate->cur_lineno,
160 : cstate->cur_attname);
867 heikki.linnakangas 161 ECB : }
162 : else
163 : {
164 : /*
165 : * Error is relevant to a particular line.
166 : *
167 : * If line_buf still contains the correct line, print it.
168 : */
738 heikki.linnakangas 169 GIC 91 : if (cstate->line_buf_valid)
170 : {
171 : char *lineval;
172 :
867 173 86 : lineval = limit_printout_length(cstate->line_buf.data);
384 tgl 174 86 : errcontext("COPY %s, line %llu: \"%s\"",
175 : cstate->cur_relname,
384 tgl 176 CBC 86 : (unsigned long long) cstate->cur_lineno, lineval);
867 heikki.linnakangas 177 GIC 86 : pfree(lineval);
178 : }
179 : else
867 heikki.linnakangas 180 ECB : {
384 tgl 181 CBC 5 : errcontext("COPY %s, line %llu",
182 : cstate->cur_relname,
183 5 : (unsigned long long) cstate->cur_lineno);
867 heikki.linnakangas 184 ECB : }
185 : }
186 : }
187 : }
188 :
189 : /*
190 : * Make sure we don't print an unreasonable amount of COPY data in a message.
191 : *
192 : * Returns a pstrdup'd copy of the input.
193 : */
194 : static char *
867 heikki.linnakangas 195 GIC 96 : limit_printout_length(const char *str)
196 : {
197 : #define MAX_COPY_DATA_DISPLAY 100
198 :
199 96 : int slen = strlen(str);
200 : int len;
201 : char *res;
867 heikki.linnakangas 202 ECB :
203 : /* Fast path if definitely okay */
867 heikki.linnakangas 204 GIC 96 : if (slen <= MAX_COPY_DATA_DISPLAY)
205 96 : return pstrdup(str);
867 heikki.linnakangas 206 ECB :
207 : /* Apply encoding-dependent truncation */
867 heikki.linnakangas 208 UIC 0 : len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
209 :
210 : /*
867 heikki.linnakangas 211 ECB : * Truncate, and add "..." to show we truncated the input.
212 : */
867 heikki.linnakangas 213 UIC 0 : res = (char *) palloc(len + 4);
214 0 : memcpy(res, str, len);
867 heikki.linnakangas 215 UBC 0 : strcpy(res + len, "...");
216 :
867 heikki.linnakangas 217 UIC 0 : return res;
218 : }
219 :
867 heikki.linnakangas 220 EUB : /*
221 : * Allocate memory and initialize a new CopyMultiInsertBuffer for this
222 : * ResultRelInfo.
223 : */
224 : static CopyMultiInsertBuffer *
867 heikki.linnakangas 225 GIC 911 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
226 : {
227 : CopyMultiInsertBuffer *buffer;
228 :
229 911 : buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
230 911 : memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
231 911 : buffer->resultRelInfo = rri;
178 efujita 232 GNC 911 : buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
867 heikki.linnakangas 233 GIC 911 : buffer->nused = 0;
234 :
235 911 : return buffer;
867 heikki.linnakangas 236 ECB : }
237 :
238 : /*
239 : * Make a new buffer for this ResultRelInfo.
240 : */
241 : static inline void
867 heikki.linnakangas 242 CBC 911 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
243 : ResultRelInfo *rri)
244 : {
245 : CopyMultiInsertBuffer *buffer;
246 :
867 heikki.linnakangas 247 GIC 911 : buffer = CopyMultiInsertBufferInit(rri);
248 :
867 heikki.linnakangas 249 ECB : /* Setup back-link so we can easily find this buffer again */
867 heikki.linnakangas 250 GIC 911 : rri->ri_CopyMultiInsertBuffer = buffer;
251 : /* Record that we're tracking this buffer */
252 911 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
253 911 : }
867 heikki.linnakangas 254 ECB :
255 : /*
256 : * Initialize an already allocated CopyMultiInsertInfo.
257 : *
258 : * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
259 : * for that table.
260 : */
261 : static void
867 heikki.linnakangas 262 GIC 905 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
263 : CopyFromState cstate, EState *estate, CommandId mycid,
264 : int ti_options)
265 : {
266 905 : miinfo->multiInsertBuffers = NIL;
267 905 : miinfo->bufferedTuples = 0;
268 905 : miinfo->bufferedBytes = 0;
867 heikki.linnakangas 269 CBC 905 : miinfo->cstate = cstate;
867 heikki.linnakangas 270 GIC 905 : miinfo->estate = estate;
271 905 : miinfo->mycid = mycid;
272 905 : miinfo->ti_options = ti_options;
867 heikki.linnakangas 273 ECB :
274 : /*
275 : * Only setup the buffer when not dealing with a partitioned table.
276 : * Buffers for partitioned tables will just be setup when we need to send
277 : * tuples their way for the first time.
278 : */
867 heikki.linnakangas 279 CBC 905 : if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
867 heikki.linnakangas 280 GIC 869 : CopyMultiInsertInfoSetupBuffer(miinfo, rri);
281 905 : }
282 :
283 : /*
284 : * Returns true if the buffers are full
285 : */
867 heikki.linnakangas 286 ECB : static inline bool
867 heikki.linnakangas 287 CBC 893642 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
867 heikki.linnakangas 288 ECB : {
867 heikki.linnakangas 289 GIC 893642 : if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
290 893045 : miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
291 650 : return true;
292 892992 : return false;
293 : }
867 heikki.linnakangas 294 ECB :
295 : /*
296 : * Returns true if we have no buffered tuples
297 : */
298 : static inline bool
867 heikki.linnakangas 299 CBC 863 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
300 : {
867 heikki.linnakangas 301 GIC 863 : return miinfo->bufferedTuples == 0;
302 : }
303 :
304 : /*
305 : * Write the tuples stored in 'buffer' out to the table.
867 heikki.linnakangas 306 ECB : */
307 : static inline void
867 heikki.linnakangas 308 CBC 1499 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
309 : CopyMultiInsertBuffer *buffer,
310 : int64 *processed)
311 : {
738 heikki.linnakangas 312 GIC 1499 : CopyFromState cstate = miinfo->cstate;
867 heikki.linnakangas 313 CBC 1499 : EState *estate = miinfo->estate;
314 1499 : int nused = buffer->nused;
315 1499 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
316 1499 : TupleTableSlot **slots = buffer->slots;
317 : int i;
867 heikki.linnakangas 318 ECB :
178 efujita 319 GNC 1499 : if (resultRelInfo->ri_FdwRoutine)
178 efujita 320 ECB : {
178 efujita 321 GNC 7 : int batch_size = resultRelInfo->ri_BatchSize;
322 7 : int sent = 0;
867 heikki.linnakangas 323 ECB :
178 efujita 324 GNC 7 : Assert(buffer->bistate == NULL);
325 :
326 : /* Ensure that the FDW supports batching and it's enabled */
327 7 : Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
328 7 : Assert(batch_size > 1);
329 :
330 : /*
331 : * We suppress error context information other than the relation name,
332 : * if one of the operations below fails.
333 : */
334 7 : Assert(!cstate->relname_only);
335 7 : cstate->relname_only = true;
336 :
337 19 : while (sent < nused)
338 : {
339 13 : int size = (batch_size < nused - sent) ? batch_size : (nused - sent);
340 13 : int inserted = size;
341 : TupleTableSlot **rslots;
342 :
343 : /* insert into foreign table: let the FDW do it */
344 : rslots =
345 13 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
346 : resultRelInfo,
347 13 : &slots[sent],
348 : NULL,
349 : &inserted);
350 :
351 12 : sent += size;
352 :
353 : /* No need to do anything if there are no inserted rows */
354 12 : if (inserted <= 0)
355 2 : continue;
356 :
357 : /* Triggers on foreign tables should not have transition tables */
358 10 : Assert(resultRelInfo->ri_TrigDesc == NULL ||
359 : resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
360 :
361 : /* Run AFTER ROW INSERT triggers */
362 10 : if (resultRelInfo->ri_TrigDesc != NULL &&
178 efujita 363 UNC 0 : resultRelInfo->ri_TrigDesc->trig_insert_after_row)
364 : {
365 0 : Oid relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
366 :
367 0 : for (i = 0; i < inserted; i++)
368 : {
369 0 : TupleTableSlot *slot = rslots[i];
370 :
371 : /*
372 : * AFTER ROW Triggers might reference the tableoid column,
373 : * so (re-)initialize tts_tableOid before evaluating them.
374 : */
375 0 : slot->tts_tableOid = relid;
376 :
377 0 : ExecARInsertTriggers(estate, resultRelInfo,
378 : slot, NIL,
379 : cstate->transition_capture);
380 : }
381 : }
382 :
383 : /* Update the row counter and progress of the COPY command */
178 efujita 384 GNC 10 : *processed += inserted;
385 10 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
386 : *processed);
387 : }
867 heikki.linnakangas 388 EUB :
178 efujita 389 GNC 24 : for (i = 0; i < nused; i++)
390 18 : ExecClearTuple(slots[i]);
391 :
392 : /* reset relname_only */
393 6 : cstate->relname_only = false;
394 : }
395 : else
396 : {
397 1492 : CommandId mycid = miinfo->mycid;
398 1492 : int ti_options = miinfo->ti_options;
399 1492 : bool line_buf_valid = cstate->line_buf_valid;
400 1492 : uint64 save_cur_lineno = cstate->cur_lineno;
401 : MemoryContext oldcontext;
402 :
403 1492 : Assert(buffer->bistate != NULL);
404 :
405 : /*
406 : * Print error context information correctly, if one of the operations
407 : * below fails.
408 : */
409 1492 : cstate->line_buf_valid = false;
410 :
411 : /*
412 : * table_multi_insert may leak memory, so switch to short-lived memory
413 : * context before calling it.
414 : */
415 1492 : oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
416 1492 : table_multi_insert(resultRelInfo->ri_RelationDesc,
417 : slots,
418 : nused,
419 : mycid,
420 : ti_options,
421 : buffer->bistate);
422 1492 : MemoryContextSwitchTo(oldcontext);
423 :
424 895062 : for (i = 0; i < nused; i++)
425 : {
426 : /*
427 : * If there are any indexes, update them for all the inserted
428 : * tuples, and run AFTER ROW INSERT triggers.
429 : */
430 893575 : if (resultRelInfo->ri_NumIndices > 0)
431 : {
432 : List *recheckIndexes;
433 :
434 200874 : cstate->cur_lineno = buffer->linenos[i];
435 : recheckIndexes =
436 200874 : ExecInsertIndexTuples(resultRelInfo,
437 : buffer->slots[i], estate, false,
438 : false, NULL, NIL, false);
439 200869 : ExecARInsertTriggers(estate, resultRelInfo,
440 200869 : slots[i], recheckIndexes,
441 : cstate->transition_capture);
442 200869 : list_free(recheckIndexes);
443 : }
444 :
445 : /*
446 : * There's no indexes, but see if we need to run AFTER ROW INSERT
447 : * triggers anyway.
448 : */
449 692701 : else if (resultRelInfo->ri_TrigDesc != NULL &&
450 39 : (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
451 30 : resultRelInfo->ri_TrigDesc->trig_insert_new_table))
452 : {
453 18 : cstate->cur_lineno = buffer->linenos[i];
454 18 : ExecARInsertTriggers(estate, resultRelInfo,
455 18 : slots[i], NIL,
456 : cstate->transition_capture);
457 : }
458 :
459 893570 : ExecClearTuple(slots[i]);
460 : }
461 :
462 : /* Update the row counter and progress of the COPY command */
463 1487 : *processed += nused;
464 1487 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
465 : *processed);
466 :
467 : /* reset cur_lineno and line_buf_valid to what they were */
468 1487 : cstate->line_buf_valid = line_buf_valid;
469 1487 : cstate->cur_lineno = save_cur_lineno;
470 : }
471 :
472 : /* Mark that all slots are free */
867 heikki.linnakangas 473 GIC 1493 : buffer->nused = 0;
474 1493 : }
475 :
476 : /*
477 : * Drop used slots and free member for this buffer.
478 : *
867 heikki.linnakangas 479 ECB : * The buffer must be flushed before cleanup.
480 : */
481 : static inline void
867 heikki.linnakangas 482 GIC 849 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
483 : CopyMultiInsertBuffer *buffer)
867 heikki.linnakangas 484 ECB : {
178 efujita 485 GNC 849 : ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
867 heikki.linnakangas 486 ECB : int i;
487 :
488 : /* Ensure buffer was flushed */
867 heikki.linnakangas 489 CBC 849 : Assert(buffer->nused == 0);
490 :
491 : /* Remove back-link to ourself */
178 efujita 492 GNC 849 : resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
867 heikki.linnakangas 493 ECB :
178 efujita 494 GNC 849 : if (resultRelInfo->ri_FdwRoutine == NULL)
495 : {
496 843 : Assert(buffer->bistate != NULL);
497 843 : FreeBulkInsertState(buffer->bistate);
498 : }
499 : else
500 6 : Assert(buffer->bistate == NULL);
867 heikki.linnakangas 501 ECB :
502 : /* Since we only create slots on demand, just drop the non-null ones. */
867 heikki.linnakangas 503 GIC 315103 : for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
504 314254 : ExecDropSingleTupleTableSlot(buffer->slots[i]);
867 heikki.linnakangas 505 ECB :
178 efujita 506 GNC 849 : if (resultRelInfo->ri_FdwRoutine == NULL)
507 843 : table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
508 : miinfo->ti_options);
509 :
867 heikki.linnakangas 510 GIC 849 : pfree(buffer);
511 849 : }
867 heikki.linnakangas 512 ECB :
513 : /*
514 : * Write out all stored tuples in all buffers out to the tables.
515 : *
516 : * Once flushed we also trim the tracked buffers list down to size by removing
517 : * the buffers created earliest first.
518 : *
730 michael 519 : * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
520 : * used. When cleaning up old buffers we'll never remove the one for
521 : * 'curr_rri'.
522 : */
523 : static inline void
178 efujita 524 GNC 1386 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
525 : int64 *processed)
867 heikki.linnakangas 526 ECB : {
527 : ListCell *lc;
528 :
867 heikki.linnakangas 529 GIC 2879 : foreach(lc, miinfo->multiInsertBuffers)
530 : {
531 1499 : CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
532 :
178 efujita 533 GNC 1499 : CopyMultiInsertBufferFlush(miinfo, buffer, processed);
867 heikki.linnakangas 534 ECB : }
535 :
867 heikki.linnakangas 536 GIC 1380 : miinfo->bufferedTuples = 0;
537 1380 : miinfo->bufferedBytes = 0;
867 heikki.linnakangas 538 ECB :
539 : /*
540 : * Trim the list of tracked buffers down if it exceeds the limit. Here we
541 : * remove buffers starting with the ones we created first. It seems less
542 : * likely that these older ones will be needed than the ones that were
543 : * just created.
544 : */
867 heikki.linnakangas 545 GIC 1380 : while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
867 heikki.linnakangas 546 ECB : {
547 : CopyMultiInsertBuffer *buffer;
548 :
867 heikki.linnakangas 549 UIC 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
550 :
551 : /*
552 : * We never want to remove the buffer that's currently being used, so
867 heikki.linnakangas 553 ECB : * if we happen to find that then move it to the end of the list.
554 : */
867 heikki.linnakangas 555 LBC 0 : if (buffer->resultRelInfo == curr_rri)
556 : {
557 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
558 0 : miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
559 0 : buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
560 : }
561 :
867 heikki.linnakangas 562 UIC 0 : CopyMultiInsertBufferCleanup(miinfo, buffer);
867 heikki.linnakangas 563 LBC 0 : miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
564 : }
867 heikki.linnakangas 565 GIC 1380 : }
566 :
867 heikki.linnakangas 567 ECB : /*
568 : * Cleanup allocated buffers and free memory
569 : */
570 : static inline void
867 heikki.linnakangas 571 GIC 843 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
867 heikki.linnakangas 572 ECB : {
573 : ListCell *lc;
574 :
867 heikki.linnakangas 575 GIC 1692 : foreach(lc, miinfo->multiInsertBuffers)
576 849 : CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
867 heikki.linnakangas 577 ECB :
867 heikki.linnakangas 578 CBC 843 : list_free(miinfo->multiInsertBuffers);
867 heikki.linnakangas 579 GIC 843 : }
580 :
581 : /*
582 : * Get the next TupleTableSlot that the next tuple should be stored in.
583 : *
584 : * Callers must ensure that the buffer is not full.
585 : *
867 heikki.linnakangas 586 ECB : * Note: 'miinfo' is unused but has been included for consistency with the
587 : * other functions in this area.
588 : */
589 : static inline TupleTableSlot *
867 heikki.linnakangas 590 GIC 894514 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
591 : ResultRelInfo *rri)
592 : {
867 heikki.linnakangas 593 CBC 894514 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
867 heikki.linnakangas 594 GIC 894514 : int nused = buffer->nused;
595 :
867 heikki.linnakangas 596 CBC 894514 : Assert(buffer != NULL);
867 heikki.linnakangas 597 GIC 894514 : Assert(nused < MAX_BUFFERED_TUPLES);
867 heikki.linnakangas 598 ECB :
867 heikki.linnakangas 599 GIC 894514 : if (buffer->slots[nused] == NULL)
867 heikki.linnakangas 600 CBC 314369 : buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
601 894514 : return buffer->slots[nused];
602 : }
603 :
867 heikki.linnakangas 604 ECB : /*
605 : * Record the previously reserved TupleTableSlot that was reserved by
606 : * CopyMultiInsertInfoNextFreeSlot as being consumed.
607 : */
608 : static inline void
867 heikki.linnakangas 609 GIC 893642 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
867 heikki.linnakangas 610 ECB : TupleTableSlot *slot, int tuplen, uint64 lineno)
611 : {
867 heikki.linnakangas 612 GIC 893642 : CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
613 :
867 heikki.linnakangas 614 CBC 893642 : Assert(buffer != NULL);
615 893642 : Assert(slot == buffer->slots[buffer->nused]);
616 :
617 : /* Store the line number so we can properly report any errors later */
867 heikki.linnakangas 618 GIC 893642 : buffer->linenos[buffer->nused] = lineno;
619 :
620 : /* Record this slot as being used */
621 893642 : buffer->nused++;
622 :
623 : /* Update how many tuples are stored and their size */
624 893642 : miinfo->bufferedTuples++;
625 893642 : miinfo->bufferedBytes += tuplen;
626 893642 : }
627 :
867 heikki.linnakangas 628 ECB : /*
629 : * Copy FROM file to relation.
630 : */
631 : uint64
867 heikki.linnakangas 632 GIC 991 : CopyFrom(CopyFromState cstate)
867 heikki.linnakangas 633 ECB : {
634 : ResultRelInfo *resultRelInfo;
635 : ResultRelInfo *target_resultRelInfo;
867 heikki.linnakangas 636 GIC 991 : ResultRelInfo *prevResultRelInfo = NULL;
867 heikki.linnakangas 637 CBC 991 : EState *estate = CreateExecutorState(); /* for ExecConstraints() */
638 : ModifyTableState *mtstate;
639 : ExprContext *econtext;
640 991 : TupleTableSlot *singleslot = NULL;
641 991 : MemoryContext oldcontext = CurrentMemoryContext;
642 :
867 heikki.linnakangas 643 GIC 991 : PartitionTupleRouting *proute = NULL;
644 : ErrorContextCallback errcallback;
645 991 : CommandId mycid = GetCurrentCommandId(true);
646 991 : int ti_options = 0; /* start with default options for insert */
647 991 : BulkInsertState bistate = NULL;
648 : CopyInsertMethod insertMethod;
867 heikki.linnakangas 649 CBC 991 : CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
761 michael 650 GIC 991 : int64 processed = 0;
651 991 : int64 excluded = 0;
652 : bool has_before_insert_row_trig;
867 heikki.linnakangas 653 EUB : bool has_instead_insert_row_trig;
867 heikki.linnakangas 654 GIC 991 : bool leafpart_use_multi_insert = false;
655 :
656 991 : Assert(cstate->rel);
657 991 : Assert(list_length(cstate->range_table) == 1);
658 :
867 heikki.linnakangas 659 EUB : /*
660 : * The target must be a plain, foreign, or partitioned relation, or have
661 : * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only
662 : * allowed on views, so we only hint about them in the view case.)
663 : */
867 heikki.linnakangas 664 GIC 991 : if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
665 76 : cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
867 heikki.linnakangas 666 GBC 57 : cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
667 9 : !(cstate->rel->trigdesc &&
867 heikki.linnakangas 668 GIC 6 : cstate->rel->trigdesc->trig_insert_instead_row))
867 heikki.linnakangas 669 ECB : {
867 heikki.linnakangas 670 GIC 3 : if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
671 3 : ereport(ERROR,
672 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
673 : errmsg("cannot copy to view \"%s\"",
674 : RelationGetRelationName(cstate->rel)),
867 heikki.linnakangas 675 ECB : errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
867 heikki.linnakangas 676 UIC 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
677 0 : ereport(ERROR,
678 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
867 heikki.linnakangas 679 ECB : errmsg("cannot copy to materialized view \"%s\"",
680 : RelationGetRelationName(cstate->rel))));
867 heikki.linnakangas 681 UIC 0 : else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
867 heikki.linnakangas 682 LBC 0 : ereport(ERROR,
867 heikki.linnakangas 683 ECB : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
684 : errmsg("cannot copy to sequence \"%s\"",
685 : RelationGetRelationName(cstate->rel))));
686 : else
867 heikki.linnakangas 687 UIC 0 : ereport(ERROR,
688 : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
689 : errmsg("cannot copy to non-table relation \"%s\"",
690 : RelationGetRelationName(cstate->rel))));
691 : }
692 :
693 : /*
867 heikki.linnakangas 694 ECB : * If the target file is new-in-transaction, we assume that checking FSM
695 : * for free space is a waste of time. This could possibly be wrong, but
696 : * it's unlikely.
697 : */
867 heikki.linnakangas 698 CBC 988 : if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
867 heikki.linnakangas 699 GIC 915 : (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
277 rhaas 700 GNC 909 : cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
867 heikki.linnakangas 701 CBC 40 : ti_options |= TABLE_INSERT_SKIP_FSM;
702 :
867 heikki.linnakangas 703 ECB : /*
704 : * Optimize if new relation storage was created in this subxact or one of
705 : * its committed children and we won't see those rows later as part of an
706 : * earlier scan or command. The subxact test ensures that if this subxact
707 : * aborts then the frozen rows won't be visible after xact cleanup. Note
708 : * that the stronger test of exactly which subtransaction created it is
709 : * crucial for correctness of this optimization. The test for an earlier
710 : * scan or command tolerates false negatives. FREEZE causes other sessions
711 : * to see rows they would not see under MVCC, and a false negative merely
712 : * spreads that anomaly to the current session.
713 : */
867 heikki.linnakangas 714 GIC 988 : if (cstate->opts.freeze)
715 : {
867 heikki.linnakangas 716 ECB : /*
717 : * We currently disallow COPY FREEZE on partitioned tables. The
718 : * reason for this is that we've simply not yet opened the partitions
719 : * to determine if the optimization can be applied to them. We could
720 : * go and open them all here, but doing so may be quite a costly
721 : * overhead for small copies. In any case, we may just end up routing
722 : * tuples to a small number of partitions. It seems better just to
723 : * raise an ERROR for partitioned tables.
724 : */
867 heikki.linnakangas 725 CBC 29 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
726 : {
867 heikki.linnakangas 727 GIC 3 : ereport(ERROR,
867 heikki.linnakangas 728 ECB : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
729 : errmsg("cannot perform COPY FREEZE on a partitioned table")));
730 : }
731 :
732 : /*
733 : * Tolerate one registration for the benefit of FirstXactSnapshot.
734 : * Scan-bearing queries generally create at least two registrations,
735 : * though relying on that is fragile, as is ignoring ActiveSnapshot.
736 : * Clear CatalogSnapshot to avoid counting its registration. We'll
737 : * still detect ongoing catalog scans, each of which separately
738 : * registers the snapshot it uses.
739 : */
867 heikki.linnakangas 740 CBC 26 : InvalidateCatalogSnapshot();
741 26 : if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
867 heikki.linnakangas 742 UIC 0 : ereport(ERROR,
743 : (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
867 heikki.linnakangas 744 ECB : errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
745 :
867 heikki.linnakangas 746 GIC 52 : if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
277 rhaas 747 GNC 26 : cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
867 heikki.linnakangas 748 GIC 9 : ereport(ERROR,
867 heikki.linnakangas 749 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
750 : errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
751 :
867 heikki.linnakangas 752 GIC 17 : ti_options |= TABLE_INSERT_FROZEN;
867 heikki.linnakangas 753 ECB : }
754 :
755 : /*
756 : * We need a ResultRelInfo so we can use the regular executor's
757 : * index-entry-making machinery. (There used to be a huge amount of code
758 : * here that basically duplicated execUtils.c ...)
759 : */
34 tgl 760 GNC 976 : ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
867 heikki.linnakangas 761 CBC 976 : resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
867 heikki.linnakangas 762 GIC 976 : ExecInitResultRelation(estate, resultRelInfo, 1);
763 :
764 : /* Verify the named relation is a valid target for INSERT */
765 976 : CheckValidResultRel(resultRelInfo, CMD_INSERT);
766 :
767 975 : ExecOpenIndices(resultRelInfo, false);
867 heikki.linnakangas 768 ECB :
769 : /*
770 : * Set up a ModifyTableState so we can let FDW(s) init themselves for
771 : * foreign-table result relation(s).
772 : */
867 heikki.linnakangas 773 GIC 975 : mtstate = makeNode(ModifyTableState);
867 heikki.linnakangas 774 CBC 975 : mtstate->ps.plan = NULL;
775 975 : mtstate->ps.state = estate;
867 heikki.linnakangas 776 GIC 975 : mtstate->operation = CMD_INSERT;
739 tgl 777 975 : mtstate->mt_nrels = 1;
867 heikki.linnakangas 778 975 : mtstate->resultRelInfo = resultRelInfo;
790 779 975 : mtstate->rootResultRelInfo = resultRelInfo;
867 heikki.linnakangas 780 EUB :
867 heikki.linnakangas 781 GBC 975 : if (resultRelInfo->ri_FdwRoutine != NULL &&
867 heikki.linnakangas 782 GIC 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
783 18 : resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
784 : resultRelInfo);
867 heikki.linnakangas 785 EUB :
786 : /*
787 : * Also, if the named relation is a foreign table, determine if the FDW
788 : * supports batch insert and determine the batch size (a FDW may support
789 : * batching, but it may be disabled for the server/table).
790 : *
791 : * If the FDW does not support batching, we set the batch size to 1.
792 : */
178 efujita 793 GNC 975 : if (resultRelInfo->ri_FdwRoutine != NULL &&
794 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
795 18 : resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
796 18 : resultRelInfo->ri_BatchSize =
797 18 : resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
798 : else
799 957 : resultRelInfo->ri_BatchSize = 1;
800 :
801 975 : Assert(resultRelInfo->ri_BatchSize >= 1);
802 :
867 heikki.linnakangas 803 EUB : /* Prepare to catch AFTER triggers. */
867 heikki.linnakangas 804 GIC 975 : AfterTriggerBeginQuery();
805 :
806 : /*
807 : * If there are any triggers with transition tables on the named relation,
867 heikki.linnakangas 808 EUB : * we need to be prepared to capture transition tuples.
809 : *
810 : * Because partition tuple routing would like to know about whether
811 : * transition capture is active, we also set it in mtstate, which is
812 : * passed to ExecFindPartition() below.
813 : */
867 heikki.linnakangas 814 GIC 975 : cstate->transition_capture = mtstate->mt_transition_capture =
815 975 : MakeTransitionCaptureState(cstate->rel->trigdesc,
816 975 : RelationGetRelid(cstate->rel),
817 : CMD_INSERT);
818 :
867 heikki.linnakangas 819 ECB : /*
820 : * If the named relation is a partitioned table, initialize state for
821 : * CopyFrom tuple routing.
822 : */
867 heikki.linnakangas 823 GIC 975 : if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
733 tgl 824 45 : proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
825 :
867 heikki.linnakangas 826 975 : if (cstate->whereClause)
827 9 : cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
828 : &mtstate->ps);
829 :
830 : /*
831 : * It's generally more efficient to prepare a bunch of tuples for
832 : * insertion, and insert them in one
833 : * table_multi_insert()/ExecForeignBatchInsert() call, than call
834 : * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
835 : * However, there are a number of reasons why we might not be able to do
836 : * this. These are explained below.
837 : */
838 975 : if (resultRelInfo->ri_TrigDesc != NULL &&
839 88 : (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
840 42 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
841 : {
842 : /*
843 : * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
844 : * triggers on the table. Such triggers might query the table we're
845 : * inserting into and act differently if the tuples that have already
846 : * been processed and prepared for insertion are not there.
867 heikki.linnakangas 847 ECB : */
867 heikki.linnakangas 848 GIC 52 : insertMethod = CIM_SINGLE;
867 heikki.linnakangas 849 ECB : }
178 efujita 850 GNC 923 : else if (resultRelInfo->ri_FdwRoutine != NULL &&
851 14 : resultRelInfo->ri_BatchSize == 1)
852 : {
853 : /*
854 : * Can't support multi-inserts to a foreign table if the FDW does not
855 : * support batching, or it's disabled for the server or foreign table.
856 : */
857 9 : insertMethod = CIM_SINGLE;
858 : }
867 heikki.linnakangas 859 GIC 914 : else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
860 13 : resultRelInfo->ri_TrigDesc->trig_insert_new_table)
861 : {
862 : /*
863 : * For partitioned tables we can't support multi-inserts when there
864 : * are any statement level insert triggers. It might be possible to
865 : * allow partitioned tables with such triggers in the future, but for
866 : * now, CopyMultiInsertInfoFlush expects that any after row insert and
867 : * statement level insert triggers are on the same relation.
868 : */
869 9 : insertMethod = CIM_SINGLE;
870 : }
178 efujita 871 GNC 905 : else if (cstate->volatile_defexprs)
867 heikki.linnakangas 872 EUB : {
873 : /*
874 : * Can't support multi-inserts if there are any volatile default
875 : * expressions in the table. Similarly to the trigger case above,
876 : * such expressions may query the table we're inserting into.
867 heikki.linnakangas 877 ECB : *
878 : * Note: It does not matter if any partitions have any volatile
879 : * default expressions as we use the defaults from the target of the
880 : * COPY command.
881 : */
867 heikki.linnakangas 882 UIC 0 : insertMethod = CIM_SINGLE;
883 : }
867 heikki.linnakangas 884 GIC 905 : else if (contain_volatile_functions(cstate->whereClause))
885 : {
886 : /*
887 : * Can't support multi-inserts if there are any volatile function
888 : * expressions in WHERE clause. Similarly to the trigger case above,
867 heikki.linnakangas 889 ECB : * such expressions may query the table we're inserting into.
890 : */
867 heikki.linnakangas 891 LBC 0 : insertMethod = CIM_SINGLE;
892 : }
893 : else
867 heikki.linnakangas 894 ECB : {
895 : /*
896 : * For partitioned tables, we may still be able to perform bulk
897 : * inserts. However, the possibility of this depends on which types
898 : * of triggers exist on the partition. We must disable bulk inserts
899 : * if the partition is a foreign table that can't use batching or it
900 : * has any before row insert or insert instead triggers (same as we
901 : * checked above for the parent table). Since the partition's
902 : * resultRelInfos are initialized only when we actually need to insert
903 : * the first tuple into them, we must have the intermediate insert
904 : * method of CIM_MULTI_CONDITIONAL to flag that we must later
905 : * determine if we can use bulk-inserts for the partition being
906 : * inserted into.
907 : */
867 heikki.linnakangas 908 CBC 905 : if (proute)
909 36 : insertMethod = CIM_MULTI_CONDITIONAL;
910 : else
911 869 : insertMethod = CIM_MULTI;
867 heikki.linnakangas 912 ECB :
867 heikki.linnakangas 913 CBC 905 : CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
914 : estate, mycid, ti_options);
915 : }
916 :
917 : /*
918 : * If not using batch mode (which allocates slots as needed) set up a
919 : * tuple slot too. When inserting into a partitioned table, we also need
920 : * one, even if we might batch insert, to read the tuple in the root
921 : * partition's form.
922 : */
923 975 : if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
867 heikki.linnakangas 924 ECB : {
867 heikki.linnakangas 925 CBC 106 : singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
867 heikki.linnakangas 926 ECB : &estate->es_tupleTable);
867 heikki.linnakangas 927 CBC 106 : bistate = GetBulkInsertState();
928 : }
867 heikki.linnakangas 929 ECB :
867 heikki.linnakangas 930 GIC 1063 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
867 heikki.linnakangas 931 CBC 88 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
932 :
867 heikki.linnakangas 933 GIC 1063 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
867 heikki.linnakangas 934 CBC 88 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
935 :
936 : /*
937 : * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
938 : * should do this for COPY, since it's not really an "INSERT" statement as
939 : * such. However, executing these triggers maintains consistency with the
940 : * EACH ROW triggers that we already fire on COPY.
941 : */
867 heikki.linnakangas 942 GIC 975 : ExecBSInsertTriggers(estate, resultRelInfo);
943 :
867 heikki.linnakangas 944 CBC 975 : econtext = GetPerTupleExprContext(estate);
867 heikki.linnakangas 945 ECB :
946 : /* Set up callback to identify error line number */
867 heikki.linnakangas 947 GIC 975 : errcallback.callback = CopyFromErrorCallback;
948 975 : errcallback.arg = (void *) cstate;
949 975 : errcallback.previous = error_context_stack;
950 975 : error_context_stack = &errcallback;
951 :
952 : for (;;)
867 heikki.linnakangas 953 CBC 893845 : {
867 heikki.linnakangas 954 ECB : TupleTableSlot *myslot;
955 : bool skip_tuple;
956 :
867 heikki.linnakangas 957 CBC 894820 : CHECK_FOR_INTERRUPTS();
958 :
959 : /*
960 : * Reset the per-tuple exprcontext. We do this after every tuple, to
961 : * clean-up after expression evaluations etc.
962 : */
867 heikki.linnakangas 963 GIC 894820 : ResetPerTupleExprContext(estate);
964 :
965 : /* select slot to (initially) load row into */
966 894820 : if (insertMethod == CIM_SINGLE || proute)
967 : {
867 heikki.linnakangas 968 CBC 107430 : myslot = singleslot;
969 107430 : Assert(myslot != NULL);
867 heikki.linnakangas 970 ECB : }
971 : else
972 : {
867 heikki.linnakangas 973 GIC 787390 : Assert(resultRelInfo == target_resultRelInfo);
974 787390 : Assert(insertMethod == CIM_MULTI);
975 :
976 787390 : myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
977 : resultRelInfo);
867 heikki.linnakangas 978 ECB : }
979 :
980 : /*
981 : * Switch to per-tuple context before calling NextCopyFrom, which does
982 : * evaluate default expressions etc. and requires per-tuple context.
983 : */
867 heikki.linnakangas 984 GIC 894820 : MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
985 :
986 894820 : ExecClearTuple(myslot);
867 heikki.linnakangas 987 ECB :
988 : /* Directly store the values/nulls array in the slot */
867 heikki.linnakangas 989 CBC 894820 : if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
990 906 : break;
991 :
867 heikki.linnakangas 992 GIC 893862 : ExecStoreVirtualTuple(myslot);
993 :
994 : /*
995 : * Constraints and where clause might reference the tableoid column,
996 : * so (re-)initialize tts_tableOid before evaluating them.
997 : */
998 893862 : myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
867 heikki.linnakangas 999 ECB :
1000 : /* Triggers and stuff need to be invoked in query context. */
867 heikki.linnakangas 1001 CBC 893862 : MemoryContextSwitchTo(oldcontext);
1002 :
867 heikki.linnakangas 1003 GIC 893862 : if (cstate->whereClause)
1004 : {
1005 33 : econtext->ecxt_scantuple = myslot;
1006 : /* Skip items that don't match COPY's WHERE clause */
1007 33 : if (!ExecQual(cstate->qualexpr, econtext))
1008 : {
1009 : /*
1010 : * Report that this tuple was filtered out by the WHERE
1011 : * clause.
761 michael 1012 EUB : */
761 michael 1013 GIC 18 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
761 michael 1014 ECB : ++excluded);
867 heikki.linnakangas 1015 GIC 18 : continue;
1016 : }
1017 : }
1018 :
1019 : /* Determine the partition to insert the tuple into */
1020 893844 : if (proute)
867 heikki.linnakangas 1021 EUB : {
1022 : TupleConversionMap *map;
1023 :
1024 : /*
1025 : * Attempt to find a partition suitable for this tuple.
1026 : * ExecFindPartition() will raise an error if none can be found or
1027 : * if the found partition is not suitable for INSERTs.
1028 : */
867 heikki.linnakangas 1029 GIC 107193 : resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
1030 : proute, myslot, estate);
1031 :
1032 107192 : if (prevResultRelInfo != resultRelInfo)
1033 : {
1034 : /* Determine which triggers exist on this partition */
1035 50782 : has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1036 27 : resultRelInfo->ri_TrigDesc->trig_insert_before_row);
1037 :
867 heikki.linnakangas 1038 CBC 50782 : has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
1039 27 : resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
1040 :
867 heikki.linnakangas 1041 ECB : /*
1042 : * Disable multi-inserts when the partition has BEFORE/INSTEAD
1043 : * OF triggers, or if the partition is a foreign table that
1044 : * can't use batching.
1045 : */
867 heikki.linnakangas 1046 GIC 101483 : leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
1047 50728 : !has_before_insert_row_trig &&
1048 152199 : !has_instead_insert_row_trig &&
178 efujita 1049 GNC 50716 : (resultRelInfo->ri_FdwRoutine == NULL ||
1050 6 : resultRelInfo->ri_BatchSize > 1);
1051 :
1052 : /* Set the multi-insert buffer to use for this partition. */
867 heikki.linnakangas 1053 GIC 50755 : if (leafpart_use_multi_insert)
1054 : {
867 heikki.linnakangas 1055 CBC 50714 : if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
867 heikki.linnakangas 1056 GIC 42 : CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
867 heikki.linnakangas 1057 ECB : resultRelInfo);
1058 : }
867 heikki.linnakangas 1059 CBC 41 : else if (insertMethod == CIM_MULTI_CONDITIONAL &&
867 heikki.linnakangas 1060 GIC 14 : !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
1061 : {
867 heikki.linnakangas 1062 ECB : /*
1063 : * Flush pending inserts if this partition can't use
1064 : * batching, so rows are visible to triggers etc.
1065 : */
178 efujita 1066 UNC 0 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1067 : resultRelInfo,
1068 : &processed);
1069 : }
1070 :
867 heikki.linnakangas 1071 GIC 50755 : if (bistate != NULL)
1072 50755 : ReleaseBulkInsertStatePin(bistate);
1073 50755 : prevResultRelInfo = resultRelInfo;
1074 : }
1075 :
867 heikki.linnakangas 1076 ECB : /*
1077 : * If we're capturing transition tuples, we might need to convert
1078 : * from the partition rowtype to root rowtype. But if there are no
1079 : * BEFORE triggers on the partition that could change the tuple,
1080 : * we can just remember the original unconverted tuple to avoid a
1081 : * needless round trip conversion.
1082 : */
867 heikki.linnakangas 1083 CBC 107192 : if (cstate->transition_capture != NULL)
1084 27 : cstate->transition_capture->tcs_original_insert_tuple =
867 heikki.linnakangas 1085 GIC 27 : !has_before_insert_row_trig ? myslot : NULL;
1086 :
867 heikki.linnakangas 1087 ECB : /*
1088 : * We might need to convert from the root rowtype to the partition
1089 : * rowtype.
1090 : */
128 alvherre 1091 GNC 107192 : map = ExecGetRootToChildMap(resultRelInfo, estate);
867 heikki.linnakangas 1092 GIC 107192 : if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
1093 : {
1094 : /* non batch insert */
1095 68 : if (map != NULL)
1096 : {
867 heikki.linnakangas 1097 ECB : TupleTableSlot *new_slot;
1098 :
867 heikki.linnakangas 1099 GIC 55 : new_slot = resultRelInfo->ri_PartitionTupleSlot;
867 heikki.linnakangas 1100 CBC 55 : myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
1101 : }
867 heikki.linnakangas 1102 ECB : }
1103 : else
1104 : {
1105 : /*
1106 : * Prepare to queue up tuple for later batch insert into
1107 : * current partition.
1108 : */
1109 : TupleTableSlot *batchslot;
1110 :
1111 : /* no other path available for partitioned table */
867 heikki.linnakangas 1112 GIC 107124 : Assert(insertMethod == CIM_MULTI_CONDITIONAL);
1113 :
1114 107124 : batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
1115 : resultRelInfo);
1116 :
1117 107124 : if (map != NULL)
867 heikki.linnakangas 1118 CBC 6100 : myslot = execute_attr_map_slot(map->attrMap, myslot,
1119 : batchslot);
867 heikki.linnakangas 1120 ECB : else
1121 : {
1122 : /*
1123 : * This looks more expensive than it is (Believe me, I
1124 : * optimized it away. Twice.). The input is in virtual
1125 : * form, and we'll materialize the slot below - for most
1126 : * slot types the copy performs the work materialization
1127 : * would later require anyway.
1128 : */
867 heikki.linnakangas 1129 GIC 101024 : ExecCopySlot(batchslot, myslot);
1130 101024 : myslot = batchslot;
1131 : }
867 heikki.linnakangas 1132 ECB : }
1133 :
1134 : /* ensure that triggers etc see the right relation */
867 heikki.linnakangas 1135 CBC 107192 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1136 : }
867 heikki.linnakangas 1137 ECB :
867 heikki.linnakangas 1138 GIC 893843 : skip_tuple = false;
867 heikki.linnakangas 1139 ECB :
1140 : /* BEFORE ROW INSERT Triggers */
867 heikki.linnakangas 1141 CBC 893843 : if (has_before_insert_row_trig)
1142 : {
867 heikki.linnakangas 1143 GIC 137 : if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
1144 8 : skip_tuple = true; /* "do nothing" */
1145 : }
1146 :
867 heikki.linnakangas 1147 CBC 893843 : if (!skip_tuple)
1148 : {
867 heikki.linnakangas 1149 ECB : /*
1150 : * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
1151 : * tuple. Otherwise, proceed with inserting the tuple into the
1152 : * table or foreign table.
1153 : */
867 heikki.linnakangas 1154 CBC 893835 : if (has_instead_insert_row_trig)
1155 : {
867 heikki.linnakangas 1156 GIC 6 : ExecIRInsertTriggers(estate, resultRelInfo, myslot);
1157 : }
1158 : else
1159 : {
1160 : /* Compute stored generated columns */
1161 893829 : if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
1162 300878 : resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
867 heikki.linnakangas 1163 CBC 18 : ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
1164 : CMD_INSERT);
1165 :
867 heikki.linnakangas 1166 ECB : /*
1167 : * If the target is a plain table, check the constraints of
1168 : * the tuple.
1169 : */
867 heikki.linnakangas 1170 CBC 893829 : if (resultRelInfo->ri_FdwRoutine == NULL &&
867 heikki.linnakangas 1171 GIC 893785 : resultRelInfo->ri_RelationDesc->rd_att->constr)
867 heikki.linnakangas 1172 CBC 300860 : ExecConstraints(resultRelInfo, myslot, estate);
867 heikki.linnakangas 1173 ECB :
1174 : /*
1175 : * Also check the tuple against the partition constraint, if
1176 : * there is one; except that if we got here via tuple-routing,
1177 : * we don't need to if there's no BR trigger defined on the
1178 : * partition.
1179 : */
867 heikki.linnakangas 1180 CBC 893814 : if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
1181 107186 : (proute == NULL || has_before_insert_row_trig))
1182 1154 : ExecPartitionCheck(resultRelInfo, myslot, estate, true);
867 heikki.linnakangas 1183 ECB :
1184 : /* Store the slot in the multi-insert buffer, when enabled. */
867 heikki.linnakangas 1185 GIC 893814 : if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
1186 : {
867 heikki.linnakangas 1187 ECB : /*
1188 : * The slot previously might point into the per-tuple
1189 : * context. For batching it needs to be longer lived.
1190 : */
867 heikki.linnakangas 1191 GIC 893642 : ExecMaterializeSlot(myslot);
1192 :
867 heikki.linnakangas 1193 ECB : /* Add this tuple to the tuple buffer */
867 heikki.linnakangas 1194 CBC 893642 : CopyMultiInsertInfoStore(&multiInsertInfo,
1195 : resultRelInfo, myslot,
1196 : cstate->line_buf.len,
1197 : cstate->cur_lineno);
1198 :
1199 : /*
867 heikki.linnakangas 1200 EUB : * If enough inserts have queued up, then flush all
1201 : * buffers out to their tables.
1202 : */
867 heikki.linnakangas 1203 GIC 893642 : if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
178 efujita 1204 GNC 650 : CopyMultiInsertInfoFlush(&multiInsertInfo,
1205 : resultRelInfo,
1206 : &processed);
1207 :
1208 : /*
1209 : * We delay updating the row counter and progress of the
1210 : * COPY command until after writing the tuples stored in
1211 : * the buffer out to the table, as in single insert mode.
1212 : * See CopyMultiInsertBufferFlush().
1213 : */
1214 893642 : continue; /* next tuple please */
867 heikki.linnakangas 1215 ECB : }
1216 : else
1217 : {
867 heikki.linnakangas 1218 GIC 172 : List *recheckIndexes = NIL;
1219 :
1220 : /* OK, store the tuple */
1221 172 : if (resultRelInfo->ri_FdwRoutine != NULL)
1222 : {
1223 25 : myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
1224 : resultRelInfo,
1225 : myslot,
1226 : NULL);
867 heikki.linnakangas 1227 ECB :
867 heikki.linnakangas 1228 CBC 24 : if (myslot == NULL) /* "do nothing" */
1229 2 : continue; /* next tuple please */
1230 :
1231 : /*
1232 : * AFTER ROW Triggers might reference the tableoid
1233 : * column, so (re-)initialize tts_tableOid before
1234 : * evaluating them.
867 heikki.linnakangas 1235 ECB : */
867 heikki.linnakangas 1236 CBC 22 : myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
1237 : }
1238 : else
867 heikki.linnakangas 1239 ECB : {
1240 : /* OK, store the tuple and create index entries for it */
867 heikki.linnakangas 1241 GIC 147 : table_tuple_insert(resultRelInfo->ri_RelationDesc,
1242 : myslot, mycid, ti_options, bistate);
867 heikki.linnakangas 1243 ECB :
867 heikki.linnakangas 1244 CBC 147 : if (resultRelInfo->ri_NumIndices > 0)
867 heikki.linnakangas 1245 UIC 0 : recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
1246 : myslot,
1247 : estate,
1248 : false,
1249 : false,
1250 : NULL,
1251 : NIL,
1252 : false);
1253 : }
1254 :
1255 : /* AFTER ROW INSERT Triggers */
867 heikki.linnakangas 1256 GIC 169 : ExecARInsertTriggers(estate, resultRelInfo, myslot,
867 heikki.linnakangas 1257 ECB : recheckIndexes, cstate->transition_capture);
1258 :
867 heikki.linnakangas 1259 CBC 169 : list_free(recheckIndexes);
1260 : }
1261 : }
867 heikki.linnakangas 1262 ECB :
1263 : /*
1264 : * We count only tuples not suppressed by a BEFORE INSERT trigger
1265 : * or FDW; this is the same definition used by nodeModifyTable.c
1266 : * for counting tuples inserted by an INSERT command. Update
1267 : * progress of the COPY command as well.
1268 : */
761 michael 1269 GIC 175 : pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
1270 : ++processed);
1271 : }
1272 : }
1273 :
867 heikki.linnakangas 1274 ECB : /* Flush any remaining buffered tuples */
867 heikki.linnakangas 1275 CBC 906 : if (insertMethod != CIM_SINGLE)
1276 : {
867 heikki.linnakangas 1277 GIC 849 : if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
178 efujita 1278 GNC 736 : CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
1279 : }
867 heikki.linnakangas 1280 ECB :
1281 : /* Done, clean up */
867 heikki.linnakangas 1282 GIC 900 : error_context_stack = errcallback.previous;
867 heikki.linnakangas 1283 ECB :
867 heikki.linnakangas 1284 GIC 900 : if (bistate != NULL)
1285 92 : FreeBulkInsertState(bistate);
867 heikki.linnakangas 1286 ECB :
867 heikki.linnakangas 1287 GIC 900 : MemoryContextSwitchTo(oldcontext);
867 heikki.linnakangas 1288 ECB :
1289 : /* Execute AFTER STATEMENT insertion triggers */
867 heikki.linnakangas 1290 GIC 900 : ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
1291 :
867 heikki.linnakangas 1292 ECB : /* Handle queued AFTER triggers */
867 heikki.linnakangas 1293 GIC 900 : AfterTriggerEndQuery(estate);
1294 :
1295 900 : ExecResetTupleTable(estate->es_tupleTable, false);
1296 :
1297 : /* Allow the FDW to shut down */
1298 900 : if (target_resultRelInfo->ri_FdwRoutine != NULL &&
867 heikki.linnakangas 1299 CBC 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
867 heikki.linnakangas 1300 GIC 16 : target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
867 heikki.linnakangas 1301 ECB : target_resultRelInfo);
1302 :
1303 : /* Tear down the multi-insert buffer data */
867 heikki.linnakangas 1304 GIC 900 : if (insertMethod != CIM_SINGLE)
1305 843 : CopyMultiInsertInfoCleanup(&multiInsertInfo);
867 heikki.linnakangas 1306 ECB :
1307 : /* Close all the partitioned tables, leaf partitions, and their indices */
867 heikki.linnakangas 1308 CBC 900 : if (proute)
867 heikki.linnakangas 1309 GIC 44 : ExecCleanupTupleRouting(mtstate, proute);
1310 :
1311 : /* Close the result relations, including any trigger target relations */
1312 900 : ExecCloseResultRelations(estate);
1313 900 : ExecCloseRangeTableRelations(estate);
1314 :
867 heikki.linnakangas 1315 CBC 900 : FreeExecutorState(estate);
867 heikki.linnakangas 1316 ECB :
867 heikki.linnakangas 1317 CBC 900 : return processed;
1318 : }
1319 :
1320 : /*
1321 : * Setup to read tuples from a file for COPY FROM.
1322 : *
1323 : * 'rel': Used as a template for the tuples
1324 : * 'whereClause': WHERE clause from the COPY FROM command
867 heikki.linnakangas 1325 ECB : * 'filename': Name of server-local file to read, NULL for STDIN
1326 : * 'is_program': true if 'filename' is program to execute
1327 : * 'data_source_cb': callback that provides the input data
1328 : * 'attnamelist': List of char *, columns to include. NIL selects all cols.
1329 : * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
1330 : *
1331 : * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
1332 : */
1333 : CopyFromState
867 heikki.linnakangas 1334 GIC 1089 : BeginCopyFrom(ParseState *pstate,
1335 : Relation rel,
867 heikki.linnakangas 1336 ECB : Node *whereClause,
1337 : const char *filename,
1338 : bool is_program,
1339 : copy_data_source_cb data_source_cb,
1340 : List *attnamelist,
1341 : List *options)
1342 : {
1343 : CopyFromState cstate;
867 heikki.linnakangas 1344 GIC 1089 : bool pipe = (filename == NULL);
1345 : TupleDesc tupDesc;
1346 : AttrNumber num_phys_attrs,
1347 : num_defaults;
867 heikki.linnakangas 1348 ECB : FmgrInfo *in_functions;
1349 : Oid *typioparams;
1350 : Oid in_func_oid;
1351 : int *defmap;
1352 : ExprState **defexprs;
1353 : MemoryContext oldcontext;
1354 : bool volatile_defexprs;
761 michael 1355 GIC 1089 : const int progress_cols[] = {
1356 : PROGRESS_COPY_COMMAND,
1357 : PROGRESS_COPY_TYPE,
761 michael 1358 ECB : PROGRESS_COPY_BYTES_TOTAL
1359 : };
761 michael 1360 GIC 1089 : int64 progress_vals[] = {
1361 : PROGRESS_COPY_COMMAND_FROM,
761 michael 1362 ECB : 0,
1363 : 0
1364 : };
867 heikki.linnakangas 1365 :
1366 : /* Allocate workspace and zero all fields */
867 heikki.linnakangas 1367 CBC 1089 : cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
1368 :
1369 : /*
1370 : * We allocate everything used by a cstate in a new memory context. This
1371 : * avoids memory leaks during repeated use of COPY in a query.
867 heikki.linnakangas 1372 ECB : */
867 heikki.linnakangas 1373 CBC 1089 : cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
1374 : "COPY",
1375 : ALLOCSET_DEFAULT_SIZES);
1376 :
867 heikki.linnakangas 1377 GIC 1089 : oldcontext = MemoryContextSwitchTo(cstate->copycontext);
1378 :
1379 : /* Extract options from the statement node tree */
738 heikki.linnakangas 1380 CBC 1089 : ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
1381 :
1382 : /* Process the target relation */
867 heikki.linnakangas 1383 GIC 1028 : cstate->rel = rel;
1384 :
867 heikki.linnakangas 1385 CBC 1028 : tupDesc = RelationGetDescr(cstate->rel);
1386 :
1387 : /* process common options or initialization */
867 heikki.linnakangas 1388 ECB :
867 heikki.linnakangas 1389 EUB : /* Generate or convert list of attributes to process */
867 heikki.linnakangas 1390 GIC 1028 : cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
1391 :
1392 1028 : num_phys_attrs = tupDesc->natts;
1393 :
1394 : /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
1395 1028 : cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1396 1028 : if (cstate->opts.force_notnull)
1397 : {
1398 : List *attnums;
1399 : ListCell *cur;
867 heikki.linnakangas 1400 ECB :
867 heikki.linnakangas 1401 GIC 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
1402 :
867 heikki.linnakangas 1403 CBC 26 : foreach(cur, attnums)
1404 : {
867 heikki.linnakangas 1405 GIC 16 : int attnum = lfirst_int(cur);
1406 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1407 :
1408 16 : if (!list_member_int(cstate->attnumlist, attnum))
1409 3 : ereport(ERROR,
1410 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1411 : errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
1412 : NameStr(attr->attname))));
867 heikki.linnakangas 1413 CBC 13 : cstate->opts.force_notnull_flags[attnum - 1] = true;
1414 : }
1415 : }
1416 :
1417 : /* Convert FORCE_NULL name list to per-column flags, check validity */
867 heikki.linnakangas 1418 GIC 1025 : cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
867 heikki.linnakangas 1419 CBC 1025 : if (cstate->opts.force_null)
1420 : {
867 heikki.linnakangas 1421 ECB : List *attnums;
1422 : ListCell *cur;
1423 :
867 heikki.linnakangas 1424 GIC 13 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
1425 :
867 heikki.linnakangas 1426 CBC 26 : foreach(cur, attnums)
1427 : {
1428 16 : int attnum = lfirst_int(cur);
1429 16 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1430 :
1431 16 : if (!list_member_int(cstate->attnumlist, attnum))
867 heikki.linnakangas 1432 GIC 3 : ereport(ERROR,
1433 : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
867 heikki.linnakangas 1434 ECB : errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
1435 : NameStr(attr->attname))));
867 heikki.linnakangas 1436 GIC 13 : cstate->opts.force_null_flags[attnum - 1] = true;
867 heikki.linnakangas 1437 ECB : }
1438 : }
1439 :
1440 : /* Convert convert_selectively name list to per-column flags */
867 heikki.linnakangas 1441 GIC 1022 : if (cstate->opts.convert_selectively)
867 heikki.linnakangas 1442 ECB : {
1443 : List *attnums;
1444 : ListCell *cur;
1445 :
867 heikki.linnakangas 1446 GIC 2 : cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
1447 :
867 heikki.linnakangas 1448 CBC 2 : attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
867 heikki.linnakangas 1449 ECB :
867 heikki.linnakangas 1450 GIC 4 : foreach(cur, attnums)
1451 : {
867 heikki.linnakangas 1452 CBC 2 : int attnum = lfirst_int(cur);
1453 2 : Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
1454 :
867 heikki.linnakangas 1455 GIC 2 : if (!list_member_int(cstate->attnumlist, attnum))
867 heikki.linnakangas 1456 LBC 0 : ereport(ERROR,
867 heikki.linnakangas 1457 ECB : (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
1458 : errmsg_internal("selected column \"%s\" not referenced by COPY",
1459 : NameStr(attr->attname))));
867 heikki.linnakangas 1460 GIC 2 : cstate->convert_select_flags[attnum - 1] = true;
867 heikki.linnakangas 1461 ECB : }
1462 : }
1463 :
1464 : /* Use client encoding when ENCODING option is not specified. */
867 heikki.linnakangas 1465 GIC 1022 : if (cstate->opts.file_encoding < 0)
1466 1019 : cstate->file_encoding = pg_get_client_encoding();
1467 : else
1468 3 : cstate->file_encoding = cstate->opts.file_encoding;
1469 :
1470 : /*
1471 : * Look up encoding conversion function.
1472 : */
738 1473 1022 : if (cstate->file_encoding == GetDatabaseEncoding() ||
1474 3 : cstate->file_encoding == PG_SQL_ASCII ||
738 heikki.linnakangas 1475 UIC 0 : GetDatabaseEncoding() == PG_SQL_ASCII)
1476 : {
738 heikki.linnakangas 1477 GIC 1022 : cstate->need_transcoding = false;
738 heikki.linnakangas 1478 ECB : }
1479 : else
1480 : {
738 heikki.linnakangas 1481 UIC 0 : cstate->need_transcoding = true;
1482 0 : cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
1483 : GetDatabaseEncoding());
1484 : }
1485 :
867 heikki.linnakangas 1486 GIC 1022 : cstate->copy_src = COPY_FILE; /* default */
1487 :
867 heikki.linnakangas 1488 CBC 1022 : cstate->whereClause = whereClause;
1489 :
1490 : /* Initialize state variables */
867 heikki.linnakangas 1491 GIC 1022 : cstate->eol_type = EOL_UNKNOWN;
1492 1022 : cstate->cur_relname = RelationGetRelationName(cstate->rel);
1493 1022 : cstate->cur_lineno = 0;
1494 1022 : cstate->cur_attname = NULL;
1495 1022 : cstate->cur_attval = NULL;
178 efujita 1496 GNC 1022 : cstate->relname_only = false;
1497 :
1498 : /*
1499 : * Allocate buffers for the input pipeline.
738 heikki.linnakangas 1500 ECB : *
1501 : * attribute_buf and raw_buf are used in both text and binary modes, but
1502 : * input_buf and line_buf only in text mode.
1503 : */
738 heikki.linnakangas 1504 GIC 1022 : cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
867 heikki.linnakangas 1505 CBC 1022 : cstate->raw_buf_index = cstate->raw_buf_len = 0;
738 heikki.linnakangas 1506 GIC 1022 : cstate->raw_reached_eof = false;
1507 :
867 1508 1022 : if (!cstate->opts.binary)
1509 : {
1510 : /*
1511 : * If encoding conversion is needed, we need another buffer to hold
738 heikki.linnakangas 1512 ECB : * the converted input data. Otherwise, we can just point input_buf
1513 : * to the same buffer as raw_buf.
1514 : */
738 heikki.linnakangas 1515 GIC 1014 : if (cstate->need_transcoding)
1516 : {
738 heikki.linnakangas 1517 UIC 0 : cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
738 heikki.linnakangas 1518 LBC 0 : cstate->input_buf_index = cstate->input_buf_len = 0;
1519 : }
1520 : else
738 heikki.linnakangas 1521 GIC 1014 : cstate->input_buf = cstate->raw_buf;
738 heikki.linnakangas 1522 CBC 1014 : cstate->input_reached_eof = false;
1523 :
867 heikki.linnakangas 1524 GIC 1014 : initStringInfo(&cstate->line_buf);
867 heikki.linnakangas 1525 ECB : }
1526 :
738 heikki.linnakangas 1527 GIC 1022 : initStringInfo(&cstate->attribute_buf);
738 heikki.linnakangas 1528 ECB :
1529 : /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
867 heikki.linnakangas 1530 CBC 1022 : if (pstate)
1531 : {
867 heikki.linnakangas 1532 GIC 992 : cstate->range_table = pstate->p_rtable;
124 alvherre 1533 GNC 992 : cstate->rteperminfos = pstate->p_rteperminfos;
1534 : }
1535 :
867 heikki.linnakangas 1536 GIC 1022 : tupDesc = RelationGetDescr(cstate->rel);
1537 1022 : num_phys_attrs = tupDesc->natts;
867 heikki.linnakangas 1538 CBC 1022 : num_defaults = 0;
867 heikki.linnakangas 1539 GIC 1022 : volatile_defexprs = false;
867 heikki.linnakangas 1540 ECB :
1541 : /*
1542 : * Pick up the required catalog information for each attribute in the
1543 : * relation, including the input function, the element type (to pass to
1544 : * the input function), and info about defaults and constraints. (Which
1545 : * input function we use depends on text/binary format choice.)
1546 : */
867 heikki.linnakangas 1547 GIC 1022 : in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
1548 1022 : typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
867 heikki.linnakangas 1549 CBC 1022 : defmap = (int *) palloc(num_phys_attrs * sizeof(int));
867 heikki.linnakangas 1550 GIC 1022 : defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
867 heikki.linnakangas 1551 ECB :
228 drowley 1552 GNC 5086 : for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
867 heikki.linnakangas 1553 ECB : {
867 heikki.linnakangas 1554 CBC 4065 : Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
1555 :
867 heikki.linnakangas 1556 ECB : /* We don't need info for dropped attributes */
867 heikki.linnakangas 1557 CBC 4065 : if (att->attisdropped)
867 heikki.linnakangas 1558 GIC 62 : continue;
1559 :
1560 : /* Fetch the input function and typioparam info */
867 heikki.linnakangas 1561 CBC 4003 : if (cstate->opts.binary)
867 heikki.linnakangas 1562 GIC 31 : getTypeBinaryInputInfo(att->atttypid,
1563 31 : &in_func_oid, &typioparams[attnum - 1]);
1564 : else
1565 3972 : getTypeInputInfo(att->atttypid,
867 heikki.linnakangas 1566 CBC 3972 : &in_func_oid, &typioparams[attnum - 1]);
1567 4002 : fmgr_info(in_func_oid, &in_functions[attnum - 1]);
1568 :
1569 : /* Get default info if available */
27 andrew 1570 GNC 4002 : defexprs[attnum - 1] = NULL;
1571 :
1572 4002 : if (!att->attgenerated)
1573 : {
867 heikki.linnakangas 1574 CBC 3992 : Expr *defexpr = (Expr *) build_column_default(cstate->rel,
1575 : attnum);
867 heikki.linnakangas 1576 ECB :
867 heikki.linnakangas 1577 CBC 3992 : if (defexpr != NULL)
1578 : {
867 heikki.linnakangas 1579 ECB : /* Run the expression through planner */
867 heikki.linnakangas 1580 CBC 206 : defexpr = expression_planner(defexpr);
1581 :
1582 : /* Initialize executable expression in copycontext */
27 andrew 1583 GNC 206 : defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
1584 :
1585 : /* if NOT copied from input */
1586 : /* use default value if one exists */
1587 206 : if (!list_member_int(cstate->attnumlist, attnum))
1588 : {
1589 83 : defmap[num_defaults] = attnum - 1;
1590 83 : num_defaults++;
1591 : }
1592 :
1593 : /*
1594 : * If a default expression looks at the table being loaded,
867 heikki.linnakangas 1595 ECB : * then it could give the wrong answer when using
1596 : * multi-insert. Since database access can be dynamic this is
1597 : * hard to test for exactly, so we use the much wider test of
1598 : * whether the default expression is volatile. We allow for
1599 : * the special case of when the default expression is the
1600 : * nextval() of a sequence which in this specific case is
1601 : * known to be safe for use with the multi-insert
1602 : * optimization. Hence we use this special case function
1603 : * checker rather than the standard check for
1604 : * contain_volatile_functions().
1605 : */
867 heikki.linnakangas 1606 CBC 206 : if (!volatile_defexprs)
1607 206 : volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
1608 : }
867 heikki.linnakangas 1609 ECB : }
867 heikki.linnakangas 1610 EUB : }
1611 :
1612 :
1613 : /* initialize progress */
823 tomas.vondra 1614 CBC 1021 : pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
823 tomas.vondra 1615 GIC 1021 : cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
1616 1021 : cstate->bytes_processed = 0;
1617 :
1618 : /* We keep those variables in cstate. */
867 heikki.linnakangas 1619 CBC 1021 : cstate->in_functions = in_functions;
1620 1021 : cstate->typioparams = typioparams;
867 heikki.linnakangas 1621 GIC 1021 : cstate->defmap = defmap;
867 heikki.linnakangas 1622 CBC 1021 : cstate->defexprs = defexprs;
867 heikki.linnakangas 1623 GIC 1021 : cstate->volatile_defexprs = volatile_defexprs;
1624 1021 : cstate->num_defaults = num_defaults;
1625 1021 : cstate->is_program = is_program;
1626 :
867 heikki.linnakangas 1627 CBC 1021 : if (data_source_cb)
867 heikki.linnakangas 1628 ECB : {
761 michael 1629 GBC 154 : progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
867 heikki.linnakangas 1630 GIC 154 : cstate->copy_src = COPY_CALLBACK;
867 heikki.linnakangas 1631 CBC 154 : cstate->data_source_cb = data_source_cb;
1632 : }
867 heikki.linnakangas 1633 GIC 867 : else if (pipe)
1634 : {
761 michael 1635 GBC 411 : progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
867 heikki.linnakangas 1636 411 : Assert(!is_program); /* the grammar does not allow this */
867 heikki.linnakangas 1637 GIC 411 : if (whereToSendOutput == DestRemote)
1638 411 : ReceiveCopyBegin(cstate);
1639 : else
867 heikki.linnakangas 1640 LBC 0 : cstate->copy_file = stdin;
1641 : }
867 heikki.linnakangas 1642 ECB : else
1643 : {
867 heikki.linnakangas 1644 GIC 456 : cstate->filename = pstrdup(filename);
867 heikki.linnakangas 1645 ECB :
867 heikki.linnakangas 1646 CBC 456 : if (cstate->is_program)
867 heikki.linnakangas 1647 ECB : {
761 michael 1648 LBC 0 : progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
867 heikki.linnakangas 1649 0 : cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
1650 0 : if (cstate->copy_file == NULL)
867 heikki.linnakangas 1651 UIC 0 : ereport(ERROR,
1652 : (errcode_for_file_access(),
1653 : errmsg("could not execute command \"%s\": %m",
1654 : cstate->filename)));
1655 : }
1656 : else
1657 : {
867 heikki.linnakangas 1658 ECB : struct stat st;
1659 :
761 michael 1660 CBC 456 : progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
867 heikki.linnakangas 1661 GIC 456 : cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
867 heikki.linnakangas 1662 CBC 456 : if (cstate->copy_file == NULL)
1663 : {
1664 : /* copy errno because ereport subfunctions might change it */
867 heikki.linnakangas 1665 UIC 0 : int save_errno = errno;
1666 :
1667 0 : ereport(ERROR,
1668 : (errcode_for_file_access(),
867 heikki.linnakangas 1669 ECB : errmsg("could not open file \"%s\" for reading: %m",
1670 : cstate->filename),
867 heikki.linnakangas 1671 EUB : (save_errno == ENOENT || save_errno == EACCES) ?
1672 : errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
1673 : "You may want a client-side facility such as psql's \\copy.") : 0));
1674 : }
867 heikki.linnakangas 1675 ECB :
867 heikki.linnakangas 1676 CBC 456 : if (fstat(fileno(cstate->copy_file), &st))
867 heikki.linnakangas 1677 UIC 0 : ereport(ERROR,
867 heikki.linnakangas 1678 ECB : (errcode_for_file_access(),
1679 : errmsg("could not stat file \"%s\": %m",
1680 : cstate->filename)));
1681 :
867 heikki.linnakangas 1682 GIC 456 : if (S_ISDIR(st.st_mode))
867 heikki.linnakangas 1683 UIC 0 : ereport(ERROR,
867 heikki.linnakangas 1684 ECB : (errcode(ERRCODE_WRONG_OBJECT_TYPE),
1685 : errmsg("\"%s\" is a directory", cstate->filename)));
823 tomas.vondra 1686 :
761 michael 1687 CBC 456 : progress_vals[2] = st.st_size;
1688 : }
1689 : }
867 heikki.linnakangas 1690 ECB :
761 michael 1691 CBC 1021 : pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
761 michael 1692 ECB :
867 heikki.linnakangas 1693 CBC 1021 : if (cstate->opts.binary)
1694 : {
1695 : /* Read and verify binary header */
867 heikki.linnakangas 1696 GIC 7 : ReceiveCopyBinaryHeader(cstate);
1697 : }
1698 :
1699 : /* create workspace for CopyReadAttributes results */
1700 1021 : if (!cstate->opts.binary)
867 heikki.linnakangas 1701 ECB : {
867 heikki.linnakangas 1702 CBC 1014 : AttrNumber attr_count = list_length(cstate->attnumlist);
867 heikki.linnakangas 1703 ECB :
867 heikki.linnakangas 1704 CBC 1014 : cstate->max_fields = attr_count;
867 heikki.linnakangas 1705 GIC 1014 : cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
867 heikki.linnakangas 1706 ECB : }
1707 :
867 heikki.linnakangas 1708 CBC 1021 : MemoryContextSwitchTo(oldcontext);
1709 :
867 heikki.linnakangas 1710 GIC 1021 : return cstate;
867 heikki.linnakangas 1711 ECB : }
1712 :
1713 : /*
1714 : * Clean up storage and release resources for COPY FROM.
1715 : */
1716 : void
867 heikki.linnakangas 1717 CBC 780 : EndCopyFrom(CopyFromState cstate)
1718 : {
867 heikki.linnakangas 1719 ECB : /* No COPY FROM related resources except memory. */
867 heikki.linnakangas 1720 CBC 780 : if (cstate->is_program)
867 heikki.linnakangas 1721 ECB : {
867 heikki.linnakangas 1722 UIC 0 : ClosePipeFromProgram(cstate);
1723 : }
867 heikki.linnakangas 1724 ECB : else
1725 : {
867 heikki.linnakangas 1726 CBC 780 : if (cstate->filename != NULL && FreeFile(cstate->copy_file))
867 heikki.linnakangas 1727 UIC 0 : ereport(ERROR,
867 heikki.linnakangas 1728 ECB : (errcode_for_file_access(),
1729 : errmsg("could not close file \"%s\": %m",
1730 : cstate->filename)));
1731 : }
1732 :
823 tomas.vondra 1733 GIC 780 : pgstat_progress_end_command();
823 tomas.vondra 1734 ECB :
867 heikki.linnakangas 1735 GIC 780 : MemoryContextDelete(cstate->copycontext);
1736 780 : pfree(cstate);
867 heikki.linnakangas 1737 CBC 780 : }
1738 :
1739 : /*
1740 : * Closes the pipe from an external program, checking the pclose() return code.
867 heikki.linnakangas 1741 ECB : */
1742 : static void
867 heikki.linnakangas 1743 LBC 0 : ClosePipeFromProgram(CopyFromState cstate)
867 heikki.linnakangas 1744 ECB : {
1745 : int pclose_rc;
1746 :
867 heikki.linnakangas 1747 UIC 0 : Assert(cstate->is_program);
1748 :
1749 0 : pclose_rc = ClosePipeStream(cstate->copy_file);
1750 0 : if (pclose_rc == -1)
1751 0 : ereport(ERROR,
1752 : (errcode_for_file_access(),
1753 : errmsg("could not close pipe to external command: %m")));
1754 0 : else if (pclose_rc != 0)
1755 : {
1756 : /*
1757 : * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
1758 : * expectable for the called program to fail with SIGPIPE, and we
1759 : * should not report that as an error. Otherwise, SIGPIPE indicates a
867 heikki.linnakangas 1760 ECB : * problem.
1761 : */
738 heikki.linnakangas 1762 UIC 0 : if (!cstate->raw_reached_eof &&
867 1763 0 : wait_result_is_signal(pclose_rc, SIGPIPE))
1764 0 : return;
1765 :
1766 0 : ereport(ERROR,
1767 : (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
867 heikki.linnakangas 1768 ECB : errmsg("program \"%s\" failed",
1769 : cstate->filename),
1770 : errdetail_internal("%s", wait_result_to_str(pclose_rc))));
1771 : }
1772 : }
|