Skip to content

Commit 3c91bde

Browse files
sdudoladovSergey DudoladovFxKu
authored
Re-create pods only if all replicas are running (zalando#903)
* adds a Get call to Patroni interface to fetch state of a Patroni member
* postpones re-creating pods if at least one replica is currently being created

Co-authored-by: Sergey Dudoladov <sergey.dudoladov@zalando.de>
Co-authored-by: Felix Kunde <felix-kunde@gmx.de>
1 parent 5014eeb commit 3c91bde

File tree

4 files changed

+67
-5
lines changed

4 files changed

+67
-5
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ _testmain.go
2828
/vendor/
2929
/build/
3030
/docker/build/
31+
/github.com/
3132
.idea
3233

3334
scm-source.json

e2e/tests/test_e2e.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,6 @@ def test_node_readiness_label(self):
344344
'''
345345
k8s = self.k8s
346346
cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster'
347-
labels = 'spilo-role=master,' + cluster_label
348347
readiness_label = 'lifecycle-status'
349348
readiness_value = 'ready'
350349

@@ -709,14 +708,16 @@ def wait_for_logical_backup_job_deletion(self):
709708
def wait_for_logical_backup_job_creation(self):
710709
self.wait_for_logical_backup_job(expected_num_of_jobs=1)
711710

712-
def update_config(self, config_map_patch):
713-
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
714-
711+
def delete_operator_pod(self):
715712
operator_pod = self.api.core_v1.list_namespaced_pod(
716713
'default', label_selector="name=postgres-operator").items[0].metadata.name
717714
self.api.core_v1.delete_namespaced_pod(operator_pod, "default") # restart reloads the conf
718715
self.wait_for_operator_pod_start()
719716

717+
def update_config(self, config_map_patch):
718+
self.api.core_v1.patch_namespaced_config_map("postgres-operator", "default", config_map_patch)
719+
self.delete_operator_pod()
720+
720721
def create_with_kubectl(self, path):
721722
return subprocess.run(
722723
["kubectl", "create", "-f", path],

pkg/cluster/pod.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,27 @@ func (c *Cluster) recreatePod(podName spec.NamespacedName) (*v1.Pod, error) {
294294
return pod, nil
295295
}
296296

297+
func (c *Cluster) isSafeToRecreatePods(pods *v1.PodList) bool {
298+
299+
/*
300+
Operator should not re-create pods if there is at least one replica being bootstrapped
301+
because Patroni might use other replicas to take basebackup from (see Patroni's "clonefrom" tag).
302+
303+
XXX operator cannot forbid replica re-init, so we might still fail if re-init is started
304+
after this check succeeds but before a pod is re-created
305+
*/
306+
307+
for _, pod := range pods.Items {
308+
state, err := c.patroni.GetPatroniMemberState(&pod)
309+
if err != nil || state == "creating replica" {
310+
c.logger.Warningf("cannot re-create replica %s: it is currently being initialized", pod.Name)
311+
return false
312+
}
313+
314+
}
315+
return true
316+
}
317+
297318
func (c *Cluster) recreatePods() error {
298319
c.setProcessName("starting to recreate pods")
299320
ls := c.labelsSet(false)
@@ -309,6 +330,10 @@ func (c *Cluster) recreatePods() error {
309330
}
310331
c.logger.Infof("there are %d pods in the cluster to recreate", len(pods.Items))
311332

333+
if !c.isSafeToRecreatePods(pods) {
334+
return fmt.Errorf("postpone pod recreation until next Sync: recreation is unsafe because pods are being initilalized")
335+
}
336+
312337
var (
313338
masterPod, newMasterPod, newPod *v1.Pod
314339
)

pkg/util/patroni/patroni.go

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package patroni
33
import (
44
"bytes"
55
"encoding/json"
6+
"errors"
67
"fmt"
78
"io/ioutil"
89
"net"
@@ -11,7 +12,7 @@ import (
1112
"time"
1213

1314
"github.com/sirupsen/logrus"
14-
"k8s.io/api/core/v1"
15+
v1 "k8s.io/api/core/v1"
1516
)
1617

1718
const (
@@ -25,6 +26,7 @@ const (
2526
type Interface interface {
2627
Switchover(master *v1.Pod, candidate string) error
2728
SetPostgresParameters(server *v1.Pod, options map[string]string) error
29+
GetPatroniMemberState(pod *v1.Pod) (string, error)
2830
}
2931

3032
// Patroni API client
@@ -123,3 +125,36 @@ func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]st
123125
}
124126
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
125127
}
128+
129+
//GetPatroniMemberState returns a state of member of a Patroni cluster
130+
func (p *Patroni) GetPatroniMemberState(server *v1.Pod) (string, error) {
131+
132+
apiURLString, err := apiURL(server)
133+
if err != nil {
134+
return "", err
135+
}
136+
response, err := p.httpClient.Get(apiURLString)
137+
if err != nil {
138+
return "", fmt.Errorf("could not perform Get request: %v", err)
139+
}
140+
defer response.Body.Close()
141+
142+
body, err := ioutil.ReadAll(response.Body)
143+
if err != nil {
144+
return "", fmt.Errorf("could not read response: %v", err)
145+
}
146+
147+
data := make(map[string]interface{})
148+
err = json.Unmarshal(body, &data)
149+
if err != nil {
150+
return "", err
151+
}
152+
153+
state, ok := data["state"].(string)
154+
if !ok {
155+
return "", errors.New("Patroni Get call response contains wrong type for 'state' field")
156+
}
157+
158+
return state, nil
159+
160+
}

0 commit comments

Comments
 (0)