LCOV - differential code coverage report
Current view: top level - contrib/tablefunc - tablefunc.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 90.4 % 426 385 41 385
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 19 19 19
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 90.4 % 426 385 41 385
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 100.0 % 19 19 19

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*
                                  2                 :  * contrib/tablefunc/tablefunc.c
                                  3                 :  *
                                  4                 :  *
                                  5                 :  * tablefunc
                                  6                 :  *
                                  7                 :  * Sample to demonstrate C functions which return setof scalar
                                  8                 :  * and setof composite.
                                  9                 :  * Joe Conway <mail@joeconway.com>
                                 10                 :  * And contributors:
                                 11                 :  * Nabil Sayegh <postgresql@e-trolley.de>
                                 12                 :  *
                                 13                 :  * Copyright (c) 2002-2023, PostgreSQL Global Development Group
                                 14                 :  *
                                 15                 :  * Permission to use, copy, modify, and distribute this software and its
                                 16                 :  * documentation for any purpose, without fee, and without a written agreement
                                 17                 :  * is hereby granted, provided that the above copyright notice and this
                                 18                 :  * paragraph and the following two paragraphs appear in all copies.
                                 19                 :  *
                                 20                 :  * IN NO EVENT SHALL THE AUTHORS OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
                                 21                 :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
                                 22                 :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
                                 23                 :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
                                 24                 :  * POSSIBILITY OF SUCH DAMAGE.
                                 25                 :  *
                                 26                 :  * THE AUTHORS AND DISTRIBUTORS SPECIFICALLY DISCLAIM ANY WARRANTIES,
                                 27                 :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
                                 28                 :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
                                 29                 :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
                                 30                 :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
                                 31                 :  *
                                 32                 :  */
                                 33                 : #include "postgres.h"
                                 34                 : 
                                 35                 : #include <math.h>
                                 36                 : 
                                 37                 : #include "access/htup_details.h"
                                 38                 : #include "catalog/pg_type.h"
                                 39                 : #include "common/pg_prng.h"
                                 40                 : #include "executor/spi.h"
                                 41                 : #include "funcapi.h"
                                 42                 : #include "lib/stringinfo.h"
                                 43                 : #include "miscadmin.h"
                                 44                 : #include "tablefunc.h"
                                 45                 : #include "utils/builtins.h"
                                 46                 : 
 6158 tgl                        47 CBC           1 : PG_MODULE_MAGIC;
                                 48                 : 
                                 49                 : static HTAB *load_categories_hash(char *cats_sql, MemoryContext per_query_ctx);
                                 50                 : static Tuplestorestate *get_crosstab_tuplestore(char *sql,
                                 51                 :                                                 HTAB *crosstab_hash,
                                 52                 :                                                 TupleDesc tupdesc,
                                 53                 :                                                 bool randomAccess);
                                 54                 : static void validateConnectbyTupleDesc(TupleDesc td, bool show_branch, bool show_serial);
                                 55                 : static bool compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc);
                                 56                 : static void compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc);
                                 57                 : static void get_normal_pair(float8 *x1, float8 *x2);
                                 58                 : static Tuplestorestate *connectby(char *relname,
                                 59                 :                                   char *key_fld,
                                 60                 :                                   char *parent_key_fld,
                                 61                 :                                   char *orderby_fld,
                                 62                 :                                   char *branch_delim,
                                 63                 :                                   char *start_with,
                                 64                 :                                   int max_depth,
                                 65                 :                                   bool show_branch,
                                 66                 :                                   bool show_serial,
                                 67                 :                                   MemoryContext per_query_ctx,
                                 68                 :                                   bool randomAccess,
                                 69                 :                                   AttInMetadata *attinmeta);
                                 70                 : static void build_tuplestore_recursively(char *key_fld,
                                 71                 :                                          char *parent_key_fld,
                                 72                 :                                          char *relname,
                                 73                 :                                          char *orderby_fld,
                                 74                 :                                          char *branch_delim,
                                 75                 :                                          char *start_with,
                                 76                 :                                          char *branch,
                                 77                 :                                          int level,
                                 78                 :                                          int *serial,
                                 79                 :                                          int max_depth,
                                 80                 :                                          bool show_branch,
                                 81                 :                                          bool show_serial,
                                 82                 :                                          MemoryContext per_query_ctx,
                                 83                 :                                          AttInMetadata *attinmeta,
                                 84                 :                                          Tuplestorestate *tupstore);
                                 85                 : 
                                 86                 : typedef struct
                                 87                 : {
                                 88                 :     float8      mean;           /* mean of the distribution */
                                 89                 :     float8      stddev;         /* stddev of the distribution */
                                 90                 :     float8      carry_val;      /* hold second generated value */
                                 91                 :     bool        use_carry;      /* use second generated value */
                                 92                 : } normal_rand_fctx;
                                 93                 : 
                                 94                 : #define xpfree(var_) \
                                 95                 :     do { \
                                 96                 :         if (var_ != NULL) \
                                 97                 :         { \
                                 98                 :             pfree(var_); \
                                 99                 :             var_ = NULL; \
                                100                 :         } \
                                101                 :     } while (0)
                                102                 : 
                                103                 : #define xpstrdup(tgtvar_, srcvar_) \
                                104                 :     do { \
                                105                 :         if (srcvar_) \
                                106                 :             tgtvar_ = pstrdup(srcvar_); \
                                107                 :         else \
                                108                 :             tgtvar_ = NULL; \
                                109                 :     } while (0)
                                110                 : 
                                111                 : #define xstreq(tgtvar_, srcvar_) \
                                112                 :     (((tgtvar_ == NULL) && (srcvar_ == NULL)) || \
                                113                 :      ((tgtvar_ != NULL) && (srcvar_ != NULL) && (strcmp(tgtvar_, srcvar_) == 0)))
                                114                 : 
                                115                 : /* sign, 10 digits, '\0' */
                                116                 : #define INT32_STRLEN    12
                                117                 : 
                                118                 : /* stored info for a crosstab category */
                                119                 : typedef struct crosstab_cat_desc
                                120                 : {
                                121                 :     char       *catname;        /* full category name */
                                122                 :     uint64      attidx;         /* zero based */
                                123                 : } crosstab_cat_desc;
                                124                 : 
                                125                 : #define MAX_CATNAME_LEN         NAMEDATALEN
                                126                 : #define INIT_CATS               64
                                127                 : 
                                128                 : #define crosstab_HashTableLookup(HASHTAB, CATNAME, CATDESC) \
                                129                 : do { \
                                130                 :     crosstab_HashEnt *hentry; char key[MAX_CATNAME_LEN]; \
                                131                 :     \
                                132                 :     MemSet(key, 0, MAX_CATNAME_LEN); \
                                133                 :     snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATNAME); \
                                134                 :     hentry = (crosstab_HashEnt*) hash_search(HASHTAB, \
                                135                 :                                          key, HASH_FIND, NULL); \
                                136                 :     if (hentry) \
                                137                 :         CATDESC = hentry->catdesc; \
                                138                 :     else \
                                139                 :         CATDESC = NULL; \
                                140                 : } while(0)
                                141                 : 
                                142                 : #define crosstab_HashTableInsert(HASHTAB, CATDESC) \
                                143                 : do { \
                                144                 :     crosstab_HashEnt *hentry; bool found; char key[MAX_CATNAME_LEN]; \
                                145                 :     \
                                146                 :     MemSet(key, 0, MAX_CATNAME_LEN); \
                                147                 :     snprintf(key, MAX_CATNAME_LEN - 1, "%s", CATDESC->catname); \
                                148                 :     hentry = (crosstab_HashEnt*) hash_search(HASHTAB, \
                                149                 :                                          key, HASH_ENTER, &found); \
                                150                 :     if (found) \
                                151                 :         ereport(ERROR, \
                                152                 :                 (errcode(ERRCODE_DUPLICATE_OBJECT), \
                                153                 :                  errmsg("duplicate category name"))); \
                                154                 :     hentry->catdesc = CATDESC; \
                                155                 : } while(0)
                                156                 : 
                                157                 : /* hash table */
                                158                 : typedef struct crosstab_hashent
                                159                 : {
                                160                 :     char        internal_catname[MAX_CATNAME_LEN];
                                161                 :     crosstab_cat_desc *catdesc;
                                162                 : } crosstab_HashEnt;
                                163                 : 
                                164                 : /*
                                165                 :  * normal_rand - return requested number of random values
                                166                 :  * with a Gaussian (Normal) distribution.
                                167                 :  *
                                168                 :  * inputs are int numvals, float8 mean, and float8 stddev
                                169                 :  * returns setof float8
                                170                 :  */
 7558 bruce                     171               2 : PG_FUNCTION_INFO_V1(normal_rand);
                                172                 : Datum
                                173             102 : normal_rand(PG_FUNCTION_ARGS)
                                174                 : {
                                175                 :     FuncCallContext *funcctx;
                                176                 :     uint64      call_cntr;
                                177                 :     uint64      max_calls;
                                178                 :     normal_rand_fctx *fctx;
                                179                 :     float8      mean;
                                180                 :     float8      stddev;
                                181                 :     float8      carry_val;
                                182                 :     bool        use_carry;
                                183                 :     MemoryContext oldcontext;
                                184                 : 
                                185                 :     /* stuff done only on the first call of the function */
 7522                           186             102 :     if (SRF_IS_FIRSTCALL())
                                187                 :     {
                                188                 :         int32       num_tuples;
                                189                 : 
                                190                 :         /* create a function context for cross-call persistence */
                                191               2 :         funcctx = SRF_FIRSTCALL_INIT();
                                192                 : 
                                193                 :         /*
                                194                 :          * switch to memory context appropriate for multiple function calls
                                195                 :          */
 7528 tgl                       196               2 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
                                197                 : 
                                198                 :         /* total number of tuples to be returned */
  865 peter                     199               2 :         num_tuples = PG_GETARG_INT32(0);
                                200               2 :         if (num_tuples < 0)
                                201               1 :             ereport(ERROR,
                                202                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                203                 :                      errmsg("number of rows cannot be negative")));
                                204               1 :         funcctx->max_calls = num_tuples;
                                205                 : 
                                206                 :         /* allocate memory for user context */
 7558 bruce                     207               1 :         fctx = (normal_rand_fctx *) palloc(sizeof(normal_rand_fctx));
                                208                 : 
                                209                 :         /*
                                 210                 :          * Use fctx to keep track of the mean and stddev from call to call.
                                211                 :          * It will also be used to carry over the spare value we get from the
                                212                 :          * Box-Muller algorithm so that we only actually calculate a new value
                                213                 :          * every other call.
                                214                 :          */
                                215               1 :         fctx->mean = PG_GETARG_FLOAT8(1);
                                216               1 :         fctx->stddev = PG_GETARG_FLOAT8(2);
                                217               1 :         fctx->carry_val = 0;
                                218               1 :         fctx->use_carry = false;
                                219                 : 
                                220               1 :         funcctx->user_fctx = fctx;
                                221                 : 
 7528 tgl                       222               1 :         MemoryContextSwitchTo(oldcontext);
                                223                 :     }
                                224                 : 
                                225                 :     /* stuff done on every call of the function */
 7522 bruce                     226             101 :     funcctx = SRF_PERCALL_SETUP();
                                227                 : 
 7558                           228             101 :     call_cntr = funcctx->call_cntr;
                                229             101 :     max_calls = funcctx->max_calls;
                                230             101 :     fctx = funcctx->user_fctx;
                                231             101 :     mean = fctx->mean;
                                232             101 :     stddev = fctx->stddev;
                                233             101 :     carry_val = fctx->carry_val;
                                234             101 :     use_carry = fctx->use_carry;
                                235                 : 
 7522                           236             101 :     if (call_cntr < max_calls)   /* do when there is more left to send */
                                237                 :     {
                                238                 :         float8      result;
                                239                 : 
                                240             100 :         if (use_carry)
                                241                 :         {
                                242                 :             /*
                                243                 :              * reset use_carry and use second value obtained on last pass
                                244                 :              */
 7558                           245              50 :             fctx->use_carry = false;
                                246              50 :             result = carry_val;
                                247                 :         }
                                248                 :         else
                                249                 :         {
                                250                 :             float8      normval_1;
                                251                 :             float8      normval_2;
                                252                 : 
                                253                 :             /* Get the next two normal values */
                                254              50 :             get_normal_pair(&normval_1, &normval_2);
                                255                 : 
                                256                 :             /* use the first */
                                257              50 :             result = mean + (stddev * normval_1);
                                258                 : 
                                259                 :             /* and save the second */
                                260              50 :             fctx->carry_val = mean + (stddev * normval_2);
                                261              50 :             fctx->use_carry = true;
                                262                 :         }
                                263                 : 
                                264                 :         /* send the result */
 7522                           265             100 :         SRF_RETURN_NEXT(funcctx, Float8GetDatum(result));
                                266                 :     }
                                267                 :     else
                                268                 :         /* do when there is no more left */
                                269               1 :         SRF_RETURN_DONE(funcctx);
                                270                 : }
                                271                 : 
                                272                 : /*
                                273                 :  * get_normal_pair()
                                274                 :  * Assigns normally distributed (Gaussian) values to a pair of provided
                                275                 :  * parameters, with mean 0, standard deviation 1.
                                276                 :  *
                                277                 :  * This routine implements Algorithm P (Polar method for normal deviates)
                                278                 :  * from Knuth's _The_Art_of_Computer_Programming_, Volume 2, 3rd ed., pages
                                279                 :  * 122-126. Knuth cites his source as "The polar method", G. E. P. Box, M. E.
                                 280                 :  * Muller, and G. Marsaglia, _Annals_Math._Stat._ 29 (1958), 610-611.
                                281                 :  *
                                282                 :  */
                                283                 : static void
 7558                           284              50 : get_normal_pair(float8 *x1, float8 *x2)
                                285                 : {
                                286                 :     float8      u1,
                                287                 :                 u2,
                                288                 :                 v1,
                                289                 :                 v2,
                                290                 :                 s;
                                291                 : 
                                292                 :     do
                                293                 :     {
  497 tgl                       294              69 :         u1 = pg_prng_double(&pg_global_prng_state);
                                295              69 :         u2 = pg_prng_double(&pg_global_prng_state);
                                296                 : 
 7558 bruce                     297              69 :         v1 = (2.0 * u1) - 1.0;
                                298              69 :         v2 = (2.0 * u2) - 1.0;
                                299                 : 
 7512 tgl                       300              69 :         s = v1 * v1 + v2 * v2;
                                301              69 :     } while (s >= 1.0);
                                302                 : 
                                303              50 :     if (s == 0)
                                304                 :     {
 7512 tgl                       305 UBC           0 :         *x1 = 0;
                                306               0 :         *x2 = 0;
                                307                 :     }
                                308                 :     else
                                309                 :     {
 7512 tgl                       310 CBC          50 :         s = sqrt((-2.0 * log(s)) / s);
                                311              50 :         *x1 = v1 * s;
                                312              50 :         *x2 = v2 * s;
                                313                 :     }
 7558 bruce                     314              50 : }
                                315                 : 
                                316                 : /*
                                317                 :  * crosstab - create a crosstab of rowids and values columns from a
                                318                 :  * SQL statement returning one rowid column, one category column,
                                319                 :  * and one value column.
                                320                 :  *
                                321                 :  * e.g. given sql which produces:
                                322                 :  *
                                323                 :  *          rowid   cat     value
                                324                 :  *          ------+-------+-------
                                325                 :  *          row1    cat1    val1
                                326                 :  *          row1    cat2    val2
                                327                 :  *          row1    cat3    val3
                                328                 :  *          row1    cat4    val4
                                329                 :  *          row2    cat1    val5
                                330                 :  *          row2    cat2    val6
                                331                 :  *          row2    cat3    val7
                                332                 :  *          row2    cat4    val8
                                333                 :  *
                                334                 :  * crosstab returns:
                                335                 :  *                  <===== values columns =====>
                                336                 :  *          rowid   cat1    cat2    cat3    cat4
                                337                 :  *          ------+-------+-------+-------+-------
                                338                 :  *          row1    val1    val2    val3    val4
                                339                 :  *          row2    val5    val6    val7    val8
                                340                 :  *
                                341                 :  * NOTES:
                                342                 :  * 1. SQL result must be ordered by 1,2.
                                343                 :  * 2. The number of values columns depends on the tuple description
                                344                 :  *    of the function's declared return type.  The return type's columns
                                345                 :  *    must match the datatypes of the SQL query's result.  The datatype
                                346                 :  *    of the category column can be anything, however.
                                347                 :  * 3. Missing values (i.e. not enough adjacent rows of same rowid to
                                348                 :  *    fill the number of result values columns) are filled in with nulls.
                                349                 :  * 4. Extra values (i.e. too many adjacent rows of same rowid to fill
                                350                 :  *    the number of result values columns) are skipped.
                                351                 :  * 5. Rows with all nulls in the values columns are skipped.
                                352                 :  */
                                353              11 : PG_FUNCTION_INFO_V1(crosstab);
                                354                 : Datum
                                355              16 : crosstab(PG_FUNCTION_ARGS)
                                356                 : {
 5242 tgl                       357              16 :     char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                358              16 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                                359                 :     Tuplestorestate *tupstore;
                                360                 :     TupleDesc   tupdesc;
                                361                 :     uint64      call_cntr;
                                362                 :     uint64      max_calls;
                                363                 :     AttInMetadata *attinmeta;
                                364                 :     SPITupleTable *spi_tuptable;
                                365                 :     TupleDesc   spi_tupdesc;
                                366                 :     bool        firstpass;
                                367                 :     char       *lastrowid;
                                368                 :     int         i;
                                369                 :     int         num_categories;
                                370                 :     MemoryContext per_query_ctx;
                                371                 :     MemoryContext oldcontext;
                                372                 :     int         ret;
                                373                 :     uint64      proc;
                                374                 : 
                                375                 :     /* check to see if caller supports us returning a tuplestore */
                                376              16 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 5242 tgl                       377 UBC           0 :         ereport(ERROR,
                                378                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                379                 :                  errmsg("set-valued function called in context that cannot accept a set")));
 5242 tgl                       380 CBC          16 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
 5242 tgl                       381 UBC           0 :         ereport(ERROR,
                                382                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                383                 :                  errmsg("materialize mode required, but it is not allowed in this context")));
                                384                 : 
 5242 tgl                       385 CBC          16 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
                                386                 : 
                                387                 :     /* Connect to SPI manager */
                                388              16 :     if ((ret = SPI_connect()) < 0)
                                389                 :         /* internal error */
 5242 tgl                       390 UBC           0 :         elog(ERROR, "crosstab: SPI_connect returned %d", ret);
                                391                 : 
                                392                 :     /* Retrieve the desired rows and remember how many there are */
 5242 tgl                       393 CBC          16 :     ret = SPI_execute(sql, true, 0);
                                394              16 :     proc = SPI_processed;
                                395                 : 
                                396                 :     /* If not SPI_OK_SELECT, or no qualifying tuples, fall out early */
 2584                           397              16 :     if (ret != SPI_OK_SELECT || proc == 0)
                                398                 :     {
 5242 tgl                       399 UBC           0 :         SPI_finish();
                                400               0 :         rsinfo->isDone = ExprEndResult;
                                401               0 :         PG_RETURN_NULL();
                                402                 :     }
                                403                 : 
 5242 tgl                       404 CBC          16 :     spi_tuptable = SPI_tuptable;
                                405              16 :     spi_tupdesc = spi_tuptable->tupdesc;
                                406                 : 
                                407                 :     /*----------
                                408                 :      * The provided SQL query must always return three columns.
                                409                 :      *
                                410                 :      * 1. rowid
                                411                 :      *  the label or identifier for each row in the final result
                                412                 :      * 2. category
                                413                 :      *  the label or identifier for each column in the final result
                                414                 :      * 3. values
                                415                 :      *  the value for each column in the final result
                                416                 :      *----------
                                417                 :      */
                                418              16 :     if (spi_tupdesc->natts != 3)
 5242 tgl                       419 UBC           0 :         ereport(ERROR,
                                420                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                421                 :                  errmsg("invalid source data SQL statement"),
                                422                 :                  errdetail("The provided SQL must return 3 "
                                423                 :                            "columns: rowid, category, and values.")));
                                424                 : 
                                425                 :     /* get a tuple descriptor for our result type */
 5242 tgl                       426 CBC          16 :     switch (get_call_result_type(fcinfo, NULL, &tupdesc))
                                427                 :     {
                                428              16 :         case TYPEFUNC_COMPOSITE:
                                429                 :             /* success */
                                430              16 :             break;
 5242 tgl                       431 UBC           0 :         case TYPEFUNC_RECORD:
                                432                 :             /* failed to determine actual type of RECORD */
                                433               0 :             ereport(ERROR,
                                434                 :                     (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                435                 :                      errmsg("function returning record called in context "
                                436                 :                             "that cannot accept type record")));
                                437                 :             break;
                                438               0 :         default:
                                439                 :             /* result type isn't composite */
 2807                           440               0 :             ereport(ERROR,
                                441                 :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
                                442                 :                      errmsg("return type must be a row type")));
                                443                 :             break;
                                444                 :     }
                                445                 : 
                                446                 :     /*
                                447                 :      * Check that return tupdesc is compatible with the data we got from SPI,
                                448                 :      * at least based on number and type of attributes
                                449                 :      */
 5242 tgl                       450 CBC          16 :     if (!compatCrosstabTupleDescs(tupdesc, spi_tupdesc))
 5242 tgl                       451 UBC           0 :         ereport(ERROR,
                                452                 :                 (errcode(ERRCODE_SYNTAX_ERROR),
                                453                 :                  errmsg("return and sql tuple descriptions are " \
                                454                 :                         "incompatible")));
                                455                 : 
                                456                 :     /*
                                457                 :      * switch to long-lived (per-query) memory context
                                458                 :      */
 5242 tgl                       459 CBC          16 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
                                460                 : 
                                461                 :     /* make sure we have a persistent copy of the result tupdesc */
                                462              16 :     tupdesc = CreateTupleDescCopy(tupdesc);
                                463                 : 
                                464                 :     /* initialize our tuplestore in long-lived context */
                                465                 :     tupstore =
                                466              16 :         tuplestore_begin_heap(rsinfo->allowedModes & SFRM_Materialize_Random,
                                467                 :                               false, work_mem);
                                468                 : 
                                469              16 :     MemoryContextSwitchTo(oldcontext);
                                470                 : 
                                471                 :     /*
                                472                 :      * Generate attribute metadata needed later to produce tuples from raw C
                                473                 :      * strings
                                474                 :      */
                                475              16 :     attinmeta = TupleDescGetAttInMetadata(tupdesc);
                                476                 : 
                                477                 :     /* total number of SQL result tuples to be examined */
                                478              16 :     max_calls = proc;
                                479                 : 
                                480                 :     /* the return tuple always must have 1 rowid + num_categories columns */
                                481              16 :     num_categories = tupdesc->natts - 1;
                                482                 : 
                                483              16 :     firstpass = true;
                                484              16 :     lastrowid = NULL;
                                485                 : 
                                486              81 :     for (call_cntr = 0; call_cntr < max_calls; call_cntr++)
                                487                 :     {
 5629 mail                      488              65 :         bool        skip_tuple = false;
                                489                 :         char      **values;
                                490                 : 
                                491                 :         /* allocate and zero space: slot 0 is the rowid, the rest are values */
 5242 tgl                       492              65 :         values = (char **) palloc0((1 + num_categories) * sizeof(char *));
                                493                 : 
                                494                 :         /*
                                495                 :          * now loop through the sql results and assign each value in sequence
                                496                 :          * to the next category
                                497                 :          */
                                498             174 :         for (i = 0; i < num_categories; i++)
                                499                 :         {
                                500                 :             HeapTuple   spi_tuple;
                                501                 :             char       *rowid;
                                502                 : 
                                503                 :             /* see if we've already run out of sql result tuples */
                                504             144 :             if (call_cntr >= max_calls)
                                505               5 :                 break;
                                506                 : 
                                507                 :             /* get the next sql result tuple */
                                508             139 :             spi_tuple = spi_tuptable->vals[call_cntr];
                                509                 : 
                                510                 :             /* get the rowid from the current sql result tuple */
                                511             139 :             rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
                                512                 : 
                                513                 :             /*
                                514                 :              * If this is the first pass through the values for this rowid,
                                515                 :              * set the first column to rowid
                                516                 :              */
                                517             139 :             if (i == 0)
                                518                 :             {
                                519              65 :                 xpstrdup(values[0], rowid);
                                520                 : 
                                521                 :                 /*
                                522                 :                  * Check to see if the rowid is the same as that of the
                                523                 :                  * previous pass -- if so, skip this tuple entirely
                                524                 :                  */
                                525              65 :                 if (!firstpass && xstreq(lastrowid, rowid))
                                526                 :                 {
                                527              23 :                     xpfree(rowid);
                                528              23 :                     skip_tuple = true;
 7558 bruce                     529              23 :                     break;
                                530                 :                 }
                                531                 :             }
                                532                 : 
                                533                 :             /*
                                534                 :              * If rowid hasn't changed on us, continue building the output
                                535                 :              * tuple.
                                536                 :              */
 5242 tgl                       537             116 :             if (xstreq(rowid, values[0]))
                                538                 :             {
                                539                 :                 /*
                                540                 :                  * Get the next category item value, which is always attribute
                                541                 :                  * number three.
                                542                 :                  *
                                543                 :                  * Be careful to assign the value to the array index based on
                                544                 :                  * which category we are presently processing.
                                545                 :                  */
                                546             109 :                 values[1 + i] = SPI_getvalue(spi_tuple, spi_tupdesc, 3);
                                547                 : 
                                548                 :                 /*
                                549                 :                  * increment the counter since we consume a row for each
                                550                 :                  * category, but not for last pass because the outer loop will
                                551                 :                  * do that for us
                                552                 :                  */
                                553             109 :                 if (i < (num_categories - 1))
                                554              79 :                     call_cntr++;
                                555             109 :                 xpfree(rowid);
                                556                 :             }
                                557                 :             else
                                558                 :             {
                                559                 :                 /*
                                560                 :                  * We'll fill in NULLs for the missing values, but we need to
                                561                 :                  * decrement the counter since this sql result row doesn't
                                562                 :                  * belong to the current output tuple.
                                563                 :                  */
                                564               7 :                 call_cntr--;
                                565               7 :                 xpfree(rowid);
                                566               7 :                 break;
                                567                 :             }
                                568                 :         }
                                569                 : 
                                570              65 :         if (!skip_tuple)
                                571                 :         {
                                572                 :             HeapTuple   tuple;
                                573                 : 
                                574                 :             /* build the tuple and store it */
                                575              42 :             tuple = BuildTupleFromCStrings(attinmeta, values);
                                576              42 :             tuplestore_puttuple(tupstore, tuple);
                                577              42 :             heap_freetuple(tuple);
                                578                 :         }
                                579                 : 
                                580                 :         /* Remember current rowid, whether or not the tuple was stored */
                                581              65 :         xpfree(lastrowid);
                                582              65 :         xpstrdup(lastrowid, values[0]);
                                583              65 :         firstpass = false;
                                584                 : 
                                585                 :         /* Clean up the rowid and value strings, then the array itself */
                                586             311 :         for (i = 0; i < num_categories + 1; i++)
                                587             246 :             if (values[i] != NULL)
                                588             157 :                 pfree(values[i]);
                                589              65 :         pfree(values);
                                590                 :     }
                                591                 : 
                                592                 :     /* let the caller know we're sending back a tuplestore */
                                593              16 :     rsinfo->returnMode = SFRM_Materialize;
                                594              16 :     rsinfo->setResult = tupstore;
                                595              16 :     rsinfo->setDesc = tupdesc;
                                596                 : 
                                597                 :     /* release SPI related resources (and return to caller's context) */
                                598              16 :     SPI_finish();
                                599                 : 
                                600              16 :     return (Datum) 0;
                                601                 : }
                                602                 : 
                                603                 : /*
                                604                 :  * crosstab_hash - reimplement crosstab as materialized function and
                                605                 :  * properly deal with missing values (i.e. don't pack remaining
                                606                 :  * values to the left)
                                607                 :  *
                                608                 :  * crosstab - create a crosstab of rowids and values columns from a
                                609                 :  * SQL statement returning one rowid column, one category column,
                                610                 :  * and one value column.
                                611                 :  *
                                612                 :  * e.g. given sql which produces:
                                613                 :  *
                                614                 :  *          rowid   cat     value
                                615                 :  *          ------+-------+-------
                                616                 :  *          row1    cat1    val1
                                617                 :  *          row1    cat2    val2
                                618                 :  *          row1    cat4    val4
                                619                 :  *          row2    cat1    val5
                                620                 :  *          row2    cat2    val6
                                621                 :  *          row2    cat3    val7
                                622                 :  *          row2    cat4    val8
                                623                 :  *
                                624                 :  * crosstab returns:
                                625                 :  *                  <===== values columns =====>
                                626                 :  *          rowid   cat1    cat2    cat3    cat4
                                627                 :  *          ------+-------+-------+-------+-------
                                628                 :  *          row1    val1    val2    null    val4
                                629                 :  *          row2    val5    val6    val7    val8
                                630                 :  *
                                631                 :  * NOTES:
                                632                 :  * 1. SQL result must be ordered by 1.
                                633                 :  * 2. The number of values columns depends on the tuple description
                                634                 :  *    of the function's declared return type.
                                635                 :  * 3. Missing values (i.e. missing category) are filled in with nulls.
                                636                 :  * 4. Extra values (i.e. not in category results) are skipped.
                                637                 :  */
 7325 bruce                     638               6 : PG_FUNCTION_INFO_V1(crosstab_hash);
                                639                 : Datum
                                640              10 : crosstab_hash(PG_FUNCTION_ARGS)
                                641                 : {
 5493 tgl                       642              10 :     char       *sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                643              10 :     char       *cats_sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
 7188 bruce                     644              10 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                                645                 :     TupleDesc   tupdesc;
                                646                 :     MemoryContext per_query_ctx;
                                647                 :     MemoryContext oldcontext;
                                648                 :     HTAB       *crosstab_hash;
                                649                 : 
                                650                 :     /* caller must support a tuplestore result and supply expectedDesc */
 6483 tgl                       651              10 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 7199 tgl                       652 UBC           0 :         ereport(ERROR,
                                653                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                654                 :                  errmsg("set-valued function called in context that cannot accept a set")));
 5276 tgl                       655 CBC          10 :     if (!(rsinfo->allowedModes & SFRM_Materialize) ||
                                656              10 :         rsinfo->expectedDesc == NULL)
 6483 tgl                       657 UBC           0 :         ereport(ERROR,
                                658                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                659                 :                  errmsg("materialize mode required, but it is not allowed in this context")));
                                660                 : 
 7325 bruce                     661 CBC          10 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
                                662              10 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
                                663                 : 
                                664                 :     /* copy the requested return tuple description into per-query memory */
                                665              10 :     tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
                                666                 : 
                                667                 :     /*
                                668                 :      * Check to make sure we have a reasonable tuple descriptor (>= 2 columns)
                                669                 :      *
                                670                 :      * Note we will attempt to coerce the values into whatever the return
                                671                 :      * attribute type is and depend on the "in" function to complain if
                                672                 :      * needed.
                                673                 :      */
                                674              10 :     if (tupdesc->natts < 2)
 7199 tgl                       675 UBC           0 :         ereport(ERROR,
                                676                 :                 (errcode(ERRCODE_SYNTAX_ERROR),
                                677                 :                  errmsg("query-specified return tuple and " \
                                678                 :                         "crosstab function are not compatible")));
                                679                 : 
                                680                 :     /* load up the categories hash table */
 5602 tgl                       681 CBC          10 :     crosstab_hash = load_categories_hash(cats_sql, per_query_ctx);
                                682                 : 
                                683                 :     /* let the caller know we're sending back a tuplestore */
 7325 bruce                     684               9 :     rsinfo->returnMode = SFRM_Materialize;
                                685                 : 
                                686                 :     /* now go build the tuplestore from the source sql */
                                687              17 :     rsinfo->setResult = get_crosstab_tuplestore(sql,
                                688                 :                                                 crosstab_hash,
                                689                 :                                                 tupdesc,
 2118 tgl                       690               9 :                                                 rsinfo->allowedModes & SFRM_Materialize_Random);
                                691                 : 
                                692                 :     /*
                                693                 :      * SFRM_Materialize mode expects us to return a NULL Datum. The actual
                                694                 :      * tuples are in our tuplestore and passed back through rsinfo->setResult.
                                695                 :      * rsinfo->setDesc is set to the tuple description that we actually used
                                696                 :      * to build our tuples with, so the caller can verify we did what it was
                                697                 :      * expecting.
                                698                 :      */
 7325 bruce                     699               8 :     rsinfo->setDesc = tupdesc;
                                700               8 :     MemoryContextSwitchTo(oldcontext);
                                701                 : 
                                702               8 :     return (Datum) 0;
                                703                 : }
                                704                 : 
                                705                 : /*
                                706                 :  * load up the categories hash table
                                                    :  *
                                                    :  * Creates a string-keyed hash table (key size MAX_CATNAME_LEN) inside
                                                    :  * per_query_ctx, executes cats_sql through SPI, and inserts one
                                                    :  * crosstab_cat_desc per returned row.  Each descriptor records the
                                                    :  * category text taken from column 1 and, in attidx, the zero-based
                                                    :  * position of that row in the query result.
                                                    :  *
                                                    :  * Raises an error if the query returns rows with anything other than
                                                    :  * exactly one column, or if any category value is NULL.  When SPI_execute
                                                    :  * does not report SPI_OK_SELECT, or no rows come back, nothing is inserted
                                                    :  * and the empty table is returned without complaint from this function.
                                                    :  *
                                                    :  * Only the descriptor struct is allocated after switching to
                                                    :  * per_query_ctx; catname is obtained from SPI_getvalue() before the
                                                    :  * switch.  NOTE(review): confirm that catname lives long enough if
                                                    :  * catdesc->catname is ever read after SPI_finish().
                                                    :  *
                                                    :  * Returns the hash table.
                                707                 :  */
                                708                 : static HTAB *
                                709              10 : load_categories_hash(char *cats_sql, MemoryContext per_query_ctx)
                                710                 : {
                                711                 :     HTAB       *crosstab_hash;
                                712                 :     HASHCTL     ctl;
                                713                 :     int         ret;
                                714                 :     uint64      proc;
                                715                 :     MemoryContext SPIcontext;
                                716                 : 
                                717                 :     /* initialize the category hash table */
                                718              10 :     ctl.keysize = MAX_CATNAME_LEN;
                                719              10 :     ctl.entrysize = sizeof(crosstab_HashEnt);
 5602 tgl                       720              10 :     ctl.hcxt = per_query_ctx;
                                721                 : 
                                722                 :     /*
                                723                 :      * use INIT_CATS, defined above as a guess of how many hash table entries
                                724                 :      * to create, initially
                                725                 :      */
                                726              10 :     crosstab_hash = hash_create("crosstab hash",
                                727                 :                                 INIT_CATS,
                                728                 :                                 &ctl,
                                729                 :                                 HASH_ELEM | HASH_STRINGS | HASH_CONTEXT);
                                730                 : 
                                731                 :     /* Connect to SPI manager */
 7325 bruce                     732              10 :     if ((ret = SPI_connect()) < 0)
                                733                 :         /* internal error */
 7325 bruce                     734 UBC           0 :         elog(ERROR, "load_categories_hash: SPI_connect returned %d", ret);
                                735                 : 
                                736                 :     /* Retrieve the category name rows */
 6782 tgl                       737 CBC          10 :     ret = SPI_execute(cats_sql, true, 0);
 5602                           738              10 :     proc = SPI_processed;
                                739                 : 
                                740                 :     /* Check for qualifying tuples */
 7325 bruce                     741              10 :     if ((ret == SPI_OK_SELECT) && (proc > 0))
                                742                 :     {
 7188                           743               8 :         SPITupleTable *spi_tuptable = SPI_tuptable;
                                744               8 :         TupleDesc   spi_tupdesc = spi_tuptable->tupdesc;
                                745                 :         uint64      i;
                                746                 : 
                                747                 :         /*
                                748                 :          * The provided categories SQL query must always return one column:
                                749                 :          * category - the label or identifier for each column
                                750                 :          */
 7325                           751               8 :         if (spi_tupdesc->natts != 1)
 7199 tgl                       752               1 :             ereport(ERROR,
                                753                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
                                754                 :                      errmsg("provided \"categories\" SQL must " \
                                755                 :                             "return 1 column of at least one row")));
                                756                 : 
 7325 bruce                     757              34 :         for (i = 0; i < proc; i++)
                                758                 :         {
                                759                 :             crosstab_cat_desc *catdesc;
                                760                 :             char       *catname;
                                761                 :             HeapTuple   spi_tuple;
                                762                 : 
                                763                 :             /* get the next sql result tuple */
                                764              27 :             spi_tuple = spi_tuptable->vals[i];
                                765                 : 
                                766                 :             /* get the category from the current sql result tuple */
                                767              27 :             catname = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
 1203 mail                      768              27 :             if (catname == NULL)
 1203 mail                      769 UBC           0 :                 ereport(ERROR,
                                770                 :                         (errcode(ERRCODE_SYNTAX_ERROR),
                                771                 :                          errmsg("provided \"categories\" SQL must " \
                                772                 :                                 "not return NULL values")));
                                773                 : 
 7325 bruce                     774 CBC          27 :             SPIcontext = MemoryContextSwitchTo(per_query_ctx);
                                775                 : 
                                776              27 :             catdesc = (crosstab_cat_desc *) palloc(sizeof(crosstab_cat_desc));
                                777              27 :             catdesc->catname = catname;
                                778              27 :             catdesc->attidx = i;
                                779                 : 
                                780                 :             /* Add the category description block to the hashtable */
 5602 tgl                       781             243 :             crosstab_HashTableInsert(crosstab_hash, catdesc);
                                782                 : 
 7325 bruce                     783              27 :             MemoryContextSwitchTo(SPIcontext);
                                784                 :         }
                                785                 :     }
                                786                 : 
                                787               9 :     if (SPI_finish() != SPI_OK_FINISH)
                                788                 :         /* internal error */
 7325 bruce                     789 UBC           0 :         elog(ERROR, "load_categories_hash: SPI_finish() failed");
                                790                 : 
 5602 tgl                       791 CBC           9 :     return crosstab_hash;
                                792                 : }
                                793                 : 
                                794                 : /*
                                795                 :  * create and populate the crosstab tuplestore using the provided source query
                                                    :  *
                                                    :  * sql            source query, executed through SPI
                                                    :  * crosstab_hash  category table; each entry's attidx selects the output
                                                    :  *                column that receives the value for that category
                                                    :  * tupdesc        descriptor of the result rows; its column count must equal
                                                    :  *                (source columns - 2) + number of categories
                                                    :  * randomAccess   passed through to tuplestore_begin_heap()
                                                    :  *
                                                    :  * Source rows are grouped by the text of column 1 (rowid).  A row starts a
                                                    :  * new output tuple when its rowid differs from that of the immediately
                                                    :  * preceding row (as judged by xstreq), so equal rowids are expected to
                                                    :  * arrive adjacently.  Columns 2 .. N-2 are taken from the first row of
                                                    :  * each group; column N-1 is looked up in crosstab_hash and, when found,
                                                    :  * column N is stored in the matching output slot.  Rows whose category is
                                                    :  * NULL or absent from the hash contribute no value.
                                                    :  *
                                                    :  * Returns the tuplestore, which is left empty when SPI_execute does not
                                                    :  * report SPI_OK_SELECT or returns no rows.
                                796                 :  */
                                797                 : static Tuplestorestate *
 7325 bruce                     798               9 : get_crosstab_tuplestore(char *sql,
                                799                 :                         HTAB *crosstab_hash,
                                800                 :                         TupleDesc tupdesc,
                                801                 :                         bool randomAccess)
                                802                 : {
                                803                 :     Tuplestorestate *tupstore;
 5602 tgl                       804               9 :     int         num_categories = hash_get_num_entries(crosstab_hash);
 7188 bruce                     805               9 :     AttInMetadata *attinmeta = TupleDescGetAttInMetadata(tupdesc);
                                806                 :     char      **values;
                                807                 :     HeapTuple   tuple;
                                808                 :     int         ret;
                                809                 :     uint64      proc;
                                810                 : 
                                811                 :     /* initialize our tuplestore (while still in query context!) */
 5275 tgl                       812               9 :     tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
                                813                 : 
                                814                 :     /* Connect to SPI manager */
 7325 bruce                     815               9 :     if ((ret = SPI_connect()) < 0)
                                816                 :         /* internal error */
 7325 bruce                     817 UBC           0 :         elog(ERROR, "get_crosstab_tuplestore: SPI_connect returned %d", ret);
                                818                 : 
                                819                 :     /* Now retrieve the crosstab source rows */
 6782 tgl                       820 CBC           9 :     ret = SPI_execute(sql, true, 0);
 7325 bruce                     821               9 :     proc = SPI_processed;
                                822                 : 
                                823                 :     /* Check for qualifying tuples */
                                824               9 :     if ((ret == SPI_OK_SELECT) && (proc > 0))
                                825                 :     {
 7188                           826               7 :         SPITupleTable *spi_tuptable = SPI_tuptable;
                                827               7 :         TupleDesc   spi_tupdesc = spi_tuptable->tupdesc;
                                828               7 :         int         ncols = spi_tupdesc->natts;
                                829                 :         char       *rowid;
                                830               7 :         char       *lastrowid = NULL;
 5629 mail                      831               7 :         bool        firstpass = true;
                                832                 :         uint64      i;
                                833                 :         int         j;
                                834                 :         int         result_ncols;
                                835                 : 
 6815                           836               7 :         if (num_categories == 0)
                                837                 :         {
                                838                 :             /* no qualifying category tuples */
                                839               1 :             ereport(ERROR,
                                840                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
                                841                 :                      errmsg("provided \"categories\" SQL must " \
                                842                 :                             "return 1 column of at least one row")));
                                843                 :         }
                                844                 : 
                                845                 :         /*
                                846                 :          * The provided SQL query must always return at least three columns:
                                847                 :          *
                                848                 :          * 1. rowname   the label for each row - column 1 in the final result
                                849                 :          * 2. category  the label for each value-column in the final result
                                850                 :          * 3. value     the values used to populate the value-columns
                                851                 :          *
                                852                 :          * If there are more than three columns, the last two are taken as
                                853                 :          * "category" and "values". The first column is taken as "rowname".
                                854                 :          * Additional columns (2 thru N-2) are assumed the same for the same
                                855                 :          * "rowname", and are copied into the result tuple from the first time
                                856                 :          * we encounter a particular rowname.
                                                    :          *
                                                    :          * NOTE(review): the errdetail below joins two literals that each
                                                    :          * carry a space at the seam, so the message prints a doubled space;
                                                    :          * it also speaks of 3 columns although any count of three or more
                                                    :          * passes this check.
                                857                 :          */
 7325 bruce                     858               6 :         if (ncols < 3)
 7199 tgl                       859 UBC           0 :             ereport(ERROR,
                                860                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                861                 :                      errmsg("invalid source data SQL statement"),
                                862                 :                      errdetail("The provided SQL must return 3 " \
                                863                 :                                " columns; rowid, category, and values.")));
                                864                 : 
 7325 bruce                     865 CBC           6 :         result_ncols = (ncols - 2) + num_categories;
                                866                 : 
                                867                 :         /* Recheck to make sure the tuple descriptor still looks reasonable */
                                868               6 :         if (tupdesc->natts != result_ncols)
 7199 tgl                       869 UBC           0 :             ereport(ERROR,
                                870                 :                     (errcode(ERRCODE_SYNTAX_ERROR),
                                871                 :                      errmsg("invalid return type"),
                                872                 :                      errdetail("Query-specified return " \
                                873                 :                                "tuple has %d columns but crosstab " \
                                874                 :                                "returns %d.", tupdesc->natts, result_ncols)));
                                875                 : 
                                876                 :         /* allocate space and make sure it's clear */
 1474 michael                   877 CBC           6 :         values = (char **) palloc0(result_ncols * sizeof(char *));
                                878                 : 
 7325 bruce                     879              72 :         for (i = 0; i < proc; i++)
                                880                 :         {
                                881                 :             HeapTuple   spi_tuple;
                                882                 :             crosstab_cat_desc *catdesc;
                                883                 :             char       *catname;
                                884                 : 
                                885                 :             /* get the next sql result tuple */
                                886              66 :             spi_tuple = spi_tuptable->vals[i];
                                887                 : 
                                888                 :             /* get the rowid from the current sql result tuple */
                                889              66 :             rowid = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
                                890                 : 
                                891                 :             /*
                                892                 :              * if we're on a new output row, grab the column values up to
                                893                 :              * column N-2 now
                                894                 :              */
 5629 mail                      895              66 :             if (firstpass || !xstreq(lastrowid, rowid))
                                896                 :             {
                                897                 :                 /*
                                898                 :                  * a new row means we need to flush the old one first, unless
                                899                 :                  * we're on the very first row
                                900                 :                  */
                                901              18 :                 if (!firstpass)
                                902                 :                 {
                                903                 :                     /* rowid changed, flush the previous output row */
 7325 bruce                     904              12 :                     tuple = BuildTupleFromCStrings(attinmeta, values);
                                905                 : 
                                906              12 :                     tuplestore_puttuple(tupstore, tuple);
                                907                 : 
                                908              80 :                     for (j = 0; j < result_ncols; j++)
                                909              68 :                         xpfree(values[j]);
                                910                 :                 }
                                911                 : 
                                912              18 :                 values[0] = rowid;
                                913              33 :                 for (j = 1; j < ncols - 2; j++)
                                914              15 :                     values[j] = SPI_getvalue(spi_tuple, spi_tupdesc, j + 1);
                                915                 : 
                                916                 :                 /* we're no longer on the first pass */
 5629 mail                      917              18 :                 firstpass = false;
                                918                 :             }
                                919                 : 
                                920                 :             /* look up the category and fill in the appropriate column */
 7325 bruce                     921              66 :             catname = SPI_getvalue(spi_tuple, spi_tupdesc, ncols - 1);
                                922                 : 
                                923              66 :             if (catname != NULL)
                                924                 :             {
 5602 tgl                       925             594 :                 crosstab_HashTableLookup(crosstab_hash, catname, catdesc);
                                926                 : 
 7325 bruce                     927              66 :                 if (catdesc)
 7188                           928              63 :                     values[catdesc->attidx + ncols - 2] =
 7325                           929              63 :                         SPI_getvalue(spi_tuple, spi_tupdesc, ncols);
                                930                 :             }
                                931                 : 
                                932              66 :             xpfree(lastrowid);
 5629 mail                      933              66 :             xpstrdup(lastrowid, rowid);
                                934                 :         }
                                935                 : 
                                936                 :         /* flush the last output row */
 7325 bruce                     937               6 :         tuple = BuildTupleFromCStrings(attinmeta, values);
                                938                 : 
 6130 tgl                       939               6 :         tuplestore_puttuple(tupstore, tuple);
                                940                 :     }
                                941                 : 
 7325 bruce                     942               8 :     if (SPI_finish() != SPI_OK_FINISH)
                                943                 :         /* internal error */
 7325 bruce                     944 UBC           0 :         elog(ERROR, "get_crosstab_tuplestore: SPI_finish() failed");
                                945                 : 
 7325 bruce                     946 CBC           8 :     return tupstore;
                                947                 : }
                                948                 : 
                                949                 : /*
                                950                 :  * connectby_text - produce a result set from a hierarchical (parent/child)
                                951                 :  * table.
                                952                 :  *
                                953                 :  * e.g. given table foo:
                                954                 :  *
                                955                 :  *          keyid   parent_keyid pos
                                956                 :  *          ------+------------+--
                                957                 :  *          row1    NULL         0
                                958                 :  *          row2    row1         0
                                959                 :  *          row3    row1         0
                                960                 :  *          row4    row2         1
                                961                 :  *          row5    row2         0
                                962                 :  *          row6    row4         0
                                963                 :  *          row7    row3         0
                                964                 :  *          row8    row6         0
                                965                 :  *          row9    row5         0
                                966                 :  *
                                967                 :  *
                                968                 :  * connectby(text relname, text keyid_fld, text parent_keyid_fld
                                969                 :  *            [, text orderby_fld], text start_with, int max_depth
                                970                 :  *            [, text branch_delim])
                                971                 :  * connectby('foo', 'keyid', 'parent_keyid', 'pos', 'row2', 0, '~') returns:
                                972                 :  *
                                973                 :  *      keyid   parent_id   level    branch             serial
                                974                 :  *      ------+-----------+--------+-----------------------
                                975                 :  *      row2    NULL          0       row2                1
                                976                 :  *      row5    row2          1       row2~row5           2
                                977                 :  *      row9    row5          2       row2~row5~row9      3
                                978                 :  *      row4    row2          1       row2~row4           4
                                979                 :  *      row6    row4          2       row2~row4~row6      5
                                980                 :  *      row8    row6          3       row2~row4~row6~row8 6
                                981                 :  *
                                                    :  * connectby_text is the entry point for the form without orderby_fld:
                                                    :  * relname, keyid_fld, parent_keyid_fld, start_with, max_depth, and an
                                                    :  * optional sixth argument branch_delim.  It always runs with show_serial
                                                    :  * set to false and passes NULL as the fourth connectby() argument
                                                    :  * (presumably the order-by field; confirm against connectby()).  The
                                                    :  * seven-argument example above, with orderby_fld and the serial column,
                                                    :  * corresponds to connectby_text_serial.
                                                    :  *
                                                    :  * Without a sixth argument the delimiter defaults to a tilde and the
                                                    :  * branch column is not shown.
                                                    :  *
                                                    :  * The caller must supply a ReturnSetInfo that allows SFRM_Materialize and
                                                    :  * carries an expectedDesc; otherwise an error is raised.  The copied
                                                    :  * descriptor is checked by validateConnectbyTupleDesc() before use.  The
                                                    :  * tuplestore returned by connectby() is handed back in rsinfo->setResult,
                                                    :  * the descriptor in rsinfo->setDesc, and the function itself returns a
                                                    :  * NULL Datum.
                                                    :  *
                                982                 :  */
 7524                           983               4 : PG_FUNCTION_INFO_V1(connectby_text);
                                984                 : 
                                985                 : #define CONNECTBY_NCOLS                 4
                                986                 : #define CONNECTBY_NCOLS_NOBRANCH        3
                                987                 : 
                                988                 : Datum
                                989              13 : connectby_text(PG_FUNCTION_ARGS)
                                990                 : {
 5493 tgl                       991              13 :     char       *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                                992              13 :     char       *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
                                993              13 :     char       *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
                                994              13 :     char       *start_with = text_to_cstring(PG_GETARG_TEXT_PP(3));
 7522 bruce                     995              13 :     int         max_depth = PG_GETARG_INT32(4);
                                996              13 :     char       *branch_delim = NULL;
                                997              13 :     bool        show_branch = false;
 7196                           998              13 :     bool        show_serial = false;
 7522                           999              13 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1000                 :     TupleDesc   tupdesc;
                               1001                 :     AttInMetadata *attinmeta;
                               1002                 :     MemoryContext per_query_ctx;
                               1003                 :     MemoryContext oldcontext;
                               1004                 : 
                               1005                 :     /* check to see if caller supports us returning a tuplestore */
 6483 tgl                      1006              13 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 7199 tgl                      1007 UBC           0 :         ereport(ERROR,
                               1008                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1009                 :                  errmsg("set-valued function called in context that cannot accept a set")));
 5276 tgl                      1010 CBC          13 :     if (!(rsinfo->allowedModes & SFRM_Materialize) ||
                               1011              13 :         rsinfo->expectedDesc == NULL)
 6483 tgl                      1012 UBC           0 :         ereport(ERROR,
                               1013                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1014                 :                  errmsg("materialize mode required, but it is not allowed in this context")));
                               1015                 : 
 7524 bruce                    1016 CBC          13 :     if (fcinfo->nargs == 6)
                               1017                 :     {
 5493 tgl                      1018               7 :         branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(5));
 7524 bruce                    1019               7 :         show_branch = true;
                               1020                 :     }
                               1021                 :     else
                               1022                 :         /* default is no show, tilde for the delimiter */
 7493                          1023               6 :         branch_delim = pstrdup("~");
                               1024                 : 
 7524                          1025              13 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
                               1026              13 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
                               1027                 : 
                               1028                 :     /* get the requested return tuple description */
                               1029              13 :     tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
                               1030                 : 
                               1031                 :     /* does it meet our needs */
 7196                          1032              13 :     validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
                               1033                 : 
                               1034                 :     /* OK, use it then */
 7524                          1035              12 :     attinmeta = TupleDescGetAttInMetadata(tupdesc);
                               1036                 : 
                               1037                 :     /* OK, go to work */
                               1038              12 :     rsinfo->returnMode = SFRM_Materialize;
                               1039              20 :     rsinfo->setResult = connectby(relname,
                               1040                 :                                   key_fld,
                               1041                 :                                   parent_key_fld,
                               1042                 :                                   NULL,
                               1043                 :                                   branch_delim,
                               1044                 :                                   start_with,
                               1045                 :                                   max_depth,
                               1046                 :                                   show_branch,
                               1047                 :                                   show_serial,
                               1048                 :                                   per_query_ctx,
 2118 tgl                      1049              12 :                                   rsinfo->allowedModes & SFRM_Materialize_Random,
                               1050                 :                                   attinmeta);
 7524 bruce                    1051               8 :     rsinfo->setDesc = tupdesc;
                               1052                 : 
                               1053               8 :     MemoryContextSwitchTo(oldcontext);
                               1054                 : 
                               1055                 :     /*
                               1056                 :      * SFRM_Materialize mode expects us to return a NULL Datum. The actual
                               1057                 :      * tuples are in our tuplestore and passed back through rsinfo->setResult.
                               1058                 :      * rsinfo->setDesc is set to the tuple description that we actually used
                               1059                 :      * to build our tuples with, so the caller can verify we did what it was
                               1060                 :      * expecting.
                               1061                 :      */
                               1062               8 :     return (Datum) 0;
                               1063                 : }
                               1064                 : 
                                                      /*
                                                       * connectby_text_serial - SQL-callable entry point that materializes a
                                                       * connectby() result with the serial column always enabled.
                                                       *
                                                       * Arguments, as fetched below:
                                                       *   0: relname         (text)
                                                       *   1: key_fld         (text)
                                                       *   2: parent_key_fld  (text)
                                                       *   3: orderby_fld     (text)
                                                       *   4: start_with      (text)
                                                       *   5: max_depth       (int4)
                                                       *   6: branch_delim    (text; only read when called with 7 arguments)
                                                       *
                                                       * show_branch is true only in the 7-argument form.  The result is handed
                                                       * back through rsinfo (returnMode, setResult, setDesc) and the function's
                                                       * own return value is (Datum) 0.  Errors are raised if the caller did not
                                                       * supply a ReturnSetInfo, does not allow SFRM_Materialize, or did not
                                                       * provide an expected tuple descriptor.
                                                       */
 7196                          1065               4 : PG_FUNCTION_INFO_V1(connectby_text_serial);
                               1066                 : Datum
                               1067               2 : connectby_text_serial(PG_FUNCTION_ARGS)
                               1068                 : {
 5493 tgl                      1069               2 :     char       *relname = text_to_cstring(PG_GETARG_TEXT_PP(0));
                               1070               2 :     char       *key_fld = text_to_cstring(PG_GETARG_TEXT_PP(1));
                               1071               2 :     char       *parent_key_fld = text_to_cstring(PG_GETARG_TEXT_PP(2));
                               1072               2 :     char       *orderby_fld = text_to_cstring(PG_GETARG_TEXT_PP(3));
                               1073               2 :     char       *start_with = text_to_cstring(PG_GETARG_TEXT_PP(4));
 7196 bruce                    1074               2 :     int         max_depth = PG_GETARG_INT32(5);
                               1075               2 :     char       *branch_delim = NULL;
                               1076               2 :     bool        show_branch = false;
                                                          /* this entry point always asks for the serial column */
                               1077               2 :     bool        show_serial = true;
                               1078               2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                               1079                 :     TupleDesc   tupdesc;
                               1080                 :     AttInMetadata *attinmeta;
                               1081                 :     MemoryContext per_query_ctx;
                               1082                 :     MemoryContext oldcontext;
                               1083                 : 
                               1084                 :     /* check to see if caller supports us returning a tuplestore */
 6483 tgl                      1085               2 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
 6483 tgl                      1086 UBC           0 :         ereport(ERROR,
                               1087                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1088                 :                  errmsg("set-valued function called in context that cannot accept a set")));
 5276 tgl                      1089 CBC           2 :     if (!(rsinfo->allowedModes & SFRM_Materialize) ||
                               1090               2 :         rsinfo->expectedDesc == NULL)
 6483 tgl                      1091 UBC           0 :         ereport(ERROR,
                               1092                 :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                               1093                 :                  errmsg("materialize mode required, but it is not allowed in this context")));
                               1094                 : 
                                                          /*
                                                           * The 7-argument form supplies the branch delimiter and turns on the
                                                           * branch column; otherwise a default delimiter is still set because
                                                           * it is passed to connectby() in either case.
                                                           */
 7196 bruce                    1095 CBC           2 :     if (fcinfo->nargs == 7)
                               1096                 :     {
 5493 tgl                      1097               1 :         branch_delim = text_to_cstring(PG_GETARG_TEXT_PP(6));
 7196 bruce                    1098               1 :         show_branch = true;
                               1099                 :     }
                               1100                 :     else
                               1101                 :         /* default is no show, tilde for the delimiter */
                               1102               1 :         branch_delim = pstrdup("~");
                               1103                 : 
                                                          /*
                                                           * Everything up to the matching switch back to oldcontext runs with
                                                           * the per-query memory context as the current context.
                                                           */
                               1104               2 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
                               1105               2 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
                               1106                 : 
                               1107                 :     /* get the requested return tuple description */
                               1108               2 :     tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
                               1109                 : 
                               1110                 :     /* does it meet our needs */
                               1111               2 :     validateConnectbyTupleDesc(tupdesc, show_branch, show_serial);
                               1112                 : 
                               1113                 :     /* OK, use it then */
                               1114               2 :     attinmeta = TupleDescGetAttInMetadata(tupdesc);
                               1115                 : 
                               1116                 :     /* OK, go to work */
                                                          /*
                                                           * Note the argument order: connectby() takes branch_delim before
                                                           * start_with.  The SFRM_Materialize_Random bit test is what
                                                           * connectby() receives as its randomAccess parameter.
                                                           */
                               1117               2 :     rsinfo->returnMode = SFRM_Materialize;
                               1118               4 :     rsinfo->setResult = connectby(relname,
                               1119                 :                                   key_fld,
                               1120                 :                                   parent_key_fld,
                               1121                 :                                   orderby_fld,
                               1122                 :                                   branch_delim,
                               1123                 :                                   start_with,
                               1124                 :                                   max_depth,
                               1125                 :                                   show_branch,
                               1126                 :                                   show_serial,
                               1127                 :                                   per_query_ctx,
 2118 tgl                      1128               2 :                                   rsinfo->allowedModes & SFRM_Materialize_Random,
                               1129                 :                                   attinmeta);
 7196 bruce                    1130               2 :     rsinfo->setDesc = tupdesc;
                               1131                 : 
                               1132               2 :     MemoryContextSwitchTo(oldcontext);
                               1133                 : 
                               1134                 :     /*
                               1135                 :      * SFRM_Materialize mode expects us to return a NULL Datum. The actual
                               1136                 :      * tuples are in our tuplestore and passed back through rsinfo->setResult.
                               1137                 :      * rsinfo->setDesc is set to the tuple description that we actually used
                               1138                 :      * to build our tuples with, so the caller can verify we did what it was
                               1139                 :      * expecting.
                               1140                 :      */
                               1141               2 :     return (Datum) 0;
                               1142                 : }
                               1143                 : 
                               1144                 : 
                               1145                 : /*
                               1146                 :  * connectby - does the real work for connectby_text()
                                                       *
                                                       * connectby_text_serial() calls this function as well.
                                                       *
                                                       * Connects to SPI, creates a heap tuplestore while per_query_ctx is the
                                                       * current memory context, fills it by calling
                                                       * build_tuplestore_recursively() starting at level 0 with the serial
                                                       * counter at 1 and start_with as the initial branch, then disconnects
                                                       * from SPI and returns the tuplestore.
                                                       *
                                                       * randomAccess is forwarded to tuplestore_begin_heap(); attinmeta
                                                       * describes the output rows that are built from C strings.
                                                       */
                               1148                 : static Tuplestorestate *
 7524                          1149              14 : connectby(char *relname,
                               1150                 :           char *key_fld,
                               1151                 :           char *parent_key_fld,
                               1152                 :           char *orderby_fld,
                               1153                 :           char *branch_delim,
                               1154                 :           char *start_with,
                               1155                 :           int max_depth,
                               1156                 :           bool show_branch,
                               1157                 :           bool show_serial,
                               1158                 :           MemoryContext per_query_ctx,
                               1159                 :           bool randomAccess,
                               1160                 :           AttInMetadata *attinmeta)
                               1161                 : {
 7522                          1162              14 :     Tuplestorestate *tupstore = NULL;
                               1163                 :     int         ret;
                               1164                 :     MemoryContext oldcontext;
                               1165                 : 
                                                          /* running row counter, shared with the recursion through a pointer */
 7188                          1166              14 :     int         serial = 1;
                               1167                 : 
                               1168                 :     /* Connect to SPI manager */
 7524                          1169              14 :     if ((ret = SPI_connect()) < 0)
                               1170                 :         /* internal error */
 7524 bruce                    1171 UBC           0 :         elog(ERROR, "connectby: SPI_connect returned %d", ret);
                               1172                 : 
                               1173                 :     /* switch to longer term context to create the tuple store */
 7524 bruce                    1174 CBC          14 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
                               1175                 : 
                               1176                 :     /* initialize our tuplestore */
                                                          /* work_mem is passed as the store's memory allowance */
 5275 tgl                      1177              14 :     tupstore = tuplestore_begin_heap(randomAccess, false, work_mem);
                               1178                 : 
 7524 bruce                    1179              14 :     MemoryContextSwitchTo(oldcontext);
                               1180                 : 
                               1181                 :     /* now go get the whole tree */
 2992 tgl                      1182              14 :     build_tuplestore_recursively(key_fld,
                               1183                 :                                  parent_key_fld,
                               1184                 :                                  relname,
                               1185                 :                                  orderby_fld,
                               1186                 :                                  branch_delim,
                               1187                 :                                  start_with,
                               1188                 :                                  start_with,    /* current_branch */
                               1189                 :                                  0, /* initial level is 0 */
                               1190                 :                                  &serial,   /* initial serial is 1 */
                               1191                 :                                  max_depth,
                               1192                 :                                  show_branch,
                               1193                 :                                  show_serial,
                               1194                 :                                  per_query_ctx,
                               1195                 :                                  attinmeta,
                               1196                 :                                  tupstore);
                               1197                 : 
                                                          /*
                                                           * Disconnect from SPI.  The tuplestore was created while
                                                           * per_query_ctx was current (see above), not inside the SPI
                                                           * connection's own working context.
                                                           */
 7524 bruce                    1198              10 :     SPI_finish();
                               1199                 : 
                               1200              10 :     return tupstore;
                               1201                 : }
                               1202                 : 
                                                      /*
                                                       * build_tuplestore_recursively - append the subtree below start_with to
                                                       * tupstore.
                                                       *
                                                       * Runs a query over relname for rows whose parent_key_fld equals
                                                       * start_with (restricted to rows with a non-NULL key that differs from
                                                       * its parent key), stores one output tuple per row, and recurses with
                                                       * each non-NULL key as the new start_with.  When entered with level == 0
                                                       * it first stores a root row for start_with itself and then reports the
                                                       * fetched rows at level 1.
                                                       *
                                                       * branch        delimiter-joined path leading to start_with; used for the
                                                       *               optional branch column and for cycle detection
                                                       * serial        pointer to a counter that is post-incremented for every
                                                       *               stored row when show_serial is set
                                                       * max_depth     when > 0, recursion returns immediately once level
                                                       *               exceeds it; other values impose no limit
                                                       * orderby_fld   only used (as ORDER BY) when show_serial is set
                                                       * per_query_ctx not used here other than being passed on to the
                                                       *               recursive call
                                                       *
                                                       * Raises ERRCODE_INVALID_RECURSION if a fetched key already appears in
                                                       * branch.
                                                       */
                               1203                 : static void
                               1204              62 : build_tuplestore_recursively(char *key_fld,
                               1205                 :                              char *parent_key_fld,
                               1206                 :                              char *relname,
                               1207                 :                              char *orderby_fld,
                               1208                 :                              char *branch_delim,
                               1209                 :                              char *start_with,
                               1210                 :                              char *branch,
                               1211                 :                              int level,
                               1212                 :                              int *serial,
                               1213                 :                              int max_depth,
                               1214                 :                              bool show_branch,
                               1215                 :                              bool show_serial,
                               1216                 :                              MemoryContext per_query_ctx,
                               1217                 :                              AttInMetadata *attinmeta,
                               1218                 :                              Tuplestorestate *tupstore)
                               1219                 : {
 7522                          1220              62 :     TupleDesc   tupdesc = attinmeta->tupdesc;
                               1221                 :     int         ret;
                               1222                 :     uint64      proc;
                               1223                 :     int         serial_column;
                               1224                 :     StringInfoData sql;
                               1225                 :     char      **values;
                               1226                 :     char       *current_key;
                               1227                 :     char       *current_key_parent;
                               1228                 :     char        current_level[INT32_STRLEN];
                               1229                 :     char        serial_str[INT32_STRLEN];
                               1230                 :     char       *current_branch;
                               1231                 :     HeapTuple   tuple;
                               1232                 : 
                                                          /* depth limit reached: nothing to add below this point */
                               1233              62 :     if (max_depth > 0 && level > max_depth)
 2992 tgl                      1234               1 :         return;
                               1235                 : 
 6248 neilc                    1236              61 :     initStringInfo(&sql);
                               1237                 : 
                               1238                 :     /* Build initial sql statement */
                                                          /*
                                                           * key_fld, parent_key_fld, relname and orderby_fld are interpolated
                                                           * into the statement text as given; only start_with is passed through
                                                           * quote_literal_cstr().
                                                           * NOTE(review): this relies on callers supplying trustworthy, already
                                                           * quoted identifiers - confirm against the documented contract.
                                                           *
                                                           * serial_column records how many extra value slots the serial needs.
                                                           */
 7196 bruce                    1239              61 :     if (!show_serial)
                               1240                 :     {
 6248 neilc                    1241              49 :         appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s",
                               1242                 :                          key_fld,
                               1243                 :                          parent_key_fld,
                               1244                 :                          relname,
                               1245                 :                          parent_key_fld,
                               1246                 :                          quote_literal_cstr(start_with),
                               1247                 :                          key_fld, key_fld, parent_key_fld);
 7188 bruce                    1248              49 :         serial_column = 0;
                               1249                 :     }
                               1250                 :     else
                               1251                 :     {
 6248 neilc                    1252              12 :         appendStringInfo(&sql, "SELECT %s, %s FROM %s WHERE %s = %s AND %s IS NOT NULL AND %s <> %s ORDER BY %s",
                               1253                 :                          key_fld,
                               1254                 :                          parent_key_fld,
                               1255                 :                          relname,
                               1256                 :                          parent_key_fld,
                               1257                 :                          quote_literal_cstr(start_with),
                               1258                 :                          key_fld, key_fld, parent_key_fld,
                               1259                 :                          orderby_fld);
 7188 bruce                    1260              12 :         serial_column = 1;
                               1261                 :     }
                               1262                 : 
                                                          /*
                                                           * values[] holds the C-string form of one output row:
                                                           * [0] key, [1] parent key, [2] level, then the branch (when
                                                           * show_branch) and finally the serial (when show_serial).
                                                           */
 7129 tgl                      1263              61 :     if (show_branch)
                               1264              38 :         values = (char **) palloc((CONNECTBY_NCOLS + serial_column) * sizeof(char *));
                               1265                 :     else
                               1266              23 :         values = (char **) palloc((CONNECTBY_NCOLS_NOBRANCH + serial_column) * sizeof(char *));
                               1267                 : 
                               1268                 :     /* First time through, do a little setup */
                               1269              61 :     if (level == 0)
                               1270                 :     {
                               1271                 :         /* root value is the one we initially start with */
                               1272              14 :         values[0] = start_with;
                               1273                 : 
                               1274                 :         /* root value has no parent */
                               1275              14 :         values[1] = NULL;
                               1276                 : 
                               1277                 :         /* root level is 0 */
                               1278              14 :         sprintf(current_level, "%d", level);
                               1279              14 :         values[2] = current_level;
                               1280                 : 
                               1281                 :         /* root branch is just starting root value */
                               1282              14 :         if (show_branch)
                               1283               7 :             values[3] = start_with;
                               1284                 : 
                               1285                 :         /* root starts the serial with 1 */
                               1286              14 :         if (show_serial)
                               1287                 :         {
                               1288               2 :             sprintf(serial_str, "%d", (*serial)++);
                               1289               2 :             if (show_branch)
                               1290               1 :                 values[4] = serial_str;
                               1291                 :             else
                               1292               1 :                 values[3] = serial_str;
                               1293                 :         }
                               1294                 : 
                               1295                 :         /* construct the tuple */
                               1296              14 :         tuple = BuildTupleFromCStrings(attinmeta, values);
                               1297                 : 
                               1298                 :         /* now store it */
                               1299              14 :         tuplestore_puttuple(tupstore, tuple);
                                                              /*
                                                               * NOTE(review): unlike the per-row tuples in the loop below,
                                                               * this root tuple is not passed to heap_freetuple() here.
                                                               */
                               1300                 : 
                               1301                 :         /* increment level */
                                                              /* so that the rows fetched below are reported at level 1 */
                               1302              14 :         level++;
                               1303                 :     }
                               1304                 : 
                               1305                 :     /* Retrieve the desired rows */
 6248 neilc                    1306              61 :     ret = SPI_execute(sql.data, true, 0);
 7524 bruce                    1307              61 :     proc = SPI_processed;
                               1308                 : 
                               1309                 :     /* Check for qualifying tuples */
                                                          /*
                                                           * Any other return code, or an empty result, simply ends this branch;
                                                           * no error is reported from here.
                                                           */
                               1310              61 :     if ((ret == SPI_OK_SELECT) && (proc > 0))
                               1311                 :     {
                               1312                 :         HeapTuple   spi_tuple;
 7522                          1313              45 :         SPITupleTable *tuptable = SPI_tuptable;
                               1314              45 :         TupleDesc   spi_tupdesc = tuptable->tupdesc;
                               1315                 :         uint64      i;
                               1316                 :         StringInfoData branchstr;
                               1317                 :         StringInfoData chk_branchstr;
                               1318                 :         StringInfoData chk_current_key;
                               1319                 : 
                               1320                 :         /*
                               1321                 :          * Check that return tupdesc is compatible with the one we got from
                               1322                 :          * the query.
                               1323                 :          */
 2992 tgl                      1324              45 :         compatConnectbyTupleDescs(tupdesc, spi_tupdesc);
                               1325                 : 
                                                              /*
                                                               * Three scratch buffers, reset at the end of every iteration:
                                                               * branchstr       - branch text for the row being emitted
                                                               * chk_branchstr   - the incoming branch wrapped in delimiters
                                                               * chk_current_key - the row's key wrapped in delimiters
                                                               */
 5881 neilc                    1326              43 :         initStringInfo(&branchstr);
                               1327              43 :         initStringInfo(&chk_branchstr);
                               1328              43 :         initStringInfo(&chk_current_key);
                               1329                 : 
 7524 bruce                    1330              88 :         for (i = 0; i < proc; i++)
                               1331                 :         {
                               1332                 :             /* initialize branch for this pass */
 3447 rhaas                    1333              52 :             appendStringInfoString(&branchstr, branch);
 6248 neilc                    1334              52 :             appendStringInfo(&chk_branchstr, "%s%s%s", branch_delim, branch, branch_delim);
                               1335                 : 
                               1336                 :             /* get the next sql result tuple */
 7524 bruce                    1337              52 :             spi_tuple = tuptable->vals[i];
                               1338                 : 
                               1339                 :             /* get the current key (might be NULL) */
                               1340              52 :             current_key = SPI_getvalue(spi_tuple, spi_tupdesc, 1);
                               1341                 : 
                               1342                 :             /* get the parent key (might be NULL) */
 2992 tgl                      1343              52 :             current_key_parent = SPI_getvalue(spi_tuple, spi_tupdesc, 2);
                               1344                 : 
                               1345                 :             /* get the current level */
 7524 bruce                    1346              52 :             sprintf(current_level, "%d", level);
                               1347                 : 
                               1348                 :             /* check to see if this key is also an ancestor */
                                                                  /*
                                                                   * Both strings are wrapped in branch_delim before the
                                                                   * substring search, so the key is only matched as a whole
                                                                   * delimited element of the branch.
                                                                   */
 2992 tgl                      1349              52 :             if (current_key)
                               1350                 :             {
                               1351              50 :                 appendStringInfo(&chk_current_key, "%s%s%s",
                               1352                 :                                  branch_delim, current_key, branch_delim);
                               1353              50 :                 if (strstr(chk_branchstr.data, chk_current_key.data))
 2807                          1354               2 :                     ereport(ERROR,
                               1355                 :                             (errcode(ERRCODE_INVALID_RECURSION),
                               1356                 :                              errmsg("infinite recursion detected")));
                               1357                 :             }
                               1358                 : 
                               1359                 :             /* OK, extend the branch */
 2992                          1360              50 :             if (current_key)
                               1361              48 :                 appendStringInfo(&branchstr, "%s%s", branch_delim, current_key);
 6248 neilc                    1362              50 :             current_branch = branchstr.data;
                               1363                 : 
                               1364                 :             /* build a tuple */
 2992 tgl                      1365              50 :             values[0] = current_key;
 7524 bruce                    1366              50 :             values[1] = current_key_parent;
                               1367              50 :             values[2] = current_level;
                               1368              50 :             if (show_branch)
                               1369              32 :                 values[3] = current_branch;
 7196                          1370              50 :             if (show_serial)
                               1371                 :             {
                               1372              10 :                 sprintf(serial_str, "%d", (*serial)++);
                               1373              10 :                 if (show_branch)
                               1374               5 :                     values[4] = serial_str;
                               1375                 :                 else
                               1376               5 :                     values[3] = serial_str;
                               1377                 :             }
                               1378                 : 
 7524                          1379              50 :             tuple = BuildTupleFromCStrings(attinmeta, values);
                               1380                 : 
                               1381                 :             /* store the tuple for later use */
                               1382              50 :             tuplestore_puttuple(tupstore, tuple);
                               1383                 : 
                               1384              50 :             heap_freetuple(tuple);
                               1385                 : 
                               1386                 :             /* recurse using current_key as the new start_with */
                                                                  /*
                                                                   * The row is stored before its descendants, so output is
                                                                   * in parent-first order.  current_branch points into
                                                                   * branchstr, which is not reset until after the call.
                                                                   */
 2992 tgl                      1387              50 :             if (current_key)
                               1388              48 :                 build_tuplestore_recursively(key_fld,
                               1389                 :                                              parent_key_fld,
                               1390                 :                                              relname,
                               1391                 :                                              orderby_fld,
                               1392                 :                                              branch_delim,
                               1393                 :                                              current_key,
                               1394                 :                                              current_branch,
                               1395                 :                                              level + 1,
                               1396                 :                                              serial,
                               1397                 :                                              max_depth,
                               1398                 :                                              show_branch,
                               1399                 :                                              show_serial,
                               1400                 :                                              per_query_ctx,
                               1401                 :                                              attinmeta,
                               1402                 :                                              tupstore);
                               1403                 : 
                                                                  /*
                                                                   * xpfree() is defined outside this chunk; it is invoked
                                                                   * even when the key strings may be NULL - presumably it is
                                                                   * NULL-safe, confirm against its definition.
                                                                   */
                               1404              45 :             xpfree(current_key);
                               1405              45 :             xpfree(current_key_parent);
                               1406                 : 
                               1407                 :             /* reset branch for next pass */
 5881 neilc                    1408              45 :             resetStringInfo(&branchstr);
                               1409              45 :             resetStringInfo(&chk_branchstr);
                               1410              45 :             resetStringInfo(&chk_current_key);
                               1411                 :         }
                               1412                 : 
                                                              /* release the scratch buffers' storage */
                               1413              36 :         xpfree(branchstr.data);
                               1414              36 :         xpfree(chk_branchstr.data);
                               1415              36 :         xpfree(chk_current_key.data);
                               1416                 :     }
                               1417                 : }
                               1418                 : 
                               1419                 : /*
                               1420                 :  * Check that the expected (query runtime) tupdesc is suitable for connectby()
                               1421                 :  */
                               1422                 : static void
 2058 andres                   1423              15 : validateConnectbyTupleDesc(TupleDesc td, bool show_branch, bool show_serial)
                               1424                 : {
 7188 bruce                    1425              15 :     int         serial_column = 0;
                               1426                 : 
 7196                          1427              15 :     if (show_serial)
 7188                          1428               2 :         serial_column = 1;
                               1429                 : 
                               1430                 :     /* are there the correct number of columns */
 7524                          1431              15 :     if (show_branch)
                               1432                 :     {
 2058 andres                   1433               8 :         if (td->natts != (CONNECTBY_NCOLS + serial_column))
 7199 tgl                      1434 UBC           0 :             ereport(ERROR,
                               1435                 :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1436                 :                      errmsg("invalid return type"),
                               1437                 :                      errdetail("Query-specified return tuple has " \
                               1438                 :                                "wrong number of columns.")));
                               1439                 :     }
                               1440                 :     else
                               1441                 :     {
 2058 andres                   1442 CBC           7 :         if (td->natts != CONNECTBY_NCOLS_NOBRANCH + serial_column)
 7199 tgl                      1443 UBC           0 :             ereport(ERROR,
                               1444                 :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1445                 :                      errmsg("invalid return type"),
                               1446                 :                      errdetail("Query-specified return tuple has " \
                               1447                 :                                "wrong number of columns.")));
                               1448                 :     }
                               1449                 : 
                               1450                 :     /* check that the types of the first two columns match */
 2058 andres                   1451 CBC          15 :     if (TupleDescAttr(td, 0)->atttypid != TupleDescAttr(td, 1)->atttypid)
 7199 tgl                      1452               1 :         ereport(ERROR,
                               1453                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1454                 :                  errmsg("invalid return type"),
                               1455                 :                  errdetail("First two columns must be the same type.")));
                               1456                 : 
                               1457                 :     /* check that the type of the third column is INT4 */
 2058 andres                   1458              14 :     if (TupleDescAttr(td, 2)->atttypid != INT4OID)
 7199 tgl                      1459 UBC           0 :         ereport(ERROR,
                               1460                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1461                 :                  errmsg("invalid return type"),
                               1462                 :                  errdetail("Third column must be type %s.",
                               1463                 :                            format_type_be(INT4OID))));
                               1464                 : 
                               1465                 :     /* check that the type of the fourth column is TEXT if applicable */
 2058 andres                   1466 CBC          14 :     if (show_branch && TupleDescAttr(td, 3)->atttypid != TEXTOID)
 7199 tgl                      1467 UBC           0 :         ereport(ERROR,
                               1468                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1469                 :                  errmsg("invalid return type"),
                               1470                 :                  errdetail("Fourth column must be type %s.",
                               1471                 :                            format_type_be(TEXTOID))));
                               1472                 : 
                               1473                 :     /* check that the type of the fifth column is INT4 */
 2058 andres                   1474 CBC          14 :     if (show_branch && show_serial &&
                               1475               1 :         TupleDescAttr(td, 4)->atttypid != INT4OID)
 2807 tgl                      1476 UBC           0 :         ereport(ERROR,
                               1477                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1478                 :                  errmsg("query-specified return tuple not valid for Connectby: "
                               1479                 :                         "fifth column must be type %s",
                               1480                 :                         format_type_be(INT4OID))));
                               1481                 : 
                               1482                 :     /* check that the type of the fourth column is INT4 */
 2058 andres                   1483 CBC          14 :     if (!show_branch && show_serial &&
                               1484               1 :         TupleDescAttr(td, 3)->atttypid != INT4OID)
 2807 tgl                      1485 UBC           0 :         ereport(ERROR,
                               1486                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1487                 :                  errmsg("query-specified return tuple not valid for Connectby: "
                               1488                 :                         "fourth column must be type %s",
                               1489                 :                         format_type_be(INT4OID))));
                               1490                 : 
                               1491                 :     /* OK, the tupdesc is valid for our purposes */
 7524 bruce                    1492 CBC          14 : }
                               1493                 : 
                               1494                 : /*
                               1495                 :  * Check if spi sql tupdesc and return tupdesc are compatible
                               1496                 :  */
                               1497                 : static void
                               1498              45 : compatConnectbyTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
                               1499                 : {
                               1500                 :     Oid         ret_atttypid;
                               1501                 :     Oid         sql_atttypid;
                               1502                 :     int32       ret_atttypmod;
                               1503                 :     int32       sql_atttypmod;
                               1504                 : 
                               1505                 :     /*
                               1506                 :      * Result must have at least 2 columns.
                               1507                 :      */
 2992 tgl                      1508              45 :     if (sql_tupdesc->natts < 2)
                               1509               1 :         ereport(ERROR,
                               1510                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1511                 :                  errmsg("invalid return type"),
                               1512                 :                  errdetail("Query must return at least two columns.")));
                               1513                 : 
                               1514                 :     /*
                               1515                 :      * These columns must match the result type indicated by the calling
                               1516                 :      * query.
                               1517                 :      */
 2058 andres                   1518              44 :     ret_atttypid = TupleDescAttr(ret_tupdesc, 0)->atttypid;
                               1519              44 :     sql_atttypid = TupleDescAttr(sql_tupdesc, 0)->atttypid;
                               1520              44 :     ret_atttypmod = TupleDescAttr(ret_tupdesc, 0)->atttypmod;
                               1521              44 :     sql_atttypmod = TupleDescAttr(sql_tupdesc, 0)->atttypmod;
 2992 tgl                      1522              44 :     if (ret_atttypid != sql_atttypid ||
 2992 tgl                      1523 UBC           0 :         (ret_atttypmod >= 0 && ret_atttypmod != sql_atttypmod))
 7199 tgl                      1524 CBC           1 :         ereport(ERROR,
                               1525                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1526                 :                  errmsg("invalid return type"),
                               1527                 :                  errdetail("SQL key field type %s does " \
                               1528                 :                            "not match return key field type %s.",
                               1529                 :                            format_type_with_typemod(ret_atttypid, ret_atttypmod),
                               1530                 :                            format_type_with_typemod(sql_atttypid, sql_atttypmod))));
                               1531                 : 
 2058 andres                   1532              43 :     ret_atttypid = TupleDescAttr(ret_tupdesc, 1)->atttypid;
                               1533              43 :     sql_atttypid = TupleDescAttr(sql_tupdesc, 1)->atttypid;
                               1534              43 :     ret_atttypmod = TupleDescAttr(ret_tupdesc, 1)->atttypmod;
                               1535              43 :     sql_atttypmod = TupleDescAttr(sql_tupdesc, 1)->atttypmod;
 2992 tgl                      1536              43 :     if (ret_atttypid != sql_atttypid ||
 2992 tgl                      1537 UBC           0 :         (ret_atttypmod >= 0 && ret_atttypmod != sql_atttypmod))
 7199                          1538               0 :         ereport(ERROR,
                               1539                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1540                 :                  errmsg("invalid return type"),
                               1541                 :                  errdetail("SQL parent key field type %s does " \
                               1542                 :                            "not match return parent key field type %s.",
                               1543                 :                            format_type_with_typemod(ret_atttypid, ret_atttypmod),
                               1544                 :                            format_type_with_typemod(sql_atttypid, sql_atttypmod))));
                               1545                 : 
                               1546                 :     /* OK, the two tupdescs are compatible for our purposes */
 7524 bruce                    1547 CBC          43 : }
                               1548                 : 
                               1549                 : /*
                               1550                 :  * Check if two tupdescs match in type of attributes
                               1551                 :  */
                               1552                 : static bool
                               1553              16 : compatCrosstabTupleDescs(TupleDesc ret_tupdesc, TupleDesc sql_tupdesc)
                               1554                 : {
                               1555                 :     int         i;
                               1556                 :     Form_pg_attribute ret_attr;
                               1557                 :     Oid         ret_atttypid;
                               1558                 :     Form_pg_attribute sql_attr;
                               1559                 :     Oid         sql_atttypid;
                               1560                 : 
 5242 tgl                      1561              16 :     if (ret_tupdesc->natts < 2 ||
                               1562              16 :         sql_tupdesc->natts < 3)
 5242 tgl                      1563 UBC           0 :         return false;
                               1564                 : 
                               1565                 :     /* check the rowid types match */
 2058 andres                   1566 CBC          16 :     ret_atttypid = TupleDescAttr(ret_tupdesc, 0)->atttypid;
                               1567              16 :     sql_atttypid = TupleDescAttr(sql_tupdesc, 0)->atttypid;
 7558 bruce                    1568              16 :     if (ret_atttypid != sql_atttypid)
 7199 tgl                      1569 UBC           0 :         ereport(ERROR,
                               1570                 :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
                               1571                 :                  errmsg("invalid return type"),
                               1572                 :                  errdetail("SQL rowid datatype does not match " \
                               1573                 :                            "return rowid datatype.")));
                               1574                 : 
                               1575                 :     /*
                               1576                 :      * - attribute [1] of the sql tuple is the category; no need to check it -
                               1577                 :      * attribute [2] of the sql tuple should match attributes [1] to [natts]
                               1578                 :      * of the return tuple
                               1579                 :      */
 2058 andres                   1580 CBC          16 :     sql_attr = TupleDescAttr(sql_tupdesc, 2);
 7558 bruce                    1581              64 :     for (i = 1; i < ret_tupdesc->natts; i++)
                               1582                 :     {
 2058 andres                   1583              48 :         ret_attr = TupleDescAttr(ret_tupdesc, i);
                               1584                 : 
 7558 bruce                    1585              48 :         if (ret_attr->atttypid != sql_attr->atttypid)
 7558 bruce                    1586 UBC           0 :             return false;
                               1587                 :     }
                               1588                 : 
                               1589                 :     /* OK, the two tupdescs are compatible for our purposes */
 7558 bruce                    1590 CBC          16 :     return true;
                               1591                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a