Skip to content

Commit 9471875

Browse files
committed
Add stream mode for save WAL during backup process.
1 parent ccd4f48 commit 9471875

File tree

14 files changed

+353
-23
lines changed

14 files changed

+353
-23
lines changed

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ OBJS = backup.o \
1616
datapagemap.o \
1717
parsexlog.o \
1818
xlogreader.o \
19+
streamutil.o \
20+
receivelog.o \
1921
pgut/pgut.o \
2022
pgut/pgut-port.o
2123

@@ -39,7 +41,7 @@ PG_LIBS = $(libpq_pgport) ${PTHREAD_LIBS} ${PTHREAD_CFLAGS}
3941

4042
REGRESS = init option show delete backup restore
4143

42-
all: checksrcdir docs datapagemap.h pg_arman
44+
all: checksrcdir docs datapagemap.h receivelog.h streamutil.h pg_arman
4345

4446
# This rule's only purpose is to give the user instructions on how to pass
4547
# the path to PostgreSQL source tree to the makefile.

backup.c

Lines changed: 207 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,19 @@
2323
#include "pgut/pgut-port.h"
2424
#include "storage/bufpage.h"
2525
#include "datapagemap.h"
26+
#include "streamutil.h"
27+
#include "receivelog.h"
2628

2729
/* wait 10 sec until WAL archive complete */
28-
#define TIMEOUT_ARCHIVE 10
30+
#define TIMEOUT_ARCHIVE 10
2931

3032
/* Server version */
3133
static int server_version = 0;
3234

33-
static bool in_backup = false; /* TODO: more robust logic */
35+
static bool in_backup = false; /* TODO: more robust logic */
36+
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
37+
static XLogRecPtr stop_backup_lsn = InvalidXLogRecPtr;
38+
const char *progname = "pg_arman";
3439

3540
/* list of files contained in backup */
3641
parray *backup_files_list;
@@ -71,6 +76,15 @@ static void create_file_list(parray *files,
7176
bool is_append);
7277
static void wait_for_archive(pgBackup *backup, const char *sql);
7378
static void make_pagemap_from_ptrack(parray *files);
79+
static void StreamLog(void *arg);
80+
81+
82+
#define disconnect_and_exit(code) \
83+
{ \
84+
if (conn != NULL) PQfinish(conn); \
85+
exit(code); \
86+
}
87+
7488

7589
/*
7690
* Take a backup of database and return the list of files backed up.
@@ -82,12 +96,14 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
8296
parray *prev_files = NULL; /* file list of previous database backup */
8397
FILE *fp;
8498
char path[MAXPGPATH];
99+
char dst_backup_path[MAXPGPATH];
85100
char label[1024];
86101
XLogRecPtr *lsn = NULL;
87102
char prev_file_txt[MAXPGPATH]; /* path of the previous backup
88103
* list file */
89104
bool has_backup_label = true; /* flag if backup_label is there */
90105
pthread_t backup_threads[num_threads];
106+
pthread_t stream_thread;
91107
backup_files_args *backup_threads_args[num_threads];
92108

93109
/* repack the options */
@@ -129,9 +145,19 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
129145
"or validate existing one.");
130146
}
131147

148+
/* clear ptrack files for FULL and DIFF backup */
132149
if (current.backup_mode != BACKUP_MODE_DIFF_PTRACK)
133150
pg_ptrack_clear();
134151

152+
/* start stream replication */
153+
if (stream_wal)
154+
{
155+
pgBackupGetPath(&current, path, lengthof(path), DATABASE_DIR);
156+
join_path_components(dst_backup_path, path, "pg_xlog");
157+
dir_create_dir(dst_backup_path, DIR_PERMISSION);
158+
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path);
159+
}
160+
135161
/* notify start of backup to PostgreSQL server */
136162
time2iso(label, lengthof(label), current.start_time);
137163
strncat(label, " with pg_arman", lengthof(label));
@@ -322,6 +348,35 @@ do_backup_database(parray *backup_list, pgBackupOption bkupopt)
322348
/* Notify end of backup */
323349
pg_stop_backup(&current);
324350

