Review of Prefect for Data Engineers

Not going to lie, I do enjoy the vendor wars that this marketing craze called “The Modern Data Stack” has created. I like to keep just about everything in life at arm’s length. Kinda like the way you look at your crazy third cousin out of the corner of your eye at the family reunion. I mean it’s nice to have all these options to choose from these days when building data pipelines.

One tool I haven’t been able to poke the tires on yet is Prefect. It appears to be another data orchestration tool for Python, but we shall find out. I want this to be an introduction to Prefect: we shall just try it out and let the chips fall where they may.

Introduction to Prefect.

It’s almost dizzying the pace at which new Python data pipeline orchestration tools pop up these days. Ever since the rise of Apache Airflow to the top of the pile, there is always someone trying to tear it down from its high tower. I suppose that is an inevitable part of life, but enough of the pontificating; let’s dig into Prefect.

“Prefect 1.0 makes it easy to build, test, and run dataflows right from your Python code. With an intuitive API and 50+ integrations, quickly turn even the most complex data pipelines into managed workflows.”

Prefect website

What is Prefect, and what’s it trying to do?

I mean, they take a few shots over the bow of Apache Airflow of course, but that’s no surprise. Prefect says in their documentation that they’ve “rebuilt data engineering,” that Prefect is a “new workflow management system,” and an “easy way to build, run, and monitor data pipelines at scale.” It appears on the surface to me that Prefect just wants to be the new Python data pipeline orchestration tool.

Off the cuff, I’m not sure what makes it different or better than something like Airflow, but I’m sure we will get there.

Prefect Concepts.

As far as I can tell initially by reading the documentation, Prefect has two major concepts, the Workflow and the Task. Workflows appear to contain one or more Tasks and handle the dependencies between them. To me, it sounds like a Workflow is similar to an Airflow DAG and a Task is similar to the concept of an Operator inside a DAG.

  • Task
  • Workflow

Task

Tasks are apparently the building blocks of Prefect; they are conceptually like a “function.” They are the work-horses of Prefect and are aware of their upstream dependencies, and it appears they can do data processing, call out to some other external system, etc. It basically sounds like an Airflow Operator, at least on the surface. I’m sure some crabby Prefect user will point out something else I don’t know about them.

Workflow

There is not much to say about a Workflow other than it appears to be similar to an Airflow DAG, a container that you can use to string your Tasks together any way you want.
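
To make that analogy concrete, here’s about the smallest Prefect 1.x example I can write, a sketch of my own with made-up names, where the task plays the Operator role and the Flow plays the DAG role.

from prefect import task, Flow


@task
def say_hello(name: str):
    # roughly the "Operator" of the Prefect world
    print(f"hello {name}")


with Flow("hello-flow") as flow:  # roughly the "DAG" of the Prefect world
    say_hello("data engineer")

flow.run()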

Concepts of Note

Some other high-level concepts are introduced in the docs, most notably State: both Tasks and Workflows produce states, which isn’t a surprise for any workflow tool. Prefect also contains a “Workflow Engine,” aka the thing that is going to run the Workflow and decide what action to take depending on what happens.

It sounds like Prefect’s State is a little more complex and extended than what you would be used to with Airflow, for example; Prefect talks about things like being able to skip a task, or maybe even pause one if you need to.
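
In Prefect 1.x this surfaces in code as signals and state handlers, as far as I can tell. Here’s a minimal sketch of my own (made-up task names) of a task skipping itself and a handler watching the state transitions.

from prefect import task, Flow
from prefect.engine import signals


def log_transition(task_obj, old_state, new_state):
    # a state handler is called on every state change; returning the
    # new state unchanged just lets us observe it
    print(f"{task_obj.name}: {old_state} -> {new_state}")
    return new_state


@task(state_handlers=[log_transition])
def maybe_process(row_count: int):
    if row_count == 0:
        # SKIP puts the task into a Skipped state, which counts as a success
        raise signals.SKIP("nothing to do")
    return row_count * 2


with Flow("state-demo") as flow:
    maybe_process(0)

state = flow.run()
print(state.is_successful())  # True, the skip does not fail the flow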

Prefect vs Airflow

Not surprisingly, Prefect spends a good amount of time here explaining its differences from, and of course its superiority to, Airflow. Some of these arguments are not surprising, some are slightly justified, but most are just not that accurate. If Airflow were as bad as Prefect leads us to believe, places like AWS and GCP would not be offering it as a managed service. You don’t invest in a tool like that unless it’s a force to be reckoned with.

Prefect calls Airflow a “monolithic batch scheduler” and says that Airflow “never anticipated the rich variety of data applications that has emerged,” which is pretty much hogwash. Airflow has an incredible community and a massive amount of open source connectors for integrating into every system known to exist.

On a somewhat justified note, Prefect complains about the rigidity of Airflow DAGs, their rigid outcomes, and the difficulty of DAG-to-DAG or task-to-task communication. These are well-known but easily overcome obstacles for anyone who’s worked with Airflow.
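
For context, here’s the sort of thing Prefect is poking at and how an Airflow user typically works around it: a quick sketch of my own using Airflow 2’s TaskFlow API, where return values get passed between tasks via XCom under the hood.

from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
def xcom_example():
    @task
    def extract() -> dict:
        # the return value is pushed to XCom automatically
        return {"rows": 42}

    @task
    def load(payload: dict):
        # and pulled back out here, no manual xcom_push/xcom_pull needed
        print(payload["rows"])

    load(extract())


xcom_dag = xcom_example()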

What’s the main way Prefect tries to overcome these inflexibilities?

“… you can even convert any Python function to a task with one line of Prefect code. Calling tasks on each other like functions builds the DAG in a natural, Pythonic way …”

– Prefect

Also, surprisingly, Prefect takes aim at Airflow’s scheduler: it complains about how core it is to Airflow, how long it takes to kick off tasks, and other performance issues. I’m sure if you depend on your orchestration tool’s scheduler to run tasks within milliseconds, Prefect is probably better, but it raises the question: if you need latency like that, are you even using the right tools?

Trying out Prefect

Enough of the pontificating; let’s kick the tires a little and see what happens. Let’s make our first Prefect task.

from prefect import task
import csv


@task
def stream_file(file_uri: str) -> iter:
    with open(file_uri) as f:
        rdr = csv.reader(f, delimiter=',')
        for row in rdr:
            yield row

Easy enough: import prefect and use the @task decorator to make our task that streams records from a CSV file. We will also need a Flow to be able to run our task inside.

“flows are used to describe the dependencies between tasks”

Prefect

from prefect import task, Flow
import csv


@task
def stream_file(file_uri: str) -> iter:
    with open(file_uri) as f:
        rdr = csv.reader(f, delimiter=',')
        for row in rdr:
            yield row


if __name__ == '__main__':
    with Flow("CSV Flow") as flow:
        trips = stream_file('data/202201-divvy-tripdata.csv')
        for trip in trips:
            print(trip)
            
    state = flow.run()
    assert state.is_successful()

Yikes, got my first error.

Traceback (most recent call last):
  File "/Users/danielbeach/PycharmProjects/PrefectIntroduction/main.py", line 16, in <module>
    for trip in trips:
  File "/Users/danielbeach/PycharmProjects/PrefectIntroduction/venvrm/lib/python3.8/site-packages/prefect/core/task.py", line 1034, in __iter__
    raise TypeError(
TypeError: Task is not iterable. If your task returns multiple results, pass `nout` to the task decorator/constructor, or provide a `Tuple` return-type annotation to your task.

Apparently, Prefect can’t handle my task returning a generator, aka yield, without some more instruction. Well, this is a known issue: our wonderfully flexible and Pythonic Prefect tasks can’t return a generator? Wait … I thought the whole point of this was that you could write Python code and, with one line, turn your functions into tasks and workflows, making it super flexible and the like?
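
For the record, here’s roughly what that error message is suggesting, in a small sketch of my own: it works when a task returns a fixed number of results, which is no help for a generator of unknown length.

from typing import Tuple

from prefect import task


# `nout` (or the Tuple annotation) tells Prefect how many results to expect
@task(nout=2)
def split_row(row: str) -> Tuple[str, str]:
    ride_id, rideable_type = row.split(",")[:2]
    return ride_id, rideable_type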

I mean really … But, ok, whatever, let’s just modify our code to not use a generator. (Sure, this might not seem like a big deal, but remember, I’m just trying to figure out what Prefect is all about, poke at their claims, and see how easy it is to work with. The fact that they want to tightly integrate with Python code but can’t handle a generator in a task??? Seems a little strange.)

from prefect import task, Flow
import csv


@task
def stream_file(file_uri: str) -> list:
    with open(file_uri) as f:
        rdr = csv.reader(f, delimiter=',')
        rows = [row for row in rdr]
    return rows

@task
def process_stream(trip: list):
    print(trip)


if __name__ == '__main__':
    with Flow("CSV Flow") as flow:
        trips = stream_file('data/202203-divvy-tripdata.csv')
        process_stream.map(trips)
    state = flow.run()
    assert state.is_successful()

Our second attempt seems to work fine, although to me it feels a little strange. I defined two tasks, one to pull the records from the CSV file, the other to print/process them. Since the Flow really just binds tasks together, I pull the trips with trips = stream_file('data/202203-divvy-tripdata.csv') and then map each one onto the process_stream task.

Feels a little Dask'y to me. Blah.

That is apparently no surprise, as Prefect recommends Dask as the execution engine. This brings up another point: if this is the recommended engine … you’re either going to have to use Prefect Cloud, find someone to host a Dask cluster for you, or host one yourself. Uh … no thanks? Dask has its uses, but unless you’re going to use a hosted solution, that little bugger is a stinker and will give you nothing but headaches.
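
That said, if you just want to kick the tires locally, a throwaway Dask cluster is easy enough to stand up. Here’s a sketch assuming the distributed package is installed; the scheduler address it prints is what the DaskExecutor below points at.

from dask.distributed import LocalCluster

# spin up a local scheduler and a couple of workers for testing
cluster = LocalCluster(scheduler_port=8786, n_workers=2)
print(cluster.scheduler_address)  # tcp://127.0.0.1:8786
input("cluster running, hit Enter to shut it down...")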

I executed the above code with Dask as follows.

from prefect.executors import DaskExecutor

if __name__ == '__main__':
    with Flow("CSV Flow") as flow:
        trips = stream_file('data/202203-divvy-tripdata.csv')
        process_stream.map(trips)
    # point the flow at the locally running Dask scheduler
    executor = DaskExecutor(address="tcp://localhost:8786")
    state = flow.run(executor=executor)
    assert state.is_successful()

I’m obviously only scratching the surface of Prefect here, but at least we can get an idea of what a pipeline using Prefect might look like.

Orchestration and the UI

One important piece of any pipeline and orchestration tool is a good UI. Fortunately, Prefect ships an open-source version of their UI, so you don’t need Prefect Cloud to try it out. Apparently it’s as simple as locally running these commands (requires Docker and docker-compose).

>> prefect backend server
>> prefect server start

This, as expected, will put a UI running on http://localhost:8080. Make sure you don’t already have a Postgres container running locally, as this will break the setup! This is where Prefect shines: its UI is impressive and clean, something you wish Airflow had.

I’m not going to dive much into the UI; it’s just clean and nice, and gives a very analytical overview of your data pipelines, something that is hard, if not impossible, to find out of the box elsewhere. But here are a few steps I took in the UI.

I created a Project, the holder of Workflows.

Next, I registered my Flow using the CLI.

 prefect register --project "Trip Data" --path main.py --name "CSV Flow"
Collecting flows...
/Users/danielbeach/PycharmProjects/PrefectIntroduction/main.py:22: UserWarning: Attempting to call `flow.run` during execution of flow file will lead to unexpected results.
  flow.run(executor=executor)
Processing 'main.py':
  Building `Local` storage...
  Registering 'CSV Flow'... Done
  └── ID: 2b244373-c651-4d99-9753-dd575e0af6be
  └── Version: 1
======================== 1 registered ========================
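
One thing worth noting: registering the Flow just tells the backend about it, nothing actually executes until an agent is polling for work. Locally that should be one more command (this is the local agent in Prefect 1.x, check prefect agent --help for your setup).

>> prefect agent local start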

Prefect Integrations

Another area of concern for me when I’m looking into new pipeline tools is integrations. Part of Data Engineering these days requires stitching together many different services and technologies: GCP, AWS, Databricks, Snowflake, the list goes on. Sure, all of these services have their own Python APIs, etc., but that’s not the point.

One of the features that make Airflow so popular is its large library of supplied Operators for every service and tech stack under the sun. We can’t ignore the fact that we developers are lazy, and we want it easy. The easier it is for a Data Engineer to integrate with third-party tools and services, the more likely they are to use that tech for pipeline orchestration.

On their installation page Prefect lists the following add-ons when installing with pip.

all_extras: includes all of the optional dependencies
dev: tools for developing Prefect itself
templates: tools for working with string templates
viz: tools for visualizing Prefect flows
aws: tools for interacting with Amazon Web Services
azure: tools for interacting with Microsoft Azure
google: tools for interacting with Google Cloud Platform
kubernetes: tools for interacting with Kubernetes API objects
twitter: tools for interacting with the Twitter API
airtable: tools for interacting with the Airtable API
spacy: tools for building NLP pipelines using Spacy
redis: tools for interacting with a Redis database

Off the cuff, this is sorta disappointing because I don’t see anything about Databricks or Snowflake. It shows aws, for example, but how hard would it be to send a command to EMR?
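
To be fair, nothing stops you from rolling that yourself. Here’s a rough sketch of my own, not a built-in Prefect task, that just wraps a boto3 EMR call in a task, with the cluster id and step definition left as placeholders.

import boto3

from prefect import task


@task
def add_emr_step(cluster_id: str, step: dict) -> str:
    # plain boto3 call wrapped in a Prefect task
    emr = boto3.client("emr")
    response = emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return response["StepIds"][0]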

But, never fear, it does appear Prefect has modules for Databricks Jobs and more! It’s nice to see a great list of Tasks for different tools; it indicates a tool to be reckoned with, one that’s got some work behind it.
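
For example, the task library has a Databricks submit-run task. A rough sketch of what using it might look like is below; I’m going off the 1.x task library from memory, so treat the import path and parameter names as assumptions and double-check them for your version.

from prefect import Flow
from prefect.tasks.databricks import DatabricksSubmitRun

# connection details and the job spec are placeholders
notebook_run = DatabricksSubmitRun(
    databricks_conn_secret={"host": "https://<workspace>.cloud.databricks.com", "token": "<token>"},
    json={
        "new_cluster": {"spark_version": "10.4.x-scala2.12", "node_type_id": "i3.xlarge", "num_workers": 2},
        "notebook_task": {"notebook_path": "/Users/<someone>/my_notebook"},
    },
)

with Flow("databricks-flow") as flow:
    notebook_run()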

Musings

What are my thoughts on Prefect? From what I can tell in a few hours, it’s a well-put-together tool with some serious clout behind it. They didn’t shy away from making pretty much all the functionality you need open-source and free, although, of course, they would prefer you use Prefect Cloud. The UI is impressive, truly a masterpiece, and puts Airflow to shame a little. That being said, I do have a few other thoughts.

The problem with tools like Prefect that focus so much on tight integration with Python, and on the ability to simply add a @task decorator to turn functions into Tasks … is that it leaves almost too much open for interpretation. I can see how this tool would be used heavily by those working with smaller datasets, where all your transformation code is written in Python and you want to turn it straight into a data pipeline.

It’s similar to Dask in a sense, and would probably be nice for custom Python ML pipelines. On the other hand, if you are using your orchestration tool purely to orchestrate, and not to compute or transform data, then I’m not sure Prefect is the tool for you. Part of what makes Airflow powerful and attractive is its simplicity and its various community-supplied Operators for third-party tools; no matter what anyone says, it makes it easy to stitch together a pipeline with all the dependencies and visuals you need.

I would personally say that Prefect is way better than Dagster, but although they may be going after and trying to convert the Airflow user … this will probably only work if that Airflow user is building Python compute-heavy, complex data pipelines, someone who is not afraid of going deep into orchestration and distributed computing. It’s a sweet tool, but I think there is enough difference between Prefect and Airflow; or rather, Prefect assumed that Airflow users want more complexity, and that most Airflow users were using Python Operators and not the hundreds of others that are available.