My Journey With Spark On Kubernetes... In Python (3/3)

psclgllt

Pascal Gillet

Posted on April 12, 2021


We need to operate Kubernetes from a Python client application, which means interacting with the Kubernetes REST API. Luckily, we do not need to implement the API calls and manage HTTP requests and responses ourselves: we can rely on the Kubernetes Python client, one of the officially supported Kubernetes client libraries alongside those for Go, Java, .NET, JavaScript and Haskell (there are also many community-maintained client libraries for other languages).

Kubernetes Python Client

The Kubernetes Python Client is compatible with Python 2.7 and 3.4+. See the compatibility matrix for the supported versions of Kubernetes.

When using the client library, we must first load authentication and cluster information.

Load Authentication And Cluster Information

First, you need to set up the required service account and roles.

k8s/python-client-sa-rbac.yaml

apiVersion: v1
kind: ServiceAccount
metadata:
  name: python-client-sa
  namespace: spark-jobs
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: spark-jobs
  name: python-client-role
rules:
- apiGroups: [""]
  resources: ["configmaps", "pods", "pods/log", "pods/status", "services"]
  verbs: ["*"]
- apiGroups: ["networking.k8s.io"]
  resources: ["ingresses", "ingresses/status"]
  verbs: ["*"]
- apiGroups: ["sparkoperator.k8s.io"]
  resources: [sparkapplications]
  verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: python-client-role-binding
  namespace: spark-jobs
subjects:
- kind: ServiceAccount
  name: python-client-sa
  namespace: spark-jobs
roleRef:
  kind: Role
  name: python-client-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: node-reader
rules:
- apiGroups: [""]
  resources: ["nodes"]
  verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: python-client-cluster-role-binding
subjects:
- kind: ServiceAccount
  name: python-client-sa
  namespace: spark-jobs
roleRef:
  kind: ClusterRole
  name: node-reader
  apiGroup: rbac.authorization.k8s.io

Create these resources with:

kubectl create -f k8s/python-client-sa-rbac.yaml

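This command creates a new service account named python-client-sa and a new role with the needed permissions in the spark-jobs namespace, then binds the role to the service account. It also creates a cluster-wide node-reader ClusterRole, bound to the same service account, which grants read access to nodes.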

WARNING: The python-client-sa is the service account that will provide the identity for the Kubernetes Python Client in our application. Do not confuse this service account with the driver-sa service account for driver pods.

The Easy Way

In this method, we use a helper utility to load authentication and cluster information from a kubeconfig file and store them in kubernetes.client.configuration.

from kubernetes import config, client

config.load_kube_config("path/to/kubeconfig_file")

v1 = client.CoreV1Api()

print("Listing pods with their IPs:")

ret = v1.list_namespaced_pod(namespace="spark-jobs")
for i in ret.items:
    print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))

But we DO NOT want to rely on the default kubeconfig file, which is located through the KUBECONFIG environment variable or, failing that, at ~/.kube/config. This kubeconfig file is yours, as a user of the kubectl command. Concretely, with this kubeconfig file, you typically have the right to do almost everything in the Kubernetes cluster, in all namespaces. Instead, we're going to generate one specifically for the service account created above, with the help of the script kubeconfig-gen.sh:

#!/usr/bin/env bash

# set -eux

# Reads the API server name from the default `kubeconfig` file.
# Here we suppose that the kubectl command-line tool is already configured to communicate with our cluster.
APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')

SERVICE_ACCOUNT_NAME=${1:-python-client-sa}
NAMESPACE=${2:-spark-jobs}
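# Assumes the service account has an auto-generated token Secret
# (Kubernetes no longer creates one by default since v1.24).
SECRET_NAME=$(kubectl get serviceaccount ${SERVICE_ACCOUNT_NAME} -n ${NAMESPACE} -o jsonpath='{.secrets[0].name}')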
TOKEN=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath='{.data.token}' | base64 --decode)
CACERT=$(kubectl get secret ${SECRET_NAME} -n ${NAMESPACE} -o jsonpath="{['data']['ca\.crt']}")


cat > kubeconfig-sa << EOF
apiVersion: v1
kind: Config
clusters:
- cluster:
    certificate-authority-data: ${CACERT}
    server: ${APISERVER}
  name: default-cluster
contexts:
- context:
    cluster: default-cluster
    namespace: ${NAMESPACE}
    user: ${SERVICE_ACCOUNT_NAME}
  name: default-context
current-context: default-context
users:
- user:
    token: ${TOKEN}
  name: ${SERVICE_ACCOUNT_NAME}
EOF

The kubeconfig file thus created configures access to the cluster for the python-client-sa service account, with only the rights needed for our client application and in the single namespace spark-jobs ("principle of least privilege").
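
The same kubeconfig can also be assembled directly in Python rather than written by a shell script. The sketch below mirrors the structure of the file generated above; the function name build_kubeconfig and the placeholder values are assumptions made for illustration, not part of the original scripts.

```python
import base64


def build_kubeconfig(api_server, ca_cert_b64, token,
                     namespace="spark-jobs",
                     service_account="python-client-sa"):
    """Return a kubeconfig as a dict, mirroring the file written by kubeconfig-gen.sh."""
    # Fail early if the CA certificate is not valid base64 (it must stay encoded).
    base64.b64decode(ca_cert_b64, validate=True)
    return {
        "apiVersion": "v1",
        "kind": "Config",
        "clusters": [{
            "name": "default-cluster",
            "cluster": {
                "certificate-authority-data": ca_cert_b64,
                "server": api_server,
            },
        }],
        "contexts": [{
            "name": "default-context",
            "context": {
                "cluster": "default-cluster",
                "namespace": namespace,
                "user": service_account,
            },
        }],
        "current-context": "default-context",
        "users": [{"name": service_account, "user": {"token": token}}],
    }


# Placeholder values, for illustration only.
fake_ca = base64.b64encode(b"dummy certificate").decode("ascii")
kubeconfig = build_kubeconfig("https://203.0.113.10:6443", fake_ca, "dummy-token")
print(kubeconfig["contexts"][0]["context"])
```

Recent versions of the client expose config.load_kube_config_from_dict, which should accept such a dictionary; check that your installed version provides it before relying on it.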

The Hard Way

Fetch credentials

Here, we're going to configure the Python client in the most programmatic way possible.

First, we need to fetch the credentials to access the Kubernetes cluster. We’ll store these in environment variables that the Python process can read.

export APISERVER=$(kubectl config view --minify -o jsonpath='{.clusters[0].cluster.server}')
# The service account and its secret live in the spark-jobs namespace
SECRET_NAME=$(kubectl get serviceaccount python-client-sa -n spark-jobs -o jsonpath='{.secrets[0].name}')
export TOKEN=$(kubectl get secret ${SECRET_NAME} -n spark-jobs -o jsonpath='{.data.token}' | base64 --decode)
export CACERT=$(kubectl get secret ${SECRET_NAME} -n spark-jobs -o jsonpath="{['data']['ca\.crt']}")

Note that environment variables are captured the first time the os module is imported, typically during IDE/Python startup. Changes to the environment made after this time are not reflected in os.environ (except for changes made by modifying os.environ directly).

Python sample usage

import base64
import os
from tempfile import NamedTemporaryFile

from kubernetes import client

api_server = os.environ["APISERVER"]
cacert = os.environ["CACERT"]
token = os.environ["TOKEN"]

# Set the configuration
configuration = client.Configuration()
with NamedTemporaryFile(delete=False) as cert:
    cert.write(base64.b64decode(cacert))
    configuration.ssl_ca_cert = cert.name
configuration.host = api_server
configuration.verify_ssl = True
configuration.debug = False
configuration.api_key = {"authorization": "Bearer " + token}
client.Configuration.set_default(configuration)

v1 = client.CoreV1Api()

print("Listing pods with their IPs:")

ret = v1.list_namespaced_pod(namespace="spark-jobs")
for i in ret.items:
    print("%s\t%s\t%s" % (i.status.pod_ip, i.metadata.namespace, i.metadata.name))

Getting Started

Kubernetes Object Management

With the Kubernetes Python Client, you can create and manage Kubernetes objects programmatically.

In the following example (provided in the GitHub repository), we create, update then delete a Deployment using AppsV1Api:

"""
Creates, updates, and deletes a deployment using AppsV1Api.
"""

from kubernetes import client, config

DEPLOYMENT_NAME = "nginx-deployment"


def create_deployment_object():
    # Configure the Pod template container
    container = client.V1Container(
        name="nginx",
        image="nginx:1.15.4",
        ports=[client.V1ContainerPort(container_port=80)],
        resources=client.V1ResourceRequirements(
            requests={"cpu": "100m", "memory": "200Mi"},
            limits={"cpu": "500m", "memory": "500Mi"}
        )
    )
    # Create and configure a spec section
    template = client.V1PodTemplateSpec(
        metadata=client.V1ObjectMeta(labels={"app": "nginx"}),
        spec=client.V1PodSpec(containers=[container]))
    # Create the specification of deployment
    spec = client.V1DeploymentSpec(
        replicas=3,
        template=template,
        selector={'matchLabels': {'app': 'nginx'}})
    # Instantiate the deployment object
    deployment = client.V1Deployment(
        api_version="apps/v1",
        kind="Deployment",
        metadata=client.V1ObjectMeta(name=DEPLOYMENT_NAME),
        spec=spec)

    return deployment


def create_deployment(api_instance, deployment):
    # Create deployment
    api_response = api_instance.create_namespaced_deployment(
        body=deployment,
        namespace="default")
    print("Deployment created. status='%s'" % str(api_response.status))


def update_deployment(api_instance, deployment):
    # Update container image
    deployment.spec.template.spec.containers[0].image = "nginx:1.16.0"
    # Update the deployment
    api_response = api_instance.patch_namespaced_deployment(
        name=DEPLOYMENT_NAME,
        namespace="default",
        body=deployment)
    print("Deployment updated. status='%s'" % str(api_response.status))


def delete_deployment(api_instance):
    # Delete deployment
    api_response = api_instance.delete_namespaced_deployment(
        name=DEPLOYMENT_NAME,
        namespace="default",
        body=client.V1DeleteOptions(
            propagation_policy='Foreground',
            grace_period_seconds=5))
    print("Deployment deleted. status='%s'" % str(api_response.status))


def main():
    # Configs can be set in Configuration class directly or using helper
    # utility. If no argument provided, the config will be loaded from
    # default location.
    config.load_kube_config("path/to/kubeconfig_file")
    apps_v1 = client.AppsV1Api()

    deployment = create_deployment_object()

    create_deployment(apps_v1, deployment)

    update_deployment(apps_v1, deployment)

    delete_deployment(apps_v1)


if __name__ == '__main__':
    main()

This is great, but it requires mastering the client's API and, above all, configuring our objects imperatively: we specify the desired operation (create, replace, etc.) on Python objects that represent Kubernetes objects. Here, we prefer to manage our objects declaratively and operate on object configuration files (stored locally alongside the Python source code), as we usually do with the kubectl command. Indeed, the Python code should only be a simple execution backend that triggers Kubernetes operations; the business logic, so to speak, should be concentrated in the manifest files.

The deployment we created above is essentially the one described in nginx-deployment.yaml, apart from the resource requests and limits, which the manifest omits:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 3
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.15.4
        ports:
        - containerPort: 80

We can directly load the manifest as follows:

from os import path
import yaml
from kubernetes import client, config


config.load_kube_config("path/to/kubeconfig_file")

with open(path.join(path.dirname(__file__), "nginx-deployment.yaml")) as f:
    dep = yaml.safe_load(f)
    k8s_apps_v1 = client.AppsV1Api()
    resp = k8s_apps_v1.create_namespaced_deployment(
        body=dep, namespace="default")
    print("Deployment created. name='%s'" % resp.metadata.name)

This is the equivalent in Python of kubectl create -f nginx-deployment.yaml.

As you can see, you must call create_namespaced_deployment to create a Deployment. In the same way, you would call create_namespaced_pod to create a Pod, and so on. This is because the Python client is automatically generated following the OpenAPI specifications of the Kubernetes API.

It's a shame to have to call a specific method to create a particular type of object, even though the type of object is already specified in the manifest that we load through this method. Luckily, the Kubernetes Python Client provides a utility method that acts as a single entry point for any kind of object.

import os
import yaml
from kubernetes import client, config, utils


config.load_kube_config("path/to/kubeconfig_file")

with open(os.path.join(os.path.dirname(__file__), "nginx-deployment.yaml")) as f:
    dep = yaml.safe_load(f)
    k8s_client = client.ApiClient()
    resp = utils.create_from_dict(k8s_client, dep)
    print("Deployment created. name='%s'" % resp[0].metadata.name)

utils.create_from_dict is the magic method here. It only takes a Dict holding valid kubernetes objects. It is a blessing to have found it, because it is well hidden in the client and not documented at all.
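
To give an intuition of how a single entry point can route any manifest to the right generated method, here is a minimal sketch that derives the method name from the kind field. It illustrates the idea under the assumption that method names follow the create_namespaced_<snake_case_kind> convention seen above; it is not meant as a copy of the library's implementation, and create_method_name is a hypothetical helper.

```python
import re


def create_method_name(kind, namespaced=True):
    """Derive the generated client's method name for creating an object of this kind."""
    # CamelCase -> snake_case, e.g. "ConfigMap" -> "config_map"
    snake = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", kind)
    snake = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", snake).lower()
    prefix = "create_namespaced_" if namespaced else "create_"
    return prefix + snake


for kind in ("Deployment", "Pod", "ConfigMap"):
    print(kind, "->", create_method_name(kind))
```

A dispatcher could then look the method up with getattr on the relevant API object (for instance an AppsV1Api instance for a Deployment) and call it with the manifest as body.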

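So, to launch a Spark job with spark-submit, you could use the code snippet above with a single YAML file that groups all the needed resources (separated by --- in YAML). In that case, load the file with yaml.safe_load_all rather than yaml.safe_load, which only accepts a single document, and pass each resulting document to utils.create_from_dict.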

But what about the Spark Operator? utils.create_from_dict does not support custom resources, that is, object types that are not part of the core Kubernetes API, such as the SparkApplication from the Spark Operator. To run a Spark job with the Spark Operator, you have no other choice than to call the create_namespaced_custom_object function of CustomObjectsApi:

import os
import yaml
from kubernetes import client, config


config.load_kube_config("path/to/kubeconfig_file")

with open(os.path.join(os.path.dirname(__file__), "k8s/spark-operator/pyspark-pi.yaml")) as f:
    dep = yaml.safe_load(f)
    custom_object_api = client.CustomObjectsApi()

    custom_object_api.create_namespaced_custom_object(
        group="sparkoperator.k8s.io",
        version="v1beta2",
        namespace="spark-jobs",
        plural="sparkapplications",
        body=dep,
    )
    print("SparkApplication created")

Regarding spark-submit, there is an even more direct method utils.create_from_yaml, which reads Kubernetes objects from a YAML file. But we cannot use it, as we need to "parameterize" our YAML files before submitting them to the Kubernetes Python client.

Templating

As you know, when you apply a manifest file (a YAML-formatted resource description that Kubernetes can understand), you must specify a resource name that is unique for that type of resource within the same namespace. Otherwise, Kubernetes will complain that the resource already exists.

For example, you can only have one Pod named myapp-1234 within the same namespace, but you can have one Pod and one Deployment that are each named myapp-1234.

As we want to run multiple Spark jobs simultaneously, and as these Spark jobs are mostly identical except for a few runtime parameters, we need to parameterize, or templatize, our Kubernetes YAML files.

Normally, you don't do that; at least, it is not in Kubernetes' philosophy: Kubernetes files should be template-free and should only be patched, by means of Kustomize for instance. Helm also has its own templating system.

You cannot use such a tool with the Kubernetes Python client. Instead, we are going to substitute references to variables of the form $VAR or ${VAR} with the corresponding values, exactly like envsubst, but programmatically.
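
As an aside, the Python standard library already understands the $VAR and ${VAR} syntax through string.Template. The sketch below shows this alternative to plain string replacement; render_manifest and the sample manifest are made up for illustration. Note one difference with envsubst: safe_substitute leaves unknown placeholders untouched, whereas envsubst replaces undefined variables with an empty string.

```python
import secrets
from string import Template


def render_manifest(text, variables):
    """Replace $VAR and ${VAR} placeholders, leaving unknown ones untouched."""
    return Template(text).safe_substitute(variables)


manifest = """\
apiVersion: v1
kind: Pod
metadata:
  name: pyspark-pi-driver${NAME_SUFFIX}
  namespace: $NAMESPACE
"""

variables = {
    "NAMESPACE": "spark-jobs",
    # 16 random hexadecimal characters to keep the pod name unique
    "NAME_SUFFIX": "-" + secrets.token_hex(8),
}
print(render_manifest(manifest, variables))
```

With this approach the dictionary keys are bare variable names (NAMESPACE) rather than full placeholders (${NAMESPACE}).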

Let's get back to Spark (native). We saw earlier the YAML file that defines a driver pod to run the Pi example program. As you can see, we have placeholders to specify the namespace, the priorityClassName, the serviceAccountName, the nodeAffinity and a NAME_SUFFIX to make the pod's name unique.

Now, in the Python code, we replace at runtime these variables with the desired values before creating the pod:

import binascii
import os
from os import listdir
from pprint import pprint

import yaml
from kubernetes import client, config, utils


def create_k8s_object(yaml_file=None, env_subst=None):
    # Read the manifest, substitute the placeholders, then parse the YAML
    with open(yaml_file) as f:
        content = f.read()
        if env_subst:
            for env, value in env_subst.items():
                content = content.replace(env, value)
        return yaml.safe_load(content)


def main():
    # Configs can be set in Configuration class directly or using helper utility
    config.load_kube_config("path/to/kubeconfig_file")

    # b2a_hex returns bytes in Python 3: decode before concatenating with a str
    name_suffix = "-" + binascii.b2a_hex(os.urandom(8)).decode("ascii")
    priority_class_name = "routine"
    env_subst = {"${NAMESPACE}": "spark-jobs",
                 "${SERVICE_ACCOUNT_NAME}": "driver-sa",
                 "${DRIVER_NODE_AFFINITIES}": "driver",
                 "${EXECUTOR_NODE_AFFINITIES}": "compute",
                 "${NAME_SUFFIX}": name_suffix,
                 "${PRIORITY_CLASS_NAME}": priority_class_name}

    k8s_client = client.ApiClient()
    verbose = True

    # Create driver pod
    k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native")
    k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst)
    pprint(k8s_object_dict)
    k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    # TODO: create the other resources

    print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))

if __name__ == "__main__":
    main()

Putting the pieces together

We now have the general mechanics in place and can create all the resources needed.
Remember, the driver pod consumes a ConfigMap to define environment variables and to mount configuration files in the Spark container (including the template for the executor pods). We also have a Service that allows executors to communicate back with the driver. And finally, we have another Service, backed by an Ingress, to expose the Spark UI.

We simply iterate over the YAML files that define these resources and call the same utils.create_from_dict method:

    # List all YAML files in k8s/spark-native directory, except the driver pod definition file
    other_resources = listdir(k8s_dir)
    other_resources.remove("pyspark-pi-driver-pod.yaml")
    for f in other_resources:
        k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst)
        pprint(k8s_object_dict)
        utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))

Now that we've launched a full Spark application, let's see what happens when we kill it 😈 or when the application completes normally.

Garbage collection

Killed applications

The role of the Kubernetes garbage collector is to delete certain objects that once had an owner, but no longer have one. The goal is to make sure that the garbage collector properly deletes resources that are no longer needed when killing a Spark application.
It is important to free up the resources of the Kubernetes cluster when you run tens or hundreds of Spark applications in parallel.

For this, certain Kubernetes objects can be declared owners of other objects. "Owned" objects are called dependent on the owner object. Each dependent object has a metadata.ownerReferences field that points to the owning object. When deleting an owner object, all dependent objects are also automatically deleted (cascading deletion) by default.

Example of an executor pod owned by its driver pod:

apiVersion: v1
kind: Pod
metadata:
  labels:
    spark-role: executor
  ownerReferences:
    - apiVersion: v1
      controller: true
      kind: Pod
      name: pyspark-pi-routine-0245dc3d340cd533-driver
      uid: 3b10fa97-c847-4fce-b3e1-71f779cffbef
...

The Spark Operator automatically sets the value of ownerReference, at different levels: the custom SparkApplication resource owns the driver pod which owns its executors.

For applications that are submitted natively (without the Spark Operator), the highest level owner object is the driver pod: the executor pods automatically set the ownerReference field, pointing to the driver pod. But we must manage the ownership relationship ourselves for the other ConfigMap, Service and Ingress resources.
For this, we must retrieve the auto-generated uid of the newly created driver pod and inject it into the dependent objects. The uid cannot be set manually in the YAML definition files; it can only be injected at runtime through code (which is why we cannot put all the resources in a single YAML file).

import binascii
import os
from os import listdir
from pprint import pprint

import yaml
from kubernetes import config, utils
from kubernetes.client import ApiClient


def create_k8s_object(yaml_file=None, env_subst=None):
    # Read the manifest, substitute the placeholders, then parse the YAML
    with open(yaml_file) as f:
        content = f.read()
        if env_subst:
            for env, value in env_subst.items():
                content = content.replace(env, value)
        return yaml.safe_load(content)


def main():
    # Configs can be set in Configuration class directly or using helper utility
    config.load_kube_config("path/to/kubeconfig_file")

    # b2a_hex returns bytes in Python 3: decode before concatenating with a str
    name_suffix = "-" + binascii.b2a_hex(os.urandom(8)).decode("ascii")
    priority_class_name = "routine"
    env_subst = {"${NAMESPACE}": "spark-jobs",
                 "${SERVICE_ACCOUNT_NAME}": "driver-sa",
                 "${DRIVER_NODE_AFFINITIES}": "driver",
                 "${EXECUTOR_NODE_AFFINITIES}": "compute",
                 "${NAME_SUFFIX}": name_suffix,
                 "${PRIORITY_CLASS_NAME}": priority_class_name}

    k8s_client = ApiClient()
    verbose = True

    # Create driver pod
    k8s_dir = os.path.join(os.path.dirname(__file__), "k8s/spark-native")
    k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, "pyspark-pi-driver-pod.yaml"), env_subst)
    pprint(k8s_object_dict)
    k8s_objects = utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    # Prepare ownership on dependent objects
    owner_refs = [{"apiVersion": "v1",
                   "controller": True,
                   "kind": "Pod",
                   "name": k8s_objects[0].metadata.name,
                   "uid": k8s_objects[0].metadata.uid}]

    # List all YAML files in k8s/spark-native directory, except the driver pod definition file
    other_resources = listdir(k8s_dir)
    other_resources.remove("pyspark-pi-driver-pod.yaml")
    for f in other_resources:
        k8s_object_dict = create_k8s_object(os.path.join(k8s_dir, f), env_subst)
        # Set ownership
        k8s_object_dict["metadata"]["ownerReferences"] = owner_refs
        pprint(k8s_object_dict)
        utils.create_from_dict(k8s_client, k8s_object_dict, verbose=verbose)

    print("Submitted %s" % (k8s_objects[0].metadata.labels["app-name"]))

if __name__ == "__main__":
    main()

Applications normally completed

When an application completes normally, the executor pods terminate and are cleaned up, but the driver pod persists logs and remains in "completed" state in the Kubernetes API "until it's eventually garbage collected or cleaned up manually".

Note that in the completed state, the driver pod does not use any compute or memory resources.

The Spark Operator has TTL support for SparkApplications through the optional field named .spec.timeToLiveSeconds, which, if set, defines the Time-To-Live (TTL) duration in seconds for a SparkApplication after its termination. The SparkApplication object will be garbage collected once more than .spec.timeToLiveSeconds seconds have elapsed since its termination. The example below illustrates how to use the field:

spec:
  timeToLiveSeconds: 86400

On the native Spark side, there is nothing in the doc that specifies how driver pods are ultimately deleted. We could set up a simple Kubernetes CronJob that would run periodically to delete them automatically.
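
Such a cleanup job could look like the sketch below. It is only a possible approach under assumptions: the function names, the 24-hour default TTL and the reliance on the spark-role=driver label are choices made for this example. The selection logic is kept in a pure function so that it can be tested without a cluster.

```python
from datetime import datetime, timedelta, timezone


def expired_driver_pods(pods, ttl_seconds, now=None):
    """Return the names of finished pods whose TTL has elapsed.

    `pods` is an iterable of (name, phase, finished_at) tuples, where
    finished_at is a timezone-aware datetime.
    """
    now = now or datetime.now(timezone.utc)
    ttl = timedelta(seconds=ttl_seconds)
    return [name for name, phase, finished_at in pods
            if phase in ("Succeeded", "Failed") and now - finished_at > ttl]


def delete_expired(namespace="spark-jobs", ttl_seconds=86400):
    """Hypothetical entry point for the CronJob; needs a configured client."""
    # Imported lazily so that the helper above stays usable without the client
    from kubernetes import client

    v1 = client.CoreV1Api()
    pods = v1.list_namespaced_pod(namespace, label_selector="spark-role=driver")
    candidates = []
    for pod in pods.items:
        statuses = pod.status.container_statuses or []
        # Collect the termination time of every terminated container
        finished = [s.state.terminated.finished_at for s in statuses
                    if s.state and s.state.terminated and s.state.terminated.finished_at]
        if finished:
            candidates.append((pod.metadata.name, pod.status.phase, max(finished)))
    for name in expired_driver_pods(candidates, ttl_seconds):
        v1.delete_namespaced_pod(name, namespace)
```

The service account running this job needs the list and delete verbs on pods in the namespace.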

At the time of writing this article, there are pending requests in Kubernetes to support TTL in Pods like in Jobs: "TTL controller only handles Jobs for now, and may be expanded to handle other resources that will finish execution, such as Pods and custom resources."

Background cascading deletion

When killing an application from the Python code, we delete the owner object using the Background cascading deletion policy.
In background cascading deletion, Kubernetes deletes the owner object immediately and the garbage collector then deletes the dependents in the background. This is useful so as not to delay the main execution thread.

To delete a Spark job launched by spark-submit:

from kubernetes import client


core_v1_api = client.CoreV1Api()
core_v1_api.delete_namespaced_pod("driver-pod-name", "spark-jobs", propagation_policy="Background")

To delete a Spark job launched with the Spark Operator, we must delete the enclosing SparkApplication resource:

from kubernetes import client, config

custom_object_api = client.CustomObjectsApi()
custom_object_api.delete_namespaced_custom_object(
    group="sparkoperator.k8s.io",
    version="v1beta2",
    namespace="spark-jobs",
    plural="sparkapplications",
    name="app_name",
    propagation_policy="Background")

Pod priority & preemption

Whether it is Volcano or the default kube-scheduler, job preemption relies on job priorities. For two jobs, the scheduler decides whose priority is higher by comparing .spec.priorityClassName (then createTime).

The priority is propagated to driver and executor pods, whether with native spark-submit or with Spark Operator, and regardless of the node affinities.

How to know which pods have been preempted

You can retrieve high-level information on what is happening in the cluster. To list all events in the namespace spark-jobs you can use:

# List Events sorted by timestamp
kubectl get events --sort-by=.metadata.creationTimestamp --namespace=spark-jobs

Sample output:
LAST SEEN   TYPE      REASON             OBJECT                                                   MESSAGE
69s         Normal    Scheduled          podgroup/podgroup-6083b4f9-f240-4a0e-95f2-06882aec2942   pod group is ready
93s         Normal    Scheduled          pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Successfully assigned spark-jobs/pyspark-pi-driver-routine-bf20cae50b6a8253 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp
92s         Normal    Started            pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Started container pyspark-pi
92s         Normal    Pulled             pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
92s         Normal    Created            pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Created container pyspark-pi
82s         Normal    Scheduled          pod/pythonpi-34b3597593d246a1-exec-2                     Successfully assigned spark-jobs/pythonpi-34b3597593d246a1-exec-2 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl
82s         Normal    Scheduled          pod/pythonpi-34b3597593d246a1-exec-1                     Successfully assigned spark-jobs/pythonpi-34b3597593d246a1-exec-1 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck
82s         Normal    Created            pod/pythonpi-34b3597593d246a1-exec-1                     Created container spark-kubernetes-executor
82s         Normal    Started            pod/pythonpi-34b3597593d246a1-exec-1                     Started container spark-kubernetes-executor
82s         Normal    Pulled             pod/pythonpi-34b3597593d246a1-exec-1                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
81s         Normal    Created            pod/pythonpi-34b3597593d246a1-exec-2                     Created container spark-kubernetes-executor
81s         Normal    Pulled             pod/pythonpi-34b3597593d246a1-exec-2                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
80s         Normal    Started            pod/pythonpi-34b3597593d246a1-exec-2                     Started container spark-kubernetes-executor
42s         Normal    Killing            pod/pythonpi-34b3597593d246a1-exec-1                     Stopping container spark-kubernetes-executor
42s         Normal    Killing            pod/pythonpi-34b3597593d246a1-exec-2                     Stopping container spark-kubernetes-executor
42s         Warning   FailedScheduling   pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              all nodes are unavailable: 3 node(s) resource fit failed.
42s         Normal    Killing            pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Stopping container pyspark-pi
42s         Warning   Evict              pod/pyspark-pi-driver-routine-bf20cae50b6a8253           Pod is evicted, because of preempt
34s         Warning   Unschedulable      podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816   0/1 tasks in gang unschedulable: pod group is not ready, 1 Pipelined, 1 minAvailable.
41s         Warning   FailedScheduling   pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              1/1 tasks in gang unschedulable: pod group is not ready, 1 Pipelined, 1 minAvailable.
18s         Normal    Scheduled          podgroup/podgroup-ee4b9210-35c2-4d68-841a-2daf7712a816   pod group is ready
33s         Normal    Scheduled          pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Successfully assigned spark-jobs/pyspark-pi-driver-rush-bb25fc0b8efe7c4d to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-jpfp
32s         Normal    Started            pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Started container pyspark-pi
32s         Normal    Created            pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Created container pyspark-pi
32s         Normal    Pulled             pod/pyspark-pi-driver-rush-bb25fc0b8efe7c4d              Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
22s         Normal    Scheduled          pod/pythonpi-f36cce7593d332f1-exec-1                     Successfully assigned spark-jobs/pythonpi-f36cce7593d332f1-exec-1 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-wxck
22s         Normal    Scheduled          pod/pythonpi-f36cce7593d332f1-exec-2                     Successfully assigned spark-jobs/pythonpi-f36cce7593d332f1-exec-2 to gke-yippee-spark-k8s-clus-default-pool-0b72dd1d-pvdl
22s         Normal    Pulled             pod/pythonpi-f36cce7593d332f1-exec-1                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
21s         Normal    Started            pod/pythonpi-f36cce7593d332f1-exec-1                     Started container spark-kubernetes-executor
21s         Normal    Created            pod/pythonpi-f36cce7593d332f1-exec-1                     Created container spark-kubernetes-executor
21s         Normal    Pulled             pod/pythonpi-f36cce7593d332f1-exec-2                     Container image "eu.gcr.io/yippee-spark-k8s/spark-py:3.0.1" already present on machine
21s         Normal    Created            pod/pythonpi-f36cce7593d332f1-exec-2                     Created container spark-kubernetes-executor
21s         Normal    Started            pod/pythonpi-f36cce7593d332f1-exec-2                     Started container spark-kubernetes-executor

In the output above, we can see that the pod pyspark-pi-driver-routine-bf20cae50b6a8253 has been "evicted because of preempt" by another job with the "rush" priority.
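
The same check can be done from Python by filtering events on their reason. The sketch below works on objects shaped like the events returned by CoreV1Api().list_namespaced_event; the helper name and the set of reasons are assumptions: "Evict" is the reason visible in the output above, while "Preempted" is assumed here for the default kube-scheduler.

```python
from types import SimpleNamespace

# Event reasons treated as preemption (see the assumptions stated above)
PREEMPTION_REASONS = {"Evict", "Preempted"}


def preempted_pods(events):
    """Return the names of pods hit by a preemption-related event, in order, without duplicates."""
    names = []
    for event in events:
        obj = event.involved_object
        if event.reason in PREEMPTION_REASONS and obj.kind == "Pod" and obj.name not in names:
            names.append(obj.name)
    return names


# Stand-ins for the event objects that the client would return
sample = [
    SimpleNamespace(reason="Killing",
                    involved_object=SimpleNamespace(kind="Pod", name="pythonpi-34b3597593d246a1-exec-1")),
    SimpleNamespace(reason="Evict",
                    involved_object=SimpleNamespace(kind="Pod", name="pyspark-pi-driver-routine-bf20cae50b6a8253")),
]
print(preempted_pods(sample))
```

Against a real cluster, the list passed to preempted_pods would be the items attribute of the list_namespaced_event("spark-jobs") response.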

Future work

Kubernetes provides containers with lifecycle hooks. The hooks enable containers to be aware of events in their management lifecycle and run code implemented in a handler when the corresponding lifecycle hook is executed.

In particular, the PreStop hook can be called immediately before a container is terminated due to preemption (among other events).
Thus, we can consider an action, whatever it is, to be triggered in case of preemption. All you need to do is implement and register a handler for this hook.

See Container Lifecycle Hooks.
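
Since our manifests are loaded as plain dictionaries, registering such a handler could be a matter of patching the pod definition before creating it. The following sketch adds a preStop exec handler to every container; with_pre_stop_hook and the /opt/on-preempt.sh script are hypothetical. Keep in mind that the hook runs on any termination, not only on preemption.

```python
import copy


def with_pre_stop_hook(pod_manifest, command):
    """Return a copy of the pod manifest where every container runs `command` before stopping."""
    patched = copy.deepcopy(pod_manifest)
    for container in patched["spec"]["containers"]:
        container.setdefault("lifecycle", {})["preStop"] = {"exec": {"command": list(command)}}
    return patched


pod = {
    "apiVersion": "v1",
    "kind": "Pod",
    "metadata": {"name": "pyspark-pi-driver"},
    "spec": {"containers": [{"name": "pyspark-pi", "image": "spark-py:3.0.1"}]},
}
# /opt/on-preempt.sh is a hypothetical script baked into the image
patched = with_pre_stop_hook(pod, ["/bin/sh", "-c", "/opt/on-preempt.sh"])
print(patched["spec"]["containers"][0]["lifecycle"])
```

The original dictionary is left unmodified, so the same base manifest can be reused for several submissions.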

We are soon coming to the end of our journey. Before wishing you good night, let's take a look at how we can monitor a Spark application from Python code.

Monitoring

Getting the status of a Spark application

Many operations in the Kubernetes Python client can be watched. This allows our Python program to watch for changes to a specific resource until it gets the desired result or the watch expires.

Here, we want to monitor the lifecycle of the driver pod, starting in the Pending phase, moving through Running if the Spark container starts OK, and then through either the Succeeded or Failed phases:

from kubernetes import client, watch

app_name = 'pyspark-pi-routine-bf20cae50b6a8253'
v1 = client.CoreV1Api()

count = 2
w = watch.Watch()
label_selector = 'app-name=%s,spark-role=driver' % app_name
for event in w.stream(v1.list_namespaced_pod, namespace='spark-jobs', label_selector=label_selector, timeout_seconds=60):
    print('Event: %s' % event['object'].status.phase)
    count -= 1
    if not count:
        w.stop()

Here, the driver pod (hence, the Spark application) is expected to complete, successfully or not, within 60 seconds or less. Its status should only change twice during this period: ideally Pending > Running > Succeeded.
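
Counting exactly two events is fragile, because a watch can report several modifications of the pod within the same phase. A possibly more robust variant, sketched below, stops on the phase itself. The helper names are made up for this example, and the loop is fed with fake events so that it runs without a cluster.

```python
from types import SimpleNamespace

TERMINAL_PHASES = ("Succeeded", "Failed")


def wait_for_final_phase(events):
    """Consume watch events until the pod reaches a terminal phase.

    Returns the final phase, or None if the stream ends first (e.g. on timeout).
    """
    for event in events:
        phase = event["object"].status.phase
        print("Event: %s" % phase)
        if phase in TERMINAL_PHASES:
            return phase
    return None


def fake_event(phase):
    # Mimics the shape of the events yielded by watch.Watch().stream(...)
    return {"type": "MODIFIED", "object": SimpleNamespace(status=SimpleNamespace(phase=phase))}


# In real use, pass the generator returned by
# w.stream(v1.list_namespaced_pod, namespace='spark-jobs',
#          label_selector=label_selector, timeout_seconds=60)
final = wait_for_final_phase(fake_event(p) for p in ("Pending", "Pending", "Running", "Succeeded"))
```

A None result then tells the caller that the application did not finish before the watch expired.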

Getting the logs

It is possible to retrieve the logs of the driver pod and mix them into those of the host application.
Getting the logs is just as easy:

from kubernetes import client, watch
from threading import Thread

v1 = client.CoreV1Api()
pod_name = 'pyspark-pi-routine-bf20cae50b6a8253-driver'

def logs(pod_name):
    w = watch.Watch()
    # read_namespaced_pod_log expects the pod name in its `name` parameter
    for event in w.stream(v1.read_namespaced_pod_log, name=pod_name, namespace='spark-jobs', _request_timeout=300):
        yield event

# We surely don't want to block the main thread while reading the logs
def consumer():
    for log in logs(pod_name):
        print(log)

t = Thread(target=consumer)
t.start()

Getting the Ingress URI

Once a Spark application has started, the ingress (at least, its public IP address) that exposes the Spark UI may take a while before it becomes available. Here too, we can monitor the Ingress resource as follows:

from kubernetes import client, watch

app_name = 'pyspark-pi-routine-bf20cae50b6a8253'

networking_v1_beta1_api = client.NetworkingV1beta1Api()
w = watch.Watch()
label_selector = 'app-name=%s' % app_name
for event in w.stream(networking_v1_beta1_api.list_namespaced_ingress, namespace='spark-jobs',
                      label_selector=label_selector,
                      timeout_seconds=30):
    ingress = event['object'].status.load_balancer.ingress
    if ingress:
        external_ip = ingress[0].ip
        print('Event: The Spark Web UI is available at http://%s/%s' % (external_ip, app_name))
        w.stop()
    else:
        print('Event: Ingress not yet available')

Conclusion

What a hell of a journey!

We have seen that the Python code for launching or deleting Spark applications is slightly different depending on whether we are using the Spark Operator or spark-submit. But since we name and label the Kubernetes objects consistently between the two, and as we set the ownership relationships properly, we can monitor our Spark applications and manage their lifecycle equally.

Would you like to know more? The Python scripts explained in this last article are available in this GitHub repository. Help yourself.
