Apache Airflow - Postgres Hook
- Introduction
- What is Hook in Airflow
- What is PostgresHook
- Description of the example
- Create a ConfigMap for this example
- Add a Postgres connection
- Load data from Postgres to Pandas DataFrame using PostgresHook
- Save Pandas DataFrame to Postgres using PostgresHook
- Use a ConfigMap for environment variables
- Sample DAG of PostgresHook
- Conclusion
- References

Introduction
In this article, we are going to see how to use PostgresHook in Apache Airflow to interact with a Postgres database.
In this example, we will cover the following:
- How to set up a Postgres connection in Apache Airflow
- How to use PostgresHook to interact with a Postgres database
- How to load data from Postgres into a Pandas DataFrame using PostgresHook
- How to save a Pandas DataFrame to Postgres using PostgresHook
- How to handle JSON data in Postgres
- How to use a ConfigMap for environment variables
What is Hook in Airflow
In Apache Airflow, a Hook is an interface to interact with external systems like databases, cloud services, and APIs. Hooks are used to connect to external systems and execute operations like reading data, writing data, and running commands.
For more information, refer to the official documentation: https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/connections.html#hooks
What is PostgresHook
PostgresHook is a hook that allows you to interact with a Postgres database. It provides methods to execute SQL queries, load data into a Pandas DataFrame, and save data from a Pandas DataFrame to a Postgres table.
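For example, here is a minimal sketch of the hook's main methods, assuming a connection with the hypothetical Connection Id my_postgres_conn has already been configured in Airflow:

from airflow.providers.postgres.hooks.postgres import PostgresHook

# "my_postgres_conn" is a hypothetical Connection Id configured in Airflow
pg_hook = PostgresHook(postgres_conn_id="my_postgres_conn")

# execute a statement without returning a result
pg_hook.run("CREATE SCHEMA IF NOT EXISTS postgres_hook_example")

# fetch rows as a list of tuples
rows = pg_hook.get_records("SELECT 1")

# load a query result into a Pandas DataFrame
df = pg_hook.get_pandas_df("SELECT 1 AS one")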
Description of the example
In this example, we will create a simple DAG that loads data from a Postgres table to a Pandas DataFrame using PostgresHook and saves the DataFrame back to another Postgres table using PostgresHook.
This example has two tasks:
- static_sql_example: Loads data from a Postgres table into a Pandas DataFrame using a static SQL query and saves the DataFrame back to another Postgres table. The source table contains a JSON column.
- sql_from_env_example: Reads the SQL query from an environment variable, loads the data from a Postgres table into a Pandas DataFrame, and saves the DataFrame back to another Postgres table. We can use a ConfigMap to store the SQL and other environment variables.
Create a ConfigMap for this example
This ConfigMap defines the following environment variables:
- SAMPLE_SQL: SQL query to load data from a Postgres table into a Pandas DataFrame
- TARGET_DB_SCHEMA: target Postgres schema to save the data
- TARGET_DB_TABLE: target Postgres table to save the data
apiVersion: v1
data:
  SAMPLE_SQL: |-
    SELECT * from your_table
    WHERE start_date=?
  TARGET_DB_SCHEMA: "postgres_hook_example"
  TARGET_DB_TABLE: "sql_from_env_example_view"
kind: ConfigMap
metadata:
  name: postgres-hook
  namespace: airflow
Add a Postgres connection
First, we need to add a Postgres connection in the Airflow Web UI.
- Go to the Airflow Web UI and click on the Admin menu
- Click on Connections
Then, you can see the list of connections. Click on the Create button to add a new connection.

Fill in the following fields:
- Connection Id: an identifier for this Postgres connection; the DAG uses it to look up the connection
- Connection Type: Postgres
- Host: Postgres host
- Database: Postgres database name
- Login: Postgres user
- Password: Postgres password
- Port: Postgres port (5432 by default)
- Extra: extra connection parameters, such as sslmode
Click on the Save button to save the connection.
We are going to use the Connection Id in the DAG to connect to the Postgres database.
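For instance, here is a minimal sketch of a task that looks up the connection by its Connection Id (the id my_postgres_conn below is hypothetical):

from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook

@task()
def check_connection():
    # "my_postgres_conn" is a hypothetical Connection Id created in the Web UI
    pg_hook = PostgresHook(postgres_conn_id="my_postgres_conn")
    # log the URI assembled from the connection fields to confirm the connection resolves
    print(pg_hook.get_uri())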
Load data from Postgres to Pandas DataFrame using PostgresHook
The following code snippet shows how to load data from a Postgres table to a Pandas DataFrame using PostgresHook.
from airflow.providers.postgres.hooks.postgres import PostgresHook

POSTGRES_CONN_ID = "your_postgres_conn_id"

pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
# sql holds the query string to run, e.g. "SELECT * from your_table"
df = pg_hook.get_pandas_df(sql)
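get_pandas_df also accepts a parameters argument, so values can be bound to the query instead of concatenated into the SQL string. A minimal sketch, assuming a hypothetical table with a start_date column:

# bind query values instead of building the SQL string by hand
sql = "SELECT * FROM your_table WHERE start_date = %(start_date)s"
df = pg_hook.get_pandas_df(sql, parameters={"start_date": "2018-06-06"})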
Save Pandas DataFrame to Postgres using PostgresHook
The following code snippet shows how to save a Pandas DataFrame to a Postgres table using PostgresHook.
# pg_hook and df come from the previous snippet; table_name, schema_name, and datatype are your own values
engine = pg_hook.get_sqlalchemy_engine()
df.to_sql(table_name, engine, if_exists="replace", index=False, schema=schema_name, dtype=datatype)
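If the DataFrame contains a JSON column, the dtype argument can map that column to sqlalchemy.types.JSON so it is stored as a native JSON column in Postgres, as the static_sql_example task in the sample DAG below does for its history column:

import sqlalchemy

# store the "history" column as a JSON column in Postgres
df.to_sql(
    table_name,
    engine,
    if_exists="replace",
    index=False,
    schema=schema_name,
    dtype={"history": sqlalchemy.types.JSON},
)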
Use a ConfigMap for environment variables
The following code snippet shows how to use a ConfigMap for environment variables in Airflow. We can use the environment variables defined in the ConfigMap in the DAG.
# Use a ConfigMap for environment variables
common_executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env_from=[
                        k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="postgres-hook")),
                    ],
                )
            ]
        )
    ),
}


@task(executor_config=common_executor_config)
def sql_from_env_example():
    """
    Load data from a SQL query from an environment variable and save it into a table.
    This function demonstrates how to use environment variables defined in a ConfigMap in Airflow.
    :return:
    """
    # get SAMPLE_SQL environment variable
    sql = os.getenv("SAMPLE_SQL")
    target_db_schema = os.getenv("TARGET_DB_SCHEMA")
    target_db_table = os.getenv("TARGET_DB_TABLE")
    load_from_sql_save_into_table(sql, target_db_table, target_db_schema)
Sample DAG of PostgresHook
Here is the sample DAG that demonstrates how to use PostgresHook in Apache Airflow.
import logging
import os

import pandas
import sqlalchemy
from airflow import DAG
import pendulum
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.operators.python import PythonOperator
from airflow.decorators import dag, task
from sqlalchemy import create_engine
from kubernetes.client import models as k8s

# Change these to your identifiers, if needed.
POSTGRES_CONN_ID = "your_postgres_conn_id"


def load_from_sql_save_into_table(sql: str, table_name: str, schema_name: str = "public", datatype: dict = None):
    """
    Load data from a SQL query and save it into a table.
    :param sql: SQL for extracting data
    :param table_name: DB table name to save the data
    :param schema_name: DB schema name to save the data
    :param datatype: Data type for special columns (e.g. JSON)
    :return: void
    """
    pg_hook = PostgresHook.get_hook(POSTGRES_CONN_ID)
    logging.info("Save query to another table")
    logging.info(f"Query: {sql}")
    logging.info(f"Table name: {table_name}")
    logging.info(f"Schema name: {schema_name}")
    logging.info(f"pg_hook: {pg_hook}")

    # create engine
    engine = pg_hook.get_sqlalchemy_engine()
    logging.info(f"## engine created: {engine}")

    df = pg_hook.get_pandas_df(sql)
    logging.info(f"DataFrame: {df}")

    # save dataframe to postgres view
    df.to_sql(table_name, engine, if_exists="replace", index=False, schema=schema_name, dtype=datatype)


# set environment variables from ConfigMap named postgres-hook
common_executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(
                    name="base",
                    env_from=[
                        k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(name="postgres-hook")),
                    ],
                )
            ]
        )
    ),
}

with DAG(
    dag_id="postgres_hook",
    start_date=pendulum.datetime(2024, 12, 13),
    schedule=None,
    # schedule_interval="@daily",
    catchup=False,
) as dag:

    @task()
    def static_sql_example():
        """
        Load data from a static SQL query and save it into a table.
        Handle a column with JSON data type.
        """
        load_from_sql_save_into_table(
            "SELECT * from decision_support.medrec_history limit 10",
            "static_sql_example_view",
            "postgres_hook_example",
            {"history": sqlalchemy.types.JSON},
        )

    @task(executor_config=common_executor_config)
    def sql_from_env_example():
        """
        Load data from a SQL query from an environment variable and save it into a table.
        This function demonstrates how to use environment variables defined in a ConfigMap in Airflow.
        :return:
        """
        # get SAMPLE_SQL environment variable
        sql = os.getenv("SAMPLE_SQL")
        logging.info(f"SQL: {sql}")

        # replace start_date=? with start_date='2018-06-06'
        sql = sql.replace("start_date=?", "start_date='2018-06-06'")
        target_db_schema = os.getenv("TARGET_DB_SCHEMA")
        target_db_table = os.getenv("TARGET_DB_TABLE")
        logging.info(f"Updated SQL: {sql}")
        logging.info(f"Schema: {target_db_schema}")
        logging.info(f"Table: {target_db_table}")

        load_from_sql_save_into_table(sql, target_db_table, target_db_schema)

    static_sql_example_task = static_sql_example()
    sql_from_env_example_task = sql_from_env_example()

    # run the tasks in parallel
    [static_sql_example_task, sql_from_env_example_task]
Conclusion
In this article, we have seen how to use PostgresHook in Apache Airflow to interact with a Postgres database. We have also seen how to load data from a Postgres table to a Pandas DataFrame and save the DataFrame back to another Postgres table using PostgresHook. In addition, we have seen how to use a ConfigMap for environment variables in Airflow.
All my LinkedIn articles are available at All My LinkedIn Articles.