Skip to content

Commit 2b07a41

Browse files
author
Jeff McCormick
committed
add clone code
1 parent da591d0 commit 2b07a41

File tree

1 file changed

+309
-0
lines changed

1 file changed

+309
-0
lines changed

operator/cluster/clone.go

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
/*
2+
Copyright 2017 Crunchy Data Solutions, Inc.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
16+
package cluster
17+
18+
import (
19+
"strings"
20+
//"encoding/json"
21+
log "github.com/Sirupsen/logrus"
22+
"os"
23+
//"text/template"
24+
"time"
25+
26+
//"github.com/crunchydata/postgres-operator/operator/pvc"
27+
"github.com/crunchydata/postgres-operator/operator/util"
28+
"github.com/crunchydata/postgres-operator/tpr"
29+
30+
"k8s.io/client-go/kubernetes"
31+
32+
"k8s.io/client-go/pkg/api"
33+
kerrors "k8s.io/client-go/pkg/api/errors"
34+
"k8s.io/client-go/pkg/api/v1"
35+
//v1batch "k8s.io/client-go/pkg/apis/batch/v1"
36+
37+
"k8s.io/client-go/pkg/apis/extensions/v1beta1"
38+
"k8s.io/client-go/pkg/fields"
39+
"k8s.io/client-go/pkg/watch"
40+
"k8s.io/client-go/rest"
41+
"k8s.io/client-go/tools/cache"
42+
)
43+
44+
func ProcessClone(clientset *kubernetes.Clientset, client *rest.RESTClient, stopchan chan struct{}, namespace string) {
45+
46+
eventchan := make(chan *tpr.PgClone)
47+
48+
source := cache.NewListWatchFromClient(client, "pgclones", namespace, fields.Everything())
49+
50+
createAddHandler := func(obj interface{}) {
51+
clone := obj.(*tpr.PgClone)
52+
eventchan <- clone
53+
addClone(clientset, client, clone, namespace)
54+
}
55+
createDeleteHandler := func(obj interface{}) {
56+
clone := obj.(*tpr.PgClone)
57+
eventchan <- clone
58+
//deleteClone(clientset, client, clone, namespace)
59+
}
60+
61+
updateHandler := func(old interface{}, obj interface{}) {
62+
clone := obj.(*tpr.PgClone)
63+
eventchan <- clone
64+
//log.Info("updating PgUpgrade object")
65+
//log.Info("updated with Name=" + job.Spec.Name)
66+
}
67+
68+
_, controller := cache.NewInformer(
69+
source,
70+
&tpr.PgClone{},
71+
time.Second*10,
72+
cache.ResourceEventHandlerFuncs{
73+
AddFunc: createAddHandler,
74+
UpdateFunc: updateHandler,
75+
DeleteFunc: createDeleteHandler,
76+
})
77+
78+
go controller.Run(stopchan)
79+
80+
for {
81+
select {
82+
case event := <-eventchan:
83+
//log.Infof("%#v\n", event)
84+
if event == nil {
85+
log.Info("event was null")
86+
}
87+
}
88+
}
89+
90+
}
91+
92+
func addClone(clientset *kubernetes.Clientset, tprclient *rest.RESTClient, clone *tpr.PgClone, namespace string) {
93+
log.Debug("addClone called")
94+
95+
log.Debug("clone.Spec.Name is " + clone.Spec.Name)
96+
log.Debug("clone.Spec.ClusterName is " + clone.Spec.ClusterName)
97+
98+
//get PgCluster
99+
//lookup the cluster
100+
cl := tpr.PgCluster{}
101+
err := tprclient.Get().
102+
Resource("pgclusters").
103+
Namespace(namespace).
104+
Name(clone.Spec.ClusterName).
105+
Do().
106+
Into(&cl)
107+
if err == nil {
108+
log.Debug("got cluster in clone prep")
109+
} else if kerrors.IsNotFound(err) {
110+
log.Error("could not get cluster in clone prep using " + clone.Spec.ClusterName)
111+
return
112+
} else {
113+
log.Errorf("\npgcluster %s\n", clone.Spec.ClusterName+" lookup error "+err.Error())
114+
return
115+
}
116+
117+
strategy, ok := StrategyMap[cl.Spec.STRATEGY]
118+
if ok {
119+
log.Info("strategy found")
120+
} else {
121+
log.Error("invalid STRATEGY requested for clone creation" + cl.Spec.STRATEGY)
122+
return
123+
}
124+
125+
strategy.PrepareClone(clientset, tprclient, clone.Spec.Name, &cl, namespace)
126+
127+
}
128+
129+
func CompleteClone(config *rest.Config, clientset *kubernetes.Clientset, client *rest.RESTClient, stopchan chan struct{}, namespace string) {
130+
131+
lo := v1.ListOptions{LabelSelector: "clone"}
132+
fw, err := clientset.Extensions().Deployments(namespace).Watch(lo)
133+
if err != nil {
134+
log.Error("error watching pg-cluster deployments" + err.Error())
135+
os.Exit(2)
136+
}
137+
138+
_, err4 := watch.Until(0, fw, func(event watch.Event) (bool, error) {
139+
log.Infoln("got a deployment watch event")
140+
141+
switch event.Type {
142+
case watch.Added:
143+
dep := event.Object.(*v1beta1.Deployment)
144+
log.Infof("clone pg-cluster Deployment added=%d\n", dep.Status.AvailableReplicas)
145+
case watch.Deleted:
146+
dep := event.Object.(*v1beta1.Deployment)
147+
log.Infof("clone pg-cluster Deployment deleted=%d\n", dep.Status.AvailableReplicas)
148+
case watch.Error:
149+
log.Infof("clone pg-cluster Deployment watch error event")
150+
case watch.Modified:
151+
dep := event.Object.(*v1beta1.Deployment)
152+
log.Infof("clone pg-cluster Deployment modified=%d\n", dep.Status.AvailableReplicas)
153+
if dep.Status.AvailableReplicas == 1 {
154+
log.Infoln("clone pg-cluster Deployment " + dep.Name + " succeeded")
155+
finishClone(config, clientset, client, dep, namespace)
156+
157+
}
158+
default:
159+
log.Infoln("pg-cluster Deployment unknown watch event %v\n", event.Type)
160+
}
161+
162+
return false, nil
163+
})
164+
165+
if err4 != nil {
166+
log.Error("erro in clone complete " + err4.Error())
167+
}
168+
169+
}
170+
171+
func finishClone(config *rest.Config, clientset *kubernetes.Clientset, tprclient *rest.RESTClient, dep *v1beta1.Deployment, namespace string) {
172+
//trigger the failover of the clone replica to make it a master
173+
//cmd := "touch /tmp/pg-failover-trigger"
174+
//cmd := "ls"
175+
containername := "database"
176+
podname, err := getMasterPodName(clientset, dep.Name, namespace)
177+
cmd := []string{"touch", "/tmp/pg-failover-trigger"}
178+
err = util.Exec(config, namespace, podname, containername, cmd)
179+
180+
//create the pgcluster tpr which creates the services and replica dep
181+
182+
clone := tpr.PgClone{}
183+
err = tprclient.Get().
184+
Resource("pgclones").
185+
Namespace(namespace).
186+
Name(dep.Name).
187+
Do().
188+
Into(&clone)
189+
if kerrors.IsNotFound(err) {
190+
log.Error("pgclone TPR not found for " + dep.Name)
191+
return
192+
} else if err != nil {
193+
log.Error("pgclone " + dep.Name + " lookup error " + err.Error())
194+
return
195+
}
196+
197+
cluster := tpr.PgCluster{}
198+
err = tprclient.Get().
199+
Resource("pgclusters").
200+
Namespace(namespace).
201+
Name(clone.Spec.ClusterName).
202+
Do().
203+
Into(&cluster)
204+
if kerrors.IsNotFound(err) {
205+
log.Error("pgcluster TPR not found for " + clone.Spec.ClusterName)
206+
return
207+
} else if err != nil {
208+
log.Error("pgcluster " + clone.Spec.ClusterName + " lookup error " + err.Error())
209+
return
210+
}
211+
212+
//copy the secrets
213+
err = util.CopySecrets(clientset, namespace, clone.Spec.ClusterName, clone.Spec.Name)
214+
if err != nil {
215+
log.Error("error in copying secrets for clone " + err.Error())
216+
return
217+
}
218+
219+
//override old name with new name
220+
newcluster := copyTPR(&cluster, &clone)
221+
222+
//create the tpr for the clone, this will cause services and replica
223+
//deployment to be created
224+
225+
result := tpr.PgCluster{}
226+
227+
err = tprclient.Post().
228+
Resource("pgclusters").
229+
Namespace(namespace).
230+
Body(newcluster).
231+
Do().Into(&result)
232+
if err != nil {
233+
log.Error("error in finish clone " + err.Error())
234+
}
235+
236+
//delete the pgclone after the cloning
237+
err = tprclient.Delete().
238+
Resource("pgclones").
239+
Namespace(namespace).
240+
Name(dep.Name).
241+
Do().
242+
Error()
243+
244+
if err != nil {
245+
log.Error("error deleting pgclone " + err.Error())
246+
} else {
247+
log.Info("deleted pgclone " + dep.Name)
248+
}
249+
250+
log.Info("finished clone " + dep.Name)
251+
252+
}
253+
254+
func getMasterPodName(clientset *kubernetes.Clientset, clusterName, namespace string) (string, error) {
255+
lo := v1.ListOptions{LabelSelector: "pg-cluster=" + clusterName}
256+
pods, err := clientset.Core().Pods(namespace).List(lo)
257+
if err != nil {
258+
log.Error("error getting list of pods" + err.Error())
259+
return "", err
260+
}
261+
//assume the first pod since this is for getting the master pod
262+
for _, pod := range pods.Items {
263+
return pod.ObjectMeta.Name, err
264+
}
265+
266+
return "", err
267+
268+
}
269+
270+
func copyTPR(cluster *tpr.PgCluster, clone *tpr.PgClone) *tpr.PgCluster {
271+
spec := tpr.PgClusterSpec{}
272+
273+
spec.PGUSER_SECRET_NAME = strings.Replace(cluster.Spec.PGUSER_SECRET_NAME, cluster.Spec.ClusterName, clone.Spec.Name, 1)
274+
spec.PGMASTER_SECRET_NAME = strings.Replace(cluster.Spec.PGMASTER_SECRET_NAME, cluster.Spec.ClusterName, clone.Spec.Name, 1)
275+
spec.PGROOT_SECRET_NAME = strings.Replace(cluster.Spec.PGROOT_SECRET_NAME, cluster.Spec.ClusterName, clone.Spec.Name, 1)
276+
277+
spec.Name = clone.Spec.Name
278+
spec.ClusterName = clone.Spec.Name
279+
280+
spec.CCP_IMAGE_TAG = cluster.Spec.CCP_IMAGE_TAG
281+
spec.Port = cluster.Spec.Port
282+
spec.PVC_NAME = cluster.Spec.PVC_NAME
283+
spec.PVC_SIZE = cluster.Spec.PVC_SIZE
284+
spec.PVC_ACCESS_MODE = cluster.Spec.PVC_ACCESS_MODE
285+
spec.SECRET_FROM = cluster.Spec.SECRET_FROM
286+
spec.BACKUP_PATH = cluster.Spec.BACKUP_PATH
287+
spec.BACKUP_PVC_NAME = cluster.Spec.BACKUP_PVC_NAME
288+
spec.PG_MASTER_HOST = cluster.Spec.PG_MASTER_HOST
289+
spec.PG_MASTER_USER = cluster.Spec.PG_MASTER_USER
290+
spec.PG_MASTER_PASSWORD = cluster.Spec.PG_MASTER_PASSWORD
291+
spec.PG_USER = cluster.Spec.PG_USER
292+
spec.PG_PASSWORD = cluster.Spec.PG_PASSWORD
293+
spec.PG_DATABASE = cluster.Spec.PG_DATABASE
294+
spec.PG_ROOT_PASSWORD = cluster.Spec.PG_ROOT_PASSWORD
295+
spec.REPLICAS = cluster.Spec.REPLICAS
296+
spec.FS_GROUP = cluster.Spec.FS_GROUP
297+
spec.SUPPLEMENTAL_GROUPS = cluster.Spec.SUPPLEMENTAL_GROUPS
298+
spec.STRATEGY = cluster.Spec.STRATEGY
299+
spec.BACKUP_PATH = cluster.Spec.BACKUP_PATH
300+
301+
newInstance := &tpr.PgCluster{
302+
Metadata: api.ObjectMeta{
303+
Name: clone.Spec.Name,
304+
},
305+
Spec: spec,
306+
}
307+
return newInstance
308+
309+
}

0 commit comments

Comments
 (0)