Skip to content

Commit 3c90569

Browse files
committed
Rename bbstreamer to astreamer.
I (rhaas) intended "bbstreamer" to stand for "base backup streamer," but that implies that this infrastructure can only ever be used by pg_basebackup. In fact, it is a generally useful way of streaming data from a tar or compressed tar file, and it could be extended to work with other archive formats as well if we ever wanted to do that. Hence, rename it to "astreamer" (archive streamer) in preparation for reusing the infrastructure from pg_verifybackup (and perhaps eventually also other utilities, such as pg_combinebackup or pg_waldump). This is purely a renaming commit. Comment adjustments and relocation of the actual code to someplace from which it can be reused are left to future commits. Amul Sul, reviewed by Sravan Kumar and by me. Discussion: http://postgr.es/m/CAAJ_b94StvLWrc_p4q-f7n3OPfr6GhL8_XuAg2aAaYZp1tF-nw@mail.gmail.com
1 parent 66e9444 commit 3c90569

13 files changed

+875
-875
lines changed

src/bin/pg_basebackup/Makefile

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@ OBJS = \
3737

3838
BBOBJS = \
3939
pg_basebackup.o \
40-
bbstreamer_file.o \
41-
bbstreamer_gzip.o \
42-
bbstreamer_inject.o \
43-
bbstreamer_lz4.o \
44-
bbstreamer_tar.o \
45-
bbstreamer_zstd.o
40+
astreamer_file.o \
41+
astreamer_gzip.o \
42+
astreamer_inject.o \
43+
astreamer_lz4.o \
44+
astreamer_tar.o \
45+
astreamer_zstd.o
4646

4747
all: pg_basebackup pg_createsubscriber pg_receivewal pg_recvlogical
4848

src/bin/pg_basebackup/astreamer.h

Lines changed: 226 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,226 @@
1+
/*-------------------------------------------------------------------------
2+
*
3+
* astreamer.h
4+
*
5+
* Each tar archive returned by the server is passed to one or more
6+
* astreamer objects for further processing. The astreamer may do
7+
* something simple, like write the archive to a file, perhaps after
8+
* compressing it, but it can also do more complicated things, like
9+
* annotating the byte stream to indicate which parts of the data
10+
* correspond to tar headers or trailing padding, vs. which parts are
11+
* payload data. A subsequent astreamer may use this information to
12+
* make further decisions about how to process the data; for example,
13+
* it might choose to modify the archive contents.
14+
*
15+
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
16+
*
17+
* IDENTIFICATION
18+
* src/bin/pg_basebackup/astreamer.h
19+
*-------------------------------------------------------------------------
20+
*/
21+
22+
#ifndef ASTREAMER_H
23+
#define ASTREAMER_H
24+
25+
#include "common/compression.h"
26+
#include "lib/stringinfo.h"
27+
#include "pqexpbuffer.h"
28+
29+
/*
 * Forward declarations: struct astreamer holds a pointer to an astreamer_ops,
 * and the astreamer_ops callbacks take an astreamer pointer, so both names
 * must be known before either struct is defined.
 */
struct astreamer;
30+
struct astreamer_ops;
31+
typedef struct astreamer astreamer;
32+
typedef struct astreamer_ops astreamer_ops;
33+
34+
/*
 * Each chunk of archive data passed to an astreamer is classified into one
 * of these categories. When data is first received from the remote server,
 * each chunk will be categorized as ASTREAMER_UNKNOWN, and the chunks will
 * be of whatever size the remote server chose to send.
 *
 * If the archive is parsed (e.g. see astreamer_tar_parser_new()), then all
 * chunks should be labelled as one of the other types listed here. In
 * addition, there should be exactly one ASTREAMER_MEMBER_HEADER chunk and
 * exactly one ASTREAMER_MEMBER_TRAILER chunk per archive member, even if
 * that means a zero-length call. There can be any number of
 * ASTREAMER_MEMBER_CONTENTS chunks in between those calls. There
 * should be exactly one ASTREAMER_ARCHIVE_TRAILER chunk, and it should
 * follow the last ASTREAMER_MEMBER_TRAILER chunk.
 *
 * In theory, we could need other classifications here, such as a way of
 * indicating an archive header, but the "tar" format doesn't need anything
 * else, so for the time being there's no point.
 */
