LCOV - differential code coverage report
Current view: top level - src/test/modules/worker_spi - worker_spi.c (source / functions) Coverage Total Hit UIC GBC GIC CBC ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 92.4 % 119 110 9 9 50 51 59
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 6 6 4 2 4
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /* -------------------------------------------------------------------------
       2                 :  *
       3                 :  * worker_spi.c
       4                 :  *      Sample background worker code that demonstrates various coding
       5                 :  *      patterns: establishing a database connection; starting and committing
       6                 :  *      transactions; using GUC variables, and heeding SIGHUP to reread
       7                 :  *      the configuration file; reporting to pg_stat_activity; using the
       8                 :  *      process latch to sleep and exit in case of postmaster death.
       9                 :  *
      10                 :  * This code connects to a database, creates a schema and table, and summarizes
      11                 :  * the numbers contained therein.  To see it working, insert an initial value
      12                 :  * with "total" type and some initial value; then insert some other rows with
      13                 :  * "delta" type.  Delta rows will be deleted by this worker and their values
      14                 :  * aggregated into the total.
      15                 :  *
      16                 :  * Copyright (c) 2013-2023, PostgreSQL Global Development Group
      17                 :  *
      18                 :  * IDENTIFICATION
      19                 :  *      src/test/modules/worker_spi/worker_spi.c
      20                 :  *
      21                 :  * -------------------------------------------------------------------------
      22                 :  */
      23                 : #include "postgres.h"
      24                 : 
      25                 : /* These are always necessary for a bgworker */
      26                 : #include "miscadmin.h"
      27                 : #include "postmaster/bgworker.h"
      28                 : #include "postmaster/interrupt.h"
      29                 : #include "storage/ipc.h"
      30                 : #include "storage/latch.h"
      31                 : #include "storage/lwlock.h"
      32                 : #include "storage/proc.h"
      33                 : #include "storage/shmem.h"
      34                 : 
      35                 : /* these headers are used by this particular worker's code */
      36                 : #include "access/xact.h"
      37                 : #include "executor/spi.h"
      38                 : #include "fmgr.h"
      39                 : #include "lib/stringinfo.h"
      40                 : #include "pgstat.h"
      41                 : #include "utils/builtins.h"
      42                 : #include "utils/snapmgr.h"
      43                 : #include "tcop/utility.h"
      44                 : 
      45 CBC           1 : PG_MODULE_MAGIC;
      46                 : 
      47               2 : PG_FUNCTION_INFO_V1(worker_spi_launch);
      48                 : 
      49                 : PGDLLEXPORT void worker_spi_main(Datum) pg_attribute_noreturn();
      50                 : 
      51                 : /* GUC variables */
      52                 : static int  worker_spi_naptime = 10;
      53                 : static int  worker_spi_total_workers = 2;
      54                 : static char *worker_spi_database = NULL;
      55                 : 
      56                 : 
      57                 : typedef struct worktable
      58                 : {
      59                 :     const char *schema;
      60                 :     const char *name;
      61                 : } worktable;
      62                 : 
      63                 : /*
      64                 :  * Initialize workspace for a worker process: create the schema if it doesn't
      65                 :  * already exist.
      66                 :  */
      67 ECB             : static void
      68 GIC           1 : initialize_worker_spi(worktable *table)
      69                 : {
      70                 :     int         ret;
      71                 :     int         ntup;
      72                 :     bool        isnull;
      73                 :     StringInfoData buf;
      74 ECB             : 
      75 CBC           1 :     SetCurrentStatementStartTimestamp();
      76               1 :     StartTransactionCommand();
      77               1 :     SPI_connect();
      78               1 :     PushActiveSnapshot(GetTransactionSnapshot());
      79 GIC           1 :     pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
      80                 : 
      81 ECB             :     /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
      82 CBC           1 :     initStringInfo(&buf);
      83 GIC           1 :     appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
      84                 :                      table->schema);
      85 ECB             : 
      86 CBC           1 :     debug_query_string = buf.data;
      87               1 :     ret = SPI_execute(buf.data, true, 0);
      88 GBC           1 :     if (ret != SPI_OK_SELECT)
      89 UIC           0 :         elog(FATAL, "SPI_execute failed: error code %d", ret);
      90 ECB             : 
      91 GBC           1 :     if (SPI_processed != 1)
      92 UIC           0 :         elog(FATAL, "not a singleton result");
      93 ECB             : 
      94 CBC           1 :     ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
      95 GIC           1 :                                        SPI_tuptable->tupdesc,
      96 ECB             :                                        1, &isnull));
      97 GBC           1 :     if (isnull)
      98 UIC           0 :         elog(FATAL, "null result");
      99 ECB             : 
     100 GIC           1 :     if (ntup == 0)
     101 ECB             :     {
     102 CBC           1 :         debug_query_string = NULL;
     103               1 :         resetStringInfo(&buf);
     104 GIC           1 :         appendStringInfo(&buf,
     105                 :                          "CREATE SCHEMA \"%s\" "
     106                 :                          "CREATE TABLE \"%s\" ("
     107                 :                          "     type text CHECK (type IN ('total', 'delta')), "
     108                 :                          "     value   integer)"
     109                 :                          "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
     110                 :                          "WHERE type = 'total'",
     111                 :                          table->schema, table->name, table->name, table->name);
     112                 : 
     113 ECB             :         /* set statement start time */
     114 GIC           1 :         SetCurrentStatementStartTimestamp();
     115 ECB             : 
     116 CBC           1 :         debug_query_string = buf.data;
     117 GIC           1 :         ret = SPI_execute(buf.data, false, 0);
     118 ECB             : 
     119 GBC           1 :         if (ret != SPI_OK_UTILITY)
     120 UIC           0 :             elog(FATAL, "failed to create my schema");
     121 ECB             : 
     122 GIC           1 :         debug_query_string = NULL;  /* rest is not statement-specific */
     123                 :     }
     124 ECB             : 
     125 CBC           1 :     SPI_finish();
     126               1 :     PopActiveSnapshot();
     127               1 :     CommitTransactionCommand();
     128               1 :     debug_query_string = NULL;
     129               1 :     pgstat_report_activity(STATE_IDLE, NULL);
     130 GIC           1 : }
     131                 : 
     132 ECB             : void
     133 GIC           3 : worker_spi_main(Datum main_arg)
     134 ECB             : {
     135 GIC           3 :     int         index = DatumGetInt32(main_arg);
     136                 :     worktable  *table;
     137                 :     StringInfoData buf;
     138                 :     char        name[20];
     139 ECB             : 
     140 CBC           3 :     table = palloc(sizeof(worktable));
     141               3 :     sprintf(name, "schema%d", index);
     142               3 :     table->schema = pstrdup(name);
     143 GIC           3 :     table->name = pstrdup("counted");
     144                 : 
     145 ECB             :     /* Establish signal handlers before unblocking signals. */
     146 CBC           3 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
     147 GIC           3 :     pqsignal(SIGTERM, die);
     148                 : 
     149 ECB             :     /* We're now ready to receive signals */
     150 GIC           3 :     BackgroundWorkerUnblockSignals();
     151                 : 
     152 ECB             :     /* Connect to our database */
     153 GIC           3 :     BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
     154 ECB             : 
     155 GIC           1 :     elog(LOG, "%s initialized with %s.%s",
     156 ECB             :          MyBgworkerEntry->bgw_name, table->schema, table->name);
     157 GIC           1 :     initialize_worker_spi(table);
     158                 : 
     159                 :     /*
     160                 :      * Quote identifiers passed to us.  Note that this must be done after
     161                 :      * initialize_worker_spi, because that routine assumes the names are not
     162                 :      * quoted.
     163                 :      *
     164                 :      * Note some memory might be leaked here.
     165 ECB             :      */
     166 CBC           1 :     table->schema = quote_identifier(table->schema);
     167 GIC           1 :     table->name = quote_identifier(table->name);
     168 ECB             : 
     169 CBC           1 :     initStringInfo(&buf);
     170 GIC           1 :     appendStringInfo(&buf,
     171                 :                      "WITH deleted AS (DELETE "
     172                 :                      "FROM %s.%s "
     173                 :                      "WHERE type = 'delta' RETURNING value), "
     174                 :                      "total AS (SELECT coalesce(sum(value), 0) as sum "
     175                 :                      "FROM deleted) "
     176                 :                      "UPDATE %s.%s "
     177                 :                      "SET value = %s.value + total.sum "
     178                 :                      "FROM total WHERE type = 'total' "
     179                 :                      "RETURNING %s.value",
     180                 :                      table->schema, table->name,
     181                 :                      table->schema, table->name,
     182                 :                      table->name,
     183                 :                      table->name);
     184                 : 
     185                 :     /*
     186                 :      * Main loop: do this until SIGTERM is received and processed by
     187                 :      * ProcessInterrupts.
     188                 :      */
     189 ECB             :     for (;;)
     190 GIC           2 :     {
     191                 :         int         ret;
     192                 : 
     193                 :         /*
     194                 :          * Background workers mustn't call usleep() or any direct equivalent:
     195                 :          * instead, they may wait on their process latch, which sleeps as
     196                 :          * necessary, but is awakened if postmaster dies.  That way the
     197                 :          * background process goes away immediately in an emergency.
     198 ECB             :          */
     199 GIC           3 :         (void) WaitLatch(MyLatch,
     200                 :                          WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
     201                 :                          worker_spi_naptime * 1000L,
     202 ECB             :                          PG_WAIT_EXTENSION);
     203 GIC           3 :         ResetLatch(MyLatch);
     204 ECB             : 
     205 GIC           3 :         CHECK_FOR_INTERRUPTS();
     206                 : 
     207                 :         /*
     208                 :          * In case of a SIGHUP, just reload the configuration.
     209 ECB             :          */
     210 GIC           2 :         if (ConfigReloadPending)
     211 ECB             :         {
     212 CBC           1 :             ConfigReloadPending = false;
     213 GIC           1 :             ProcessConfigFile(PGC_SIGHUP);
     214                 :         }
     215                 : 
     216                 :         /*
     217                 :          * Start a transaction on which we can run queries.  Note that each
     218                 :          * StartTransactionCommand() call should be preceded by a
     219                 :          * SetCurrentStatementStartTimestamp() call, which sets both the time
     220                 :          * for the statement we're about the run, and also the transaction
     221                 :          * start time.  Also, each other query sent to SPI should probably be
     222                 :          * preceded by SetCurrentStatementStartTimestamp(), so that statement
     223                 :          * start time is always up to date.
     224                 :          *
     225                 :          * The SPI_connect() call lets us run queries through the SPI manager,
     226                 :          * and the PushActiveSnapshot() call creates an "active" snapshot
     227                 :          * which is necessary for queries to have MVCC data to work on.
     228                 :          *
     229                 :          * The pgstat_report_activity() call makes our activity visible
     230                 :          * through the pgstat views.
     231 ECB             :          */
     232 CBC           2 :         SetCurrentStatementStartTimestamp();
     233               2 :         StartTransactionCommand();
     234               2 :         SPI_connect();
     235               2 :         PushActiveSnapshot(GetTransactionSnapshot());
     236               2 :         debug_query_string = buf.data;
     237 GIC           2 :         pgstat_report_activity(STATE_RUNNING, buf.data);
     238                 : 
     239 ECB             :         /* We can now execute queries via SPI */
     240 GIC           2 :         ret = SPI_execute(buf.data, false, 0);
     241 ECB             : 
     242 GBC           2 :         if (ret != SPI_OK_UPDATE_RETURNING)
     243 UIC           0 :             elog(FATAL, "cannot select from table %s.%s: error code %d",
     244                 :                  table->schema, table->name, ret);
     245 ECB             : 
     246 GIC           2 :         if (SPI_processed > 0)
     247                 :         {
     248                 :             bool        isnull;
     249                 :             int32       val;
     250 ECB             : 
     251 CBC           1 :             val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
     252 GIC           1 :                                               SPI_tuptable->tupdesc,
     253 ECB             :                                               1, &isnull));
     254 CBC           1 :             if (!isnull)
     255 GIC           1 :                 elog(LOG, "%s: count in %s.%s is now %d",
     256                 :                      MyBgworkerEntry->bgw_name,
     257                 :                      table->schema, table->name, val);
     258                 :         }
     259                 : 
     260                 :         /*
     261                 :          * And finish our transaction.
     262 ECB             :          */
     263 CBC           2 :         SPI_finish();
     264               2 :         PopActiveSnapshot();
     265               2 :         CommitTransactionCommand();
     266               2 :         debug_query_string = NULL;
     267               2 :         pgstat_report_stat(true);
     268 GIC           2 :         pgstat_report_activity(STATE_IDLE, NULL);
     269                 :     }
     270                 : 
     271                 :     /* Not reachable */
     272                 : }
     273                 : 
     274                 : /*
     275                 :  * Entrypoint of this module.
     276                 :  *
     277                 :  * We register more than one worker process here, to demonstrate how that can
     278                 :  * be done.
     279                 :  */
     280 ECB             : void
     281 GIC           1 : _PG_init(void)
     282                 : {
     283                 :     BackgroundWorker worker;
     284                 : 
     285 ECB             :     /* get the configuration */
     286 GIC           1 :     DefineCustomIntVariable("worker_spi.naptime",
     287                 :                             "Duration between each check (in seconds).",
     288                 :                             NULL,
     289                 :                             &worker_spi_naptime,
     290                 :                             10,
     291                 :                             1,
     292                 :                             INT_MAX,
     293                 :                             PGC_SIGHUP,
     294                 :                             0,
     295                 :                             NULL,
     296                 :                             NULL,
     297                 :                             NULL);
     298 ECB             : 
     299 GBC           1 :     if (!process_shared_preload_libraries_in_progress)
     300 UIC           0 :         return;
     301 ECB             : 
     302 GIC           1 :     DefineCustomIntVariable("worker_spi.total_workers",
     303                 :                             "Number of workers.",
     304                 :                             NULL,
     305                 :                             &worker_spi_total_workers,
     306                 :                             2,
     307                 :                             1,
     308                 :                             100,
     309                 :                             PGC_POSTMASTER,
     310                 :                             0,
     311                 :                             NULL,
     312                 :                             NULL,
     313                 :                             NULL);
     314 ECB             : 
     315 GIC           1 :     DefineCustomStringVariable("worker_spi.database",
     316                 :                                "Database to connect to.",
     317                 :                                NULL,
     318                 :                                &worker_spi_database,
     319                 :                                "postgres",
     320                 :                                PGC_POSTMASTER,
     321                 :                                0,
     322                 :                                NULL, NULL, NULL);
     323 ECB             : 
     324 GIC           1 :     MarkGUCPrefixReserved("worker_spi");
     325                 : 
     326 ECB             :     /* set up common data for all our workers */
     327 CBC           1 :     memset(&worker, 0, sizeof(worker));
     328 GIC           1 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     329 ECB             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     330 CBC           1 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     331               1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     332               1 :     sprintf(worker.bgw_library_name, "worker_spi");
     333               1 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     334 GIC           1 :     worker.bgw_notify_pid = 0;
     335                 : 
     336                 :     /*
     337                 :      * Now fill in worker-specific data, and do the actual registrations.
     338 ECB             :      */
     339 GIC           3 :     for (int i = 1; i <= worker_spi_total_workers; i++)
     340 ECB             :     {
     341 CBC           2 :         snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     342               2 :         snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     343 GIC           2 :         worker.bgw_main_arg = Int32GetDatum(i);
     344 ECB             : 
     345 GIC           2 :         RegisterBackgroundWorker(&worker);
     346                 :     }
     347                 : }
     348                 : 
     349                 : /*
     350                 :  * Dynamically launch an SPI worker.
     351                 :  */
     352 ECB             : Datum
     353 GIC           1 : worker_spi_launch(PG_FUNCTION_ARGS)
     354 ECB             : {
     355 GIC           1 :     int32       i = PG_GETARG_INT32(0);
     356                 :     BackgroundWorker worker;
     357                 :     BackgroundWorkerHandle *handle;
     358                 :     BgwHandleStatus status;
     359                 :     pid_t       pid;
     360 ECB             : 
     361 CBC           1 :     memset(&worker, 0, sizeof(worker));
     362 GIC           1 :     worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
     363 ECB             :         BGWORKER_BACKEND_DATABASE_CONNECTION;
     364 CBC           1 :     worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
     365               1 :     worker.bgw_restart_time = BGW_NEVER_RESTART;
     366               1 :     sprintf(worker.bgw_library_name, "worker_spi");
     367               1 :     sprintf(worker.bgw_function_name, "worker_spi_main");
     368               1 :     snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
     369               1 :     snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
     370 GIC           1 :     worker.bgw_main_arg = Int32GetDatum(i);
     371 ECB             :     /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
     372 GIC           1 :     worker.bgw_notify_pid = MyProcPid;
     373 ECB             : 
     374 GBC           1 :     if (!RegisterDynamicBackgroundWorker(&worker, &handle))
     375 UIC           0 :         PG_RETURN_NULL();
     376 ECB             : 
     377 GIC           1 :     status = WaitForBackgroundWorkerStartup(handle, &pid);
     378 ECB             : 
     379 GBC           1 :     if (status == BGWH_STOPPED)
     380 UIC           0 :         ereport(ERROR,
     381                 :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     382                 :                  errmsg("could not start background process"),
     383 ECB             :                  errhint("More details may be available in the server log.")));
     384 GBC           1 :     if (status == BGWH_POSTMASTER_DIED)
     385 UIC           0 :         ereport(ERROR,
     386                 :                 (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
     387                 :                  errmsg("cannot start background processes without postmaster"),
     388 ECB             :                  errhint("Kill all remaining database processes and restart the database.")));
     389 GIC           1 :     Assert(status == BGWH_STARTED);
     390 ECB             : 
     391 GIC           1 :     PG_RETURN_INT32(pid);
     392                 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a