LCOV - differential code coverage report
Current view: top level - src/include/replication - worker_internal.h (source / functions) Coverage Total Hit UNC UIC GIC GNC CBC DCB
Current: Differential Code Coverage 16@8cea358b128 vs 17@8cea358b128 Lines: 100.0 % 8 8 4 4 3
Current Date: 2024-04-14 14:21:10 Functions: 100.0 % 3 3 3
Baseline: 16@8cea358b128 Branches: 66.7 % 12 8 3 1 3 5
Baseline Date: 2024-04-14 14:21:09 Line coverage date bins:
Legend: Lines: hit not hit | Branches: + taken - not taken # not executed (180,240] days: 100.0 % 2 2 2
(240..) days: 100.0 % 6 6 2 4
Function coverage date bins:
(240..) days: 100.0 % 3 3 3
Branch coverage date bins:
(180,240] days: 50.0 % 4 2 2 2
(240..) days: 75.0 % 8 6 1 1 3 3

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * worker_internal.h
                                  4                 :                :  *    Internal headers shared by logical replication workers.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 2016-2024, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * src/include/replication/worker_internal.h
                                  9                 :                :  *
                                 10                 :                :  *-------------------------------------------------------------------------
                                 11                 :                :  */
                                 12                 :                : #ifndef WORKER_INTERNAL_H
                                 13                 :                : #define WORKER_INTERNAL_H
                                 14                 :                : 
                                 15                 :                : #include "access/xlogdefs.h"
                                 16                 :                : #include "catalog/pg_subscription.h"
                                 17                 :                : #include "datatype/timestamp.h"
                                 18                 :                : #include "miscadmin.h"
                                 19                 :                : #include "replication/logicalrelation.h"
                                 20                 :                : #include "replication/walreceiver.h"
                                 21                 :                : #include "storage/buffile.h"
                                 22                 :                : #include "storage/fileset.h"
                                 23                 :                : #include "storage/lock.h"
                                 24                 :                : #include "storage/shm_mq.h"
                                 25                 :                : #include "storage/shm_toc.h"
                                 26                 :                : #include "storage/spin.h"
                                 27                 :                : 
                                 28                 :                : /* Different types of worker */
                                 29                 :                : typedef enum LogicalRepWorkerType
                                 30                 :                : {
                                 31                 :                :     WORKERTYPE_UNKNOWN = 0,
                                 32                 :                :     WORKERTYPE_TABLESYNC,
                                 33                 :                :     WORKERTYPE_APPLY,
                                 34                 :                :     WORKERTYPE_PARALLEL_APPLY,
                                 35                 :                : } LogicalRepWorkerType;
                                 36                 :                : 
                                 37                 :                : typedef struct LogicalRepWorker
                                 38                 :                : {
                                 39                 :                :     /* What type of worker is this? */
                                 40                 :                :     LogicalRepWorkerType type;
                                 41                 :                : 
                                 42                 :                :     /* Time at which this worker was launched. */
                                 43                 :                :     TimestampTz launch_time;
                                 44                 :                : 
                                 45                 :                :     /* Indicates if this slot is used or free. */
                                 46                 :                :     bool        in_use;
                                 47                 :                : 
                                 48                 :                :     /* Increased every time the slot is taken by new worker. */
                                 49                 :                :     uint16      generation;
                                 50                 :                : 
                                 51                 :                :     /* Pointer to proc array. NULL if not running. */
                                 52                 :                :     PGPROC     *proc;
                                 53                 :                : 
                                 54                 :                :     /* Database id to connect to. */
                                 55                 :                :     Oid         dbid;
                                 56                 :                : 
                                 57                 :                :     /* User to use for connection (will be same as owner of subscription). */
                                 58                 :                :     Oid         userid;
                                 59                 :                : 
                                 60                 :                :     /* Subscription id for the worker. */
                                 61                 :                :     Oid         subid;
                                 62                 :                : 
                                 63                 :                :     /* Used for initial table synchronization. */
                                 64                 :                :     Oid         relid;
                                 65                 :                :     char        relstate;
                                 66                 :                :     XLogRecPtr  relstate_lsn;
                                 67                 :                :     slock_t     relmutex;
                                 68                 :                : 
                                 69                 :                :     /*
                                 70                 :                :      * Used to create the changes and subxact files for the streaming
                                 71                 :                :      * transactions.  Upon the arrival of the first streaming transaction or
                                 72                 :                :      * when the first-time leader apply worker times out while sending changes
                                 73                 :                :      * to the parallel apply worker, the fileset will be initialized, and it
                                 74                 :                :      * will be deleted when the worker exits.  Under this, separate buffiles
                                 75                 :                :      * would be created for each transaction which will be deleted after the
                                 76                 :                :      * transaction is finished.
                                 77                 :                :      */
                                 78                 :                :     FileSet    *stream_fileset;
                                 79                 :                : 
                                 80                 :                :     /*
                                 81                 :                :      * PID of leader apply worker if this slot is used for a parallel apply
                                 82                 :                :      * worker, InvalidPid otherwise.
                                 83                 :                :      */
                                 84                 :                :     pid_t       leader_pid;
                                 85                 :                : 
                                 86                 :                :     /* Indicates whether apply can be performed in parallel. */
                                 87                 :                :     bool        parallel_apply;
                                 88                 :                : 
                                 89                 :                :     /* Stats. */
                                 90                 :                :     XLogRecPtr  last_lsn;
                                 91                 :                :     TimestampTz last_send_time;
                                 92                 :                :     TimestampTz last_recv_time;
                                 93                 :                :     XLogRecPtr  reply_lsn;
                                 94                 :                :     TimestampTz reply_time;
                                 95                 :                : } LogicalRepWorker;
                                 96                 :                : 
                                 97                 :                : /*
                                 98                 :                :  * State of the transaction in parallel apply worker.
                                 99                 :                :  *
                                100                 :                :  * The enum values must have the same order as the transaction state
                                101                 :                :  * transitions.
                                102                 :                :  */
                                103                 :                : typedef enum ParallelTransState
                                104                 :                : {
                                105                 :                :     PARALLEL_TRANS_UNKNOWN,
                                106                 :                :     PARALLEL_TRANS_STARTED,
                                107                 :                :     PARALLEL_TRANS_FINISHED,
                                108                 :                : } ParallelTransState;
                                109                 :                : 
                                110                 :                : /*
                                111                 :                :  * State of fileset used to communicate changes from leader to parallel
                                112                 :                :  * apply worker.
                                113                 :                :  *
                                114                 :                :  * FS_EMPTY indicates an initial state where the leader doesn't need to use
                                115                 :                :  * the file to communicate with the parallel apply worker.
                                116                 :                :  *
                                117                 :                :  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
                                118                 :                :  * to the file.
                                119                 :                :  *
                                120                 :                :  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
                                121                 :                :  * the file.
                                122                 :                :  *
                                123                 :                :  * FS_READY indicates that it is now ok for a parallel apply worker to
                                124                 :                :  * read the file.
                                125                 :                :  */
                                126                 :                : typedef enum PartialFileSetState
                                127                 :                : {
                                128                 :                :     FS_EMPTY,
                                129                 :                :     FS_SERIALIZE_IN_PROGRESS,
                                130                 :                :     FS_SERIALIZE_DONE,
                                131                 :                :     FS_READY,
                                132                 :                : } PartialFileSetState;
                                133                 :                : 
                                134                 :                : /*
                                135                 :                :  * Struct for sharing information between leader apply worker and parallel
                                136                 :                :  * apply workers.
                                137                 :                :  */
                                138                 :                : typedef struct ParallelApplyWorkerShared
                                139                 :                : {
                                140                 :                :     slock_t     mutex;
                                141                 :                : 
                                142                 :                :     TransactionId xid;
                                143                 :                : 
                                144                 :                :     /*
                                145                 :                :      * State used to ensure commit ordering.
                                146                 :                :      *
                                147                 :                :      * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
                                148                 :                :      * handling the transaction finish commands while the apply leader will
                                149                 :                :      * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
                                150                 :                :      * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
                                151                 :                :      * STREAM_ABORT).
                                152                 :                :      */
                                153                 :                :     ParallelTransState xact_state;
                                154                 :                : 
                                155                 :                :     /* Information from the corresponding LogicalRepWorker slot. */
                                156                 :                :     uint16      logicalrep_worker_generation;
                                157                 :                :     int         logicalrep_worker_slot_no;
                                158                 :                : 
                                159                 :                :     /*
                                160                 :                :      * Indicates whether there are pending streaming blocks in the queue. The
                                161                 :                :      * parallel apply worker will check it before starting to wait.
                                162                 :                :      */
                                163                 :                :     pg_atomic_uint32 pending_stream_count;
                                164                 :                : 
                                165                 :                :     /*
                                166                 :                :      * XactLastCommitEnd from the parallel apply worker. This is required by
                                167                 :                :      * the leader worker so it can update the lsn_mappings.
                                168                 :                :      */
                                169                 :                :     XLogRecPtr  last_commit_end;
                                170                 :                : 
                                171                 :                :     /*
                                172                 :                :      * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
                                173                 :                :      * serialize changes to the file, and share the fileset with the parallel
                                174                 :                :      * apply worker when processing the transaction finish command. Then the
                                175                 :                :      * parallel apply worker will apply all the spooled messages.
                                176                 :                :      *
                                177                 :                :      * FileSet is used here instead of SharedFileSet because we need it to
                                178                 :                :      * survive after releasing the shared memory so that the leader apply
                                179                 :                :      * worker can re-use the same fileset for the next streaming transaction.
                                180                 :                :      */
                                181                 :                :     PartialFileSetState fileset_state;
                                182                 :                :     FileSet     fileset;
                                183                 :                : } ParallelApplyWorkerShared;
                                184                 :                : 
                                185                 :                : /*
                                186                 :                :  * Information which is used to manage the parallel apply worker.
                                187                 :                :  */
                                188                 :                : typedef struct ParallelApplyWorkerInfo
                                189                 :                : {
                                190                 :                :     /*
                                191                 :                :      * This queue is used to send changes from the leader apply worker to the
                                192                 :                :      * parallel apply worker.
                                193                 :                :      */
                                194                 :                :     shm_mq_handle *mq_handle;
                                195                 :                : 
                                196                 :                :     /*
                                197                 :                :      * This queue is used to transfer error messages from the parallel apply
                                198                 :                :      * worker to the leader apply worker.
                                199                 :                :      */
                                200                 :                :     shm_mq_handle *error_mq_handle;
                                201                 :                : 
                                202                 :                :     dsm_segment *dsm_seg;
                                203                 :                : 
                                204                 :                :     /*
                                205                 :                :      * Indicates whether the leader apply worker needs to serialize the
                                206                 :                :      * remaining changes to a file due to timeout when attempting to send data
                                207                 :                :      * to the parallel apply worker via shared memory.
                                208                 :                :      */
                                209                 :                :     bool        serialize_changes;
                                210                 :                : 
                                211                 :                :     /*
                                212                 :                :      * True if the worker is being used to process a parallel apply
                                213                 :                :      * transaction. False indicates this worker is available for re-use.
                                214                 :                :      */
                                215                 :                :     bool        in_use;
                                216                 :                : 
                                217                 :                :     ParallelApplyWorkerShared *shared;
                                218                 :                : } ParallelApplyWorkerInfo;
                                219                 :                : 
                                220                 :                : /* Main memory context for apply worker. Permanent during worker lifetime. */
                                221                 :                : extern PGDLLIMPORT MemoryContext ApplyContext;
                                222                 :                : 
                                223                 :                : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
                                224                 :                : 
                                225                 :                : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
                                226                 :                : 
                                227                 :                : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
                                228                 :                : 
                                229                 :                : /* libpqreceiver connection */
                                230                 :                : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
                                231                 :                : 
                                232                 :                : /* Worker and subscription objects. */
                                233                 :                : extern PGDLLIMPORT Subscription *MySubscription;
                                234                 :                : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
                                235                 :                : 
                                236                 :                : extern PGDLLIMPORT bool in_remote_transaction;
                                237                 :                : 
                                238                 :                : extern PGDLLIMPORT bool InitializingApplyWorker;
                                239                 :                : 
                                240                 :                : extern void logicalrep_worker_attach(int slot);
                                241                 :                : extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                242                 :                :                                                 bool only_running);
                                243                 :                : extern List *logicalrep_workers_find(Oid subid, bool only_running);
                                244                 :                : extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                245                 :                :                                      Oid dbid, Oid subid, const char *subname,
                                246                 :                :                                      Oid userid, Oid relid,
                                247                 :                :                                      dsm_handle subworker_dsm);
                                248                 :                : extern void logicalrep_worker_stop(Oid subid, Oid relid);
                                249                 :                : extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
                                250                 :                : extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
                                251                 :                : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
                                252                 :                : 
                                253                 :                : extern int  logicalrep_sync_worker_count(Oid subid);
                                254                 :                : 
                                255                 :                : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
                                256                 :                :                                                char *originname, Size szoriginname);
                                257                 :                : 
                                258                 :                : extern bool AllTablesyncsReady(void);
                                259                 :                : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
                                260                 :                : 
                                261                 :                : extern void process_syncing_tables(XLogRecPtr current_lsn);
                                262                 :                : extern void invalidate_syncing_table_states(Datum arg, int cacheid,
                                263                 :                :                                             uint32 hashvalue);
                                264                 :                : 
                                265                 :                : extern void stream_start_internal(TransactionId xid, bool first_segment);
                                266                 :                : extern void stream_stop_internal(TransactionId xid);
                                267                 :                : 
                                268                 :                : /* Common streaming function to apply all the spooled messages */
                                269                 :                : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
                                270                 :                :                                    XLogRecPtr lsn);
                                271                 :                : 
                                272                 :                : extern void apply_dispatch(StringInfo s);
                                273                 :                : 
                                274                 :                : extern void maybe_reread_subscription(void);
                                275                 :                : 
                                276                 :                : extern void stream_cleanup_files(Oid subid, TransactionId xid);
                                277                 :                : 
                                278                 :                : extern void set_stream_options(WalRcvStreamOptions *options,
                                279                 :                :                                char *slotname,
                                280                 :                :                                XLogRecPtr *origin_startpos);
                                281                 :                : 
                                282                 :                : extern void start_apply(XLogRecPtr origin_startpos);
                                283                 :                : 
                                284                 :                : extern void InitializeLogRepWorker(void);
                                285                 :                : 
                                286                 :                : extern void SetupApplyOrSyncWorker(int worker_slot);
                                287                 :                : 
                                288                 :                : extern void DisableSubscriptionAndExit(void);
                                289                 :                : 
                                290                 :                : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
                                291                 :                : 
                                292                 :                : /* Function for apply error callback */
                                293                 :                : extern void apply_error_callback(void *arg);
                                294                 :                : extern void set_apply_error_context_origin(char *originname);
                                295                 :                : 
                                296                 :                : /* Parallel apply worker setup and interactions */
                                297                 :                : extern void pa_allocate_worker(TransactionId xid);
                                298                 :                : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
                                299                 :                : extern void pa_detach_all_error_mq(void);
                                300                 :                : 
                                301                 :                : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
                                302                 :                :                          const void *data);
                                303                 :                : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
                                304                 :                :                                            bool stream_locked);
                                305                 :                : 
                                306                 :                : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
                                307                 :                :                               ParallelTransState xact_state);
                                308                 :                : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
                                309                 :                : 
                                310                 :                : extern void pa_start_subtrans(TransactionId current_xid,
                                311                 :                :                               TransactionId top_xid);
                                312                 :                : extern void pa_reset_subtrans(void);
                                313                 :                : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
                                314                 :                : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
                                315                 :                :                                  PartialFileSetState fileset_state);
                                316                 :                : 
                                317                 :                : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
                                318                 :                : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
                                319                 :                : 
                                320                 :                : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
                                321                 :                : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
                                322                 :                : 
                                323                 :                : extern void pa_decr_and_wait_stream_block(void);
                                324                 :                : 
                                325                 :                : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
                                326                 :                :                            XLogRecPtr remote_lsn);
                                327                 :                : 
                                328                 :                : #define isParallelApplyWorker(worker) ((worker)->in_use && \
                                329                 :                :                                        (worker)->type == WORKERTYPE_PARALLEL_APPLY)
                                330                 :                : #define isTablesyncWorker(worker) ((worker)->in_use && \
                                331                 :                :                                    (worker)->type == WORKERTYPE_TABLESYNC)
                                332                 :                : 
                                333                 :                : static inline bool
 2579 peter_e@gmx.net           334                 :CBC        1153 : am_tablesync_worker(void)
                                335                 :                : {
  244 akapila@postgresql.o      336   [ +  -  +  + ]:GNC        1153 :     return isTablesyncWorker(MyLogicalRepWorker);
                                337                 :                : }
                                338                 :                : 
                                339                 :                : static inline bool
  461 akapila@postgresql.o      340                 :CBC         878 : am_leader_apply_worker(void)
                                341                 :                : {
  233 akapila@postgresql.o      342         [ -  + ]:GNC         878 :     Assert(MyLogicalRepWorker->in_use);
  244                           343                 :            878 :     return (MyLogicalRepWorker->type == WORKERTYPE_APPLY);
                                344                 :                : }
                                345                 :                : 
                                346                 :                : static inline bool
  461 akapila@postgresql.o      347                 :CBC      330376 : am_parallel_apply_worker(void)
                                348                 :                : {
  233 akapila@postgresql.o      349         [ -  + ]:GNC      330376 :     Assert(MyLogicalRepWorker->in_use);
  461 akapila@postgresql.o      350   [ +  -  +  + ]:CBC      330376 :     return isParallelApplyWorker(MyLogicalRepWorker);
                                351                 :                : }
                                352                 :                : 
                                353                 :                : #endif                          /* WORKER_INTERNAL_H */
        

Generated by: LCOV version 2.1-beta2-3-g6141622