diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml index e24f962101..303c28b9d4 100644 --- a/.github/workflows/verify.yml +++ b/.github/workflows/verify.yml @@ -1,17 +1,18 @@ +name: PR title verifier + on: pull_request_target: - types: [opened, edited, reopened, synchronize] - -permissions: - checks: write # Allow access to checks to write check runs. + types: [opened, edited, synchronize, reopened] jobs: verify: runs-on: ubuntu-latest - name: verify PR contents + steps: - - name: Verifier action - id: verifier - uses: kubernetes-sigs/kubebuilder-release-tools@012269a88fa4c034a0acf1ba84c26b195c0dbab4 # tag=v0.4.3 - with: - github_token: ${{ secrets.GITHUB_TOKEN }} + - uses: actions/checkout@eef61447b9ff4aafe5dcd4e0bbf5d482be7e7871 # tag=v4.2.1 + + - name: Check if PR title is valid + env: + PR_TITLE: ${{ github.event.pull_request.title }} + run: | + ./hack/verify-pr-title.sh "${PR_TITLE}" diff --git a/examples/scratch-env/go.mod b/examples/scratch-env/go.mod index dceb4d12aa..8cf3ce725b 100644 --- a/examples/scratch-env/go.mod +++ b/examples/scratch-env/go.mod @@ -14,7 +14,6 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch/v5 v5.9.0 // indirect - github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/zapr v1.3.0 // indirect diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 89d30c15c1..a920487db5 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -13,8 +13,6 @@ github.com/evanphx/json-patch v0.5.2 h1:xVCHIVMUu1wtM/VkR9jVZ45N3FhZfYMMYGorLCR8 github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMikGIFbs31qQ= github.com/evanphx/json-patch/v5 v5.9.0 h1:kcBlZQbplgElYIlo/n1hJbls2z/1awpXxpRi0/FOJfg= github.com/evanphx/json-patch/v5 v5.9.0/go.mod h1:VNkHZ/282BpEyt/tObQO8s5CMPmYYq14uClGH4abBuQ= -github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= -github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= diff --git a/go.mod b/go.mod index 3fd1aa9562..a0a01baa74 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,6 @@ go 1.22.0 require ( github.com/evanphx/json-patch/v5 v5.9.0 - github.com/fsnotify/fsnotify v1.7.0 github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 github.com/google/go-cmp v0.6.0 @@ -40,6 +39,7 @@ require ( github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect diff --git a/hack/verify-pr-title.sh b/hack/verify-pr-title.sh new file mode 100755 index 0000000000..a556b0172b --- /dev/null +++ b/hack/verify-pr-title.sh @@ -0,0 +1,54 @@ +#!/bin/bash + +# Copyright 2024 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define regex patterns +WIP_REGEX="^\W?WIP\W" +TAG_REGEX="^\[[[:alnum:]\._-]*\]" +PR_TITLE="$1" + +# Trim WIP and tags from title +trimmed_title=$(echo "$PR_TITLE" | sed -E "s/$WIP_REGEX//" | sed -E "s/$TAG_REGEX//" | xargs) + +# Normalize common emojis in text form to actual emojis +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:warning:/⚠/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:sparkles:/✨/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:bug:/🐛/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:book:/📖/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:rocket:/🚀/g") +trimmed_title=$(echo "$trimmed_title" | sed -E "s/:seedling:/🌱/g") + +# Check PR type prefix +if [[ "$trimmed_title" =~ ^(⚠|✨|🐛|📖|🚀|🌱) ]]; then + echo "PR title is valid: $trimmed_title" +else + echo "Error: No matching PR type indicator found in title." + echo "You need to have one of these as the prefix of your PR title:" + echo "- Breaking change: ⚠ (:warning:)" + echo "- Non-breaking feature: ✨ (:sparkles:)" + echo "- Patch fix: 🐛 (:bug:)" + echo "- Docs: 📖 (:book:)" + echo "- Release: 🚀 (:rocket:)" + echo "- Infra/Tests/Other: 🌱 (:seedling:)" + exit 1 +fi + +# Check that PR title does not contain Issue or PR number +if [[ "$trimmed_title" =~ \#[0-9]+ ]]; then + echo "Error: PR title should not contain issue or PR number." + echo "Issue numbers belong in the PR body as either \"Fixes #XYZ\" (if it closes the issue or PR), or something like \"Related to #XYZ\" (if it's just related)." + exit 1 +fi + diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 706f9c6cdd..406380d329 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -222,6 +222,18 @@ type Options struct { // DefaultNamespaces. DefaultUnsafeDisableDeepCopy *bool + // DefaultEnableWatchBookmarks requests watch events with type "BOOKMARK". + // Servers that do not implement bookmarks may ignore this flag and + // bookmarks are sent at the server's discretion. Clients should not + // assume bookmarks are returned at any specific interval, nor may they + // assume the server will send any BOOKMARK event during a session. + // + // This will be used for all object types, unless it is set in ByObject or + // DefaultNamespaces. + // + // Defaults to false. + DefaultEnableWatchBookmarks *bool + // ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object. // If unset, this will fall through to the Default* settings. ByObject map[client.Object]ByObject @@ -272,6 +284,15 @@ type ByObject struct { // Be very careful with this, when enabled you must DeepCopy any object before mutating it, // otherwise you will mutate the object in the cache. UnsafeDisableDeepCopy *bool + + // EnableWatchBookmarks requests watch events with type "BOOKMARK". + // Servers that do not implement bookmarks may ignore this flag and + // bookmarks are sent at the server's discretion. Clients should not + // assume bookmarks are returned at any specific interval, nor may they + // assume the server will send any BOOKMARK event during a session. + // + // Defaults to false. + EnableWatchBookmarks *bool } // Config describes all potential options for a given watch. @@ -298,6 +319,15 @@ type Config struct { // UnsafeDisableDeepCopy specifies if List and Get requests against the // cache should not DeepCopy. A nil value allows to default this. UnsafeDisableDeepCopy *bool + + // EnableWatchBookmarks requests watch events with type "BOOKMARK". + // Servers that do not implement bookmarks may ignore this flag and + // bookmarks are sent at the server's discretion. Clients should not + // assume bookmarks are returned at any specific interval, nor may they + // assume the server will send any BOOKMARK event during a session. + // + // Defaults to false. + EnableWatchBookmarks *bool } // NewCacheFunc - Function for creating a new cache from the options and a rest config. @@ -367,6 +397,7 @@ func optionDefaultsToConfig(opts *Options) Config { FieldSelector: opts.DefaultFieldSelector, Transform: opts.DefaultTransform, UnsafeDisableDeepCopy: opts.DefaultUnsafeDisableDeepCopy, + EnableWatchBookmarks: opts.DefaultEnableWatchBookmarks, } } @@ -376,6 +407,7 @@ func byObjectToConfig(byObject ByObject) Config { FieldSelector: byObject.Field, Transform: byObject.Transform, UnsafeDisableDeepCopy: byObject.UnsafeDisableDeepCopy, + EnableWatchBookmarks: byObject.EnableWatchBookmarks, } } @@ -398,6 +430,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { Transform: config.Transform, WatchErrorHandler: opts.DefaultWatchErrorHandler, UnsafeDisableDeepCopy: ptr.Deref(config.UnsafeDisableDeepCopy, false), + EnableWatchBookmarks: ptr.Deref(config.EnableWatchBookmarks, false), NewInformer: opts.newInformer, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, @@ -482,6 +515,7 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { byObject.Field = defaultedConfig.FieldSelector byObject.Transform = defaultedConfig.Transform byObject.UnsafeDisableDeepCopy = defaultedConfig.UnsafeDisableDeepCopy + byObject.EnableWatchBookmarks = defaultedConfig.EnableWatchBookmarks } opts.ByObject[obj] = byObject @@ -523,7 +557,9 @@ func defaultConfig(toDefault, defaultFrom Config) Config { if toDefault.UnsafeDisableDeepCopy == nil { toDefault.UnsafeDisableDeepCopy = defaultFrom.UnsafeDisableDeepCopy } - + if toDefault.EnableWatchBookmarks == nil { + toDefault.EnableWatchBookmarks = defaultFrom.EnableWatchBookmarks + } return toDefault } diff --git a/pkg/cache/defaulting_test.go b/pkg/cache/defaulting_test.go index 3c01bf8404..8e3033eb47 100644 --- a/pkg/cache/defaulting_test.go +++ b/pkg/cache/defaulting_test.go @@ -224,6 +224,30 @@ func TestDefaultOpts(t *testing.T) { return cmp.Diff(expected, o.ByObject[pod].UnsafeDisableDeepCopy) }, }, + { + name: "ByObject.EnableWatchBookmarks gets defaulted from DefaultEnableWatchBookmarks", + in: Options{ + ByObject: map[client.Object]ByObject{pod: {}}, + DefaultEnableWatchBookmarks: ptr.To(true), + }, + + verification: func(o Options) string { + expected := ptr.To(true) + return cmp.Diff(expected, o.ByObject[pod].EnableWatchBookmarks) + }, + }, + { + name: "ByObject.EnableWatchBookmarks doesn't get defaulted when set", + in: Options{ + ByObject: map[client.Object]ByObject{pod: {EnableWatchBookmarks: ptr.To(false)}}, + DefaultEnableWatchBookmarks: ptr.To(true), + }, + + verification: func(o Options) string { + expected := ptr.To(false) + return cmp.Diff(expected, o.ByObject[pod].EnableWatchBookmarks) + }, + }, { name: "DefaultNamespace label selector gets defaulted from DefaultLabelSelector", in: Options{ diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index cd8c6774ca..a40382d6f3 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -51,6 +51,7 @@ type InformersOpts struct { Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool + EnableWatchBookmarks bool WatchErrorHandler cache.WatchErrorHandler } @@ -78,6 +79,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { selector: options.Selector, transform: options.Transform, unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, + enableWatchBookmarks: options.EnableWatchBookmarks, newInformer: newInformer, watchErrorHandler: options.WatchErrorHandler, } @@ -174,6 +176,7 @@ type Informers struct { selector Selector transform cache.TransformFunc unsafeDisableDeepCopy bool + enableWatchBookmarks bool // NewInformer allows overriding of the shared index informer constructor for testing. newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer @@ -361,8 +364,10 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O return listWatcher.ListFunc(opts) }, WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { - ip.selector.ApplyToList(&opts) opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + + ip.selector.ApplyToList(&opts) return listWatcher.WatchFunc(opts) }, }, obj, calculateResyncPeriod(ip.resync), cache.Indexers{ @@ -444,6 +449,9 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + if namespace != "" { return resources.Namespace(namespace).Watch(ip.ctx, opts) } @@ -486,6 +494,9 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watcher watch.Interface, err error) { + opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + if namespace != "" { watcher, err = resources.Namespace(namespace).Watch(ip.ctx, opts) } else { @@ -527,6 +538,9 @@ func (ip *Informers) makeListWatcher(gvk schema.GroupVersionKind, obj runtime.Ob }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + opts.Watch = true // Watch needs to be set to true separately + opts.AllowWatchBookmarks = ip.enableWatchBookmarks + // Build the request. req := client.Get().Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec) if namespace != "" { diff --git a/pkg/certwatcher/certwatcher.go b/pkg/certwatcher/certwatcher.go index fe15fc0dd7..f629dd4e16 100644 --- a/pkg/certwatcher/certwatcher.go +++ b/pkg/certwatcher/certwatcher.go @@ -17,58 +17,55 @@ limitations under the License. package certwatcher import ( + "bytes" "context" "crypto/tls" - "fmt" + "os" "sync" "time" - "github.com/fsnotify/fsnotify" - kerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" ) var log = logf.RuntimeLog.WithName("certwatcher") -// CertWatcher watches certificate and key files for changes. When either file -// changes, it reads and parses both and calls an optional callback with the new -// certificate. +const defaultWatchInterval = 10 * time.Second + +// CertWatcher watches certificate and key files for changes. +// It always returns the cached version, +// but periodically reads and parses certificate and key for changes +// and calls an optional callback with the new certificate. type CertWatcher struct { sync.RWMutex currentCert *tls.Certificate - watcher *fsnotify.Watcher + interval time.Duration certPath string keyPath string + cachedKeyPEMBlock []byte + // callback is a function to be invoked when the certificate changes. callback func(tls.Certificate) } // New returns a new CertWatcher watching the given certificate and key. func New(certPath, keyPath string) (*CertWatcher, error) { - var err error - cw := &CertWatcher{ certPath: certPath, keyPath: keyPath, + interval: defaultWatchInterval, } - // Initial read of certificate and key. - if err := cw.ReadCertificate(); err != nil { - return nil, err - } - - cw.watcher, err = fsnotify.NewWatcher() - if err != nil { - return nil, err - } + return cw, cw.ReadCertificate() +} - return cw, nil +// WithWatchInterval sets the watch interval and returns the CertWatcher pointer +func (cw *CertWatcher) WithWatchInterval(interval time.Duration) *CertWatcher { + cw.interval = interval + return cw } // RegisterCallback registers a callback to be invoked when the certificate changes. @@ -91,72 +88,71 @@ func (cw *CertWatcher) GetCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, // Start starts the watch on the certificate and key files. func (cw *CertWatcher) Start(ctx context.Context) error { - files := sets.New(cw.certPath, cw.keyPath) - - { - var watchErr error - if err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { - for _, f := range files.UnsortedList() { - if err := cw.watcher.Add(f); err != nil { - watchErr = err - return false, nil //nolint:nilerr // We want to keep trying. - } - // We've added the watch, remove it from the set. - files.Delete(f) - } - return true, nil - }); err != nil { - return fmt.Errorf("failed to add watches: %w", kerrors.NewAggregate([]error{err, watchErr})) - } - } - - go cw.Watch() + ticker := time.NewTicker(cw.interval) + defer ticker.Stop() log.Info("Starting certificate watcher") - - // Block until the context is done. - <-ctx.Done() - - return cw.watcher.Close() -} - -// Watch reads events from the watcher's channel and reacts to changes. -func (cw *CertWatcher) Watch() { for { select { - case event, ok := <-cw.watcher.Events: - // Channel is closed. - if !ok { - return + case <-ctx.Done(): + return nil + case <-ticker.C: + if err := cw.ReadCertificate(); err != nil { + log.Error(err, "failed read certificate") } + } + } +} - cw.handleEvent(event) +// Watch used to read events from the watcher's channel and reacts to changes, +// it has currently no function and it's left here for backward compatibility until a future release. +// +// Deprecated: fsnotify has been removed and Start() is now polling instead. +func (cw *CertWatcher) Watch() { +} - case err, ok := <-cw.watcher.Errors: - // Channel is closed. - if !ok { - return - } +// updateCachedCertificate checks if the new certificate differs from the cache, +// updates it and returns the result if it was updated or not +func (cw *CertWatcher) updateCachedCertificate(cert *tls.Certificate, keyPEMBlock []byte) bool { + cw.Lock() + defer cw.Unlock() - log.Error(err, "certificate watch error") - } + if cw.currentCert != nil && + bytes.Equal(cw.currentCert.Certificate[0], cert.Certificate[0]) && + bytes.Equal(cw.cachedKeyPEMBlock, keyPEMBlock) { + log.V(7).Info("certificate already cached") + return false } + cw.currentCert = cert + cw.cachedKeyPEMBlock = keyPEMBlock + return true } // ReadCertificate reads the certificate and key files from disk, parses them, -// and updates the current certificate on the watcher. If a callback is set, it +// and updates the current certificate on the watcher if updated. If a callback is set, it // is invoked with the new certificate. func (cw *CertWatcher) ReadCertificate() error { metrics.ReadCertificateTotal.Inc() - cert, err := tls.LoadX509KeyPair(cw.certPath, cw.keyPath) + certPEMBlock, err := os.ReadFile(cw.certPath) + if err != nil { + metrics.ReadCertificateErrors.Inc() + return err + } + keyPEMBlock, err := os.ReadFile(cw.keyPath) if err != nil { metrics.ReadCertificateErrors.Inc() return err } - cw.Lock() - cw.currentCert = &cert - cw.Unlock() + cert, err := tls.X509KeyPair(certPEMBlock, keyPEMBlock) + if err != nil { + metrics.ReadCertificateErrors.Inc() + return err + } + + if !cw.updateCachedCertificate(&cert, keyPEMBlock) { + return nil + } log.Info("Updated current TLS certificate") @@ -170,39 +166,3 @@ func (cw *CertWatcher) ReadCertificate() error { } return nil } - -func (cw *CertWatcher) handleEvent(event fsnotify.Event) { - // Only care about events which may modify the contents of the file. - if !(isWrite(event) || isRemove(event) || isCreate(event) || isChmod(event)) { - return - } - - log.V(1).Info("certificate event", "event", event) - - // If the file was removed or renamed, re-add the watch to the previous name - if isRemove(event) || isChmod(event) { - if err := cw.watcher.Add(event.Name); err != nil { - log.Error(err, "error re-watching file") - } - } - - if err := cw.ReadCertificate(); err != nil { - log.Error(err, "error re-reading certificate") - } -} - -func isWrite(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Write) -} - -func isCreate(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Create) -} - -func isRemove(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Remove) -} - -func isChmod(event fsnotify.Event) bool { - return event.Op.Has(fsnotify.Chmod) -} diff --git a/pkg/certwatcher/certwatcher_suite_test.go b/pkg/certwatcher/certwatcher_suite_test.go index 2d0f677685..d0d9a72a62 100644 --- a/pkg/certwatcher/certwatcher_suite_test.go +++ b/pkg/certwatcher/certwatcher_suite_test.go @@ -22,6 +22,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" + logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" ) diff --git a/pkg/certwatcher/certwatcher_test.go b/pkg/certwatcher/certwatcher_test.go index 1fb247581f..fa7e09adae 100644 --- a/pkg/certwatcher/certwatcher_test.go +++ b/pkg/certwatcher/certwatcher_test.go @@ -17,6 +17,7 @@ limitations under the License. package certwatcher_test import ( + "bytes" "context" "crypto/rand" "crypto/rsa" @@ -34,6 +35,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/prometheus/client_golang/prometheus/testutil" + "sigs.k8s.io/controller-runtime/pkg/certwatcher" "sigs.k8s.io/controller-runtime/pkg/certwatcher/metrics" ) @@ -80,7 +82,7 @@ var _ = Describe("CertWatcher", func() { go func() { defer GinkgoRecover() defer close(doneCh) - Expect(watcher.Start(ctx)).To(Succeed()) + Expect(watcher.WithWatchInterval(time.Second).Start(ctx)).To(Succeed()) }() // wait till we read first cert Eventually(func() error { @@ -113,7 +115,7 @@ var _ = Describe("CertWatcher", func() { Eventually(func() bool { secondcert, _ := watcher.GetCertificate(nil) first := firstcert.PrivateKey.(*rsa.PrivateKey) - return first.Equal(secondcert.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) }).ShouldNot(BeTrue()) ctxCancel() @@ -143,7 +145,7 @@ var _ = Describe("CertWatcher", func() { Eventually(func() bool { secondcert, _ := watcher.GetCertificate(nil) first := firstcert.PrivateKey.(*rsa.PrivateKey) - return first.Equal(secondcert.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) }).ShouldNot(BeTrue()) ctxCancel() @@ -151,6 +153,33 @@ var _ = Describe("CertWatcher", func() { Expect(called.Load()).To(BeNumerically(">=", 1)) }) + It("should reload currentCert after move out", func() { + doneCh := startWatcher() + called := atomic.Int64{} + watcher.RegisterCallback(func(crt tls.Certificate) { + called.Add(1) + Expect(crt.Certificate).ToNot(BeEmpty()) + }) + + firstcert, _ := watcher.GetCertificate(nil) + + Expect(os.Rename(certPath, certPath+".old")).To(Succeed()) + Expect(os.Rename(keyPath, keyPath+".old")).To(Succeed()) + + err := writeCerts(certPath, keyPath, "192.168.0.3") + Expect(err).ToNot(HaveOccurred()) + + Eventually(func() bool { + secondcert, _ := watcher.GetCertificate(nil) + first := firstcert.PrivateKey.(*rsa.PrivateKey) + return first.Equal(secondcert.PrivateKey) || bytes.Equal(firstcert.Certificate[0], secondcert.Certificate[0]) + }, "10s", "1s").ShouldNot(BeTrue()) + + ctxCancel() + Eventually(doneCh, "4s").Should(BeClosed()) + Expect(called.Load()).To(BeNumerically(">=", 1)) + }) + Context("prometheus metric read_certificate_total", func() { var readCertificateTotalBefore float64 var readCertificateErrorsBefore float64 @@ -165,8 +194,8 @@ var _ = Describe("CertWatcher", func() { Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+1.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+1.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) } return nil }, "4s").Should(Succeed()) @@ -180,8 +209,8 @@ var _ = Describe("CertWatcher", func() { Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+1.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+1.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+1.0, readCertificateTotalAfter) } readCertificateTotalBefore = readCertificateTotalAfter return nil @@ -192,15 +221,15 @@ var _ = Describe("CertWatcher", func() { // Note, we are checking two errors here, because os.Remove generates two fsnotify events: Chmod + Remove Eventually(func() error { readCertificateTotalAfter := testutil.ToFloat64(metrics.ReadCertificateTotal) - if readCertificateTotalAfter != readCertificateTotalBefore+2.0 { - return fmt.Errorf("metric read certificate total expected: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter) + if readCertificateTotalAfter < readCertificateTotalBefore+2.0 { + return fmt.Errorf("metric read certificate total expected at least: %v and got: %v", readCertificateTotalBefore+2.0, readCertificateTotalAfter) } return nil }, "4s").Should(Succeed()) Eventually(func() error { readCertificateErrorsAfter := testutil.ToFloat64(metrics.ReadCertificateErrors) - if readCertificateErrorsAfter != readCertificateErrorsBefore+2.0 { - return fmt.Errorf("metric read certificate errors expected: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter) + if readCertificateErrorsAfter < readCertificateErrorsBefore+2.0 { + return fmt.Errorf("metric read certificate errors expected at least: %v and got: %v", readCertificateErrorsBefore+2.0, readCertificateErrorsAfter) } return nil }, "4s").Should(Succeed()) diff --git a/pkg/certwatcher/example_test.go b/pkg/certwatcher/example_test.go index e322aeebfc..e85b2403cb 100644 --- a/pkg/certwatcher/example_test.go +++ b/pkg/certwatcher/example_test.go @@ -39,7 +39,7 @@ func Example() { panic(err) } - // Start goroutine with certwatcher running fsnotify against supplied certdir + // Start goroutine with certwatcher running against supplied cert go func() { if err := watcher.Start(ctx); err != nil { panic(err) diff --git a/pkg/certwatcher/metrics/metrics.go b/pkg/certwatcher/metrics/metrics.go index 05869eff03..f128abbcf0 100644 --- a/pkg/certwatcher/metrics/metrics.go +++ b/pkg/certwatcher/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" ) diff --git a/pkg/client/fake/client.go b/pkg/client/fake/client.go index 7366a18528..54f5b5d258 100644 --- a/pkg/client/fake/client.go +++ b/pkg/client/fake/client.go @@ -68,16 +68,21 @@ type versionedTracker struct { } type fakeClient struct { - tracker versionedTracker - scheme *runtime.Scheme + // trackerWriteLock must be acquired before writing to + // the tracker or performing reads that affect a following + // write. + trackerWriteLock sync.Mutex + tracker versionedTracker + + schemeWriteLock sync.Mutex + scheme *runtime.Scheme + restMapper meta.RESTMapper withStatusSubresource sets.Set[schema.GroupVersionKind] // indexes maps each GroupVersionKind (GVK) to the indexes registered for that GVK. // The inner map maps from index name to IndexerFunc. indexes map[schema.GroupVersionKind]map[string]client.IndexerFunc - - schemeWriteLock sync.Mutex } var _ client.WithWatch = &fakeClient{} @@ -467,6 +472,11 @@ func (t versionedTracker) updateObject(gvr schema.GroupVersionResource, obj runt switch { case allowsUnconditionalUpdate(gvk): accessor.SetResourceVersion(oldAccessor.GetResourceVersion()) + // This is needed because if the patch explicitly sets the RV to null, the client-go reaction we use + // to apply it and whose output we process here will have it unset. It is not clear why the Kubernetes + // apiserver accepts such a patch, but it does so we just copy that behavior. + // Kubernetes apiserver behavior can be checked like this: + // `kubectl patch configmap foo --patch '{"metadata":{"annotations":{"foo":"bar"},"resourceVersion":null}}' -v=9` case bytes. Contains(debug.Stack(), []byte("sigs.k8s.io/controller-runtime/pkg/client/fake.(*fakeClient).Patch")): // We apply patches using a client-go reaction that ends up calling the trackers Update. As we can't change @@ -508,7 +518,10 @@ func (c *fakeClient) Get(ctx context.Context, key client.ObjectKey, obj client.O return err } - if _, isUnstructured := obj.(runtime.Unstructured); isUnstructured { + _, isUnstructured := obj.(runtime.Unstructured) + _, isPartialObject := obj.(*metav1.PartialObjectMetadata) + + if isUnstructured || isPartialObject { gvk, err := apiutil.GVKForObject(obj, c.scheme) if err != nil { return err @@ -729,6 +742,8 @@ func (c *fakeClient) Create(ctx context.Context, obj client.Object, opts ...clie accessor.SetDeletionTimestamp(nil) } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() return c.tracker.Create(gvr, obj, accessor.GetNamespace()) } @@ -750,6 +765,8 @@ func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...clie } } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() // Check the ResourceVersion if that Precondition was specified. if delOptions.Preconditions != nil && delOptions.Preconditions.ResourceVersion != nil { name := accessor.GetName() @@ -772,7 +789,7 @@ func (c *fakeClient) Delete(ctx context.Context, obj client.Object, opts ...clie } } - return c.deleteObject(gvr, accessor) + return c.deleteObjectLocked(gvr, accessor) } func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error { @@ -790,6 +807,9 @@ func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts .. } } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() + gvr, _ := meta.UnsafeGuessKindToResource(gvk) o, err := c.tracker.List(gvr, gvk, dcOptions.Namespace) if err != nil { @@ -809,7 +829,7 @@ func (c *fakeClient) DeleteAllOf(ctx context.Context, obj client.Object, opts .. if err != nil { return err } - err = c.deleteObject(gvr, accessor) + err = c.deleteObjectLocked(gvr, accessor) if err != nil { return err } @@ -839,6 +859,9 @@ func (c *fakeClient) update(obj client.Object, isStatus bool, opts ...client.Upd if err != nil { return err } + + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() return c.tracker.update(gvr, obj, accessor.GetNamespace(), isStatus, false, *updateOptions.AsUpdateOptions()) } @@ -874,6 +897,8 @@ func (c *fakeClient) patch(obj client.Object, patch client.Patch, opts ...client return err } + c.trackerWriteLock.Lock() + defer c.trackerWriteLock.Unlock() oldObj, err := c.tracker.Get(gvr, accessor.GetNamespace(), accessor.GetName()) if err != nil { return err @@ -1082,7 +1107,7 @@ func (c *fakeClient) SubResource(subResource string) client.SubResourceClient { return &fakeSubResourceClient{client: c, subResource: subResource} } -func (c *fakeClient) deleteObject(gvr schema.GroupVersionResource, accessor metav1.Object) error { +func (c *fakeClient) deleteObjectLocked(gvr schema.GroupVersionResource, accessor metav1.Object) error { old, err := c.tracker.Get(gvr, accessor.GetNamespace(), accessor.GetName()) if err == nil { oldAccessor, err := meta.Accessor(old) @@ -1164,7 +1189,7 @@ func (sw *fakeSubResourceClient) Update(ctx context.Context, obj client.Object, switch sw.subResource { case subResourceScale: - if err := sw.client.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil { + if err := sw.client.Get(ctx, client.ObjectKeyFromObject(obj), obj.DeepCopyObject().(client.Object)); err != nil { return err } if updateOptions.SubResourceBody == nil { diff --git a/pkg/client/fake/client_test.go b/pkg/client/fake/client_test.go index e86a64eefc..2f5392fda7 100644 --- a/pkg/client/fake/client_test.go +++ b/pkg/client/fake/client_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "strconv" + "sync" "time" "github.com/google/go-cmp/cmp" @@ -339,6 +340,33 @@ var _ = Describe("Fake client", func() { Expect(apierrors.IsNotFound(err)).To(BeTrue()) }) + It("should be able to retrieve objects by PartialObjectMetadata", func() { + By("Creating a Resource") + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + } + err := cl.Create(context.Background(), secret) + Expect(err).ToNot(HaveOccurred()) + + By("Fetching the resource using a PartialObjectMeta") + partialObjMeta := &metav1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: "bar", + }, + } + partialObjMeta.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret")) + + err = cl.Get(context.Background(), client.ObjectKeyFromObject(partialObjMeta), partialObjMeta) + Expect(err).ToNot(HaveOccurred()) + + Expect(partialObjMeta.Kind).To(Equal("Secret")) + Expect(partialObjMeta.APIVersion).To(Equal("v1")) + }) + It("should support filtering by labels and their values", func() { By("Listing deployments with a particular label and value") list := &appsv1.DeploymentList{} @@ -552,7 +580,7 @@ var _ = Describe("Fake client", func() { Expect(obj.ObjectMeta.ResourceVersion).To(Equal("1000")) }) - It("should allow patch with non-set ResourceVersion for a resource that doesn't allow unconditional updates", func() { + It("should allow patch when the patch sets RV to 'null'", func() { schemeBuilder := &scheme.Builder{GroupVersion: schema.GroupVersion{Group: "test", Version: "v1"}} schemeBuilder.Register(&WithPointerMeta{}, &WithPointerMetaList{}) @@ -577,6 +605,7 @@ var _ = Describe("Fake client", func() { "foo": "bar", }, }} + Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(original))).To(Succeed()) patched := &WithPointerMeta{} @@ -2071,6 +2100,280 @@ var _ = Describe("Fake client", func() { Expect(apierrors.IsNotFound(err)).To(BeTrue()) }) + It("should allow concurrent patches to a configMap", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries) + + for i := range tries { + go func() { + defer wg.Done() + defer GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + Expect(cl.Patch(context.Background(), newObj, client.MergeFrom(obj))).To(Succeed()) + }() + } + wg.Wait() + + // While the order is not deterministic, there must be $tries distinct updates + // that each increment the resource version by one + Expect(cl.Get(context.Background(), client.ObjectKey{Name: "foo"}, obj)).To(Succeed()) + Expect(obj.ResourceVersion).To(Equal(strconv.Itoa(tries))) + }) + + It("should not allow concurrent patches to a configMap if the patch contains a ResourceVersion", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + wg := sync.WaitGroup{} + wg.Add(5) + + for i := range 5 { + go func() { + defer wg.Done() + defer GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.ResourceVersion = "1" // include an invalid RV to cause a conflict + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + Expect(apierrors.IsConflict(cl.Patch(context.Background(), newObj, client.MergeFrom(obj)))).To(BeTrue()) + }() + } + wg.Wait() + }) + + It("should allow concurrent updates to an object that allows unconditionalUpdate if the incoming request has no RV", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + obj := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + ResourceVersion: "0", + }, + } + cl := NewClientBuilder().WithScheme(scheme).WithObjects(obj).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries) + + for i := range tries { + go func() { + defer wg.Done() + defer GinkgoRecover() + + newObj := obj.DeepCopy() + newObj.Data = map[string]string{"foo": strconv.Itoa(i)} + newObj.ResourceVersion = "" + Expect(cl.Update(context.Background(), newObj)).To(Succeed()) + }() + } + wg.Wait() + + // While the order is not deterministic, there must be $tries distinct updates + // that each increment the resource version by one + Expect(cl.Get(context.Background(), client.ObjectKey{Name: "foo"}, obj)).To(Succeed()) + Expect(obj.ResourceVersion).To(Equal(strconv.Itoa(tries))) + }) + + It("If a create races with an update for an object that allows createOnUpdate, the update should always succeed", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries * 2) + + for i := range tries { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + go func() { + defer wg.Done() + defer GinkgoRecover() + + // this may or may not succeed depending on if we win the race. Either is acceptable, + // but if it fails, it must fail due to an AlreadyExists. + err := cl.Create(context.Background(), obj.DeepCopy()) + if err != nil { + Expect(apierrors.IsAlreadyExists(err)).To(BeTrue()) + } + }() + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This must always succeed, regardless of the outcome of the create. + Expect(cl.Update(context.Background(), obj.DeepCopy())).To(Succeed()) + }() + } + + wg.Wait() + }) + + It("If a delete races with an update for an object that allows createOnUpdate, the update should always succeed", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const tries = 50 + wg := sync.WaitGroup{} + wg.Add(tries * 2) + + for i := range tries { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + Expect(cl.Create(context.Background(), obj.DeepCopy())).To(Succeed()) + + go func() { + defer wg.Done() + defer GinkgoRecover() + + Expect(cl.Delete(context.Background(), obj.DeepCopy())).To(Succeed()) + }() + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This must always succeed, regardless of if the delete came before or + // after us. + Expect(cl.Update(context.Background(), obj.DeepCopy())).To(Succeed()) + }() + } + + wg.Wait() + }) + + It("If a DeleteAllOf races with a delete, the DeleteAllOf should always succeed", func() { + scheme := runtime.NewScheme() + Expect(corev1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const objects = 50 + wg := sync.WaitGroup{} + wg.Add(objects) + + for i := range objects { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + Expect(cl.Create(context.Background(), obj.DeepCopy())).To(Succeed()) + } + + for i := range objects { + obj := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This may or may not succeed depending on if the DeleteAllOf is faster, + // but if it fails, it should be a not found. + err := cl.Delete(context.Background(), obj) + if err != nil { + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + } + }() + } + Expect(cl.DeleteAllOf(context.Background(), &corev1.Service{})).To(Succeed()) + + wg.Wait() + }) + + It("If an update races with a scale update, only one of them succeeds", func() { + scheme := runtime.NewScheme() + Expect(appsv1.AddToScheme(scheme)).To(Succeed()) + + cl := NewClientBuilder().WithScheme(scheme).Build() + + const tries = 5000 + for i := range tries { + dep := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: strconv.Itoa(i), + }, + } + Expect(cl.Create(context.Background(), dep)).To(Succeed()) + + wg := sync.WaitGroup{} + wg.Add(2) + var updateSucceeded bool + var scaleSucceeded bool + + go func() { + defer wg.Done() + defer GinkgoRecover() + + dep := dep.DeepCopy() + dep.Annotations = map[string]string{"foo": "bar"} + + // This may or may not fail. If it does fail, it must be a conflict. + err := cl.Update(context.Background(), dep) + if err != nil { + Expect(apierrors.IsConflict(err)).To(BeTrue()) + } else { + updateSucceeded = true + } + }() + + go func() { + defer wg.Done() + defer GinkgoRecover() + + // This may or may not fail. If it does fail, it must be a conflict. + scale := &autoscalingv1.Scale{Spec: autoscalingv1.ScaleSpec{Replicas: 10}} + err := cl.SubResource("scale").Update(context.Background(), dep.DeepCopy(), client.WithSubResourceBody(scale)) + if err != nil { + Expect(apierrors.IsConflict(err)).To(BeTrue()) + } else { + scaleSucceeded = true + } + }() + + wg.Wait() + Expect(updateSucceeded).ToNot(Equal(scaleSucceeded)) + } + + }) + It("disallows scale subresources on unsupported built-in types", func() { scheme := runtime.NewScheme() Expect(corev1.AddToScheme(scheme)).To(Succeed()) @@ -2225,8 +2528,8 @@ func (t *WithPointerMetaList) DeepCopyObject() runtime.Object { } type WithPointerMeta struct { - *metav1.TypeMeta - *metav1.ObjectMeta + *metav1.TypeMeta `json:",inline"` + *metav1.ObjectMeta `json:"metadata,omitempty"` } func (t *WithPointerMeta) DeepCopy() *WithPointerMeta { diff --git a/pkg/leaderelection/leader_election.go b/pkg/leaderelection/leader_election.go index ee4fcf4cbe..5cc253917a 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/leaderelection/leader_election.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "os" + "time" "k8s.io/apimachinery/pkg/util/uuid" coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" @@ -49,6 +50,12 @@ type Options struct { // LeaderElectionID determines the name of the resource that leader election // will use for holding the leader lock. LeaderElectionID string + + // RenewDeadline is the renew deadline for this leader election client. + // Must be set to ensure the resource lock has an appropriate client timeout. + // Without that, a single slow response from the API server can result + // in losing leadership. + RenewDeadline time.Duration } // NewResourceLock creates a new resource lock for use in a leader election loop. @@ -88,6 +95,20 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op // Construct clients for leader election rest.AddUserAgent(config, "leader-election") + + if options.RenewDeadline != 0 { + return resourcelock.NewFromKubeconfig(options.LeaderElectionResourceLock, + options.LeaderElectionNamespace, + options.LeaderElectionID, + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorderProvider.GetEventRecorderFor(id), + }, + config, + options.RenewDeadline, + ) + } + corev1Client, err := corev1client.NewForConfig(config) if err != nil { return nil, err @@ -97,7 +118,6 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op if err != nil { return nil, err } - return resourcelock.New(options.LeaderElectionResourceLock, options.LeaderElectionNamespace, options.LeaderElectionID, @@ -106,7 +126,8 @@ func NewResourceLock(config *rest.Config, recorderProvider recorder.Provider, op resourcelock.ResourceLockConfig{ Identity: id, EventRecorder: recorderProvider.GetEventRecorderFor(id), - }) + }, + ) } func getInClusterNamespace() (string, error) { diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 3166f4818f..92906fe6ca 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -389,6 +389,7 @@ func New(config *rest.Config, options Options) (Manager, error) { LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, + RenewDeadline: *options.RenewDeadline, }) if err != nil { return nil, err diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index c42d2f2ae7..6e5353e345 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -317,6 +317,28 @@ var _ = Describe("manger.Manager", func() { <-m2done }) + It("should default RenewDeadline for leader election config", func() { + var rl resourcelock.Interface + m1, err := New(cfg, Options{ + LeaderElection: true, + LeaderElectionNamespace: "default", + LeaderElectionID: "test-leader-election-id", + newResourceLock: func(config *rest.Config, recorderProvider recorder.Provider, options leaderelection.Options) (resourcelock.Interface, error) { + if options.RenewDeadline != 10*time.Second { + return nil, fmt.Errorf("expected RenewDeadline to be 10s, got %v", options.RenewDeadline) + } + var err error + rl, err = leaderelection.NewResourceLock(config, recorderProvider, options) + return rl, err + }, + HealthProbeBindAddress: "0", + Metrics: metricsserver.Options{BindAddress: "0"}, + PprofBindAddress: "0", + }) + Expect(err).ToNot(HaveOccurred()) + Expect(m1).ToNot(BeNil()) + }) + It("should default ID to controller-runtime if ID is not set", func() { var rl resourcelock.Interface m1, err := New(cfg, Options{