Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.h
4 : * Exports from replication/walreceiver.c and replication/walreceiverfuncs.c, plus the WAL receiver (libpqwalreceiver) hook interface.
5 : *
6 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
7 : *
8 : * src/include/replication/walreceiver.h
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #ifndef _WALRECEIVER_H
13 : #define _WALRECEIVER_H
14 :
15 : #include <netdb.h>
16 : #include <sys/socket.h>
17 :
18 : #include "access/xlog.h"
19 : #include "access/xlogdefs.h"
20 : #include "pgtime.h"
21 : #include "port/atomics.h"
22 : #include "replication/logicalproto.h"
23 : #include "replication/walsender.h"
24 : #include "storage/condition_variable.h"
25 : #include "storage/latch.h"
26 : #include "storage/spin.h"
27 : #include "utils/tuplestore.h"
28 :
/*
 * User-settable parameters consulted by the walreceiver.
 *
 * NOTE(review): these are only declared here; their definitions, defaults
 * and units live elsewhere (presumably walreceiver.c and the GUC tables) --
 * confirm there before relying on specific values.
 */
extern PGDLLIMPORT int wal_receiver_status_interval;
extern PGDLLIMPORT int wal_receiver_timeout;
extern PGDLLIMPORT bool hot_standby_feedback;
33 :
/*
 * MAXCONNINFO: maximum size of a connection string.
 *
 * This is the size of the conninfo buffer kept in the shared WalRcvData
 * struct, so any connection string stored there must fit in it.
 *
 * XXX: Should this move to pg_config_manual.h?
 */
#define MAXCONNINFO		1024

/*
 * Can we allow the standby to accept replication connections from another
 * standby?  True only when EnableHotStandby is set and max_wal_senders is
 * greater than zero.
 */
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
43 :
/*
 * Lifecycle states of the walreceiver, as stored in WalRcv->walRcvState.
 *
 * The numeric values are written out explicitly; they are exactly the
 * implicit 0-based numbering that the declaration order would produce.
 */
typedef enum
{
	WALRCV_STOPPED = 0,			/* stopped and mustn't start up again */
	WALRCV_STARTING = 1,		/* launched, but the process hasn't
								 * initialized yet */
	WALRCV_STREAMING = 2,		/* walreceiver is streaming */
	WALRCV_WAITING = 3,			/* stopped streaming, waiting for orders */
	WALRCV_RESTARTING = 4,		/* asked to restart streaming */
	WALRCV_STOPPING = 5			/* requested to stop, but still running */
} WalRcvState;
57 :
/*
 * Shared memory area for management of walreceiver process.
 *
 * Reached through the WalRcv pointer declared right after the struct.
 * Fields declared before 'mutex' are protected by that spinlock.  The two
 * fields after it (writtenUpto, force_reply) are accessed without the lock,
 * as explained at each of them.
 */
typedef struct
{
	/*
	 * PID of currently active walreceiver process, its current state and
	 * start time (actually, the time at which it was requested to be
	 * started).
	 *
	 * NOTE(review): walRcvStoppedCV is presumably signalled when walRcvState
	 * reaches WALRCV_STOPPED, so waiters can sleep instead of polling --
	 * confirm in walreceiver.c / walreceiverfuncs.c.
	 */
	pid_t		pid;
	WalRcvState walRcvState;
	ConditionVariable walRcvStoppedCV;
	pg_time_t	startTime;

	/*
	 * receiveStart and receiveStartTLI indicate the first byte position and
	 * timeline that will be received. When startup process starts the
	 * walreceiver, it sets these to the point where it wants the streaming to
	 * begin.
	 */
	XLogRecPtr	receiveStart;
	TimeLineID	receiveStartTLI;

	/*
	 * flushedUpto-1 is the last byte position that has already been received,
	 * and receivedTLI is the timeline it came from. At the first startup of
	 * walreceiver, these are set to receiveStart and receiveStartTLI. After
	 * that, walreceiver updates these whenever it flushes the received WAL to
	 * disk.
	 */
	XLogRecPtr	flushedUpto;
	TimeLineID	receivedTLI;

	/*
	 * latestChunkStart is the starting byte position of the current "batch"
	 * of received WAL. It's actually the same as the previous value of
	 * flushedUpto before the last flush to disk. Startup process can use
	 * this to detect whether it's keeping up or not.
	 */
	XLogRecPtr	latestChunkStart;

	/*
	 * Send time (as stamped by the sender) and local receipt time of a
	 * received message -- presumably the most recent one.
	 */
	TimestampTz lastMsgSendTime;
	TimestampTz lastMsgReceiptTime;

	/*
	 * Latest reported end of WAL on the sender, and its associated
	 * timestamp.
	 */
	XLogRecPtr	latestWalEnd;
	TimestampTz latestWalEndTime;

	/*
	 * connection string; initially set to connect to the primary, and later
	 * clobbered to hide security-sensitive fields.  Bounded by MAXCONNINFO.
	 */
	char		conninfo[MAXCONNINFO];

	/*
	 * Host name (this can be a host name, an IP address, or a directory path)
	 * and port number of the active replication connection.
	 */
	char		sender_host[NI_MAXHOST];
	int			sender_port;

	/*
	 * replication slot name; is also used for walreceiver to connect with the
	 * primary
	 */
	char		slotname[NAMEDATALEN];

	/*
	 * If it's a temporary replication slot, it needs to be recreated when
	 * connecting.
	 */
	bool		is_temp_slot;

	/* set true once conninfo is ready to display (obfuscated pwds etc) */
	bool		ready_to_display;

	/*
	 * Latch used by startup process to wake up walreceiver after telling it
	 * where to start streaming (after setting receiveStart and
	 * receiveStartTLI), and also to tell it to send apply feedback to the
	 * primary whenever specially marked commit records are applied. This is
	 * normally mapped to procLatch when walreceiver is running.
	 */
	Latch	   *latch;

	slock_t		mutex;			/* locks shared variables shown above */

	/*
	 * Like flushedUpto, but advanced after writing and before flushing,
	 * without the need to acquire the spin lock. Data can be read by another
	 * process up to this point, but shouldn't be used for data integrity
	 * purposes.
	 */
	pg_atomic_uint64 writtenUpto;

	/*
	 * force walreceiver reply? This doesn't need to be locked; memory
	 * barriers for ordering are sufficient. But we do need atomic fetch and
	 * store semantics, so use sig_atomic_t.
	 */
	sig_atomic_t force_reply;	/* used as a bool */
} WalRcvData;

