-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathmain.go
executable file
·101 lines (85 loc) · 2.65 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiWatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/watch"
)
// sentinel starts watches on namespace resources.
//
// It is this program's implementation of `cache.Watcher`.
type sentinel struct {
	// client is the clientset used to reach the namespaces API
	client *kubernetes.Clientset

	// timeoutSecs is handed to every watch request as `TimeoutSeconds`
	timeoutSecs int64
}
// newSentinel wraps the given clientset and watch timeout (in seconds)
// into a `sentinel` and hands it back as a `cache.Watcher`
func newSentinel(cs *kubernetes.Clientset, timeout int64) cache.Watcher {
	s := &sentinel{
		client:      cs,
		timeoutSecs: timeout,
	}
	return s
}
// Watch begins a watch on namespaces resources using the wrapped clientset.
//
// The incoming `options` are not forwarded: every call sends a fresh
// `ListOptions` that carries only the configured timeout.
//
// NOTE(review): this also drops any resource version supplied by the
// caller - confirm that this is intended.
func (s *sentinel) Watch(options metav1.ListOptions) (apiWatch.Interface, error) {
	opts := metav1.ListOptions{TimeoutSeconds: &s.timeoutSecs}
	namespaces := s.client.CoreV1().Namespaces()
	return namespaces.Watch(context.Background(), opts)
}
// compile-time check that the `*sentinel` pointer type
// implements the `cache.Watcher` interface
var _ cache.Watcher = (*sentinel)(nil)
// main builds a clientset from the default kubeconfig loading rules, starts
// a retrying watch on namespaces and prints the events it receives.
//
// It panics when the configuration cannot be loaded, when the clientset or
// the retry watcher cannot be created, when the result channel is closed,
// or when a non-error event carries something other than a namespace.
func main() {
	configLoader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
		clientcmd.NewDefaultClientConfigLoadingRules(),
		&clientcmd.ConfigOverrides{},
	)
	cfg, err := configLoader.ClientConfig()
	if err != nil {
		panic(err)
	}

	// create a new `Clientset` for the given config
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	fmt.Printf("---- Start watching namespaces ----\n")

	// create a `cache.Watcher` implementation using the `Clientset`
	watcher := newSentinel(cs, 50)

	// create a `RetryWatcher` using initial
	// version "1" and our specialized watcher
	rw, err := watch.NewRetryWatcher("1", watcher)
	if err != nil {
		panic(err)
	}
	// stop the retry watcher on every exit path, including panics
	defer rw.Stop()

	// process incoming event notifications until the channel is closed
	for event := range rw.ResultChan() {
		// error events do not carry a namespace: report them and keep
		// reading instead of failing the namespace cast below
		if event.Type == apiWatch.Error {
			if status, ok := event.Object.(*metav1.Status); ok {
				fmt.Printf("%s (reason: %s, message: %s)\n",
					event.Type, status.Reason, status.Message)
			} else {
				fmt.Printf("%s (object: %T)\n", event.Type, event.Object)
			}
			continue
		}

		// cast to namespace
		ns, ok := event.Object.(*corev1.Namespace)
		if !ok {
			panic(fmt.Errorf("invalid type '%T'", event.Object))
		}

		// skip `Added` events for namespaces created more than five minutes ago
		creationTime := ns.GetCreationTimestamp().Time
		fiveMinsAgo := time.Now().Add(-5 * time.Minute)
		if event.Type == apiWatch.Added && creationTime.Before(fiveMinsAgo) {
			// fmt.Printf(">> skip older events (creationTime: %s, currentTime: %s)\n",
			// creationTime.Format(time.RFC3339), time.Now().Format(time.RFC3339))
			continue
		}

		// print some info about the event
		fmt.Printf("%s %s (createdAt: %s, phase: %s)\n",
			event.Type, ns.Name, creationTime.Format(time.RFC3339), ns.Status.Phase)

		// sleep a bit
		time.Sleep(5 * time.Second)
	}

	// the result channel was closed: the watcher has stopped for good
	panic(fmt.Errorf("closed channel"))
}