Whether you’re new to data pipelines or a seasoned professional looking for a more robust solution, this post will introduce you to the fundamentals of Airflow and how to SelfHost your own Airflow instance.
Apache Airflow is a tool that allows developers and data engineers to design, schedule, and monitor complex workflows with ease.
The Airflow Project
Think of Airflow as the traffic cop for data, ensuring everything flows smoothly and nothing crashes.
- The Airflow Site
- The Airflow Source Code at GitHub
- And Awesome Docs (https://airflow.apache.org/docs/)
- License: Apache v2.0 ❤️
SelfHosting Airflow
Thanks to Airflow, we will have a platform to programmatically author, schedule and monitor workflows.
Pre-Requisites!! Just Get Docker 🐋👇
You can install it on any PC/Mac/Linux at home, or in any cloud provider that you wish.
It will just take a few moments. If you are on Linux, just run:
sudo apt-get update && sudo apt-get upgrade -y && curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
And also install Docker Compose with:
sudo apt install docker-compose -y
And when the process finishes, you can use it to SelfHost other services as well.
You should see the versions with:
docker --version
docker-compose --version
#sudo systemctl status docker   # and check the service status
Airflow with Docker
By default, docker-airflow runs Airflow with the SequentialExecutor:
docker run -d -p 8080:8080 puckel/docker-airflow webserver #docker.io/puckel/docker-airflow
The equivalent minimal docker-compose.yml:
version: '3'
services:
  airflow:
    image: puckel/docker-airflow
    container_name: airflow-webserver
    ports:
      - "8080:8080"
    command: webserver
Postgres connection details: Host: postgres, Schema: airflow, Login: airflow, Password: airflow
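These values can be registered as a connection in the Airflow UI (Admin > Connections) and then used from a task through PostgresHook. A minimal sketch, assuming a hypothetical connection ID of my_postgres and the Airflow 1.10.x import path used by the puckel image:

from airflow.hooks.postgres_hook import PostgresHook  # Airflow 1.10.x import path

def check_postgres():
    # 'my_postgres' is a hypothetical connection ID configured with the values above
    hook = PostgresHook(postgres_conn_id="my_postgres")
    # Trivial query just to confirm the connection works
    print(hook.get_records("SELECT 1;"))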
Meteo
http://localhost:8080/admin/airflow/code?dag_id=meteo_dag&root=
docker exec -it [CONTAINER_ID_OR_NAME] /bin/bash
docker exec -it airflow-tutorial-postgres-1 /bin/bash
sqlite3 /database/my_database.db
How to use Airflow - DAGs
Create a Python script (e.g., meteo_dag.py) in the dags folder of your Airflow directory. This script will define your DAG and the task to run your meteo script.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Define the function to run your meteo script
def run_meteo_script():
    # Your meteo script here
    # ...
    pass

# Define default_args dictionary
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate a DAG
dag = DAG(
    'meteo_dag',  # DAG ID
    default_args=default_args,
    description='A DAG to run the meteo script every hour',
    schedule_interval=timedelta(hours=1),  # Run every hour
    start_date=datetime(2023, 8, 21),  # Start date of the DAG
    catchup=False,  # If set to True, Airflow will backfill runs for every interval between start_date and now
)

# Define a task using PythonOperator
run_script_task = PythonOperator(
    task_id='run_meteo_script',
    python_callable=run_meteo_script,
    dag=dag,
)
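If you want the task to actually pull weather data, here is a minimal sketch of what run_meteo_script could look like, calling the public Open-Meteo forecast API with requests. The coordinates and hourly fields are placeholder examples, and requests (or openmeteo_py) must be available in the container; see the PYTHON_DEPS note further below.

import requests

def run_meteo_script():
    # Open-Meteo public forecast endpoint; latitude/longitude are example values (Madrid)
    url = "https://api.open-meteo.com/v1/forecast"
    params = {
        "latitude": 40.4168,
        "longitude": -3.7038,
        "hourly": "apparent_temperature,precipitation",
    }
    response = requests.get(url, params=params, timeout=30)
    response.raise_for_status()
    data = response.json()
    # The hourly data comes back as parallel lists keyed by variable name
    print(data["hourly"]["time"][:3], data["hourly"]["apparent_temperature"][:3])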
Airflow Tutorial
https://github.com/tuanavu/airflow-tutorial
- docker-compose.yml file: this file will define all the services (webserver, scheduler, worker, database) and their configurations.
- Airflow configuration (airflow.cfg): this file will contain all the Airflow-specific configuration.
This is the proposal:
psql --version   # 9.6.24
#inside the container, connect to the DB with
psql -U airflow -d airflow
List tables with:
\dt
CREATE TABLE meteo_data (
    time TIMESTAMP,
    apparent_temperature FLOAT,
    precipitation FLOAT
);
SELECT * FROM meteo_data LIMIT 5;
INSERT INTO meteo_data (time, apparent_temperature, precipitation) VALUES (current_timestamp, 25.5, 0.1);
SELECT * FROM meteo_data;
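The DAG task can write into this table directly from Python. A minimal sketch with psycopg2, assuming the Postgres service defined in the docker-compose.yml below (host postgres, database/user/password airflow) and the meteo_data table created above; psycopg2-binary would need to be added to the container's Python dependencies.

import psycopg2
from datetime import datetime

def save_measurement(apparent_temperature, precipitation):
    # Connection values match the Postgres service from docker-compose.yml
    conn = psycopg2.connect(
        host="postgres", dbname="airflow", user="airflow", password="airflow"
    )
    with conn:  # commits the transaction on success
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO meteo_data (time, apparent_temperature, precipitation) "
                "VALUES (%s, %s, %s);",
                (datetime.utcnow(), apparent_temperature, precipitation),
            )
    conn.close()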
This configuration is all you need to have it running with the puckel/docker-airflow image:
version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  webserver:
    image: puckel/docker-airflow:1.10.1
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/intro-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
Extra Python dependencies can be passed to the puckel image via the PYTHON_DEPS environment variable:
PYTHON_DEPS: sqlalchemy==1.2.0,openmeteo_py,…
Airflow UI is ready at: localhost:8080
Airflow Components:
- Web Server: the Airflow web interface.
- Scheduler: the component that triggers tasks.
- Worker(s): Celery or another executor to run the tasks.
- Metastore Database: PostgreSQL or MySQL to store metadata.

Scaling and Monitoring:
- Prometheus: for monitoring Airflow metrics (see the sketch after this list).
- Grafana: for visualizing the metrics.
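There are dedicated Airflow exporters for Prometheus, but as a rough illustration of the idea, a small Python process can expose custom pipeline metrics for Prometheus to scrape. This sketch uses the prometheus_client library; the metric name and port are arbitrary examples.

import time
from prometheus_client import Gauge, start_http_server

# Hypothetical gauge tracking the last apparent temperature written by the meteo DAG
last_temperature = Gauge(
    "meteo_last_apparent_temperature",
    "Last apparent temperature written by the meteo DAG",
)

if __name__ == "__main__":
    start_http_server(9200)  # Prometheus scrapes http://<host>:9200/metrics
    while True:
        last_temperature.set(25.5)  # replace with a real reading
        time.sleep(60)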
Later you can just reuse this same docker-compose.yml on its own, but be careful: it won't detect the examples/intro-example/dags folder unless that path exists next to the compose file.
And a docker-compose.yml to test with MariaDB, Redis and the CeleryExecutor, using the official apache/airflow image:
version: '3'
services:
  mariadb:
    image: mariadb:10.5
    environment:
      - MYSQL_ROOT_PASSWORD=root_password
      - MYSQL_USER=airflow
      - MYSQL_PASSWORD=airflow
      - MYSQL_DATABASE=airflow

  redis:
    image: redis:latest

  webserver:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - mariadb
      - redis
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mariadb/airflow?charset=utf8
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow:airflow@mariadb/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mariadb/airflow?charset=utf8
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow:airflow@mariadb/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: scheduler

  worker:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mariadb/airflow?charset=utf8
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow:airflow@mariadb/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: celery worker

  flower:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
    ports:
      - "5555:5555"
    command: celery flower
And the same Celery setup with PostgreSQL as the metadata database:
version: '3'
services:
  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

  redis:
    image: redis:latest

  webserver:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - postgres
      - redis
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: scheduler

  worker:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: celery worker

  flower:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
    ports:
      - "5555:5555"
    command: celery flower
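With the CeleryExecutor in place, independent tasks can run in parallel across the workers. Here is a minimal sketch of a DAG with three parallel tasks that you could drop into the mounted ./dags folder; the DAG ID and task names are just examples, and note the Airflow 2.x import path for PythonOperator.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator  # Airflow 2.x import path

def work(task_name):
    # Placeholder payload; each task can be picked up by a different Celery worker
    print(f"running {task_name}")

with DAG(
    dag_id="parallel_example",
    start_date=datetime(2023, 8, 21),
    schedule_interval="@hourly",
    catchup=False,
) as dag:
    tasks = [
        PythonOperator(
            task_id=f"work_{i}",
            python_callable=work,
            op_kwargs={"task_name": f"work_{i}"},
        )
        for i in range(3)
    ]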
FAQ
What are Other F/OSS Big Data Technologies?
Data Lake Technologies
Data lake technologies are tools and frameworks designed to store, process, and manage large volumes of data, often in a raw format, from various sources.
They enable organizations to perform analytics and gain insights from big data. Here’s a brief overview of some of these tools:
- Great Expectations is an open-source data validation and testing tool. It allows users to create “expectations” – assertions about data quality – that need to be met in a data pipeline. For example, expectations can include that a column in a table should never have null values, or that the values in a numeric column should fall within a certain range. Great Expectations provides a way to validate data against these predefined rules, ensuring that data that moves through the pipeline meets the quality standards (see the short sketch after these lists).
- PyDeequ is a Python API for Deequ, which is an open-source tool developed by Amazon to perform unit tests on data, similar to how software developers perform unit tests on their code. Deequ is built on top of Apache Spark and provides functionalities to measure data quality, test static and dynamic data, and suggest data quality constraints. PyDeequ makes these capabilities accessible in Python, which is a widely used language in data science and analytics.
- Talend Data Quality (TalendDQ): Talend Data Quality is part of the Talend suite, which provides tools for data integration, transformation, and quality management. TalendDQ allows users to profile, cleanse, and monitor data quality within their data integration processes. It offers various components and functionalities for ensuring that the data in a data lake or any other data storage system is accurate, complete, and reliable.
Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. While Great Expectations, PyDeequ, and TalendDQ are tools focused on data validation, quality checks, and data management, they can be related to Airflow in the sense that they can be integrated into data pipelines that are orchestrated by Airflow.
In summary, while Great Expectations, PyDeequ, and TalendDQ are not components of Airflow, they can be used in conjunction with Airflow to ensure that data pipelines are robust, and the data used throughout an organization is of high quality.
- Apache Griffin: an open-source Data Quality solution for distributed data systems at any scale. It supports both batch and streaming modes.
- Great Expectations: as previously mentioned, it’s an open-source tool for validating, documenting, and profiling your data to ensure quality.
- OpenRefine: a standalone open-source desktop application for data cleanup and transformation to other formats, often used for data quality improvements.
- Deequ: an open-source tool developed by Amazon for building unit tests for data, which can be integrated with Apache Spark.
- Pandas Profiling: an open-source Python library for profiling data, which can be part of data quality assessment in a Data Science workflow.
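To make the Great Expectations idea concrete, here is a minimal sketch using its classic pandas-based API; the column names reuse the meteo_data example from above, and the exact API surface depends on the installed version.

import pandas as pd
import great_expectations as ge

# Wrap a pandas DataFrame so expectations can be declared against it (classic GE API)
df = ge.from_pandas(
    pd.DataFrame(
        {
            "apparent_temperature": [25.5, 24.1, 26.0],
            "precipitation": [0.1, 0.0, 0.3],
        }
    )
)

# Assertions about data quality: no nulls, and temperatures within a plausible range
print(df.expect_column_values_to_not_be_null("apparent_temperature"))
print(df.expect_column_values_to_be_between("apparent_temperature", min_value=-60, max_value=60))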
Similar Tools to Airflow
Trigger.dev is an open source platform and SDK which allows you to create long-running background jobs with no timeouts. Write normal async code, deploy, and never hit a timeout.