/* Pointer to the single shared WalRcvData instance. */
extern PGDLLIMPORT WalRcvData *WalRcv;
166 :
/*
 * Options passed to walrcv_startstreaming().
 *
 * 'logical' selects which member of the 'proto' union applies:
 * proto.physical for a physical stream, proto.logical for a logical one.
 */
typedef struct
{
	bool		logical;		/* True if this is logical replication stream,
								 * false if physical stream. */
	char	   *slotname;		/* Name of the replication slot or NULL. */
	XLogRecPtr	startpoint;		/* LSN of starting point. */

	union
	{
		struct
		{
			TimeLineID	startpointTLI;	/* Starting timeline */
		}			physical;
		struct
		{
			uint32		proto_version;	/* Logical protocol version */
			List	   *publication_names;	/* String list of publications */
			bool		binary;		/* Ask publisher to use binary */
			char	   *streaming_str;	/* Streaming of large transactions,
										 * given as a string option value */
			bool		twophase;	/* Streaming of two-phase transactions at
									 * prepare time */
			char	   *origin;		/* Only publish data originating from the
									 * specified origin */
		}			logical;
	}			proto;
} WalRcvStreamOptions;
193 :
/*
 * Opaque connection handle.  Its contents are defined privately by each
 * WAL receiver module (such as libpqwalreceiver); code using this header
 * only passes pointers to it through the walrcv_* hooks.
 */
struct WalReceiverConn;
typedef struct WalReceiverConn WalReceiverConn;
196 :
/*
 * Outcome of a query sent through walrcv_exec().
 *
 * Only the statuses that are currently needed are defined.  Values are
 * given explicitly and match the implicit declaration-order numbering.
 */
typedef enum
{
	WALRCV_ERROR = 0,			/* an error occurred while executing the
								 * query */
	WALRCV_OK_COMMAND = 1,		/* query executed a utility or replication
								 * command */
	WALRCV_OK_TUPLES = 2,		/* query returned tuples */
	WALRCV_OK_COPY_IN = 3,		/* query started COPY FROM */
	WALRCV_OK_COPY_OUT = 4,		/* query started COPY TO */
	WALRCV_OK_COPY_BOTH = 5		/* query started COPY BOTH replication
								 * protocol */
} WalRcvExecStatus;
213 :
/*
 * Return value for walrcv_exec, returns the status of the execution and
 * tuples if any.
 *
 * Release it with walrcv_clear_result(), which frees err, tuplestore and
 * tupledesc (each only when non-NULL) and then the struct itself.
 */
typedef struct WalRcvExecResult
{
	WalRcvExecStatus status;	/* outcome of the query */
	int			sqlstate;		/* NOTE(review): presumably the error's
								 * SQLSTATE encoded as an int -- confirm */
	char	   *err;			/* error message, or NULL */
	Tuplestorestate *tuplestore;	/* returned tuples, or NULL */
	TupleDesc	tupledesc;		/* descriptor of returned tuples, or NULL */
} WalRcvExecResult;
226 :
/*
 * WAL receiver - libpqwalreceiver hooks
 *
 * Each walrcv_*_fn typedef below is the signature of one entry in
 * WalReceiverFunctionsType.
 */

