Skip to content

Commit b96d997

Browse files
author
Jeff McCormick
committed
cleanup deploy script and add golang example
1 parent aa825ea commit b96d997

File tree

2 files changed

+214
-10
lines changed

2 files changed

+214
-10
lines changed

deploy/deploy.sh

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,10 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
1717

1818
$DIR/cleanup.sh
1919

20-
# see if CRDs need to be created
21-
#$CO_CMD get crd pgclusters.cr.client-go.k8s.io
22-
#if [ $? -eq 1 ]; then
23-
# $CO_CMD create -f $DIR/crd.yaml
24-
#fi
25-
2620
if [ "$CO_CMD" = "kubectl" ]; then
2721
NS="--namespace=$CO_NAMESPACE"
2822
fi
2923

30-
#expenv -f $DIR/service-account.yaml | $CO_CMD create -f -
31-
#expenv -f $DIR/rbac.yaml | $CO_CMD create -f -
32-
#expenv -f $DIR/rbac-role-only.yaml | $CO_CMD create -f -
33-
3424
$CO_CMD $NS create secret generic apiserver-conf-secret \
3525
--from-file=server.crt=$COROOT/conf/apiserver/server.crt \
3626
--from-file=server.key=$COROOT/conf/apiserver/server.key \

golang-examples/lag.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
package main
2+
3+
import (
4+
"database/sql"
5+
"flag"
6+
"fmt"
7+
"os"
8+
9+
log "github.com/Sirupsen/logrus"
10+
crv1 "github.com/crunchydata/postgres-operator/apis/cr/v1"
11+
"github.com/crunchydata/postgres-operator/apiserver"
12+
msgs "github.com/crunchydata/postgres-operator/apiservermsgs"
13+
clientset "github.com/crunchydata/postgres-operator/client"
14+
15+
"github.com/crunchydata/postgres-operator/kubeapi"
16+
17+
_ "github.com/lib/pq"
18+
"k8s.io/api/core/v1"
19+
"k8s.io/client-go/kubernetes"
20+
21+
"k8s.io/client-go/tools/clientcmd"
22+
)
23+
24+
const (
25+
replInfoQueryFormat = "SELECT %s(%s(), '0/0')::bigint, %s(%s(), '0/0')::bigint"
26+
27+
recvV9 = "pg_last_xlog_receive_location"
28+
replayV9 = "pg_last_xlog_replay_location"
29+
locationDiffV9 = "pg_xlog_location_diff"
30+
31+
recvV10 = "pg_last_wal_receive_lsn"
32+
replayV10 = "pg_last_wal_replay_lsn"
33+
locationDiffV10 = "pg_wal_lsn_diff"
34+
)
35+
36+
type ReplicationInfo struct {
37+
ReceiveLocation uint64
38+
ReplayLocation uint64
39+
}
40+
41+
var (
42+
kubeconfig = flag.String("kubeconfig", "./config", "absolute path to the kubeconfig file")
43+
)
44+
45+
func main() {
46+
flag.Parse()
47+
fmt.Println("hi")
48+
49+
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
50+
if err != nil {
51+
panic(err.Error())
52+
}
53+
54+
kubeClient, err2 := kubernetes.NewForConfig(config)
55+
if err2 != nil {
56+
panic(err2.Error())
57+
}
58+
if kubeClient != nil {
59+
log.Println("got kube client")
60+
}
61+
restclient, _, err := clientset.NewClient(config)
62+
if err != nil {
63+
panic(err)
64+
}
65+
log.Println("got rest client")
66+
67+
namespace := "demo"
68+
//get the secrets for this cluster
69+
clusterName := "nake"
70+
71+
selector := "primary=true,pg-cluster=" + clusterName
72+
//get the pgcluster
73+
cluster := crv1.Pgcluster{}
74+
var clusterfound bool
75+
clusterfound, err = kubeapi.Getpgcluster(restclient, &cluster, clusterName, namespace)
76+
if err != nil || !clusterfound {
77+
fmt.Println("Getpgcluster error: " + err.Error())
78+
os.Exit(2)
79+
} else {
80+
fmt.Println("pgcluster found " + clusterName)
81+
}
82+
//get the secrets for that pgcluster
83+
var secretInfo []msgs.ShowUserSecret
84+
apiserver.Clientset = kubeClient
85+
secretInfo, err = apiserver.GetSecrets(&cluster)
86+
var pgSecret msgs.ShowUserSecret
87+
var found bool
88+
for _, si := range secretInfo {
89+
if si.Username == "postgres" {
90+
pgSecret = si
91+
found = true
92+
fmt.Println("postgres secret found")
93+
}
94+
}
95+
96+
if !found {
97+
fmt.Println("postgres secret not found for " + clusterName)
98+
os.Exit(2)
99+
} else {
100+
fmt.Println("found postgres secret with password " + pgSecret.Password)
101+
}
102+
103+
selector = "primary=false,pg-cluster=" + clusterName
104+
podList, err := kubeapi.GetPods(kubeClient, selector, namespace)
105+
if err != nil {
106+
fmt.Println(err.Error())
107+
os.Exit(2)
108+
}
109+
110+
var selectedReplica v1.Pod
111+
if len(podList.Items) > 0 {
112+
selectedReplica = podList.Items[0]
113+
} else {
114+
fmt.Println("no replicas found")
115+
os.Exit(2)
116+
}
117+
118+
var value uint64 = 0
119+
databaseName := "postgres"
120+
port := "5432"
121+
for _, pod := range podList.Items {
122+
fmt.Println(pod.Name)
123+
124+
target := getSQLTarget(&pod, pgSecret.Username, pgSecret.Password, port, databaseName)
125+
replInfo, err := GetReplicationInfo(target)
126+
if err != nil {
127+
fmt.Println(err.Error())
128+
} else {
129+
fmt.Printf("receive location=%d replaylocation=%d\n", replInfo.ReceiveLocation, replInfo.ReplayLocation)
130+
if replInfo.ReceiveLocation > value {
131+
value = replInfo.ReceiveLocation
132+
selectedReplica = pod
133+
}
134+
}
135+
}
136+
fmt.Println("selected replica pod name is " + selectedReplica.Name)
137+
}
138+
139+
func GetReplicationInfo(target string) (*ReplicationInfo, error) {
140+
conn, err := sql.Open("postgres", target)
141+
142+
if err != nil {
143+
log.Errorf("Could not connect to: %s", target)
144+
return nil, err
145+
}
146+
147+
defer conn.Close()
148+
149+
// Get PG version
150+
var version int
151+
152+
rows, err := conn.Query("SELECT current_setting('server_version_num')")
153+
154+
if err != nil {
155+
log.Errorf("Could not perform query for version: %s", target)
156+
return nil, err
157+
}
158+
159+
defer rows.Close()
160+
161+
for rows.Next() {
162+
if err := rows.Scan(&version); err != nil {
163+
return nil, err
164+
}
165+
}
166+
// Get replication info
167+
var replicationInfoQuery string
168+
var recvLocation uint64
169+
var replayLocation uint64
170+
171+
if version < 100000 {
172+
replicationInfoQuery = fmt.Sprintf(
173+
replInfoQueryFormat,
174+
locationDiffV9, recvV9,
175+
locationDiffV9, replayV9,
176+
)
177+
} else {
178+
replicationInfoQuery = fmt.Sprintf(
179+
replInfoQueryFormat,
180+
locationDiffV10, recvV10,
181+
locationDiffV10, replayV10,
182+
)
183+
}
184+
185+
rows, err = conn.Query(replicationInfoQuery)
186+
187+
if err != nil {
188+
log.Errorf("Could not perform replication info query: %s", target)
189+
return nil, err
190+
}
191+
192+
defer rows.Close()
193+
194+
for rows.Next() {
195+
if err := rows.Scan(&recvLocation, &replayLocation); err != nil {
196+
return nil, err
197+
}
198+
}
199+
200+
return &ReplicationInfo{recvLocation, replayLocation}, nil
201+
}
202+
203+
func getSQLTarget(pod *v1.Pod, username, password, port, db string) string {
204+
target := fmt.Sprintf(
205+
"postgresql://%s:%s@%s:%s/%s?sslmode=disable",
206+
username,
207+
password,
208+
pod.Status.PodIP,
209+
port,
210+
db,
211+
)
212+
return target
213+
214+
}

0 commit comments

Comments
 (0)