Skip to content

Commit aa9764d

Browse files
committed
Wait checkpoint_timeout + 3s for LSN in stream mode
1 parent 82ca5a0 commit aa9764d

File tree

4 files changed

+271
-32
lines changed

4 files changed

+271
-32
lines changed

backup.c

Lines changed: 56 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,11 @@ const char *progname = "pg_probackup";
3333
/* list of files contained in backup */
3434
static parray *backup_files_list = NULL;
3535

36-
static pthread_mutex_t check_stream_mut = PTHREAD_MUTEX_INITIALIZER;
36+
static pthread_mutex_t start_stream_mut = PTHREAD_MUTEX_INITIALIZER;
37+
/*
38+
* We need to wait for the end of WAL streaming before executing pg_stop_backup().
39+
*/
40+
static pthread_t stream_thread;
3741

3842
static int is_ptrack_enable = false;
3943

@@ -105,7 +109,6 @@ do_backup_database(parray *backup_list)
105109
XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr;
106110

107111
pthread_t backup_threads[num_threads];
108-
pthread_t stream_thread;
109112
backup_files_args *backup_threads_args[num_threads];
110113

111114
pgBackup *prev_backup = NULL;
@@ -142,24 +145,6 @@ do_backup_database(parray *backup_list)
142145
strncat(label, " with pg_probackup", lengthof(label));
143146
pg_start_backup(label, smooth_checkpoint, &current);
144147

145-
pgBackupGetPath(&current, database_path, lengthof(database_path),
146-
DATABASE_DIR);
147-
148-
/* start stream replication */
149-
if (stream_wal)
150-
{
151-
join_path_components(dst_backup_path, database_path, PG_XLOG_DIR);
152-
dir_create_dir(dst_backup_path, DIR_PERMISSION);
153-
154-
pthread_mutex_lock(&check_stream_mut);
155-
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path);
156-
pthread_mutex_lock(&check_stream_mut);
157-
if (conn == NULL)
158-
elog(ERROR, "Cannot continue backup because stream connect has failed.");
159-
160-
pthread_mutex_unlock(&check_stream_mut);
161-
}
162-
163148
/*
164149
* If backup_label does not exist in $PGDATA, stop taking backup.
165150
* NOTE. We can check it only on master, though.
@@ -178,6 +163,24 @@ do_backup_database(parray *backup_list)
178163
}
179164
}
180165

166+
pgBackupGetPath(&current, database_path, lengthof(database_path),
167+
DATABASE_DIR);
168+
169+
/* start stream replication */
170+
if (stream_wal)
171+
{
172+
join_path_components(dst_backup_path, database_path, PG_XLOG_DIR);
173+
dir_create_dir(dst_backup_path, DIR_PERMISSION);
174+
175+
pthread_mutex_lock(&start_stream_mut);
176+
pthread_create(&stream_thread, NULL, (void *(*)(void *)) StreamLog, dst_backup_path);
177+
pthread_mutex_lock(&start_stream_mut);
178+
if (conn == NULL)
179+
elog(ERROR, "Cannot continue backup because stream connect has failed.");
180+
181+
pthread_mutex_unlock(&start_stream_mut);
182+
}
183+
181184
/*
182185
* To take incremental backup get the filelist of the last completed database
183186
*/
@@ -299,7 +302,7 @@ do_backup_database(parray *backup_list)
299302
pthread_create(&backup_threads[i], NULL, (void *(*)(void *)) backup_files, backup_threads_args[i]);
300303
}
301304

