Have you ever thought, “I need to collect, clean, and analyze data at the same time every day… can’t this be automated?” That’s exactly when Apache Airflow comes in handy.
With version 3.1.0 released in September 2025, Airflow remains one of the most actively used workflow orchestration tools in modern data engineering. Today, I’ll walk you through this powerful tool so that even beginners can understand and start using it effectively.
1. What Exactly Does Apache Airflow Do?
Apache Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows.
Problems That Airflow Solves
Previously, people automated tasks using Cron or shell scripts, but these approaches had several issues:
- Dependency Management Challenges: Managing execution order like “Task B must run after Task A completes” becomes complex
- Lack of Failure Handling: When tasks fail, it’s difficult to identify where the failure occurred and restart
- Limited Monitoring: Hard to know which tasks are running and when they’ll complete
- Scalability Issues: Managing hundreds or thousands of tasks becomes impossible
Airflow solves all these problems. Simply put, it’s a “smart task manager” that automatically executes and manages multiple tasks in a defined order.
Understanding Through Real-World Examples
Let’s say you need to perform these tasks every day at 7 AM:
- Extract yesterday’s sales data from the e-commerce database
- Clean and analyze the data
- Update results to the dashboard
- Send alerts to stakeholders if anomalies are detected
Doing this manually every day would be exhausting. With Airflow:
- You write the workflow once in Python code
- It runs automatically at the scheduled times
- You receive notifications when any stage fails
- You can restart from the failed step instead of starting over
- You can monitor the entire process in a web interface
2. Understanding Core Concepts of Airflow
To use Airflow effectively, you need to understand a few core concepts. Let’s go through them one by one.
DAG (Directed Acyclic Graph) – Workflow Blueprint
A DAG defines the relationships and execution order of tasks in a workflow.
Breaking Down the Term:
- Directed: Tasks flow in one direction, A → B
- Acyclic: Once you go A → B → C, you don’t loop back to A
- Graph: A diagram connecting tasks
Simply put, it’s your “task execution plan.” Like cooking, there’s an order:
Prep ingredients → Chop → Cook → Plate
It progresses in sequence, and once you “plate,” you don’t go back to “prep ingredients.” That’s a DAG!
For the e-commerce example above, the DAG would look like:
```
Extract data → Clean data → Analyze → Update dashboard
                               ↓
                          Send alerts (if anomalies are detected)
```
DAG Characteristics:
- One DAG = One complete workflow
- By convention, one DAG is defined per Python file (a file can hold more, but one is the usual practice)
- Can schedule to run daily, weekly, or at specific times
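To make this concrete, here is a minimal sketch of how the e-commerce DAG above could be wired together. It only shows the dependency structure: `EmptyOperator` is a built-in placeholder that does nothing, the task names are illustrative, and the imports follow the Airflow 2.x-style paths used elsewhere in this post. The Operators that do real work are introduced next.

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG('ecommerce_daily_report', start_date=datetime(2025, 1, 1),
         schedule='@daily', catchup=False) as dag:
    extract   = EmptyOperator(task_id='extract_data')
    clean     = EmptyOperator(task_id='clean_data')
    analyze   = EmptyOperator(task_id='analyze')
    dashboard = EmptyOperator(task_id='update_dashboard')
    alerts    = EmptyOperator(task_id='send_alerts')

    # ">>" means "runs before": one directed, acyclic path plus a branch
    extract >> clean >> analyze >> [dashboard, alerts]
```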
Tasks and Operators – Actual Work Units
Here’s an important concept: the relationship between Tasks and Operators.
What is an Operator? A “template” or “blueprint” for performing work. Like LEGO blocks that are pre-made components.
What is a Task? An “instance” of an Operator in actual use. Like assembled LEGO blocks.
A simple analogy:
- Operator = Cookie cutter (PythonOperator, BashOperator, etc.)
- Task = Actual cookies made with that cutter (extract_data, clean_data, etc.)
```python
from airflow.operators.python import PythonOperator

# PythonOperator: the cookie cutter (template)
# task1: the actual cookie (instance)
task1 = PythonOperator(
    task_id='extract_data',               # Unique name for this Task
    python_callable=extraction_function,  # The Python function this Task will run
)
```
Airflow provides various types of Operators ready to use. Let’s look at common Operators:
Operator Type | Description | Real Use Cases |
---|---|---|
PythonOperator | Executes Python functions | Data cleaning, calculations, ML model execution |
BashOperator | Executes Bash commands | File operations, script execution, server commands |
EmailOperator | Sends emails | Task completion/failure notifications |
PostgresOperator | Executes PostgreSQL queries | Data queries, inserts, updates |
HttpOperator | Makes HTTP API calls | Fetching data from external APIs |
Key Points:
- Each Task should handle only one small job (single responsibility principle)
- Combine Tasks to create complex workflows
- Each Task must have a unique `task_id` (see the small example below)
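As a tiny illustration of the cutter/cookie relationship, here are two Tasks stamped out of the same BashOperator template. The commands and IDs are made-up examples, and the import again follows the 2.x-style path used in this post; inside a real DAG these would sit under a `with DAG(...):` block, as in the full examples later on.

```python
from airflow.operators.bash import BashOperator

# One Operator class (the cutter) can produce many Tasks (the cookies),
# as long as each Task gets its own unique task_id.
backup_db = BashOperator(
    task_id='backup_db',
    bash_command='echo "backing up the database..."',
)
rotate_logs = BashOperator(
    task_id='rotate_logs',
    bash_command='echo "rotating logs..."',
)
```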
Core Components – Airflow’s Internal Architecture
Airflow consists of several components. The Scheduler schedules workflows, the Web Server provides the user interface, and the MetaStore is a database that stores metadata.
Airflow works like a company organization where multiple teams collaborate:
1. Scheduler – Task Planning Manager
- Role: “When should this DAG run? Is this Task ready?”
- Continuously checks DAG files
- Identifies DAGs whose execution time has arrived
- Verifies task order and dependencies
- Requests the Executor to “please run this Task”
- The most critical component! Nothing runs without the Scheduler
2. Web Server – Monitoring Manager
- Role: “Let me show you which tasks are running and their success/failure status”
- Provides the web UI (typically at http://localhost:8080)
- Visualizes DAG execution status
- Allows manual DAG execution or stopping
- Provides log access
- Important: Scheduled tasks continue running even if Web Server stops!
3. Executor – Actual Task Performer
- Role: “The worker who actually executes tasks”
- Receives instructions from Scheduler and executes Tasks
- Multiple types available:
- SequentialExecutor: Executes one at a time (for testing)
- LocalExecutor: Executes multiple Tasks concurrently on same server
- CeleryExecutor: Distributes Tasks across multiple servers
- KubernetesExecutor: Executes each Task as a Pod in Kubernetes
4. MetaStore – Record Keeper
- Role: “Stores all execution history and status in a database”
- What DAGs exist
- When each Task was executed
- Success or failure status
- Execution duration
- Typically uses PostgreSQL or MySQL
5. Worker – Executor’s Assistant
- Operates differently depending on Executor type
- The actual process that executes Python functions or Bash commands
Understanding the Complete Flow:
1. Scheduler: "Oh, time to run this DAG!"
2. Scheduler: "Let's start with the first Task. Executor, run this Task"
3. Executor: "Assigning work to Worker!"
4. Worker: "Executing... Done!"
5. Executor: "Scheduler, first Task completed"
6. MetaStore: "Execution result recorded"
7. Scheduler: "Great, let's run the next Task"
8. Web Server: (to user) "Here's the current progress~"
XCom – Data Transfer Between Tasks
What if Tasks need to exchange data? For example, if the “extract data” Task’s results need to be used by the “clean data” Task?
That’s when you use XCom (Cross-Communication).
Understanding XCom:
- Task A “pushes” data → stored in MetaStore
- Task B “pulls” data → retrieved from MetaStore
```python
def task_a(**context):
    result = "important data"
    # Store data (push)
    context['ti'].xcom_push(key='my_data', value=result)

def task_b(**context):
    # Retrieve data (pull)
    data = context['ti'].xcom_pull(key='my_data', task_ids='task_a')
    print(f"Received data: {data}")
```
Important Notes:
- XCom should only transfer small data (within a few MB)
- For large files, use S3 or file systems and pass only the path via XCom
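For comparison, the same hand-off can be written with the TaskFlow API (`@task` decorator), where a function's return value is pushed to XCom and passing it into another task pulls it back automatically. A minimal sketch with made-up names, assuming a reasonably recent Airflow release and the import paths used throughout this post:

```python
from datetime import datetime
from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2025, 1, 1), catchup=False)
def xcom_taskflow_demo():

    @task
    def extract():
        return {"rows": 3}                      # return value is stored in XCom

    @task
    def report(payload: dict):
        print(f"Received data: {payload}")      # value is pulled from XCom for us

    report(extract())                           # also wires the extract → report dependency

xcom_taskflow_demo()
```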
Schedule Interval – Setting Execution Frequency
The schedule determines when the DAG runs. In Airflow 2.4 and later the DAG parameter is called `schedule` (the older name, `schedule_interval`, was removed in Airflow 3).
Various Configuration Methods:
```python
# 1. Cron expression (most precise)
schedule='0 9 * * *'          # Daily at 9 AM

# 2. Preset shortcuts (convenient)
schedule='@daily'             # Daily at midnight
schedule='@hourly'            # Every hour
schedule='@weekly'            # Every Sunday at midnight
schedule='@monthly'           # First day of the month at midnight

# 3. timedelta object (simple intervals)
from datetime import timedelta
schedule=timedelta(hours=6)   # Every 6 hours
```
Understanding Cron Expressions:
```
* * * * *
│ │ │ │ │
│ │ │ │ └─ Day of week (0-6, 0=Sunday)
│ │ │ └─── Month (1-12)
│ │ └───── Day of month (1-31)
│ └─────── Hour (0-23)
└───────── Minute (0-59)
```
Examples:
- `0 9 * * *`: Daily at 9:00 AM
- `30 14 * * 1`: Every Monday at 2:30 PM
- `0 0 1 * *`: First day of each month at midnight
- `*/15 * * * *`: Every 15 minutes
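If you want to sanity-check a cron expression before putting it into a DAG, the croniter library (a dependency Airflow itself relies on) can enumerate the upcoming run times; a quick sketch:

```python
from datetime import datetime
from croniter import croniter

# "Every Monday at 2:30 PM", starting from January 1, 2025
it = croniter("30 14 * * 1", datetime(2025, 1, 1))
for _ in range(3):
    print(it.get_next(datetime))  # prints the next three matching timestamps
```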
3. Installing and Starting Airflow
Let’s install Airflow. I’ll introduce two common methods.
Method 1: Installation via pip (Recommended)
Installation using pip is the officially supported primary installation method. This is the simplest method if you have a Python environment.
Step 1: Create Python Virtual Environment
```bash
# Airflow 3.x requires a recent Python 3 release (support for Python 3.8 has been dropped)
python3 -m venv airflow_venv
source airflow_venv/bin/activate  # Windows: airflow_venv\Scripts\activate
```
Step 2: Set Airflow Home Directory
```bash
export AIRFLOW_HOME=~/airflow
```
Step 3: Install Airflow
```bash
# Install the latest version (3.1.0 as of October 2025)
AIRFLOW_VERSION=3.1.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
```
Step 4: Initialize Database
```bash
airflow db migrate   # older Airflow 2.x tutorials use "airflow db init", which this command replaced
```
This command creates the configuration file and a SQLite database in the `~/airflow` folder.
Step 5: Create Admin Account
```bash
airflow users create \
    --username admin \
    --firstname Admin \
    --lastname User \
    --role Admin \
    --password admin \
    --email admin@example.com
```
(Depending on the auth manager your installation uses, this step may differ; some Airflow 3 setups generate an admin password automatically the first time you start it.)
Step 6: Run Airflow
Open two terminals and run each (for quick local experiments, the single command `airflow standalone` starts all required components in one process instead):

```bash
# Terminal 1: Run the Scheduler
airflow scheduler

# Terminal 2: Run the web UI
# (Airflow 3.x serves the UI with "airflow api-server"; in Airflow 2.x the command was "airflow webserver")
airflow api-server --port 8080
```
Now open http://localhost:8080 in your browser to access the Airflow web interface!
Method 2: Docker Installation
Using Docker allows you to containerize all Airflow components for easy management. This method is also good if you have Docker installed.
Step 1: Create Working Directory and Download docker-compose.yaml
```bash
mkdir airflow-docker
cd airflow-docker

# Download the official docker-compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
```
Step 2: Create Required Directories and Set Environment Variables
```bash
mkdir -p ./dags ./logs ./plugins ./config
echo -e "AIRFLOW_UID=$(id -u)" > .env
```
Step 3: Initialize Database
```bash
docker-compose up airflow-init
```
Step 4: Run Airflow
```bash
docker-compose up -d
```
The default credentials are username `airflow`, password `airflow`. Open http://localhost:8080 in your browser!
4. Writing Your First DAG
Let’s create an actual DAG. We’ll start with a simple example.
Create a `my_first_dag.py` file in the `~/airflow/dags` folder and enter the code below:
```python
from datetime import datetime, timedelta

from airflow import DAG
# Note: in Airflow 3 the core operators ship in the bundled "standard" provider
# (airflow.providers.standard.operators.*); the paths below are the familiar 2.x ones.
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Define a Python function
def print_hello():
    print("Welcome to the Airflow world!")
    return "Task completed"

# Default settings (default_args)
default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'start_date': datetime(2025, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# DAG definition
with DAG(
    'my_first_dag',
    default_args=default_args,
    description='First Airflow DAG example',
    schedule='@daily',  # Run daily (called schedule_interval in older Airflow 2.x code)
    catchup=False,
    tags=['tutorial', 'beginner'],
) as dag:

    # Task 1: Execute a Bash command
    task1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    # Task 2: Execute a Python function
    task2 = PythonOperator(
        task_id='say_hello',
        python_callable=print_hello,
    )

    # Task 3: Another Bash command
    task3 = BashOperator(
        task_id='print_end',
        bash_command='echo "All tasks completed!"',
    )

    # Define the task order
    task1 >> task2 >> task3  # Execute in order: task1 → task2 → task3
```
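If you are on a reasonably recent Airflow release (2.5+), you can also append the snippet below to the bottom of the same file and run it with plain `python` to execute the whole DAG once in-process, which is handy while debugging:

```python
# Optional: quick local debugging without the scheduler
if __name__ == "__main__":
    dag.test()
```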
Code Explanation:
- default_args: Settings applied to all Tasks
  - `owner`: DAG owner (the responsible person)
  - `start_date`: When the DAG becomes valid (must be a past date!)
  - `retries`: Number of retry attempts on failure
  - `retry_delay`: Interval between retries
- DAG parameters:
  - `dag_id`: Unique DAG name
  - `schedule`: Execution frequency (called `schedule_interval` in older Airflow 2.x code)
  - `catchup=False`: Skip runs for past dates
  - `tags`: Tags for classifying the DAG
After saving the file and waiting briefly (about 30 seconds to 1 minute), `my_first_dag` will appear in the Airflow web interface.
Running the DAG
- Turn on the toggle button to the left of `my_first_dag` in the web interface
- Click the play button (▶) in the Actions column to trigger a run immediately
- Click the DAG name to view the detailed execution status
Understanding Task Status Colors:
- 🟢 Green (Success): Completed successfully
- 🟡 Yellow (Running/Queued): Currently running or waiting
- 🔴 Red (Failed): Failed
- ⚪ Gray (No Status): Not yet executed
- 🟠 Orange (Upstream Failed): Didn’t run because previous Task failed
5. Practical Example: Building a Data Processing Pipeline
Let’s build a more practical example: a typical ETL pipeline that reads, processes, and stores CSV files.
```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
import json
import pandas as pd

def extract_data(**context):
    """Data extraction function (Extract)"""
    # Example data; in a real pipeline this would come from a CSV file or database
    data = {
        'date': ['2025-01-01', '2025-01-02', '2025-01-03'],
        'sales': [1000, 1500, 1200],
        'product': ['A', 'B', 'A']
    }
    # Pass data to the next task (using XCom)
    context['ti'].xcom_push(key='raw_data', value=data)
    print(f"Data extraction complete: {len(data['date'])} records")
    return "extract_success"

def transform_data(**context):
    """Data transformation function (Transform)"""
    # Get data from the previous task
    ti = context['ti']
    raw_data = ti.xcom_pull(key='raw_data', task_ids='extract_data')

    # Process data
    df = pd.DataFrame(raw_data)
    total_sales = df['sales'].sum()

    processed_data = {
        'total_sales': int(total_sales),
        'record_count': len(df),
        'processed_at': datetime.now().isoformat()
    }

    # Pass the processed data on
    ti.xcom_push(key='processed_data', value=processed_data)
    print(f"Data processing complete: Total sales ${total_sales}")
    return "transform_success"

def load_data(**context):
    """Data loading function (Load)"""
    ti = context['ti']
    processed_data = ti.xcom_pull(key='processed_data', task_ids='transform_data')

    # In practice, save to a database or file here
    print(f"Data loading complete: {json.dumps(processed_data, indent=2)}")
    return "load_success"

def send_notification(**context):
    """Task completion notification function"""
    ti = context['ti']
    processed_data = ti.xcom_pull(key='processed_data', task_ids='transform_data')

    message = f"Data pipeline complete! Processed {processed_data['record_count']} records"
    print(f"Notification sent: {message}")
    return "notification_sent"

# DAG default settings
default_args = {
    'owner': 'data_team',
    'start_date': datetime(2025, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'data_processing_pipeline',
    default_args=default_args,
    description='ETL data processing pipeline',
    schedule='0 9 * * *',  # Run daily at 9 AM (schedule_interval in older Airflow 2.x code)
    catchup=False,
    tags=['etl', 'production'],
) as dag:

    # Start notification
    start_task = BashOperator(
        task_id='start_pipeline',
        bash_command='echo "Data pipeline starting: $(date)"',
    )

    # Task 1: Data extraction (Extract)
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_data,
    )

    # Task 2: Data transformation (Transform)
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )

    # Task 3: Data loading (Load)
    load_task = PythonOperator(
        task_id='load_data',
        python_callable=load_data,
    )

    # Task 4: Completion notification
    notify_task = PythonOperator(
        task_id='send_notification',
        python_callable=send_notification,
    )

    # End task
    end_task = BashOperator(
        task_id='end_pipeline',
        bash_command='echo "Data pipeline complete: $(date)"',
    )

    # Define the workflow (ETL sequence)
    start_task >> extract_task >> transform_task >> load_task >> notify_task >> end_task
```
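In a real pipeline the extract step would read from an actual source rather than an inline dictionary. Here's a hedged sketch of what `extract_data` might look like reading a CSV export (the file path is purely illustrative); the rest of the pipeline stays the same because the XCom payload keeps the same shape:

```python
import pandas as pd

def extract_data(**context):
    """Data extraction function (Extract) — CSV variant"""
    df = pd.read_csv("/data/exports/daily_sales.csv")   # hypothetical export location

    # XCom values must be serializable, so convert the DataFrame to a plain dict of lists
    context['ti'].xcom_push(key='raw_data', value=df.to_dict(orient='list'))
    print(f"Data extraction complete: {len(df)} records")
    return "extract_success"
```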
Key Takeaways from This Example:
- Implementing the ETL (Extract-Transform-Load) pattern
- Data transfer between Tasks using XCom
- Sequential task execution flow
- Start and end notification pattern
6. Setting Up Connections – Connecting to External Services
For production use of Airflow, you’ll need to connect to external services like databases or APIs. Let’s learn how to set up Connections.
What is a Connection?
A place to store information for accessing external systems (databases, APIs, cloud services, etc.). Think of it like saving “contacts.”
Why is it needed?
- Writing passwords directly in code is a security risk
- Connection information can be reused across multiple DAGs
- Centralized configuration management
Setting Up Connection in Web UI
Step 1: In the web interface top menu, click Admin → Connections
Step 2: Click the + button to add a new connection
Step 3: Enter required information
Example) PostgreSQL database connection:
- Connection Id: `my_postgres_db` (the name to use in code)
- Connection Type: Select `Postgres`
- Host: `localhost` or the DB server address
- Schema: Database name
- Login: Username
- Password: Password
- Port: `5432` (PostgreSQL default port)
Step 4: Test connection with Test button, then Save
Using Connection in DAG
```python
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG('db_example_dag', ...) as dag:
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='my_postgres_db',  # Use the Connection ID
        sql='''
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
        '''
    )
```
Note: To use PostgresOperator, provider installation is required:
```bash
pip install apache-airflow-providers-postgres
```
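The same Connection can also be used from ordinary Python tasks through a Hook. A small sketch, reusing the `my_postgres_db` connection created above (the query and table are illustrative):

```python
from airflow.providers.postgres.hooks.postgres import PostgresHook

def count_users(**context):
    hook = PostgresHook(postgres_conn_id='my_postgres_db')
    # get_records() runs the query and returns the rows as a list of tuples
    rows = hook.get_records("SELECT COUNT(*) FROM users;")
    print(f"The users table currently holds {rows[0][0]} rows")
```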
7. Useful Tips and Best Practices
Here are some tips for using Airflow effectively.
Testing Tasks
You can test individual Tasks before running the entire DAG.
```bash
# Test a specific Task only
airflow tasks test [DAG_name] [Task_ID] [date]

# Example
airflow tasks test my_first_dag print_date 2025-01-01
```
This method is really useful during development as you can run Tasks directly without the scheduler!
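If you prefer testing from Python, a common complementary pattern is a small pytest check that every DAG file in your dags folder at least imports without errors; a sketch (assumes pytest is installed and `AIRFLOW_HOME` is configured):

```python
from airflow.models import DagBag

def test_all_dags_import_cleanly():
    dag_bag = DagBag(include_examples=False)  # parses everything in the dags folder
    assert dag_bag.import_errors == {}, f"DAG import errors: {dag_bag.import_errors}"
```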
Using Variables
It’s convenient to store frequently used values as Variables.
Set in Web UI: Admin → Variables → Click +
```python
from airflow.models import Variable

# Use variables in a DAG
api_key = Variable.get("my_api_key")
db_name = Variable.get("database_name", default_var="default_db")
```
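Variables can also hold small JSON blobs, which keeps related settings together; a sketch assuming a hypothetical Variable named `etl_config`:

```python
from airflow.models import Variable

# e.g. the Variable "etl_config" stores {"batch_size": 500, "target_table": "sales_daily"}
etl_config = Variable.get("etl_config", deserialize_json=True, default_var={"batch_size": 100})
print(etl_config["batch_size"])
```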
DAG File Location and Structure
DAG files must be located in the `$AIRFLOW_HOME/dags` folder. This path can be changed in the `airflow.cfg` file.
```bash
# Check the DAG folder path
echo $AIRFLOW_HOME/dags
# Typically: ~/airflow/dags
```
Recommended Folder Structure:
```
~/airflow/
├── dags/
│   ├── daily_reports.py
│   ├── weekly_backup.py
│   └── utils/          # Common functions
│       └── helpers.py
├── plugins/            # Custom Operators
├── logs/               # Execution logs
└── airflow.cfg         # Configuration file
```
Important Considerations
Airflow should not be used for data processing itself.
Correct Usage:
- ✅ Airflow: Task orchestration, scheduling, monitoring
- ✅ Spark/Pandas: Actual large-scale data processing
- ✅ Execute Spark jobs from Airflow and check results only
Incorrect Usage:
- ❌ Processing tens of GB of data directly in Airflow Tasks
- ❌ Performing complex calculations directly in Airflow Python functions
Other Best Practices:
- Keep Tasks small and simple (one responsibility only!)
- Use clear, descriptive Task IDs (`extract_sales_data` rather than `task1`)
- Configure `retries` to handle failures
- Set up notifications for critical tasks (see the sketch below)
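For the last two points, one common pattern is a failure callback in `default_args`, so every Task in the DAG reports failures the same way. A minimal sketch where the alerting function is just a placeholder (in practice it would post to Slack, send an email, etc.):

```python
from datetime import datetime, timedelta

def notify_on_failure(context):
    # Airflow passes a context dict with the failing task instance, dag_id, log URL, etc.
    ti = context["task_instance"]
    print(f"ALERT: {ti.dag_id}.{ti.task_id} failed")  # replace with a real alert in production

default_args = {
    'owner': 'data_team',
    'start_date': datetime(2025, 1, 1),
    'retries': 2,                              # retry first...
    'retry_delay': timedelta(minutes=5),
    'on_failure_callback': notify_on_failure,  # ...then alert once the final retry fails
}
```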
8. Where Can You Use Airflow?
Airflow has a wide range of applications:
Data Engineering
- Building ETL/ELT pipelines
- Data warehouse updates
- Data quality testing automation
- Periodic (batch) data collection and processing
Machine Learning/MLOps
- Model training pipelines (periodic retraining)
- Batch prediction jobs
- Model performance monitoring
- Feature engineering automation
Business Automation
- Regular report generation and distribution
- Data synchronization (between systems)
- KPI dashboard updates
- System monitoring and alerts
Development/Operations
- Database backup automation
- Log collection and analysis
- Infrastructure management tasks
- Batch job scheduling
9. Cloud Managed Services
If managing Airflow yourself feels overwhelming, consider cloud managed services:
Service | Provider | Features |
---|---|---|
Amazon MWAA | AWS | Tight AWS integration, S3-based DAG management |
Cloud Composer | Google Cloud | Optimized for GCP, easy BigQuery integration |
Azure Data Factory | Microsoft | Airflow integrated with Fabric, Azure service integration |
Managed Service Advantages:
- Automated installation, upgrades, and patching
- Pre-configured monitoring and logging
- Automatic scalability management
- Automatic security updates
Disadvantages:
- Higher costs
- Limited customization
- Cloud vendor lock-in
10. Resources for Further Learning
If you want to study Airflow more deeply, I recommend these resources:
- Official Documentation: https://airflow.apache.org/docs/ – Most accurate and up-to-date information
- GitHub Repository: https://github.com/apache/airflow – Source code and examples
- Official Announcements: https://airflow.apache.org/announcements/ – Latest release information
- Slack Community: Apache Airflow Slack – Connect with users worldwide
Apache Airflow is a powerful tool that enables simple and systematic management of complex data workflows. Concepts like DAG, Task, and Operator may seem unfamiliar at first, but once you get used to them, you’ll wonder “how did I work without this?”
Key Summary:
- DAG: Workflow defining task flow
- Task: Actual work unit to be performed
- Operator: Template for creating Tasks
- Scheduler: Decides when and what to execute
- Executor: Actually performs the work
Now that you’ve taken your first steps with Airflow through today’s explanation, try applying it to your actual work. I recommend starting with small tasks and gradually expanding to more complex pipelines. 🙂