LCOV - differential code coverage report
Current view: top level - src/backend/commands - copyfrom.c (source / functions) Coverage Total Hit UNC LBC UIC UBC GBC GIC GNC CBC EUB ECB DUB DCB
Current: Differential Code Coverage HEAD vs 15 Lines: 89.8 % 558 501 7 15 34 1 7 268 93 133 44 315 5 38
Current Date: 2023-04-08 15:15:32 Functions: 94.1 % 17 16 1 15 1 1 15 1
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * copyfrom.c
       4                 :  *      COPY <table> FROM file/program/client
       5                 :  *
       6                 :  * This file contains routines needed to efficiently load tuples into a
       7                 :  * table.  That includes looking up the correct partition, firing triggers,
       8                 :  * calling the table AM function to insert the data, and updating indexes.
       9                 :  * Reading data from the input file or client and parsing it into Datums
      10                 :  * is handled in copyfromparse.c.
      11                 :  *
      12                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      13                 :  * Portions Copyright (c) 1994, Regents of the University of California
      14                 :  *
      15                 :  *
      16                 :  * IDENTIFICATION
      17                 :  *    src/backend/commands/copyfrom.c
      18                 :  *
      19                 :  *-------------------------------------------------------------------------
      20                 :  */
      21                 : #include "postgres.h"
      22                 : 
      23                 : #include <ctype.h>
      24                 : #include <unistd.h>
      25                 : #include <sys/stat.h>
      26                 : 
      27                 : #include "access/heapam.h"
      28                 : #include "access/htup_details.h"
      29                 : #include "access/tableam.h"
      30                 : #include "access/xact.h"
      31                 : #include "access/xlog.h"
      32                 : #include "catalog/namespace.h"
      33                 : #include "commands/copy.h"
      34                 : #include "commands/copyfrom_internal.h"
      35                 : #include "commands/progress.h"
      36                 : #include "commands/trigger.h"
      37                 : #include "executor/execPartition.h"
      38                 : #include "executor/executor.h"
      39                 : #include "executor/nodeModifyTable.h"
      40                 : #include "executor/tuptable.h"
      41                 : #include "foreign/fdwapi.h"
      42                 : #include "libpq/libpq.h"
      43                 : #include "libpq/pqformat.h"
      44                 : #include "miscadmin.h"
      45                 : #include "optimizer/optimizer.h"
      46                 : #include "pgstat.h"
      47                 : #include "rewrite/rewriteHandler.h"
      48                 : #include "storage/fd.h"
      49                 : #include "tcop/tcopprot.h"
      50                 : #include "utils/lsyscache.h"
      51                 : #include "utils/memutils.h"
      52                 : #include "utils/portal.h"
      53                 : #include "utils/rel.h"
      54                 : #include "utils/snapmgr.h"
      55                 : 
      56                 : /*
      57                 :  * No more than this many tuples per CopyMultiInsertBuffer
      58                 :  *
      59                 :  * Caution: Don't make this too big, as we could end up with this many
      60                 :  * CopyMultiInsertBuffer items stored in CopyMultiInsertInfo's
      61                 :  * multiInsertBuffers list.  Increasing this can cause quadratic growth in
      62                 :  * memory requirements during copies into partitioned tables with a large
      63                 :  * number of partitions.
      64                 :  */
      65                 : #define MAX_BUFFERED_TUPLES     1000
      66                 : 
      67                 : /*
      68                 :  * Flush buffers if there are >= this many bytes, as counted by the input
      69                 :  * size, of tuples stored.
      70                 :  */
      71                 : #define MAX_BUFFERED_BYTES      65535
      72                 : 
      73                 : /* Trim the list of buffers back down to this number after flushing */
      74                 : #define MAX_PARTITION_BUFFERS   32
      75                 : 
      76                 : /* Stores multi-insert data related to a single relation in CopyFrom. */
      77                 : typedef struct CopyMultiInsertBuffer
      78                 : {
      79                 :     TupleTableSlot *slots[MAX_BUFFERED_TUPLES]; /* Array to store tuples */
      80                 :     ResultRelInfo *resultRelInfo;   /* ResultRelInfo for 'relid' */
      81                 :     BulkInsertState bistate;    /* BulkInsertState for this rel if plain
      82                 :                                  * table; NULL if foreign table */
      83                 :     int         nused;          /* number of 'slots' containing tuples */
      84                 :     uint64      linenos[MAX_BUFFERED_TUPLES];   /* Line # of tuple in copy
      85                 :                                                  * stream */
      86                 : } CopyMultiInsertBuffer;
      87                 : 
      88                 : /*
      89                 :  * Stores one or many CopyMultiInsertBuffers and details about the size and
      90                 :  * number of tuples which are stored in them.  This allows multiple buffers to
      91                 :  * exist at once when COPYing into a partitioned table.
      92                 :  */
      93                 : typedef struct CopyMultiInsertInfo
      94                 : {
      95                 :     List       *multiInsertBuffers; /* List of tracked CopyMultiInsertBuffers */
      96                 :     int         bufferedTuples; /* number of tuples buffered over all buffers */
      97                 :     int         bufferedBytes;  /* number of bytes from all buffered tuples */
      98                 :     CopyFromState cstate;       /* Copy state for this CopyMultiInsertInfo */
      99                 :     EState     *estate;         /* Executor state used for COPY */
     100                 :     CommandId   mycid;          /* Command Id used for COPY */
     101                 :     int         ti_options;     /* table insert options */
     102                 : } CopyMultiInsertInfo;
     103                 : 
     104                 : 
     105                 : /* non-export function prototypes */
     106                 : static char *limit_printout_length(const char *str);
     107                 : 
     108                 : static void ClosePipeFromProgram(CopyFromState cstate);
     109                 : 
     110                 : /*
     111                 :  * error context callback for COPY FROM
     112                 :  *
     113                 :  * The argument for the error context must be CopyFromState.
     114                 :  */
     115                 : void
     116 GIC         106 : CopyFromErrorCallback(void *arg)
     117 ECB             : {
     118 GIC         106 :     CopyFromState cstate = (CopyFromState) arg;
     119 ECB             : 
     120 GNC         106 :     if (cstate->relname_only)
     121                 :     {
     122               1 :         errcontext("COPY %s",
     123                 :                    cstate->cur_relname);
     124               1 :         return;
     125                 :     }
     126 GIC         105 :     if (cstate->opts.binary)
     127 ECB             :     {
     128                 :         /* can't usefully display the data */
     129 CBC           1 :         if (cstate->cur_attname)
     130 GIC           1 :             errcontext("COPY %s, line %llu, column %s",
     131 ECB             :                        cstate->cur_relname,
     132 GIC           1 :                        (unsigned long long) cstate->cur_lineno,
     133 ECB             :                        cstate->cur_attname);
     134                 :         else
     135 UIC           0 :             errcontext("COPY %s, line %llu",
     136 ECB             :                        cstate->cur_relname,
     137 LBC           0 :                        (unsigned long long) cstate->cur_lineno);
     138                 :     }
     139 ECB             :     else
     140                 :     {
     141 GIC         104 :         if (cstate->cur_attname && cstate->cur_attval)
     142 GBC          10 :         {
     143                 :             /* error is relevant to a particular column */
     144 EUB             :             char       *attval;
     145                 : 
     146 GIC          10 :             attval = limit_printout_length(cstate->cur_attval);
     147              10 :             errcontext("COPY %s, line %llu, column %s: \"%s\"",
     148 ECB             :                        cstate->cur_relname,
     149 CBC          10 :                        (unsigned long long) cstate->cur_lineno,
     150                 :                        cstate->cur_attname,
     151                 :                        attval);
     152 GIC          10 :             pfree(attval);
     153 ECB             :         }
     154 CBC          94 :         else if (cstate->cur_attname)
     155                 :         {
     156 ECB             :             /* error is relevant to a particular column, value is NULL */
     157 GIC           3 :             errcontext("COPY %s, line %llu, column %s: null input",
     158                 :                        cstate->cur_relname,
     159 CBC           3 :                        (unsigned long long) cstate->cur_lineno,
     160                 :                        cstate->cur_attname);
     161 ECB             :         }
     162                 :         else
     163                 :         {
     164                 :             /*
     165                 :              * Error is relevant to a particular line.
     166                 :              *
     167                 :              * If line_buf still contains the correct line, print it.
     168                 :              */
     169 GIC          91 :             if (cstate->line_buf_valid)
     170                 :             {
     171                 :                 char       *lineval;
     172                 : 
     173              86 :                 lineval = limit_printout_length(cstate->line_buf.data);
     174              86 :                 errcontext("COPY %s, line %llu: \"%s\"",
     175                 :                            cstate->cur_relname,
     176 CBC          86 :                            (unsigned long long) cstate->cur_lineno, lineval);
     177 GIC          86 :                 pfree(lineval);
     178                 :             }
     179                 :             else
     180 ECB             :             {
     181 CBC           5 :                 errcontext("COPY %s, line %llu",
     182                 :                            cstate->cur_relname,
     183               5 :                            (unsigned long long) cstate->cur_lineno);
     184 ECB             :             }
     185                 :         }
     186                 :     }
     187                 : }
     188                 : 
     189                 : /*
     190                 :  * Make sure we don't print an unreasonable amount of COPY data in a message.
     191                 :  *
     192                 :  * Returns a pstrdup'd copy of the input.
     193                 :  */
     194                 : static char *
     195 GIC          96 : limit_printout_length(const char *str)
     196                 : {
     197                 : #define MAX_COPY_DATA_DISPLAY 100
     198                 : 
     199              96 :     int         slen = strlen(str);
     200                 :     int         len;
     201                 :     char       *res;
     202 ECB             : 
     203                 :     /* Fast path if definitely okay */
     204 GIC          96 :     if (slen <= MAX_COPY_DATA_DISPLAY)
     205              96 :         return pstrdup(str);
     206 ECB             : 
     207                 :     /* Apply encoding-dependent truncation */
     208 UIC           0 :     len = pg_mbcliplen(str, slen, MAX_COPY_DATA_DISPLAY);
     209                 : 
     210                 :     /*
     211 ECB             :      * Truncate, and add "..." to show we truncated the input.
     212                 :      */
     213 UIC           0 :     res = (char *) palloc(len + 4);
     214               0 :     memcpy(res, str, len);
     215 UBC           0 :     strcpy(res + len, "...");
     216                 : 
     217 UIC           0 :     return res;
     218                 : }
     219                 : 
     220 EUB             : /*
     221                 :  * Allocate memory and initialize a new CopyMultiInsertBuffer for this
     222                 :  * ResultRelInfo.
     223                 :  */
     224                 : static CopyMultiInsertBuffer *
     225 GIC         911 : CopyMultiInsertBufferInit(ResultRelInfo *rri)
     226                 : {
     227                 :     CopyMultiInsertBuffer *buffer;
     228                 : 
     229             911 :     buffer = (CopyMultiInsertBuffer *) palloc(sizeof(CopyMultiInsertBuffer));
     230             911 :     memset(buffer->slots, 0, sizeof(TupleTableSlot *) * MAX_BUFFERED_TUPLES);
     231             911 :     buffer->resultRelInfo = rri;
     232 GNC         911 :     buffer->bistate = (rri->ri_FdwRoutine == NULL) ? GetBulkInsertState() : NULL;
     233 GIC         911 :     buffer->nused = 0;
     234                 : 
     235             911 :     return buffer;
     236 ECB             : }
     237                 : 
     238                 : /*
     239                 :  * Make a new buffer for this ResultRelInfo.
     240                 :  */
     241                 : static inline void
     242 CBC         911 : CopyMultiInsertInfoSetupBuffer(CopyMultiInsertInfo *miinfo,
     243                 :                                ResultRelInfo *rri)
     244                 : {
     245                 :     CopyMultiInsertBuffer *buffer;
     246                 : 
     247 GIC         911 :     buffer = CopyMultiInsertBufferInit(rri);
     248                 : 
     249 ECB             :     /* Setup back-link so we can easily find this buffer again */
     250 GIC         911 :     rri->ri_CopyMultiInsertBuffer = buffer;
     251                 :     /* Record that we're tracking this buffer */
     252             911 :     miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     253             911 : }
     254 ECB             : 
     255                 : /*
     256                 :  * Initialize an already allocated CopyMultiInsertInfo.
     257                 :  *
     258                 :  * If rri is a non-partitioned table then a CopyMultiInsertBuffer is set up
     259                 :  * for that table.
     260                 :  */
     261                 : static void
     262 GIC         905 : CopyMultiInsertInfoInit(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     263                 :                         CopyFromState cstate, EState *estate, CommandId mycid,
     264                 :                         int ti_options)
     265                 : {
     266             905 :     miinfo->multiInsertBuffers = NIL;
     267             905 :     miinfo->bufferedTuples = 0;
     268             905 :     miinfo->bufferedBytes = 0;
     269 CBC         905 :     miinfo->cstate = cstate;
     270 GIC         905 :     miinfo->estate = estate;
     271             905 :     miinfo->mycid = mycid;
     272             905 :     miinfo->ti_options = ti_options;
     273 ECB             : 
     274                 :     /*
     275                 :      * Only setup the buffer when not dealing with a partitioned table.
     276                 :      * Buffers for partitioned tables will just be setup when we need to send
     277                 :      * tuples their way for the first time.
     278                 :      */
     279 CBC         905 :     if (rri->ri_RelationDesc->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
     280 GIC         869 :         CopyMultiInsertInfoSetupBuffer(miinfo, rri);
     281             905 : }
     282                 : 
     283                 : /*
     284                 :  * Returns true if the buffers are full
     285                 :  */
     286 ECB             : static inline bool
     287 CBC      893642 : CopyMultiInsertInfoIsFull(CopyMultiInsertInfo *miinfo)
     288 ECB             : {
     289 GIC      893642 :     if (miinfo->bufferedTuples >= MAX_BUFFERED_TUPLES ||
     290          893045 :         miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
     291             650 :         return true;
     292          892992 :     return false;
     293                 : }
     294 ECB             : 
     295                 : /*
     296                 :  * Returns true if we have no buffered tuples
     297                 :  */
     298                 : static inline bool
     299 CBC         863 : CopyMultiInsertInfoIsEmpty(CopyMultiInsertInfo *miinfo)
     300                 : {
     301 GIC         863 :     return miinfo->bufferedTuples == 0;
     302                 : }
     303                 : 
     304                 : /*
     305                 :  * Write the tuples stored in 'buffer' out to the table.
     306 ECB             :  */
     307                 : static inline void
     308 CBC        1499 : CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
     309                 :                            CopyMultiInsertBuffer *buffer,
     310                 :                            int64 *processed)
     311                 : {
     312 GIC        1499 :     CopyFromState cstate = miinfo->cstate;
     313 CBC        1499 :     EState     *estate = miinfo->estate;
     314            1499 :     int         nused = buffer->nused;
     315            1499 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     316            1499 :     TupleTableSlot **slots = buffer->slots;
     317                 :     int         i;
     318 ECB             : 
     319 GNC        1499 :     if (resultRelInfo->ri_FdwRoutine)
     320 ECB             :     {
     321 GNC           7 :         int         batch_size = resultRelInfo->ri_BatchSize;
     322               7 :         int         sent = 0;
     323 ECB             : 
     324 GNC           7 :         Assert(buffer->bistate == NULL);
     325                 : 
     326                 :         /* Ensure that the FDW supports batching and it's enabled */
     327               7 :         Assert(resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert);
     328               7 :         Assert(batch_size > 1);
     329                 : 
     330                 :         /*
     331                 :          * We suppress error context information other than the relation name,
     332                 :          * if one of the operations below fails.
     333                 :          */
     334               7 :         Assert(!cstate->relname_only);
     335               7 :         cstate->relname_only = true;
     336                 : 
     337              19 :         while (sent < nused)
     338                 :         {
     339              13 :             int         size = (batch_size < nused - sent) ? batch_size : (nused - sent);
     340              13 :             int         inserted = size;
     341                 :             TupleTableSlot **rslots;
     342                 : 
     343                 :             /* insert into foreign table: let the FDW do it */
     344                 :             rslots =
     345              13 :                 resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert(estate,
     346                 :                                                                      resultRelInfo,
     347              13 :                                                                      &slots[sent],
     348                 :                                                                      NULL,
     349                 :                                                                      &inserted);
     350                 : 
     351              12 :             sent += size;
     352                 : 
     353                 :             /* No need to do anything if there are no inserted rows */
     354              12 :             if (inserted <= 0)
     355               2 :                 continue;
     356                 : 
     357                 :             /* Triggers on foreign tables should not have transition tables */
     358              10 :             Assert(resultRelInfo->ri_TrigDesc == NULL ||
     359                 :                    resultRelInfo->ri_TrigDesc->trig_insert_new_table == false);
     360                 : 
     361                 :             /* Run AFTER ROW INSERT triggers */
     362              10 :             if (resultRelInfo->ri_TrigDesc != NULL &&
     363 UNC           0 :                 resultRelInfo->ri_TrigDesc->trig_insert_after_row)
     364                 :             {
     365               0 :                 Oid         relid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
     366                 : 
     367               0 :                 for (i = 0; i < inserted; i++)
     368                 :                 {
     369               0 :                     TupleTableSlot *slot = rslots[i];
     370                 : 
     371                 :                     /*
     372                 :                      * AFTER ROW Triggers might reference the tableoid column,
     373                 :                      * so (re-)initialize tts_tableOid before evaluating them.
     374                 :                      */
     375               0 :                     slot->tts_tableOid = relid;
     376                 : 
     377               0 :                     ExecARInsertTriggers(estate, resultRelInfo,
     378                 :                                          slot, NIL,
     379                 :                                          cstate->transition_capture);
     380                 :                 }
     381                 :             }
     382                 : 
     383                 :             /* Update the row counter and progress of the COPY command */
     384 GNC          10 :             *processed += inserted;
     385              10 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     386                 :                                          *processed);
     387                 :         }
     388 EUB             : 
     389 GNC          24 :         for (i = 0; i < nused; i++)
     390              18 :             ExecClearTuple(slots[i]);
     391                 : 
     392                 :         /* reset relname_only */
     393               6 :         cstate->relname_only = false;
     394                 :     }
     395                 :     else
     396                 :     {
     397            1492 :         CommandId   mycid = miinfo->mycid;
     398            1492 :         int         ti_options = miinfo->ti_options;
     399            1492 :         bool        line_buf_valid = cstate->line_buf_valid;
     400            1492 :         uint64      save_cur_lineno = cstate->cur_lineno;
     401                 :         MemoryContext oldcontext;
     402                 : 
     403            1492 :         Assert(buffer->bistate != NULL);
     404                 : 
     405                 :         /*
     406                 :          * Print error context information correctly, if one of the operations
     407                 :          * below fails.
     408                 :          */
     409            1492 :         cstate->line_buf_valid = false;
     410                 : 
     411                 :         /*
     412                 :          * table_multi_insert may leak memory, so switch to short-lived memory
     413                 :          * context before calling it.
     414                 :          */
     415            1492 :         oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     416            1492 :         table_multi_insert(resultRelInfo->ri_RelationDesc,
     417                 :                            slots,
     418                 :                            nused,
     419                 :                            mycid,
     420                 :                            ti_options,
     421                 :                            buffer->bistate);
     422            1492 :         MemoryContextSwitchTo(oldcontext);
     423                 : 
     424          895062 :         for (i = 0; i < nused; i++)
     425                 :         {
     426                 :             /*
     427                 :              * If there are any indexes, update them for all the inserted
     428                 :              * tuples, and run AFTER ROW INSERT triggers.
     429                 :              */
     430          893575 :             if (resultRelInfo->ri_NumIndices > 0)
     431                 :             {
     432                 :                 List       *recheckIndexes;
     433                 : 
     434          200874 :                 cstate->cur_lineno = buffer->linenos[i];
     435                 :                 recheckIndexes =
     436          200874 :                     ExecInsertIndexTuples(resultRelInfo,
     437                 :                                           buffer->slots[i], estate, false,
     438                 :                                           false, NULL, NIL, false);
     439          200869 :                 ExecARInsertTriggers(estate, resultRelInfo,
     440          200869 :                                      slots[i], recheckIndexes,
     441                 :                                      cstate->transition_capture);
     442          200869 :                 list_free(recheckIndexes);
     443                 :             }
     444                 : 
     445                 :             /*
     446                 :              * There's no indexes, but see if we need to run AFTER ROW INSERT
     447                 :              * triggers anyway.
     448                 :              */
     449          692701 :             else if (resultRelInfo->ri_TrigDesc != NULL &&
     450              39 :                      (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
     451              30 :                       resultRelInfo->ri_TrigDesc->trig_insert_new_table))
     452                 :             {
     453              18 :                 cstate->cur_lineno = buffer->linenos[i];
     454              18 :                 ExecARInsertTriggers(estate, resultRelInfo,
     455              18 :                                      slots[i], NIL,
     456                 :                                      cstate->transition_capture);
     457                 :             }
     458                 : 
     459          893570 :             ExecClearTuple(slots[i]);
     460                 :         }
     461                 : 
     462                 :         /* Update the row counter and progress of the COPY command */
     463            1487 :         *processed += nused;
     464            1487 :         pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
     465                 :                                      *processed);
     466                 : 
     467                 :         /* reset cur_lineno and line_buf_valid to what they were */
     468            1487 :         cstate->line_buf_valid = line_buf_valid;
     469            1487 :         cstate->cur_lineno = save_cur_lineno;
     470                 :     }
     471                 : 
     472                 :     /* Mark that all slots are free */
     473 GIC        1493 :     buffer->nused = 0;
     474            1493 : }
     475                 : 
     476                 : /*
     477                 :  * Drop used slots and free member for this buffer.
     478                 :  *
     479 ECB             :  * The buffer must be flushed before cleanup.
     480                 :  */
     481                 : static inline void
     482 GIC         849 : CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
     483                 :                              CopyMultiInsertBuffer *buffer)
     484 ECB             : {
     485 GNC         849 :     ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
     486 ECB             :     int         i;
     487                 : 
     488                 :     /* Ensure buffer was flushed */
     489 CBC         849 :     Assert(buffer->nused == 0);
     490                 : 
     491                 :     /* Remove back-link to ourself */
     492 GNC         849 :     resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
     493 ECB             : 
     494 GNC         849 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     495                 :     {
     496             843 :         Assert(buffer->bistate != NULL);
     497             843 :         FreeBulkInsertState(buffer->bistate);
     498                 :     }
     499                 :     else
     500               6 :         Assert(buffer->bistate == NULL);
     501 ECB             : 
     502                 :     /* Since we only create slots on demand, just drop the non-null ones. */
     503 GIC      315103 :     for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
     504          314254 :         ExecDropSingleTupleTableSlot(buffer->slots[i]);
     505 ECB             : 
     506 GNC         849 :     if (resultRelInfo->ri_FdwRoutine == NULL)
     507             843 :         table_finish_bulk_insert(resultRelInfo->ri_RelationDesc,
     508                 :                                  miinfo->ti_options);
     509                 : 
     510 GIC         849 :     pfree(buffer);
     511             849 : }
     512 ECB             : 
     513                 : /*
     514                 :  * Write out all stored tuples in all buffers out to the tables.
     515                 :  *
     516                 :  * Once flushed we also trim the tracked buffers list down to size by removing
     517                 :  * the buffers created earliest first.
     518                 :  *
     519                 :  * Callers should pass 'curr_rri' as the ResultRelInfo that's currently being
     520                 :  * used.  When cleaning up old buffers we'll never remove the one for
     521                 :  * 'curr_rri'.
     522                 :  */
     523                 : static inline void
     524 GNC        1386 : CopyMultiInsertInfoFlush(CopyMultiInsertInfo *miinfo, ResultRelInfo *curr_rri,
     525                 :                          int64 *processed)
     526 ECB             : {
     527                 :     ListCell   *lc;
     528                 : 
     529 GIC        2879 :     foreach(lc, miinfo->multiInsertBuffers)
     530                 :     {
     531            1499 :         CopyMultiInsertBuffer *buffer = (CopyMultiInsertBuffer *) lfirst(lc);
     532                 : 
     533 GNC        1499 :         CopyMultiInsertBufferFlush(miinfo, buffer, processed);
     534 ECB             :     }
     535                 : 
     536 GIC        1380 :     miinfo->bufferedTuples = 0;
     537            1380 :     miinfo->bufferedBytes = 0;
     538 ECB             : 
     539                 :     /*
     540                 :      * Trim the list of tracked buffers down if it exceeds the limit.  Here we
     541                 :      * remove buffers starting with the ones we created first.  It seems less
     542                 :      * likely that these older ones will be needed than the ones that were
     543                 :      * just created.
     544                 :      */
     545 GIC        1380 :     while (list_length(miinfo->multiInsertBuffers) > MAX_PARTITION_BUFFERS)
     546 ECB             :     {
     547                 :         CopyMultiInsertBuffer *buffer;
     548                 : 
     549 UIC           0 :         buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     550                 : 
     551                 :         /*
     552                 :          * We never want to remove the buffer that's currently being used, so
     553 ECB             :          * if we happen to find that then move it to the end of the list.
     554                 :          */
     555 LBC           0 :         if (buffer->resultRelInfo == curr_rri)
     556                 :         {
     557               0 :             miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     558               0 :             miinfo->multiInsertBuffers = lappend(miinfo->multiInsertBuffers, buffer);
     559               0 :             buffer = (CopyMultiInsertBuffer *) linitial(miinfo->multiInsertBuffers);
     560                 :         }
     561                 : 
     562 UIC           0 :         CopyMultiInsertBufferCleanup(miinfo, buffer);
     563 LBC           0 :         miinfo->multiInsertBuffers = list_delete_first(miinfo->multiInsertBuffers);
     564                 :     }
     565 GIC        1380 : }
     566                 : 
     567 ECB             : /*
     568                 :  * Cleanup allocated buffers and free memory
     569                 :  */
     570                 : static inline void
     571 GIC         843 : CopyMultiInsertInfoCleanup(CopyMultiInsertInfo *miinfo)
     572 ECB             : {
     573                 :     ListCell   *lc;
     574                 : 
     575 GIC        1692 :     foreach(lc, miinfo->multiInsertBuffers)
     576             849 :         CopyMultiInsertBufferCleanup(miinfo, lfirst(lc));
     577 ECB             : 
     578 CBC         843 :     list_free(miinfo->multiInsertBuffers);
     579 GIC         843 : }
     580                 : 
     581                 : /*
     582                 :  * Get the next TupleTableSlot that the next tuple should be stored in.
     583                 :  *
     584                 :  * Callers must ensure that the buffer is not full.
     585                 :  *
     586 ECB             :  * Note: 'miinfo' is unused but has been included for consistency with the
     587                 :  * other functions in this area.
     588                 :  */
     589                 : static inline TupleTableSlot *
     590 GIC      894514 : CopyMultiInsertInfoNextFreeSlot(CopyMultiInsertInfo *miinfo,
     591                 :                                 ResultRelInfo *rri)
     592                 : {
     593 CBC      894514 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     594 GIC      894514 :     int         nused = buffer->nused;
     595                 : 
     596 CBC      894514 :     Assert(buffer != NULL);
     597 GIC      894514 :     Assert(nused < MAX_BUFFERED_TUPLES);
     598 ECB             : 
     599 GIC      894514 :     if (buffer->slots[nused] == NULL)
     600 CBC      314369 :         buffer->slots[nused] = table_slot_create(rri->ri_RelationDesc, NULL);
     601          894514 :     return buffer->slots[nused];
     602                 : }
     603                 : 
     604 ECB             : /*
     605                 :  * Record the previously reserved TupleTableSlot that was reserved by
     606                 :  * CopyMultiInsertInfoNextFreeSlot as being consumed.
     607                 :  */
     608                 : static inline void
     609 GIC      893642 : CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
     610 ECB             :                          TupleTableSlot *slot, int tuplen, uint64 lineno)
     611                 : {
     612 GIC      893642 :     CopyMultiInsertBuffer *buffer = rri->ri_CopyMultiInsertBuffer;
     613                 : 
     614 CBC      893642 :     Assert(buffer != NULL);
     615          893642 :     Assert(slot == buffer->slots[buffer->nused]);
     616                 : 
     617                 :     /* Store the line number so we can properly report any errors later */
     618 GIC      893642 :     buffer->linenos[buffer->nused] = lineno;
     619                 : 
     620                 :     /* Record this slot as being used */
     621          893642 :     buffer->nused++;
     622                 : 
     623                 :     /* Update how many tuples are stored and their size */
     624          893642 :     miinfo->bufferedTuples++;
     625          893642 :     miinfo->bufferedBytes += tuplen;
     626          893642 : }
     627                 : 
     628 ECB             : /*
     629                 :  * Copy FROM file to relation.
     630                 :  */
     631                 : uint64
     632 GIC         991 : CopyFrom(CopyFromState cstate)
     633 ECB             : {
     634                 :     ResultRelInfo *resultRelInfo;
     635                 :     ResultRelInfo *target_resultRelInfo;
     636 GIC         991 :     ResultRelInfo *prevResultRelInfo = NULL;
     637 CBC         991 :     EState     *estate = CreateExecutorState(); /* for ExecConstraints() */
     638                 :     ModifyTableState *mtstate;
     639                 :     ExprContext *econtext;
     640             991 :     TupleTableSlot *singleslot = NULL;
     641             991 :     MemoryContext oldcontext = CurrentMemoryContext;
     642                 : 
     643 GIC         991 :     PartitionTupleRouting *proute = NULL;
     644                 :     ErrorContextCallback errcallback;
     645             991 :     CommandId   mycid = GetCurrentCommandId(true);
     646             991 :     int         ti_options = 0; /* start with default options for insert */
     647             991 :     BulkInsertState bistate = NULL;
     648                 :     CopyInsertMethod insertMethod;
     649 CBC         991 :     CopyMultiInsertInfo multiInsertInfo = {0};  /* pacify compiler */
     650 GIC         991 :     int64       processed = 0;
     651             991 :     int64       excluded = 0;
     652                 :     bool        has_before_insert_row_trig;
     653 EUB             :     bool        has_instead_insert_row_trig;
     654 GIC         991 :     bool        leafpart_use_multi_insert = false;
     655                 : 
     656             991 :     Assert(cstate->rel);
     657             991 :     Assert(list_length(cstate->range_table) == 1);
     658                 : 
     659 EUB             :     /*
     660                 :      * The target must be a plain, foreign, or partitioned relation, or have
     661                 :      * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
     662                 :      * allowed on views, so we only hint about them in the view case.)
     663                 :      */
     664 GIC         991 :     if (cstate->rel->rd_rel->relkind != RELKIND_RELATION &&
     665              76 :         cstate->rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&
     666 GBC          57 :         cstate->rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE &&
     667               9 :         !(cstate->rel->trigdesc &&
     668 GIC           6 :           cstate->rel->trigdesc->trig_insert_instead_row))
     669 ECB             :     {
     670 GIC           3 :         if (cstate->rel->rd_rel->relkind == RELKIND_VIEW)
     671               3 :             ereport(ERROR,
     672                 :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     673                 :                      errmsg("cannot copy to view \"%s\"",
     674                 :                             RelationGetRelationName(cstate->rel)),
     675 ECB             :                      errhint("To enable copying to a view, provide an INSTEAD OF INSERT trigger.")));
     676 UIC           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_MATVIEW)
     677               0 :             ereport(ERROR,
     678                 :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     679 ECB             :                      errmsg("cannot copy to materialized view \"%s\"",
     680                 :                             RelationGetRelationName(cstate->rel))));
     681 UIC           0 :         else if (cstate->rel->rd_rel->relkind == RELKIND_SEQUENCE)
     682 LBC           0 :             ereport(ERROR,
     683 ECB             :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     684                 :                      errmsg("cannot copy to sequence \"%s\"",
     685                 :                             RelationGetRelationName(cstate->rel))));
     686                 :         else
     687 UIC           0 :             ereport(ERROR,
     688                 :                     (errcode(ERRCODE_WRONG_OBJECT_TYPE),
     689                 :                      errmsg("cannot copy to non-table relation \"%s\"",
     690                 :                             RelationGetRelationName(cstate->rel))));
     691                 :     }
     692                 : 
     693                 :     /*
     694 ECB             :      * If the target file is new-in-transaction, we assume that checking FSM
     695                 :      * for free space is a waste of time.  This could possibly be wrong, but
     696                 :      * it's unlikely.
     697                 :      */
     698 CBC         988 :     if (RELKIND_HAS_STORAGE(cstate->rel->rd_rel->relkind) &&
     699 GIC         915 :         (cstate->rel->rd_createSubid != InvalidSubTransactionId ||
     700 GNC         909 :          cstate->rel->rd_firstRelfilelocatorSubid != InvalidSubTransactionId))
     701 CBC          40 :         ti_options |= TABLE_INSERT_SKIP_FSM;
     702                 : 
     703 ECB             :     /*
     704                 :      * Optimize if new relation storage was created in this subxact or one of
     705                 :      * its committed children and we won't see those rows later as part of an
     706                 :      * earlier scan or command. The subxact test ensures that if this subxact
     707                 :      * aborts then the frozen rows won't be visible after xact cleanup.  Note
     708                 :      * that the stronger test of exactly which subtransaction created it is
     709                 :      * crucial for correctness of this optimization. The test for an earlier
     710                 :      * scan or command tolerates false negatives. FREEZE causes other sessions
     711                 :      * to see rows they would not see under MVCC, and a false negative merely
     712                 :      * spreads that anomaly to the current session.
     713                 :      */
     714 GIC         988 :     if (cstate->opts.freeze)
     715                 :     {
     716 ECB             :         /*
     717                 :          * We currently disallow COPY FREEZE on partitioned tables.  The
     718                 :          * reason for this is that we've simply not yet opened the partitions
     719                 :          * to determine if the optimization can be applied to them.  We could
     720                 :          * go and open them all here, but doing so may be quite a costly
     721                 :          * overhead for small copies.  In any case, we may just end up routing
     722                 :          * tuples to a small number of partitions.  It seems better just to
     723                 :          * raise an ERROR for partitioned tables.
     724                 :          */
     725 CBC          29 :         if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     726                 :         {
     727 GIC           3 :             ereport(ERROR,
     728 ECB             :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     729                 :                      errmsg("cannot perform COPY FREEZE on a partitioned table")));
     730                 :         }
     731                 : 
     732                 :         /*
     733                 :          * Tolerate one registration for the benefit of FirstXactSnapshot.
     734                 :          * Scan-bearing queries generally create at least two registrations,
     735                 :          * though relying on that is fragile, as is ignoring ActiveSnapshot.
     736                 :          * Clear CatalogSnapshot to avoid counting its registration.  We'll
     737                 :          * still detect ongoing catalog scans, each of which separately
     738                 :          * registers the snapshot it uses.
     739                 :          */
     740 CBC          26 :         InvalidateCatalogSnapshot();
     741              26 :         if (!ThereAreNoPriorRegisteredSnapshots() || !ThereAreNoReadyPortals())
     742 UIC           0 :             ereport(ERROR,
     743                 :                     (errcode(ERRCODE_INVALID_TRANSACTION_STATE),
     744 ECB             :                      errmsg("cannot perform COPY FREEZE because of prior transaction activity")));
     745                 : 
     746 GIC          52 :         if (cstate->rel->rd_createSubid != GetCurrentSubTransactionId() &&
     747 GNC          26 :             cstate->rel->rd_newRelfilelocatorSubid != GetCurrentSubTransactionId())
     748 GIC           9 :             ereport(ERROR,
     749 ECB             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     750                 :                      errmsg("cannot perform COPY FREEZE because the table was not created or truncated in the current subtransaction")));
     751                 : 
     752 GIC          17 :         ti_options |= TABLE_INSERT_FROZEN;
     753 ECB             :     }
     754                 : 
     755                 :     /*
     756                 :      * We need a ResultRelInfo so we can use the regular executor's
     757                 :      * index-entry-making machinery.  (There used to be a huge amount of code
     758                 :      * here that basically duplicated execUtils.c ...)
     759                 :      */
     760 GNC         976 :     ExecInitRangeTable(estate, cstate->range_table, cstate->rteperminfos);
     761 CBC         976 :     resultRelInfo = target_resultRelInfo = makeNode(ResultRelInfo);
     762 GIC         976 :     ExecInitResultRelation(estate, resultRelInfo, 1);
     763                 : 
     764                 :     /* Verify the named relation is a valid target for INSERT */
     765             976 :     CheckValidResultRel(resultRelInfo, CMD_INSERT);
     766                 : 
     767             975 :     ExecOpenIndices(resultRelInfo, false);
     768 ECB             : 
     769                 :     /*
     770                 :      * Set up a ModifyTableState so we can let FDW(s) init themselves for
     771                 :      * foreign-table result relation(s).
     772                 :      */
     773 GIC         975 :     mtstate = makeNode(ModifyTableState);
     774 CBC         975 :     mtstate->ps.plan = NULL;
     775             975 :     mtstate->ps.state = estate;
     776 GIC         975 :     mtstate->operation = CMD_INSERT;
     777             975 :     mtstate->mt_nrels = 1;
     778             975 :     mtstate->resultRelInfo = resultRelInfo;
     779             975 :     mtstate->rootResultRelInfo = resultRelInfo;
     780 EUB             : 
     781 GBC         975 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     782 GIC          18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
     783              18 :         resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
     784                 :                                                          resultRelInfo);
     785 EUB             : 
     786                 :     /*
     787                 :      * Also, if the named relation is a foreign table, determine if the FDW
     788                 :      * supports batch insert and determine the batch size (a FDW may support
     789                 :      * batching, but it may be disabled for the server/table).
     790                 :      *
     791                 :      * If the FDW does not support batching, we set the batch size to 1.
     792                 :      */
     793 GNC         975 :     if (resultRelInfo->ri_FdwRoutine != NULL &&
     794              18 :         resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize &&
     795              18 :         resultRelInfo->ri_FdwRoutine->ExecForeignBatchInsert)
     796              18 :         resultRelInfo->ri_BatchSize =
     797              18 :             resultRelInfo->ri_FdwRoutine->GetForeignModifyBatchSize(resultRelInfo);
     798                 :     else
     799             957 :         resultRelInfo->ri_BatchSize = 1;
     800                 : 
     801             975 :     Assert(resultRelInfo->ri_BatchSize >= 1);
     802                 : 
     803 EUB             :     /* Prepare to catch AFTER triggers. */
     804 GIC         975 :     AfterTriggerBeginQuery();
     805                 : 
     806                 :     /*
     807                 :      * If there are any triggers with transition tables on the named relation,
     808 EUB             :      * we need to be prepared to capture transition tuples.
     809                 :      *
     810                 :      * Because partition tuple routing would like to know about whether
     811                 :      * transition capture is active, we also set it in mtstate, which is
     812                 :      * passed to ExecFindPartition() below.
     813                 :      */
     814 GIC         975 :     cstate->transition_capture = mtstate->mt_transition_capture =
     815             975 :         MakeTransitionCaptureState(cstate->rel->trigdesc,
     816             975 :                                    RelationGetRelid(cstate->rel),
     817                 :                                    CMD_INSERT);
     818                 : 
     819 ECB             :     /*
     820                 :      * If the named relation is a partitioned table, initialize state for
     821                 :      * CopyFrom tuple routing.
     822                 :      */
     823 GIC         975 :     if (cstate->rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
     824              45 :         proute = ExecSetupPartitionTupleRouting(estate, cstate->rel);
     825                 : 
     826             975 :     if (cstate->whereClause)
     827               9 :         cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
     828                 :                                         &mtstate->ps);
     829                 : 
     830                 :     /*
     831                 :      * It's generally more efficient to prepare a bunch of tuples for
     832                 :      * insertion, and insert them in one
     833                 :      * table_multi_insert()/ExecForeignBatchInsert() call, than call
     834                 :      * table_tuple_insert()/ExecForeignInsert() separately for every tuple.
     835                 :      * However, there are a number of reasons why we might not be able to do
     836                 :      * this.  These are explained below.
     837                 :      */
     838             975 :     if (resultRelInfo->ri_TrigDesc != NULL &&
     839              88 :         (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
     840              42 :          resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
     841                 :     {
     842                 :         /*
     843                 :          * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
     844                 :          * triggers on the table. Such triggers might query the table we're
     845                 :          * inserting into and act differently if the tuples that have already
     846                 :          * been processed and prepared for insertion are not there.
     847 ECB             :          */
     848 GIC          52 :         insertMethod = CIM_SINGLE;
     849 ECB             :     }
     850 GNC         923 :     else if (resultRelInfo->ri_FdwRoutine != NULL &&
     851              14 :              resultRelInfo->ri_BatchSize == 1)
     852                 :     {
     853                 :         /*
     854                 :          * Can't support multi-inserts to a foreign table if the FDW does not
     855                 :          * support batching, or it's disabled for the server or foreign table.
     856                 :          */
     857               9 :         insertMethod = CIM_SINGLE;
     858                 :     }
     859 GIC         914 :     else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
     860              13 :              resultRelInfo->ri_TrigDesc->trig_insert_new_table)
     861                 :     {
     862                 :         /*
     863                 :          * For partitioned tables we can't support multi-inserts when there
     864                 :          * are any statement level insert triggers. It might be possible to
     865                 :          * allow partitioned tables with such triggers in the future, but for
     866                 :          * now, CopyMultiInsertInfoFlush expects that any after row insert and
     867                 :          * statement level insert triggers are on the same relation.
     868                 :          */
     869               9 :         insertMethod = CIM_SINGLE;
     870                 :     }
     871 GNC         905 :     else if (cstate->volatile_defexprs)
     872 EUB             :     {
     873                 :         /*
     874                 :          * Can't support multi-inserts if there are any volatile default
     875                 :          * expressions in the table.  Similarly to the trigger case above,
     876                 :          * such expressions may query the table we're inserting into.
     877 ECB             :          *
     878                 :          * Note: It does not matter if any partitions have any volatile
     879                 :          * default expressions as we use the defaults from the target of the
     880                 :          * COPY command.
     881                 :          */
     882 UIC           0 :         insertMethod = CIM_SINGLE;
     883                 :     }
     884 GIC         905 :     else if (contain_volatile_functions(cstate->whereClause))
     885                 :     {
     886                 :         /*
     887                 :          * Can't support multi-inserts if there are any volatile function
     888                 :          * expressions in WHERE clause.  Similarly to the trigger case above,
     889 ECB             :          * such expressions may query the table we're inserting into.
     890                 :          */
     891 LBC           0 :         insertMethod = CIM_SINGLE;
     892                 :     }
     893                 :     else
     894 ECB             :     {
     895                 :         /*
     896                 :          * For partitioned tables, we may still be able to perform bulk
     897                 :          * inserts.  However, the possibility of this depends on which types
     898                 :          * of triggers exist on the partition.  We must disable bulk inserts
     899                 :          * if the partition is a foreign table that can't use batching or it
     900                 :          * has any before row insert or insert instead triggers (same as we
     901                 :          * checked above for the parent table).  Since the partition's
     902                 :          * resultRelInfos are initialized only when we actually need to insert
     903                 :          * the first tuple into them, we must have the intermediate insert
     904                 :          * method of CIM_MULTI_CONDITIONAL to flag that we must later
     905                 :          * determine if we can use bulk-inserts for the partition being
     906                 :          * inserted into.
     907                 :          */
     908 CBC         905 :         if (proute)
     909              36 :             insertMethod = CIM_MULTI_CONDITIONAL;
     910                 :         else
     911             869 :             insertMethod = CIM_MULTI;
     912 ECB             : 
     913 CBC         905 :         CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
     914                 :                                 estate, mycid, ti_options);
     915                 :     }
     916                 : 
     917                 :     /*
     918                 :      * If not using batch mode (which allocates slots as needed) set up a
     919                 :      * tuple slot too. When inserting into a partitioned table, we also need
     920                 :      * one, even if we might batch insert, to read the tuple in the root
     921                 :      * partition's form.
     922                 :      */
     923             975 :     if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
     924 ECB             :     {
     925 CBC         106 :         singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
     926 ECB             :                                        &estate->es_tupleTable);
     927 CBC         106 :         bistate = GetBulkInsertState();
     928                 :     }
     929 ECB             : 
     930 GIC        1063 :     has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     931 CBC          88 :                                   resultRelInfo->ri_TrigDesc->trig_insert_before_row);
     932                 : 
     933 GIC        1063 :     has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
     934 CBC          88 :                                    resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
     935                 : 
     936                 :     /*
     937                 :      * Check BEFORE STATEMENT insertion triggers. It's debatable whether we
     938                 :      * should do this for COPY, since it's not really an "INSERT" statement as
     939                 :      * such. However, executing these triggers maintains consistency with the
     940                 :      * EACH ROW triggers that we already fire on COPY.
     941                 :      */
     942 GIC         975 :     ExecBSInsertTriggers(estate, resultRelInfo);
     943                 : 
     944 CBC         975 :     econtext = GetPerTupleExprContext(estate);
     945 ECB             : 
     946                 :     /* Set up callback to identify error line number */
     947 GIC         975 :     errcallback.callback = CopyFromErrorCallback;
     948             975 :     errcallback.arg = (void *) cstate;
     949             975 :     errcallback.previous = error_context_stack;
     950             975 :     error_context_stack = &errcallback;
     951                 : 
     952                 :     for (;;)
     953 CBC      893845 :     {
     954 ECB             :         TupleTableSlot *myslot;
     955                 :         bool        skip_tuple;
     956                 : 
     957 CBC      894820 :         CHECK_FOR_INTERRUPTS();
     958                 : 
     959                 :         /*
     960                 :          * Reset the per-tuple exprcontext. We do this after every tuple, to
     961                 :          * clean-up after expression evaluations etc.
     962                 :          */
     963 GIC      894820 :         ResetPerTupleExprContext(estate);
     964                 : 
     965                 :         /* select slot to (initially) load row into */
     966          894820 :         if (insertMethod == CIM_SINGLE || proute)
     967                 :         {
     968 CBC      107430 :             myslot = singleslot;
     969          107430 :             Assert(myslot != NULL);
     970 ECB             :         }
     971                 :         else
     972                 :         {
     973 GIC      787390 :             Assert(resultRelInfo == target_resultRelInfo);
     974          787390 :             Assert(insertMethod == CIM_MULTI);
     975                 : 
     976          787390 :             myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
     977                 :                                                      resultRelInfo);
     978 ECB             :         }
     979                 : 
     980                 :         /*
     981                 :          * Switch to per-tuple context before calling NextCopyFrom, which does
     982                 :          * evaluate default expressions etc. and requires per-tuple context.
     983                 :          */
     984 GIC      894820 :         MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
     985                 : 
     986          894820 :         ExecClearTuple(myslot);
     987 ECB             : 
     988                 :         /* Directly store the values/nulls array in the slot */
     989 CBC      894820 :         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
     990             906 :             break;
     991                 : 
     992 GIC      893862 :         ExecStoreVirtualTuple(myslot);
     993                 : 
     994                 :         /*
     995                 :          * Constraints and where clause might reference the tableoid column,
     996                 :          * so (re-)initialize tts_tableOid before evaluating them.
     997                 :          */
     998          893862 :         myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
     999 ECB             : 
    1000                 :         /* Triggers and stuff need to be invoked in query context. */
    1001 CBC      893862 :         MemoryContextSwitchTo(oldcontext);
    1002                 : 
    1003 GIC      893862 :         if (cstate->whereClause)
    1004                 :         {
    1005              33 :             econtext->ecxt_scantuple = myslot;
    1006                 :             /* Skip items that don't match COPY's WHERE clause */
    1007              33 :             if (!ExecQual(cstate->qualexpr, econtext))
    1008                 :             {
    1009                 :                 /*
    1010                 :                  * Report that this tuple was filtered out by the WHERE
    1011                 :                  * clause.
    1012 EUB             :                  */
    1013 GIC          18 :                 pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED,
    1014 ECB             :                                              ++excluded);
    1015 GIC          18 :                 continue;
    1016                 :             }
    1017                 :         }
    1018                 : 
    1019                 :         /* Determine the partition to insert the tuple into */
    1020          893844 :         if (proute)
    1021 EUB             :         {
    1022                 :             TupleConversionMap *map;
    1023                 : 
    1024                 :             /*
    1025                 :              * Attempt to find a partition suitable for this tuple.
    1026                 :              * ExecFindPartition() will raise an error if none can be found or
    1027                 :              * if the found partition is not suitable for INSERTs.
    1028                 :              */
    1029 GIC      107193 :             resultRelInfo = ExecFindPartition(mtstate, target_resultRelInfo,
    1030                 :                                               proute, myslot, estate);
    1031                 : 
    1032          107192 :             if (prevResultRelInfo != resultRelInfo)
    1033                 :             {
    1034                 :                 /* Determine which triggers exist on this partition */
    1035           50782 :                 has_before_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1036              27 :                                               resultRelInfo->ri_TrigDesc->trig_insert_before_row);
    1037                 : 
    1038 CBC       50782 :                 has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
    1039              27 :                                                resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
    1040                 : 
    1041 ECB             :                 /*
    1042                 :                  * Disable multi-inserts when the partition has BEFORE/INSTEAD
    1043                 :                  * OF triggers, or if the partition is a foreign table that
    1044                 :                  * can't use batching.
    1045                 :                  */
    1046 GIC      101483 :                 leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
    1047           50728 :                     !has_before_insert_row_trig &&
    1048          152199 :                     !has_instead_insert_row_trig &&
    1049 GNC       50716 :                     (resultRelInfo->ri_FdwRoutine == NULL ||
    1050               6 :                      resultRelInfo->ri_BatchSize > 1);
    1051                 : 
    1052                 :                 /* Set the multi-insert buffer to use for this partition. */
    1053 GIC       50755 :                 if (leafpart_use_multi_insert)
    1054                 :                 {
    1055 CBC       50714 :                     if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
    1056 GIC          42 :                         CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
    1057 ECB             :                                                        resultRelInfo);
    1058                 :                 }
    1059 CBC          41 :                 else if (insertMethod == CIM_MULTI_CONDITIONAL &&
    1060 GIC          14 :                          !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1061                 :                 {
    1062 ECB             :                     /*
    1063                 :                      * Flush pending inserts if this partition can't use
    1064                 :                      * batching, so rows are visible to triggers etc.
    1065                 :                      */
    1066 UNC           0 :                     CopyMultiInsertInfoFlush(&multiInsertInfo,
    1067                 :                                              resultRelInfo,
    1068                 :                                              &processed);
    1069                 :                 }
    1070                 : 
    1071 GIC       50755 :                 if (bistate != NULL)
    1072           50755 :                     ReleaseBulkInsertStatePin(bistate);
    1073           50755 :                 prevResultRelInfo = resultRelInfo;
    1074                 :             }
    1075                 : 
    1076 ECB             :             /*
    1077                 :              * If we're capturing transition tuples, we might need to convert
    1078                 :              * from the partition rowtype to root rowtype. But if there are no
    1079                 :              * BEFORE triggers on the partition that could change the tuple,
    1080                 :              * we can just remember the original unconverted tuple to avoid a
    1081                 :              * needless round trip conversion.
    1082                 :              */
    1083 CBC      107192 :             if (cstate->transition_capture != NULL)
    1084              27 :                 cstate->transition_capture->tcs_original_insert_tuple =
    1085 GIC          27 :                     !has_before_insert_row_trig ? myslot : NULL;
    1086                 : 
    1087 ECB             :             /*
    1088                 :              * We might need to convert from the root rowtype to the partition
    1089                 :              * rowtype.
    1090                 :              */
    1091 GNC      107192 :             map = ExecGetRootToChildMap(resultRelInfo, estate);
    1092 GIC      107192 :             if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
    1093                 :             {
    1094                 :                 /* non batch insert */
    1095              68 :                 if (map != NULL)
    1096                 :                 {
    1097 ECB             :                     TupleTableSlot *new_slot;
    1098                 : 
    1099 GIC          55 :                     new_slot = resultRelInfo->ri_PartitionTupleSlot;
    1100 CBC          55 :                     myslot = execute_attr_map_slot(map->attrMap, myslot, new_slot);
    1101                 :                 }
    1102 ECB             :             }
    1103                 :             else
    1104                 :             {
    1105                 :                 /*
    1106                 :                  * Prepare to queue up tuple for later batch insert into
    1107                 :                  * current partition.
    1108                 :                  */
    1109                 :                 TupleTableSlot *batchslot;
    1110                 : 
    1111                 :                 /* no other path available for partitioned table */
    1112 GIC      107124 :                 Assert(insertMethod == CIM_MULTI_CONDITIONAL);
    1113                 : 
    1114          107124 :                 batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
    1115                 :                                                             resultRelInfo);
    1116                 : 
    1117          107124 :                 if (map != NULL)
    1118 CBC        6100 :                     myslot = execute_attr_map_slot(map->attrMap, myslot,
    1119                 :                                                    batchslot);
    1120 ECB             :                 else
    1121                 :                 {
    1122                 :                     /*
    1123                 :                      * This looks more expensive than it is (Believe me, I
    1124                 :                      * optimized it away. Twice.). The input is in virtual
    1125                 :                      * form, and we'll materialize the slot below - for most
    1126                 :                      * slot types the copy performs the work materialization
    1127                 :                      * would later require anyway.
    1128                 :                      */
    1129 GIC      101024 :                     ExecCopySlot(batchslot, myslot);
    1130          101024 :                     myslot = batchslot;
    1131                 :                 }
    1132 ECB             :             }
    1133                 : 
    1134                 :             /* ensure that triggers etc see the right relation  */
    1135 CBC      107192 :             myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1136                 :         }
    1137 ECB             : 
    1138 GIC      893843 :         skip_tuple = false;
    1139 ECB             : 
    1140                 :         /* BEFORE ROW INSERT Triggers */
    1141 CBC      893843 :         if (has_before_insert_row_trig)
    1142                 :         {
    1143 GIC         137 :             if (!ExecBRInsertTriggers(estate, resultRelInfo, myslot))
    1144               8 :                 skip_tuple = true;  /* "do nothing" */
    1145                 :         }
    1146                 : 
    1147 CBC      893843 :         if (!skip_tuple)
    1148                 :         {
    1149 ECB             :             /*
    1150                 :              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
    1151                 :              * tuple.  Otherwise, proceed with inserting the tuple into the
    1152                 :              * table or foreign table.
    1153                 :              */
    1154 CBC      893835 :             if (has_instead_insert_row_trig)
    1155                 :             {
    1156 GIC           6 :                 ExecIRInsertTriggers(estate, resultRelInfo, myslot);
    1157                 :             }
    1158                 :             else
    1159                 :             {
    1160                 :                 /* Compute stored generated columns */
    1161          893829 :                 if (resultRelInfo->ri_RelationDesc->rd_att->constr &&
    1162          300878 :                     resultRelInfo->ri_RelationDesc->rd_att->constr->has_generated_stored)
    1163 CBC          18 :                     ExecComputeStoredGenerated(resultRelInfo, estate, myslot,
    1164                 :                                                CMD_INSERT);
    1165                 : 
    1166 ECB             :                 /*
    1167                 :                  * If the target is a plain table, check the constraints of
    1168                 :                  * the tuple.
    1169                 :                  */
    1170 CBC      893829 :                 if (resultRelInfo->ri_FdwRoutine == NULL &&
    1171 GIC      893785 :                     resultRelInfo->ri_RelationDesc->rd_att->constr)
    1172 CBC      300860 :                     ExecConstraints(resultRelInfo, myslot, estate);
    1173 ECB             : 
    1174                 :                 /*
    1175                 :                  * Also check the tuple against the partition constraint, if
    1176                 :                  * there is one; except that if we got here via tuple-routing,
    1177                 :                  * we don't need to if there's no BR trigger defined on the
    1178                 :                  * partition.
    1179                 :                  */
    1180 CBC      893814 :                 if (resultRelInfo->ri_RelationDesc->rd_rel->relispartition &&
    1181          107186 :                     (proute == NULL || has_before_insert_row_trig))
    1182            1154 :                     ExecPartitionCheck(resultRelInfo, myslot, estate, true);
    1183 ECB             : 
    1184                 :                 /* Store the slot in the multi-insert buffer, when enabled. */
    1185 GIC      893814 :                 if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
    1186                 :                 {
    1187 ECB             :                     /*
    1188                 :                      * The slot previously might point into the per-tuple
    1189                 :                      * context. For batching it needs to be longer lived.
    1190                 :                      */
    1191 GIC      893642 :                     ExecMaterializeSlot(myslot);
    1192                 : 
    1193 ECB             :                     /* Add this tuple to the tuple buffer */
    1194 CBC      893642 :                     CopyMultiInsertInfoStore(&multiInsertInfo,
    1195                 :                                              resultRelInfo, myslot,
    1196                 :                                              cstate->line_buf.len,
    1197                 :                                              cstate->cur_lineno);
    1198                 : 
    1199                 :                     /*
    1200 EUB             :                      * If enough inserts have queued up, then flush all
    1201                 :                      * buffers out to their tables.
    1202                 :                      */
    1203 GIC      893642 :                     if (CopyMultiInsertInfoIsFull(&multiInsertInfo))
    1204 GNC         650 :                         CopyMultiInsertInfoFlush(&multiInsertInfo,
    1205                 :                                                  resultRelInfo,
    1206                 :                                                  &processed);
    1207                 : 
    1208                 :                     /*
    1209                 :                      * We delay updating the row counter and progress of the
    1210                 :                      * COPY command until after writing the tuples stored in
    1211                 :                      * the buffer out to the table, as in single insert mode.
    1212                 :                      * See CopyMultiInsertBufferFlush().
    1213                 :                      */
    1214          893642 :                     continue;   /* next tuple please */
    1215 ECB             :                 }
    1216                 :                 else
    1217                 :                 {
    1218 GIC         172 :                     List       *recheckIndexes = NIL;
    1219                 : 
    1220                 :                     /* OK, store the tuple */
    1221             172 :                     if (resultRelInfo->ri_FdwRoutine != NULL)
    1222                 :                     {
    1223              25 :                         myslot = resultRelInfo->ri_FdwRoutine->ExecForeignInsert(estate,
    1224                 :                                                                                  resultRelInfo,
    1225                 :                                                                                  myslot,
    1226                 :                                                                                  NULL);
    1227 ECB             : 
    1228 CBC          24 :                         if (myslot == NULL) /* "do nothing" */
    1229               2 :                             continue;   /* next tuple please */
    1230                 : 
    1231                 :                         /*
    1232                 :                          * AFTER ROW Triggers might reference the tableoid
    1233                 :                          * column, so (re-)initialize tts_tableOid before
    1234                 :                          * evaluating them.
    1235 ECB             :                          */
    1236 CBC          22 :                         myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
    1237                 :                     }
    1238                 :                     else
    1239 ECB             :                     {
    1240                 :                         /* OK, store the tuple and create index entries for it */
    1241 GIC         147 :                         table_tuple_insert(resultRelInfo->ri_RelationDesc,
    1242                 :                                            myslot, mycid, ti_options, bistate);
    1243 ECB             : 
    1244 CBC         147 :                         if (resultRelInfo->ri_NumIndices > 0)
    1245 UIC           0 :                             recheckIndexes = ExecInsertIndexTuples(resultRelInfo,
    1246                 :                                                                    myslot,
    1247                 :                                                                    estate,
    1248                 :                                                                    false,
    1249                 :                                                                    false,
    1250                 :                                                                    NULL,
    1251                 :                                                                    NIL,
    1252                 :                                                                    false);
    1253                 :                     }
    1254                 : 
    1255                 :                     /* AFTER ROW INSERT Triggers */
    1256 GIC         169 :                     ExecARInsertTriggers(estate, resultRelInfo, myslot,
    1257 ECB             :                                          recheckIndexes, cstate->transition_capture);
    1258                 : 
    1259 CBC         169 :                     list_free(recheckIndexes);
    1260                 :                 }
    1261                 :             }
    1262 ECB             : 
    1263                 :             /*
    1264                 :              * We count only tuples not suppressed by a BEFORE INSERT trigger
    1265                 :              * or FDW; this is the same definition used by nodeModifyTable.c
    1266                 :              * for counting tuples inserted by an INSERT command.  Update
    1267                 :              * progress of the COPY command as well.
    1268                 :              */
    1269 GIC         175 :             pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,
    1270                 :                                          ++processed);
    1271                 :         }
    1272                 :     }
    1273                 : 
    1274 ECB             :     /* Flush any remaining buffered tuples */
    1275 CBC         906 :     if (insertMethod != CIM_SINGLE)
    1276                 :     {
    1277 GIC         849 :         if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
    1278 GNC         736 :             CopyMultiInsertInfoFlush(&multiInsertInfo, NULL, &processed);
    1279                 :     }
    1280 ECB             : 
    1281                 :     /* Done, clean up */
    1282 GIC         900 :     error_context_stack = errcallback.previous;
    1283 ECB             : 
    1284 GIC         900 :     if (bistate != NULL)
    1285              92 :         FreeBulkInsertState(bistate);
    1286 ECB             : 
    1287 GIC         900 :     MemoryContextSwitchTo(oldcontext);
    1288 ECB             : 
    1289                 :     /* Execute AFTER STATEMENT insertion triggers */
    1290 GIC         900 :     ExecASInsertTriggers(estate, target_resultRelInfo, cstate->transition_capture);
    1291                 : 
    1292 ECB             :     /* Handle queued AFTER triggers */
    1293 GIC         900 :     AfterTriggerEndQuery(estate);
    1294                 : 
    1295             900 :     ExecResetTupleTable(estate->es_tupleTable, false);
    1296                 : 
    1297                 :     /* Allow the FDW to shut down */
    1298             900 :     if (target_resultRelInfo->ri_FdwRoutine != NULL &&
    1299 CBC          16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
    1300 GIC          16 :         target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
    1301 ECB             :                                                               target_resultRelInfo);
    1302                 : 
    1303                 :     /* Tear down the multi-insert buffer data */
    1304 GIC         900 :     if (insertMethod != CIM_SINGLE)
    1305             843 :         CopyMultiInsertInfoCleanup(&multiInsertInfo);
    1306 ECB             : 
    1307                 :     /* Close all the partitioned tables, leaf partitions, and their indices */
    1308 CBC         900 :     if (proute)
    1309 GIC          44 :         ExecCleanupTupleRouting(mtstate, proute);
    1310                 : 
    1311                 :     /* Close the result relations, including any trigger target relations */
    1312             900 :     ExecCloseResultRelations(estate);
    1313             900 :     ExecCloseRangeTableRelations(estate);
    1314                 : 
    1315 CBC         900 :     FreeExecutorState(estate);
    1316 ECB             : 
    1317 CBC         900 :     return processed;
    1318                 : }
    1319                 : 
    1320                 : /*
    1321                 :  * Setup to read tuples from a file for COPY FROM.
    1322                 :  *
    1323                 :  * 'rel': Used as a template for the tuples
    1324                 :  * 'whereClause': WHERE clause from the COPY FROM command
    1325 ECB             :  * 'filename': Name of server-local file to read, NULL for STDIN
    1326                 :  * 'is_program': true if 'filename' is program to execute
    1327                 :  * 'data_source_cb': callback that provides the input data
    1328                 :  * 'attnamelist': List of char *, columns to include. NIL selects all cols.
    1329                 :  * 'options': List of DefElem. See copy_opt_item in gram.y for selections.
    1330                 :  *
    1331                 :  * Returns a CopyFromState, to be passed to NextCopyFrom and related functions.
    1332                 :  */
    1333                 : CopyFromState
    1334 GIC        1089 : BeginCopyFrom(ParseState *pstate,
    1335                 :               Relation rel,
    1336 ECB             :               Node *whereClause,
    1337                 :               const char *filename,
    1338                 :               bool is_program,
    1339                 :               copy_data_source_cb data_source_cb,
    1340                 :               List *attnamelist,
    1341                 :               List *options)
    1342                 : {
    1343                 :     CopyFromState cstate;
    1344 GIC        1089 :     bool        pipe = (filename == NULL);
    1345                 :     TupleDesc   tupDesc;
    1346                 :     AttrNumber  num_phys_attrs,
    1347                 :                 num_defaults;
    1348 ECB             :     FmgrInfo   *in_functions;
    1349                 :     Oid        *typioparams;
    1350                 :     Oid         in_func_oid;
    1351                 :     int        *defmap;
    1352                 :     ExprState **defexprs;
    1353                 :     MemoryContext oldcontext;
    1354                 :     bool        volatile_defexprs;
    1355 GIC        1089 :     const int   progress_cols[] = {
    1356                 :         PROGRESS_COPY_COMMAND,
    1357                 :         PROGRESS_COPY_TYPE,
    1358 ECB             :         PROGRESS_COPY_BYTES_TOTAL
    1359                 :     };
    1360 GIC        1089 :     int64       progress_vals[] = {
    1361                 :         PROGRESS_COPY_COMMAND_FROM,
    1362 ECB             :         0,
    1363                 :         0
    1364                 :     };
    1365                 : 
    1366                 :     /* Allocate workspace and zero all fields */
    1367 CBC        1089 :     cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
    1368                 : 
    1369                 :     /*
    1370                 :      * We allocate everything used by a cstate in a new memory context. This
    1371                 :      * avoids memory leaks during repeated use of COPY in a query.
    1372 ECB             :      */
    1373 CBC        1089 :     cstate->copycontext = AllocSetContextCreate(CurrentMemoryContext,
    1374                 :                                                 "COPY",
    1375                 :                                                 ALLOCSET_DEFAULT_SIZES);
    1376                 : 
    1377 GIC        1089 :     oldcontext = MemoryContextSwitchTo(cstate->copycontext);
    1378                 : 
    1379                 :     /* Extract options from the statement node tree */
    1380 CBC        1089 :     ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options);
    1381                 : 
    1382                 :     /* Process the target relation */
    1383 GIC        1028 :     cstate->rel = rel;
    1384                 : 
    1385 CBC        1028 :     tupDesc = RelationGetDescr(cstate->rel);
    1386                 : 
    1387                 :     /* process common options or initialization */
    1388 ECB             : 
    1389 EUB             :     /* Generate or convert list of attributes to process */
    1390 GIC        1028 :     cstate->attnumlist = CopyGetAttnums(tupDesc, cstate->rel, attnamelist);
    1391                 : 
    1392            1028 :     num_phys_attrs = tupDesc->natts;
    1393                 : 
    1394                 :     /* Convert FORCE_NOT_NULL name list to per-column flags, check validity */
    1395            1028 :     cstate->opts.force_notnull_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1396            1028 :     if (cstate->opts.force_notnull)
    1397                 :     {
    1398                 :         List       *attnums;
    1399                 :         ListCell   *cur;
    1400 ECB             : 
    1401 GIC          13 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_notnull);
    1402                 : 
    1403 CBC          26 :         foreach(cur, attnums)
    1404                 :         {
    1405 GIC          16 :             int         attnum = lfirst_int(cur);
    1406              16 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1407                 : 
    1408              16 :             if (!list_member_int(cstate->attnumlist, attnum))
    1409               3 :                 ereport(ERROR,
    1410                 :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1411                 :                          errmsg("FORCE_NOT_NULL column \"%s\" not referenced by COPY",
    1412                 :                                 NameStr(attr->attname))));
    1413 CBC          13 :             cstate->opts.force_notnull_flags[attnum - 1] = true;
    1414                 :         }
    1415                 :     }
    1416                 : 
    1417                 :     /* Convert FORCE_NULL name list to per-column flags, check validity */
    1418 GIC        1025 :     cstate->opts.force_null_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1419 CBC        1025 :     if (cstate->opts.force_null)
    1420                 :     {
    1421 ECB             :         List       *attnums;
    1422                 :         ListCell   *cur;
    1423                 : 
    1424 GIC          13 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.force_null);
    1425                 : 
    1426 CBC          26 :         foreach(cur, attnums)
    1427                 :         {
    1428              16 :             int         attnum = lfirst_int(cur);
    1429              16 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1430                 : 
    1431              16 :             if (!list_member_int(cstate->attnumlist, attnum))
    1432 GIC           3 :                 ereport(ERROR,
    1433                 :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1434 ECB             :                          errmsg("FORCE_NULL column \"%s\" not referenced by COPY",
    1435                 :                                 NameStr(attr->attname))));
    1436 GIC          13 :             cstate->opts.force_null_flags[attnum - 1] = true;
    1437 ECB             :         }
    1438                 :     }
    1439                 : 
    1440                 :     /* Convert convert_selectively name list to per-column flags */
    1441 GIC        1022 :     if (cstate->opts.convert_selectively)
    1442 ECB             :     {
    1443                 :         List       *attnums;
    1444                 :         ListCell   *cur;
    1445                 : 
    1446 GIC           2 :         cstate->convert_select_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
    1447                 : 
    1448 CBC           2 :         attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->opts.convert_select);
    1449 ECB             : 
    1450 GIC           4 :         foreach(cur, attnums)
    1451                 :         {
    1452 CBC           2 :             int         attnum = lfirst_int(cur);
    1453               2 :             Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
    1454                 : 
    1455 GIC           2 :             if (!list_member_int(cstate->attnumlist, attnum))
    1456 LBC           0 :                 ereport(ERROR,
    1457 ECB             :                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
    1458                 :                          errmsg_internal("selected column \"%s\" not referenced by COPY",
    1459                 :                                          NameStr(attr->attname))));
    1460 GIC           2 :             cstate->convert_select_flags[attnum - 1] = true;
    1461 ECB             :         }
    1462                 :     }
    1463                 : 
    1464                 :     /* Use client encoding when ENCODING option is not specified. */
    1465 GIC        1022 :     if (cstate->opts.file_encoding < 0)
    1466            1019 :         cstate->file_encoding = pg_get_client_encoding();
    1467                 :     else
    1468               3 :         cstate->file_encoding = cstate->opts.file_encoding;
    1469                 : 
    1470                 :     /*
    1471                 :      * Look up encoding conversion function.
    1472                 :      */
    1473            1022 :     if (cstate->file_encoding == GetDatabaseEncoding() ||
    1474               3 :         cstate->file_encoding == PG_SQL_ASCII ||
    1475 UIC           0 :         GetDatabaseEncoding() == PG_SQL_ASCII)
    1476                 :     {
    1477 GIC        1022 :         cstate->need_transcoding = false;
    1478 ECB             :     }
    1479                 :     else
    1480                 :     {
    1481 UIC           0 :         cstate->need_transcoding = true;
    1482               0 :         cstate->conversion_proc = FindDefaultConversionProc(cstate->file_encoding,
    1483                 :                                                             GetDatabaseEncoding());
    1484                 :     }
    1485                 : 
    1486 GIC        1022 :     cstate->copy_src = COPY_FILE;    /* default */
    1487                 : 
    1488 CBC        1022 :     cstate->whereClause = whereClause;
    1489                 : 
    1490                 :     /* Initialize state variables */
    1491 GIC        1022 :     cstate->eol_type = EOL_UNKNOWN;
    1492            1022 :     cstate->cur_relname = RelationGetRelationName(cstate->rel);
    1493            1022 :     cstate->cur_lineno = 0;
    1494            1022 :     cstate->cur_attname = NULL;
    1495            1022 :     cstate->cur_attval = NULL;
    1496 GNC        1022 :     cstate->relname_only = false;
    1497                 : 
    1498                 :     /*
    1499                 :      * Allocate buffers for the input pipeline.
    1500 ECB             :      *
    1501                 :      * attribute_buf and raw_buf are used in both text and binary modes, but
    1502                 :      * input_buf and line_buf only in text mode.
    1503                 :      */
    1504 GIC        1022 :     cstate->raw_buf = palloc(RAW_BUF_SIZE + 1);
    1505 CBC        1022 :     cstate->raw_buf_index = cstate->raw_buf_len = 0;
    1506 GIC        1022 :     cstate->raw_reached_eof = false;
    1507                 : 
    1508            1022 :     if (!cstate->opts.binary)
    1509                 :     {
    1510                 :         /*
    1511                 :          * If encoding conversion is needed, we need another buffer to hold
    1512 ECB             :          * the converted input data.  Otherwise, we can just point input_buf
    1513                 :          * to the same buffer as raw_buf.
    1514                 :          */
    1515 GIC        1014 :         if (cstate->need_transcoding)
    1516                 :         {
    1517 UIC           0 :             cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
    1518 LBC           0 :             cstate->input_buf_index = cstate->input_buf_len = 0;
    1519                 :         }
    1520                 :         else
    1521 GIC        1014 :             cstate->input_buf = cstate->raw_buf;
    1522 CBC        1014 :         cstate->input_reached_eof = false;
    1523                 : 
    1524 GIC        1014 :         initStringInfo(&cstate->line_buf);
    1525 ECB             :     }
    1526                 : 
    1527 GIC        1022 :     initStringInfo(&cstate->attribute_buf);
    1528 ECB             : 
    1529                 :     /* Assign range table and rteperminfos, we'll need them in CopyFrom. */
    1530 CBC        1022 :     if (pstate)
    1531                 :     {
    1532 GIC         992 :         cstate->range_table = pstate->p_rtable;
    1533 GNC         992 :         cstate->rteperminfos = pstate->p_rteperminfos;
    1534                 :     }
    1535                 : 
    1536 GIC        1022 :     tupDesc = RelationGetDescr(cstate->rel);
    1537            1022 :     num_phys_attrs = tupDesc->natts;
    1538 CBC        1022 :     num_defaults = 0;
    1539 GIC        1022 :     volatile_defexprs = false;
    1540 ECB             : 
    1541                 :     /*
    1542                 :      * Pick up the required catalog information for each attribute in the
    1543                 :      * relation, including the input function, the element type (to pass to
    1544                 :      * the input function), and info about defaults and constraints. (Which
    1545                 :      * input function we use depends on text/binary format choice.)
    1546                 :      */
    1547 GIC        1022 :     in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
    1548            1022 :     typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
    1549 CBC        1022 :     defmap = (int *) palloc(num_phys_attrs * sizeof(int));
    1550 GIC        1022 :     defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
    1551 ECB             : 
    1552 GNC        5086 :     for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
    1553 ECB             :     {
    1554 CBC        4065 :         Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
    1555                 : 
    1556 ECB             :         /* We don't need info for dropped attributes */
    1557 CBC        4065 :         if (att->attisdropped)
    1558 GIC          62 :             continue;
    1559                 : 
    1560                 :         /* Fetch the input function and typioparam info */
    1561 CBC        4003 :         if (cstate->opts.binary)
    1562 GIC          31 :             getTypeBinaryInputInfo(att->atttypid,
    1563              31 :                                    &in_func_oid, &typioparams[attnum - 1]);
    1564                 :         else
    1565            3972 :             getTypeInputInfo(att->atttypid,
    1566 CBC        3972 :                              &in_func_oid, &typioparams[attnum - 1]);
    1567            4002 :         fmgr_info(in_func_oid, &in_functions[attnum - 1]);
    1568                 : 
    1569                 :         /* Get default info if available */
    1570 GNC        4002 :         defexprs[attnum - 1] = NULL;
    1571                 : 
    1572            4002 :         if (!att->attgenerated)
    1573                 :         {
    1574 CBC        3992 :             Expr       *defexpr = (Expr *) build_column_default(cstate->rel,
    1575                 :                                                                 attnum);
    1576 ECB             : 
    1577 CBC        3992 :             if (defexpr != NULL)
    1578                 :             {
    1579 ECB             :                 /* Run the expression through planner */
    1580 CBC         206 :                 defexpr = expression_planner(defexpr);
    1581                 : 
    1582                 :                 /* Initialize executable expression in copycontext */
    1583 GNC         206 :                 defexprs[attnum - 1] = ExecInitExpr(defexpr, NULL);
    1584                 : 
    1585                 :                 /* if NOT copied from input */
    1586                 :                 /* use default value if one exists */
    1587             206 :                 if (!list_member_int(cstate->attnumlist, attnum))
    1588                 :                 {
    1589              83 :                     defmap[num_defaults] = attnum - 1;
    1590              83 :                     num_defaults++;
    1591                 :                 }
    1592                 : 
    1593                 :                 /*
    1594                 :                  * If a default expression looks at the table being loaded,
    1595 ECB             :                  * then it could give the wrong answer when using
    1596                 :                  * multi-insert. Since database access can be dynamic this is
    1597                 :                  * hard to test for exactly, so we use the much wider test of
    1598                 :                  * whether the default expression is volatile. We allow for
    1599                 :                  * the special case of when the default expression is the
    1600                 :                  * nextval() of a sequence which in this specific case is
    1601                 :                  * known to be safe for use with the multi-insert
    1602                 :                  * optimization. Hence we use this special case function
    1603                 :                  * checker rather than the standard check for
    1604                 :                  * contain_volatile_functions().
    1605                 :                  */
    1606 CBC         206 :                 if (!volatile_defexprs)
    1607             206 :                     volatile_defexprs = contain_volatile_functions_not_nextval((Node *) defexpr);
    1608                 :             }
    1609 ECB             :         }
    1610 EUB             :     }
    1611                 : 
    1612                 : 
    1613                 :     /* initialize progress */
    1614 CBC        1021 :     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
    1615 GIC        1021 :                                   cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
    1616            1021 :     cstate->bytes_processed = 0;
    1617                 : 
    1618                 :     /* We keep those variables in cstate. */
    1619 CBC        1021 :     cstate->in_functions = in_functions;
    1620            1021 :     cstate->typioparams = typioparams;
    1621 GIC        1021 :     cstate->defmap = defmap;
    1622 CBC        1021 :     cstate->defexprs = defexprs;
    1623 GIC        1021 :     cstate->volatile_defexprs = volatile_defexprs;
    1624            1021 :     cstate->num_defaults = num_defaults;
    1625            1021 :     cstate->is_program = is_program;
    1626                 : 
    1627 CBC        1021 :     if (data_source_cb)
    1628 ECB             :     {
    1629 GBC         154 :         progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
    1630 GIC         154 :         cstate->copy_src = COPY_CALLBACK;
    1631 CBC         154 :         cstate->data_source_cb = data_source_cb;
    1632                 :     }
    1633 GIC         867 :     else if (pipe)
    1634                 :     {
    1635 GBC         411 :         progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
    1636             411 :         Assert(!is_program);    /* the grammar does not allow this */
    1637 GIC         411 :         if (whereToSendOutput == DestRemote)
    1638             411 :             ReceiveCopyBegin(cstate);
    1639                 :         else
    1640 LBC           0 :             cstate->copy_file = stdin;
    1641                 :     }
    1642 ECB             :     else
    1643                 :     {
    1644 GIC         456 :         cstate->filename = pstrdup(filename);
    1645 ECB             : 
    1646 CBC         456 :         if (cstate->is_program)
    1647 ECB             :         {
    1648 LBC           0 :             progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
    1649               0 :             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
    1650               0 :             if (cstate->copy_file == NULL)
    1651 UIC           0 :                 ereport(ERROR,
    1652                 :                         (errcode_for_file_access(),
    1653                 :                          errmsg("could not execute command \"%s\": %m",
    1654                 :                                 cstate->filename)));
    1655                 :         }
    1656                 :         else
    1657                 :         {
    1658 ECB             :             struct stat st;
    1659                 : 
    1660 CBC         456 :             progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
    1661 GIC         456 :             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
    1662 CBC         456 :             if (cstate->copy_file == NULL)
    1663                 :             {
    1664                 :                 /* copy errno because ereport subfunctions might change it */
    1665 UIC           0 :                 int         save_errno = errno;
    1666                 : 
    1667               0 :                 ereport(ERROR,
    1668                 :                         (errcode_for_file_access(),
    1669 ECB             :                          errmsg("could not open file \"%s\" for reading: %m",
    1670                 :                                 cstate->filename),
    1671 EUB             :                          (save_errno == ENOENT || save_errno == EACCES) ?
    1672                 :                          errhint("COPY FROM instructs the PostgreSQL server process to read a file. "
    1673                 :                                  "You may want a client-side facility such as psql's \\copy.") : 0));
    1674                 :             }
    1675 ECB             : 
    1676 CBC         456 :             if (fstat(fileno(cstate->copy_file), &st))
    1677 UIC           0 :                 ereport(ERROR,
    1678 ECB             :                         (errcode_for_file_access(),
    1679                 :                          errmsg("could not stat file \"%s\": %m",
    1680                 :                                 cstate->filename)));
    1681                 : 
    1682 GIC         456 :             if (S_ISDIR(st.st_mode))
    1683 UIC           0 :                 ereport(ERROR,
    1684 ECB             :                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
    1685                 :                          errmsg("\"%s\" is a directory", cstate->filename)));
    1686                 : 
    1687 CBC         456 :             progress_vals[2] = st.st_size;
    1688                 :         }
    1689                 :     }
    1690 ECB             : 
    1691 CBC        1021 :     pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
    1692 ECB             : 
    1693 CBC        1021 :     if (cstate->opts.binary)
    1694                 :     {
    1695                 :         /* Read and verify binary header */
    1696 GIC           7 :         ReceiveCopyBinaryHeader(cstate);
    1697                 :     }
    1698                 : 
    1699                 :     /* create workspace for CopyReadAttributes results */
    1700            1021 :     if (!cstate->opts.binary)
    1701 ECB             :     {
    1702 CBC        1014 :         AttrNumber  attr_count = list_length(cstate->attnumlist);
    1703 ECB             : 
    1704 CBC        1014 :         cstate->max_fields = attr_count;
    1705 GIC        1014 :         cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
    1706 ECB             :     }
    1707                 : 
    1708 CBC        1021 :     MemoryContextSwitchTo(oldcontext);
    1709                 : 
    1710 GIC        1021 :     return cstate;
    1711 ECB             : }
    1712                 : 
    1713                 : /*
    1714                 :  * Clean up storage and release resources for COPY FROM.
    1715                 :  */
    1716                 : void
    1717 CBC         780 : EndCopyFrom(CopyFromState cstate)
    1718                 : {
    1719 ECB             :     /* No COPY FROM related resources except memory. */
    1720 CBC         780 :     if (cstate->is_program)
    1721 ECB             :     {
    1722 UIC           0 :         ClosePipeFromProgram(cstate);
    1723                 :     }
    1724 ECB             :     else
    1725                 :     {
    1726 CBC         780 :         if (cstate->filename != NULL && FreeFile(cstate->copy_file))
    1727 UIC           0 :             ereport(ERROR,
    1728 ECB             :                     (errcode_for_file_access(),
    1729                 :                      errmsg("could not close file \"%s\": %m",
    1730                 :                             cstate->filename)));
    1731                 :     }
    1732                 : 
    1733 GIC         780 :     pgstat_progress_end_command();
    1734 ECB             : 
    1735 GIC         780 :     MemoryContextDelete(cstate->copycontext);
    1736             780 :     pfree(cstate);
    1737 CBC         780 : }
    1738                 : 
    1739                 : /*
    1740                 :  * Closes the pipe from an external program, checking the pclose() return code.
    1741 ECB             :  */
    1742                 : static void
    1743 LBC           0 : ClosePipeFromProgram(CopyFromState cstate)
    1744 ECB             : {
    1745                 :     int         pclose_rc;
    1746                 : 
    1747 UIC           0 :     Assert(cstate->is_program);
    1748                 : 
    1749               0 :     pclose_rc = ClosePipeStream(cstate->copy_file);
    1750               0 :     if (pclose_rc == -1)
    1751               0 :         ereport(ERROR,
    1752                 :                 (errcode_for_file_access(),
    1753                 :                  errmsg("could not close pipe to external command: %m")));
    1754               0 :     else if (pclose_rc != 0)
    1755                 :     {
    1756                 :         /*
    1757                 :          * If we ended a COPY FROM PROGRAM before reaching EOF, then it's
    1758                 :          * expectable for the called program to fail with SIGPIPE, and we
    1759                 :          * should not report that as an error.  Otherwise, SIGPIPE indicates a
    1760 ECB             :          * problem.
    1761                 :          */
    1762 UIC           0 :         if (!cstate->raw_reached_eof &&
    1763               0 :             wait_result_is_signal(pclose_rc, SIGPIPE))
    1764               0 :             return;
    1765                 : 
    1766               0 :         ereport(ERROR,
    1767                 :                 (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
    1768 ECB             :                  errmsg("program \"%s\" failed",
    1769                 :                         cstate->filename),
    1770                 :                  errdetail_internal("%s", wait_result_to_str(pclose_rc))));
    1771                 :     }
    1772                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a