From d961af4f33560e0c46546f2bc42e746718c6a098 Mon Sep 17 00:00:00 2001 From: yliao Date: Sun, 3 Aug 2025 01:46:20 +0000 Subject: [PATCH] fixed bug such that the implicit extended resource name can always be used, whether or not the explicit extendedResourceName field in the device class is set. --- pkg/kubelet/cm/dra/manager.go | 4 +- .../dynamicresources/dynamicresources.go | 5 +- .../dynamicresources/dynamicresources_test.go | 55 ++++++++++++----- .../plugins/dynamicresources/extended/util.go | 7 +-- .../framework/plugins/noderesources/fit.go | 2 +- .../plugins/noderesources/fit_test.go | 60 +++++++++++++++++++ pkg/scheduler/util/utils.go | 8 +++ test/e2e/dra/dra.go | 19 ++++++ 8 files changed, 135 insertions(+), 25 deletions(-) diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index 9d034a091a566..73182ff1d29ac 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -39,7 +39,6 @@ import ( drahealthv1alpha1 "k8s.io/kubelet/pkg/apis/dra-health/v1alpha1" drapb "k8s.io/kubelet/pkg/apis/dra/v1" - v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" kubefeatures "k8s.io/kubernetes/pkg/features" draplugin "k8s.io/kubernetes/pkg/kubelet/cm/dra/plugin" "k8s.io/kubernetes/pkg/kubelet/cm/dra/state" @@ -48,6 +47,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" + "k8s.io/kubernetes/pkg/scheduler/util" ) // draManagerStateFileName is the file name where dra manager stores its state @@ -526,7 +526,7 @@ func (m *Manager) GetResources(pod *v1.Pod, container *v1.Container) (*Container // We only care about the resources requested by the pod continue } - if v1helper.IsExtendedResourceName(rName) { + if util.IsDRAExtendedResourceName(rName) { requestName := "" for _, rm := range pod.Status.ExtendedResourceClaimStatus.RequestMappings { if rm.ContainerName == container.Name && rm.ResourceName == rName.String() { 
diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go index 3cd457a9ea97f..923d47a5bcfe0 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources.go @@ -47,7 +47,6 @@ import ( "k8s.io/dynamic-resource-allocation/structured" "k8s.io/klog/v2" fwk "k8s.io/kube-scheduler/framework" - v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/apis/config/validation" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -430,7 +429,7 @@ func hasDeviceClassMappedExtendedResource(reqs v1.ResourceList, deviceClassMappi // We only care about the resources requested by the pod we are trying to schedule. continue } - if v1helper.IsExtendedResourceName(rName) { + if schedutil.IsDRAExtendedResourceName(rName) { _, ok := deviceClassMapping[rName] if ok { return true @@ -761,7 +760,7 @@ func (pl *DynamicResources) filterExtendedResources(state *stateData, pod *v1.Po extendedResources := make(map[v1.ResourceName]int64) hasExtendedResource := false for rName, rQuant := range state.draExtendedResource.podScalarResources { - if !v1helper.IsExtendedResourceName(rName) { + if !schedutil.IsDRAExtendedResourceName(rName) { continue } // Skip in case request quantity is zero diff --git a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go index a86bbcdcef6a7..1be506080a9e4 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/dynamicresources_test.go @@ -58,21 +58,22 @@ import ( var ( podKind = v1.SchemeGroupVersion.WithKind("Pod") - nodeName = "worker" - node2Name = "worker-2" - node3Name = "worker-3" - driver = "some-driver" - driver2 = 
"some-driver-2" - podName = "my-pod" - podUID = "1234" - resourceName = "my-resource" - resourceName2 = resourceName + "-2" - claimName = podName + "-" + resourceName - claimName2 = podName + "-" + resourceName2 - className = "my-resource-class" - namespace = "default" - attrName = resourceapi.QualifiedName("healthy") // device attribute only available on non-default node - extendedResourceName = "example.com/gpu" + nodeName = "worker" + node2Name = "worker-2" + node3Name = "worker-3" + driver = "some-driver" + driver2 = "some-driver-2" + podName = "my-pod" + podUID = "1234" + resourceName = "my-resource" + resourceName2 = resourceName + "-2" + claimName = podName + "-" + resourceName + claimName2 = podName + "-" + resourceName2 + className = "my-resource-class" + namespace = "default" + attrName = resourceapi.QualifiedName("healthy") // device attribute only available on non-default node + extendedResourceName = "example.com/gpu" + implicitExtendedResourceName = "deviceclass.resource.kubernetes.io/my-resource-class" deviceClass = &resourceapi.DeviceClass{ ObjectMeta: metav1.ObjectMeta{ @@ -122,6 +123,12 @@ var ( v1.ResourceName(extendedResourceName): "1", }). Obj() + podWithImplicitExtendedResourceName = st.MakePod().Name(podName).Namespace(namespace). + UID(podUID). + Req(map[v1.ResourceName]string{ + v1.ResourceName(implicitExtendedResourceName): "1", + }). + Obj() // Node with "instance-1" device and no device attributes. 
workerNode = &st.MakeNode().Name(nodeName).Label("kubernetes.io/hostname", nodeName).Node @@ -1303,6 +1310,24 @@ func TestPlugin(t *testing.T) { }, }, }, + "implicit-extended-resource-name-with-resources": { + enableDRAExtendedResource: true, + pod: podWithImplicitExtendedResourceName, + classes: []*resourceapi.DeviceClass{deviceClass}, + objs: []apiruntime.Object{workerNodeSlice, podWithImplicitExtendedResourceName}, + want: want{ + reserve: result{ + inFlightClaim: extendedResourceClaimNoName, + }, + prebind: result{ + assumedClaim: reserve(extendedResourceClaim, podWithImplicitExtendedResourceName), + added: []metav1.Object{reserve(extendedResourceClaim, podWithImplicitExtendedResourceName)}, + }, + postbind: result{ + assumedClaim: reserve(extendedResourceClaim, podWithImplicitExtendedResourceName), + }, + }, + }, "extended-resource-name-with-resources-has-claim": { enableDRAExtendedResource: true, pod: podWithExtendedResourceName, diff --git a/pkg/scheduler/framework/plugins/dynamicresources/extended/util.go b/pkg/scheduler/framework/plugins/dynamicresources/extended/util.go index 7c3acf137a4b3..a0949291c399b 100644 --- a/pkg/scheduler/framework/plugins/dynamicresources/extended/util.go +++ b/pkg/scheduler/framework/plugins/dynamicresources/extended/util.go @@ -18,7 +18,7 @@ package extended import ( v1 "k8s.io/api/core/v1" - "k8s.io/api/resource/v1beta1" + resourceapi "k8s.io/api/resource/v1" "k8s.io/kubernetes/pkg/scheduler/framework" ) @@ -29,11 +29,10 @@ func DeviceClassMapping(draManager framework.SharedDRAManager) (map[v1.ResourceN return nil, err } for _, c := range classes { - if c.Spec.ExtendedResourceName == nil { - extendedResources[v1.ResourceName(v1beta1.ResourceDeviceClassPrefix+c.Name)] = c.Name - } else { + if c.Spec.ExtendedResourceName != nil { extendedResources[v1.ResourceName(*c.Spec.ExtendedResourceName)] = c.Name } + extendedResources[v1.ResourceName(resourceapi.ResourceDeviceClassPrefix+c.Name)] = c.Name } return extendedResources, nil } 
diff --git a/pkg/scheduler/framework/plugins/noderesources/fit.go b/pkg/scheduler/framework/plugins/noderesources/fit.go index 1159a8d8b2ccc..f42102d77c164 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit.go @@ -241,7 +241,7 @@ func withDeviceClass(result *preFilterState, draManager framework.SharedDRAManag continue } - if v1helper.IsExtendedResourceName(rName) { + if schedutil.IsDRAExtendedResourceName(rName) { hasExtendedResource = true break } diff --git a/pkg/scheduler/framework/plugins/noderesources/fit_test.go b/pkg/scheduler/framework/plugins/noderesources/fit_test.go index cc5583d0e72a0..469d9d965291b 100644 --- a/pkg/scheduler/framework/plugins/noderesources/fit_test.go +++ b/pkg/scheduler/framework/plugins/noderesources/fit_test.go @@ -1908,3 +1908,63 @@ func TestHaveAnyRequestedResourcesIncreased(t *testing.T) { }) } } +func TestWithDeviceClass(t *testing.T) { + logger, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + client := fake.NewSimpleClientset(deviceClassWithExtendResourceName) + informerFactory := informers.NewSharedInformerFactory(client, 0) + draManager := dynamicresources.NewDRAManager(ctx, assumecache.NewAssumeCache(logger, informerFactory.Resource().V1().ResourceClaims().Informer(), "resource claim", "", nil), nil, informerFactory) + informerFactory.Start(ctx.Done()) + t.Cleanup(func() { + // Now we can wait for all goroutines to stop. 
+ informerFactory.Shutdown() + }) + informerFactory.WaitForCacheSync(ctx.Done()) + + testCases := map[string]struct { + state *preFilterState + expectedResourceToDeviceClass map[v1.ResourceName]string + }{ + + "regular extended resource name": { + state: &preFilterState{ + Resource: framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{extendedResourceA: 1}, + }, + }, + expectedResourceToDeviceClass: map[v1.ResourceName]string{ + v1.ResourceName("deviceclass.resource.kubernetes.io/device-class-name"): deviceClassName, + v1.ResourceName("extended.resource.dra.io/something"): deviceClassName, + }, + }, + "implicit extended resource name": { + state: &preFilterState{ + Resource: framework.Resource{ + ScalarResources: map[v1.ResourceName]int64{v1.ResourceName("deviceclass.resource.kubernetes.io/" + deviceClassName): 1}, + }, + }, + expectedResourceToDeviceClass: map[v1.ResourceName]string{ + v1.ResourceName("deviceclass.resource.kubernetes.io/device-class-name"): deviceClassName, + v1.ResourceName("extended.resource.dra.io/something"): deviceClassName, + }, + }, + "no extended resource name": { + state: &preFilterState{ + Resource: framework.Resource{ + MilliCPU: 1000, + }, + }, + expectedResourceToDeviceClass: nil, + }, + } + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + withDeviceClass(tc.state, draManager) + if diff := cmp.Diff(tc.state.resourceToDeviceClass, tc.expectedResourceToDeviceClass); diff != "" { + t.Error("resourceToDeviceClass doesn't match (-expected +actual):\n", diff) + } + }) + } +} diff --git a/pkg/scheduler/util/utils.go b/pkg/scheduler/util/utils.go index acaaaabb4d71c..06ba395030dbd 100644 --- a/pkg/scheduler/util/utils.go +++ b/pkg/scheduler/util/utils.go @@ -20,9 +20,11 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" v1 "k8s.io/api/core/v1" + resourceapi "k8s.io/api/resource/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" 
"k8s.io/apimachinery/pkg/types" @@ -142,6 +144,12 @@ func IsScalarResourceName(name v1.ResourceName) bool { v1helper.IsPrefixedNativeResource(name) || v1helper.IsAttachableVolumeResourceName(name) } +// IsDRAExtendedResourceName returns true when name is an extended resource name, or an implicit extended resource name +// derived from device class name with the format of deviceclass.resource.kubernetes.io/<device-class-name> +func IsDRAExtendedResourceName(name v1.ResourceName) bool { + return v1helper.IsExtendedResourceName(name) || strings.HasPrefix(string(name), resourceapi.ResourceDeviceClassPrefix) +} + // As converts two objects to the given type. // Both objects must be of the same type. If not, an error is returned. // nil objects are allowed and will be converted to nil. diff --git a/test/e2e/dra/dra.go b/test/e2e/dra/dra.go index 8fc52e8828170..4d83e6acd20af 100644 --- a/test/e2e/dra/dra.go +++ b/test/e2e/dra/dra.go @@ -1910,6 +1910,25 @@ var _ = framework.SIGDescribe("node")(framework.WithLabel("DRA"), func() { b := drautils.NewBuilder(f, driver) b.UseExtendedResourceName = true + ginkgo.It("must run a pod with both implicit and explicit extended resource with one container two resources", func(ctx context.Context) { + pod := b.Pod() + res := v1.ResourceList{} + // drautils.ExtendedResourceName(0) is added to the device class with name: b.ClassName()+"0" + res[v1.ResourceName("deviceclass.resource.kubernetes.io/"+b.ClassName()+"0")] = resource.MustParse("1") + res[v1.ResourceName(drautils.ExtendedResourceName(0))] = resource.MustParse("1") + pod.Spec.Containers[0].Resources.Requests = res + pod.Spec.Containers[0].Resources.Limits = res + + b.Create(ctx, pod) + err := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod) + framework.ExpectNoError(err, "start pod") + containerEnv := []string{ + "container_0_request_0", "true", + "container_0_request_1", "true", + } + drautils.TestContainerEnv(ctx, f, pod, pod.Spec.Containers[0].Name, false, containerEnv...) 
+ }) + ginkgo.It("must run a pod with extended resource with one container one resource", func(ctx context.Context) { pod := b.Pod() res := v1.ResourceList{}