Skip to content

[client-go #1415] Embed proper interface in TransformingStore to ensure DeltaFIFO and RealFIFO are implementing it #133263

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion staging/src/k8s.io/client-go/tools/cache/delta_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
}

var (
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = Queue(&DeltaFIFO{}) // DeltaFIFO is a Queue
_ = TransformingStore(&DeltaFIFO{}) // DeltaFIFO implements TransformingStore to allow memory optimizations
)

var (
Expand Down
22 changes: 11 additions & 11 deletions staging/src/k8s.io/client-go/tools/cache/delta_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// from the most recent Delta.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) List() []interface{} {
func (f *DeltaFIFO) list() []interface{} {
f.lock.RLock()
defer f.lock.RUnlock()
return f.listLocked()
Expand All @@ -46,7 +46,7 @@ func (f *DeltaFIFO) listLocked() []interface{} {
// ListKeys returns a list of all the keys of the objects currently
// in the FIFO.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) ListKeys() []string {
func (f *DeltaFIFO) listKeys() []string {
f.lock.RLock()
defer f.lock.RUnlock()
list := make([]string, 0, len(f.queue))
Expand All @@ -60,19 +60,19 @@ func (f *DeltaFIFO) ListKeys() []string {
// or sets exists=false.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) get(obj interface{}) (item interface{}, exists bool, err error) {
key, err := f.KeyOf(obj)
if err != nil {
return nil, false, KeyError{obj, err}
}
return f.GetByKey(key)
return f.getByKey(key)
}

// GetByKey returns the complete list of deltas for the requested item,
// setting exists=false if that list is empty.
// You should treat the items returned inside the deltas as immutable.
// This function was moved here because it is not consistent with normal list semantics, but is used in unit testing.
func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
func (f *DeltaFIFO) getByKey(key string) (item interface{}, exists bool, err error) {
f.lock.RLock()
defer f.lock.RUnlock()
d, exists := f.items[key]
Expand Down Expand Up @@ -320,10 +320,10 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
f.Update(mkFifoObj("foo", 12))
f.Delete(mkFifoObj("foo", 15))

if e, a := []interface{}{mkFifoObj("foo", 15)}, f.List(); !reflect.DeepEqual(e, a) {
if e, a := []interface{}{mkFifoObj("foo", 15)}, f.list(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}
if e, a := []string{"foo"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}

Expand All @@ -349,7 +349,7 @@ func TestDeltaFIFO_addUpdate(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
Expand Down Expand Up @@ -397,7 +397,7 @@ func TestDeltaFIFO_transformer(t *testing.T) {
must(f.Replace([]interface{}{}, ""))

// Should be empty
if e, a := []string{"foo", "bar"}, f.ListKeys(); !reflect.DeepEqual(e, a) {
if e, a := []string{"foo", "bar"}, f.listKeys(); !reflect.DeepEqual(e, a) {
t.Errorf("Expected %+v, got %+v", e, a)
}

Expand Down Expand Up @@ -507,7 +507,7 @@ func TestDeltaFIFO_addReplace(t *testing.T) {
t.Errorf("Got second value %v", unexpected.val)
case <-time.After(50 * time.Millisecond):
}
_, exists, _ := f.Get(mkFifoObj("foo", ""))
_, exists, _ := f.get(mkFifoObj("foo", ""))
if exists {
t.Errorf("item did not get removed")
}
Expand Down Expand Up @@ -991,7 +991,7 @@ func BenchmarkDeltaFIFOListKeys(b *testing.B) {
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_ = f.ListKeys()
_ = f.listKeys()
}
})
b.StopTimer()
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/client-go/tools/cache/reflector.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type ReflectorStore interface {
// TransformingStore is an optional interface that can be implemented by the provided store.
// If implemented on the provided store reflector will use the same transformer in its internal stores.
type TransformingStore interface {
Store
ReflectorStore
Transformer() TransformFunc
}

Expand Down
236 changes: 136 additions & 100 deletions staging/src/k8s.io/client-go/tools/cache/reflector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"context"
"errors"
"fmt"
"maps"
"math/rand"
"net/http"
"reflect"
goruntime "runtime"
"slices"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1962,7 +1964,7 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
ReflectorStore: s,
beforeReplace: func(list []interface{}, rv string) {
// interested in the Replace call that happens after the Error event
if rv == lastExpectedRV {
Expand Down Expand Up @@ -2057,131 +2059,165 @@ func TestReflectorReplacesStoreOnUnsafeDelete(t *testing.T) {
}

func TestReflectorRespectStoreTransformer(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
for name, test := range map[string]struct {
storeBuilder func(counter *atomic.Int32) ReflectorStore
items func(rs ReflectorStore) []interface{}
}{
"real-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewRealFIFO(MetaNamespaceKeyFunc, NewStore(MetaNamespaceKeyFunc), func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
})
},
}
}

preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")

lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
items: func(rs ReflectorStore) []interface{} {
store := rs.(*RealFIFO)
objects := make(map[string]interface{})
for _, item := range store.getItems() {
key, _ := store.keyFunc(item.Object)
if item.Type == Deleted {
delete(objects, key)
} else {
objects[key] = item.Object
}
}
return slices.Collect(maps.Values(objects))
},
}},
{Type: watch.Added, Object: pod3},
}
},
"delta-fifo": {
storeBuilder: func(counter *atomic.Int32) ReflectorStore {
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KeyFunction: MetaNamespaceKeyFunc,
Transformer: func(i interface{}) (interface{}, error) {
counter.Add(1)
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
})
},
items: func(rs ReflectorStore) []interface{} {
return rs.(*DeltaFIFO).list()
},
},
} {
t.Run(name, func(t *testing.T) {
mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: id, ResourceVersion: rv},
Spec: v1.PodSpec{
Hostname: "test",
},
}
}

s := NewFIFO(MetaNamespaceKeyFunc)
var replaceInvoked atomic.Int32
store := &fakeStore{
Store: s,
beforeReplace: func(list []interface{}, rv string) {
replaceInvoked.Add(1)
// Only two pods are present at the point when Replace is called.
if len(list) != 2 {
t.Errorf("unexpected nb of objects: expected 2 received %d", len(list))
preExisting1 := mkPod("foo-1", "1")
preExisting2 := mkPod("foo-2", "2")
pod3 := mkPod("foo-3", "3")

lastExpectedRV := "3"
events := []watch.Event{
{Type: watch.Added, Object: preExisting1},
{Type: watch.Added, Object: preExisting2},
{Type: watch.Bookmark, Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: lastExpectedRV,
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
}},
{Type: watch.Added, Object: pod3},
}
for _, obj := range list {
cast := obj.(*v1.Pod)
if cast.Spec.Hostname != "transformed" {
t.Error("Object was not transformed prior to replacement")
}

var transformerInvoked atomic.Int32
s := test.storeBuilder(&transformerInvoked)

var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}
},
afterReplace: func(rv string, err error) {},
transformer: func(i interface{}) (interface{}, error) {
cast := i.(*v1.Pod)
cast.Spec.Hostname = "transformed"
return cast, nil
},
}

var once sync.Once
lw := &ListWatch{
WatchFunc: func(metav1.ListOptions) (watch.Interface, error) {
fw := watch.NewFake()
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, s, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
once.Do(func() {
for _, e := range events {
fw.Action(e.Type, e.Object)
}
})
defer close(doneCh)
r.RunWithContext(ctx)
}()
return fw, nil
},
// ListFunc should never be used in WatchList mode
ListFunc: func(metav1.ListOptions) (runtime.Object, error) {
return nil, errors.New("list call not expected in WatchList mode")
},
}

clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
r := NewReflector(lw, &v1.Pod{}, store, 0)
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.RunWithContext(ctx)
}()
// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}

// wait for the RV to sync to the version returned by the final list
err := wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (done bool, err error) {
if rv := r.LastSyncResourceVersion(); rv == lastExpectedRV {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("reflector never caught up with expected revision: %q, err: %v", lastExpectedRV, err)
}
if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}

if want, got := lastExpectedRV, r.LastSyncResourceVersion(); want != got {
t.Errorf("expected LastSyncResourceVersion to be %q, but got: %q", want, got)
}
if want, got := 1, int(replaceInvoked.Load()); want != got {
t.Errorf("expected replace to be invoked %d times, but got: %d", want, got)
}
informerItems := test.items(s)
if want, got := 3, len(informerItems); want != got {
t.Errorf("expected informer to contain %d objects, but got: %d", want, got)
}
for _, item := range informerItems {
cast := item.(*v1.Pod)
if cast.Spec.Hostname != "transformed" {
t.Error("Object was not transformed prior to replacement")
}
}

cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
// Transformer should have been invoked twice for the initial sync in the informer on the temporary store,
// then twice on replace, then once on the following update.
if want, got := 5, int(transformerInvoked.Load()); want != got {
t.Errorf("expected transformer to be invoked %d times, but got: %d", want, got)
}

cancel()
select {
case <-doneCh:
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("timed out waiting for Run to return")
}
})
}
}

type fakeStore struct {
Store
ReflectorStore
beforeReplace func(list []interface{}, s string)
afterReplace func(rv string, err error)
transformer TransformFunc
}

func (f *fakeStore) Replace(list []interface{}, rv string) error {
f.beforeReplace(list, rv)
err := f.Store.Replace(list, rv)
err := f.ReflectorStore.Replace(list, rv)
f.afterReplace(rv, err)
return err
}

func (f *fakeStore) Transformer() TransformFunc {
return f.transformer
}

func BenchmarkExtractList(b *testing.B) {
_, _, podList := getPodListItems(0, fakeItemsNum)
_, _, configMapList := getConfigmapListItems(0, fakeItemsNum)
Expand Down
Loading