From c158231edd6c2b7287123bb8ccdb6b3f059837c0 Mon Sep 17 00:00:00 2001 From: YamasouA Date: Sat, 7 Jun 2025 11:21:36 +0900 Subject: [PATCH 1/3] pop lock with channel --- pkg/scheduler/backend/queue/active_queue.go | 152 ++++++++++++++++---- 1 file changed, 128 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index 620579d7adcaa..a867799d23c4e 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -144,7 +144,7 @@ type activeQueue struct { // When SchedulerPopFromBackoffQ feature is enabled, // condition is also notified when the pod is added to backoffQ. // It is used with lock. - cond sync.Cond + // cond sync.Cond // inFlightPods holds the UID of all pods which have been popped out for which Done // hasn't been called yet - in other words, all pods that are currently being @@ -186,6 +186,9 @@ type activeQueue struct { // backoffQPopper is used to pop from backoffQ when activeQ is empty. // It is non-nil only when SchedulerPopFromBackoffQ feature is enabled. backoffQPopper backoffQPopper + + notifyCh chan struct{} + closeCh chan struct{} } func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder, backoffQPopper backoffQPopper) *activeQueue { @@ -197,8 +200,10 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu metricsRecorder: metricRecorder, unlockedQueue: newUnlockedActiveQueue(queue), backoffQPopper: backoffQPopper, + notifyCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), } - aq.cond.L = &aq.lock + // aq.cond.L = &aq.lock return aq } @@ -243,45 +248,62 @@ func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error { return aq.queue.Delete(pInfo) } -// pop removes the head of the queue and returns it. -// It blocks if the queue is empty and waits until a new item is added to the queue. -// It increments scheduling cycle when a pod is popped. +// // pop removes the head of the queue and returns it. +// // It blocks if the queue is empty and waits until a new item is added to the queue. +// // It increments scheduling cycle when a pod is popped. +// func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) { +// aq.lock.Lock() +// defer aq.lock.Unlock() + +// return aq.unlockedPop(logger) +// } + func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) { - aq.lock.Lock() - defer aq.lock.Unlock() + for { + aq.lock.Lock() - return aq.unlockedPop(logger) -} + if aq.closed { + aq.lock.Unlock() + logger.V(2).Info("Scheduling queue is closed") + return nil, nil + } -func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) { - var pInfo *framework.QueuedPodInfo - for aq.queue.Len() == 0 { - // backoffQPopper is non-nil only if SchedulerPopFromBackoffQ feature is enabled. - // In case of non-empty backoffQ, try popping from there. - if aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() != 0 { - break + if aq.hasSomething() { + pInfo, err := aq.popLocked(logger) + aq.lock.Unlock() + if err != nil { + return nil, err + } + if pInfo == nil { + continue + } + return pInfo, nil } - // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. - // When Close() is called, the p.closed is set and the condition is broadcast, - // which causes this loop to continue and return from the Pop(). - if aq.closed { + + aq.lock.Unlock() + select { + case <-aq.notifyCh: + continue + case <-aq.closeCh: logger.V(2).Info("Scheduling queue is closed") return nil, nil } - aq.cond.Wait() } +} + +func (aq *activeQueue) popLocked(logger klog.Logger) (*framework.QueuedPodInfo, error) { pInfo, err := aq.queue.Pop() if err != nil { if aq.backoffQPopper == nil { return nil, err } - // Try to pop from backoffQ when activeQ is empty. pInfo, err = aq.backoffQPopper.popBackoff() if err != nil { return nil, err } metrics.SchedulerQueueIncomingPods.WithLabelValues("active", framework.PopFromBackoffQ).Inc() } + pInfo.Attempts++ // In flight, no concurrent events yet. if aq.isSchedulingQueueHintEnabled { @@ -292,7 +314,7 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo // because it likely doesn't cause any visible issues from the scheduling perspective. utilruntime.HandleErrorWithLogger(logger, nil, "The same pod is tracked in multiple places in the scheduler, and just discard it", "pod", klog.KObj(pInfo.Pod)) // Just ignore/discard this duplicated pod and try to pop the next one. - return aq.unlockedPop(logger) + return nil, nil } aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false) @@ -312,6 +334,83 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo return pInfo, nil } +func (aq *activeQueue) hasSomething() bool { + if aq.queue.Len() > 0 { + return true + } + return aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() > 0 +} + +// func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) { +// var pInfo *framework.QueuedPodInfo +// for aq.queue.Len() == 0 { +// // backoffQPopper is non-nil only if SchedulerPopFromBackoffQ feature is enabled. +// // In case of non-empty backoffQ, try popping from there. +// if aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() != 0 { +// break +// } +// // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. +// // When Close() is called, the p.closed is set and the condition is broadcast, +// // which causes this loop to continue and return from the Pop(). +// if aq.closed { +// logger.V(2).Info("Scheduling queue is closed") +// return nil, nil +// } +// aq.cond.Wait() + +// select { +// case <-aq.notifyCh: +// case <-aq.closeCh: +// logger.V(2).Info("Scheduling queue is closed") +// return nil, nil +// } +// } +// select { +// case <-aq.notifyCh: +// default: +// } +// pInfo, err := aq.queue.Pop() +// if err != nil { +// if aq.backoffQPopper == nil { +// return nil, err +// } +// // Try to pop from backoffQ when activeQ is empty. +// pInfo, err = aq.backoffQPopper.popBackoff() +// if err != nil { +// return nil, err +// } +// metrics.SchedulerQueueIncomingPods.WithLabelValues("active", framework.PopFromBackoffQ).Inc() +// } +// pInfo.Attempts++ +// // In flight, no concurrent events yet. +// if aq.isSchedulingQueueHintEnabled { +// // If the pod is already in the map, we shouldn't overwrite the inFlightPods otherwise it'd lead to a memory leak. +// // https://github.com/kubernetes/kubernetes/pull/127016 +// if _, ok := aq.inFlightPods[pInfo.Pod.UID]; ok { +// // Just report it as an error, but no need to stop the scheduler +// // because it likely doesn't cause any visible issues from the scheduling perspective. +// logger.Error(nil, "the same pod is tracked in multiple places in the scheduler, and just discard it", "pod", klog.KObj(pInfo.Pod)) +// // Just ignore/discard this duplicated pod and try to pop the next one. +// return aq.unlockedPop(logger) +// } + +// aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false) +// aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod) +// } +// aq.schedCycle++ + +// // Update metrics and reset the set of unschedulable plugins for the next attempt. +// for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) { +// metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec() +// } +// pInfo.UnschedulablePlugins.Clear() +// pInfo.PendingPlugins.Clear() +// pInfo.GatingPlugin = "" +// pInfo.GatingPluginEvents = nil + +// return pInfo, nil +// } + // list returns all pods that are in the queue. func (aq *activeQueue) list() []*v1.Pod { aq.lock.RLock() @@ -492,9 +591,14 @@ func (aq *activeQueue) close() { aq.unlockedDone(pod) } aq.closed = true + close(aq.closeCh) } // broadcast notifies the pop() operation that new pod(s) was added to the activeQueue. func (aq *activeQueue) broadcast() { - aq.cond.Broadcast() + // aq.cond.Broadcast() + select { + case aq.notifyCh <- struct{}{}: + default: + } } From 4ca40faeacdb203913b8020ce86664757d8835cb Mon Sep 17 00:00:00 2001 From: YamasouA Date: Mon, 11 Aug 2025 13:39:42 +0900 Subject: [PATCH 2/3] fix --- pkg/scheduler/backend/queue/active_queue.go | 83 +++---------------- pkg/scheduler/backend/queue/backoff_queue.go | 25 +++++- .../backend/queue/scheduling_queue.go | 5 +- 3 files changed, 38 insertions(+), 75 deletions(-) diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index a867799d23c4e..6639de48dfbfb 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -57,6 +57,7 @@ type activeQueuer interface { done(pod types.UID) close() broadcast() + tryNotify() } // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. @@ -341,76 +342,6 @@ func (aq *activeQueue) hasSomething() bool { return aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() > 0 } -// func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) { -// var pInfo *framework.QueuedPodInfo -// for aq.queue.Len() == 0 { -// // backoffQPopper is non-nil only if SchedulerPopFromBackoffQ feature is enabled. -// // In case of non-empty backoffQ, try popping from there. -// if aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() != 0 { -// break -// } -// // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. -// // When Close() is called, the p.closed is set and the condition is broadcast, -// // which causes this loop to continue and return from the Pop(). -// if aq.closed { -// logger.V(2).Info("Scheduling queue is closed") -// return nil, nil -// } -// aq.cond.Wait() - -// select { -// case <-aq.notifyCh: -// case <-aq.closeCh: -// logger.V(2).Info("Scheduling queue is closed") -// return nil, nil -// } -// } -// select { -// case <-aq.notifyCh: -// default: -// } -// pInfo, err := aq.queue.Pop() -// if err != nil { -// if aq.backoffQPopper == nil { -// return nil, err -// } -// // Try to pop from backoffQ when activeQ is empty. -// pInfo, err = aq.backoffQPopper.popBackoff() -// if err != nil { -// return nil, err -// } -// metrics.SchedulerQueueIncomingPods.WithLabelValues("active", framework.PopFromBackoffQ).Inc() -// } -// pInfo.Attempts++ -// // In flight, no concurrent events yet. -// if aq.isSchedulingQueueHintEnabled { -// // If the pod is already in the map, we shouldn't overwrite the inFlightPods otherwise it'd lead to a memory leak. -// // https://github.com/kubernetes/kubernetes/pull/127016 -// if _, ok := aq.inFlightPods[pInfo.Pod.UID]; ok { -// // Just report it as an error, but no need to stop the scheduler -// // because it likely doesn't cause any visible issues from the scheduling perspective. -// logger.Error(nil, "the same pod is tracked in multiple places in the scheduler, and just discard it", "pod", klog.KObj(pInfo.Pod)) -// // Just ignore/discard this duplicated pod and try to pop the next one. -// return aq.unlockedPop(logger) -// } - -// aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false) -// aq.inFlightPods[pInfo.Pod.UID] = aq.inFlightEvents.PushBack(pInfo.Pod) -// } -// aq.schedCycle++ - -// // Update metrics and reset the set of unschedulable plugins for the next attempt. -// for plugin := range pInfo.UnschedulablePlugins.Union(pInfo.PendingPlugins) { -// metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Dec() -// } -// pInfo.UnschedulablePlugins.Clear() -// pInfo.PendingPlugins.Clear() -// pInfo.GatingPlugin = "" -// pInfo.GatingPluginEvents = nil - -// return pInfo, nil -// } - // list returns all pods that are in the queue. func (aq *activeQueue) list() []*v1.Pod { aq.lock.RLock() @@ -590,13 +521,19 @@ func (aq *activeQueue) close() { for pod := range aq.inFlightPods { aq.unlockedDone(pod) } - aq.closed = true - close(aq.closeCh) + if !aq.closed { + aq.closed = true + close(aq.closeCh) + } } // broadcast notifies the pop() operation that new pod(s) was added to the activeQueue. func (aq *activeQueue) broadcast() { - // aq.cond.Broadcast() + aq.tryNotify() +} + +// tryNotify sends a non-blocking wake-up signal to one waiter (coalesced). +func (aq *activeQueue) tryNotify() { select { case aq.notifyCh <- struct{}{}: default: diff --git a/pkg/scheduler/backend/queue/backoff_queue.go b/pkg/scheduler/backend/queue/backoff_queue.go index 5d20f2593ce9f..ddf51631e3510 100644 --- a/pkg/scheduler/backend/queue/backoff_queue.go +++ b/pkg/scheduler/backend/queue/backoff_queue.go @@ -74,6 +74,8 @@ type backoffQueuer interface { list() []*v1.Pod // len returns length of the queue. len() int + + setNotifyFunc(f func()) } // backoffQueue implements backoffQueuer and wraps two queues inside, @@ -104,6 +106,8 @@ type backoffQueue struct { // isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled. isPopFromBackoffQEnabled bool + + notify func() } func newBackoffQueue(clock clock.WithTicker, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration, activeQLessFn framework.LessFunc, popFromBackoffQEnabled bool) *backoffQueue { @@ -293,7 +297,15 @@ func (bq *backoffQueue) popAllBackoffCompleted(logger klog.Logger) []*framework. // It also ensures that pInfo is not in both queues. func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) { bq.lock.Lock() - defer bq.lock.Unlock() + + preEmpty := (bq.podBackoffQ.Len()+bq.podErrorBackoffQ.Len() == 0) + needNotify := false + defer func() { + bq.lock.Unlock() + if needNotify && bq.notify != nil { + bq.notify() + } + }() // If pod has empty both unschedulable plugins and pending plugins, // it means that it failed because of error and should be moved to podErrorBackoffQ. @@ -306,9 +318,15 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, return } metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() + if preEmpty { + needNotify = true + } return } bq.podBackoffQ.AddOrUpdate(pInfo) + if preEmpty { + needNotify = true + } // Ensure the pod is not in the podErrorBackoffQ and report the error if it happens. err := bq.podErrorBackoffQ.Delete(pInfo) if err == nil { @@ -318,6 +336,11 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc() } +// setNotifyFunc injects the activeQ-side notifier. +func (bq *backoffQueue) setNotifyFunc(f func()) { + bq.notify = f +} + // update updates the pod in backoffQueue if oldPodInfo is already in the queue. // It returns new pod info if updated, nil otherwise. func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo { diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index 798794c371835..e09fdb2ec9fd3 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -49,7 +49,7 @@ import ( "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/backend/heap" "k8s.io/kubernetes/pkg/scheduler/framework" - "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" + apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread" "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -375,6 +375,9 @@ func NewPriorityQueue( backoffQPopper = backoffQ } pq.activeQ = newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessConverted), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder, backoffQPopper) + if bq, ok := pq.backoffQ.(*backoffQueue); ok { + bq.setNotifyFunc(func() { pq.activeQ.tryNotify() }) + } pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() pq.nominator = newPodNominator(options.podLister) From 8b5e6a0ba916445f15fc3d6c1673490e54481266 Mon Sep 17 00:00:00 2001 From: YamasouA Date: Mon, 11 Aug 2025 13:51:35 +0900 Subject: [PATCH 3/3] tweak --- pkg/scheduler/backend/queue/active_queue.go | 8 -------- pkg/scheduler/backend/queue/backoff_queue.go | 2 -- pkg/scheduler/backend/queue/scheduling_queue.go | 5 +++-- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/pkg/scheduler/backend/queue/active_queue.go b/pkg/scheduler/backend/queue/active_queue.go index 6639de48dfbfb..85a6f4ecbaedf 100644 --- a/pkg/scheduler/backend/queue/active_queue.go +++ b/pkg/scheduler/backend/queue/active_queue.go @@ -57,7 +57,6 @@ type activeQueuer interface { done(pod types.UID) close() broadcast() - tryNotify() } // unlockedActiveQueuer defines activeQ methods that are not protected by the lock itself. @@ -252,13 +251,6 @@ func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error { // // pop removes the head of the queue and returns it. // // It blocks if the queue is empty and waits until a new item is added to the queue. // // It increments scheduling cycle when a pod is popped. -// func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) { -// aq.lock.Lock() -// defer aq.lock.Unlock() - -// return aq.unlockedPop(logger) -// } - func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) { for { aq.lock.Lock() diff --git a/pkg/scheduler/backend/queue/backoff_queue.go b/pkg/scheduler/backend/queue/backoff_queue.go index ddf51631e3510..dd0c0b0e95791 100644 --- a/pkg/scheduler/backend/queue/backoff_queue.go +++ b/pkg/scheduler/backend/queue/backoff_queue.go @@ -74,8 +74,6 @@ type backoffQueuer interface { list() []*v1.Pod // len returns length of the queue. len() int - - setNotifyFunc(f func()) } // backoffQueue implements backoffQueuer and wraps two queues inside, diff --git a/pkg/scheduler/backend/queue/scheduling_queue.go b/pkg/scheduler/backend/queue/scheduling_queue.go index e09fdb2ec9fd3..545e77366bac7 100644 --- a/pkg/scheduler/backend/queue/scheduling_queue.go +++ b/pkg/scheduler/backend/queue/scheduling_queue.go @@ -374,9 +374,10 @@ func NewPriorityQueue( if isPopFromBackoffQEnabled { backoffQPopper = backoffQ } - pq.activeQ = newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessConverted), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder, backoffQPopper) + aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessConverted), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder, backoffQPopper) + pq.activeQ = aq if bq, ok := pq.backoffQ.(*backoffQueue); ok { - bq.setNotifyFunc(func() { pq.activeQ.tryNotify() }) + bq.setNotifyFunc(func() { aq.tryNotify() }) } pq.nsLister = informerFactory.Core().V1().Namespaces().Lister() pq.nominator = newPodNominator(options.podLister)