42
42
#include <sys/stat.h>
43
43
#include <unistd.h>
44
44
45
+ #include "access/xact.h"
45
46
#include "libpq/be-fsstubs.h"
46
47
#include "libpq/libpq-fs.h"
47
48
#include "miscadmin.h"
50
51
#include "utils/acl.h"
51
52
#include "utils/builtins.h"
52
53
#include "utils/memutils.h"
54
+ #include "utils/snapmgr.h"
53
55
54
56
/* define this to enable debug logging */
55
57
/* #define FSDB 1 */
67
69
static LargeObjectDesc * * cookies = NULL ;
68
70
static int cookies_size = 0 ;
69
71
72
+ static bool lo_cleanup_needed = false;
70
73
static MemoryContext fscxt = NULL ;
71
74
72
- #define CreateFSContext () \
73
- do { \
74
- if (fscxt == NULL) \
75
- fscxt = AllocSetContextCreate(TopMemoryContext, \
76
- "Filesystem", \
77
- ALLOCSET_DEFAULT_SIZES); \
78
- } while (0)
79
-
80
-
81
- static int newLOfd (LargeObjectDesc * lobjCookie );
82
- static void deleteLOfd (int fd );
75
+ static int newLOfd (void );
76
+ static void closeLOfd (int fd );
83
77
static Oid lo_import_internal (text * filename , Oid lobjOid );
84
78
85
79
@@ -99,11 +93,26 @@ be_lo_open(PG_FUNCTION_ARGS)
99
93
elog (DEBUG4 , "lo_open(%u,%d)" , lobjId , mode );
100
94
#endif
101
95
102
- CreateFSContext ();
96
+ /*
97
+ * Allocate a large object descriptor first. This will also create
98
+ * 'fscxt' if this is the first LO opened in this transaction.
99
+ */
100
+ fd = newLOfd ();
103
101
104
102
lobjDesc = inv_open (lobjId , mode , fscxt );
103
+ lobjDesc -> subid = GetCurrentSubTransactionId ();
104
+
105
+ /*
106
+ * We must register the snapshot in TopTransaction's resowner so that it
107
+ * stays alive until the LO is closed rather than until the current portal
108
+ * shuts down.
109
+ */
110
+ if (lobjDesc -> snapshot )
111
+ lobjDesc -> snapshot = RegisterSnapshotOnOwner (lobjDesc -> snapshot ,
112
+ TopTransactionResourceOwner );
105
113
106
- fd = newLOfd (lobjDesc );
114
+ Assert (cookies [fd ] == NULL );
115
+ cookies [fd ] = lobjDesc ;
107
116
108
117
PG_RETURN_INT32 (fd );
109
118
}
@@ -122,9 +131,7 @@ be_lo_close(PG_FUNCTION_ARGS)
122
131
elog (DEBUG4 , "lo_close(%d)" , fd );
123
132
#endif
124
133
125
- inv_close (cookies [fd ]);
126
-
127
- deleteLOfd (fd );
134
+ closeLOfd (fd );
128
135
129
136
PG_RETURN_INT32 (0 );
130
137
}
@@ -238,12 +245,7 @@ be_lo_creat(PG_FUNCTION_ARGS)
238
245
{
239
246
Oid lobjId ;
240
247
241
- /*
242
- * We don't actually need to store into fscxt, but create it anyway to
243
- * ensure that AtEOXact_LargeObject knows there is state to clean up
244
- */
245
- CreateFSContext ();
246
-
248
+ lo_cleanup_needed = true;
247
249
lobjId = inv_create (InvalidOid );
248
250
249
251
PG_RETURN_OID (lobjId );
@@ -254,12 +256,7 @@ be_lo_create(PG_FUNCTION_ARGS)
254
256
{
255
257
Oid lobjId = PG_GETARG_OID (0 );
256
258
257
- /*
258
- * We don't actually need to store into fscxt, but create it anyway to
259
- * ensure that AtEOXact_LargeObject knows there is state to clean up
260
- */
261
- CreateFSContext ();
262
-
259
+ lo_cleanup_needed = true;
263
260
lobjId = inv_create (lobjId );
264
261
265
262
PG_RETURN_OID (lobjId );
@@ -330,16 +327,13 @@ be_lo_unlink(PG_FUNCTION_ARGS)
330
327
for (i = 0 ; i < cookies_size ; i ++ )
331
328
{
332
329
if (cookies [i ] != NULL && cookies [i ]-> id == lobjId )
333
- {
334
- inv_close (cookies [i ]);
335
- deleteLOfd (i );
336
- }
330
+ closeLOfd (i );
337
331
}
338
332
}
339
333
340
334
/*
341
335
* inv_drop does not create a need for end-of-transaction cleanup and
342
- * hence we don't need to have created fscxt .
336
+ * hence we don't need to set lo_cleanup_needed .
343
337
*/
344
338
PG_RETURN_INT32 (inv_drop (lobjId ));
345
339
}
@@ -419,8 +413,6 @@ lo_import_internal(text *filename, Oid lobjOid)
419
413
LargeObjectDesc * lobj ;
420
414
Oid oid ;
421
415
422
- CreateFSContext ();
423
-
424
416
/*
425
417
* open the file to be read in
426
418
*/
@@ -435,12 +427,13 @@ lo_import_internal(text *filename, Oid lobjOid)
435
427
/*
436
428
* create an inversion object
437
429
*/
430
+ lo_cleanup_needed = true;
438
431
oid = inv_create (lobjOid );
439
432
440
433
/*
441
434
* read in from the filesystem and write to the inversion object
442
435
*/
443
- lobj = inv_open (oid , INV_WRITE , fscxt );
436
+ lobj = inv_open (oid , INV_WRITE , CurrentMemoryContext );
444
437
445
438
while ((nbytes = read (fd , buf , BUFSIZE )) > 0 )
446
439
{
@@ -482,12 +475,11 @@ be_lo_export(PG_FUNCTION_ARGS)
482
475
LargeObjectDesc * lobj ;
483
476
mode_t oumask ;
484
477
485
- CreateFSContext ();
486
-
487
478
/*
488
479
* open the inversion object (no need to test for failure)
489
480
*/
490
- lobj = inv_open (lobjId , INV_READ , fscxt );
481
+ lo_cleanup_needed = true;
482
+ lobj = inv_open (lobjId , INV_READ , CurrentMemoryContext );
491
483
492
484
/*
493
485
* open the file to be written to
@@ -592,20 +584,22 @@ AtEOXact_LargeObject(bool isCommit)
592
584
{
593
585
int i ;
594
586
595
- if (fscxt == NULL )
587
+ if (! lo_cleanup_needed )
596
588
return ; /* no LO operations in this xact */
597
589
598
590
/*
599
591
* Close LO fds and clear cookies array so that LO fds are no longer good.
600
- * On abort we skip the close step.
592
+ * The memory context and resource owner holding them are going away at
593
+ * the end-of-transaction anyway, but on commit, we need to close them to
594
+ * avoid warnings about leaked resources at commit. On abort we can skip
595
+ * this step.
601
596
*/
602
- for ( i = 0 ; i < cookies_size ; i ++ )
597
+ if ( isCommit )
603
598
{
604
- if ( cookies [ i ] != NULL )
599
+ for ( i = 0 ; i < cookies_size ; i ++ )
605
600
{
606
- if (isCommit )
607
- inv_close (cookies [i ]);
608
- deleteLOfd (i );
601
+ if (cookies [i ] != NULL )
602
+ closeLOfd (i );
609
603
}
610
604
}
611
605
@@ -614,11 +608,14 @@ AtEOXact_LargeObject(bool isCommit)
614
608
cookies_size = 0 ;
615
609
616
610
/* Release the LO memory context to prevent permanent memory leaks. */
617
- MemoryContextDelete (fscxt );
611
+ if (fscxt )
612
+ MemoryContextDelete (fscxt );
618
613
fscxt = NULL ;
619
614
620
615
/* Give inv_api.c a chance to clean up, too */
621
616
close_lo_relation (isCommit );
617
+
618
+ lo_cleanup_needed = false;
622
619
}
623
620
624
621
/*
@@ -646,14 +643,7 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
646
643
if (isCommit )
647
644
lo -> subid = parentSubid ;
648
645
else
649
- {
650
- /*
651
- * Make sure we do not call inv_close twice if it errors out
652
- * for some reason. Better a leak than a crash.
653
- */
654
- deleteLOfd (i );
655
- inv_close (lo );
656
- }
646
+ closeLOfd (i );
657
647
}
658
648
}
659
649
}
@@ -663,19 +653,22 @@ AtEOSubXact_LargeObject(bool isCommit, SubTransactionId mySubid,
663
653
*****************************************************************************/
664
654
665
655
static int
666
- newLOfd (LargeObjectDesc * lobjCookie )
656
+ newLOfd (void )
667
657
{
668
658
int i ,
669
659
newsize ;
670
660
661
+ lo_cleanup_needed = true;
662
+ if (fscxt == NULL )
663
+ fscxt = AllocSetContextCreate (TopMemoryContext ,
664
+ "Filesystem" ,
665
+ ALLOCSET_DEFAULT_SIZES );
666
+
671
667
/* Try to find a free slot */
672
668
for (i = 0 ; i < cookies_size ; i ++ )
673
669
{
674
670
if (cookies [i ] == NULL )
675
- {
676
- cookies [i ] = lobjCookie ;
677
671
return i ;
678
- }
679
672
}
680
673
681
674
/* No free slot, so make the array bigger */
@@ -700,15 +693,25 @@ newLOfd(LargeObjectDesc *lobjCookie)
700
693
cookies_size = newsize ;
701
694
}
702
695
703
- Assert (cookies [i ] == NULL );
704
- cookies [i ] = lobjCookie ;
705
696
return i ;
706
697
}
707
698
708
699
static void
709
- deleteLOfd (int fd )
700
+ closeLOfd (int fd )
710
701
{
702
+ LargeObjectDesc * lobj ;
703
+
704
+ /*
705
+ * Make sure we do not try to free twice if this errors out for some
706
+ * reason. Better a leak than a crash.
707
+ */
708
+ lobj = cookies [fd ];
711
709
cookies [fd ] = NULL ;
710
+
711
+ if (lobj -> snapshot )
712
+ UnregisterSnapshotFromOwner (lobj -> snapshot ,
713
+ TopTransactionResourceOwner );
714
+ inv_close (lobj );
712
715
}
713
716
714
717
/*****************************************************************************
@@ -727,13 +730,8 @@ lo_get_fragment_internal(Oid loOid, int64 offset, int32 nbytes)
727
730
int total_read PG_USED_FOR_ASSERTS_ONLY ;
728
731
bytea * result = NULL ;
729
732
730
- /*
731
- * We don't actually need to store into fscxt, but create it anyway to
732
- * ensure that AtEOXact_LargeObject knows there is state to clean up
733
- */
734
- CreateFSContext ();
735
-
736
- loDesc = inv_open (loOid , INV_READ , fscxt );
733
+ lo_cleanup_needed = true;
734
+ loDesc = inv_open (loOid , INV_READ , CurrentMemoryContext );
737
735
738
736
/*
739
737
* Compute number of bytes we'll actually read, accommodating nbytes == -1
@@ -817,10 +815,9 @@ be_lo_from_bytea(PG_FUNCTION_ARGS)
817
815
LargeObjectDesc * loDesc ;
818
816
int written PG_USED_FOR_ASSERTS_ONLY ;
819
817
820
- CreateFSContext ();
821
-
818
+ lo_cleanup_needed = true;
822
819
loOid = inv_create (loOid );
823
- loDesc = inv_open (loOid , INV_WRITE , fscxt );
820
+ loDesc = inv_open (loOid , INV_WRITE , CurrentMemoryContext );
824
821
written = inv_write (loDesc , VARDATA_ANY (str ), VARSIZE_ANY_EXHDR (str ));
825
822
Assert (written == VARSIZE_ANY_EXHDR (str ));
826
823
inv_close (loDesc );
@@ -840,9 +837,8 @@ be_lo_put(PG_FUNCTION_ARGS)
840
837
LargeObjectDesc * loDesc ;
841
838
int written PG_USED_FOR_ASSERTS_ONLY ;
842
839
843
- CreateFSContext ();
844
-
845
- loDesc = inv_open (loOid , INV_WRITE , fscxt );
840
+ lo_cleanup_needed = true;
841
+ loDesc = inv_open (loOid , INV_WRITE , CurrentMemoryContext );
846
842
847
843
/* Permission check */
848
844
if (!lo_compat_privileges &&
0 commit comments