Age Owner TLA Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * worker_internal.h
4 : * Internal headers shared by logical replication workers.
5 : *
6 : * Portions Copyright (c) 2016-2023, PostgreSQL Global Development Group
7 : *
8 : * src/include/replication/worker_internal.h
9 : *
10 : *-------------------------------------------------------------------------
11 : */
12 : #ifndef WORKER_INTERNAL_H
13 : #define WORKER_INTERNAL_H
14 :
15 : #include <signal.h>
16 :
17 : #include "access/xlogdefs.h"
18 : #include "catalog/pg_subscription.h"
19 : #include "datatype/timestamp.h"
20 : #include "miscadmin.h"
21 : #include "replication/logicalrelation.h"
22 : #include "storage/buffile.h"
23 : #include "storage/fileset.h"
24 : #include "storage/lock.h"
25 : #include "storage/shm_mq.h"
26 : #include "storage/shm_toc.h"
27 : #include "storage/spin.h"
28 :
29 :
/*
 * Bookkeeping for one logical replication worker slot: identity of the
 * worker occupying the slot, table synchronization state, streaming fileset,
 * parallel apply linkage, and progress statistics.
 */
30 : typedef struct LogicalRepWorker
31 : {
32 : /* Time at which this worker was launched. */
33 : TimestampTz launch_time;
34 :
35 : /* Indicates if this slot is used or free. */
36 : bool in_use;
37 :
38 : /* Incremented every time the slot is taken by a new worker. */
39 : uint16 generation;
40 :
41 : /* Pointer to the worker's PGPROC entry. NULL if not running. */
42 : PGPROC *proc;
43 :
44 : /* Database id to connect to. */
45 : Oid dbid;
46 :
47 : /* User to use for connection (will be same as owner of subscription). */
48 : Oid userid;
49 :
50 : /* Subscription id for the worker. */
51 : Oid subid;
52 :
53 : /* Used for initial table synchronization. A valid relid is what
 * am_tablesync_worker() tests for. NOTE(review): relmutex presumably
 * protects relstate/relstate_lsn -- confirm against the callers. */
54 : Oid relid;
55 : char relstate;
56 : XLogRecPtr relstate_lsn;
57 : slock_t relmutex;
58 :
59 : /*
60 : * Used to create the changes and subxact files for the streaming
61 : * transactions. The fileset is initialized upon the arrival of the first
62 : * streaming transaction, or the first time the leader apply worker times
63 : * out while sending changes to the parallel apply worker, and it is
64 : * deleted when the worker exits. Under this, separate buffiles are
65 : * created for each transaction, and they are deleted after that
66 : * transaction is finished.
67 : */
68 : FileSet *stream_fileset;
69 :
70 : /*
71 : * PID of leader apply worker if this slot is used for a parallel apply
72 : * worker, InvalidPid otherwise.
73 : */
74 : pid_t leader_pid;
75 :
76 : /* Indicates whether apply can be performed in parallel. */
77 : bool parallel_apply;
78 :
79 : /* Stats. */
80 : XLogRecPtr last_lsn;
81 : TimestampTz last_send_time;
82 : TimestampTz last_recv_time;
83 : XLogRecPtr reply_lsn;
84 : TimestampTz reply_time;
85 : } LogicalRepWorker;
86 :
87 : /*
88 : * State of the transaction in a parallel apply worker.
89 : *
90 : * The enum values must be declared in the same order as the transaction
91 : * state transitions, i.e. UNKNOWN -> STARTED -> FINISHED.
92 : */
93 : typedef enum ParallelTransState
94 : {
95 : PARALLEL_TRANS_UNKNOWN,
96 : PARALLEL_TRANS_STARTED,
97 : PARALLEL_TRANS_FINISHED
98 : } ParallelTransState;
99 :
100 : /*
101 : * State of the fileset used to communicate changes from the leader to a
102 : * parallel apply worker. The values are declared in the order described.
103 : *
104 : * FS_EMPTY indicates an initial state where the leader doesn't need to use
105 : * the file to communicate with the parallel apply worker.
106 : *
107 : * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
108 : * to the file.
109 : *
110 : * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
111 : * the file.
112 : *
113 : * FS_READY indicates that it is now ok for a parallel apply worker to
114 : * read the file.
115 : */
116 : typedef enum PartialFileSetState
117 : {
118 : FS_EMPTY,
119 : FS_SERIALIZE_IN_PROGRESS,
120 : FS_SERIALIZE_DONE,
121 : FS_READY
122 : } PartialFileSetState;
123 :
124 : /*
125 : * Struct for sharing information between the leader apply worker and
126 : * parallel apply workers.
127 : */
128 : typedef struct ParallelApplyWorkerShared
129 : {
130 : slock_t mutex; /* NOTE(review): presumably guards the mutable
 * fields below -- confirm which ones */
131 :
132 : TransactionId xid; /* NOTE(review): presumably the transaction handed
 * to the parallel apply worker -- confirm */
133 :
134 : /*
135 : * State used to ensure commit ordering.
136 : *
137 : * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
138 : * handling the transaction finish commands while the apply leader will
139 : * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
140 : * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
141 : * STREAM_ABORT).
142 : */
143 : ParallelTransState xact_state;
144 :
145 : /* Generation and slot number of the corresponding LogicalRepWorker slot. */
146 : uint16 logicalrep_worker_generation;
147 : int logicalrep_worker_slot_no;
148 :
149 : /*
150 : * Indicates whether there are pending streaming blocks in the queue. The
151 : * parallel apply worker will check it before starting to wait.
152 : */
153 : pg_atomic_uint32 pending_stream_count;
154 :
155 : /*
156 : * XactLastCommitEnd from the parallel apply worker. This is required by
157 : * the leader worker so it can update the lsn_mappings.
158 : */
159 : XLogRecPtr last_commit_end;
160 :
161 : /*
162 : * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
163 : * serialize changes to the file, and share the fileset with the parallel
164 : * apply worker when processing the transaction finish command. Then the
165 : * parallel apply worker will apply all the spooled messages.
166 : *
167 : * FileSet is used here instead of SharedFileSet because we need it to
168 : * survive after releasing the shared memory so that the leader apply
169 : * worker can re-use the same fileset for the next streaming transaction.
170 : */
171 : PartialFileSetState fileset_state;
172 : FileSet fileset;
173 : } ParallelApplyWorkerShared;
174 :
175 : /*
176 : * Information used by the leader apply worker to manage one parallel
 * apply worker.
177 : */
178 : typedef struct ParallelApplyWorkerInfo
179 : {
180 : /*
181 : * This queue is used to send changes from the leader apply worker to the
182 : * parallel apply worker.
183 : */
184 : shm_mq_handle *mq_handle;
185 :
186 : /*
187 : * This queue is used to transfer error messages from the parallel apply
188 : * worker to the leader apply worker.
189 : */
190 : shm_mq_handle *error_mq_handle;
191 :
192 : dsm_segment *dsm_seg; /* NOTE(review): presumably the DSM segment backing
 * the queues and "shared" -- confirm */
193 :
194 : /*
195 : * Indicates whether the leader apply worker needs to serialize the
196 : * remaining changes to a file due to timeout when attempting to send data
197 : * to the parallel apply worker via shared memory.
198 : */
199 : bool serialize_changes;
200 :
201 : /*
202 : * True if the worker is being used to process a parallel apply
203 : * transaction. False indicates this worker is available for re-use.
204 : */
205 : bool in_use;
206 :
207 : ParallelApplyWorkerShared *shared; /* information shared with the
 * parallel apply worker */
208 : } ParallelApplyWorkerInfo;
209 :
210 : /* Main memory context for apply worker. Permanent during worker lifetime. */
211 : extern PGDLLIMPORT MemoryContext ApplyContext;
212 :
213 : extern PGDLLIMPORT MemoryContext ApplyMessageContext; /* NOTE(review): not
 * described here; presumably shorter-lived than ApplyContext -- confirm at
 * its definition */
