Skip to content

Commit 48a5744

Browse files
Use Patroni API to set bootstrap-only options. (zalando#299)
Call Patroni API /config in order to set special options that are ignored when set in the configuration file, such as max_connections. Per zalando#297 * Some minor refacoring: Rename Cluster ManualFailover to Swithover Rename Patroni Failover to Switchover Add more details to error messages and comments introduced in this PR. Review by @zerg-junior
1 parent 24df918 commit 48a5744

File tree

6 files changed

+85
-17
lines changed

6 files changed

+85
-17
lines changed

pkg/cluster/cluster.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -866,8 +866,8 @@ func (c *Cluster) GetStatus() *spec.ClusterStatus {
866866
}
867867
}
868868

869-
// ManualFailover does manual failover to a candidate pod
870-
func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
869+
// Switchover does a switchover (via Patroni) to a candidate pod
870+
func (c *Cluster) Switchover(curMaster *v1.Pod, candidate spec.NamespacedName) error {
871871
c.logger.Debugf("failing over from %q to %q", curMaster.Name, candidate)
872872

873873
podLabelErr := make(chan error)
@@ -889,7 +889,7 @@ func (c *Cluster) ManualFailover(curMaster *v1.Pod, candidate spec.NamespacedNam
889889
}
890890
}()
891891

