diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go
index 86f03304701cd..0e7a764216556 100644
--- a/pkg/kubelet/kubelet_node_status_test.go
+++ b/pkg/kubelet/kubelet_node_status_test.go
@@ -1131,7 +1131,7 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
 			kubelet.setCachedMachineInfo(&cadvisorapi.MachineInfo{})

 			// override test volumeManager
-			fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes, 0, nil)
+			fakeVolumeManager := kubeletvolume.NewFakeVolumeManager(tc.existingVolumes, 0, nil, false)
 			kubelet.volumeManager = fakeVolumeManager

 			// Only test VolumesInUse setter
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 7746458001108..d0a3342f150b6 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -474,6 +474,34 @@ func newTestPods(count int) []*v1.Pod {
 	return pods
 }

+func newTestPodsWithResources(count int) (pods []*v1.Pod, containerNames []string) {
+	pods = make([]*v1.Pod, count)
+	containerNames = make([]string, count)
+	for i := 0; i < count; i++ {
+		containerName := fmt.Sprintf("container%d", i)
+		containerNames[i] = containerName
+		pods[i] = &v1.Pod{
+			Spec: v1.PodSpec{
+				HostNetwork: true,
+				Containers: []v1.Container{{
+					Name: containerName,
+					Resources: v1.ResourceRequirements{
+						Requests: v1.ResourceList{
+							v1.ResourceCPU:    resource.MustParse("1m"),
+							v1.ResourceMemory: resource.MustParse("1Mi"),
+						},
+					},
+				}},
+			},
+			ObjectMeta: metav1.ObjectMeta{
+				UID:  types.UID(strconv.Itoa(10000 + i)),
+				Name: fmt.Sprintf("pod%d", i),
+			},
+		}
+	}
+	return pods, containerNames
+}
+
 func TestSyncLoopAbort(t *testing.T) {
 	ctx := context.Background()
 	testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
@@ -747,6 +775,77 @@ func TestHandlePodCleanups(t *testing.T) {
 	fakeRuntime.AssertKilledPods([]string(nil))
 }

+func TestVolumeAttachLimitExceededCleanup(t *testing.T) {
+	const podCount = 500
+	tk := newTestKubelet(t, true /* controller-attach-detach enabled */)
+	defer tk.Cleanup()
+	kl := tk.kubelet
+
+	kl.nodeLister = testNodeLister{nodes: []*v1.Node{{
+		ObjectMeta: metav1.ObjectMeta{Name: string(kl.nodeName)},
+		Status: v1.NodeStatus{
+			Allocatable: v1.ResourceList{
+				v1.ResourcePods:   *resource.NewQuantity(1000, resource.DecimalSI),
+				v1.ResourceCPU:    resource.MustParse("10"),
+				v1.ResourceMemory: resource.MustParse("20Gi"),
+			},
+		},
+	}}}
+
+	kl.workQueue = queue.NewBasicWorkQueue(clock.RealClock{})
+	kl.podWorkers = newPodWorkers(
+		kl, kl.recorder, kl.workQueue,
+		kl.resyncInterval, backOffPeriod,
+		kl.podCache, kl.allocationManager,
+	)
+
+	kl.volumeManager = kubeletvolume.NewFakeVolumeManager(nil, 0, nil, true /* volumeAttachLimitExceededError */)
+
+	pods, _ := newTestPodsWithResources(podCount)
+
+	kl.podManager.SetPods(pods)
+	kl.HandlePodSyncs(pods)
+
+	ctx := context.Background()
+
+	// all pods must reach a terminal, Failed state due to VolumeAttachmentLimitExceeded.
+	if err := wait.PollUntilContextTimeout(
+		ctx, 200*time.Millisecond, 30*time.Second, true,
+		func(ctx context.Context) (bool, error) {
+			for _, p := range pods {
+				st, ok := kl.statusManager.GetPodStatus(p.UID)
+				if !ok || st.Phase != v1.PodFailed || st.Reason != "VolumeAttachmentLimitExceeded" {
+					return false, nil
+				}
+			}
+			return true, nil
+		}); err != nil {
+		t.Fatalf("pods did not reach a terminal, Failed state: %v", err)
+	}
+
+	// validate that SyncTerminatedPod completed successfully for each pod.
+	if err := wait.PollUntilContextTimeout(
+		ctx, 200*time.Millisecond, 30*time.Second, true,
+		func(ctx context.Context) (bool, error) {
+			for _, p := range pods {
+				if !kl.podWorkers.ShouldPodBeFinished(p.UID) {
+					return false, nil
+				}
+			}
+			return true, nil
+		}); err != nil {
+		t.Fatalf("pod workers did not finish cleanup: %v", err)
+	}
+
+	// validate container-level resource allocations are released.
+	for _, p := range pods {
+		cn := p.Spec.Containers[0].Name
+		if _, still := kl.allocationManager.GetContainerResourceAllocation(p.UID, cn); still {
+			t.Fatalf("allocation for pod %q container %q not released", p.Name, cn)
+		}
+	}
+}
+
 func TestHandlePodRemovesWhenSourcesAreReady(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping test in short mode.")
diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
index 85ef3aeb9f454..197d57931ddd1 100644
--- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
+++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux_test.go
@@ -335,7 +335,7 @@ func TestManager(t *testing.T) {
 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.GracefulNodeShutdown, true)

 			fakeRecorder := &record.FakeRecorder{}
-			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
 			nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 			manager := NewManager(&Config{
 				Logger: logger,
@@ -439,7 +439,7 @@ func TestFeatureEnabled(t *testing.T) {
 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.GracefulNodeShutdown, tc.featureGateEnabled)

 			fakeRecorder := &record.FakeRecorder{}
-			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
 			nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

 			manager := NewManager(&Config{
@@ -496,7 +496,7 @@ func TestRestart(t *testing.T) {
 	}

 	fakeRecorder := &record.FakeRecorder{}
-	fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+	fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
 	nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 	manager := NewManager(&Config{
 		Logger: logger,
@@ -534,7 +534,7 @@ func Test_managerImpl_processShutdownEvent(t *testing.T) {
 	var (
 		fakeRecorder      = &record.FakeRecorder{}
-		fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+		fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
 		syncNodeStatus    = func() {}
 		nodeRef           = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 		fakeclock         = testingclock.NewFakeClock(time.Now())
@@ -651,7 +651,7 @@ func Test_processShutdownEvent_VolumeUnmountTimeout(t *testing.T) {
 		[]v1.UniqueVolumeName{},
 		3*time.Second, // This value is intentionally longer than the shutdownGracePeriodSeconds (2s) to test the behavior
 		// for volume unmount operations that take longer than the allowed grace period.
-		fmt.Errorf("unmount timeout"),
+		fmt.Errorf("unmount timeout"), false,
 	)
 	logger := ktesting.NewLogger(t, ktesting.NewConfig(ktesting.BufferLogs(true)))
 	m := &managerImpl{
diff --git a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_windows_test.go b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_windows_test.go
index 02946a04c70fc..f0598a7025e4e 100644
--- a/pkg/kubelet/nodeshutdown/nodeshutdown_manager_windows_test.go
+++ b/pkg/kubelet/nodeshutdown/nodeshutdown_manager_windows_test.go
@@ -81,7 +81,7 @@ func TestFeatureEnabled(t *testing.T) {
 			featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.WindowsGracefulNodeShutdown, tc.featureGateEnabled)

 			fakeRecorder := &record.FakeRecorder{}
-			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+			fakeVolumeManager := volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
 			nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}

 			manager := NewManager(&Config{
@@ -104,7 +104,7 @@ func Test_managerImpl_ProcessShutdownEvent(t *testing.T) {
 	var (
 		fakeRecorder      = &record.FakeRecorder{}
-		fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil)
+		fakeVolumeManager = volumemanager.NewFakeVolumeManager([]v1.UniqueVolumeName{}, 0, nil, false)
 		syncNodeStatus    = func() {}
 		nodeRef           = &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 		fakeclock         = testingclock.NewFakeClock(time.Now())
@@ -201,7 +201,7 @@ func Test_managerImpl_ProcessShutdownEvent(t *testing.T) {
 				[]v1.UniqueVolumeName{},
 				3*time.Second, // This value is intentionally longer than the shutdownGracePeriodSeconds (2s) to test the behavior
 				// for volume unmount operations that take longer than the allowed grace period.
-				fmt.Errorf("unmount timeout"),
+				fmt.Errorf("unmount timeout"), false,
 			),
 			shutdownGracePeriodByPodPriority: []kubeletconfig.ShutdownGracePeriodByPodPriority{
 				{
diff --git a/pkg/kubelet/volumemanager/volume_manager_fake.go b/pkg/kubelet/volumemanager/volume_manager_fake.go
index 02c424c380154..c18b7367302f2 100644
--- a/pkg/kubelet/volumemanager/volume_manager_fake.go
+++ b/pkg/kubelet/volumemanager/volume_manager_fake.go
@@ -28,25 +28,27 @@ import (

 // FakeVolumeManager is a test implementation that just tracks calls
 type FakeVolumeManager struct {
-	volumes       map[v1.UniqueVolumeName]bool
-	reportedInUse map[v1.UniqueVolumeName]bool
-	unmountDelay  time.Duration
-	unmountError  error
+	volumes                   map[v1.UniqueVolumeName]bool
+	reportedInUse             map[v1.UniqueVolumeName]bool
+	unmountDelay              time.Duration
+	unmountError              error
+	volumeAttachLimitExceeded bool
 }

 var _ VolumeManager = &FakeVolumeManager{}

 // NewFakeVolumeManager creates a new VolumeManager test instance
-func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName, unmountDelay time.Duration, unmountError error) *FakeVolumeManager {
+func NewFakeVolumeManager(initialVolumes []v1.UniqueVolumeName, unmountDelay time.Duration, unmountError error, volumeAttachLimitExceeded bool) *FakeVolumeManager {
 	volumes := map[v1.UniqueVolumeName]bool{}
 	for _, v := range initialVolumes {
 		volumes[v] = true
 	}
 	return &FakeVolumeManager{
-		volumes:       volumes,
-		reportedInUse: map[v1.UniqueVolumeName]bool{},
-		unmountDelay:  unmountDelay,
-		unmountError:  unmountError,
+		volumes:                   volumes,
+		reportedInUse:             map[v1.UniqueVolumeName]bool{},
+		unmountDelay:              unmountDelay,
+		unmountError:              unmountError,
+		volumeAttachLimitExceeded: volumeAttachLimitExceeded,
 	}
 }

@@ -56,6 +58,9 @@ func (f *FakeVolumeManager) Run(ctx context.Context, sourcesReady config.Sources

 // WaitForAttachAndMount is not implemented
 func (f *FakeVolumeManager) WaitForAttachAndMount(ctx context.Context, pod *v1.Pod) error {
+	if f.volumeAttachLimitExceeded {
+		return &VolumeAttachLimitExceededError{}
+	}
 	return nil
 }