@@ -23,10 +23,12 @@ import (
23
23
"github.com/sorintlab/stolon/internal/cluster"
24
24
"github.com/sorintlab/stolon/internal/util"
25
25
26
+ jsonpatch "github.com/evanphx/json-patch"
26
27
v1 "k8s.io/api/core/v1"
27
28
apierrors "k8s.io/apimachinery/pkg/api/errors"
28
29
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29
30
"k8s.io/apimachinery/pkg/labels"
31
+ "k8s.io/apimachinery/pkg/types"
30
32
"k8s.io/client-go/kubernetes"
31
33
"k8s.io/client-go/kubernetes/scheme"
32
34
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
@@ -72,6 +74,43 @@ func (s *KubeStore) labelSelector(componentLabel ComponentLabelValue) labels.Sel
72
74
return labels .SelectorFromSet (selector )
73
75
}
74
76
77
+ func (s * KubeStore ) patchKubeStatusAnnotation (annotationData []byte ) error {
78
+ podsClient := s .client .CoreV1 ().Pods (s .namespace )
79
+ retryErr := retry .RetryOnConflict (retry .DefaultRetry , func () error {
80
+ pod , err := podsClient .Get (s .podName , metav1.GetOptions {})
81
+ if err != nil {
82
+ return fmt .Errorf ("failed to get latest version of pod: %v" , err )
83
+ }
84
+
85
+ oldPodJSON , err := json .Marshal (pod )
86
+ if err != nil {
87
+ return fmt .Errorf ("failed to marshal pod: %v" , err )
88
+ }
89
+
90
+ if pod .Annotations == nil {
91
+ pod .Annotations = map [string ]string {}
92
+ }
93
+ pod .Annotations [util .KubeStatusAnnnotation ] = string (annotationData )
94
+
95
+ newPodJSON , err := json .Marshal (pod )
96
+ if err != nil {
97
+ return fmt .Errorf ("failed to marshal pod: %v" , err )
98
+ }
99
+
100
+ patchBytes , err := jsonpatch .CreateMergePatch (oldPodJSON , newPodJSON )
101
+ if err != nil {
102
+ return fmt .Errorf ("failed to create pod merge patch: %v" , err )
103
+ }
104
+
105
+ _ , err = podsClient .Patch (s .podName , types .MergePatchType , patchBytes )
106
+ return err
107
+ })
108
+ if retryErr != nil {
109
+ return fmt .Errorf ("update failed: %v" , retryErr )
110
+ }
111
+ return nil
112
+ }
113
+
75
114
func (s * KubeStore ) AtomicPutClusterData (ctx context.Context , cd * cluster.ClusterData , previous * KVPair ) (* KVPair , error ) {
76
115
cdj , err := json .Marshal (cd )
77
116
if err != nil {
@@ -210,23 +249,7 @@ func (s *KubeStore) SetKeeperInfo(ctx context.Context, id string, ms *cluster.Ke
210
249
if err != nil {
211
250
return err
212
251
}
213
- podsClient := s .client .CoreV1 ().Pods (s .namespace )
214
- retryErr := retry .RetryOnConflict (retry .DefaultRetry , func () error {
215
- result , err := podsClient .Get (s .podName , metav1.GetOptions {})
216
- if err != nil {
217
- return fmt .Errorf ("failed to get latest version of pod: %v" , err )
218
- }
219
- if result .Annotations == nil {
220
- result .Annotations = map [string ]string {}
221
- }
222
- result .Annotations [util .KubeStatusAnnnotation ] = string (msj )
223
- _ , err = podsClient .Update (result )
224
- return err
225
- })
226
- if retryErr != nil {
227
- return fmt .Errorf ("update failed: %v" , retryErr )
228
- }
229
- return nil
252
+ return s .patchKubeStatusAnnotation (msj )
230
253
}
231
254
232
255
func (s * KubeStore ) GetKeepersInfo (ctx context.Context ) (cluster.KeepersInfo , error ) {
@@ -261,23 +284,7 @@ func (s *KubeStore) SetSentinelInfo(ctx context.Context, si *cluster.SentinelInf
261
284
if err != nil {
262
285
return err
263
286
}
264
- podsClient := s .client .CoreV1 ().Pods (s .namespace )
265
- retryErr := retry .RetryOnConflict (retry .DefaultRetry , func () error {
266
- result , err := podsClient .Get (s .podName , metav1.GetOptions {})
267
- if err != nil {
268
- return fmt .Errorf ("failed to get latest version of pod: %v" , err )
269
- }
270
- if result .Annotations == nil {
271
- result .Annotations = map [string ]string {}
272
- }
273
- result .Annotations [util .KubeStatusAnnnotation ] = string (sij )
274
- _ , err = podsClient .Update (result )
275
- return err
276
- })
277
- if retryErr != nil {
278
- return fmt .Errorf ("update failed: %v" , retryErr )
279
- }
280
- return nil
287
+ return s .patchKubeStatusAnnotation (sij )
281
288
}
282
289
283
290
func (s * KubeStore ) GetSentinelsInfo (ctx context.Context ) (cluster.SentinelsInfo , error ) {
@@ -312,23 +319,7 @@ func (s *KubeStore) SetProxyInfo(ctx context.Context, pi *cluster.ProxyInfo, ttl
312
319
if err != nil {
313
320
return err
314
321
}
315
- podsClient := s .client .CoreV1 ().Pods (s .namespace )
316
- retryErr := retry .RetryOnConflict (retry .DefaultRetry , func () error {
317
- result , err := podsClient .Get (s .podName , metav1.GetOptions {})
318
- if err != nil {
319
- return fmt .Errorf ("failed to get latest version of pod: %v" , err )
320
- }
321
- if result .Annotations == nil {
322
- result .Annotations = map [string ]string {}
323
- }
324
- result .Annotations [util .KubeStatusAnnnotation ] = string (pij )
325
- _ , err = podsClient .Update (result )
326
- return err
327
- })
328
- if retryErr != nil {
329
- return fmt .Errorf ("update failed: %v" , retryErr )
330
- }
331
- return nil
322
+ return s .patchKubeStatusAnnotation (pij )
332
323
}
333
324
334
325
func (s * KubeStore ) GetProxiesInfo (ctx context.Context ) (cluster.ProxiesInfo , error ) {
0 commit comments