Service Foundry
Young Gyu Kim <credemol@gmail.com>

Apache Airflow - Kubernetes Pod Operator


Introduction

This article is part of the series on Airflow on Kubernetes and is the second article in the series.

In this article, we’ll explore how to use the Kubernetes Pod Operator in Apache Airflow to execute tasks within a Kubernetes pod. Specifically, we’ll run a Sling ETL task to migrate data from a source database to Azure Blob Storage in Parquet format. Along the way, you’ll learn how to leverage a custom Docker image with the Kubernetes Pod Operator.

What is the Airflow Kubernetes Pod Operator?

The Kubernetes Pod Operator is an Airflow operator that executes a task in its own Kubernetes Pod. It is useful when you want to run a task with your own custom Docker image.

Unlike the Kubernetes Executor, which runs each Airflow task in a pod based on the Airflow worker image, the Kubernetes Pod Operator launches a pod with any image you choose, so the task itself does not need Airflow or its dependencies installed. This makes it a good fit for more complex tasks that require a custom Docker image.

In this article, we will run a Sling ETL task that migrates data from a source database to Azure Blob Storage in Parquet format.

Add Kubernetes Cluster Connection on Airflow Web UI

First, we need to add the Kubernetes cluster connection on the Airflow Web UI.

  1. Go to the Airflow Web UI

  2. Click on the Admin menu

  3. Click on Connections

  4. Click on the Create button

Figure 1. Add Kubernetes Cluster Connection
  5. Fill in the following fields:

    • Connection Id: k8s_conn

    • Connection Type: Kubernetes Cluster Connection

    • Kube Config (JSON format): refer to the following section

    • Namespace: airflow

    • Cluster context: your-cluster-context

  6. Click on the Save button

Fill in the form with the appropriate values. The 'Kube Config (JSON format)' field should contain the content of the kubeconfig file in JSON format.

Figure 2. Kubernetes Cluster Connection Form

Convert kube config to JSON

If you are working with the kubectl command, you already have a kubeconfig file in YAML format. The config file is usually located at ~/.kube/config.

~/.kube/config (YAML)

# convert yaml to json
$ yq -o json eval . ~/.kube/config > kube-config/config.json

The command above will convert the kubeconfig file to JSON format and save it to kube-config/config.json.

We can use the content of the JSON file as the 'Kube Config (JSON format)' in the Airflow Web UI.
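If yq is not available, the same conversion can be done with a few lines of Python. The sketch below assumes the PyYAML package is installed; the file name convert-kubeconfig.py is arbitrary.

convert-kubeconfig.py
import json
import os

import yaml  # requires PyYAML

# Load the kubeconfig in YAML format and write it back out as JSON,
# which can then be pasted into the 'Kube Config (JSON format)' field.
kube_config_path = os.path.expanduser("~/.kube/config")

with open(kube_config_path) as f:
    config = yaml.safe_load(f)

os.makedirs("kube-config", exist_ok=True)
with open("kube-config/config.json", "w") as f:
    json.dump(config, f, indent=2)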

Current Context

To get the current context of the kubeconfig file, run the following command:

$ kubectl config current-context

We can use the output of the command as the 'Cluster context' in the Airflow Web UI.
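As an alternative to the Web UI, the connection can also be registered programmatically from within the Airflow environment. The sketch below shows one way to do it; note that the extra field names (kube_config, namespace, cluster_context) can vary between versions of the cncf.kubernetes provider, so check your provider's documentation before relying on them. The file name add-k8s-conn.py is arbitrary.

add-k8s-conn.py
import json

from airflow.models import Connection
from airflow.settings import Session

# Read the kubeconfig that was converted to JSON earlier.
with open("kube-config/config.json") as f:
    kube_config = f.read()

# NOTE: these extra field names are an assumption and may differ
# between versions of the cncf.kubernetes provider.
extra = {
    "kube_config": kube_config,
    "namespace": "airflow",
    "cluster_context": "your-cluster-context",
}

conn = Connection(
    conn_id="k8s_conn",
    conn_type="kubernetes",
    extra=json.dumps(extra),
)

# Persist the connection in the Airflow metadata database.
session = Session()
session.add(conn)
session.commit()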

Add kpo-hello-world DAG

Now, let’s create a DAG that uses the Kubernetes Pod Operator to run the hello-world Docker image.

/dags/kpo-hello-world.py
import pendulum

from airflow import DAG

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

container_resources = k8s.V1ResourceRequirements(
    limits={
        "memory": "512Mi",
        "cpu": "200m",
    },
    requests={
        "memory": "512Mi",
        "cpu": "100m",
    },
)

with DAG(
    dag_id="kpo-hello-world",
    schedule=None,
    start_date=pendulum.datetime(2024, 10, 1, tz="UTC"),
    catchup=False,
    tags=["kpo"],
) as dag:
    helloTask = KubernetesPodOperator(
        task_id='hello',
        kubernetes_conn_id="k8s_conn",
        name='hello',
        namespace='airflow',
        container_resources=container_resources,
        image='hello-world:latest',
        #image='ubuntu:20.04',
        #cmds=["bash", "-cx"],
        #arguments=["echo", "hello world"],
        is_delete_operator_pod=True,
        get_logs=True,

    )

    helloTask

For this example, we will use the hello-world Docker image, a simple image that prints "Hello from Docker!".

Upload the DAG file to the DAGs storage, which is an Azure File Share in this example.
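One way to upload the file is with the azure-storage-file-share SDK. The sketch below assumes that package is installed; the connection string, share name, and paths are placeholders to replace with your own values.

upload-dag.py
from azure.storage.fileshare import ShareFileClient

# Placeholders: the storage account connection string and the file share
# that backs the Airflow DAGs folder.
CONN_STR = "<your-storage-connection-string>"
SHARE_NAME = "<your-dags-file-share>"

file_client = ShareFileClient.from_connection_string(
    conn_str=CONN_STR,
    share_name=SHARE_NAME,
    file_path="dags/kpo-hello-world.py",
)

# Upload the local DAG file to the file share.
with open("dags/kpo-hello-world.py", "rb") as f:
    file_client.upload_file(f)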

Run the DAG

To run the DAG, follow these steps:

  1. Go to the Airflow Web UI

  2. Click on the DAGs menu

  3. Click on the kpo-hello-world DAG

  4. Click on the Trigger DAG button

Figure 3. kpo-hello-world DAG - Logs

We have now successfully run the kpo-hello-world DAG, which uses the Kubernetes Pod Operator to run the hello-world Docker image.

In the next section, we will learn how to run a more complex task using the Kubernetes Pod Operator.

What is Sling?

Powerful Data Integration CLI tool.

Running your EL tasks from the CLI has never been simpler. Whether ingesting CSV or JSON files, transferring data between databases, or exporting a custom SQL query to a Parquet file — Sling is the solution that empowers you to achieve it effortlessly.

— Sling
https://sling.etl.dev/

Install Sling on Mac

Before we can run the Sling ETL task, we need to install Sling on our local machine to see how it works.

To install Sling on Mac, run the following command:

$ brew install slingdata-io/sling/sling

$ sling -h

For more information on how to install Sling on other platforms, refer to the official Sling documentation.

Scenario for Sling ETL Task

In this scenario, we will run a Sling ETL task that migrates data from a source database to Azure Blob Storage in Parquet format.

  • Source Database: PostgreSQL

  • Destination: Azure Blob Storage

Setting up Connections

The Sling env file is expected to be located at ~/.sling/env.yaml. We will define the connections for the source database and Azure Blob Storage in this file.

~/.sling/env.yaml
connections:
  PG_REPLICA:
    type: postgres
    host: {your-host}
    user: {db-user}
    password: {db-password}
    port: 5432
    database: {database-name}
    schema: {schema-name}
    sslmode: require

  AZURE_STORAGE:
    type: azure
    account: {storage-account}
    container: {container-name}
    sas_svc_url: '{sas-url}'

Next, create a script that runs the Sling task using these connections:

run-sling.sh
#!/bin/sh

echo "##### Starting Sling #####"

DEST_URL="https://{azure-storage-account}.blob.core.windows.net/{container-name}/sling/$(date +%Y-%m-%d)/division/"

echo "DEST_URL: $DEST_URL"

sling run --src-conn PG_REPLICA --src-stream 'division' \
  --tgt-conn AZURE_STORAGE --tgt-object "$DEST_URL" \
  --tgt-options '{file_max_rows: 10, format: parquet}'

To run the Sling ETL task, execute the run-sling.sh script.

$ ./run-sling.sh

# Output

##### Starting Sling #####
DEST_URL: https://{azure-storage-account}.blob.core.windows.net/{container-name}/sling/2024-11-26/division/
5:21PM INF connecting to source database (postgres)
5:21PM INF reading from source database
5:21PM INF writing to target file system (azure)
5:21PM INF wrote 32 rows [8 r/s] to https://{azure-storage-account}.blob.core.windows.net/{container-name}/sling/2024-11-26/division/
5:21PM INF execution succeeded

The Sling ETL task migrated the data from the source database to Azure Blob Storage in Parquet format. We can see the Parquet files saved in Azure Blob Storage.

Figure 4. parquet-files on Azure Blob Storage
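To spot-check the output outside the Azure Portal, the Parquet files can be read back with pandas. The sketch below assumes the pandas, pyarrow, and adlfs packages are installed and that a SAS token with read access is available; the account, container, and token values are placeholders.

read-parquet.py
import pandas as pd

# Placeholders: storage account, container, output folder, and SAS token.
ACCOUNT = "<azure-storage-account>"
CONTAINER = "<container-name>"
PATH = "sling/2024-11-26/division/"

# Read all Parquet files under the output folder via the abfs filesystem.
df = pd.read_parquet(
    f"abfs://{CONTAINER}/{PATH}",
    storage_options={
        "account_name": ACCOUNT,
        "sas_token": "<sas-token>",
    },
)

print(df.shape)
print(df.head())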

Custom Docker Image

To run the Sling ETL task using the Kubernetes Pod Operator, we need to create a custom Docker image that contains the Sling CLI tool.

sling/docker/Dockerfile
FROM slingdata/sling

WORKDIR /usr/app

COPY env.yaml /home/sling/.sling/env.yaml
COPY run-sling.sh /usr/app/run-sling.sh

ENTRYPOINT ["sh", "run-sling.sh"]

I used the same env.yaml and run-sling.sh files from the previous section.

Push Docker Image to Azure Container Registry

$ az acr login --name $ACR_NAME
$ az acr build --image sling-example-division:0.1.0 --registry $ACR_NAME ./sling/docker

Now we have successfully pushed the custom Docker image to the Azure Container Registry. The name of the image is sling-example-division with the tag 0.1.0.

Add kpo-sling-division DAG

Now, let’s create a DAG that uses the Kubernetes Pod Operator to run the Sling ETL task.

/dags/kpo-sling-division.py
import pendulum

from airflow import DAG

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

container_resources = k8s.V1ResourceRequirements(
    limits={
        "memory": "512Mi",
        "cpu": "200m",
    },
    requests={
        "memory": "512Mi",
        "cpu": "100m",
    },
)

with DAG(
    dag_id="kpo-sling-division",
    schedule=None,
    start_date=pendulum.datetime(2024, 10, 1, tz="UTC"),
    catchup=False,
    tags=["kpo"],
) as dag:

    slingTask = KubernetesPodOperator(
        task_id='sling',
        kubernetes_conn_id="k8s_conn",
        name='sling',
        namespace='airflow',
        container_resources=container_resources,
#         pod_template_file="/opt/airflow/custom-pod-templates/pod_template_file.yaml",
        image='iclinicacr.azurecr.io/sling-example-division:0.1.0',
        image_pull_policy='Always',
        #cmds=["sh", "-c", "/usr/app/run-sling.sh"],
#         in_cluster=True,
        is_delete_operator_pod=True,
        get_logs=True,
        #service_account_name='airflow-worker',
        #config_file="/opt/airflow/dags/kube-config/config",
    )

    slingTask

Upload the DAG file to the DAGs storage, which is an Azure File Share in this example.

Then run the DAG from the Airflow Web UI.
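Besides the Web UI, the DAG can also be triggered through the Airflow REST API. The sketch below assumes the stable REST API and the basic-auth backend are enabled on your deployment; the host and credentials are placeholders.

trigger-dag.py
import requests

# Placeholders: Airflow webserver URL and a user allowed to trigger DAGs.
AIRFLOW_URL = "http://localhost:8080"
AUTH = ("airflow-user", "airflow-password")

# Trigger a new DAG run for kpo-sling-division.
response = requests.post(
    f"{AIRFLOW_URL}/api/v1/dags/kpo-sling-division/dagRuns",
    auth=AUTH,
    json={"conf": {}},
)

response.raise_for_status()
print(response.json())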

Figure 5. kpo-sling-division DAG - Logs

We successfully ran the kpo-sling-division DAG using the Kubernetes Pod Operator to execute the Sling ETL task. The logs confirm that the Sling ETL task completed successfully, and the Parquet files have been saved to Azure Blob Storage.

Conclusion

In conclusion, we’ve learned how to use the Kubernetes Pod Operator in Apache Airflow to execute tasks within a Kubernetes pod using a custom Docker image. We successfully ran a Sling ETL task to migrate data from a source database to Azure Blob Storage in Parquet format.

All my LinkedIn articles are available at All My LinkedIn Articles.