TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walsender.c
4 : *
5 : * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 : * care of sending XLOG from the primary server to a single recipient.
7 : * (Note that there can be more than one walsender process concurrently.)
8 : * It is started by the postmaster when the walreceiver of a standby server
9 : * connects to the primary server and requests XLOG streaming replication.
10 : *
11 : * A walsender is similar to a regular backend, ie. there is a one-to-one
12 : * relationship between a connection and a walsender process, but instead
13 : * of processing SQL queries, it understands a small set of special
14 : * replication-mode commands. The START_REPLICATION command begins streaming
15 : * WAL to the client. While streaming, the walsender keeps reading XLOG
16 : * records from the disk and sends them to the standby server over the
17 : * COPY protocol, until either side ends the replication by exiting COPY
18 : * mode (or until the connection is closed).
19 : *
20 : * Normal termination is by SIGTERM, which instructs the walsender to
21 : * close the connection and exit(0) at the next convenient moment. Emergency
22 : * termination is by SIGQUIT; like any backend, the walsender will simply
23 : * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 : * are treated as not a crash but approximately normal termination;
25 : * the walsender will exit quickly without sending any more XLOG records.
26 : *
27 : * If the server is shut down, checkpointer sends us
28 : * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29 : * the backend is idle or runs an SQL query this causes the backend to
30 : * shutdown, if logical replication is in progress all existing WAL records
31 : * are processed followed by a shutdown. Otherwise this causes the walsender
32 : * to switch to the "stopping" state. In this state, the walsender will reject
33 : * any further replication commands. The checkpointer begins the shutdown
34 : * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35 : * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36 : * walsender to send any outstanding WAL, including the shutdown checkpoint
37 : * record, wait for it to be replicated to the standby, and then exit.
38 : *
39 : *
40 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
41 : *
42 : * IDENTIFICATION
43 : * src/backend/replication/walsender.c
44 : *
45 : *-------------------------------------------------------------------------
46 : */
47 : #include "postgres.h"
48 :
49 : #include <signal.h>
50 : #include <unistd.h>
51 :
52 : #include "access/printtup.h"
53 : #include "access/timeline.h"
54 : #include "access/transam.h"
55 : #include "access/xact.h"
56 : #include "access/xlog_internal.h"
57 : #include "access/xlogreader.h"
58 : #include "access/xlogrecovery.h"
59 : #include "access/xlogutils.h"
60 : #include "backup/basebackup.h"
61 : #include "catalog/pg_authid.h"
62 : #include "catalog/pg_type.h"
63 : #include "commands/dbcommands.h"
64 : #include "commands/defrem.h"
65 : #include "funcapi.h"
66 : #include "libpq/libpq.h"
67 : #include "libpq/pqformat.h"
68 : #include "miscadmin.h"
69 : #include "nodes/replnodes.h"
70 : #include "pgstat.h"
71 : #include "postmaster/interrupt.h"
72 : #include "replication/decode.h"
73 : #include "replication/logical.h"
74 : #include "replication/slot.h"
75 : #include "replication/snapbuild.h"
76 : #include "replication/syncrep.h"
77 : #include "replication/walreceiver.h"
78 : #include "replication/walsender.h"
79 : #include "replication/walsender_private.h"
80 : #include "storage/condition_variable.h"
81 : #include "storage/fd.h"
82 : #include "storage/ipc.h"
83 : #include "storage/pmsignal.h"
84 : #include "storage/proc.h"
85 : #include "storage/procarray.h"
86 : #include "tcop/dest.h"
87 : #include "tcop/tcopprot.h"
88 : #include "utils/acl.h"
89 : #include "utils/builtins.h"
90 : #include "utils/guc.h"
91 : #include "utils/memutils.h"
92 : #include "utils/pg_lsn.h"
93 : #include "utils/portal.h"
94 : #include "utils/ps_status.h"
95 : #include "utils/timeout.h"
96 : #include "utils/timestamp.h"
97 :
98 : /*
99 : * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
100 : *
101 : * We don't have a good idea of what a good value would be; there's some
102 : * overhead per message in both walsender and walreceiver, but on the other
103 : * hand sending large batches makes walsender less responsive to signals
104 : * because signals are checked only between messages. 128kB (with
105 : * default 8k blocks) seems like a reasonable guess for now.
106 : */
107 : #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
108 :
109 : /* Array of WalSnds in shared memory */
110 : WalSndCtlData *WalSndCtl = NULL;
111 :
112 : /* My slot in the shared memory array */
113 : WalSnd *MyWalSnd = NULL;
114 :
115 : /* Global state */
116 : bool am_walsender = false; /* Am I a walsender process? */
117 : bool am_cascading_walsender = false; /* Am I cascading WAL to another
118 : * standby? */
119 : bool am_db_walsender = false; /* Connected to a database? */
120 :
121 : /* GUC variables */
122 : int max_wal_senders = 10; /* the maximum number of concurrent
123 : * walsenders */
124 : int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
125 : * data message */
126 : bool log_replication_commands = false;
127 :
128 : /*
129 : * State for WalSndWakeupRequest
130 : */
131 : bool wake_wal_senders = false;
132 :
133 : /*
134 : * xlogreader used for replication. Note that a WAL sender doing physical
135 : * replication does not need xlogreader to read WAL, but it needs one to
136 : * keep a state of its work.
137 : */
138 : static XLogReaderState *xlogreader = NULL;
139 :
140 : /*
141 : * These variables keep track of the state of the timeline we're currently
142 : * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
143 : * the timeline is not the latest timeline on this server, and the server's
144 : * history forked off from that timeline at sendTimeLineValidUpto.
145 : */
146 : static TimeLineID sendTimeLine = 0;
147 : static TimeLineID sendTimeLineNextTLI = 0;
148 : static bool sendTimeLineIsHistoric = false;
149 : static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
150 :
151 : /*
152 : * How far have we sent WAL already? This is also advertised in
153 : * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
154 : */
155 : static XLogRecPtr sentPtr = InvalidXLogRecPtr;
156 :
157 : /* Buffers for constructing outgoing messages and processing reply messages. */
158 : static StringInfoData output_message;
159 : static StringInfoData reply_message;
160 : static StringInfoData tmpbuf;
161 :
162 : /* Timestamp of last ProcessRepliesIfAny(). */
163 : static TimestampTz last_processing = 0;
164 :
165 : /*
166 : * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
167 : * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
168 : */
169 : static TimestampTz last_reply_timestamp = 0;
170 :
171 : /* Have we sent a heartbeat message asking for reply, since last reply? */
172 : static bool waiting_for_ping_response = false;
173 :
174 : /*
175 : * While streaming WAL in Copy mode, streamingDoneSending is set to true
176 : * after we have sent CopyDone. We should not send any more CopyData messages
177 : * after that. streamingDoneReceiving is set to true when we receive CopyDone
178 : * from the other end. When both become true, it's time to exit Copy mode.
179 : */
180 : static bool streamingDoneSending;
181 : static bool streamingDoneReceiving;
182 :
183 : /* Are we there yet? */
184 : static bool WalSndCaughtUp = false;
185 :
186 : /* Flags set by signal handlers for later service in main loop */
187 : static volatile sig_atomic_t got_SIGUSR2 = false;
188 : static volatile sig_atomic_t got_STOPPING = false;
189 :
190 : /*
191 : * This is set while we are streaming. When not set
192 : * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
193 : * the main loop is responsible for checking got_STOPPING and terminating when
194 : * it's set (after streaming any remaining WAL).
195 : */
196 : static volatile sig_atomic_t replication_active = false;
197 :
198 : static LogicalDecodingContext *logical_decoding_ctx = NULL;
199 :
200 : /* A sample associating a WAL location with the time it was written. */
201 : typedef struct
202 : {
203 : XLogRecPtr lsn;
204 : TimestampTz time;
205 : } WalTimeSample;
206 :
207 : /* The size of our buffer of time samples. */
208 : #define LAG_TRACKER_BUFFER_SIZE 8192
209 :
210 : /* A mechanism for tracking replication lag. */
211 : typedef struct
212 : {
213 : XLogRecPtr last_lsn;
214 : WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
215 : int write_head;
216 : int read_heads[NUM_SYNC_REP_WAIT_MODE];
217 : WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
218 : } LagTracker;
219 :
220 : static LagTracker *lag_tracker;
221 :
222 : /* Signal handlers */
223 : static void WalSndLastCycleHandler(SIGNAL_ARGS);
224 :
225 : /* Prototypes for private functions */
226 : typedef void (*WalSndSendDataCallback) (void);
227 : static void WalSndLoop(WalSndSendDataCallback send_data);
228 : static void InitWalSenderSlot(void);
229 : static void WalSndKill(int code, Datum arg);
230 : static void WalSndShutdown(void) pg_attribute_noreturn();
231 : static void XLogSendPhysical(void);
232 : static void XLogSendLogical(void);
233 : static void WalSndDone(WalSndSendDataCallback send_data);
234 : static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
235 : static void IdentifySystem(void);
236 : static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
237 : static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
238 : static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
239 : static void StartReplication(StartReplicationCmd *cmd);
240 : static void StartLogicalReplication(StartReplicationCmd *cmd);
241 : static void ProcessStandbyMessage(void);
242 : static void ProcessStandbyReplyMessage(void);
243 : static void ProcessStandbyHSFeedbackMessage(void);
244 : static void ProcessRepliesIfAny(void);
245 : static void ProcessPendingWrites(void);
246 : static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
247 : static void WalSndKeepaliveIfNecessary(void);
248 : static void WalSndCheckTimeOut(void);
249 : static long WalSndComputeSleeptime(TimestampTz now);
250 : static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
251 : static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 : static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
253 : static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
254 : bool skipped_xact);
255 : static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
256 : static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
257 : static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
258 : static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
259 :
260 : static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
261 : TimeLineID *tli_p);
262 :
263 :
264 : /* Initialize walsender process before entering the main command loop */
265 : void
266 CBC 831 : InitWalSender(void)
267 : {
268 831 : am_cascading_walsender = RecoveryInProgress();
269 :
270 : /* Create a per-walsender data structure in shared memory */
271 831 : InitWalSenderSlot();
272 :
273 : /*
274 : * We don't currently need any ResourceOwner in a walsender process, but
275 : * if we did, we could call CreateAuxProcessResourceOwner here.
276 : */
277 :
278 : /*
279 : * Let postmaster know that we're a WAL sender. Once we've declared us as
280 : * a WAL sender process, postmaster will let us outlive the bgwriter and
281 : * kill us last in the shutdown sequence, so we get a chance to stream all
282 : * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283 : * there's no going back, and we mustn't write any WAL records after this.
284 : */
285 831 : MarkPostmasterChildWalSender();
286 831 : SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
287 :
288 : /*
289 : * If the client didn't specify a database to connect to, show in PGPROC
290 : * that our advertised xmin should affect vacuum horizons in all
291 : * databases. This allows physical replication clients to send hot
292 : * standby feedback that will delay vacuum cleanup in all databases.
293 : */
294 831 : if (MyDatabaseId == InvalidOid)
295 : {
296 349 : Assert(MyProc->xmin == InvalidTransactionId);
297 349 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
298 349 : MyProc->statusFlags |= PROC_AFFECTS_ALL_HORIZONS;
299 349 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
300 349 : LWLockRelease(ProcArrayLock);
301 : }
302 :
303 : /* Initialize empty timestamp buffer for lag tracking. */
304 831 : lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
305 831 : }
306 :
307 : /*
308 : * Clean up after an error.
309 : *
310 : * WAL sender processes don't use transactions like regular backends do.
311 : * This function does any cleanup required after an error in a WAL sender
312 : * process, similar to what transaction abort does in a regular backend.
313 : */
314 : void
315 44 : WalSndErrorCleanup(void)
316 : {
317 44 : LWLockReleaseAll();
318 44 : ConditionVariableCancelSleep();
319 44 : pgstat_report_wait_end();
320 :
321 44 : if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
322 9 : wal_segment_close(xlogreader);
323 :
324 44 : if (MyReplicationSlot != NULL)
325 16 : ReplicationSlotRelease();
326 :
327 44 : ReplicationSlotCleanup();
328 :
329 44 : replication_active = false;
330 :
331 : /*
332 : * If there is a transaction in progress, it will clean up our
333 : * ResourceOwner, but if a replication command set up a resource owner
334 : * without a transaction, we've got to clean that up now.
335 : */
336 44 : if (!IsTransactionOrTransactionBlock())
337 43 : WalSndResourceCleanup(false);
338 :
339 44 : if (got_STOPPING || got_SIGUSR2)
340 UBC 0 : proc_exit(0);
341 :
342 : /* Revert back to startup state */
343 CBC 44 : WalSndSetState(WALSNDSTATE_STARTUP);
344 44 : }
345 :
346 : /*
347 : * Clean up any ResourceOwner we created.
348 : */
349 : void
350 160 : WalSndResourceCleanup(bool isCommit)
351 : {
352 : ResourceOwner resowner;
353 :
354 160 : if (CurrentResourceOwner == NULL)
355 39 : return;
356 :
357 : /*
358 : * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359 : * in a local variable and clear it first.
360 : */
361 121 : resowner = CurrentResourceOwner;
362 121 : CurrentResourceOwner = NULL;
363 :
364 : /* Now we can release resources and delete it. */
365 121 : ResourceOwnerRelease(resowner,
366 : RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367 121 : ResourceOwnerRelease(resowner,
368 : RESOURCE_RELEASE_LOCKS, isCommit, true);
369 121 : ResourceOwnerRelease(resowner,
370 : RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371 121 : ResourceOwnerDelete(resowner);
372 : }
373 :
374 : /*
375 : * Handle a client's connection abort in an orderly manner.
376 : */
377 : static void
378 6 : WalSndShutdown(void)
379 : {
380 : /*
381 : * Reset whereToSendOutput to prevent ereport from attempting to send any
382 : * more messages to the standby.
383 : */
384 6 : if (whereToSendOutput == DestRemote)
385 6 : whereToSendOutput = DestNone;
386 :
387 6 : proc_exit(0);
388 : abort(); /* keep the compiler quiet */
389 : }
390 :
391 : /*
392 : * Handle the IDENTIFY_SYSTEM command.
393 : */
394 : static void
395 522 : IdentifySystem(void)
396 : {
397 : char sysid[32];
398 : char xloc[MAXFNAMELEN];
399 : XLogRecPtr logptr;
400 522 : char *dbname = NULL;
401 : DestReceiver *dest;
402 : TupOutputState *tstate;
403 : TupleDesc tupdesc;
404 : Datum values[4];
405 GNC 522 : bool nulls[4] = {0};
406 : TimeLineID currTLI;
407 :
408 : /*
409 : * Reply with a result set with one row, four columns. First col is system
410 : * ID, second is timeline ID, third is current xlog location and the
411 : * fourth contains the database name if we are connected to one.
412 : */
413 :
414 CBC 522 : snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
415 : GetSystemIdentifier());
416 :
417 522 : am_cascading_walsender = RecoveryInProgress();
418 522 : if (am_cascading_walsender)
419 53 : logptr = GetStandbyFlushRecPtr(&currTLI);
420 : else
421 469 : logptr = GetFlushRecPtr(&currTLI);
422 :
423 522 : snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 :
425 522 : if (MyDatabaseId != InvalidOid)
426 : {
427 171 : MemoryContext cur = CurrentMemoryContext;
428 :
429 : /* syscache access needs a transaction env. */
430 171 : StartTransactionCommand();
431 : /* make dbname live outside TX context */
432 171 : MemoryContextSwitchTo(cur);
433 171 : dbname = get_database_name(MyDatabaseId);
434 171 : CommitTransactionCommand();
435 : /* CommitTransactionCommand switches to TopMemoryContext */
436 171 : MemoryContextSwitchTo(cur);
437 : }
438 :
439 522 : dest = CreateDestReceiver(DestRemoteSimple);
440 :
441 ECB : /* need a tuple descriptor representing four columns */
442 CBC 522 : tupdesc = CreateTemplateTupleDesc(4);
443 GIC 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
444 ECB : TEXTOID, -1, 0);
445 GIC 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
446 : INT8OID, -1, 0);
447 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
448 ECB : TEXTOID, -1, 0);
449 GIC 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
450 : TEXTOID, -1, 0);
451 :
452 ECB : /* prepare for projection of tuples */
453 GIC 522 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
454 :
455 ECB : /* column 1: system identifier */
456 GIC 522 : values[0] = CStringGetTextDatum(sysid);
457 :
458 ECB : /* column 2: timeline */
459 GNC 522 : values[1] = Int64GetDatum(currTLI);
460 :
461 ECB : /* column 3: wal location */
462 GIC 522 : values[2] = CStringGetTextDatum(xloc);
463 :
464 ECB : /* column 4: database name, or NULL if none */
465 CBC 522 : if (dbname)
466 GIC 171 : values[3] = CStringGetTextDatum(dbname);
467 ECB : else
468 GIC 351 : nulls[3] = true;
469 :
470 ECB : /* send it to dest */
471 GIC 522 : do_tup_output(tstate, values, nulls);
472 ECB :
473 CBC 522 : end_tup_output(tstate);
474 GIC 522 : }
475 :
476 : /* Handle READ_REPLICATION_SLOT command */
477 ECB : static void
478 GIC 6 : ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
479 : {
480 : #define READ_REPLICATION_SLOT_COLS 3
481 : ReplicationSlot *slot;
482 : DestReceiver *dest;
483 : TupOutputState *tstate;
484 ECB : TupleDesc tupdesc;
485 GNC 6 : Datum values[READ_REPLICATION_SLOT_COLS] = {0};
486 : bool nulls[READ_REPLICATION_SLOT_COLS];
487 ECB :
488 CBC 6 : tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
489 GIC 6 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
490 ECB : TEXTOID, -1, 0);
491 GIC 6 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
492 : TEXTOID, -1, 0);
493 ECB : /* TimeLineID is unsigned, so int4 is not wide enough. */
494 GIC 6 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
495 : INT8OID, -1, 0);
496 ECB :
497 GNC 6 : memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
498 ECB :
499 CBC 6 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
500 GIC 6 : slot = SearchNamedReplicationSlot(cmd->slotname, false);
501 CBC 6 : if (slot == NULL || !slot->in_use)
502 : {
503 GIC 2 : LWLockRelease(ReplicationSlotControlLock);
504 : }
505 : else
506 ECB : {
507 : ReplicationSlot slot_contents;
508 GIC 4 : int i = 0;
509 ECB :
510 : /* Copy slot contents while holding spinlock */
511 CBC 4 : SpinLockAcquire(&slot->mutex);
512 4 : slot_contents = *slot;
513 GIC 4 : SpinLockRelease(&slot->mutex);
514 CBC 4 : LWLockRelease(ReplicationSlotControlLock);
515 ECB :
516 GIC 4 : if (OidIsValid(slot_contents.data.database))
517 1 : ereport(ERROR,
518 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519 : errmsg("cannot use %s with a logical replication slot",
520 : "READ_REPLICATION_SLOT"));
521 ECB :
522 : /* slot type */
523 CBC 3 : values[i] = CStringGetTextDatum("physical");
524 GIC 3 : nulls[i] = false;
525 3 : i++;
526 ECB :
527 : /* start LSN */
528 GIC 3 : if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
529 : {
530 ECB : char xloc[64];
531 :
532 CBC 3 : snprintf(xloc, sizeof(xloc), "%X/%X",
533 3 : LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
534 GIC 3 : values[i] = CStringGetTextDatum(xloc);
535 CBC 3 : nulls[i] = false;
536 : }
537 GIC 3 : i++;
538 ECB :
539 : /* timeline this WAL was produced on */
540 GIC 3 : if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
541 : {
542 ECB : TimeLineID slots_position_timeline;
543 : TimeLineID current_timeline;
544 GIC 3 : List *timeline_history = NIL;
545 :
546 : /*
547 : * While in recovery, use as timeline the currently-replaying one
548 ECB : * to get the LSN position's history.
549 EUB : */
550 GIC 3 : if (RecoveryInProgress())
551 LBC 0 : (void) GetXLogReplayRecPtr(¤t_timeline);
552 : else
553 CBC 3 : current_timeline = GetWALInsertionTimeLine();
554 ECB :
555 GIC 3 : timeline_history = readTimeLineHistory(current_timeline);
556 CBC 3 : slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
557 ECB : timeline_history);
558 GIC 3 : values[i] = Int64GetDatum((int64) slots_position_timeline);
559 CBC 3 : nulls[i] = false;
560 : }
561 3 : i++;
562 :
563 GIC 3 : Assert(i == READ_REPLICATION_SLOT_COLS);
564 ECB : }
565 :
566 CBC 5 : dest = CreateDestReceiver(DestRemoteSimple);
567 5 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
568 5 : do_tup_output(tstate, values, nulls);
569 GIC 5 : end_tup_output(tstate);
570 5 : }
571 :
572 :
573 : /*
574 : * Handle TIMELINE_HISTORY command.
575 ECB : */
576 : static void
577 GIC 13 : SendTimeLineHistory(TimeLineHistoryCmd *cmd)
578 : {
579 : DestReceiver *dest;
580 : TupleDesc tupdesc;
581 : StringInfoData buf;
582 : char histfname[MAXFNAMELEN];
583 : char path[MAXPGPATH];
584 : int fd;
585 : off_t histfilelen;
586 : off_t bytesleft;
587 : Size len;
588 :
589 GNC 13 : dest = CreateDestReceiver(DestRemoteSimple);
590 :
591 ECB : /*
592 : * Reply with a result set with one row, and two columns. The first col is
593 : * the name of the history file, 2nd is the contents.
594 : */
595 GNC 13 : tupdesc = CreateTemplateTupleDesc(2);
596 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
597 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
598 :
599 GIC 13 : TLHistoryFileName(histfname, cmd->timeline);
600 CBC 13 : TLHistoryFilePath(path, cmd->timeline);
601 ECB :
602 : /* Send a RowDescription message */
603 GNC 13 : dest->rStartup(dest, CMD_SELECT, tupdesc);
604 ECB :
605 : /* Send a DataRow message */
606 GBC 13 : pq_beginmessage(&buf, 'D');
607 GIC 13 : pq_sendint16(&buf, 2); /* # of columns */
608 13 : len = strlen(histfname);
609 CBC 13 : pq_sendint32(&buf, len); /* col1 len */
610 GBC 13 : pq_sendbytes(&buf, histfname, len);
611 :
612 GIC 13 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
613 13 : if (fd < 0)
614 LBC 0 : ereport(ERROR,
615 : (errcode_for_file_access(),
616 ECB : errmsg("could not open file \"%s\": %m", path)));
617 :
618 : /* Determine file length and send it to client */
619 GIC 13 : histfilelen = lseek(fd, 0, SEEK_END);
620 13 : if (histfilelen < 0)
621 UIC 0 : ereport(ERROR,
622 ECB : (errcode_for_file_access(),
623 : errmsg("could not seek to end of file \"%s\": %m", path)));
624 CBC 13 : if (lseek(fd, 0, SEEK_SET) != 0)
625 LBC 0 : ereport(ERROR,
626 EUB : (errcode_for_file_access(),
627 : errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 :
629 GIC 13 : pq_sendint32(&buf, histfilelen); /* col2 len */
630 ECB :
631 GBC 13 : bytesleft = histfilelen;
632 GIC 26 : while (bytesleft > 0)
633 : {
634 : PGAlignedBlock rbuf;
635 : int nread;
636 ECB :
637 CBC 13 : pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
638 GIC 13 : nread = read(fd, rbuf.data, sizeof(rbuf));
639 13 : pgstat_report_wait_end();
640 CBC 13 : if (nread < 0)
641 UBC 0 : ereport(ERROR,
642 : (errcode_for_file_access(),
643 : errmsg("could not read file \"%s\": %m",
644 : path)));
645 CBC 13 : else if (nread == 0)
646 LBC 0 : ereport(ERROR,
647 : (errcode(ERRCODE_DATA_CORRUPTED),
648 : errmsg("could not read file \"%s\": read %d of %zu",
649 : path, nread, (Size) bytesleft)));
650 :
651 GIC 13 : pq_sendbytes(&buf, rbuf.data, nread);
652 13 : bytesleft -= nread;
653 : }
654 :
655 CBC 13 : if (CloseTransientFile(fd) != 0)
656 UIC 0 : ereport(ERROR,
657 : (errcode_for_file_access(),
658 : errmsg("could not close file \"%s\": %m", path)));
659 :
660 GIC 13 : pq_endmessage(&buf);
661 13 : }
662 ECB :
663 : /*
664 : * Handle START_REPLICATION command.
665 : *
666 : * At the moment, this never returns, but an ereport(ERROR) will take us back
667 : * to the main loop.
668 : */
669 EUB : static void
670 GIC 195 : StartReplication(StartReplicationCmd *cmd)
671 : {
672 : StringInfoData buf;
673 : XLogRecPtr FlushPtr;
674 : TimeLineID FlushTLI;
675 :
676 : /* create xlogreader for physical replication */
677 195 : xlogreader =
678 195 : XLogReaderAllocate(wal_segment_size, NULL,
679 195 : XL_ROUTINE(.segment_open = WalSndSegmentOpen,
680 : .segment_close = wal_segment_close),
681 : NULL);
682 :
683 CBC 195 : if (!xlogreader)
684 UIC 0 : ereport(ERROR,
685 ECB : (errcode(ERRCODE_OUT_OF_MEMORY),
686 : errmsg("out of memory"),
687 EUB : errdetail("Failed while allocating a WAL reading processor.")));
688 :
689 : /*
690 : * We assume here that we're logging enough information in the WAL for
691 : * log-shipping, since this is checked in PostmasterMain().
692 : *
693 : * NOTE: wal_level can only change at shutdown, so in most cases it is
694 : * difficult for there to be WAL data that we can still see that was
695 : * written at wal_level='minimal'.
696 : */
697 :
698 GIC 195 : if (cmd->slotname)
699 : {
700 122 : ReplicationSlotAcquire(cmd->slotname, true);
701 120 : if (SlotIsLogical(MyReplicationSlot))
702 LBC 0 : ereport(ERROR,
703 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
704 : errmsg("cannot use a logical replication slot for physical replication")));
705 :
706 : /*
707 : * We don't need to verify the slot's restart_lsn here; instead we
708 : * rely on the caller requesting the starting point to use. If the
709 : * WAL segment doesn't exist, we'll fail later.
710 : */
711 : }
712 :
713 : /*
714 : * Select the timeline. If it was given explicitly by the client, use
715 : * that. Otherwise use the timeline of the last replayed record.
716 : */
717 GIC 193 : am_cascading_walsender = RecoveryInProgress();
718 193 : if (am_cascading_walsender)
719 12 : FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
720 : else
721 181 : FlushPtr = GetFlushRecPtr(&FlushTLI);
722 ECB :
723 GIC 193 : if (cmd->timeline != 0)
724 : {
725 : XLogRecPtr switchpoint;
726 :
727 192 : sendTimeLine = cmd->timeline;
728 CBC 192 : if (sendTimeLine == FlushTLI)
729 ECB : {
730 GIC 180 : sendTimeLineIsHistoric = false;
731 CBC 180 : sendTimeLineValidUpto = InvalidXLogRecPtr;
732 : }
733 : else
734 : {
735 : List *timeLineHistory;
736 :
737 GIC 12 : sendTimeLineIsHistoric = true;
738 :
739 : /*
740 : * Check that the timeline the client requested exists, and the
741 : * requested start location is on that timeline.
742 : */
743 12 : timeLineHistory = readTimeLineHistory(FlushTLI);
744 12 : switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
745 : &sendTimeLineNextTLI);
746 12 : list_free_deep(timeLineHistory);
747 :
748 : /*
749 : * Found the requested timeline in the history. Check that
750 : * requested startpoint is on that timeline in our history.
751 ECB : *
752 : * This is quite loose on purpose. We only check that we didn't
753 : * fork off the requested timeline before the switchpoint. We
754 EUB : * don't check that we switched *to* it before the requested
755 : * starting point. This is because the client can legitimately
756 : * request to start replication from the beginning of the WAL
757 : * segment that contains switchpoint, but on the new timeline, so
758 : * that it doesn't end up with a partial segment. If you ask for
759 : * too old a starting point, you'll get an error later when we
760 : * fail to find the requested WAL segment in pg_wal.
761 : *
762 ECB : * XXX: we could be more strict here and only allow a startpoint
763 : * that's older than the switchpoint, if it's still in the same
764 : * WAL segment.
765 : */
766 GIC 12 : if (!XLogRecPtrIsInvalid(switchpoint) &&
767 CBC 12 : switchpoint < cmd->startpoint)
768 ECB : {
769 LBC 0 : ereport(ERROR,
770 : (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
771 : LSN_FORMAT_ARGS(cmd->startpoint),
772 ECB : cmd->timeline),
773 : errdetail("This server's history forked from timeline %u at %X/%X.",
774 : cmd->timeline,
775 : LSN_FORMAT_ARGS(switchpoint))));
776 : }
777 GIC 12 : sendTimeLineValidUpto = switchpoint;
778 : }
779 : }
780 : else
781 : {
782 1 : sendTimeLine = FlushTLI;
783 1 : sendTimeLineValidUpto = InvalidXLogRecPtr;
784 1 : sendTimeLineIsHistoric = false;
785 : }
786 ECB :
787 GIC 193 : streamingDoneSending = streamingDoneReceiving = false;
788 :
789 ECB : /* If there is nothing to stream, don't even enter COPY mode */
790 CBC 193 : if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
791 ECB : {
792 : /*
793 : * When we first start replication the standby will be behind the
794 : * primary. For some applications, for example synchronous
795 : * replication, it is important to have a clear state for this initial
796 : * catchup mode, so we can trigger actions when we change streaming
797 : * state later. We may stay in this state for a long time, which is
798 : * exactly why we want to be able to monitor whether or not we are
799 : * still here.
800 : */
801 GBC 193 : WalSndSetState(WALSNDSTATE_CATCHUP);
802 :
803 : /* Send a CopyBothResponse message, and start streaming */
804 GIC 193 : pq_beginmessage(&buf, 'W');
805 193 : pq_sendbyte(&buf, 0);
806 193 : pq_sendint16(&buf, 0);
807 193 : pq_endmessage(&buf);
808 CBC 193 : pq_flush();
809 :
810 : /*
811 ECB : * Don't allow a request to stream from a future point in WAL that
812 : * hasn't been flushed to disk in this server yet.
813 : */
814 GIC 193 : if (FlushPtr < cmd->startpoint)
815 ECB : {
816 UIC 0 : ereport(ERROR,
817 : (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
818 ECB : LSN_FORMAT_ARGS(cmd->startpoint),
819 : LSN_FORMAT_ARGS(FlushPtr))));
820 : }
821 :
822 : /* Start streaming from the requested point */
823 CBC 193 : sentPtr = cmd->startpoint;
824 EUB :
825 ECB : /* Initialize shared memory status, too */
826 GIC 193 : SpinLockAcquire(&MyWalSnd->mutex);
827 CBC 193 : MyWalSnd->sentPtr = sentPtr;
828 GIC 193 : SpinLockRelease(&MyWalSnd->mutex);
829 :
830 CBC 193 : SyncRepInitConfig();
831 ECB :
832 : /* Main loop of walsender */
833 GIC 193 : replication_active = true;
834 :
835 193 : WalSndLoop(XLogSendPhysical);
836 :
837 CBC 110 : replication_active = false;
838 GIC 110 : if (got_STOPPING)
839 UIC 0 : proc_exit(0);
840 GIC 110 : WalSndSetState(WALSNDSTATE_STARTUP);
841 :
842 110 : Assert(streamingDoneSending && streamingDoneReceiving);
843 : }
844 ECB :
845 GIC 110 : if (cmd->slotname)
846 CBC 94 : ReplicationSlotRelease();
847 ECB :
848 : /*
849 : * Copy is finished now. Send a single-row result set indicating the next
850 : * timeline.
851 : */
852 GIC 110 : if (sendTimeLineIsHistoric)
853 : {
854 : char startpos_str[8 + 1 + 8 + 1];
855 : DestReceiver *dest;
856 ECB : TupOutputState *tstate;
857 : TupleDesc tupdesc;
858 : Datum values[2];
859 GNC 13 : bool nulls[2] = {0};
860 :
861 GIC 13 : snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
862 13 : LSN_FORMAT_ARGS(sendTimeLineValidUpto));
863 ECB :
864 GIC 13 : dest = CreateDestReceiver(DestRemoteSimple);
865 ECB :
866 : /*
867 : * Need a tuple descriptor representing two columns. int8 may seem
868 : * like a surprising data type for this, but in theory int4 would not
869 : * be wide enough for this, as TimeLineID is unsigned.
870 : */
871 GIC 13 : tupdesc = CreateTemplateTupleDesc(2);
872 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
873 : INT8OID, -1, 0);
874 CBC 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
875 ECB : TEXTOID, -1, 0);
876 :
877 : /* prepare for projection of tuple */
878 GIC 13 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
879 :
880 13 : values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
881 13 : values[1] = CStringGetTextDatum(startpos_str);
882 :
883 : /* send it to dest */
884 13 : do_tup_output(tstate, values, nulls);
885 :
886 CBC 13 : end_tup_output(tstate);
887 : }
888 :
889 : /* Send CommandComplete message */
890 GIC 110 : EndReplicationCommand("START_STREAMING");
891 110 : }
892 :
893 : /*
894 : * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
895 : * walsender process.
896 : *
897 : * Inside the walsender we can do better than read_local_xlog_page,
898 : * which has to do a plain sleep/busy loop, because the walsender's latch gets
899 : * set every time WAL is flushed.
900 ECB : */
901 : static int
902 GIC 10781 : logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
903 : XLogRecPtr targetRecPtr, char *cur_page)
904 : {
905 : XLogRecPtr flushptr;
906 : int count;
907 : WALReadError errinfo;
908 ECB : XLogSegNo segno;
909 : TimeLineID currTLI;
910 :
911 : /*
912 : * Make sure we have enough WAL available before retrieving the current
913 : * timeline. This is needed to determine am_cascading_walsender accurately
914 : * which is needed to determine the current timeline.
915 : */
916 GNC 10781 : flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
917 :
918 : /*
919 : * Since logical decoding is also permitted on a standby server, we need
920 : * to check if the server is in recovery to decide how to get the current
921 : * timeline ID (so that it also cover the promotion or timeline change
922 : * cases).
923 : */
924 10659 : am_cascading_walsender = RecoveryInProgress();
925 :
926 10659 : if (am_cascading_walsender)
927 311 : GetXLogReplayRecPtr(&currTLI);
928 : else
929 10348 : currTLI = GetWALInsertionTimeLine();
930 :
931 CBC 10659 : XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
932 10659 : sendTimeLineIsHistoric = (state->currTLI != currTLI);
933 10659 : sendTimeLine = state->currTLI;
934 GIC 10659 : sendTimeLineValidUpto = state->currTLIValidUntil;
935 10659 : sendTimeLineNextTLI = state->nextTLI;
936 ECB :
937 : /* fail if not (implies we are going to shut down) */
938 GIC 10659 : if (flushptr < targetPagePtr + reqLen)
939 CBC 204 : return -1;
940 :
941 GIC 10455 : if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
942 CBC 8909 : count = XLOG_BLCKSZ; /* more than one block available */
943 : else
944 GIC 1546 : count = flushptr - targetPagePtr; /* part of the page available */
945 :
946 : /* now actually read the data, we know it's there */
947 10455 : if (!WALRead(state,
948 : cur_page,
949 : targetPagePtr,
950 EUB : XLOG_BLCKSZ,
951 : currTLI, /* Pass the current TLI because only
952 : * WalSndSegmentOpen controls whether new TLI
953 : * is needed. */
954 : &errinfo))
955 UIC 0 : WALReadRaiseError(&errinfo);
956 :
957 : /*
958 : * After reading into the buffer, check that what we read was valid. We do
959 ECB : * this after reading, because even though the segment was present when we
960 : * opened it, it might get recycled or removed while we read it. The
961 : * read() succeeds in that case, but the data we tried to read might
962 : * already have been overwritten with new WAL records.
963 : */
964 GIC 10455 : XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
965 10455 : CheckXLogRemoved(segno, state->seg.ws_tli);
966 :
967 10455 : return count;
968 : }
969 ECB :
970 : /*
971 : * Process extra options given to CREATE_REPLICATION_SLOT.
972 : */
973 : static void
974 GIC 344 : parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
975 ECB : bool *reserve_wal,
976 : CRSSnapshotAction *snapshot_action,
977 : bool *two_phase)
978 : {
979 : ListCell *lc;
980 CBC 344 : bool snapshot_action_given = false;
981 GIC 344 : bool reserve_wal_given = false;
982 CBC 344 : bool two_phase_given = false;
983 :
984 ECB : /* Parse options */
985 GIC 689 : foreach(lc, cmd->options)
986 : {
987 345 : DefElem *defel = (DefElem *) lfirst(lc);
988 ECB :
989 GBC 345 : if (strcmp(defel->defname, "snapshot") == 0)
990 : {
991 : char *action;
992 :
993 CBC 247 : if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
994 LBC 0 : ereport(ERROR,
995 : (errcode(ERRCODE_SYNTAX_ERROR),
996 ECB : errmsg("conflicting or redundant options")));
997 EUB :
998 CBC 247 : action = defGetString(defel);
999 247 : snapshot_action_given = true;
1000 ECB :
1001 CBC 247 : if (strcmp(action, "export") == 0)
1002 UIC 0 : *snapshot_action = CRS_EXPORT_SNAPSHOT;
1003 GBC 247 : else if (strcmp(action, "nothing") == 0)
1004 GIC 92 : *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1005 155 : else if (strcmp(action, "use") == 0)
1006 155 : *snapshot_action = CRS_USE_SNAPSHOT;
1007 : else
1008 LBC 0 : ereport(ERROR,
1009 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
1010 ECB : errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
1011 EUB : defel->defname, action)));
1012 : }
1013 GIC 98 : else if (strcmp(defel->defname, "reserve_wal") == 0)
1014 : {
1015 CBC 96 : if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
1016 LBC 0 : ereport(ERROR,
1017 : (errcode(ERRCODE_SYNTAX_ERROR),
1018 ECB : errmsg("conflicting or redundant options")));
1019 :
1020 CBC 96 : reserve_wal_given = true;
1021 GBC 96 : *reserve_wal = defGetBoolean(defel);
1022 : }
1023 GIC 2 : else if (strcmp(defel->defname, "two_phase") == 0)
1024 ECB : {
1025 CBC 2 : if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
1026 UIC 0 : ereport(ERROR,
1027 : (errcode(ERRCODE_SYNTAX_ERROR),
1028 EUB : errmsg("conflicting or redundant options")));
1029 GIC 2 : two_phase_given = true;
1030 CBC 2 : *two_phase = defGetBoolean(defel);
1031 : }
1032 : else
1033 UIC 0 : elog(ERROR, "unrecognized option: %s", defel->defname);
1034 : }
1035 GIC 344 : }
1036 ECB :
1037 : /*
1038 : * Create a new replication slot.
1039 : */
1040 : static void
1041 CBC 344 : CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
1042 ECB : {
1043 CBC 344 : const char *snapshot_name = NULL;
1044 : char xloc[MAXFNAMELEN];
1045 : char *slot_name;
1046 GIC 344 : bool reserve_wal = false;
1047 344 : bool two_phase = false;
1048 CBC 344 : CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1049 : DestReceiver *dest;
1050 ECB : TupOutputState *tstate;
1051 : TupleDesc tupdesc;
1052 : Datum values[4];
1053 GNC 344 : bool nulls[4] = {0};
1054 ECB :
1055 GIC 344 : Assert(!MyReplicationSlot);
1056 ECB :
1057 CBC 344 : parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1058 :
1059 GIC 344 : if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1060 : {
1061 97 : ReplicationSlotCreate(cmd->slotname, false,
1062 CBC 97 : cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1063 : false);
1064 : }
1065 : else
1066 : {
1067 GIC 247 : CheckLogicalDecodingRequirements();
1068 :
1069 : /*
1070 : * Initially create persistent slot as ephemeral - that allows us to
1071 ECB : * nicely handle errors during initialization because it'll get
1072 : * dropped if this transaction fails. We'll make it persistent at the
1073 : * end. Temporary slots can be created as temporary from beginning as
1074 : * they get dropped on error as well.
1075 : */
1076 CBC 247 : ReplicationSlotCreate(cmd->slotname, true,
1077 GIC 247 : cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1078 : two_phase);
1079 ECB : }
1080 :
1081 GIC 343 : if (cmd->kind == REPLICATION_KIND_LOGICAL)
1082 : {
1083 : LogicalDecodingContext *ctx;
1084 247 : bool need_full_snapshot = false;
1085 ECB :
1086 : /*
1087 EUB : * Do options check early so that we can bail before calling the
1088 : * DecodingContextFindStartpoint which can take long time.
1089 : */
1090 GIC 247 : if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1091 : {
1092 UIC 0 : if (IsTransactionBlock())
1093 UBC 0 : ereport(ERROR,
1094 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1095 ECB : (errmsg("%s must not be called inside a transaction",
1096 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
1097 :
1098 UBC 0 : need_full_snapshot = true;
1099 : }
1100 GIC 247 : else if (snapshot_action == CRS_USE_SNAPSHOT)
1101 : {
1102 155 : if (!IsTransactionBlock())
1103 LBC 0 : ereport(ERROR,
1104 EUB : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1105 : (errmsg("%s must be called inside a transaction",
1106 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1107 :
1108 CBC 155 : if (XactIsoLevel != XACT_REPEATABLE_READ)
1109 UBC 0 : ereport(ERROR,
1110 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111 : (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1112 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1113 GNC 155 : if (!XactReadOnly)
1114 UNC 0 : ereport(ERROR,
1115 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1116 : (errmsg("%s must be called in a read only transaction",
1117 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1118 :
1119 CBC 155 : if (FirstSnapshotSet)
1120 UBC 0 : ereport(ERROR,
1121 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1122 : (errmsg("%s must be called before any query",
1123 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1124 :
1125 CBC 155 : if (IsSubTransaction())
1126 UBC 0 : ereport(ERROR,
1127 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1128 : (errmsg("%s must not be called in a subtransaction",
1129 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1130 :
1131 CBC 155 : need_full_snapshot = true;
1132 : }
1133 :
1134 247 : ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1135 : InvalidXLogRecPtr,
1136 247 : XL_ROUTINE(.page_read = logical_read_xlog_page,
1137 : .segment_open = WalSndSegmentOpen,
1138 : .segment_close = wal_segment_close),
1139 : WalSndPrepareWrite, WalSndWriteData,
1140 : WalSndUpdateProgress);
1141 :
1142 : /*
1143 : * Signal that we don't need the timeout mechanism. We're just
1144 : * creating the replication slot and don't yet accept feedback
1145 : * messages or send keepalives. As we possibly need to wait for
1146 : * further WAL the walsender would otherwise possibly be killed too
1147 : * soon.
1148 : */
1149 247 : last_reply_timestamp = 0;
1150 :
1151 : /* build initial snapshot, might take a while */
1152 247 : DecodingContextFindStartpoint(ctx);
1153 :
1154 : /*
1155 : * Export or use the snapshot if we've been asked to do so.
1156 : *
1157 : * NB. We will convert the snapbuild.c kind of snapshot to normal
1158 : * snapshot when doing this.
1159 : */
1160 247 : if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1161 : {
1162 UBC 0 : snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1163 : }
1164 CBC 247 : else if (snapshot_action == CRS_USE_SNAPSHOT)
1165 : {
1166 : Snapshot snap;
1167 :
1168 155 : snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
1169 155 : RestoreTransactionSnapshot(snap, MyProc);
1170 : }
1171 :
1172 : /* don't need the decoding context anymore */
1173 247 : FreeDecodingContext(ctx);
1174 :
1175 247 : if (!cmd->temporary)
1176 247 : ReplicationSlotPersist();
1177 : }
1178 96 : else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1179 : {
1180 95 : ReplicationSlotReserveWal();
1181 :
1182 95 : ReplicationSlotMarkDirty();
1183 :
1184 : /* Write this slot to disk if it's a permanent one. */
1185 95 : if (!cmd->temporary)
1186 3 : ReplicationSlotSave();
1187 : }
1188 :
1189 343 : snprintf(xloc, sizeof(xloc), "%X/%X",
1190 343 : LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1191 :
1192 343 : dest = CreateDestReceiver(DestRemoteSimple);
1193 :
1194 : /*----------
1195 : * Need a tuple descriptor representing four columns:
1196 : * - first field: the slot name
1197 : * - second field: LSN at which we became consistent
1198 : * - third field: exported snapshot's name
1199 : * - fourth field: output plugin
1200 : *----------
1201 ECB : */
1202 CBC 343 : tupdesc = CreateTemplateTupleDesc(4);
1203 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
1204 ECB : TEXTOID, -1, 0);
1205 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
1206 ECB : TEXTOID, -1, 0);
1207 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
1208 ECB : TEXTOID, -1, 0);
1209 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1210 : TEXTOID, -1, 0);
1211 :
1212 ECB : /* prepare for projection of tuples */
1213 GIC 343 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1214 :
1215 ECB : /* slot_name */
1216 CBC 343 : slot_name = NameStr(MyReplicationSlot->data.name);
1217 GIC 343 : values[0] = CStringGetTextDatum(slot_name);
1218 :
1219 ECB : /* consistent wal location */
1220 GIC 343 : values[1] = CStringGetTextDatum(xloc);
1221 :
1222 ECB : /* snapshot name, or NULL if none */
1223 GBC 343 : if (snapshot_name != NULL)
1224 UIC 0 : values[2] = CStringGetTextDatum(snapshot_name);
1225 ECB : else
1226 GIC 343 : nulls[2] = true;
1227 :
1228 ECB : /* plugin, or NULL if none */
1229 CBC 343 : if (cmd->plugin != NULL)
1230 GIC 247 : values[3] = CStringGetTextDatum(cmd->plugin);
1231 ECB : else
1232 GIC 96 : nulls[3] = true;
1233 :
1234 ECB : /* send it to dest */
1235 CBC 343 : do_tup_output(tstate, values, nulls);
1236 GIC 343 : end_tup_output(tstate);
1237 ECB :
1238 CBC 343 : ReplicationSlotRelease();
1239 GIC 343 : }
1240 :
1241 : /*
1242 : * Get rid of a replication slot that is no longer wanted.
1243 : */
1244 ECB : static void
1245 GIC 192 : DropReplicationSlot(DropReplicationSlotCmd *cmd)
1246 ECB : {
1247 CBC 192 : ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1248 GIC 192 : }
1249 :
1250 : /*
1251 : * Load previously initiated logical slot and prepare for sending data (via
1252 : * WalSndLoop).
1253 : */
1254 ECB : static void
1255 GIC 298 : StartLogicalReplication(StartReplicationCmd *cmd)
1256 : {
1257 : StringInfoData buf;
1258 : QueryCompletion qc;
1259 :
1260 ECB : /* make sure that our requirements are still fulfilled */
1261 GIC 298 : CheckLogicalDecodingRequirements();
1262 ECB :
1263 GIC 296 : Assert(!MyReplicationSlot);
1264 ECB :
1265 GIC 296 : ReplicationSlotAcquire(cmd->slotname, true);
1266 :
1267 : /*
1268 EUB : * Force a disconnect, so that the decoding code doesn't need to care
1269 : * about an eventual switch from running in recovery, to running in a
1270 : * normal environment. Client code is expected to handle reconnects.
1271 : */
1272 GIC 296 : if (am_cascading_walsender && !RecoveryInProgress())
1273 : {
1274 UIC 0 : ereport(LOG,
1275 : (errmsg("terminating walsender process after promotion")));
1276 0 : got_STOPPING = true;
1277 : }
1278 ECB :
1279 : /*
1280 : * Create our decoding context, making it start at the previously ack'ed
1281 : * position.
1282 : *
1283 : * Do this before sending a CopyBothResponse message, so that any errors
1284 : * are reported early.
1285 : */
1286 GIC 290 : logical_decoding_ctx =
1287 CBC 296 : CreateDecodingContext(cmd->startpoint, cmd->options, false,
1288 GIC 296 : XL_ROUTINE(.page_read = logical_read_xlog_page,
1289 : .segment_open = WalSndSegmentOpen,
1290 ECB : .segment_close = wal_segment_close),
1291 : WalSndPrepareWrite, WalSndWriteData,
1292 : WalSndUpdateProgress);
1293 CBC 290 : xlogreader = logical_decoding_ctx->reader;
1294 ECB :
1295 GIC 290 : WalSndSetState(WALSNDSTATE_CATCHUP);
1296 :
1297 ECB : /* Send a CopyBothResponse message, and start streaming */
1298 CBC 290 : pq_beginmessage(&buf, 'W');
1299 GIC 290 : pq_sendbyte(&buf, 0);
1300 290 : pq_sendint16(&buf, 0);
1301 290 : pq_endmessage(&buf);
1302 290 : pq_flush();
1303 :
1304 ECB : /* Start reading WAL from the oldest required WAL. */
1305 GIC 290 : XLogBeginRead(logical_decoding_ctx->reader,
1306 290 : MyReplicationSlot->data.restart_lsn);
1307 ECB :
1308 : /*
1309 : * Report the location after which we'll send out further commits as the
1310 : * current sentPtr.
1311 : */
1312 GIC 290 : sentPtr = MyReplicationSlot->data.confirmed_flush;
1313 ECB :
1314 : /* Also update the sent position status in shared memory */
1315 GIC 290 : SpinLockAcquire(&MyWalSnd->mutex);
1316 CBC 290 : MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
1317 GIC 290 : SpinLockRelease(&MyWalSnd->mutex);
1318 ECB :
1319 CBC 290 : replication_active = true;
1320 :
1321 290 : SyncRepInitConfig();
1322 ECB :
1323 EUB : /* Main loop of walsender */
1324 CBC 290 : WalSndLoop(XLogSendLogical);
1325 :
1326 GIC 154 : FreeDecodingContext(logical_decoding_ctx);
1327 CBC 154 : ReplicationSlotRelease();
1328 ECB :
1329 CBC 154 : replication_active = false;
1330 GIC 154 : if (got_STOPPING)
1331 UIC 0 : proc_exit(0);
1332 GIC 154 : WalSndSetState(WALSNDSTATE_STARTUP);
1333 :
1334 : /* Get out of COPY mode (CommandComplete). */
1335 154 : SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1336 154 : EndCommand(&qc, DestRemote, false);
1337 154 : }
1338 :
1339 : /*
1340 ECB : * LogicalDecodingContext 'prepare_write' callback.
1341 : *
1342 : * Prepare a write into a StringInfo.
1343 : *
1344 : * Don't do anything lasting in here, it's quite possible that nothing will be done
1345 : * with the data.
1346 : */
1347 : static void
1348 CBC 184362 : WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
1349 ECB : {
1350 : /* can't have sync rep confused by sending the same LSN several times */
1351 GIC 184362 : if (!last_write)
1352 318 : lsn = InvalidXLogRecPtr;
1353 :
1354 184362 : resetStringInfo(ctx->out);
1355 :
1356 CBC 184362 : pq_sendbyte(ctx->out, 'w');
1357 184362 : pq_sendint64(ctx->out, lsn); /* dataStart */
1358 GIC 184362 : pq_sendint64(ctx->out, lsn); /* walEnd */
1359 :
1360 : /*
1361 : * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1362 : * reserve space here.
1363 : */
1364 184362 : pq_sendint64(ctx->out, 0); /* sendtime */
1365 184362 : }
1366 :
1367 ECB : /*
1368 : * LogicalDecodingContext 'write' callback.
1369 : *
1370 : * Actually write out data previously prepared by WalSndPrepareWrite out to
1371 : * the network. Take as long as needed, but process replies from the other
1372 : * side and check timeouts during that.
1373 : */
1374 : static void
1375 GIC 184362 : WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1376 : bool last_write)
1377 ECB : {
1378 : TimestampTz now;
1379 :
1380 : /*
1381 : * Fill the send timestamp last, so that it is taken as late as possible.
1382 : * This is somewhat ugly, but the protocol is set as it's already used for
1383 : * several releases by streaming physical replication.
1384 : */
1385 GIC 184362 : resetStringInfo(&tmpbuf);
1386 CBC 184362 : now = GetCurrentTimestamp();
1387 GIC 184362 : pq_sendint64(&tmpbuf, now);
1388 184362 : memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
1389 CBC 184362 : tmpbuf.data, sizeof(int64));
1390 ECB :
1391 : /* output previously gathered data in a CopyData packet */
1392 GIC 184362 : pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1393 ECB :
1394 CBC 184362 : CHECK_FOR_INTERRUPTS();
1395 ECB :
1396 : /* Try to flush pending output to the client */
1397 CBC 184362 : if (pq_flush_if_writable() != 0)
1398 GIC 6 : WalSndShutdown();
1399 :
1400 : /* Try taking fast path unless we get too close to walsender timeout. */
1401 CBC 184356 : if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1402 GIC 184356 : wal_sender_timeout / 2) &&
1403 184356 : !pq_is_send_pending())
1404 : {
1405 184250 : return;
1406 : }
1407 :
1408 : /* If we have pending write here, go to slow path */
1409 CBC 106 : ProcessPendingWrites();
1410 : }
1411 :
1412 ECB : /*
1413 : * Wait until there is no pending write. Also process replies from the other
1414 : * side and check timeouts during that.
1415 : */
1416 : static void
1417 GIC 106 : ProcessPendingWrites(void)
1418 : {
1419 ECB : for (;;)
1420 GIC 136 : {
1421 : long sleeptime;
1422 ECB :
1423 : /* Check for input from the client */
1424 CBC 242 : ProcessRepliesIfAny();
1425 ECB :
1426 : /* die if timeout was reached */
1427 CBC 242 : WalSndCheckTimeOut();
1428 :
1429 : /* Send keepalive if the time has come */
1430 242 : WalSndKeepaliveIfNecessary();
1431 :
1432 GIC 242 : if (!pq_is_send_pending())
1433 106 : break;
1434 ECB :
1435 GIC 136 : sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1436 ECB :
1437 : /* Sleep until something happens or we time out */
1438 GIC 136 : WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
1439 ECB : WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1440 :
1441 EUB : /* Clear any already-pending wakeups */
1442 GBC 136 : ResetLatch(MyLatch);
1443 EUB :
1444 GIC 136 : CHECK_FOR_INTERRUPTS();
1445 :
1446 : /* Process any requests or signals received recently */
1447 CBC 136 : if (ConfigReloadPending)
1448 EUB : {
1449 UIC 0 : ConfigReloadPending = false;
1450 0 : ProcessConfigFile(PGC_SIGHUP);
1451 0 : SyncRepInitConfig();
1452 ECB : }
1453 :
1454 : /* Try to flush pending output to the client */
1455 GIC 136 : if (pq_flush_if_writable() != 0)
1456 UIC 0 : WalSndShutdown();
1457 : }
1458 :
1459 : /* reactivate latch so WalSndLoop knows to continue */
1460 GIC 106 : SetLatch(MyLatch);
1461 106 : }
1462 :
1463 ECB : /*
1464 : * LogicalDecodingContext 'update_progress' callback.
1465 : *
1466 : * Write the current position to the lag tracker (see XLogSendPhysical).
1467 : *
1468 : * When skipping empty transactions, send a keepalive message if necessary.
1469 : */
1470 : static void
1471 GIC 2215 : WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1472 : bool skipped_xact)
1473 : {
1474 : static TimestampTz sendTime = 0;
1475 2215 : TimestampTz now = GetCurrentTimestamp();
1476 2215 : bool pending_writes = false;
1477 2215 : bool end_xact = ctx->end_xact;
1478 :
1479 : /*
1480 ECB : * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1481 : * avoid flooding the lag tracker when we commit frequently.
1482 : *
1483 : * We don't have a mechanism to get the ack for any LSN other than end
1484 : * xact LSN from the downstream. So, we track lag only for end of
1485 : * transaction LSN.
1486 : */
1487 : #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
1488 GIC 2215 : if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1489 : WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1490 : {
1491 164 : LagTrackerWrite(lsn, now);
1492 164 : sendTime = now;
1493 : }
1494 :
1495 ECB : /*
1496 : * When skipping empty transactions in synchronous replication, we send a
1497 : * keepalive message to avoid delaying such transactions.
1498 : *
1499 EUB : * It is okay to check sync_standbys_defined flag without lock here as in
1500 : * the worst case we will just send an extra keepalive message when it is
1501 : * really not required.
1502 : */
1503 GBC 2215 : if (skipped_xact &&
1504 GIC 255 : SyncRepRequested() &&
1505 255 : ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
1506 EUB : {
1507 UBC 0 : WalSndKeepalive(false, lsn);
1508 :
1509 : /* Try to flush pending output to the client */
1510 UIC 0 : if (pq_flush_if_writable() != 0)
1511 0 : WalSndShutdown();
1512 :
1513 : /* If we have pending write here, make sure it's actually flushed */
1514 0 : if (pq_is_send_pending())
1515 0 : pending_writes = true;
1516 : }
1517 ECB :
1518 : /*
1519 : * Process pending writes if any or try to send a keepalive if required.
1520 EUB : * We don't need to try sending keep alive messages at the transaction end
1521 ECB : * as that will be done at a later point in time. This is required only
1522 : * for large transactions where we don't send any changes to the
1523 : * downstream and the receiver can timeout due to that.
1524 : */
1525 GIC 2215 : if (pending_writes || (!end_xact &&
1526 1528 : now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1527 : wal_sender_timeout / 2)))
1528 UIC 0 : ProcessPendingWrites();
1529 GIC 2215 : }
1530 :
1531 ECB : /*
1532 : * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1533 : *
1534 : * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1535 : * if we detect a shutdown request (either from postmaster or client)
1536 : * we will return early, so caller must always check.
1537 : */
1538 : static XLogRecPtr
1539 GIC 10781 : WalSndWaitForWal(XLogRecPtr loc)
1540 : {
1541 ECB : int wakeEvents;
1542 : static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1543 :
1544 : /*
1545 : * Fast path to avoid acquiring the spinlock in case we already know we
1546 : * have enough WAL available. This is particularly interesting if we're
1547 : * far behind.
1548 : */
1549 CBC 10781 : if (RecentFlushPtr != InvalidXLogRecPtr &&
1550 GIC 10392 : loc <= RecentFlushPtr)
1551 8957 : return RecentFlushPtr;
1552 ECB :
1553 : /* Get a more recent flush pointer. */
1554 GIC 1824 : if (!RecoveryInProgress())
1555 1553 : RecentFlushPtr = GetFlushRecPtr(NULL);
1556 ECB : else
1557 GIC 271 : RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1558 ECB :
1559 : for (;;)
1560 GIC 2087 : {
1561 ECB : long sleeptime;
1562 :
1563 : /* Clear any already-pending wakeups */
1564 CBC 3911 : ResetLatch(MyLatch);
1565 ECB :
1566 GIC 3911 : CHECK_FOR_INTERRUPTS();
1567 :
1568 : /* Process any requests or signals received recently */
1569 CBC 3905 : if (ConfigReloadPending)
1570 : {
1571 GIC 3 : ConfigReloadPending = false;
1572 3 : ProcessConfigFile(PGC_SIGHUP);
1573 3 : SyncRepInitConfig();
1574 : }
1575 :
1576 ECB : /* Check for input from the client */
1577 CBC 3905 : ProcessRepliesIfAny();
1578 :
1579 : /*
1580 ECB : * If we're shutting down, trigger pending WAL to be written out,
1581 : * otherwise we'd possibly end up waiting for WAL that never gets
1582 : * written, because walwriter has shut down already.
1583 : */
1584 GIC 3789 : if (got_STOPPING)
1585 108 : XLogBackgroundFlush();
1586 :
1587 : /* Update our idea of the currently flushed position. */
1588 3789 : if (!RecoveryInProgress())
1589 3453 : RecentFlushPtr = GetFlushRecPtr(NULL);
1590 : else
1591 336 : RecentFlushPtr = GetXLogReplayRecPtr(NULL);
1592 ECB :
1593 : /*
1594 : * If postmaster asked us to stop, don't wait anymore.
1595 : *
1596 : * It's important to do this check after the recomputation of
1597 : * RecentFlushPtr, so we can send all remaining data before shutting
1598 : * down.
1599 : */
1600 GIC 3789 : if (got_STOPPING)
1601 108 : break;
1602 :
1603 ECB : /*
1604 : * We only send regular messages to the client for full decoded
1605 : * transactions, but a synchronous replication and walsender shutdown
1606 : * possibly are waiting for a later location. So, before sleeping, we
1607 : * send a ping containing the flush location. If the receiver is
1608 : * otherwise idle, this keepalive will trigger a reply. Processing the
1609 : * reply will update these MyWalSnd locations.
1610 : */
1611 GIC 3681 : if (MyWalSnd->flush < sentPtr &&
1612 1970 : MyWalSnd->write < sentPtr &&
1613 CBC 1463 : !waiting_for_ping_response)
1614 GIC 1463 : WalSndKeepalive(false, InvalidXLogRecPtr);
1615 :
1616 : /* check whether we're done */
1617 3681 : if (loc <= RecentFlushPtr)
1618 CBC 1498 : break;
1619 EUB :
1620 : /* Waiting for new WAL. Since we need to wait, we're now caught up. */
1621 GIC 2183 : WalSndCaughtUp = true;
1622 :
1623 : /*
1624 : * Try to flush any pending output to the client.
1625 : */
1626 CBC 2183 : if (pq_flush_if_writable() != 0)
1627 LBC 0 : WalSndShutdown();
1628 ECB :
1629 : /*
1630 : * If we have received CopyDone from the client, sent CopyDone
1631 : * ourselves, and the output buffer is empty, it's time to exit
1632 : * streaming, so fail the current WAL fetch request.
1633 : */
1634 CBC 2183 : if (streamingDoneReceiving && streamingDoneSending &&
1635 GIC 96 : !pq_is_send_pending())
1636 96 : break;
1637 :
1638 : /* die if timeout was reached */
1639 2087 : WalSndCheckTimeOut();
1640 :
1641 : /* Send keepalive if the time has come */
1642 2087 : WalSndKeepaliveIfNecessary();
1643 ECB :
1644 : /*
1645 : * Sleep until something happens or we time out. Also wait for the
1646 : * socket becoming writable, if there's still pending output.
1647 : * Otherwise we might sit on sendable output data while waiting for
1648 EUB : * new WAL to be generated. (But if we have nothing to send, we don't
1649 : * want to wake on socket-writable.)
1650 ECB : */
1651 GIC 2087 : sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1652 :
1653 2087 : wakeEvents = WL_SOCKET_READABLE;
1654 ECB :
1655 CBC 2087 : if (pq_is_send_pending())
1656 UIC 0 : wakeEvents |= WL_SOCKET_WRITEABLE;
1657 :
1658 GIC 2087 : WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1659 : }
1660 :
1661 : /* reactivate latch so WalSndLoop knows to continue */
1662 1702 : SetLatch(MyLatch);
1663 1702 : return RecentFlushPtr;
1664 : }
1665 ECB :
1666 : /*
1667 : * Execute an incoming replication command.
1668 : *
1669 : * Returns true if the cmd_string was recognized as WalSender command, false
1670 : * if not.
1671 : */
1672 : bool
1673 GIC 3914 : exec_replication_command(const char *cmd_string)
1674 : {
1675 : int parse_rc;
1676 : Node *cmd_node;
1677 ECB : const char *cmdtag;
1678 EUB : MemoryContext cmd_context;
1679 : MemoryContext old_context;
1680 :
1681 : /*
1682 : * If WAL sender has been told that shutdown is getting close, switch its
1683 : * status accordingly to handle the next replication commands correctly.
1684 : */
1685 CBC 3914 : if (got_STOPPING)
1686 UBC 0 : WalSndSetState(WALSNDSTATE_STOPPING);
1687 :
1688 : /*
1689 : * Throw error if in stopping mode. We need prevent commands that could
1690 : * generate WAL while the shutdown checkpoint is being written. To be
1691 : * safe, we just prohibit all new commands.
1692 : */
1693 GIC 3914 : if (MyWalSnd->state == WALSNDSTATE_STOPPING)
1694 LBC 0 : ereport(ERROR,
1695 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1696 ECB : errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1697 :
1698 : /*
1699 : * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1700 : * command arrives. Clean up the old stuff if there's anything.
1701 : */
1702 GIC 3914 : SnapBuildClearExportedSnapshot();
1703 :
1704 CBC 3914 : CHECK_FOR_INTERRUPTS();
1705 :
1706 ECB : /*
1707 : * Prepare to parse and execute the command.
1708 : */
1709 GIC 3914 : cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1710 : "Replication command context",
1711 ECB : ALLOCSET_DEFAULT_SIZES);
1712 GIC 3914 : old_context = MemoryContextSwitchTo(cmd_context);
1713 :
1714 CBC 3914 : replication_scanner_init(cmd_string);
1715 :
1716 ECB : /*
1717 : * Is it a WalSender command?
1718 : */
1719 GIC 3914 : if (!replication_scanner_is_replication_command())
1720 ECB : {
1721 EUB : /* Nope; clean up and get out. */
1722 GIC 1752 : replication_scanner_finish();
1723 :
1724 1752 : MemoryContextSwitchTo(old_context);
1725 1752 : MemoryContextDelete(cmd_context);
1726 ECB :
1727 : /* XXX this is a pretty random place to make this check */
1728 GIC 1752 : if (MyDatabaseId == InvalidOid)
1729 UIC 0 : ereport(ERROR,
1730 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1731 : errmsg("cannot execute SQL commands in WAL sender for physical replication")));
1732 ECB :
1733 : /* Tell the caller that this wasn't a WalSender command. */
1734 GBC 1752 : return false;
1735 : }
1736 :
1737 : /*
1738 ECB : * Looks like a WalSender command, so parse it.
1739 : */
1740 CBC 2162 : parse_rc = replication_yyparse();
1741 GIC 2162 : if (parse_rc != 0)
1742 UIC 0 : ereport(ERROR,
1743 : (errcode(ERRCODE_SYNTAX_ERROR),
1744 : errmsg_internal("replication command parser returned %d",
1745 : parse_rc)));
1746 CBC 2162 : replication_scanner_finish();
1747 :
1748 2162 : cmd_node = replication_parse_result;
1749 :
1750 : /*
1751 : * Report query to various monitoring facilities. For this purpose, we
1752 : * report replication commands just like SQL commands.
1753 : */
1754 GIC 2162 : debug_query_string = cmd_string;
1755 ECB :
1756 GIC 2162 : pgstat_report_activity(STATE_RUNNING, cmd_string);
1757 :
1758 : /*
1759 : * Log replication command if log_replication_commands is enabled. Even
1760 : * when it's disabled, log the command with DEBUG1 level for backward
1761 ECB : * compatibility.
1762 EUB : */
1763 GIC 2162 : ereport(log_replication_commands ? LOG : DEBUG1,
1764 : (errmsg("received replication command: %s", cmd_string)));
1765 :
1766 : /*
1767 ECB : * Disallow replication commands in aborted transaction blocks.
1768 : */
1769 GIC 2162 : if (IsAbortedTransactionBlockState())
1770 UIC 0 : ereport(ERROR,
1771 : (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1772 : errmsg("current transaction is aborted, "
1773 ECB : "commands ignored until end of transaction block")));
1774 :
1775 CBC 2162 : CHECK_FOR_INTERRUPTS();
1776 :
1777 ECB : /*
1778 : * Allocate buffers that will be used for each outgoing and incoming
1779 : * message. We do this just once per command to reduce palloc overhead.
1780 : */
1781 CBC 2162 : initStringInfo(&output_message);
1782 2162 : initStringInfo(&reply_message);
1783 2162 : initStringInfo(&tmpbuf);
1784 ECB :
1785 GIC 2162 : switch (cmd_node->type)
1786 ECB : {
1787 CBC 522 : case T_IdentifySystemCmd:
1788 522 : cmdtag = "IDENTIFY_SYSTEM";
1789 522 : set_ps_display(cmdtag);
1790 522 : IdentifySystem();
1791 522 : EndReplicationCommand(cmdtag);
1792 GIC 522 : break;
1793 ECB :
1794 CBC 6 : case T_ReadReplicationSlotCmd:
1795 6 : cmdtag = "READ_REPLICATION_SLOT";
1796 6 : set_ps_display(cmdtag);
1797 6 : ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
1798 5 : EndReplicationCommand(cmdtag);
1799 5 : break;
1800 :
1801 143 : case T_BaseBackupCmd:
1802 143 : cmdtag = "BASE_BACKUP";
1803 143 : set_ps_display(cmdtag);
1804 143 : PreventInTransactionBlock(true, cmdtag);
1805 143 : SendBaseBackup((BaseBackupCmd *) cmd_node);
1806 117 : EndReplicationCommand(cmdtag);
1807 GIC 117 : break;
1808 ECB :
1809 CBC 344 : case T_CreateReplicationSlotCmd:
1810 344 : cmdtag = "CREATE_REPLICATION_SLOT";
1811 344 : set_ps_display(cmdtag);
1812 344 : CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1813 343 : EndReplicationCommand(cmdtag);
1814 GIC 343 : break;
1815 ECB :
1816 GIC 192 : case T_DropReplicationSlotCmd:
1817 CBC 192 : cmdtag = "DROP_REPLICATION_SLOT";
1818 GIC 192 : set_ps_display(cmdtag);
1819 CBC 192 : DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1820 192 : EndReplicationCommand(cmdtag);
1821 192 : break;
1822 :
1823 493 : case T_StartReplicationCmd:
1824 ECB : {
1825 GIC 493 : StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
1826 ECB :
1827 GIC 493 : cmdtag = "START_REPLICATION";
1828 493 : set_ps_display(cmdtag);
1829 CBC 493 : PreventInTransactionBlock(true, cmdtag);
1830 :
1831 493 : if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1832 195 : StartReplication(cmd);
1833 : else
1834 GIC 298 : StartLogicalReplication(cmd);
1835 ECB :
1836 : /* dupe, but necessary per libpqrcv_endstreaming */
1837 CBC 264 : EndReplicationCommand(cmdtag);
1838 ECB :
1839 CBC 264 : Assert(xlogreader != NULL);
1840 264 : break;
1841 ECB : }
1842 :
1843 CBC 13 : case T_TimeLineHistoryCmd:
1844 GIC 13 : cmdtag = "TIMELINE_HISTORY";
1845 CBC 13 : set_ps_display(cmdtag);
1846 13 : PreventInTransactionBlock(true, cmdtag);
1847 GIC 13 : SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1848 CBC 13 : EndReplicationCommand(cmdtag);
1849 13 : break;
1850 :
1851 GIC 449 : case T_VariableShowStmt:
1852 ECB : {
1853 CBC 449 : DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1854 449 : VariableShowStmt *n = (VariableShowStmt *) cmd_node;
1855 ECB :
1856 GIC 449 : cmdtag = "SHOW";
1857 CBC 449 : set_ps_display(cmdtag);
1858 :
1859 EUB : /* syscache access needs a transaction environment */
1860 GBC 449 : StartTransactionCommand();
1861 GIC 449 : GetPGVariable(n->name, dest);
1862 449 : CommitTransactionCommand();
1863 449 : EndReplicationCommand(cmdtag);
1864 : }
1865 CBC 449 : break;
1866 ECB :
1867 UIC 0 : default:
1868 0 : elog(ERROR, "unrecognized replication command node tag: %u",
1869 : cmd_node->type);
1870 : }
1871 :
1872 : /* done */
1873 CBC 1905 : MemoryContextSwitchTo(old_context);
1874 GIC 1905 : MemoryContextDelete(cmd_context);
1875 ECB :
1876 : /*
1877 : * We need not update ps display or pg_stat_activity, because PostgresMain
1878 : * will reset those to "idle". But we must reset debug_query_string to
1879 : * ensure it doesn't become a dangling pointer.
1880 : */
1881 GIC 1905 : debug_query_string = NULL;
1882 :
1883 CBC 1905 : return true;
1884 : }
1885 :
1886 : /*
1887 : * Process any incoming messages while streaming. Also checks if the remote
1888 ECB : * end has closed the connection.
1889 : */
1890 : static void
1891 GIC 954113 : ProcessRepliesIfAny(void)
1892 : {
1893 : unsigned char firstchar;
1894 : int maxmsglen;
1895 : int r;
1896 954113 : bool received = false;
1897 ECB :
1898 GIC 954113 : last_processing = GetCurrentTimestamp();
1899 ECB :
1900 : /*
1901 : * If we already received a CopyDone from the frontend, any subsequent
1902 : * message is the beginning of a new command, and should be processed in
1903 : * the main processing loop.
1904 : */
1905 GIC 1045647 : while (!streamingDoneReceiving)
1906 : {
1907 CBC 1045124 : pq_startmsgread();
1908 GIC 1045124 : r = pq_getbyte_if_available(&firstchar);
1909 CBC 1045124 : if (r < 0)
1910 : {
1911 : /* unexpected error or EOF */
1912 16 : ereport(COMMERROR,
1913 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1914 : errmsg("unexpected EOF on standby connection")));
1915 GIC 16 : proc_exit(0);
1916 : }
1917 CBC 1045108 : if (r == 0)
1918 : {
1919 ECB : /* no data available without blocking */
1920 CBC 953418 : pq_endmsgread();
1921 953418 : break;
1922 ECB : }
1923 :
1924 : /* Validate message type and set packet size limit */
1925 CBC 91690 : switch (firstchar)
1926 EUB : {
1927 GBC 91269 : case 'd':
1928 GIC 91269 : maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1929 91269 : break;
1930 421 : case 'c':
1931 : case 'X':
1932 421 : maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1933 421 : break;
1934 UIC 0 : default:
1935 0 : ereport(FATAL,
1936 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1937 : errmsg("invalid standby message type \"%c\"",
1938 : firstchar)));
1939 EUB : maxmsglen = 0; /* keep compiler quiet */
1940 : break;
1941 : }
1942 :
1943 : /* Read the message contents */
1944 GIC 91690 : resetStringInfo(&reply_message);
1945 91690 : if (pq_getmessage(&reply_message, maxmsglen))
1946 ECB : {
1947 UIC 0 : ereport(COMMERROR,
1948 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1949 : errmsg("unexpected EOF on standby connection")));
1950 0 : proc_exit(0);
1951 ECB : }
1952 :
1953 : /* ... and process it */
1954 CBC 91690 : switch (firstchar)
1955 : {
1956 : /*
1957 : * 'd' means a standby reply wrapped in a CopyData packet.
1958 : */
1959 GIC 91269 : case 'd':
1960 CBC 91269 : ProcessStandbyMessage();
1961 91269 : received = true;
1962 GIC 91269 : break;
1963 ECB :
1964 : /*
1965 : * CopyDone means the standby requested to finish streaming.
1966 : * Reply with CopyDone, if we had not sent that already.
1967 : */
1968 CBC 265 : case 'c':
1969 265 : if (!streamingDoneSending)
1970 : {
1971 GIC 252 : pq_putmessage_noblock('c', NULL, 0);
1972 252 : streamingDoneSending = true;
1973 : }
1974 ECB :
1975 CBC 265 : streamingDoneReceiving = true;
1976 GIC 265 : received = true;
1977 GBC 265 : break;
1978 EUB :
1979 : /*
1980 : * 'X' means that the standby is closing down the socket.
1981 : */
1982 GIC 156 : case 'X':
1983 156 : proc_exit(0);
1984 :
1985 LBC 0 : default:
1986 UIC 0 : Assert(false); /* NOT REACHED */
1987 ECB : }
1988 : }
1989 :
1990 : /*
1991 : * Save the last reply timestamp if we've received at least one reply.
1992 : */
1993 GIC 953941 : if (received)
1994 : {
1995 34263 : last_reply_timestamp = last_processing;
1996 CBC 34263 : waiting_for_ping_response = false;
1997 : }
1998 GIC 953941 : }
1999 :
2000 : /*
2001 : * Process a status update message received from standby.
2002 : */
2003 ECB : static void
2004 GIC 91269 : ProcessStandbyMessage(void)
2005 ECB : {
2006 : char msgtype;
2007 :
2008 : /*
2009 : * Check message type from the first byte.
2010 : */
2011 CBC 91269 : msgtype = pq_getmsgbyte(&reply_message);
2012 ECB :
2013 CBC 91269 : switch (msgtype)
2014 : {
2015 GBC 91163 : case 'r':
2016 91163 : ProcessStandbyReplyMessage();
2017 GIC 91163 : break;
2018 :
2019 GBC 106 : case 'h':
2020 GIC 106 : ProcessStandbyHSFeedbackMessage();
2021 CBC 106 : break;
2022 :
2023 UIC 0 : default:
2024 0 : ereport(COMMERROR,
2025 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2026 : errmsg("unexpected message type \"%c\"", msgtype)));
2027 LBC 0 : proc_exit(0);
2028 : }
2029 CBC 91269 : }
2030 ECB :
2031 : /*
2032 : * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
2033 : */
2034 : static void
2035 GIC 46270 : PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
2036 ECB : {
2037 CBC 46270 : bool changed = false;
2038 GIC 46270 : ReplicationSlot *slot = MyReplicationSlot;
2039 ECB :
2040 GIC 46270 : Assert(lsn != InvalidXLogRecPtr);
2041 CBC 46270 : SpinLockAcquire(&slot->mutex);
2042 GIC 46270 : if (slot->data.restart_lsn != lsn)
2043 ECB : {
2044 CBC 17531 : changed = true;
2045 GIC 17531 : slot->data.restart_lsn = lsn;
2046 : }
2047 46270 : SpinLockRelease(&slot->mutex);
2048 :
2049 46270 : if (changed)
2050 : {
2051 17531 : ReplicationSlotMarkDirty();
2052 17531 : ReplicationSlotsComputeRequiredLSN();
2053 ECB : }
2054 :
2055 : /*
2056 : * One could argue that the slot should be saved to disk now, but that'd
2057 : * be energy wasted - the worst thing lost information could cause here is
2058 : * to give wrong information in a statistics view - we'll just potentially
2059 : * be more conservative in removing files.
2060 : */
2061 GIC 46270 : }
2062 :
2063 : /*
2064 : * Regular reply from standby advising of WAL locations on standby server.
2065 : */
2066 : static void
2067 91163 : ProcessStandbyReplyMessage(void)
2068 : {
2069 : XLogRecPtr writePtr,
2070 : flushPtr,
2071 : applyPtr;
2072 : bool replyRequested;
2073 : TimeOffset writeLag,
2074 : flushLag,
2075 ECB : applyLag;
2076 : bool clearLagTimes;
2077 : TimestampTz now;
2078 : TimestampTz replyTime;
2079 :
2080 : static bool fullyAppliedLastTime = false;
2081 :
2082 : /* the caller already consumed the msgtype byte */
2083 GIC 91163 : writePtr = pq_getmsgint64(&reply_message);
2084 91163 : flushPtr = pq_getmsgint64(&reply_message);
2085 91163 : applyPtr = pq_getmsgint64(&reply_message);
2086 CBC 91163 : replyTime = pq_getmsgint64(&reply_message);
2087 GIC 91163 : replyRequested = pq_getmsgbyte(&reply_message);
2088 ECB :
2089 GIC 91163 : if (message_level_is_interesting(DEBUG2))
2090 : {
2091 : char *replyTimeStr;
2092 :
2093 : /* Copy because timestamptz_to_str returns a static buffer */
2094 1220 : replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2095 ECB :
2096 GIC 1220 : elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2097 : LSN_FORMAT_ARGS(writePtr),
2098 : LSN_FORMAT_ARGS(flushPtr),
2099 ECB : LSN_FORMAT_ARGS(applyPtr),
2100 : replyRequested ? " (reply requested)" : "",
2101 : replyTimeStr);
2102 :
2103 GIC 1220 : pfree(replyTimeStr);
2104 : }
2105 :
2106 : /* See if we can compute the round-trip lag for these positions. */
2107 91163 : now = GetCurrentTimestamp();
2108 91163 : writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2109 91163 : flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2110 91163 : applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2111 :
2112 ECB : /*
2113 : * If the standby reports that it has fully replayed the WAL in two
2114 : * consecutive reply messages, then the second such message must result
2115 : * from wal_receiver_status_interval expiring on the standby. This is a
2116 : * convenient time to forget the lag times measured when it last
2117 : * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2118 : * until more WAL traffic arrives.
2119 : */
2120 CBC 91163 : clearLagTimes = false;
2121 GIC 91163 : if (applyPtr == sentPtr)
2122 : {
2123 CBC 12132 : if (fullyAppliedLastTime)
2124 GBC 810 : clearLagTimes = true;
2125 GIC 12132 : fullyAppliedLastTime = true;
2126 : }
2127 : else
2128 79031 : fullyAppliedLastTime = false;
2129 :
2130 : /* Send a reply if the standby requested one. */
2131 CBC 91163 : if (replyRequested)
2132 UIC 0 : WalSndKeepalive(false, InvalidXLogRecPtr);
2133 ECB :
2134 : /*
2135 : * Update shared state for this WalSender process based on reply data from
2136 : * standby.
2137 : */
2138 : {
2139 CBC 91163 : WalSnd *walsnd = MyWalSnd;
2140 ECB :
2141 CBC 91163 : SpinLockAcquire(&walsnd->mutex);
2142 91163 : walsnd->write = writePtr;
2143 91163 : walsnd->flush = flushPtr;
2144 91163 : walsnd->apply = applyPtr;
2145 GIC 91163 : if (writeLag != -1 || clearLagTimes)
2146 20487 : walsnd->writeLag = writeLag;
2147 CBC 91163 : if (flushLag != -1 || clearLagTimes)
2148 37925 : walsnd->flushLag = flushLag;
2149 GIC 91163 : if (applyLag != -1 || clearLagTimes)
2150 48879 : walsnd->applyLag = applyLag;
2151 91163 : walsnd->replyTime = replyTime;
2152 91163 : SpinLockRelease(&walsnd->mutex);
2153 ECB : }
2154 :
2155 CBC 91163 : if (!am_cascading_walsender)
2156 90688 : SyncRepReleaseWaiters();
2157 :
2158 ECB : /*
2159 : * Advance our local xmin horizon when the client confirmed a flush.
2160 : */
2161 GIC 91163 : if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2162 : {
2163 89528 : if (SlotIsLogical(MyReplicationSlot))
2164 CBC 43258 : LogicalConfirmReceivedLocation(flushPtr);
2165 : else
2166 46270 : PhysicalConfirmReceivedLocation(flushPtr);
2167 ECB : }
2168 GIC 91163 : }
2169 ECB :
2170 : /* compute new replication slot xmin horizon if needed */
2171 : static void
2172 GIC 40 : PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
2173 : {
2174 40 : bool changed = false;
2175 40 : ReplicationSlot *slot = MyReplicationSlot;
2176 :
2177 CBC 40 : SpinLockAcquire(&slot->mutex);
2178 40 : MyProc->xmin = InvalidTransactionId;
2179 ECB :
2180 : /*
2181 : * For physical replication we don't need the interlock provided by xmin
2182 : * and effective_xmin since the consequences of a missed increase are
2183 : * limited to query cancellations, so set both at once.
2184 : */
2185 CBC 40 : if (!TransactionIdIsNormal(slot->data.xmin) ||
2186 14 : !TransactionIdIsNormal(feedbackXmin) ||
2187 14 : TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2188 : {
2189 31 : changed = true;
2190 31 : slot->data.xmin = feedbackXmin;
2191 31 : slot->effective_xmin = feedbackXmin;
2192 : }
2193 40 : if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2194 GIC 1 : !TransactionIdIsNormal(feedbackCatalogXmin) ||
2195 CBC 1 : TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2196 : {
2197 39 : changed = true;
2198 39 : slot->data.catalog_xmin = feedbackCatalogXmin;
2199 GIC 39 : slot->effective_catalog_xmin = feedbackCatalogXmin;
2200 ECB : }
2201 GIC 40 : SpinLockRelease(&slot->mutex);
2202 :
2203 40 : if (changed)
2204 : {
2205 39 : ReplicationSlotMarkDirty();
2206 39 : ReplicationSlotsComputeRequiredXmin(false);
2207 : }
2208 40 : }
2209 :
2210 : /*
2211 : * Check that the provided xmin/epoch are sane, that is, not in the future
2212 : * and not so far back as to be already wrapped around.
2213 ECB : *
2214 : * Epoch of nextXid should be same as standby, or if the counter has
2215 : * wrapped, then one greater than standby.
2216 : *
2217 : * This check doesn't care about whether clog exists for these xids
2218 : * at all.
2219 : */
2220 : static bool
2221 CBC 22 : TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
2222 : {
2223 ECB : FullTransactionId nextFullXid;
2224 : TransactionId nextXid;
2225 : uint32 nextEpoch;
2226 EUB :
2227 GIC 22 : nextFullXid = ReadNextFullTransactionId();
2228 22 : nextXid = XidFromFullTransactionId(nextFullXid);
2229 22 : nextEpoch = EpochFromFullTransactionId(nextFullXid);
2230 EUB :
2231 GBC 22 : if (xid <= nextXid)
2232 : {
2233 GIC 22 : if (epoch != nextEpoch)
2234 LBC 0 : return false;
2235 EUB : }
2236 : else
2237 ECB : {
2238 UIC 0 : if (epoch + 1 != nextEpoch)
2239 0 : return false;
2240 : }
2241 :
2242 GIC 22 : if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2243 UIC 0 : return false; /* epoch OK, but it's wrapped around */
2244 ECB :
2245 GIC 22 : return true;
2246 : }
2247 :
2248 : /*
2249 : * Hot Standby feedback
2250 : */
2251 : static void
2252 106 : ProcessStandbyHSFeedbackMessage(void)
2253 : {
2254 : TransactionId feedbackXmin;
2255 : uint32 feedbackEpoch;
2256 : TransactionId feedbackCatalogXmin;
2257 ECB : uint32 feedbackCatalogEpoch;
2258 : TimestampTz replyTime;
2259 :
2260 : /*
2261 : * Decipher the reply message. The caller already consumed the msgtype
2262 : * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2263 : * of this message.
2264 : */
2265 GIC 106 : replyTime = pq_getmsgint64(&reply_message);
2266 106 : feedbackXmin = pq_getmsgint(&reply_message, 4);
2267 106 : feedbackEpoch = pq_getmsgint(&reply_message, 4);
2268 CBC 106 : feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2269 GIC 106 : feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
2270 ECB :
2271 GIC 106 : if (message_level_is_interesting(DEBUG2))
2272 : {
2273 : char *replyTimeStr;
2274 :
2275 : /* Copy because timestamptz_to_str returns a static buffer */
2276 15 : replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
2277 ECB :
2278 GIC 15 : elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2279 : feedbackXmin,
2280 : feedbackEpoch,
2281 : feedbackCatalogXmin,
2282 : feedbackCatalogEpoch,
2283 : replyTimeStr);
2284 :
2285 CBC 15 : pfree(replyTimeStr);
2286 : }
2287 ECB :
2288 : /*
2289 : * Update shared state for this WalSender process based on reply data from
2290 : * standby.
2291 : */
2292 : {
2293 GIC 106 : WalSnd *walsnd = MyWalSnd;
2294 :
2295 106 : SpinLockAcquire(&walsnd->mutex);
2296 CBC 106 : walsnd->replyTime = replyTime;
2297 106 : SpinLockRelease(&walsnd->mutex);
2298 : }
2299 ECB :
2300 : /*
2301 : * Unset WalSender's xmins if the feedback message values are invalid.
2302 : * This happens when the downstream turned hot_standby_feedback off.
2303 : */
2304 GIC 106 : if (!TransactionIdIsNormal(feedbackXmin)
2305 87 : && !TransactionIdIsNormal(feedbackCatalogXmin))
2306 : {
2307 87 : MyProc->xmin = InvalidTransactionId;
2308 87 : if (MyReplicationSlot != NULL)
2309 CBC 21 : PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2310 87 : return;
2311 EUB : }
2312 :
2313 ECB : /*
2314 : * Check that the provided xmin/epoch are sane, that is, not in the future
2315 EUB : * and not so far back as to be already wrapped around. Ignore if not.
2316 : */
2317 GIC 19 : if (TransactionIdIsNormal(feedbackXmin) &&
2318 19 : !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2319 UIC 0 : return;
2320 :
2321 GIC 19 : if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2322 3 : !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2323 UIC 0 : return;
2324 :
2325 : /*
2326 : * Set the WalSender's xmin equal to the standby's requested xmin, so that
2327 : * the xmin will be taken into account by GetSnapshotData() /
2328 : * ComputeXidHorizons(). This will hold back the removal of dead rows and
2329 : * thereby prevent the generation of cleanup conflicts on the standby
2330 : * server.
2331 : *
2332 : * There is a small window for a race condition here: although we just
2333 : * checked that feedbackXmin precedes nextXid, the nextXid could have
2334 : * gotten advanced between our fetching it and applying the xmin below,
2335 : * perhaps far enough to make feedbackXmin wrap around. In that case the
2336 : * xmin we set here would be "in the future" and have no effect. No point
2337 : * in worrying about this since it's too late to save the desired data
2338 : * anyway. Assuming that the standby sends us an increasing sequence of
2339 : * xmins, this could only happen during the first reply cycle, else our
2340 : * own xmin would prevent nextXid from advancing so far.
2341 : *
2342 : * We don't bother taking the ProcArrayLock here. Setting the xmin field
2343 : * is assumed atomic, and there's no real need to prevent concurrent
2344 : * horizon determinations. (If we're moving our xmin forward, this is
2345 : * obviously safe, and if we're moving it backwards, well, the data is at
2346 : * risk already since a VACUUM could already have determined the horizon.)
2347 : *
2348 ECB : * If we're using a replication slot we reserve the xmin via that,
2349 : * otherwise via the walsender's PGPROC entry. We can only track the
2350 : * catalog xmin separately when using a slot, so we store the least of the
2351 : * two provided when not using a slot.
2352 EUB : *
2353 : * XXX: It might make sense to generalize the ephemeral slot concept and
2354 : * always use the slot mechanism to handle the feedback xmin.
2355 : */
2356 GBC 19 : if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2357 GIC 19 : PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2358 : else
2359 : {
2360 UIC 0 : if (TransactionIdIsNormal(feedbackCatalogXmin)
2361 0 : && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
2362 0 : MyProc->xmin = feedbackCatalogXmin;
2363 : else
2364 0 : MyProc->xmin = feedbackXmin;
2365 : }
2366 : }
2367 :
2368 ECB : /*
2369 : * Compute how long send/receive loops should sleep.
2370 : *
2371 : * If wal_sender_timeout is enabled we want to wake up in time to send
2372 : * keepalives and to abort the connection if wal_sender_timeout has been
2373 : * reached.
2374 : */
2375 : static long
2376 GIC 55918 : WalSndComputeSleeptime(TimestampTz now)
2377 : {
2378 55918 : long sleeptime = 10000; /* 10 s */
2379 :
2380 CBC 55918 : if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2381 : {
2382 : TimestampTz wakeup_time;
2383 :
2384 : /*
2385 : * At the latest stop sleeping once wal_sender_timeout has been
2386 : * reached.
2387 : */
2388 55894 : wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2389 ECB : wal_sender_timeout);
2390 :
2391 : /*
2392 : * If no ping has been sent yet, wakeup when it's time to do so.
2393 : * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2394 : * the timeout passed without a response.
2395 : */
2396 CBC 55894 : if (!waiting_for_ping_response)
2397 GIC 55892 : wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2398 : wal_sender_timeout / 2);
2399 :
2400 : /* Compute relative time until wakeup. */
2401 55894 : sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2402 : }
2403 :
2404 55918 : return sleeptime;
2405 : }
2406 :
2407 : /*
2408 : * Check whether there have been responses by the client within
2409 : * wal_sender_timeout and shutdown if not. Using last_processing as the
2410 : * reference point avoids counting server-side stalls against the client.
2411 : * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
2412 ECB : * postdate last_processing by more than wal_sender_timeout. If that happens,
2413 : * the client must reply almost immediately to avoid a timeout. This rarely
2414 : * affects the default configuration, under which clients spontaneously send a
2415 : * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2416 : * could eliminate that problem by recognizing timeout expiration at
2417 : * wal_sender_timeout/2 after the keepalive.
2418 : */
2419 : static void
2420 CBC 951815 : WalSndCheckTimeOut(void)
2421 : {
2422 : TimestampTz timeout;
2423 ECB :
2424 : /* don't bail out if we're doing something that doesn't require timeouts */
2425 GIC 951815 : if (last_reply_timestamp <= 0)
2426 24 : return;
2427 :
2428 951791 : timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2429 : wal_sender_timeout);
2430 EUB :
2431 GIC 951791 : if (wal_sender_timeout > 0 && last_processing >= timeout)
2432 : {
2433 EUB : /*
2434 : * Since typically expiration of replication timeout means
2435 : * communication problem, we don't send the error message to the
2436 : * standby.
2437 : */
2438 UIC 0 : ereport(COMMERROR,
2439 ECB : (errmsg("terminating walsender process due to replication timeout")));
2440 :
2441 UIC 0 : WalSndShutdown();
2442 : }
2443 : }
2444 :
2445 ECB : /* Main loop of walsender process that streams the WAL over Copy messages. */
2446 : static void
2447 GIC 483 : WalSndLoop(WalSndSendDataCallback send_data)
2448 : {
2449 : /*
2450 : * Initialize the last reply timestamp. That enables timeout processing
2451 : * from hereon.
2452 : */
2453 483 : last_reply_timestamp = GetCurrentTimestamp();
2454 483 : waiting_for_ping_response = false;
2455 ECB :
2456 : /*
2457 : * Loop until we reach the end of this timeline or the client requests to
2458 : * stop streaming.
2459 : */
2460 : for (;;)
2461 : {
2462 : /* Clear any already-pending wakeups */
2463 CBC 949969 : ResetLatch(MyLatch);
2464 ECB :
2465 GIC 949969 : CHECK_FOR_INTERRUPTS();
2466 :
2467 : /* Process any requests or signals received recently */
2468 CBC 949966 : if (ConfigReloadPending)
2469 : {
2470 GIC 23 : ConfigReloadPending = false;
2471 23 : ProcessConfigFile(PGC_SIGHUP);
2472 23 : SyncRepInitConfig();
2473 : }
2474 :
2475 ECB : /* Check for input from the client */
2476 CBC 949966 : ProcessRepliesIfAny();
2477 ECB :
2478 : /*
2479 : * If we have received CopyDone from the client, sent CopyDone
2480 : * ourselves, and the output buffer is empty, it's time to exit
2481 : * streaming.
2482 : */
2483 GIC 949910 : if (streamingDoneReceiving && streamingDoneSending &&
2484 427 : !pq_is_send_pending())
2485 CBC 264 : break;
2486 ECB :
2487 : /*
2488 : * If we don't have any pending data in the output buffer, try to send
2489 : * some more. If there is some, we don't bother to call send_data
2490 : * again until we've flushed it ... but we'd better assume we are not
2491 : * caught up.
2492 EUB : */
2493 GIC 949646 : if (!pq_is_send_pending())
2494 945363 : send_data();
2495 ECB : else
2496 GIC 4283 : WalSndCaughtUp = false;
2497 :
2498 : /* Try to flush pending output to the client */
2499 949513 : if (pq_flush_if_writable() != 0)
2500 UIC 0 : WalSndShutdown();
2501 :
2502 : /* If nothing remains to be sent right now ... */
2503 GIC 949513 : if (WalSndCaughtUp && !pq_is_send_pending())
2504 : {
2505 ECB : /*
2506 : * If we're in catchup state, move to streaming. This is an
2507 : * important state change for users to know about, since before
2508 : * this point data loss might occur if the primary dies and we
2509 : * need to failover to the standby. The state change is also
2510 : * important for synchronous replication, since commits that
2511 : * started to wait at that point might wait for some time.
2512 : */
2513 GIC 107775 : if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2514 : {
2515 414 : ereport(DEBUG1,
2516 : (errmsg_internal("\"%s\" has now caught up with upstream server",
2517 : application_name)));
2518 414 : WalSndSetState(WALSNDSTATE_STREAMING);
2519 : }
2520 ECB :
2521 : /*
2522 : * When SIGUSR2 arrives, we send any outstanding logs up to the
2523 : * shutdown checkpoint record (i.e., the latest record), wait for
2524 : * them to be replicated to the standby, and exit. This may be a
2525 : * normal termination at shutdown, or a promotion, the walsender
2526 : * is not sure which.
2527 : */
2528 CBC 107775 : if (got_SIGUSR2)
2529 GIC 77 : WalSndDone(send_data);
2530 : }
2531 :
2532 : /* Check for replication timeout. */
2533 949486 : WalSndCheckTimeOut();
2534 :
2535 : /* Send keepalive if the time has come */
2536 CBC 949486 : WalSndKeepaliveIfNecessary();
2537 ECB :
2538 : /*
2539 : * Block if we have unsent data. XXX For logical replication, let
2540 : * WalSndWaitForWal() handle any other blocking; idle receivers need
2541 : * its additional actions. For physical replication, also block if
2542 : * caught up; its send_data does not block.
2543 : */
2544 CBC 949486 : if ((WalSndCaughtUp && send_data != XLogSendLogical &&
2545 GIC 1005837 : !streamingDoneSending) ||
2546 CBC 899890 : pq_is_send_pending())
2547 : {
2548 : long sleeptime;
2549 : int wakeEvents;
2550 :
2551 GIC 53695 : if (!streamingDoneReceiving)
2552 CBC 53688 : wakeEvents = WL_SOCKET_READABLE;
2553 : else
2554 7 : wakeEvents = 0;
2555 ECB :
2556 : /*
2557 : * Use fresh timestamp, not last_processing, to reduce the chance
2558 : * of reaching wal_sender_timeout before sending a keepalive.
2559 : */
2560 GIC 53695 : sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
2561 ECB :
2562 GIC 53695 : if (pq_is_send_pending())
2563 4177 : wakeEvents |= WL_SOCKET_WRITEABLE;
2564 :
2565 ECB : /* Sleep until something happens or we time out */
2566 GIC 53695 : WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2567 : }
2568 : }
2569 264 : }
2570 :
2571 : /* Initialize a per-walsender data structure for this walsender process */
2572 : static void
2573 CBC 831 : InitWalSenderSlot(void)
2574 ECB : {
2575 : int i;
2576 :
2577 : /*
2578 : * WalSndCtl should be set up already (we inherit this by fork() or
2579 : * EXEC_BACKEND mechanism from the postmaster).
2580 : */
2581 GIC 831 : Assert(WalSndCtl != NULL);
2582 CBC 831 : Assert(MyWalSnd == NULL);
2583 :
2584 ECB : /*
2585 : * Find a free walsender slot and reserve it. This must not fail due to
2586 : * the prior check for free WAL senders in InitProcess().
2587 : */
2588 CBC 1226 : for (i = 0; i < max_wal_senders; i++)
2589 ECB : {
2590 GIC 1226 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
2591 :
2592 1226 : SpinLockAcquire(&walsnd->mutex);
2593 :
2594 1226 : if (walsnd->pid != 0)
2595 : {
2596 CBC 395 : SpinLockRelease(&walsnd->mutex);
2597 395 : continue;
2598 ECB : }
2599 : else
2600 : {
2601 : /*
2602 : * Found a free slot. Reserve it for us.
2603 : */
2604 CBC 831 : walsnd->pid = MyProcPid;
2605 831 : walsnd->state = WALSNDSTATE_STARTUP;
2606 831 : walsnd->sentPtr = InvalidXLogRecPtr;
2607 831 : walsnd->needreload = false;
2608 831 : walsnd->write = InvalidXLogRecPtr;
2609 GIC 831 : walsnd->flush = InvalidXLogRecPtr;
2610 831 : walsnd->apply = InvalidXLogRecPtr;
2611 831 : walsnd->writeLag = -1;
2612 831 : walsnd->flushLag = -1;
2613 831 : walsnd->applyLag = -1;
2614 831 : walsnd->sync_standby_priority = 0;
2615 831 : walsnd->latch = &MyProc->procLatch;
2616 831 : walsnd->replyTime = 0;
2617 :
2618 : /*
2619 : * The kind assignment is done here and not in StartReplication()
2620 : * and StartLogicalReplication(). Indeed, the logical walsender
2621 : * needs to read WAL records (like snapshot of running
2622 : * transactions) during the slot creation. So it needs to be woken
2623 : * up based on its kind.
2624 : *
2625 : * The kind assignment could also be done in StartReplication(),
2626 : * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2627 : * seems better to set it on one place.
2628 : */
2629 GNC 831 : if (MyDatabaseId == InvalidOid)
2630 349 : walsnd->kind = REPLICATION_KIND_PHYSICAL;
2631 : else
2632 482 : walsnd->kind = REPLICATION_KIND_LOGICAL;
2633 :
2634 GIC 831 : SpinLockRelease(&walsnd->mutex);
2635 : /* don't need the lock anymore */
2636 831 : MyWalSnd = (WalSnd *) walsnd;
2637 :
2638 CBC 831 : break;
2639 ECB : }
2640 : }
2641 :
2642 GIC 831 : Assert(MyWalSnd != NULL);
2643 ECB :
2644 : /* Arrange to clean up at walsender exit */
2645 CBC 831 : on_shmem_exit(WalSndKill, 0);
2646 GIC 831 : }
2647 ECB :
2648 : /* Destroy the per-walsender data structure for this walsender process */
2649 : static void
2650 GIC 831 : WalSndKill(int code, Datum arg)
2651 ECB : {
2652 GIC 831 : WalSnd *walsnd = MyWalSnd;
2653 :
2654 CBC 831 : Assert(walsnd != NULL);
2655 ECB :
2656 GIC 831 : MyWalSnd = NULL;
2657 :
2658 831 : SpinLockAcquire(&walsnd->mutex);
2659 ECB : /* clear latch while holding the spinlock, so it can safely be read */
2660 GIC 831 : walsnd->latch = NULL;
2661 ECB : /* Mark WalSnd struct as no longer being in use. */
2662 GIC 831 : walsnd->pid = 0;
2663 CBC 831 : SpinLockRelease(&walsnd->mutex);
2664 GIC 831 : }
2665 ECB :
2666 : /* XLogReaderRoutine->segment_open callback */
2667 : static void
2668 GIC 820 : WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
2669 ECB : TimeLineID *tli_p)
2670 : {
2671 : char path[MAXPGPATH];
2672 :
2673 : /*-------
2674 : * When reading from a historic timeline, and there is a timeline switch
2675 : * within this segment, read from the WAL segment belonging to the new
2676 : * timeline.
2677 : *
2678 : * For example, imagine that this server is currently on timeline 5, and
2679 : * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2680 : * 0/13002088. In pg_wal, we have these files:
2681 : *
2682 : * ...
2683 : * 000000040000000000000012
2684 : * 000000040000000000000013
2685 : * 000000050000000000000013
2686 : * 000000050000000000000014
2687 : * ...
2688 : *
2689 : * In this situation, when requested to send the WAL from segment 0x13, on
2690 : * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2691 : * recovery prefers files from newer timelines, so if the segment was
2692 : * restored from the archive on this server, the file belonging to the old
2693 : * timeline, 000000040000000000000013, might not exist. Their contents are
2694 : * equal up to the switchpoint, because at a timeline switch, the used
2695 : * portion of the old segment is copied to the new file. -------
2696 : */
2697 GIC 820 : *tli_p = sendTimeLine;
2698 820 : if (sendTimeLineIsHistoric)
2699 : {
2700 : XLogSegNo endSegNo;
2701 :
2702 12 : XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
2703 12 : if (nextSegNo == endSegNo)
2704 10 : *tli_p = sendTimeLineNextTLI;
2705 : }
2706 ECB :
2707 CBC 820 : XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
2708 GIC 820 : state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2709 820 : if (state->seg.ws_file >= 0)
2710 819 : return;
2711 ECB :
2712 : /*
2713 : * If the file is not found, assume it's because the standby asked for a
2714 : * too old WAL segment that has already been removed or recycled.
2715 : */
2716 CBC 1 : if (errno == ENOENT)
2717 ECB : {
2718 : char xlogfname[MAXFNAMELEN];
2719 CBC 1 : int save_errno = errno;
2720 :
2721 GIC 1 : XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
2722 1 : errno = save_errno;
2723 1 : ereport(ERROR,
2724 : (errcode_for_file_access(),
2725 ECB : errmsg("requested WAL segment %s has already been removed",
2726 : xlogfname)));
2727 : }
2728 : else
2729 UIC 0 : ereport(ERROR,
2730 ECB : (errcode_for_file_access(),
2731 : errmsg("could not open file \"%s\": %m",
2732 : path)));
2733 : }
2734 :
2735 : /*
2736 : * Send out the WAL in its normal physical/stored form.
2737 : *
2738 EUB : * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2739 : * but not yet sent to the client, and buffer it in the libpq output
2740 : * buffer.
2741 : *
2742 : * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2743 : * otherwise WalSndCaughtUp is set to false.
2744 : */
2745 : static void
2746 GIC 115098 : XLogSendPhysical(void)
2747 : {
2748 : XLogRecPtr SendRqstPtr;
2749 : XLogRecPtr startptr;
2750 : XLogRecPtr endptr;
2751 : Size nbytes;
2752 : XLogSegNo segno;
2753 : WALReadError errinfo;
2754 :
2755 ECB : /* If requested switch the WAL sender to the stopping state. */
2756 GIC 115098 : if (got_STOPPING)
2757 353 : WalSndSetState(WALSNDSTATE_STOPPING);
2758 :
2759 115098 : if (streamingDoneSending)
2760 : {
2761 56338 : WalSndCaughtUp = true;
2762 87715 : return;
2763 : }
2764 :
2765 ECB : /* Figure out how far we can safely send the WAL. */
2766 CBC 58760 : if (sendTimeLineIsHistoric)
2767 : {
2768 ECB : /*
2769 : * Streaming an old timeline that's in this server's history, but is
2770 : * not the one we're currently inserting or replaying. It can be
2771 : * streamed up to the point where we switched off that timeline.
2772 : */
2773 GIC 165 : SendRqstPtr = sendTimeLineValidUpto;
2774 : }
2775 CBC 58595 : else if (am_cascading_walsender)
2776 : {
2777 : TimeLineID SendRqstTLI;
2778 :
2779 : /*
2780 : * Streaming the latest timeline on a standby.
2781 : *
2782 ECB : * Attempt to send all WAL that has already been replayed, so that we
2783 : * know it's valid. If we're receiving WAL through streaming
2784 : * replication, it's also OK to send any WAL that has been received
2785 : * but not replayed.
2786 : *
2787 : * The timeline we're recovering from can change, or we can be
2788 : * promoted. In either case, the current timeline becomes historic. We
2789 : * need to detect that so that we don't try to stream past the point
2790 : * where we switched to another timeline. We check for promotion or
2791 : * timeline switch after calculating FlushPtr, to avoid a race
2792 : * condition: if the timeline becomes historic just after we checked
2793 : * that it was still current, it's still be OK to stream it up to the
2794 : * FlushPtr that was calculated before it became historic.
2795 : */
2796 GIC 1146 : bool becameHistoric = false;
2797 :
2798 1146 : SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
2799 :
2800 1146 : if (!RecoveryInProgress())
2801 : {
2802 : /* We have been promoted. */
2803 1 : SendRqstTLI = GetWALInsertionTimeLine();
2804 1 : am_cascading_walsender = false;
2805 CBC 1 : becameHistoric = true;
2806 : }
2807 ECB : else
2808 : {
2809 : /*
2810 : * Still a cascading standby. But is the timeline we're sending
2811 : * still the one recovery is recovering from?
2812 : */
2813 CBC 1145 : if (sendTimeLine != SendRqstTLI)
2814 LBC 0 : becameHistoric = true;
2815 : }
2816 :
2817 GIC 1146 : if (becameHistoric)
2818 : {
2819 : /*
2820 : * The timeline we were sending has become historic. Read the
2821 : * timeline history file of the new timeline to see where exactly
2822 ECB : * we forked off from the timeline we were sending.
2823 EUB : */
2824 : List *history;
2825 :
2826 CBC 1 : history = readTimeLineHistory(SendRqstTLI);
2827 GIC 1 : sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
2828 :
2829 1 : Assert(sendTimeLine < sendTimeLineNextTLI);
2830 1 : list_free_deep(history);
2831 :
2832 1 : sendTimeLineIsHistoric = true;
2833 :
2834 1 : SendRqstPtr = sendTimeLineValidUpto;
2835 ECB : }
2836 : }
2837 : else
2838 : {
2839 : /*
2840 : * Streaming the current timeline on a primary.
2841 : *
2842 : * Attempt to send all data that's already been written out and
2843 : * fsync'd to disk. We cannot go further than what's been written out
2844 : * given the current implementation of WALRead(). And in any case
2845 : * it's unsafe to send WAL that is not securely down to disk on the
2846 : * primary: if the primary subsequently crashes and restarts, standbys
2847 : * must not have applied any WAL that got lost on the primary.
2848 : */
2849 GIC 57449 : SendRqstPtr = GetFlushRecPtr(NULL);
2850 : }
2851 :
2852 : /*
2853 : * Record the current system time as an approximation of the time at which
2854 : * this WAL location was written for the purposes of lag tracking.
2855 : *
2856 : * In theory we could make XLogFlush() record a time in shmem whenever WAL
2857 : * is flushed and we could get that time as well as the LSN when we call
2858 ECB : * GetFlushRecPtr() above (and likewise for the cascading standby
2859 : * equivalent), but rather than putting any new code into the hot WAL path
2860 : * it seems good enough to capture the time here. We should reach this
2861 : * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2862 : * may take some time, we read the WAL flush pointer and take the time
2863 : * very close to together here so that we'll get a later position if it is
2864 : * still moving.
2865 : *
2866 : * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2867 : * this gives us a cheap approximation for the WAL flush time for this
2868 : * LSN.
2869 : *
2870 : * Note that the LSN is not necessarily the LSN for the data contained in
2871 : * the present message; it's the end of the WAL, which might be further
2872 : * ahead. All the lag tracking machinery cares about is finding out when
2873 : * that arbitrary LSN is eventually reported as written, flushed and
2874 : * applied, so that it can measure the elapsed time.
2875 : */
2876 GIC 58760 : LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2877 :
2878 : /*
2879 : * If this is a historic timeline and we've reached the point where we
2880 : * forked to the next timeline, stop streaming.
2881 : *
2882 : * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2883 : * startup process will normally replay all WAL that has been received
2884 : * from the primary, before promoting, but if the WAL streaming is
2885 ECB : * terminated at a WAL page boundary, the valid portion of the timeline
2886 : * might end in the middle of a WAL record. We might've already sent the
2887 : * first half of that partial WAL record to the cascading standby, so that
2888 : * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2889 : * replay the partial WAL record either, so it can still follow our
2890 : * timeline switch.
2891 : */
2892 GIC 58760 : if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
2893 : {
2894 : /* close the current file. */
2895 13 : if (xlogreader->seg.ws_file >= 0)
2896 13 : wal_segment_close(xlogreader);
2897 :
2898 : /* Send CopyDone */
2899 13 : pq_putmessage_noblock('c', NULL, 0);
2900 13 : streamingDoneSending = true;
2901 ECB :
2902 GIC 13 : WalSndCaughtUp = true;
2903 :
2904 CBC 13 : elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
2905 ECB : LSN_FORMAT_ARGS(sendTimeLineValidUpto),
2906 : LSN_FORMAT_ARGS(sentPtr));
2907 GIC 13 : return;
2908 ECB : }
2909 :
2910 : /* Do we have any work to do? */
2911 CBC 58747 : Assert(sentPtr <= SendRqstPtr);
2912 GIC 58747 : if (SendRqstPtr <= sentPtr)
2913 ECB : {
2914 GIC 31364 : WalSndCaughtUp = true;
2915 31364 : return;
2916 ECB : }
2917 :
2918 : /*
2919 : * Figure out how much to send in one message. If there's no more than
2920 : * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
2921 : * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2922 : *
2923 : * The rounding is not only for performance reasons. Walreceiver relies on
2924 : * the fact that we never split a WAL record across two messages. Since a
2925 : * long WAL record is split at page boundary into continuation records,
2926 : * page boundary is always a safe cut-off point. We also assume that
2927 : * SendRqstPtr never points to the middle of a WAL record.
2928 : */
2929 GIC 27383 : startptr = sentPtr;
2930 27383 : endptr = startptr;
2931 27383 : endptr += MAX_SEND_SIZE;
2932 :
2933 : /* if we went beyond SendRqstPtr, back off */
2934 27383 : if (SendRqstPtr <= endptr)
2935 : {
2936 18290 : endptr = SendRqstPtr;
2937 18290 : if (sendTimeLineIsHistoric)
2938 CBC 12 : WalSndCaughtUp = false;
2939 ECB : else
2940 CBC 18278 : WalSndCaughtUp = true;
2941 : }
2942 : else
2943 ECB : {
2944 : /* round down to page boundary. */
2945 CBC 9093 : endptr -= (endptr % XLOG_BLCKSZ);
2946 9093 : WalSndCaughtUp = false;
2947 ECB : }
2948 :
2949 CBC 27383 : nbytes = endptr - startptr;
2950 GIC 27383 : Assert(nbytes <= MAX_SEND_SIZE);
2951 :
2952 : /*
2953 : * OK to read and send the slice.
2954 ECB : */
2955 CBC 27383 : resetStringInfo(&output_message);
2956 GIC 27383 : pq_sendbyte(&output_message, 'w');
2957 :
2958 CBC 27383 : pq_sendint64(&output_message, startptr); /* dataStart */
2959 27383 : pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
2960 GIC 27383 : pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2961 :
2962 : /*
2963 : * Read the log directly into the output buffer to avoid extra memcpy
2964 ECB : * calls.
2965 : */
2966 GIC 27383 : enlargeStringInfo(&output_message, nbytes);
2967 ECB :
2968 CBC 27383 : retry:
2969 27382 : if (!WALRead(xlogreader,
2970 GIC 27383 : &output_message.data[output_message.len],
2971 : startptr,
2972 : nbytes,
2973 27383 : xlogreader->seg.ws_tli, /* Pass the current TLI because
2974 : * only WalSndSegmentOpen controls
2975 ECB : * whether new TLI is needed. */
2976 : &errinfo))
2977 LBC 0 : WALReadRaiseError(&errinfo);
2978 ECB :
2979 : /* See logical_read_xlog_page(). */
2980 GIC 27382 : XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2981 27382 : CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
2982 ECB :
2983 : /*
2984 : * During recovery, the currently-open WAL file might be replaced with the
2985 : * file of the same name retrieved from archive. So we always need to
2986 EUB : * check what we read was valid after reading into the buffer. If it's
2987 : * invalid, we try to open and read the file again.
2988 : */
2989 CBC 27382 : if (am_cascading_walsender)
2990 ECB : {
2991 GIC 871 : WalSnd *walsnd = MyWalSnd;
2992 : bool reload;
2993 :
2994 871 : SpinLockAcquire(&walsnd->mutex);
2995 871 : reload = walsnd->needreload;
2996 871 : walsnd->needreload = false;
2997 871 : SpinLockRelease(&walsnd->mutex);
2998 ECB :
2999 GIC 871 : if (reload && xlogreader->seg.ws_file >= 0)
3000 ECB : {
3001 UIC 0 : wal_segment_close(xlogreader);
3002 :
3003 LBC 0 : goto retry;
3004 ECB : }
3005 : }
3006 :
3007 GIC 27382 : output_message.len += nbytes;
3008 CBC 27382 : output_message.data[output_message.len] = '\0';
3009 :
3010 EUB : /*
3011 : * Fill the send timestamp last, so that it is taken as late as possible.
3012 : */
3013 GIC 27382 : resetStringInfo(&tmpbuf);
3014 27382 : pq_sendint64(&tmpbuf, GetCurrentTimestamp());
3015 27382 : memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3016 CBC 27382 : tmpbuf.data, sizeof(int64));
3017 ECB :
3018 GIC 27382 : pq_putmessage_noblock('d', output_message.data, output_message.len);
3019 :
3020 27382 : sentPtr = endptr;
3021 :
3022 ECB : /* Update shared memory status */
3023 : {
3024 CBC 27382 : WalSnd *walsnd = MyWalSnd;
3025 ECB :
3026 GIC 27382 : SpinLockAcquire(&walsnd->mutex);
3027 CBC 27382 : walsnd->sentPtr = sentPtr;
3028 GIC 27382 : SpinLockRelease(&walsnd->mutex);
3029 ECB : }
3030 :
3031 : /* Report progress of XLOG streaming in PS display */
3032 GIC 27382 : if (update_process_title)
3033 ECB : {
3034 : char activitymsg[50];
3035 :
3036 CBC 27382 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
3037 27382 : LSN_FORMAT_ARGS(sentPtr));
3038 GIC 27382 : set_ps_display(activitymsg);
3039 : }
3040 : }
3041 ECB :
3042 : /*
3043 : * Stream out logically decoded data.
3044 : */
3045 : static void
3046 CBC 830342 : XLogSendLogical(void)
3047 ECB : {
3048 : XLogRecord *record;
3049 : char *errm;
3050 :
3051 : /*
3052 : * We'll use the current flush point to determine whether we've caught up.
3053 : * This variable is static in order to cache it across calls. Caching is
3054 : * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
3055 : * spinlock.
3056 : */
3057 : static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3058 :
3059 : /*
3060 : * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3061 : * true in WalSndWaitForWal, if we're actually waiting. We also set to
3062 : * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3063 : * didn't wait - i.e. when we're shutting down.
3064 : */
3065 GIC 830342 : WalSndCaughtUp = false;
3066 :
3067 830342 : record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3068 :
3069 : /* xlog record was invalid */
3070 830220 : if (errm != NULL)
3071 1 : elog(ERROR, "could not find record while sending logically-decoded data: %s",
3072 : errm);
3073 :
3074 CBC 830219 : if (record != NULL)
3075 : {
3076 ECB : /*
3077 : * Note the lack of any call to LagTrackerWrite() which is handled by
3078 : * WalSndUpdateProgress which is called by output plugin through
3079 : * logical decoding write api.
3080 : */
3081 GIC 830016 : LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
3082 :
3083 CBC 830007 : sentPtr = logical_decoding_ctx->reader->EndRecPtr;
3084 : }
3085 :
3086 : /*
3087 : * If first time through in this session, initialize flushPtr. Otherwise,
3088 : * we only need to update flushPtr if EndRecPtr is past it.
3089 : */
3090 GNC 830210 : if (flushPtr == InvalidXLogRecPtr ||
3091 829974 : logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3092 : {
3093 2035 : if (am_cascading_walsender)
3094 55 : flushPtr = GetStandbyFlushRecPtr(NULL);
3095 : else
3096 1980 : flushPtr = GetFlushRecPtr(NULL);
3097 : }
3098 :
3099 : /* If EndRecPtr is still past our flushPtr, it means we caught up. */
3100 GIC 830210 : if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3101 1259 : WalSndCaughtUp = true;
3102 :
3103 ECB : /*
3104 : * If we're caught up and have been requested to stop, have WalSndLoop()
3105 : * terminate the connection in an orderly manner, after writing out all
3106 : * the pending data.
3107 : */
3108 GIC 830210 : if (WalSndCaughtUp && got_STOPPING)
3109 CBC 108 : got_SIGUSR2 = true;
3110 :
3111 : /* Update shared memory status */
3112 : {
3113 830210 : WalSnd *walsnd = MyWalSnd;
3114 ECB :
3115 GIC 830210 : SpinLockAcquire(&walsnd->mutex);
3116 830210 : walsnd->sentPtr = sentPtr;
3117 830210 : SpinLockRelease(&walsnd->mutex);
3118 : }
3119 830210 : }
3120 :
3121 ECB : /*
3122 : * Shutdown if the sender is caught up.
3123 : *
3124 : * NB: This should only be called when the shutdown signal has been received
3125 : * from postmaster.
3126 : *
3127 : * Note that if we determine that there's still more data to send, this
3128 : * function will return control to the caller.
3129 : */
3130 : static void
3131 GIC 77 : WalSndDone(WalSndSendDataCallback send_data)
3132 ECB : {
3133 : XLogRecPtr replicatedPtr;
3134 :
3135 : /* ... let's just be real sure we're caught up ... */
3136 GIC 77 : send_data();
3137 :
3138 : /*
3139 : * To figure out whether all WAL has successfully been replicated, check
3140 : * flush location if valid, write otherwise. Tools like pg_receivewal will
3141 : * usually (unless in synchronous mode) return an invalid flush location.
3142 : */
3143 154 : replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3144 CBC 77 : MyWalSnd->write : MyWalSnd->flush;
3145 :
3146 GIC 77 : if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3147 27 : !pq_is_send_pending())
3148 : {
3149 ECB : QueryCompletion qc;
3150 :
3151 : /* Inform the standby that XLOG streaming is done */
3152 GIC 27 : SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3153 27 : EndCommand(&qc, DestRemote, false);
3154 27 : pq_flush();
3155 :
3156 CBC 27 : proc_exit(0);
3157 ECB : }
3158 GIC 50 : if (!waiting_for_ping_response)
3159 CBC 2 : WalSndKeepalive(true, InvalidXLogRecPtr);
3160 50 : }
3161 :
3162 : /*
3163 : * Returns the latest point in WAL that has been safely flushed to disk, and
3164 : * can be sent to the standby. This should only be called when in recovery,
3165 ECB : * ie. we're streaming to a cascaded standby.
3166 : *
3167 : * As a side-effect, *tli is updated to the TLI of the last
3168 : * replayed WAL record.
3169 : */
3170 : static XLogRecPtr
3171 CBC 1266 : GetStandbyFlushRecPtr(TimeLineID *tli)
3172 ECB : {
3173 : XLogRecPtr replayPtr;
3174 : TimeLineID replayTLI;
3175 : XLogRecPtr receivePtr;
3176 : TimeLineID receiveTLI;
3177 : XLogRecPtr result;
3178 :
3179 : /*
3180 : * We can safely send what's already been replayed. Also, if walreceiver
3181 : * is streaming WAL from the same timeline, we can send anything that it
3182 : * has streamed, but hasn't been replayed yet.
3183 : */
3184 :
3185 GIC 1266 : receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3186 1266 : replayPtr = GetXLogReplayRecPtr(&replayTLI);
3187 :
3188 GNC 1266 : if (tli)
3189 1211 : *tli = replayTLI;
3190 :
3191 GIC 1266 : result = replayPtr;
3192 1266 : if (receiveTLI == replayTLI && receivePtr > replayPtr)
3193 275 : result = receivePtr;
3194 :
3195 1266 : return result;
3196 : }
3197 :
3198 : /*
3199 ECB : * Request walsenders to reload the currently-open WAL file
3200 : */
3201 : void
3202 CBC 16 : WalSndRqstFileReload(void)
3203 ECB : {
3204 : int i;
3205 :
3206 CBC 158 : for (i = 0; i < max_wal_senders; i++)
3207 ECB : {
3208 GIC 142 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3209 ECB :
3210 GIC 142 : SpinLockAcquire(&walsnd->mutex);
3211 142 : if (walsnd->pid == 0)
3212 : {
3213 142 : SpinLockRelease(&walsnd->mutex);
3214 142 : continue;
3215 : }
3216 LBC 0 : walsnd->needreload = true;
3217 UIC 0 : SpinLockRelease(&walsnd->mutex);
3218 : }
3219 GIC 16 : }
3220 ECB :
3221 : /*
3222 : * Handle PROCSIG_WALSND_INIT_STOPPING signal.
3223 : */
3224 : void
3225 CBC 27 : HandleWalSndInitStopping(void)
3226 : {
3227 27 : Assert(am_walsender);
3228 ECB :
3229 : /*
3230 EUB : * If replication has not yet started, die like with SIGTERM. If
3231 : * replication is active, only set a flag and wake up the main loop. It
3232 : * will send any outstanding WAL, wait for it to be replicated to the
3233 ECB : * standby, and then exit gracefully.
3234 : */
3235 GIC 27 : if (!replication_active)
3236 UIC 0 : kill(MyProcPid, SIGTERM);
3237 : else
3238 GIC 27 : got_STOPPING = true;
3239 CBC 27 : }
3240 :
3241 ECB : /*
3242 : * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3243 : * sender should already have been switched to WALSNDSTATE_STOPPING at
3244 : * this point.
3245 : */
3246 : static void
3247 GIC 23 : WalSndLastCycleHandler(SIGNAL_ARGS)
3248 : {
3249 CBC 23 : int save_errno = errno;
3250 EUB :
3251 GIC 23 : got_SIGUSR2 = true;
3252 CBC 23 : SetLatch(MyLatch);
3253 ECB :
3254 GIC 23 : errno = save_errno;
3255 23 : }
3256 :
3257 : /* Set up signal handlers */
3258 : void
3259 831 : WalSndSignals(void)
3260 : {
3261 ECB : /* Set up signal handlers */
3262 GIC 831 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
3263 CBC 831 : pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3264 GIC 831 : pqsignal(SIGTERM, die); /* request shutdown */
3265 ECB : /* SIGQUIT handler was already set up by InitPostmasterChild */
3266 CBC 831 : InitializeTimeouts(); /* establishes SIGALRM handler */
3267 GIC 831 : pqsignal(SIGPIPE, SIG_IGN);
3268 CBC 831 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3269 831 : pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3270 : * shutdown */
3271 :
3272 : /* Reset some signals that are accepted by postmaster but not here */
3273 831 : pqsignal(SIGCHLD, SIG_DFL);
3274 GIC 831 : }
3275 :
3276 ECB : /* Report shared-memory space needed by WalSndShmemInit */
3277 : Size
3278 CBC 6390 : WalSndShmemSize(void)
3279 : {
3280 6390 : Size size = 0;
3281 ECB :
3282 CBC 6390 : size = offsetof(WalSndCtlData, walsnds);
3283 6390 : size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3284 :
3285 GIC 6390 : return size;
3286 : }
3287 ECB :
3288 : /* Allocate and initialize walsender-related shared memory */
3289 : void
3290 GIC 1826 : WalSndShmemInit(void)
3291 : {
3292 ECB : bool found;
3293 : int i;
3294 :
3295 GIC 1826 : WalSndCtl = (WalSndCtlData *)
3296 CBC 1826 : ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
3297 ECB :
3298 GIC 1826 : if (!found)
3299 ECB : {
3300 : /* First time through, so initialize */
3301 GIC 5151 : MemSet(WalSndCtl, 0, WalSndShmemSize());
3302 :
3303 7304 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
3304 GNC 5478 : dlist_init(&(WalSndCtl->SyncRepQueue[i]));
3305 :
3306 GIC 18091 : for (i = 0; i < max_wal_senders; i++)
3307 : {
3308 16265 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3309 ECB :
3310 CBC 16265 : SpinLockInit(&walsnd->mutex);
3311 : }
3312 ECB : }
3313 GIC 1826 : }
3314 :
3315 ECB : /*
3316 : * Wake up physical, logical or both kinds of walsenders
3317 : *
3318 : * The distinction between physical and logical walsenders is done, because:
3319 : * - physical walsenders can't send data until it's been flushed
3320 : * - logical walsenders on standby can't decode and send data until it's been
3321 : * applied
3322 : *
3323 : * For cascading replication we need to wake up physical walsenders separately
3324 : * from logical walsenders (see the comment before calling WalSndWakeup() in
3325 : * ApplyWalRecord() for more details).
3326 : *
3327 : * This will be called inside critical sections, so throwing an error is not
3328 : * advisable.
3329 : */
3330 : void
3331 GNC 2627758 : WalSndWakeup(bool physical, bool logical)
3332 : {
3333 ECB : int i;
3334 :
3335 GIC 28847595 : for (i = 0; i < max_wal_senders; i++)
3336 ECB : {
3337 : Latch *latch;
3338 : ReplicationKind kind;
3339 GIC 26219837 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3340 :
3341 : /*
3342 : * Get latch pointer with spinlock held, for the unlikely case that
3343 : * pointer reads aren't atomic (as they're 8 bytes). While at it, also
3344 : * get kind.
3345 : */
3346 26219837 : SpinLockAcquire(&walsnd->mutex);
3347 26219837 : latch = walsnd->latch;
3348 GNC 26219837 : kind = walsnd->kind;
3349 GIC 26219837 : SpinLockRelease(&walsnd->mutex);
3350 :
3351 GNC 26219837 : if (latch == NULL)
3352 26191517 : continue;
3353 :
3354 28320 : if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
3355 8540 : (logical && kind == REPLICATION_KIND_LOGICAL))
3356 GIC 21889 : SetLatch(latch);
3357 : }
3358 2627758 : }
3359 :
3360 : /*
3361 ECB : * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3362 : * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3363 : * on postmaster death.
3364 : */
3365 : static void
3366 GIC 55918 : WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3367 : {
3368 : WaitEvent event;
3369 ECB :
3370 GIC 55918 : ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3371 55918 : if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3372 55918 : (event.events & WL_POSTMASTER_DEATH))
3373 UIC 0 : proc_exit(1);
3374 GIC 55918 : }
3375 :
3376 ECB : /*
3377 : * Signal all walsenders to move to stopping state.
3378 : *
3379 : * This will trigger walsenders to move to a state where no further WAL can be
3380 : * generated. See this file's header for details.
3381 : */
3382 : void
3383 GIC 971 : WalSndInitStopping(void)
3384 ECB : {
3385 : int i;
3386 :
3387 GIC 9658 : for (i = 0; i < max_wal_senders; i++)
3388 ECB : {
3389 GIC 8687 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3390 : pid_t pid;
3391 :
3392 8687 : SpinLockAcquire(&walsnd->mutex);
3393 8687 : pid = walsnd->pid;
3394 8687 : SpinLockRelease(&walsnd->mutex);
3395 :
3396 CBC 8687 : if (pid == 0)
3397 GIC 8660 : continue;
3398 :
3399 27 : SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
3400 ECB : }
3401 CBC 971 : }
3402 ECB :
3403 EUB : /*
3404 ECB : * Wait that all the WAL senders have quit or reached the stopping state. This
3405 : * is used by the checkpointer to control when the shutdown checkpoint can
3406 : * safely be performed.
3407 : */
3408 : void
3409 GIC 971 : WalSndWaitStopping(void)
3410 : {
3411 : for (;;)
3412 25 : {
3413 ECB : int i;
3414 GIC 996 : bool all_stopped = true;
3415 :
3416 9683 : for (i = 0; i < max_wal_senders; i++)
3417 ECB : {
3418 GIC 8712 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3419 ECB :
3420 GIC 8712 : SpinLockAcquire(&walsnd->mutex);
3421 :
3422 CBC 8712 : if (walsnd->pid == 0)
3423 ECB : {
3424 CBC 8664 : SpinLockRelease(&walsnd->mutex);
3425 GIC 8664 : continue;
3426 ECB : }
3427 :
3428 GIC 48 : if (walsnd->state != WALSNDSTATE_STOPPING)
3429 ECB : {
3430 GIC 25 : all_stopped = false;
3431 CBC 25 : SpinLockRelease(&walsnd->mutex);
3432 GIC 25 : break;
3433 : }
3434 23 : SpinLockRelease(&walsnd->mutex);
3435 : }
3436 :
3437 : /* safe to leave if confirmation is done for all WAL senders */
3438 996 : if (all_stopped)
3439 CBC 971 : return;
3440 :
3441 GIC 25 : pg_usleep(10000L); /* wait for 10 msec */
3442 ECB : }
3443 : }
3444 :
3445 : /* Set state for current walsender (only called in walsender) */
3446 : void
3447 GIC 1687 : WalSndSetState(WalSndState state)
3448 ECB : {
3449 GIC 1687 : WalSnd *walsnd = MyWalSnd;
3450 ECB :
3451 GIC 1687 : Assert(am_walsender);
3452 ECB :
3453 GIC 1687 : if (walsnd->state == state)
3454 CBC 357 : return;
3455 ECB :
3456 GIC 1330 : SpinLockAcquire(&walsnd->mutex);
3457 1330 : walsnd->state = state;
3458 CBC 1330 : SpinLockRelease(&walsnd->mutex);
3459 : }
3460 ECB :
3461 : /*
3462 : * Return a string constant representing the state. This is used
3463 : * in system views, and should *not* be translated.
3464 : */
3465 : static const char *
3466 GIC 633 : WalSndGetStateString(WalSndState state)
3467 : {
3468 CBC 633 : switch (state)
3469 ECB : {
3470 GIC 3 : case WALSNDSTATE_STARTUP:
3471 CBC 3 : return "startup";
3472 UIC 0 : case WALSNDSTATE_BACKUP:
3473 0 : return "backup";
3474 GIC 6 : case WALSNDSTATE_CATCHUP:
3475 6 : return "catchup";
3476 624 : case WALSNDSTATE_STREAMING:
3477 CBC 624 : return "streaming";
3478 UIC 0 : case WALSNDSTATE_STOPPING:
3479 LBC 0 : return "stopping";
3480 : }
3481 0 : return "UNKNOWN";
3482 : }
3483 ECB :
3484 : static Interval *
3485 GIC 897 : offset_to_interval(TimeOffset offset)
3486 ECB : {
3487 CBC 897 : Interval *result = palloc(sizeof(Interval));
3488 ECB :
3489 GIC 897 : result->month = 0;
3490 897 : result->day = 0;
3491 897 : result->time = offset;
3492 :
3493 897 : return result;
3494 : }
3495 :
3496 ECB : /*
3497 : * Returns activity of walsenders, including pids and xlog locations sent to
3498 : * standby servers.
3499 : */
3500 : Datum
3501 CBC 553 : pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
3502 EUB : {
3503 : #define PG_STAT_GET_WAL_SENDERS_COLS 12
3504 CBC 553 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
3505 ECB : SyncRepStandbyData *sync_standbys;
3506 : int num_standbys;
3507 : int i;
3508 EUB :
3509 GBC 553 : InitMaterializedSRF(fcinfo, 0);
3510 :
3511 EUB : /*
3512 : * Get the currently active synchronous standbys. This could be out of
3513 : * date before we're done, but we'll use the data anyway.
3514 : */
3515 CBC 553 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3516 :
3517 5961 : for (i = 0; i < max_wal_senders; i++)
3518 : {
3519 5408 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3520 ECB : XLogRecPtr sentPtr;
3521 : XLogRecPtr write;
3522 : XLogRecPtr flush;
3523 : XLogRecPtr apply;
3524 : TimeOffset writeLag;
3525 : TimeOffset flushLag;
3526 : TimeOffset applyLag;
3527 : int priority;
3528 : int pid;
3529 : WalSndState state;
3530 : TimestampTz replyTime;
3531 : bool is_sync_standby;
3532 : Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
3533 GNC 5408 : bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
3534 ECB : int j;
3535 :
3536 : /* Collect data from shared memory */
3537 GIC 5408 : SpinLockAcquire(&walsnd->mutex);
3538 5408 : if (walsnd->pid == 0)
3539 ECB : {
3540 GIC 4775 : SpinLockRelease(&walsnd->mutex);
3541 4775 : continue;
3542 : }
3543 633 : pid = walsnd->pid;
3544 633 : sentPtr = walsnd->sentPtr;
3545 CBC 633 : state = walsnd->state;
3546 GIC 633 : write = walsnd->write;
3547 CBC 633 : flush = walsnd->flush;
3548 GIC 633 : apply = walsnd->apply;
3549 CBC 633 : writeLag = walsnd->writeLag;
3550 GIC 633 : flushLag = walsnd->flushLag;
3551 633 : applyLag = walsnd->applyLag;
3552 633 : priority = walsnd->sync_standby_priority;
3553 633 : replyTime = walsnd->replyTime;
3554 633 : SpinLockRelease(&walsnd->mutex);
3555 :
3556 : /*
3557 : * Detect whether walsender is/was considered synchronous. We can
3558 : * provide some protection against stale data by checking the PID
3559 : * along with walsnd_index.
3560 : */
3561 633 : is_sync_standby = false;
3562 674 : for (j = 0; j < num_standbys; j++)
3563 ECB : {
3564 GIC 68 : if (sync_standbys[j].walsnd_index == i &&
3565 27 : sync_standbys[j].pid == pid)
3566 : {
3567 CBC 27 : is_sync_standby = true;
3568 27 : break;
3569 : }
3570 ECB : }
3571 :
3572 CBC 633 : values[0] = Int32GetDatum(pid);
3573 ECB :
3574 CBC 633 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
3575 ECB : {
3576 : /*
3577 : * Only superusers and roles with privileges of pg_read_all_stats
3578 : * can see details. Other users only get the pid value to know
3579 : * it's a walsender, but no details.
3580 : */
3581 LBC 0 : MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
3582 ECB : }
3583 : else
3584 : {
3585 GIC 633 : values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3586 :
3587 633 : if (XLogRecPtrIsInvalid(sentPtr))
3588 3 : nulls[2] = true;
3589 633 : values[2] = LSNGetDatum(sentPtr);
3590 ECB :
3591 CBC 633 : if (XLogRecPtrIsInvalid(write))
3592 GIC 5 : nulls[3] = true;
3593 CBC 633 : values[3] = LSNGetDatum(write);
3594 ECB :
3595 GIC 633 : if (XLogRecPtrIsInvalid(flush))
3596 CBC 5 : nulls[4] = true;
3597 633 : values[4] = LSNGetDatum(flush);
3598 :
3599 GIC 633 : if (XLogRecPtrIsInvalid(apply))
3600 5 : nulls[5] = true;
3601 CBC 633 : values[5] = LSNGetDatum(apply);
3602 :
3603 ECB : /*
3604 : * Treat a standby such as a pg_basebackup background process
3605 : * which always returns an invalid flush location, as an
3606 : * asynchronous standby.
3607 : */
3608 GIC 633 : priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3609 :
3610 GBC 633 : if (writeLag < 0)
3611 GIC 352 : nulls[6] = true;
3612 : else
3613 281 : values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
3614 ECB :
3615 GIC 633 : if (flushLag < 0)
3616 CBC 298 : nulls[7] = true;
3617 ECB : else
3618 CBC 335 : values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3619 :
3620 633 : if (applyLag < 0)
3621 352 : nulls[8] = true;
3622 ECB : else
3623 GIC 281 : values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
3624 ECB :
3625 CBC 633 : values[9] = Int32GetDatum(priority);
3626 ECB :
3627 : /*
3628 : * More easily understood version of standby state. This is purely
3629 : * informational.
3630 : *
3631 : * In quorum-based sync replication, the role of each standby
3632 : * listed in synchronous_standby_names can be changing very
3633 : * frequently. Any standbys considered as "sync" at one moment can
3634 : * be switched to "potential" ones at the next moment. So, it's
3635 : * basically useless to report "sync" or "potential" as their sync
3636 : * states. We report just "quorum" for them.
3637 : */
3638 GIC 633 : if (priority == 0)
3639 CBC 595 : values[10] = CStringGetTextDatum("async");
3640 38 : else if (is_sync_standby)
3641 GIC 27 : values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
3642 CBC 27 : CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3643 : else
3644 11 : values[10] = CStringGetTextDatum("potential");
3645 ECB :
3646 GIC 633 : if (replyTime == 0)
3647 CBC 3 : nulls[11] = true;
3648 : else
3649 630 : values[11] = TimestampTzGetDatum(replyTime);
3650 ECB : }
3651 :
3652 CBC 633 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3653 : values, nulls);
3654 ECB : }
3655 :
3656 GIC 553 : return (Datum) 0;
3657 : }
3658 :
3659 : /*
3660 : * Send a keepalive message to standby.
3661 : *
3662 : * If requestReply is set, the message requests the other party to send
3663 : * a message back to us, for heartbeat purposes. We also set a flag to
3664 : * let nearby code know that we're waiting for that response, to avoid
3665 : * repeated requests.
3666 : *
3667 ECB : * writePtr is the location up to which the WAL is sent. It is essentially
3668 : * the same as sentPtr but in some cases, we need to send keep alive before
3669 : * sentPtr is updated like when skipping empty transactions.
3670 : */
3671 : static void
3672 GIC 1465 : WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
3673 ECB : {
3674 GIC 1465 : elog(DEBUG2, "sending replication keepalive");
3675 ECB :
3676 : /* construct the message... */
3677 GIC 1465 : resetStringInfo(&output_message);
3678 CBC 1465 : pq_sendbyte(&output_message, 'k');
3679 GIC 1465 : pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
3680 1465 : pq_sendint64(&output_message, GetCurrentTimestamp());
3681 CBC 1465 : pq_sendbyte(&output_message, requestReply ? 1 : 0);
3682 :
3683 : /* ... and send it wrapped in CopyData */
3684 GIC 1465 : pq_putmessage_noblock('d', output_message.data, output_message.len);
3685 ECB :
3686 : /* Set local flag */
3687 GIC 1465 : if (requestReply)
3688 2 : waiting_for_ping_response = true;
3689 1465 : }
3690 :
3691 : /*
3692 : * Send keepalive message if too much time has elapsed.
3693 : */
3694 : static void
3695 951815 : WalSndKeepaliveIfNecessary(void)
3696 : {
3697 : TimestampTz ping_time;
3698 :
3699 : /*
3700 : * Don't send keepalive messages if timeouts are globally disabled or
3701 ECB : * we're doing something not partaking in timeouts.
3702 : */
3703 CBC 951815 : if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3704 GIC 24 : return;
3705 :
3706 CBC 951791 : if (waiting_for_ping_response)
3707 52 : return;
3708 ECB :
3709 : /*
3710 : * If half of wal_sender_timeout has lapsed without receiving any reply
3711 : * from the standby, send a keep-alive message to the standby requesting
3712 : * an immediate reply.
3713 : */
3714 GIC 951739 : ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3715 : wal_sender_timeout / 2);
3716 CBC 951739 : if (last_processing >= ping_time)
3717 ECB : {
3718 LBC 0 : WalSndKeepalive(true, InvalidXLogRecPtr);
3719 :
3720 : /* Try to flush pending output to the client */
3721 UIC 0 : if (pq_flush_if_writable() != 0)
3722 0 : WalSndShutdown();
3723 : }
3724 ECB : }
3725 :
3726 : /*
3727 : * Record the end of the WAL and the time it was flushed locally, so that
3728 : * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3729 : * eventually reported to have been written, flushed and applied by the
3730 : * standby in a reply message.
3731 : */
3732 : static void
3733 CBC 58924 : LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3734 : {
3735 ECB : bool buffer_full;
3736 : int new_write_head;
3737 : int i;
3738 :
3739 GIC 58924 : if (!am_walsender)
3740 UIC 0 : return;
3741 :
3742 : /*
3743 ECB : * If the lsn hasn't advanced since last time, then do nothing. This way
3744 : * we only record a new sample when new WAL has been written.
3745 : */
3746 GIC 58924 : if (lag_tracker->last_lsn == lsn)
3747 GBC 40358 : return;
3748 GIC 18566 : lag_tracker->last_lsn = lsn;
3749 :
3750 EUB : /*
3751 : * If advancing the write head of the circular buffer would crash into any
3752 : * of the read heads, then the buffer is full. In other words, the
3753 : * slowest reader (presumably apply) is the one that controls the release
3754 : * of space.
3755 : */
3756 GIC 18566 : new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
3757 18566 : buffer_full = false;
3758 74264 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3759 : {
3760 55698 : if (new_write_head == lag_tracker->read_heads[i])
3761 UIC 0 : buffer_full = true;
3762 ECB : }
3763 :
3764 : /*
3765 : * If the buffer is full, for now we just rewind by one slot and overwrite
3766 : * the last sample, as a simple (if somewhat uneven) way to lower the
3767 : * sampling rate. There may be better adaptive compaction algorithms.
3768 : */
3769 GBC 18566 : if (buffer_full)
3770 : {
3771 UIC 0 : new_write_head = lag_tracker->write_head;
3772 0 : if (lag_tracker->write_head > 0)
3773 0 : lag_tracker->write_head--;
3774 : else
3775 LBC 0 : lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
3776 ECB : }
3777 :
3778 : /* Store a sample at the current write head position. */
3779 GIC 18566 : lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3780 18566 : lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3781 18566 : lag_tracker->write_head = new_write_head;
3782 : }
3783 :
/*
 * Find out how much time has elapsed between the moment WAL location 'lsn'
 * (or the highest known earlier LSN) was flushed locally and the time 'now'.
 * We have a separate read head for each of the reported LSN locations we
 * receive in replies from standby; 'head' controls which read head is
 * used. Whenever a read head crosses an LSN which was written into the
 * lag buffer with LagTrackerWrite, we can use the associated timestamp to
 * find out the time this LSN (or an earlier one) was flushed locally, and
 * therefore compute the lag.
 *
 * Side effects: advances lag_tracker->read_heads[head] past every consumed
 * sample and stores the last consumed sample in lag_tracker->last_read[head].
 * When the read head catches up with the write head, last_read[head].time is
 * reset to 0.
 *
 * Return -1 if no new sample data is available, and otherwise the elapsed
 * time in microseconds.  -1 is also returned if the local clock appears to
 * have gone backwards, or if 'lsn' is behind the last consumed sample.
 */