302-
/* Wait theads */
305+
/* Wait threads */
303306
for (i = 0; i < num_threads; i++)
304307
{
305308
pthread_join(backup_threads[i], NULL);
@@ -322,9 +325,6 @@ do_backup_database(parray *backup_list)
322325
parray *xlog_files_list;
323326
char pg_xlog_path[MAXPGPATH];
324327

325-
/* Wait for the completion of stream */
326-
pthread_join(stream_thread, NULL);
327-
328328
/* Scan backup PG_XLOG_DIR */
329329
xlog_files_list = parray_new();
330330
join_path_components(pg_xlog_path, database_path, PG_XLOG_DIR);
@@ -796,7 +796,8 @@ wait_wal_lsn(XLogRecPtr lsn)
796796
char wal_dir[MAXPGPATH],
797797
wal_segment_full_path[MAXPGPATH];
798798
char wal_segment[MAXFNAMELEN];
799-
uint32 try_count = 0;
799+
uint32 try_count = 0,
800+
timeout;
800801

801802
tli = get_current_timeline(false);
802803

@@ -806,12 +807,35 @@ wait_wal_lsn(XLogRecPtr lsn)
806807

807808
if (stream_wal)
808809
{
810+
PGresult *res;
811+
const char *val;
812+
const char *hintmsg;
813+
809814
pgBackupGetPath2(&current, wal_dir, lengthof(wal_dir),
810815
DATABASE_DIR, PG_XLOG_DIR);
811816
join_path_components(wal_segment_full_path, wal_dir, wal_segment);
817+
818+
res = pgut_execute(backup_conn, "show checkpoint_timeout", 0, NULL);
819+
val = PQgetvalue(res, 0, 0);
820+
PQclear(res);
821+
822+
if (!parse_int(val, (int *) &timeout, OPTION_UNIT_S,
823+
&hintmsg))
824+
{
825+
if (hintmsg)
826+
elog(ERROR, "Invalid value of checkout_timeout %s: %s", val,
827+
hintmsg);
828+
else
829+
elog(ERROR, "Invalid value of checkout_timeout %s", val);
830+
}
831+
/* Add 3 seconds to the initial value of checkpoint_timeout */
832+
timeout = timeout + 3;
812833
}
813834
else
835+
{
814836
join_path_components(wal_segment_full_path, arclog_path, wal_segment);
837+
timeout = archive_timeout;
838+
}
815839

816840
/* Wait until switched WAL is archived */
817841
while (!fileExists(wal_segment_full_path))
@@ -826,10 +850,10 @@ wait_wal_lsn(XLogRecPtr lsn)
826850
elog(INFO, "wait for LSN %X/%X in archived WAL segment %s",
827851
(uint32) (lsn >> 32), (uint32) lsn, wal_segment_full_path);
828852

829-
if (archive_timeout > 0 && try_count > archive_timeout)
853+
if (timeout > 0 && try_count > timeout)
830854
elog(ERROR,
831855
"switched WAL segment %s could not be archived in %d seconds",
832-
wal_segment, archive_timeout);
856+
wal_segment, timeout);
833857
}
834858

835859
/*
@@ -959,6 +983,9 @@ pg_stop_backup(pgBackup *backup)
959983

960984
PQclear(res);
961985

986+
if (stream_wal)
987+
/* Wait for the completion of stream */
988+
pthread_join(stream_thread, NULL);
962989
wait_wal_lsn(stop_backup_lsn);
963990

964991
/* Fill in fields if that is the correct end of backup. */
@@ -1631,7 +1658,7 @@ StreamLog(void *arg)
16311658
conn = GetConnection();
16321659
if (!conn)
16331660
{
1634-
pthread_mutex_unlock(&check_stream_mut);
1661+
pthread_mutex_unlock(&start_stream_mut);
16351662
/* Error message already written in GetConnection() */
16361663
return;
16371664
}
@@ -1655,7 +1682,7 @@ StreamLog(void *arg)
16551682
disconnect_and_exit(1);
16561683

16571684
/* OK, we have a normal stream connection and the main process can work again */
1658-
pthread_mutex_unlock(&check_stream_mut);
1685+
pthread_mutex_unlock(&start_stream_mut);
16591686

16601687
/*
16611688
* We must use startpos as start_lsn from start_backup

show.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ show_backup_list(FILE *out, parray *backup_list)
253253
parent_tli = get_parent_tli(backup->tli);
254254
backup_id = base36enc(backup->start_time);
255255

256-
fprintf(out, "%-6s %-19s %-6s %-7s %3d / %-3d %5s %6s %2X/%08X %2X/%08X %-8s\n",
256+
fprintf(out, "%-6s %-19s %-6s %-7s %3d / %-3d %5s %6s %2X/%-8X %2X/%-8X %-8s\n",
257257
backup_id,
258258
timestamp,
259259
pgBackupGetBackupMode(backup),

0 commit comments

Comments
 (0)