@@ -31,72 +31,89 @@ import (
31
31
logf "sigs.k8s.io/controller-runtime/pkg/log"
32
32
)
33
33
34
- type runModeType string
35
-
36
- const (
37
- localRunMode runModeType = "local"
38
- )
39
-
40
- // forceRunModeEnv indicates if the operator should be forced to run in either local
41
- // or cluster mode (currently only used for local mode)
42
- var forceRunModeEnv = "OSDK_FORCE_RUN_MODE"
43
-
44
- // errNoNamespace indicates that a namespace could not be found for the current
34
+ // ErrNoNamespace indicates that a namespace could not be found for the current
45
35
// environment
46
- var errNoNamespace = fmt .Errorf ("namespace not found for current environment" )
47
-
48
- // errRunLocal indicates that the operator is set to run in local mode (this error
49
- // is returned by functions that only work on operators running in cluster mode)
50
- var errRunLocal = fmt .Errorf ("operator run mode forced to local" )
36
+ var ErrNoNamespace = fmt .Errorf ("namespace not found for current environment" )
51
37
52
38
// podNameEnvVar is the constant for env variable POD_NAME
53
39
// which is the name of the current pod.
54
40
const podNameEnvVar = "POD_NAME"
55
41
42
+ var readNamespace = func () ([]byte , error ) {
43
+ return ioutil .ReadFile ("/var/run/secrets/kubernetes.io/serviceaccount/namespace" )
44
+ }
45
+
56
46
var log = logf .Log .WithName ("leader" )
57
47
58
48
// maxBackoffInterval defines the maximum amount of time to wait between
59
49
// attempts to become the leader.
60
50
const maxBackoffInterval = time .Second * 16
61
51
52
+ type Option func (* Config ) error
53
+
54
+ type Config struct {
55
+ Client crclient.Client
56
+ }
57
+
58
+ func (c * Config ) setDefaults () error {
59
+ if c .Client == nil {
60
+ config , err := config .GetConfig ()
61
+ if err != nil {
62
+ return err
63
+ }
64
+
65
+ client , err := crclient .New (config , crclient.Options {})
66
+ if err != nil {
67
+ return err
68
+ }
69
+ c .Client = client
70
+ }
71
+ return nil
72
+ }
73
+
74
+ func WithClient (cl crclient.Client ) Option {
75
+ return func (c * Config ) error {
76
+ c .Client = cl
77
+ return nil
78
+ }
79
+ }
80
+
62
81
// Become ensures that the current pod is the leader within its namespace. If
63
82
// run outside a cluster, it will skip leader election and return nil. It
64
83
// continuously tries to create a ConfigMap with the provided name and the
65
84
// current pod set as the owner reference. Only one can exist at a time with
66
85
// the same name, so the pod that successfully creates the ConfigMap is the
67
86
// leader. Upon termination of that pod, the garbage collector will delete the
68
87
// ConfigMap, enabling a different pod to become the leader.
69
- func Become (ctx context.Context , lockName string ) error {
88
+ func Become (ctx context.Context , lockName string , opts ... Option ) error {
70
89
log .Info ("Trying to become the leader." )
71
90
72
- ns , err := getOperatorNamespace ()
73
- if err != nil {
74
- if err == errNoNamespace || err == errRunLocal {
75
- log . Info ( "Skipping leader election; not running in a cluster." )
76
- return nil
91
+ config := Config {}
92
+
93
+ for _ , opt := range opts {
94
+ if err := opt ( & config ); err != nil {
95
+ return err
77
96
}
78
- return err
79
97
}
80
98
81
- config , err := config .GetConfig ()
82
- if err != nil {
99
+ if err := config .setDefaults (); err != nil {
83
100
return err
84
101
}
85
102
86
- client , err := crclient . New ( config , crclient. Options {} )
103
+ ns , err := getOperatorNamespace ( )
87
104
if err != nil {
88
105
return err
89
106
}
90
107
91
- owner , err := myOwnerRef (ctx , client , ns )
108
+ owner , err := myOwnerRef (ctx , config . Client , ns )
92
109
if err != nil {
93
110
return err
94
111
}
95
112
96
113
// check for existing lock from this pod, in case we got restarted
97
114
existing := & corev1.ConfigMap {}
98
115
key := crclient.ObjectKey {Namespace : ns , Name : lockName }
99
- err = client .Get (ctx , key , existing )
116
+ err = config . Client .Get (ctx , key , existing )
100
117
101
118
switch {
102
119
case err == nil :
@@ -126,15 +143,15 @@ func Become(ctx context.Context, lockName string) error {
126
143
// try to create a lock
127
144
backoff := time .Second
128
145
for {
129
- err := client .Create (ctx , cm )
146
+ err := config . Client .Create (ctx , cm )
130
147
switch {
131
148
case err == nil :
132
149
log .Info ("Became the leader." )
133
150
return nil
134
151
case apierrors .IsAlreadyExists (err ):
135
152
// refresh the lock so we use current leader
136
153
key := crclient.ObjectKey {Namespace : ns , Name : lockName }
137
- if err := client .Get (ctx , key , existing ); err != nil {
154
+ if err := config . Client .Get (ctx , key , existing ); err != nil {
138
155
log .Info ("Leader lock configmap not found." )
139
156
continue // configmap got lost ... just wait a bit
140
157
}
@@ -148,7 +165,7 @@ func Become(ctx context.Context, lockName string) error {
148
165
default :
149
166
leaderPod := & corev1.Pod {}
150
167
key = crclient.ObjectKey {Namespace : ns , Name : existingOwners [0 ].Name }
151
- err = client .Get (ctx , key , leaderPod )
168
+ err = config . Client .Get (ctx , key , leaderPod )
152
169
switch {
153
170
case apierrors .IsNotFound (err ):
154
171
log .Info ("Leader pod has been deleted, waiting for garbage collection to remove the lock." )
@@ -158,7 +175,7 @@ func Become(ctx context.Context, lockName string) error {
158
175
log .Info ("Operator pod with leader lock has been evicted." , "leader" , leaderPod .Name )
159
176
log .Info ("Deleting evicted leader." )
160
177
// Pod may not delete immediately, continue with backoff
161
- err := client .Delete (ctx , leaderPod )
178
+ err := config . Client .Delete (ctx , leaderPod )
162
179
if err != nil {
163
180
log .Error (err , "Leader pod could not be deleted." )
164
181
}
@@ -210,13 +227,10 @@ func isPodEvicted(pod corev1.Pod) bool {
210
227
211
228
// getOperatorNamespace returns the namespace the operator should be running in.
212
229
func getOperatorNamespace () (string , error ) {
213
- if isRunModeLocal () {
214
- return "" , errRunLocal
215
- }
216
- nsBytes , err := ioutil .ReadFile ("/var/run/secrets/kubernetes.io/serviceaccount/namespace" )
230
+ nsBytes , err := readNamespace ()
217
231
if err != nil {
218
232
if os .IsNotExist (err ) {
219
- return "" , errNoNamespace
233
+ return "" , ErrNoNamespace
220
234
}
221
235
return "" , err
222
236
}
@@ -225,17 +239,10 @@ func getOperatorNamespace() (string, error) {
225
239
return ns , nil
226
240
}
227
241
228
- func isRunModeLocal () bool {
229
- return os .Getenv (forceRunModeEnv ) == string (localRunMode )
230
- }
231
-
232
242
// getPod returns a Pod object that corresponds to the pod in which the code
233
243
// is currently running.
234
244
// It expects the environment variable POD_NAME to be set by the downwards API.
235
245
func getPod (ctx context.Context , client crclient.Client , ns string ) (* corev1.Pod , error ) {
236
- if isRunModeLocal () {
237
- return nil , errRunLocal
238
- }
239
246
podName := os .Getenv (podNameEnvVar )
240
247
if podName == "" {
241
248
return nil , fmt .Errorf ("required env %s not set, please configure downward API" , podNameEnvVar )
0 commit comments