TLA Line data Source code
1 : /* -------------------------------------------------------------------------
2 : *
3 : * worker_spi.c
4 : * Sample background worker code that demonstrates various coding
5 : * patterns: establishing a database connection; starting and committing
6 : * transactions; using GUC variables, and heeding SIGHUP to reread
7 : * the configuration file; reporting to pg_stat_activity; using the
8 : * process latch to sleep and exit in case of postmaster death.
9 : *
10 : * This code connects to a database, creates a schema and table, and summarizes
11 : * the numbers contained therein. To see it working, insert an initial value
12 : * with "total" type and some initial value; then insert some other rows with
13 : * "delta" type. Delta rows will be deleted by this worker and their values
14 : * aggregated into the total.
15 : *
16 : * Copyright (c) 2013-2023, PostgreSQL Global Development Group
17 : *
18 : * IDENTIFICATION
19 : * src/test/modules/worker_spi/worker_spi.c
20 : *
21 : * -------------------------------------------------------------------------
22 : */
23 : #include "postgres.h"
24 :
25 : /* These are always necessary for a bgworker */
26 : #include "miscadmin.h"
27 : #include "postmaster/bgworker.h"
28 : #include "postmaster/interrupt.h"
29 : #include "storage/ipc.h"
30 : #include "storage/latch.h"
31 : #include "storage/lwlock.h"
32 : #include "storage/proc.h"
33 : #include "storage/shmem.h"
34 :
35 : /* these headers are used by this particular worker's code */
36 : #include "access/xact.h"
37 : #include "executor/spi.h"
38 : #include "fmgr.h"
39 : #include "lib/stringinfo.h"
40 : #include "pgstat.h"
41 : #include "utils/builtins.h"
42 : #include "utils/snapmgr.h"
43 : #include "tcop/utility.h"
44 :
45 CBC 1 : PG_MODULE_MAGIC;
46 :
47 2 : PG_FUNCTION_INFO_V1(worker_spi_launch);
48 :
49 : PGDLLEXPORT void worker_spi_main(Datum) pg_attribute_noreturn();
50 :
51 : /* GUC variables */
52 : static int worker_spi_naptime = 10;
53 : static int worker_spi_total_workers = 2;
54 : static char *worker_spi_database = NULL;
55 :
56 :
57 : typedef struct worktable
58 : {
59 : const char *schema;
60 : const char *name;
61 : } worktable;
62 :
63 : /*
64 : * Initialize workspace for a worker process: create the schema if it doesn't
65 : * already exist.
66 : */
67 ECB : static void
68 GIC 1 : initialize_worker_spi(worktable *table)
69 : {
70 : int ret;
71 : int ntup;
72 : bool isnull;
73 : StringInfoData buf;
74 ECB :
75 CBC 1 : SetCurrentStatementStartTimestamp();
76 1 : StartTransactionCommand();
77 1 : SPI_connect();
78 1 : PushActiveSnapshot(GetTransactionSnapshot());
79 GIC 1 : pgstat_report_activity(STATE_RUNNING, "initializing worker_spi schema");
80 :
81 ECB : /* XXX could we use CREATE SCHEMA IF NOT EXISTS? */
82 CBC 1 : initStringInfo(&buf);
83 GIC 1 : appendStringInfo(&buf, "select count(*) from pg_namespace where nspname = '%s'",
84 : table->schema);
85 ECB :
86 CBC 1 : debug_query_string = buf.data;
87 1 : ret = SPI_execute(buf.data, true, 0);
88 GBC 1 : if (ret != SPI_OK_SELECT)
89 UIC 0 : elog(FATAL, "SPI_execute failed: error code %d", ret);
90 ECB :
91 GBC 1 : if (SPI_processed != 1)
92 UIC 0 : elog(FATAL, "not a singleton result");
93 ECB :
94 CBC 1 : ntup = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[0],
95 GIC 1 : SPI_tuptable->tupdesc,
96 ECB : 1, &isnull));
97 GBC 1 : if (isnull)
98 UIC 0 : elog(FATAL, "null result");
99 ECB :
100 GIC 1 : if (ntup == 0)
101 ECB : {
102 CBC 1 : debug_query_string = NULL;
103 1 : resetStringInfo(&buf);
104 GIC 1 : appendStringInfo(&buf,
105 : "CREATE SCHEMA \"%s\" "
106 : "CREATE TABLE \"%s\" ("
107 : " type text CHECK (type IN ('total', 'delta')), "
108 : " value integer)"
109 : "CREATE UNIQUE INDEX \"%s_unique_total\" ON \"%s\" (type) "
110 : "WHERE type = 'total'",
111 : table->schema, table->name, table->name, table->name);
112 :
113 ECB : /* set statement start time */
114 GIC 1 : SetCurrentStatementStartTimestamp();
115 ECB :
116 CBC 1 : debug_query_string = buf.data;
117 GIC 1 : ret = SPI_execute(buf.data, false, 0);
118 ECB :
119 GBC 1 : if (ret != SPI_OK_UTILITY)
120 UIC 0 : elog(FATAL, "failed to create my schema");
121 ECB :
122 GIC 1 : debug_query_string = NULL; /* rest is not statement-specific */
123 : }
124 ECB :
125 CBC 1 : SPI_finish();
126 1 : PopActiveSnapshot();
127 1 : CommitTransactionCommand();
128 1 : debug_query_string = NULL;
129 1 : pgstat_report_activity(STATE_IDLE, NULL);
130 GIC 1 : }
131 :
132 ECB : void
133 GIC 3 : worker_spi_main(Datum main_arg)
134 ECB : {
135 GIC 3 : int index = DatumGetInt32(main_arg);
136 : worktable *table;
137 : StringInfoData buf;
138 : char name[20];
139 ECB :
140 CBC 3 : table = palloc(sizeof(worktable));
141 3 : sprintf(name, "schema%d", index);
142 3 : table->schema = pstrdup(name);
143 GIC 3 : table->name = pstrdup("counted");
144 :
145 ECB : /* Establish signal handlers before unblocking signals. */
146 CBC 3 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
147 GIC 3 : pqsignal(SIGTERM, die);
148 :
149 ECB : /* We're now ready to receive signals */
150 GIC 3 : BackgroundWorkerUnblockSignals();
151 :
152 ECB : /* Connect to our database */
153 GIC 3 : BackgroundWorkerInitializeConnection(worker_spi_database, NULL, 0);
154 ECB :
155 GIC 1 : elog(LOG, "%s initialized with %s.%s",
156 ECB : MyBgworkerEntry->bgw_name, table->schema, table->name);
157 GIC 1 : initialize_worker_spi(table);
158 :
159 : /*
160 : * Quote identifiers passed to us. Note that this must be done after
161 : * initialize_worker_spi, because that routine assumes the names are not
162 : * quoted.
163 : *
164 : * Note some memory might be leaked here.
165 ECB : */
166 CBC 1 : table->schema = quote_identifier(table->schema);
167 GIC 1 : table->name = quote_identifier(table->name);
168 ECB :
169 CBC 1 : initStringInfo(&buf);
170 GIC 1 : appendStringInfo(&buf,
171 : "WITH deleted AS (DELETE "
172 : "FROM %s.%s "
173 : "WHERE type = 'delta' RETURNING value), "
174 : "total AS (SELECT coalesce(sum(value), 0) as sum "
175 : "FROM deleted) "
176 : "UPDATE %s.%s "
177 : "SET value = %s.value + total.sum "
178 : "FROM total WHERE type = 'total' "
179 : "RETURNING %s.value",
180 : table->schema, table->name,
181 : table->schema, table->name,
182 : table->name,
183 : table->name);
184 :
185 : /*
186 : * Main loop: do this until SIGTERM is received and processed by
187 : * ProcessInterrupts.
188 : */
189 ECB : for (;;)
190 GIC 2 : {
191 : int ret;
192 :
193 : /*
194 : * Background workers mustn't call usleep() or any direct equivalent:
195 : * instead, they may wait on their process latch, which sleeps as
196 : * necessary, but is awakened if postmaster dies. That way the
197 : * background process goes away immediately in an emergency.
198 ECB : */
199 GIC 3 : (void) WaitLatch(MyLatch,
200 : WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
201 : worker_spi_naptime * 1000L,
202 ECB : PG_WAIT_EXTENSION);
203 GIC 3 : ResetLatch(MyLatch);
204 ECB :
205 GIC 3 : CHECK_FOR_INTERRUPTS();
206 :
207 : /*
208 : * In case of a SIGHUP, just reload the configuration.
209 ECB : */
210 GIC 2 : if (ConfigReloadPending)
211 ECB : {
212 CBC 1 : ConfigReloadPending = false;
213 GIC 1 : ProcessConfigFile(PGC_SIGHUP);
214 : }
215 :
216 : /*
217 : * Start a transaction on which we can run queries. Note that each
218 : * StartTransactionCommand() call should be preceded by a
219 : * SetCurrentStatementStartTimestamp() call, which sets both the time
220 : * for the statement we're about the run, and also the transaction
221 : * start time. Also, each other query sent to SPI should probably be
222 : * preceded by SetCurrentStatementStartTimestamp(), so that statement
223 : * start time is always up to date.
224 : *
225 : * The SPI_connect() call lets us run queries through the SPI manager,
226 : * and the PushActiveSnapshot() call creates an "active" snapshot
227 : * which is necessary for queries to have MVCC data to work on.
228 : *
229 : * The pgstat_report_activity() call makes our activity visible
230 : * through the pgstat views.
231 ECB : */
232 CBC 2 : SetCurrentStatementStartTimestamp();
233 2 : StartTransactionCommand();
234 2 : SPI_connect();
235 2 : PushActiveSnapshot(GetTransactionSnapshot());
236 2 : debug_query_string = buf.data;
237 GIC 2 : pgstat_report_activity(STATE_RUNNING, buf.data);
238 :
239 ECB : /* We can now execute queries via SPI */
240 GIC 2 : ret = SPI_execute(buf.data, false, 0);
241 ECB :
242 GBC 2 : if (ret != SPI_OK_UPDATE_RETURNING)
243 UIC 0 : elog(FATAL, "cannot select from table %s.%s: error code %d",
244 : table->schema, table->name, ret);
245 ECB :
246 GIC 2 : if (SPI_processed > 0)
247 : {
248 : bool isnull;
249 : int32 val;
250 ECB :
251 CBC 1 : val = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
252 GIC 1 : SPI_tuptable->tupdesc,
253 ECB : 1, &isnull));
254 CBC 1 : if (!isnull)
255 GIC 1 : elog(LOG, "%s: count in %s.%s is now %d",
256 : MyBgworkerEntry->bgw_name,
257 : table->schema, table->name, val);
258 : }
259 :
260 : /*
261 : * And finish our transaction.
262 ECB : */
263 CBC 2 : SPI_finish();
264 2 : PopActiveSnapshot();
265 2 : CommitTransactionCommand();
266 2 : debug_query_string = NULL;
267 2 : pgstat_report_stat(true);
268 GIC 2 : pgstat_report_activity(STATE_IDLE, NULL);
269 : }
270 :
271 : /* Not reachable */
272 : }
273 :
274 : /*
275 : * Entrypoint of this module.
276 : *
277 : * We register more than one worker process here, to demonstrate how that can
278 : * be done.
279 : */
280 ECB : void
281 GIC 1 : _PG_init(void)
282 : {
283 : BackgroundWorker worker;
284 :
285 ECB : /* get the configuration */
286 GIC 1 : DefineCustomIntVariable("worker_spi.naptime",
287 : "Duration between each check (in seconds).",
288 : NULL,
289 : &worker_spi_naptime,
290 : 10,
291 : 1,
292 : INT_MAX,
293 : PGC_SIGHUP,
294 : 0,
295 : NULL,
296 : NULL,
297 : NULL);
298 ECB :
299 GBC 1 : if (!process_shared_preload_libraries_in_progress)
300 UIC 0 : return;
301 ECB :
302 GIC 1 : DefineCustomIntVariable("worker_spi.total_workers",
303 : "Number of workers.",
304 : NULL,
305 : &worker_spi_total_workers,
306 : 2,
307 : 1,
308 : 100,
309 : PGC_POSTMASTER,
310 : 0,
311 : NULL,
312 : NULL,
313 : NULL);
314 ECB :
315 GIC 1 : DefineCustomStringVariable("worker_spi.database",
316 : "Database to connect to.",
317 : NULL,
318 : &worker_spi_database,
319 : "postgres",
320 : PGC_POSTMASTER,
321 : 0,
322 : NULL, NULL, NULL);
323 ECB :
324 GIC 1 : MarkGUCPrefixReserved("worker_spi");
325 :
326 ECB : /* set up common data for all our workers */
327 CBC 1 : memset(&worker, 0, sizeof(worker));
328 GIC 1 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
329 ECB : BGWORKER_BACKEND_DATABASE_CONNECTION;
330 CBC 1 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
331 1 : worker.bgw_restart_time = BGW_NEVER_RESTART;
332 1 : sprintf(worker.bgw_library_name, "worker_spi");
333 1 : sprintf(worker.bgw_function_name, "worker_spi_main");
334 GIC 1 : worker.bgw_notify_pid = 0;
335 :
336 : /*
337 : * Now fill in worker-specific data, and do the actual registrations.
338 ECB : */
339 GIC 3 : for (int i = 1; i <= worker_spi_total_workers; i++)
340 ECB : {
341 CBC 2 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
342 2 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
343 GIC 2 : worker.bgw_main_arg = Int32GetDatum(i);
344 ECB :
345 GIC 2 : RegisterBackgroundWorker(&worker);
346 : }
347 : }
348 :
349 : /*
350 : * Dynamically launch an SPI worker.
351 : */
352 ECB : Datum
353 GIC 1 : worker_spi_launch(PG_FUNCTION_ARGS)
354 ECB : {
355 GIC 1 : int32 i = PG_GETARG_INT32(0);
356 : BackgroundWorker worker;
357 : BackgroundWorkerHandle *handle;
358 : BgwHandleStatus status;
359 : pid_t pid;
360 ECB :
361 CBC 1 : memset(&worker, 0, sizeof(worker));
362 GIC 1 : worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
363 ECB : BGWORKER_BACKEND_DATABASE_CONNECTION;
364 CBC 1 : worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
365 1 : worker.bgw_restart_time = BGW_NEVER_RESTART;
366 1 : sprintf(worker.bgw_library_name, "worker_spi");
367 1 : sprintf(worker.bgw_function_name, "worker_spi_main");
368 1 : snprintf(worker.bgw_name, BGW_MAXLEN, "worker_spi worker %d", i);
369 1 : snprintf(worker.bgw_type, BGW_MAXLEN, "worker_spi");
370 GIC 1 : worker.bgw_main_arg = Int32GetDatum(i);
371 ECB : /* set bgw_notify_pid so that we can use WaitForBackgroundWorkerStartup */
372 GIC 1 : worker.bgw_notify_pid = MyProcPid;
373 ECB :
374 GBC 1 : if (!RegisterDynamicBackgroundWorker(&worker, &handle))
375 UIC 0 : PG_RETURN_NULL();
376 ECB :
377 GIC 1 : status = WaitForBackgroundWorkerStartup(handle, &pid);
378 ECB :
379 GBC 1 : if (status == BGWH_STOPPED)
380 UIC 0 : ereport(ERROR,
381 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
382 : errmsg("could not start background process"),
383 ECB : errhint("More details may be available in the server log.")));
384 GBC 1 : if (status == BGWH_POSTMASTER_DIED)
385 UIC 0 : ereport(ERROR,
386 : (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
387 : errmsg("cannot start background processes without postmaster"),
388 ECB : errhint("Kill all remaining database processes and restart the database.")));
389 GIC 1 : Assert(status == BGWH_STARTED);
390 ECB :
391 GIC 1 : PG_RETURN_INT32(pid);
392 : }
|