214 :
215 : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
216 :
217 : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared; /* NOTE(review):
 * presumably this process's shared area when it runs as a parallel apply
 * worker -- confirm */
218 :
219 : /* libpqreceiver connection */
220 : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
221 :
222 : /* Subscription and worker slot of the current process. */
223 : extern PGDLLIMPORT Subscription *MySubscription;
224 : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
225 :
226 : extern PGDLLIMPORT bool in_remote_transaction;
227 :
228 : extern void logicalrep_worker_attach(int slot);
229 : extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
230 : bool only_running);
231 : extern List *logicalrep_workers_find(Oid subid, bool only_running);
232 : extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
233 : Oid userid, Oid relid,
234 : dsm_handle subworker_dsm);
235 : extern void logicalrep_worker_stop(Oid subid, Oid relid);
236 : extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
237 : extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
238 : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
239 :
240 : extern int logicalrep_sync_worker_count(Oid subid);
241 :
242 : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
243 : char *originname, Size szoriginname);
244 : extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
245 :
246 : extern bool AllTablesyncsReady(void);
247 : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
248 :
249 : extern void process_syncing_tables(XLogRecPtr current_lsn);
250 : extern void invalidate_syncing_table_states(Datum arg, int cacheid,
251 : uint32 hashvalue);
252 :
253 : extern void stream_start_internal(TransactionId xid, bool first_segment);
254 : extern void stream_stop_internal(TransactionId xid);
255 :
256 : /* Common streaming function to apply all the spooled messages. */
257 : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
258 : XLogRecPtr lsn);
259 :
260 : extern void apply_dispatch(StringInfo s);
261 :
262 : extern void maybe_reread_subscription(void);
263 :
264 : extern void stream_cleanup_files(Oid subid, TransactionId xid);
265 :
266 : extern void InitializeApplyWorker(void);
267 :
268 : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
269 :
270 : /* Functions for the apply error context callback. */
271 : extern void apply_error_callback(void *arg);
272 : extern void set_apply_error_context_origin(char *originname);
273 :
274 : /* Parallel apply worker setup and interactions. */
275 : extern void pa_allocate_worker(TransactionId xid);
276 : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
277 : extern void pa_detach_all_error_mq(void);
278 :
279 : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
280 : const void *data);
281 : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
282 : bool stream_locked);
283 :
284 : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
285 : ParallelTransState in_xact); /* NOTE(review): despite its name, in_xact
 * is a ParallelTransState, not a boolean */
