Skip to content

Commit 0f303fe

Browse files
committed
client-go: Add informer metrics
Signed-off-by: xigang <wangxigang2014@gmail.com>
1 parent c5c87bd commit 0f303fe

File tree

17 files changed

+1369
-44
lines changed

17 files changed

+1369
-44
lines changed

cmd/kube-controller-manager/app/controllermanager.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ import (
6161
logsapi "k8s.io/component-base/logs/api/v1"
6262
metricsfeatures "k8s.io/component-base/metrics/features"
6363
controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
64+
fifometrics "k8s.io/component-base/metrics/prometheus/fifo"
65+
informermetrics "k8s.io/component-base/metrics/prometheus/informer"
66+
reflectormetrics "k8s.io/component-base/metrics/prometheus/reflector"
6467
"k8s.io/component-base/metrics/prometheus/slis"
6568
"k8s.io/component-base/term"
6669
utilversion "k8s.io/component-base/version"
@@ -670,6 +673,12 @@ func CreateControllerContext(ctx context.Context, s *config.CompletedConfig, roo
670673
}
671674

672675
controllersmetrics.Register()
676+
if utilfeature.DefaultFeatureGate.Enabled(metricsfeatures.InformerMetrics) {
677+
informermetrics.Register()
678+
reflectormetrics.Register()
679+
fifometrics.Register()
680+
}
681+
673682
return controllerContext, nil
674683
}
675684

staging/src/k8s.io/client-go/tools/cache/controller.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ package cache
1919
import (
2020
"context"
2121
"errors"
22-
clientgofeaturegate "k8s.io/client-go/features"
22+
"reflect"
2323
"sync"
2424
"time"
2525

2626
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/apimachinery/pkg/util/naming"
2728
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2829
"k8s.io/apimachinery/pkg/util/wait"
30+
clientgofeaturegate "k8s.io/client-go/features"
2931
"k8s.io/utils/clock"
3032
)
3133

