Watch and react to Kubernetes objects changes
Luca Sepe
Posted on March 18, 2022
The Kubernetes API server exposes a REST interface consumable by any client.
client-go is the official client library for the Go programming language. It is used both internally by Kubernetes itself (for example, inside kubectl) as well as by numerous external consumers: operators, higher level frameworks and many more.
Using this library you can write Go applications to access kubernetes' API Server and you can programmatically add, delete, modify, and check kubernetes resources.
client-go introduces different types of clients such as: RESTClient, Clientset and dynamic.Interface. All these clients make available the Watch verb, which offers an event interface that reacts to objects changes: add, update, delete, etc.
In all cases, the returned object is an implementation of watch.Interface that looks like this:
Let's implement a watcher for namespaces changes. The application will do these things:
- attempts to begin watching the namespaces resource getting a watch.Interface on success
- iterates all the events produced by the watcher
- when an event is of type "namespace added" adds a custom label patching the namespace
- when en event is of type "namespace deleted" greet the namespace gone
In this article I'll show you how to use RESTClient to watch and then react to namespaces changes.
Here the source code - you can grab all the code @ https://github.com/lucasepe/using-client-go.
package main
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
// Using the default configuration rules get the info
// to connect to the Kubernetes cluster
configLoader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
clientcmd.NewDefaultClientConfigLoadingRules(),
&clientcmd.ConfigOverrides{},
)
// create the Config object
cfg, err := configLoader.ClientConfig()
if err != nil {
panic(err)
}
// we want to use the core API (namespaces lives here)
cfg.APIPath = "/api"
cfg.GroupVersion = &corev1.SchemeGroupVersion
cfg.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
// create a RESTClient
rc, err := rest.RESTClientFor(cfg)
if err != nil {
panic(err.Error())
}
// utility function to create a int64 pointer
i64Ptr := func(i int64) *int64 { return &i }
opts := metav1.ListOptions{
TimeoutSeconds: i64Ptr(120),
Watch: true,
}
// attempts to begin watching the namespaces
// returns a `watch.Interface`, or an error
watcher, err := rc.Get().Resource("namespaces").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(time.Duration(*opts.TimeoutSeconds)).
Watch(context.TODO())
if err != nil {
panic(err)
}
// the patch data, just add a custom label
pd := []byte(`{"metadata":{"labels":{"modified-by":"lucasepe"}}}`)
// the patch type
pt := types.MergePatchType
// who did this patch?
po := metav1.PatchOptions{
FieldManager: "my-cool-app",
}
// here we iterate all the events streamed by the watch.Interface
for event := range watcher.ResultChan() {
// retrieve the Namespace
item := event.Object.(*corev1.Namespace)
switch event.Type {
// when a namespace is deleted...
case watch.Deleted:
// let's say hello!
fmt.Printf("- '%s' %v ...bye bye\n", item.GetName(), event.Type)
// when a namespace is added...
case watch.Added:
fmt.Printf("+ '%s' %v ", item.GetName(), event.Type)
// try to patch it!
err = rc.Patch(pt).Resource("namespaces").
Name(item.Name).
VersionedParams(&po, scheme.ParameterCodec).
Body(pd).
Do(context.TODO()).
Error()
if err != nil {
panic(err)
}
fmt.Println(" ...patched!")
}
}
}
To see what the program does, open two terminal windows.
In the first terminal run the code as usual:
$ go run main.go
In the second terminal create a namespace using kubectl
:
$ kubectl create namespace demo-system
In the first terminal you should see a new line like this:
+ 'demo-system' ADDED ...patched!
Going back to the second terminal and typing:
$ kubectl describe namespace demo-system
Name: demo-syste
Labels: kubernetes.io/metadata.name=demo-system
modified-by=lucasepe
Annotations: <none>
Status: Active
No resource quota.
No LimitRange resource
You can see that the program added a new label to to the newly created namespace.
Now deleting the namespace with:
$ kubectl delete namespace demo-system
Going to the program terminal, you should see a new line:
- 'demo-system' DELETED ...bye bye
Using the watch.Interface interface directly is actually discouraged.
For instance, since the server will close watch connections regularly, the ResultChan channel can be closed at any time (due to an io.EOF error).
There are helpers to re-establishing a watch at the last-received resourceVersion. RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout) it will get restarted from the last point without the consumer even knowing about it.
But what if API server lost events because etcd erased all resource versions?
In order to be resilient to etcd cache not having the resource version anymore - you would need to use Informers.
If you want to discover much more about how to use the client-go library to do all kind of interaction with Kubernetes up to the step-by-step instruction about how to create a custom controller (operator) to manage your custom resource, you could eventually buy my notebook "Using client-go" here: https://leanpub.com/using-client-go - but take a peek first - download a free notebook excerpt with the TOC and some random page.
Thank you for your time!
All the best,
Luca
Posted on March 18, 2022
Join Our Newsletter. No Spam, Only the good stuff.
Sign up to receive the latest update from our blog.