LCOV - differential code coverage report
Current view: top level - src/include/replication - worker_internal.h (source / functions) Coverage Total Hit GIC GNC ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 100.0 % 7 7 2 5 7
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 3 3 1 2 3
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (60,120] days: 100.0 % 5 5 5
Legend: Lines: hit not hit (240..) days: 100.0 % 2 2 2
Function coverage date bins:
(60,120] days: 100.0 % 2 2 2
(240..) days: 100.0 % 1 1 1

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * worker_internal.h
                                  4                 :  *    Internal headers shared by logical replication workers.
                                  5                 :  *
                                  6                 :  * Portions Copyright (c) 2016-2023, PostgreSQL Global Development Group
                                  7                 :  *
                                  8                 :  * src/include/replication/worker_internal.h
                                  9                 :  *
                                 10                 :  *-------------------------------------------------------------------------
                                 11                 :  */
                                 12                 : #ifndef WORKER_INTERNAL_H
                                 13                 : #define WORKER_INTERNAL_H
                                 14                 : 
                                 15                 : #include <signal.h>
                                 16                 : 
                                 17                 : #include "access/xlogdefs.h"
                                 18                 : #include "catalog/pg_subscription.h"
                                 19                 : #include "datatype/timestamp.h"
                                 20                 : #include "miscadmin.h"
                                 21                 : #include "replication/logicalrelation.h"
                                 22                 : #include "storage/buffile.h"
                                 23                 : #include "storage/fileset.h"
                                 24                 : #include "storage/lock.h"
                                 25                 : #include "storage/shm_mq.h"
                                 26                 : #include "storage/shm_toc.h"
                                 27                 : #include "storage/spin.h"
                                 28                 : 
                                 29                 : 
                                 30                 : typedef struct LogicalRepWorker
                                 31                 : {
                                 32                 :     /* Time at which this worker was launched. */
                                 33                 :     TimestampTz launch_time;
                                 34                 : 
                                 35                 :     /* Indicates if this slot is used or free. */
                                 36                 :     bool        in_use;
                                 37                 : 
                                 38                 :     /* Increased every time the slot is taken by new worker. */
                                 39                 :     uint16      generation;
                                 40                 : 
                                 41                 :     /* Pointer to proc array. NULL if not running. */
                                 42                 :     PGPROC     *proc;
                                 43                 : 
                                 44                 :     /* Database id to connect to. */
                                 45                 :     Oid         dbid;
                                 46                 : 
                                 47                 :     /* User to use for connection (will be same as owner of subscription). */
                                 48                 :     Oid         userid;
                                 49                 : 
                                 50                 :     /* Subscription id for the worker. */
                                 51                 :     Oid         subid;
                                 52                 : 
                                 53                 :     /* Used for initial table synchronization. */
                                 54                 :     Oid         relid;
                                 55                 :     char        relstate;
                                 56                 :     XLogRecPtr  relstate_lsn;
                                 57                 :     slock_t     relmutex;
                                 58                 : 
                                 59                 :     /*
                                 60                 :      * Used to create the changes and subxact files for the streaming
                                 61                 :      * transactions.  Upon the arrival of the first streaming transaction or
                                 62                 :      * when the first-time leader apply worker times out while sending changes
                                 63                 :      * to the parallel apply worker, the fileset will be initialized, and it
                                 64                 :      * will be deleted when the worker exits.  Under this, separate buffiles
                                 65                 :      * would be created for each transaction which will be deleted after the
                                 66                 :      * transaction is finished.
                                 67                 :      */
                                 68                 :     FileSet    *stream_fileset;
                                 69                 : 
                                 70                 :     /*
                                 71                 :      * PID of leader apply worker if this slot is used for a parallel apply
                                 72                 :      * worker, InvalidPid otherwise.
                                 73                 :      */
                                 74                 :     pid_t       leader_pid;
                                 75                 : 
                                 76                 :     /* Indicates whether apply can be performed in parallel. */
                                 77                 :     bool        parallel_apply;
                                 78                 : 
                                 79                 :     /* Stats. */
                                 80                 :     XLogRecPtr  last_lsn;
                                 81                 :     TimestampTz last_send_time;
                                 82                 :     TimestampTz last_recv_time;
                                 83                 :     XLogRecPtr  reply_lsn;
                                 84                 :     TimestampTz reply_time;
                                 85                 : } LogicalRepWorker;
                                 86                 : 
                                 87                 : /*
                                 88                 :  * State of the transaction in parallel apply worker.
                                 89                 :  *
                                 90                 :  * The enum values must have the same order as the transaction state
                                 91                 :  * transitions.
                                 92                 :  */
                                 93                 : typedef enum ParallelTransState
                                 94                 : {
                                 95                 :     PARALLEL_TRANS_UNKNOWN,
                                 96                 :     PARALLEL_TRANS_STARTED,
                                 97                 :     PARALLEL_TRANS_FINISHED
                                 98                 : } ParallelTransState;
                                 99                 : 
                                100                 : /*
                                101                 :  * State of fileset used to communicate changes from leader to parallel
                                102                 :  * apply worker.
                                103                 :  *
                                104                 :  * FS_EMPTY indicates an initial state where the leader doesn't need to use
                                105                 :  * the file to communicate with the parallel apply worker.
                                106                 :  *
                                107                 :  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
                                108                 :  * to the file.
                                109                 :  *
                                110                 :  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
                                111                 :  * the file.
                                112                 :  *
                                113                 :  * FS_READY indicates that it is now ok for a parallel apply worker to
                                114                 :  * read the file.
                                115                 :  */
                                116                 : typedef enum PartialFileSetState
                                117                 : {
                                118                 :     FS_EMPTY,
                                119                 :     FS_SERIALIZE_IN_PROGRESS,
                                120                 :     FS_SERIALIZE_DONE,
                                121                 :     FS_READY
                                122                 : } PartialFileSetState;
                                123                 : 
                                124                 : /*
                                125                 :  * Struct for sharing information between leader apply worker and parallel
                                126                 :  * apply workers.
                                127                 :  */
                                128                 : typedef struct ParallelApplyWorkerShared
                                129                 : {
                                130                 :     slock_t     mutex;
                                131                 : 
                                132                 :     TransactionId xid;
                                133                 : 
                                134                 :     /*
                                135                 :      * State used to ensure commit ordering.
                                136                 :      *
                                137                 :      * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
                                138                 :      * handling the transaction finish commands while the apply leader will
                                139                 :      * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
                                140                 :      * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
                                141                 :      * STREAM_ABORT).
                                142                 :      */
                                143                 :     ParallelTransState xact_state;
                                144                 : 
                                145                 :     /* Information from the corresponding LogicalRepWorker slot. */
                                146                 :     uint16      logicalrep_worker_generation;
                                147                 :     int         logicalrep_worker_slot_no;
                                148                 : 
                                149                 :     /*
                                150                 :      * Indicates whether there are pending streaming blocks in the queue. The
                                151                 :      * parallel apply worker will check it before starting to wait.
                                152                 :      */
                                153                 :     pg_atomic_uint32 pending_stream_count;
                                154                 : 
                                155                 :     /*
                                156                 :      * XactLastCommitEnd from the parallel apply worker. This is required by
                                157                 :      * the leader worker so it can update the lsn_mappings.
                                158                 :      */
                                159                 :     XLogRecPtr  last_commit_end;
                                160                 : 
                                161                 :     /*
                                162                 :      * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
                                163                 :      * serialize changes to the file, and share the fileset with the parallel
                                164                 :      * apply worker when processing the transaction finish command. Then the
                                165                 :      * parallel apply worker will apply all the spooled messages.
                                166                 :      *
                                167                 :      * FileSet is used here instead of SharedFileSet because we need it to
                                168                 :      * survive after releasing the shared memory so that the leader apply
                                169                 :      * worker can re-use the same fileset for the next streaming transaction.
                                170                 :      */
                                171                 :     PartialFileSetState fileset_state;
                                172                 :     FileSet     fileset;
                                173                 : } ParallelApplyWorkerShared;
                                174                 : 
                                175                 : /*
                                176                 :  * Information which is used to manage the parallel apply worker.
                                177                 :  */
                                178                 : typedef struct ParallelApplyWorkerInfo
                                179                 : {
                                180                 :     /*
                                181                 :      * This queue is used to send changes from the leader apply worker to the
                                182                 :      * parallel apply worker.
                                183                 :      */
                                184                 :     shm_mq_handle *mq_handle;
                                185                 : 
                                186                 :     /*
                                187                 :      * This queue is used to transfer error messages from the parallel apply
                                188                 :      * worker to the leader apply worker.
                                189                 :      */
                                190                 :     shm_mq_handle *error_mq_handle;
                                191                 : 
                                192                 :     dsm_segment *dsm_seg;
                                193                 : 
                                194                 :     /*
                                195                 :      * Indicates whether the leader apply worker needs to serialize the
                                196                 :      * remaining changes to a file due to timeout when attempting to send data
                                197                 :      * to the parallel apply worker via shared memory.
                                198                 :      */
                                199                 :     bool        serialize_changes;
                                200                 : 
                                201                 :     /*
                                202                 :      * True if the worker is being used to process a parallel apply
                                203                 :      * transaction. False indicates this worker is available for re-use.
                                204                 :      */
                                205                 :     bool        in_use;
                                206                 : 
                                207                 :     ParallelApplyWorkerShared *shared;
                                208                 : } ParallelApplyWorkerInfo;
                                209                 : 
                                210                 : /* Main memory context for apply worker. Permanent during worker lifetime. */
                                211                 : extern PGDLLIMPORT MemoryContext ApplyContext;
                                212                 : 
                                213                 : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
                                214                 : 
                                215                 : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
                                216                 : 
                                217                 : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
                                218                 : 
                                219                 : /* libpqreceiver connection */
                                220                 : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
                                221                 : 
                                222                 : /* Worker and subscription objects. */
                                223                 : extern PGDLLIMPORT Subscription *MySubscription;
                                224                 : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
                                225                 : 
                                226                 : extern PGDLLIMPORT bool in_remote_transaction;
                                227                 : 
                                228                 : extern void logicalrep_worker_attach(int slot);
                                229                 : extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                230                 :                                                 bool only_running);
                                231                 : extern List *logicalrep_workers_find(Oid subid, bool only_running);
                                232                 : extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                                233                 :                                      Oid userid, Oid relid,
                                234                 :                                      dsm_handle subworker_dsm);
                                235                 : extern void logicalrep_worker_stop(Oid subid, Oid relid);
                                236                 : extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
                                237                 : extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
                                238                 : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
                                239                 : 
                                240                 : extern int  logicalrep_sync_worker_count(Oid subid);
                                241                 : 
                                242                 : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
                                243                 :                                                char *originname, Size szoriginname);
                                244                 : extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
                                245                 : 
                                246                 : extern bool AllTablesyncsReady(void);
                                247                 : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
                                248                 : 
                                249                 : extern void process_syncing_tables(XLogRecPtr current_lsn);
                                250                 : extern void invalidate_syncing_table_states(Datum arg, int cacheid,
                                251                 :                                             uint32 hashvalue);
                                252                 : 
                                253                 : extern void stream_start_internal(TransactionId xid, bool first_segment);
                                254                 : extern void stream_stop_internal(TransactionId xid);
                                255                 : 
                                256                 : /* Common streaming function to apply all the spooled messages */
                                257                 : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
                                258                 :                                    XLogRecPtr lsn);
                                259                 : 
                                260                 : extern void apply_dispatch(StringInfo s);
                                261                 : 
                                262                 : extern void maybe_reread_subscription(void);
                                263                 : 
                                264                 : extern void stream_cleanup_files(Oid subid, TransactionId xid);
                                265                 : 
                                266                 : extern void InitializeApplyWorker(void);
                                267                 : 
                                268                 : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
                                269                 : 
                                270                 : /* Function for apply error callback */
                                271                 : extern void apply_error_callback(void *arg);
                                272                 : extern void set_apply_error_context_origin(char *originname);
                                273                 : 
                                274                 : /* Parallel apply worker setup and interactions */
                                275                 : extern void pa_allocate_worker(TransactionId xid);
                                276                 : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
                                277                 : extern void pa_detach_all_error_mq(void);
                                278                 : 
                                279                 : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
                                280                 :                          const void *data);
                                281                 : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
                                282                 :                                            bool stream_locked);
                                283                 : 
                                284                 : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
                                285                 :                               ParallelTransState in_xact);
                                286                 : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
                                287                 : 
                                288                 : extern void pa_start_subtrans(TransactionId current_xid,
                                289                 :                               TransactionId top_xid);
                                290                 : extern void pa_reset_subtrans(void);
                                291                 : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
                                292                 : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
                                293                 :                                  PartialFileSetState fileset_state);
                                294                 : 
                                295                 : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
                                296                 : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
                                297                 : 
                                298                 : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
                                299                 : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
                                300                 : 
                                301                 : extern void pa_decr_and_wait_stream_block(void);
                                302                 : 
                                303                 : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                                304                 :                            XLogRecPtr remote_lsn);
                                305                 : 
                                306                 : #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
                                307                 : 
                                308                 : static inline bool
 2208 peter_e                   309 GIC      152294 : am_tablesync_worker(void)
                                310                 : {
                                311          152294 :     return OidIsValid(MyLogicalRepWorker->relid);
                                312                 : }
                                313                 : 
                                314                 : static inline bool
   90 akapila                   315 GNC         426 : am_leader_apply_worker(void)
                                316                 : {
                                317             695 :     return (!am_tablesync_worker() &&
                                318             269 :             !isParallelApplyWorker(MyLogicalRepWorker));
                                319                 : }
                                320                 : 
                                321                 : static inline bool
                                322          477539 : am_parallel_apply_worker(void)
                                323                 : {
                                324          477539 :     return isParallelApplyWorker(MyLogicalRepWorker);
                                325                 : }
                                326                 : 
                                327                 : #endif                          /* WORKER_INTERNAL_H */
        

Generated by: LCOV version v1.16-55-g56c0a2a