Have you ever thought, “I need to collect, clean, and analyze data at the same time every day… can’t this be automated?” That’s exactly when Apache Airflow comes in handy.

With version 3.1.0 released in September 2025, Airflow remains one of the most actively used workflow orchestration tools in modern data engineering. Today, I’ll walk you through this powerful tool so that even beginners can understand and start using it effectively.

 

1. What Exactly Does Apache Airflow Do?

Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows.

Problems That Airflow Solves

Previously, people automated tasks using Cron or shell scripts, but these approaches had several issues:

  • Dependency Management Challenges: Managing execution order like “Task B must run after Task A completes” becomes complex
  • Lack of Failure Handling: When tasks fail, it’s difficult to identify where the failure occurred and restart
  • Limited Monitoring: Hard to know which tasks are running and when they’ll complete
  • Scalability Issues: Managing hundreds or thousands of tasks becomes impossible

Airflow solves all these problems. Simply put, it’s a “smart task manager” that automatically executes and manages multiple tasks in a defined order.

Understanding Through Real-World Examples

Let’s say you need to perform these tasks every day at 7 AM:

  1. Extract yesterday’s sales data from the e-commerce database
  2. Clean and analyze the data
  3. Update results to the dashboard
  4. Send alerts to stakeholders if anomalies are detected

Doing this manually every day would be exhausting. With Airflow:

  • Write it once in Python code
  • Runs automatically at scheduled times
  • Receive notifications when issues occur at any stage
  • Restart from the failed step
  • Monitor the entire process on a web interface

 

 

2. Understanding Core Concepts of Airflow

To use Airflow effectively, you need to understand a few core concepts. Let’s go through them one by one.

DAG (Directed Acyclic Graph) – Workflow Blueprint

A DAG defines the relationships and execution order of tasks in a workflow.

Breaking Down the Term:

  • Directed: Tasks flow in one direction, A → B
  • Acyclic: Once you go A → B → C, you don’t loop back to A
  • Graph: A diagram connecting tasks

Simply put, it’s your “task execution plan.” Like cooking, there’s an order:

Prep ingredients → Chop → Cook → Plate

It progresses in sequence, and once you “plate,” you don’t go back to “prep ingredients.” That’s a DAG!

For the e-commerce example above, the DAG would look like:

Extract data → Clean data → Analyze → Update dashboard
                                           ↓
                                      Send alerts
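
In DAG code, this flow is expressed with the >> operator. A minimal sketch, assuming each name refers to a Task already defined inside the DAG (the names are illustrative):

# Dependencies only; each variable is a Task defined elsewhere in the DAG
extract_data >> clean_data >> analyze >> update_dashboard >> send_alerts

# Parallel branches are written with a list, e.g.:
# analyze >> [update_dashboard, send_alerts]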

DAG Characteristics:

  • One DAG = One complete workflow
  • Define one DAG per Python file
  • Can schedule to run daily, weekly, or at specific times

Tasks and Operators – Actual Work Units

Here’s an important concept: the relationship between Tasks and Operators.

What is an Operator? A “template” or “blueprint” for performing work. Like LEGO blocks that are pre-made components.

What is a Task? An “instance” of an Operator in actual use. Like assembled LEGO blocks.

A simple analogy:

  • Operator = Cookie cutter (PythonOperator, BashOperator, etc.)
  • Task = Actual cookies made with that cutter (extract_data, clean_data, etc.)
# PythonOperator: The cookie cutter (template)
# task1: The actual cookie (instance)
from airflow.operators.python import PythonOperator

task1 = PythonOperator(
    task_id='extract_data',  # Unique name for this Task
    python_callable=extraction_function,  # the Python function this Task runs
)

Airflow provides various types of Operators ready to use. Let’s look at common Operators:

Operator Type    | Description                 | Real Use Cases
PythonOperator   | Executes Python functions   | Data cleaning, calculations, ML model execution
BashOperator     | Executes Bash commands      | File operations, script execution, server commands
EmailOperator    | Sends emails                | Task completion/failure notifications
PostgresOperator | Executes PostgreSQL queries | Data queries, inserts, updates
HttpOperator     | Makes HTTP API calls        | Fetching data from external APIs
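
Whichever Operator you pick from the table, usage follows the same pattern: import it and instantiate it as a Task. A small sketch with BashOperator (the command is just an illustration, and the Task belongs inside a DAG like the full examples later in this post):

from airflow.operators.bash import BashOperator

backup_logs = BashOperator(
    task_id='backup_logs',  # unique name for this Task
    bash_command='tar -czf /tmp/logs.tar.gz /var/log/myapp',  # illustrative command
)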

Key Points:

  • Each Task should handle only one small job (single responsibility principle)
  • Combine Tasks to create complex workflows
  • Each Task must have a unique task_id

Core Components – Airflow’s Internal Architecture

Airflow consists of several components. The Scheduler schedules workflows, the Web Server provides the user interface, and the MetaStore is a database that stores metadata.

Airflow works like a company organization where multiple teams collaborate:

1. Scheduler – Task Planning Manager

  • Role: “When should this DAG run? Is this Task ready?”
  • Continuously checks DAG files
  • Identifies DAGs whose execution time has arrived
  • Verifies task order and dependencies
  • Requests the Executor to “please run this Task”
  • The most critical component! Nothing runs without the Scheduler

2. Web Server – Monitoring Manager

  • Role: “Let me show you which tasks are running and their success/failure status”
  • Provides the web UI (typically at http://localhost:8080)
  • Visualizes DAG execution status
  • Allows manual DAG execution or stopping
  • Provides log access
  • Important: Scheduled tasks continue running even if Web Server stops!

3. Executor – Actual Task Performer

  • Role: “The worker who actually executes tasks”
  • Receives instructions from Scheduler and executes Tasks
  • Multiple types available:
    • SequentialExecutor: Executes one at a time (for testing)
    • LocalExecutor: Executes multiple Tasks concurrently on same server
    • CeleryExecutor: Distributes Tasks across multiple servers
    • KubernetesExecutor: Executes each Task as a Pod in Kubernetes

4. MetaStore – Record Keeper

  • Role: “Stores all execution history and status in a database”
  • What DAGs exist
  • When each Task was executed
  • Success or failure status
  • Execution duration
  • Typically uses PostgreSQL or MySQL

5. Worker – Executor’s Assistant

  • Operates differently depending on Executor type
  • The actual process that executes Python functions or Bash commands

Understanding the Complete Flow:

1. Scheduler: "Oh, time to run this DAG!"
2. Scheduler: "Let's start with the first Task. Executor, run this Task"
3. Executor: "Assigning work to Worker!"
4. Worker: "Executing... Done!"
5. Executor: "Scheduler, first Task completed"
6. MetaStore: "Execution result recorded"
7. Scheduler: "Great, let's run the next Task"
8. Web Server: (to user) "Here's the current progress~"
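
Which Executor you get is a configuration setting rather than code. If you're curious about your own installation, you can check it from the CLI and change it in airflow.cfg (LocalExecutor below is just an example value):

# Show the currently configured executor
airflow config get-value core executor

# In airflow.cfg:
# [core]
# executor = LocalExecutor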

XCom – Data Transfer Between Tasks

What if Tasks need to exchange data? For example, if the “extract data” Task’s results need to be used by the “clean data” Task?

That’s when you use XCom (Cross-Communication).

Understanding XCom:

  • Task A “pushes” data → stored in MetaStore
  • Task B “pulls” data → retrieved from MetaStore
def task_a(**context):
    result = "important data"
    # Store data (push)
    context['ti'].xcom_push(key='my_data', value=result)

def task_b(**context):
    # Retrieve data (pull)
    data = context['ti'].xcom_pull(key='my_data', task_ids='task_a')
    print(f"Received data: {data}")

Important Notes:

  • XCom should only transfer small data (within a few MB)
  • For large files, store them in S3 or a shared file system and pass only the path via XCom (see the sketch below)
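
A hedged sketch of that pattern, assuming boto3 and AWS credentials are available (the bucket name and key are placeholders):

import boto3

def extract_data(**context):
    local_path = '/tmp/daily_report.csv'  # large file produced by this task
    # ... write the file to local_path ...
    s3 = boto3.client('s3')
    s3.upload_file(local_path, 'my-data-bucket', 'reports/daily_report.csv')  # placeholder bucket/key
    # Push only the small path string via XCom, never the file contents
    context['ti'].xcom_push(key='report_path', value='s3://my-data-bucket/reports/daily_report.csv')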

Schedule – Setting Execution Frequency

The DAG's schedule parameter determines when it runs. (Older Airflow 2.x code uses its former name, schedule_interval; Airflow 3 only accepts schedule.)

Various Configuration Methods:

# 1. Cron expression (most precise)
schedule='0 9 * * *'  # Daily at 9 AM

# 2. Preset shortcuts (convenient)
schedule='@daily'     # Daily at midnight
schedule='@hourly'    # Every hour
schedule='@weekly'    # Every Sunday at midnight
schedule='@monthly'   # First day of month at midnight

# 3. timedelta object (simple intervals)
from datetime import timedelta
schedule=timedelta(hours=6)  # Every 6 hours

Understanding Cron Expressions:

* * * * *
│ │ │ │ │
│ │ │ │ └─ Day of week (0-6, 0=Sunday)
│ │ │ └─── Month (1-12)
│ │ └───── Day (1-31)
│ └─────── Hour (0-23)
└───────── Minute (0-59)

Examples:

  • 0 9 * * *: Daily at 9:00 AM
  • 30 14 * * 1: Every Monday at 2:30 PM
  • 0 0 1 * *: First day of each month at midnight
  • */15 * * * *: Every 15 minutes

 

 

3. Installing and Starting Airflow

Let’s install Airflow. I’ll introduce two common methods.

Method 1: Installation via pip (Recommended)

Installation using pip is the officially supported primary installation method. This is the simplest method if you have a Python environment.

Step 1: Create Python Virtual Environment

# Python 3.9 or higher required (Airflow 3.x no longer supports 3.8)
python3 -m venv airflow_venv
source airflow_venv/bin/activate  # Windows: airflow_venv\Scripts\activate

Step 2: Set Airflow Home Directory

export AIRFLOW_HOME=~/airflow

Step 3: Install Airflow

# Install latest version (3.1.0 as of October 2025)
AIRFLOW_VERSION=3.1.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Step 4: Initialize Database

airflow db migrate

This command creates configuration files and a SQLite database in the ~/airflow folder. (The airflow db init command you may see in older tutorials is the Airflow 2.x equivalent; Airflow 3 uses airflow db migrate.)

Step 5: Admin Account

With Airflow 3's default authentication, an admin user is created automatically the first time Airflow starts, and the generated password is shown in the terminal output. (The airflow users create command from Airflow 2.x guides requires the optional FAB auth manager.)

Step 6: Run Airflow

The simplest way to start everything for local use is standalone mode:

# Starts all core components in one process (for local development only)
airflow standalone

You can also run the components in separate terminals:

# Terminal 1: Run Scheduler
airflow scheduler

# Terminal 2: Run DAG processor (a separate component in Airflow 3)
airflow dag-processor

# Terminal 3: Run the web UI / API server (replaces "airflow webserver" from Airflow 2.x)
airflow api-server --port 8080

Now access the Airflow web interface at http://localhost:8080 in your browser!

Method 2: Docker Installation

Using Docker allows you to containerize all Airflow components for easy management. If you already have Docker installed, this is often the quickest way to get a complete environment running.

Step 1: Create Working Directory and Download docker-compose.yaml

mkdir airflow-docker
cd airflow-docker

# Download official docker-compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'

Step 2: Create Required Directories and Set Environment Variables

mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env

Step 3: Initialize Database

docker-compose up airflow-init

Step 4: Run Airflow

docker-compose up -d

Default credentials are username: airflow, password: airflow. Access http://localhost:8080 in your browser!
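
A few docker-compose commands that help with day-to-day management (run them from the airflow-docker directory; service names follow the official docker-compose.yaml):

docker-compose ps                          # check container status
docker-compose logs -f airflow-scheduler   # follow scheduler logs
docker-compose down                        # stop everything (add -v to also delete volumes and DB data)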

 

 

4. Writing Your First DAG

Let’s create an actual DAG. We’ll start with a simple example.

Create a my_first_dag.py file in the ~/airflow/dags folder and enter the code below:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Define Python function
def print_hello():
    print("Welcome to the Airflow world!")
    return "Task completed"

# Default settings (Default Args)
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG definition
with DAG(
    'my_first_dag',
    default_args=default_args,
    description='First Airflow DAG example',
    schedule='@daily',  # Run daily
    catchup=False,
    tags=['tutorial', 'beginner'],
) as dag:

    # Task 1: Execute Bash command
    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # Task 2: Execute Python function
    task2 = PythonOperator(
        task_id='say_hello',
        python_callable=print_hello,
    )

    # Task 3: Another Bash command
    task3 = BashOperator(
        task_id='print_end',
        bash_command='echo "All tasks completed!"',
    )

    # Define task order
    task1 >> task2 >> task3  # Execute in order: task1 → task2 → task3

Code Explanation:

  • default_args: Settings applied to all Tasks
    • owner: DAG owner (responsible person)
    • start_date: The date from which the DAG is considered active (use a past date if you want runs to be schedulable right away)
    • retries: Number of retry attempts on failure
    • retry_delay: Interval between retries
  • DAG Parameters:
    • dag_id: Unique DAG name
    • schedule: Execution frequency (formerly schedule_interval)
    • catchup=False: Skip past date executions
    • tags: Tags for DAG classification

After saving the code and waiting briefly (about 30 seconds to 1 minute), my_first_dag will appear in the Airflow web interface.

Running the DAG

  1. Turn on the toggle button to the left of my_first_dag in the web interface
  2. Click the play button (▶) in the Actions column to run immediately
  3. Click the DAG name to view detailed execution status
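
The same actions are available from the CLI, which is handy when the web interface isn't open:

airflow dags list                   # see every DAG Airflow has picked up
airflow dags unpause my_first_dag   # equivalent of the toggle button
airflow dags trigger my_first_dag   # equivalent of the play button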

Understanding Task Status Colors:

  • 🟢 Green (Success): Completed successfully
  • 🟡 Yellow (Running/Queued): Currently running or waiting
  • 🔴 Red (Failed): Failed
  • Gray (No Status): Not yet executed
  • 🟠 Orange (Upstream Failed): Didn’t run because previous Task failed

 

 

5. Practical Example: Building a Data Processing Pipeline

Let’s build a more practical example: a typical ETL pipeline that reads, processes, and stores CSV files.

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import json
import pandas as pd

def extract_data(**context):
    """Data extraction function (Extract)"""
    # Example: in practice you would read from a CSV file or database; here we use sample data
    data = {
        'date': ['2025-01-01', '2025-01-02', '2025-01-03'],
        'sales': [1000, 1500, 1200],
        'product': ['A', 'B', 'A']
    }
    
    # Pass data to next task (using XCom)
    context['ti'].xcom_push(key='raw_data', value=data)
    print(f"Data extraction complete: {len(data['date'])} records")
    return "extract_success"

def transform_data(**context):
    """Data transformation function (Transform)"""
    # Get data from previous task
    ti = context['ti']
    raw_data = ti.xcom_pull(key='raw_data', task_ids='extract_data')
    
    # Process data
    df = pd.DataFrame(raw_data)
    total_sales = df['sales'].sum()
    
    processed_data = {
        'total_sales': int(total_sales),
        'record_count': len(df),
        'processed_at': datetime.now().isoformat()
    }
    
    # Pass processed data
    ti.xcom_push(key='processed_data', value=processed_data)
    print(f"Data processing complete: Total sales ${total_sales}")
    return "transform_success"

def load_data(**context):
    """Data loading function (Load)"""
    ti = context['ti']
    processed_data = ti.xcom_pull(key='processed_data', task_ids='transform_data')
    
    # In practice, save to database or file
    print(f"Data loading complete: {json.dumps(processed_data, indent=2)}")
    return "load_success"

def send_notification(**context):
    """Task completion notification function"""
    ti = context['ti']
    processed_data = ti.xcom_pull(key='processed_data', task_ids='transform_data')
    
    message = f"Data pipeline complete! Processed {processed_data['record_count']} records"
    print(f"Notification sent: {message}")
    return "notification_sent"

# DAG default settings
default_args = {
    'owner': 'data_team',
    'start_date': datetime(2025, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'data_processing_pipeline',
    default_args=default_args,
    description='ETL data processing pipeline',
    schedule='0 9 * * *',  # Run daily at 9 AM
    catchup=False,
    tags=['etl', 'production'],
) as dag:

    # Start notification
    start_task = BashOperator(
        task_id='start_pipeline',
        bash_command='echo "Data pipeline starting: $(date)"',
    )

    # Task 1: Data extraction (Extract)
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
    )

    # Task 2: Data transformation (Transform)
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )

    # Task 3: Data loading (Load)
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
    )

    # Task 4: Completion notification
    notify_task = PythonOperator(
        task_id='send_notification',
        python_callable=send_notification,
    )

    # End task
    end_task = BashOperator(
        task_id='end_pipeline',
        bash_command='echo "Data pipeline complete: $(date)"',
    )

    # Define workflow (ETL sequence)
    start_task >> extract_task >> transform_task >> load_task >> notify_task >> end_task

Key Takeaways from This Example:

  • Implementing the ETL (Extract-Transform-Load) pattern
  • Data transfer between Tasks using XCom
  • Sequential task execution flow
  • Start and end notification pattern

 

 

6. Setting Up Connections – Connecting to External Services

For production use of Airflow, you’ll need to connect to external services like databases or APIs. Let’s learn how to set up Connections.

What is a Connection?

A place to store information for accessing external systems (databases, APIs, cloud services, etc.). Think of it like saving “contacts.”

Why is it needed?

  • Writing passwords directly in code is a security risk
  • Connection information can be reused across multiple DAGs
  • Centralized configuration management

Setting Up Connection in Web UI

Step 1: In the web interface top menu, click Admin → Connections

Step 2: Click the + button to add a new connection

Step 3: Enter required information

Example) PostgreSQL database connection:

  • Connection Id: my_postgres_db (name to use in code)
  • Connection Type: Select Postgres
  • Host: localhost or DB server address
  • Schema: Database name
  • Login: Username
  • Password: Password
  • Port: 5432 (PostgreSQL default port)

Step 4: Test connection with Test button, then Save

Using Connection in DAG

from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG('db_example_dag', ...) as dag:
    
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='my_postgres_db',  # Use Connection ID
        sql='''
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        '''
    )

Note: PostgresOperator lives in a separate provider package, so it must be installed first (recent provider releases recommend SQLExecuteQueryOperator from the common SQL provider instead, but the Connection setup shown here is the same):

pip install apache-airflow-providers-postgres
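
Connections aren't limited to Operators. Inside a PythonOperator you can reach the same Connection through a Hook; a brief sketch assuming the my_postgres_db Connection from above (the query is illustrative):

from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_users():
    hook = PostgresHook(postgres_conn_id='my_postgres_db')  # same Connection ID as above
    rows = hook.get_records('SELECT COUNT(*) FROM users;')  # illustrative query
    print(f"User count: {rows[0][0]}")

# Use it as python_callable in a PythonOperator, like the earlier examples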

 

 

7. Useful Tips and Best Practices

Here are some tips for using Airflow effectively.

Testing Tasks

You can test individual Tasks before running the entire DAG.

# Test specific Task only
airflow tasks test [DAG_name] [Task_ID] [date]

# Example
airflow tasks test my_first_dag print_date 2025-01-01

This method is really useful during development as you can run Tasks directly without the scheduler!
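
There is also a command for dry-running an entire DAG in a single process, again without needing the scheduler:

# Run every Task of the DAG once, in dependency order
airflow dags test my_first_dag 2025-01-01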

Using Variables

It’s convenient to store frequently used values as Variables.

Set in Web UI: Admin → Variables → Click +

from airflow.models import Variable

# Use variables in DAG
api_key = Variable.get("my_api_key")
db_name = Variable.get("database_name", default_var="default_db")
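
Variables can also be managed from the CLI, which is convenient when scripting deployments (the key and value below are placeholders):

airflow variables set my_api_key "abc123"   # create or update a Variable
airflow variables get my_api_key            # read it back
airflow variables list                      # see all Variables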

DAG File Location and Structure

DAG files must be located in the $AIRFLOW_HOME/dags folder. This path can be changed in the airflow.cfg file.

# Check the configured DAG folder path
airflow config get-value core dags_folder

# Typically: ~/airflow/dags

Recommended Folder Structure:

~/airflow/
├── dags/
│   ├── daily_reports.py
│   ├── weekly_backup.py
│   └── utils/           # Common functions
│       └── helpers.py
├── plugins/             # Custom Operators
├── logs/                # Execution logs
└── airflow.cfg          # Configuration file
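
Because the dags folder is on Airflow's Python path, DAG files can import shared code from utils directly. A small sketch, assuming a hypothetical clean_column_names helper:

# dags/utils/helpers.py (shared code)
def clean_column_names(columns):
    """Trivial example helper: strip and lower-case column names."""
    return [c.strip().lower() for c in columns]

# dags/daily_reports.py (a DAG file using it)
# from utils.helpers import clean_column_names  # works because dags/ is on sys.path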

Important Considerations

Airflow should not be used for data processing itself.

Correct Usage:

  • ✅ Airflow: Task orchestration, scheduling, monitoring
  • ✅ Spark/Pandas: Actual large-scale data processing
  • ✅ Execute Spark jobs from Airflow and only check their results (see the sketch after these lists)

Incorrect Usage:

  • ❌ Processing tens of GB of data directly in Airflow Tasks
  • ❌ Performing complex calculations directly in Airflow Python functions
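
To make the correct usage concrete, here is a hedged sketch of delegating the heavy lifting: Airflow only submits a Spark job and waits for it, while the cluster does the actual processing (the master setting and script path are placeholders):

from airflow.operators.bash import BashOperator

# Define this inside a `with DAG(...)` block like the earlier examples
run_spark_job = BashOperator(
    task_id='run_spark_job',
    bash_command=(
        'spark-submit --master yarn '    # placeholder cluster settings
        '/opt/jobs/aggregate_sales.py '  # placeholder script path
        '--date {{ ds }}'                # Airflow injects the run date via templating
    ),
)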

Other Best Practices:

  • Keep Tasks small and simple (one responsibility only!)
  • Use clear, descriptive Task IDs (extract_sales_data rather than task1)
  • Configure retries to handle failures
  • Set up notifications for critical tasks
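
For the notification point, a minimal sketch: email on failure requires SMTP settings in airflow.cfg, and on_failure_callback can call any custom alerting code (the print below is a stand-in for a real Slack or webhook call; the address is a placeholder):

from datetime import timedelta

def notify_failure(context):
    # 'context' includes the failed task instance, DAG run, and exception
    ti = context['task_instance']
    print(f"ALERT: task {ti.task_id} in DAG {ti.dag_id} failed")  # replace with your alerting code

default_args = {
    'owner': 'data_team',
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,               # needs [smtp] configured in airflow.cfg
    'email': ['oncall@example.com'],        # placeholder address
    'on_failure_callback': notify_failure,  # applied to every Task that uses these default_args
}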

 

 

8. Where Can You Use Airflow?

Airflow has a wide range of applications:

Data Engineering

  • Building ETL/ELT pipelines
  • Data warehouse updates
  • Data quality testing automation
  • Real-time data collection and processing

Machine Learning/MLOps

  • Model training pipelines (periodic retraining)
  • Batch prediction jobs
  • Model performance monitoring
  • Feature engineering automation

Business Automation

  • Regular report generation and distribution
  • Data synchronization (between systems)
  • KPI dashboard updates
  • System monitoring and alerts

Development/Operations

  • Database backup automation
  • Log collection and analysis
  • Infrastructure management tasks
  • Batch job scheduling

 

 

9. Cloud Managed Services

If managing Airflow yourself feels overwhelming, consider cloud managed services:

Service            | Provider     | Features
Amazon MWAA        | AWS          | Tight AWS integration, S3-based DAG management
Cloud Composer     | Google Cloud | Optimized for GCP, easy BigQuery integration
Azure Data Factory | Microsoft    | Managed Airflow integrated with Fabric, Azure service integration

Managed Service Advantages:

  • Automated installation, upgrades, and patching
  • Pre-configured monitoring and logging
  • Automatic scalability management
  • Automatic security updates

Disadvantages:

  • Higher costs
  • Limited customization
  • Cloud vendor lock-in

 

 

10. Resources for Further Learning

If you want to study Airflow more deeply, the official Apache Airflow documentation (https://airflow.apache.org/docs/) is the best place to start: it covers tutorials, the full Operator reference, and the concepts introduced here in much more depth.

 

 

Apache Airflow is a powerful tool that enables simple and systematic management of complex data workflows. Concepts like DAG, Task, and Operator may seem unfamiliar at first, but once you get used to them, you’ll wonder “how did I work without this?”

Key Summary:

  • DAG: Workflow defining task flow
  • Task: Actual work unit to be performed
  • Operator: Template for creating Tasks
  • Scheduler: Decides when and what to execute
  • Executor: Actually performs the work

Now that you’ve taken your first steps with Airflow through today’s explanation, try applying it to your actual work. I recommend starting with small tasks and gradually expanding to more complex pipelines. 🙂

 

 