/*
 * walrcv_connect_fn
 *
 * Establish connection to a cluster.  'logical' is true if the
 * connection is logical, and false if the connection is physical.
 * 'must_use_password' is forwarded to the module; NOTE(review): presumably
 * it makes the connection fail unless a password was used for
 * authentication -- confirm in libpqwalreceiver.  'appname' is a name
 * associated with the connection, to use for example with
 * fallback_application_name or application_name.  Returns the details
 * about the connection established, as defined by WalReceiverConn for
 * each WAL receiver module.  On error, NULL is returned with 'err'
 * including the error generated.
 */
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
											   bool logical,
											   bool must_use_password,
											   const char *appname,
											   char **err);
245 :
/*
 * walrcv_check_conninfo_fn
 *
 * Parse and validate the connection string given as 'conninfo'.
 * 'must_use_password' has the same meaning as for walrcv_connect_fn;
 * NOTE(review): presumably validation also checks that a password will be
 * supplied when it is set -- confirm in libpqwalreceiver.  No value is
 * returned, so problems are reported some other way (presumably ereport).
 */
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
										  bool must_use_password);
253 :
/*
 * walrcv_get_conninfo_fn
 *
 * Returns a user-displayable conninfo string for 'conn'.  Note that any
 * security-sensitive fields should be obfuscated.
 */
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
261 :
/*
 * walrcv_get_senderinfo_fn
 *
 * Provide information about the WAL sender this WAL receiver is connected
 * to: the sender's host is returned through 'sender_host' and its port
 * through 'sender_port'.
 */
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
										  char **sender_host,
										  int *sender_port);
272 :
/*
 * walrcv_identify_system_fn
 *
 * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
 * identity of the cluster.  Returns the system ID of the cluster
 * connected to, as a string.  The timeline ID of the sender is returned
 * through 'primary_tli'.
 */
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
											TimeLineID *primary_tli);
282 :
/*
 * walrcv_server_version_fn
 *
 * Returns the version number of the cluster connected to.
 * NOTE(review): presumably in PostgreSQL's integer server_version_num
 * format -- confirm in libpqwalreceiver.
 */
typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
289 :
/*
 * walrcv_readtimelinehistoryfile_fn
 *
 * Fetch from the cluster the timeline history file for timeline 'tli'.
 * The file's name is returned through 'filename', its contents through
 * 'content', and the length of the contents through 'size'.
 */
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
												   TimeLineID tli,
												   char **filename,
												   char **content,
												   int *size);
302 :
/*
 * walrcv_startstreaming_fn
 *
 * Start streaming WAL data using the given streaming options.  Returns true
 * if the connection has switched successfully to copy-both mode, and false
 * if the server received the command and executed it successfully but
 * didn't switch to copy-both mode.
 */
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
										  const WalRcvStreamOptions *options);
313 :
/*
 * walrcv_endstreaming_fn
 *
 * Stop streaming of WAL data.  The next timeline ID of the cluster
 * connected to is returned through 'next_tli', which is set to 0 if the
 * sender did not report one.
 */
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
										TimeLineID *next_tli);
322 :
/*
 * walrcv_receive_fn
 *
 * Receive a message available from the WAL stream.  On return, '*buffer'
 * points to a buffer holding the message received.  Returns:
 *   > 0  the length of the data;
 *   0    no data is available yet ('*wait_fd' is then a socket descriptor
 *        that can be waited on before retrying);
 *   -1   the cluster ended the COPY.
 */
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
								  char **buffer,
								  pgsocket *wait_fd);
334 :
/*
 * walrcv_send_fn
 *
 * Send a message of 'nbytes' bytes, taken from 'buffer', to the WAL
 * stream.
 */
typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
								const char *buffer,
								int nbytes);
344 :
/*
 * walrcv_create_slot_fn
 *
 * Create a new replication slot named 'slotname'.  'temporary' defines
 * whether the slot is temporary.  'two_phase' is passed through to slot
 * creation; NOTE(review): presumably it enables two-phase decoding for the
 * slot -- confirm in libpqwalreceiver.  'snapshot_action' defines the
 * behavior wanted for an exported snapshot (see replication protocol for
 * more details).  The LSN at which the created slot became consistent is
 * returned through 'lsn'.  Returns the name of the exported snapshot for a
 * logical slot, or NULL for a physical slot.
 */
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
										const char *slotname,
										bool temporary,
										bool two_phase,
										CRSSnapshotAction snapshot_action,
										XLogRecPtr *lsn);
361 :
/*
 * walrcv_get_backend_pid_fn
 *
 * Returns the PID of the remote backend process serving 'conn'.
 */
typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
368 :
/*
 * walrcv_exec_fn
 *
 * Send generic queries (and commands) to the remote cluster.  'nRetTypes'
 * is the expected number of returned attributes, and 'retTypes' is an array
 * of their type OIDs.  Returns the status of the execution and tuples if
 * any; the result is released with walrcv_clear_result().
 */
typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
											 const char *query,
											 const int nRetTypes,
											 const Oid *retTypes);
381 :
/*
 * walrcv_disconnect_fn
 *
 * Disconnect from the cluster.
 */
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
388 :
/*
 * Table of hooks implementing the WAL receiver API, one member per
 * walrcv_*_fn typedef.  Callers reach it through the WalReceiverFunctions
 * pointer.  NOTE(review): presumably the pointer is set by the WAL
 * receiver module (libpqwalreceiver) when it is loaded -- confirm there.
 */
typedef struct WalReceiverFunctionsType
{
	walrcv_connect_fn walrcv_connect;
	walrcv_check_conninfo_fn walrcv_check_conninfo;
	walrcv_get_conninfo_fn walrcv_get_conninfo;
	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
	walrcv_identify_system_fn walrcv_identify_system;
	walrcv_server_version_fn walrcv_server_version;
	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
	walrcv_startstreaming_fn walrcv_startstreaming;
	walrcv_endstreaming_fn walrcv_endstreaming;
	walrcv_receive_fn walrcv_receive;
	walrcv_send_fn walrcv_send;
	walrcv_create_slot_fn walrcv_create_slot;
	walrcv_get_backend_pid_fn walrcv_get_backend_pid;
	walrcv_exec_fn walrcv_exec;
	walrcv_disconnect_fn walrcv_disconnect;
} WalReceiverFunctionsType;

extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
409 :
/*
 * Convenience wrappers that dispatch through the WalReceiverFunctions hook
 * table.  Each macro forwards its arguments unchanged and in order to the
 * hook of the same name.  The macro parameter names mirror those of the
 * corresponding walrcv_*_fn typedef (e.g. walrcv_exec's second argument is
 * the 'query' text, as in walrcv_exec_fn).
 */
#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \
	WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password) \
	WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_get_conninfo(conn) \
	WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli) \
	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
#define walrcv_server_version(conn) \
	WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
	WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options) \
	WalReceiverFunctions->walrcv_startstreaming(conn, options)
#define walrcv_endstreaming(conn, next_tli) \
	WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd) \
	WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
	WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \
	WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn)
#define walrcv_get_backend_pid(conn) \
	WalReceiverFunctions->walrcv_get_backend_pid(conn)
#define walrcv_exec(conn, query, nRetTypes, retTypes) \
	WalReceiverFunctions->walrcv_exec(conn, query, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
	WalReceiverFunctions->walrcv_disconnect(conn)
440 :
441 : static inline void
2208 peter_e 442 GIC 1469 : walrcv_clear_result(WalRcvExecResult *walres)
443 : {
444 1469 : if (!walres)
2208 peter_e 445 UIC 0 : return;
446 :
2208 peter_e 447 GIC 1469 : if (walres->err)
2208 peter_e 448 LBC 0 : pfree(walres->err);
449 :
2208 peter_e 450 CBC 1469 : if (walres->tuplestore)
2208 peter_e 451 GBC 822 : tuplestore_end(walres->tuplestore);
452 :
2208 peter_e 453 CBC 1469 : if (walres->tupledesc)
2208 peter_e 454 GBC 822 : FreeTupleDesc(walres->tupledesc);
455 :
2208 peter_e 456 CBC 1469 : pfree(walres);
2208 peter_e 457 ECB : }
458 :
/*
 * prototypes for functions in walreceiver.c
 *
 * WalReceiverMain is marked pg_attribute_noreturn: it never returns to its
 * caller.
 */
extern void WalReceiverMain(void) pg_attribute_noreturn();
extern void ProcessWalRcvInterrupts(void);
extern void WalRcvForceReply(void);
4591 heikki.linnakangas 463 :
/*
 * prototypes for functions in walreceiverfuncs.c
 *
 * NOTE(review): from their names and the WalRcvData fields above, these
 * presumably size/initialize the shared WalRcvData area, query and change
 * the walreceiver's state, and read its flushedUpto / writtenUpto
 * positions -- confirm details in walreceiverfuncs.c.
 */
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
extern bool WalRcvStreaming(void);
extern bool WalRcvRunning(void);
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
								 const char *conninfo, const char *slotname,
								 bool create_temp_slot);
extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
extern int	GetReplicationApplyDelay(void);
extern int	GetReplicationTransferLatency(void);
477 :
478 : #endif /* _WALRECEIVER_H */
|