|
40 | 40 | #include <sys/stat.h>
|
41 | 41 |
|
42 | 42 | #include "access/transam.h"
|
| 43 | +#include "access/xlog_internal.h" |
43 | 44 | #include "common/string.h"
|
44 | 45 | #include "miscadmin.h"
|
45 | 46 | #include "replication/slot.h"
|
@@ -781,6 +782,76 @@ CheckSlotRequirements(void)
|
781 | 782 | errmsg("replication slots can only be used if wal_level >= archive")));
|
782 | 783 | }
|
783 | 784 |
|
| 785 | +/* |
| 786 | + * Reserve WAL for the currently active slot. |
| 787 | + * |
| 788 | + * Compute and set restart_lsn in a manner that's appropriate for the type of |
| 789 | + * the slot and concurrency safe. |
| 790 | + */ |
| 791 | +void |
| 792 | +ReplicationSlotReserveWal(void) |
| 793 | +{ |
| 794 | + ReplicationSlot *slot = MyReplicationSlot; |
| 795 | + |
| 796 | + Assert(slot != NULL); |
| 797 | + Assert(slot->data.restart_lsn == InvalidXLogRecPtr); |
| 798 | + |
| 799 | + /* |
| 800 | + * The replication slot mechanism is used to prevent removal of required |
| 801 | + * WAL. As there is no interlock between this routine and checkpoints, WAL |
| 802 | + * segments could concurrently be removed when a now stale return value of |
| 803 | + * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that |
| 804 | + * this happens we'll just retry. |
| 805 | + */ |
| 806 | + while (true) |
| 807 | + { |
| 808 | + XLogSegNo segno; |
| 809 | + |
| 810 | + /* |
| 811 | + * For logical slots log a standby snapshot and start logical decoding |
| 812 | + * at exactly that position. That allows the slot to start up more |
| 813 | + * quickly. |
| 814 | + * |
| 815 | + * That's not needed (or indeed helpful) for physical slots as they'll |
| 816 | + * start replay at the last logged checkpoint anyway. Instead return |
| 817 | + * the location of the last redo LSN. While that slightly increases |
| 818 | + * the chance that we have to retry, it's where a base backup has to |
| 819 | + * start replay at. |
| 820 | + */ |
| 821 | + if (!RecoveryInProgress() && SlotIsLogical(slot)) |
| 822 | + { |
| 823 | + XLogRecPtr flushptr; |
| 824 | + |
| 825 | + /* start at current insert position */ |
| 826 | + slot->data.restart_lsn = GetXLogInsertRecPtr(); |
| 827 | + |
| 828 | + /* make sure we have enough information to start */ |
| 829 | + flushptr = LogStandbySnapshot(); |
| 830 | + |
| 831 | + /* and make sure it's fsynced to disk */ |
| 832 | + XLogFlush(flushptr); |
| 833 | + } |
| 834 | + else |
| 835 | + { |
| 836 | + slot->data.restart_lsn = GetRedoRecPtr(); |
| 837 | + } |
| 838 | + |
| 839 | + /* prevent WAL removal as fast as possible */ |
| 840 | + ReplicationSlotsComputeRequiredLSN(); |
| 841 | + |
| 842 | + /* |
| 843 | + * If all required WAL is still there, great, otherwise retry. The |
| 844 | + * slot should prevent further removal of WAL, unless there's a |
| 845 | + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written |
| 846 | + * the new restart_lsn above, so normally we should never need to loop |
| 847 | + * more than twice. |
| 848 | + */ |
| 849 | + XLByteToSeg(slot->data.restart_lsn, segno); |
| 850 | + if (XLogGetLastRemovedSegno() < segno) |
| 851 | + break; |
| 852 | + } |
| 853 | +} |
| 854 | + |
784 | 855 | /*
|
785 | 856 | * Flush all replication slots to disk.
|
786 | 857 | *
|
|
0 commit comments