Writing Kubernetes Operators with Kopf

2023-05-16

Quick Notes 📚

  • Controllers are control loops that watch the state of your cluster, then make or request changes where needed to move the current state closer to the desired state. The examples below are core controllers run by the Kubernetes controller manager in the control plane:
    • Node controller: Responsible for noticing and responding when nodes go down.
    • Job controller: Watches for Job objects that represent one-off tasks, then creates Pods to run those tasks to completion.
    • ServiceAccount controller: Creates default ServiceAccounts for new namespaces.
  • Resources are endpoints in the Kubernetes API that store collections of API objects of a certain kind (for example, the pods resource).
  • Custom resources are extensions of the Kubernetes API that are not necessarily available in a default Kubernetes installation.
  • Operators are application-specific controllers.
    An operator is a type of controller that extends Kubernetes’ functionality to manage custom resources (defined through CRDs, Custom Resource Definitions) specific to a particular application or workload. Unlike Kubernetes’ built-in controllers, which handle standard resources such as Pods, Deployments, and Services, operators are tailored to manage higher-level abstractions that users create for their applications.
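The control-loop idea in the notes above can be illustrated with a toy reconcile function in plain Python. This is only a sketch that assumes a hypothetical model where desired and observed state are simple mappings from a workload name to a replica count; real controllers watch the API server and act through it rather than comparing dictionaries.

```python
from typing import Dict, List


def reconcile(desired: Dict[str, int], actual: Dict[str, int]) -> List[str]:
    """Return the actions that would move `actual` toward `desired`.

    Both mappings go from a (hypothetical) workload name to a replica count.
    """
    actions = []
    for name, want in sorted(desired.items()):
        have = actual.get(name, 0)
        if have < want:
            actions.append(f"scale up {name}: {have} -> {want}")
        elif have > want:
            actions.append(f"scale down {name}: {have} -> {want}")
    # Anything that exists but is no longer desired gets removed.
    for name in sorted(set(actual) - set(desired)):
        actions.append(f"delete {name}")
    return actions
```

For example, `reconcile({"web": 3}, {"web": 1, "old-job": 1})` returns `["scale up web: 1 -> 3", "delete old-job"]`. A controller runs this comparison repeatedly, which is why it converges even if an individual action fails and has to be retried on the next pass.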

While Golang is the dominant language in the Kubernetes ecosystem, there are popular frameworks for building operators in other languages:

  1. Kopf: a Python framework for building Kubernetes operators.
  2. Java Operator SDK: a framework for building Kubernetes operators in Java.
  3. NodeJS Kubernetes operator framework: a framework for building Kubernetes operators in JavaScript or TypeScript.
  4. kube-rs: a Rust library for interacting with Kubernetes. It provides bindings to the Kubernetes API and can be used to build custom operators in Rust.
  5. dotnet-operator-sdk: an SDK for building Kubernetes operators in .NET.

Operator’s Logic

In this post, we will walk through the process of building a simple operator using Kopf. The operator we’ll create is an alert operator that monitors pod states and generates alerts for pods that are not running. It will also print the logs of those pods to help application developers debug them.

Note:

This tutorial focuses on the core functionality of the operator. Although the operator could be extended to send alerts to specific Slack channels through webhooks, we will keep things simple and focus on the basic implementation.

By the end of this tutorial, you will have a solid understanding of how to build a basic operator using Kopf and leverage its capabilities to enhance your Kubernetes-based applications. Let’s get started!

Environment Setup

minikube start --cpus 4 --memory 4096
kubectl config get-contexts
kubectl config current-context
kubectl config use-context minikube

Custom Resource Definition

Let us define a CRD (custom resource definition) for our alerting object:

## alert.yaml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: orchestrs.kopf.dev
spec:
  scope: Namespaced
  group: kopf.dev
  names:
    kind: Orchestr
    plural: orchestrs
    singular: orchestr
    shortNames:
      - orcs
      - orc
  versions:
    - name: v1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          properties:
            spec:
              type: object
              x-kubernetes-preserve-unknown-fields: true
            status:
              type: object
              x-kubernetes-preserve-unknown-fields: true
kubectl apply -f alert.yaml

The YAML above is a CustomResourceDefinition (CRD) resource in Kubernetes. Let’s break down the different sections:

  • apiVersion: apiextensions.k8s.io/v1 specifies the API version of the CustomResourceDefinition resource.
  • metadata: contains metadata about the CustomResourceDefinition. Its name must take the form <plural>.<group>, here orchestrs.kopf.dev.
  • spec: defines the specification of the CustomResourceDefinition.
  • scope: Namespaced specifies that objects of this custom resource are namespaced, meaning each one lives in a specific namespace.
  • group: kopf.dev specifies the API group to which this custom resource belongs.
  • kind: Orchestr specifies the kind, the CamelCase name used in the kind field of manifests.
  • plural: orchestrs and singular: orchestr specify the plural and singular names used in API URLs and by kubectl, while shortNames (orcs, orc) defines abbreviations that kubectl accepts.
  • versions: lists the API versions of the resource; v1 is both served and used as the storage version.
  • openAPIV3Schema: specifies the OpenAPI v3 schema used to validate the custom resource.
  • x-kubernetes-preserve-unknown-fields: true tells the API server to keep fields that are not declared in the schema instead of pruning them, so spec and status can hold arbitrary content.
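If you prefer generating manifests from code, the same CRD can be expressed as a Python dict and submitted with the official Kubernetes Python client. This is a sketch: `build_crd` and `create_crd` are hypothetical helper names, while `ApiextensionsV1Api.create_custom_resource_definition` is a real client call. The `api` parameter lets you pass a fake object in tests; when omitted, the client is imported lazily and configured from your kubeconfig.

```python
def build_crd(group: str, kind: str, plural: str, singular: str,
              short_names: list, version: str = "v1") -> dict:
    """Build a namespaced CRD manifest shaped like alert.yaml."""
    open_object = {"type": "object", "x-kubernetes-preserve-unknown-fields": True}
    return {
        "apiVersion": "apiextensions.k8s.io/v1",
        "kind": "CustomResourceDefinition",
        # The API server requires the CRD name to be <plural>.<group>.
        "metadata": {"name": f"{plural}.{group}"},
        "spec": {
            "scope": "Namespaced",
            "group": group,
            "names": {
                "kind": kind,
                "plural": plural,
                "singular": singular,
                "shortNames": list(short_names),
            },
            "versions": [{
                "name": version,
                "served": True,
                "storage": True,
                "schema": {"openAPIV3Schema": {
                    "type": "object",
                    "properties": {"spec": dict(open_object),
                                   "status": dict(open_object)},
                }},
            }],
        },
    }


def create_crd(manifest: dict, api=None):
    """Submit the manifest to the cluster; pass a fake `api` in tests."""
    if api is None:
        from kubernetes import client, config
        config.load_kube_config()
        api = client.ApiextensionsV1Api()
    return api.create_custom_resource_definition(body=manifest)
```

Calling `build_crd("kopf.dev", "Orchestr", "orchestrs", "orchestr", ["orcs", "orc"])` produces a manifest whose `metadata.name` is `orchestrs.kopf.dev`, matching the naming rule above.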

Custom Resource Object

#obj.yaml

apiVersion: kopf.dev/v1
kind: Orchestr
metadata:
  name: my-alerts
spec:
  alerts:
    checkRunningPods: true
kubectl apply -f obj.yaml
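The same object can also be created from Python, which is handy in integration tests for the operator. This sketch uses the real `CustomObjectsApi.create_namespaced_custom_object` call; `build_orchestr` and `create_orchestr` are hypothetical helper names, and the injectable `api` parameter exists only so the code can be exercised without a cluster.

```python
def build_orchestr(name: str, check_running_pods: bool = True) -> dict:
    """Build an Orchestr object equivalent to obj.yaml."""
    return {
        "apiVersion": "kopf.dev/v1",
        "kind": "Orchestr",
        "metadata": {"name": name},
        "spec": {"alerts": {"checkRunningPods": check_running_pods}},
    }


def create_orchestr(body: dict, namespace: str = "default", api=None):
    """Create the object in `namespace`; pass a fake `api` in tests."""
    if api is None:
        from kubernetes import client, config
        config.load_kube_config()
        api = client.CustomObjectsApi()
    # group/version/plural must match the CRD defined in alert.yaml.
    return api.create_namespaced_custom_object(
        group="kopf.dev", version="v1", namespace=namespace,
        plural="orchestrs", body=body)
```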

Get a list of the existing objects of this kind with one of the commands:

kubectl get Orchestr
kubectl get orchestrs
kubectl get orchestr
kubectl get orcs
kubectl get orc
kubectl get orc my-alerts -o yaml

Output of the last command:

apiVersion: kopf.dev/v1
kind: Orchestr
metadata:
  annotations:
    kopf.zalando.org/last-handled-configuration: |
      {"spec":{"alerts":{"checkRunningPods":true}}}
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"kopf.dev/v1","kind":"Orchestr","metadata":{"annotations":{},"name":"my-alerts","namespace":"default"},"spec":{"alerts":{"checkRunningPods":true}}}
  creationTimestamp: "2023-07-02T22:22:38Z"
  generation: 1
  name: my-alerts
  namespace: default
  resourceVersion: "212913"
  uid: 36c8bf95-9b05-4f7b-98fe-954eb97e7e3d
spec:
  alerts:
    checkRunningPods: true
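The kopf.zalando.org/last-handled-configuration annotation in this output is where Kopf records the state it last handled, so it can tell later whether an object is new or has changed. The sketch below is a deliberately simplified illustration of that idea, not Kopf’s actual algorithm (which compares more than just the spec); `spec_changed_since_last_handling` is a hypothetical name.

```python
import json

ANNOTATION = "kopf.zalando.org/last-handled-configuration"


def spec_changed_since_last_handling(obj: dict) -> bool:
    """Simplified check: does the live spec differ from the recorded one?"""
    annotations = obj.get("metadata", {}).get("annotations") or {}
    raw = annotations.get(ANNOTATION)
    if raw is None:
        # Nothing recorded yet: from the operator's view, this is a new object.
        return True
    last_handled = json.loads(raw)
    return last_handled.get("spec") != obj.get("spec")
```

For the my-alerts object shown above, the recorded spec equals the live spec, so this check returns False.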

Writing the Operator Logic

Kopf provides the following three core cause handlers, which react:

  1. When an object is created
  2. When an object is deleted
  3. When an object is updated
import kopf

# Distinct function names keep one handler from shadowing the next.
@kopf.on.create('orchestrs')
def create_fn(spec, **_):
    pass

@kopf.on.update('orchestrs')
def update_fn(spec, old, new, diff, **_):
    pass

@kopf.on.delete('orchestrs')
def delete_fn(spec, **_):
    pass
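The update handler’s `diff` argument describes what changed between the old and new object. As a sketch, assuming each diff item unpacks as `(operation, field, old, new)`, with operation being "add", "change" or "remove" and field a tuple path such as `("spec", "alerts", "checkRunningPods")`, a handler could turn it into readable log lines. `describe_diff` is a hypothetical helper, and the wiring into Kopf is shown only as a comment.

```python
def describe_diff(diff) -> list:
    """Turn (operation, field_path, old, new) items into readable lines."""
    lines = []
    for op, field, old, new in diff:
        path = ".".join(str(part) for part in field)
        if op == "add":
            lines.append(f"added {path} = {new!r}")
        elif op == "remove":
            lines.append(f"removed {path} (was {old!r})")
        else:
            lines.append(f"changed {path}: {old!r} -> {new!r}")
    return lines


# Possible wiring (requires kopf):
# @kopf.on.update('orchestrs')
# def update_fn(diff, logger, **_):
#     for line in describe_diff(diff):
#         logger.info(line)
```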

For this post, we will focus only on @kopf.on.create.
kopf.on.create is a decorator provided by the Kopf framework for defining handlers that are triggered when a new resource of a specific kind is created in Kubernetes.
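To see why decorating a function is enough to “subscribe” it to an event, here is a toy registry in plain Python. It is not Kopf’s implementation; `on` and `dispatch` are hypothetical names that only illustrate the pattern of registering handlers at import time and invoking them later by cause and resource.

```python
from collections import defaultdict
from typing import Callable, Dict, List, Tuple

# Maps (cause, plural) to the handlers registered for it.
_registry: Dict[Tuple[str, str], List[Callable]] = defaultdict(list)


def on(cause: str, plural: str):
    """Return a decorator that registers a handler for (cause, plural)."""
    def decorator(fn: Callable) -> Callable:
        _registry[(cause, plural)].append(fn)
        return fn  # the function itself is returned unchanged
    return decorator


def dispatch(cause: str, plural: str, **kwargs) -> list:
    """Call every handler registered for this cause and resource."""
    return [fn(**kwargs) for fn in _registry[(cause, plural)]]


@on("create", "orchestrs")
def create_fn(spec, **_):
    return spec["alerts"]["checkRunningPods"]
```

Because registration happens as a side effect of importing the module, a runner only needs to import the file that contains the handlers, which is the same reason `kopf run main.py` picks up the decorated functions.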

#main.py
from kubernetes import client, config
from kubernetes.client.rest import ApiException
import kopf


def load_config():
    # Inside the cluster, authenticate with the pod's service account;
    # when running locally, fall back to the default kubeconfig.
    try:
        config.load_incluster_config()
    except config.ConfigException:
        config.load_kube_config()


@kopf.on.create('orchestrs')
def create_fn(spec, namespace, **kwargs):
    # A missing field is treated the same as `false`
    alert = spec.get('alerts', {}).get('checkRunningPods')
    if not alert:
        raise kopf.PermanentError(
            "spec.alerts.checkRunningPods must be true, check the documentation for guidance")
    problematic_pods = check_pod_status()
    if problematic_pods:
        print("The following pods are not in the 'Running' state:")
        for pod in problematic_pods:
            pod_name = pod['name']
            pod_namespace = pod['namespace']
            pod_status = pod['status']
            pod_issues = pod['issues']

            print(f"Pod: {pod_name} in namespace: {pod_namespace}")
            print(f"Status: {pod_status}")
            if pod_issues:
                print("Issues:")
                for issue in pod_issues:
                    print(f"- {issue}")

            try:
                logs = get_pod_logs(pod_name, pod_namespace)
                print("Logs:")
                print(logs)
            except ApiException as e:
                # get_pod_logs() puts a readable message in `reason`
                print(e.reason)

            print("-" * 50)
    else:
        print("All pods are running.")


def check_pod_status():
    load_config()

    # Create an instance of the Kubernetes client
    api_client = client.CoreV1Api()

    # Retrieve all pods in the cluster
    pods = api_client.list_pod_for_all_namespaces(watch=False)

    # Check the status of each pod
    problematic_pods = []
    for pod in pods.items:
        if pod.status.phase != "Running":
            pod_name = pod.metadata.name
            pod_namespace = pod.metadata.namespace
            pod_status = pod.status.phase
            pod_issues = []

            # Check for specific issues with the pod; container_statuses
            # is None for pods that have not been scheduled yet
            for container_status in pod.status.container_statuses or []:
                if container_status.state.waiting is not None:
                    pod_issues.append(
                        f"Container {container_status.name} is waiting: {container_status.state.waiting.reason}"
                    )

            problematic_pods.append({
                "name": pod_name,
                "namespace": pod_namespace,
                "status": pod_status,
                "issues": pod_issues
            })

    return problematic_pods


def get_pod_logs(pod_name, namespace):
    load_config()

    # Create an instance of the Kubernetes client
    api_client = client.CoreV1Api()

    try:
        # Retrieve the logs for the pod
        logs = api_client.read_namespaced_pod_log(name=pod_name,
                                                  namespace=namespace)
        return logs
    except ApiException as e:
        # Re-raise with a readable message, keeping the HTTP status
        raise ApiException(
            status=e.status,
            reason=f"Failed to retrieve logs for pod {pod_name} in namespace {namespace}. Error: {e.body}"
        ) from e

The script above uses the Kopf framework to build a Kubernetes operator. Let’s go through the code and explain its functionality:

  • The script starts by importing necessary modules:
    • kubernetes for interacting with the Kubernetes cluster
    • kopf for building the operator
    • ApiException from kubernetes.client.rest for handling API exceptions.
  • The @kopf.on.create('orchestrs') decorator registers the following function as a handler for the creation of Orchestr objects (the orchestrs resource). When a new object of this kind is created, the create_fn function is triggered.
  • Inside create_fn, the handler reads the alerts.checkRunningPods field from the custom resource’s spec. If the field is not true, it raises a kopf.PermanentError with an error message saying the alert must be set; a permanent error tells Kopf not to retry the handler.
  • The check_pod_status function checks the status of all pods in the cluster. It retrieves the list of pods with the Kubernetes client and checks whether each pod’s phase is something other than “Running”. For every such pod, it collects the pod’s name, namespace, status, and any issues reported by its containers, and stores them in the problematic_pods list.
  • The get_pod_logs function retrieves the logs of a specific pod in a given namespace using the Kubernetes client. If an ApiException occurs while retrieving the logs, it raises an exception with a descriptive error message.
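The per-pod check inside check_pod_status is easier to unit-test if it is written as a pure function over pod data. The sketch below works on the pod JSON as returned by `kubectl get pod -o json` (camelCase keys such as `containerStatuses`) rather than on the client’s model objects; `summarize_pod` is a hypothetical name.

```python
def summarize_pod(pod: dict):
    """Return None for Running pods, otherwise a dict describing the problem."""
    status = pod.get("status", {})
    phase = status.get("phase")
    if phase == "Running":
        return None
    issues = []
    # containerStatuses is absent for pods that have not been scheduled yet,
    # so fall back to an empty list instead of iterating over None.
    for cs in status.get("containerStatuses") or []:
        waiting = cs.get("state", {}).get("waiting")
        if waiting is not None:
            issues.append(f"Container {cs['name']} is waiting: {waiting.get('reason')}")
    return {
        "name": pod["metadata"]["name"],
        "namespace": pod["metadata"]["namespace"],
        "status": phase,
        "issues": issues,
    }
```

One limitation worth knowing: a container stuck in CrashLoopBackOff usually leaves the pod phase at Running, so a phase-only check like this one (and like check_pod_status) will not flag it; inspecting the container `waiting` state of every pod would.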

Building an image for the Operator

FROM python:3.7
RUN pip install kopf kubernetes
COPY main.py /main.py
CMD kopf run --verbose /main.py

docker image build -t lindaindicina/alert-operator:latest .
docker image push lindaindicina/alert-operator:latest

RBAC Rules

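Operators need RBAC (Role-Based Access Control) permissions to call the Kubernetes API from inside the cluster. The manifest below creates a ServiceAccount for the operator and binds it to a ClusterRole. Note that this ClusterRole grants every verb on every resource in every API group: convenient for a local demo, but far broader than this operator needs, so scope it down before using anything like it in a real cluster.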

#rbac.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: alert-operator

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: alert-operator
rules:
  - apiGroups: ["*"]
    resources: ["*"]
    verbs: ["*"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: alert-operator-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: alert-operator
subjects:
  - kind: ServiceAccount
    name: alert-operator
    namespace: default
kubectl apply -f rbac.yaml
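To get a feel for what the wildcard rule grants, compared with a narrower one, here is a simplified model of RBAC rule matching. It is a sketch only: real RBAC also considers resourceNames, nonResourceURLs and role aggregation, and `allows` is a hypothetical helper. The SCOPED rule set is likewise hypothetical; it covers the calls main.py makes (listing pods and reading their logs) plus watching Orchestr objects, but Kopf itself needs further permissions of its own, which its documentation lists.

```python
def _match(allowed, value) -> bool:
    return "*" in allowed or value in allowed


def allows(rules, api_group: str, resource: str, verb: str) -> bool:
    """True if any rule grants `verb` on `resource` in `api_group`."""
    return any(
        _match(rule.get("apiGroups", []), api_group)
        and _match(rule.get("resources", []), resource)
        and _match(rule.get("verbs", []), verb)
        for rule in rules
    )


# The rule from rbac.yaml: everything, everywhere.
WILDCARD = [{"apiGroups": ["*"], "resources": ["*"], "verbs": ["*"]}]

# A hypothetical narrower rule set ("" is the core API group).
SCOPED = [
    {"apiGroups": [""], "resources": ["pods", "pods/log"], "verbs": ["get", "list"]},
    {"apiGroups": ["kopf.dev"], "resources": ["orchestrs"], "verbs": ["list", "watch", "patch"]},
]
```

With WILDCARD, even `allows(WILDCARD, "apps", "deployments", "delete")` is true, while the scoped rules permit listing pods but not deleting them or reading secrets.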

Deploying the Operator

#deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: alert-operator
spec:
  selector:
    matchLabels:
      app: kopf-alert-operators
  template:
    metadata:
      labels:
        app: kopf-alert-operators
    spec:
      serviceAccountName: alert-operator
      containers:
        - image: lindaindicina/alert-operator:latest
          name: alert-operator-kopf
kubectl apply -f deploy.yaml
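In apps/v1, a Deployment’s spec.selector is required and must match the labels in spec.template.metadata.labels, otherwise the API server rejects the manifest. A quick pre-flight check like the sketch below can catch a typo in either place before you apply; it handles only matchLabels (not matchExpressions), and `selector_matches_template` is a hypothetical name.

```python
def selector_matches_template(deployment: dict) -> bool:
    """True if spec.selector.matchLabels is a non-empty subset of the template labels."""
    spec = deployment["spec"]
    selector = spec.get("selector", {}).get("matchLabels") or {}
    labels = spec["template"].get("metadata", {}).get("labels") or {}
    # An empty selector is rejected for Deployments, so treat it as a mismatch.
    if not selector:
        return False
    return all(labels.get(key) == value for key, value in selector.items())
```

For the deploy.yaml above, both the selector and the template use `app: kopf-alert-operators`, so the check passes.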
kubectl get pods
NAMESPACE   NAME                              READY   STATUS    RESTARTS       AGE
default     alert-operator-6b744c8d5f-4rb5h   1/1     Running   1 (129m ago)   21h

kubectl logs alert-operator-6b744c8d5f-4rb5h
[2023-07-03 18:06:10,298] kopf._core.reactor.r [DEBUG ] Starting Kopf 1.36.1.
[2023-07-03 18:06:10,299] kopf._core.engines.a [INFO ] Initial authentication has been initiated.
[2023-07-03 18:06:10,299] kopf.activities.auth [DEBUG ] Activity 'login_via_client' is invoked.
[2023-07-03 18:06:10,302] kopf.activities.auth [DEBUG ] Client is configured in cluster with service account.
[2023-07-03 18:06:10,303] kopf.activities.auth [INFO ] Activity 'login_via_client' succeeded.
[2023-07-03 18:06:10,304] kopf._core.engines.a [INFO ] Initial authentication has finished.
[2023-07-03 18:06:10,478] kopf._cogs.clients.w [DEBUG ] Starting the watch-stream for customresourcedefinitions.v1.apiextensions.k8s.io cluster-wide.
[2023-07-03 18:06:10,479] kopf._cogs.clients.w [DEBUG ] Starting the watch-stream for orchestrs.v1.kopf.dev cluster-wide.
[2023-07-03 18:06:10,609] kopf.objects [DEBUG ] [default/my-alerts] Resuming is in progress: {'apiVersion': 'kopf.dev/v1', 'kind': 'Orchestr', 'metadata': {'annotations': {'kopf.zalando.org/last-handled-configuration': '{"spec":{"alerts":{"checkRunningPods":true}}}\n', 'kubectl.kubernetes.io/last-applied-configuration': '{"apiVersion":"kopf.dev/v1","kind":"Orchestr","metadata":{"annotations":{},"name":"my-alerts","namespace":"default"},"spec":{"alerts":{"checkRunningPods":true}}}\n'}, 'creationTimestamp': '2023-07-02T22:22:38Z', 'generation': 1, 'managedFields': [{'apiVersion': 'kopf.dev/v1', 'fieldsType': 'FieldsV1', 'fieldsV1': {'f:metadata': {'f:annotations': {'.': {}, 'f:kubectl.kubernetes.io/last-applied-configuration': {}}}, 'f:spec': {'.': {}, 'f:alerts': {'.': {}, 'f:checkRunningPods': {}}}}, 'manager': 'kubectl-client-side-apply', 'operation': 'Update', 'time': '2023-07-02T22:22:38Z'}, {'apiVersion': 'kopf.dev/v1', 'fieldsType': 'FieldsV1', 'fieldsV1': {'f:metadata': {'f:annotations': {'f:kopf.zalando.org/last-handled-configuration': {}}}}, 'manager': 'kopf', 'operation': 'Update', 'time': '2023-07-02T22:33:24Z'}], 'name': 'my-alerts', 'namespace': 'default', 'resourceVersion': '212913', 'uid': '36c8bf95-9b05-4f7b-98fe-954eb97e7e3d'}, 'spec': {'alerts': {'checkRunningPods': True}}}.....

Conclusion

The example discussed in this article serves as a simple introduction to using Kopf. It aims to provide beginners with valuable insights and a starting point for utilizing Kopf in their Kubernetes projects.
