Mastering Apache Airflow, Part 2: Orchestrating Scalable and Efficient Data Workflows
Unlocking Airflow's Power: An In-Depth Exploration of Executors, Auth Managers, Object Storage, and XComs
Let’s Begin!🌟
Airflow has evolved into one of the most powerful tools for orchestrating complex workflows and managing distributed task execution. However, mastering Airflow’s capabilities goes beyond just defining some tasks and setting schedules. In this deep dive, we will explore some of the core components that make Airflow flexible, scalable, and efficient: Executors, Authentication Managers, Object Storage, and XComs.
Each of these elements plays a critical role in enabling you to build workflows that are not only high-performing but also secure and adaptable to a wide variety of infrastructures. From choosing the right executor to understanding how XComs allow tasks to communicate, we’ll break down the intricacies of these components and show you how to unlock their full potential.
Whether you’re scaling out your Airflow deployment, managing task execution across different environments, or storing and sharing large datasets efficiently, this exploration will give you the tools you need to take your Airflow workflows to the next level. Let’s dive in and discover how these powerful features can be leveraged for more robust, production-ready pipeline management! 🚀
Executors in Airflow ⚙️: Types, Setup & Customization
In Airflow, executors serve as the mechanism for running task instances, providing a modular, pluggable architecture that supports a variety of deployment strategies—from local development environments to scalable, production-grade distributed systems. Executors are specified in the [core] section of the airflow.cfg configuration file via the executor option.
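To make the configuration lookup concrete, here is a minimal sketch that uses only Python's standard library and does not import Airflow. It assumes the usual Airflow convention that an environment variable named AIRFLOW__CORE__EXECUTOR overrides the executor option in the [core] section. The helper name resolve_executor and the fallback value are illustrative, not part of Airflow.

```python
import configparser

# A tiny stand-in for airflow.cfg (illustrative content only).
AIRFLOW_CFG = """
[core]
executor = LocalExecutor
"""


def resolve_executor(cfg_text: str, environ: dict) -> str:
    """Return the executor name, preferring the environment variable over the file."""
    # Airflow maps [core] executor to the AIRFLOW__CORE__EXECUTOR variable.
    env_value = environ.get("AIRFLOW__CORE__EXECUTOR")
    if env_value:
        return env_value
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # The fallback below is an assumption made for this sketch.
    return parser.get("core", "executor", fallback="SequentialExecutor")


print(resolve_executor(AIRFLOW_CFG, {}))
print(resolve_executor(AIRFLOW_CFG, {"AIRFLOW__CORE__EXECUTOR": "CeleryExecutor"}))
```

In a real deployment you would pass os.environ and let Airflow do this lookup for you. The sketch only illustrates the order of precedence.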
Built-In Executors
Airflow includes several built-in executors, such as SequentialExecutor, LocalExecutor, CeleryExecutor, and KubernetesExecutor, each with distinct operational characteristics.
Executors are broadly categorized into local and remote types. Local executors (e.g., SequentialExecutor, LocalExecutor) execute tasks within the scheduler’s own process or thread pool. These are easy to set up and have minimal latency but are resource-constrained and not suitable for high-throughput or long-running jobs.
The SequentialExecutor, Airflow's default, is particularly limited because it executes one task at a time—making it ideal for testing but unsuitable for production. The LocalExecutor, on the other hand, allows parallel task execution using subprocesses on a single machine and is a viable option for small-scale production setups.
A jump into remote executors
Remote executors decouple task execution from the scheduler and dispatch tasks to distributed worker nodes. They are further divided into queued/batch-based and containerized models. Queued executors like the CeleryExecutor rely on a central broker (e.g., Redis or RabbitMQ) and persistent workers that pull tasks from the queue.
These allow for horizontal scaling and better resource management but come with complexity in setup and potential "noisy neighbor" issues if multiple tasks share the same host. Containerized executors, such as the KubernetesExecutor and EcsExecutor, launch a dedicated container for each task. This model offers strong isolation, flexible runtime environments, and efficient use of compute resources for ephemeral workloads, though it introduces latency on task startup and requires orchestration infrastructure like Kubernetes or ECS.
Multiple executors
Starting with Airflow 2.10.0, you can configure multiple executors concurrently by passing a comma-separated list to the executor option. This enables hybrid deployment strategies, such as using LocalExecutor for short, lightweight tasks and KubernetesExecutor for compute-heavy or isolated workloads. Each task or DAG can specify the desired executor via the executor parameter, either directly on the operator or through default_args in the DAG. If a task doesn’t explicitly specify an executor, the first one listed in the configuration is used as the default. Executors can also be given a short alias using the : notation, with the alias placed before the executor's module path (e.g., ShortName:my.custom.module.ExecutorClass), so that DAG definitions can refer to them by the alias.
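The selection rule is easy to model in plain Python. The sketch below is a toy model and not Airflow's own code: parse_executors and executor_for are hypothetical helper names, and rejecting an executor that is not configured is an assumption made for the example.

```python
from typing import List, Optional


def parse_executors(setting: str) -> List[str]:
    """Split a comma-separated executor setting into an ordered list of names."""
    names = [name.strip() for name in setting.split(",") if name.strip()]
    if not names:
        raise ValueError("at least one executor must be configured")
    return names


def executor_for(task_executor: Optional[str], configured: List[str]) -> str:
    """Pick the executor for a task: its own choice, or the first configured one."""
    if task_executor is None:
        return configured[0]  # the first entry acts as the default
    if task_executor not in configured:
        raise ValueError(f"{task_executor!r} is not in the configured executors")
    return task_executor


configured = parse_executors("LocalExecutor, KubernetesExecutor")
print(executor_for(None, configured))
print(executor_for("KubernetesExecutor", configured))
```

Keeping the lightweight executor first in the list means that tasks which say nothing about their executor stay on the cheap path, and only tasks that opt in pay the container start-up cost.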
Behind the scenes, all executors implement a common interface, the BaseExecutor, which defines the contract between the executor and the Airflow scheduler. To build a custom executor, developers must override methods such as sync() (for syncing state and executing queued tasks) and execute_async() (for submitting tasks for asynchronous execution). Optional methods like start(), end(), terminate(), and cleanup_stuck_queued_tasks() enhance lifecycle management, reliability, and observability. Executors can also expose custom CLI commands through get_cli_commands() and contribute additional context to task logs via get_task_log()—a feature leveraged by the KubernetesExecutor to surface pod-level logs in the Airflow UI.
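To get a feel for the split between execute_async() and sync(), here is a toy executor built on a thread pool. It deliberately does not subclass BaseExecutor, and its method signatures are simplified assumptions. The real interface receives task instance keys and commands from the scheduler and has more moving parts.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Dict


class ToyExecutor:
    """Stand-in that mimics the execute_async()/sync() split (not a BaseExecutor)."""

    def __init__(self, max_workers: int = 2) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._running: Dict[str, Future] = {}
        self.state: Dict[str, str] = {}

    def execute_async(self, key: str, command: Callable[[], object]) -> None:
        # Hand the work off and return immediately, without waiting for it.
        self._running[key] = self._pool.submit(command)
        self.state[key] = "running"

    def sync(self) -> None:
        # Called periodically: collect finished work and record its outcome.
        for key, future in list(self._running.items()):
            if future.done():
                self.state[key] = "failed" if future.exception() else "success"
                del self._running[key]

    def end(self) -> None:
        # Wait for outstanding work, then run one final sync.
        self._pool.shutdown(wait=True)
        self.sync()


def boom() -> None:
    raise RuntimeError("simulated task failure")


executor = ToyExecutor()
executor.execute_async("task_ok", lambda: 1 + 1)
executor.execute_async("task_bad", boom)
executor.end()
print(executor.state)
```

The point of the design is that submission never blocks: the scheduler loop keeps running and learns about results only when sync() is called.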
Several compatibility attributes on BaseExecutor—like supports_pickling, is_local, is_single_threaded, and is_production—guide Airflow’s internal behavior and UI messaging depending on the executor’s characteristics. Notably, hybrid executors like LocalKubernetesExecutor and CeleryKubernetesExecutor have historically combined two executor types via the queue field, but this method is now discouraged due to maintenance complexity and functional limitations.
The takeaway
Ultimately, the executor is one of the most critical components in an Airflow deployment, deeply influencing performance, scalability, isolation, and operational simplicity. Whether using built-in executors or developing a custom one tailored to specific infrastructure or compute environments, understanding how executors work and how to configure them appropriately is essential to building a reliable and efficient workflow orchestration system.
🔐 Auth Managers in Airflow
In Apache Airflow, managing who can access your workflows and what they can do with them is critical. That’s where the Auth Manager comes in—a powerful component that handles both authentication (verifying who you are) and authorization (deciding what you can access). What makes Airflow’s auth manager so flexible is its pluggable nature: you can easily swap out the default authentication system with something that fits your organization’s needs. Whether you're running Airflow in a small team or scaling to thousands of users, the ability to tailor the authentication system is essential for security and efficiency.
Customizing Airflow's Authentication
By default, Airflow uses Flask AppBuilder (FAB) as its auth manager. However, Airflow allows you to create a custom auth manager if you have unique needs—maybe you're integrating a custom identity provider or leveraging a cloud-native IAM system. The whole system is designed around the BaseAuthManager interface, which gives you control over user authentication and authorization operations. With just a few lines of configuration, you can plug in your custom solution.
Let’s take a look at how easy it is to configure a custom Auth Manager. First, you need to implement the BaseAuthManager interface. This interface provides methods like is_logged_in(), get_user(), get_url_login(), and get_url_logout(), which handle the login and logout processes. But the magic happens when it comes to authorization. Airflow uses HTTP method-style verbs like GET, POST, PUT, and DELETE to determine what actions a user can perform on specific resources.
For example, let’s say you want to check if a user has permission to access a specific DAG. You’d use the is_authorized_dag() method, passing in details about the DAG and the type of action (like GET for read access):
auth_manager.is_authorized_dag(method="GET", access_entity=DagAccessEntity.Run, details=DagDetails(id="my_dag"))
This will check whether the user has permission to read DAG runs for a DAG with the ID my_dag. You can apply the same logic to other resources like connections, variables, and pools, ensuring that your users only see what they're authorized to access.
Performance matters too—if you’re dealing with hundreds of users and thousands of resources, you’ll want to optimize these checks. Airflow supports batch authorization methods like batch_is_authorized_dag() to check permissions for multiple items in one go, saving you from having to make individual calls for each item.
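Here is a small toy model of the single and batch checks. It is not a BaseAuthManager subclass: the class ToyAuthManager, the DagRequest type, and the simplified signatures are assumptions made for illustration. The real methods take richer arguments, such as access_entity and details.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Set, Tuple


@dataclass(frozen=True)
class DagRequest:
    method: str  # HTTP-style verb: GET, POST, PUT or DELETE
    dag_id: str


@dataclass
class ToyAuthManager:
    # Maps a user name to the (method, dag_id) pairs that user may perform.
    grants: Dict[str, Set[Tuple[str, str]]] = field(default_factory=dict)

    def is_authorized_dag(self, *, method: str, dag_id: str, user: str) -> bool:
        return (method, dag_id) in self.grants.get(user, set())

    def batch_is_authorized_dag(self, requests: Iterable[DagRequest], *, user: str) -> bool:
        # Fetch the user's grants once, then check every request against them.
        allowed = self.grants.get(user, set())
        return all((r.method, r.dag_id) in allowed for r in requests)


manager = ToyAuthManager(grants={"alice": {("GET", "my_dag"), ("GET", "other_dag")}})
print(manager.is_authorized_dag(method="GET", dag_id="my_dag", user="alice"))
print(manager.batch_is_authorized_dag(
    [DagRequest("GET", "my_dag"), DagRequest("GET", "other_dag")], user="alice"
))
```

In a real auth manager the saving comes from making one round trip to the identity or policy service for the whole batch, rather than one per resource.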
If you're integrating your custom auth manager into your Airflow environment, it’s straightforward to configure it in airflow.cfg:
[core]
auth_manager = my_company.auth_managers.MyCustomAuthManager
Additionally, your custom auth manager can expose CLI commands and REST API endpoints to manage user roles, permissions, and groups. For example, you might want to define custom CLI commands for user management:
# CLICommand, ActionCommand, GroupCommand and lazy_load_command come from airflow.cli.cli_config
@staticmethod
def get_cli_commands() -> list[CLICommand]:
    sub_commands = [
        ActionCommand(
            name="add_user",
            help="Add a new user to the system",
            func=lazy_load_command("my_company.auth_managers.add_user"),
            args=(),  # this command takes no extra CLI arguments
        ),
    ]
    return [
        GroupCommand(
            name="my_auth_manager",
            help="Manage authentication resources",
            subcommands=sub_commands,
        ),
    ]

This flexibility makes Airflow a powerful tool for managing workflows at scale, ensuring that each user has the right level of access and that the system remains secure as you grow.
However, a word of caution—switching auth managers is not a light operation. It will affect user experiences, sign-in processes, and could disrupt workflows if not carefully planned. All users and their permissions will need to be migrated over to the new system.
Once you’ve crafted your custom auth manager, you can seamlessly integrate it with Airflow and ensure that your workflows are both secure and easy to manage.
And…What about Object Storage?☁️
In Apache Airflow 2.8.0, a new experimental feature for Object Storage was finally introduced, designed to simplify the use of cloud object storage systems such as S3, Google Cloud Storage (GCS), and Azure Blob Storage in your workflows. Object storage systems, unlike traditional file systems, store data as objects, each identified by a unique name, making them scalable, highly available, and easy to manage for massive data volumes. These systems rely on HTTP REST operations to interact with data, which is why they offer distinct performance and feature characteristics compared to file systems.
The great thing about Airflow’s abstraction is that it allows you to work with various object storage systems without needing to modify your code. This abstraction leverages standard Python modules that can handle file-like objects, such as shutil, allowing Airflow to handle these storage systems more like local files. However, you'll need to install the appropriate Airflow provider to enable support for each system. For instance, to use Google Cloud Storage (GCS), you’ll need the apache-airflow-providers-google package, while for S3, the apache-airflow-providers-amazon[s3fs] provider is necessary.
Limitations of Object Storage Systems
Although object storage systems can resemble file systems, they do not offer all the same capabilities. For instance, atomic renaming is not guaranteed. If you move a file from one location to another and the operation fails halfway through, you might lose the file. Additionally, emulated directories in object storage can be slow to interact with, as operations like listing files require fetching all objects and filtering by prefix. Moreover, seeking within a file often incurs performance overhead or may be unsupported.
Airflow uses fsspec to provide a consistent interface across different object storage systems, and it also includes local file caching to improve access speed. That said, you should still design your DAGs with these limitations in mind, especially if your workflows rely on frequent interactions with object storage systems.
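The limitations above are easier to see with a toy in-memory store. The class below is an illustration only and is not how fsspec or any cloud provider is implemented. ToyObjectStore, list_prefix, move, and the fail_after_copy flag are hypothetical names made up for this sketch.

```python
from typing import Dict, List


class ToyObjectStore:
    """In-memory stand-in for a flat key/value object store."""

    def __init__(self) -> None:
        self.objects: Dict[str, bytes] = {}

    def put(self, key: str, data: bytes) -> None:
        self.objects[key] = data

    def list_prefix(self, prefix: str) -> List[str]:
        # No real directories: "listing a folder" scans keys and filters by prefix.
        return sorted(k for k in self.objects if k.startswith(prefix))

    def move(self, src: str, dst: str, fail_after_copy: bool = False) -> None:
        # A move is a copy followed by a delete, so it is not atomic.
        self.objects[dst] = self.objects[src]
        if fail_after_copy:
            raise ConnectionError("simulated failure between copy and delete")
        del self.objects[src]


store = ToyObjectStore()
store.put("raw/a.csv", b"1")
store.put("raw/b.csv", b"2")
store.put("clean/c.csv", b"3")
print(store.list_prefix("raw/"))

try:
    store.move("raw/a.csv", "archive/a.csv", fail_after_copy=True)
except ConnectionError:
    pass
# The interrupted move left the object under both keys.
print(store.list_prefix("raw/"), store.list_prefix("archive/"))
```

In this sketch the interrupted move leaves a duplicate rather than losing data. Other failure orderings can be less forgiving, so it is safer to make tasks idempotent and check that the destination exists before treating a move as done.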
Basic Usage
To interact with object storage, you create an ObjectStoragePath object that represents the URI of the object you wish to interact with. For example, to point to an S3 bucket, you can use:
from airflow.io.path import ObjectStoragePath

base = ObjectStoragePath("s3://aws_default@my-bucket/")

This represents an object at the specified path. You can also pass the connection ID separately, which is handy when you want to abstract away the connection settings for portability:
base = ObjectStoragePath("s3://my-bucket/", conn_id="aws_default")

Listing Files
Here’s how you can list the files within a specific bucket:
@task
def list_files() -> list[ObjectStoragePath]:
    files = [f for f in base.iterdir() if f.is_file()]
    return files

Navigating a Directory Tree
You can also easily navigate through the directory tree by leveraging the / operator:
base = ObjectStoragePath("s3://my-bucket/")
subdir = base / "subdir"
print(subdir)  # subdir now refers to s3://my-bucket/subdir

Opening and Writing Files
To open and read a file stored in object storage, use the open() method. Here’s an example of reading a file:
@task
def read_file(path: ObjectStoragePath) -> str:
    with path.open() as f:
        return f.read()

You can also leverage Airflow's XCom to pass paths between tasks:
@task
def create(path: ObjectStoragePath) -> ObjectStoragePath:
    return path / "new_file.txt"

@task
def write_file(path: ObjectStoragePath, content: bytes):
    # The file is opened in binary mode, so content must be bytes
    with path.open("wb") as f:
        f.write(content)

new_file = create(base)
# write_file runs after create because it consumes create's output
write = write_file(new_file, b"data")

Configuration
The object storage abstraction in Airflow doesn’t require much configuration; it leverages the standard Airflow connection mechanism. If you're working with S3, for example, your connection might include aws_access_key_id and aws_secret_access_key, along with other options like endpoint_url for custom endpoints. This makes it easy to handle your credentials and settings securely across different cloud environments.
Alternative Backends
Airflow also allows you to configure an alternative backend for a scheme, which is useful if you're using a non-standard object storage system. For example, to configure Databricks' DBFS backend for the dbfs scheme:
from airflow.io.path import ObjectStoragePath
from airflow.io.store import attach
from fsspec.implementations.dbfs import DatabricksFileSystem

attach(protocol="dbfs", fs=DatabricksFileSystem(instance="myinstance", token="mytoken"))
base = ObjectStoragePath("dbfs://my-location/")

By attaching the backend at the top level of your DAG, you ensure it’s available for all tasks within that DAG.
Path API Extensions
The ObjectStoragePath class builds upon Universal Pathlib, so it supports familiar path operations such as mkdir, touch, and stat, and it adds object-storage-specific extensions such as bucket and checksum. These matter because object storage systems don’t natively support features like directories or atomic operations in the same way local file systems do.
For example, you can create a directory (which may just create a directory entry):
base.mkdir(parents=True)  # Creates any missing parent directories

You can also compute the checksum of a file:
checksum = base.checksum()  # Returns the checksum of the file

These extensions make it easy to perform common operations on object storage, without needing to dive deeply into the specifics of each cloud provider's API.
External Integrations
Beyond Airflow’s native usage, you can also integrate with other tools like DuckDB and Apache Iceberg by passing the underlying fsspec implementation. For instance, with DuckDB, you can register your object storage path for seamless integration:
import duckdb
from airflow.io.path import ObjectStoragePath

path = ObjectStoragePath("s3://my-bucket/my-table.parquet", conn_id="aws_default")
conn = duckdb.connect(database=":memory:")
conn.register_filesystem(path.fs)
conn.execute(f"CREATE OR REPLACE TABLE my_table AS SELECT * FROM read_parquet('{path}');")

This feature adds great flexibility when dealing with large-scale data storage, especially in cloud-native environments. It provides a unified interface across various cloud storage systems while ensuring your DAGs remain clean, portable, and secure. You should keep in mind that the feature is still experimental, so be sure to test it thoroughly before relying on it for production workflows.
What are XComs?🚀
In Apache Airflow, XComs (short for "cross-communications") are like little messengers that help tasks talk to each other. Since tasks in a DAG (Directed Acyclic Graph) run independently and might be on different machines, XComs allow them to pass small amounts of data around. Think of them like post-it notes in the Airflow world that tasks can read or write.
XComs are really helpful because tasks often need to share results or inputs with each other. For example, one task might calculate a number, and another task might use that number to do something else. Instead of having tasks share files or databases, XComs allow them to quickly communicate through a simple push and pull mechanism.
How XComs Work:
Push: When a task finishes its work, it can "push" a result to XCom.
Pull: A different task can then "pull" that result from XCom to use in its execution.
You can think of XComs like a shared data store that’s unique to each task and DAG run. XComs are great for passing around small, serializable data (like strings, numbers, or JSON objects). But don’t use them for huge things like dataframes — they are not designed for large datasets.
But….How to Use XComs properly?
1. Pushing Data to XCom
When you want to send data from one task to another, you use the xcom_push method. Here’s an example:
task_instance.xcom_push(key="some_key", value=42)
Here, key="some_key" is the unique identifier for the value, and value=42 is the data we’re sending. It could be anything serializable — a string, number, list, dictionary, etc.
2. Pulling Data from XCom
To get the data from XCom in a later task, you use the xcom_pull method:
value = task_instance.xcom_pull(key="some_key", task_ids="task-1")
If you don’t specify a key, Airflow will default to pulling the return_value, which is the value automatically pushed by most Airflow operators and tasks.
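If it helps to picture what happens under the hood, here is a toy in-memory model of push and pull. It is not Airflow's implementation. ToyXComStore is a hypothetical class, and the only Airflow detail it borrows is the default key name return_value.

```python
from typing import Any, Dict, Tuple

RETURN_KEY = "return_value"  # the key used when none is given


class ToyXComStore:
    """Stores values per (run_id, task_id, key), like a tiny XCom table."""

    def __init__(self) -> None:
        self._rows: Dict[Tuple[str, str, str], Any] = {}

    def push(self, *, run_id: str, task_id: str, value: Any, key: str = RETURN_KEY) -> None:
        self._rows[(run_id, task_id, key)] = value

    def pull(self, *, run_id: str, task_ids: str, key: str = RETURN_KEY) -> Any:
        # Returns None when nothing was pushed under that run, task and key.
        return self._rows.get((run_id, task_ids, key))


store = ToyXComStore()
store.push(run_id="run_1", task_id="task-1", key="some_key", value=42)
store.push(run_id="run_1", task_id="task-1", value="returned by the task")

print(store.pull(run_id="run_1", task_ids="task-1", key="some_key"))
print(store.pull(run_id="run_1", task_ids="task-1"))
print(store.pull(run_id="run_2", task_ids="task-1", key="some_key"))
```

The last call comes back empty because the value was pushed in a different DAG run, which is the scoping described earlier.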
3. Using XComs in Templates
You can also use XComs in templates, like in SQL queries or shell commands. For example, let’s say you pushed the name of a table into XCom in one task, and you want to use it in a SQL query in another task:
SELECT * FROM {{ task_instance.xcom_pull(task_ids='task-1', key='table_name') }}

The result from the first task will be inserted into your query dynamically.
4. Pushing Multiple XComs
If you need to push more than one piece of data, Airflow lets you return a dictionary of key-value pairs. Set do_xcom_push=True and multiple_outputs=True in your task:
@task(do_xcom_push=True, multiple_outputs=True)
def push_multiple_xcoms():
    return {"key1": "value1", "key2": "value2"}
In this case, both "key1" and "key2" will be pushed to XCom and can be pulled by subsequent tasks.
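As a rough mental model, the returned dictionary is unrolled into one XCom entry per key. The helper below is a plain-Python sketch and not Airflow code. unroll_outputs is a hypothetical name, and two details are assumptions to check against your Airflow version: that keys must be strings, and that the whole dictionary is also kept under return_value.

```python
from typing import Any, Dict

RETURN_KEY = "return_value"


def unroll_outputs(result: Dict[str, Any]) -> Dict[str, Any]:
    """Return the XCom entries a dict-returning task would produce in this toy model."""
    for key in result:
        if not isinstance(key, str):
            raise TypeError(f"XCom keys must be strings, got {key!r}")
    entries = dict(result)        # one entry per dictionary key
    entries[RETURN_KEY] = result  # assumption: the full dict is stored as well
    return entries


entries = unroll_outputs({"key1": "value1", "key2": "value2"})
print(sorted(entries))
print(entries["key1"])
```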
Object Storage for XComs:
By default, Airflow stores XComs in its database, but for large data (e.g., large files, large outputs), this can be inefficient. Here’s where Object Storage (like S3, GCS, etc.) comes in! Airflow allows you to store large XComs in object storage, so you don’t overwhelm your database with huge data.
Here’s how you can set it up:
Configure the Backend: Change your configuration to store XComs in object storage when they exceed a certain size (like 1MB):
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://your-conn-id@your-bucket/key
# Anything larger than 1MB goes to object storage
xcom_objectstorage_threshold = 1048576
# Optional compression for large XComs
xcom_objectstorage_compression = gzip
This means any XCom larger than 1MB will be stored in S3, and optionally compressed with gzip.
Compress Data: You can also set a compression type for large XComs. Supported compression methods include gzip, zip, and snappy (you’ll need to install python-snappy for snappy compression).
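The threshold behaviour can be sketched in a few lines of standard-library Python. This is a toy model, not the provider's implementation. The functions store_xcom and load_xcom, the dictionary standing in for a bucket, and the exact comparison (strictly larger than the threshold) are all assumptions made for illustration.

```python
import gzip
import json
from typing import Any, Dict

THRESHOLD_BYTES = 1_048_576  # 1MB, matching the configuration above


def store_xcom(value: Any, bucket: Dict[str, bytes], key: str,
               threshold: int = THRESHOLD_BYTES) -> Dict[str, Any]:
    """Keep small values inline; offload large ones, gzip-compressed, to the bucket."""
    payload = json.dumps(value).encode("utf-8")
    if len(payload) <= threshold:
        return {"inline": value}  # small enough for the metadata database
    bucket[key] = gzip.compress(payload)
    return {"ref": key}  # only a reference is kept in the database


def load_xcom(record: Dict[str, Any], bucket: Dict[str, bytes]) -> Any:
    """Resolve a stored record back into the original value."""
    if "ref" in record:
        return json.loads(gzip.decompress(bucket[record["ref"]]).decode("utf-8"))
    return record["inline"]


bucket: Dict[str, bytes] = {}
small = store_xcom({"rows": 3}, bucket, "run_1/small")
big_value = "x" * 2_000_000
big = store_xcom(big_value, bucket, "run_1/big")

print(small)
print(big)
print(load_xcom(big, bucket) == big_value)
```

The database row for the large value holds only a short reference, which is what keeps the metadata database lean.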
Custom XCom Backends:
If the default XCom system doesn't quite fit your needs, you can create your own custom XCom backend. This is super flexible! For example, if you want to store XComs in a different database or service, you can subclass the BaseXCom class.
You’ll need to override:
serialize_value(): To define how the data is serialized before storage.
deserialize_value(): To define how to retrieve and convert the data back.
You can even override the orm_deserialize_value method to handle large XComs efficiently in the Airflow UI.
Example:
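from typing import Any

from airflow.models.xcom import BaseXCom

class MyCustomXComBackend(BaseXCom):
    # Both hooks are static methods on BaseXCom, so they take no self argument
    @staticmethod
    def serialize_value(value: Any, **kwargs):
        # Custom serialization logic goes here; this falls back to the default
        return BaseXCom.serialize_value(value, **kwargs)

    @staticmethod
    def deserialize_value(result) -> Any:
        # Custom deserialization logic goes here; result is the stored XCom row
        return BaseXCom.deserialize_value(result)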
Then, you have to configure Airflow to use this backend:
[core]
xcom_backend = my_module.MyCustomXComBackend
Verifying Custom XCom Backend:
If you're running Airflow in containerized environments like Docker or Kubernetes, it can be tricky to confirm that your custom XCom backend is set up correctly. You can check easily by printing the XCom class name, like this:
from airflow.models.xcom import XCom
print(XCom.__name__) # This will show the class name of your custom XCom backend
Key Takeaways:
XComs are like messengers for tasks to exchange small amounts of data.
You can push and pull data between tasks using xcom_push and xcom_pull.
Use XComs in templates for dynamic data insertion in tasks like SQL queries.
For large XComs, Airflow supports storing them in object storage (S3, GCS, etc.).
You can create your own custom XCom backend for specialized data storage and retrieval needs.
Harnessing Airflow's Power for Scalable and Secure Workflows
In this article, we've uncovered the critical building blocks of Apache Airflow that empower you to create highly efficient, scalable, and secure workflows. From the flexibility of Executors, allowing for optimal task execution strategies, to the customization of Authentication Managers for tailored security needs, Airflow offers an unparalleled toolkit for workflow orchestration. We also explored how leveraging Object Storage and XComs enhances your ability to manage large datasets and enable communication across tasks.
Mastering these components ensures that you can build workflows that are both high-performing and resilient, adapting to any infrastructure or operational requirement. Airflow's power is truly unlocked when you understand how these elements come together to form a cohesive, flexible system.
In our next article, we'll take a closer look at Airflow’s internal architecture. We'll break down how Airflow’s components interact, explore its core design principles, and show how these architectural decisions enable scalable and efficient workflow orchestration. Stay tuned as we dive deeper into the heart of Airflow’s architecture and discover the driving forces behind its success! 🏗️


