LCOV - differential code coverage report
Current view: top level - src/include/replication - worker_internal.h (source / functions) Coverage Total Hit GIC GNC ECB
Current: Differential Code Coverage HEAD vs 15 Lines: 100.0 % 7 7 2 5 7
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 3 3 1 2 3
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * worker_internal.h
       4                 :  *    Internal headers shared by logical replication workers.
       5                 :  *
       6                 :  * Portions Copyright (c) 2016-2023, PostgreSQL Global Development Group
       7                 :  *
       8                 :  * src/include/replication/worker_internal.h
       9                 :  *
      10                 :  *-------------------------------------------------------------------------
      11                 :  */
      12                 : #ifndef WORKER_INTERNAL_H
      13                 : #define WORKER_INTERNAL_H
      14                 : 
      15                 : #include <signal.h>
      16                 : 
      17                 : #include "access/xlogdefs.h"
      18                 : #include "catalog/pg_subscription.h"
      19                 : #include "datatype/timestamp.h"
      20                 : #include "miscadmin.h"
      21                 : #include "replication/logicalrelation.h"
      22                 : #include "storage/buffile.h"
      23                 : #include "storage/fileset.h"
      24                 : #include "storage/lock.h"
      25                 : #include "storage/shm_mq.h"
      26                 : #include "storage/shm_toc.h"
      27                 : #include "storage/spin.h"
      28                 : 
      29                 : 
      30                 : typedef struct LogicalRepWorker
      31                 : {
      32                 :     /* Time at which this worker was launched. */
      33                 :     TimestampTz launch_time;
      34                 : 
      35                 :     /* Indicates if this slot is used or free. */
      36                 :     bool        in_use;
      37                 : 
      38                 :     /* Increased every time the slot is taken by new worker. */
      39                 :     uint16      generation;
      40                 : 
      41                 :     /* Pointer to proc array. NULL if not running. */
      42                 :     PGPROC     *proc;
      43                 : 
      44                 :     /* Database id to connect to. */
      45                 :     Oid         dbid;
      46                 : 
      47                 :     /* User to use for connection (will be same as owner of subscription). */
      48                 :     Oid         userid;
      49                 : 
      50                 :     /* Subscription id for the worker. */
      51                 :     Oid         subid;
      52                 : 
      53                 :     /* Used for initial table synchronization. */
      54                 :     Oid         relid;
      55                 :     char        relstate;
      56                 :     XLogRecPtr  relstate_lsn;
      57                 :     slock_t     relmutex;
      58                 : 
      59                 :     /*
      60                 :      * Used to create the changes and subxact files for the streaming
      61                 :      * transactions.  Upon the arrival of the first streaming transaction or
      62                 :      * when the first-time leader apply worker times out while sending changes
      63                 :      * to the parallel apply worker, the fileset will be initialized, and it
      64                 :      * will be deleted when the worker exits.  Under this, separate buffiles
      65                 :      * would be created for each transaction which will be deleted after the
      66                 :      * transaction is finished.
      67                 :      */
      68                 :     FileSet    *stream_fileset;
      69                 : 
      70                 :     /*
      71                 :      * PID of leader apply worker if this slot is used for a parallel apply
      72                 :      * worker, InvalidPid otherwise.
      73                 :      */
      74                 :     pid_t       leader_pid;
      75                 : 
      76                 :     /* Indicates whether apply can be performed in parallel. */
      77                 :     bool        parallel_apply;
      78                 : 
      79                 :     /* Stats. */
      80                 :     XLogRecPtr  last_lsn;
      81                 :     TimestampTz last_send_time;
      82                 :     TimestampTz last_recv_time;
      83                 :     XLogRecPtr  reply_lsn;
      84                 :     TimestampTz reply_time;
      85                 : } LogicalRepWorker;
      86                 : 
      87                 : /*
      88                 :  * State of the transaction in parallel apply worker.
      89                 :  *
      90                 :  * The enum values must have the same order as the transaction state
      91                 :  * transitions.
      92                 :  */
      93                 : typedef enum ParallelTransState
      94                 : {
      95                 :     PARALLEL_TRANS_UNKNOWN,
      96                 :     PARALLEL_TRANS_STARTED,
      97                 :     PARALLEL_TRANS_FINISHED
      98                 : } ParallelTransState;
      99                 : 
     100                 : /*
     101                 :  * State of fileset used to communicate changes from leader to parallel
     102                 :  * apply worker.
     103                 :  *
     104                 :  * FS_EMPTY indicates an initial state where the leader doesn't need to use
     105                 :  * the file to communicate with the parallel apply worker.
     106                 :  *
     107                 :  * FS_SERIALIZE_IN_PROGRESS indicates that the leader is serializing changes
     108                 :  * to the file.
     109                 :  *
     110                 :  * FS_SERIALIZE_DONE indicates that the leader has serialized all changes to
     111                 :  * the file.
     112                 :  *
     113                 :  * FS_READY indicates that it is now ok for a parallel apply worker to
     114                 :  * read the file.
     115                 :  */
     116                 : typedef enum PartialFileSetState
     117                 : {
     118                 :     FS_EMPTY,
     119                 :     FS_SERIALIZE_IN_PROGRESS,
     120                 :     FS_SERIALIZE_DONE,
     121                 :     FS_READY
     122                 : } PartialFileSetState;
     123                 : 
     124                 : /*
     125                 :  * Struct for sharing information between leader apply worker and parallel
     126                 :  * apply workers.
     127                 :  */
     128                 : typedef struct ParallelApplyWorkerShared
     129                 : {
     130                 :     slock_t     mutex;
     131                 : 
     132                 :     TransactionId xid;
     133                 : 
     134                 :     /*
     135                 :      * State used to ensure commit ordering.
     136                 :      *
     137                 :      * The parallel apply worker will set it to PARALLEL_TRANS_FINISHED after
     138                 :      * handling the transaction finish commands while the apply leader will
     139                 :      * wait for it to become PARALLEL_TRANS_FINISHED before proceeding in
     140                 :      * transaction finish commands (e.g. STREAM_COMMIT/STREAM_PREPARE/
     141                 :      * STREAM_ABORT).
     142                 :      */
     143                 :     ParallelTransState xact_state;
     144                 : 
     145                 :     /* Information from the corresponding LogicalRepWorker slot. */
     146                 :     uint16      logicalrep_worker_generation;
     147                 :     int         logicalrep_worker_slot_no;
     148                 : 
     149                 :     /*
     150                 :      * Indicates whether there are pending streaming blocks in the queue. The
     151                 :      * parallel apply worker will check it before starting to wait.
     152                 :      */
     153                 :     pg_atomic_uint32 pending_stream_count;
     154                 : 
     155                 :     /*
     156                 :      * XactLastCommitEnd from the parallel apply worker. This is required by
     157                 :      * the leader worker so it can update the lsn_mappings.
     158                 :      */
     159                 :     XLogRecPtr  last_commit_end;
     160                 : 
     161                 :     /*
     162                 :      * After entering PARTIAL_SERIALIZE mode, the leader apply worker will
     163                 :      * serialize changes to the file, and share the fileset with the parallel
     164                 :      * apply worker when processing the transaction finish command. Then the
     165                 :      * parallel apply worker will apply all the spooled messages.
     166                 :      *
     167                 :      * FileSet is used here instead of SharedFileSet because we need it to
     168                 :      * survive after releasing the shared memory so that the leader apply
     169                 :      * worker can re-use the same fileset for the next streaming transaction.
     170                 :      */
     171                 :     PartialFileSetState fileset_state;
     172                 :     FileSet     fileset;
     173                 : } ParallelApplyWorkerShared;
     174                 : 
     175                 : /*
     176                 :  * Information which is used to manage the parallel apply worker.
     177                 :  */
     178                 : typedef struct ParallelApplyWorkerInfo
     179                 : {
     180                 :     /*
     181                 :      * This queue is used to send changes from the leader apply worker to the
     182                 :      * parallel apply worker.
     183                 :      */
     184                 :     shm_mq_handle *mq_handle;
     185                 : 
     186                 :     /*
     187                 :      * This queue is used to transfer error messages from the parallel apply
     188                 :      * worker to the leader apply worker.
     189                 :      */
     190                 :     shm_mq_handle *error_mq_handle;
     191                 : 
     192                 :     dsm_segment *dsm_seg;
     193                 : 
     194                 :     /*
     195                 :      * Indicates whether the leader apply worker needs to serialize the
     196                 :      * remaining changes to a file due to timeout when attempting to send data
     197                 :      * to the parallel apply worker via shared memory.
     198                 :      */
     199                 :     bool        serialize_changes;
     200                 : 
     201                 :     /*
     202                 :      * True if the worker is being used to process a parallel apply
     203                 :      * transaction. False indicates this worker is available for re-use.
     204                 :      */
     205                 :     bool        in_use;
     206                 : 
     207                 :     ParallelApplyWorkerShared *shared;
     208                 : } ParallelApplyWorkerInfo;
     209                 : 
     210                 : /* Main memory context for apply worker. Permanent during worker lifetime. */
     211                 : extern PGDLLIMPORT MemoryContext ApplyContext;
     212                 : 
     213                 : extern PGDLLIMPORT MemoryContext ApplyMessageContext;
     214                 : 
     215                 : extern PGDLLIMPORT ErrorContextCallback *apply_error_context_stack;
     216                 : 
     217                 : extern PGDLLIMPORT ParallelApplyWorkerShared *MyParallelShared;
     218                 : 
     219                 : /* libpqreceiver connection */
     220                 : extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn;
     221                 : 
     222                 : /* Worker and subscription objects. */
     223                 : extern PGDLLIMPORT Subscription *MySubscription;
     224                 : extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
     225                 : 
     226                 : extern PGDLLIMPORT bool in_remote_transaction;
     227                 : 
     228                 : extern void logicalrep_worker_attach(int slot);
     229                 : extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
     230                 :                                                 bool only_running);
     231                 : extern List *logicalrep_workers_find(Oid subid, bool only_running);
     232                 : extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
     233                 :                                      Oid userid, Oid relid,
     234                 :                                      dsm_handle subworker_dsm);
     235                 : extern void logicalrep_worker_stop(Oid subid, Oid relid);
     236                 : extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
     237                 : extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
     238                 : extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
     239                 : 
     240                 : extern int  logicalrep_sync_worker_count(Oid subid);
     241                 : 
     242                 : extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
     243                 :                                                char *originname, Size szoriginname);
     244                 : extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
     245                 : 
     246                 : extern bool AllTablesyncsReady(void);
     247                 : extern void UpdateTwoPhaseState(Oid suboid, char new_state);
     248                 : 
     249                 : extern void process_syncing_tables(XLogRecPtr current_lsn);
     250                 : extern void invalidate_syncing_table_states(Datum arg, int cacheid,
     251                 :                                             uint32 hashvalue);
     252                 : 
     253                 : extern void stream_start_internal(TransactionId xid, bool first_segment);
     254                 : extern void stream_stop_internal(TransactionId xid);
     255                 : 
     256                 : /* Common streaming function to apply all the spooled messages */
     257                 : extern void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
     258                 :                                    XLogRecPtr lsn);
     259                 : 
     260                 : extern void apply_dispatch(StringInfo s);
     261                 : 
     262                 : extern void maybe_reread_subscription(void);
     263                 : 
     264                 : extern void stream_cleanup_files(Oid subid, TransactionId xid);
     265                 : 
     266                 : extern void InitializeApplyWorker(void);
     267                 : 
     268                 : extern void store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn);
     269                 : 
     270                 : /* Function for apply error callback */
     271                 : extern void apply_error_callback(void *arg);
     272                 : extern void set_apply_error_context_origin(char *originname);
     273                 : 
     274                 : /* Parallel apply worker setup and interactions */
     275                 : extern void pa_allocate_worker(TransactionId xid);
     276                 : extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid);
     277                 : extern void pa_detach_all_error_mq(void);
     278                 : 
     279                 : extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes,
     280                 :                          const void *data);
     281                 : extern void pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
     282                 :                                            bool stream_locked);
     283                 : 
     284                 : extern void pa_set_xact_state(ParallelApplyWorkerShared *wshared,
     285                 :                               ParallelTransState in_xact);
     286                 : extern void pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo);
     287                 : 
     288                 : extern void pa_start_subtrans(TransactionId current_xid,
     289                 :                               TransactionId top_xid);
     290                 : extern void pa_reset_subtrans(void);
     291                 : extern void pa_stream_abort(LogicalRepStreamAbortData *abort_data);
     292                 : extern void pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
     293                 :                                  PartialFileSetState fileset_state);
     294                 : 
     295                 : extern void pa_lock_stream(TransactionId xid, LOCKMODE lockmode);
     296                 : extern void pa_unlock_stream(TransactionId xid, LOCKMODE lockmode);
     297                 : 
     298                 : extern void pa_lock_transaction(TransactionId xid, LOCKMODE lockmode);
     299                 : extern void pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode);
     300                 : 
     301                 : extern void pa_decr_and_wait_stream_block(void);
     302                 : 
     303                 : extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo,
     304                 :                            XLogRecPtr remote_lsn);
     305                 : 
     306                 : #define isParallelApplyWorker(worker) ((worker)->leader_pid != InvalidPid)
     307                 : 
     308                 : static inline bool
     309 GIC      152294 : am_tablesync_worker(void)
     310                 : {
     311          152294 :     return OidIsValid(MyLogicalRepWorker->relid);
     312                 : }
     313                 : 
     314                 : static inline bool
     315 GNC         426 : am_leader_apply_worker(void)
     316                 : {
     317             695 :     return (!am_tablesync_worker() &&
     318             269 :             !isParallelApplyWorker(MyLogicalRepWorker));
     319                 : }
     320                 : 
     321                 : static inline bool
     322          477539 : am_parallel_apply_worker(void)
     323                 : {
     324          477539 :     return isParallelApplyWorker(MyLogicalRepWorker);
     325                 : }
     326                 : 
     327                 : #endif                          /* WORKER_INTERNAL_H */
        

Generated by: LCOV version v1.16-55-g56c0a2a