Apache Airflow - How to use SparkKubernetesOperator

Introduction
In this article, I will guide you through using the SparkKubernetesOperator with the Spark-Pi example, a sample application conveniently included in the Spark Docker image. The SparkKubernetesOperator is a powerful tool for running Spark applications on Kubernetes, leveraging Kubernetes’ native capabilities to manage and execute tasks in a highly parallelized and efficient way.
By using this operator, you can seamlessly integrate Spark’s distributed data processing capabilities with Kubernetes’ container orchestration, making it an ideal solution for running complex data processing pipelines in a scalable environment.
What is Airflow SparkKubernetesOperator
The SparkKubernetesOperator allows you to create and run Spark jobs on a Kubernetes cluster. It is based on the spark-on-k8s-operator project.
This operator simplifies the interface and accepts different parameters to configure and run a Spark application on Kubernetes. Similar to the KubernetesPodOperator, it waits for the job after submission, handles errors, retrieves logs from the driver pod, and can delete the Spark job. It also supports out-of-the-box Kubernetes functionality such as handling of volumes, config maps, secrets, etc.
Refer to the Airflow provider documentation if you want to know more about the SparkKubernetesOperator.
The Airflow SparkKubernetesOperator runs a Spark application on Kubernetes. It is a subclass of the KubernetesPodOperator, which runs a task in a Kubernetes Pod, and it executes the Spark application on the cluster in a parallelized way.
To use the SparkKubernetesOperator, you need to have a Kubernetes cluster running and have the Spark Operator installed on the cluster. The Spark Operator is a Kubernetes Operator for Apache Spark that aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes.
What is Spark Operator
The Kubernetes Operator for Apache Spark aims to make specifying and running Spark applications as easy and idiomatic as running other workloads on Kubernetes. It uses Kubernetes custom resources for specifying, running, and surfacing status of Spark applications.
Refer to the link below if you want to know more about the Spark Operator:
https://github.com/kubeflow/spark-operator
In this document, we are not going to cover how to install the Spark Operator on Kubernetes. If you want to know how to install it, please refer to the link above.
Spark-Pi Example
We are going to use the `spark-pi.yaml` example, which you can find in the examples folder of the spark-operator GitHub repository linked above.
I only modified the namespace from `default` to `airflow` in the example because I have chosen the `airflow` namespace for the Spark Operator.
# omitting the Copyright notice
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  # namespace: default
  namespace: airflow
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.3
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
  arguments:
    - "5000"
  sparkVersion: 3.5.3
  driver:
    labels:
      version: 3.5.3
    cores: 1
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    labels:
      version: 3.5.3
    instances: 1
    cores: 1
    memory: 512m
Create a DAG for the SparkKubernetesOperator
In this example, we are going to create a DAG for the SparkKubernetesOperator. The DAG will run the Spark-Pi example using the SparkKubernetesOperator.
from datetime import timedelta, datetime

# [START import_module]
# The DAG object; we'll need this to instantiate a DAG
from airflow import DAG
# Operators; we need this to operate!
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from airflow.utils.dates import days_ago
# [END import_module]

# [START default_args]
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'max_active_runs': 1,
    'retries': 0,
}
# [END default_args]

# [START instantiate_dag]
with DAG(
    dag_id='spark_pi',
    start_date=days_ago(1),
    default_args=default_args,
    schedule=None,
    tags=['example']
) as dag:

    spark_pi_task = SparkKubernetesOperator(
        task_id='spark_example',
        namespace='airflow',
        # relative path to DAG file
        application_file='k8s-spark-operator/spark-pi.yaml',  # (1)
        kubernetes_conn_id='k8s_conn',  # (2)
        # do_xcom_push=True,  # (3)
    )

    spark_pi_task
(1) The `application_file` is the path to the Spark-Pi example file. The path is relative to the DAG file.
(2) The `kubernetes_conn_id` is the connection ID for the Kubernetes cluster. You need to create a connection to the Kubernetes cluster in the Airflow UI.
(3) The `do_xcom_push` flag determines whether the task pushes a result to XCom. I will show you what happens when you set this value to True in a later section.
For more information on how to create a connection to the Kubernetes cluster, please refer to the Airflow documentation on managing connections.
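If you prefer to define this connection in code rather than through the UI, here is a minimal sketch using Airflow's metadata database session, assuming Airflow runs inside the cluster; the `k8s_conn` ID matches the DAG above, and the `in_cluster` extra key is an assumption that may differ between versions of the cncf-kubernetes provider.

# A minimal sketch (not the only way): create the 'k8s_conn' Kubernetes connection
# programmatically. Assumes in-cluster configuration; verify the extra field names
# against the cncf-kubernetes provider version you are running.
import json

from airflow import settings
from airflow.models import Connection

session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == 'k8s_conn').first():
    session.add(
        Connection(
            conn_id='k8s_conn',
            conn_type='kubernetes',
            extra=json.dumps({'in_cluster': True}),
        )
    )
    session.commit()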
dags folder structure
As for the dags folder, I have specified the `data-airflow-dags` PVC in the `values.yaml` file. This PVC is mounted at the `/opt/airflow/dags` path in the Airflow Pod.
I used Azure file shares for the `data-airflow-dags` PVC so that I can upload the DAG files and Spark application files to them.
dags:
  persistence:
    enabled: true
    existingClaim: data-airflow-dags
/opt/airflow/dags: root folder for all DAGs
/opt/airflow/dags/k8s-spark-operator: folder for the Spark-Pi example file and other files related to the Spark Operator
That's why the `application_file` is set to `k8s-spark-operator/spark-pi.yaml` in the DAG file.
Run the DAG
After creating the DAG file, you can upload it to the dags folder through the Azure file share. Once the DAG file is available in the Airflow Pod, you can run the DAG in the Airflow UI.
When you run the DAG, the Spark-Pi example will be executed.
Here is the screenshot of the DAG in the Airflow UI.

And, from the Logs tab, you can see the logs of the Spark-Pi example.
Pi is roughly 3.141627526283255
do_xcom_push=True
TL;DR: If the Spark application does not return its result for XCom, do not set `do_xcom_push` to True.
When you set `do_xcom_push` to True, the task's return value will be pushed to XCom. XCom is a feature of Airflow that allows you to push and pull data between tasks.
The KubernetesPodOperator handles XCom values differently than other operators. In order to pass an XCom value from your Pod, you must set do_xcom_push to True. This creates a sidecar container that runs alongside the Pod, and the Pod must write the XCom value to the /airflow/xcom/return.json path.
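For illustration, here is a minimal PySpark sketch, a hypothetical Python variant of the Pi job rather than the Scala example shipped in the image, that writes its result to the path the sidecar reads; it assumes the driver container can write to /airflow/xcom/.

# A minimal PySpark sketch: estimate Pi and write the result to the file the
# Airflow xcom sidecar reads. Assumes /airflow/xcom/ is writable in the driver pod.
import json
import random

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('spark-pi-xcom').getOrCreate()

n = 100000
inside = (
    spark.sparkContext.parallelize(range(n))
    .filter(lambda _: random.random() ** 2 + random.random() ** 2 <= 1.0)
    .count()
)
pi_estimate = 4.0 * inside / n

# The sidecar container picks up whatever JSON the driver writes here.
with open('/airflow/xcom/return.json', 'w') as f:
    json.dump({'pi': pi_estimate}, f)

spark.stop()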
For more information on XCom, please refer to the Airflow documentation.
When do_xcom_push is set to True
When you set `do_xcom_push` to True, your Spark application must write the XCom value to the `/airflow/xcom/return.json` path in the driver pod. The XCom value is the data that you want to push to XCom.
If the application never writes to `/airflow/xcom/return.json`, the task keeps waiting for the xcom sidecar container and eventually fails with error messages like the ones below:
[2025-01-23, 15:06:01 PST] {pod_manager.py:727} INFO - Checking if xcom sidecar container is started.
[2025-01-23, 15:06:32 PST] {pod_manager.py:737} WARNING - Still waiting for the xcom sidecar container to start. Elapsed time: 30 seconds.
[2025-01-23, 15:07:02 PST] {pod_manager.py:737} WARNING - Still waiting for the xcom sidecar container to start. Elapsed time: 61 seconds.
// omitting the error messages
[2025-01-23, 15:17:44 PST] {pod_manager.py:737} WARNING - Still waiting for the xcom sidecar container to start. Elapsed time: 703 seconds.
[2025-01-23, 15:17:50 PST] {taskinstance.py:3311} ERROR - Task failed with exception
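If your application does write to /airflow/xcom/return.json, a downstream task can read the value. Here is a minimal sketch of a hypothetical PythonOperator task added to the same DAG, assuming `do_xcom_push=True` is set on spark_pi_task.

# A minimal sketch: a downstream task (inside the same 'with DAG' block) that pulls
# the value the Spark driver wrote to /airflow/xcom/return.json.
from airflow.operators.python import PythonOperator

def print_spark_result(ti):
    # do_xcom_push=True stores the JSON content under the default 'return_value' key
    result = ti.xcom_pull(task_ids='spark_example')
    print(f'Result from the Spark driver: {result}')

print_result = PythonOperator(
    task_id='print_spark_result',
    python_callable=print_spark_result,
)

spark_pi_task >> print_result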
Custom Spark Applications
Mounting Azure file shares to the Airflow Pod and the Spark Pods is a good way to make the DAG files and Spark application files available to them.

In the case of the Spark-Pi example, the Spark application is already included in the Spark Docker image. However, if you want to run a custom Spark application, you’ll need to mount the application file to the Spark Pod.
To handle this, you can create a Persistent Volume Claim (PVC) for the Spark application file and then mount the PVC to the Spark Pod.
Here is an example of how to mount the Spark application file to the Spark Pod.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: my-spark-app
  namespace: airflow
spec:
  type: Scala
  mode: cluster
  image: spark:3.5.3
  imagePullPolicy: IfNotPresent
  timeToLiveSeconds: 30
  deps:
    repositories:
      - https://repo1.maven.org/maven2
    packages:
      - org.apache.hadoop:hadoop-azure:3.2.0
      - com.microsoft.azure:azure-storage:8.6.3
      - org.postgresql:postgresql:42.7.0
      - com.squareup.okhttp3:okhttp:4.12.0
      - org.neo4j:neo4j-connector-apache-spark_2.12:5.3.1_for_spark_3
  files:  # (1)
    - local:///opt/spark/apps/my-spark-app/spark.conf
  mainClass: com.nsalexamy.examples.mysparkapp.MainApp  # (2)
  mainApplicationFile: local:///opt/spark/apps/my-spark-app/my-spark-app_2.12-0.0.1-SNAPSHOT.jar  # (3)
  volumes:  # (4)
    - name: spark-apps
      persistentVolumeClaim:
        claimName: data-spark-apps
  arguments:
    - "5000"
  sparkVersion: 3.5.3
  driver:
    labels:
      version: 3.5.3
    cores: 1
    instances: 1
    memory: 512m
    serviceAccount: spark
    volumeMounts:  # (5)
      - name: spark-apps
        mountPath: /opt/spark/apps
  executor:
    labels:
      version: 3.5.3
    instances: 1
    cores: 1
    memory: 8000m
    volumeMounts:
      - name: spark-apps
        mountPath: /opt/spark/apps
(1) The `files` list contains additional files (here, `spark.conf`) that the Spark application needs. The `local://` paths refer to the filesystem inside the Spark Pod.
(2) The `mainClass` is the main class of the Spark application.
(3) The `mainApplicationFile` is the path to the Spark application file. If the type is Scala, the path should point to a jar file.
(4) The `volumes` section declares a volume backed by the `data-spark-apps` Persistent Volume Claim (PVC) that holds the Spark application files.
(5) The `volumeMounts` section mounts the `spark-apps` volume into the driver and executor Pods at `/opt/spark/apps`.
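The DAG task for this custom application looks just like the Spark-Pi one. Here is a minimal sketch, assuming the manifest above is saved as `k8s-spark-operator/my-spark-app.yaml` (a hypothetical file name) under the dags folder.

# A minimal sketch of the corresponding DAG task, mirroring the Spark-Pi DAG above.
# The file name 'my-spark-app.yaml' is an assumed path under the dags folder.
run_my_spark_app = SparkKubernetesOperator(
    task_id='my_spark_app',
    namespace='airflow',
    application_file='k8s-spark-operator/my-spark-app.yaml',
    kubernetes_conn_id='k8s_conn',
)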
Conclusion
In this article, I demonstrated how to use the SparkKubernetesOperator with the Spark-Pi example. The SparkKubernetesOperator is designed to run Spark applications on Kubernetes in a parallelized manner. It’s actually a subclass of the KubernetesPodOperator, which is used to run tasks in Kubernetes Pods.
We also created a DAG for the SparkKubernetesOperator and successfully ran the Spark-Pi example. Additionally, we discussed how to use the SparkKubernetesOperator to run custom Spark applications.
All my LinkedIn articles are available at All My LinkedIn Articles.