53+
typedef enum
54+
{
55+
ASTREAMER_UNKNOWN,			/* unparsed data, as received from the server */
56+
ASTREAMER_MEMBER_HEADER,	/* exactly one per member; may be zero-length */
57+
ASTREAMER_MEMBER_CONTENTS,	/* any number, between header and trailer */
58+
ASTREAMER_MEMBER_TRAILER,	/* exactly one per member; may be zero-length */
59+
ASTREAMER_ARCHIVE_TRAILER,	/* exactly one, after the last member trailer */
60+
} astreamer_archive_context;
61+
62+
/*
 * Each chunk of data that is classified as ASTREAMER_MEMBER_HEADER,
 * ASTREAMER_MEMBER_CONTENTS, or ASTREAMER_MEMBER_TRAILER should also
 * pass a pointer to an instance of this struct. The details are expected
 * to be present in the archive header and used to fill the struct, after
 * which all subsequent calls for the same archive member are expected to
 * pass the same details.
 *
 * NOTE(review): the per-field remarks below are inferred from the field
 * names and types only; confirm them against the code that fills the struct.
 */
70+
typedef struct
71+
{
72+
char pathname[MAXPGPATH];	/* presumably the member's path name */
73+
pgoff_t size;				/* presumably the member's size in bytes */
74+
mode_t mode;				/* presumably the file mode from the header */
75+
uid_t uid;					/* presumably the owning user ID */
76+
gid_t gid;					/* presumably the owning group ID */
77+
bool is_directory;			/* presumably true if member is a directory */
78+
bool is_link;				/* presumably true if member is a link */
79+
char linktarget[MAXPGPATH]; /* presumably only meaningful when is_link */
80+
} astreamer_member;
81+
82+
/*
 * Generally, each type of astreamer will define its own struct, but the
 * first element should be 'astreamer base'. An astreamer that does not
 * require any additional private data could use this structure directly.
 *
 * bbs_ops is a pointer to the astreamer_ops object which contains the
 * function pointers appropriate to this type of astreamer.
 *
 * bbs_next is a pointer to the successor astreamer, for those types of
 * astreamer which forward data to a successor. It need not be used and
 * should be set to NULL when not relevant.
 *
 * bbs_buffer is a buffer for accumulating data for temporary storage. Each
 * type of astreamer makes its own decisions about whether and how to use
 * this buffer.
 *
 * NOTE: the bbs_ prefix on the member names dates from when this
 * infrastructure was called "bbstreamer"; only the type names were renamed.
 */
98+
struct astreamer
99+
{
100+
const astreamer_ops *bbs_ops;
101+
astreamer *bbs_next;
102+
StringInfoData bbs_buffer;
103+
};
104+
105+
/*
 * There are three callbacks for an astreamer. The 'content' callback is
 * called repeatedly, as described in the astreamer_archive_context comments.
 * Then, the 'finalize' callback is called once at the end, to give the
 * astreamer a chance to perform cleanup such as closing files. Finally,
 * because this code is running in a frontend environment where, as of this
 * writing, there are no memory contexts, the 'free' callback is called to
 * release memory. These callbacks should always be invoked using the static
 * inline functions defined below.
 */
