Service Foundry
Young Gyu Kim <credemol@gmail.com>

Apache Airflow - Data Migration using Sling


Introduction

This article is a follow-up to the previous article, Apache Airflow KubernetesPodOperator, and assumes that you have already installed Apache Airflow on AKS.

What is Sling?

Sling is a command-line data migration tool that moves data from one database to another. It works well for moving relatively small amounts of data between databases.
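As a quick illustration (not part of this example's setup), a single-table copy with the Sling CLI looks roughly like this. SRC_DB_1 and DEST_DB_1 are connection names defined in Sling's env.yaml (introduced below), and public.my_table / target_schema.my_table are placeholder names:

# copy one table from a source connection to a target connection
$ sling run --src-conn SRC_DB_1 --src-stream 'public.my_table' \
    --tgt-conn DEST_DB_1 --tgt-object 'target_schema.my_table'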

Running Sling in parallel with the Airflow KubernetesPodOperator

In this example, we use the KubernetesPodOperator to run the Sling tool from an Airflow DAG. The KubernetesPodOperator runs each task in its own Kubernetes Pod, which allows the Sling tasks to run in parallel.

Figure 1. Sling Pod

The env.yaml file, which contains the connection details for the source and destination databases, is mounted into the Sling Pod as a Kubernetes Secret.

The Sling scripts, which move the data from the source databases to the destination database, are mounted into the Sling Pod from a PersistentVolumeClaim (PVC).

Each KubernetesPodOperator task runs one Sling script that moves data from a source database to the destination database.

Figure 2. Airflow Graph

All of the Sling scripts in the DAG run in parallel.

Create a secret for env.yaml

The env.yaml file holds the connection details that Sling needs for the source and destination databases. For this example, we use three source databases and one destination database.

env.yaml
connections:
  SRC_DB_1:
    type: postgres
    host: postgresql-prod-server1
    user: user1
    password: password1
    port: 5432
    database: db_1
    sslmode: require
  SRC_DB_2:
    type: postgres
    host: postgresql-prod-server2
    user: user2
    password: password2
    port: 5432
    database: db_2
    #schema: public
    sslmode: require
  SRC_DB_3:
    type: postgres
    host: postgresql-prod-server3
    user: user3
    password: password3
    port: 5432
    database: db_3
    #schema: public
    sslmode: require
  DEST_DB_1:
    type: postgres
    host: dest-postgresql-server
    user: dest_user
    password: dest_password
    port: 5432
    database: datalake
    #schema: public
    sslmode: disable
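Sling resolves these connection names from env.yaml, which is mounted at /home/sling/.sling/env.yaml in the Pod (shown later). If you have the sling CLI installed locally with the same file, you can optionally sanity-check the connections before wiring anything into Airflow; this assumes the databases are reachable from your machine:

$ sling conns list
$ sling conns test SRC_DB_1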

To create a secret for env.yaml, generate the manifest with a client-side dry run and apply it (alternatively, create the secret directly with kubectl -n airflow create secret generic sling-env --from-file=env.yaml):

$ kubectl -n airflow create secret generic sling-env --from-file=env.yaml \
    --dry-run=client -o yaml | yq eval 'del(.metadata.creationTimestamp)' > sling-env-secret.yaml

$ kubectl -n airflow apply -f sling-env-secret.yaml

To verify that the secret has been created successfully, run the following commands:

$ kubectl -n airflow get secret

$ kubectl -n airflow get secret sling-env -o yaml
$ kubectl -n airflow get secret sling-env -o yaml | yq '.data.["env.yaml"]' | base64 -d

Create a PVC for sling scripts

Table 1. File share for sling scripts

Property          Value
Storage Account   your-storage-account-name
Fileshare name    sling-scripts

In this example, we use an Azure file share to store the Sling scripts and expose it to the Pods through a PersistentVolume and a PVC.
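The PersistentVolume manifest below references a Kubernetes secret (aksdepstorage-secret) that holds the storage account name and key. If the file share and that secret do not exist yet, they can be created along these lines; the account name and key are placeholders, and azurestorageaccountname / azurestorageaccountkey are the key names the azureFile volume expects:

# create the file share (assumes the storage account already exists and you are logged in with az)
$ az storage share create --account-name your-storage-account-name --name sling-scripts

# create the secret referenced by the PersistentVolume
$ kubectl -n airflow create secret generic aksdepstorage-secret \
    --from-literal=azurestorageaccountname=your-storage-account-name \
    --from-literal=azurestorageaccountkey='<storage-account-key>'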

Here is the manifest file for the PersistentVolume and the PVC:

pvc-sling-scripts.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-sling-scripts-storage
  labels:
    environment: airflow
    app: airflow
spec:
  capacity:
    storage: 1Gi
  accessModes:
    - ReadWriteMany
  storageClassName: azurefile
  azureFile:
    secretNamespace: 'airflow'
    # this secret holds the storage account name and key
    secretName: aksdepstorage-secret
    shareName: sling-scripts
    readOnly: false
  mountOptions:
    - dir_mode=0777
    - file_mode=0777
    # use the same uid and gid as the container user
    # see https://docs.microsoft.com/en-us/azure/aks/azure-files-volume#mount-file-share-as-a-persistent-volume
    - uid=0
    - gid=0
    - mfsymlinks
    - cache=strict
    - nosharesock
    - nobrl
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: data-sling-scripts
  namespace: airflow
  labels:
    environment: airflow
    app: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: azurefile
  volumeName: pv-sling-scripts-storage
  resources:
    requests:
      storage: 1Gi

To create the PersistentVolume and PVC for the Sling scripts, run the following command:

$ kubectl -n airflow apply -f pvc-sling-scripts.yaml
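To verify that the claim is bound to the volume, run:

$ kubectl -n airflow get pvc data-sling-scripts
$ kubectl get pv pv-sling-scripts-storage

Both should report a STATUS of Bound.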

Push Docker image to ACR

The image for the Sling Pods is based on the official slingdata/sling image. Nothing else needs to be baked in: the env.yaml connection file and the scripts are mounted at runtime.

docker/Dockerfile
FROM slingdata/sling

WORKDIR /usr/app

# env.yaml is mounted at runtime from the sling-env secret (/home/sling/.sling/env.yaml)
# the Sling scripts are mounted at runtime from the data-sling-scripts PVC (/usr/app/scripts/)

# the KubernetesPodOperator sets the container command (cmds) explicitly,
# so this entrypoint only matters when the image is run directly
ENTRYPOINT ["sh"]

To build the Docker image and push it to ACR, run the following commands:

$ ACR_NAME=your-acr-name

# log in to ACR
$ az acr login --name $ACR_NAME

# build the image in ACR and push it to the registry
$ az acr build --registry $ACR_NAME --image dep-sling:0.1.0 ./docker
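To confirm that the image landed in the registry, you can list the tags of the dep-sling repository:

$ az acr repository show-tags --name $ACR_NAME --repository dep-sling --output table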

pharmanet-scheduler-app

Environment variables for the pharmanet-scheduler-app:

Table 2. Environment variables

Name                              Store Type   Value
GET_MEDREC_SERVICE_REQUESTS_SQL   ConfigMap    (contents of get-medrec-service-requests.sql)
APPOINTMENT_JDBC_URL              Secret
APPOINTMENT_DB_USERNAME           Secret
APPOINTMENT_DB_PASSWORD           Secret
PHARMANET_JDBC_URL                Secret
PHARMANET_DB_USERNAME             Secret
PHARMANET_DB_PASSWORD             Secret
ICLINIC_MODULES_PATIENT_ENABLED   ConfigMap    false
PHARMANET_DB_SCHEMA               ConfigMap    decision_support

Create a secret for environment variables for the pharmanet-scheduler-app

$ kubectl -n airflow create secret generic pharmanet-scheduler-app \
  --from-literal APPOINTMENT_JDBC_URL=jdbc:postgresql://postgresql.dep:5432/datalake \
  --from-literal APPOINTMENT_DB_USERNAME=iclinic \
  --from-literal APPOINTMENT_DB_PASSWORD="ENC(bLkn/dI6MYx5inMzcKrKPLz5A2GXB5ze)" \
  --from-literal PHARMANET_JDBC_URL=jdbc:postgresql://postgresql.dep:5432/datalake \
  --from-literal PHARMANET_DB_USERNAME=iclinic \
  --from-literal PHARMANET_DB_PASSWORD="ENC(bLkn/dI6MYx5inMzcKrKPLz5A2GXB5ze)" \
  --dry-run=client -o yaml | yq eval 'del(.metadata.creationTimestamp)' > pharmanet-scheduler-app-secret.yaml

$ kubectl apply -f pharmanet-scheduler-app-secret.yaml

Create a ConfigMap for the pharmanet-scheduler-app

get-medrec-service-requests.sql
SELECT a.id AS appointment_id, a.patient_id, a.practitioner_id,
       u.id AS user_id, u.pharmanet_global_id, p.phn
FROM staging_appointment.appointments a
         INNER JOIN staging_user._shell_users u
                    ON a.practitioner_id = u.practitioner_id
         INNER JOIN staging_patient.patients p
                    ON a.patient_id = p.id AND a.patient_id != 0
         INNER JOIN staging_doctor.practitioners pr
                    ON a.practitioner_id = pr.id AND pr.record_type = 'Practitioner'
WHERE start_date = ?

To create the ConfigMap, run the following command (get-decision-support-targets.sql is a second query file used by the app and not shown here):

$ kubectl -n airflow create configmap pharmanet-scheduler-app \
  --from-file=GET_MEDREC_SERVICE_REQUESTS_SQL=./get-medrec-service-requests.sql \
  --from-file=GET_DECISION_SUPPORT_TARGETS_SQL=./get-decision-support-targets.sql \
  --from-literal PHARMANET_DB_SCHEMA="decision_support" \
  --dry-run=client -o yaml | yq eval 'del(.metadata.creationTimestamp)' > pharmanet-scheduler-app-configmap.yaml

$ kubectl apply -f pharmanet-scheduler-app-configmap.yaml
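To verify that the Secret and ConfigMap were created as expected, run:

$ kubectl -n airflow get secret pharmanet-scheduler-app -o yaml
$ kubectl -n airflow get configmap pharmanet-scheduler-app -o yaml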

Check PostgreSQL connection

# Run a pod to test the PostgreSQL connection in the airflow namespace
$ kubectl -n airflow run postgres-client --image=postgres \
    --env="POSTGRES_PASSWORD=<postgres-password>" --env="POD_NAMESPACE=airflow"
$ kubectl -n airflow exec -it postgres-client -- bash

# inside the pod, connect to the source database (enter the database password when prompted)
$ psql -h iclinic-staging-flex-db.postgres.database.azure.com --username=iclinic -W -d icliniccore
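Once connected, a couple of psql meta-commands are enough to confirm that the expected schemas and tables are visible; staging_appointment is one of the schemas queried later in this article:

# inside psql
icliniccore=> \dn
icliniccore=> \dt staging_appointment.*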

Write Airflow DAG

Here is the DAG file for the Sling migration:

/dags/sling_dag.py
import pendulum
from airflow import DAG
from airflow.decorators import task

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.spark_kubernetes import SparkKubernetesOperator
from kubernetes.client import models as k8s


with DAG(
    dag_id="sling-migration",
    start_date=pendulum.datetime(2024, 12, 13),
    schedule_interval="@daily",
    catchup=False,
    tags=["sling", "migration"],
) as dag:
    # one Sling script per source schema; each script becomes its own task
    scripts = [
        "staging_appointment-datalake.sh",
        "staging_files-datalake.sh",
        "staging_icliniccore-datalake.sh",
        "staging_patient-datalake.sh",
    ]

    # date string format: YYYY-MM-DD
    # note: these values are evaluated when the DAG file is parsed, not per DAG run
    date_string = pendulum.now().format("YYYY-MM-DD")
    millisecond = pendulum.now().format("x")
    working_dir = f"/decision-support/{date_string}/{millisecond}/"

    volumes = [
        # env.yaml (connection details) from the sling-env secret
        k8s.V1Volume(
            name="sling-env",
            secret=k8s.V1SecretVolumeSource(secret_name="sling-env"),
        ),
        # Sling scripts from the Azure file share backed PVC
        k8s.V1Volume(
            name="scripts-volume",
            persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="data-sling-scripts"),
        ),
    ]
    volume_mounts = [
        k8s.V1VolumeMount(
            name="sling-env",
            mount_path="/home/sling/.sling/env.yaml",
            sub_path="env.yaml",
        ),
        k8s.V1VolumeMount(
            name="scripts-volume",
            mount_path="/usr/app/scripts/",
        ),
    ]

    resource_requests = k8s.V1ResourceRequirements(
        requests={"cpu": "500m", "memory": "512Mi"},
        limits={"cpu": "500m", "memory": "1Gi"},
    )

    migration_tasks = []
    for script in scripts:
        migration_tasks.append(KubernetesPodOperator(
            task_id=f"migrate_{script}",
            name=f"migrate_{script}",
            namespace="airflow",
            image="iclinicacr.azurecr.io/dep-sling:0.1.0",
            image_pull_policy="Always",
            # run the mounted Sling script inside the Pod
            cmds=["sh", "-c", f"/usr/app/scripts/{script}"],
            container_resources=resource_requests,
            volumes=volumes,
            volume_mounts=volume_mounts,
            is_delete_operator_pod=True,
            get_logs=True,
        ))

    @task
    def start_migration():
        return "Migration started"

    # Spark application defined in decision-support-spark-app.yaml; it only
    # starts after every Sling task has succeeded (trigger_rule="all_success")
    sparkApp = SparkKubernetesOperator(
        task_id='spark_app',
        namespace='airflow',
        trigger_rule="all_success",
        depends_on_past=False,
        retries=0,
        # relative path from the DAG file
        application_file='k8s-spark-operator/decision-support-spark-app.yaml',
        kubernetes_conn_id='k8s_conn',
        do_xcom_push=True,
    )

    start_task = start_migration()

    start_task >> migration_tasks >> sparkApp

This DAG is scheduled to run every day at 00:00 UTC. A start task runs before the Sling tasks, and the Spark application task (spark_app) runs only after all of the Sling tasks have completed successfully.
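Besides the daily schedule, the DAG can be triggered manually for testing, either from the Airflow UI or with the Airflow CLI from inside one of the Airflow pods; the pod name below is a placeholder:

$ kubectl -n airflow exec -it <airflow-scheduler-pod> -- airflow dags trigger sling-migration
$ kubectl -n airflow exec -it <airflow-scheduler-pod> -- airflow dags list-runs -d sling-migration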

In the DAG file, we define one KubernetesPodOperator task per Sling script. Each task runs a Sling script that moves data from a source database to the destination database.

The DAG file shows how to use Volume and VolumeMount in the KubernetesPodOperator to mount the secret and the PVC into the Sling Pod, and how to set resource requests and limits for the Pod.

By setting cmds=["sh", "-c", f"/usr/app/scripts/{script}"], each Pod runs the mounted Sling script.

Here is an example of what a Sling script file looks like:

scripts/example1.sh
#!/bin/sh

echo "##### Starting migration of example1 content to sling"

echo "### Migrating table_1 data from SRC_DB_1 to DEST_DB_1"
sling run --src-conn SRC_DB_1 --src-stream 'public.table_1' \
  --tgt-conn DEST_DB_1 --tgt-object "src_db_1.table_1" \
  --mode incremental \
  --primary-key "id" \
  --update-key "last_update_date"

echo "### Migrating table_2 data from SRC_DB_1 to DEST_DB_1"
sling run --src-conn SRC_DB_1 --src-stream 'public.table_2' \
  --tgt-conn DEST_DB_1 --tgt-object "src_db_1.table_2" \
  --mode incremental \
  --primary-key "id" \
  --update-key "last_update_date"

This Sling script moves incremental data (rows added or changed since the last run, based on the update key) from the source database to the destination database.
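For an initial load, or for tables without a reliable update column, Sling also supports a full-refresh mode that rewrites the target table instead of loading only the changes; a variant of the command above would look like this:

sling run --src-conn SRC_DB_1 --src-stream 'public.table_1' \
  --tgt-conn DEST_DB_1 --tgt-object "src_db_1.table_1" \
  --mode full-refresh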

After the Sling tasks are completed, we can check the logs of the Sling tasks in the Airflow UI and verify that the data has been moved successfully from the source database to the destination database.

Conclusion

In this article, we used the KubernetesPodOperator to run the Sling tool from an Airflow DAG. We created a secret for the env.yaml file and a PVC for the Sling scripts, pushed the Docker image to ACR, and wrote an Airflow DAG that runs the Sling tasks in parallel before handing off to a Spark application. We also walked through an example Sling script that moves data from a source database to the destination database.