Skip to content

Commit 70d9e7c

Browse files
committed
Merge branch '217-pool-rotation' into 'master'
feat: add a storage pool rotation (#217) - use a linked list to define the next storage pool to update - exclude pools with running clones from rotation - deny updating an active pool as it will lead to downtime # Related issue #217 See merge request postgres-ai/database-lab!249
2 parents dbabb6c + 782e1dd commit 70d9e7c

File tree

12 files changed

+105
-164
lines changed

12 files changed

+105
-164
lines changed

configs/config.example.logical_generic.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ provision:
9797
# Custom parameters for clone containers, see
9898
# https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources
9999
containerConfig:
100-
"shm-size": 256MB
100+
"shm-size": 1gb
101101

102102
# Data retrieval flow. This section defines both initial retrieval, and rules
103103
# to keep the data directory in a synchronized state with the source. Both are optional:

configs/config.example.logical_rds_iam.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ provision:
9696

9797
# Custom parameters for clone containers (https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources).
9898
containerConfig:
99-
"shm-size": 256MB
99+
"shm-size": 1gb
100100

101101
# Data retrieval flow. This section defines both initial retrieval, and rules
102102
# to keep the data directory in a synchronized state with the source. Both are optional:

configs/config.example.physical_generic.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ provision:
9696

9797
# Custom parameters for clone containers (https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources).
9898
containerConfig:
99-
"shm-size": 256MB
99+
"shm-size": 1gb
100100

101101
# Data retrieval flow. This section defines both initial retrieval, and rules
102102
# to keep the data directory in a synchronized state with the source. Both are optional:

configs/config.example.physical_walg.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ provision:
9696

9797
# Custom parameters for clone containers (https://docs.docker.com/engine/reference/run/#runtime-constraints-on-resources).
9898
containerConfig:
99-
"shm-size": 256MB
99+
"shm-size": 1gb
100100

101101
# Data retrieval flow. This section defines both initial retrieval, and rules
102102
# to keep the data directory in a synchronized state with the source. Both are optional:

pkg/retrieval/engine/postgres/logical/restore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (r *RestoreJob) retrieveDataStateAt(ctx context.Context, contID string) (st
285285
return dataStateAt, nil
286286
}
287287

288-
// updateDataStateAt updates dataStateAt for in-memory representation of a filesystem pool.
288+
// updateDataStateAt updates dataStateAt for in-memory representation of a storage pool.
289289
func (r *RestoreJob) updateDataStateAt() {
290290
dsaTime, err := time.Parse(util.DataStateAtFormat, r.dbMark.DataStateAt)
291291
if err != nil {

pkg/retrieval/engine/postgres/snapshot/physical.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ func (p *PhysicalInitial) markDatabaseData() error {
671671
return p.dbMarker.SaveConfig(p.dbMark)
672672
}
673673

674-
// updateDataStateAt updates dataStateAt for in-memory representation of a filesystem pool.
674+
// updateDataStateAt updates dataStateAt for in-memory representation of a storage pool.
675675
func (p *PhysicalInitial) updateDataStateAt() {
676676
dsaTime, err := time.Parse(util.DataStateAtFormat, p.dbMark.DataStateAt)
677677
if err != nil {

pkg/retrieval/retrieval.go

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func (r *Retrieval) refreshFunc(ctx context.Context) func() {
240240
}
241241
}
242242

243-
// fullRefresh makes a full refresh for an old filesystem pool.
243+
// fullRefresh performs full refresh for an unused storage pool and makes it active.
244244
func (r *Retrieval) fullRefresh(ctx context.Context) error {
245245
// Stop previous runs and snapshot schedulers.
246246
if r.ctxCancel != nil {
@@ -249,13 +249,18 @@ func (r *Retrieval) fullRefresh(ctx context.Context) error {
249249

250250
runCtx, cancel := context.WithCancel(ctx)
251251
r.ctxCancel = cancel
252-
poolToUpdate := r.poolManager.Oldest()
252+
elementToUpdate := r.poolManager.GetPoolToUpdate()
253253

254-
if poolToUpdate == nil {
255-
log.Msg("Pool to a full refresh not found. Skip refreshing.")
254+
if elementToUpdate == nil || elementToUpdate.Value == nil {
255+
log.Msg("Pool to perform full refresh not found. Skip refreshing")
256256
return nil
257257
}
258258

259+
poolToUpdate, err := r.poolManager.GetFSManager(elementToUpdate.Value.(string))
260+
if err != nil {
261+
return errors.Wrap(err, "failed to get FSManager")
262+
}
263+
259264
log.Msg("Pool to a full refresh: ", poolToUpdate.Pool())
260265

261266
if err := preparePoolToRefresh(poolToUpdate); err != nil {
@@ -269,14 +274,11 @@ func (r *Retrieval) fullRefresh(ctx context.Context) error {
269274
return cleanUpErr
270275
}
271276

272-
current := r.poolManager.Active()
273-
274277
if err := r.run(runCtx, poolToUpdate); err != nil {
275278
return err
276279
}
277280

278-
r.poolManager.SetOldest(current)
279-
r.poolManager.SetActive(poolToUpdate)
281+
r.poolManager.SetActive(elementToUpdate)
280282

281283
return nil
282284
}

pkg/services/provision/pool/pool_manager.go

Lines changed: 62 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package pool
66

77
import (
8+
"container/list"
89
"io/ioutil"
910
"os"
1011
"path"
@@ -34,9 +35,8 @@ const (
3435
type Manager struct {
3536
cfg *Config
3637
mu *sync.Mutex
38+
fsManagerList *list.List
3739
fsManagerPool map[string]FSManager
38-
fsManager FSManager
39-
oldFsManager FSManager
4040
runner runners.Runner
4141
blockDeviceTypes map[string]string
4242
}
@@ -58,6 +58,7 @@ func NewPoolManager(cfg *Config, runner runners.Runner) *Manager {
5858
fsManagerPool: make(map[string]FSManager),
5959
runner: runner,
6060
blockDeviceTypes: make(map[string]string),
61+
fsManagerList: list.New(),
6162
}
6263
}
6364

@@ -68,24 +69,56 @@ func (pm *Manager) Reload(cfg Config) error {
6869
return pm.ReloadPools()
6970
}
7071

71-
// Active returns the active filesystem pool manager.
72-
func (pm *Manager) Active() FSManager {
73-
return pm.fsManager
72+
// SetActive sets a new active pool manager element.
73+
func (pm *Manager) SetActive(element *list.Element) {
74+
pm.fsManagerList.MoveToFront(element)
7475
}
7576

76-
// SetActive sets a new active pool manager.
77-
func (pm *Manager) SetActive(active FSManager) {
78-
pm.fsManager = active
77+
// Active returns the active storage pool manager.
78+
func (pm *Manager) Active() FSManager {
79+
active := pm.fsManagerList.Front()
80+
81+
if active == nil || active.Value == nil {
82+
return nil
83+
}
84+
85+
return pm.getFSManager(active.Value.(string))
7986
}
8087

81-
// Oldest returns the oldest filesystem pool manager.
82-
func (pm *Manager) Oldest() FSManager {
83-
return pm.oldFsManager
88+
func (pm *Manager) getFSManager(pool string) FSManager {
89+
pm.mu.Lock()
90+
fsm := pm.fsManagerPool[pool]
91+
pm.mu.Unlock()
92+
93+
return fsm
8494
}
8595

86-
// SetOldest sets a pool manager to update.
87-
func (pm *Manager) SetOldest(pool FSManager) {
88-
pm.oldFsManager = pool
96+
// GetPoolToUpdate returns the element to update.
97+
func (pm *Manager) GetPoolToUpdate() *list.Element {
98+
for element := pm.fsManagerList.Back(); element != nil; element = element.Prev() {
99+
if element.Value == nil {
100+
return nil
101+
}
102+
103+
// The active pool cannot be updated as it leads to downtime.
104+
if element == pm.fsManagerList.Front() {
105+
return nil
106+
}
107+
108+
fsm := pm.getFSManager(element.Value.(string))
109+
110+
clones, err := fsm.ListClonesNames()
111+
if err != nil {
112+
log.Err("failed to list clones", err)
113+
return nil
114+
}
115+
116+
if len(clones) == 0 {
117+
return element
118+
}
119+
}
120+
121+
return nil
89122
}
90123

91124
// GetFSManager returns a filesystem manager by name if exists.
@@ -130,24 +163,14 @@ func (pm *Manager) ReloadPools() error {
130163
fsPools := pm.examineEntries(entries)
131164

132165
if len(fsPools) == 0 {
133-
return errors.New("no available filesystem pools")
134-
}
135-
136-
active, old := pm.detectWorkingPools(fsPools)
137-
138-
if active == nil {
139-
return errors.New("active pool not found: make sure it exists")
166+
return errors.New("no available pools")
140167
}
141168

142169
pm.mu.Lock()
143-
144170
pm.fsManagerPool = fsPools
145-
pm.SetActive(active)
146-
pm.SetOldest(old)
147-
148171
pm.mu.Unlock()
149172

150-
log.Msg("Available FS pools: ", pm.describeAvailablePools())
173+
log.Msg("Available storage pools: ", pm.describeAvailablePools())
151174
log.Msg("Active pool: ", pm.Active().Pool().Name)
152175

153176
return nil
@@ -208,6 +231,13 @@ func (pm *Manager) examineEntries(entries []os.FileInfo) map[string]FSManager {
208231

209232
// TODO(akartasov): extract pool name.
210233
fsManagers[entry.Name()] = fsm
234+
235+
if pm.Active() == nil || pm.Active().Pool().DSA.Before(pool.DSA) {
236+
pm.fsManagerList.PushFront(fsm.Pool().Name)
237+
continue
238+
}
239+
240+
pm.fsManagerList.PushBack(fsm.Pool().Name)
211241
}
212242

213243
return fsManagers
@@ -226,33 +256,6 @@ func (pm *Manager) reloadBlockDevices() error {
226256
return nil
227257
}
228258

229-
func (pm *Manager) detectWorkingPools(fsm map[string]FSManager) (FSManager, FSManager) {
230-
var fsManager, old FSManager
231-
232-
for _, manager := range fsm {
233-
if fsManager == nil {
234-
fsManager = manager
235-
continue
236-
}
237-
238-
if fsManager.Pool().DSA.Before(manager.Pool().DSA) {
239-
if old == nil {
240-
old = fsManager
241-
}
242-
243-
fsManager = manager
244-
245-
continue
246-
}
247-
248-
if old == nil || manager.Pool().DSA.Before(old.Pool().DSA) {
249-
old = manager
250-
}
251-
}
252-
253-
return fsManager, old
254-
}
255-
256259
func extractDataStateAt(dataPath string) (*time.Time, error) {
257260
marker := dbmarker.NewMarker(dataPath)
258261

@@ -294,13 +297,14 @@ func (pm *Manager) getFSInfo(path string) (string, error) {
294297
func (pm *Manager) describeAvailablePools() []string {
295298
availablePools := []string{}
296299

297-
pm.mu.Lock()
300+
for el := pm.fsManagerList.Front(); el != nil; el = el.Next() {
301+
if el.Value == nil {
302+
log.Err("empty element: skip listing")
303+
continue
304+
}
298305

299-
for _, fsm := range pm.fsManagerPool {
300-
availablePools = append(availablePools, fsm.Pool().DataDir())
306+
availablePools = append(availablePools, el.Value.(string))
301307
}
302308

303-
pm.mu.Unlock()
304-
305309
return availablePools
306310
}

pkg/services/provision/pool/pool_manager_test.go

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)