Skip to content

Commit 7ce1c86

Browse files
author
Jeff McCormick
committed
Add replication receive and replay locations to the `pgo failover --query` output, and use them in the autofailover logic when picking the failover target.
1 parent 8224fe5 commit 7ce1c86

File tree

11 files changed

+233
-126
lines changed

11 files changed

+233
-126
lines changed

apiserver/common.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"k8s.io/api/core/v1"
2525
)
2626

27+
//TODO remove and replace with util.GetSecrets
2728
func GetSecrets(cluster *crv1.Pgcluster) ([]msgs.ShowUserSecret, error) {
2829

2930
output := make([]msgs.ShowUserSecret, 0)

apiserver/failoverservice/failoverimpl.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,8 @@ func QueryFailover(name string) msgs.QueryFailoverResponse {
138138
log.Debug("found " + dep.Name)
139139
target := msgs.FailoverTargetSpec{}
140140
target.Name = dep.Name
141+
142+
target.ReceiveLocation, target.ReplayLocation = util.GetRepStatus(apiserver.RESTClient, apiserver.Clientset, &dep, apiserver.Namespace)
141143
//get the pod status
142144
target.ReadyStatus, target.Node = apiserver.GetPodStatus(dep.Name)
143145
//get the rep status

apiservermsgs/failovermsgs.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ limitations under the License.
1818
import ()
1919

2020
// FailoverTargetSpec describes one candidate replica reported by a failover
// query.
type FailoverTargetSpec struct {
	// Name is the name of the replica deployment.
	Name string
	// ReadyStatus and Node are the pod status values reported for the
	// deployment.
	ReadyStatus string
	Node        string
	// RepStatus is a textual replication status.
	// NOTE(review): the code that fills this field is not visible here - confirm.
	RepStatus string
	// ReceiveLocation and ReplayLocation are the replication receive and
	// replay locations reported for the replica; both are zero when they
	// could not be determined.
	ReceiveLocation uint64
	ReplayLocation  uint64
}
2628

2729
// QueryFailoverResponse ...

bin/postgres-operator/runpsql.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ FAILURES=0
2121
MAX_FAILURES=7
2222
while true; do
2323
sleep $SLEEP_TIME
24-
/usr/pgsql-9.6/bin/pg_isready --dbname=postgres --host=$2 --port=5432 --username=postgres
24+
/usr/pgsql-10/bin/pg_isready --dbname=postgres --host=$2 --port=5432 --username=postgres
2525
if [ $? -eq 0 ]
2626
then
2727
echo "Successfully reached master @ " `date`

examples/sample-json-load-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
COImagePrefix: crunchydata
2-
COImageTag: centos7-3.1
2+
COImageTag: centos7-3.2
33
DbDatabase: userdb
44
DbUser: postgres
55
DbPort: 5432

operator/cluster/autofailover.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/crunchydata/postgres-operator/kubeapi"
2626
"github.com/crunchydata/postgres-operator/operator"
2727
"github.com/crunchydata/postgres-operator/util"
28+
"k8s.io/api/extensions/v1beta1"
2829
kerrors "k8s.io/apimachinery/pkg/api/errors"
2930
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/client-go/kubernetes"
@@ -256,7 +257,7 @@ func (*AutoFailoverTask) GetEvents(restclient *rest.RESTClient, clusterName, nam
256257
return "", make(map[string]string)
257258
}
258259

259-
func getTargetDeployment(clientset *kubernetes.Clientset, clusterName, ns string) (string, error) {
260+
func getTargetDeployment(restclient *rest.RESTClient, clientset *kubernetes.Clientset, clusterName, ns string) (string, error) {
260261

261262
selector := util.LABEL_PRIMARY + "=false," + util.LABEL_PG_CLUSTER + "=" + clusterName
262263

@@ -272,22 +273,44 @@ func getTargetDeployment(clientset *kubernetes.Clientset, clusterName, ns string
272273
//return a deployment target that has a Ready database
273274
log.Debugf("deps len %d\n", len(deployments.Items))
274275
found := false
276+
readyDeps := make([]v1beta1.Deployment, 0)
275277
for _, dep := range deployments.Items {
276278
ready := getPodStatus(clientset, dep.Name, ns)
277279
if ready {
278280
log.Debug("autofail: found ready deployment " + dep.Name)
279281
found = true
280-
return dep.Name, err
282+
readyDeps = append(readyDeps, dep)
283+
//return dep.Name, err
281284
} else {
282285
log.Debug("autofail: found not ready deployment " + dep.Name)
283286
}
284287
}
288+
285289
if !found {
286290
log.Error("could not find a ready pod in autofailover for cluster " + clusterName)
287291
return "", errors.New("could not find a ready pod to failover to")
288292
}
289293

290-
return "", err
294+
//at this point readyDeps should hold all the Ready deployments
295+
//we look for the most up to date and return that name
296+
297+
var value uint64
298+
value = 0
299+
var selectedDeployment v1beta1.Deployment
300+
301+
for _, d := range readyDeps {
302+
target := util.ReplicationInfo{}
303+
target.ReceiveLocation, target.ReplayLocation = util.GetRepStatus(restclient, clientset, &d, ns)
304+
log.Debug("autofail receive=%d replay=%d dep=%s\n", target.ReceiveLocation, target.ReplayLocation, d.Name)
305+
if target.ReceiveLocation > value {
306+
value = target.ReceiveLocation
307+
selectedDeployment = d
308+
}
309+
310+
}
311+
312+
log.Debugf("autofail logic selected deployment is %s receive=%d\n", selectedDeployment.Name, value)
313+
return selectedDeployment.Name, err
291314

292315
}
293316

@@ -314,7 +337,7 @@ func getPodStatus(clientset *kubernetes.Clientset, depname, ns string) bool {
314337
}
315338

316339
func (s *StateMachine) triggerFailover() {
317-
targetDeploy, err := getTargetDeployment(s.Clientset, s.ClusterName, s.Namespace)
340+
targetDeploy, err := getTargetDeployment(s.RESTClient, s.Clientset, s.ClusterName, s.Namespace)
318341
if targetDeploy == "" || err != nil {
319342
log.Errorf("could not autofailover with no replicas found for %s\n", s.ClusterName)
320343
return

pgo/cmd/failover.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func init() {
6161
failoverCmd.Flags().BoolVarP(&Query, "query", "", false, "Prints the list of failover candidates.")
6262
failoverCmd.Flags().BoolVarP(&NoPrompt, "no-prompt", "n", false, "No command line confirmation.")
6363
failoverCmd.Flags().StringVarP(&Target, "target", "", "", "The replica target which the failover will occur on.")
64-
64+
6565
}
6666

6767
// createFailover ....
@@ -140,7 +140,6 @@ func queryFailover(args []string) {
140140
fmt.Println("Error: Do: ", err)
141141
return
142142
}
143-
log.Debugf("%v\n", resp)
144143
StatusCheck(resp)
145144

146145
defer resp.Body.Close()
@@ -158,7 +157,7 @@ func queryFailover(args []string) {
158157
if len(response.Targets) > 0 {
159158
fmt.Println("Failover targets include:")
160159
for i := 0; i < len(response.Targets); i++ {
161-
fmt.Println("\t" + response.Targets[i].Name + " (" + response.Targets[i].ReadyStatus + ") (" + response.Targets[i].Node + ")")
160+
printTarget(response.Targets[i])
162161
}
163162
}
164163
for k := range response.Results {
@@ -170,3 +169,7 @@ func queryFailover(args []string) {
170169
}
171170

172171
}
172+
173+
func printTarget(target msgs.FailoverTargetSpec) {
174+
fmt.Printf("\t%s (%s) (%s) ReceiveLoc (%d) ReplayLoc (%d)\n", target.Name, target.ReadyStatus, target.Node, target.ReceiveLocation, target.ReplayLocation)
175+
}

pgo/cmd/scale.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,12 @@ func queryCluster(args []string) {
172172
return
173173
}
174174

175+
fmt.Println("jeff here we are")
175176
if response.Status.Code == msgs.Ok {
176177
if len(response.Targets) > 0 {
177178
fmt.Println("Replica targets include:")
178179
for i := 0; i < len(response.Targets); i++ {
179-
fmt.Println("\t" + response.Targets[i].Name + " (" + response.Targets[i].ReadyStatus + ") (" + response.Targets[i].Node + ")")
180+
printScaleTarget(response.Targets[i])
180181
}
181182
}
182183

@@ -232,3 +233,7 @@ func scaleDownCluster(clusterName string) {
232233
}
233234

234235
}
236+
237+
func printScaleTarget(target msgs.ScaleQueryTargetSpec) {
238+
fmt.Printf("\t%s (%s) (%s)\n", target.Name, target.ReadyStatus, target.Node)
239+
}

util/failover.go

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,37 @@ package util
1616
*/
1717

1818
import (
	"database/sql"
	"errors"
	"fmt"
	"net/url"

	log "github.com/Sirupsen/logrus"
	crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
	msgs "github.com/crunchydata/postgres-operator/apiservermsgs"
	"github.com/crunchydata/postgres-operator/kubeapi"
	_ "github.com/lib/pq"
	"k8s.io/api/core/v1"
	"k8s.io/api/extensions/v1beta1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)
2632

33+
// replInfoQueryFormat is the template for the replication info query. Its
// four %s verbs are filled, in order, with: diff function, receive function,
// diff function, replay function. Each column is the chosen location diffed
// against '0/0' and cast to bigint.
const replInfoQueryFormat = "SELECT %s(%s(), '0/0')::bigint, %s(%s(), '0/0')::bigint"

// Function names used when server_version_num is below 100000.
const (
	recvV9         = "pg_last_xlog_receive_location"
	replayV9       = "pg_last_xlog_replay_location"
	locationDiffV9 = "pg_xlog_location_diff"
)

// Function names used when server_version_num is 100000 or above.
const (
	recvV10         = "pg_last_wal_receive_lsn"
	replayV10       = "pg_last_wal_replay_lsn"
	locationDiffV10 = "pg_wal_lsn_diff"
)
44+
45+
// ReplicationInfo holds the two replication positions read from a replica.
// Field order matters: the struct is also built with a positional literal.
type ReplicationInfo struct {
	// ReceiveLocation is the first column of the replication info query
	// (the last received location diffed against '0/0').
	ReceiveLocation uint64
	// ReplayLocation is the second column of the replication info query
	// (the last replayed location diffed against '0/0').
	ReplayLocation uint64
}
49+
2750
// GetBestTarget
2851
func GetBestTarget(clientset *kubernetes.Clientset, clusterName, namespace string) (*v1.Pod, *v1beta1.Deployment, error) {
2952

@@ -101,3 +124,139 @@ func GetPod(clientset *kubernetes.Clientset, deploymentName, namespace string) (
101124

102125
return pod, err
103126
}
127+
128+
func GetRepStatus(restclient *rest.RESTClient, clientset *kubernetes.Clientset, dep *v1beta1.Deployment, namespace string) (uint64, uint64) {
129+
var receiveLocation, replayLocation uint64
130+
131+
//get the pods for this deployment
132+
selector := "primary=false,replica-name=" + dep.Name
133+
podList, err := kubeapi.GetPods(clientset, selector, namespace)
134+
if err != nil {
135+
log.Error(err.Error())
136+
return receiveLocation, replayLocation
137+
}
138+
139+
if len(podList.Items) != 1 {
140+
log.Debug("no replicas found for dep " + dep.Name)
141+
return receiveLocation, replayLocation
142+
}
143+
144+
pod := podList.Items[0]
145+
146+
//get the crd for this dep
147+
cluster := crv1.Pgcluster{}
148+
var clusterfound bool
149+
clusterfound, err = kubeapi.Getpgcluster(restclient, &cluster, dep.ObjectMeta.Labels[LABEL_PG_CLUSTER], namespace)
150+
if err != nil || !clusterfound {
151+
log.Error("Getpgcluster error: " + err.Error())
152+
return receiveLocation, replayLocation
153+
}
154+
155+
//get the postgres secret for this dep
156+
var secretInfo []msgs.ShowUserSecret
157+
secretInfo, err = GetSecrets(clientset, &cluster, namespace)
158+
var pgSecret msgs.ShowUserSecret
159+
var found bool
160+
for _, si := range secretInfo {
161+
if si.Username == "postgres" {
162+
pgSecret = si
163+
found = true
164+
log.Debug("postgres secret found")
165+
}
166+
}
167+
168+
if !found {
169+
log.Error("postgres secret not found for " + dep.Name)
170+
return receiveLocation, replayLocation
171+
}
172+
173+
port := "5432"
174+
databaseName := "postgres"
175+
target := getSQLTarget(&pod, pgSecret.Username, pgSecret.Password, port, databaseName)
176+
var repInfo *ReplicationInfo
177+
repInfo, err = GetReplicationInfo(target)
178+
if err != nil {
179+
log.Error(err)
180+
return receiveLocation, replayLocation
181+
}
182+
183+
receiveLocation = repInfo.ReceiveLocation
184+
replayLocation = repInfo.ReplayLocation
185+
return receiveLocation, replayLocation
186+
}
187+
188+
func getSQLTarget(pod *v1.Pod, username, password, port, db string) string {
189+
target := fmt.Sprintf(
190+
"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
191+
username,
192+
password,
193+
pod.Status.PodIP,
194+
port,
195+
db,
196+
)
197+
return target
198+
199+
}
200+
func GetReplicationInfo(target string) (*ReplicationInfo, error) {
201+
conn, err := sql.Open("postgres", target)
202+
203+
if err != nil {
204+
log.Errorf("Could not connect to: %s", target)
205+
return nil, err
206+
}
207+
208+
defer conn.Close()
209+
210+
// Get PG version
211+
var version int
212+
213+
rows, err := conn.Query("SELECT current_setting('server_version_num')")
214+
215+
if err != nil {
216+
log.Errorf("Could not perform query for version: %s", target)
217+
return nil, err
218+
}
219+
220+
defer rows.Close()
221+
222+
for rows.Next() {
223+
if err := rows.Scan(&version); err != nil {
224+
return nil, err
225+
}
226+
}
227+
// Get replication info
228+
var replicationInfoQuery string
229+
var recvLocation uint64
230+
var replayLocation uint64
231+
232+
if version < 100000 {
233+
replicationInfoQuery = fmt.Sprintf(
234+
replInfoQueryFormat,
235+
locationDiffV9, recvV9,
236+
locationDiffV9, replayV9,
237+
)
238+
} else {
239+
replicationInfoQuery = fmt.Sprintf(
240+
replInfoQueryFormat,
241+
locationDiffV10, recvV10,
242+
locationDiffV10, replayV10,
243+
)
244+
}
245+
246+
rows, err = conn.Query(replicationInfoQuery)
247+
248+
if err != nil {
249+
log.Errorf("Could not perform replication info query: %s", target)
250+
return nil, err
251+
}
252+
253+
defer rows.Close()
254+
255+
for rows.Next() {
256+
if err := rows.Scan(&recvLocation, &replayLocation); err != nil {
257+
return nil, err
258+
}
259+
}
260+
261+
return &ReplicationInfo{recvLocation, replayLocation}, nil
262+
}

0 commit comments

Comments
 (0)