351+
if (stream_wal)
352+
{
353+
parray *list_file;
354+
char pg_xlog_path[MAXPGPATH];
355+
356+
/* We expect the completion of stream */
357+
pthread_join(stream_thread, NULL);
358+
359+
/* Scan backup pg_xlog dir */
360+
list_file = parray_new();
361+
join_path_components(pg_xlog_path, path, "pg_xlog");
362+
dir_list_file(list_file, pg_xlog_path, NULL, true, false);
363+
364+
/* Remove file path root prefix and calc meta */
365+
for (i = 0; i < parray_num(list_file); i++)
366+
{
367+
pgFile *file = (pgFile *)parray_get(list_file, i);
368+
369+
calc_file(file);
370+
if (strstr(file->path, path) == file->path)
371+
{
372+
char *ptr = file->path;
373+
file->path = pstrdup(JoinPathEnd(ptr, path));
374+
free(ptr);
375+
}
376+
}
377+
parray_concat(backup_files_list, list_file);
378+
}
379+
325380
/* Create file list */
326381
create_file_list(backup_files_list, pgdata, DATABASE_FILE_LIST, NULL, false);
327382

@@ -549,31 +604,31 @@ static void
549604
pg_ptrack_clear(void)
550605
{
551606
PGresult *res_db, *res;
552-
const char *old_dbname = dbname;
607+
const char *old_dbname = pgut_dbname;
553608
int i;
554609

555610
reconnect();
556611
res_db = execute("SELECT datname FROM pg_database", 0, NULL);
557612
disconnect();
558613
for(i=0; i < PQntuples(res_db); i++)
559614
{
560-
dbname = PQgetvalue(res_db, i, 0);
561-
if (!strcmp(dbname, "template0"))
615+
pgut_dbname = PQgetvalue(res_db, i, 0);
616+
if (!strcmp(pgut_dbname, "template0"))
562617
continue;
563618
reconnect();
564619
res = execute("SELECT pg_ptrack_clear()", 0, NULL);
565620
PQclear(res);
566621
}
567622
PQclear(res_db);
568623
disconnect();
569-
dbname = old_dbname;
624+
pgut_dbname = old_dbname;
570625
}
571626