static TimeOffset
LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
{
	/* Stays 0 unless a sample is crossed or a flush time is estimated. */
	TimestampTz time = 0;

	/* Read all unread samples up to this LSN or end of buffer. */
	while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
		   lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
	{
		time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
		lag_tracker->last_read[head] =
			lag_tracker->buffer[lag_tracker->read_heads[head]];
		lag_tracker->read_heads[head] =
			(lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
	}

	/*
	 * If the lag tracker is empty, that means the standby has processed
	 * everything we've ever sent so we should now clear 'last_read'. If we
	 * didn't do that, we'd risk using a stale and irrelevant sample for
	 * interpolation at the beginning of the next burst of WAL after a period
	 * of idleness.
	 */
	if (lag_tracker->read_heads[head] == lag_tracker->write_head)
		lag_tracker->last_read[head].time = 0;

	if (time > now)
	{
		/* If the clock somehow went backwards, treat as not found. */
		return -1;
	}
	else if (time == 0)
	{
		/*
		 * We didn't cross a time. If there is a future sample that we
		 * haven't reached yet, and we've already reached at least one sample,
		 * let's interpolate the local flushed time. This is mainly useful
		 * for reporting a completely stuck apply position as having
		 * increasing lag, since otherwise we'd have to wait for it to
		 * eventually start moving again and cross one of our samples before
		 * we can show the lag increasing.
		 */
		if (lag_tracker->read_heads[head] == lag_tracker->write_head)
		{
			/* There are no future samples, so we can't interpolate. */
			return -1;
		}
		else if (lag_tracker->last_read[head].time != 0)
		{
			/* We can interpolate between last_read and the next sample. */
			double		fraction;
			WalTimeSample prev = lag_tracker->last_read[head];
			WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];

			if (lsn < prev.lsn)
			{
				/*
				 * Reported LSNs shouldn't normally go backwards, but it's
				 * possible when there is a timeline change. Treat as not
				 * found.
				 */
				return -1;
			}

			/*
			 * The loop above stopped at 'next' because next.lsn > lsn, and
			 * lsn >= prev.lsn was just checked, so the divisor below is
			 * positive.
			 */
			Assert(prev.lsn < next.lsn);

			if (prev.time > next.time)
			{
				/* If the clock somehow went backwards, treat as not found. */
				return -1;
			}

			/* See how far we are between the previous and next samples. */
			fraction =
				(double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);

			/* Scale the local flush time proportionally. */
			time = (TimestampTz)
				((double) prev.time + (next.time - prev.time) * fraction);
		}
		else
		{
			/*
			 * We have only a future sample, implying that we were entirely
			 * caught up and now there is a new burst of WAL and the standby
			 * hasn't processed the first sample yet. Until the standby
			 * reaches the future sample the best we can do is report the
			 * hypothetical lag if that sample were to be replayed now.
			 */
			time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
		}
	}

	/* Return the elapsed time since local flush time in microseconds. */
	Assert(time != 0);
	return now - time;
}
|