892-
if err := c.patroni.Failover(curMaster, candidate.Name); err != nil {
892+
if err := c.patroni.Switchover(curMaster, candidate.Name); err != nil {
893893
close(stopCh)
894894
return fmt.Errorf("could not failover: %v", err)
895895
}

pkg/cluster/pod.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (c *Cluster) MigrateMasterPod(podName spec.NamespacedName) error {
232232
}
233233

234234
masterCandidateName := util.NameFromMeta(pod.ObjectMeta)
235-
if err := c.ManualFailover(oldMaster, masterCandidateName); err != nil {
235+
if err := c.Switchover(oldMaster, masterCandidateName); err != nil {
236236
return fmt.Errorf("could not failover to pod %q: %v", masterCandidateName, err)
237237
}
238238
} else {
@@ -330,7 +330,7 @@ func (c *Cluster) recreatePods() error {
330330
if masterPod != nil {
331331
// failover if we have not observed a master pod when re-creating former replicas.
332332
if newMasterPod == nil && len(replicas) > 0 {
333-
if err := c.ManualFailover(masterPod, masterCandidate(replicas)); err != nil {
333+
if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
334334
c.logger.Warningf("could not perform failover: %v", err)
335335
}
336336
} else if newMasterPod == nil && len(replicas) == 0 {

pkg/cluster/resources.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (c *Cluster) preScaleDown(newStatefulSet *v1beta1.StatefulSet) error {
125125
return fmt.Errorf("pod %q does not belong to cluster", podName)
126126
}
127127

128-
if err := c.patroni.Failover(&masterPod[0], masterCandidatePod.Name); err != nil {
128+
if err := c.patroni.Switchover(&masterPod[0], masterCandidatePod.Name); err != nil {
129129
return fmt.Errorf("could not failover: %v", err)
130130
}
131131

pkg/cluster/sync.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,7 @@ func (c *Cluster) syncStatefulSet() error {
229229
var (
230230
podsRollingUpdateRequired bool
231231
)
232+
// NB: Be careful to consider the codepath that acts on podsRollingUpdateRequired before returning early.
232233
sset, err := c.KubeClient.StatefulSets(c.Namespace).Get(c.statefulSetName(), metav1.GetOptions{})
233234
if err != nil {
234235
if !k8sutil.ResourceNotFound(err) {
@@ -288,6 +289,14 @@ func (c *Cluster) syncStatefulSet() error {
288289
}
289290
}
290291
}
292+
293+
// Apply special PostgreSQL parameters that can only be set via the Patroni API.
294+
// it is important to do it after the statefulset pods are there, but before the rolling update
295+
// since those parameters require PostgreSQL restart.
296+
if err := c.checkAndSetGlobalPostgreSQLConfiguration(); err != nil {
297+
return fmt.Errorf("could not set cluster-wide PostgreSQL configuration options: %v", err)
298+
}
299+
291300
// if we get here we also need to re-create the pods (either leftovers from the old
292301
// statefulset or those that got their configuration from the outdated statefulset)
293302
if podsRollingUpdateRequired {
@@ -303,6 +312,43 @@ func (c *Cluster) syncStatefulSet() error {
303312
return nil
304313
}
305314

315+
// checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters
316+
// (like max_connections) has changed and if necessary sets it via the Patroni API
317+
func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration() error {
318+
// we need to extract those options from the cluster manifest.
319+
optionsToSet := make(map[string]string)
320+
pgOptions := c.Spec.Parameters
321+
322+
for k, v := range pgOptions {
323+
if isBootstrapOnlyParameter(k) {
324+
optionsToSet[k] = v
325+
}
326+
}
327+
328+
if len(optionsToSet) > 0 {
329+
pods, err := c.listPods()
330+
if err != nil {
331+
return err
332+
}
333+
if len(pods) == 0 {
334+
return fmt.Errorf("could not call Patroni API: cluster has no pods")
335+
}
336+
for _, pod := range pods {
337+
podName := util.NameFromMeta(pod.ObjectMeta)
338+
c.logger.Debugf("calling Patroni API on a pod %s to set the following Postgres options: %v",
339+
podName, optionsToSet)
340+
if err := c.patroni.SetPostgresParameters(&pod, optionsToSet); err == nil {
341+
return nil
342+
} else {
343+
c.logger.Warningf("could not patch postgres parameters with a pod %s: %v", podName, err)
344+
}
345+
}
346+
return fmt.Errorf("could not reach Patroni API to set Postgres options: failed on every pod (%d total)",
347+
len(pods))
348+
}
349+
return nil
350+
}
351+
306352
func (c *Cluster) syncSecrets() error {
307353
c.setProcessName("syncing secrets")
308354
secrets := c.generateUserSecrets()

pkg/spec/types.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io/ioutil"
77
"log"
8+
"os"
89
"strings"
910
"time"
1011

@@ -14,6 +15,7 @@ import (
1415
"k8s.io/client-go/pkg/apis/apps/v1beta1"
1516
policyv1beta1 "k8s.io/client-go/pkg/apis/policy/v1beta1"
1617
"k8s.io/client-go/rest"
18+
1719
)
1820

1921
// EventType contains type of the events for the TPRs and Pods received from Kubernetes
@@ -223,6 +225,9 @@ func (r RoleOrigin) String() string {
223225
// Placing this func here instead of pgk/util avoids circular import
224226
func GetOperatorNamespace() string {
225227
if operatorNamespace == "" {
228+
if namespaceFromEnvironment := os.Getenv("OPERATOR_NAMESPACE"); namespaceFromEnvironment != "" {
229+
return namespaceFromEnvironment
230+
}
226231
operatorNamespaceBytes, err := ioutil.ReadFile(fileWithNamespace)
227232
if err != nil {
228233
log.Fatalf("Unable to detect operator namespace from within its pod due to: %v", err)

pkg/util/patroni/patroni.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414

1515
const (
1616
failoverPath = "/failover"
17+
configPath = "/config"
1718
apiPort = 8008
1819
timeout = 30 * time.Second
1920
)
2021

2122
// Interface describe patroni methods
2223
type Interface interface {
23-
Failover(master *v1.Pod, candidate string) error
24+
Switchover(master *v1.Pod, candidate string) error
25+
SetPostgresParameters(server *v1.Pod, options map[string]string) error
2426
}
2527

2628
// Patroni API client
@@ -45,20 +47,13 @@ func apiURL(masterPod *v1.Pod) string {
4547
return fmt.Sprintf("http://%s:%d", masterPod.Status.PodIP, apiPort)
4648
}
4749

48-
// Failover does manual failover via patroni api
49-
func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
50-
buf := &bytes.Buffer{}
51-
52-
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate})
53-
if err != nil {
54-
return fmt.Errorf("could not encode json: %v", err)
55-
}
56-
request, err := http.NewRequest(http.MethodPost, apiURL(master)+failoverPath, buf)
50+
func (p *Patroni) httpPostOrPatch(method string, url string, body *bytes.Buffer) error {
51+
request, err := http.NewRequest(method, url, body)
5752
if err != nil {
5853
return fmt.Errorf("could not create request: %v", err)
5954
}
6055

61-
p.logger.Debugf("making http request: %s", request.URL.String())
56+
p.logger.Debugf("making %s http request: %s", method, request.URL.String())
6257

6358
resp, err := p.httpClient.Do(request)
6459
if err != nil {
@@ -74,6 +69,28 @@ func (p *Patroni) Failover(master *v1.Pod, candidate string) error {
7469

7570
return fmt.Errorf("patroni returned '%s'", string(bodyBytes))
7671
}
72+
return nil
73+
}
74+
75+
// Switchover by calling Patroni REST API
76+
func (p *Patroni) Switchover(master *v1.Pod, candidate string) error {
77+
buf := &bytes.Buffer{}
78+
err := json.NewEncoder(buf).Encode(map[string]string{"leader": master.Name, "member": candidate})
79+
if err != nil {
80+
return fmt.Errorf("could not encode json: %v", err)
81+
}
82+
return p.httpPostOrPatch(http.MethodPost, apiURL(master)+failoverPath, buf)
7783

7884
return nil
7985
}
86+
87+
//TODO: add an option call /patroni to check if it is necessary to restart the server
88+
// SetPostgresParameters sets Postgres options via Patroni patch API call.
89+
func (p *Patroni) SetPostgresParameters(server *v1.Pod, parameters map[string]string) error {
90+
buf := &bytes.Buffer{}
91+
err := json.NewEncoder(buf).Encode(map[string]map[string]interface{}{"postgresql": {"parameters": parameters}})
92+
if err != nil {
93+
return fmt.Errorf("could not encode json: %v", err)
94+
}
95+
return p.httpPostOrPatch(http.MethodPatch, apiURL(server)+configPath, buf)
96+
}

0 commit comments

Comments
 (0)