LCOV - differential code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 94.8 % 309 293 16 293
Current Date: 2023-04-08 15:15:32 Functions: 100.0 % 14 14 14
Baseline: 15
Baseline Date: 2023-04-08 15:09:40
Legend: Lines: hit not hit

           TLA  Line data    Source code
       1                 : /*-------------------------------------------------------------------------
       2                 :  *
       3                 :  * inv_api.c
       4                 :  *    routines for manipulating inversion fs large objects. This file
       5                 :  *    contains the user-level large object application interface routines.
       6                 :  *
       7                 :  *
       8                 :  * Note: we access pg_largeobject.data using its C struct declaration.
       9                 :  * This is safe because it immediately follows pageno which is an int4 field,
      10                 :  * and therefore the data field will always be 4-byte aligned, even if it
      11                 :  * is in the short 1-byte-header format.  We have to detoast it since it's
      12                 :  * quite likely to be in compressed or short format.  We also need to check
      13                 :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
      14                 :  *
      15                 :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
      16                 :  * does most of the backend code.  We expect that CurrentMemoryContext will
      17                 :  * be a short-lived context.  Data that must persist across function calls
      18                 :  * is kept either in CacheMemoryContext (the Relation structs) or in the
      19                 :  * memory context given to inv_open (for LargeObjectDesc structs).
      20                 :  *
      21                 :  *
      22                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
      23                 :  * Portions Copyright (c) 1994, Regents of the University of California
      24                 :  *
      25                 :  *
      26                 :  * IDENTIFICATION
      27                 :  *    src/backend/storage/large_object/inv_api.c
      28                 :  *
      29                 :  *-------------------------------------------------------------------------
      30                 :  */
      31                 : #include "postgres.h"
      32                 : 
      33                 : #include <limits.h>
      34                 : 
      35                 : #include "access/detoast.h"
      36                 : #include "access/genam.h"
      37                 : #include "access/htup_details.h"
      38                 : #include "access/sysattr.h"
      39                 : #include "access/table.h"
      40                 : #include "access/xact.h"
      41                 : #include "catalog/dependency.h"
      42                 : #include "catalog/indexing.h"
      43                 : #include "catalog/objectaccess.h"
      44                 : #include "catalog/pg_largeobject.h"
      45                 : #include "catalog/pg_largeobject_metadata.h"
      46                 : #include "libpq/libpq-fs.h"
      47                 : #include "miscadmin.h"
      48                 : #include "storage/large_object.h"
      49                 : #include "utils/acl.h"
      50                 : #include "utils/fmgroids.h"
      51                 : #include "utils/rel.h"
      52                 : #include "utils/snapmgr.h"
      53                 : 
      54                 : 
      55                 : /*
      56                 :  * GUC: backwards-compatibility flag to suppress LO permission checks
      57                 :  */
      58                 : bool        lo_compat_privileges;
      59                 : 
      60                 : /*
      61                 :  * All accesses to pg_largeobject and its index make use of a single Relation
      62                 :  * reference, so that we only need to open pg_relation once per transaction.
      63                 :  * To avoid problems when the first such reference occurs inside a
      64                 :  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
      65                 :  * the Relation reference to TopTransactionResourceOwner.
      66                 :  */
      67                 : static Relation lo_heap_r = NULL;
      68                 : static Relation lo_index_r = NULL;
      69                 : 
      70                 : 
      71                 : /*
      72                 :  * Open pg_largeobject and its index, if not already done in current xact
      73                 :  */
      74                 : static void
      75 CBC        1538 : open_lo_relation(void)
      76                 : {
      77                 :     ResourceOwner currentOwner;
      78                 : 
      79            1538 :     if (lo_heap_r && lo_index_r)
      80            1392 :         return;                 /* already open in current xact */
      81                 : 
      82                 :     /* Arrange for the top xact to own these relation references */
      83             146 :     currentOwner = CurrentResourceOwner;
      84             146 :     CurrentResourceOwner = TopTransactionResourceOwner;
      85                 : 
      86                 :     /* Use RowExclusiveLock since we might either read or write */
      87             146 :     if (lo_heap_r == NULL)
      88             146 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
      89             146 :     if (lo_index_r == NULL)
      90             146 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
      91                 : 
      92             146 :     CurrentResourceOwner = currentOwner;
      93                 : }
      94                 : 
      95                 : /*
      96                 :  * Clean up at main transaction end
      97                 :  */
      98                 : void
      99             223 : close_lo_relation(bool isCommit)
     100                 : {
     101             223 :     if (lo_heap_r || lo_index_r)
     102                 :     {
     103                 :         /*
     104                 :          * Only bother to close if committing; else abort cleanup will handle
     105                 :          * it
     106                 :          */
     107             146 :         if (isCommit)
     108                 :         {
     109                 :             ResourceOwner currentOwner;
     110                 : 
     111             105 :             currentOwner = CurrentResourceOwner;
     112             105 :             CurrentResourceOwner = TopTransactionResourceOwner;
     113                 : 
     114             105 :             if (lo_index_r)
     115             105 :                 index_close(lo_index_r, NoLock);
     116             105 :             if (lo_heap_r)
     117             105 :                 table_close(lo_heap_r, NoLock);
     118                 : 
     119             105 :             CurrentResourceOwner = currentOwner;
     120                 :         }
     121             146 :         lo_heap_r = NULL;
     122             146 :         lo_index_r = NULL;
     123                 :     }
     124             223 : }
     125                 : 
     126                 : 
     127                 : /*
     128                 :  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
     129                 :  * read with can be specified.
     130                 :  */
     131                 : static bool
     132             238 : myLargeObjectExists(Oid loid, Snapshot snapshot)
     133                 : {
     134                 :     Relation    pg_lo_meta;
     135                 :     ScanKeyData skey[1];
     136                 :     SysScanDesc sd;
     137                 :     HeapTuple   tuple;
     138             238 :     bool        retval = false;
     139                 : 
     140             238 :     ScanKeyInit(&skey[0],
     141                 :                 Anum_pg_largeobject_metadata_oid,
     142                 :                 BTEqualStrategyNumber, F_OIDEQ,
     143                 :                 ObjectIdGetDatum(loid));
     144                 : 
     145             238 :     pg_lo_meta = table_open(LargeObjectMetadataRelationId,
     146                 :                             AccessShareLock);
     147                 : 
     148             238 :     sd = systable_beginscan(pg_lo_meta,
     149                 :                             LargeObjectMetadataOidIndexId, true,
     150                 :                             snapshot, 1, skey);
     151                 : 
     152             238 :     tuple = systable_getnext(sd);
     153             238 :     if (HeapTupleIsValid(tuple))
     154             236 :         retval = true;
     155                 : 
     156             238 :     systable_endscan(sd);
     157                 : 
     158             238 :     table_close(pg_lo_meta, AccessShareLock);
     159                 : 
     160             238 :     return retval;
     161                 : }
     162                 : 
     163                 : 
     164                 : /*
     165                 :  * Extract data field from a pg_largeobject tuple, detoasting if needed
     166                 :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
     167                 :  * data length, and an indication of whether to pfree the data pointer.
     168                 :  */
     169                 : static void
     170            5124 : getdatafield(Form_pg_largeobject tuple,
     171                 :              bytea **pdatafield,
     172                 :              int *plen,
     173                 :              bool *pfreeit)
     174                 : {
     175                 :     bytea      *datafield;
     176                 :     int         len;
     177                 :     bool        freeit;
     178                 : 
     179            5124 :     datafield = &(tuple->data); /* see note at top of file */
     180            5124 :     freeit = false;
     181            5124 :     if (VARATT_IS_EXTENDED(datafield))
     182                 :     {
     183                 :         datafield = (bytea *)
     184            5041 :             detoast_attr((struct varlena *) datafield);
     185            5041 :         freeit = true;
     186                 :     }
     187            5124 :     len = VARSIZE(datafield) - VARHDRSZ;
     188            5124 :     if (len < 0 || len > LOBLKSIZE)
     189 UBC           0 :         ereport(ERROR,
     190                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
     191                 :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
     192                 :                         tuple->loid, tuple->pageno, len)));
     193 CBC        5124 :     *pdatafield = datafield;
     194            5124 :     *plen = len;
     195            5124 :     *pfreeit = freeit;
     196            5124 : }
     197                 : 
     198                 : 
     199                 : /*
     200                 :  *  inv_create -- create a new large object
     201                 :  *
     202                 :  *  Arguments:
     203                 :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
     204                 :  *
     205                 :  *  Returns:
     206                 :  *    OID of new object
     207                 :  *
     208                 :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
     209                 :  * in use.
     210                 :  */
     211                 : Oid
     212              56 : inv_create(Oid lobjId)
     213                 : {
     214                 :     Oid         lobjId_new;
     215                 : 
     216                 :     /*
     217                 :      * Create a new largeobject with empty data pages
     218                 :      */
     219              56 :     lobjId_new = LargeObjectCreate(lobjId);
     220                 : 
     221                 :     /*
     222                 :      * dependency on the owner of largeobject
     223                 :      *
     224                 :      * The reason why we use LargeObjectRelationId instead of
     225                 :      * LargeObjectMetadataRelationId here is to provide backward compatibility
     226                 :      * to the applications which utilize a knowledge about internal layout of
     227                 :      * system catalogs. OID of pg_largeobject_metadata and loid of
     228                 :      * pg_largeobject are same value, so there are no actual differences here.
     229                 :      */
     230              56 :     recordDependencyOnOwner(LargeObjectRelationId,
     231                 :                             lobjId_new, GetUserId());
     232                 : 
     233                 :     /* Post creation hook for new large object */
     234              56 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
     235                 : 
     236                 :     /*
     237                 :      * Advance command counter to make new tuple visible to later operations.
     238                 :      */
     239              56 :     CommandCounterIncrement();
     240                 : 
     241              56 :     return lobjId_new;
     242                 : }
     243                 : 
     244                 : /*
     245                 :  *  inv_open -- access an existing large object.
     246                 :  *
     247                 :  * Returns a large object descriptor, appropriately filled in.
     248                 :  * The descriptor and subsidiary data are allocated in the specified
     249                 :  * memory context, which must be suitably long-lived for the caller's
     250                 :  * purposes.  If the returned descriptor has a snapshot associated
     251                 :  * with it, the caller must ensure that it also lives long enough,
     252                 :  * e.g. by calling RegisterSnapshotOnOwner
     253                 :  */
     254                 : LargeObjectDesc *
     255             238 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
     256                 : {
     257                 :     LargeObjectDesc *retval;
     258             238 :     Snapshot    snapshot = NULL;
     259             238 :     int         descflags = 0;
     260                 : 
     261                 :     /*
     262                 :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
     263                 :      * | INV_READ), the caller being allowed to read the large object
     264                 :      * descriptor in either case.
     265                 :      */
     266             238 :     if (flags & INV_WRITE)
     267              77 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
     268             238 :     if (flags & INV_READ)
     269             176 :         descflags |= IFS_RDLOCK;
     270                 : 
     271             238 :     if (descflags == 0)
     272 UBC           0 :         ereport(ERROR,
     273                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     274                 :                  errmsg("invalid flags for opening a large object: %d",
     275                 :                         flags)));
     276                 : 
     277                 :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
     278 CBC         238 :     if (descflags & IFS_WRLOCK)
     279              77 :         snapshot = NULL;
     280                 :     else
     281             161 :         snapshot = GetActiveSnapshot();
     282                 : 
     283                 :     /* Can't use LargeObjectExists here because we need to specify snapshot */
     284             238 :     if (!myLargeObjectExists(lobjId, snapshot))
     285               2 :         ereport(ERROR,
     286                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
     287                 :                  errmsg("large object %u does not exist", lobjId)));
     288                 : 
     289                 :     /* Apply permission checks, again specifying snapshot */
     290             236 :     if ((descflags & IFS_RDLOCK) != 0)
     291                 :     {
     292             463 :         if (!lo_compat_privileges &&
     293             227 :             pg_largeobject_aclcheck_snapshot(lobjId,
     294                 :                                              GetUserId(),
     295                 :                                              ACL_SELECT,
     296                 :                                              snapshot) != ACLCHECK_OK)
     297              21 :             ereport(ERROR,
     298                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     299                 :                      errmsg("permission denied for large object %u",
     300                 :                             lobjId)));
     301                 :     }
     302             215 :     if ((descflags & IFS_WRLOCK) != 0)
     303                 :     {
     304             124 :         if (!lo_compat_privileges &&
     305              59 :             pg_largeobject_aclcheck_snapshot(lobjId,
     306                 :                                              GetUserId(),
     307                 :                                              ACL_UPDATE,
     308                 :                                              snapshot) != ACLCHECK_OK)
     309               6 :             ereport(ERROR,
     310                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     311                 :                      errmsg("permission denied for large object %u",
     312                 :                             lobjId)));
     313                 :     }
     314                 : 
     315                 :     /* OK to create a descriptor */
     316             209 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
     317                 :                                                     sizeof(LargeObjectDesc));
     318             209 :     retval->id = lobjId;
     319             209 :     retval->offset = 0;
     320             209 :     retval->flags = descflags;
     321                 : 
     322                 :     /* caller sets if needed, not used by the functions in this file */
     323             209 :     retval->subid = InvalidSubTransactionId;
     324                 : 
     325                 :     /*
     326                 :      * The snapshot (if any) is just the currently active snapshot.  The
     327                 :      * caller will replace it with a longer-lived copy if needed.
     328                 :      */
     329             209 :     retval->snapshot = snapshot;
     330                 : 
     331             209 :     return retval;
     332                 : }
     333                 : 
     334                 : /*
     335                 :  * Closes a large object descriptor previously made by inv_open(), and
     336                 :  * releases the long-term memory used by it.
     337                 :  */
     338                 : void
     339             194 : inv_close(LargeObjectDesc *obj_desc)
     340                 : {
     341             194 :     Assert(PointerIsValid(obj_desc));
     342             194 :     pfree(obj_desc);
     343             194 : }
     344                 : 
     345                 : /*
     346                 :  * Destroys an existing large object (not to be confused with a descriptor!)
     347                 :  *
     348                 :  * Note we expect caller to have done any required permissions check.
     349                 :  */
     350                 : int
     351              41 : inv_drop(Oid lobjId)
     352                 : {
     353                 :     ObjectAddress object;
     354                 : 
     355                 :     /*
     356                 :      * Delete any comments and dependencies on the large object
     357                 :      */
     358              41 :     object.classId = LargeObjectRelationId;
     359              41 :     object.objectId = lobjId;
     360              41 :     object.objectSubId = 0;
     361              41 :     performDeletion(&object, DROP_CASCADE, 0);
     362                 : 
     363                 :     /*
     364                 :      * Advance command counter so that tuple removal will be seen by later
     365                 :      * large-object operations in this transaction.
     366                 :      */
     367              41 :     CommandCounterIncrement();
     368                 : 
     369                 :     /* For historical reasons, we always return 1 on success. */
     370              41 :     return 1;
     371                 : }
     372                 : 
     373                 : /*
     374                 :  * Determine size of a large object
     375                 :  *
     376                 :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
     377                 :  * the offset of the last byte + 1.
     378                 :  */
     379                 : static uint64
     380              52 : inv_getsize(LargeObjectDesc *obj_desc)
     381                 : {
     382              52 :     uint64      lastbyte = 0;
     383                 :     ScanKeyData skey[1];
     384                 :     SysScanDesc sd;
     385                 :     HeapTuple   tuple;
     386                 : 
     387              52 :     Assert(PointerIsValid(obj_desc));
     388                 : 
     389              52 :     open_lo_relation();
     390                 : 
     391              52 :     ScanKeyInit(&skey[0],
     392                 :                 Anum_pg_largeobject_loid,
     393                 :                 BTEqualStrategyNumber, F_OIDEQ,
     394                 :                 ObjectIdGetDatum(obj_desc->id));
     395                 : 
     396              52 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     397                 :                                     obj_desc->snapshot, 1, skey);
     398                 : 
     399                 :     /*
     400                 :      * Because the pg_largeobject index is on both loid and pageno, but we
     401                 :      * constrain only loid, a backwards scan should visit all pages of the
     402                 :      * large object in reverse pageno order.  So, it's sufficient to examine
     403                 :      * the first valid tuple (== last valid page).
     404                 :      */
     405              52 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
     406              52 :     if (HeapTupleIsValid(tuple))
     407                 :     {
     408                 :         Form_pg_largeobject data;
     409                 :         bytea      *datafield;
     410                 :         int         len;
     411                 :         bool        pfreeit;
     412                 : 
     413              48 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     414 UBC           0 :             elog(ERROR, "null field found in pg_largeobject");
     415 CBC          48 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     416              48 :         getdatafield(data, &datafield, &len, &pfreeit);
     417              48 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
     418              48 :         if (pfreeit)
     419               9 :             pfree(datafield);
     420                 :     }
     421                 : 
     422              52 :     systable_endscan_ordered(sd);
     423                 : 
     424              52 :     return lastbyte;
     425                 : }
     426                 : 
     427                 : int64
     428             110 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
     429                 : {
     430                 :     int64       newoffset;
     431                 : 
     432             110 :     Assert(PointerIsValid(obj_desc));
     433                 : 
     434                 :     /*
     435                 :      * We allow seek/tell if you have either read or write permission, so no
     436                 :      * need for a permission check here.
     437                 :      */
     438                 : 
     439                 :     /*
     440                 :      * Note: overflow in the additions is possible, but since we will reject
     441                 :      * negative results, we don't need any extra test for that.
     442                 :      */
     443             110 :     switch (whence)
     444                 :     {
     445              49 :         case SEEK_SET:
     446              49 :             newoffset = offset;
     447              49 :             break;
     448               9 :         case SEEK_CUR:
     449               9 :             newoffset = obj_desc->offset + offset;
     450               9 :             break;
     451              52 :         case SEEK_END:
     452              52 :             newoffset = inv_getsize(obj_desc) + offset;
     453              52 :             break;
     454 UBC           0 :         default:
     455               0 :             ereport(ERROR,
     456                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     457                 :                      errmsg("invalid whence setting: %d", whence)));
     458                 :             newoffset = 0;      /* keep compiler quiet */
     459                 :             break;
     460                 :     }
     461                 : 
     462                 :     /*
     463                 :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     464                 :      * in translatable strings; doing better is not worth the trouble
     465                 :      */
     466 CBC         110 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
     467 UBC           0 :         ereport(ERROR,
     468                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     469                 :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
     470                 :                                  newoffset)));
     471                 : 
     472 CBC         110 :     obj_desc->offset = newoffset;
     473             110 :     return newoffset;
     474                 : }
     475                 : 
     476                 : int64
     477              24 : inv_tell(LargeObjectDesc *obj_desc)
     478                 : {
     479              24 :     Assert(PointerIsValid(obj_desc));
     480                 : 
     481                 :     /*
     482                 :      * We allow seek/tell if you have either read or write permission, so no
     483                 :      * need for a permission check here.
     484                 :      */
     485                 : 
     486              24 :     return obj_desc->offset;
     487                 : }
     488                 : 
     489                 : int
     490             693 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
     491                 : {
     492             693 :     int         nread = 0;
     493                 :     int64       n;
     494                 :     int64       off;
     495                 :     int         len;
     496             693 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     497                 :     uint64      pageoff;
     498                 :     ScanKeyData skey[2];
     499                 :     SysScanDesc sd;
     500                 :     HeapTuple   tuple;
     501                 : 
     502             693 :     Assert(PointerIsValid(obj_desc));
     503             693 :     Assert(buf != NULL);
     504                 : 
     505             693 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
     506 UBC           0 :         ereport(ERROR,
     507                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     508                 :                  errmsg("permission denied for large object %u",
     509                 :                         obj_desc->id)));
     510                 : 
     511 CBC         693 :     if (nbytes <= 0)
     512               4 :         return 0;
     513                 : 
     514             689 :     open_lo_relation();
     515                 : 
     516             689 :     ScanKeyInit(&skey[0],
     517                 :                 Anum_pg_largeobject_loid,
     518                 :                 BTEqualStrategyNumber, F_OIDEQ,
     519                 :                 ObjectIdGetDatum(obj_desc->id));
     520                 : 
     521             689 :     ScanKeyInit(&skey[1],
     522                 :                 Anum_pg_largeobject_pageno,
     523                 :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     524                 :                 Int32GetDatum(pageno));
     525                 : 
     526             689 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     527                 :                                     obj_desc->snapshot, 2, skey);
     528                 : 
     529            5219 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     530                 :     {
     531                 :         Form_pg_largeobject data;
     532                 :         bytea      *datafield;
     533                 :         bool        pfreeit;
     534                 : 
     535            5061 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
     536 UBC           0 :             elog(ERROR, "null field found in pg_largeobject");
     537 CBC        5061 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
     538                 : 
     539                 :         /*
     540                 :          * We expect the indexscan will deliver pages in order.  However,
     541                 :          * there may be missing pages if the LO contains unwritten "holes". We
     542                 :          * want missing sections to read out as zeroes.
     543                 :          */
     544            5061 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
     545            5061 :         if (pageoff > obj_desc->offset)
     546                 :         {
     547               6 :             n = pageoff - obj_desc->offset;
     548               6 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     549               6 :             MemSet(buf + nread, 0, n);
     550               6 :             nread += n;
     551               6 :             obj_desc->offset += n;
     552                 :         }
     553                 : 
     554            5061 :         if (nread < nbytes)
     555                 :         {
     556            5058 :             Assert(obj_desc->offset >= pageoff);
     557            5058 :             off = (int) (obj_desc->offset - pageoff);
     558            5058 :             Assert(off >= 0 && off < LOBLKSIZE);
     559                 : 
     560            5058 :             getdatafield(data, &datafield, &len, &pfreeit);
     561            5058 :             if (len > off)
     562                 :             {
     563            5007 :                 n = len - off;
     564            5007 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
     565            5007 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
     566            5007 :                 nread += n;
     567            5007 :                 obj_desc->offset += n;
     568                 :             }
     569            5058 :             if (pfreeit)
     570            5020 :                 pfree(datafield);
     571                 :         }
     572                 : 
     573            5061 :         if (nread >= nbytes)
     574             531 :             break;
     575                 :     }
     576                 : 
     577             689 :     systable_endscan_ordered(sd);
     578                 : 
     579             689 :     return nread;
     580                 : }
     581                 : 
     582                 : int
     583             776 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
     584                 : {
     585             776 :     int         nwritten = 0;
     586                 :     int         n;
     587                 :     int         off;
     588                 :     int         len;
     589             776 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
     590                 :     ScanKeyData skey[2];
     591                 :     SysScanDesc sd;
     592                 :     HeapTuple   oldtuple;
     593                 :     Form_pg_largeobject olddata;
     594                 :     bool        neednextpage;
     595                 :     bytea      *datafield;
     596                 :     bool        pfreeit;
     597                 :     union
     598                 :     {
     599                 :         bytea       hdr;
     600                 :         /* this is to make the union big enough for a LO data chunk: */
     601                 :         char        data[LOBLKSIZE + VARHDRSZ];
     602                 :         /* ensure union is aligned well enough: */
     603                 :         int32       align_it;
     604                 :     }           workbuf;
     605             776 :     char       *workb = VARDATA(&workbuf.hdr);
     606                 :     HeapTuple   newtup;
     607                 :     Datum       values[Natts_pg_largeobject];
     608                 :     bool        nulls[Natts_pg_largeobject];
     609                 :     bool        replace[Natts_pg_largeobject];
     610                 :     CatalogIndexState indstate;
     611                 : 
     612             776 :     Assert(PointerIsValid(obj_desc));
     613             776 :     Assert(buf != NULL);
     614                 : 
     615                 :     /* enforce writability because snapshot is probably wrong otherwise */
     616             776 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     617 UBC           0 :         ereport(ERROR,
     618                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     619                 :                  errmsg("permission denied for large object %u",
     620                 :                         obj_desc->id)));
     621                 : 
     622 CBC         776 :     if (nbytes <= 0)
     623 UBC           0 :         return 0;
     624                 : 
     625                 :     /* this addition can't overflow because nbytes is only int32 */
     626 CBC         776 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
     627 UBC           0 :         ereport(ERROR,
     628                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     629                 :                  errmsg("invalid large object write request size: %d",
     630                 :                         nbytes)));
     631                 : 
     632 CBC         776 :     open_lo_relation();
     633                 : 
     634             776 :     indstate = CatalogOpenIndexes(lo_heap_r);
     635                 : 
     636             776 :     ScanKeyInit(&skey[0],
     637                 :                 Anum_pg_largeobject_loid,
     638                 :                 BTEqualStrategyNumber, F_OIDEQ,
     639                 :                 ObjectIdGetDatum(obj_desc->id));
     640                 : 
     641             776 :     ScanKeyInit(&skey[1],
     642                 :                 Anum_pg_largeobject_pageno,
     643                 :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     644                 :                 Int32GetDatum(pageno));
     645                 : 
     646             776 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     647                 :                                     obj_desc->snapshot, 2, skey);
     648                 : 
     649             776 :     oldtuple = NULL;
     650             776 :     olddata = NULL;
     651             776 :     neednextpage = true;
     652                 : 
     653            4750 :     while (nwritten < nbytes)
     654                 :     {
     655                 :         /*
     656                 :          * If possible, get next pre-existing page of the LO.  We expect the
     657                 :          * indexscan will deliver these in order --- but there may be holes.
     658                 :          */
     659            3974 :         if (neednextpage)
     660                 :         {
     661             779 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     662                 :             {
     663              12 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     664 UBC           0 :                     elog(ERROR, "null field found in pg_largeobject");
     665 CBC          12 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     666              12 :                 Assert(olddata->pageno >= pageno);
     667                 :             }
     668             779 :             neednextpage = false;
     669                 :         }
     670                 : 
     671                 :         /*
     672                 :          * If we have a pre-existing page, see if it is the page we want to
     673                 :          * write, or a later one.
     674                 :          */
     675            3974 :         if (olddata != NULL && olddata->pageno == pageno)
     676                 :         {
     677                 :             /*
     678                 :              * Update an existing page with fresh data.
     679                 :              *
     680                 :              * First, load old data into workbuf
     681                 :              */
     682              12 :             getdatafield(olddata, &datafield, &len, &pfreeit);
     683              12 :             memcpy(workb, VARDATA(datafield), len);
     684              12 :             if (pfreeit)
     685               9 :                 pfree(datafield);
     686                 : 
     687                 :             /*
     688                 :              * Fill any hole
     689                 :              */
     690              12 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     691              12 :             if (off > len)
     692 UBC           0 :                 MemSet(workb + len, 0, off - len);
     693                 : 
     694                 :             /*
     695                 :              * Insert appropriate portion of new data
     696                 :              */
     697 CBC          12 :             n = LOBLKSIZE - off;
     698              12 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     699              12 :             memcpy(workb + off, buf + nwritten, n);
     700              12 :             nwritten += n;
     701              12 :             obj_desc->offset += n;
     702              12 :             off += n;
     703                 :             /* compute valid length of new page */
     704              12 :             len = (len >= off) ? len : off;
     705              12 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     706                 : 
     707                 :             /*
     708                 :              * Form and insert updated tuple
     709                 :              */
     710              12 :             memset(values, 0, sizeof(values));
     711              12 :             memset(nulls, false, sizeof(nulls));
     712              12 :             memset(replace, false, sizeof(replace));
     713              12 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     714              12 :             replace[Anum_pg_largeobject_data - 1] = true;
     715              12 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     716                 :                                        values, nulls, replace);
     717              12 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     718                 :                                        indstate);
     719              12 :             heap_freetuple(newtup);
     720                 : 
     721                 :             /*
     722                 :              * We're done with this old page.
     723                 :              */
     724              12 :             oldtuple = NULL;
     725              12 :             olddata = NULL;
     726              12 :             neednextpage = true;
     727                 :         }
     728                 :         else
     729                 :         {
     730                 :             /*
     731                 :              * Write a brand new page.
     732                 :              *
     733                 :              * First, fill any hole
     734                 :              */
     735            3962 :             off = (int) (obj_desc->offset % LOBLKSIZE);
     736            3962 :             if (off > 0)
     737               3 :                 MemSet(workb, 0, off);
     738                 : 
     739                 :             /*
     740                 :              * Insert appropriate portion of new data
     741                 :              */
     742            3962 :             n = LOBLKSIZE - off;
     743            3962 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
     744            3962 :             memcpy(workb + off, buf + nwritten, n);
     745            3962 :             nwritten += n;
     746            3962 :             obj_desc->offset += n;
     747                 :             /* compute valid length of new page */
     748            3962 :             len = off + n;
     749            3962 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
     750                 : 
     751                 :             /*
     752                 :              * Form and insert updated tuple
     753                 :              */
     754            3962 :             memset(values, 0, sizeof(values));
     755            3962 :             memset(nulls, false, sizeof(nulls));
     756            3962 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     757            3962 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     758            3962 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     759            3962 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     760            3962 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     761            3962 :             heap_freetuple(newtup);
     762                 :         }
     763            3974 :         pageno++;
     764                 :     }
     765                 : 
     766             776 :     systable_endscan_ordered(sd);
     767                 : 
     768             776 :     CatalogCloseIndexes(indstate);
     769                 : 
     770                 :     /*
     771                 :      * Advance command counter so that my tuple updates will be seen by later
     772                 :      * large-object operations in this transaction.
     773                 :      */
     774             776 :     CommandCounterIncrement();
     775                 : 
     776             776 :     return nwritten;
     777                 : }
     778                 : 
     779                 : void
     780              21 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
     781                 : {
     782              21 :     int32       pageno = (int32) (len / LOBLKSIZE);
     783                 :     int32       off;
     784                 :     ScanKeyData skey[2];
     785                 :     SysScanDesc sd;
     786                 :     HeapTuple   oldtuple;
     787                 :     Form_pg_largeobject olddata;
     788                 :     union
     789                 :     {
     790                 :         bytea       hdr;
     791                 :         /* this is to make the union big enough for a LO data chunk: */
     792                 :         char        data[LOBLKSIZE + VARHDRSZ];
     793                 :         /* ensure union is aligned well enough: */
     794                 :         int32       align_it;
     795                 :     }           workbuf;
     796              21 :     char       *workb = VARDATA(&workbuf.hdr);
     797                 :     HeapTuple   newtup;
     798                 :     Datum       values[Natts_pg_largeobject];
     799                 :     bool        nulls[Natts_pg_largeobject];
     800                 :     bool        replace[Natts_pg_largeobject];
     801                 :     CatalogIndexState indstate;
     802                 : 
     803              21 :     Assert(PointerIsValid(obj_desc));
     804                 : 
     805                 :     /* enforce writability because snapshot is probably wrong otherwise */
     806              21 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
     807 UBC           0 :         ereport(ERROR,
     808                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
     809                 :                  errmsg("permission denied for large object %u",
     810                 :                         obj_desc->id)));
     811                 : 
     812                 :     /*
     813                 :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
     814                 :      * in translatable strings; doing better is not worth the trouble
     815                 :      */
     816 CBC          21 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
     817 UBC           0 :         ereport(ERROR,
     818                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
     819                 :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
     820                 :                                  len)));
     821                 : 
     822 CBC          21 :     open_lo_relation();
     823                 : 
     824              21 :     indstate = CatalogOpenIndexes(lo_heap_r);
     825                 : 
     826                 :     /*
     827                 :      * Set up to find all pages with desired loid and pageno >= target
     828                 :      */
     829              21 :     ScanKeyInit(&skey[0],
     830                 :                 Anum_pg_largeobject_loid,
     831                 :                 BTEqualStrategyNumber, F_OIDEQ,
     832                 :                 ObjectIdGetDatum(obj_desc->id));
     833                 : 
     834              21 :     ScanKeyInit(&skey[1],
     835                 :                 Anum_pg_largeobject_pageno,
     836                 :                 BTGreaterEqualStrategyNumber, F_INT4GE,
     837                 :                 Int32GetDatum(pageno));
     838                 : 
     839              21 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
     840                 :                                     obj_desc->snapshot, 2, skey);
     841                 : 
     842                 :     /*
     843                 :      * If possible, get the page the truncation point is in. The truncation
     844                 :      * point may be beyond the end of the LO or in a hole.
     845                 :      */
     846              21 :     olddata = NULL;
     847              21 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     848                 :     {
     849              12 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
     850 UBC           0 :             elog(ERROR, "null field found in pg_largeobject");
     851 CBC          12 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
     852              12 :         Assert(olddata->pageno >= pageno);
     853                 :     }
     854                 : 
     855                 :     /*
     856                 :      * If we found the page of the truncation point we need to truncate the
     857                 :      * data in it.  Otherwise if we're in a hole, we need to create a page to
     858                 :      * mark the end of data.
     859                 :      */
     860              21 :     if (olddata != NULL && olddata->pageno == pageno)
     861               6 :     {
     862                 :         /* First, load old data into workbuf */
     863                 :         bytea      *datafield;
     864                 :         int         pagelen;
     865                 :         bool        pfreeit;
     866                 : 
     867               6 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
     868               6 :         memcpy(workb, VARDATA(datafield), pagelen);
     869               6 :         if (pfreeit)
     870               3 :             pfree(datafield);
     871                 : 
     872                 :         /*
     873                 :          * Fill any hole
     874                 :          */
     875               6 :         off = len % LOBLKSIZE;
     876               6 :         if (off > pagelen)
     877               3 :             MemSet(workb + pagelen, 0, off - pagelen);
     878                 : 
     879                 :         /* compute length of new page */
     880               6 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     881                 : 
     882                 :         /*
     883                 :          * Form and insert updated tuple
     884                 :          */
     885               6 :         memset(values, 0, sizeof(values));
     886               6 :         memset(nulls, false, sizeof(nulls));
     887               6 :         memset(replace, false, sizeof(replace));
     888               6 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     889               6 :         replace[Anum_pg_largeobject_data - 1] = true;
     890               6 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
     891                 :                                    values, nulls, replace);
     892               6 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
     893                 :                                    indstate);
     894               6 :         heap_freetuple(newtup);
     895                 :     }
     896                 :     else
     897                 :     {
     898                 :         /*
     899                 :          * If the first page we found was after the truncation point, we're in
     900                 :          * a hole that we'll fill, but we need to delete the later page
     901                 :          * because the loop below won't visit it again.
     902                 :          */
     903              15 :         if (olddata != NULL)
     904                 :         {
     905               6 :             Assert(olddata->pageno > pageno);
     906               6 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     907                 :         }
     908                 : 
     909                 :         /*
     910                 :          * Write a brand new page.
     911                 :          *
     912                 :          * Fill the hole up to the truncation point
     913                 :          */
     914              15 :         off = len % LOBLKSIZE;
     915              15 :         if (off > 0)
     916              15 :             MemSet(workb, 0, off);
     917                 : 
     918                 :         /* compute length of new page */
     919              15 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
     920                 : 
     921                 :         /*
     922                 :          * Form and insert new tuple
     923                 :          */
     924              15 :         memset(values, 0, sizeof(values));
     925              15 :         memset(nulls, false, sizeof(nulls));
     926              15 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
     927              15 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
     928              15 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
     929              15 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
     930              15 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
     931              15 :         heap_freetuple(newtup);
     932                 :     }
     933                 : 
     934                 :     /*
     935                 :      * Delete any pages after the truncation point.  If the initial search
     936                 :      * didn't find a page, then of course there's nothing more to do.
     937                 :      */
     938              21 :     if (olddata != NULL)
     939                 :     {
     940              15 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
     941                 :         {
     942               3 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
     943                 :         }
     944                 :     }
     945                 : 
     946              21 :     systable_endscan_ordered(sd);
     947                 : 
     948              21 :     CatalogCloseIndexes(indstate);
     949                 : 
     950                 :     /*
     951                 :      * Advance command counter so that tuple updates will be seen by later
     952                 :      * large-object operations in this transaction.
     953                 :      */
     954              21 :     CommandCounterIncrement();
     955              21 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a