Whether you’re new to data pipelines or a seasoned professional looking for a more robust solution, this post will introduce you to the fundamentals of Airflow and how to SelfHost your own Airflow instance.

Apache Airflow is a tool that allows developers and data engineers to design, schedule, and monitor complex workflows with ease.

The Airflow Project

Think of Airflow as the traffic cop for data, ensuring everything flows smoothly and nothing crashes.

SelfHosting Airflow

Thanks to Airflow, we will have a platform to programmatically author, schedule and monitor workflows.

Pre-Requisites!! Just Get Docker 🐋👇

You can install it on any PC, Mac, or Linux machine at home, or with any cloud provider you wish.

It will take just a few moments. If you are on Linux, run:

sudo apt-get update && sudo apt-get upgrade -y && curl -fsSL https://get.docker.com -o get-docker.sh
sudo sh get-docker.sh
#sudo apt install docker-compose -y

Also install Docker Compose with:

sudo apt install docker-compose -y

Once the process finishes, you can use Docker to SelfHost other services as well.

You should see the versions with:

docker --version
docker-compose --version

#sudo systemctl status docker #and the status

Airflow with Docker

By default, docker-airflow runs Airflow with SequentialExecutor:

docker run -d -p 8080:8080 puckel/docker-airflow webserver #docker.io/puckel/docker-airflow

The same container as a docker-compose.yml:

version: '3'

services:
  airflow:
    image: puckel/docker-airflow
    container_name: airflow-webserver
    ports:
      - "8080:8080"
    command: webserver

Connection details:

  • Host: postgres
  • Schema: airflow
  • Login: airflow
  • Password: airflow
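The four connection fields above can be combined into a single SQLAlchemy-style URI. This is a minimal sketch: the helper name `build_postgres_uri` is hypothetical, and port 5432 is an assumption (the PostgreSQL default).

```python
from urllib.parse import quote


def build_postgres_uri(host, schema, login, password, port=5432):
    """Assemble a SQLAlchemy-style PostgreSQL URI from the connection fields."""
    # Percent-encode credentials so characters such as '@' or ':' stay unambiguous.
    user = quote(login, safe="")
    secret = quote(password, safe="")
    return f"postgresql+psycopg2://{user}:{secret}@{host}:{port}/{schema}"


# Values taken from the connection details in this post; the port is assumed.
uri = build_postgres_uri("postgres", "airflow", "airflow", "airflow")
print(uri)
```

The host `postgres` only resolves from containers on the same Docker network. From the host machine you would likely use `localhost` with the published port.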

Meteo

http://localhost:8080/admin/airflow/code?dag_id=meteo_dag&root=

docker exec -it [CONTAINER_ID_OR_NAME] /bin/bash
docker exec -it airflow-tutorial-postgres-1 /bin/bash

sqlite3 /database/my_database.db

How to use Airflow - DAGs

Create a Python script (e.g., meteo_dag.py) in the dags folder of your Airflow directory. This script will define your DAG and the task to run your meteo script.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# Define the function to run your meteo script
def run_meteo_script():
    # Your meteo script here
    # ...
    pass  # placeholder so the empty function body is valid Python

# Define default_args dictionary
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate a DAG
dag = DAG(
    'meteo_dag',  # DAG ID
    default_args=default_args,
    description='A DAG to run the meteo script every hour',
    schedule_interval=timedelta(hours=1),  # Run every hour
    start_date=datetime(2023, 8, 21),  # Start date of the DAG
    catchup=False,  # If set to True, Airflow will run the DAG for any date in the interval between start_date and the current date
)

# Define a task using PythonOperator
run_script_task = PythonOperator(
    task_id='run_meteo_script',
    python_callable=run_meteo_script,
    dag=dag,
)
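To get a feel for how `start_date`, `schedule_interval` and `catchup` interact, here is a simplified plain-Python model. It is not Airflow code: `logical_dates` is a hypothetical helper that only mimics the rule that a run is created once its interval has ended.

```python
from datetime import datetime, timedelta


def logical_dates(start_date, interval, now, catchup):
    """Simplified model: logical dates of the runs a scheduler would create by `now`."""
    runs = []
    current = start_date
    # A run covering [current, current + interval) is only created after that interval ends.
    while current + interval <= now:
        runs.append(current)
        current += interval
    # Without catchup, only the most recent completed interval is run.
    return runs if catchup else runs[-1:]


start = datetime(2023, 8, 21)
now = datetime(2023, 8, 21, 3, 30)  # assumed "current time" for the illustration
hourly = timedelta(hours=1)

print(logical_dates(start, hourly, now, catchup=True))   # three runs: 00:00, 01:00, 02:00
print(logical_dates(start, hourly, now, catchup=False))  # only the 02:00 run
```

With `catchup=False`, as in the DAG above, deploying the DAG long after its `start_date` should not trigger a backlog of hourly runs.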

Airflow Tutorial

https://github.com/tuanavu/airflow-tutorial

  • docker-compose.yml File: This file will define all the services (webserver, scheduler, worker, database) and their configurations.
  • Airflow Configuration (airflow.cfg): This file will contain all the Airflow-specific configurations.

This is the proposal:

psql --version   # 9.6.24

Inside the container, connect to the DB with:

psql -U airflow -d airflow

List tables with:

\dt

CREATE TABLE meteo_data (
    time TIMESTAMP,
    apparent_temperature FLOAT,
    precipitation FLOAT
);
SELECT * FROM meteo_data LIMIT 5;
INSERT INTO meteo_data (time, apparent_temperature, precipitation) VALUES (current_timestamp, 25.5, 0.1);
SELECT * FROM meteo_data;
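The same table can be filled from Python. The sketch below assumes a column-oriented payload with an `hourly` object (the shape the Open-Meteo API is understood to return), and uses an in-memory SQLite database as a stand-in for PostgreSQL so it runs anywhere. `payload_to_rows` and `store_rows` are hypothetical helper names.

```python
import sqlite3


def payload_to_rows(payload):
    """Turn column-oriented hourly data into (time, apparent_temperature, precipitation) tuples."""
    hourly = payload["hourly"]
    return list(zip(hourly["time"], hourly["apparent_temperature"], hourly["precipitation"]))


def store_rows(conn, rows):
    """Insert rows with a parameterized statement and return the table's row count."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS meteo_data ("
        "time TIMESTAMP, apparent_temperature FLOAT, precipitation FLOAT)"
    )
    conn.executemany(
        "INSERT INTO meteo_data (time, apparent_temperature, precipitation) VALUES (?, ?, ?)",
        rows,
    )
    conn.commit()
    return conn.execute("SELECT COUNT(*) FROM meteo_data").fetchone()[0]


# Made-up sample values, for illustration only.
sample = {
    "hourly": {
        "time": ["2023-08-21T00:00", "2023-08-21T01:00"],
        "apparent_temperature": [25.5, 24.9],
        "precipitation": [0.1, 0.0],
    }
}

conn = sqlite3.connect(":memory:")
print(store_rows(conn, payload_to_rows(sample)))
```

Against the Postgres container, the same statements should work through a driver such as psycopg2, with `%s` placeholders instead of `?`.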

This configuration is all you need to get it running with the puckel/docker-airflow image:

version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  webserver:
    image: puckel/docker-airflow:1.10.1
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/intro-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

Thanks to.

PYTHON_DEPS: sqlalchemy==1.2.0,openmeteo_py,…..

Airflow UI is ready at: localhost:8080
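The webserver can take a little while to come up after the containers start. A small polling helper like the one below can tell you when the UI answers. It is a sketch using only the standard library; `wait_until_ready` is a hypothetical name, and the timeout values are arbitrary.

```python
import time
import urllib.error
import urllib.request


def wait_until_ready(url, timeout=60.0, interval=2.0):
    """Poll `url` until it answers with an HTTP status below 500 or `timeout` seconds pass."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            with urllib.request.urlopen(url, timeout=interval) as response:
                if response.status < 500:
                    return True
        except urllib.error.HTTPError as err:
            # The server answered; treat client errors (e.g. 401, 404) as "up".
            if err.code < 500:
                return True
        except (urllib.error.URLError, OSError):
            # Connection refused or timed out: the server is not listening yet.
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For the setup in this post you would call `wait_until_ready("http://localhost:8080")` after `docker-compose up -d`.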

Airflow Components:

  • Web Server: The Airflow web interface.
  • Scheduler: The component that triggers tasks.
  • Worker(s): Celery or another executor to run the tasks.
  • Metastore Database: PostgreSQL or MySQL to store metadata.

  1. Scaling and Monitoring:
  • Prometheus: For monitoring Airflow metrics.
  • Grafana: For visualizing the metrics.

Later you can just use the following. Be careful, though: with it, Airflow won't detect the intro/examples/dag folder.

version: '3'
services:
  postgres:
    image: postgres:9.6
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432:5432"

  webserver:
    image: puckel/docker-airflow:1.10.1
    restart: always
    depends_on:
      - postgres
    environment:
      - LOAD_EX=n
      - EXECUTOR=Local
      - FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
    volumes:
      - ./examples/intro-example/dags:/usr/local/airflow/dags
      # Uncomment to include custom plugins
      # - ./plugins:/usr/local/airflow/plugins
    ports:
      - "8080:8080"
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3

Docker Compose files still to test, starting with MariaDB, Redis and the CeleryExecutor:

version: '3'

services:
  mariadb:
    image: mariadb:10.5
    environment:
      - MYSQL_ROOT_PASSWORD=root_password
      - MYSQL_USER=airflow
      - MYSQL_PASSWORD=airflow
      - MYSQL_DATABASE=airflow

  redis:
    image: redis:latest

  webserver:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - mariadb
      - redis
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mariadb/airflow?charset=utf8
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow:airflow@mariadb/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mariadb/airflow?charset=utf8
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow:airflow@mariadb/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: scheduler

  worker:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+mysqldb://airflow:airflow@mariadb/airflow?charset=utf8
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+mysql://airflow:airflow@mariadb/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: celery worker

  flower:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
    ports:
      - "5555:5555"
    command: celery flower

The same stack with PostgreSQL instead of MariaDB:

version: '3'

services:
  postgres:
    image: postgres:13
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow

  redis:
    image: redis:latest

  webserver:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - postgres
      - redis
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    ports:
      - "8080:8080"
    command: webserver

  scheduler:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: scheduler

  worker:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@postgres/airflow
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
      - AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://airflow:airflow@postgres/airflow
      - AIRFLOW__CORE__EXECUTOR=CeleryExecutor
      - AIRFLOW__CORE__FERNET_KEY=<YOUR_FERNET_KEY>
    volumes:
      - ./dags:/opt/airflow/dags
    command: celery worker

  flower:
    image: apache/airflow:2.1.2
    restart: always
    depends_on:
      - webserver
    environment:
      - AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
    ports:
      - "5555:5555"
    command: celery flower
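The `<YOUR_FERNET_KEY>` placeholders need a real key, and every service should share the same one. A Fernet key is 32 random bytes in URL-safe base64, so it can be generated with the standard library alone. This is a minimal sketch: `generate_fernet_key` is a hypothetical helper, and `Fernet.generate_key()` from the `cryptography` package produces the same format.

```python
import base64
import os


def generate_fernet_key():
    """Return 32 random bytes, URL-safe base64 encoded: the format Fernet keys use."""
    return base64.urlsafe_b64encode(os.urandom(32)).decode("ascii")


key = generate_fernet_key()
print(key)  # a different 44-character string on every run
```

Generate your own key rather than reusing one published in a blog post or repository.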

FAQ

What are Other F/OSS Big Data Technologies?

Data Lake Technologies

Data lake technologies are tools and frameworks designed to store, process, and manage large volumes of data, often in a raw format, from various sources.

They enable organizations to perform analytics and gain insights from big data. Here’s a brief overview of some data quality tools that are often used alongside them:

  • Great Expectations is an open-source data validation and testing tool. It allows users to create “expectations” – assertions about data quality – that need to be met in a data pipeline. For example, expectations can include that a column in a table should never have null values, or that the values in a numeric column should fall within a certain range. Great Expectations provides a way to validate data against these predefined rules, ensuring that data that moves through the pipeline meets the quality standards.

  • PyDeequ is a Python API for Deequ, which is an open-source tool developed by Amazon to perform unit tests on data, similar to how software developers perform unit tests on their code. Deequ is built on top of Apache Spark and provides functionalities to measure data quality, test static and dynamic data, and suggest data quality constraints. PyDeequ makes these capabilities accessible in Python, which is a widely used language in data science and analytics.

  • Talend Data Quality (TalendDQ): Talend Data Quality is part of the Talend suite, which provides tools for data integration, transformation, and quality management. TalendDQ allows users to profile, cleanse, and monitor data quality within their data integration processes. It offers various components and functionalities for ensuring that the data in a data lake or any other data storage system is accurate, complete, and reliable.
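To make the idea of an "expectation" concrete, here is a plain-Python sketch of the two examples mentioned above (no nulls, values within a range). It does not use the Great Expectations, Deequ or Talend APIs. The function names and the result dictionary are assumptions made for illustration.

```python
def expect_no_nulls(rows, column):
    """Pass only if no row has None in `column`."""
    failures = [i for i, row in enumerate(rows) if row.get(column) is None]
    return {"success": not failures, "failing_rows": failures}


def expect_between(rows, column, low, high):
    """Pass only if every non-null value in `column` lies within [low, high]."""
    failures = [
        i for i, row in enumerate(rows)
        if row.get(column) is not None and not (low <= row[column] <= high)
    ]
    return {"success": not failures, "failing_rows": failures}


# Made-up sample rows: one null and one out-of-range temperature.
rows = [
    {"apparent_temperature": 25.5, "precipitation": 0.1},
    {"apparent_temperature": None, "precipitation": 0.0},
    {"apparent_temperature": 91.0, "precipitation": 0.3},
]

print(expect_no_nulls(rows, "apparent_temperature"))
print(expect_between(rows, "apparent_temperature", -60, 60))
```

In a pipeline, a task could run checks like these and fail when `success` is false, so that bad data stops before it reaches downstream steps.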

Apache Airflow is an open-source platform to programmatically author, schedule, and monitor workflows. While Great Expectations, PyDeequ, and TalendDQ are tools focused on data validation, quality checks, and data management, they can be related to Airflow in the sense that they can be integrated into data pipelines that are orchestrated by Airflow.

In summary, while Great Expectations, PyDeequ, and TalendDQ are not components of Airflow, they can be used in conjunction with Airflow to ensure that data pipelines are robust, and the data used throughout an organization is of high quality.

  1. Apache Griffin: An open-source Data Quality solution for distributed data systems at any scale. It supports both batch and streaming modes.

  2. Great Expectations: As previously mentioned, it’s an open-source tool for validating, documenting, and profiling your data to ensure quality.

  3. OpenRefine: A standalone open-source desktop application for data cleanup and transformation to other formats, often used for data quality improvements.

  4. Deequ: An open-source tool developed by Amazon for building unit tests for data, which can be integrated with Apache Spark.

  5. Pandas Profiling: An open-source Python library for profiling the data which can be a part of data quality assessment in a Data Science workflow.

Similar Tools to Airflow

Trigger.dev is an open source platform and SDK which allows you to create long-running background jobs with no timeouts. Write normal async code, deploy, and never hit a timeout.