PodStartSLIDuration excludes init container runtime and excludes stateful pods #131950

Open · wants to merge 4 commits into master · Changes from all commits
7 changes: 6 additions & 1 deletion pkg/kubelet/kubelet.go
@@ -638,6 +638,7 @@ func NewMainKubelet(ctx context.Context,
nodeStatusMaxImages: nodeStatusMaxImages,
tracer: tracer,
nodeStartupLatencyTracker: kubeDeps.NodeStartupLatencyTracker,
podStartupLatencyTracker: kubeDeps.PodStartupLatencyTracker,
healthChecker: kubeDeps.HealthChecker,
flagz: kubeDeps.Flagz,
}
@@ -793,10 +794,11 @@ func NewMainKubelet(ctx context.Context,
kubeCfg.MemorySwap.SwapBehavior,
kubeDeps.ContainerManager.GetNodeAllocatableAbsolute,
*kubeCfg.MemoryThrottlingFactor,
kubeDeps.PodStartupLatencyTracker,
klet.podStartupLatencyTracker,
kubeDeps.TracerProvider,
tokenManager,
getServiceAccount,
klet.podStartupLatencyTracker,
)
if err != nil {
return nil, err
@@ -1469,6 +1471,9 @@ type Kubelet struct {
// Track node startup latencies
nodeStartupLatencyTracker util.NodeStartupLatencyTracker

// Track pod startup latencies
podStartupLatencyTracker util.PodStartupLatencyTracker

// Health check kubelet
healthChecker watchdog.HealthChecker

7 changes: 4 additions & 3 deletions pkg/kubelet/kubelet_test.go
@@ -296,8 +296,8 @@ func newTestKubeletWithImageList(
kubelet.configMapManager = configMapManager
kubelet.mirrorPodClient = fakeMirrorClient
kubelet.podManager = kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker)
kubelet.podStartupLatencyTracker = kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, kubelet.podStartupLatencyTracker)
kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
kubelet.podCertificateManager = &podcertificate.NoOpManager{}

@@ -3348,10 +3348,11 @@ func TestSyncPodSpans(t *testing.T) {
kubeCfg.MemorySwap.SwapBehavior,
kubelet.containerManager.GetNodeAllocatableAbsolute,
*kubeCfg.MemoryThrottlingFactor,
kubeletutil.NewPodStartupLatencyTracker(),
kubelet.podStartupLatencyTracker,
tp,
token.NewManager(kubelet.kubeClient),
func(string, string) (*v1.ServiceAccount, error) { return nil, nil },
kubelet.podStartupLatencyTracker,
)
assert.NoError(t, err)
kubelet.allocationManager.SetContainerRuntime(kubelet.containerRuntime)
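A note on the tracker plumbing in these two files: in NewMainKubelet, klet.podStartupLatencyTracker is assigned from kubeDeps.PodStartupLatencyTracker, so production wiring is unchanged; in the test setup, however, the runtime manager previously received a fresh tracker from NewPodStartupLatencyTracker(), separate from the one handed to the status manager. Routing both through kubelet.podStartupLatencyTracker ensures they share one instance. A minimal sketch of why that matters (the recorder type and names are hypothetical, not from this PR):

package main

import "fmt"

// recorder stands in for PodStartupLatencyTracker; it only counts events.
type recorder struct{ events int }

func (r *recorder) Record() { r.events++ }

func main() {
	// One shared instance: observations from both components accumulate.
	shared := &recorder{}
	runtimeSide, statusSide := shared, shared
	runtimeSide.Record()
	statusSide.Record()
	fmt.Println(shared.events) // 2

	// Two separate instances: the status side never sees runtime events,
	// so any metric derived from one instance is incomplete.
	a, b := &recorder{}, &recorder{}
	a.Record()
	fmt.Println(b.events) // 0
}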
85 changes: 61 additions & 24 deletions pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -101,6 +101,11 @@ type podStateProvider interface {
ShouldPodRuntimeBeRemoved(kubetypes.UID) bool
}

type PodInitContainerTimeRecorder interface {
RecordInitContainerStarted(podUID kubetypes.UID, startedAt time.Time)
RecordInitContainerFinished(podUID kubetypes.UID, finishedAt time.Time)
}

type kubeGenericRuntimeManager struct {
runtimeName string
recorder record.EventRecorder
@@ -186,6 +191,9 @@ type kubeGenericRuntimeManager struct {
// Swap controller availability check function (Linux only)
// Uses sync.OnceValue for lazy initialization
getSwapControllerAvailable func() bool

// Records first initContainer start time and last initContainer finish time
podInitContainerTimeRecorder PodInitContainerTimeRecorder
}

// KubeGenericRuntime is an interface that contains interfaces for container runtime and command.
@@ -236,37 +244,39 @@ func NewKubeGenericRuntimeManager(
tracerProvider trace.TracerProvider,
tokenManager *token.Manager,
getServiceAccount plugin.GetServiceAccountFunc,
podInitContainerTimeRecorder PodInitContainerTimeRecorder,
) (KubeGenericRuntime, []images.PostImageGCHook, error) {
logger := klog.FromContext(ctx)

runtimeService = newInstrumentedRuntimeService(runtimeService)
imageService = newInstrumentedImageManagerService(imageService)
tracer := tracerProvider.Tracer(instrumentationScope)
kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder,
singleProcessOOMKill: singleProcessOOMKill,
cpuCFSQuota: cpuCFSQuota,
cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
seccompProfileRoot: filepath.Join(rootDirectory, "seccomp"),
livenessManager: livenessManager,
readinessManager: readinessManager,
startupManager: startupManager,
machineInfo: machineInfo,
osInterface: osInterface,
runtimeHelper: runtimeHelper,
runtimeService: runtimeService,
imageService: imageService,
containerManager: containerManager,
internalLifecycle: containerManager.InternalContainerLifecycle(),
logManager: logManager,
runtimeClassManager: runtimeClassManager,
allocationManager: allocationManager,
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
seccompDefault: seccompDefault,
memorySwapBehavior: memorySwapBehavior,
getNodeAllocatable: getNodeAllocatable,
memoryThrottlingFactor: memoryThrottlingFactor,
podLogsDirectory: podLogsDirectory,
recorder: recorder,
singleProcessOOMKill: singleProcessOOMKill,
cpuCFSQuota: cpuCFSQuota,
cpuCFSQuotaPeriod: cpuCFSQuotaPeriod,
seccompProfileRoot: filepath.Join(rootDirectory, "seccomp"),
livenessManager: livenessManager,
readinessManager: readinessManager,
startupManager: startupManager,
machineInfo: machineInfo,
osInterface: osInterface,
runtimeHelper: runtimeHelper,
runtimeService: runtimeService,
imageService: imageService,
containerManager: containerManager,
internalLifecycle: containerManager.InternalContainerLifecycle(),
logManager: logManager,
runtimeClassManager: runtimeClassManager,
allocationManager: allocationManager,
logReduction: logreduction.NewLogReduction(identicalErrorDelay),
seccompDefault: seccompDefault,
memorySwapBehavior: memorySwapBehavior,
getNodeAllocatable: getNodeAllocatable,
memoryThrottlingFactor: memoryThrottlingFactor,
podLogsDirectory: podLogsDirectory,
podInitContainerTimeRecorder: podInitContainerTimeRecorder,
}

// Initialize swap controller availability check with lazy evaluation
@@ -1437,6 +1447,11 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
}
return err
}
if typeName == "init container" {
if m.podInitContainerTimeRecorder != nil {
m.podInitContainerTimeRecorder.RecordInitContainerStarted(pod.UID, time.Now())
}
}

return nil
}
@@ -1466,6 +1481,28 @@ func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, po
logger.V(4).Info("Completed init container for pod", "containerName", container.Name, "pod", klog.KObj(pod))
}

for _, cs := range podStatus.ContainerStatuses {
// Check if this is an init container
for _, init := range pod.Spec.InitContainers {
if cs.Name == init.Name && cs.State == kubecontainer.ContainerStateExited && !cs.FinishedAt.IsZero() {
if m.podInitContainerTimeRecorder != nil {
m.podInitContainerTimeRecorder.RecordInitContainerFinished(pod.UID, cs.FinishedAt)
}
}
}
}

// Step 7: For containers in podContainerChanges.ContainersToUpdate[CPU,Memory] list, invoke UpdateContainerResources
if resizable, _, _ := allocation.IsInPlacePodVerticalScalingAllowed(pod); resizable {
if len(podContainerChanges.ContainersToUpdate) > 0 || podContainerChanges.UpdatePodResources {
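The PodInitContainerTimeRecorder interface introduced above keeps the runtime manager's dependency narrow: it needs only the two recording methods, not the whole PodStartupLatencyTracker. A sketch of how that contract could be exercised in a unit test, using a hypothetical fakeRecorder (not part of this PR) that mirrors the tracker's earliest-start/latest-finish semantics:

package kuberuntime_test

import (
	"testing"
	"time"

	kubetypes "k8s.io/apimachinery/pkg/types"
)

// fakeRecorder is a hypothetical test double for PodInitContainerTimeRecorder.
type fakeRecorder struct {
	started  map[kubetypes.UID]time.Time
	finished map[kubetypes.UID]time.Time
}

func newFakeRecorder() *fakeRecorder {
	return &fakeRecorder{
		started:  map[kubetypes.UID]time.Time{},
		finished: map[kubetypes.UID]time.Time{},
	}
}

func (f *fakeRecorder) RecordInitContainerStarted(podUID kubetypes.UID, startedAt time.Time) {
	// Keep only the earliest start, mirroring the tracker's semantics.
	if cur, ok := f.started[podUID]; !ok || startedAt.Before(cur) {
		f.started[podUID] = startedAt
	}
}

func (f *fakeRecorder) RecordInitContainerFinished(podUID kubetypes.UID, finishedAt time.Time) {
	// Keep only the latest finish.
	if cur, ok := f.finished[podUID]; !ok || finishedAt.After(cur) {
		f.finished[podUID] = finishedAt
	}
}

func TestFakeRecorderKeepsEarliestStartAndLatestFinish(t *testing.T) {
	r := newFakeRecorder()
	uid := kubetypes.UID("pod-1")
	t0 := time.Now()
	r.RecordInitContainerStarted(uid, t0.Add(2*time.Second))
	r.RecordInitContainerStarted(uid, t0) // earlier start wins
	r.RecordInitContainerFinished(uid, t0.Add(5*time.Second))
	r.RecordInitContainerFinished(uid, t0.Add(3*time.Second)) // later finish is kept

	if !r.started[uid].Equal(t0) {
		t.Errorf("expected earliest start %v, got %v", t0, r.started[uid])
	}
	if !r.finished[uid].Equal(t0.Add(5 * time.Second)) {
		t.Errorf("expected latest finish, got %v", r.finished[uid])
	}
}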
137 changes: 125 additions & 12 deletions pkg/kubelet/util/pod_startup_latency_tracker.go
@@ -33,6 +33,8 @@ type PodStartupLatencyTracker interface {
ObservedPodOnWatch(pod *v1.Pod, when time.Time)
RecordImageStartedPulling(podUID types.UID)
RecordImageFinishedPulling(podUID types.UID)
RecordInitContainerStarted(podUID types.UID, startedAt time.Time)
RecordInitContainerFinished(podUID types.UID, finishedAt time.Time)
RecordStatusUpdated(pod *v1.Pod)
DeletePodStartupState(podUID types.UID)
}
@@ -48,8 +50,10 @@ type basicPodStartupLatencyTracker struct {
}

type perPodState struct {
firstStartedPulling time.Time
lastFinishedPulling time.Time
firstStartedPulling time.Time
lastFinishedPulling time.Time
firstInitContainerStart time.Time
lastInitContainerFinish time.Time
// first time, when pod status changed into Running
observedRunningTime time.Time
// log, if pod latency was already Observed
@@ -98,28 +102,86 @@ func (p *basicPodStartupLatencyTracker) ObservedPodOnWatch(pod *v1.Pod, when time.Time) {

if hasPodStartedSLO(pod) {
podStartingDuration := when.Sub(pod.CreationTimestamp.Time)
imagePullingDuration := state.lastFinishedPulling.Sub(state.firstStartedPulling)
podStartSLOduration := (podStartingDuration - imagePullingDuration).Seconds()
var podStartSLOduration time.Duration
var excludedTimeStart time.Time
var excludedTimeEnd time.Time

// Add the time from pod creation until the first excluded activity starts (either image pulling or the first init container)
if !state.firstStartedPulling.IsZero() && !state.firstInitContainerStart.IsZero() {
if state.firstStartedPulling.Before(state.firstInitContainerStart) {
excludedTimeStart = state.firstStartedPulling
} else {
excludedTimeStart = state.firstInitContainerStart
}
} else if !state.firstStartedPulling.IsZero() {
excludedTimeStart = state.firstStartedPulling
} else if !state.firstInitContainerStart.IsZero() {
excludedTimeStart = state.firstInitContainerStart
}

if !excludedTimeStart.IsZero() {
preExcludedDuration := excludedTimeStart.Sub(pod.CreationTimestamp.Time)
if preExcludedDuration > 0 {
podStartSLOduration += preExcludedDuration
}
}

// Add any gap between the end of image pulling and the start of init containers
if !state.lastFinishedPulling.IsZero() && !state.firstInitContainerStart.IsZero() {
// Only count gap if init container starts after image pulling ends (no overlap)
if state.firstInitContainerStart.After(state.lastFinishedPulling) {
gapDuration := state.firstInitContainerStart.Sub(state.lastFinishedPulling)
if gapDuration > 0 {
podStartSLOduration += gapDuration
}
}
}

// Add time from last dependency completion to containers running
if state.lastFinishedPulling.After(state.lastInitContainerFinish) {
excludedTimeEnd = state.lastFinishedPulling
} else if !state.lastInitContainerFinish.IsZero() {
excludedTimeEnd = state.lastInitContainerFinish
} else if !state.lastFinishedPulling.IsZero() {
excludedTimeEnd = state.lastFinishedPulling
}

if !excludedTimeEnd.IsZero() {
postExcludedDuration := when.Sub(excludedTimeEnd)
if postExcludedDuration > 0 {
podStartSLOduration += postExcludedDuration
}
} else if excludedTimeStart.IsZero() {
// No dependencies at all, count entire duration
podStartSLOduration = podStartingDuration
}

isStatefulPod := isStatefulPod(pod)

klog.InfoS("Observed pod startup duration",
"pod", klog.KObj(pod),
"podStartSLOduration", podStartSLOduration,
"podStartSLOduration", podStartSLOduration.Seconds(),
"podStartE2EDuration", podStartingDuration,
"isStatefulPod", isStatefulPod,
"podCreationTimestamp", pod.CreationTimestamp.Time,
"firstStartedPulling", state.firstStartedPulling,
"lastFinishedPulling", state.lastFinishedPulling,
"firstInitContainerStart", state.firstInitContainerStart,
"lastInitContainerFinish", state.lastInitContainerFinish,
"observedRunningTime", state.observedRunningTime,
"watchObservedRunningTime", when)

metrics.PodStartSLIDuration.WithLabelValues().Observe(podStartSLOduration)
metrics.PodStartTotalDuration.WithLabelValues().Observe(podStartingDuration.Seconds())
state.metricRecorded = true
// if this is the first Pod with network, track the start values
// these metrics will help to identify problems with the CNI plugin
if !pod.Spec.HostNetwork && !p.firstNetworkPodSeen {
metrics.FirstNetworkPodStartSLIDuration.Set(podStartSLOduration)
p.firstNetworkPodSeen = true
if !isStatefulPod {
metrics.PodStartSLIDuration.WithLabelValues().Observe(podStartSLOduration.Seconds())
// if this is the first Pod with network, track the start values
// these metrics will help to identify problems with the CNI plugin
if !pod.Spec.HostNetwork && !p.firstNetworkPodSeen {
metrics.FirstNetworkPodStartSLIDuration.Set(podStartSLOduration.Seconds())
p.firstNetworkPodSeen = true
}
}
state.metricRecorded = true
}
}

@@ -149,6 +211,34 @@ func (p *basicPodStartupLatencyTracker) RecordImageFinishedPulling(podUID types.UID) {
state.lastFinishedPulling = p.clock.Now() // Now is always greater than values from the past.
}

func (p *basicPodStartupLatencyTracker) RecordInitContainerStarted(podUID types.UID, startedAt time.Time) {
p.lock.Lock()
defer p.lock.Unlock()

state := p.pods[podUID]
if state == nil {
return
}

if state.firstInitContainerStart.IsZero() || startedAt.Before(state.firstInitContainerStart) {
state.firstInitContainerStart = startedAt
}
}

func (p *basicPodStartupLatencyTracker) RecordInitContainerFinished(podUID types.UID, finishedAt time.Time) {
p.lock.Lock()
defer p.lock.Unlock()

state := p.pods[podUID]
if state == nil {
return
}

if finishedAt.After(state.lastInitContainerFinish) {
state.lastInitContainerFinish = finishedAt
}
}

func (p *basicPodStartupLatencyTracker) RecordStatusUpdated(pod *v1.Pod) {
p.lock.Lock()
defer p.lock.Unlock()
@@ -188,6 +278,29 @@ func hasPodStartedSLO(pod *v1.Pod) bool {
return true
}

// isStatefulPod determines if a pod is stateful according to the SLI documentation:
// "A stateful pod is defined as a pod that mounts at least one volume with sources
// other than secrets, config maps, downward API and empty dir."
// This should reflect the "stateful pod" definition
// ref: https://github.com/kubernetes/community/blob/master/sig-scalability/slos/pod_startup_latency.md
func isStatefulPod(pod *v1.Pod) bool {
for _, volume := range pod.Spec.Volumes {
// Check if this volume is NOT a stateless/ephemeral type
if volume.Secret == nil &&
volume.ConfigMap == nil &&
volume.DownwardAPI == nil &&
volume.EmptyDir == nil &&
volume.Projected == nil &&
volume.GitRepo == nil &&
volume.Image == nil &&
volume.Ephemeral == nil &&
(volume.CSI == nil || volume.CSI.VolumeAttributes["csi.storage.k8s.io/ephemeral"] != "true") {
return true
}
}
return false
}

func (p *basicPodStartupLatencyTracker) DeletePodStartupState(podUID types.UID) {
p.lock.Lock()
defer p.lock.Unlock()
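Taken together, the exclusion logic in ObservedPodOnWatch counts only the time not covered by image pulls or init containers: the span from creation to the first excluded activity, any gap between pull completion and init container start, and the span from the last excluded activity until the pod is observed running. A self-contained sketch of the same arithmetic on a concrete timeline (the sloDuration helper is illustrative and does not import the tracker):

package main

import (
	"fmt"
	"time"
)

// sloDuration mirrors the exclusion arithmetic from ObservedPodOnWatch:
// time before the first excluded activity, plus any gap between the end of
// image pulling and the start of init containers, plus time after the last
// excluded activity, measured from pod creation to the watch observation.
func sloDuration(created, firstPull, lastPull, firstInit, lastInit, observed time.Time) time.Duration {
	var d time.Duration

	// Earliest excluded activity.
	start := firstPull
	if start.IsZero() || (!firstInit.IsZero() && firstInit.Before(start)) {
		start = firstInit
	}
	if !start.IsZero() {
		if pre := start.Sub(created); pre > 0 {
			d += pre
		}
	}

	// Gap between image pulling finishing and init containers starting.
	if !lastPull.IsZero() && !firstInit.IsZero() && firstInit.After(lastPull) {
		d += firstInit.Sub(lastPull)
	}

	// Latest excluded activity.
	end := lastInit
	if lastPull.After(end) {
		end = lastPull
	}
	if !end.IsZero() {
		if post := observed.Sub(end); post > 0 {
			d += post
		}
	} else if start.IsZero() {
		// No pulls and no init containers: the whole duration counts.
		d = observed.Sub(created)
	}
	return d
}

func main() {
	t0 := time.Now()
	// Timeline: created at t0, image pull from 1s to 4s,
	// init containers from 5s to 9s, running observed at 10s.
	d := sloDuration(t0, t0.Add(1*time.Second), t0.Add(4*time.Second),
		t0.Add(5*time.Second), t0.Add(9*time.Second), t0.Add(10*time.Second))
	fmt.Println(d) // 3s: 1s pre-pull + 1s gap + 1s post-init
}

On this timeline the 3s of image pulling and 4s of init container execution are excluded, so only 3s of the 10s end-to-end startup counts toward the SLI.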