
Add Kubelet stress test for pod cleanup when pods are rejected due to VolumeAttachmentLimitExceeded #133357

Open · wants to merge 1 commit into base: master
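
This PR threads a new volumeAttachLimitExceeded flag through the kubelet's FakeVolumeManager and adds TestVolumeAttachLimitExceededCleanup, a stress test in which 500 pods are rejected by WaitForAttachAndMount and must all reach a terminal Failed state with reason VolumeAttachmentLimitExceeded, have their pod workers finish cleanup, and have their container-level resource allocations released. Existing FakeVolumeManager call sites in the node-status and node-shutdown tests are updated to pass false for the new parameter.
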
2 changes: 1 addition & 1 deletion pkg/kubelet/kubelet_node_status_test.go
@@ -1131,7 +1131,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
kubelet.setCachedMachineInfo(&cadvisorapi.MachineInfo{})

// override test volumeManager
-fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes, 0, nil)
+fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes, 0, nil, false)
kubelet.volumeManager = fakeVolumeManager

// Only test VolumesInUse setter
99 changes: 99 additions & 0 deletions pkg/kubelet/kubelet_test.go
@@ -474,6 +474,34 @@ func newTestPods(count int) []*v1.Pod {
return pods
}

// newTestPodsWithResources returns count pods, each using host networking and
// carrying a single container with minimal resource requests, along with the
// generated container names.
func newTestPodsWithResources(count int) (pods []*v1.Pod, containerNames []string) {
pods = make([]*v1.Pod, count)
containerNames = make([]string, count)
for i := 0; i < count; i++ {
containerName := fmt.Sprintf("container%d", i)
containerNames[i] = containerName
pods[i] = &v1.Pod{
Spec: v1.PodSpec{
HostNetwork: true,
Containers: []v1.Container{{
Name: containerName,
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1m"),
v1.ResourceMemory: resource.MustParse("1Mi"),
},
},
}},
},
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(strconv.Itoa(10000 + i)),
Name: fmt.Sprintf("pod%d", i),
},
}
}
return pods, containerNames
}
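
Note: each generated pod requests only 1m of CPU and 1Mi of memory, so 500 such pods fit comfortably under the node allocatable configured in the stress test below (10 CPU, 20Gi of memory, 1000 pods); presumably this keeps resource admission from rejecting the pods, so that the failure comes from the volume manager alone.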

func TestSyncLoopAbort(t *testing.T) {
ctx := context.Background()
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
@@ -747,6 +775,77 @@ func TestHandlePodCleanups(t *testing.T) {
fakeRuntime.AssertKilledPods([]string(nil))
}

// TestVolumeAttachLimitExceededCleanup verifies that pods rejected by the
// volume manager with VolumeAttachLimitExceededError reach a terminal Failed
// state, are cleaned up by the pod workers, and release their allocations.
func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
const podCount = 500
tk := newTestKubelet(t, true /* controller-attach-detach enabled */)
defer tk.Cleanup()
kl := tk.kubelet

kl.nodeLister = testNodeLister{nodes: []*v1.Node{{
ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(1000, resource.DecimalSI),
v1.ResourceCPU: resource.MustParse("10"),
v1.ResourceMemory: resource.MustParse("20Gi"),
},
},
}}}

// Use a real work queue and real pod workers so that syncs, terminations,
// and cleanup actually run end to end.
kl.workQueue = queue.NewBasicWorkQueue(clock.RealClock{})
kl.podWorkers = newPodWorkers(
kl, kl.recorder, kl.workQueue,
kl.resyncInterval, backOffPeriod,
kl.podCache, kl.allocationManager,
)

kl.volumeManager = kubeletvolume.NewFakeVolumeManager(nil, 0, nil, true /* volumeAttachLimitExceeded */)

pods, _ := newTestPodsWithResources(podCount)

// Register the pods and trigger a sync for each; every sync calls
// WaitForAttachAndMount, which now fails with VolumeAttachLimitExceededError.
kl.podManager.SetPods(pods)
kl.HandlePodSyncs(pods)

ctx := context.Background()

// all pods must reach a terminal, Failed state due to VolumeAttachmentLimitExceeded.
if err := wait.PollUntilContextTimeout(
ctx, 200*time.Millisecond, 30*time.Second, true,
func(ctx context.Context) (bool, error) {
for _, p := range pods {
st, ok := kl.statusManager.GetPodStatus(p.UID)
if !ok || st.Phase != v1.PodFailed || st.Reason != "VolumeAttachmentLimitExceeded" {
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("pods did not reach a terminal, Failed state: %v", err)
}

// validate that SyncTerminatedPod completed successfully for each pod.
if err := wait.PollUntilContextTimeout(
ctx, 200*time.Millisecond, 30*time.Second, true,
func(ctx context.Context) (bool, error) {
for _, p := range pods {
if !kl.podWorkers.ShouldPodBeFinished(p.UID) {
return false, nil
}
}
return true, nil
}); err != nil {
t.Fatalf("pod workers did not finish cleanup: %v", err)
}

// validate container-level resource allocations are released.
for _, p := range pods {
cn := p.Spec.Containers[0].Name
if _, still := kl.allocationManager.GetContainerResourceAllocation(p.UID, cn); still {
t.Fatalf("allocation for pod %q container %q not released", p.Name, cn)
}
}
}
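
To run just this test locally, a standard Go invocation along these lines should work (illustrative; not part of the diff):

go test ./pkg/kubelet/ -run TestVolumeAttachLimitExceededCleanup -v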

func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
10 changes: 5 additions & 5 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
@@ -335,7 +335,7 @@ func TestManager(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.GracefulNodeShutdown, true)

fakeRecorder := &record.FakeRecorder{}
-fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager := NewManager(&Config{
Logger: logger,
@@ -439,7 +439,7 @@ func TestFeatureEnabled(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.GracefulNodeShutdown, tc.featureGateEnabled)

fakeRecorder := &record.FakeRecorder{}
-fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

manager := NewManager(&Config{
@@ -496,7 +496,7 @@ func TestRestart(t *testing.T) {
}

fakeRecorder := &record.FakeRecorder{}
-fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
manager := NewManager(&Config{
Logger: logger,
@@ -534,7 +534,7 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
func Test_managerImpl_processShutdownEvent(t *testing.T) {
var (
fakeRecorder = &record.FakeRecorder{}
-fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
syncNodeStatus = func() {}
nodeRef = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
fakeclock = testingclock.NewFakeClock(time.Now())
@@ -651,7 +651,7 @@ func Test_processShutdownEvent_VolumeUnmountTimeout(t *testing.T) {
[]v1.UniqueVolumeName{},
3*time.Second, // This value is intentionally longer than the shutdownGracePeriodSeconds (2s) to test the behavior
// for volume unmount operations that take longer than the allowed grace period.
fmt.Errorf("unmount timeout"),
fmt.Errorf("unmount timeout"), false,
)
logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
m := &managerImpl{
6 changes: 3 additions & 3 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_windows_test.go
@@ -81,7 +81,7 @@ func TestFeatureEnabled(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.WindowsGracefulNodeShutdown, tc.featureGateEnabled)

fakeRecorder := &record.FakeRecorder{}
-fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

manager := NewManager(&Config{
@@ -104,7 +104,7 @@ func Test_managerImpl_ProcessShutdownEvent(t *testing.T) {
func Test_managerImpl_ProcessShutdownEvent(t *testing.T) {
var (
fakeRecorder = &record.FakeRecorder{}
-fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
syncNodeStatus = func() {}
nodeRef = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
fakeclock = testingclock.NewFakeClock(time.Now())
@@ -201,7 +201,7 @@ func Test_managerImpl_ProcessShutdownEvent(t *testing.T) {
[]v1.UniqueVolumeName{},
3*time.Second, // This value is intentionally longer than the shutdownGracePeriodSeconds (2s) to test the behavior
// for volume unmount operations that take longer than the allowed grace period.
fmt.Errorf("unmount timeout"),
fmt.Errorf("unmount timeout"), false,
),
shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
23 changes: 14 additions & 9 deletions pkg/kubelet/volumemanager/volume_manager_fake.go
@@ -28,25 +28,27 @@ import (

// FakeVolumeManager is a test implementation that just tracks calls
type FakeVolumeManager struct {
-volumes       map[v1.UniqueVolumeName]bool
-reportedInUse map[v1.UniqueVolumeName]bool
-unmountDelay  time.Duration
-unmountError  error
+volumes                   map[v1.UniqueVolumeName]bool
+reportedInUse             map[v1.UniqueVolumeName]bool
+unmountDelay              time.Duration
+unmountError              error
+volumeAttachLimitExceeded bool
}

var _ VolumeManager = &FakeVolumeManager{}

// NewFakeVolumeManager creates a new VolumeManager test instance
-func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName, unmountDelay time.Duration, unmountError error) *FakeVolumeManager {
+func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName, unmountDelay time.Duration, unmountError error, volumeAttachLimitExceeded bool) *FakeVolumeManager {
volumes := map[v1.UniqueVolumeName]bool{}
for _, v := range initialVolumes {
volumes[v] = true
}
return &FakeVolumeManager{
-volumes:       volumes,
-reportedInUse: map[v1.UniqueVolumeName]bool{},
-unmountDelay:  unmountDelay,
-unmountError:  unmountError,
+volumes:                   volumes,
+reportedInUse:             map[v1.UniqueVolumeName]bool{},
+unmountDelay:              unmountDelay,
+unmountError:              unmountError,
+volumeAttachLimitExceeded: volumeAttachLimitExceeded,
}
}

@@ -56,6 +58,9 @@ func (f *FakeVolumeManager) Run(ctx context.Context, sourcesReady config.Sources

// WaitForAttachAndMount is not implemented
func (f *FakeVolumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
+if f.volumeAttachLimitExceeded {
+return &VolumeAttachLimitExceededError{}
+}
return nil
}
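
For reviewers, a minimal sketch of how the new knob is meant to be driven from a test (names as in this diff; pod stands for any test pod):

// Construct the fake with volumeAttachLimitExceeded set.
fvm := volumemanager.NewFakeVolumeManager(nil, 0, nil, true)
// Every WaitForAttachAndMount call now fails with the typed error.
err := fvm.WaitForAttachAndMount(context.Background(), pod)
// err is a *VolumeAttachLimitExceededError, which the kubelet sync path is
// expected to surface as a Failed pod with reason "VolumeAttachmentLimitExceeded".

A dedicated bool is used rather than reusing the existing unmountError field, presumably because WaitForAttachAndMount must return this specific error type, which the kubelet's rejection path checks for.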