286 : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
287 :
288 : extern void pa_start_subtrans(TransactionId current_xid,
289 : TransactionId top_xid);
290 : extern void pa_reset_subtrans(void);
291 : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
292 : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
293 : PartialFileSetState fileset_state);
294 :
295 : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode); /* lock and
 * unlock variants below take the same (xid, lockmode) arguments */
296 : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
297 :
298 : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
299 : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
300 :
301 : extern void pa_decr_and_wait_stream_block(void);
302 :
303 : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
304 : XLogRecPtr remote_lsn);
305 :
/*
 * True if the given LogicalRepWorker slot is used for a parallel apply
 * worker, i.e. its leader_pid is something other than InvalidPid.
 */
306 : #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
307 :
/*
 * Returns true if the current worker's relid is a valid OID, which marks it
 * as a table synchronization worker.
 *
 * MyLogicalRepWorker is dereferenced unconditionally, so it must be set.
 */
308 : static inline bool
2208 peter_e 309 GIC 152294 : am_tablesync_worker(void)
310 : {
311 152294 : return OidIsValid(MyLogicalRepWorker->relid);
312 : }
313 :
/*
 * Returns true if the current worker is neither a tablesync worker nor a
 * parallel apply worker.
 *
 * MyLogicalRepWorker is dereferenced unconditionally, so it must be set.
 */
314 : static inline bool
90 akapila 315 GNC 426 : am_leader_apply_worker(void)
316 : {
317 695 : return (!am_tablesync_worker() &&
318 269 : !isParallelApplyWorker(MyLogicalRepWorker));
319 : }
320 :
/*
 * Returns true if the current worker is a parallel apply worker, i.e. its
 * slot's leader_pid is not InvalidPid.
 *
 * MyLogicalRepWorker is dereferenced unconditionally, so it must be set.
 */
321 : static inline bool
322 477539 : am_parallel_apply_worker(void)
323 : {
324 477539 : return isParallelApplyWorker(MyLogicalRepWorker);
325 : }
326 :
327 : #endif /* WORKER_INTERNAL_H */
|