Service Foundry
Young Gyu Kim <credemol@gmail.com>

Apache Airflow - Postgres Hook

Introduction

In this article, we are going to see how to use PostgresHook in Apache Airflow to interact with a Postgres database.

In this example, we will cover the following:

  • How to set up a Postgres connection in Apache Airflow

  • How to use PostgresHook to interact with a Postgres database

  • How to load data from Postgres to Pandas DataFrame using PostgresHook

  • How to save Pandas DataFrame to Postgres using PostgresHook

  • How to handle JSON data in Postgres

  • How to use a Configmap for environment variables

What is a Hook in Airflow

In Apache Airflow, a Hook is an interface to interact with external systems like databases, cloud services, and APIs. Hooks are used to connect to external systems and execute operations like reading data, writing data, and running commands.

What is PostgresHook

PostgresHook is a hook that allows you to interact with a Postgres database. It provides methods to execute SQL queries, load data into a Pandas DataFrame, and save data from a Pandas DataFrame to a Postgres table.
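
As a minimal sketch, the snippet below shows the typical PostgresHook calls, assuming a connection with the conn_id your_postgres_conn_id has already been created in Airflow. The methods shown (run, get_records, get_pandas_df) are inherited from Airflow's DbApiHook.

minimal PostgresHook usage (sketch)
from airflow.providers.postgres.hooks.postgres import PostgresHook

# look up the connection stored in Airflow by its conn_id
pg_hook = PostgresHook.get_hook("your_postgres_conn_id")

# execute a statement that returns no rows
pg_hook.run("CREATE SCHEMA IF NOT EXISTS postgres_hook_example")

# fetch rows as a list of tuples
rows = pg_hook.get_records("SELECT schema_name FROM information_schema.schemata")

# load a query result directly into a Pandas DataFrame
df = pg_hook.get_pandas_df("SELECT schema_name FROM information_schema.schemata")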

Description of the example

In this example, we will create a simple DAG that uses PostgresHook to load data from a Postgres table into a Pandas DataFrame and then save the DataFrame back to another Postgres table.

This example has two tasks:

  • static_sql_example: It loads data from a Postgres table into a Pandas DataFrame using a static SQL query and saves the DataFrame back to another Postgres table. The source table contains a JSON column.

  • sql_from_env_example: It reads the SQL query from an environment variable, loads the data from a Postgres table into a Pandas DataFrame, and saves the DataFrame back to another Postgres table. The SQL and the other settings can be stored in a ConfigMap.

Create a ConfigMap for this example

This ConfigMap defines the following environment variables:

  • SAMPLE_SQL: SQL to load data from a Postgres table to a Pandas DataFrame

  • TARGET_DB_SCHEMA: Target Postgres schema to save the data

  • TARGET_DB_TABLE: Target Postgres table to save the data

postgres-hook-configmap.yaml
apiVersion: v1
data:
  SAMPLE_SQL: |-
    SELECT * from your_table
    WHERE start_date=?
  TARGET_DB_SCHEMA: "postgres_hook_example"
  TARGET_DB_TABLE: "sql_from_env_example_view"
kind: ConfigMap
metadata:
  name: postgres-hook
  namespace: airflow

Add a Postgres connection

First, we need to add a Postgres connection on the Airflow Web UI.

  • Go to the Airflow Web UI and click on the Admin menu

  • Click on Connections

Then, you can see the list of connections. Click on the Create button to add a new connection.

Figure 1. Add Connection Form for Postgres

Fill in the following fields:

  • Connection Id: the identifier that the DAG will use to reference this connection (e.g. your_postgres_conn_id)

  • Connection Type: Postgres

  • Host: Postgres host

  • Database: Postgres database

  • Login: Postgres user

  • Password: Postgres password

  • Port: Postgres port

  • Extra: extra connection parameters as a JSON object, e.g. {"sslmode": "require"}

Click on the Save button to save the connection.

We are going to use the Connection Id in the DAG to connect to the Postgres database.

Load data from Postgres to Pandas DataFrame using PostgresHook

The following code snippet shows how to load data from a Postgres table to a Pandas DataFrame using PostgresHook.

load data from Postgres to Pandas DataFrame using PostgresHook
from airflow.providers.postgres.hooks.postgres import PostgresHook

POSTGRES_CONN_ID = "your_postgres_conn_id"

# get a PostgresHook for the connection configured in the Airflow Web UI
pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)

# run the query and load the result set into a Pandas DataFrame
sql = "SELECT * from your_table"
df = pg_hook.get_pandas_df(sql)

Save Pandas DataFrame to Postgres using PostgresHook

The following code snippet shows how to save a Pandas DataFrame to a Postgres table using PostgresHook.

save Pandas DataFrame to Postgres using PostgresHook
# build a SQLAlchemy engine from the same Airflow connection
engine = pg_hook.get_sqlalchemy_engine()

# write the DataFrame to the target table, replacing the table if it already exists
df.to_sql(table_name, engine, if_exists="replace", index=False, schema=schema_name, dtype=datatype)
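
If the source table contains a JSON column, the dtype argument of to_sql can map that column to sqlalchemy.types.JSON so it is stored as JSON rather than plain text. The sketch below assumes a JSON column named history, matching the static_sql_example task shown later in this article.

handle a JSON column when saving the DataFrame
import sqlalchemy

# map the JSON column to the SQLAlchemy JSON type
json_datatype = {"history": sqlalchemy.types.JSON}

df.to_sql(table_name, engine, if_exists="replace", index=False, schema=schema_name, dtype=json_datatype)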

Use a Configmap for environment variables

The following code snippet shows how to expose the ConfigMap to Airflow tasks as environment variables. When tasks run on Kubernetes (for example with the KubernetesExecutor), the executor_config pod_override injects the ConfigMap into the base container of the worker pod, so the DAG code can read the variables with os.getenv.

# Use a ConfigMap for environment variables
common_executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env_from=[
                        k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="postgres-hook")),
                    ],
                )
            ]
        )
    ),
}



    @task(executor_config=common_executor_config)
    def sql_from_env_example():
        """
        Load data from a SQL query from an environment variable and save it into a table.
        This function demonstrates how to use environment variables defined in a ConfigMap in Airflow.
        :return:
        """
        # get SAMPLE_SQL environment variable
        sql = os.getenv("SAMPLE_SQL")

        target_db_schema = os.getenv("TARGET_DB_SCHEMA")
        target_db_table = os.getenv("TARGET_DB_TABLE")

        load_from_sql_save_into_table(sql, target_db_table, target_db_schema, )
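
For a quick test outside Kubernetes, the same variables can be exported manually before calling the task function. The values below simply mirror the postgres-hook ConfigMap shown earlier; inside the cluster they are injected by the pod_override above.

set the ConfigMap values as local environment variables (for testing only)
import os

os.environ["SAMPLE_SQL"] = "SELECT * from your_table WHERE start_date=?"
os.environ["TARGET_DB_SCHEMA"] = "postgres_hook_example"
os.environ["TARGET_DB_TABLE"] = "sql_from_env_example_view"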

Sample DAG of PostgresHook

Here is the sample DAG that demonstrates how to use PostgresHook in Apache Airflow.

postgres_hook_example.py
import logging
import os
import pandas
import sqlalchemy
from airflow import DAG
import pendulum
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from airflow.decorators import dag, task
from sqlalchemy import create_engine
from kubernetes.client import models as k8s


# Change these to your identifiers, if needed.
POSTGRES_CONN_ID = "your_postgres_conn_id"


def load_from_sql_save_into_table(sql:str, table_name:str, schema_name:str = "public", datatype:dict = None):
    """
    Load data from a SQL query and save it into a table.
    :param sql: SQL for extracting data
    :param table_name:  DB table name to save the data
    :param schema_name: DB schema name to save the data
    :param datatype:  Data type for special columns (e.g. JSON)
    :return: void
    """
    pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    logging.info("Save query to another table")
    logging.info(f"Query: {sql}")
    logging.info(f"Table name: {table_name}")
    logging.info(f"Schema name: {schema_name}")
    logging.info(f"pg_hook: {pg_hook}")

    # create engine
    engine = pg_hook.get_sqlalchemy_engine()
    logging.info(f"## engine created: {engine}")

    df = pg_hook.get_pandas_df(sql)
    logging.info(f"DataFrame: {df}")

    # save dataframe to postgres view
    df.to_sql(table_name, engine, if_exists="replace", index=False, schema=schema_name, dtype=datatype)

# set environment variables from ConfigMap named postgres-hook
common_executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env_from=[
                        k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="postgres-hook")),
                    ],
                )
            ]
        )
    ),
}

with DAG(
    dag_id="postgres_hook",
    start_date=pendulum.datetime(2024, 12, 13),
    schedule=None,
    # schedule_interval="@daily",
    catchup=False,
) as dag:

    @task()
    def static_sql_example():
        """
        Load data from a static SQL query and save it into a table.
        Handle a column with JSON data type.
        """
        load_from_sql_save_into_table(
            "SELECT * from decision_support.medrec_history limit 10",
            "static_sql_example_view",
            "postgres_hook_example",
            {"history": sqlalchemy.types.JSON})


    @task(executor_config=common_executor_config)
    def sql_from_env_example():
        """
        Load data from a SQL query from an environment variable and save it into a table.
        This function demonstrates how to use environment variables defined in a ConfigMap in Airflow.
        :return:
        """
        # get SAMPLE_SQL environment variable
        sql = os.getenv("SAMPLE_SQL")
        logging.info(f"SQL: {sql}")
        # replace start_date=? with start_date='2018-06-06'
        sql = sql.replace("start_date=?", "start_date='2018-06-06'")
        target_db_schema = os.getenv("TARGET_DB_SCHEMA")
        target_db_table = os.getenv("TARGET_DB_TABLE")
        logging.info(f"Updated SQL: {sql}")
        logging.info(f"Schema: {target_db_schema}")
        logging.info(f"Table: {target_db_table}")

        load_from_sql_save_into_table(sql, target_db_table, target_db_schema, )

    static_sql_example_task = static_sql_example()
    sql_from_env_example_task = sql_from_env_example()

    # no dependency is defined between the two tasks, so they run in parallel
    [static_sql_example_task, sql_from_env_example_task]

Conclusion

In this article, we have seen how to use PostgresHook in Apache Airflow to interact with a Postgres database. We have also seen how to load data from a Postgres table to a Pandas DataFrame and save the DataFrame back to another Postgres table using PostgresHook. In addition, we have seen how to use a ConfigMap for environment variables in Airflow.

All my LinkedIn articles are available at All My LinkedIn Articles.