Automate All the Boring Kubernetes Operations with Python
Martin Heinz
Posted on May 18, 2022
Kubernetes has become a de-facto standard in recent years, and many of us - DevOps engineers and developers alike - use it on a daily basis. Many of the tasks we perform, however, are the same: boring and easy to automate. Oftentimes it's simple enough to whip up a quick shell script with a bunch of kubectl commands, but for more complicated automation tasks bash just isn't good enough, and you need the power of a proper language, such as Python.
So, in this article we will look at how you can leverage the Kubernetes Python client library to automate whatever annoying Kubernetes task you might be dealing with!
Playground
Before we start playing with the Kubernetes client, we first need to create a playground cluster where we can safely test things out. We will use KinD (Kubernetes in Docker), which you can install from here.
We will use the following cluster configuration:
# kind.yaml
# https://kind.sigs.k8s.io/docs/user/configuration/
apiVersion: kind.x-k8s.io/v1alpha4
kind: Cluster
name: api-playground
nodes:
- role: control-plane
- role: worker
- role: worker
- role: worker
To create a cluster from the above configuration, you can run:
kind create cluster --image kindest/node:v1.23.5 --config=kind.yaml
kubectl cluster-info --context kind-api-playground
# Kubernetes control plane is running at https://127.0.0.1:36599
# CoreDNS is running at https://127.0.0.1:36599/api/v1/namespaces/kube-system/services/kube-dns:dns/proxy
kubectl get nodes
# NAME STATUS ROLES AGE VERSION
# api-playground-control-plane Ready control-plane,master 58s v1.23.5
# api-playground-worker Ready <none> 27s v1.23.5
# api-playground-worker2 NotReady <none> 27s v1.23.5
# api-playground-worker3 NotReady <none> 27s v1.23.5
With the cluster up and running, we also need to install the client library (optionally, inside a virtual environment):
python3 -m venv venv
source venv/bin/activate
pip install kubernetes
Authentication
To perform any action inside our Kubernetes cluster we first need to authenticate.
We will use long-lived tokens so that we don't need to go through the authentication flow repeatedly. A long-lived token can be obtained by creating a ServiceAccount:
kubectl create sa playground
kubectl describe sa playground
Name: playground
Namespace: default
Labels: <none>
Annotations: <none>
Image pull secrets: <none>
Mountable secrets: playground-token-v8bq7
Tokens: playground-token-v8bq7
Events: <none>
export KIND_TOKEN=$(kubectl get secret playground-token-v8bq7 -o json | jq -r .data.token | base64 --decode)
Using a service account also has the benefit that it's not tied to any single person, which is always preferable for automation purposes.
The token from the output above can then be used in requests:
curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/apis
We're now authenticated, but not authorized to do much of anything. Therefore, next we need to create a Role and bind it to the ServiceAccount so that we can perform actions on resources:
kubectl create clusterrole manage-pods \
--verb=get --verb=list --verb=watch --verb=create --verb=update --verb=patch --verb=delete \
--resource=pods
kubectl -n default create rolebinding sa-manage-pods \
--clusterrole=manage-pods \
--serviceaccount=default:playground
The above gives our service account permission to perform any action on Pods, limited to the default namespace.
You should always keep your roles very narrow and specific, but for playing around in KinD it makes sense to apply a cluster-wide admin role:
kubectl create clusterrolebinding sa-cluster-admin \
--clusterrole=cluster-admin \
--serviceaccount=default:playground
Raw Requests
To get a better understanding of what kubectl and the client library are doing under the hood, we will start with raw HTTP requests using curl.
The easiest way to find out which requests are being made under the hood is to run the desired kubectl command with -v 10, which will output the complete curl commands:
kubectl get pods -v 10
# <snip>
curl -k -v -XGET -H "Accept: application/json;as=Table;v=v1;g=meta.k8s.io,application/json..." \
'https://127.0.0.1:36599/api/v1/namespaces/default/pods?limit=500'
# <snip>
The output with log level 10 will be very verbose, but somewhere in there you will find the above curl command.
Add an Authorization header with your long-lived Bearer token to the above curl command and you should be able to perform the same actions as kubectl, such as:
curl -s -k -XGET -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
-H "kubernetes/$Format" 'https://127.0.0.1:36599/api/v1/namespaces/default/pods/example' | jq .status.phase
# "Running"
In case a request body is needed, look up which fields need to be included in the request. For example, when creating a Pod, we can use the API described here, which results in the following request:
curl -k -XPOST -H "Authorization: Bearer $KIND_TOKEN" -H "Accept: application/json, */*" -H "Content-Type: application/json" \
-H "kubernetes/$Format" https://127.0.0.1:36599/api/v1/namespaces/default/pods -d@pod.json
# To confirm
kubectl get pods
NAME READY STATUS RESTARTS AGE
example 0/1 Running 0 7s
Refer to the Kubernetes API reference for object attributes. Additionally, you can also view the OpenAPI definition with:
curl -k -X GET -H "Authorization: Bearer $KIND_TOKEN" https://127.0.0.1:36599/apis
Interacting with Kubernetes directly through the REST API can be a bit clunky, but there are situations where it makes sense. That includes interacting with APIs that have no equivalent kubectl command, or, for example, when you're using a different distribution of Kubernetes - such as OpenShift - which exposes additional APIs not covered by either kubectl or the client SDK.
Python Client
Moving on to the Python client itself. We need to go through the same steps as with kubectl or curl, the first being authentication:
from kubernetes import client
import os
configuration = client.Configuration()
configuration.api_key_prefix["authorization"] = "Bearer"
configuration.host = "https://127.0.0.1:36599"
configuration.api_key["authorization"] = os.getenv("KIND_TOKEN", None)
configuration.verify_ssl = False # Only for testing with KinD!
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
ret = v1.list_namespaced_pod(namespace="default", watch=False)
for pod in ret.items:
    print(f"Name: {pod.metadata.name}, Namespace: {pod.metadata.namespace} IP: {pod.status.pod_ip}")
# Name: example, Namespace: default IP: 10.244.2.2
First, we define a configuration object that tells the client we will authenticate using a Bearer token. Considering that our KinD cluster doesn't use a trusted certificate, we disable SSL verification; in a real cluster, however, you should never do that.
To test out the configuration, we use the list_namespaced_pod method of the API client to get all Pods in the default namespace, and we print out their name, namespace and IP.
Now, for a more realistic task, let's create a Deployment:
deployment_name = "my-deploy"
deployment_manifest = {
    "apiVersion": "apps/v1",
    "kind": "Deployment",
    "metadata": {"name": deployment_name, "namespace": "default"},
    "spec": {
        "replicas": 3,
        "selector": {
            "matchLabels": {
                "app": "nginx"
            }
        },
        "template": {
            "metadata": {"labels": {"app": "nginx"}},
            "spec": {
                "containers": [
                    {"name": "nginx", "image": "nginx:1.21.6", "ports": [{"containerPort": 80}]}
                ]
            }
        },
    }
}
import time
from kubernetes.client.rest import ApiException
v1 = client.AppsV1Api(api_client)
response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")
while True:
    try:
        response = v1.read_namespaced_deployment_status(name=deployment_name, namespace="default")
        if response.status.available_replicas != 3:
            print("Waiting for Deployment to become ready...")
            time.sleep(5)
        else:
            break
    except ApiException as e:
        print(f"Exception when calling AppsV1Api -> read_namespaced_deployment_status: {e}\n")
In addition to creating the Deployment, we also wait for its Pods to become available. We do that by querying the Deployment status and checking the number of available replicas.
Also, notice the pattern in function names, such as create_namespaced_deployment. To make it more obvious, let's look at a couple more:
replace_namespaced_cron_job
patch_namespaced_stateful_set
list_namespaced_horizontal_pod_autoscaler
read_namespaced_daemon_set
read_custom_resource_definition
All of these are in the format operation_namespaced_resource, or just operation_resource for cluster-wide (global) resources. They can additionally be suffixed with _status or _scale for methods that operate on a resource's status, such as read_namespaced_deployment_status, or on its scale, such as patch_namespaced_stateful_set_scale.
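As a quick illustration of the pattern - a minimal sketch reusing the api_client from the authentication step (the StatefulSet name is purely hypothetical):
apps_v1 = client.AppsV1Api(api_client)

# operation_namespaced_resource_status - read the status sub-resource of our Deployment
status = apps_v1.read_namespaced_deployment_status(name=deployment_name, namespace="default")

# operation_namespaced_resource_scale - patch the scale sub-resource (hypothetical StatefulSet)
apps_v1.patch_namespaced_stateful_set_scale(
    name="my-statefulset", namespace="default", body={"spec": {"replicas": 2}})

# operation_resource - cluster-scoped resources such as Nodes drop the "namespaced" part
nodes = client.CoreV1Api(api_client).list_node()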
Another thing to highlight is that in the Deployment example we performed the actions using client.AppsV1Api, which allows us to work with all the resources that belong to apiVersion: apps/v1. If we, for example, wanted to use a CronJob, we would instead choose BatchV1Api (which is apiVersion: batch/v1 in YAML format), and for PVCs we would choose CoreV1Api because of apiVersion: v1 - you get the gist.
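For example, a minimal sketch of picking the right API class (the listing calls are just for illustration):
# apiVersion: batch/v1 -> client.BatchV1Api (Jobs, CronJobs)
batch_v1 = client.BatchV1Api(api_client)
cron_jobs = batch_v1.list_namespaced_cron_job(namespace="default")

# apiVersion: v1 -> client.CoreV1Api (Pods, Services, PVCs, ...)
core_v1 = client.CoreV1Api(api_client)
pvcs = core_v1.list_namespaced_persistent_volume_claim(namespace="default")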
As you can imagine, that's a lot of functions to choose from; luckily, all of them are listed in the docs, and you can click on any one of them to get an example of its usage.
Beyond basic CRUD operations, it's also possible to continuously watch objects for changes. An obvious choice is to watch Events:
from functools import partial
from kubernetes import client, watch

v1 = client.CoreV1Api(api_client)
count = 10
w = watch.Watch()
for event in w.stream(partial(v1.list_namespaced_event, namespace="default"), timeout_seconds=10):
    print(f"Event - Message: {event['object']['message']} at {event['object']['metadata']['creationTimestamp']}")
    count -= 1
    if not count:
        w.stop()
print("Finished namespace stream.")
# Event - Message: Successfully assigned default/my-deploy-cb69f686c-2dspd to api-playground-worker2 at 2022-04-19T11:18:25Z
# Event - Message: Container image "nginx:1.21.6" already present on machine at 2022-04-19T11:18:26Z
# Event - Message: Created container nginx at 2022-04-19T11:18:26Z
# Event - Message: Started container nginx at 2022-04-19T11:18:26Z
Here we chose to watch Events in the default namespace. We take the first 10 events and then close the stream. If we wanted to continuously monitor the resources, we would just remove the timeout_seconds argument and the w.stop() call.
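If, for example, we wanted an endless watch on Pods instead, a minimal sketch could look like this (it runs until interrupted; the dict-style access mirrors the Events example above):
from functools import partial
from kubernetes import client, watch

v1 = client.CoreV1Api(api_client)
w = watch.Watch()
# No timeout_seconds and no w.stop() - the stream keeps delivering ADDED/MODIFIED/DELETED events
for event in w.stream(partial(v1.list_namespaced_pod, namespace="default")):
    print(f"{event['type']}: {event['object']['metadata']['name']}")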
In the first example you saw that we used a plain Python dict to define the Deployment object that we passed to the client. Alternatively, we can use a more OOP style with the API models (classes) provided by the library:
v1 = client.AppsV1Api(api_client)
deployment_manifest = client.V1Deployment(
    api_version="apps/v1",
    kind="Deployment",
    metadata=client.V1ObjectMeta(name=deployment_name),
    spec=client.V1DeploymentSpec(
        replicas=3,
        selector=client.V1LabelSelector(match_labels={"app": "nginx"}),
        template=client.V1PodTemplateSpec(
            metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
            spec=client.V1PodSpec(
                containers=[
                    client.V1Container(
                        name="nginx",
                        image="nginx:1.21.6",
                        ports=[client.V1ContainerPort(container_port=80)]
                    )
                ]
            )
        )
    )
)
response = v1.create_namespaced_deployment(body=deployment_manifest, namespace="default")
Trying to figure out which model you should use for each argument is a losing battle, though. When creating resources as shown above, you should always use the documentation for the models and traverse the links as you create the individual sub-objects to figure out what values/types are expected in each field.
Handy Examples
You should now have a basic idea about how the client works, so let's take a look at some handy examples and snippets that might help you automate daily Kubernetes operations.
A very common thing you might want to perform is a Deployment rollout restart - usually done with kubectl rollout restart. There's, however, no API to do this. The way kubectl does it is by updating the Deployment's annotations, more specifically, setting kubectl.kubernetes.io/restartedAt to the current time. This works because any change made to the Pod spec causes a restart.
If we want to perform a restart using the Python client, we need to do the same:
from kubernetes import dynamic
from kubernetes.client import api_client # Careful - different import - not the same as previous client!
import datetime
client = dynamic.DynamicClient(api_client.ApiClient(configuration=configuration))
api = client.resources.get(api_version="apps/v1", kind="Deployment")
# Note: this assumes the dict-based manifest from the first Deployment example;
# the class-model version would need converting to a plain dict first (e.g. with sanitize_for_serialization).
deployment_manifest["spec"]["template"]["metadata"]["annotations"] = {
    "kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow().isoformat()
}
deployment_patched = api.patch(body=deployment_manifest, name=deployment_name, namespace="default")
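As an optional sanity check - a sketch that assumes the dynamic client's response objects allow the attribute and item access shown - you can read the Deployment back through the same dynamic api resource and print the annotation that was just set:
restarted = api.get(name=deployment_name, namespace="default")
# The restartedAt annotation should now carry the timestamp we patched in
print(restarted.spec.template.metadata.annotations["kubectl.kubernetes.io/restartedAt"])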
Another common operation is scaling a Deployment; this one, fortunately, has an API function we can use:
from kubernetes import client
api_client = client.ApiClient(configuration)
apps_v1 = client.AppsV1Api(api_client)
# The body can be of different patch types - https://github.com/kubernetes-client/python/issues/1206#issuecomment-668118057
api_response = apps_v1.patch_namespaced_deployment_scale(deployment_name, "default", {"spec": {"replicas": 5}})
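Per the issue linked in the comment above, newer versions of the client choose the patch content type based on the shape of the body - a list is sent as a JSON patch rather than a strategic merge patch. A hedged sketch of the same scale change in that format (behaviour may differ on older client versions):
# JSON-patch style body (list of operations) targeting the scale sub-resource
api_response = apps_v1.patch_namespaced_deployment_scale(
    deployment_name, "default",
    [{"op": "replace", "path": "/spec/replicas", "value": 5}])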
For troubleshooting purposes, it often makes sense to exec into a Pod and take a look around, possibly to grab environment variables to verify correct configuration:
from kubernetes.stream import stream
def pod_exec(name, namespace, command, api_instance):
    exec_command = ["/bin/sh", "-c", command]
    resp = stream(api_instance.connect_get_namespaced_pod_exec,
                  name,
                  namespace,
                  command=exec_command,
                  stderr=True, stdin=False,
                  stdout=True, tty=False,
                  _preload_content=False)

    while resp.is_open():
        resp.update(timeout=1)
        if resp.peek_stdout():
            print(f"STDOUT: \n{resp.read_stdout()}")
        if resp.peek_stderr():
            print(f"STDERR: \n{resp.read_stderr()}")

    resp.close()

    if resp.returncode != 0:
        raise Exception("Script failed")
pod = "example"
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
pod_exec(pod, "default", "env", v1)
# STDOUT:
# KUBERNETES_SERVICE_PORT=443
# KUBERNETES_PORT=tcp://10.96.0.1:443
# HOSTNAME=example
# HOME=/root
# ...
The snippet above also allows you to run whole shell scripts if need be.
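For instance, a small multi-line script can be passed as the command - a sketch reusing the pod_exec helper and the pod variable from above (the commands themselves are arbitrary examples):
script = """
set -e
echo "Hostname: $(hostname)"
id
ls /
"""
pod_exec(pod, "default", script, v1)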
Moving on to more cluster-administration-oriented tasks: let's say you want to apply a Taint to a node that has some issue. Well, once again, there's no direct API for Node Taints, but we can find a way:
from kubernetes import client
api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)
# kubectl taint nodes api-playground-worker some-taint=1:NoSchedule
v1.patch_node("api-playground-worker", {"spec": {"taints": [{"effect": "NoSchedule", "key": "some-taint", "value": "1"}]}})
# kubectl get nodes -o custom-columns=NAME:.metadata.name,TAINTS:.spec.taints --no-headers
# api-playground-control-plane [map[effect:NoSchedule key:node-role.kubernetes.io/master]]
# api-playground-worker [map[effect:NoSchedule key:some-taint value:1]]
# api-playground-worker2 <none>
# api-playground-worker3 <none>
You might also want to monitor cluster resource utilization, possibly to automate cluster scaling. Usually you'd use kubectl top to get the information interactively; with the client library, you can do:
# https://github.com/kubernetes-sigs/kind/issues/398
# kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/download/v0.5.0/components.yaml
# kubectl patch -n kube-system deployment metrics-server --type=json \
# -p '[{"op":"add","path":"/spec/template/spec/containers/0/args/-","value":"--kubelet-insecure-tls"}]'
from kubernetes import client
api_client = client.ApiClient(configuration)
custom_api = client.CustomObjectsApi(api_client)
response = custom_api.list_cluster_custom_object("metrics.k8s.io", "v1beta1", "nodes") # also works with "pods" instead of "nodes"
for node in response["items"]:
    print(f"{node['metadata']['name']: <30} CPU: {node['usage']['cpu']: <10} Memory: {node['usage']['memory']}")
# api-playground-control-plane CPU: 148318488n Memory: 2363504Ki
# api-playground-worker CPU: 91635913n Memory: 1858680Ki
# api-playground-worker2 CPU: 75473747n Memory: 1880860Ki
# api-playground-worker3 CPU: 105692650n Memory: 1881560Ki
The above example assumes that you have metrics-server installed in your cluster. You can run kubectl top to verify that. Use the commands in the snippet's comments to install it if you're working with KinD.
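The usage values come back as raw Kubernetes quantity strings - n means nanocores and Ki kibibytes in the output above. A small, hypothetical helper to turn them into friendlier numbers might look like this (it only covers the suffixes you're likely to see from metrics-server):
def cpu_to_cores(cpu: str) -> float:
    """Convert a CPU quantity such as '148318488n' or '250m' to cores."""
    if cpu.endswith("n"):
        return int(cpu[:-1]) / 1_000_000_000
    if cpu.endswith("u"):
        return int(cpu[:-1]) / 1_000_000
    if cpu.endswith("m"):
        return int(cpu[:-1]) / 1_000
    return float(cpu)

def memory_to_mib(memory: str) -> float:
    """Convert a memory quantity such as '2363504Ki' or '1858Mi' to MiB."""
    if memory.endswith("Ki"):
        return int(memory[:-2]) / 1024
    if memory.endswith("Mi"):
        return float(memory[:-2])
    if memory.endswith("Gi"):
        return int(memory[:-2]) * 1024
    return int(memory) / (1024 * 1024)  # assume plain bytes

for node in response["items"]:
    print(f"{node['metadata']['name']: <30} "
          f"CPU: {cpu_to_cores(node['usage']['cpu']):.2f} cores  "
          f"Memory: {memory_to_mib(node['usage']['memory']):.0f} MiB")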
Last but not least, you might already have a bunch of YAML or JSON files that you want to use to deploy or modify objects in your cluster, or you might want to export and back up what you've created with the client. Here's how you can convert YAML/JSON files to Kubernetes objects and back to files again:
# pip install kopf # (Python 3.7+)
import kopf

api_client = client.ApiClient(configuration)
v1 = client.CoreV1Api(api_client)

pods = []

# https://stackoverflow.com/questions/59977058/clone-kubernetes-objects-programmatically-using-the-python-api/59977059#59977059
ret = v1.list_namespaced_pod(namespace="default")
for pod in ret.items:
    # Simple conversion to Dict/JSON
    print(api_client.sanitize_for_serialization(pod))
    # Conversion with fields clean-up
    pods.append(kopf.AnnotationsDiffBaseStorage()
                .build(body=kopf.Body(api_client.sanitize_for_serialization(pod))))

# Conversion from Dict back to Client object
class FakeKubeResponse:
    def __init__(self, obj):
        import json
        self.data = json.dumps(obj)

for pod in pods:
    pod_manifest = api_client.deserialize(FakeKubeResponse(pod), "V1Pod")
    ...
The first way to convert an existing object into a Python dictionary (JSON) is to use sanitize_for_serialization, which produces raw output with all the generated/default fields. A better option is to use the utility methods of the kopf library, which remove all the unnecessary fields. From there, it's simple enough to convert the dictionary into a proper YAML or JSON file.
For the reverse - that is, if we want to go from a dictionary to a client object model - we can use the deserialize method of the API client. This method, however, expects its argument to have a data attribute, so we pass it a container class instance with such an attribute.
If you already have YAML files which you'd like to use with the Python client, you can use the utility function kubernetes.utils.create_from_yaml.
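A minimal sketch of that helper, assuming a local manifest file called deployment.yaml (the file name is hypothetical):
from kubernetes import client, utils

api_client = client.ApiClient(configuration)
# Creates every object defined in the file; the namespace applies to namespaced
# resources that don't specify one themselves.
utils.create_from_yaml(api_client, "deployment.yaml", namespace="default")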
To get a complete overview of all the features of the library, I recommend taking a look at the examples directory in the repository.
I'd also encourage you to look through the issues in the library repository, as it has a lot of great examples of client usage, such as processing events in parallel or watching ConfigMaps for updates.
Conclusion
The Python client library contains literally hundreds of functions, so it's difficult to cover every little feature or use case there is. Most of them, however, follow a common pattern, which should make the library's usage feel pretty natural after a couple of minutes.
If you're looking for more examples beyond what was shown and referenced above, I recommend exploring other popular tools that make use of the Python Kubernetes client, such as kopf - a library for creating Kubernetes operators. I also find it very useful to take a look at the tests of the library itself, as they showcase its intended usage - for example, this client test suite.