LCOV - differential code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Coverage Total Hit UIC GBC GIC CBC ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 92.4 % 119 110 9 9 50 51 59
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 6 6 4 2 4
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 92.4 % 119 110 9 9 50 51 59
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 60.0 % 10 6 4 2 4

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /* -------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * worker_spi.c
                                  4                 :  *      Sample background worker code that demonstrates various coding
                                  5                 :  *      patterns: establishing a database connection; starting and committing
                                  6                 :  *      transactions; using GUC variables, and heeding SIGHUP to reread
                                  7                 :  *      the configuration file; reporting to pg_stat_activity; using the
                                  8                 :  *      process latch to sleep and exit in case of postmaster death.
                                  9                 :  *
                                 10                 :  * This code connects to a database, creates a schema and table, and summarizes
                                 11                 :  * the numbers contained therein.  To see it working, insert an initial value
                                 12                 :  * with "total" type and some initial value; then insert some other rows with
                                 13                 :  * "delta" type.  Delta rows will be deleted by this worker and their values
                                 14                 :  * aggregated into the total.
                                 15                 :  *
                                 16                 :  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
                                 17                 :  *
                                 18                 :  * IDENTIFICATION
                                 19                 :  *      src/test/modules/worker_spi/worker_spi.c
                                 20                 :  *
                                 21                 :  * -------------------------------------------------------------------------
                                 22                 :  */
                                 23                 : #include "postgres.h"
                                 24                 : 
                                 25                 : /* These are always necessary for a bgworker */
                                 26                 : #include "miscadmin.h"
                                 27                 : #include "postmaster/bgworker.h"
                                 28                 : #include "postmaster/interrupt.h"
                                 29                 : #include "storage/ipc.h"
                                 30                 : #include "storage/latch.h"
                                 31                 : #include "storage/lwlock.h"
                                 32                 : #include "storage/proc.h"
                                 33                 : #include "storage/shmem.h"
                                 34                 : 
                                 35                 : /* these headers are used by this particular worker's code */
                                 36                 : #include "access/xact.h"
                                 37                 : #include "executor/spi.h"
                                 38                 : #include "fmgr.h"
                                 39                 : #include "lib/stringinfo.h"
                                 40                 : #include "pgstat.h"
                                 41                 : #include "utils/builtins.h"
                                 42                 : #include "utils/snapmgr.h"
                                 43                 : #include "tcop/utility.h"
                                 44                 : 
 3776 alvherre                   45 CBC           1 : PG_MODULE_MAGIC;
                                 46                 : 
 3554 rhaas                      47               2 : PG_FUNCTION_INFO_V1(worker_spi_launch);
                                 48                 : 
                                 49                 : PGDLLEXPORT void worker_spi_main(Datum) pg_attribute_noreturn();
                                 50                 : 
                                 51                 : /* GUC variables */
                                 52                 : static int  worker_spi_naptime = 10;
                                 53                 : static int  worker_spi_total_workers = 2;
                                 54                 : static char *worker_spi_database = NULL;
                                 55                 : 
                                 56                 : 
                                 57                 : typedef struct worktable
                                 58                 : {
                                 59                 :     const char *schema;
                                 60                 :     const char *name;
                                 61                 : } worktable;
                                 62                 : 
                                 63                 : /*
                                 64                 :  * Initialize workspace for a worker process: create the schema if it doesn't
                                 65                 :  * already exist.
                                 66                 :  */
 3776 alvherre                   67 ECB             : static void
 3776 alvherre                   68 GIC           1 : initialize_worker_spi(worktable *table)
                                 69                 : {
                                 70                 :     int         ret;
                                 71                 :     int         ntup;
                                 72                 :     bool        isnull;
                                 73                 :     StringInfoData buf;
 3776 alvherre                   74 ECB             : 
 3651 alvherre                   75 CBC           1 :     SetCurrentStatementStartTimestamp();
 3776                            76               1 :     StartTransactionCommand();
                                 77               1 :     SPI_connect();
                                 78               1 :     PushActiveSnapshot(GetTransactionSnapshot());
 2047 peter_e                    79 GIC           1 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
                                 80                 : 
 3651 alvherre                   81 ECB             :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
 3776 alvherre                   82 CBC           1 :     initStringInfo(&buf);
 3776 alvherre                   83 GIC           1 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
                                 84                 :                      table->schema);
 3776 alvherre                   85 ECB             : 
  890 noah                       86 CBC           1 :     debug_query_string = buf.data;
 3776 alvherre                   87               1 :     ret = SPI_execute(buf.data, true, 0);
 3776 alvherre                   88 GBC           1 :     if (ret != SPI_OK_SELECT)
 3776 alvherre                   89 UIC           0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
 3776 alvherre                   90 ECB             : 
 3776 alvherre                   91 GBC           1 :     if (SPI_processed != 1)
 3776 alvherre                   92 UIC           0 :         elog(FATAL, "not a singleton result");
 3776 alvherre                   93 ECB             : 
 3505 stark                      94 CBC           1 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
 3776 alvherre                   95 GIC           1 :                                        SPI_tuptable->tupdesc,
 3776 alvherre                   96 ECB             :                                        1, &isnull));
 3776 alvherre                   97 GBC           1 :     if (isnull)
 3776 alvherre                   98 UIC           0 :         elog(FATAL, "null result");
 3776 alvherre                   99 ECB             : 
 3776 alvherre                  100 GIC           1 :     if (ntup == 0)
 3776 alvherre                  101 ECB             :     {
  890 noah                      102 CBC           1 :         debug_query_string = NULL;
 3776 alvherre                  103               1 :         resetStringInfo(&buf);
 3776 alvherre                  104 GIC           1 :         appendStringInfo(&buf,
                                105                 :                          "CREATE SCHEMA \"%s\" "
                                106                 :                          "CREATE TABLE \"%s\" ("
                                107                 :                          "     type text CHECK (type IN ('total', 'delta')), "
                                108                 :                          "     value   integer)"
                                109                 :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
                                110                 :                          "WHERE type = 'total'",
                                111                 :                          table->schema, table->name, table->name, table->name);
                                112                 : 
 3651 alvherre                  113 ECB             :         /* set statement start time */
 3651 alvherre                  114 GIC           1 :         SetCurrentStatementStartTimestamp();
 3651 alvherre                  115 ECB             : 
  890 noah                      116 CBC           1 :         debug_query_string = buf.data;
 3776 alvherre                  117 GIC           1 :         ret = SPI_execute(buf.data, false, 0);
 3776 alvherre                  118 ECB             : 
 3776 alvherre                  119 GBC           1 :         if (ret != SPI_OK_UTILITY)
 3776 alvherre                  120 UIC           0 :             elog(FATAL, "failed to create my schema");
  890 noah                      121 ECB             : 
  890 noah                      122 GIC           1 :         debug_query_string = NULL;  /* rest is not statement-specific */
                                123                 :     }
 3776 alvherre                  124 ECB             : 
 3776 alvherre                  125 CBC           1 :     SPI_finish();
                                126               1 :     PopActiveSnapshot();
                                127               1 :     CommitTransactionCommand();
  890 noah                      128               1 :     debug_query_string = NULL;
 3651 alvherre                  129               1 :     pgstat_report_activity(STATE_IDLE, NULL);
 3776 alvherre                  130 GIC           1 : }
                                131                 : 
 3554 rhaas                     132 ECB             : void
 3554 rhaas                     133 GIC           3 : worker_spi_main(Datum main_arg)
 3776 alvherre                  134 ECB             : {
 3554 rhaas                     135 GIC           3 :     int         index = DatumGetInt32(main_arg);
                                136                 :     worktable  *table;
                                137                 :     StringInfoData buf;
                                138                 :     char        name[20];
 3554 rhaas                     139 ECB             : 
 3554 rhaas                     140 CBC           3 :     table = palloc(sizeof(worktable));
                                141               3 :     sprintf(name, "schema%d", index);
                                142               3 :     table->schema = pstrdup(name);
 3554 rhaas                     143 GIC           3 :     table->name = pstrdup("counted");
                                144                 : 
 3554 rhaas                     145 ECB             :     /* Establish signal handlers before unblocking signals. */
  863 fujii                     146 CBC           3 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
  863 fujii                     147 GIC           3 :     pqsignal(SIGTERM, die);
                                148                 : 
 3776 alvherre                  149 ECB             :     /* We're now ready to receive signals */
 3776 alvherre                  150 GIC           3 :     BackgroundWorkerUnblockSignals();
                                151                 : 
 3776 alvherre                  152 ECB             :     /* Connect to our database */
 1407 alvherre                  153 GIC           3 :     BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
 3776 alvherre                  154 ECB             : 
 3776 alvherre                  155 GIC           1 :     elog(LOG, "%s initialized with %s.%s",
 3776 alvherre                  156 ECB             :          MyBgworkerEntry->bgw_name, table->schema, table->name);
 3776 alvherre                  157 GIC           1 :     initialize_worker_spi(table);
                                158                 : 
                                159                 :     /*
                                160                 :      * Quote identifiers passed to us.  Note that this must be done after
                                161                 :      * initialize_worker_spi, because that routine assumes the names are not
                                162                 :      * quoted.
                                163                 :      *
                                164                 :      * Note some memory might be leaked here.
 3776 alvherre                  165 ECB             :      */
 3776 alvherre                  166 CBC           1 :     table->schema = quote_identifier(table->schema);
 3776 alvherre                  167 GIC           1 :     table->name = quote_identifier(table->name);
 3776 alvherre                  168 ECB             : 
 3776 alvherre                  169 CBC           1 :     initStringInfo(&buf);
 3776 alvherre                  170 GIC           1 :     appendStringInfo(&buf,
                                171                 :                      "WITH deleted AS (DELETE "
                                172                 :                      "FROM %s.%s "
                                173                 :                      "WHERE type = 'delta' RETURNING value), "
                                174                 :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
                                175                 :                      "FROM deleted) "
                                176                 :                      "UPDATE %s.%s "
                                177                 :                      "SET value = %s.value + total.sum "
                                178                 :                      "FROM total WHERE type = 'total' "
                                179                 :                      "RETURNING %s.value",
                                180                 :                      table->schema, table->name,
                                181                 :                      table->schema, table->name,
                                182                 :                      table->name,
                                183                 :                      table->name);
                                184                 : 
                                185                 :     /*
                                186                 :      * Main loop: do this until SIGTERM is received and processed by
                                187                 :      * ProcessInterrupts.
                                188                 :      */
  863 fujii                     189 ECB             :     for (;;)
 3776 alvherre                  190 GIC           2 :     {
                                191                 :         int         ret;
                                192                 : 
                                193                 :         /*
                                194                 :          * Background workers mustn't call usleep() or any direct equivalent:
                                195                 :          * instead, they may wait on their process latch, which sleeps as
                                196                 :          * necessary, but is awakened if postmaster dies.  That way the
                                197                 :          * background process goes away immediately in an emergency.
 3776 alvherre                  198 ECB             :          */
 1598 alvherre                  199 GIC           3 :         (void) WaitLatch(MyLatch,
                                200                 :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
                                201                 :                          worker_spi_naptime * 1000L,
 1598 alvherre                  202 ECB             :                          PG_WAIT_EXTENSION);
 3007 andres                    203 GIC           3 :         ResetLatch(MyLatch);
 3776 alvherre                  204 ECB             : 
 2133 andres                    205 GIC           3 :         CHECK_FOR_INTERRUPTS();
                                206                 : 
                                207                 :         /*
                                208                 :          * In case of a SIGHUP, just reload the configuration.
 3651 alvherre                  209 ECB             :          */
  863 fujii                     210 GIC           2 :         if (ConfigReloadPending)
 3602 bruce                     211 ECB             :         {
  863 fujii                     212 CBC           1 :             ConfigReloadPending = false;
 3602 bruce                     213 GIC           1 :             ProcessConfigFile(PGC_SIGHUP);
                                214                 :         }
                                215                 : 
                                216                 :         /*
                                217                 :          * Start a transaction on which we can run queries.  Note that each
                                218                 :          * StartTransactionCommand() call should be preceded by a
                                219                 :          * SetCurrentStatementStartTimestamp() call, which sets both the time
                                220                 :          * for the statement we're about the run, and also the transaction
                                221                 :          * start time.  Also, each other query sent to SPI should probably be
                                222                 :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
                                223                 :          * start time is always up to date.
                                224                 :          *
                                225                 :          * The SPI_connect() call lets us run queries through the SPI manager,
                                226                 :          * and the PushActiveSnapshot() call creates an "active" snapshot
                                227                 :          * which is necessary for queries to have MVCC data to work on.
                                228                 :          *
                                229                 :          * The pgstat_report_activity() call makes our activity visible
                                230                 :          * through the pgstat views.
 3651 alvherre                  231 ECB             :          */
 3651 alvherre                  232 CBC           2 :         SetCurrentStatementStartTimestamp();
 3776                           233               2 :         StartTransactionCommand();
                                234               2 :         SPI_connect();
                                235               2 :         PushActiveSnapshot(GetTransactionSnapshot());
  890 noah                      236               2 :         debug_query_string = buf.data;
 3651 alvherre                  237 GIC           2 :         pgstat_report_activity(STATE_RUNNING, buf.data);
                                238                 : 
 3651 alvherre                  239 ECB             :         /* We can now execute queries via SPI */
 3776 alvherre                  240 GIC           2 :         ret = SPI_execute(buf.data, false, 0);
 3776 alvherre                  241 ECB             : 
 3776 alvherre                  242 GBC           2 :         if (ret != SPI_OK_UPDATE_RETURNING)
 3776 alvherre                  243 UIC           0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
                                244                 :                  table->schema, table->name, ret);
 3776 alvherre                  245 ECB             : 
 3776 alvherre                  246 GIC           2 :         if (SPI_processed > 0)
                                247                 :         {
                                248                 :             bool        isnull;
                                249                 :             int32       val;
 3776 alvherre                  250 ECB             : 
 3776 alvherre                  251 CBC           1 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
 3602 bruce                     252 GIC           1 :                                               SPI_tuptable->tupdesc,
 3602 bruce                     253 ECB             :                                               1, &isnull));
 3776 alvherre                  254 CBC           1 :             if (!isnull)
 3776 alvherre                  255 GIC           1 :                 elog(LOG, "%s: count in %s.%s is now %d",
                                256                 :                      MyBgworkerEntry->bgw_name,
                                257                 :                      table->schema, table->name, val);
                                258                 :         }
                                259                 : 
                                260                 :         /*
                                261                 :          * And finish our transaction.
 3651 alvherre                  262 ECB             :          */
 3776 alvherre                  263 CBC           2 :         SPI_finish();
                                264               2 :         PopActiveSnapshot();
                                265               2 :         CommitTransactionCommand();
  890 noah                      266               2 :         debug_query_string = NULL;
  368 andres                    267               2 :         pgstat_report_stat(true);
 3651 alvherre                  268 GIC           2 :         pgstat_report_activity(STATE_IDLE, NULL);
                                269                 :     }
                                270                 : 
                                271                 :     /* Not reachable */
                                272                 : }
                                273                 : 
                                274                 : /*
                                275                 :  * Entrypoint of this module.
                                276                 :  *
                                277                 :  * We register more than one worker process here, to demonstrate how that can
                                278                 :  * be done.
                                279                 :  */
 3776 alvherre                  280 ECB             : void
 3776 alvherre                  281 GIC           1 : _PG_init(void)
                                282                 : {
                                283                 :     BackgroundWorker worker;
                                284                 : 
 3651 alvherre                  285 ECB             :     /* get the configuration */
 3651 alvherre                  286 GIC           1 :     DefineCustomIntVariable("worker_spi.naptime",
                                287                 :                             "Duration between each check (in seconds).",
                                288                 :                             NULL,
                                289                 :                             &worker_spi_naptime,
                                290                 :                             10,
                                291                 :                             1,
                                292                 :                             INT_MAX,
                                293                 :                             PGC_SIGHUP,
                                294                 :                             0,
                                295                 :                             NULL,
                                296                 :                             NULL,
                                297                 :                             NULL);
 3554 rhaas                     298 ECB             : 
 3554 rhaas                     299 GBC           1 :     if (!process_shared_preload_libraries_in_progress)
 3554 rhaas                     300 UIC           0 :         return;
 3554 rhaas                     301 ECB             : 
 3651 alvherre                  302 GIC           1 :     DefineCustomIntVariable("worker_spi.total_workers",
                                303                 :                             "Number of workers.",
                                304                 :                             NULL,
                                305                 :                             &worker_spi_total_workers,
                                306                 :                             2,
                                307                 :                             1,
                                308                 :                             100,
                                309                 :                             PGC_POSTMASTER,
                                310                 :                             0,
                                311                 :                             NULL,
                                312                 :                             NULL,
                                313                 :                             NULL);
 3651 alvherre                  314 ECB             : 
 1407 alvherre                  315 GIC           1 :     DefineCustomStringVariable("worker_spi.database",
                                316                 :                                "Database to connect to.",
                                317                 :                                NULL,
                                318                 :                                &worker_spi_database,
                                319                 :                                "postgres",
                                320                 :                                PGC_POSTMASTER,
                                321                 :                                0,
                                322                 :                                NULL, NULL, NULL);
 1407 alvherre                  323 ECB             : 
  412 tgl                       324 GIC           1 :     MarkGUCPrefixReserved("worker_spi");
                                325                 : 
 3651 alvherre                  326 ECB             :     /* set up common data for all our workers */
 2184 tgl                       327 CBC           1 :     memset(&worker, 0, sizeof(worker));
 3776 alvherre                  328 GIC           1 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
 3776 alvherre                  329 ECB             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3776 alvherre                  330 CBC           1 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
 3651                           331               1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
 2200 rhaas                     332               1 :     sprintf(worker.bgw_library_name, "worker_spi");
                                333               1 :     sprintf(worker.bgw_function_name, "worker_spi_main");
 3240 rhaas                     334 GIC           1 :     worker.bgw_notify_pid = 0;
                                335                 : 
                                336                 :     /*
                                337                 :      * Now fill in worker-specific data, and do the actual registrations.
 3776 alvherre                  338 ECB             :      */
  550 peter                     339 GIC           3 :     for (int i = 1; i <= worker_spi_total_workers; i++)
 3651 alvherre                  340 ECB             :     {
 2047 peter_e                   341 CBC           2 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
                                342               2 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
 3554 rhaas                     343 GIC           2 :         worker.bgw_main_arg = Int32GetDatum(i);
 3651 alvherre                  344 ECB             : 
 3651 alvherre                  345 GIC           2 :         RegisterBackgroundWorker(&worker);
                                346                 :     }
                                347                 : }
                                348                 : 
                                349                 : /*
                                350                 :  * Dynamically launch an SPI worker.
                                351                 :  */
 3554 rhaas                     352 ECB             : Datum
 3554 rhaas                     353 GIC           1 : worker_spi_launch(PG_FUNCTION_ARGS)
 3554 rhaas                     354 ECB             : {
 3554 rhaas                     355 GIC           1 :     int32       i = PG_GETARG_INT32(0);
                                356                 :     BackgroundWorker worker;
                                357                 :     BackgroundWorkerHandle *handle;
                                358                 :     BgwHandleStatus status;
                                359                 :     pid_t       pid;
 3554 rhaas                     360 ECB             : 
 2184 tgl                       361 CBC           1 :     memset(&worker, 0, sizeof(worker));
 3554 rhaas                     362 GIC           1 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
 3554 rhaas                     363 ECB             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
 3554 rhaas                     364 CBC           1 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
                                365               1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
                                366               1 :     sprintf(worker.bgw_library_name, "worker_spi");
                                367               1 :     sprintf(worker.bgw_function_name, "worker_spi_main");
 2047 peter_e                   368               1 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
                                369               1 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
 3554 rhaas                     370 GIC           1 :     worker.bgw_main_arg = Int32GetDatum(i);
 3511 rhaas                     371 ECB             :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
 3511 rhaas                     372 GIC           1 :     worker.bgw_notify_pid = MyProcPid;
 3511 rhaas                     373 ECB             : 
 3511 rhaas                     374 GBC           1 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
 3511 rhaas                     375 UIC           0 :         PG_RETURN_NULL();
 3511 rhaas                     376 ECB             : 
 3511 rhaas                     377 GIC           1 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
 3511 rhaas                     378 ECB             : 
 3511 rhaas                     379 GBC           1 :     if (status == BGWH_STOPPED)
 3511 rhaas                     380 UIC           0 :         ereport(ERROR,
                                381                 :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                                382                 :                  errmsg("could not start background process"),
 2118 tgl                       383 ECB             :                  errhint("More details may be available in the server log.")));
 3511 rhaas                     384 GBC           1 :     if (status == BGWH_POSTMASTER_DIED)
 3511 rhaas                     385 UIC           0 :         ereport(ERROR,
                                386                 :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
                                387                 :                  errmsg("cannot start background processes without postmaster"),
 3511 rhaas                     388 ECB             :                  errhint("Kill all remaining database processes and restart the database.")));
 3511 rhaas                     389 GIC           1 :     Assert(status == BGWH_STARTED);
 3511 rhaas                     390 ECB             : 
 3511 rhaas                     391 GIC           1 :     PG_RETURN_INT32(pid);
                                392                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a