Skip to content

[WIP] pop lock with channel #132164

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 58 additions & 25 deletions pkg/scheduler/backend/queue/active_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ type activeQueue struct {
// When SchedulerPopFromBackoffQ feature is enabled,
// condition is also notified when the pod is added to backoffQ.
// It is used with lock.
cond sync.Cond
// cond sync.Cond

// inFlightPods holds the UID of all pods which have been popped out for which Done
// hasn't been called yet - in other words, all pods that are currently being
Expand Down Expand Up @@ -186,6 +186,9 @@ type activeQueue struct {
// backoffQPopper is used to pop from backoffQ when activeQ is empty.
// It is non-nil only when SchedulerPopFromBackoffQ feature is enabled.
backoffQPopper backoffQPopper

notifyCh chan struct{}
closeCh chan struct{}
}

func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueueHintEnabled bool, metricRecorder metrics.MetricAsyncRecorder, backoffQPopper backoffQPopper) *activeQueue {
Expand All @@ -197,8 +200,10 @@ func newActiveQueue(queue *heap.Heap[*framework.QueuedPodInfo], isSchedulingQueu
metricsRecorder: metricRecorder,
unlockedQueue: newUnlockedActiveQueue(queue),
backoffQPopper: backoffQPopper,
notifyCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
}
aq.cond.L = &aq.lock
// aq.cond.L = &aq.lock

return aq
}
Expand Down Expand Up @@ -243,45 +248,55 @@ func (aq *activeQueue) delete(pInfo *framework.QueuedPodInfo) error {
return aq.queue.Delete(pInfo)
}

// pop removes the head of the queue and returns it.
// It blocks if the queue is empty and waits until a new item is added to the queue.
// It increments scheduling cycle when a pod is popped.
// // pop removes the head of the queue and returns it.
// // It blocks if the queue is empty and waits until a new item is added to the queue.
// // It increments scheduling cycle when a pod is popped.
func (aq *activeQueue) pop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
aq.lock.Lock()
defer aq.lock.Unlock()
for {
aq.lock.Lock()

return aq.unlockedPop(logger)
}
if aq.closed {
aq.lock.Unlock()
logger.V(2).Info("Scheduling queue is closed")
return nil, nil
}

