Age Owner TLA Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * worker_spi.c
4 : * Sample background worker code that demonstrates various coding
5 : * patterns: establishing a database connection; starting and committing
6 : * transactions; using GUC variables, and heeding SIGHUP to reread
7 : * the configuration file; reporting to pg_stat_activity; using the
8 : * process latch to sleep and exit in case of postmaster death.
9 : *
10 : * This code connects to a database, creates a schema and table, and summarizes
11 : * the numbers contained therein. To see it working, insert an initial value
12 : * with "total" type and some initial value; then insert some other rows with
13 : * "delta" type. Delta rows will be deleted by this worker and their values
14 : * aggregated into the total.
15 : *
16 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
17 : *
18 : * IDENTIFICATION
19 : * src/test/modules/worker_spi/worker_spi.c
20 : *
21 : * -------------------------------------------------------------------------
22 : */
23 : #include "postgres.h"
24 :
25 : /* These are always necessary for a bgworker */
26 : #include "miscadmin.h"
27 : #include "postmaster/bgworker.h"
28 : #include "postmaster/interrupt.h"
29 : #include "storage/ipc.h"
30 : #include "storage/latch.h"
31 : #include "storage/lwlock.h"
32 : #include "storage/proc.h"
33 : #include "storage/shmem.h"
34 :
35 : /* these headers are used by this particular worker's code */
36 : #include "access/xact.h"
37 : #include "executor/spi.h"
38 : #include "fmgr.h"
39 : #include "lib/stringinfo.h"
40 : #include "pgstat.h"
41 : #include "utils/builtins.h"
42 : #include "utils/snapmgr.h"
43 : #include "tcop/utility.h"
44 :
3776 alvherre 45 CBC 1 : PG_MODULE_MAGIC;
46 :
3554 rhaas 47 2 : PG_FUNCTION_INFO_V1(worker_spi_launch);
48 :
49 : PGDLLEXPORT void worker_spi_main(Datum) pg_attribute_noreturn();
50 :
51 : /* GUC variables */
52 : static int worker_spi_naptime = 10;
53 : static int worker_spi_total_workers = 2;
54 : static char *worker_spi_database = NULL;
55 :
56 :
/*
 * Names of the schema and table a worker operates on.
 *
 * worker_spi_main() first fills these with raw (unquoted) names and, once
 * the schema has been initialized, replaces them with the results of
 * quote_identifier().
 */