@@ -366,6 +368,10 @@ func DeletionHandlingObjectToName(obj interface{}) (ObjectName, error) {
366368

367369
// InformerOptions configure a Reflector.
368370
type InformerOptions struct {
371+
// Name is the name of the informer, typically derived from the callsite.
372+
// It's used for metrics and logging.
373+
Name string
374+
369375
// ListerWatcher implements List and Watch functions for the source of the resource
370376
// the informer will be informing about.
371377
ListerWatcher ListerWatcher
@@ -594,6 +600,10 @@ func newInformer(clientState Store, options InformerOptions) Controller {
594600
// KeyLister, that way resync operations will result in the correct set
595601
// of update/delete deltas.
596602

603+
if options.Name == "" {
604+
options.Name = naming.GetNameFromCallsite(internalPackages...)
605+
}
606+
597607
var fifo Queue
598608
if clientgofeaturegate.FeatureGates().Enabled(clientgofeaturegate.InOrderInformers) {
599609
fifo = NewRealFIFO(MetaNamespaceKeyFunc, clientState, options.Transform)
@@ -602,6 +612,7 @@ func newInformer(clientState Store, options InformerOptions) Controller {
602612
KnownObjects: clientState,
603613
EmitDeltaTypeReplaced: true,
604614
Transformer: options.Transform,
615+
Identifier: Identifier{Name: options.Name, Type: reflect.TypeOf(options.ObjectType).Elem().String()},
605616
})
606617
}
607618

staging/src/k8s.io/client-go/tools/cache/delta_fifo.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,15 @@ type DeltaFIFOOptions struct {
5858

5959
// If set, log output will go to this logger instead of klog.Background().
6060
Logger *klog.Logger
61+
62+
// Identifier is used to identify the DeltaFIFO. Name can be a queue name or
63+
// a controller name, and Type represents the type of items in the queue.
64+
// Both Name and Type are used to create metrics for the DeltaFIFO.
65+
Identifier Identifier
66+
67+
// If set, metricsProvider will be used to create metrics for the DeltaFIFO.
68+
// This allows consumers to provide their own metrics implementation.
69+
MetricsProvider FIFOMetricsProvider
6170
}
6271

6372
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
@@ -143,6 +152,10 @@ type DeltaFIFO struct {
143152
// logger is a per-instance logger. This gets chosen when constructing
144153
// the instance, with klog.Background() as default.
145154
logger klog.Logger
155+
156+
// metrics tracks basic metric information about the DeltaFIFO.
157+
// It's used to expose queue length and latency metrics.
158+
metrics *fifoMetrics
146159
}
147160

148161
// TransformFunc allows for transforming an object before it will be processed.
@@ -265,6 +278,11 @@ func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
265278
if opts.Logger != nil {
266279
f.logger = *opts.Logger
267280
}
281+
282+
if opts.Identifier.Name != "" && opts.Identifier.Type != "" {
283+
f.metrics = newFIFOMetrics(opts.Identifier.Name, opts.Identifier.Type, opts.MetricsProvider)
284+
}
285+
268286
f.cond.L = &f.lock
269287
return f
270288
}
@@ -420,6 +438,13 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
420438
// ignore emitDeltaTypeReplaced.
421439
// Caller must lock first.
422440
func (f *DeltaFIFO) queueActionInternalLocked(actionType, internalActionType DeltaType, obj interface{}) error {
441+
defer func() {
442+
if f.metrics != nil {
443+
f.metrics.numberOfQueuedItem.Set(float64(len(f.queue)))
444+
f.metrics.numberOfStoredItem.Set(float64(len(f.items)))
445+
}
446+
}()
447+
423448
id, err := f.KeyOf(obj)
424449
if err != nil {
425450
return KeyError{obj, err}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"sync"
21+
22+
"k8s.io/utils/clock"
23+
)
24+
25+
// fifoMetrics tracks metrics for a FIFO queue, including the number of stored and queued items.
26+
type fifoMetrics struct {
27+
clock clock.Clock
28+
29+
numberOfStoredItem GaugeMetric
30+
numberOfQueuedItem GaugeMetric
31+
}
32+
33+
// FIFOMetricsProvider defines an interface for creating metrics that track FIFO queue operations.
34+
type FIFOMetricsProvider interface {
35+
// NewStoredItemMetric returns a gauge metric for tracking the total number of items
36+
// currently stored in the FIFO's internal storage.
37+
//
38+
// For DeltaFIFO: Represents len(f.items) - the number of unique keys with pending deltas
39+
// For RealFIFO: Would represent len(f.items) - the total number of individual deltas
40+
//
41+
// Parameters:
42+
// - name: Identifier for the queue (e.g., controller name, queue name)
43+
// - itemType: Type of objects being queued (e.g., "pods", "services")
44+
NewStoredItemMetric(name string, itemType string) GaugeMetric
45+
46+
// NewQueuedItemMetric returns a gauge metric for tracking the total number of items
47+
// currently queued and waiting to be processed.
48+
//
49+
// For DeltaFIFO: Represents len(f.queue) - the number of keys in processing order
50+
// For RealFIFO: Would represent len(f.items) - same as stored items due to strict ordering
51+
//
52+
// Parameters:
53+
// - name: Identifier for the queue (e.g., controller name, queue name)
54+
// - itemType: Type of objects being queued (e.g., "pods", "services")
55+
NewQueuedItemMetric(name string, itemType string) GaugeMetric
56+
}
57+
58+
type noopFIFOMetricsProvider struct{}
59+
60+
func (noopFIFOMetricsProvider) NewStoredItemMetric(name string, itemType string) GaugeMetric {
61+
return noopMetric{}
62+
}
63+
64+
func (noopFIFOMetricsProvider) NewQueuedItemMetric(name string, itemType string) GaugeMetric {
65+
return noopMetric{}
66+
}
67+
68+
var globalFIFOMetricsProvider FIFOMetricsProvider = noopFIFOMetricsProvider{}
69+
var setGlobalFIFOMetricsProvider sync.Once
70+
71+
// newFIFOMetrics creates a new fifoMetrics instance for tracking metrics related to FIFO queues.
72+
func newFIFOMetrics(queueName string, itemType string, metricsProvider FIFOMetricsProvider) *fifoMetrics {
73+
var ret *fifoMetrics
74+
if queueName == "" || itemType == "" {
75+
return ret
76+
}
77+
78+
if metricsProvider == nil {
79+
metricsProvider = globalFIFOMetricsProvider
80+
}
81+
82+
return &fifoMetrics{
83+
clock: &clock.RealClock{},
84+
numberOfStoredItem: metricsProvider.NewStoredItemMetric(queueName, itemType),
85+
numberOfQueuedItem: metricsProvider.NewQueuedItemMetric(queueName, itemType),
86+
}
87+
}
88+
89+
// SetFIFOMetricsProvider sets the global metrics provider for FIFO queues.
90+
func SetFIFOMetricsProvider(metricsProvider FIFOMetricsProvider) {
91+
setGlobalFIFOMetricsProvider.Do(func() {
92+
globalFIFOMetricsProvider = metricsProvider
93+
})
94+
}

staging/src/k8s.io/client-go/tools/cache/processor_listener_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"testing"
2222
"time"
2323

24+
v1 "k8s.io/api/core/v1"
2425
"k8s.io/apimachinery/pkg/util/wait"
2526
"k8s.io/klog/v2"
2627
)
@@ -36,7 +37,7 @@ func BenchmarkListener(b *testing.B) {
3637
swg.Add(b.N)
3738
b.SetParallelism(concurrencyLevel)
3839
// Preallocate enough space so that benchmark does not run out of it
39-
pl := newProcessListener(klog.Background(), &ResourceEventHandlerFuncs{
40+
pl := newProcessListener(klog.Background(), &informerMetrics{}, "informer_test", &v1.Pod{}, "handler_test", &ResourceEventHandlerFuncs{
4041
AddFunc: func(obj interface{}) {
4142
swg.Done()
4243
},

0 commit comments

Comments
 (0)