Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walsender.c
4 : *
5 : * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
6 : * care of sending XLOG from the primary server to a single recipient.
7 : * (Note that there can be more than one walsender process concurrently.)
8 : * It is started by the postmaster when the walreceiver of a standby server
9 : * connects to the primary server and requests XLOG streaming replication.
10 : *
11 : * A walsender is similar to a regular backend, ie. there is a one-to-one
12 : * relationship between a connection and a walsender process, but instead
13 : * of processing SQL queries, it understands a small set of special
14 : * replication-mode commands. The START_REPLICATION command begins streaming
15 : * WAL to the client. While streaming, the walsender keeps reading XLOG
16 : * records from the disk and sends them to the standby server over the
17 : * COPY protocol, until either side ends the replication by exiting COPY
18 : * mode (or until the connection is closed).
19 : *
20 : * Normal termination is by SIGTERM, which instructs the walsender to
21 : * close the connection and exit(0) at the next convenient moment. Emergency
22 : * termination is by SIGQUIT; like any backend, the walsender will simply
23 : * abort and exit on SIGQUIT. A close of the connection and a FATAL error
24 : * are treated as not a crash but approximately normal termination;
25 : * the walsender will exit quickly without sending any more XLOG records.
26 : *
27 : * If the server is shut down, checkpointer sends us
28 : * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If
29 : * the backend is idle or runs an SQL query, this causes the backend to
30 : * shut down; if logical replication is in progress, all existing WAL records
31 : * are processed, followed by a shutdown. Otherwise this causes the walsender
32 : * to switch to the "stopping" state. In this state, the walsender will reject
33 : * any further replication commands. The checkpointer begins the shutdown
34 : * checkpoint once all walsenders are confirmed as stopping. When the shutdown
35 : * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
36 : * walsender to send any outstanding WAL, including the shutdown checkpoint
37 : * record, wait for it to be replicated to the standby, and then exit.
38 : *
39 : *
40 : * Portions Copyright (c) 2010-2023, PostgreSQL Global Development Group
41 : *
42 : * IDENTIFICATION
43 : * src/backend/replication/walsender.c
44 : *
45 : *-------------------------------------------------------------------------
46 : */
47 : #include "postgres.h"
48 :
49 : #include <signal.h>
50 : #include <unistd.h>
51 :
52 : #include "access/printtup.h"
53 : #include "access/timeline.h"
54 : #include "access/transam.h"
55 : #include "access/xact.h"
56 : #include "access/xlog_internal.h"
57 : #include "access/xlogreader.h"
58 : #include "access/xlogrecovery.h"
59 : #include "access/xlogutils.h"
60 : #include "backup/basebackup.h"
61 : #include "catalog/pg_authid.h"
62 : #include "catalog/pg_type.h"
63 : #include "commands/dbcommands.h"
64 : #include "commands/defrem.h"
65 : #include "funcapi.h"
66 : #include "libpq/libpq.h"
67 : #include "libpq/pqformat.h"
68 : #include "miscadmin.h"
69 : #include "nodes/replnodes.h"
70 : #include "pgstat.h"
71 : #include "postmaster/interrupt.h"
72 : #include "replication/decode.h"
73 : #include "replication/logical.h"
74 : #include "replication/slot.h"
75 : #include "replication/snapbuild.h"
76 : #include "replication/syncrep.h"
77 : #include "replication/walreceiver.h"
78 : #include "replication/walsender.h"
79 : #include "replication/walsender_private.h"
80 : #include "storage/condition_variable.h"
81 : #include "storage/fd.h"
82 : #include "storage/ipc.h"
83 : #include "storage/pmsignal.h"
84 : #include "storage/proc.h"
85 : #include "storage/procarray.h"
86 : #include "tcop/dest.h"
87 : #include "tcop/tcopprot.h"
88 : #include "utils/acl.h"
89 : #include "utils/builtins.h"
90 : #include "utils/guc.h"
91 : #include "utils/memutils.h"
92 : #include "utils/pg_lsn.h"
93 : #include "utils/portal.h"
94 : #include "utils/ps_status.h"
95 : #include "utils/timeout.h"
96 : #include "utils/timestamp.h"
97 :
98 : /*
99 : * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
100 : *
101 : * We don't have a good idea of what a good value would be; there's some
102 : * overhead per message in both walsender and walreceiver, but on the other
103 : * hand sending large batches makes walsender less responsive to signals
104 : * because signals are checked only between messages. 128kB (with
105 : * default 8k blocks) seems like a reasonable guess for now.
106 : */
107 : #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
108 :
109 : /* Array of WalSnds in shared memory */
110 : WalSndCtlData *WalSndCtl = NULL;
111 :
112 : /* My slot in the shared memory array */
113 : WalSnd *MyWalSnd = NULL;
114 :
115 : /* Global state */
116 : bool am_walsender = false; /* Am I a walsender process? */
117 : bool am_cascading_walsender = false; /* Am I cascading WAL to another
118 : * standby? */
119 : bool am_db_walsender = false; /* Connected to a database? */
120 :
121 : /* GUC variables */
122 : int max_wal_senders = 10; /* the maximum number of concurrent
123 : * walsenders */
124 : int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
125 : * data message */
126 : bool log_replication_commands = false;
127 :
128 : /*
129 : * State for WalSndWakeupRequest
130 : */
131 : bool wake_wal_senders = false;
132 :
133 : /*
134 : * xlogreader used for replication. Note that a WAL sender doing physical
135 : * replication does not need xlogreader to read WAL, but it needs one to
136 : * keep a state of its work.
137 : */
138 : static XLogReaderState *xlogreader = NULL;
139 :
140 : /*
141 : * These variables keep track of the state of the timeline we're currently
142 : * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
143 : * the timeline is not the latest timeline on this server, and the server's
144 : * history forked off from that timeline at sendTimeLineValidUpto.
145 : */
146 : static TimeLineID sendTimeLine = 0;
147 : static TimeLineID sendTimeLineNextTLI = 0;
148 : static bool sendTimeLineIsHistoric = false;
149 : static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
150 :
151 : /*
152 : * How far have we sent WAL already? This is also advertised in
153 : * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
154 : */
155 : static XLogRecPtr sentPtr = InvalidXLogRecPtr;
156 :
157 : /* Buffers for constructing outgoing messages and processing reply messages. */
158 : static StringInfoData output_message;
159 : static StringInfoData reply_message;
160 : static StringInfoData tmpbuf;
161 :
162 : /* Timestamp of last ProcessRepliesIfAny(). */
163 : static TimestampTz last_processing = 0;
164 :
165 : /*
166 : * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
167 : * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
168 : */
169 : static TimestampTz last_reply_timestamp = 0;
170 :
171 : /* Have we sent a heartbeat message asking for reply, since last reply? */
172 : static bool waiting_for_ping_response = false;
173 :
174 : /*
175 : * While streaming WAL in Copy mode, streamingDoneSending is set to true
176 : * after we have sent CopyDone. We should not send any more CopyData messages
177 : * after that. streamingDoneReceiving is set to true when we receive CopyDone
178 : * from the other end. When both become true, it's time to exit Copy mode.
179 : */
180 : static bool streamingDoneSending;
181 : static bool streamingDoneReceiving;
182 :
183 : /* Are we there yet? */
184 : static bool WalSndCaughtUp = false;
185 :
186 : /* Flags set by signal handlers for later service in main loop */
187 : static volatile sig_atomic_t got_SIGUSR2 = false;
188 : static volatile sig_atomic_t got_STOPPING = false;
189 :
190 : /*
191 : * This is set while we are streaming. When not set
192 : * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
193 : * the main loop is responsible for checking got_STOPPING and terminating when
194 : * it's set (after streaming any remaining WAL).
195 : */
196 : static volatile sig_atomic_t replication_active = false;
197 :
198 : static LogicalDecodingContext *logical_decoding_ctx = NULL;
199 :
200 : /* A sample associating a WAL location with the time it was written. */
201 : typedef struct
202 : {
203 : XLogRecPtr lsn;
204 : TimestampTz time;
205 : } WalTimeSample;
206 :
207 : /* The size of our buffer of time samples. */
208 : #define LAG_TRACKER_BUFFER_SIZE 8192
209 :
210 : /* A mechanism for tracking replication lag. */
211 : typedef struct
212 : {
213 : XLogRecPtr last_lsn;
214 : WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
215 : int write_head;
216 : int read_heads[NUM_SYNC_REP_WAIT_MODE];
217 : WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
218 : } LagTracker;
219 :
220 : static LagTracker *lag_tracker;
221 :
222 : /* Signal handlers */
223 : static void WalSndLastCycleHandler(SIGNAL_ARGS);
224 :
225 : /* Prototypes for private functions */
226 : typedef void (*WalSndSendDataCallback) (void);
227 : static void WalSndLoop(WalSndSendDataCallback send_data);
228 : static void InitWalSenderSlot(void);
229 : static void WalSndKill(int code, Datum arg);
230 : static void WalSndShutdown(void) pg_attribute_noreturn();
231 : static void XLogSendPhysical(void);
232 : static void XLogSendLogical(void);
233 : static void WalSndDone(WalSndSendDataCallback send_data);
234 : static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
235 : static void IdentifySystem(void);
236 : static void ReadReplicationSlot(ReadReplicationSlotCmd *cmd);
237 : static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
238 : static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
239 : static void StartReplication(StartReplicationCmd *cmd);
240 : static void StartLogicalReplication(StartReplicationCmd *cmd);
241 : static void ProcessStandbyMessage(void);
242 : static void ProcessStandbyReplyMessage(void);
243 : static void ProcessStandbyHSFeedbackMessage(void);
244 : static void ProcessRepliesIfAny(void);
245 : static void ProcessPendingWrites(void);
246 : static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
247 : static void WalSndKeepaliveIfNecessary(void);
248 : static void WalSndCheckTimeOut(void);
249 : static long WalSndComputeSleeptime(TimestampTz now);
250 : static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
251 : static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
252 : static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
253 : static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
254 : bool skipped_xact);
255 : static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
256 : static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
257 : static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
258 : static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
259 :
260 : static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
261 : TimeLineID *tli_p);
262 :
263 :
264 : /* Initialize walsender process before entering the main command loop */
265 : void
3838 heikki.linnakangas 266 CBC 831 : InitWalSender(void)
267 : {
4282 simon 268 831 : am_cascading_walsender = RecoveryInProgress();
269 :
270 : /* Create a per-walsender data structure in shared memory */
3838 heikki.linnakangas 271 831 : InitWalSenderSlot();
272 :
273 : /*
274 : * We don't currently need any ResourceOwner in a walsender process, but
275 : * if we did, we could call CreateAuxProcessResourceOwner here.
276 : */
277 :
278 : /*
279 : * Let postmaster know that we're a WAL sender. Once we've declared us as
280 : * a WAL sender process, postmaster will let us outlive the bgwriter and
281 : * kill us last in the shutdown sequence, so we get a chance to stream all
282 : * remaining WAL at shutdown, including the shutdown checkpoint. Note that
283 : * there's no going back, and we mustn't write any WAL records after this.
284 : */
3769 285 831 : MarkPostmasterChildWalSender();
286 831 : SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
287 :
288 : /*
289 : * If the client didn't specify a database to connect to, show in PGPROC
290 : * that our advertised xmin should affect vacuum horizons in all
291 : * databases. This allows physical replication clients to send hot
292 : * standby feedback that will delay vacuum cleanup in all databases.
293 : */
359 tgl 294 831 : if (MyDatabaseId == InvalidOid)
295 : {
296 349 : Assert(MyProc->xmin == InvalidTransactionId);
297 349 : LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
298 349 : MyProc->statusFlags |= PROC_AFFECTS_ALL_HORIZONS;
299 349 : ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
300 349 : LWLockRelease(ProcArrayLock);
301 : }
302 :
303 : /* Initialize empty timestamp buffer for lag tracking. */
1636 tmunro 304 831 : lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
4832 heikki.linnakangas 305 831 : }
306 :
307 : /*
308 : * Clean up after an error.
309 : *
310 : * WAL sender processes don't use transactions like regular backends do.
311 : * This function does any cleanup required after an error in a WAL sender
312 : * process, similar to what transaction abort does in a regular backend.
313 : */
314 : void
2794 andres 315 44 : WalSndErrorCleanup(void)
316 : {
3355 rhaas 317 44 : LWLockReleaseAll();
2329 318 44 : ConditionVariableCancelSleep();
2586 319 44 : pgstat_report_wait_end();
320 :
1059 alvherre 321 44 : if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
1061 322 9 : wal_segment_close(xlogreader);
323 :
3355 rhaas 324 44 : if (MyReplicationSlot != NULL)
325 16 : ReplicationSlotRelease();
326 :
2313 peter_e 327 44 : ReplicationSlotCleanup();
328 :
3769 heikki.linnakangas 329 44 : replication_active = false;
330 :
331 : /*
332 : * If there is a transaction in progress, it will clean up our
333 : * ResourceOwner, but if a replication command set up a resource owner
334 : * without a transaction, we've got to clean that up now.
335 : */
1101 rhaas 336 44 : if (!IsTransactionOrTransactionBlock())
337 43 : WalSndResourceCleanup(false);
338 :
2134 andres 339 44 : if (got_STOPPING || got_SIGUSR2)
3838 heikki.linnakangas 340 UBC 0 : proc_exit(0);
341 :
342 : /* Revert back to startup state */
3769 heikki.linnakangas 343 CBC 44 : WalSndSetState(WALSNDSTATE_STARTUP);
4832 344 44 : }
345 :
346 : /*
347 : * Clean up any ResourceOwner we created.
348 : */
349 : void
1101 rhaas 350 160 : WalSndResourceCleanup(bool isCommit)
351 : {
352 : ResourceOwner resowner;
353 :
354 160 : if (CurrentResourceOwner == NULL)
355 39 : return;
356 :
357 : /*
358 : * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
359 : * in a local variable and clear it first.
360 : */
361 121 : resowner = CurrentResourceOwner;
362 121 : CurrentResourceOwner = NULL;
363 :
364 : /* Now we can release resources and delete it. */
365 121 : ResourceOwnerRelease(resowner,
366 : RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
367 121 : ResourceOwnerRelease(resowner,
368 : RESOURCE_RELEASE_LOCKS, isCommit, true);
369 121 : ResourceOwnerRelease(resowner,
370 : RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
371 121 : ResourceOwnerDelete(resowner);
372 : }
373 :
374 : /*
375 : * Handle a client's connection abort in an orderly manner.
376 : */
377 : static void
3317 378 6 : WalSndShutdown(void)
379 : {
380 : /*
381 : * Reset whereToSendOutput to prevent ereport from attempting to send any
382 : * more messages to the standby.
383 : */
384 6 : if (whereToSendOutput == DestRemote)
385 6 : whereToSendOutput = DestNone;
386 :
387 6 : proc_exit(0);
388 : abort(); /* keep the compiler quiet */
389 : }
390 :
391 : /*
392 : * Handle the IDENTIFY_SYSTEM command.
393 : */
394 : static void
4468 magnus 395 522 : IdentifySystem(void)
396 : {
397 : char sysid[32];
398 : char xloc[MAXFNAMELEN];
399 : XLogRecPtr logptr;
3317 rhaas 400 522 : char *dbname = NULL;
401 : DestReceiver *dest;
402 : TupOutputState *tstate;
403 : TupleDesc tupdesc;
404 : Datum values[4];
267 peter 405 GNC 522 : bool nulls[4] = {0};
406 : TimeLineID currTLI;
407 :
408 : /*
409 : * Reply with a result set with one row, four columns. First col is system
410 : * ID, second is timeline ID, third is current xlog location and the
411 : * fourth contains the database name if we are connected to one.
412 : */
413 :
4468 magnus 414 CBC 522 : snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
415 : GetSystemIdentifier());
416 :
3769 heikki.linnakangas 417 522 : am_cascading_walsender = RecoveryInProgress();
418 522 : if (am_cascading_walsender)
520 rhaas 419 53 : logptr = GetStandbyFlushRecPtr(&currTLI);
420 : else
421 469 : logptr = GetFlushRecPtr(&currTLI);
422 :
775 peter 423 522 : snprintf(xloc, sizeof(xloc), "%X/%X", LSN_FORMAT_ARGS(logptr));
424 :
3317 rhaas 425 522 : if (MyDatabaseId != InvalidOid)
426 : {
427 171 : MemoryContext cur = CurrentMemoryContext;
428 :
429 : /* syscache access needs a transaction env. */
430 171 : StartTransactionCommand();
431 : /* make dbname live outside TX context */
432 171 : MemoryContextSwitchTo(cur);
433 171 : dbname = get_database_name(MyDatabaseId);
434 171 : CommitTransactionCommand();
435 : /* CommitTransactionCommand switches to TopMemoryContext */
436 171 : MemoryContextSwitchTo(cur);
437 : }
438 :
2258 439 522 : dest = CreateDestReceiver(DestRemoteSimple);
440 :
2258 rhaas 441 ECB : /* need a tuple descriptor representing four columns */
1601 andres 442 CBC 522 : tupdesc = CreateTemplateTupleDesc(4);
2258 rhaas 443 GIC 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
2258 rhaas 444 ECB : TEXTOID, -1, 0);
2258 rhaas 445 GIC 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
446 : INT8OID, -1, 0);
447 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
2258 rhaas 448 ECB : TEXTOID, -1, 0);
2258 rhaas 449 GIC 522 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
450 : TEXTOID, -1, 0);
451 :
2258 rhaas 452 ECB : /* prepare for projection of tuples */
1606 andres 453 GIC 522 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
454 :
2721 alvherre 455 ECB : /* column 1: system identifier */
2258 rhaas 456 GIC 522 : values[0] = CStringGetTextDatum(sysid);
457 :
2721 alvherre 458 ECB : /* column 2: timeline */
279 peter 459 GNC 522 : values[1] = Int64GetDatum(currTLI);
460 :
2158 peter_e 461 ECB : /* column 3: wal location */
2158 peter_e 462 GIC 522 : values[2] = CStringGetTextDatum(xloc);
463 :
2721 alvherre 464 ECB : /* column 4: database name, or NULL if none */
3317 rhaas 465 CBC 522 : if (dbname)
2258 rhaas 466 GIC 171 : values[3] = CStringGetTextDatum(dbname);
3317 rhaas 467 ECB : else
2258 rhaas 468 GIC 351 : nulls[3] = true;
469 :
2258 rhaas 470 ECB : /* send it to dest */
2258 rhaas 471 GIC 522 : do_tup_output(tstate, values, nulls);
2258 rhaas 472 ECB :
2258 rhaas 473 CBC 522 : end_tup_output(tstate);
4468 magnus 474 GIC 522 : }
475 :
476 : /* Handle READ_REPLICATION_SLOT command */
531 michael 477 ECB : static void
531 michael 478 GIC 6 : ReadReplicationSlot(ReadReplicationSlotCmd *cmd)
479 : {
480 : #define READ_REPLICATION_SLOT_COLS 3
481 : ReplicationSlot *slot;
482 : DestReceiver *dest;
483 : TupOutputState *tstate;
531 michael 484 ECB : TupleDesc tupdesc;
267 peter 485 GNC 6 : Datum values[READ_REPLICATION_SLOT_COLS] = {0};
486 : bool nulls[READ_REPLICATION_SLOT_COLS];
531 michael 487 ECB :
531 michael 488 CBC 6 : tupdesc = CreateTemplateTupleDesc(READ_REPLICATION_SLOT_COLS);
531 michael 489 GIC 6 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_type",
531 michael 490 ECB : TEXTOID, -1, 0);
531 michael 491 GIC 6 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "restart_lsn",
492 : TEXTOID, -1, 0);
531 michael 493 ECB : /* TimeLineID is unsigned, so int4 is not wide enough. */
531 michael 494 GIC 6 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "restart_tli",
495 : INT8OID, -1, 0);
531 michael 496 ECB :
267 peter 497 GNC 6 : memset(nulls, true, READ_REPLICATION_SLOT_COLS * sizeof(bool));
531 michael 498 ECB :
531 michael 499 CBC 6 : LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
531 michael 500 GIC 6 : slot = SearchNamedReplicationSlot(cmd->slotname, false);
531 michael 501 CBC 6 : if (slot == NULL || !slot->in_use)
502 : {
531 michael 503 GIC 2 : LWLockRelease(ReplicationSlotControlLock);
504 : }
505 : else
531 michael 506 ECB : {
507 : ReplicationSlot slot_contents;
531 michael 508 GIC 4 : int i = 0;
531 michael 509 ECB :
510 : /* Copy slot contents while holding spinlock */
531 michael 511 CBC 4 : SpinLockAcquire(&slot->mutex);
512 4 : slot_contents = *slot;
531 michael 513 GIC 4 : SpinLockRelease(&slot->mutex);
531 michael 514 CBC 4 : LWLockRelease(ReplicationSlotControlLock);
531 michael 515 ECB :
531 michael 516 GIC 4 : if (OidIsValid(slot_contents.data.database))
517 1 : ereport(ERROR,
518 : errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
519 : errmsg("cannot use %s with a logical replication slot",
520 : "READ_REPLICATION_SLOT"));
531 michael 521 ECB :
522 : /* slot type */
531 michael 523 CBC 3 : values[i] = CStringGetTextDatum("physical");
531 michael 524 GIC 3 : nulls[i] = false;
525 3 : i++;
531 michael 526 ECB :
527 : /* start LSN */
531 michael 528 GIC 3 : if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
529 : {
531 michael 530 ECB : char xloc[64];
531 :
531 michael 532 CBC 3 : snprintf(xloc, sizeof(xloc), "%X/%X",
533 3 : LSN_FORMAT_ARGS(slot_contents.data.restart_lsn));
531 michael 534 GIC 3 : values[i] = CStringGetTextDatum(xloc);
531 michael 535 CBC 3 : nulls[i] = false;
536 : }
531 michael 537 GIC 3 : i++;
531 michael 538 ECB :
539 : /* timeline this WAL was produced on */
531 michael 540 GIC 3 : if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
541 : {
531 michael 542 ECB : TimeLineID slots_position_timeline;
543 : TimeLineID current_timeline;
531 michael 544 GIC 3 : List *timeline_history = NIL;
545 :
546 : /*
547 : * While in recovery, use as timeline the currently-replaying one
531 michael 548 ECB : * to get the LSN position's history.
531 michael 549 EUB : */
531 michael 550 GIC 3 : if (RecoveryInProgress())
531 michael 551 LBC 0 : (void) GetXLogReplayRecPtr(¤t_timeline);
552 : else
520 rhaas 553 CBC 3 : current_timeline = GetWALInsertionTimeLine();
531 michael 554 ECB :
531 michael 555 GIC 3 : timeline_history = readTimeLineHistory(current_timeline);
531 michael 556 CBC 3 : slots_position_timeline = tliOfPointInHistory(slot_contents.data.restart_lsn,
531 michael 557 ECB : timeline_history);
531 michael 558 GIC 3 : values[i] = Int64GetDatum((int64) slots_position_timeline);
531 michael 559 CBC 3 : nulls[i] = false;
560 : }
561 3 : i++;
562 :
531 michael 563 GIC 3 : Assert(i == READ_REPLICATION_SLOT_COLS);
531 michael 564 ECB : }
565 :
531 michael 566 CBC 5 : dest = CreateDestReceiver(DestRemoteSimple);
567 5 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
568 5 : do_tup_output(tstate, values, nulls);
531 michael 569 GIC 5 : end_tup_output(tstate);
570 5 : }
571 :
572 :
573 : /*
574 : * Handle TIMELINE_HISTORY command.
4468 magnus 575 ECB : */
576 : static void
3769 heikki.linnakangas 577 GIC 13 : SendTimeLineHistory(TimeLineHistoryCmd *cmd)
578 : {
579 : DestReceiver *dest;
580 : TupleDesc tupdesc;
581 : StringInfoData buf;
582 : char histfname[MAXFNAMELEN];
583 : char path[MAXPGPATH];
584 : int fd;
585 : off_t histfilelen;
586 : off_t bytesleft;
587 : Size len;
588 :
279 peter 589 GNC 13 : dest = CreateDestReceiver(DestRemoteSimple);
590 :
4467 heikki.linnakangas 591 ECB : /*
592 : * Reply with a result set with one row, and two columns. The first col is
593 : * the name of the history file, 2nd is the contents.
594 : */
279 peter 595 GNC 13 : tupdesc = CreateTemplateTupleDesc(2);
596 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "filename", TEXTOID, -1, 0);
597 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "content", TEXTOID, -1, 0);
598 :
3769 heikki.linnakangas 599 GIC 13 : TLHistoryFileName(histfname, cmd->timeline);
3769 heikki.linnakangas 600 CBC 13 : TLHistoryFilePath(path, cmd->timeline);
3769 heikki.linnakangas 601 ECB :
602 : /* Send a RowDescription message */
279 peter 603 GNC 13 : dest->rStartup(dest, CMD_SELECT, tupdesc);
3769 heikki.linnakangas 604 ECB :
605 : /* Send a DataRow message */
3769 heikki.linnakangas 606 GBC 13 : pq_beginmessage(&buf, 'D');
2006 andres 607 GIC 13 : pq_sendint16(&buf, 2); /* # of columns */
2721 alvherre 608 13 : len = strlen(histfname);
2006 andres 609 CBC 13 : pq_sendint32(&buf, len); /* col1 len */
2721 alvherre 610 GBC 13 : pq_sendbytes(&buf, histfname, len);
611 :
2024 peter_e 612 GIC 13 : fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
3769 heikki.linnakangas 613 13 : if (fd < 0)
3769 heikki.linnakangas 614 LBC 0 : ereport(ERROR,
615 : (errcode_for_file_access(),
3769 heikki.linnakangas 616 ECB : errmsg("could not open file \"%s\": %m", path)));
617 :
618 : /* Determine file length and send it to client */
3769 heikki.linnakangas 619 GIC 13 : histfilelen = lseek(fd, 0, SEEK_END);
620 13 : if (histfilelen < 0)
3769 heikki.linnakangas 621 UIC 0 : ereport(ERROR,
3769 heikki.linnakangas 622 ECB : (errcode_for_file_access(),
623 : errmsg("could not seek to end of file \"%s\": %m", path)));
3769 heikki.linnakangas 624 CBC 13 : if (lseek(fd, 0, SEEK_SET) != 0)
3769 heikki.linnakangas 625 LBC 0 : ereport(ERROR,
3769 heikki.linnakangas 626 EUB : (errcode_for_file_access(),
627 : errmsg("could not seek to beginning of file \"%s\": %m", path)));
628 :
2006 andres 629 GIC 13 : pq_sendint32(&buf, histfilelen); /* col2 len */
3769 heikki.linnakangas 630 ECB :
3769 heikki.linnakangas 631 GBC 13 : bytesleft = histfilelen;
3769 heikki.linnakangas 632 GIC 26 : while (bytesleft > 0)
633 : {
634 : PGAlignedBlock rbuf;
635 : int nread;
3769 heikki.linnakangas 636 ECB :
2213 rhaas 637 CBC 13 : pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
1681 tgl 638 GIC 13 : nread = read(fd, rbuf.data, sizeof(rbuf));
2213 rhaas 639 13 : pgstat_report_wait_end();
1726 michael 640 CBC 13 : if (nread < 0)
3769 heikki.linnakangas 641 UBC 0 : ereport(ERROR,
642 : (errcode_for_file_access(),
643 : errmsg("could not read file \"%s\": %m",
644 : path)));
1726 michael 645 CBC 13 : else if (nread == 0)
1726 michael 646 LBC 0 : ereport(ERROR,
647 : (errcode(ERRCODE_DATA_CORRUPTED),
648 : errmsg("could not read file \"%s\": read %d of %zu",
649 : path, nread, (Size) bytesleft)));
650 :
1681 tgl 651 GIC 13 : pq_sendbytes(&buf, rbuf.data, nread);
3769 heikki.linnakangas 652 13 : bytesleft -= nread;
653 : }
654 :
1373 peter 655 CBC 13 : if (CloseTransientFile(fd) != 0)
1492 michael 656 UIC 0 : ereport(ERROR,
657 : (errcode_for_file_access(),
658 : errmsg("could not close file \"%s\": %m", path)));
659 :
3769 heikki.linnakangas 660 GIC 13 : pq_endmessage(&buf);
661 13 : }
3769 heikki.linnakangas 662 ECB :
663 : /*
664 : * Handle START_REPLICATION command.
665 : *
666 : * At the moment, this never returns, but an ereport(ERROR) will take us back
667 : * to the main loop.
668 : */
3769 heikki.linnakangas 669 EUB : static void
3769 heikki.linnakangas 670 GIC 195 : StartReplication(StartReplicationCmd *cmd)
671 : {
672 : StringInfoData buf;
673 : XLogRecPtr FlushPtr;
674 : TimeLineID FlushTLI;
675 :
676 : /* create xlogreader for physical replication */
1035 michael 677 195 : xlogreader =
699 tmunro 678 195 : XLogReaderAllocate(wal_segment_size, NULL,
679 195 : XL_ROUTINE(.segment_open = WalSndSegmentOpen,
680 : .segment_close = wal_segment_close),
681 : NULL);
682 :
1035 michael 683 CBC 195 : if (!xlogreader)
1035 michael 684 UIC 0 : ereport(ERROR,
1035 michael 685 ECB : (errcode(ERRCODE_OUT_OF_MEMORY),
503 alvherre 686 : errmsg("out of memory"),
503 alvherre 687 EUB : errdetail("Failed while allocating a WAL reading processor.")));
688 :
689 : /*
690 : * We assume here that we're logging enough information in the WAL for
691 : * log-shipping, since this is checked in PostmasterMain().
692 : *
693 : * NOTE: wal_level can only change at shutdown, so in most cases it is
694 : * difficult for there to be WAL data that we can still see that was
695 : * written at wal_level='minimal'.
696 : */
697 :
3355 rhaas 698 GIC 195 : if (cmd->slotname)
699 : {
667 alvherre 700 122 : ReplicationSlotAcquire(cmd->slotname, true);
2798 andres 701 120 : if (SlotIsLogical(MyReplicationSlot))
3355 rhaas 702 LBC 0 : ereport(ERROR,
3355 rhaas 703 ECB : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1165 alvherre 704 : errmsg("cannot use a logical replication slot for physical replication")));
705 :
1076 706 : /*
707 : * We don't need to verify the slot's restart_lsn here; instead we
708 : * rely on the caller requesting the starting point to use. If the
709 : * WAL segment doesn't exist, we'll fail later.
710 : */
711 : }
3355 rhaas 712 :
4433 simon 713 : /*
714 : * Select the timeline. If it was given explicitly by the client, use
520 rhaas 715 : * that. Otherwise use the timeline of the last replayed record.
4433 simon 716 : */
639 jdavis 717 GIC 193 : am_cascading_walsender = RecoveryInProgress();
3762 heikki.linnakangas 718 193 : if (am_cascading_walsender)
520 rhaas 719 12 : FlushPtr = GetStandbyFlushRecPtr(&FlushTLI);
720 : else
721 181 : FlushPtr = GetFlushRecPtr(&FlushTLI);
3762 heikki.linnakangas 722 ECB :
3769 heikki.linnakangas 723 GIC 193 : if (cmd->timeline != 0)
724 : {
725 : XLogRecPtr switchpoint;
726 :
727 192 : sendTimeLine = cmd->timeline;
520 rhaas 728 CBC 192 : if (sendTimeLine == FlushTLI)
3769 heikki.linnakangas 729 ECB : {
3769 heikki.linnakangas 730 GIC 180 : sendTimeLineIsHistoric = false;
3769 heikki.linnakangas 731 CBC 180 : sendTimeLineValidUpto = InvalidXLogRecPtr;
732 : }
733 : else
734 : {
735 : List *timeLineHistory;
736 :
3769 heikki.linnakangas 737 GIC 12 : sendTimeLineIsHistoric = true;
738 :
739 : /*
740 : * Check that the timeline the client requested exists, and the
741 : * requested start location is on that timeline.
742 : */
520 rhaas 743 12 : timeLineHistory = readTimeLineHistory(FlushTLI);
3734 heikki.linnakangas 744 12 : switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
745 : &sendTimeLineNextTLI);
3769 746 12 : list_free_deep(timeLineHistory);
747 :
748 : /*
749 : * Found the requested timeline in the history. Check that
750 : * requested startpoint is on that timeline in our history.
3769 heikki.linnakangas 751 ECB : *
752 : * This is quite loose on purpose. We only check that we didn't
753 : * fork off the requested timeline before the switchpoint. We
3602 bruce 754 EUB : * don't check that we switched *to* it before the requested
755 : * starting point. This is because the client can legitimately
756 : * request to start replication from the beginning of the WAL
757 : * segment that contains switchpoint, but on the new timeline, so
758 : * that it doesn't end up with a partial segment. If you ask for
759 : * too old a starting point, you'll get an error later when we
760 : * fail to find the requested WAL segment in pg_wal.
761 : *
3769 heikki.linnakangas 762 ECB : * XXX: we could be more strict here and only allow a startpoint
763 : * that's older than the switchpoint, if it's still in the same
764 : * WAL segment.
765 : */
3769 heikki.linnakangas 766 GIC 12 : if (!XLogRecPtrIsInvalid(switchpoint) &&
3754 alvherre 767 CBC 12 : switchpoint < cmd->startpoint)
3769 heikki.linnakangas 768 ECB : {
3769 heikki.linnakangas 769 LBC 0 : ereport(ERROR,
770 : (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
771 : LSN_FORMAT_ARGS(cmd->startpoint),
3769 heikki.linnakangas 772 ECB : cmd->timeline),
773 : errdetail("This server's history forked from timeline %u at %X/%X.",
774 : cmd->timeline,
775 peter 775 : LSN_FORMAT_ARGS(switchpoint))));
776 : }
3769 heikki.linnakangas 777 GIC 12 : sendTimeLineValidUpto = switchpoint;
778 : }
779 : }
780 : else
781 : {
520 rhaas 782 1 : sendTimeLine = FlushTLI;
3769 heikki.linnakangas 783 1 : sendTimeLineValidUpto = InvalidXLogRecPtr;
784 1 : sendTimeLineIsHistoric = false;
785 : }
3838 heikki.linnakangas 786 ECB :
3769 heikki.linnakangas 787 GIC 193 : streamingDoneSending = streamingDoneReceiving = false;
788 :
3769 heikki.linnakangas 789 ECB : /* If there is nothing to stream, don't even enter COPY mode */
3734 heikki.linnakangas 790 CBC 193 : if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
3769 heikki.linnakangas 791 ECB : {
792 : /*
3602 bruce 793 : * When we first start replication the standby will be behind the
794 : * primary. For some applications, for example synchronous
795 : * replication, it is important to have a clear state for this initial
796 : * catchup mode, so we can trigger actions when we change streaming
797 : * state later. We may stay in this state for a long time, which is
798 : * exactly why we want to be able to monitor whether or not we are
799 : * still here.
800 : */
3769 heikki.linnakangas 801 GBC 193 : WalSndSetState(WALSNDSTATE_CATCHUP);
802 :
803 : /* Send a CopyBothResponse message, and start streaming */
3769 heikki.linnakangas 804 GIC 193 : pq_beginmessage(&buf, 'W');
805 193 : pq_sendbyte(&buf, 0);
2006 andres 806 193 : pq_sendint16(&buf, 0);
3769 heikki.linnakangas 807 193 : pq_endmessage(&buf);
3769 heikki.linnakangas 808 CBC 193 : pq_flush();
809 :
810 : /*
3769 heikki.linnakangas 811 ECB : * Don't allow a request to stream from a future point in WAL that
812 : * hasn't been flushed to disk in this server yet.
813 : */
3754 alvherre 814 GIC 193 : if (FlushPtr < cmd->startpoint)
3769 heikki.linnakangas 815 ECB : {
3769 heikki.linnakangas 816 UIC 0 : ereport(ERROR,
817 : (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
775 peter 818 ECB : LSN_FORMAT_ARGS(cmd->startpoint),
819 : LSN_FORMAT_ARGS(FlushPtr))));
3769 heikki.linnakangas 820 : }
821 :
822 : /* Start streaming from the requested point */
3769 heikki.linnakangas 823 CBC 193 : sentPtr = cmd->startpoint;
3838 heikki.linnakangas 824 EUB :
3769 heikki.linnakangas 825 ECB : /* Initialize shared memory status, too */
2109 alvherre 826 GIC 193 : SpinLockAcquire(&MyWalSnd->mutex);
2109 alvherre 827 CBC 193 : MyWalSnd->sentPtr = sentPtr;
2109 alvherre 828 GIC 193 : SpinLockRelease(&MyWalSnd->mutex);
829 :
3769 heikki.linnakangas 830 CBC 193 : SyncRepInitConfig();
3769 heikki.linnakangas 831 ECB :
832 : /* Main loop of walsender */
3769 heikki.linnakangas 833 GIC 193 : replication_active = true;
834 :
3317 rhaas 835 193 : WalSndLoop(XLogSendPhysical);
836 :
3769 heikki.linnakangas 837 CBC 110 : replication_active = false;
2134 andres 838 GIC 110 : if (got_STOPPING)
3769 heikki.linnakangas 839 UIC 0 : proc_exit(0);
3769 heikki.linnakangas 840 GIC 110 : WalSndSetState(WALSNDSTATE_STARTUP);
841 :
3734 842 110 : Assert(streamingDoneSending && streamingDoneReceiving);
843 : }
3734 heikki.linnakangas 844 ECB :
3355 rhaas 845 GIC 110 : if (cmd->slotname)
3355 rhaas 846 CBC 94 : ReplicationSlotRelease();
3355 rhaas 847 ECB :
848 : /*
3734 heikki.linnakangas 849 : * Copy is finished now. Send a single-row result set indicating the next
850 : * timeline.
851 : */
3734 heikki.linnakangas 852 GIC 110 : if (sendTimeLineIsHistoric)
853 : {
854 : char startpos_str[8 + 1 + 8 + 1];
855 : DestReceiver *dest;
2258 rhaas 856 ECB : TupOutputState *tstate;
857 : TupleDesc tupdesc;
858 : Datum values[2];
267 peter 859 GNC 13 : bool nulls[2] = {0};
860 :
3623 heikki.linnakangas 861 GIC 13 : snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
775 peter 862 13 : LSN_FORMAT_ARGS(sendTimeLineValidUpto));
3623 heikki.linnakangas 863 ECB :
2258 rhaas 864 GIC 13 : dest = CreateDestReceiver(DestRemoteSimple);
3602 bruce 865 ECB :
866 : /*
867 : * Need a tuple descriptor representing two columns. int8 may seem
2153 868 : * like a surprising data type for this, but in theory int4 would not
869 : * be wide enough for this, as TimeLineID is unsigned.
3734 heikki.linnakangas 870 : */
1601 andres 871 GIC 13 : tupdesc = CreateTemplateTupleDesc(2);
2258 rhaas 872 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
873 : INT8OID, -1, 0);
2258 rhaas 874 CBC 13 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
2258 rhaas 875 ECB : TEXTOID, -1, 0);
876 :
877 : /* prepare for projection of tuple */
1606 andres 878 GIC 13 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
879 :
2258 rhaas 880 13 : values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
881 13 : values[1] = CStringGetTextDatum(startpos_str);
882 :
883 : /* send it to dest */
884 13 : do_tup_output(tstate, values, nulls);
885 :
2258 rhaas 886 CBC 13 : end_tup_output(tstate);
887 : }
888 :
889 : /* Send CommandComplete message */
935 alvherre 890 GIC 110 : EndReplicationCommand("START_STREAMING");
4468 magnus 891 110 : }
892 :
893 : /*
894 : * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
895 : * walsender process.
896 : *
897 : * Inside the walsender we can do better than read_local_xlog_page,
898 : * which has to do a plain sleep/busy loop, because the walsender's latch gets
899 : * set every time WAL is flushed.
3317 rhaas 900 ECB : */
901 : static int
699 tmunro 902 GIC 10781 : logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
903 : XLogRecPtr targetRecPtr, char *cur_page)
904 : {
905 : XLogRecPtr flushptr;
906 : int count;
907 : WALReadError errinfo;
1231 alvherre 908 ECB : XLogSegNo segno;
909 : TimeLineID currTLI;
1 andres 910 :
911 : /*
912 : * Make sure we have enough WAL available before retrieving the current
913 : * timeline. This is needed to determine am_cascading_walsender accurately
914 : * which is needed to determine the current timeline.
915 : */
1 andres 916 GNC 10781 : flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
917 :
918 : /*
919 : * Since logical decoding is also permitted on a standby server, we need
920 : * to check if the server is in recovery to decide how to get the current
921 : * timeline ID (so that it also cover the promotion or timeline change
922 : * cases).
923 : */
924 10659 : am_cascading_walsender = RecoveryInProgress();
925 :
926 10659 : if (am_cascading_walsender)
927 311 : GetXLogReplayRecPtr(&currTLI);
928 : else
929 10348 : currTLI = GetWALInsertionTimeLine();
930 :
520 rhaas 931 CBC 10659 : XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
932 10659 : sendTimeLineIsHistoric = (state->currTLI != currTLI);
2209 simon 933 10659 : sendTimeLine = state->currTLI;
2209 simon 934 GIC 10659 : sendTimeLineValidUpto = state->currTLIValidUntil;
935 10659 : sendTimeLineNextTLI = state->nextTLI;
2209 simon 936 ECB :
2109 tgl 937 : /* fail if not (implies we are going to shut down) */
2109 tgl 938 GIC 10659 : if (flushptr < targetPagePtr + reqLen)
699 tmunro 939 CBC 204 : return -1;
940 :
2109 tgl 941 GIC 10455 : if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
2109 tgl 942 CBC 8909 : count = XLOG_BLCKSZ; /* more than one block available */
943 : else
2109 tgl 944 GIC 1546 : count = flushptr - targetPagePtr; /* part of the page available */
945 :
946 : /* now actually read the data, we know it's there */
699 tmunro 947 10455 : if (!WALRead(state,
948 : cur_page,
949 : targetPagePtr,
1231 alvherre 950 EUB : XLOG_BLCKSZ,
951 : currTLI, /* Pass the current TLI because only
952 : * WalSndSegmentOpen controls whether new TLI
953 : * is needed. */
954 : &errinfo))
1231 alvherre 955 UIC 0 : WALReadRaiseError(&errinfo);
956 :
957 : /*
958 : * After reading into the buffer, check that what we read was valid. We do
1231 alvherre 959 ECB : * this after reading, because even though the segment was present when we
960 : * opened it, it might get recycled or removed while we read it. The
961 : * read() succeeds in that case, but the data we tried to read might
962 : * already have been overwritten with new WAL records.
963 : */
1061 alvherre 964 GIC 10455 : XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
965 10455 : CheckXLogRemoved(segno, state->seg.ws_tli);
966 :
699 tmunro 967 10455 : return count;
968 : }
3317 rhaas 969 ECB :
970 : /*
971 : * Process extra options given to CREATE_REPLICATION_SLOT.
972 : */
973 : static void
2217 peter_e 974 GIC 344 : parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
2217 peter_e 975 ECB : bool *reserve_wal,
648 akapila 976 : CRSSnapshotAction *snapshot_action,
977 : bool *two_phase)
978 : {
979 : ListCell *lc;
2217 peter_e 980 CBC 344 : bool snapshot_action_given = false;
2217 peter_e 981 GIC 344 : bool reserve_wal_given = false;
648 akapila 982 CBC 344 : bool two_phase_given = false;
983 :
2217 peter_e 984 ECB : /* Parse options */
2153 bruce 985 GIC 689 : foreach(lc, cmd->options)
986 : {
2217 peter_e 987 345 : DefElem *defel = (DefElem *) lfirst(lc);
2217 peter_e 988 ECB :
551 rhaas 989 GBC 345 : if (strcmp(defel->defname, "snapshot") == 0)
990 : {
991 : char *action;
992 :
2217 peter_e 993 CBC 247 : if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
2217 peter_e 994 LBC 0 : ereport(ERROR,
995 : (errcode(ERRCODE_SYNTAX_ERROR),
2217 peter_e 996 ECB : errmsg("conflicting or redundant options")));
2217 peter_e 997 EUB :
551 rhaas 998 CBC 247 : action = defGetString(defel);
2217 peter_e 999 247 : snapshot_action_given = true;
551 rhaas 1000 ECB :
551 rhaas 1001 CBC 247 : if (strcmp(action, "export") == 0)
551 rhaas 1002 UIC 0 : *snapshot_action = CRS_EXPORT_SNAPSHOT;
551 rhaas 1003 GBC 247 : else if (strcmp(action, "nothing") == 0)
551 rhaas 1004 GIC 92 : *snapshot_action = CRS_NOEXPORT_SNAPSHOT;
1005 155 : else if (strcmp(action, "use") == 0)
1006 155 : *snapshot_action = CRS_USE_SNAPSHOT;
1007 : else
2208 peter_e 1008 LBC 0 : ereport(ERROR,
1009 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
551 rhaas 1010 ECB : errmsg("unrecognized value for CREATE_REPLICATION_SLOT option \"%s\": \"%s\"",
551 rhaas 1011 EUB : defel->defname, action)));
1012 : }
2217 peter_e 1013 GIC 98 : else if (strcmp(defel->defname, "reserve_wal") == 0)
1014 : {
2217 peter_e 1015 CBC 96 : if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
2217 peter_e 1016 LBC 0 : ereport(ERROR,
1017 : (errcode(ERRCODE_SYNTAX_ERROR),
2217 peter_e 1018 ECB : errmsg("conflicting or redundant options")));
1019 :
2217 peter_e 1020 CBC 96 : reserve_wal_given = true;
551 rhaas 1021 GBC 96 : *reserve_wal = defGetBoolean(defel);
1022 : }
648 akapila 1023 GIC 2 : else if (strcmp(defel->defname, "two_phase") == 0)
648 akapila 1024 ECB : {
648 akapila 1025 CBC 2 : if (two_phase_given || cmd->kind != REPLICATION_KIND_LOGICAL)
648 akapila 1026 UIC 0 : ereport(ERROR,
1027 : (errcode(ERRCODE_SYNTAX_ERROR),
648 akapila 1028 EUB : errmsg("conflicting or redundant options")));
648 akapila 1029 GIC 2 : two_phase_given = true;
551 rhaas 1030 CBC 2 : *two_phase = defGetBoolean(defel);
1031 : }
1032 : else
2217 peter_e 1033 UIC 0 : elog(ERROR, "unrecognized option: %s", defel->defname);
1034 : }
2217 peter_e 1035 GIC 344 : }
2217 peter_e 1036 ECB :
1037 : /*
3355 rhaas 1038 : * Create a new replication slot.
1039 : */
1040 : static void
3355 rhaas 1041 CBC 344 : CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
3355 rhaas 1042 ECB : {
3317 rhaas 1043 CBC 344 : const char *snapshot_name = NULL;
1044 : char xloc[MAXFNAMELEN];
1045 : char *slot_name;
2217 peter_e 1046 GIC 344 : bool reserve_wal = false;
648 akapila 1047 344 : bool two_phase = false;
2208 peter_e 1048 CBC 344 : CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
1049 : DestReceiver *dest;
2258 rhaas 1050 ECB : TupOutputState *tstate;
1051 : TupleDesc tupdesc;
1052 : Datum values[4];
267 peter 1053 GNC 344 : bool nulls[4] = {0};
3355 rhaas 1054 ECB :
3355 rhaas 1055 GIC 344 : Assert(!MyReplicationSlot);
3355 rhaas 1056 ECB :
648 akapila 1057 CBC 344 : parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase);
1058 :
3317 rhaas 1059 GIC 344 : if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1060 : {
2313 peter_e 1061 97 : ReplicationSlotCreate(cmd->slotname, false,
767 akapila 1062 CBC 97 : cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
1063 : false);
1064 : }
1065 : else
1066 : {
3317 rhaas 1067 GIC 247 : CheckLogicalDecodingRequirements();
1068 :
1069 : /*
1070 : * Initially create persistent slot as ephemeral - that allows us to
2313 peter_e 1071 ECB : * nicely handle errors during initialization because it'll get
1072 : * dropped if this transaction fails. We'll make it persistent at the
1073 : * end. Temporary slots can be created as temporary from beginning as
1074 : * they get dropped on error as well.
1075 : */
2313 peter_e 1076 CBC 247 : ReplicationSlotCreate(cmd->slotname, true,
767 akapila 1077 GIC 247 : cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
1078 : two_phase);
3317 rhaas 1079 ECB : }
1080 :
3317 rhaas 1081 GIC 343 : if (cmd->kind == REPLICATION_KIND_LOGICAL)
1082 : {
1083 : LogicalDecodingContext *ctx;
2153 bruce 1084 247 : bool need_full_snapshot = false;
3317 rhaas 1085 ECB :
1086 : /*
2208 peter_e 1087 EUB : * Do options check early so that we can bail before calling the
1088 : * DecodingContextFindStartpoint which can take long time.
1089 : */
2208 peter_e 1090 GIC 247 : if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1091 : {
2208 peter_e 1092 UIC 0 : if (IsTransactionBlock())
2208 peter_e 1093 UBC 0 : ereport(ERROR,
1094 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1424 alvherre 1095 ECB : (errmsg("%s must not be called inside a transaction",
1096 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'export')")));
2173 andres 1097 :
2173 andres 1098 UBC 0 : need_full_snapshot = true;
1099 : }
2208 peter_e 1100 GIC 247 : else if (snapshot_action == CRS_USE_SNAPSHOT)
1101 : {
1102 155 : if (!IsTransactionBlock())
2208 peter_e 1103 LBC 0 : ereport(ERROR,
1424 alvherre 1104 EUB : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1105 : (errmsg("%s must be called inside a transaction",
1106 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1107 :
2208 peter_e 1108 CBC 155 : if (XactIsoLevel != XACT_REPEATABLE_READ)
2208 peter_e 1109 UBC 0 : ereport(ERROR,
1110 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1111 : (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
1112 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
139 akapila 1113 GNC 155 : if (!XactReadOnly)
139 akapila 1114 UNC 0 : ereport(ERROR,
1115 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1116 : (errmsg("%s must be called in a read only transaction",
1117 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1118 :
2208 peter_e 1119 CBC 155 : if (FirstSnapshotSet)
2208 peter_e 1120 UBC 0 : ereport(ERROR,
1121 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1122 : (errmsg("%s must be called before any query",
1123 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1124 :
2208 peter_e 1125 CBC 155 : if (IsSubTransaction())
2208 peter_e 1126 UBC 0 : ereport(ERROR,
1127 : /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
1128 : (errmsg("%s must not be called in a subtransaction",
1129 : "CREATE_REPLICATION_SLOT ... (SNAPSHOT 'use')")));
1130 :
2173 andres 1131 CBC 155 : need_full_snapshot = true;
1132 : }
1133 :
1134 247 : ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
1135 : InvalidXLogRecPtr,
699 tmunro 1136 247 : XL_ROUTINE(.page_read = logical_read_xlog_page,
1137 : .segment_open = WalSndSegmentOpen,
1138 : .segment_close = wal_segment_close),
1139 : WalSndPrepareWrite, WalSndWriteData,
1140 : WalSndUpdateProgress);
1141 :
1142 : /*
1143 : * Signal that we don't need the timeout mechanism. We're just
1144 : * creating the replication slot and don't yet accept feedback
1145 : * messages or send keepalives. As we possibly need to wait for
1146 : * further WAL the walsender would otherwise possibly be killed too
1147 : * soon.
1148 : */
3237 andres 1149 247 : last_reply_timestamp = 0;
1150 :
1151 : /* build initial snapshot, might take a while */
3317 rhaas 1152 247 : DecodingContextFindStartpoint(ctx);
1153 :
1154 : /*
1155 : * Export or use the snapshot if we've been asked to do so.
1156 : *
1157 : * NB. We will convert the snapbuild.c kind of snapshot to normal
1158 : * snapshot when doing this.
1159 : */
2208 peter_e 1160 247 : if (snapshot_action == CRS_EXPORT_SNAPSHOT)
1161 : {
2217 peter_e 1162 UBC 0 : snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
1163 : }
2208 peter_e 1164 CBC 247 : else if (snapshot_action == CRS_USE_SNAPSHOT)
1165 : {
1166 : Snapshot snap;
1167 :
2205 tgl 1168 155 : snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
2208 peter_e 1169 155 : RestoreTransactionSnapshot(snap, MyProc);
1170 : }
1171 :
1172 : /* don't need the decoding context anymore */
3317 rhaas 1173 247 : FreeDecodingContext(ctx);
1174 :
2313 peter_e 1175 247 : if (!cmd->temporary)
1176 247 : ReplicationSlotPersist();
1177 : }
2217 1178 96 : else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
1179 : {
2772 andres 1180 95 : ReplicationSlotReserveWal();
1181 :
1182 95 : ReplicationSlotMarkDirty();
1183 :
1184 : /* Write this slot to disk if it's a permanent one. */
2313 peter_e 1185 95 : if (!cmd->temporary)
1186 3 : ReplicationSlotSave();
1187 : }
1188 :
2158 1189 343 : snprintf(xloc, sizeof(xloc), "%X/%X",
775 peter 1190 343 : LSN_FORMAT_ARGS(MyReplicationSlot->data.confirmed_flush));
1191 :
2258 rhaas 1192 343 : dest = CreateDestReceiver(DestRemoteSimple);
1193 :
1194 : /*----------
1195 : * Need a tuple descriptor representing four columns:
1196 : * - first field: the slot name
1197 : * - second field: LSN at which we became consistent
1198 : * - third field: exported snapshot's name
1199 : * - fourth field: output plugin
1200 : *----------
2258 rhaas 1201 ECB : */
1601 andres 1202 CBC 343 : tupdesc = CreateTemplateTupleDesc(4);
2258 rhaas 1203 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
2258 rhaas 1204 ECB : TEXTOID, -1, 0);
2258 rhaas 1205 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
2258 rhaas 1206 ECB : TEXTOID, -1, 0);
2258 rhaas 1207 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
2258 rhaas 1208 ECB : TEXTOID, -1, 0);
2258 rhaas 1209 GIC 343 : TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
1210 : TEXTOID, -1, 0);
1211 :
2258 rhaas 1212 ECB : /* prepare for projection of tuples */
1606 andres 1213 GIC 343 : tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
1214 :
3355 rhaas 1215 ECB : /* slot_name */
2258 rhaas 1216 CBC 343 : slot_name = NameStr(MyReplicationSlot->data.name);
2258 rhaas 1217 GIC 343 : values[0] = CStringGetTextDatum(slot_name);
1218 :
3317 rhaas 1219 ECB : /* consistent wal location */
2158 peter_e 1220 GIC 343 : values[1] = CStringGetTextDatum(xloc);
1221 :
2721 alvherre 1222 ECB : /* snapshot name, or NULL if none */
3317 rhaas 1223 GBC 343 : if (snapshot_name != NULL)
2258 rhaas 1224 UIC 0 : values[2] = CStringGetTextDatum(snapshot_name);
3317 rhaas 1225 ECB : else
2258 rhaas 1226 GIC 343 : nulls[2] = true;
1227 :
2721 alvherre 1228 ECB : /* plugin, or NULL if none */
3317 rhaas 1229 CBC 343 : if (cmd->plugin != NULL)
2258 rhaas 1230 GIC 247 : values[3] = CStringGetTextDatum(cmd->plugin);
3317 rhaas 1231 ECB : else
2258 rhaas 1232 GIC 96 : nulls[3] = true;
1233 :
2258 rhaas 1234 ECB : /* send it to dest */
2258 rhaas 1235 CBC 343 : do_tup_output(tstate, values, nulls);
2258 rhaas 1236 GIC 343 : end_tup_output(tstate);
3355 rhaas 1237 ECB :
3355 rhaas 1238 CBC 343 : ReplicationSlotRelease();
3355 rhaas 1239 GIC 343 : }
1240 :
1241 : /*
1242 : * Get rid of a replication slot that is no longer wanted.
1243 : */
3355 rhaas 1244 ECB : static void
3355 rhaas 1245 GIC 192 : DropReplicationSlot(DropReplicationSlotCmd *cmd)
3355 rhaas 1246 ECB : {
2046 alvherre 1247 CBC 192 : ReplicationSlotDrop(cmd->slotname, !cmd->wait);
3355 rhaas 1248 GIC 192 : }
1249 :
1250 : /*
1251 : * Load previously initiated logical slot and prepare for sending data (via
1252 : * WalSndLoop).
1253 : */
3317 rhaas 1254 ECB : static void
3317 rhaas 1255 GIC 298 : StartLogicalReplication(StartReplicationCmd *cmd)
1256 : {
1257 : StringInfoData buf;
1258 : QueryCompletion qc;
1259 :
3317 rhaas 1260 ECB : /* make sure that our requirements are still fulfilled */
3317 rhaas 1261 GIC 298 : CheckLogicalDecodingRequirements();
3317 rhaas 1262 ECB :
3317 rhaas 1263 GIC 296 : Assert(!MyReplicationSlot);
3317 rhaas 1264 ECB :
667 alvherre 1265 GIC 296 : ReplicationSlotAcquire(cmd->slotname, true);
1266 :
1267 : /*
3317 rhaas 1268 EUB : * Force a disconnect, so that the decoding code doesn't need to care
1269 : * about an eventual switch from running in recovery, to running in a
1270 : * normal environment. Client code is expected to handle reconnects.
1271 : */
3317 rhaas 1272 GIC 296 : if (am_cascading_walsender && !RecoveryInProgress())
1273 : {
3317 rhaas 1274 UIC 0 : ereport(LOG,
1275 : (errmsg("terminating walsender process after promotion")));
2134 andres 1276 0 : got_STOPPING = true;
1277 : }
3317 rhaas 1278 ECB :
1712 alvherre 1279 : /*
1280 : * Create our decoding context, making it start at the previously ack'ed
1281 : * position.
1282 : *
1283 : * Do this before sending a CopyBothResponse message, so that any errors
1284 : * are reported early.
1285 : */
1712 alvherre 1286 GIC 290 : logical_decoding_ctx =
1712 alvherre 1287 CBC 296 : CreateDecodingContext(cmd->startpoint, cmd->options, false,
699 tmunro 1288 GIC 296 : XL_ROUTINE(.page_read = logical_read_xlog_page,
1289 : .segment_open = WalSndSegmentOpen,
699 tmunro 1290 ECB : .segment_close = wal_segment_close),
1712 alvherre 1291 : WalSndPrepareWrite, WalSndWriteData,
1292 : WalSndUpdateProgress);
1061 alvherre 1293 CBC 290 : xlogreader = logical_decoding_ctx->reader;
1712 alvherre 1294 ECB :
3317 rhaas 1295 GIC 290 : WalSndSetState(WALSNDSTATE_CATCHUP);
1296 :
3317 rhaas 1297 ECB : /* Send a CopyBothResponse message, and start streaming */
3317 rhaas 1298 CBC 290 : pq_beginmessage(&buf, 'W');
3317 rhaas 1299 GIC 290 : pq_sendbyte(&buf, 0);
2006 andres 1300 290 : pq_sendint16(&buf, 0);
3317 rhaas 1301 290 : pq_endmessage(&buf);
1302 290 : pq_flush();
1303 :
3317 rhaas 1304 ECB : /* Start reading WAL from the oldest required WAL. */
1169 heikki.linnakangas 1305 GIC 290 : XLogBeginRead(logical_decoding_ctx->reader,
1306 290 : MyReplicationSlot->data.restart_lsn);
3317 rhaas 1307 ECB :
1308 : /*
1309 : * Report the location after which we'll send out further commits as the
1310 : * current sentPtr.
1311 : */
3317 rhaas 1312 GIC 290 : sentPtr = MyReplicationSlot->data.confirmed_flush;
3317 rhaas 1313 ECB :
1314 : /* Also update the sent position status in shared memory */
2109 alvherre 1315 GIC 290 : SpinLockAcquire(&MyWalSnd->mutex);
2109 alvherre 1316 CBC 290 : MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
2109 alvherre 1317 GIC 290 : SpinLockRelease(&MyWalSnd->mutex);
3317 rhaas 1318 ECB :
3317 rhaas 1319 CBC 290 : replication_active = true;
1320 :
1321 290 : SyncRepInitConfig();
3317 rhaas 1322 ECB :
3317 rhaas 1323 EUB : /* Main loop of walsender */
3317 rhaas 1324 CBC 290 : WalSndLoop(XLogSendLogical);
1325 :
3317 rhaas 1326 GIC 154 : FreeDecodingContext(logical_decoding_ctx);
3317 rhaas 1327 CBC 154 : ReplicationSlotRelease();
3317 rhaas 1328 ECB :
3317 rhaas 1329 CBC 154 : replication_active = false;
2134 andres 1330 GIC 154 : if (got_STOPPING)
3317 rhaas 1331 UIC 0 : proc_exit(0);
3317 rhaas 1332 GIC 154 : WalSndSetState(WALSNDSTATE_STARTUP);
1333 :
1334 : /* Get out of COPY mode (CommandComplete). */
1133 alvherre 1335 154 : SetQueryCompletion(&qc, CMDTAG_COPY, 0);
1336 154 : EndCommand(&qc, DestRemote, false);
3317 rhaas 1337 154 : }
1338 :
1339 : /*
3317 rhaas 1340 ECB : * LogicalDecodingContext 'prepare_write' callback.
1341 : *
1342 : * Prepare a write into a StringInfo.
1343 : *
2198 magnus 1344 : * Don't do anything lasting in here, it's quite possible that nothing will be done
1345 : * with the data.
3317 rhaas 1346 : */
1347 : static void
3317 rhaas 1348 CBC 184362 : WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
3317 rhaas 1349 ECB : {
1350 : /* can't have sync rep confused by sending the same LSN several times */
3317 rhaas 1351 GIC 184362 : if (!last_write)
1352 318 : lsn = InvalidXLogRecPtr;
1353 :
1354 184362 : resetStringInfo(ctx->out);
1355 :
3317 rhaas 1356 CBC 184362 : pq_sendbyte(ctx->out, 'w');
1357 184362 : pq_sendint64(ctx->out, lsn); /* dataStart */
3317 rhaas 1358 GIC 184362 : pq_sendint64(ctx->out, lsn); /* walEnd */
1359 :
1360 : /*
1361 : * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
1362 : * reserve space here.
1363 : */
3260 bruce 1364 184362 : pq_sendint64(ctx->out, 0); /* sendtime */
3317 rhaas 1365 184362 : }
1366 :
3317 rhaas 1367 ECB : /*
1368 : * LogicalDecodingContext 'write' callback.
1369 : *
1370 : * Actually write out data previously prepared by WalSndPrepareWrite out to
1371 : * the network. Take as long as needed, but process replies from the other
1372 : * side and check timeouts during that.
1373 : */
1374 : static void
3317 rhaas 1375 GIC 184362 : WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1376 : bool last_write)
3317 rhaas 1377 ECB : {
1809 tgl 1378 : TimestampTz now;
1942 andrew 1379 :
3317 rhaas 1380 : /*
3260 bruce 1381 : * Fill the send timestamp last, so that it is taken as late as possible.
1382 : * This is somewhat ugly, but the protocol is set as it's already used for
1383 : * several releases by streaming physical replication.
3317 rhaas 1384 : */
3317 rhaas 1385 GIC 184362 : resetStringInfo(&tmpbuf);
1942 andrew 1386 CBC 184362 : now = GetCurrentTimestamp();
1942 andrew 1387 GIC 184362 : pq_sendint64(&tmpbuf, now);
3317 rhaas 1388 184362 : memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
3317 rhaas 1389 CBC 184362 : tmpbuf.data, sizeof(int64));
3317 rhaas 1390 ECB :
1391 : /* output previously gathered data in a CopyData packet */
1250 michael 1392 GIC 184362 : pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
1250 michael 1393 ECB :
1942 andrew 1394 CBC 184362 : CHECK_FOR_INTERRUPTS();
1942 andrew 1395 ECB :
1396 : /* Try to flush pending output to the client */
3317 rhaas 1397 CBC 184362 : if (pq_flush_if_writable() != 0)
3317 rhaas 1398 GIC 6 : WalSndShutdown();
1399 :
1400 : /* Try taking fast path unless we get too close to walsender timeout. */
1942 andrew 1401 CBC 184356 : if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
1942 andrew 1402 GIC 184356 : wal_sender_timeout / 2) &&
1403 184356 : !pq_is_send_pending())
1404 : {
3317 rhaas 1405 184250 : return;
1406 : }
1407 :
1408 : /* If we have pending write here, go to slow path */
375 akapila 1409 CBC 106 : ProcessPendingWrites();
1410 : }
1411 :
375 akapila 1412 ECB : /*
1413 : * Wait until there is no pending write. Also process replies from the other
1414 : * side and check timeouts during that.
1415 : */
1416 : static void
375 akapila 1417 GIC 106 : ProcessPendingWrites(void)
1418 : {
3317 rhaas 1419 ECB : for (;;)
3317 rhaas 1420 GIC 136 : {
1421 : long sleeptime;
1942 andrew 1422 ECB :
1423 : /* Check for input from the client */
1942 andrew 1424 CBC 242 : ProcessRepliesIfAny();
1942 andrew 1425 ECB :
1426 : /* die if timeout was reached */
1682 noah 1427 CBC 242 : WalSndCheckTimeOut();
1428 :
1429 : /* Send keepalive if the time has come */
1430 242 : WalSndKeepaliveIfNecessary();
1431 :
1942 andrew 1432 GIC 242 : if (!pq_is_send_pending())
1433 106 : break;
1942 andrew 1434 ECB :
1682 noah 1435 GIC 136 : sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1942 andrew 1436 ECB :
1437 : /* Sleep until something happens or we time out */
769 tmunro 1438 GIC 136 : WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
769 tmunro 1439 ECB : WAIT_EVENT_WAL_SENDER_WRITE_DATA);
1440 :
3004 andres 1441 EUB : /* Clear any already-pending wakeups */
3004 andres 1442 GBC 136 : ResetLatch(MyLatch);
3004 andres 1443 EUB :
3004 andres 1444 GIC 136 : CHECK_FOR_INTERRUPTS();
1445 :
1446 : /* Process any requests or signals received recently */
2134 andres 1447 CBC 136 : if (ConfigReloadPending)
3317 rhaas 1448 EUB : {
2134 andres 1449 UIC 0 : ConfigReloadPending = false;
3317 rhaas 1450 0 : ProcessConfigFile(PGC_SIGHUP);
1451 0 : SyncRepInitConfig();
3317 rhaas 1452 ECB : }
1453 :
1454 : /* Try to flush pending output to the client */
3317 rhaas 1455 GIC 136 : if (pq_flush_if_writable() != 0)
3317 rhaas 1456 UIC 0 : WalSndShutdown();
1457 : }
1458 :
1459 : /* reactivate latch so WalSndLoop knows to continue */
3004 andres 1460 GIC 106 : SetLatch(MyLatch);
3317 rhaas 1461 106 : }
1462 :
2158 simon 1463 ECB : /*
1464 : * LogicalDecodingContext 'update_progress' callback.
1465 : *
1466 : * Write the current position to the lag tracker (see XLogSendPhysical).
375 akapila 1467 : *
1468 : * When skipping empty transactions, send a keepalive message if necessary.
2158 simon 1469 : */
1470 : static void
375 akapila 1471 GIC 2215 : WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
1472 : bool skipped_xact)
1473 : {
1474 : static TimestampTz sendTime = 0;
2158 simon 1475 2215 : TimestampTz now = GetCurrentTimestamp();
333 akapila 1476 2215 : bool pending_writes = false;
1477 2215 : bool end_xact = ctx->end_xact;
1478 :
1479 : /*
2153 bruce 1480 ECB : * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
1481 : * avoid flooding the lag tracker when we commit frequently.
1482 : *
333 akapila 1483 : * We don't have a mechanism to get the ack for any LSN other than end
1484 : * xact LSN from the downstream. So, we track lag only for end of
1485 : * transaction LSN.
1486 : */
1487 : #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
333 akapila 1488 GIC 2215 : if (end_xact && TimestampDifferenceExceeds(sendTime, now,
1489 : WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
1490 : {
375 1491 164 : LagTrackerWrite(lsn, now);
1492 164 : sendTime = now;
1493 : }
1494 :
375 akapila 1495 ECB : /*
1496 : * When skipping empty transactions in synchronous replication, we send a
1497 : * keepalive message to avoid delaying such transactions.
1498 : *
332 tgl 1499 EUB : * It is okay to check sync_standbys_defined flag without lock here as in
1500 : * the worst case we will just send an extra keepalive message when it is
1501 : * really not required.
375 akapila 1502 : */
375 akapila 1503 GBC 2215 : if (skipped_xact &&
375 akapila 1504 GIC 255 : SyncRepRequested() &&
1505 255 : ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
375 akapila 1506 EUB : {
375 akapila 1507 UBC 0 : WalSndKeepalive(false, lsn);
1508 :
1509 : /* Try to flush pending output to the client */
375 akapila 1510 UIC 0 : if (pq_flush_if_writable() != 0)
1511 0 : WalSndShutdown();
1512 :
1513 : /* If we have pending write here, make sure it's actually flushed */
1514 0 : if (pq_is_send_pending())
333 1515 0 : pending_writes = true;
1516 : }
333 akapila 1517 ECB :
1518 : /*
1519 : * Process pending writes if any or try to send a keepalive if required.
333 akapila 1520 EUB : * We don't need to try sending keep alive messages at the transaction end
333 akapila 1521 ECB : * as that will be done at a later point in time. This is required only
1522 : * for large transactions where we don't send any changes to the
1523 : * downstream and the receiver can timeout due to that.
1524 : */
333 akapila 1525 GIC 2215 : if (pending_writes || (!end_xact &&
1526 1528 : now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
1527 : wal_sender_timeout / 2)))
333 akapila 1528 UIC 0 : ProcessPendingWrites();
2158 simon 1529 GIC 2215 : }
1530 :
3317 rhaas 1531 ECB : /*
1532 : * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
1533 : *
1534 : * Returns end LSN of flushed WAL. Normally this will be >= loc, but
1535 : * if we detect a shutdown request (either from postmaster or client)
1536 : * we will return early, so caller must always check.
1537 : */
1538 : static XLogRecPtr
3317 rhaas 1539 GIC 10781 : WalSndWaitForWal(XLogRecPtr loc)
1540 : {
3317 rhaas 1541 ECB : int wakeEvents;
1542 : static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
1543 :
1544 : /*
1545 : * Fast path to avoid acquiring the spinlock in case we already know we
1546 : * have enough WAL available. This is particularly interesting if we're
1547 : * far behind.
1548 : */
3317 rhaas 1549 CBC 10781 : if (RecentFlushPtr != InvalidXLogRecPtr &&
3317 rhaas 1550 GIC 10392 : loc <= RecentFlushPtr)
1551 8957 : return RecentFlushPtr;
3317 rhaas 1552 ECB :
1553 : /* Get a more recent flush pointer. */
3317 rhaas 1554 GIC 1824 : if (!RecoveryInProgress())
520 1555 1553 : RecentFlushPtr = GetFlushRecPtr(NULL);
3317 rhaas 1556 ECB : else
3317 rhaas 1557 GIC 271 : RecentFlushPtr = GetXLogReplayRecPtr(NULL);
3317 rhaas 1558 ECB :
1559 : for (;;)
3317 rhaas 1560 GIC 2087 : {
3317 rhaas 1561 ECB : long sleeptime;
1562 :
3004 andres 1563 : /* Clear any already-pending wakeups */
3004 andres 1564 CBC 3911 : ResetLatch(MyLatch);
3004 andres 1565 ECB :
3004 andres 1566 GIC 3911 : CHECK_FOR_INTERRUPTS();
1567 :
1568 : /* Process any requests or signals received recently */
2134 andres 1569 CBC 3905 : if (ConfigReloadPending)
1570 : {
2134 andres 1571 GIC 3 : ConfigReloadPending = false;
3317 rhaas 1572 3 : ProcessConfigFile(PGC_SIGHUP);
1573 3 : SyncRepInitConfig();
1574 : }
1575 :
3317 rhaas 1576 ECB : /* Check for input from the client */
3317 rhaas 1577 CBC 3905 : ProcessRepliesIfAny();
1578 :
1579 : /*
2134 andres 1580 ECB : * If we're shutting down, trigger pending WAL to be written out,
1581 : * otherwise we'd possibly end up waiting for WAL that never gets
1582 : * written, because walwriter has shut down already.
1583 : */
2134 andres 1584 GIC 3789 : if (got_STOPPING)
1585 108 : XLogBackgroundFlush();
1586 :
1587 : /* Update our idea of the currently flushed position. */
3317 rhaas 1588 3789 : if (!RecoveryInProgress())
520 1589 3453 : RecentFlushPtr = GetFlushRecPtr(NULL);
1590 : else
3317 1591 336 : RecentFlushPtr = GetXLogReplayRecPtr(NULL);
3317 rhaas 1592 ECB :
1593 : /*
1594 : * If postmaster asked us to stop, don't wait anymore.
1595 : *
1596 : * It's important to do this check after the recomputation of
1597 : * RecentFlushPtr, so we can send all remaining data before shutting
1598 : * down.
1599 : */
2134 andres 1600 GIC 3789 : if (got_STOPPING)
3317 rhaas 1601 108 : break;
1602 :
3317 rhaas 1603 ECB : /*
1604 : * We only send regular messages to the client for full decoded
1605 : * transactions, but a synchronous replication and walsender shutdown
1079 noah 1606 : * possibly are waiting for a later location. So, before sleeping, we
1607 : * send a ping containing the flush location. If the receiver is
1608 : * otherwise idle, this keepalive will trigger a reply. Processing the
1609 : * reply will update these MyWalSnd locations.
3317 rhaas 1610 : */
3162 andres 1611 GIC 3681 : if (MyWalSnd->flush < sentPtr &&
1612 1970 : MyWalSnd->write < sentPtr &&
3162 andres 1613 CBC 1463 : !waiting_for_ping_response)
375 akapila 1614 GIC 1463 : WalSndKeepalive(false, InvalidXLogRecPtr);
1615 :
1616 : /* check whether we're done */
3317 rhaas 1617 3681 : if (loc <= RecentFlushPtr)
3317 rhaas 1618 CBC 1498 : break;
3317 rhaas 1619 EUB :
1620 : /* Waiting for new WAL. Since we need to wait, we're now caught up. */
3317 rhaas 1621 GIC 2183 : WalSndCaughtUp = true;
1622 :
1623 : /*
1624 : * Try to flush any pending output to the client.
1625 : */
3317 rhaas 1626 CBC 2183 : if (pq_flush_if_writable() != 0)
3317 rhaas 1627 LBC 0 : WalSndShutdown();
3317 rhaas 1628 ECB :
1629 : /*
1630 : * If we have received CopyDone from the client, sent CopyDone
2109 tgl 1631 : * ourselves, and the output buffer is empty, it's time to exit
1632 : * streaming, so fail the current WAL fetch request.
1633 : */
2109 tgl 1634 CBC 2183 : if (streamingDoneReceiving && streamingDoneSending &&
2109 tgl 1635 GIC 96 : !pq_is_send_pending())
1636 96 : break;
1637 :
1638 : /* die if timeout was reached */
1682 noah 1639 2087 : WalSndCheckTimeOut();
1640 :
1641 : /* Send keepalive if the time has come */
1642 2087 : WalSndKeepaliveIfNecessary();
3317 rhaas 1643 ECB :
1644 : /*
2109 tgl 1645 : * Sleep until something happens or we time out. Also wait for the
1646 : * socket becoming writable, if there's still pending output.
1647 : * Otherwise we might sit on sendable output data while waiting for
2109 tgl 1648 EUB : * new WAL to be generated. (But if we have nothing to send, we don't
1649 : * want to wake on socket-writable.)
2109 tgl 1650 ECB : */
1682 noah 1651 GIC 2087 : sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
1652 :
769 tmunro 1653 2087 : wakeEvents = WL_SOCKET_READABLE;
3317 rhaas 1654 ECB :
3317 rhaas 1655 CBC 2087 : if (pq_is_send_pending())
3317 rhaas 1656 UIC 0 : wakeEvents |= WL_SOCKET_WRITEABLE;
1657 :
769 tmunro 1658 GIC 2087 : WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
1659 : }
1660 :
1661 : /* reactivate latch so WalSndLoop knows to continue */
3004 andres 1662 1702 : SetLatch(MyLatch);
3317 rhaas 1663 1702 : return RecentFlushPtr;
1664 : }
3317 rhaas 1665 ECB :
1666 : /*
1667 : * Execute an incoming replication command.
1668 : *
1669 : * Returns true if the cmd_string was recognized as WalSender command, false
1670 : * if not.
1671 : */
1672 : bool
3838 heikki.linnakangas 1673 GIC 3914 : exec_replication_command(const char *cmd_string)
1674 : {
1675 : int parse_rc;
1676 : Node *cmd_node;
935 alvherre 1677 ECB : const char *cmdtag;
4468 magnus 1678 EUB : MemoryContext cmd_context;
1679 : MemoryContext old_context;
1680 :
1681 : /*
1682 : * If WAL sender has been told that shutdown is getting close, switch its
1683 : * status accordingly to handle the next replication commands correctly.
1684 : */
2134 andres 1685 CBC 3914 : if (got_STOPPING)
2134 andres 1686 UBC 0 : WalSndSetState(WALSNDSTATE_STOPPING);
1687 :
1688 : /*
1689 : * Throw error if in stopping mode. We need prevent commands that could
1690 : * generate WAL while the shutdown checkpoint is being written. To be
1691 : * safe, we just prohibit all new commands.
1692 : */
2134 andres 1693 GIC 3914 : if (MyWalSnd->state == WALSNDSTATE_STOPPING)
2134 andres 1694 LBC 0 : ereport(ERROR,
1695 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
440 tgl 1696 ECB : errmsg("cannot execute new commands while WAL sender is in stopping mode")));
1697 :
1698 : /*
1699 : * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
1700 : * command arrives. Clean up the old stuff if there's anything.
3317 rhaas 1701 : */
3317 rhaas 1702 GIC 3914 : SnapBuildClearExportedSnapshot();
1703 :
3838 heikki.linnakangas 1704 CBC 3914 : CHECK_FOR_INTERRUPTS();
1705 :
937 tgl 1706 ECB : /*
1707 : * Prepare to parse and execute the command.
1708 : */
4468 magnus 1709 GIC 3914 : cmd_context = AllocSetContextCreate(CurrentMemoryContext,
1710 : "Replication command context",
2416 tgl 1711 ECB : ALLOCSET_DEFAULT_SIZES);
4468 magnus 1712 GIC 3914 : old_context = MemoryContextSwitchTo(cmd_context);
1713 :
4468 magnus 1714 CBC 3914 : replication_scanner_init(cmd_string);
1715 :
2179 fujii 1716 ECB : /*
440 tgl 1717 : * Is it a WalSender command?
1718 : */
440 tgl 1719 GIC 3914 : if (!replication_scanner_is_replication_command())
937 tgl 1720 ECB : {
440 tgl 1721 EUB : /* Nope; clean up and get out. */
440 tgl 1722 GIC 1752 : replication_scanner_finish();
1723 :
937 1724 1752 : MemoryContextSwitchTo(old_context);
1725 1752 : MemoryContextDelete(cmd_context);
937 tgl 1726 ECB :
1727 : /* XXX this is a pretty random place to make this check */
440 tgl 1728 GIC 1752 : if (MyDatabaseId == InvalidOid)
440 tgl 1729 UIC 0 : ereport(ERROR,
1730 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1731 : errmsg("cannot execute SQL commands in WAL sender for physical replication")));
440 tgl 1732 ECB :
937 1733 : /* Tell the caller that this wasn't a WalSender command. */
937 tgl 1734 GBC 1752 : return false;
1735 : }
1736 :
1737 : /*
440 tgl 1738 ECB : * Looks like a WalSender command, so parse it.
1739 : */
440 tgl 1740 CBC 2162 : parse_rc = replication_yyparse();
440 tgl 1741 GIC 2162 : if (parse_rc != 0)
440 tgl 1742 UIC 0 : ereport(ERROR,
1743 : (errcode(ERRCODE_SYNTAX_ERROR),
1744 : errmsg_internal("replication command parser returned %d",
1745 : parse_rc)));
440 tgl 1746 CBC 2162 : replication_scanner_finish();
1747 :
1748 2162 : cmd_node = replication_parse_result;
1749 :
1750 : /*
1751 : * Report query to various monitoring facilities. For this purpose, we
1752 : * report replication commands just like SQL commands.
1753 : */
937 tgl 1754 GIC 2162 : debug_query_string = cmd_string;
937 tgl 1755 ECB :
937 tgl 1756 GIC 2162 : pgstat_report_activity(STATE_RUNNING, cmd_string);
1757 :
1758 : /*
1759 : * Log replication command if log_replication_commands is enabled. Even
1760 : * when it's disabled, log the command with DEBUG1 level for backward
937 tgl 1761 ECB : * compatibility.
2208 peter_e 1762 EUB : */
937 tgl 1763 GIC 2162 : ereport(log_replication_commands ? LOG : DEBUG1,
1764 : (errmsg("received replication command: %s", cmd_string)));
1765 :
1766 : /*
937 tgl 1767 ECB : * Disallow replication commands in aborted transaction blocks.
1768 : */
937 tgl 1769 GIC 2162 : if (IsAbortedTransactionBlockState())
2208 peter_e 1770 UIC 0 : ereport(ERROR,
1771 : (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
1772 : errmsg("current transaction is aborted, "
2208 peter_e 1773 ECB : "commands ignored until end of transaction block")));
1774 :
2208 peter_e 1775 CBC 2162 : CHECK_FOR_INTERRUPTS();
1776 :
2237 fujii 1777 ECB : /*
1778 : * Allocate buffers that will be used for each outgoing and incoming
1779 : * message. We do this just once per command to reduce palloc overhead.
1780 : */
2237 fujii 1781 CBC 2162 : initStringInfo(&output_message);
1782 2162 : initStringInfo(&reply_message);
1783 2162 : initStringInfo(&tmpbuf);
2237 fujii 1784 ECB :
4468 magnus 1785 GIC 2162 : switch (cmd_node->type)
4468 magnus 1786 ECB : {
4468 magnus 1787 CBC 522 : case T_IdentifySystemCmd:
935 alvherre 1788 522 : cmdtag = "IDENTIFY_SYSTEM";
tgl 1789 522 : set_ps_display(cmdtag);
4468 magnus 1790 522 : IdentifySystem();
935 alvherre 1791 522 : EndReplicationCommand(cmdtag);
4468 magnus 1792 GIC 522 : break;
4468 magnus 1793 ECB :
531 michael 1794 CBC 6 : case T_ReadReplicationSlotCmd:
1795 6 : cmdtag = "READ_REPLICATION_SLOT";
1796 6 : set_ps_display(cmdtag);
1797 6 : ReadReplicationSlot((ReadReplicationSlotCmd *) cmd_node);
1798 5 : EndReplicationCommand(cmdtag);
1799 5 : break;
1800 :
4468 magnus 1801 143 : case T_BaseBackupCmd:
935 alvherre 1802 143 : cmdtag = "BASE_BACKUP";
tgl 1803 143 : set_ps_display(cmdtag);
alvherre 1804 143 : PreventInTransactionBlock(true, cmdtag);
4459 magnus 1805 143 : SendBaseBackup((BaseBackupCmd *) cmd_node);
935 alvherre 1806 117 : EndReplicationCommand(cmdtag);
4459 magnus 1807 GIC 117 : break;
4468 magnus 1808 ECB :
3355 rhaas 1809 CBC 344 : case T_CreateReplicationSlotCmd:
935 alvherre 1810 344 : cmdtag = "CREATE_REPLICATION_SLOT";
tgl 1811 344 : set_ps_display(cmdtag);
3355 rhaas 1812 344 : CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
935 alvherre 1813 343 : EndReplicationCommand(cmdtag);
3355 rhaas 1814 GIC 343 : break;
3355 rhaas 1815 ECB :
3355 rhaas 1816 GIC 192 : case T_DropReplicationSlotCmd:
935 alvherre 1817 CBC 192 : cmdtag = "DROP_REPLICATION_SLOT";
935 tgl 1818 GIC 192 : set_ps_display(cmdtag);
3355 rhaas 1819 CBC 192 : DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
935 alvherre 1820 192 : EndReplicationCommand(cmdtag);
3355 rhaas 1821 192 : break;
1822 :
1823 493 : case T_StartReplicationCmd:
3355 rhaas 1824 ECB : {
3355 rhaas 1825 GIC 493 : StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
3260 bruce 1826 ECB :
935 alvherre 1827 GIC 493 : cmdtag = "START_REPLICATION";
tgl 1828 493 : set_ps_display(cmdtag);
935 alvherre 1829 CBC 493 : PreventInTransactionBlock(true, cmdtag);
1830 :
3355 rhaas 1831 493 : if (cmd->kind == REPLICATION_KIND_PHYSICAL)
1832 195 : StartReplication(cmd);
1833 : else
3317 rhaas 1834 GIC 298 : StartLogicalReplication(cmd);
1035 michael 1835 ECB :
907 alvherre 1836 : /* dupe, but necessary per libpqrcv_endstreaming */
907 alvherre 1837 CBC 264 : EndReplicationCommand(cmdtag);
935 alvherre 1838 ECB :
1035 michael 1839 CBC 264 : Assert(xlogreader != NULL);
3355 rhaas 1840 264 : break;
3355 rhaas 1841 ECB : }
1842 :
3769 heikki.linnakangas 1843 CBC 13 : case T_TimeLineHistoryCmd:
935 alvherre 1844 GIC 13 : cmdtag = "TIMELINE_HISTORY";
935 tgl 1845 CBC 13 : set_ps_display(cmdtag);
alvherre 1846 13 : PreventInTransactionBlock(true, cmdtag);
3769 heikki.linnakangas 1847 GIC 13 : SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
935 alvherre 1848 CBC 13 : EndReplicationCommand(cmdtag);
3769 heikki.linnakangas 1849 13 : break;
1850 :
2266 rhaas 1851 GIC 449 : case T_VariableShowStmt:
2266 rhaas 1852 ECB : {
2266 rhaas 1853 CBC 449 : DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
1854 449 : VariableShowStmt *n = (VariableShowStmt *) cmd_node;
2266 rhaas 1855 ECB :
935 alvherre 1856 GIC 449 : cmdtag = "SHOW";
935 tgl 1857 CBC 449 : set_ps_display(cmdtag);
1858 :
1455 michael 1859 EUB : /* syscache access needs a transaction environment */
1455 michael 1860 GBC 449 : StartTransactionCommand();
2266 rhaas 1861 GIC 449 : GetPGVariable(n->name, dest);
1455 michael 1862 449 : CommitTransactionCommand();
935 alvherre 1863 449 : EndReplicationCommand(cmdtag);
1864 : }
2266 rhaas 1865 CBC 449 : break;
2266 rhaas 1866 ECB :
4468 magnus 1867 UIC 0 : default:
3769 heikki.linnakangas 1868 0 : elog(ERROR, "unrecognized replication command node tag: %u",
1869 : cmd_node->type);
1870 : }
1871 :
1872 : /* done */
4468 magnus 1873 CBC 1905 : MemoryContextSwitchTo(old_context);
4468 magnus 1874 GIC 1905 : MemoryContextDelete(cmd_context);
4468 magnus 1875 ECB :
1876 : /*
1877 : * We need not update ps display or pg_stat_activity, because PostgresMain
1878 : * will reset those to "idle". But we must reset debug_query_string to
1879 : * ensure it doesn't become a dangling pointer.
1880 : */
937 tgl 1881 GIC 1905 : debug_query_string = NULL;
1882 :
2208 peter_e 1883 CBC 1905 : return true;
1884 : }
1885 :
1886 : /*
1887 : * Process any incoming messages while streaming. Also checks if the remote
3769 heikki.linnakangas 1888 ECB : * end has closed the connection.
1889 : */
4832 1890 : static void
4441 heikki.linnakangas 1891 GIC 954113 : ProcessRepliesIfAny(void)
1892 : {
1893 : unsigned char firstchar;
1894 : int maxmsglen;
1895 : int r;
4260 tgl 1896 954113 : bool received = false;
4832 heikki.linnakangas 1897 ECB :
1682 noah 1898 GIC 954113 : last_processing = GetCurrentTimestamp();
1682 noah 1899 ECB :
846 jdavis 1900 : /*
1901 : * If we already received a CopyDone from the frontend, any subsequent
1902 : * message is the beginning of a new command, and should be processed in
1903 : * the main processing loop.
1904 : */
846 jdavis 1905 GIC 1045647 : while (!streamingDoneReceiving)
1906 : {
2988 heikki.linnakangas 1907 CBC 1045124 : pq_startmsgread();
4433 simon 1908 GIC 1045124 : r = pq_getbyte_if_available(&firstchar);
4433 simon 1909 CBC 1045124 : if (r < 0)
1910 : {
1911 : /* unexpected error or EOF */
1912 16 : ereport(COMMERROR,
4433 simon 1913 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1914 : errmsg("unexpected EOF on standby connection")));
4433 simon 1915 GIC 16 : proc_exit(0);
1916 : }
4433 simon 1917 CBC 1045108 : if (r == 0)
1918 : {
4433 simon 1919 ECB : /* no data available without blocking */
2988 heikki.linnakangas 1920 CBC 953418 : pq_endmsgread();
4393 1921 953418 : break;
4433 simon 1922 ECB : }
1923 :
711 tgl 1924 : /* Validate message type and set packet size limit */
711 tgl 1925 CBC 91690 : switch (firstchar)
711 tgl 1926 EUB : {
711 tgl 1927 GBC 91269 : case 'd':
711 tgl 1928 GIC 91269 : maxmsglen = PQ_LARGE_MESSAGE_LIMIT;
1929 91269 : break;
1930 421 : case 'c':
1931 : case 'X':
1932 421 : maxmsglen = PQ_SMALL_MESSAGE_LIMIT;
1933 421 : break;
711 tgl 1934 UIC 0 : default:
1935 0 : ereport(FATAL,
711 tgl 1936 ECB : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1937 : errmsg("invalid standby message type \"%c\"",
1938 : firstchar)));
711 tgl 1939 EUB : maxmsglen = 0; /* keep compiler quiet */
1940 : break;
1941 : }
1942 :
1943 : /* Read the message contents */
2988 heikki.linnakangas 1944 GIC 91690 : resetStringInfo(&reply_message);
711 tgl 1945 91690 : if (pq_getmessage(&reply_message, maxmsglen))
2988 heikki.linnakangas 1946 ECB : {
2988 heikki.linnakangas 1947 UIC 0 : ereport(COMMERROR,
1948 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1949 : errmsg("unexpected EOF on standby connection")));
1950 0 : proc_exit(0);
2988 heikki.linnakangas 1951 ECB : }
1952 :
711 tgl 1953 : /* ... and process it */
4433 simon 1954 CBC 91690 : switch (firstchar)
1955 : {
1956 : /*
1957 : * 'd' means a standby reply wrapped in a CopyData packet.
1958 : */
4433 simon 1959 GIC 91269 : case 'd':
4433 simon 1960 CBC 91269 : ProcessStandbyMessage();
4393 heikki.linnakangas 1961 91269 : received = true;
4433 simon 1962 GIC 91269 : break;
4441 heikki.linnakangas 1963 ECB :
3769 1964 : /*
1965 : * CopyDone means the standby requested to finish streaming.
1966 : * Reply with CopyDone, if we had not sent that already.
1967 : */
3769 heikki.linnakangas 1968 CBC 265 : case 'c':
1969 265 : if (!streamingDoneSending)
1970 : {
3769 heikki.linnakangas 1971 GIC 252 : pq_putmessage_noblock('c', NULL, 0);
1972 252 : streamingDoneSending = true;
1973 : }
3769 heikki.linnakangas 1974 ECB :
3769 heikki.linnakangas 1975 CBC 265 : streamingDoneReceiving = true;
3769 heikki.linnakangas 1976 GIC 265 : received = true;
3769 heikki.linnakangas 1977 GBC 265 : break;
3769 heikki.linnakangas 1978 EUB :
1979 : /*
1980 : * 'X' means that the standby is closing down the socket.
1981 : */
4433 simon 1982 GIC 156 : case 'X':
1983 156 : proc_exit(0);
1984 :
4433 simon 1985 LBC 0 : default:
711 tgl 1986 UIC 0 : Assert(false); /* NOT REACHED */
4433 simon 1987 ECB : }
4832 heikki.linnakangas 1988 : }
1989 :
4393 1990 : /*
1991 : * Save the last reply timestamp if we've received at least one reply.
1992 : */
4393 heikki.linnakangas 1993 GIC 953941 : if (received)
1994 : {
1682 noah 1995 34263 : last_reply_timestamp = last_processing;
3317 rhaas 1996 CBC 34263 : waiting_for_ping_response = false;
1997 : }
4832 heikki.linnakangas 1998 GIC 953941 : }
1999 :
2000 : /*
2001 : * Process a status update message received from standby.
2002 : */
4441 heikki.linnakangas 2003 ECB : static void
4433 simon 2004 GIC 91269 : ProcessStandbyMessage(void)
4441 heikki.linnakangas 2005 ECB : {
2006 : char msgtype;
2007 :
2008 : /*
4406 rhaas 2009 : * Check message type from the first byte.
2010 : */
4436 rhaas 2011 CBC 91269 : msgtype = pq_getmsgbyte(&reply_message);
4433 simon 2012 ECB :
4433 simon 2013 CBC 91269 : switch (msgtype)
2014 : {
4433 simon 2015 GBC 91163 : case 'r':
2016 91163 : ProcessStandbyReplyMessage();
4433 simon 2017 GIC 91163 : break;
2018 :
4433 simon 2019 GBC 106 : case 'h':
4433 simon 2020 GIC 106 : ProcessStandbyHSFeedbackMessage();
4433 simon 2021 CBC 106 : break;
2022 :
4433 simon 2023 UIC 0 : default:
2024 0 : ereport(COMMERROR,
2025 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
2026 : errmsg("unexpected message type \"%c\"", msgtype)));
4433 simon 2027 LBC 0 : proc_exit(0);
2028 : }
4433 simon 2029 CBC 91269 : }
4433 simon 2030 ECB :
2031 : /*
3355 rhaas 2032 : * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
2033 : */
2034 : static void
3355 rhaas 2035 GIC 46270 : PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
3355 rhaas 2036 ECB : {
3260 bruce 2037 CBC 46270 : bool changed = false;
2742 rhaas 2038 GIC 46270 : ReplicationSlot *slot = MyReplicationSlot;
3355 rhaas 2039 ECB :
3355 rhaas 2040 GIC 46270 : Assert(lsn != InvalidXLogRecPtr);
3355 rhaas 2041 CBC 46270 : SpinLockAcquire(&slot->mutex);
3355 rhaas 2042 GIC 46270 : if (slot->data.restart_lsn != lsn)
3355 rhaas 2043 ECB : {
3355 rhaas 2044 CBC 17531 : changed = true;
3355 rhaas 2045 GIC 17531 : slot->data.restart_lsn = lsn;
2046 : }
2047 46270 : SpinLockRelease(&slot->mutex);
2048 :
2049 46270 : if (changed)
2050 : {
2051 17531 : ReplicationSlotMarkDirty();
2052 17531 : ReplicationSlotsComputeRequiredLSN();
3355 rhaas 2053 ECB : }
2054 :
2055 : /*
2056 : * One could argue that the slot should be saved to disk now, but that'd
2057 : * be energy wasted - the worst thing lost information could cause here is
2058 : * to give wrong information in a statistics view - we'll just potentially
2059 : * be more conservative in removing files.
2060 : */
3355 rhaas 2061 GIC 46270 : }
2062 :
2063 : /*
2064 : * Regular reply from standby advising of WAL locations on standby server.
2065 : */
2066 : static void
4433 simon 2067 91163 : ProcessStandbyReplyMessage(void)
2068 : {
2069 : XLogRecPtr writePtr,
2070 : flushPtr,
2071 : applyPtr;
2072 : bool replyRequested;
2073 : TimeOffset writeLag,
2074 : flushLag,
2208 simon 2075 ECB : applyLag;
2076 : bool clearLagTimes;
2077 : TimestampTz now;
1582 michael 2078 : TimestampTz replyTime;
2208 simon 2079 :
2080 : static bool fullyAppliedLastTime = false;
3805 heikki.linnakangas 2081 :
2082 : /* the caller already consumed the msgtype byte */
3805 heikki.linnakangas 2083 GIC 91163 : writePtr = pq_getmsgint64(&reply_message);
2084 91163 : flushPtr = pq_getmsgint64(&reply_message);
2085 91163 : applyPtr = pq_getmsgint64(&reply_message);
1582 michael 2086 CBC 91163 : replyTime = pq_getmsgint64(&reply_message);
3805 heikki.linnakangas 2087 GIC 91163 : replyRequested = pq_getmsgbyte(&reply_message);
3805 heikki.linnakangas 2088 ECB :
867 tgl 2089 GIC 91163 : if (message_level_is_interesting(DEBUG2))
2090 : {
2091 : char *replyTimeStr;
2092 :
2093 : /* Copy because timestamptz_to_str returns a static buffer */
1582 michael 2094 1220 : replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1582 michael 2095 ECB :
1582 michael 2096 GIC 1220 : elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
2097 : LSN_FORMAT_ARGS(writePtr),
2098 : LSN_FORMAT_ARGS(flushPtr),
775 peter 2099 ECB : LSN_FORMAT_ARGS(applyPtr),
1582 michael 2100 : replyRequested ? " (reply requested)" : "",
2101 : replyTimeStr);
2102 :
1582 michael 2103 GIC 1220 : pfree(replyTimeStr);
2104 : }
2105 :
2106 : /* See if we can compute the round-trip lag for these positions. */
2208 simon 2107 91163 : now = GetCurrentTimestamp();
2108 91163 : writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
2109 91163 : flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
2110 91163 : applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
2111 :
2208 simon 2112 ECB : /*
2113 : * If the standby reports that it has fully replayed the WAL in two
2114 : * consecutive reply messages, then the second such message must result
2115 : * from wal_receiver_status_interval expiring on the standby. This is a
2116 : * convenient time to forget the lag times measured when it last
2117 : * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
2118 : * until more WAL traffic arrives.
2119 : */
2208 simon 2120 CBC 91163 : clearLagTimes = false;
2208 simon 2121 GIC 91163 : if (applyPtr == sentPtr)
2122 : {
2208 simon 2123 CBC 12132 : if (fullyAppliedLastTime)
2208 simon 2124 GBC 810 : clearLagTimes = true;
2208 simon 2125 GIC 12132 : fullyAppliedLastTime = true;
2126 : }
2127 : else
2128 79031 : fullyAppliedLastTime = false;
2129 :
2130 : /* Send a reply if the standby requested one. */
3805 heikki.linnakangas 2131 CBC 91163 : if (replyRequested)
375 akapila 2132 UIC 0 : WalSndKeepalive(false, InvalidXLogRecPtr);
3832 heikki.linnakangas 2133 ECB :
4441 2134 : /*
4382 bruce 2135 : * Update shared state for this WalSender process based on reply data from
2136 : * standby.
4441 heikki.linnakangas 2137 : */
2138 : {
2495 rhaas 2139 CBC 91163 : WalSnd *walsnd = MyWalSnd;
4441 heikki.linnakangas 2140 ECB :
4441 heikki.linnakangas 2141 CBC 91163 : SpinLockAcquire(&walsnd->mutex);
3805 2142 91163 : walsnd->write = writePtr;
2143 91163 : walsnd->flush = flushPtr;
2144 91163 : walsnd->apply = applyPtr;
2208 simon 2145 GIC 91163 : if (writeLag != -1 || clearLagTimes)
2146 20487 : walsnd->writeLag = writeLag;
2208 simon 2147 CBC 91163 : if (flushLag != -1 || clearLagTimes)
2148 37925 : walsnd->flushLag = flushLag;
2208 simon 2149 GIC 91163 : if (applyLag != -1 || clearLagTimes)
2150 48879 : walsnd->applyLag = applyLag;
1582 michael 2151 91163 : walsnd->replyTime = replyTime;
4441 heikki.linnakangas 2152 91163 : SpinLockRelease(&walsnd->mutex);
4441 heikki.linnakangas 2153 ECB : }
2154 :
4282 simon 2155 CBC 91163 : if (!am_cascading_walsender)
2156 90688 : SyncRepReleaseWaiters();
2157 :
3355 rhaas 2158 ECB : /*
2159 : * Advance our local xmin horizon when the client confirmed a flush.
2160 : */
3355 rhaas 2161 GIC 91163 : if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
2162 : {
2798 andres 2163 89528 : if (SlotIsLogical(MyReplicationSlot))
3317 rhaas 2164 CBC 43258 : LogicalConfirmReceivedLocation(flushPtr);
2165 : else
3355 2166 46270 : PhysicalConfirmReceivedLocation(flushPtr);
3355 rhaas 2167 ECB : }
3355 rhaas 2168 GIC 91163 : }
3355 rhaas 2169 ECB :
2170 : /* compute new replication slot xmin horizon if needed */
2171 : static void
2206 simon 2172 GIC 40 : PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
2173 : {
3260 bruce 2174 40 : bool changed = false;
2742 rhaas 2175 40 : ReplicationSlot *slot = MyReplicationSlot;
2176 :
3355 rhaas 2177 CBC 40 : SpinLockAcquire(&slot->mutex);
969 andres 2178 40 : MyProc->xmin = InvalidTransactionId;
3260 bruce 2179 ECB :
2180 : /*
2181 : * For physical replication we don't need the interlock provided by xmin
2182 : * and effective_xmin since the consequences of a missed increase are
2183 : * limited to query cancellations, so set both at once.
2184 : */
3355 rhaas 2185 CBC 40 : if (!TransactionIdIsNormal(slot->data.xmin) ||
2186 14 : !TransactionIdIsNormal(feedbackXmin) ||
2187 14 : TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
2188 : {
2189 31 : changed = true;
2190 31 : slot->data.xmin = feedbackXmin;
2191 31 : slot->effective_xmin = feedbackXmin;
2192 : }
2206 simon 2193 40 : if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
2206 simon 2194 GIC 1 : !TransactionIdIsNormal(feedbackCatalogXmin) ||
2206 simon 2195 CBC 1 : TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
2196 : {
2197 39 : changed = true;
2198 39 : slot->data.catalog_xmin = feedbackCatalogXmin;
2206 simon 2199 GIC 39 : slot->effective_catalog_xmin = feedbackCatalogXmin;
2206 simon 2200 ECB : }
3355 rhaas 2201 GIC 40 : SpinLockRelease(&slot->mutex);
2202 :
2203 40 : if (changed)
2204 : {
2205 39 : ReplicationSlotMarkDirty();
3324 2206 39 : ReplicationSlotsComputeRequiredXmin(false);
2207 : }
4433 simon 2208 40 : }
2209 :
2210 : /*
2211 : * Check that the provided xmin/epoch are sane, that is, not in the future
2212 : * and not so far back as to be already wrapped around.
2206 simon 2213 ECB : *
2214 : * Epoch of nextXid should be same as standby, or if the counter has
2215 : * wrapped, then one greater than standby.
2216 : *
2217 : * This check doesn't care about whether clog exists for these xids
2218 : * at all.
2219 : */
2220 : static bool
2206 simon 2221 CBC 22 : TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
2222 : {
1473 tmunro 2223 ECB : FullTransactionId nextFullXid;
2224 : TransactionId nextXid;
2206 simon 2225 : uint32 nextEpoch;
2206 simon 2226 EUB :
1473 tmunro 2227 GIC 22 : nextFullXid = ReadNextFullTransactionId();
2228 22 : nextXid = XidFromFullTransactionId(nextFullXid);
2229 22 : nextEpoch = EpochFromFullTransactionId(nextFullXid);
2206 simon 2230 EUB :
2206 simon 2231 GBC 22 : if (xid <= nextXid)
2232 : {
2206 simon 2233 GIC 22 : if (epoch != nextEpoch)
2206 simon 2234 LBC 0 : return false;
2206 simon 2235 EUB : }
2236 : else
2206 simon 2237 ECB : {
2206 simon 2238 UIC 0 : if (epoch + 1 != nextEpoch)
2239 0 : return false;
2240 : }
2241 :
2206 simon 2242 GIC 22 : if (!TransactionIdPrecedesOrEquals(xid, nextXid))
2153 bruce 2243 UIC 0 : return false; /* epoch OK, but it's wrapped around */
2206 simon 2244 ECB :
2206 simon 2245 GIC 22 : return true;
2246 : }
2247 :
2248 : /*
2249 : * Hot Standby feedback
2250 : */
2251 : static void
4433 2252 106 : ProcessStandbyHSFeedbackMessage(void)
2253 : {
2254 : TransactionId feedbackXmin;
2255 : uint32 feedbackEpoch;
2256 : TransactionId feedbackCatalogXmin;
2206 simon 2257 ECB : uint32 feedbackCatalogEpoch;
1582 michael 2258 : TimestampTz replyTime;
4433 simon 2259 :
3805 heikki.linnakangas 2260 : /*
2261 : * Decipher the reply message. The caller already consumed the msgtype
2262 : * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
2206 simon 2263 : * of this message.
2264 : */
1582 michael 2265 GIC 106 : replyTime = pq_getmsgint64(&reply_message);
3805 heikki.linnakangas 2266 106 : feedbackXmin = pq_getmsgint(&reply_message, 4);
2267 106 : feedbackEpoch = pq_getmsgint(&reply_message, 4);
2206 simon 2268 CBC 106 : feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
2206 simon 2269 GIC 106 : feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
4433 simon 2270 ECB :
867 tgl 2271 GIC 106 : if (message_level_is_interesting(DEBUG2))
2272 : {
2273 : char *replyTimeStr;
2274 :
2275 : /* Copy because timestamptz_to_str returns a static buffer */
1582 michael 2276 15 : replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
1582 michael 2277 ECB :
1582 michael 2278 GIC 15 : elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
2279 : feedbackXmin,
2280 : feedbackEpoch,
2281 : feedbackCatalogXmin,
2282 : feedbackCatalogEpoch,
2283 : replyTimeStr);
2284 :
1582 michael 2285 CBC 15 : pfree(replyTimeStr);
2286 : }
1582 michael 2287 ECB :
2288 : /*
2289 : * Update shared state for this WalSender process based on reply data from
2290 : * standby.
2291 : */
2292 : {
1582 michael 2293 GIC 106 : WalSnd *walsnd = MyWalSnd;
2294 :
2295 106 : SpinLockAcquire(&walsnd->mutex);
1582 michael 2296 CBC 106 : walsnd->replyTime = replyTime;
2297 106 : SpinLockRelease(&walsnd->mutex);
2298 : }
4435 simon 2299 ECB :
2206 2300 : /*
2301 : * Unset WalSender's xmins if the feedback message values are invalid.
2302 : * This happens when the downstream turned hot_standby_feedback off.
2303 : */
2206 simon 2304 GIC 106 : if (!TransactionIdIsNormal(feedbackXmin)
2305 87 : && !TransactionIdIsNormal(feedbackCatalogXmin))
2306 : {
969 andres 2307 87 : MyProc->xmin = InvalidTransactionId;
3355 rhaas 2308 87 : if (MyReplicationSlot != NULL)
2206 simon 2309 CBC 21 : PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
4189 tgl 2310 87 : return;
3716 simon 2311 EUB : }
2312 :
4435 simon 2313 ECB : /*
4189 tgl 2314 : * Check that the provided xmin/epoch are sane, that is, not in the future
4189 tgl 2315 EUB : * and not so far back as to be already wrapped around. Ignore if not.
2316 : */
2206 simon 2317 GIC 19 : if (TransactionIdIsNormal(feedbackXmin) &&
2318 19 : !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
2206 simon 2319 UIC 0 : return;
2320 :
2206 simon 2321 GIC 19 : if (TransactionIdIsNormal(feedbackCatalogXmin) &&
2322 3 : !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
2206 simon 2323 UIC 0 : return;
2324 :
2325 : /*
2326 : * Set the WalSender's xmin equal to the standby's requested xmin, so that
2327 : * the xmin will be taken into account by GetSnapshotData() /
2328 : * ComputeXidHorizons(). This will hold back the removal of dead rows and
2329 : * thereby prevent the generation of cleanup conflicts on the standby
2330 : * server.
2331 : *
2332 : * There is a small window for a race condition here: although we just
2333 : * checked that feedbackXmin precedes nextXid, the nextXid could have
2334 : * gotten advanced between our fetching it and applying the xmin below,
2335 : * perhaps far enough to make feedbackXmin wrap around. In that case the
2336 : * xmin we set here would be "in the future" and have no effect. No point
2337 : * in worrying about this since it's too late to save the desired data
2338 : * anyway. Assuming that the standby sends us an increasing sequence of
2339 : * xmins, this could only happen during the first reply cycle, else our
2340 : * own xmin would prevent nextXid from advancing so far.
2341 : *
2342 : * We don't bother taking the ProcArrayLock here. Setting the xmin field
2343 : * is assumed atomic, and there's no real need to prevent concurrent
2344 : * horizon determinations. (If we're moving our xmin forward, this is
2345 : * obviously safe, and if we're moving it backwards, well, the data is at
2346 : * risk already since a VACUUM could already have determined the horizon.)
2347 : *
3355 rhaas 2348 ECB : * If we're using a replication slot we reserve the xmin via that,
969 andres 2349 : * otherwise via the walsender's PGPROC entry. We can only track the
2350 : * catalog xmin separately when using a slot, so we store the least of the
2351 : * two provided when not using a slot.
3260 bruce 2352 EUB : *
3223 andres 2353 : * XXX: It might make sense to generalize the ephemeral slot concept and
2354 : * always use the slot mechanism to handle the feedback xmin.
2355 : */
2118 tgl 2356 GBC 19 : if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */
2206 simon 2357 GIC 19 : PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
2358 : else
2359 : {
2206 simon 2360 UIC 0 : if (TransactionIdIsNormal(feedbackCatalogXmin)
2361 0 : && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
969 andres 2362 0 : MyProc->xmin = feedbackCatalogXmin;
2363 : else
2364 0 : MyProc->xmin = feedbackXmin;
2365 : }
2366 : }
2367 :
3317 rhaas 2368 ECB : /*
2369 : * Compute how long send/receive loops should sleep.
2370 : *
2371 : * If wal_sender_timeout is enabled we want to wake up in time to send
2372 : * keepalives and to abort the connection if wal_sender_timeout has been
2373 : * reached. The result is in milliseconds, measured from 'now'. If wal_sender_timeout is disabled or last_reply_timestamp is unset, the default of 10 s is returned.
2374 : */
2375 : static long
3317 rhaas 2376 GIC 55918 : WalSndComputeSleeptime(TimestampTz now)
2377 : {
2118 tgl 2378 55918 : long sleeptime = 10000; /* default when no timeout deadline applies: 10 s, in ms */
2379 :
3237 andres 2380 CBC 55918 : if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
2381 : {
2382 : TimestampTz wakeup_time;
2383 :
2384 : /*
2385 : * At the latest stop sleeping once wal_sender_timeout has been
2386 : * reached.
2387 : */
3317 rhaas 2388 55894 : wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3317 rhaas 2389 ECB : wal_sender_timeout);
2390 :
2391 : /*
2392 : * If no ping has been sent yet, wakeup when it's time to do so.
3260 bruce 2393 : * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
2394 : * the timeout passed without a response. This replaces the full-timeout deadline computed above.
2395 : */
3317 rhaas 2396 CBC 55894 : if (!waiting_for_ping_response)
3317 rhaas 2397 GIC 55892 : wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
2398 : wal_sender_timeout / 2);
2399 :
2400 : /* Compute relative time, in milliseconds, from 'now' until wakeup. */
880 tgl 2401 55894 : sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
2402 : }
2403 :
3317 rhaas 2404 55918 : return sleeptime;
2405 : }
2406 :
2407 : /*
2408 : * Check whether there have been responses by the client within
2409 : * wal_sender_timeout and shutdown if not. Using last_processing as the
2410 : * reference point avoids counting server-side stalls against the client.
2411 : * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
1682 noah 2412 ECB : * postdate last_processing by more than wal_sender_timeout. If that happens,
2413 : * the client must reply almost immediately to avoid a timeout. This rarely
2414 : * affects the default configuration, under which clients spontaneously send a
2415 : * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We
2416 : * could eliminate that problem by recognizing timeout expiration at
2417 : * wal_sender_timeout/2 after the keepalive. No check is made while last_reply_timestamp is unset (<= 0) or wal_sender_timeout is not positive.
3317 rhaas 2418 : */
2419 : static void
1682 noah 2420 CBC 951815 : WalSndCheckTimeOut(void)
2421 : {
2422 : TimestampTz timeout;
3317 rhaas 2423 ECB :
2424 : /* don't bail out if we're doing something that doesn't require timeouts (last_reply_timestamp has not been set) */
3237 andres 2425 GIC 951815 : if (last_reply_timestamp <= 0)
2426 24 : return;
2427 :
3317 rhaas 2428 951791 : timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
2429 : wal_sender_timeout);
4832 heikki.linnakangas 2430 EUB :
1682 noah 2431 GIC 951791 : if (wal_sender_timeout > 0 && last_processing >= timeout)
2432 : {
3317 rhaas 2433 EUB : /*
2434 : * Since typically expiration of replication timeout means
2435 : * communication problem, we don't send the error message to the
2436 : * standby; that is why this is reported at COMMERROR level.
2437 : */
3317 rhaas 2438 UIC 0 : ereport(COMMERROR,
2118 tgl 2439 ECB : (errmsg("terminating walsender process due to replication timeout")));
2440 :
3317 rhaas 2441 UIC 0 : WalSndShutdown();
2442 : }
2443 : }
2444 :
3317 rhaas 2445 ECB : /* Main loop of walsender process that streams the WAL over Copy messages. send_data is the callback invoked to queue more output whenever the send buffer is empty. The loop ends once CopyDone has been both received and sent and the output buffer is empty. */
2446 : static void
3317 rhaas 2447 GIC 483 : WalSndLoop(WalSndSendDataCallback send_data)
2448 : {
2449 : /*
2450 : * Initialize the last reply timestamp. That enables timeout processing
2451 : * from hereon.
2452 : */
4393 heikki.linnakangas 2453 483 : last_reply_timestamp = GetCurrentTimestamp();
3317 rhaas 2454 483 : waiting_for_ping_response = false;
4393 heikki.linnakangas 2455 ECB :
2456 : /*
3602 bruce 2457 : * Loop until we reach the end of this timeline or the client requests to
2458 : * stop streaming.
2459 : */
4832 heikki.linnakangas 2460 : for (;;)
2461 : {
3004 andres 2462 : /* Clear any already-pending wakeups */
3004 andres 2463 CBC 949969 : ResetLatch(MyLatch);
3004 andres 2464 ECB :
3004 andres 2465 GIC 949969 : CHECK_FOR_INTERRUPTS();
2466 :
2467 : /* Process any requests or signals received recently */
2134 andres 2468 CBC 949966 : if (ConfigReloadPending)
2469 : {
2134 andres 2470 GIC 23 : ConfigReloadPending = false;
4832 heikki.linnakangas 2471 23 : ProcessConfigFile(PGC_SIGHUP);
4417 simon 2472 23 : SyncRepInitConfig();
2473 : }
2474 :
4260 tgl 2475 ECB : /* Check for input from the client */
4260 tgl 2476 CBC 949966 : ProcessRepliesIfAny();
4260 tgl 2477 ECB :
2478 : /*
2479 : * If we have received CopyDone from the client, sent CopyDone
2480 : * ourselves, and the output buffer is empty, it's time to exit
2481 : * streaming.
2482 : */
2109 tgl 2483 GIC 949910 : if (streamingDoneReceiving && streamingDoneSending &&
2484 427 : !pq_is_send_pending())
3769 heikki.linnakangas 2485 CBC 264 : break;
3769 heikki.linnakangas 2486 ECB :
2487 : /*
4382 bruce 2488 : * If we don't have any pending data in the output buffer, try to send
2489 : * some more. If there is some, we don't bother to call send_data
2490 : * again until we've flushed it ... but we'd better assume we are not
4260 tgl 2491 : * caught up.
4832 heikki.linnakangas 2492 EUB : */
4393 heikki.linnakangas 2493 GIC 949646 : if (!pq_is_send_pending())
3317 rhaas 2494 945363 : send_data();
4260 tgl 2495 ECB : else
3317 rhaas 2496 GIC 4283 : WalSndCaughtUp = false;
2497 :
2498 : /* Try to flush pending output to the client; a nonzero result shuts the walsender down */
4260 tgl 2499 949513 : if (pq_flush_if_writable() != 0)
3317 rhaas 2500 UIC 0 : WalSndShutdown();
2501 :
2502 : /* If nothing remains to be sent right now ... */
3317 rhaas 2503 GIC 949513 : if (WalSndCaughtUp && !pq_is_send_pending())
2504 : {
4593 heikki.linnakangas 2505 ECB : /*
2506 : * If we're in catchup state, move to streaming. This is an
4260 tgl 2507 : * important state change for users to know about, since before
2508 : * this point data loss might occur if the primary dies and we
2509 : * need to failover to the standby. The state change is also
2510 : * important for synchronous replication, since commits that
2511 : * started to wait at that point might wait for some time.
2512 : */
4260 tgl 2513 GIC 107775 : if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
2514 : {
2515 414 : ereport(DEBUG1,
2516 : (errmsg_internal("\"%s\" has now caught up with upstream server",
2517 : application_name)));
2518 414 : WalSndSetState(WALSNDSTATE_STREAMING);
2519 : }
4832 heikki.linnakangas 2520 ECB :
4260 tgl 2521 : /*
2522 : * When SIGUSR2 arrives, we send any outstanding logs up to the
2523 : * shutdown checkpoint record (i.e., the latest record), wait for
2524 : * them to be replicated to the standby, and exit. This may be a
3260 bruce 2525 : * normal termination at shutdown, or a promotion, the walsender
2526 : * is not sure which.
2527 : */
2134 andres 2528 CBC 107775 : if (got_SIGUSR2)
3317 rhaas 2529 GIC 77 : WalSndDone(send_data);
2530 : }
2531 :
2532 : /* Check for replication timeout. */
1682 noah 2533 949486 : WalSndCheckTimeOut();
2534 :
2535 : /* Send keepalive if the time has come */
1682 noah 2536 CBC 949486 : WalSndKeepaliveIfNecessary();
3321 heikki.linnakangas 2537 ECB :
4393 2538 : /*
2539 : * Block if we have unsent data. XXX For logical replication, let
2540 : * WalSndWaitForWal() handle any other blocking; idle receivers need
2541 : * its additional actions. For physical replication, also block if
2542 : * caught up; its send_data does not block.
2543 : */
1079 noah 2544 CBC 949486 : if ((WalSndCaughtUp && send_data != XLogSendLogical &&
1079 noah 2545 GIC 1005837 : !streamingDoneSending) ||
1079 noah 2546 CBC 899890 : pq_is_send_pending())
2547 : {
2548 : long sleeptime;
2549 : int wakeEvents;
2550 :
846 jdavis 2551 GIC 53695 : if (!streamingDoneReceiving)
769 tmunro 2552 CBC 53688 : wakeEvents = WL_SOCKET_READABLE;
2553 : else
2554 7 : wakeEvents = 0; /* streamingDoneReceiving is set: do not wait for the socket to become readable */
4117 simon 2555 ECB :
2556 : /*
2557 : * Use fresh timestamp, not last_processing, to reduce the chance
1682 noah 2558 : * of reaching wal_sender_timeout before sending a keepalive.
2559 : */
1682 noah 2560 GIC 53695 : sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
3317 rhaas 2561 ECB :
1079 noah 2562 GIC 53695 : if (pq_is_send_pending())
2563 4177 : wakeEvents |= WL_SOCKET_WRITEABLE;
2564 :
3832 heikki.linnakangas 2565 ECB : /* Sleep until something happens or we time out */
769 tmunro 2566 GIC 53695 : WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
2567 : }
2568 : }
4832 heikki.linnakangas 2569 264 : }
2570 :
2571 : /* Initialize a per-walsender data structure for this walsender process: claim the first WalSndCtl->walsnds[] entry whose pid is 0, reset its fields, point MyWalSnd at it, and register WalSndKill to release it at exit. */
2572 : static void
3838 heikki.linnakangas 2573 CBC 831 : InitWalSenderSlot(void)
4832 heikki.linnakangas 2574 ECB : {
2575 : int i;
2576 :
2577 : /*
2578 : * WalSndCtl should be set up already (we inherit this by fork() or
2579 : * EXEC_BACKEND mechanism from the postmaster).
2580 : */
4832 heikki.linnakangas 2581 GIC 831 : Assert(WalSndCtl != NULL);
4832 heikki.linnakangas 2582 CBC 831 : Assert(MyWalSnd == NULL);
2583 :
4832 heikki.linnakangas 2584 ECB : /*
2585 : * Find a free walsender slot and reserve it. This must not fail due to
1517 michael 2586 : * the prior check for free WAL senders in InitProcess().
2587 : */
4756 rhaas 2588 CBC 1226 : for (i = 0; i < max_wal_senders; i++)
4832 heikki.linnakangas 2589 ECB : {
2495 rhaas 2590 GIC 1226 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
2591 :
4832 heikki.linnakangas 2592 1226 : SpinLockAcquire(&walsnd->mutex);
2593 :
2594 1226 : if (walsnd->pid != 0)
2595 : {
4832 heikki.linnakangas 2596 CBC 395 : SpinLockRelease(&walsnd->mutex);
2597 395 : continue;
4832 heikki.linnakangas 2598 ECB : }
2599 : else
2600 : {
4593 2601 : /*
4589 2602 : * Found a free slot. Reserve it for us by storing our pid while still holding the spinlock.
4593 2603 : */
4832 heikki.linnakangas 2604 CBC 831 : walsnd->pid = MyProcPid;
1086 tgl 2605 831 : walsnd->state = WALSNDSTATE_STARTUP;
3755 alvherre 2606 831 : walsnd->sentPtr = InvalidXLogRecPtr;
1086 tgl 2607 831 : walsnd->needreload = false;
2674 magnus 2608 831 : walsnd->write = InvalidXLogRecPtr;
2674 magnus 2609 GIC 831 : walsnd->flush = InvalidXLogRecPtr;
2610 831 : walsnd->apply = InvalidXLogRecPtr;
2208 simon 2611 831 : walsnd->writeLag = -1;
2612 831 : walsnd->flushLag = -1;
2613 831 : walsnd->applyLag = -1;
1086 tgl 2614 831 : walsnd->sync_standby_priority = 0;
3004 andres 2615 831 : walsnd->latch = &MyProc->procLatch;
1582 michael 2616 831 : walsnd->replyTime = 0;
2617 :
2618 : /*
2619 : * The kind assignment is done here and not in StartReplication()
2620 : * and StartLogicalReplication(). Indeed, the logical walsender
2621 : * needs to read WAL records (like snapshot of running
2622 : * transactions) during the slot creation. So it needs to be woken
2623 : * up based on its kind.
2624 : *
2625 : * The kind assignment could also be done in StartReplication(),
2626 : * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
2627 : * seems better to set it in one place. The kind is chosen by whether MyDatabaseId is InvalidOid (physical) or not (logical).
2628 : */
1 andres 2629 GNC 831 : if (MyDatabaseId == InvalidOid)
2630 349 : walsnd->kind = REPLICATION_KIND_PHYSICAL;
2631 : else
2632 482 : walsnd->kind = REPLICATION_KIND_LOGICAL;
2633 :
4832 heikki.linnakangas 2634 GIC 831 : SpinLockRelease(&walsnd->mutex);
2635 : /* don't need the lock anymore; the entry is now marked as ours via its pid */
4589 2636 831 : MyWalSnd = (WalSnd *) walsnd;
2637 :
4832 heikki.linnakangas 2638 CBC 831 : break;
4832 heikki.linnakangas 2639 ECB : }
2640 : }
1517 michael 2641 :
1517 michael 2642 GIC 831 : Assert(MyWalSnd != NULL);
4832 heikki.linnakangas 2643 ECB :
2644 : /* Arrange to clean up at walsender exit */
4832 heikki.linnakangas 2645 CBC 831 : on_shmem_exit(WalSndKill, 0);
4832 heikki.linnakangas 2646 GIC 831 : }
4832 heikki.linnakangas 2647 ECB :
2648 : /* Destroy the per-walsender data structure for this walsender process. Has the (code, arg) signature of an on_shmem_exit callback; neither argument is used. Clears MyWalSnd and marks the shared entry free by zeroing its pid. */
2649 : static void
4832 heikki.linnakangas 2650 GIC 831 : WalSndKill(int code, Datum arg)
4832 heikki.linnakangas 2651 ECB : {
3354 tgl 2652 GIC 831 : WalSnd *walsnd = MyWalSnd;
2653 :
3354 tgl 2654 CBC 831 : Assert(walsnd != NULL);
3354 tgl 2655 ECB :
3354 tgl 2656 GIC 831 : MyWalSnd = NULL;
2657 :
3004 andres 2658 831 : SpinLockAcquire(&walsnd->mutex);
3004 andres 2659 ECB : /* clear latch while holding the spinlock, so it can safely be read */
3004 andres 2660 GIC 831 : walsnd->latch = NULL;
3004 andres 2661 ECB : /* Mark WalSnd struct as no longer being in use (pid == 0 means the entry is free). */
3354 tgl 2662 GIC 831 : walsnd->pid = 0;
3004 andres 2663 CBC 831 : SpinLockRelease(&walsnd->mutex);
4832 heikki.linnakangas 2664 GIC 831 : }
4832 heikki.linnakangas 2665 ECB :
2666 : /* XLogReaderRoutine->segment_open callback: opens WAL segment nextSegNo read-only, stores the descriptor in state->seg.ws_file and the timeline actually used in *tli_p; raises ERROR if the file cannot be opened. */
1061 alvherre 2667 : static void
1061 alvherre 2668 GIC 820 : WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
1231 alvherre 2669 ECB : TimeLineID *tli_p)
2670 : {
2671 : char path[MAXPGPATH];
4282 simon 2672 :
1231 alvherre 2673 : /*-------
2674 : * When reading from a historic timeline, and there is a timeline switch
2675 : * within this segment, read from the WAL segment belonging to the new
2676 : * timeline.
2677 : *
2678 : * For example, imagine that this server is currently on timeline 5, and
2679 : * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
2680 : * 0/13002088. In pg_wal, we have these files:
2681 : *
2682 : * ...
2683 : * 000000040000000000000012
2684 : * 000000040000000000000013
2685 : * 000000050000000000000013
2686 : * 000000050000000000000014
2687 : * ...
2688 : *
2689 : * In this situation, when requested to send the WAL from segment 0x13, on
2690 : * timeline 4, we read the WAL from file 000000050000000000000013. Archive
2691 : * recovery prefers files from newer timelines, so if the segment was
2692 : * restored from the archive on this server, the file belonging to the old
2693 : * timeline, 000000040000000000000013, might not exist. Their contents are
2694 : * equal up to the switchpoint, because at a timeline switch, the used
2695 : * portion of the old segment is copied to the new file. -------
2696 : */
1231 alvherre 2697 GIC 820 : *tli_p = sendTimeLine;
2698 820 : if (sendTimeLineIsHistoric)
2699 : {
2700 : XLogSegNo endSegNo;
2701 :
1061 2702 12 : XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
815 fujii 2703 12 : if (nextSegNo == endSegNo)
1231 alvherre 2704 10 : *tli_p = sendTimeLineNextTLI;
2705 : }
4745 heikki.linnakangas 2706 ECB :
1061 alvherre 2707 CBC 820 : XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
1061 alvherre 2708 GIC 820 : state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
2709 820 : if (state->seg.ws_file >= 0)
2710 819 : return;
4282 simon 2711 ECB :
2712 : /*
1231 alvherre 2713 : * If the file is not found, assume it's because the standby asked for a
2714 : * too old WAL segment that has already been removed or recycled. Any other open failure is reported with the path and %m.
2715 : */
1231 alvherre 2716 CBC 1 : if (errno == ENOENT)
1223 michael 2717 ECB : {
2718 : char xlogfname[MAXFNAMELEN];
1223 michael 2719 CBC 1 : int save_errno = errno; /* restored below so the ereport sees the errno from BasicOpenFile */
2720 :
1223 michael 2721 GIC 1 : XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size); /* NOTE(review): uses wal_segment_size here but state->segcxt.ws_segsize above - presumably equal; confirm */
2722 1 : errno = save_errno;
1231 alvherre 2723 1 : ereport(ERROR,
2724 : (errcode_for_file_access(),
1231 alvherre 2725 ECB : errmsg("requested WAL segment %s has already been removed",
2726 : xlogfname)));
2727 : }
2728 : else
1231 alvherre 2729 UIC 0 : ereport(ERROR,
1231 alvherre 2730 ECB : (errcode_for_file_access(),
2731 : errmsg("could not open file \"%s\": %m",
2732 : path)));
2733 : }
2734 :
2735 : /*
2736 : * Send out the WAL in its normal physical/stored form.
2737 : *
4679 tgl 2738 EUB : * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
2739 : * but not yet sent to the client, and buffer it in the libpq output
2740 : * buffer.
2741 : *
2742 : * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
2743 : * otherwise WalSndCaughtUp is set to false. At most one message is queued per call; afterwards sentPtr (local and in MyWalSnd) is advanced to the end of the data just queued.
2744 : */
2745 : static void
3317 rhaas 2746 GIC 115098 : XLogSendPhysical(void)
2747 : {
2748 : XLogRecPtr SendRqstPtr;
2749 : XLogRecPtr startptr;
2750 : XLogRecPtr endptr;
2751 : Size nbytes;
2752 : XLogSegNo segno;
2753 : WALReadError errinfo;
2754 :
2134 andres 2755 ECB : /* If requested switch the WAL sender to the stopping state. */
2134 andres 2756 GIC 115098 : if (got_STOPPING)
2757 353 : WalSndSetState(WALSNDSTATE_STOPPING);
2758 :
3769 heikki.linnakangas 2759 115098 : if (streamingDoneSending)
2760 : {
3317 rhaas 2761 56338 : WalSndCaughtUp = true;
3769 heikki.linnakangas 2762 87715 : return;
2763 : }
2764 :
3761 heikki.linnakangas 2765 ECB : /* Figure out how far we can safely send the WAL. */
3761 heikki.linnakangas 2766 CBC 58760 : if (sendTimeLineIsHistoric)
2767 : {
3761 heikki.linnakangas 2768 ECB : /*
2769 : * Streaming an old timeline that's in this server's history, but is
2919 2770 : * not the one we're currently inserting or replaying. It can be
2771 : * streamed up to the point where we switched off that timeline.
2772 : */
3761 heikki.linnakangas 2773 GIC 165 : SendRqstPtr = sendTimeLineValidUpto;
2774 : }
3761 heikki.linnakangas 2775 CBC 58595 : else if (am_cascading_walsender)
2776 : {
2777 : TimeLineID SendRqstTLI;
2778 :
2779 : /*
2780 : * Streaming the latest timeline on a standby.
2781 : *
3602 bruce 2782 ECB : * Attempt to send all WAL that has already been replayed, so that we
2783 : * know it's valid. If we're receiving WAL through streaming
3761 heikki.linnakangas 2784 : * replication, it's also OK to send any WAL that has been received
2785 : * but not replayed.
2786 : *
2787 : * The timeline we're recovering from can change, or we can be
2788 : * promoted. In either case, the current timeline becomes historic. We
2789 : * need to detect that so that we don't try to stream past the point
2790 : * where we switched to another timeline. We check for promotion or
2791 : * timeline switch after calculating FlushPtr, to avoid a race
2792 : * condition: if the timeline becomes historic just after we checked
2793 : * that it was still current, it's still OK to stream it up to the
2794 : * FlushPtr that was calculated before it became historic.
2795 : */
3769 heikki.linnakangas 2796 GIC 1146 : bool becameHistoric = false;
2797 :
520 rhaas 2798 1146 : SendRqstPtr = GetStandbyFlushRecPtr(&SendRqstTLI);
2799 :
3769 heikki.linnakangas 2800 1146 : if (!RecoveryInProgress())
2801 : {
2802 : /* We have been promoted. */
520 rhaas 2803 1 : SendRqstTLI = GetWALInsertionTimeLine();
3769 heikki.linnakangas 2804 1 : am_cascading_walsender = false;
3769 heikki.linnakangas 2805 CBC 1 : becameHistoric = true;
2806 : }
3769 heikki.linnakangas 2807 ECB : else
2808 : {
2809 : /*
2810 : * Still a cascading standby. But is the timeline we're sending
2811 : * still the one recovery is recovering from?
2812 : */
520 rhaas 2813 CBC 1145 : if (sendTimeLine != SendRqstTLI)
3769 heikki.linnakangas 2814 LBC 0 : becameHistoric = true;
2815 : }
2816 :
3769 heikki.linnakangas 2817 GIC 1146 : if (becameHistoric)
2818 : {
2819 : /*
2820 : * The timeline we were sending has become historic. Read the
2821 : * timeline history file of the new timeline to see where exactly
3769 heikki.linnakangas 2822 ECB : * we forked off from the timeline we were sending.
3769 heikki.linnakangas 2823 EUB : */
2824 : List *history;
2825 :
520 rhaas 2826 CBC 1 : history = readTimeLineHistory(SendRqstTLI);
3734 heikki.linnakangas 2827 GIC 1 : sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
2828 :
2829 1 : Assert(sendTimeLine < sendTimeLineNextTLI);
3769 2830 1 : list_free_deep(history);
2831 :
2832 1 : sendTimeLineIsHistoric = true;
2833 :
3761 2834 1 : SendRqstPtr = sendTimeLineValidUpto;
3869 heikki.linnakangas 2835 ECB : }
2836 : }
2837 : else
3761 2838 : {
2839 : /*
2840 : * Streaming the current timeline on a primary.
2841 : *
2842 : * Attempt to send all data that's already been written out and
2843 : * fsync'd to disk. We cannot go further than what's been written out
2844 : * given the current implementation of WALRead(). And in any case
2845 : * it's unsafe to send WAL that is not securely down to disk on the
2846 : * primary: if the primary subsequently crashes and restarts, standbys
2847 : * must not have applied any WAL that got lost on the primary.
2848 : */
520 rhaas 2849 GIC 57449 : SendRqstPtr = GetFlushRecPtr(NULL);
2850 : }
2851 :
2852 : /*
2853 : * Record the current system time as an approximation of the time at which
2854 : * this WAL location was written for the purposes of lag tracking.
2855 : *
2856 : * In theory we could make XLogFlush() record a time in shmem whenever WAL
2857 : * is flushed and we could get that time as well as the LSN when we call
2208 simon 2858 ECB : * GetFlushRecPtr() above (and likewise for the cascading standby
2859 : * equivalent), but rather than putting any new code into the hot WAL path
2860 : * it seems good enough to capture the time here. We should reach this
2861 : * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
2862 : * may take some time, we read the WAL flush pointer and take the time
2863 : * very close to together here so that we'll get a later position if it is
2864 : * still moving.
2865 : *
2866 : * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
2867 : * this gives us a cheap approximation for the WAL flush time for this
2868 : * LSN.
2869 : *
2870 : * Note that the LSN is not necessarily the LSN for the data contained in
2871 : * the present message; it's the end of the WAL, which might be further
2872 : * ahead. All the lag tracking machinery cares about is finding out when
2873 : * that arbitrary LSN is eventually reported as written, flushed and
2874 : * applied, so that it can measure the elapsed time.
2875 : */
2208 simon 2876 GIC 58760 : LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
2877 :
2878 : /*
2879 : * If this is a historic timeline and we've reached the point where we
2880 : * forked to the next timeline, stop streaming.
2881 : *
2882 : * Note: We might already have sent WAL > sendTimeLineValidUpto. The
2883 : * startup process will normally replay all WAL that has been received
2884 : * from the primary, before promoting, but if the WAL streaming is
3602 bruce 2885 ECB : * terminated at a WAL page boundary, the valid portion of the timeline
2886 : * might end in the middle of a WAL record. We might've already sent the
2887 : * first half of that partial WAL record to the cascading standby, so that
2888 : * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
2889 : * replay the partial WAL record either, so it can still follow our
2890 : * timeline switch.
2891 : */
3754 alvherre 2892 GIC 58760 : if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
2893 : {
2894 : /* close the current file. */
1061 2895 13 : if (xlogreader->seg.ws_file >= 0)
2896 13 : wal_segment_close(xlogreader);
2897 :
2898 : /* Send CopyDone */
3769 heikki.linnakangas 2899 13 : pq_putmessage_noblock('c', NULL, 0);
2900 13 : streamingDoneSending = true;
3769 heikki.linnakangas 2901 ECB :
3317 rhaas 2902 GIC 13 : WalSndCaughtUp = true;
2903 :
3623 heikki.linnakangas 2904 CBC 13 : elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
775 peter 2905 ECB : LSN_FORMAT_ARGS(sendTimeLineValidUpto),
2906 : LSN_FORMAT_ARGS(sentPtr));
3769 heikki.linnakangas 2907 GIC 13 : return;
3769 heikki.linnakangas 2908 ECB : }
2909 :
2910 : /* Do we have any work to do? */
3754 alvherre 2911 CBC 58747 : Assert(sentPtr <= SendRqstPtr);
3754 alvherre 2912 GIC 58747 : if (SendRqstPtr <= sentPtr)
4701 heikki.linnakangas 2913 ECB : {
3317 rhaas 2914 GIC 31364 : WalSndCaughtUp = true;
4393 heikki.linnakangas 2915 31364 : return;
4701 heikki.linnakangas 2916 ECB : }
2917 :
2918 : /*
2919 : * Figure out how much to send in one message. If there's no more than
2920 : * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
4679 tgl 2921 : * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
2922 : *
4660 bruce 2923 : * The rounding is not only for performance reasons. Walreceiver relies on
2924 : * the fact that we never split a WAL record across two messages. Since a
2925 : * long WAL record is split at page boundary into continuation records,
2926 : * page boundary is always a safe cut-off point. We also assume that
2927 : * SendRqstPtr never points to the middle of a WAL record.
2928 : */
4701 heikki.linnakangas 2929 GIC 27383 : startptr = sentPtr;
2930 27383 : endptr = startptr;
3754 alvherre 2931 27383 : endptr += MAX_SEND_SIZE;
2932 :
2933 : /* if we went beyond SendRqstPtr, back off */
2934 27383 : if (SendRqstPtr <= endptr)
2935 : {
4693 tgl 2936 18290 : endptr = SendRqstPtr;
3769 heikki.linnakangas 2937 18290 : if (sendTimeLineIsHistoric)
3317 rhaas 2938 CBC 12 : WalSndCaughtUp = false;
3769 heikki.linnakangas 2939 ECB : else
3317 rhaas 2940 CBC 18278 : WalSndCaughtUp = true;
2941 : }
2942 : else
4693 tgl 2943 ECB : {
2944 : /* round down to page boundary. */
3941 heikki.linnakangas 2945 CBC 9093 : endptr -= (endptr % XLOG_BLCKSZ);
3317 rhaas 2946 9093 : WalSndCaughtUp = false;
4693 tgl 2947 ECB : }
2948 :
3941 heikki.linnakangas 2949 CBC 27383 : nbytes = endptr - startptr;
4693 tgl 2950 GIC 27383 : Assert(nbytes <= MAX_SEND_SIZE);
2951 :
2952 : /*
2953 : * OK to read and send the slice. The message starts with a 'w' byte followed by three int64 fields (dataStart, walEnd, sendtime), then the WAL data.
4701 heikki.linnakangas 2954 ECB : */
3805 heikki.linnakangas 2955 CBC 27383 : resetStringInfo(&output_message);
3805 heikki.linnakangas 2956 GIC 27383 : pq_sendbyte(&output_message, 'w');
2957 :
3805 heikki.linnakangas 2958 CBC 27383 : pq_sendint64(&output_message, startptr); /* dataStart */
3602 bruce 2959 27383 : pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
3602 bruce 2960 GIC 27383 : pq_sendint64(&output_message, 0); /* sendtime, filled in last */
2961 :
2962 : /*
2963 : * Read the log directly into the output buffer to avoid extra memcpy
4693 tgl 2964 ECB : * calls.
2965 : */
3805 heikki.linnakangas 2966 GIC 27383 : enlargeStringInfo(&output_message, nbytes);
1231 alvherre 2967 ECB :
1231 alvherre 2968 CBC 27383 : retry:
699 tmunro 2969 27382 : if (!WALRead(xlogreader,
1066 alvherre 2970 GIC 27383 : &output_message.data[output_message.len],
2971 : startptr,
2972 : nbytes,
1061 2973 27383 : xlogreader->seg.ws_tli, /* Pass the current TLI because
2974 : * only WalSndSegmentOpen controls
1061 alvherre 2975 ECB : * whether new TLI is needed. */
2976 : &errinfo))
1231 alvherre 2977 LBC 0 : WALReadRaiseError(&errinfo);
1231 alvherre 2978 ECB :
2979 : /* See logical_read_xlog_page(). */
1061 alvherre 2980 GIC 27382 : XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
2981 27382 : CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
1231 alvherre 2982 ECB :
2983 : /*
2984 : * During recovery, the currently-open WAL file might be replaced with the
2985 : * file of the same name retrieved from archive. So we always need to
1231 alvherre 2986 EUB : * check what we read was valid after reading into the buffer. If it's
2987 : * invalid, we try to open and read the file again. The needreload flag is read and cleared under the WalSnd spinlock.
2988 : */
1231 alvherre 2989 CBC 27382 : if (am_cascading_walsender)
1231 alvherre 2990 ECB : {
1231 alvherre 2991 GIC 871 : WalSnd *walsnd = MyWalSnd;
2992 : bool reload;
2993 :
2994 871 : SpinLockAcquire(&walsnd->mutex);
2995 871 : reload = walsnd->needreload;
2996 871 : walsnd->needreload = false;
2997 871 : SpinLockRelease(&walsnd->mutex);
1231 alvherre 2998 ECB :
1061 alvherre 2999 GIC 871 : if (reload && xlogreader->seg.ws_file >= 0)
1231 alvherre 3000 ECB : {
1061 alvherre 3001 UIC 0 : wal_segment_close(xlogreader);
3002 :
1231 alvherre 3003 LBC 0 : goto retry;
1231 alvherre 3004 ECB : }
3005 : }
3006 :
3805 heikki.linnakangas 3007 GIC 27382 : output_message.len += nbytes;
3805 heikki.linnakangas 3008 CBC 27382 : output_message.data[output_message.len] = '\0';
3009 :
4693 tgl 3010 EUB : /*
3011 : * Fill the send timestamp last, so that it is taken as late as possible. It overwrites the zero placeholder written earlier, which sits after the 'w' byte, dataStart and walEnd.
3012 : */
3805 heikki.linnakangas 3013 GIC 27382 : resetStringInfo(&tmpbuf);
2236 tgl 3014 27382 : pq_sendint64(&tmpbuf, GetCurrentTimestamp());
3805 heikki.linnakangas 3015 27382 : memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
3805 heikki.linnakangas 3016 CBC 27382 : tmpbuf.data, sizeof(int64));
4832 heikki.linnakangas 3017 ECB :
3805 heikki.linnakangas 3018 GIC 27382 : pq_putmessage_noblock('d', output_message.data, output_message.len);
3019 :
4693 tgl 3020 27382 : sentPtr = endptr;
3021 :
4693 tgl 3022 ECB : /* Update shared memory status */
3023 : {
2495 rhaas 3024 CBC 27382 : WalSnd *walsnd = MyWalSnd;
4693 tgl 3025 ECB :
4693 tgl 3026 GIC 27382 : SpinLockAcquire(&walsnd->mutex);
4693 tgl 3027 CBC 27382 : walsnd->sentPtr = sentPtr;
4693 tgl 3028 GIC 27382 : SpinLockRelease(&walsnd->mutex);
4693 tgl 3029 ECB : }
3030 :
3031 : /* Report progress of XLOG streaming in PS display */
4693 tgl 3032 GIC 27382 : if (update_process_title)
4693 tgl 3033 ECB : {
3034 : char activitymsg[50];
3035 :
4693 tgl 3036 CBC 27382 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
775 peter 3037 27382 : LSN_FORMAT_ARGS(sentPtr));
1124 peter 3038 GIC 27382 : set_ps_display(activitymsg);
3039 : }
3040 : }
4832 heikki.linnakangas 3041 ECB :
3042 : /*
3043 : * Stream out logically decoded data.
3044 : */
3317 rhaas 3045 : static void
3317 rhaas 3046 CBC 830342 : XLogSendLogical(void)
3317 rhaas 3047 ECB : {
3048 : XLogRecord *record;
3049 : char *errm;
3050 :
3051 : /*
3052 : * We'll use the current flush point to determine whether we've caught up.
3053 : * This variable is static in order to cache it across calls. Caching is
3054 : * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
1189 tgl 3055 : * spinlock.
3056 : */
3057 : static XLogRecPtr flushPtr = InvalidXLogRecPtr;
3058 :
3059 : /*
3060 : * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
3061 : * true in WalSndWaitForWal, if we're actually waiting. We also set to
3062 : * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
3063 : * didn't wait - i.e. when we're shutting down.
3064 : */
3317 rhaas 3065 GIC 830342 : WalSndCaughtUp = false;
3066 :
699 tmunro 3067 830342 : record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
3068 :
3069 : /* xlog record was invalid */
3317 rhaas 3070 830220 : if (errm != NULL)
515 michael 3071 1 : elog(ERROR, "could not find record while sending logically-decoded data: %s",
3072 : errm);
3073 :
3317 rhaas 3074 CBC 830219 : if (record != NULL)
3075 : {
2208 simon 3076 ECB : /*
3077 : * Note the lack of any call to LagTrackerWrite() which is handled by
3078 : * WalSndUpdateProgress which is called by output plugin through
2158 3079 : * logical decoding write api.
2208 3080 : */
3062 heikki.linnakangas 3081 GIC 830016 : LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
3082 :
3317 rhaas 3083 CBC 830007 : sentPtr = logical_decoding_ctx->reader->EndRecPtr;
3084 : }
3085 :
3086 : /*
3087 : * If first time through in this session, initialize flushPtr. Otherwise,
3088 : * we only need to update flushPtr if EndRecPtr is past it.
3089 : */
1 andres 3090 GNC 830210 : if (flushPtr == InvalidXLogRecPtr ||
3091 829974 : logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3092 : {
3093 2035 : if (am_cascading_walsender)
3094 55 : flushPtr = GetStandbyFlushRecPtr(NULL);
3095 : else
3096 1980 : flushPtr = GetFlushRecPtr(NULL);
3097 : }
3098 :
3099 : /* If EndRecPtr is still past our flushPtr, it means we caught up. */
1270 alvherre 3100 GIC 830210 : if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
3101 1259 : WalSndCaughtUp = true;
3102 :
1270 alvherre 3103 ECB : /*
3104 : * If we're caught up and have been requested to stop, have WalSndLoop()
3105 : * terminate the connection in an orderly manner, after writing out all
3106 : * the pending data.
3107 : */
1270 alvherre 3108 GIC 830210 : if (WalSndCaughtUp && got_STOPPING)
1270 alvherre 3109 CBC 108 : got_SIGUSR2 = true;
3110 :
3111 : /* Update shared memory status */
3112 : {
2495 rhaas 3113 830210 : WalSnd *walsnd = MyWalSnd;
3317 rhaas 3114 ECB :
3317 rhaas 3115 GIC 830210 : SpinLockAcquire(&walsnd->mutex);
3116 830210 : walsnd->sentPtr = sentPtr;
3117 830210 : SpinLockRelease(&walsnd->mutex);
3118 : }
3119 830210 : }
3120 :
3317 rhaas 3121 ECB : /*
3122 : * Shutdown if the sender is caught up.
3123 : *
3124 : * NB: This should only be called when the shutdown signal has been received
3125 : * from postmaster.
3126 : *
3127 : * Note that if we determine that there's still more data to send, this
3128 : * function will return control to the caller.
3129 : */
3130 : static void
3317 rhaas 3131 GIC 77 : WalSndDone(WalSndSendDataCallback send_data)
3317 rhaas 3132 ECB : {
3133 : XLogRecPtr replicatedPtr;
3134 :
3135 : /* ... let's just be real sure we're caught up ... */
3317 rhaas 3136 GIC 77 : send_data();
3137 :
3138 : /*
3139 : * To figure out whether all WAL has successfully been replicated, check
3140 : * flush location if valid, write otherwise. Tools like pg_receivewal will
3141 : * usually (unless in synchronous mode) return an invalid flush location.
3142 : */
3310 fujii 3143 154 : replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
3310 fujii 3144 CBC 77 : MyWalSnd->write : MyWalSnd->flush;
3145 :
3310 fujii 3146 GIC 77 : if (WalSndCaughtUp && sentPtr == replicatedPtr &&
3317 rhaas 3147 27 : !pq_is_send_pending())
3148 : {
1133 alvherre 3149 ECB : QueryCompletion qc;
3150 :
3151 : /* Inform the standby that XLOG streaming is done */
1133 alvherre 3152 GIC 27 : SetQueryCompletion(&qc, CMDTAG_COPY, 0);
3153 27 : EndCommand(&qc, DestRemote, false);
3317 rhaas 3154 27 : pq_flush();
3155 :
3317 rhaas 3156 CBC 27 : proc_exit(0);
3317 rhaas 3157 ECB : }
3317 rhaas 3158 GIC 50 : if (!waiting_for_ping_response)
375 akapila 3159 CBC 2 : WalSndKeepalive(true, InvalidXLogRecPtr);
3317 rhaas 3160 50 : }
3161 :
3162 : /*
3163 : * Returns the latest point in WAL that has been safely flushed to disk, and
3164 : * can be sent to the standby. This should only be called when in recovery,
3762 heikki.linnakangas 3165 ECB : * ie. we're streaming to a cascaded standby.
3166 : *
520 rhaas 3167 : * As a side-effect, *tli is updated to the TLI of the last
3168 : * replayed WAL record.
3762 heikki.linnakangas 3169 : */
3170 : static XLogRecPtr
520 rhaas 3171 CBC 1266 : GetStandbyFlushRecPtr(TimeLineID *tli)
3762 heikki.linnakangas 3172 ECB : {
3602 bruce 3173 : XLogRecPtr replayPtr;
3174 : TimeLineID replayTLI;
3175 : XLogRecPtr receivePtr;
3176 : TimeLineID receiveTLI;
3177 : XLogRecPtr result;
3178 :
3179 : /*
3180 : * We can safely send what's already been replayed. Also, if walreceiver
3181 : * is streaming WAL from the same timeline, we can send anything that it
3182 : * has streamed, but hasn't been replayed yet.
3183 : */
3762 heikki.linnakangas 3184 :
1096 tmunro 3185 GIC 1266 : receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
3762 heikki.linnakangas 3186 1266 : replayPtr = GetXLogReplayRecPtr(&replayTLI);
3187 :
1 andres 3188 GNC 1266 : if (tli)
3189 1211 : *tli = replayTLI;
3190 :
3762 heikki.linnakangas 3191 GIC 1266 : result = replayPtr;
520 rhaas 3192 1266 : if (receiveTLI == replayTLI && receivePtr > replayPtr)
3762 heikki.linnakangas 3193 275 : result = receivePtr;
3194 :
3195 1266 : return result;
3196 : }
3197 :
3198 : /*
4282 simon 3199 ECB : * Request walsenders to reload the currently-open WAL file
3200 : */
3201 : void
4282 simon 3202 CBC 16 : WalSndRqstFileReload(void)
4282 simon 3203 ECB : {
3204 : int i;
3205 :
4282 simon 3206 CBC 158 : for (i = 0; i < max_wal_senders; i++)
4282 simon 3207 ECB : {
2495 rhaas 3208 GIC 142 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
4282 simon 3209 ECB :
2109 alvherre 3210 GIC 142 : SpinLockAcquire(&walsnd->mutex);
4282 simon 3211 142 : if (walsnd->pid == 0)
3212 : {
2109 alvherre 3213 142 : SpinLockRelease(&walsnd->mutex);
4282 simon 3214 142 : continue;
3215 : }
4282 simon 3216 LBC 0 : walsnd->needreload = true;
4282 simon 3217 UIC 0 : SpinLockRelease(&walsnd->mutex);
3218 : }
4282 simon 3219 GIC 16 : }
4282 simon 3220 ECB :
3221 : /*
2134 andres 3222 : * Handle PROCSIG_WALSND_INIT_STOPPING signal.
3223 : */
3224 : void
2134 andres 3225 CBC 27 : HandleWalSndInitStopping(void)
3226 : {
3227 27 : Assert(am_walsender);
2134 andres 3228 ECB :
3229 : /*
2134 andres 3230 EUB : * If replication has not yet started, die like with SIGTERM. If
3231 : * replication is active, only set a flag and wake up the main loop. It
3232 : * will send any outstanding WAL, wait for it to be replicated to the
2134 andres 3233 ECB : * standby, and then exit gracefully.
3234 : */
2134 andres 3235 GIC 27 : if (!replication_active)
2134 andres 3236 UIC 0 : kill(MyProcPid, SIGTERM);
3237 : else
2134 andres 3238 GIC 27 : got_STOPPING = true;
2134 andres 3239 CBC 27 : }
3240 :
2134 andres 3241 ECB : /*
3242 : * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
3243 : * sender should already have been switched to WALSNDSTATE_STOPPING at
3244 : * this point.
3245 : */
3246 : static void
4832 heikki.linnakangas 3247 GIC 23 : WalSndLastCycleHandler(SIGNAL_ARGS)
3248 : {
4260 tgl 3249 CBC 23 : int save_errno = errno;
4260 tgl 3250 EUB :
2134 andres 3251 GIC 23 : got_SIGUSR2 = true;
3004 andres 3252 CBC 23 : SetLatch(MyLatch);
4260 tgl 3253 ECB :
4260 tgl 3254 GIC 23 : errno = save_errno;
4832 heikki.linnakangas 3255 23 : }
3256 :
3257 : /* Set up signal handlers */
3258 : void
3259 831 : WalSndSignals(void)
3260 : {
4832 heikki.linnakangas 3261 ECB : /* Set up signal handlers */
1209 rhaas 3262 GIC 831 : pqsignal(SIGHUP, SignalHandlerForConfigReload);
2134 andres 3263 CBC 831 : pqsignal(SIGINT, StatementCancelHandler); /* query cancel */
3602 bruce 3264 GIC 831 : pqsignal(SIGTERM, die); /* request shutdown */
935 tgl 3265 ECB : /* SIGQUIT handler was already set up by InitPostmasterChild */
3919 alvherre 3266 CBC 831 : InitializeTimeouts(); /* establishes SIGALRM handler */
4832 heikki.linnakangas 3267 GIC 831 : pqsignal(SIGPIPE, SIG_IGN);
2134 andres 3268 CBC 831 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
3269 831 : pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and
3270 : * shutdown */
3271 :
3272 : /* Reset some signals that are accepted by postmaster but not here */
4832 heikki.linnakangas 3273 831 : pqsignal(SIGCHLD, SIG_DFL);
4832 heikki.linnakangas 3274 GIC 831 : }
3275 :
4832 heikki.linnakangas 3276 ECB : /* Report shared-memory space needed by WalSndShmemInit */
3277 : Size
4832 heikki.linnakangas 3278 CBC 6390 : WalSndShmemSize(void)
3279 : {
4790 bruce 3280 6390 : Size size = 0;
4832 heikki.linnakangas 3281 ECB :
4832 heikki.linnakangas 3282 CBC 6390 : size = offsetof(WalSndCtlData, walsnds);
4756 rhaas 3283 6390 : size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
3284 :
4832 heikki.linnakangas 3285 GIC 6390 : return size;
3286 : }
4832 heikki.linnakangas 3287 ECB :
3288 : /* Allocate and initialize walsender-related shared memory */
3289 : void
4832 heikki.linnakangas 3290 GIC 1826 : WalSndShmemInit(void)
3291 : {
4790 bruce 3292 ECB : bool found;
3293 : int i;
4832 heikki.linnakangas 3294 :
4832 heikki.linnakangas 3295 GIC 1826 : WalSndCtl = (WalSndCtlData *)
4832 heikki.linnakangas 3296 CBC 1826 : ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
4832 heikki.linnakangas 3297 ECB :
4729 tgl 3298 GIC 1826 : if (!found)
4832 heikki.linnakangas 3299 ECB : {
3300 : /* First time through, so initialize */
4729 tgl 3301 GIC 5151 : MemSet(WalSndCtl, 0, WalSndShmemSize());
3302 :
4093 simon 3303 7304 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
81 andres 3304 GNC 5478 : dlist_init(&(WalSndCtl->SyncRepQueue[i]));
3305 :
4729 tgl 3306 GIC 18091 : for (i = 0; i < max_wal_senders; i++)
3307 : {
3308 16265 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
4790 bruce 3309 ECB :
4729 tgl 3310 CBC 16265 : SpinLockInit(&walsnd->mutex);
3311 : }
4832 heikki.linnakangas 3312 ECB : }
4832 heikki.linnakangas 3313 GIC 1826 : }
3314 :
3933 rhaas 3315 ECB : /*
3316 : * Wake up physical, logical or both kinds of walsenders
3317 : *
3318 : * The distinction between physical and logical walsenders is done, because:
3319 : * - physical walsenders can't send data until it's been flushed
3320 : * - logical walsenders on standby can't decode and send data until it's been
3321 : * applied
3322 : *
3323 : * For cascading replication we need to wake up physical walsenders separately
3324 : * from logical walsenders (see the comment before calling WalSndWakeup() in
3325 : * ApplyWalRecord() for more details).
3326 : *
3327 : * This will be called inside critical sections, so throwing an error is not
3328 : * advisable.
3329 : */
3330 : void
1 andres 3331 GNC 2627758 : WalSndWakeup(bool physical, bool logical)
3332 : {
4382 bruce 3333 ECB : int i;
3334 :
4593 heikki.linnakangas 3335 GIC 28847595 : for (i = 0; i < max_wal_senders; i++)
3004 andres 3336 ECB : {
3337 : Latch *latch;
3338 : ReplicationKind kind;
2878 bruce 3339 GIC 26219837 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3340 :
3341 : /*
3342 : * Get latch pointer with spinlock held, for the unlikely case that
3343 : * pointer reads aren't atomic (as they're 8 bytes). While at it, also
3344 : * get kind.
3345 : */
3004 andres 3346 26219837 : SpinLockAcquire(&walsnd->mutex);
3347 26219837 : latch = walsnd->latch;
1 andres 3348 GNC 26219837 : kind = walsnd->kind;
3004 andres 3349 GIC 26219837 : SpinLockRelease(&walsnd->mutex);
3350 :
1 andres 3351 GNC 26219837 : if (latch == NULL)
3352 26191517 : continue;
3353 :
3354 28320 : if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
3355 8540 : (logical && kind == REPLICATION_KIND_LOGICAL))
3004 andres 3356 GIC 21889 : SetLatch(latch);
3357 : }
4593 heikki.linnakangas 3358 2627758 : }
3359 :
3360 : /*
769 tmunro 3361 ECB : * Wait for readiness on the FeBe socket, or a timeout. The mask should be
3362 : * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
3363 : * on postmaster death.
3364 : */
3365 : static void
769 tmunro 3366 GIC 55918 : WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
3367 : {
3368 : WaitEvent event;
769 tmunro 3369 ECB :
769 tmunro 3370 GIC 55918 : ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
3371 55918 : if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
3372 55918 : (event.events & WL_POSTMASTER_DEATH))
769 tmunro 3373 UIC 0 : proc_exit(1);
769 tmunro 3374 GIC 55918 : }
3375 :
2134 andres 3376 ECB : /*
3377 : * Signal all walsenders to move to stopping state.
3378 : *
3379 : * This will trigger walsenders to move to a state where no further WAL can be
3380 : * generated. See this file's header for details.
3381 : */
3382 : void
2134 andres 3383 GIC 971 : WalSndInitStopping(void)
2134 andres 3384 ECB : {
3385 : int i;
3386 :
2134 andres 3387 GIC 9658 : for (i = 0; i < max_wal_senders; i++)
2134 andres 3388 ECB : {
2134 andres 3389 GIC 8687 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
3390 : pid_t pid;
3391 :
3392 8687 : SpinLockAcquire(&walsnd->mutex);
3393 8687 : pid = walsnd->pid;
3394 8687 : SpinLockRelease(&walsnd->mutex);
3395 :
2134 andres 3396 CBC 8687 : if (pid == 0)
2134 andres 3397 GIC 8660 : continue;
3398 :
3399 27 : SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
2134 andres 3400 ECB : }
2134 andres 3401 CBC 971 : }
2134 andres 3402 ECB :
2134 andres 3403 EUB : /*
2134 andres 3404 ECB : * Wait that all the WAL senders have quit or reached the stopping state. This
3405 : * is used by the checkpointer to control when the shutdown checkpoint can
3406 : * safely be performed.
3407 : */
3408 : void
2134 andres 3409 GIC 971 : WalSndWaitStopping(void)
3410 : {
3411 : for (;;)
3412 25 : {
2134 andres 3413 ECB : int i;
2134 andres 3414 GIC 996 : bool all_stopped = true;
3415 :
3416 9683 : for (i = 0; i < max_wal_senders; i++)
2134 andres 3417 ECB : {
2134 andres 3418 GIC 8712 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
2134 andres 3419 ECB :
2134 andres 3420 GIC 8712 : SpinLockAcquire(&walsnd->mutex);
3421 :
2134 andres 3422 CBC 8712 : if (walsnd->pid == 0)
2134 andres 3423 ECB : {
2134 andres 3424 CBC 8664 : SpinLockRelease(&walsnd->mutex);
2134 andres 3425 GIC 8664 : continue;
2134 andres 3426 ECB : }
3427 :
2109 alvherre 3428 GIC 48 : if (walsnd->state != WALSNDSTATE_STOPPING)
2134 andres 3429 ECB : {
2134 andres 3430 GIC 25 : all_stopped = false;
2109 alvherre 3431 CBC 25 : SpinLockRelease(&walsnd->mutex);
2134 andres 3432 GIC 25 : break;
3433 : }
2109 alvherre 3434 23 : SpinLockRelease(&walsnd->mutex);
3435 : }
3436 :
3437 : /* safe to leave if confirmation is done for all WAL senders */
2134 andres 3438 996 : if (all_stopped)
2134 andres 3439 CBC 971 : return;
3440 :
2134 andres 3441 GIC 25 : pg_usleep(10000L); /* wait for 10 msec */
2134 andres 3442 ECB : }
3443 : }
3444 :
3445 : /* Set state for current walsender (only called in walsender) */
4471 magnus 3446 : void
4471 magnus 3447 GIC 1687 : WalSndSetState(WalSndState state)
4471 magnus 3448 ECB : {
2495 rhaas 3449 GIC 1687 : WalSnd *walsnd = MyWalSnd;
4471 magnus 3450 ECB :
4471 magnus 3451 GIC 1687 : Assert(am_walsender);
4471 magnus 3452 ECB :
4471 magnus 3453 GIC 1687 : if (walsnd->state == state)
4471 magnus 3454 CBC 357 : return;
4471 magnus 3455 ECB :
4471 magnus 3456 GIC 1330 : SpinLockAcquire(&walsnd->mutex);
3457 1330 : walsnd->state = state;
4471 magnus 3458 CBC 1330 : SpinLockRelease(&walsnd->mutex);
3459 : }
4471 magnus 3460 ECB :
3461 : /*
3462 : * Return a string constant representing the state. This is used
3463 : * in system views, and should *not* be translated.
3464 : */
3465 : static const char *
4471 magnus 3466 GIC 633 : WalSndGetStateString(WalSndState state)
3467 : {
4471 magnus 3468 CBC 633 : switch (state)
4471 magnus 3469 ECB : {
4471 magnus 3470 GIC 3 : case WALSNDSTATE_STARTUP:
4363 bruce 3471 CBC 3 : return "startup";
4471 magnus 3472 UIC 0 : case WALSNDSTATE_BACKUP:
4363 bruce 3473 0 : return "backup";
4471 magnus 3474 GIC 6 : case WALSNDSTATE_CATCHUP:
4363 bruce 3475 6 : return "catchup";
4471 magnus 3476 624 : case WALSNDSTATE_STREAMING:
4363 bruce 3477 CBC 624 : return "streaming";
2134 andres 3478 UIC 0 : case WALSNDSTATE_STOPPING:
2134 andres 3479 LBC 0 : return "stopping";
3480 : }
4471 magnus 3481 0 : return "UNKNOWN";
3482 : }
4471 magnus 3483 ECB :
2208 simon 3484 : static Interval *
2208 simon 3485 GIC 897 : offset_to_interval(TimeOffset offset)
2208 simon 3486 ECB : {
2153 bruce 3487 CBC 897 : Interval *result = palloc(sizeof(Interval));
2208 simon 3488 ECB :
2208 simon 3489 GIC 897 : result->month = 0;
3490 897 : result->day = 0;
3491 897 : result->time = offset;
3492 :
3493 897 : return result;
3494 : }
3495 :
4475 itagaki.takahiro 3496 ECB : /*
3497 : * Returns activity of walsenders, including pids and xlog locations sent to
3498 : * standby servers.
3499 : */
3500 : Datum
4475 itagaki.takahiro 3501 CBC 553 : pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
4475 itagaki.takahiro 3502 EUB : {
1000 akapila 3503 : #define PG_STAT_GET_WAL_SENDERS_COLS 12
4382 bruce 3504 CBC 553 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1086 tgl 3505 ECB : SyncRepStandbyData *sync_standbys;
3506 : int num_standbys;
4382 bruce 3507 : int i;
4475 itagaki.takahiro 3508 EUB :
173 michael 3509 GBC 553 : InitMaterializedSRF(fcinfo, 0);
3510 :
4417 simon 3511 EUB : /*
3512 : * Get the currently active synchronous standbys. This could be out of
3513 : * date before we're done, but we'll use the data anyway.
3514 : */
1086 tgl 3515 CBC 553 : num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
3516 :
4475 itagaki.takahiro 3517 5961 : for (i = 0; i < max_wal_senders; i++)
3518 : {
2495 rhaas 3519 5408 : WalSnd *walsnd = &WalSndCtl->walsnds[i];
4475 itagaki.takahiro 3520 ECB : XLogRecPtr sentPtr;
4441 heikki.linnakangas 3521 : XLogRecPtr write;
3522 : XLogRecPtr flush;
3523 : XLogRecPtr apply;
3524 : TimeOffset writeLag;
3525 : TimeOffset flushLag;
3526 : TimeOffset applyLag;
3527 : int priority;
3528 : int pid;
3529 : WalSndState state;
3530 : TimestampTz replyTime;
1086 tgl 3531 : bool is_sync_standby;
3532 : Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
267 peter 3533 GNC 5408 : bool nulls[PG_STAT_GET_WAL_SENDERS_COLS] = {0};
1086 tgl 3534 ECB : int j;
3535 :
3536 : /* Collect data from shared memory */
2109 alvherre 3537 GIC 5408 : SpinLockAcquire(&walsnd->mutex);
4475 itagaki.takahiro 3538 5408 : if (walsnd->pid == 0)
2109 alvherre 3539 ECB : {
2109 alvherre 3540 GIC 4775 : SpinLockRelease(&walsnd->mutex);
4475 itagaki.takahiro 3541 4775 : continue;
3542 : }
2109 alvherre 3543 633 : pid = walsnd->pid;
4475 itagaki.takahiro 3544 633 : sentPtr = walsnd->sentPtr;
4469 magnus 3545 CBC 633 : state = walsnd->state;
4441 heikki.linnakangas 3546 GIC 633 : write = walsnd->write;
4441 heikki.linnakangas 3547 CBC 633 : flush = walsnd->flush;
4441 heikki.linnakangas 3548 GIC 633 : apply = walsnd->apply;
2208 simon 3549 CBC 633 : writeLag = walsnd->writeLag;
2208 simon 3550 GIC 633 : flushLag = walsnd->flushLag;
3551 633 : applyLag = walsnd->applyLag;
3040 heikki.linnakangas 3552 633 : priority = walsnd->sync_standby_priority;
1582 michael 3553 633 : replyTime = walsnd->replyTime;
4475 itagaki.takahiro 3554 633 : SpinLockRelease(&walsnd->mutex);
3555 :
3556 : /*
3557 : * Detect whether walsender is/was considered synchronous. We can
3558 : * provide some protection against stale data by checking the PID
3559 : * along with walsnd_index.
3560 : */
1086 tgl 3561 633 : is_sync_standby = false;
3562 674 : for (j = 0; j < num_standbys; j++)
1086 tgl 3563 ECB : {
1086 tgl 3564 GIC 68 : if (sync_standbys[j].walsnd_index == i &&
3565 27 : sync_standbys[j].pid == pid)
3566 : {
1086 tgl 3567 CBC 27 : is_sync_standby = true;
3568 27 : break;
3569 : }
1086 tgl 3570 ECB : }
3571 :
2109 alvherre 3572 CBC 633 : values[0] = Int32GetDatum(pid);
4441 heikki.linnakangas 3573 ECB :
377 mail 3574 CBC 633 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
4459 magnus 3575 ECB : {
3576 : /*
377 mail 3577 : * Only superusers and roles with privileges of pg_read_all_stats
3578 : * can see details. Other users only get the pid value to know
3579 : * it's a walsender, but no details.
4459 magnus 3580 : */
4417 simon 3581 LBC 0 : MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
4459 magnus 3582 ECB : }
3583 : else
3584 : {
4459 magnus 3585 GIC 633 : values[1] = CStringGetTextDatum(WalSndGetStateString(state));
3586 :
2674 3587 633 : if (XLogRecPtrIsInvalid(sentPtr))
3588 3 : nulls[2] = true;
3331 rhaas 3589 633 : values[2] = LSNGetDatum(sentPtr);
4441 heikki.linnakangas 3590 ECB :
2674 magnus 3591 CBC 633 : if (XLogRecPtrIsInvalid(write))
4441 heikki.linnakangas 3592 GIC 5 : nulls[3] = true;
3331 rhaas 3593 CBC 633 : values[3] = LSNGetDatum(write);
4441 heikki.linnakangas 3594 ECB :
2674 magnus 3595 GIC 633 : if (XLogRecPtrIsInvalid(flush))
4441 heikki.linnakangas 3596 CBC 5 : nulls[4] = true;
3331 rhaas 3597 633 : values[4] = LSNGetDatum(flush);
3598 :
2674 magnus 3599 GIC 633 : if (XLogRecPtrIsInvalid(apply))
4441 heikki.linnakangas 3600 5 : nulls[5] = true;
3331 rhaas 3601 CBC 633 : values[5] = LSNGetDatum(apply);
3602 :
3040 heikki.linnakangas 3603 ECB : /*
3604 : * Treat a standby such as a pg_basebackup background process
3605 : * which always returns an invalid flush location, as an
3606 : * asynchronous standby.
3607 : */
2109 alvherre 3608 GIC 633 : priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
3609 :
2208 simon 3610 GBC 633 : if (writeLag < 0)
2208 simon 3611 GIC 352 : nulls[6] = true;
3612 : else
3613 281 : values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
2208 simon 3614 ECB :
2208 simon 3615 GIC 633 : if (flushLag < 0)
2208 simon 3616 CBC 298 : nulls[7] = true;
2208 simon 3617 ECB : else
2208 simon 3618 CBC 335 : values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
3619 :
3620 633 : if (applyLag < 0)
3621 352 : nulls[8] = true;
2208 simon 3622 ECB : else
2208 simon 3623 GIC 281 : values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
2208 simon 3624 ECB :
2208 simon 3625 CBC 633 : values[9] = Int32GetDatum(priority);
4417 simon 3626 ECB :
3627 : /*
4382 bruce 3628 : * More easily understood version of standby state. This is purely
2302 fujii 3629 : * informational.
3630 : *
3631 : * In quorum-based sync replication, the role of each standby
3632 : * listed in synchronous_standby_names can be changing very
3633 : * frequently. Any standbys considered as "sync" at one moment can
3634 : * be switched to "potential" ones at the next moment. So, it's
3635 : * basically useless to report "sync" or "potential" as their sync
3636 : * states. We report just "quorum" for them.
4417 simon 3637 : */
3040 heikki.linnakangas 3638 GIC 633 : if (priority == 0)
2208 simon 3639 CBC 595 : values[10] = CStringGetTextDatum("async");
1086 tgl 3640 38 : else if (is_sync_standby)
2208 simon 3641 GIC 27 : values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
2302 fujii 3642 CBC 27 : CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
3643 : else
2208 simon 3644 11 : values[10] = CStringGetTextDatum("potential");
1582 michael 3645 ECB :
1582 michael 3646 GIC 633 : if (replyTime == 0)
1582 michael 3647 CBC 3 : nulls[11] = true;
3648 : else
3649 630 : values[11] = TimestampTzGetDatum(replyTime);
4459 magnus 3650 ECB : }
3651 :
398 michael 3652 CBC 633 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
3653 : values, nulls);
4475 itagaki.takahiro 3654 ECB : }
3655 :
4475 itagaki.takahiro 3656 GIC 553 : return (Datum) 0;
3657 : }
3658 :
3659 : /*
3660 : * Send a keepalive message to standby.
3661 : *
3662 : * If requestReply is set, the message requests the other party to send
3663 : * a message back to us, for heartbeat purposes. We also set a flag to
3664 : * let nearby code know that we're waiting for that response, to avoid
3665 : * repeated requests.
3666 : *
375 akapila 3667 ECB : * writePtr is the location up to which the WAL is sent. It is essentially
3668 : * the same as sentPtr but in some cases, we need to send keep alive before
3669 : * sentPtr is updated like when skipping empty transactions.
974 alvherre 3670 : */
4117 simon 3671 : static void
375 akapila 3672 GIC 1465 : WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
4117 simon 3673 ECB : {
4117 simon 3674 GIC 1465 : elog(DEBUG2, "sending replication keepalive");
4117 simon 3675 ECB :
3805 heikki.linnakangas 3676 : /* construct the message... */
3805 heikki.linnakangas 3677 GIC 1465 : resetStringInfo(&output_message);
3805 heikki.linnakangas 3678 CBC 1465 : pq_sendbyte(&output_message, 'k');
375 akapila 3679 GIC 1465 : pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
2236 tgl 3680 1465 : pq_sendint64(&output_message, GetCurrentTimestamp());
3805 heikki.linnakangas 3681 CBC 1465 : pq_sendbyte(&output_message, requestReply ? 1 : 0);
3682 :
3683 : /* ... and send it wrapped in CopyData */
3805 heikki.linnakangas 3684 GIC 1465 : pq_putmessage_noblock('d', output_message.data, output_message.len);
974 alvherre 3685 ECB :
3686 : /* Set local flag */
974 alvherre 3687 GIC 1465 : if (requestReply)
3688 2 : waiting_for_ping_response = true;
4117 simon 3689 1465 : }
3690 :
3691 : /*
3692 : * Send keepalive message if too much time has elapsed.
3693 : */
3694 : static void
1682 noah 3695 951815 : WalSndKeepaliveIfNecessary(void)
3696 : {
3697 : TimestampTz ping_time;
3698 :
3699 : /*
3700 : * Don't send keepalive messages if timeouts are globally disabled or
3237 andres 3701 ECB : * we're doing something not partaking in timeouts.
3702 : */
3237 andres 3703 CBC 951815 : if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
3317 rhaas 3704 GIC 24 : return;
3705 :
3317 rhaas 3706 CBC 951791 : if (waiting_for_ping_response)
3707 52 : return;
3317 rhaas 3708 ECB :
3709 : /*
3710 : * If half of wal_sender_timeout has lapsed without receiving any reply
3711 : * from the standby, send a keep-alive message to the standby requesting
3712 : * an immediate reply.
3713 : */
3317 rhaas 3714 GIC 951739 : ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
3715 : wal_sender_timeout / 2);
1682 noah 3716 CBC 951739 : if (last_processing >= ping_time)
3317 rhaas 3717 ECB : {
375 akapila 3718 LBC 0 : WalSndKeepalive(true, InvalidXLogRecPtr);
3719 :
3720 : /* Try to flush pending output to the client */
3317 rhaas 3721 UIC 0 : if (pq_flush_if_writable() != 0)
3722 0 : WalSndShutdown();
3723 : }
3317 rhaas 3724 ECB : }
3725 :
3726 : /*
3727 : * Record the end of the WAL and the time it was flushed locally, so that
3728 : * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
3729 : * eventually reported to have been written, flushed and applied by the
3730 : * standby in a reply message.
3731 : */
2158 simon 3732 : static void
2208 simon 3733 CBC 58924 : LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
3734 : {
2153 bruce 3735 ECB : bool buffer_full;
3736 : int new_write_head;
3737 : int i;
3738 :
2208 simon 3739 GIC 58924 : if (!am_walsender)
2208 simon 3740 UIC 0 : return;
3741 :
3742 : /*
2208 simon 3743 ECB : * If the lsn hasn't advanced since last time, then do nothing. This way
3744 : * we only record a new sample when new WAL has been written.
3745 : */
1636 tmunro 3746 GIC 58924 : if (lag_tracker->last_lsn == lsn)
2208 simon 3747 GBC 40358 : return;
1636 tmunro 3748 GIC 18566 : lag_tracker->last_lsn = lsn;
3749 :
2208 simon 3750 EUB : /*
3751 : * If advancing the write head of the circular buffer would crash into any
3752 : * of the read heads, then the buffer is full. In other words, the
3753 : * slowest reader (presumably apply) is the one that controls the release
3754 : * of space.
3755 : */
1636 tmunro 3756 GIC 18566 : new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
2208 simon 3757 18566 : buffer_full = false;
3758 74264 : for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
3759 : {
1636 tmunro 3760 55698 : if (new_write_head == lag_tracker->read_heads[i])
2208 simon 3761 UIC 0 : buffer_full = true;
2208 simon 3762 ECB : }
3763 :
3764 : /*
3765 : * If the buffer is full, for now we just rewind by one slot and overwrite
3766 : * the last sample, as a simple (if somewhat uneven) way to lower the
3767 : * sampling rate. There may be better adaptive compaction algorithms.
3768 : */
2208 simon 3769 GBC 18566 : if (buffer_full)
3770 : {
1636 tmunro 3771 UIC 0 : new_write_head = lag_tracker->write_head;
3772 0 : if (lag_tracker->write_head > 0)
3773 0 : lag_tracker->write_head--;
3774 : else
1636 tmunro 3775 LBC 0 : lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
2208 simon 3776 ECB : }
3777 :
3778 : /* Store a sample at the current write head position. */
1636 tmunro 3779 GIC 18566 : lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
3780 18566 : lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
3781 18566 : lag_tracker->write_head = new_write_head;
3782 : }
3783 :
3784 : /*
2158 peter_e 3785 ECB : * Find out how much time has elapsed between the moment WAL location 'lsn'
2208 simon 3786 : * (or the highest known earlier LSN) was flushed locally and the time 'now'.
3787 : * We have a separate read head for each of the reported LSN locations we
3788 : * receive in replies from standby; 'head' controls which read head is
3789 : * used. Whenever a read head crosses an LSN which was written into the
2208 simon 3790 EUB : * lag buffer with LagTrackerWrite, we can use the associated timestamp to
3791 : * find out the time this LSN (or an earlier one) was flushed locally, and
3792 : * therefore compute the lag.
3793 : *
3794 : * Return -1 if no new sample data is available, and otherwise the elapsed
3795 : * time in microseconds.
3796 : */
3797 : static TimeOffset
2208 simon 3798 CBC 273489 : LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
3799 : {
2208 simon 3800 GBC 273489 : TimestampTz time = 0;
2208 simon 3801 EUB :
3802 : /*
 * Read all unread samples up to this LSN or end of buffer.
 *
 * Each consumed sample is copied into last_read[head] and its timestamp
 * into 'time', so after the loop both describe the newest sample whose
 * LSN is <= 'lsn'.  'time' keeps its initial value of 0 when no sample
 * was consumed.  The read head advances circularly, wrapping at
 * LAG_TRACKER_BUFFER_SIZE.
 */
1636 tmunro 3803 GIC 328505 : while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
1636 tmunro 3804 GBC 113758 : lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
3805 : {
1636 tmunro 3806 GIC 55016 : time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
3807 55016 : lag_tracker->last_read[head] =
1636 tmunro 3808 CBC 55016 : lag_tracker->buffer[lag_tracker->read_heads[head]];
3809 55016 : lag_tracker->read_heads[head] =
3810 55016 : (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
3811 : }
3812 :
3813 : /*
3814 : * If the lag tracker is empty, that means the standby has processed
3815 : * everything we've ever sent so we should now clear 'last_read'. If we
3816 : * didn't do that, we'd risk using a stale and irrelevant sample for
3817 : * interpolation at the beginning of the next burst of WAL after a period
3818 : * of idleness.
3819 : */
1636 tmunro 3820 GIC 273489 : if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3821 214747 : lag_tracker->last_read[head].time = 0;
3822 :
2208 simon 3823 273489 : if (time > now)
3824 : {
3825 : /* If the clock somehow went backwards, treat as not found. */
2208 simon 3826 UIC 0 : return -1;
2208 simon 3827 ECB : }
2208 simon 3828 GIC 273489 : else if (time == 0)
2208 simon 3829 ECB : {
3830 : /*
3831 : * We didn't cross a time. If there is a future sample that we
3832 : * haven't reached yet, and we've already reached at least one sample,
2153 bruce 3833 : * let's interpolate the local flushed time. This is mainly useful
3834 : * for reporting a completely stuck apply position as having
3835 : * increasing lag, since otherwise we'd have to wait for it to
3836 : * eventually start moving again and cross one of our samples before
3837 : * we can show the lag increasing.
2208 simon 3838 : */
1636 tmunro 3839 CBC 223893 : if (lag_tracker->read_heads[head] == lag_tracker->write_head)
3840 : {
3841 : /* There are no future samples, so we can't interpolate. */
2116 simon 3842 GIC 168188 : return -1;
3843 : }
1636 tmunro 3844 55705 : else if (lag_tracker->last_read[head].time != 0)
3845 : {
3846 : /* We can interpolate between last_read and the next sample. */
3847 : double fraction;
3848 7521 : WalTimeSample prev = lag_tracker->last_read[head];
1636 tmunro 3849 CBC 7521 : WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
2208 simon 3850 ECB :
2177 simon 3851 GIC 7521 : if (lsn < prev.lsn)
2177 simon 3852 ECB : {
3853 : /*
3854 : * Reported LSNs shouldn't normally go backwards, but it's
2177 simon 3855 EUB : * possible when there is a timeline change. Treat as not
3856 : * found.
2177 simon 3857 ECB : */
2177 simon 3858 UIC 0 : return -1;
3859 : }
3860 : /*
 * The sampling loop above stopped at a non-empty read head, so the
 * sample there ('next') has an LSN greater than 'lsn'; together with
 * the check just made this gives prev.lsn <= lsn < next.lsn, which
 * also keeps the divisor used for 'fraction' below non-zero.
 */
2208 simon 3861 GIC 7521 : Assert(prev.lsn < next.lsn);
3862 :
3863 7521 : if (prev.time > next.time)
3864 : {
3865 : /* If the clock somehow went backwards, treat as not found. */
2208 simon 3866 UIC 0 : return -1;
3867 : }
2208 simon 3868 ECB :
3869 : /* See how far we are between the previous and next samples. */
2208 simon 3870 GIC 7521 : fraction =
2208 simon 3871 CBC 7521 : (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
3872 :
2208 simon 3873 ECB : /* Scale the local flush time proportionally. */
2208 simon 3874 GIC 7521 : time = (TimestampTz)
3875 7521 : ((double) prev.time + (next.time - prev.time) * fraction);
3876 : }
2208 simon 3877 ECB : else
3878 : {
3879 : /*
2116 3880 : * We have only a future sample, implying that we were entirely
3881 : * caught up and now there is a new burst of WAL and the
3882 : * standby hasn't processed the first sample yet. Until the
3883 : * standby reaches the future sample the best we can do is report
3884 : * the hypothetical lag if that sample were to be replayed now.
3885 : */
1636 tmunro 3886 GIC 48184 : time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
2208 simon 3887 EUB : }
3888 : }
3889 :
2208 simon 3890 ECB : /*
 * Return the elapsed time since local flush time in microseconds.
 *
 * NOTE(review): the 'time > now' guard above runs before the
 * interpolation / future-sample branches, so a 'time' chosen in those
 * branches is not re-checked against 'now'.  If such a time is later
 * than 'now' the result is negative but not necessarily -1 -- confirm
 * that callers tolerate this.
 */
2208 simon 3891 GIC 105301 : Assert(time != 0);
2208 simon 3892 CBC 105301 : return now - time;
3893 : }
|