Apache Airflow - Kubernetes Pod Operator

Introduction
This article is part of a series on running Airflow on Kubernetes, and it is the second article in the series.
In this article, we’ll explore how to use the Kubernetes Pod Operator in Apache Airflow to execute tasks within a Kubernetes pod. Specifically, we’ll run a Sling ETL task to migrate data from a source database to Azure Blob Storage in Parquet format. Along the way, you’ll learn how to leverage a custom Docker image with the Kubernetes Pod Operator.
What is the Airflow Kubernetes Pod Operator?
The Kubernetes Pod Operator is an Airflow operator that executes a task in its own Kubernetes Pod, which is useful when you want to run a task with a custom Docker image. In comparison to the Kubernetes Executor, which launches each task in a pod based on the Airflow image, the Kubernetes Pod Operator lets you run any image, so it can handle more complex tasks that require custom dependencies.
In this article, we will run a Sling ETL task that migrates data from a source database to Azure Blob Storage in Parquet format.
Add Kubernetes Cluster Connection on Airflow Web UI
First, we need to add the Kubernetes cluster connection on the Airflow Web UI.
- Go to the Airflow Web UI
- Click on the Admin menu
- Click on Connections
- Click on the Create button
- Fill in the following fields:
  - Connection Id: k8s_conn
  - Connection Type: Kubernetes Cluster Connection
  - Kube Config (JSON format): refer to the following section
  - Namespace: airflow
  - Cluster context: your-cluster-context
- Click on the Save button
Fill in the form with the appropriate values. The 'Kube Config (JSON format)' field should contain the content of the kubeconfig file in JSON format.
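If you prefer not to click through the Web UI, the connection can also be created programmatically (or with the airflow connections add CLI command). The sketch below is an assumption-laden example, not the article's method: it writes directly to the Airflow metadata database, uses the conn_type "kubernetes", and the extra-field names (kube_config, namespace, cluster_context) may differ between versions of the cncf.kubernetes provider. The kubeconfig JSON it references is produced in the next section.
import json
from airflow.models import Connection
from airflow.utils.session import create_session

# Placeholder values: paste the JSON kubeconfig content and your context name
conn = Connection(
    conn_id="k8s_conn",
    conn_type="kubernetes",
    extra=json.dumps({
        "kube_config": "<contents of kube-config/config.json>",
        "namespace": "airflow",
        "cluster_context": "your-cluster-context",
    }),
)
with create_session() as session:
    session.add(conn)  # committed when the session context exits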

Convert kube config to JSON
If you work with the kubectl command, you already have a kubeconfig file in YAML format, usually located at ~/.kube/config.
# convert yaml to json
$ yq -o json eval . ~/.kube/config > kube-config/config.json
The command above converts the kubeconfig file to JSON format and saves it to kube-config/config.json.
We can use the content of the JSON file as the 'Kube Config (JSON format)' in the Airflow Web UI.
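If yq is not available, the same conversion can be done with a short Python script. This is a minimal sketch that assumes the PyYAML package is installed and uses the same input and output paths as the command above.
import json
from pathlib import Path

import yaml  # pip install pyyaml

src = Path.home() / ".kube" / "config"
dst = Path("kube-config/config.json")
dst.parent.mkdir(parents=True, exist_ok=True)

# Load the YAML kubeconfig and write it back out as JSON
config = yaml.safe_load(src.read_text())
dst.write_text(json.dumps(config, indent=2))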
Current Context
To get the current context of the kubeconfig file, run the following command:
$ kubectl config current-context
We can use the output of the command as the 'Cluster context' in the Airflow Web UI.
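The same information is available from Python through the official kubernetes client, which can be handy if you are scripting the setup. A small sketch, assuming the kubernetes package is installed and the default kubeconfig location is used:
from kubernetes import config  # pip install kubernetes

# Read ~/.kube/config and report the active context
contexts, active_context = config.list_kube_config_contexts()
print(active_context["name"])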
Add kpo-hello-world DAG
Now, let’s create a DAG that uses the Kubernetes Pod Operator to run the hello-world Docker image.
import pendulum
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

# Resource requests and limits for the task pod
container_resources = k8s.V1ResourceRequirements(
    limits={
        "memory": "512Mi",
        "cpu": "0.2",
    },
    requests={
        "memory": "512Mi",
        "cpu": "0.1",
    },
)

with DAG(
    dag_id="kpo-hello-world",
    schedule=None,
    start_date=pendulum.datetime(2024, 10, 1, tz="UTC"),
    catchup=False,
    tags=["kpo"],
) as dag:
    helloTask = KubernetesPodOperator(
        task_id='hello',
        kubernetes_conn_id="k8s_conn",  # the connection created above
        name='hello',
        namespace='airflow',
        container_resources=container_resources,
        image='hello-world:latest',
        #image='ubuntu:20.04',
        #cmds=["bash", "-cx"],
        #arguments=["echo", "hello world"],
        is_delete_operator_pod=True,  # clean up the pod when the task finishes
        get_logs=True,  # stream the container logs into the Airflow task log
    )

    helloTask
For this example, we use the hello-world Docker image, a simple image that prints "Hello from Docker!".
Upload the DAG file to the DAGs storage, which is an Azure File Share in this example.
Run the DAG
To run the DAG, follow these steps:
- Go to the Airflow Web UI
- Click on the DAGs menu
- Click on the kpo-hello-world DAG
- Click on the Trigger DAG button

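Triggering from the UI is enough for this walkthrough, but DAG runs can also be started through Airflow's stable REST API. The snippet below is a hedged sketch: it assumes the API is enabled with basic authentication, and the host, user, and password are placeholders.
import requests  # pip install requests

# Placeholder endpoint and credentials for an Airflow 2.x deployment
AIRFLOW_URL = "http://localhost:8080/api/v1"
resp = requests.post(
    f"{AIRFLOW_URL}/dags/kpo-hello-world/dagRuns",
    json={"conf": {}},
    auth=("airflow-user", "airflow-password"),
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])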
Now we have successfully run the kpo-hello-world DAG, in which the Kubernetes Pod Operator runs the hello-world Docker image.
In the next section, we will learn how to run a more complex task using the Kubernetes Pod Operator.
What is Sling?
Powerful Data Integration CLI tool.
Running your EL tasks from the CLI has never been simpler. Whether ingesting CSV or JSON files, transferring data between databases, or exporting a custom SQL query to a Parquet file — Sling is the solution that empowers you to achieve it effortlessly.
https://sling.etl.dev/
Install Sling on Mac
Before we can run the Sling ETL task, we need to install Sling on our local machine to see how it works.
To install Sling on Mac, run the following command:
$ brew install slingdata-io/sling/sling
$ sling -h
For more information on how to install Sling on other platforms, refer to the official Sling documentation.
Scenario for Sling ETL Task
In this scenario, we will run a Sling ETL task that migrates data from a source database to Azure Blob Storage in Parquet format.
- Source Database: PostgreSQL
- Destination: Azure Blob Storage
Setting up Connections
The Sling env file is expected at ~/.sling/env.yaml. We will define the connections for the source database and Azure Blob Storage in this file.
connections:
  PG_REPLICA:
    type: postgres
    host: {your-host}
    user: {db-user}
    password: {db-password}
    port: 5432
    database: {database-name}
    schema: {schema-name}
    sslmode: require

  AZURE_STORAGE:
    type: azure
    account: {storage-account}
    container: {container-name}
    sas_svc_url: '{sas-url}'
Next, create a run-sling.sh script that runs the Sling ETL task using these connections:
#!/bin/sh
echo "##### Starting Sling #####"
DEST_URL="https://{azure-storage-account}.blob.core.windows.net/{container-name}/sling/$(date +%Y-%m-%d)/division/"
echo "DEST_URL: $DEST_URL"
sling run --src-conn PG_REPLICA --src-stream 'division' \
--tgt-conn AZURE_STORAGE --tgt-object "$DEST_URL" \
--tgt-options '{file_max_rows: 10, format: parquet}'
To run the Sling ETL task, execute the run-sling.sh script.
$ ./run-sling.sh
# Output
##### Starting Sling #####
DEST_URL: https://{azure-storage-account}.blob.core.windows.net/{container-name}/sling/2024-11-26/division/
5:21PM INF connecting to source database (postgres)
5:21PM INF reading from source database
5:21PM INF writing to target file system (azure)
5:21PM INF wrote 32 rows [8 r/s] to https://{azure-storage-account}.blob.core.windows.net/{container-name}/sling/2024-11-26/division/
5:21PM INF execution succeeded
The Sling ETL task will migrate data from the source database to Azure Blob Storage in Parquet format. We can see the Parquet files saved to the Azure Blob Storage.

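To double-check the output outside the Azure portal, the files can also be listed with the Azure SDK for Python. This is a sketch, not part of the pipeline: the storage account URL, container name, and SAS token are placeholders, and it assumes the azure-storage-blob package is installed.
from azure.storage.blob import ContainerClient  # pip install azure-storage-blob

# Placeholders: fill in your storage account, container, and SAS token
container = ContainerClient(
    account_url="https://{azure-storage-account}.blob.core.windows.net",
    container_name="{container-name}",
    credential="{sas-token}",
)

# List the Parquet files Sling wrote under the sling/ prefix
for blob in container.list_blobs(name_starts_with="sling/"):
    print(blob.name, blob.size)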
Custom Docker Image
To run the Sling ETL task using the Kubernetes Pod Operator, we need to create a custom Docker image that contains the Sling CLI tool.
# Start from the Sling CLI base image
FROM slingdata/sling
WORKDIR /usr/app
# Place the connection definitions in Sling's default env location
COPY env.yaml /home/sling/.sling/env.yaml
# Copy the script that runs the Sling ETL task and use it as the entrypoint
COPY run-sling.sh /usr/app/run-sling.sh
ENTRYPOINT ["sh", "run-sling.sh"]
I used the same env.yaml and run-sling.sh files from the previous section.
Push Docker Image to Azure Container Registry
Build the image and push it to the Azure Container Registry with the Azure CLI (ACR_NAME is your registry name):
$ az acr login --name $ACR_NAME
$ az acr build --image sling-example-division:0.1.0 --registry $ACR_NAME ./sling/docker
Now we have successfully pushed the custom Docker image to the Azure Container Registry. The image is named sling-example-division with the tag 0.1.0.
Add kpo-sling-division DAG
Now, let’s create a DAG that uses the Kubernetes Pod Operator to run the Sling ETL task.
import pendulum
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

# Resource requests and limits for the Sling task pod
container_resources = k8s.V1ResourceRequirements(
    limits={
        "memory": "512Mi",
        "cpu": "200m",
    },
    requests={
        "memory": "512Mi",
        "cpu": "100m",
    },
)

with DAG(
    dag_id="kpo-sling-division",
    schedule=None,
    start_date=pendulum.datetime(2024, 10, 1, tz="UTC"),
    catchup=False,
    tags=["kpo"],
) as dag:
    slingTask = KubernetesPodOperator(
        task_id='sling',
        kubernetes_conn_id="k8s_conn",
        name='sling',
        namespace='airflow',
        container_resources=container_resources,
        # pod_template_file="/opt/airflow/custom-pod-templates/pod_template_file.yaml",
        image='iclinicacr.azurecr.io/sling-example-division:0.1.0',  # the custom image pushed to ACR
        image_pull_policy='Always',
        #cmds=["sh", "-c", "/usr/app/run-sling.sh"],
        # in_cluster=True,
        is_delete_operator_pod=True,
        get_logs=True,
        #service_account_name='airflow-worker',
        #config_file="/opt/airflow/dags/kube-config/config",
    )

    slingTask
Upload the DAG file to the DAGs storage (an Azure File Share in this example) and run the DAG from the Airflow Web UI.

We successfully ran the kpo-sling-division DAG using the Kubernetes Pod Operator to execute the Sling ETL task. The logs confirm that the Sling ETL task completed successfully, and the Parquet files have been saved to Azure Blob Storage.
Conclusion
In conclusion, we’ve learned how to use the Kubernetes Pod Operator in Apache Airflow to execute tasks within a Kubernetes pod using a custom Docker image. We successfully ran a Sling ETL task to migrate data from a source database to Azure Blob Storage in Parquet format.
All my LinkedIn articles are available at All My LinkedIn Articles.