Service Foundry
Young Gyu Kim <credemol@gmail.com>

Apache Airflow - How to use SparkKubernetesOperator


Introduction

In this article, I will guide you through using the SparkKubernetesOperator with the Spark-Pi example, a sample application conveniently included in the Spark Docker image. The SparkKubernetesOperator is a powerful tool for running Spark applications on Kubernetes, leveraging Kubernetes’ native capabilities to manage and execute tasks in a highly parallelized and efficient way.

By using this operator, you can seamlessly integrate Spark’s distributed data processing capabilities with Kubernetes’ container orchestration, making it an ideal solution for running complex data processing pipelines in a scalable environment.

What is the Airflow SparkKubernetesOperator?

The SparkKubernetesOperator allows you to create and run spark job on a Kubernetes cluster. It is based on spark-on-k8s-operator project.

This operator simplifies the interface and accepts different parameters to configure and run spark application on Kubernetes. Similar to the KubernetesOperator, we have added the logic to wait for a job after submission, manage error handling, retrieve logs from the driver pod and the ability to delete a spark job. It also supports out-of-the-box Kubernetes functionalities such as handling of volumes, config maps, secrets, etc.

— Airflow Documentation

Refer to the Airflow documentation if you want to know more about the SparkKubernetesOperator.

The Airflow SparkKubernetesOperator runs a Spark application on Kubernetes. It is a subclass of the KubernetesPodOperator, the operator that runs a task in a Kubernetes Pod, and it hands the actual data processing to Spark driver and executor Pods so that the work runs in a parallelized way.

To use the SparkKubernetesOperator, you need to have a Kubernetes cluster running and have the Spark Operator installed on the cluster. The Spark Operator is a Kubernetes Operator for Apache Spark that aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes.

What is the Spark Operator?

The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications.

— kubeflow/spark-operator documentation

Refer to the link below if you want to know more about the Spark Operator:

https://github.com/kubeflow/spark-operator

This article does not cover how to install the Spark Operator on Kubernetes. If you need installation instructions, please refer to the link above.

Spark-Pi Example

We are going to use the `spark-pi.yaml' example, which you can find in the kubeflow/spark-operator GitHub repository linked above.

I just modified the namespace from `default' to `airflow' in the example because I have chosen the `airflow' namespace for the Spark Operator.

spark-pi.yaml
# omitting the Copyright notice

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
#  namespace: default
  namespace: airflow
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.3
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
  arguments:
    - "5000"
  sparkVersion: 3.5.3
  driver:
    labels:
      version: 3.5.3
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    labels:
      version: 3.5.3
    instances: 1
    cores: 1
    memory: 512m

Create a DAG for the SparkKubernetesOperator

In this example, we are going to create a DAG that runs the Spark-Pi example using the SparkKubernetesOperator.

spark-pi-example.py
from datetime import timedelta, datetime

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}
# [END default_args]

# [START instantiate_dag]

with DAG(
    dag_id='spark_pi',
    start_date=days_ago(1),
    default_args=default_args,
    schedule=None,
    # max_active_runs is a DAG-level setting, so it belongs here rather than in default_args
    max_active_runs=1,
    tags=['example']
) as dag:
    spark_pi_task = SparkKubernetesOperator(
        task_id='spark_example',
        namespace='airflow',
        # path to the Spark application file, relative to the DAG file
        application_file='k8s-spark-operator/spark-pi.yaml',  # (1)
        kubernetes_conn_id='k8s_conn',  # (2)
        # do_xcom_push=True,  # (3)
    )
    spark_pi_task
1 The `application_file' is the path to the Spark-Pi example file, relative to the DAG file.
2 The `kubernetes_conn_id' is the ID of the connection to the Kubernetes cluster. You need to create this connection in the Airflow UI.
3 The `do_xcom_push' flag determines whether the task's result is pushed to XCom. I will show you what happens when you set this value to True in a later section.

For more information on how to create a connection to the Kubernetes cluster, please refer to the Airflow documentation.
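If you prefer to manage the connection as code rather than create it by hand, one option is to define it as an environment variable on the Airflow Pods. The snippet below is only a rough sketch, not taken from this setup: it assumes the official Apache Airflow Helm chart's `env' list and Airflow's support for JSON-serialized connections in `AIRFLOW_CONN_*' variables, and the exact name of the `in_cluster' extra can vary between provider versions.

values.yaml sketch for defining the `k8s_conn' connection
env:
  # AIRFLOW_CONN_<CONN_ID> defines a connection named 'k8s_conn'
  - name: AIRFLOW_CONN_K8S_CONN
    # JSON-serialized connection; in_cluster tells Airflow to use the
    # service account of the Pod it is running in
    value: '{"conn_type": "kubernetes", "extra": {"in_cluster": true}}'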

DAGs Folder Structure

For the DAGs folder, I have specified the `data-airflow-dags' PVC in the `values.yaml' file of the Airflow Helm chart. The `data-airflow-dags' PVC is mounted at the `/opt/airflow/dags' path in the Airflow Pods.

I used an Azure file share for the `data-airflow-dags' PVC so that I can upload the DAG files and Spark application files to the share.

values.yaml file of the Airflow Helm chart
dags:
  persistence:
    enabled: true
    existingClaim: data-airflow-dags

/opt/airflow/dags : root folder for all DAGs
/opt/airflow/dags/k8s-spark-operator : folder for the Spark-Pi example file and other files related to the Spark Operator

That’s why the `application_file' is set to 'k8s-spark-operator/spark-pi.yaml' in the DAG file.

Run the DAG

After creating the DAG file, upload it to the Azure file share that is mounted to the Airflow Pods. Once the DAG file is available in the DAGs folder, you can run the DAG in the Airflow UI.

When you run the DAG, the Spark-Pi example will be executed.

Here is the screenshot of the DAG in the Airflow UI.

spark pi result
Figure 1. spark-pi result

From the Logs tab, you can see the logs of the Spark-Pi example.

spark-pi logs
Pi is roughly 3.141627526283255

do_xcom_push=True

TL;DR: if the Spark application does not write a result for XCom, do not set `do_xcom_push' to True.

When you set `do_xcom_push' to True, the task's result is pushed to XCom. XCom is a feature of Airflow that allows you to push and pull data between tasks.
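As a quick illustration of pulling data between tasks, a downstream task could read the value pushed by the `spark_example' task with `xcom_pull'. The sketch below is hypothetical and not part of the original DAG; the `consume_result' task and `print_spark_result' function are made up for illustration.

xcom_pull sketch (hypothetical downstream task)
from airflow.operators.python import PythonOperator

def print_spark_result(ti):
    # pull whatever the xcom sidecar read from /airflow/xcom/return.json
    result = ti.xcom_pull(task_ids='spark_example')
    print(result)

# inside the DAG body, after spark_pi_task:
consume_result = PythonOperator(
    task_id='consume_result',
    python_callable=print_spark_result,
)
# spark_pi_task >> consume_result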

How does XCom work?

The KubernetesPodOperator handles XCom values differently than other operators. In order to pass a XCom value from your Pod you must specify the do_xcom_push as True. This will create a sidecar container that runs alongside the Pod. The Pod must write the XCom value into this location at the /airflow/xcom/return.json path.

— Airflow Documentation

For more information on XCom, please refer to the Airflow documentation.

When do_xcom_push is set to True

When you set `do_xcom_push' to True, the Pod must write the XCom value to the `/airflow/xcom/return.json' path. The XCom value is the data that you want to push to XCom.
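For example, if the Spark application were a PySpark job, the driver could write its result to that path as its last step. This is only a minimal sketch under that assumption; the Spark-Pi example itself does not write this file, which is exactly why `do_xcom_push' should stay disabled for it.

writing /airflow/xcom/return.json (sketch for a PySpark driver)
import json

# ... after the driver has computed its result ...
result = {"pi_estimate": 3.14159}

# the xcom sidecar reads this exact path when do_xcom_push is True
with open("/airflow/xcom/return.json", "w") as f:
    json.dump(result, f)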

If you don’t write the XCom value into the location at the `/airflow/xcom/return.json' path, you will get an error message like below:

error messages
[2025-01-23, 15:06:01 PST] {pod_manager.py:727} INFO - Checking if xcom sidecar container is started.
[2025-01-23, 15:06:32 PST] {pod_manager.py:737} WARNING - Still waiting for the xcom sidecar container to start. Elapsed time: 30 seconds.
[2025-01-23, 15:07:02 PST] {pod_manager.py:737} WARNING - Still waiting for the xcom sidecar container to start. Elapsed time: 61 seconds.

// omitting the error messages

[2025-01-23, 15:17:44 PST] {pod_manager.py:737} WARNING - Still waiting for the xcom sidecar container to start. Elapsed time: 703 seconds.
[2025-01-23, 15:17:50 PST] {taskinstance.py:3311} ERROR - Task failed with exception

Custom Spark Applications

Mounting an Azure file share to both the Airflow Pods and the Spark Pods is a good way to make the DAG files and Spark application files available to them.

mount azure fileshare
Figure 2. Mount Azure Fileshare to the Airflow Pod and Spark Pod

In the case of the Spark-Pi example, the Spark application is already included in the Spark Docker image. However, if you want to run a custom Spark application, you’ll need to mount the application file to the Spark Pod.

To handle this, you can create a Persistent Volume Claim (PVC) for the Spark application file and then mount the PVC to the Spark Pod.
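The `data-spark-apps' PVC referenced in the manifest below is not shown in this article, so here is a minimal sketch of what it could look like. The `azurefile-csi' storage class name and the requested size are assumptions that depend on your cluster; adjust them to your environment.

data-spark-apps-pvc.yaml (sketch)
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-spark-apps
  namespace: airflow
spec:
  # ReadWriteMany lets the driver and executor Pods mount the same share
  accessModes:
    - ReadWriteMany
  # assumption: the Azure Files CSI storage class on AKS; replace with your own
  storageClassName: azurefile-csi
  resources:
    requests:
      storage: 5Gi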

Here is an example of how to mount the Spark application file to the Spark Pod.

my-spark-app.yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: my-spark-app
  namespace: airflow
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.3
  imagePullPolicy: IfNotPresent
  timeToLiveSeconds: 30
  deps:
    repositories:
      - https://repo1.maven.org/maven2
    packages:
      - org.apache.hadoop:hadoop-azure:3.2.0
      - com.microsoft.azure:azure-storage:8.6.3
      - org.postgresql:postgresql:42.7.0
      - com.squareup.okhttp3:okhttp:4.12.0
      - org.neo4j:neo4j-connector-apache-spark_2.12:5.3.1_for_spark_3
    files:
      - local:///opt/spark/apps/my-spark-app/spark.conf  # (1)

  mainClass: com.nsalexamy.examples.mysparkapp.MainApp  # (2)
  mainApplicationFile: local:///opt/spark/apps/my-spark-app/my-spark-app_2.12-0.0.1-SNAPSHOT.jar  # (3)
  volumes:  # (4)
    - name: spark-apps
      persistentVolumeClaim:
        claimName: data-spark-apps

  arguments:
    - "5000"
  sparkVersion: 3.5.3
  driver:
    labels:
      version: 3.5.3
    cores: 1
    memory: 512m
    serviceAccount: spark
    volumeMounts:  # (5)
      - name: spark-apps
        mountPath: /opt/spark/apps

  executor:
    labels:
      version: 3.5.3
    instances: 1
    cores: 1
    memory: 8000m
    volumeMounts:
      - name: spark-apps
        mountPath: /opt/spark/apps
1 The `files' entry is the path to an additional file (here, a Spark configuration file) used by the application. The `local://' scheme means the path is on the Pod's filesystem.
2 The `mainClass' is the main class of the Spark application.
3 The `mainApplicationFile' is the path to the Spark application file. If the type is Scala, the path should point to a JAR file.
4 The `volumes' section declares a volume backed by the `data-spark-apps' Persistent Volume Claim (PVC) that holds the Spark application files.
5 The `volumeMounts' section mounts the `spark-apps' volume into the driver and executor Pods at the `/opt/spark/apps' path.

Conclusion

In this article, I demonstrated how to use the SparkKubernetesOperator with the Spark-Pi example. The SparkKubernetesOperator is designed to run Spark applications on Kubernetes in a parallelized manner. It’s actually a subclass of the KubernetesPodOperator, which is used to run tasks in Kubernetes Pods.

We also created a DAG for the SparkKubernetesOperator and successfully ran the Spark-Pi example. Additionally, we discussed how to use the SparkKubernetesOperator to run custom Spark applications.

All my LinkedIn articles are available at All My LinkedIn Articles.