LCOV - differential code coverage report
Current view: top level - src/backend/storage/large_object - inv_api.c (source / functions) Coverage Total Hit UBC CBC
Current: Differential Code Coverage HEAD vs 15 Lines: 94.8 % 309 293 16 293
Current Date: 2023-04-08 17:13:01 Functions: 100.0 % 14 14 14
Baseline: 15 Line coverage date bins:
Baseline Date: 2023-04-08 15:09:40 (240..) days: 94.8 % 309 293 16 293
Legend: Lines: hit not hit Function coverage date bins:
(240..) days: 100.0 % 14 14 14

 Age         Owner                  TLA  Line data    Source code
                                  1                 : /*-------------------------------------------------------------------------
                                  2                 :  *
                                  3                 :  * inv_api.c
                                  4                 :  *    routines for manipulating inversion fs large objects. This file
                                  5                 :  *    contains the user-level large object application interface routines.
                                  6                 :  *
                                  7                 :  *
                                  8                 :  * Note: we access pg_largeobject.data using its C struct declaration.
                                  9                 :  * This is safe because it immediately follows pageno which is an int4 field,
                                 10                 :  * and therefore the data field will always be 4-byte aligned, even if it
                                 11                 :  * is in the short 1-byte-header format.  We have to detoast it since it's
                                 12                 :  * quite likely to be in compressed or short format.  We also need to check
                                 13                 :  * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
                                 14                 :  *
                                 15                 :  * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
                                 16                 :  * does most of the backend code.  We expect that CurrentMemoryContext will
                                 17                 :  * be a short-lived context.  Data that must persist across function calls
                                 18                 :  * is kept either in CacheMemoryContext (the Relation structs) or in the
                                 19                 :  * memory context given to inv_open (for LargeObjectDesc structs).
                                 20                 :  *
                                 21                 :  *
                                 22                 :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
                                 23                 :  * Portions Copyright (c) 1994, Regents of the University of California
                                 24                 :  *
                                 25                 :  *
                                 26                 :  * IDENTIFICATION
                                 27                 :  *    src/backend/storage/large_object/inv_api.c
                                 28                 :  *
                                 29                 :  *-------------------------------------------------------------------------
                                 30                 :  */
                                 31                 : #include "postgres.h"
                                 32                 : 
                                 33                 : #include <limits.h>
                                 34                 : 
                                 35                 : #include "access/detoast.h"
                                 36                 : #include "access/genam.h"
                                 37                 : #include "access/htup_details.h"
                                 38                 : #include "access/sysattr.h"
                                 39                 : #include "access/table.h"
                                 40                 : #include "access/xact.h"
                                 41                 : #include "catalog/dependency.h"
                                 42                 : #include "catalog/indexing.h"
                                 43                 : #include "catalog/objectaccess.h"
                                 44                 : #include "catalog/pg_largeobject.h"
                                 45                 : #include "catalog/pg_largeobject_metadata.h"
                                 46                 : #include "libpq/libpq-fs.h"
                                 47                 : #include "miscadmin.h"
                                 48                 : #include "storage/large_object.h"
                                 49                 : #include "utils/acl.h"
                                 50                 : #include "utils/fmgroids.h"
                                 51                 : #include "utils/rel.h"
                                 52                 : #include "utils/snapmgr.h"
                                 53                 : 
                                 54                 : 
                                 55                 : /*
                                 56                 :  * GUC: backwards-compatibility flag to suppress LO permission checks
                                 57                 :  */
                                 58                 : bool        lo_compat_privileges;
                                 59                 : 
                                 60                 : /*
                                 61                 :  * All accesses to pg_largeobject and its index make use of a single Relation
                                 62                 :  * reference, so that we only need to open pg_relation once per transaction.
                                 63                 :  * To avoid problems when the first such reference occurs inside a
                                 64                 :  * subtransaction, we execute a slightly klugy maneuver to assign ownership of
                                 65                 :  * the Relation reference to TopTransactionResourceOwner.
                                 66                 :  */
                                 67                 : static Relation lo_heap_r = NULL;
                                 68                 : static Relation lo_index_r = NULL;
                                 69                 : 
                                 70                 : 
                                 71                 : /*
                                 72                 :  * Open pg_largeobject and its index, if not already done in current xact
                                 73                 :  */
                                 74                 : static void
 6829 tgl                        75 CBC        1538 : open_lo_relation(void)
                                 76                 : {
                                 77                 :     ResourceOwner currentOwner;
                                 78                 : 
                                 79            1538 :     if (lo_heap_r && lo_index_r)
                                 80            1392 :         return;                 /* already open in current xact */
                                 81                 : 
                                 82                 :     /* Arrange for the top xact to own these relation references */
                                 83             146 :     currentOwner = CurrentResourceOwner;
 2006                            84             146 :     CurrentResourceOwner = TopTransactionResourceOwner;
                                 85                 : 
                                 86                 :     /* Use RowExclusiveLock since we might either read or write */
                                 87             146 :     if (lo_heap_r == NULL)
 1539 andres                     88             146 :         lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
 2006 tgl                        89             146 :     if (lo_index_r == NULL)
                                 90             146 :         lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
                                 91                 : 
 6829                            92             146 :     CurrentResourceOwner = currentOwner;
                                 93                 : }
                                 94                 : 
                                 95                 : /*
                                 96                 :  * Clean up at main transaction end
                                 97                 :  */
                                 98                 : void
                                 99             223 : close_lo_relation(bool isCommit)
                                100                 : {
                                101             223 :     if (lo_heap_r || lo_index_r)
                                102                 :     {
                                103                 :         /*
                                104                 :          * Only bother to close if committing; else abort cleanup will handle
                                105                 :          * it
                                106                 :          */
                                107             146 :         if (isCommit)
                                108                 :         {
                                109                 :             ResourceOwner currentOwner;
                                110                 : 
                                111             105 :             currentOwner = CurrentResourceOwner;
 2006                           112             105 :             CurrentResourceOwner = TopTransactionResourceOwner;
                                113                 : 
                                114             105 :             if (lo_index_r)
                                115             105 :                 index_close(lo_index_r, NoLock);
                                116             105 :             if (lo_heap_r)
 1539 andres                    117             105 :                 table_close(lo_heap_r, NoLock);
                                118                 : 
 6829 tgl                       119             105 :             CurrentResourceOwner = currentOwner;
                                120                 :         }
                                121             146 :         lo_heap_r = NULL;
                                122             146 :         lo_index_r = NULL;
                                123                 :     }
                                124             223 : }
                                125                 : 
                                126                 : 
                                127                 : /*
                                128                 :  * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
                                129                 :  * read with can be specified.
                                130                 :  */
                                131                 : static bool
 6509                           132             238 : myLargeObjectExists(Oid loid, Snapshot snapshot)
                                133                 : {
                                134                 :     Relation    pg_lo_meta;
                                135                 :     ScanKeyData skey[1];
                                136                 :     SysScanDesc sd;
                                137                 :     HeapTuple   tuple;
                                138             238 :     bool        retval = false;
                                139                 : 
                                140             238 :     ScanKeyInit(&skey[0],
                                141                 :                 Anum_pg_largeobject_metadata_oid,
                                142                 :                 BTEqualStrategyNumber, F_OIDEQ,
                                143                 :                 ObjectIdGetDatum(loid));
                                144                 : 
 1539 andres                    145             238 :     pg_lo_meta = table_open(LargeObjectMetadataRelationId,
                                146                 :                             AccessShareLock);
                                147                 : 
 4867 itagaki.takahiro          148             238 :     sd = systable_beginscan(pg_lo_meta,
                                149                 :                             LargeObjectMetadataOidIndexId, true,
                                150                 :                             snapshot, 1, skey);
                                151                 : 
                                152             238 :     tuple = systable_getnext(sd);
                                153             238 :     if (HeapTupleIsValid(tuple))
 6509 tgl                       154             236 :         retval = true;
                                155                 : 
                                156             238 :     systable_endscan(sd);
                                157                 : 
 1539 andres                    158             238 :     table_close(pg_lo_meta, AccessShareLock);
                                159                 : 
 6509 tgl                       160             238 :     return retval;
                                161                 : }
                                162                 : 
                                163                 : 
                                164                 : /*
                                165                 :  * Extract data field from a pg_largeobject tuple, detoasting if needed
                                166                 :  * and verifying that the length is sane.  Returns data pointer (a bytea *),
                                167                 :  * data length, and an indication of whether to pfree the data pointer.
                                168                 :  */
                                169                 : static void
 3230                           170            5124 : getdatafield(Form_pg_largeobject tuple,
                                171                 :              bytea **pdatafield,
                                172                 :              int *plen,
                                173                 :              bool *pfreeit)
                                174                 : {
                                175                 :     bytea      *datafield;
                                176                 :     int         len;
                                177                 :     bool        freeit;
                                178                 : 
                                179            5124 :     datafield = &(tuple->data); /* see note at top of file */
                                180            5124 :     freeit = false;
                                181            5124 :     if (VARATT_IS_EXTENDED(datafield))
                                182                 :     {
                                183                 :         datafield = (bytea *)
 1283 rhaas                     184            5041 :             detoast_attr((struct varlena *) datafield);
 3230 tgl                       185            5041 :         freeit = true;
                                186                 :     }
                                187            5124 :     len = VARSIZE(datafield) - VARHDRSZ;
                                188            5124 :     if (len < 0 || len > LOBLKSIZE)
 3230 tgl                       189 UBC           0 :         ereport(ERROR,
                                190                 :                 (errcode(ERRCODE_DATA_CORRUPTED),
                                191                 :                  errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
                                192                 :                         tuple->loid, tuple->pageno, len)));
 3230 tgl                       193 CBC        5124 :     *pdatafield = datafield;
                                194            5124 :     *plen = len;
                                195            5124 :     *pfreeit = freeit;
 8202                           196            5124 : }
                                197                 : 
                                198                 : 
                                199                 : /*
                                200                 :  *  inv_create -- create a new large object
                                201                 :  *
                                202                 :  *  Arguments:
                                203                 :  *    lobjId - OID to use for new large object, or InvalidOid to pick one
                                204                 :  *
                                205                 :  *  Returns:
                                206                 :  *    OID of new object
                                207                 :  *
                                208                 :  * If lobjId is not InvalidOid, then an error occurs if the OID is already
                                209                 :  * in use.
                                210                 :  */
                                211                 : Oid
 6509                           212              56 : inv_create(Oid lobjId)
                                213                 : {
                                214                 :     Oid         lobjId_new;
                                215                 : 
                                216                 :     /*
                                217                 :      * Create a new largeobject with empty data pages
                                218                 :      */
 4867 itagaki.takahiro          219              56 :     lobjId_new = LargeObjectCreate(lobjId);
                                220                 : 
                                221                 :     /*
                                222                 :      * dependency on the owner of largeobject
                                223                 :      *
                                224                 :      * The reason why we use LargeObjectRelationId instead of
                                225                 :      * LargeObjectMetadataRelationId here is to provide backward compatibility
                                226                 :      * to the applications which utilize a knowledge about internal layout of
                                227                 :      * system catalogs. OID of pg_largeobject_metadata and loid of
                                228                 :      * pg_largeobject are same value, so there are no actual differences here.
                                229                 :      */
                                230              56 :     recordDependencyOnOwner(LargeObjectRelationId,
                                231                 :                             lobjId_new, GetUserId());
                                232                 : 
                                233                 :     /* Post creation hook for new large object */
 3686 rhaas                     234              56 :     InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
                                235                 : 
                                236                 :     /*
                                237                 :      * Advance command counter to make new tuple visible to later operations.
                                238                 :      */
 8204 bruce                     239              56 :     CommandCounterIncrement();
                                240                 : 
 4867 itagaki.takahiro          241              56 :     return lobjId_new;
                                242                 : }
                                243                 : 
                                244                 : /*
                                245                 :  *  inv_open -- access an existing large object.
                                246                 :  *
                                247                 :  * Returns a large object descriptor, appropriately filled in.
                                248                 :  * The descriptor and subsidiary data are allocated in the specified
                                249                 :  * memory context, which must be suitably long-lived for the caller's
                                250                 :  * purposes.  If the returned descriptor has a snapshot associated
                                251                 :  * with it, the caller must ensure that it also lives long enough,
                                252                 :  * e.g. by calling RegisterSnapshotOnOwner
                                253                 :  */
                                254                 : LargeObjectDesc *
 6192 tgl                       255             238 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
                                256                 : {
                                257                 :     LargeObjectDesc *retval;
 3478 heikki.linnakangas        258             238 :     Snapshot    snapshot = NULL;
                                259             238 :     int         descflags = 0;
                                260                 : 
                                261                 :     /*
                                262                 :      * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
                                263                 :      * | INV_READ), the caller being allowed to read the large object
                                264                 :      * descriptor in either case.
                                265                 :      */
 8053 bruce                     266             238 :     if (flags & INV_WRITE)
 1977 tgl                       267              77 :         descflags |= IFS_WRLOCK | IFS_RDLOCK;
                                268             238 :     if (flags & INV_READ)
                                269             176 :         descflags |= IFS_RDLOCK;
                                270                 : 
                                271             238 :     if (descflags == 0)
 3835 tgl                       272 UBC           0 :         ereport(ERROR,
                                273                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                274                 :                  errmsg("invalid flags for opening a large object: %d",
                                275                 :                         flags)));
                                276                 : 
                                277                 :     /* Get snapshot.  If write is requested, use an instantaneous snapshot. */
 1977 tgl                       278 CBC         238 :     if (descflags & IFS_WRLOCK)
                                279              77 :         snapshot = NULL;
                                280                 :     else
                                281             161 :         snapshot = GetActiveSnapshot();
                                282                 : 
                                283                 :     /* Can't use LargeObjectExists here because we need to specify snapshot */
 3478 heikki.linnakangas        284             238 :     if (!myLargeObjectExists(lobjId, snapshot))
 6509 tgl                       285               2 :         ereport(ERROR,
                                286                 :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
                                287                 :                  errmsg("large object %u does not exist", lobjId)));
                                288                 : 
                                289                 :     /* Apply permission checks, again specifying snapshot */
 1977                           290             236 :     if ((descflags & IFS_RDLOCK) != 0)
                                291                 :     {
                                292             463 :         if (!lo_compat_privileges &&
                                293             227 :             pg_largeobject_aclcheck_snapshot(lobjId,
                                294                 :                                              GetUserId(),
                                295                 :                                              ACL_SELECT,
                                296                 :                                              snapshot) != ACLCHECK_OK)
                                297              21 :             ereport(ERROR,
                                298                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                299                 :                      errmsg("permission denied for large object %u",
                                300                 :                             lobjId)));
                                301                 :     }
                                302             215 :     if ((descflags & IFS_WRLOCK) != 0)
                                303                 :     {
                                304             124 :         if (!lo_compat_privileges &&
                                305              59 :             pg_largeobject_aclcheck_snapshot(lobjId,
                                306                 :                                              GetUserId(),
                                307                 :                                              ACL_UPDATE,
                                308                 :                                              snapshot) != ACLCHECK_OK)
                                309               6 :             ereport(ERROR,
                                310                 :                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                311                 :                      errmsg("permission denied for large object %u",
                                312                 :                             lobjId)));
                                313                 :     }
                                314                 : 
                                315                 :     /* OK to create a descriptor */
                                316             209 :     retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
                                317                 :                                                     sizeof(LargeObjectDesc));
                                318             209 :     retval->id = lobjId;
                                319             209 :     retval->offset = 0;
                                320             209 :     retval->flags = descflags;
                                321                 : 
                                322                 :     /* caller sets if needed, not used by the functions in this file */
  522 heikki.linnakangas        323             209 :     retval->subid = InvalidSubTransactionId;
                                324                 : 
                                325                 :     /*
                                326                 :      * The snapshot (if any) is just the currently active snapshot.  The
                                327                 :      * caller will replace it with a longer-lived copy if needed.
                                328                 :      */
 3478                           329             209 :     retval->snapshot = snapshot;
                                330                 : 
 8986 bruce                     331             209 :     return retval;
                                332                 : }
                                333                 : 
                                334                 : /*
                                335                 :  * Closes a large object descriptor previously made by inv_open(), and
                                336                 :  * releases the long-term memory used by it.
                                337                 :  */
                                338                 : void
 9344                           339             194 : inv_close(LargeObjectDesc *obj_desc)
                                340                 : {
 9345                           341             194 :     Assert(PointerIsValid(obj_desc));
                                342             194 :     pfree(obj_desc);
 9770 scrappy                   343             194 : }
                                344                 : 
                                345                 : /*
                                346                 :  * Destroys an existing large object (not to be confused with a descriptor!)
                                347                 :  *
                                348                 :  * Note we expect caller to have done any required permissions check.
                                349                 :  */
                                350                 : int
 8521 bruce                     351              41 : inv_drop(Oid lobjId)
                                352                 : {
                                353                 :     ObjectAddress object;
                                354                 : 
                                355                 :     /*
                                356                 :      * Delete any comments and dependencies on the large object
                                357                 :      */
 4867 itagaki.takahiro          358              41 :     object.classId = LargeObjectRelationId;
                                359              41 :     object.objectId = lobjId;
                                360              41 :     object.objectSubId = 0;
 4091 rhaas                     361              41 :     performDeletion(&object, DROP_CASCADE, 0);
                                362                 : 
                                363                 :     /*
                                364                 :      * Advance command counter so that tuple removal will be seen by later
                                365                 :      * large-object operations in this transaction.
                                366                 :      */
 8202 tgl                       367              41 :     CommandCounterIncrement();
                                368                 : 
                                369                 :     /* For historical reasons, we always return 1 on success. */
 9345 bruce                     370              41 :     return 1;
                                371                 : }
                                372                 : 
                                373                 : /*
                                374                 :  * Determine size of a large object
                                375                 :  *
                                376                 :  * NOTE: LOs can contain gaps, just like Unix files.  We actually return
                                377                 :  * the offset of the last byte + 1.
                                378                 :  */
                                379                 : static uint64
 8202 tgl                       380              52 : inv_getsize(LargeObjectDesc *obj_desc)
                                381                 : {
 3836 ishii                     382              52 :     uint64      lastbyte = 0;
                                383                 :     ScanKeyData skey[1];
                                384                 :     SysScanDesc sd;
                                385                 :     HeapTuple   tuple;
                                386                 : 
 9345 bruce                     387              52 :     Assert(PointerIsValid(obj_desc));
                                388                 : 
 6829 tgl                       389              52 :     open_lo_relation();
                                390                 : 
 7088                           391              52 :     ScanKeyInit(&skey[0],
                                392                 :                 Anum_pg_largeobject_loid,
                                393                 :                 BTEqualStrategyNumber, F_OIDEQ,
                                394                 :                 ObjectIdGetDatum(obj_desc->id));
                                395                 : 
 5475                           396              52 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
                                397                 :                                     obj_desc->snapshot, 1, skey);
                                398                 : 
                                399                 :     /*
                                400                 :      * Because the pg_largeobject index is on both loid and pageno, but we
                                401                 :      * constrain only loid, a backwards scan should visit all pages of the
                                402                 :      * large object in reverse pageno order.  So, it's sufficient to examine
                                403                 :      * the first valid tuple (== last valid page).
                                404                 :      */
 4867 itagaki.takahiro          405              52 :     tuple = systable_getnext_ordered(sd, BackwardScanDirection);
                                406              52 :     if (HeapTupleIsValid(tuple))
                                407                 :     {
                                408                 :         Form_pg_largeobject data;
                                409                 :         bytea      *datafield;
                                410                 :         int         len;
                                411                 :         bool        pfreeit;
                                412                 : 
 5624 bruce                     413              48 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
 5780 tgl                       414 UBC           0 :             elog(ERROR, "null field found in pg_largeobject");
 7629 tgl                       415 CBC          48 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
 3230                           416              48 :         getdatafield(data, &datafield, &len, &pfreeit);
                                417              48 :         lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
 8202                           418              48 :         if (pfreeit)
                                419               9 :             pfree(datafield);
                                420                 :     }
                                421                 : 
 5475                           422              52 :     systable_endscan_ordered(sd);
                                423                 : 
 8202                           424              52 :     return lastbyte;
                                425                 : }
                                426                 : 
                                427                 : int64
 3836 ishii                     428             110 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
                                429                 : {
                                430                 :     int64       newoffset;
                                431                 : 
 9345 bruce                     432             110 :     Assert(PointerIsValid(obj_desc));
                                433                 : 
                                434                 :     /*
                                435                 :      * We allow seek/tell if you have either read or write permission, so no
                                436                 :      * need for a permission check here.
                                437                 :      */
                                438                 : 
                                439                 :     /*
                                440                 :      * Note: overflow in the additions is possible, but since we will reject
                                441                 :      * negative results, we don't need any extra test for that.
                                442                 :      */
 8202 tgl                       443             110 :     switch (whence)
                                444                 :     {
                                445              49 :         case SEEK_SET:
 3835                           446              49 :             newoffset = offset;
 8202                           447              49 :             break;
                                448               9 :         case SEEK_CUR:
 3835                           449               9 :             newoffset = obj_desc->offset + offset;
 8202                           450               9 :             break;
                                451              52 :         case SEEK_END:
 3835                           452              52 :             newoffset = inv_getsize(obj_desc) + offset;
 8202                           453              52 :             break;
 8202 tgl                       454 UBC           0 :         default:
 3835                           455               0 :             ereport(ERROR,
                                456                 :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                457                 :                      errmsg("invalid whence setting: %d", whence)));
                                458                 :             newoffset = 0;      /* keep compiler quiet */
                                459                 :             break;
                                460                 :     }
                                461                 : 
                                462                 :     /*
                                463                 :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
                                464                 :      * in translatable strings; doing better is not worth the trouble
                                465                 :      */
 3835 tgl                       466 CBC         110 :     if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
 3835 tgl                       467 UBC           0 :         ereport(ERROR,
                                468                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                469                 :                  errmsg_internal("invalid large object seek target: " INT64_FORMAT,
                                470                 :                                  newoffset)));
                                471                 : 
 3835 tgl                       472 CBC         110 :     obj_desc->offset = newoffset;
                                473             110 :     return newoffset;
                                474                 : }
                                475                 : 
                                476                 : int64
 9344 bruce                     477              24 : inv_tell(LargeObjectDesc *obj_desc)
                                478                 : {
 9345                           479              24 :     Assert(PointerIsValid(obj_desc));
                                480                 : 
                                481                 :     /*
                                482                 :      * We allow seek/tell if you have either read or write permission, so no
                                483                 :      * need for a permission check here.
                                484                 :      */
                                485                 : 
 8986                           486              24 :     return obj_desc->offset;
                                487                 : }
                                488                 : 
                                489                 : int
 9344                           490             693 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
                                491                 : {
 8053                           492             693 :     int         nread = 0;
                                493                 :     int64       n;
                                494                 :     int64       off;
                                495                 :     int         len;
                                496             693 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
                                497                 :     uint64      pageoff;
                                498                 :     ScanKeyData skey[2];
                                499                 :     SysScanDesc sd;
                                500                 :     HeapTuple   tuple;
                                501                 : 
 9345                           502             693 :     Assert(PointerIsValid(obj_desc));
                                503             693 :     Assert(buf != NULL);
                                504                 : 
 1977 tgl                       505             693 :     if ((obj_desc->flags & IFS_RDLOCK) == 0)
 1977 tgl                       506 UBC           0 :         ereport(ERROR,
                                507                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                508                 :                  errmsg("permission denied for large object %u",
                                509                 :                         obj_desc->id)));
                                510                 : 
 8202 tgl                       511 CBC         693 :     if (nbytes <= 0)
 8204 bruce                     512               4 :         return 0;
                                513                 : 
 6829 tgl                       514             689 :     open_lo_relation();
                                515                 : 
 7088                           516             689 :     ScanKeyInit(&skey[0],
                                517                 :                 Anum_pg_largeobject_loid,
                                518                 :                 BTEqualStrategyNumber, F_OIDEQ,
                                519                 :                 ObjectIdGetDatum(obj_desc->id));
                                520                 : 
                                521             689 :     ScanKeyInit(&skey[1],
                                522                 :                 Anum_pg_largeobject_pageno,
                                523                 :                 BTGreaterEqualStrategyNumber, F_INT4GE,
                                524                 :                 Int32GetDatum(pageno));
                                525                 : 
 5475                           526             689 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
                                527                 :                                     obj_desc->snapshot, 2, skey);
                                528                 : 
                                529            5219 :     while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
                                530                 :     {
                                531                 :         Form_pg_largeobject data;
                                532                 :         bytea      *datafield;
                                533                 :         bool        pfreeit;
                                534                 : 
 5624 bruce                     535            5061 :         if (HeapTupleHasNulls(tuple))   /* paranoia */
 5780 tgl                       536 UBC           0 :             elog(ERROR, "null field found in pg_largeobject");
 7629 tgl                       537 CBC        5061 :         data = (Form_pg_largeobject) GETSTRUCT(tuple);
                                538                 : 
                                539                 :         /*
                                540                 :          * We expect the indexscan will deliver pages in order.  However,
                                541                 :          * there may be missing pages if the LO contains unwritten "holes". We
                                542                 :          * want missing sections to read out as zeroes.
                                543                 :          */
 3836 ishii                     544            5061 :         pageoff = ((uint64) data->pageno) * LOBLKSIZE;
 8202 tgl                       545            5061 :         if (pageoff > obj_desc->offset)
                                546                 :         {
                                547               6 :             n = pageoff - obj_desc->offset;
                                548               6 :             n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
                                549               6 :             MemSet(buf + nread, 0, n);
                                550               6 :             nread += n;
                                551               6 :             obj_desc->offset += n;
                                552                 :         }
                                553                 : 
                                554            5061 :         if (nread < nbytes)
                                555                 :         {
                                556            5058 :             Assert(obj_desc->offset >= pageoff);
                                557            5058 :             off = (int) (obj_desc->offset - pageoff);
                                558            5058 :             Assert(off >= 0 && off < LOBLKSIZE);
                                559                 : 
 3230                           560            5058 :             getdatafield(data, &datafield, &len, &pfreeit);
 8202                           561            5058 :             if (len > off)
                                562                 :             {
                                563            5007 :                 n = len - off;
                                564            5007 :                 n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
                                565            5007 :                 memcpy(buf + nread, VARDATA(datafield) + off, n);
                                566            5007 :                 nread += n;
                                567            5007 :                 obj_desc->offset += n;
                                568                 :             }
                                569            5058 :             if (pfreeit)
                                570            5020 :                 pfree(datafield);
                                571                 :         }
                                572                 : 
                                573            5061 :         if (nread >= nbytes)
                                574             531 :             break;
                                575                 :     }
                                576                 : 
 5475                           577             689 :     systable_endscan_ordered(sd);
                                578                 : 
 8986 bruce                     579             689 :     return nread;
                                580                 : }
                                581                 : 
                                582                 : int
 6058                           583             776 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
                                584                 : {
 8053                           585             776 :     int         nwritten = 0;
                                586                 :     int         n;
                                587                 :     int         off;
                                588                 :     int         len;
                                589             776 :     int32       pageno = (int32) (obj_desc->offset / LOBLKSIZE);
                                590                 :     ScanKeyData skey[2];
                                591                 :     SysScanDesc sd;
                                592                 :     HeapTuple   oldtuple;
                                593                 :     Form_pg_largeobject olddata;
                                594                 :     bool        neednextpage;
                                595                 :     bytea      *datafield;
                                596                 :     bool        pfreeit;
                                597                 :     union
                                598                 :     {
                                599                 :         bytea       hdr;
                                600                 :         /* this is to make the union big enough for a LO data chunk: */
                                601                 :         char        data[LOBLKSIZE + VARHDRSZ];
                                602                 :         /* ensure union is aligned well enough: */
                                603                 :         int32       align_it;
                                604                 :     }           workbuf;
 5885 tgl                       605             776 :     char       *workb = VARDATA(&workbuf.hdr);
                                606                 :     HeapTuple   newtup;
                                607                 :     Datum       values[Natts_pg_largeobject];
                                608                 :     bool        nulls[Natts_pg_largeobject];
                                609                 :     bool        replace[Natts_pg_largeobject];
                                610                 :     CatalogIndexState indstate;
                                611                 : 
 9345 bruce                     612             776 :     Assert(PointerIsValid(obj_desc));
                                613             776 :     Assert(buf != NULL);
                                614                 : 
                                615                 :     /* enforce writability because snapshot is probably wrong otherwise */
 1977 tgl                       616             776 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
 1977 tgl                       617 UBC           0 :         ereport(ERROR,
                                618                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                619                 :                  errmsg("permission denied for large object %u",
                                620                 :                         obj_desc->id)));
                                621                 : 
 8202 tgl                       622 CBC         776 :     if (nbytes <= 0)
 8202 tgl                       623 UBC           0 :         return 0;
                                624                 : 
                                625                 :     /* this addition can't overflow because nbytes is only int32 */
 3836 ishii                     626 CBC         776 :     if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
 3835 tgl                       627 UBC           0 :         ereport(ERROR,
                                628                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                629                 :                  errmsg("invalid large object write request size: %d",
                                630                 :                         nbytes)));
                                631                 : 
 6829 tgl                       632 CBC         776 :     open_lo_relation();
                                633                 : 
                                634             776 :     indstate = CatalogOpenIndexes(lo_heap_r);
                                635                 : 
 7088                           636             776 :     ScanKeyInit(&skey[0],
                                637                 :                 Anum_pg_largeobject_loid,
                                638                 :                 BTEqualStrategyNumber, F_OIDEQ,
                                639                 :                 ObjectIdGetDatum(obj_desc->id));
                                640                 : 
                                641             776 :     ScanKeyInit(&skey[1],
                                642                 :                 Anum_pg_largeobject_pageno,
                                643                 :                 BTGreaterEqualStrategyNumber, F_INT4GE,
                                644                 :                 Int32GetDatum(pageno));
                                645                 : 
 5475                           646             776 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
                                647                 :                                     obj_desc->snapshot, 2, skey);
                                648                 : 
 7629                           649             776 :     oldtuple = NULL;
 8202                           650             776 :     olddata = NULL;
                                651             776 :     neednextpage = true;
                                652                 : 
 8204 bruce                     653            4750 :     while (nwritten < nbytes)
                                654                 :     {
                                655                 :         /*
                                656                 :          * If possible, get next pre-existing page of the LO.  We expect the
                                657                 :          * indexscan will deliver these in order --- but there may be holes.
                                658                 :          */
 8202 tgl                       659            3974 :         if (neednextpage)
                                660                 :         {
 5475                           661             779 :             if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
                                662                 :             {
 2118                           663              12 :                 if (HeapTupleHasNulls(oldtuple))    /* paranoia */
 5780 tgl                       664 UBC           0 :                     elog(ERROR, "null field found in pg_largeobject");
 7629 tgl                       665 CBC          12 :                 olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
                                666              12 :                 Assert(olddata->pageno >= pageno);
                                667                 :             }
 8202                           668             779 :             neednextpage = false;
                                669                 :         }
                                670                 : 
                                671                 :         /*
                                672                 :          * If we have a pre-existing page, see if it is the page we want to
                                673                 :          * write, or a later one.
                                674                 :          */
                                675            3974 :         if (olddata != NULL && olddata->pageno == pageno)
                                676                 :         {
                                677                 :             /*
                                678                 :              * Update an existing page with fresh data.
                                679                 :              *
                                680                 :              * First, load old data into workbuf
                                681                 :              */
 3230                           682              12 :             getdatafield(olddata, &datafield, &len, &pfreeit);
 8202                           683              12 :             memcpy(workb, VARDATA(datafield), len);
                                684              12 :             if (pfreeit)
                                685               9 :                 pfree(datafield);
                                686                 : 
                                687                 :             /*
                                688                 :              * Fill any hole
                                689                 :              */
                                690              12 :             off = (int) (obj_desc->offset % LOBLKSIZE);
                                691              12 :             if (off > len)
 8202 tgl                       692 UBC           0 :                 MemSet(workb + len, 0, off - len);
                                693                 : 
                                694                 :             /*
                                695                 :              * Insert appropriate portion of new data
                                696                 :              */
 8202 tgl                       697 CBC          12 :             n = LOBLKSIZE - off;
                                698              12 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
                                699              12 :             memcpy(workb + off, buf + nwritten, n);
                                700              12 :             nwritten += n;
                                701              12 :             obj_desc->offset += n;
                                702              12 :             off += n;
                                703                 :             /* compute valid length of new page */
                                704              12 :             len = (len >= off) ? len : off;
 5885                           705              12 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
                                706                 : 
                                707                 :             /*
                                708                 :              * Form and insert updated tuple
                                709                 :              */
 8202                           710              12 :             memset(values, 0, sizeof(values));
 5271                           711              12 :             memset(nulls, false, sizeof(nulls));
                                712              12 :             memset(replace, false, sizeof(replace));
 8050                           713              12 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 5271                           714              12 :             replace[Anum_pg_largeobject_data - 1] = true;
                                715              12 :             newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
                                716                 :                                        values, nulls, replace);
 2258                           717              12 :             CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
                                718                 :                                        indstate);
 8202                           719              12 :             heap_freetuple(newtup);
                                720                 : 
                                721                 :             /*
                                722                 :              * We're done with this old page.
                                723                 :              */
 7629                           724              12 :             oldtuple = NULL;
 8202                           725              12 :             olddata = NULL;
                                726              12 :             neednextpage = true;
                                727                 :         }
                                728                 :         else
                                729                 :         {
                                730                 :             /*
                                731                 :              * Write a brand new page.
                                732                 :              *
                                733                 :              * First, fill any hole
                                734                 :              */
                                735            3962 :             off = (int) (obj_desc->offset % LOBLKSIZE);
                                736            3962 :             if (off > 0)
                                737               3 :                 MemSet(workb, 0, off);
                                738                 : 
                                739                 :             /*
                                740                 :              * Insert appropriate portion of new data
                                741                 :              */
                                742            3962 :             n = LOBLKSIZE - off;
                                743            3962 :             n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
                                744            3962 :             memcpy(workb + off, buf + nwritten, n);
                                745            3962 :             nwritten += n;
                                746            3962 :             obj_desc->offset += n;
                                747                 :             /* compute valid length of new page */
                                748            3962 :             len = off + n;
 5885                           749            3962 :             SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
                                750                 : 
                                751                 :             /*
                                752                 :              * Form and insert updated tuple
                                753                 :              */
 8202                           754            3962 :             memset(values, 0, sizeof(values));
 5271                           755            3962 :             memset(nulls, false, sizeof(nulls));
 8202                           756            3962 :             values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
                                757            3962 :             values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
 8050                           758            3962 :             values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 5271                           759            3962 :             newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
 2258                           760            3962 :             CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
 8202                           761            3962 :             heap_freetuple(newtup);
                                762                 :         }
                                763            3974 :         pageno++;
                                764                 :     }
                                765                 : 
 5475                           766             776 :     systable_endscan_ordered(sd);
                                767                 : 
 7552                           768             776 :     CatalogCloseIndexes(indstate);
                                769                 : 
                                770                 :     /*
                                771                 :      * Advance command counter so that my tuple updates will be seen by later
                                772                 :      * large-object operations in this transaction.
                                773                 :      */
 8202                           774             776 :     CommandCounterIncrement();
                                775                 : 
 8204 bruce                     776             776 :     return nwritten;
                                777                 : }
                                778                 : 
                                779                 : void
 3836 ishii                     780              21 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
                                781                 : {
 5881 bruce                     782              21 :     int32       pageno = (int32) (len / LOBLKSIZE);
                                783                 :     int32       off;
                                784                 :     ScanKeyData skey[2];
                                785                 :     SysScanDesc sd;
                                786                 :     HeapTuple   oldtuple;
                                787                 :     Form_pg_largeobject olddata;
                                788                 :     union
                                789                 :     {
                                790                 :         bytea       hdr;
                                791                 :         /* this is to make the union big enough for a LO data chunk: */
                                792                 :         char        data[LOBLKSIZE + VARHDRSZ];
                                793                 :         /* ensure union is aligned well enough: */
                                794                 :         int32       align_it;
                                795                 :     }           workbuf;
 5624                           796              21 :     char       *workb = VARDATA(&workbuf.hdr);
                                797                 :     HeapTuple   newtup;
                                798                 :     Datum       values[Natts_pg_largeobject];
                                799                 :     bool        nulls[Natts_pg_largeobject];
                                800                 :     bool        replace[Natts_pg_largeobject];
                                801                 :     CatalogIndexState indstate;
                                802                 : 
 5881                           803              21 :     Assert(PointerIsValid(obj_desc));
                                804                 : 
                                805                 :     /* enforce writability because snapshot is probably wrong otherwise */
 1977 tgl                       806              21 :     if ((obj_desc->flags & IFS_WRLOCK) == 0)
 1977 tgl                       807 UBC           0 :         ereport(ERROR,
                                808                 :                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                809                 :                  errmsg("permission denied for large object %u",
                                810                 :                         obj_desc->id)));
                                811                 : 
                                812                 :     /*
                                813                 :      * use errmsg_internal here because we don't want to expose INT64_FORMAT
                                814                 :      * in translatable strings; doing better is not worth the trouble
                                815                 :      */
 3835 tgl                       816 CBC          21 :     if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
 3835 tgl                       817 UBC           0 :         ereport(ERROR,
                                818                 :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                                819                 :                  errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
                                820                 :                                  len)));
                                821                 : 
 5881 bruce                     822 CBC          21 :     open_lo_relation();
                                823                 : 
                                824              21 :     indstate = CatalogOpenIndexes(lo_heap_r);
                                825                 : 
                                826                 :     /*
                                827                 :      * Set up to find all pages with desired loid and pageno >= target
                                828                 :      */
                                829              21 :     ScanKeyInit(&skey[0],
                                830                 :                 Anum_pg_largeobject_loid,
                                831                 :                 BTEqualStrategyNumber, F_OIDEQ,
                                832                 :                 ObjectIdGetDatum(obj_desc->id));
                                833                 : 
                                834              21 :     ScanKeyInit(&skey[1],
                                835                 :                 Anum_pg_largeobject_pageno,
                                836                 :                 BTGreaterEqualStrategyNumber, F_INT4GE,
                                837                 :                 Int32GetDatum(pageno));
                                838                 : 
 5475 tgl                       839              21 :     sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
                                840                 :                                     obj_desc->snapshot, 2, skey);
                                841                 : 
                                842                 :     /*
                                843                 :      * If possible, get the page the truncation point is in. The truncation
                                844                 :      * point may be beyond the end of the LO or in a hole.
                                845                 :      */
 5881 bruce                     846              21 :     olddata = NULL;
 5475 tgl                       847              21 :     if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
                                848                 :     {
 2118                           849              12 :         if (HeapTupleHasNulls(oldtuple))    /* paranoia */
 5780 tgl                       850 UBC           0 :             elog(ERROR, "null field found in pg_largeobject");
 5881 bruce                     851 CBC          12 :         olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
                                852              12 :         Assert(olddata->pageno >= pageno);
                                853                 :     }
                                854                 : 
                                855                 :     /*
                                856                 :      * If we found the page of the truncation point we need to truncate the
                                857                 :      * data in it.  Otherwise if we're in a hole, we need to create a page to
                                858                 :      * mark the end of data.
                                859                 :      */
                                860              21 :     if (olddata != NULL && olddata->pageno == pageno)
                                861               6 :     {
                                862                 :         /* First, load old data into workbuf */
                                863                 :         bytea      *datafield;
                                864                 :         int         pagelen;
                                865                 :         bool        pfreeit;
                                866                 : 
 3230 tgl                       867               6 :         getdatafield(olddata, &datafield, &pagelen, &pfreeit);
 5881 bruce                     868               6 :         memcpy(workb, VARDATA(datafield), pagelen);
                                869               6 :         if (pfreeit)
 5624                           870               3 :             pfree(datafield);
                                871                 : 
                                872                 :         /*
                                873                 :          * Fill any hole
                                874                 :          */
 5881                           875               6 :         off = len % LOBLKSIZE;
                                876               6 :         if (off > pagelen)
 5624                           877               3 :             MemSet(workb + pagelen, 0, off - pagelen);
                                878                 : 
                                879                 :         /* compute length of new page */
 5881                           880               6 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
                                881                 : 
                                882                 :         /*
                                883                 :          * Form and insert updated tuple
                                884                 :          */
                                885               6 :         memset(values, 0, sizeof(values));
 5271 tgl                       886               6 :         memset(nulls, false, sizeof(nulls));
                                887               6 :         memset(replace, false, sizeof(replace));
 5881 bruce                     888               6 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 5271 tgl                       889               6 :         replace[Anum_pg_largeobject_data - 1] = true;
                                890               6 :         newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
                                891                 :                                    values, nulls, replace);
 2258                           892               6 :         CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
                                893                 :                                    indstate);
 5881 bruce                     894               6 :         heap_freetuple(newtup);
                                895                 :     }
                                896                 :     else
                                897                 :     {
                                898                 :         /*
                                899                 :          * If the first page we found was after the truncation point, we're in
                                900                 :          * a hole that we'll fill, but we need to delete the later page
                                901                 :          * because the loop below won't visit it again.
                                902                 :          */
 4456 tgl                       903              15 :         if (olddata != NULL)
                                904                 :         {
                                905               6 :             Assert(olddata->pageno > pageno);
 2258                           906               6 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
                                907                 :         }
                                908                 : 
                                909                 :         /*
                                910                 :          * Write a brand new page.
                                911                 :          *
                                912                 :          * Fill the hole up to the truncation point
                                913                 :          */
 5881 bruce                     914              15 :         off = len % LOBLKSIZE;
                                915              15 :         if (off > 0)
                                916              15 :             MemSet(workb, 0, off);
                                917                 : 
                                918                 :         /* compute length of new page */
                                919              15 :         SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
                                920                 : 
                                921                 :         /*
                                922                 :          * Form and insert new tuple
                                923                 :          */
                                924              15 :         memset(values, 0, sizeof(values));
 5271 tgl                       925              15 :         memset(nulls, false, sizeof(nulls));
 5881 bruce                     926              15 :         values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
                                927              15 :         values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
                                928              15 :         values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
 5271 tgl                       929              15 :         newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
 2258                           930              15 :         CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
 5881 bruce                     931              15 :         heap_freetuple(newtup);
                                932                 :     }
                                933                 : 
                                934                 :     /*
                                935                 :      * Delete any pages after the truncation point.  If the initial search
                                936                 :      * didn't find a page, then of course there's nothing more to do.
                                937                 :      */
 4456 tgl                       938              21 :     if (olddata != NULL)
                                939                 :     {
                                940              15 :         while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
                                941                 :         {
 2258                           942               3 :             CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
                                943                 :         }
                                944                 :     }
                                945                 : 
 5475                           946              21 :     systable_endscan_ordered(sd);
                                947                 : 
 5881 bruce                     948              21 :     CatalogCloseIndexes(indstate);
                                949                 : 
                                950                 :     /*
                                951                 :      * Advance command counter so that tuple updates will be seen by later
                                952                 :      * large-object operations in this transaction.
                                953                 :      */
                                954              21 :     CommandCounterIncrement();
                                955              21 : }
        

Generated by: LCOV version v1.16-55-g56c0a2a