diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go index 9d9e238cccedd..a0d7a834aa0da 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo.go @@ -270,7 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { } var ( - _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue + _ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue + _ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations ) var ( diff --git a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go index 9c67012cc1c57..24f0ec87d04bb 100644 --- a/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go @@ -28,7 +28,7 @@ import ( // from the most recent Delta. // You should treat the items returned inside the deltas as immutable. // This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. -func (f *DeltaFIFO) List() []interface{} { +func (f *DeltaFIFO) list() []interface{} { f.lock.RLock() defer f.lock.RUnlock() return f.listLocked() @@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} { // ListKeys returns a list of all the keys of the objects currently // in the FIFO. // This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. -func (f *DeltaFIFO) ListKeys() []string { +func (f *DeltaFIFO) listKeys() []string { f.lock.RLock() defer f.lock.RUnlock() list := make([]string, 0, len(f.queue)) @@ -60,19 +60,19 @@ func (f *DeltaFIFO) ListKeys() []string { // or sets exists=false. // You should treat the items returned inside the deltas as immutable. // This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. -func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { +func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) { key, err := f.KeyOf(obj) if err != nil { return nil, false, KeyError{obj, err} } - return f.GetByKey(key) + return f.getByKey(key) } // GetByKey returns the complete list of deltas for the requested item, // setting exists=false if that list is empty. // You should treat the items returned inside the deltas as immutable. // This function was moved here because it is not consistent with normal list semantics, but is used in unit testing. -func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) { +func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) { f.lock.RLock() defer f.lock.RUnlock() d, exists := f.items[key] @@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { f.Update(mkFifoObj("foo", 12)) f.Delete(mkFifoObj("foo", 15)) - if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) { + if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) { t.Errorf("Expected %+v, got %+v", e, a) } - if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) { + if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) { t.Errorf("Expected %+v, got %+v", e, a) } @@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) { t.Errorf("Got second value %v", unexpected.val) case <-time.After(50 * time.Millisecond): } - _, exists, _ := f.Get(mkFifoObj("foo", "")) + _, exists, _ := f.get(mkFifoObj("foo", "")) if exists { t.Errorf("item did not get removed") } @@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) { must(f.Replace([]interface{}{}, "")) // Should be empty - if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) { + if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) { t.Errorf("Expected %+v, got %+v", e, a) } @@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) { t.Errorf("Got second value %v", unexpected.val) case <-time.After(50 * time.Millisecond): } - _, exists, _ := f.Get(mkFifoObj("foo", "")) + _, exists, _ := f.get(mkFifoObj("foo", "")) if exists { t.Errorf("item did not get removed") } @@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) { b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _ = f.ListKeys() + _ = f.listKeys() } }) b.StopTimer() diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index ee9be77278ab4..c68f0d87d249f 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -80,7 +80,7 @@ type ReflectorStore interface { // TransformingStore is an optional interface that can be implemented by the provided store. // If implemented on the provided store reflector will use the same transformer in its internal stores. type TransformingStore interface { - Store + ReflectorStore Transformer() TransformFunc } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go index 8c20ae0b02a4e..2c1965dd1a62a 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_test.go @@ -20,10 +20,12 @@ import ( "context" "errors" "fmt" + "maps" "math/rand" "net/http" "reflect" goruntime "runtime" + "slices" "strconv" "sync" "sync/atomic" @@ -1962,7 +1964,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) { s := NewFIFO(MetaNamespaceKeyFunc) var replaceInvoked atomic.Int32 store := &fakeStore{ - Store: s, + ReflectorStore: s, beforeReplace: func(list []interface{}, rv string) { // interested in the Replace call that happens after the Error event if rv == lastExpectedRV { @@ -2057,131 +2059,165 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) { } func TestReflectorRespectStoreTransformer(t *testing.T) { - mkPod := func(id string, rv string) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}, - Spec: v1.PodSpec{ - Hostname: "test", + for name, test := range map[string]struct { + storeBuilder func(counter *atomic.Int32) ReflectorStore + items func(rs ReflectorStore) []interface{} + }{ + "real-fifo": { + storeBuilder: func(counter *atomic.Int32) ReflectorStore { + return NewRealFIFO(MetaNamespaceKeyFunc, NewStore(MetaNamespaceKeyFunc), func(i interface{}) (interface{}, error) { + counter.Add(1) + cast := i.(*v1.Pod) + cast.Spec.Hostname = "transformed" + return cast, nil + }) }, - } - } - - preExisting1 := mkPod("foo-1", "1") - preExisting2 := mkPod("foo-2", "2") - pod3 := mkPod("foo-3", "3") - - lastExpectedRV := "3" - events := []watch.Event{ - {Type: watch.Added, Object: preExisting1}, - {Type: watch.Added, Object: preExisting2}, - {Type: watch.Bookmark, Object: &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: lastExpectedRV, - Annotations: map[string]string{ - metav1.InitialEventsAnnotationKey: "true", - }, + items: func(rs ReflectorStore) []interface{} { + store := rs.(*RealFIFO) + objects := make(map[string]interface{}) + for _, item := range store.getItems() { + key, _ := store.keyFunc(item.Object) + if item.Type == Deleted { + delete(objects, key) + } else { + objects[key] = item.Object + } + } + return slices.Collect(maps.Values(objects)) }, - }}, - {Type: watch.Added, Object: pod3}, - } + }, + "delta-fifo": { + storeBuilder: func(counter *atomic.Int32) ReflectorStore { + return NewDeltaFIFOWithOptions(DeltaFIFOOptions{ + KeyFunction: MetaNamespaceKeyFunc, + Transformer: func(i interface{}) (interface{}, error) { + counter.Add(1) + cast := i.(*v1.Pod) + cast.Spec.Hostname = "transformed" + return cast, nil + }, + }) + }, + items: func(rs ReflectorStore) []interface{} { + return rs.(*DeltaFIFO).list() + }, + }, + } { + t.Run(name, func(t *testing.T) { + mkPod := func(id string, rv string) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv}, + Spec: v1.PodSpec{ + Hostname: "test", + }, + } + } - s := NewFIFO(MetaNamespaceKeyFunc) - var replaceInvoked atomic.Int32 - store := &fakeStore{ - Store: s, - beforeReplace: func(list []interface{}, rv string) { - replaceInvoked.Add(1) - // Only two pods are present at the point when Replace is called. - if len(list) != 2 { - t.Errorf("unexpected nb of objects: expected 2 received %d", len(list)) + preExisting1 := mkPod("foo-1", "1") + preExisting2 := mkPod("foo-2", "2") + pod3 := mkPod("foo-3", "3") + + lastExpectedRV := "3" + events := []watch.Event{ + {Type: watch.Added, Object: preExisting1}, + {Type: watch.Added, Object: preExisting2}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: lastExpectedRV, + Annotations: map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + }, + }, + }}, + {Type: watch.Added, Object: pod3}, } - for _, obj := range list { - cast := obj.(*v1.Pod) - if cast.Spec.Hostname != "transformed" { - t.Error("Object was not transformed prior to replacement") - } + + var transformerInvoked atomic.Int32 + s := test.storeBuilder(&transformerInvoked) + + var once sync.Once + lw := &ListWatch{ + WatchFunc: func(metav1.ListOptions) (watch.Interface, error) { + fw := watch.NewFake() + go func() { + once.Do(func() { + for _, e := range events { + fw.Action(e.Type, e.Object) + } + }) + }() + return fw, nil + }, + // ListFunc should never be used in WatchList mode + ListFunc: func(metav1.ListOptions) (runtime.Object, error) { + return nil, errors.New("list call not expected in WatchList mode") + }, } - }, - afterReplace: func(rv string, err error) {}, - transformer: func(i interface{}) (interface{}, error) { - cast := i.(*v1.Pod) - cast.Spec.Hostname = "transformed" - return cast, nil - }, - } - var once sync.Once - lw := &ListWatch{ - WatchFunc: func(metav1.ListOptions) (watch.Interface, error) { - fw := watch.NewFake() + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + r := NewReflector(lw, &v1.Pod{}, s, 0) + ctx, cancel := context.WithCancel(context.Background()) + doneCh := make(chan struct{}) go func() { - once.Do(func() { - for _, e := range events { - fw.Action(e.Type, e.Object) - } - }) + defer close(doneCh) + r.RunWithContext(ctx) }() - return fw, nil - }, - // ListFunc should never be used in WatchList mode - ListFunc: func(metav1.ListOptions) (runtime.Object, error) { - return nil, errors.New("list call not expected in WatchList mode") - }, - } - clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) - r := NewReflector(lw, &v1.Pod{}, store, 0) - ctx, cancel := context.WithCancel(context.Background()) - doneCh := make(chan struct{}) - go func() { - defer close(doneCh) - r.RunWithContext(ctx) - }() + // wait for the RV to sync to the version returned by the final list + err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) { + if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err) + } - // wait for the RV to sync to the version returned by the final list - err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) { - if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV { - return true, nil - } - return false, nil - }) - if err != nil { - t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err) - } + if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got { + t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got) + } - if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got { - t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got) - } - if want, got := 1, int(replaceInvoked.Load()); want != got { - t.Errorf("expected replace to be invoked %d times, but got: %d", want, got) - } + informerItems := test.items(s) + if want, got := 3, len(informerItems); want != got { + t.Errorf("expected informer to contain %d objects, but got: %d", want, got) + } + for _, item := range informerItems { + cast := item.(*v1.Pod) + if cast.Spec.Hostname != "transformed" { + t.Error("Object was not transformed prior to replacement") + } + } - cancel() - select { - case <-doneCh: - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("timed out waiting for Run to return") + // Transformer should have been invoked twice for the initial sync in the informer on the temporary store, + // then twice on replace, then once on the following update. + if want, got := 5, int(transformerInvoked.Load()); want != got { + t.Errorf("expected transformer to be invoked %d times, but got: %d", want, got) + } + + cancel() + select { + case <-doneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Errorf("timed out waiting for Run to return") + } + }) } } type fakeStore struct { - Store + ReflectorStore beforeReplace func(list []interface{}, s string) afterReplace func(rv string, err error) - transformer TransformFunc } func (f *fakeStore) Replace(list []interface{}, rv string) error { f.beforeReplace(list, rv) - err := f.Store.Replace(list, rv) + err := f.ReflectorStore.Replace(list, rv) f.afterReplace(rv, err) return err } -func (f *fakeStore) Transformer() TransformFunc { - return f.transformer -} - func BenchmarkExtractList(b *testing.B) { _, _, podList := getPodListItems(0, fakeItemsNum) _, _, configMapList := getConfigmapListItems(0, fakeItemsNum) diff --git a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go index ef322bea85078..b907410dc05e0 100644 --- a/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go +++ b/staging/src/k8s.io/client-go/tools/cache/the_real_fifo.go @@ -61,7 +61,8 @@ type RealFIFO struct { } var ( - _ = Queue(&RealFIFO{}) // RealFIFO is a Queue + _ = Queue(&RealFIFO{}) // RealFIFO is a Queue + _ = TransformingStore(&RealFIFO{}) // RealFIFO implements TransformingStore to allow memory optimizations ) // Close the queue.