572627
static char *
573628
pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *result_size)
574629
{
575630
PGresult *res_db, *res;
576-
const char *old_dbname = dbname;
631+
const char *old_dbname = pgut_dbname;
577632
char *params[2];
578633
char *result;
579634

@@ -584,7 +639,7 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
584639
sprintf(params[1], "%i", rel_oid);
585640
res_db = execute("SELECT datname FROM pg_database WHERE oid=$1", 1, (const char **)params);
586641
disconnect();
587-
dbname = pstrdup(PQgetvalue(res_db, 0, 0));
642+
pgut_dbname = pstrdup(PQgetvalue(res_db, 0, 0));
588643
PQclear(res_db);
589644

590645
reconnect();
@@ -595,8 +650,8 @@ pg_ptrack_get_and_clear(Oid tablespace_oid, Oid db_oid, Oid rel_oid, size_t *res
595650
pfree(params[0]);
596651
pfree(params[1]);
597652

598-
pfree((char *)dbname);
599-
dbname = old_dbname;
653+
pfree((char *)pgut_dbname);
654+
pgut_dbname = old_dbname;
600655

601656
return result;
602657
}
@@ -683,7 +738,52 @@ wait_for_archive(pgBackup *backup, const char *sql)
683738
static void
684739
pg_stop_backup(pgBackup *backup)
685740
{
686-
wait_for_archive(backup,
741+
if (stream_wal)
742+
{
743+
PGresult *res;
744+
TimeLineID tli;
745+
746+
reconnect();
747+
748+
/* Remove annoying NOTICE messages generated by backend */
749+
res = execute("SET client_min_messages = warning;", 0, NULL);
750+
PQclear(res);
751+
752+
/* And execute the query wanted */
753+
res = execute("SELECT * FROM pg_stop_backup()", 0, NULL);
754+
755+
/* Get LSN from execution result */
756+
get_lsn(res, &stop_backup_lsn);
757+
PQclear(res);
758+
759+
/*
760+
* Enforce TLI obtention if backup is not present as this code
761+
* path can be taken as a callback at exit.
762+
*/
763+
tli = get_current_timeline(false);
764+
765+
/* Fill in fields if backup exists */
766+
if (backup != NULL)
767+
{
768+
backup->tli = tli;
769+
backup->stop_lsn = stop_backup_lsn;
770+
elog(LOG, "%s(): tli=%X lsn=%X/%08X",
771+
__FUNCTION__, backup->tli,
772+
(uint32) (backup->stop_lsn >> 32),
773+
(uint32) backup->stop_lsn);
774+
}
775+
776+
res = execute(TXID_CURRENT_SQL, 0, NULL);
777+
if (backup != NULL)
778+
{
779+
get_xid(res, &backup->recovery_xid);
780+
backup->recovery_time = time(NULL);
781+
}
782+
PQclear(res);
783+
disconnect();
784+
}
785+
else
786+
wait_for_archive(backup,
687787
"SELECT * FROM pg_stop_backup()");
688788
}
689789

@@ -719,8 +819,8 @@ get_lsn(PGresult *res, XLogRecPtr *lsn)
719819
* Extract timeline and LSN from results of pg_stop_backup()
720820
* and friends.
721821
*/
722-
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
723822

823+
XLogDataFromLSN(PQgetvalue(res, 0, 0), &xlogid, &xrecoff);
724824
/* Calculate LSN */
725825
*lsn = (XLogRecPtr) ((uint64) xlogid << 32) | xrecoff;
726826
}
@@ -1137,3 +1237,98 @@ void make_pagemap_from_ptrack(parray *files)
11371237
}
11381238
}
11391239
}
1240+
1241+
1242+
static bool
1243+
stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
1244+
{
1245+
static uint32 prevtimeline = 0;
1246+
static XLogRecPtr prevpos = InvalidXLogRecPtr;
1247+
1248+
/* we assume that we get called once at the end of each segment */
1249+
if (verbose && segment_finished)
1250+
fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
1251+
progname, (uint32) (xlogpos >> 32), (uint32) xlogpos,
1252+
timeline);
1253+
1254+
/*
1255+
* Note that we report the previous, not current, position here. After a
1256+
* timeline switch, xlogpos points to the beginning of the segment because
1257+
* that's where we always begin streaming. Reporting the end of previous
1258+
* timeline isn't totally accurate, because the next timeline can begin
1259+
* slightly before the end of the WAL that we received on the previous
1260+
* timeline, but it's close enough for reporting purposes.
1261+
*/
1262+
if (prevtimeline != 0 && prevtimeline != timeline)
1263+
fprintf(stderr, _("%s: switched to timeline %u at %X/%X\n"),
1264+
progname, timeline,
1265+
(uint32) (prevpos >> 32), (uint32) prevpos);
1266+
1267+
if (stop_backup_lsn != InvalidXLogRecPtr && xlogpos > stop_backup_lsn)
1268+
return true;
1269+
1270+
prevtimeline = timeline;
1271+
prevpos = xlogpos;
1272+
1273+
return false;
1274+
}
1275+
1276+
/*
1277+
* Start the log streaming
1278+
*/
1279+
static void
1280+
StreamLog(void *arg)
1281+
{
1282+
XLogRecPtr startpos;
1283+
TimeLineID starttli;
1284+
char *basedir = (char *)arg;
1285+
1286+
/*
1287+
* Connect in replication mode to the server
1288+
*/
1289+
if (conn == NULL)
1290+
conn = GetConnection();
1291+
if (!conn)
1292+
/* Error message already written in GetConnection() */
1293+
return;
1294+
1295+
if (!CheckServerVersionForStreaming(conn))
1296+
{
1297+
/*
1298+
* Error message already written in CheckServerVersionForStreaming().
1299+
* There's no hope of recovering from a version mismatch, so don't
1300+
* retry.
1301+
*/
1302+
disconnect_and_exit(1);
1303+
}
1304+
1305+
/*
1306+
* Identify server, obtaining start LSN position and current timeline ID
1307+
* at the same time, necessary if not valid data can be found in the
1308+
* existing output directory.
1309+
*/
1310+
if (!RunIdentifySystem(conn, NULL, &starttli, &startpos, NULL))
1311+
disconnect_and_exit(1);
1312+
1313+
1314+
/*
1315+
* Always start streaming at the beginning of a segment
1316+
*/
1317+
startpos -= startpos % XLOG_SEG_SIZE;
1318+
1319+
/*
1320+
* Start the replication
1321+
*/
1322+
if (verbose)
1323+
fprintf(stderr,
1324+
_("%s: starting log streaming at %X/%X (timeline %u)\n"),
1325+
progname, (uint32) (startpos >> 32), (uint32) startpos,
1326+
starttli);
1327+
1328+
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
1329+
stop_streaming, standby_message_timeout, ".partial",
1330+
false, false);
1331+
1332+
PQfinish(conn);
1333+
conn = NULL;
1334+
}

0 commit comments

Comments
 (0)