<img height="1" width="1" style="display:none" src="https://www.facebook.com/tr?id=1063935717132479&amp;ev=PageView&amp;noscript=1 https://www.facebook.com/tr?id=1063935717132479&amp;ev=PageView&amp;noscript=1 "> Bitovi Blog - UX and UI design, JavaScript and Front-end development
Loading

Backend |

Transitioning from Apache Airflow to Temporal

Learn how to transition from Apache Airflow to Temporal for better workflow orchestration. Instructions with code examples by Temporal consulting experts!

Emil Kais


Are you tired of relying on Directed Acyclic Graphs (DAGs) for your workflow orchestration? Consider transitioning from Apache Airflow to Temporal. With a focus on scalability and fault tolerance, Temporal is the perfect solution for complex workflows. Migrating from Airflow to Temporal can improve error handling and even solve looming tech debt.

In this post, we’ll go through a basic example in Apache Airflow and how to structure the same example in Temporal.

Note: This blog post assumes you have a Temporal server running, but if you aren’t sure how to start one, see the Temporal documentation here. Examples in this post are written in Python, as that’s the language supported by Apache Airflow.
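If you have the Temporal CLI installed, one way to start a local development server is:

temporal server start-dev

This runs a server on localhost:7233 (the address the examples below connect to) along with a web UI for inspecting workflow executions.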

Apache Airflow Project

Let’s go through the pieces of your DAG within Apache Airflow. Here’s the current DAG definition:

import random
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.providers.postgres.hooks.postgres import PostgresHook


default_args = {
    'owner': 'EK',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

def long_process_method(results):
    for person_data in results:
        # Add a random number of additional hobbies
        num_additional_hobbies = random.randint(1, 3)
        additional_hobbies = [f"Extra Hobby {i}" for i in range(1, num_additional_hobbies + 1)]
        person_data["hobbies"].extend(additional_hobbies)

        # Add a random skill level to each hobby
        hobby_skill_levels = {hobby: random.randint(1, 10) for hobby in person_data["hobbies"]}
        person_data["hobby_skill_levels"] = hobby_skill_levels

        # Add a random boolean indicating if the person has a pet
        person_data["has_pet"] = random.choice([True, False])

    return results

@dag(dag_id='our_sample_dag',
     default_args=default_args,
     start_date=datetime(2023, 1, 21),
     schedule_interval=None
    )
def our_sample_dag():
    @task()
    def long_running_function():
        return [
            {"name": "Alice", "age": 25, "address": "123 Main St", "hobbies": ["Reading", "Painting"]},
            {"name": "Bob", "age": 30, "address": "456 Oak St", "hobbies": ["Cooking", "Gardening"]},
            {"name": "Charlie", "age": 22, "address": "789 Pine St", "hobbies": ["Swimming", "Coding"]},
            {"name": "David", "age": 28, "address": "101 Elm St", "hobbies": ["Playing Guitar", "Photography"]},
            {"name": "Eve", "age": 35, "address": "202 Birch St", "hobbies": ["Traveling", "Hiking"]}
        ]

    @task()
    def perform_second_long_task(results):
        massaged_data = long_process_method(results)
        return massaged_data

    sql_data = long_running_function()
    perform_second_long_task(sql_data)

complicated_dag = our_sample_dag()

The long_running_function may stand in for a method in your project that performs a SQL query and returns the results. The long_process_method stands in for however you massage that data before returning it to your client. This is a simple use case, but let’s focus on what happens if the DAG fails while running long_process_method.

When this DAG fails and is re-run, it re-fetches the data by performing another SQL query. If hundreds (if not thousands) of operations are already hitting your database, errors that trigger refetches like this add load that can slow down other workflows and processes. In Temporal, the re-run or “retry” picks up after the completed SQL query, so the query isn’t executed again.

You may have other reasons for switching from Apache Airflow to Temporal, but let’s go into how you can port over your example code into Temporal.

Temporal Project

Your two methods, long_running_function and long_process_method, map to activities in Temporal. An activity is a normal function or method that (ideally) performs a single, well-defined action; unlike workflow code, it is allowed to be non-deterministic. Let’s create an activities.py file and write the following code:

import random
from temporalio import activity

@activity.defn
def long_running_function():
    # let this be your complicated SQL statement here that returns the data
    return [
        {"name": "Alice", "age": 25, "address": "123 Main St", "hobbies": ["Reading", "Painting"]},
        {"name": "Bob", "age": 30, "address": "456 Oak St", "hobbies": ["Cooking", "Gardening"]},
        {"name": "Charlie", "age": 22, "address": "789 Pine St", "hobbies": ["Swimming", "Coding"]},
        {"name": "David", "age": 28, "address": "101 Elm St", "hobbies": ["Playing Guitar", "Photography"]},
        {"name": "Eve", "age": 35, "address": "202 Birch St", "hobbies": ["Traveling", "Hiking"]}
    ]

@activity.defn
def long_process_method(results):
    for person_data in results:
        # Add a random number of additional hobbies
        num_additional_hobbies = random.randint(1, 3)
        additional_hobbies = [f"Extra Hobby {i}" for i in range(1, num_additional_hobbies)]
        person_data["hobbies"].extend(additional_hobbies)

        # Add a random skill level to each hobby
        hobby_skill_levels = {hobby: random.randint(1, 10) for hobby in person_data["hobbies"]}
        person_data["hobby_skill_levels"] = hobby_skill_levels

        # Add a random boolean indicating if the person has a pet
        person_data["has_pet"] = random.choice([True, False])

    return results

Note: The @activity.defn decorator must be used on all activities you define.
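Activities can also be defined as async functions. Here’s a minimal sketch (the fetch_data_async activity is hypothetical, not part of the example being ported) of the async form, which runs on the worker’s event loop rather than in a thread pool:

from temporalio import activity

@activity.defn
async def fetch_data_async() -> list:
    # An async activity can await a non-blocking database client here
    # instead of tying up a worker thread.
    return [{"name": "Alice", "age": 25}]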

Creating Your Temporal Workflow

In Temporal, a Workflow is a function that must be deterministic: given the same inputs, it executes the same commands in the same sequence. This determinism is what lets Temporal leverage its Replay capabilities when an activity or workflow fails. The best part? When an activity fails, the state of the workflow is preserved, so only that single activity is run again rather than the entire workflow!
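To make the determinism constraint concrete, here’s a minimal sketch (the DeterminismExample workflow is hypothetical): non-deterministic calls like random.randint or datetime.now() belong in activities, while workflow code uses Replay-safe equivalents such as workflow.now():

from temporalio import workflow

@workflow.defn
class DeterminismExample:
    @workflow.run
    async def run(self) -> str:
        # Don't call datetime.now() here: it would return a different
        # value on Replay. workflow.now() is the Replay-safe equivalent.
        started_at = workflow.now()
        return started_at.isoformat()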

Your workflow method will follow a format similar to the our_sample_dag method defined before. Define this function in a file called workflows.py:

from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from .activities import long_running_function, long_process_method

@workflow.defn
class OurSampleFunc:
    @workflow.run
    async def run(self):
        data = await workflow.execute_activity(
            long_running_function, schedule_to_close_timeout=timedelta(seconds=5)
        )

        return await workflow.execute_activity(
            long_process_method, data, schedule_to_close_timeout=timedelta(seconds=5)
        )

Above, you define the workflow class with @workflow.defn - this decorator must be used on every workflow you define. The method under the @workflow.run decorator is exactly what your workflow will do, and in what order. In this scenario, you call each of the activities you defined earlier, in the specific order you want them executed.

Note: If your long_process_method fails, the retry does not require re-executing long_running_function, so there’s no need to perform the SQL query (or any other task that completed before that point in the workflow) a second time.
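If you want explicit control over retries - similar to the retries and retry_delay in your Airflow default_args - you can pass a retry policy when executing an activity. A minimal sketch, assuming a fixed 5-minute delay and 5 attempts are what you want:

from datetime import timedelta
from temporalio.common import RetryPolicy

# Roughly mirrors the Airflow default_args: up to 5 attempts with a
# fixed 5-minute delay between them (backoff_coefficient=1.0 disables
# exponential backoff).
retry_policy = RetryPolicy(
    initial_interval=timedelta(minutes=5),
    backoff_coefficient=1.0,
    maximum_attempts=5,
)

You would then pass retry_policy=retry_policy to workflow.execute_activity alongside the timeout.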

Setting up the Worker

Now, you have to write the Worker logic that will be able to perform the workflows and activities defined earlier. Take a look at the code below:

import asyncio
import concurrent.futures
from temporalio.client import Client
from temporalio.worker import Worker

# Import the activity and workflow from our other files
from .activities import long_running_function, long_process_method
from .workflows import OurSampleFunc

async def main():
    client = await Client.connect("localhost:7233")

    # Run the worker
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[OurSampleFunc],
            activities=[long_running_function, long_process_method],
            activity_executor=activity_executor,
        )
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())

Here you import the activities and workflows defined earlier in activities.py and workflows.py. When you create a Worker in Temporal, you register the workflows and activities you want it to be able to execute. Because the activities in this example are plain synchronous functions, the worker also needs an activity_executor (the ThreadPoolExecutor above) to run them in.

Assuming you have a Temporal server running on localhost and have saved the worker code above as run_worker.py, you can run the worker by executing:

python run_worker.py

This will initialize the worker to be ready to perform the work when you run your workflow. Let’s run an instance of your workflow.

Running a Workflow
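Finally, create a run_workflow.py file that connects a Client to the server and executes your workflow: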

import asyncio
from temporalio.client import Client

from .workflows import OurSampleFunc

async def main():
    # Create client connected to server at the given address
    client = await Client.connect("localhost:7233")

    # execute workflow
    result = await client.execute_workflow(
        OurSampleFunc.run,
        id="my-workflow-id",
        task_queue="my-task-queue"
    )

    print(f"Result: {result}")

if __name__ == "__main__":
    asyncio.run(main())

If you have a single worker responsible for a specific task queue, the task_queue specified when starting a workflow should match the task_queue of that worker. Each workflow execution is also required to have a unique id.
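As a side note, client.execute_workflow blocks until the workflow completes. If you’d rather kick off the workflow and collect the result later, a minimal sketch using start_workflow and its handle:

handle = await client.start_workflow(
    OurSampleFunc.run,
    id="my-workflow-id",
    task_queue="my-task-queue",
)
# ... do other work here ...
result = await handle.result()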

Once you have a worker running, you can run the following:

python run_workflow.py

From there you should see the formatted data result from the long_process_method.

Conclusion

There are conceptual and terminology differences that come with porting code from Apache Airflow to Temporal, namely the decoupling of Activities, Workflow definitions, Workflow executions, and Workers. One of the benefits of porting to Temporal is its built-in Replay functionality, which can reduce the error-handling code you would otherwise have to write in Apache Airflow.

Need more help? You’re in the right place

Looking to get a deeper dive into porting your project or workflows from Apache Airflow to Temporal? We’re official Temporal partners specializing in Temporal consulting and audits. Feel free to reach out to our department on our Community Discord for assistance with whatever you’re working on!

Join our Discord