typedef struct worktable
{
	const char *schema;			/* schema name */
	const char *name;			/* table name within that schema */
} worktable;
62 :
63 : /*
64 : * Initialize workspace for a worker process: create the schema if it doesn't
65 : * already exist.
66 : */
3776 alvherre 67 ECB : static void
3776 alvherre 68 GIC 1 : initialize_worker_spi(worktable *table)
69 : {
70 : int ret;
71 : int ntup;
72 : bool isnull;
73 : StringInfoData buf;
3776 alvherre 74 ECB :
3651 alvherre 75 CBC 1 : SetCurrentStatementStartTimestamp();
3776 76 1 : StartTransactionCommand();
77 1 : SPI_connect();
78 1 : PushActiveSnapshot(GetTransactionSnapshot());
2047 peter_e 79 GIC 1 : pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
80 :
3651 alvherre 81 ECB : /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
3776 alvherre 82 CBC 1 : initStringInfo(&buf);
3776 alvherre 83 GIC 1 : appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
84 : table->schema);
3776 alvherre 85 ECB :
890 noah 86 CBC 1 : debug_query_string = buf.data;
3776 alvherre 87 1 : ret = SPI_execute(buf.data, true, 0);
3776 alvherre 88 GBC 1 : if (ret != SPI_OK_SELECT)
3776 alvherre 89 UIC 0 : elog(FATAL, "SPI_execute failed: error code %d", ret);
3776 alvherre 90 ECB :
3776 alvherre 91 GBC 1 : if (SPI_processed != 1)
3776 alvherre 92 UIC 0 : elog(FATAL, "not a singleton result");
3776 alvherre 93 ECB :
3505 stark 94 CBC 1 : ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
3776 alvherre 95 GIC 1 : SPI_tuptable->tupdesc,
3776 alvherre 96 ECB : 1, &isnull));
3776 alvherre 97 GBC 1 : if (isnull)
3776 alvherre 98 UIC 0 : elog(FATAL, "null result");
3776 alvherre 99 ECB :
3776 alvherre 100 GIC 1 : if (ntup == 0)
3776 alvherre 101 ECB : {
890 noah 102 CBC 1 : debug_query_string = NULL;
3776 alvherre 103 1 : resetStringInfo(&buf);
3776 alvherre 104 GIC 1 : appendStringInfo(&buf,
105 : "CREATE SCHEMA \"%s\" "
106 : "CREATE TABLE \"%s\" ("
107 : " type text CHECK (type IN ('total', 'delta')), "
108 : " value integer)"
109 : "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
110 : "WHERE type = 'total'",
111 : table->schema, table->name, table->name, table->name);
112 :
3651 alvherre 113 ECB : /* set statement start time */
3651 alvherre 114 GIC 1 : SetCurrentStatementStartTimestamp();
3651 alvherre 115 ECB :
890 noah 116 CBC 1 : debug_query_string = buf.data;
3776 alvherre 117 GIC 1 : ret = SPI_execute(buf.data, false, 0);
3776 alvherre 118 ECB :
3776 alvherre 119 GBC 1 : if (ret != SPI_OK_UTILITY)
3776 alvherre 120 UIC 0 : elog(FATAL, "failed to create my schema");
890 noah 121 ECB :
890 noah 122 GIC 1 : debug_query_string = NULL; /* rest is not statement-specific */
123 : }
3776 alvherre 124 ECB :
3776 alvherre 125 CBC 1 : SPI_finish();
126 1 : PopActiveSnapshot();
127 1 : CommitTransactionCommand();
890 noah 128 1 : debug_query_string = NULL;
3651 alvherre 129 1 : pgstat_report_activity(STATE_IDLE, NULL);
3776 alvherre 130 GIC 1 : }
131 :
3554 rhaas 132 ECB : void
3554 rhaas 133 GIC 3 : worker_spi_main(Datum main_arg)
3776 alvherre 134 ECB : {
3554 rhaas 135 GIC 3 : int index = DatumGetInt32(main_arg);
136 : worktable *table;
137 : StringInfoData buf;
138 : char name[20];
3554 rhaas 139 ECB :
3554 rhaas 140 CBC 3 : table = palloc(sizeof(worktable));
141 3 : sprintf(name, "schema%d", index);
142 3 : table->schema = pstrdup(name);
3554 rhaas 143 GIC 3 : table->name = pstrdup("counted");
144 :
3554 rhaas 145 ECB : /* Establish signal handlers before unblocking signals. */
863 fujii 146 CBC 3 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
863 fujii 147 GIC 3 : pqsignal(SIGTERM, die);
148 :
3776 alvherre 149 ECB : /* We're now ready to receive signals */
3776 alvherre 150 GIC 3 : BackgroundWorkerUnblockSignals();
151 :
3776 alvherre 152 ECB : /* Connect to our database */
1407 alvherre 153 GIC 3 : BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
3776 alvherre 154 ECB :
3776 alvherre 155 GIC 1 : elog(LOG, "%s initialized with %s.%s",
3776 alvherre 156 ECB : MyBgworkerEntry->bgw_name, table->schema, table->name);
3776 alvherre 157 GIC 1 : initialize_worker_spi(table);
158 :
159 : /*
160 : * Quote identifiers passed to us. Note that this must be done after
161 : * initialize_worker_spi, because that routine assumes the names are not
162 : * quoted.
163 : *
164 : * Note some memory might be leaked here.
3776 alvherre 165 ECB : */
3776 alvherre 166 CBC 1 : table->schema = quote_identifier(table->schema);
3776 alvherre 167 GIC 1 : table->name = quote_identifier(table->name);
3776 alvherre 168 ECB :
3776 alvherre 169 CBC 1 : initStringInfo(&buf);
3776 alvherre 170 GIC 1 : appendStringInfo(&buf,
171 : "WITH deleted AS (DELETE "
172 : "FROM %s.%s "
173 : "WHERE type = 'delta' RETURNING value), "
174 : "total AS (SELECT coalesce(sum(value), 0) as sum "
175 : "FROM deleted) "
176 : "UPDATE %s.%s "
177 : "SET value = %s.value + total.sum "
178 : "FROM total WHERE type = 'total' "
179 : "RETURNING %s.value",
180 : table->schema, table->name,
181 : table->schema, table->name,
182 : table->name,
183 : table->name);
184 :
185 : /*
186 : * Main loop: do this until SIGTERM is received and processed by
187 : * ProcessInterrupts.
188 : */
863 fujii 189 ECB : for (;;)
3776 alvherre 190 GIC 2 : {
191 : int ret;
192 :
193 : /*
194 : * Background workers mustn't call usleep() or any direct equivalent:
195 : * instead, they may wait on their process latch, which sleeps as
196 : * necessary, but is awakened if postmaster dies. That way the
197 : * background process goes away immediately in an emergency.
3776 alvherre 198 ECB : */
1598 alvherre 199 GIC 3 : (void) WaitLatch(MyLatch,
200 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
201 : worker_spi_naptime * 1000L,
1598 alvherre 202 ECB : PG_WAIT_EXTENSION);
3007 andres 203 GIC 3 : ResetLatch(MyLatch);
3776 alvherre 204 ECB :
2133 andres 205 GIC 3 : CHECK_FOR_INTERRUPTS();
206 :
207 : /*
208 : * In case of a SIGHUP, just reload the configuration.
3651 alvherre 209 ECB : */
863 fujii 210 GIC 2 : if (ConfigReloadPending)
3602 bruce 211 ECB : {
863 fujii 212 CBC 1 : ConfigReloadPending = false;
3602 bruce 213 GIC 1 : ProcessConfigFile(PGC_SIGHUP);
214 : }
215 :
216 : /*
217 : * Start a transaction on which we can run queries. Note that each
218 : * StartTransactionCommand() call should be preceded by a
219 : * SetCurrentStatementStartTimestamp() call, which sets both the time
220 : * for the statement we're about the run, and also the transaction
221 : * start time. Also, each other query sent to SPI should probably be
222 : * preceded by SetCurrentStatementStartTimestamp(), so that statement
223 : * start time is always up to date.
224 : *
225 : * The SPI_connect() call lets us run queries through the SPI manager,
226 : * and the PushActiveSnapshot() call creates an "active" snapshot
227 : * which is necessary for queries to have MVCC data to work on.
228 : *
229 : * The pgstat_report_activity() call makes our activity visible
230 : * through the pgstat views.
3651 alvherre 231 ECB : */
3651 alvherre 232 CBC 2 : SetCurrentStatementStartTimestamp();
3776 233 2 : StartTransactionCommand();
234 2 : SPI_connect();
235 2 : PushActiveSnapshot(GetTransactionSnapshot());
890 noah 236 2 : debug_query_string = buf.data;
3651 alvherre 237 GIC 2 : pgstat_report_activity(STATE_RUNNING, buf.data);
238 :
3651 alvherre 239 ECB : /* We can now execute queries via SPI */
3776 alvherre 240 GIC 2 : ret = SPI_execute(buf.data, false, 0);
3776 alvherre 241 ECB :
3776 alvherre 242 GBC 2 : if (ret != SPI_OK_UPDATE_RETURNING)
3776 alvherre 243 UIC 0 : elog(FATAL, "cannot select from table %s.%s: error code %d",
244 : table->schema, table->name, ret);
3776 alvherre 245 ECB :
3776 alvherre 246 GIC 2 : if (SPI_processed > 0)
247 : {
248 : bool isnull;
249 : int32 val;
3776 alvherre 250 ECB :
3776 alvherre 251 CBC 1 : val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
3602 bruce 252 GIC 1 : SPI_tuptable->tupdesc,
3602 bruce 253 ECB : 1, &isnull));
3776 alvherre 254 CBC 1 : if (!isnull)
3776 alvherre 255 GIC 1 : elog(LOG, "%s: count in %s.%s is now %d",
256 : MyBgworkerEntry->bgw_name,
257 : table->schema, table->name, val);
258 : }
259 :
260 : /*
261 : * And finish our transaction.
3651 alvherre 262 ECB : */
3776 alvherre 263 CBC 2 : SPI_finish();
264 2 : PopActiveSnapshot();
265 2 : CommitTransactionCommand();
890 noah 266 2 : debug_query_string = NULL;
368 andres 267 2 : pgstat_report_stat(true);
3651 alvherre 268 GIC 2 : pgstat_report_activity(STATE_IDLE, NULL);
269 : }
270 :
271 : /* Not reachable */
272 : }
273 :
274 : /*
275 : * Entrypoint of this module.
276 : *
277 : * We register more than one worker process here, to demonstrate how that can
278 : * be done.
279 : */
3776 alvherre 280 ECB : void
3776 alvherre 281 GIC 1 : _PG_init(void)
282 : {
283 : BackgroundWorker worker;
284 :
3651 alvherre 285 ECB : /* get the configuration */
3651 alvherre 286 GIC 1 : DefineCustomIntVariable("worker_spi.naptime",
287 : "Duration between each check (in seconds).",
288 : NULL,
289 : &worker_spi_naptime,
290 : 10,
291 : 1,
292 : INT_MAX,
293 : PGC_SIGHUP,
294 : 0,
295 : NULL,
296 : NULL,
297 : NULL);
3554 rhaas 298 ECB :
3554 rhaas 299 GBC 1 : if (!process_shared_preload_libraries_in_progress)
3554 rhaas 300 UIC 0 : return;
3554 rhaas 301 ECB :
3651 alvherre 302 GIC 1 : DefineCustomIntVariable("worker_spi.total_workers",
303 : "Number of workers.",
304 : NULL,
305 : &worker_spi_total_workers,
306 : 2,
307 : 1,
308 : 100,
309 : PGC_POSTMASTER,
310 : 0,
311 : NULL,
312 : NULL,
313 : NULL);
3651 alvherre 314 ECB :
1407 alvherre 315 GIC 1 : DefineCustomStringVariable("worker_spi.database",
316 : "Database to connect to.",
317 : NULL,
318 : &worker_spi_database,
319 : "postgres",
320 : PGC_POSTMASTER,
321 : 0,
322 : NULL, NULL, NULL);
1407 alvherre 323 ECB :
412 tgl 324 GIC 1 : MarkGUCPrefixReserved("worker_spi");
325 :
3651 alvherre 326 ECB : /* set up common data for all our workers */
2184 tgl 327 CBC 1 : memset(&worker, 0, sizeof(worker));
3776 alvherre 328 GIC 1 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
3776 alvherre 329 ECB : BGWORKER_BACKEND_DATABASE_CONNECTION;
3776 alvherre 330 CBC 1 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
3651 331 1 : worker.bgw_restart_time = BGW_NEVER_RESTART;
2200 rhaas 332 1 : sprintf(worker.bgw_library_name, "worker_spi");
333 1 : sprintf(worker.bgw_function_name, "worker_spi_main");
3240 rhaas 334 GIC 1 : worker.bgw_notify_pid = 0;
335 :
336 : /*
337 : * Now fill in worker-specific data, and do the actual registrations.
3776 alvherre 338 ECB : */
550 peter 339 GIC 3 : for (int i = 1; i <= worker_spi_total_workers; i++)
3651 alvherre 340 ECB : {
2047 peter_e 341 CBC 2 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
342 2 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
3554 rhaas 343 GIC 2 : worker.bgw_main_arg = Int32GetDatum(i);
3651 alvherre 344 ECB :
3651 alvherre 345 GIC 2 : RegisterBackgroundWorker(&worker);
346 : }
347 : }
348 :
349 : /*
350 : * Dynamically launch an SPI worker.
351 : */
3554 rhaas 352 ECB : Datum
3554 rhaas 353 GIC 1 : worker_spi_launch(PG_FUNCTION_ARGS)
3554 rhaas 354 ECB : {
3554 rhaas 355 GIC 1 : int32 i = PG_GETARG_INT32(0);
356 : BackgroundWorker worker;
357 : BackgroundWorkerHandle *handle;
358 : BgwHandleStatus status;
359 : pid_t pid;
3554 rhaas 360 ECB :
2184 tgl 361 CBC 1 : memset(&worker, 0, sizeof(worker));
3554 rhaas 362 GIC 1 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
3554 rhaas 363 ECB : BGWORKER_BACKEND_DATABASE_CONNECTION;
3554 rhaas 364 CBC 1 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
365 1 : worker.bgw_restart_time = BGW_NEVER_RESTART;
366 1 : sprintf(worker.bgw_library_name, "worker_spi");
367 1 : sprintf(worker.bgw_function_name, "worker_spi_main");
2047 peter_e 368 1 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
369 1 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
3554 rhaas 370 GIC 1 : worker.bgw_main_arg = Int32GetDatum(i);
3511 rhaas 371 ECB : /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
3511 rhaas 372 GIC 1 : worker.bgw_notify_pid = MyProcPid;
3511 rhaas 373 ECB :
3511 rhaas 374 GBC 1 : if (!RegisterDynamicBackgroundWorker(&worker, &handle))
3511 rhaas 375 UIC 0 : PG_RETURN_NULL();
3511 rhaas 376 ECB :
3511 rhaas 377 GIC 1 : status = WaitForBackgroundWorkerStartup(handle, &pid);
3511 rhaas 378 ECB :
3511 rhaas 379 GBC 1 : if (status == BGWH_STOPPED)
3511 rhaas 380 UIC 0 : ereport(ERROR,
381 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
382 : errmsg("could not start background process"),
2118 tgl 383 ECB : errhint("More details may be available in the server log.")));
3511 rhaas 384 GBC 1 : if (status == BGWH_POSTMASTER_DIED)
3511 rhaas 385 UIC 0 : ereport(ERROR,
386 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
387 : errmsg("cannot start background processes without postmaster"),
3511 rhaas 388 ECB : errhint("Kill all remaining database processes and restart the database.")));
3511 rhaas 389 GIC 1 : Assert(status == BGWH_STARTED);
3511 rhaas 390 ECB :
3511 rhaas 391 GIC 1 : PG_RETURN_INT32(pid);
392 : }
|