Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * inv_api.c
4 : : * routines for manipulating inversion fs large objects. This file
5 : : * contains the user-level large object application interface routines.
6 : : *
7 : : *
8 : : * Note: we access pg_largeobject.data using its C struct declaration.
9 : : * This is safe because it immediately follows pageno which is an int4 field,
10 : : * and therefore the data field will always be 4-byte aligned, even if it
11 : : * is in the short 1-byte-header format. We have to detoast it since it's
12 : : * quite likely to be in compressed or short format. We also need to check
13 : : * for NULLs, since initdb will mark loid and pageno but not data as NOT NULL.
14 : : *
15 : : * Note: many of these routines leak memory in CurrentMemoryContext, as indeed
16 : : * does most of the backend code. We expect that CurrentMemoryContext will
17 : : * be a short-lived context. Data that must persist across function calls
18 : : * is kept either in CacheMemoryContext (the Relation structs) or in the
19 : : * memory context given to inv_open (for LargeObjectDesc structs).
20 : : *
21 : : *
22 : : * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
23 : : * Portions Copyright (c) 1994, Regents of the University of California
24 : : *
25 : : *
26 : : * IDENTIFICATION
27 : : * src/backend/storage/large_object/inv_api.c
28 : : *
29 : : *-------------------------------------------------------------------------
30 : : */
31 : : #include "postgres.h"
32 : :
33 : : #include <limits.h>
34 : :
35 : : #include "access/detoast.h"
36 : : #include "access/genam.h"
37 : : #include "access/htup_details.h"
38 : : #include "access/table.h"
39 : : #include "access/xact.h"
40 : : #include "catalog/dependency.h"
41 : : #include "catalog/indexing.h"
42 : : #include "catalog/objectaccess.h"
43 : : #include "catalog/pg_largeobject.h"
44 : : #include "catalog/pg_largeobject_metadata.h"
45 : : #include "libpq/libpq-fs.h"
46 : : #include "miscadmin.h"
47 : : #include "storage/large_object.h"
48 : : #include "utils/acl.h"
49 : : #include "utils/fmgroids.h"
50 : : #include "utils/rel.h"
51 : : #include "utils/snapmgr.h"
52 : :
53 : :
54 : : /*
55 : : * GUC: backwards-compatibility flag to suppress LO permission checks
56 : : */
57 : : bool lo_compat_privileges;
58 : :
59 : : /*
60 : : * All accesses to pg_largeobject and its index make use of a single
61 : : * Relation reference. To guarantee that the relcache entry remains
62 : : * in the cache, on the first reference inside a subtransaction, we
63 : : * execute a slightly klugy maneuver to assign ownership of the
64 : : * Relation reference to TopTransactionResourceOwner.
65 : : */
66 : : static Relation lo_heap_r = NULL;
67 : : static Relation lo_index_r = NULL;
68 : :
69 : :
70 : : /*
71 : : * Open pg_largeobject and its index, if not already done in current xact
72 : : */
73 : : static void
7200 tgl@sss.pgh.pa.us 74 :CBC 1538 : open_lo_relation(void)
75 : : {
76 : : ResourceOwner currentOwner;
77 : :
78 [ + + + - ]: 1538 : if (lo_heap_r && lo_index_r)
79 : 1388 : return; /* already open in current xact */
80 : :
81 : : /* Arrange for the top xact to own these relation references */
82 : 150 : currentOwner = CurrentResourceOwner;
2377 83 : 150 : CurrentResourceOwner = TopTransactionResourceOwner;
84 : :
85 : : /* Use RowExclusiveLock since we might either read or write */
86 [ + - ]: 150 : if (lo_heap_r == NULL)
1910 andres@anarazel.de 87 : 150 : lo_heap_r = table_open(LargeObjectRelationId, RowExclusiveLock);
2377 tgl@sss.pgh.pa.us 88 [ + - ]: 150 : if (lo_index_r == NULL)
89 : 150 : lo_index_r = index_open(LargeObjectLOidPNIndexId, RowExclusiveLock);
90 : :
7200 91 : 150 : CurrentResourceOwner = currentOwner;
92 : : }
93 : :
94 : : /*
95 : : * Clean up at main transaction end
96 : : */
97 : : void
98 : 225 : close_lo_relation(bool isCommit)
99 : : {
100 [ + + - + ]: 225 : if (lo_heap_r || lo_index_r)
101 : : {
102 : : /*
103 : : * Only bother to close if committing; else abort cleanup will handle
104 : : * it
105 : : */
106 [ + + ]: 150 : if (isCommit)
107 : : {
108 : : ResourceOwner currentOwner;
109 : :
110 : 105 : currentOwner = CurrentResourceOwner;
2377 111 : 105 : CurrentResourceOwner = TopTransactionResourceOwner;
112 : :
113 [ + - ]: 105 : if (lo_index_r)
114 : 105 : index_close(lo_index_r, NoLock);
115 [ + - ]: 105 : if (lo_heap_r)
1910 andres@anarazel.de 116 : 105 : table_close(lo_heap_r, NoLock);
117 : :
7200 tgl@sss.pgh.pa.us 118 : 105 : CurrentResourceOwner = currentOwner;
119 : : }
120 : 150 : lo_heap_r = NULL;
121 : 150 : lo_index_r = NULL;
122 : : }
123 : 225 : }
124 : :
125 : :
126 : : /*
127 : : * Same as pg_largeobject.c's LargeObjectExists(), except snapshot to
128 : : * read with can be specified.
129 : : */
130 : : static bool
6880 131 : 238 : myLargeObjectExists(Oid loid, Snapshot snapshot)
132 : : {
133 : : Relation pg_lo_meta;
134 : : ScanKeyData skey[1];
135 : : SysScanDesc sd;
136 : : HeapTuple tuple;
137 : 238 : bool retval = false;
138 : :
139 : 238 : ScanKeyInit(&skey[0],
140 : : Anum_pg_largeobject_metadata_oid,
141 : : BTEqualStrategyNumber, F_OIDEQ,
142 : : ObjectIdGetDatum(loid));
143 : :
1910 andres@anarazel.de 144 : 238 : pg_lo_meta = table_open(LargeObjectMetadataRelationId,
145 : : AccessShareLock);
146 : :
5238 itagaki.takahiro@gma 147 : 238 : sd = systable_beginscan(pg_lo_meta,
148 : : LargeObjectMetadataOidIndexId, true,
149 : : snapshot, 1, skey);
150 : :
151 : 238 : tuple = systable_getnext(sd);
152 [ + + ]: 238 : if (HeapTupleIsValid(tuple))
6880 tgl@sss.pgh.pa.us 153 : 236 : retval = true;
154 : :
155 : 238 : systable_endscan(sd);
156 : :
1910 andres@anarazel.de 157 : 238 : table_close(pg_lo_meta, AccessShareLock);
158 : :
6880 tgl@sss.pgh.pa.us 159 : 238 : return retval;
160 : : }
161 : :
162 : :
163 : : /*
164 : : * Extract data field from a pg_largeobject tuple, detoasting if needed
165 : : * and verifying that the length is sane. Returns data pointer (a bytea *),
166 : : * data length, and an indication of whether to pfree the data pointer.
167 : : */
168 : : static void
3601 169 : 5124 : getdatafield(Form_pg_largeobject tuple,
170 : : bytea **pdatafield,
171 : : int *plen,
172 : : bool *pfreeit)
173 : : {
174 : : bytea *datafield;
175 : : int len;
176 : : bool freeit;
177 : :
178 : 5124 : datafield = &(tuple->data); /* see note at top of file */
179 : 5124 : freeit = false;
180 [ + + ]: 5124 : if (VARATT_IS_EXTENDED(datafield))
181 : : {
182 : : datafield = (bytea *)
1654 rhaas@postgresql.org 183 : 5041 : detoast_attr((struct varlena *) datafield);
3601 tgl@sss.pgh.pa.us 184 : 5041 : freeit = true;
185 : : }
186 : 5124 : len = VARSIZE(datafield) - VARHDRSZ;
187 [ + - - + ]: 5124 : if (len < 0 || len > LOBLKSIZE)
3601 tgl@sss.pgh.pa.us 188 [ # # ]:UBC 0 : ereport(ERROR,
189 : : (errcode(ERRCODE_DATA_CORRUPTED),
190 : : errmsg("pg_largeobject entry for OID %u, page %d has invalid data field size %d",
191 : : tuple->loid, tuple->pageno, len)));
3601 tgl@sss.pgh.pa.us 192 :CBC 5124 : *pdatafield = datafield;
193 : 5124 : *plen = len;
194 : 5124 : *pfreeit = freeit;
8573 195 : 5124 : }
196 : :
197 : :
198 : : /*
199 : : * inv_create -- create a new large object
200 : : *
201 : : * Arguments:
202 : : * lobjId - OID to use for new large object, or InvalidOid to pick one
203 : : *
204 : : * Returns:
205 : : * OID of new object
206 : : *
207 : : * If lobjId is not InvalidOid, then an error occurs if the OID is already
208 : : * in use.
209 : : */
210 : : Oid
6880 211 : 56 : inv_create(Oid lobjId)
212 : : {
213 : : Oid lobjId_new;
214 : :
215 : : /*
216 : : * Create a new largeobject with empty data pages
217 : : */
5238 itagaki.takahiro@gma 218 : 56 : lobjId_new = LargeObjectCreate(lobjId);
219 : :
220 : : /*
221 : : * dependency on the owner of largeobject
222 : : *
223 : : * Note that LO dependencies are recorded using classId
224 : : * LargeObjectRelationId for backwards-compatibility reasons. Using
225 : : * LargeObjectMetadataRelationId instead would simplify matters for the
226 : : * backend, but it'd complicate pg_dump and possibly break other clients.
227 : : */
228 : 56 : recordDependencyOnOwner(LargeObjectRelationId,
229 : : lobjId_new, GetUserId());
230 : :
231 : : /* Post creation hook for new large object */
4057 rhaas@postgresql.org 232 [ - + ]: 56 : InvokeObjectPostCreateHook(LargeObjectRelationId, lobjId_new, 0);
233 : :
234 : : /*
235 : : * Advance command counter to make new tuple visible to later operations.
236 : : */
8575 bruce@momjian.us 237 : 56 : CommandCounterIncrement();
238 : :
5238 itagaki.takahiro@gma 239 : 56 : return lobjId_new;
240 : : }
241 : :
242 : : /*
243 : : * inv_open -- access an existing large object.
244 : : *
245 : : * Returns a large object descriptor, appropriately filled in.
246 : : * The descriptor and subsidiary data are allocated in the specified
247 : : * memory context, which must be suitably long-lived for the caller's
248 : : * purposes. If the returned descriptor has a snapshot associated
249 : : * with it, the caller must ensure that it also lives long enough,
250 : : * e.g. by calling RegisterSnapshotOnOwner
251 : : */
252 : : LargeObjectDesc *
6563 tgl@sss.pgh.pa.us 253 : 238 : inv_open(Oid lobjId, int flags, MemoryContext mcxt)
254 : : {
255 : : LargeObjectDesc *retval;
3849 heikki.linnakangas@i 256 : 238 : Snapshot snapshot = NULL;
257 : 238 : int descflags = 0;
258 : :
259 : : /*
260 : : * Historically, no difference is made between (INV_WRITE) and (INV_WRITE
261 : : * | INV_READ), the caller being allowed to read the large object
262 : : * descriptor in either case.
263 : : */
8424 bruce@momjian.us 264 [ + + ]: 238 : if (flags & INV_WRITE)
2348 tgl@sss.pgh.pa.us 265 : 77 : descflags |= IFS_WRLOCK | IFS_RDLOCK;
266 [ + + ]: 238 : if (flags & INV_READ)
267 : 176 : descflags |= IFS_RDLOCK;
268 : :
269 [ - + ]: 238 : if (descflags == 0)
4206 tgl@sss.pgh.pa.us 270 [ # # ]:UBC 0 : ereport(ERROR,
271 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
272 : : errmsg("invalid flags for opening a large object: %d",
273 : : flags)));
274 : :
275 : : /* Get snapshot. If write is requested, use an instantaneous snapshot. */
2348 tgl@sss.pgh.pa.us 276 [ + + ]:CBC 238 : if (descflags & IFS_WRLOCK)
277 : 77 : snapshot = NULL;
278 : : else
279 : 161 : snapshot = GetActiveSnapshot();
280 : :
281 : : /* Can't use LargeObjectExists here because we need to specify snapshot */
3849 heikki.linnakangas@i 282 [ + + ]: 238 : if (!myLargeObjectExists(lobjId, snapshot))
6880 tgl@sss.pgh.pa.us 283 [ + - ]: 2 : ereport(ERROR,
284 : : (errcode(ERRCODE_UNDEFINED_OBJECT),
285 : : errmsg("large object %u does not exist", lobjId)));
286 : :
287 : : /* Apply permission checks, again specifying snapshot */
2348 288 [ + - ]: 236 : if ((descflags & IFS_RDLOCK) != 0)
289 : : {
290 [ + + + + ]: 463 : if (!lo_compat_privileges &&
291 : 227 : pg_largeobject_aclcheck_snapshot(lobjId,
292 : : GetUserId(),
293 : : ACL_SELECT,
294 : : snapshot) != ACLCHECK_OK)
295 [ + - ]: 21 : ereport(ERROR,
296 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
297 : : errmsg("permission denied for large object %u",
298 : : lobjId)));
299 : : }
300 [ + + ]: 215 : if ((descflags & IFS_WRLOCK) != 0)
301 : : {
302 [ + + + + ]: 124 : if (!lo_compat_privileges &&
303 : 59 : pg_largeobject_aclcheck_snapshot(lobjId,
304 : : GetUserId(),
305 : : ACL_UPDATE,
306 : : snapshot) != ACLCHECK_OK)
307 [ + - ]: 6 : ereport(ERROR,
308 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
309 : : errmsg("permission denied for large object %u",
310 : : lobjId)));
311 : : }
312 : :
313 : : /* OK to create a descriptor */
314 : 209 : retval = (LargeObjectDesc *) MemoryContextAlloc(mcxt,
315 : : sizeof(LargeObjectDesc));
316 : 209 : retval->id = lobjId;
317 : 209 : retval->offset = 0;
318 : 209 : retval->flags = descflags;
319 : :
320 : : /* caller sets if needed, not used by the functions in this file */
893 heikki.linnakangas@i 321 : 209 : retval->subid = InvalidSubTransactionId;
322 : :
323 : : /*
324 : : * The snapshot (if any) is just the currently active snapshot. The
325 : : * caller will replace it with a longer-lived copy if needed.
326 : : */
3849 327 : 209 : retval->snapshot = snapshot;
328 : :
9357 bruce@momjian.us 329 : 209 : return retval;
330 : : }
331 : :
332 : : /*
333 : : * Closes a large object descriptor previously made by inv_open(), and
334 : : * releases the long-term memory used by it.
335 : : */
336 : : void
9715 337 : 194 : inv_close(LargeObjectDesc *obj_desc)
338 : : {
9716 339 [ - + ]: 194 : Assert(PointerIsValid(obj_desc));
340 : 194 : pfree(obj_desc);
10141 scrappy@hub.org 341 : 194 : }
342 : :
343 : : /*
344 : : * Destroys an existing large object (not to be confused with a descriptor!)
345 : : *
346 : : * Note we expect caller to have done any required permissions check.
347 : : */
348 : : int
8892 bruce@momjian.us 349 : 41 : inv_drop(Oid lobjId)
350 : : {
351 : : ObjectAddress object;
352 : :
353 : : /*
354 : : * Delete any comments and dependencies on the large object
355 : : */
5238 itagaki.takahiro@gma 356 : 41 : object.classId = LargeObjectRelationId;
357 : 41 : object.objectId = lobjId;
358 : 41 : object.objectSubId = 0;
4462 rhaas@postgresql.org 359 : 41 : performDeletion(&object, DROP_CASCADE, 0);
360 : :
361 : : /*
362 : : * Advance command counter so that tuple removal will be seen by later
363 : : * large-object operations in this transaction.
364 : : */
8573 tgl@sss.pgh.pa.us 365 : 41 : CommandCounterIncrement();
366 : :
367 : : /* For historical reasons, we always return 1 on success. */
9716 bruce@momjian.us 368 : 41 : return 1;
369 : : }
370 : :
371 : : /*
372 : : * Determine size of a large object
373 : : *
374 : : * NOTE: LOs can contain gaps, just like Unix files. We actually return
375 : : * the offset of the last byte + 1.
376 : : */
377 : : static uint64
8573 tgl@sss.pgh.pa.us 378 : 52 : inv_getsize(LargeObjectDesc *obj_desc)
379 : : {
4207 ishii@postgresql.org 380 : 52 : uint64 lastbyte = 0;
381 : : ScanKeyData skey[1];
382 : : SysScanDesc sd;
383 : : HeapTuple tuple;
384 : :
9716 bruce@momjian.us 385 [ - + ]: 52 : Assert(PointerIsValid(obj_desc));
386 : :
7200 tgl@sss.pgh.pa.us 387 : 52 : open_lo_relation();
388 : :
7459 389 : 52 : ScanKeyInit(&skey[0],
390 : : Anum_pg_largeobject_loid,
391 : : BTEqualStrategyNumber, F_OIDEQ,
392 : : ObjectIdGetDatum(obj_desc->id));
393 : :
5846 394 : 52 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
395 : : obj_desc->snapshot, 1, skey);
396 : :
397 : : /*
398 : : * Because the pg_largeobject index is on both loid and pageno, but we
399 : : * constrain only loid, a backwards scan should visit all pages of the
400 : : * large object in reverse pageno order. So, it's sufficient to examine
401 : : * the first valid tuple (== last valid page).
402 : : */
5238 itagaki.takahiro@gma 403 : 52 : tuple = systable_getnext_ordered(sd, BackwardScanDirection);
404 [ + + ]: 52 : if (HeapTupleIsValid(tuple))
405 : : {
406 : : Form_pg_largeobject data;
407 : : bytea *datafield;
408 : : int len;
409 : : bool pfreeit;
410 : :
5995 bruce@momjian.us 411 [ - + ]: 48 : if (HeapTupleHasNulls(tuple)) /* paranoia */
6151 tgl@sss.pgh.pa.us 412 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
8000 tgl@sss.pgh.pa.us 413 :CBC 48 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
3601 414 : 48 : getdatafield(data, &datafield, &len, &pfreeit);
415 : 48 : lastbyte = (uint64) data->pageno * LOBLKSIZE + len;
8573 416 [ + + ]: 48 : if (pfreeit)
417 : 9 : pfree(datafield);
418 : : }
419 : :
5846 420 : 52 : systable_endscan_ordered(sd);
421 : :
8573 422 : 52 : return lastbyte;
423 : : }
424 : :
425 : : int64
4207 ishii@postgresql.org 426 : 110 : inv_seek(LargeObjectDesc *obj_desc, int64 offset, int whence)
427 : : {
428 : : int64 newoffset;
429 : :
9716 bruce@momjian.us 430 [ - + ]: 110 : Assert(PointerIsValid(obj_desc));
431 : :
432 : : /*
433 : : * We allow seek/tell if you have either read or write permission, so no
434 : : * need for a permission check here.
435 : : */
436 : :
437 : : /*
438 : : * Note: overflow in the additions is possible, but since we will reject
439 : : * negative results, we don't need any extra test for that.
440 : : */
8573 tgl@sss.pgh.pa.us 441 [ + + + - ]: 110 : switch (whence)
442 : : {
443 : 49 : case SEEK_SET:
4206 444 : 49 : newoffset = offset;
8573 445 : 49 : break;
446 : 9 : case SEEK_CUR:
4206 447 : 9 : newoffset = obj_desc->offset + offset;
8573 448 : 9 : break;
449 : 52 : case SEEK_END:
4206 450 : 52 : newoffset = inv_getsize(obj_desc) + offset;
8573 451 : 52 : break;
8573 tgl@sss.pgh.pa.us 452 :UBC 0 : default:
4206 453 [ # # ]: 0 : ereport(ERROR,
454 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
455 : : errmsg("invalid whence setting: %d", whence)));
456 : : newoffset = 0; /* keep compiler quiet */
457 : : break;
458 : : }
459 : :
460 : : /*
461 : : * use errmsg_internal here because we don't want to expose INT64_FORMAT
462 : : * in translatable strings; doing better is not worth the trouble
463 : : */
4206 tgl@sss.pgh.pa.us 464 [ + - - + ]:CBC 110 : if (newoffset < 0 || newoffset > MAX_LARGE_OBJECT_SIZE)
4206 tgl@sss.pgh.pa.us 465 [ # # ]:UBC 0 : ereport(ERROR,
466 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
467 : : errmsg_internal("invalid large object seek target: " INT64_FORMAT,
468 : : newoffset)));
469 : :
4206 tgl@sss.pgh.pa.us 470 :CBC 110 : obj_desc->offset = newoffset;
471 : 110 : return newoffset;
472 : : }
473 : :
474 : : int64
9715 bruce@momjian.us 475 : 24 : inv_tell(LargeObjectDesc *obj_desc)
476 : : {
9716 477 [ - + ]: 24 : Assert(PointerIsValid(obj_desc));
478 : :
479 : : /*
480 : : * We allow seek/tell if you have either read or write permission, so no
481 : : * need for a permission check here.
482 : : */
483 : :
9357 484 : 24 : return obj_desc->offset;
485 : : }
486 : :
487 : : int
9715 488 : 693 : inv_read(LargeObjectDesc *obj_desc, char *buf, int nbytes)
489 : : {
8424 490 : 693 : int nread = 0;
491 : : int64 n;
492 : : int64 off;
493 : : int len;
494 : 693 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
495 : : uint64 pageoff;
496 : : ScanKeyData skey[2];
497 : : SysScanDesc sd;
498 : : HeapTuple tuple;
499 : :
9716 500 [ - + ]: 693 : Assert(PointerIsValid(obj_desc));
501 [ - + ]: 693 : Assert(buf != NULL);
502 : :
2348 tgl@sss.pgh.pa.us 503 [ - + ]: 693 : if ((obj_desc->flags & IFS_RDLOCK) == 0)
2348 tgl@sss.pgh.pa.us 504 [ # # ]:UBC 0 : ereport(ERROR,
505 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
506 : : errmsg("permission denied for large object %u",
507 : : obj_desc->id)));
508 : :
8573 tgl@sss.pgh.pa.us 509 [ + + ]:CBC 693 : if (nbytes <= 0)
8575 bruce@momjian.us 510 : 4 : return 0;
511 : :
7200 tgl@sss.pgh.pa.us 512 : 689 : open_lo_relation();
513 : :
7459 514 : 689 : ScanKeyInit(&skey[0],
515 : : Anum_pg_largeobject_loid,
516 : : BTEqualStrategyNumber, F_OIDEQ,
517 : : ObjectIdGetDatum(obj_desc->id));
518 : :
519 : 689 : ScanKeyInit(&skey[1],
520 : : Anum_pg_largeobject_pageno,
521 : : BTGreaterEqualStrategyNumber, F_INT4GE,
522 : : Int32GetDatum(pageno));
523 : :
5846 524 : 689 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
525 : : obj_desc->snapshot, 2, skey);
526 : :
527 [ + + ]: 5219 : while ((tuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
528 : : {
529 : : Form_pg_largeobject data;
530 : : bytea *datafield;
531 : : bool pfreeit;
532 : :
5995 bruce@momjian.us 533 [ - + ]: 5061 : if (HeapTupleHasNulls(tuple)) /* paranoia */
6151 tgl@sss.pgh.pa.us 534 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
8000 tgl@sss.pgh.pa.us 535 :CBC 5061 : data = (Form_pg_largeobject) GETSTRUCT(tuple);
536 : :
537 : : /*
538 : : * We expect the indexscan will deliver pages in order. However,
539 : : * there may be missing pages if the LO contains unwritten "holes". We
540 : : * want missing sections to read out as zeroes.
541 : : */
4207 ishii@postgresql.org 542 : 5061 : pageoff = ((uint64) data->pageno) * LOBLKSIZE;
8573 tgl@sss.pgh.pa.us 543 [ + + ]: 5061 : if (pageoff > obj_desc->offset)
544 : : {
545 : 6 : n = pageoff - obj_desc->offset;
546 : 6 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
547 [ - + - - : 6 : MemSet(buf + nread, 0, n);
- - - - -
- ]
548 : 6 : nread += n;
549 : 6 : obj_desc->offset += n;
550 : : }
551 : :
552 [ + + ]: 5061 : if (nread < nbytes)
553 : : {
554 [ - + ]: 5058 : Assert(obj_desc->offset >= pageoff);
555 : 5058 : off = (int) (obj_desc->offset - pageoff);
556 [ + - - + ]: 5058 : Assert(off >= 0 && off < LOBLKSIZE);
557 : :
3601 558 : 5058 : getdatafield(data, &datafield, &len, &pfreeit);
8573 559 [ + + ]: 5058 : if (len > off)
560 : : {
561 : 5007 : n = len - off;
562 : 5007 : n = (n <= (nbytes - nread)) ? n : (nbytes - nread);
563 : 5007 : memcpy(buf + nread, VARDATA(datafield) + off, n);
564 : 5007 : nread += n;
565 : 5007 : obj_desc->offset += n;
566 : : }
567 [ + + ]: 5058 : if (pfreeit)
568 : 5020 : pfree(datafield);
569 : : }
570 : :
571 [ + + ]: 5061 : if (nread >= nbytes)
572 : 531 : break;
573 : : }
574 : :
5846 575 : 689 : systable_endscan_ordered(sd);
576 : :
9357 bruce@momjian.us 577 : 689 : return nread;
578 : : }
579 : :
580 : : int
6429 581 : 776 : inv_write(LargeObjectDesc *obj_desc, const char *buf, int nbytes)
582 : : {
8424 583 : 776 : int nwritten = 0;
584 : : int n;
585 : : int off;
586 : : int len;
587 : 776 : int32 pageno = (int32) (obj_desc->offset / LOBLKSIZE);
588 : : ScanKeyData skey[2];
589 : : SysScanDesc sd;
590 : : HeapTuple oldtuple;
591 : : Form_pg_largeobject olddata;
592 : : bool neednextpage;
593 : : bytea *datafield;
594 : : bool pfreeit;
595 : : union
596 : : {
597 : : bytea hdr;
598 : : /* this is to make the union big enough for a LO data chunk: */
599 : : char data[LOBLKSIZE + VARHDRSZ];
600 : : /* ensure union is aligned well enough: */
601 : : int32 align_it;
602 : : } workbuf;
6256 tgl@sss.pgh.pa.us 603 : 776 : char *workb = VARDATA(&workbuf.hdr);
604 : : HeapTuple newtup;
605 : : Datum values[Natts_pg_largeobject];
606 : : bool nulls[Natts_pg_largeobject];
607 : : bool replace[Natts_pg_largeobject];
608 : : CatalogIndexState indstate;
609 : :
9716 bruce@momjian.us 610 [ - + ]: 776 : Assert(PointerIsValid(obj_desc));
611 [ - + ]: 776 : Assert(buf != NULL);
612 : :
613 : : /* enforce writability because snapshot is probably wrong otherwise */
2348 tgl@sss.pgh.pa.us 614 [ - + ]: 776 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
2348 tgl@sss.pgh.pa.us 615 [ # # ]:UBC 0 : ereport(ERROR,
616 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
617 : : errmsg("permission denied for large object %u",
618 : : obj_desc->id)));
619 : :
8573 tgl@sss.pgh.pa.us 620 [ - + ]:CBC 776 : if (nbytes <= 0)
8573 tgl@sss.pgh.pa.us 621 :UBC 0 : return 0;
622 : :
623 : : /* this addition can't overflow because nbytes is only int32 */
4207 ishii@postgresql.org 624 [ - + ]:CBC 776 : if ((nbytes + obj_desc->offset) > MAX_LARGE_OBJECT_SIZE)
4206 tgl@sss.pgh.pa.us 625 [ # # ]:UBC 0 : ereport(ERROR,
626 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
627 : : errmsg("invalid large object write request size: %d",
628 : : nbytes)));
629 : :
7200 tgl@sss.pgh.pa.us 630 :CBC 776 : open_lo_relation();
631 : :
632 : 776 : indstate = CatalogOpenIndexes(lo_heap_r);
633 : :
7459 634 : 776 : ScanKeyInit(&skey[0],
635 : : Anum_pg_largeobject_loid,
636 : : BTEqualStrategyNumber, F_OIDEQ,
637 : : ObjectIdGetDatum(obj_desc->id));
638 : :
639 : 776 : ScanKeyInit(&skey[1],
640 : : Anum_pg_largeobject_pageno,
641 : : BTGreaterEqualStrategyNumber, F_INT4GE,
642 : : Int32GetDatum(pageno));
643 : :
5846 644 : 776 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
645 : : obj_desc->snapshot, 2, skey);
646 : :
8000 647 : 776 : oldtuple = NULL;
8573 648 : 776 : olddata = NULL;
649 : 776 : neednextpage = true;
650 : :
8575 bruce@momjian.us 651 [ + + ]: 4750 : while (nwritten < nbytes)
652 : : {
653 : : /*
654 : : * If possible, get next pre-existing page of the LO. We expect the
655 : : * indexscan will deliver these in order --- but there may be holes.
656 : : */
8573 tgl@sss.pgh.pa.us 657 [ + + ]: 3974 : if (neednextpage)
658 : : {
5846 659 [ + + ]: 779 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
660 : : {
2489 661 [ - + ]: 12 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
6151 tgl@sss.pgh.pa.us 662 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
8000 tgl@sss.pgh.pa.us 663 :CBC 12 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
664 [ - + ]: 12 : Assert(olddata->pageno >= pageno);
665 : : }
8573 666 : 779 : neednextpage = false;
667 : : }
668 : :
669 : : /*
670 : : * If we have a pre-existing page, see if it is the page we want to
671 : : * write, or a later one.
672 : : */
673 [ + + + - ]: 3974 : if (olddata != NULL && olddata->pageno == pageno)
674 : : {
675 : : /*
676 : : * Update an existing page with fresh data.
677 : : *
678 : : * First, load old data into workbuf
679 : : */
3601 680 : 12 : getdatafield(olddata, &datafield, &len, &pfreeit);
8573 681 : 12 : memcpy(workb, VARDATA(datafield), len);
682 [ + + ]: 12 : if (pfreeit)
683 : 9 : pfree(datafield);
684 : :
685 : : /*
686 : : * Fill any hole
687 : : */
688 : 12 : off = (int) (obj_desc->offset % LOBLKSIZE);
689 [ - + ]: 12 : if (off > len)
8573 tgl@sss.pgh.pa.us 690 [ # # # # :UBC 0 : MemSet(workb + len, 0, off - len);
# # # # #
# ]
691 : :
692 : : /*
693 : : * Insert appropriate portion of new data
694 : : */
8573 tgl@sss.pgh.pa.us 695 :CBC 12 : n = LOBLKSIZE - off;
696 : 12 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
697 : 12 : memcpy(workb + off, buf + nwritten, n);
698 : 12 : nwritten += n;
699 : 12 : obj_desc->offset += n;
700 : 12 : off += n;
701 : : /* compute valid length of new page */
702 : 12 : len = (len >= off) ? len : off;
6256 703 : 12 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
704 : :
705 : : /*
706 : : * Form and insert updated tuple
707 : : */
8573 708 : 12 : memset(values, 0, sizeof(values));
5642 709 : 12 : memset(nulls, false, sizeof(nulls));
710 : 12 : memset(replace, false, sizeof(replace));
8421 711 : 12 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
5642 712 : 12 : replace[Anum_pg_largeobject_data - 1] = true;
713 : 12 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
714 : : values, nulls, replace);
2629 715 : 12 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
716 : : indstate);
8573 717 : 12 : heap_freetuple(newtup);
718 : :
719 : : /*
720 : : * We're done with this old page.
721 : : */
8000 722 : 12 : oldtuple = NULL;
8573 723 : 12 : olddata = NULL;
724 : 12 : neednextpage = true;
725 : : }
726 : : else
727 : : {
728 : : /*
729 : : * Write a brand new page.
730 : : *
731 : : * First, fill any hole
732 : : */
733 : 3962 : off = (int) (obj_desc->offset % LOBLKSIZE);
734 [ + + ]: 3962 : if (off > 0)
735 [ - + - - : 3 : MemSet(workb, 0, off);
- - - - -
- ]
736 : :
737 : : /*
738 : : * Insert appropriate portion of new data
739 : : */
740 : 3962 : n = LOBLKSIZE - off;
741 : 3962 : n = (n <= (nbytes - nwritten)) ? n : (nbytes - nwritten);
742 : 3962 : memcpy(workb + off, buf + nwritten, n);
743 : 3962 : nwritten += n;
744 : 3962 : obj_desc->offset += n;
745 : : /* compute valid length of new page */
746 : 3962 : len = off + n;
6256 747 : 3962 : SET_VARSIZE(&workbuf.hdr, len + VARHDRSZ);
748 : :
749 : : /*
750 : : * Form and insert updated tuple
751 : : */
8573 752 : 3962 : memset(values, 0, sizeof(values));
5642 753 : 3962 : memset(nulls, false, sizeof(nulls));
8573 754 : 3962 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
755 : 3962 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
8421 756 : 3962 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
5642 757 : 3962 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
2629 758 : 3962 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
8573 759 : 3962 : heap_freetuple(newtup);
760 : : }
761 : 3974 : pageno++;
762 : : }
763 : :
5846 764 : 776 : systable_endscan_ordered(sd);
765 : :
7923 766 : 776 : CatalogCloseIndexes(indstate);
767 : :
768 : : /*
769 : : * Advance command counter so that my tuple updates will be seen by later
770 : : * large-object operations in this transaction.
771 : : */
8573 772 : 776 : CommandCounterIncrement();
773 : :
8575 bruce@momjian.us 774 : 776 : return nwritten;
775 : : }
776 : :
777 : : void
4207 ishii@postgresql.org 778 : 21 : inv_truncate(LargeObjectDesc *obj_desc, int64 len)
779 : : {
6252 bruce@momjian.us 780 : 21 : int32 pageno = (int32) (len / LOBLKSIZE);
781 : : int32 off;
782 : : ScanKeyData skey[2];
783 : : SysScanDesc sd;
784 : : HeapTuple oldtuple;
785 : : Form_pg_largeobject olddata;
786 : : union
787 : : {
788 : : bytea hdr;
789 : : /* this is to make the union big enough for a LO data chunk: */
790 : : char data[LOBLKSIZE + VARHDRSZ];
791 : : /* ensure union is aligned well enough: */
792 : : int32 align_it;
793 : : } workbuf;
5995 794 : 21 : char *workb = VARDATA(&workbuf.hdr);
795 : : HeapTuple newtup;
796 : : Datum values[Natts_pg_largeobject];
797 : : bool nulls[Natts_pg_largeobject];
798 : : bool replace[Natts_pg_largeobject];
799 : : CatalogIndexState indstate;
800 : :
6252 801 [ - + ]: 21 : Assert(PointerIsValid(obj_desc));
802 : :
803 : : /* enforce writability because snapshot is probably wrong otherwise */
2348 tgl@sss.pgh.pa.us 804 [ - + ]: 21 : if ((obj_desc->flags & IFS_WRLOCK) == 0)
2348 tgl@sss.pgh.pa.us 805 [ # # ]:UBC 0 : ereport(ERROR,
806 : : (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
807 : : errmsg("permission denied for large object %u",
808 : : obj_desc->id)));
809 : :
810 : : /*
811 : : * use errmsg_internal here because we don't want to expose INT64_FORMAT
812 : : * in translatable strings; doing better is not worth the trouble
813 : : */
4206 tgl@sss.pgh.pa.us 814 [ + - - + ]:CBC 21 : if (len < 0 || len > MAX_LARGE_OBJECT_SIZE)
4206 tgl@sss.pgh.pa.us 815 [ # # ]:UBC 0 : ereport(ERROR,
816 : : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
817 : : errmsg_internal("invalid large object truncation target: " INT64_FORMAT,
818 : : len)));
819 : :
6252 bruce@momjian.us 820 :CBC 21 : open_lo_relation();
821 : :
822 : 21 : indstate = CatalogOpenIndexes(lo_heap_r);
823 : :
824 : : /*
825 : : * Set up to find all pages with desired loid and pageno >= target
826 : : */
827 : 21 : ScanKeyInit(&skey[0],
828 : : Anum_pg_largeobject_loid,
829 : : BTEqualStrategyNumber, F_OIDEQ,
830 : : ObjectIdGetDatum(obj_desc->id));
831 : :
832 : 21 : ScanKeyInit(&skey[1],
833 : : Anum_pg_largeobject_pageno,
834 : : BTGreaterEqualStrategyNumber, F_INT4GE,
835 : : Int32GetDatum(pageno));
836 : :
5846 tgl@sss.pgh.pa.us 837 : 21 : sd = systable_beginscan_ordered(lo_heap_r, lo_index_r,
838 : : obj_desc->snapshot, 2, skey);
839 : :
840 : : /*
841 : : * If possible, get the page the truncation point is in. The truncation
842 : : * point may be beyond the end of the LO or in a hole.
843 : : */
6252 bruce@momjian.us 844 : 21 : olddata = NULL;
5846 tgl@sss.pgh.pa.us 845 [ + + ]: 21 : if ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
846 : : {
2489 847 [ - + ]: 12 : if (HeapTupleHasNulls(oldtuple)) /* paranoia */
6151 tgl@sss.pgh.pa.us 848 [ # # ]:UBC 0 : elog(ERROR, "null field found in pg_largeobject");
6252 bruce@momjian.us 849 :CBC 12 : olddata = (Form_pg_largeobject) GETSTRUCT(oldtuple);
850 [ - + ]: 12 : Assert(olddata->pageno >= pageno);
851 : : }
852 : :
853 : : /*
854 : : * If we found the page of the truncation point we need to truncate the
855 : : * data in it. Otherwise if we're in a hole, we need to create a page to
856 : : * mark the end of data.
857 : : */
858 [ + + + + ]: 21 : if (olddata != NULL && olddata->pageno == pageno)
859 : 6 : {
860 : : /* First, load old data into workbuf */
861 : : bytea *datafield;
862 : : int pagelen;
863 : : bool pfreeit;
864 : :
3601 tgl@sss.pgh.pa.us 865 : 6 : getdatafield(olddata, &datafield, &pagelen, &pfreeit);
6252 bruce@momjian.us 866 : 6 : memcpy(workb, VARDATA(datafield), pagelen);
867 [ + + ]: 6 : if (pfreeit)
5995 868 : 3 : pfree(datafield);
869 : :
870 : : /*
871 : : * Fill any hole
872 : : */
6252 873 : 6 : off = len % LOBLKSIZE;
874 [ + + ]: 6 : if (off > pagelen)
5995 875 [ + - - + : 3 : MemSet(workb + pagelen, 0, off - pagelen);
- - - - -
- ]
876 : :
877 : : /* compute length of new page */
6252 878 : 6 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
879 : :
880 : : /*
881 : : * Form and insert updated tuple
882 : : */
883 : 6 : memset(values, 0, sizeof(values));
5642 tgl@sss.pgh.pa.us 884 : 6 : memset(nulls, false, sizeof(nulls));
885 : 6 : memset(replace, false, sizeof(replace));
6252 bruce@momjian.us 886 : 6 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
5642 tgl@sss.pgh.pa.us 887 : 6 : replace[Anum_pg_largeobject_data - 1] = true;
888 : 6 : newtup = heap_modify_tuple(oldtuple, RelationGetDescr(lo_heap_r),
889 : : values, nulls, replace);
2629 890 : 6 : CatalogTupleUpdateWithInfo(lo_heap_r, &newtup->t_self, newtup,
891 : : indstate);
6252 bruce@momjian.us 892 : 6 : heap_freetuple(newtup);
893 : : }
894 : : else
895 : : {
896 : : /*
897 : : * If the first page we found was after the truncation point, we're in
898 : : * a hole that we'll fill, but we need to delete the later page
899 : : * because the loop below won't visit it again.
900 : : */
4827 tgl@sss.pgh.pa.us 901 [ + + ]: 15 : if (olddata != NULL)
902 : : {
903 [ - + ]: 6 : Assert(olddata->pageno > pageno);
2629 904 : 6 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
905 : : }
906 : :
907 : : /*
908 : : * Write a brand new page.
909 : : *
910 : : * Fill the hole up to the truncation point
911 : : */
6252 bruce@momjian.us 912 : 15 : off = len % LOBLKSIZE;
913 [ + - ]: 15 : if (off > 0)
914 [ - + - - : 15 : MemSet(workb, 0, off);
- - - - -
- ]
915 : :
916 : : /* compute length of new page */
917 : 15 : SET_VARSIZE(&workbuf.hdr, off + VARHDRSZ);
918 : :
919 : : /*
920 : : * Form and insert new tuple
921 : : */
922 : 15 : memset(values, 0, sizeof(values));
5642 tgl@sss.pgh.pa.us 923 : 15 : memset(nulls, false, sizeof(nulls));
6252 bruce@momjian.us 924 : 15 : values[Anum_pg_largeobject_loid - 1] = ObjectIdGetDatum(obj_desc->id);
925 : 15 : values[Anum_pg_largeobject_pageno - 1] = Int32GetDatum(pageno);
926 : 15 : values[Anum_pg_largeobject_data - 1] = PointerGetDatum(&workbuf);
5642 tgl@sss.pgh.pa.us 927 : 15 : newtup = heap_form_tuple(lo_heap_r->rd_att, values, nulls);
2629 928 : 15 : CatalogTupleInsertWithInfo(lo_heap_r, newtup, indstate);
6252 bruce@momjian.us 929 : 15 : heap_freetuple(newtup);
930 : : }
931 : :
932 : : /*
933 : : * Delete any pages after the truncation point. If the initial search
934 : : * didn't find a page, then of course there's nothing more to do.
935 : : */
4827 tgl@sss.pgh.pa.us 936 [ + + ]: 21 : if (olddata != NULL)
937 : : {
938 [ + + ]: 15 : while ((oldtuple = systable_getnext_ordered(sd, ForwardScanDirection)) != NULL)
939 : : {
2629 940 : 3 : CatalogTupleDelete(lo_heap_r, &oldtuple->t_self);
941 : : }
942 : : }
943 : :
5846 944 : 21 : systable_endscan_ordered(sd);
945 : :
6252 bruce@momjian.us 946 : 21 : CatalogCloseIndexes(indstate);
947 : :
948 : : /*
949 : : * Advance command counter so that tuple updates will be seen by later
950 : : * large-object operations in this transaction.
951 : : */
952 : 21 : CommandCounterIncrement();
953 : 21 : }
|