@@ -50,8 +50,9 @@ static pthread_t stream_thread;
50
50
51
51
static int is_ptrack_enable = false;
52
52
53
- /* Backup connection */
53
+ /* Backup connections */
54
54
static PGconn * backup_conn = NULL ;
55
+ static PGconn * master_conn = NULL ;
55
56
56
57
/* PostgreSQL server version from "backup_conn" */
57
58
static int server_version = 0 ;
@@ -434,6 +435,17 @@ do_backup(void)
434
435
if (!current .stream && !pg_archive_enabled ())
435
436
elog (ERROR , "Archiving must be enabled for archive backup" );
436
437
438
+ if (from_replica )
439
+ {
440
+ /* Check master connection options */
441
+ if (master_host == NULL )
442
+ elog (ERROR , "Options for connection to master must be provided to perform backup from replica" );
443
+
444
+ /* Create connection to master server */
445
+ master_conn = pgut_connect_extended (master_host , master_port ,
446
+ master_db , master_user , password );
447
+ }
448
+
437
449
/* Get exclusive lock of backup catalog */
438
450
catalog_lock ();
439
451
@@ -896,6 +908,7 @@ pg_stop_backup(pgBackup *backup)
896
908
PGresult * res ;
897
909
uint32 xlogid ;
898
910
uint32 xrecoff ;
911
+ XLogRecPtr restore_lsn ;
899
912
900
913
/*
901
914
* We will use this values if there are no transactions between start_lsn
@@ -912,6 +925,80 @@ pg_stop_backup(pgBackup *backup)
912
925
0 , NULL );
913
926
PQclear (res );
914
927
928
+ /* Create restore point */
929
+ if (backup != NULL )
930
+ {
931
+ const char * params [1 ];
932
+ char name [1024 ];
933
+ char * backup_id ;
934
+
935
+ backup_id = base36enc (backup -> start_time );
936
+
937
+ if (!from_replica )
938
+ {
939
+ snprintf (name , lengthof (name ), "pg_probackup, backup_id %s" ,
940
+ backup_id );
941
+ params [0 ] = name ;
942
+
943
+ res = pgut_execute (backup_conn , "SELECT pg_create_restore_point($1)" ,
944
+ 1 , params );
945
+ PQclear (res );
946
+ }
947
+ else
948
+ {
949
+ uint32 try_count = 0 ;
950
+
951
+ snprintf (name , lengthof (name ), "pg_probackup, backup_id %s. Replica Backup" ,
952
+ backup_id );
953
+ params [0 ] = name ;
954
+
955
+ res = pgut_execute (master_conn , "SELECT pg_create_restore_point($1)" ,
956
+ 1 , params );
957
+ /* Extract timeline and LSN from result */
958
+ XLogDataFromLSN (PQgetvalue (res , 0 , 0 ), & xlogid , & xrecoff );
959
+ /* Calculate LSN */
960
+ restore_lsn = (XLogRecPtr ) ((uint64 ) xlogid << 32 ) | xrecoff ;
961
+ PQclear (res );
962
+
963
+ /* Wait for restore_lsn from master */
964
+ while (true)
965
+ {
966
+ XLogRecPtr min_recovery_lsn ;
967
+
968
+ res = pgut_execute (backup_conn , "SELECT min_recovery_end_location from pg_control_recovery()" ,
969
+ 0 , NULL );
970
+ /* Extract timeline and LSN from result */
971
+ XLogDataFromLSN (PQgetvalue (res , 0 , 0 ), & xlogid , & xrecoff );
972
+ /* Calculate LSN */
973
+ min_recovery_lsn = (XLogRecPtr ) ((uint64 ) xlogid << 32 ) | xrecoff ;
974
+ PQclear (res );
975
+
976
+ /* restore_lsn was streamed and applied to the replica */
977
+ if (min_recovery_lsn >= restore_lsn )
978
+ break ;
979
+
980
+ sleep (1 );
981
+ if (interrupted )
982
+ elog (ERROR , "Interrupted during waiting for restore point LSN" );
983
+ try_count ++ ;
984
+
985
+ /* Inform user if restore_lsn is absent in first attempt */
986
+ if (try_count == 1 )
987
+ elog (INFO , "Wait for restore point LSN %X/%X to be streamed "
988
+ "to replica" ,
989
+ (uint32 ) (restore_lsn >> 32 ), (uint32 ) restore_lsn );
990
+
991
+ if (replica_timeout > 0 && try_count > replica_timeout )
992
+ elog (ERROR , "Restore point LSN %X/%X could not be "
993
+ "streamed to replica in %d seconds" ,
994
+ (uint32 ) (restore_lsn >> 32 ), (uint32 ) restore_lsn ,
995
+ replica_timeout );
996
+ }
997
+ }
998
+
999
+ pfree (backup_id );
1000
+ }
1001
+
915
1002
if (!exclusive_backup )
916
1003
/*
917
1004
* Stop the non-exclusive backup. Besides stop_lsn it returns from
@@ -937,6 +1024,15 @@ pg_stop_backup(pgBackup *backup)
937
1024
/* Calculate LSN */
938
1025
stop_backup_lsn = (XLogRecPtr ) ((uint64 ) xlogid << 32 ) | xrecoff ;
939
1026
1027
+ if (!XRecOffIsValid (stop_backup_lsn ))
1028
+ {
1029
+ stop_backup_lsn = restore_lsn ;
1030
+ }
1031
+
1032
+ if (!XRecOffIsValid (stop_backup_lsn ))
1033
+ elog (ERROR , "Invalid stop_backup_lsn value %X/%X" ,
1034
+ (uint32 ) (stop_backup_lsn >> 32 ), (uint32 ) (stop_backup_lsn ));
1035
+
940
1036
/* Write backup_label and tablespace_map for backup from replica */
941
1037
if (!exclusive_backup )
942
1038
{
@@ -1019,18 +1115,19 @@ pg_stop_backup(pgBackup *backup)
1019
1115
if (stream_wal )
1020
1116
/* Wait for the completion of stream */
1021
1117
pthread_join (stream_thread , NULL );
1022
- /*
1023
- * Wait for stop_lsn to be archived or streamed.
1024
- * We wait for stop_lsn in stream mode just in case.
1025
- */
1026
- wait_wal_lsn (stop_backup_lsn );
1027
1118
1028
1119
/* Fill in fields if that is the correct end of backup. */
1029
1120
if (backup != NULL )
1030
1121
{
1031
1122
char * xlog_path ,
1032
1123
stream_xlog_path [MAXPGPATH ];
1033
1124
1125
+ /*
1126
+ * Wait for stop_lsn to be archived or streamed.
1127
+ * We wait for stop_lsn in stream mode just in case.
1128
+ */
1129
+ wait_wal_lsn (stop_backup_lsn );
1130
+
1034
1131
if (stream_wal )
1035
1132
{
1036
1133
pgBackupGetPath2 (backup , stream_xlog_path ,
@@ -1134,6 +1231,8 @@ static void
1134
1231
backup_disconnect (bool fatal , void * userdata )
1135
1232
{
1136
1233
pgut_disconnect (backup_conn );
1234
+ if (master_conn )
1235
+ pgut_disconnect (master_conn );
1137
1236
}
1138
1237
1139
1238
/* Count bytes in file */
0 commit comments