Apache Airflow - Data Migration using Sling

Introduction
This article follows the previous article, Apache Airflow KubernetesPodOperator, and assumes that you have already installed Apache Airflow on AKS.
What is Sling?
Sling is a command-line data migration tool for moving data from one database to another. It is well suited to moving relatively small amounts of data between databases.
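Its core workflow is a handful of CLI commands. Here is a minimal, hypothetical example (the connection and table names are placeholders, assuming they are defined in env.yaml as shown later in this article):
# list the connections Sling knows about
$ sling conns list
# copy one table from a source connection to a target connection
$ sling run --src-conn SRC_DB --src-stream 'public.my_table' \
  --tgt-conn DEST_DB --tgt-object 'target_schema.my_table' \
  --mode full-refresh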
Airflow KubernetesPodOperator for Sling in a parallelized way
In this example, we will use the KubernetesPodOperator to run Sling in an Airflow DAG. The KubernetesPodOperator runs each task in its own Kubernetes Pod, so independent tasks can run in parallel.

The env.yaml file, which contains the connection details for the source and destination databases, will be mounted into the Sling Pod as a secret.
The Sling scripts, which move the data from the source databases to the destination database, will be mounted into the Sling Pod from a PVC.
Each KubernetesPodOperator task runs one Sling script to move data from a source database to the destination database.

All the Sling scripts in a DAG will run in parallel.
Create a secret for env.yaml
In env.yaml, we store the connection details that Sling needs for the source and destination databases. For this example, we use three source databases and one destination database.
connections:
  SRC_DB_1:
    type: postgres
    host: postgresql-prod-server1
    user: user1
    password: password1
    port: 5432
    database: db_1
    sslmode: require
  SRC_DB_2:
    type: postgres
    host: postgresql-prod-server2
    user: user2
    password: password2
    port: 5432
    database: db_2
    # schema: public
    sslmode: require
  SRC_DB_3:
    type: postgres
    host: postgresql-prod-server3
    user: user3
    password: password3
    port: 5432
    database: db_3
    # schema: public
    sslmode: require
  DEST_DB_1:
    type: postgres
    host: dest-postgresql-server
    user: dest_user
    password: dest_password
    port: 5432
    database: datalake
    # schema: public
    sslmode: disable
To create a secret for env.yaml, run the following command:
$ kubectl -n airflow create secret generic sling-env --from-file=env.yaml --dry-run=client -o yaml | yq eval 'del(.metadata.creationTimestamp)' > sling-env-secret.yaml
$ kubectl -n airflow apply -f sling-env-secret.yaml
To verify that the secret has been created successfully, run the following command:
$ kubectl -n airflow get secret
$ kubectl -n airflow get secret sling-env -o yaml
$ kubectl -n airflow get secret sling-env -o yaml | yq '.data["env.yaml"]' | base64 -d
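If you have the Sling CLI installed locally with the same env.yaml under ~/.sling/, you can also sanity-check the connections themselves before wiring them into Airflow (a quick sketch using the connection names defined above):
# list the connections Sling can see
$ sling conns list
# test connectivity to a source and the destination
$ sling conns test SRC_DB_1
$ sling conns test DEST_DB_1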
Create a PVC for the Sling scripts
Property | Value |
---|---|
Storage Account | your-storage-account-name |
Fileshare name | sling-scripts |
In this example, we will use Azure Fileshare storage to store the Sling scripts. We will create a PVC for the Sling scripts.
Here is the manifest file for the PV and PVC:
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-sling-scripts-storage
  labels:
    environment: airflow
    app: airflow
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteMany
  storageClassName: azurefile
  azureFile:
    secretNamespace: 'airflow'
    # This secret contains the storage account name and key
    secretName: aksdepstorage-secret
    shareName: sling-scripts
    readOnly: false
  mountOptions:
    - dir_mode=0777
    - file_mode=0777
    # use the same uid and gid as the airflow container
    # see https://docs.microsoft.com/en-us/azure/aks/azure-files-volume#mount-file-share-as-a-persistent-volume
    - uid=0
    - gid=0
    - mfsymlinks
    - cache=strict
    - nosharesock
    - nobrl
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-sling-scripts
  namespace: airflow
  labels:
    environment: airflow
    app: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: azurefile
  volumeName: pv-sling-scripts-storage
  resources:
    requests:
      storage: 1Gi
To create the PVC for the Sling scripts, run the following command:
$ kubectl -n airflow apply -f pvc-sling-scripts.yaml
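To confirm the claim is bound, and to upload the Sling scripts to the underlying Azure file share, something like the following should work (the storage account name and script file are placeholders, and az storage file upload needs storage credentials such as --account-key or a SAS token):
# the PVC should report STATUS "Bound"
$ kubectl -n airflow get pvc data-sling-scripts
# upload a Sling script to the sling-scripts file share backing the PV
$ az storage file upload --account-name your-storage-account-name \
  --share-name sling-scripts --source ./staging_appointment-datalake.sh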
Push Docker image to ACR
Here is the Dockerfile for the Sling image:
FROM slingdata/sling

WORKDIR /usr/app

# env.yaml will be mounted as a secret (sling-env) into the Pod
# the Sling script files will be mounted as a volume into the Pod
# the image entrypoint is overridden by the cmds of the KubernetesPodOperator in the DAG
To push the Docker image to ACR, run the following command:
$ ACR_NAME=your-acr-name
# login to ACR
$ az acr login --name $ACR_NAME
# push the Docker image to ACR
$ az acr build --registry $ACR_NAME --image dep-sling:0.1.0 ./docker
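To verify that the image is in the registry (repository name taken from the build command above):
# list the tags of the dep-sling repository
$ az acr repository show-tags --name $ACR_NAME --repository dep-sling --output table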
pharmanet-scheduler-app
Environment variables for the pharmanet-scheduler-app:
Name | Store Type | Value | Description |
---|---|---|---|
GET_MEDREC_SERVICE_REQUESTS_SQL | ConfigMap | | |
APPOINTMENT_JDBC_URL | Secret | | |
APPOINTMENT_DB_USERNAME | Secret | | |
APPOINTMENT_DB_PASSWORD | Secret | | |
PHARMANET_JDBC_URL | Secret | | |
PHARMANET_DB_USERNAME | Secret | | |
PHARMANET_DB_PASSWORD | Secret | | |
ICLINIC_MODULES_PATIENT_ENABLED | ConfigMap | false | |
PHARMANET_DB_SCHEMA | ConfigMap | decision_support | |
Create a secret for environment variables for the pharmanet-scheduler-app
$ kubectl -n airflow create secret generic pharmanet-scheduler-app \
--from-literal APPOINTMENT_JDBC_URL=jdbc:postgresql://postgresql.dep:5432/datalake \
--from-literal APPOINTMENT_DB_USERNAME=iclinic \
--from-literal APPOINTMENT_DB_PASSWORD="ENC(bLkn/dI6MYx5inMzcKrKPLz5A2GXB5ze)" \
--from-literal PHARMANET_JDBC_URL=jdbc:postgresql://postgresql.dep:5432/datalake \
--from-literal PHARMANET_DB_USERNAME=iclinic \
--from-literal PHARMANET_DB_PASSWORD="ENC(bLkn/dI6MYx5inMzcKrKPLz5A2GXB5ze)" \
--dry-run=client -o yaml | yq eval 'del(.metadata.creationTimestamp)' > pharmanet-scheduler-app-secret.yaml
$ kubectl apply -f pharmanet-scheduler-app-secret.yaml
Create a ConfigMap for the pharmanet-scheduler-app
Here is the query stored in get-medrec-service-requests.sql:
SELECT a.id as appointment_id, a.patient_id, a.practitioner_id, u.id as user_id, u.pharmanet_global_id, p.phn
FROM staging_appointment.appointments a
INNER JOIN staging_user._shell_users u
ON a.practitioner_id = u.practitioner_id
INNER JOIN staging_patient.patients p
ON a.patient_id = p.id and a.patient_id != 0
INNER JOIN staging_doctor.practitioners pr
ON a.practitioner_id = pr.id and pr.record_type = 'Practitioner'
WHERE start_date=?
$ kubectl -n airflow create configmap pharmanet-scheduler-app \
--from-file=GET_MEDREC_SERVICE_REQUESTS_SQL=./get-medrec-service-requests.sql \
--from-file=GET_DECISION_SUPPORT_TARGETS_SQL=./get-decision-support-targets.sql \
--from-literal PHARMANET_DB_SCHEMA="decision_support" \
--dry-run=client -o yaml | yq eval 'del(.metadata.creationTimestamp)' > pharmanet-scheduler-app-configmap.yaml
$ kubectl apply -f pharmanet-scheduler-app-configmap.yaml
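To check the rendered ConfigMap and read back a single key:
$ kubectl -n airflow get configmap pharmanet-scheduler-app -o yaml
$ kubectl -n airflow get configmap pharmanet-scheduler-app -o jsonpath='{.data.PHARMANET_DB_SCHEMA}'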
Check PostgreSQL connection
# Run a pod to test the PostgreSQL connection in the airflow namespace
$ kubectl -n airflow run postgres-client --image=postgres --env="POSTGRES_PASSWORD=skunkwerks2010" --env="POD_NAMESPACE=airflow"
$ kubectl -n airflow exec -it postgres-client -- bash
# inside the pod
$ psql -h iclinic-staging-flex-db.postgres.database.azure.com --username=iclinic -W -d icliniccore
# enter the database password when prompted
Write Airflow DAG
Here is the DAG file for the Sling migration:
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from kubernetes.client import models as k8s

with DAG(
    dag_id="sling-migration",
    start_date=pendulum.datetime(2024, 12, 13),
    schedule="@daily",
    catchup=False,
    tags=["sling", "migration"],
) as dag:
    scripts = [
        "staging_appointment-datalake.sh",
        "staging_files-datalake.sh",
        "staging_icliniccore-datalake.sh",
        "staging_patient-datalake.sh",
    ]

    # date string format: YYYY-MM-DD
    date_string = pendulum.now().format("YYYY-MM-DD")
    millisecond = pendulum.now().format("x")
    working_dir = f"/decision-support/{date_string}/{millisecond}/"

    volumes = [
        k8s.V1Volume(
            name="sling-env",
            secret=k8s.V1SecretVolumeSource(secret_name="sling-env"),
        ),
        k8s.V1Volume(
            name="scripts-volume",
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="data-sling-scripts"),
        ),
    ]
    volume_mounts = [
        k8s.V1VolumeMount(
            name="sling-env",
            mount_path="/home/sling/.sling/env.yaml",
            sub_path="env.yaml",
        ),
        k8s.V1VolumeMount(
            name="scripts-volume",
            mount_path="/usr/app/scripts/",
        ),
    ]
    resource_requests = k8s.V1ResourceRequirements(
        requests={"cpu": "500m", "memory": "512Mi"},
        limits={"cpu": "500m", "memory": "1Gi"},
    )

    migration_tasks = []
    for script in scripts:
        migration_tasks.append(KubernetesPodOperator(
            task_id=f"migrate_{script}",
            name=f"migrate_{script}",
            namespace="airflow",
            image="iclinicacr.azurecr.io/dep-sling:0.1.0",
            image_pull_policy="Always",
            cmds=["sh", "-c", f"/usr/app/scripts/{script}"],
            container_resources=resource_requests,
            volumes=volumes,
            volume_mounts=volume_mounts,
            is_delete_operator_pod=True,
            get_logs=True,
        ))

    @task
    def start_migration():
        return "Migration started"

    spark_app = SparkKubernetesOperator(
        task_id="spark_app",
        namespace="airflow",
        trigger_rule="all_success",
        depends_on_past=False,
        retries=0,
        # relative path from the DAG file
        application_file="k8s-spark-operator/decision-support-spark-app.yaml",
        kubernetes_conn_id="k8s_conn",
        do_xcom_push=True,
    )

    start_task = start_migration()
    start_task >> migration_tasks >> spark_app
This DAG is scheduled to run every day at 00:00 UTC. A start task runs before the Sling tasks, and the Spark application task runs after all the Sling tasks have completed.
In the DAG file, we define one KubernetesPodOperator task per Sling script; each task moves data from a source database to the destination database, and all of them run in parallel.
The DAG file also shows how to use Volume and VolumeMount in the KubernetesPodOperator to mount the secret and the PVC into the Sling Pod, and how to set resource requests and limits for the Pod.
By setting cmds=["sh", "-c", f"/usr/app/scripts/{script}"], each Pod runs its corresponding Sling script.
Here is an example of a Sling script file used in the DAG:
#!/bin/sh
echo "##### Starting migration of example1 content with Sling"
echo "### Migrating table_1 data from SRC_DB_1 to DEST_DB_1"
sling run --src-conn SRC_DB_1 --src-stream 'public.table_1' \
  --tgt-conn DEST_DB_1 --tgt-object "src_db_1.table_1" \
  --mode incremental \
  --primary-key "id" \
  --update-key "last_update_date"

echo "### Migrating table_2 data from SRC_DB_1 to DEST_DB_1"
sling run --src-conn SRC_DB_1 --src-stream 'public.table_2' \
  --tgt-conn DEST_DB_1 --tgt-object "src_db_1.table_2" \
  --mode incremental \
  --primary-key "id" \
  --update-key "last_update_date"
This Sling script uses incremental mode with a primary key and an update key, so only new and updated rows are moved from the source database to the destination database.
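As an alternative to one sling run call per table, the same migration can be expressed declaratively in a Sling replication file. Here is a rough sketch (the file name and table list are illustrative, not taken from the project):
$ cat > replication.yaml <<'EOF'
source: SRC_DB_1
target: DEST_DB_1
defaults:
  mode: incremental
  primary_key: [id]
  update_key: last_update_date
streams:
  public.table_1:
    object: src_db_1.table_1
  public.table_2:
    object: src_db_1.table_2
EOF
$ sling run -r replication.yaml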
After the Sling tasks complete, we can check their logs in the Airflow UI and verify that the data has been moved successfully to the destination database.
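For a quick spot check on the destination side, you can count the rows in one of the migrated tables (connection details from env.yaml, table name from the example script above):
$ psql "host=dest-postgresql-server port=5432 dbname=datalake user=dest_user sslmode=disable" \
  -c "SELECT count(*) FROM src_db_1.table_1;"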
Conclusion
In this article, we have seen how to use the KubernetesPodOperator to run Sling in an Airflow DAG. We created a secret for the env.yaml file, created a PVC for the Sling scripts, and pushed the Docker image to ACR. We then wrote an Airflow DAG that runs the Sling tasks in parallel with the KubernetesPodOperator, and walked through an example Sling script that moves data from a source database to the destination database.