func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo, error) {
var pInfo *framework.QueuedPodInfo
for aq.queue.Len() == 0 {
// backoffQPopper is non-nil only if SchedulerPopFromBackoffQ feature is enabled.
// In case of non-empty backoffQ, try popping from there.
if aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() != 0 {
break
if aq.hasSomething() {
pInfo, err := aq.popLocked(logger)
aq.lock.Unlock()
if err != nil {
return nil, err
}
if pInfo == nil {
continue
}
return pInfo, nil
}
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if aq.closed {

aq.lock.Unlock()
select {
case <-aq.notifyCh:
continue
case <-aq.closeCh:
logger.V(2).Info("Scheduling queue is closed")
return nil, nil
}
aq.cond.Wait()
}
}

func (aq *activeQueue) popLocked(logger klog.Logger) (*framework.QueuedPodInfo, error) {
pInfo, err := aq.queue.Pop()
if err != nil {
if aq.backoffQPopper == nil {
return nil, err
}
// Try to pop from backoffQ when activeQ is empty.
pInfo, err = aq.backoffQPopper.popBackoff()
if err != nil {
return nil, err
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", framework.PopFromBackoffQ).Inc()
}

pInfo.Attempts++
// In flight, no concurrent events yet.
if aq.isSchedulingQueueHintEnabled {
Expand All @@ -292,7 +307,7 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo
// because it likely doesn't cause any visible issues from the scheduling perspective.
utilruntime.HandleErrorWithLogger(logger, nil, "The same pod is tracked in multiple places in the scheduler, and just discard it", "pod", klog.KObj(pInfo.Pod))
// Just ignore/discard this duplicated pod and try to pop the next one.
return aq.unlockedPop(logger)
return nil, nil
}

aq.metricsRecorder.ObserveInFlightEventsAsync(metrics.PodPoppedInFlightEvent, 1, false)
Expand All @@ -312,6 +327,13 @@ func (aq *activeQueue) unlockedPop(logger klog.Logger) (*framework.QueuedPodInfo
return pInfo, nil
}

// hasSomething reports whether a pod is available for pop(): the activeQ
// itself holds at least one pod, or — when the SchedulerPopFromBackoffQ
// feature is enabled (backoffQPopper != nil) — the backoffQ does.
func (aq *activeQueue) hasSomething() bool {
	// Short-circuit: only consult the backoffQ popper when activeQ is empty.
	return aq.queue.Len() > 0 || (aq.backoffQPopper != nil && aq.backoffQPopper.lenBackoff() > 0)
}

// list returns all pods that are in the queue.
func (aq *activeQueue) list() []*v1.Pod {
aq.lock.RLock()
Expand Down Expand Up @@ -491,10 +513,21 @@ func (aq *activeQueue) close() {
for pod := range aq.inFlightPods {
aq.unlockedDone(pod)
}
aq.closed = true
if !aq.closed {
aq.closed = true
close(aq.closeCh)
}
}

// broadcast notifies the pop() operation that new pods were added to the
// activeQueue, waking one blocked waiter.
func (aq *activeQueue) broadcast() {
	// Non-blocking send: notifyCh is buffered with capacity 1, so repeated
	// broadcasts coalesce into at most one pending wake-up signal and the
	// sender never blocks.
	select {
	case aq.notifyCh <- struct{}{}:
	default:
	}
}

// tryNotify sends a non-blocking wake-up signal to one pop() waiter.
// notifyCh is buffered with capacity 1, so concurrent calls coalesce: if a
// signal is already pending, the send falls through to the default case and
// the extra notification is dropped. The caller therefore never blocks here.
func (aq *activeQueue) tryNotify() {
	select {
	case aq.notifyCh <- struct{}{}:
	default:
	}
}
23 changes: 22 additions & 1 deletion pkg/scheduler/backend/queue/backoff_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ type backoffQueue struct {

// isPopFromBackoffQEnabled indicates whether the feature gate SchedulerPopFromBackoffQ is enabled.
isPopFromBackoffQEnabled bool

notify func()
}

func newBackoffQueue(clock clock.WithTicker, podInitialBackoffDuration time.Duration, podMaxBackoffDuration time.Duration, activeQLessFn framework.LessFunc, popFromBackoffQEnabled bool) *backoffQueue {
Expand Down Expand Up @@ -293,7 +295,15 @@ func (bq *backoffQueue) popAllBackoffCompleted(logger klog.Logger) []*framework.
// It also ensures that pInfo is not in both queues.
func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo, event string) {
bq.lock.Lock()
defer bq.lock.Unlock()

preEmpty := (bq.podBackoffQ.Len()+bq.podErrorBackoffQ.Len() == 0)
needNotify := false
defer func() {
bq.lock.Unlock()
if needNotify && bq.notify != nil {
bq.notify()
}
}()

// If pod has empty both unschedulable plugins and pending plugins,
// it means that it failed because of error and should be moved to podErrorBackoffQ.
Expand All @@ -306,9 +316,15 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo,
return
}
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
if preEmpty {
needNotify = true
}
return
}
bq.podBackoffQ.AddOrUpdate(pInfo)
if preEmpty {
needNotify = true
}
// Ensure the pod is not in the podErrorBackoffQ and report the error if it happens.
err := bq.podErrorBackoffQ.Delete(pInfo)
if err == nil {
Expand All @@ -318,6 +334,11 @@ func (bq *backoffQueue) add(logger klog.Logger, pInfo *framework.QueuedPodInfo,
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event).Inc()
}

// setNotifyFunc injects the activeQ-side notifier that add() invokes (after
// releasing bq.lock) when a pod lands in a previously empty backoff queue.
// NOTE(review): bq.notify is written here without holding bq.lock, while
// add() reads it; this is only safe if setNotifyFunc is called exactly once
// during queue construction, before any concurrent use — confirm at the
// call site in NewPriorityQueue.
func (bq *backoffQueue) setNotifyFunc(f func()) {
	bq.notify = f
}

// update updates the pod in backoffQueue if oldPodInfo is already in the queue.
// It returns new pod info if updated, nil otherwise.
func (bq *backoffQueue) update(newPod *v1.Pod, oldPodInfo *framework.QueuedPodInfo) *framework.QueuedPodInfo {
Expand Down
8 changes: 6 additions & 2 deletions pkg/scheduler/backend/queue/scheduling_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/backend/heap"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
apicalls "k8s.io/kubernetes/pkg/scheduler/framework/api_calls"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/podtopologyspread"
"k8s.io/kubernetes/pkg/scheduler/metrics"
Expand Down Expand Up @@ -374,7 +374,11 @@ func NewPriorityQueue(
if isPopFromBackoffQEnabled {
backoffQPopper = backoffQ
}
pq.activeQ = newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessConverted), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder, backoffQPopper)
aq := newActiveQueue(heap.NewWithRecorder(podInfoKeyFunc, heap.LessFunc[*framework.QueuedPodInfo](lessConverted), metrics.NewActivePodsRecorder()), isSchedulingQueueHintEnabled, options.metricsRecorder, backoffQPopper)
pq.activeQ = aq
if bq, ok := pq.backoffQ.(*backoffQueue); ok {
bq.setNotifyFunc(func() { aq.tryNotify() })
}
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
pq.nominator = newPodNominator(options.podLister)

Expand Down