Skip to content

Commit 91b4816

Browse files
committed
Optimize job controller performance: reduce work duration time & minimize cache locking
Signed-off-by: xigang <wangxigang2014@gmail.com>
1 parent 0001a8a commit 91b4816

File tree

1 file changed

+33
-3
lines changed

1 file changed

+33
-3
lines changed

pkg/controller/job/job_controller.go

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ type Controller struct {
109109
// A store of pods, populated by the podController
110110
podStore corelisters.PodLister
111111

112+
// podIndexer allows looking up pods by ControllerRef UID
113+
podIndexer cache.Indexer
114+
112115
// Jobs that need to be updated
113116
queue workqueue.TypedRateLimitingInterface[string]
114117

@@ -223,6 +226,12 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
223226
jm.podStore = podInformer.Lister()
224227
jm.podStoreSynced = podInformer.Informer().HasSynced
225228

229+
err := controller.AddPodControllerUIDIndexer(podInformer.Informer())
230+
if err != nil {
231+
return nil, fmt.Errorf("adding Pod controller UID indexer: %w", err)
232+
}
233+
jm.podIndexer = podInformer.Informer().GetIndexer()
234+
226235
jm.updateStatusHandler = jm.updateJobStatus
227236
jm.patchJobHandler = jm.patchJob
228237
jm.syncHandler = jm.syncJob
@@ -758,9 +767,9 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po
758767
if err != nil {
759768
return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
760769
}
761-
// List all pods to include those that don't match the selector anymore
762-
// but have a ControllerRef pointing to this controller.
763-
pods, err := jm.podStore.Pods(j.Namespace).List(labels.Everything())
770+
771+
// list all pods managed by this Job using the pod indexer
772+
pods, err := jm.getJobPodsByIndexer(j)
764773
if err != nil {
765774
return nil, err
766775
}
@@ -799,6 +808,27 @@ func (jm *Controller) getPodsForJob(ctx context.Context, j *batch.Job) ([]*v1.Po
799808
return pods, err
800809
}
801810

811+
// getJobPodsByIndexer returns the set of pods that this Job should manage.
812+
func (jm *Controller) getJobPodsByIndexer(j *batch.Job) ([]*v1.Pod, error) {
813+
podsForJob := []*v1.Pod{}
814+
for _, key := range []string{string(j.UID), controller.OrphanPodIndexKey} {
815+
pods, err := jm.podIndexer.ByIndex(controller.PodControllerUIDIndex, key)
816+
if err != nil {
817+
return nil, err
818+
}
819+
820+
for _, obj := range pods {
821+
pod, ok := obj.(*v1.Pod)
822+
if !ok {
823+
utilruntime.HandleError(fmt.Errorf("unexpected object type in pod indexer: %v", obj))
824+
continue
825+
}
826+
podsForJob = append(podsForJob, pod)
827+
}
828+
}
829+
return podsForJob, nil
830+
}
831+
802832
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
803833
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
804834
// concurrently with the same key.

0 commit comments

Comments
 (0)