115+
struct astreamer_ops
116+
{
117+
/* Receives one chunk: 'len' bytes at 'data', classified by 'context'. */
void (*content) (astreamer *streamer, astreamer_member *member,
118+
const char *data, int len,
119+
astreamer_archive_context context);
120+
/* Called once after the last 'content' call, for cleanup. */
void (*finalize) (astreamer *streamer);
121+
/* Called last, to release the astreamer's memory. */
void (*free) (astreamer *streamer);
122+
};
123+
124+
/* Send some content to a astreamer. */
125+
static inline void
126+
astreamer_content(astreamer *streamer, astreamer_member *member,
127+
const char *data, int len,
128+
astreamer_archive_context context)
129+
{
130+
Assert(streamer != NULL);
131+
streamer->bbs_ops->content(streamer, member, data, len, context);
132+
}
133+
134+
/* Finalize a astreamer. */
135+
static inline void
136+
astreamer_finalize(astreamer *streamer)
137+
{
138+
Assert(streamer != NULL);
139+
streamer->bbs_ops->finalize(streamer);
140+
}
141+
142+
/* Free a astreamer. */
143+
static inline void
144+
astreamer_free(astreamer *streamer)
145+
{
146+
Assert(streamer != NULL);
147+
streamer->bbs_ops->free(streamer);
148+
}
149+
150+
/*
151+
* This is a convenience method for use when implementing a astreamer; it is
152+
* not for use by outside callers. It adds the amount of data specified by
153+
* 'nbytes' to the astreamer's buffer and adjusts '*len' and '*data'
154+
* accordingly.
155+
*/
156+
static inline void
157+
astreamer_buffer_bytes(astreamer *streamer, const char **data, int *len,
158+
int nbytes)
159+
{
160+
Assert(nbytes <= *len);
161+
162+
appendBinaryStringInfo(&streamer->bbs_buffer, *data, nbytes);
163+
*len -= nbytes;
164+
*data += nbytes;
165+
}
166+
167+
/*
168+
* This is a convenience method for use when implementing a astreamer; it is
169+
* not for use by outsider callers. It attempts to add enough data to the
170+
* astreamer's buffer to reach a length of target_bytes and adjusts '*len'
171+
* and '*data' accordingly. It returns true if the target length has been
172+
* reached and false otherwise.
173+
*/
174+
static inline bool
175+
astreamer_buffer_until(astreamer *streamer, const char **data, int *len,
176+
int target_bytes)
177+
{
178+
int buflen = streamer->bbs_buffer.len;
179+
180+
if (buflen >= target_bytes)
181+
{
182+
/* Target length already reached; nothing to do. */
183+
return true;
184+
}
185+
186+
if (buflen + *len < target_bytes)
187+
{
188+
/* Not enough data to reach target length; buffer all of it. */
189+
astreamer_buffer_bytes(streamer, data, len, *len);
190+
return false;
191+
}
192+
193+
/* Buffer just enough to reach the target length. */
194+
astreamer_buffer_bytes(streamer, data, len, target_bytes - buflen);
195+
return true;
196+
}
197+
198+
/*
 * Functions for creating astreamer objects of various types. See the header
 * comments for each of these functions for details.
 *
 * NOTE(review): the constructors that take a 'next' argument presumably
 * forward their output to that successor astreamer; confirm in the
 * individual implementations.  astreamer_inject_file() is the exception in
 * this list: it returns void and operates on an existing astreamer rather
 * than creating one.
 */
202+
extern astreamer *astreamer_plain_writer_new(char *pathname, FILE *file);
203+
extern astreamer *astreamer_gzip_writer_new(char *pathname, FILE *file,
204+
pg_compress_specification *compress);
205+
extern astreamer *astreamer_extractor_new(const char *basepath,
206+
const char *(*link_map) (const char *),
207+
void (*report_output_file) (const char *));
208+
209+
extern astreamer *astreamer_gzip_decompressor_new(astreamer *next);
210+
extern astreamer *astreamer_lz4_compressor_new(astreamer *next,
211+
pg_compress_specification *compress);
212+
extern astreamer *astreamer_lz4_decompressor_new(astreamer *next);
213+
extern astreamer *astreamer_zstd_compressor_new(astreamer *next,
214+
pg_compress_specification *compress);
215+
extern astreamer *astreamer_zstd_decompressor_new(astreamer *next);
216+
extern astreamer *astreamer_tar_parser_new(astreamer *next);
217+
extern astreamer *astreamer_tar_terminator_new(astreamer *next);
218+
extern astreamer *astreamer_tar_archiver_new(astreamer *next);
219+
220+
extern astreamer *astreamer_recovery_injector_new(astreamer *next,
221+
bool is_recovery_guc_supported,
222+
PQExpBuffer recoveryconfcontents);
223+
extern void astreamer_inject_file(astreamer *streamer, char *pathname,
224+
char *data, int len);
225+
226+
#endif

0 commit comments

Comments
 (0)