The 10 Commandments of Data Pipelines

When building data pipelines all day long, every day, every year, ad infinitum, I have surprisingly managed to learn some things. You see the same problems with data pipelines many times over. Years ago it was SSIS (I’m sorry you still have to use it; it just isn’t cool enough anymore), now if it’s not Streaming it must be wrong (insert eye roll). The technology and what’s hot are always changing, but the 10 Commandments of Data Pipelines never change.

What are the 10 Commandments of Data Pipelines that thou shalt not break? Glad you asked.

  1. Memory Usage
  2. File I/O
  3. Async
  4. Don't Recreate the Wheel
  5. No Over Engineering
  6. SQL
  7. Data Storage
  8. Test
  9. Configuration
  10. Distribute

Let’s take a look at each one. Sorry if your commandment is missing; get your own blog, and you’re entitled to be wrong.

Thou Shalt Control Thy Memory Usage

I’m not sure if you noticed, but there is a nifty thingy called a generator in Python. In today’s age of AWS one thing is true: memory = money. If you decide to ignore memory usage in your data pipelines, someone is going to pay $4/hour for a 2TB instance because they are processing “big data.” Yes, doing things in memory is fast and I highly recommend it whenever possible, but with great power comes great responsibility. Using generators in your data pipelines will let you process large amounts of data in memory without the need for a monstrous machine.

# the old way: build the entire list in memory before returning it
def read_my_data(lines):
    results = []
    for line in lines:
        results.append(line)
    return results

# the better way: yield one line at a time
def read_my_data(lines):
    for line in lines:
        yield line
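
A quick usage sketch (the file name and process() are placeholders I'm assuming here): feed the generator a file object, which is itself a lazy iterator, and only one line is ever held in memory at a time.

# a minimal sketch; 'big_file.csv' and process() stand in for your real data and transform
with open('big_file.csv') as f:
    for line in read_my_data(f):
        process(line)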

Thou Shalt Avoid File I/O

Hello! Are your data pipelines too fast? Need to slow things down? For the low price of writing to disk, your dreams can come true! While keeping the first commandment in mind, this one is probably even more important.

Writing data pipelines that are multi-staged or modularized makes it tempting to write data to disk. I think it just helps us grasp what’s happening to the data as it travels, or sometimes it’s just leftovers from testing when we were inspecting things. But reading and writing to disk is just not cool. Why do you think Spark took over MapReduce? Memory, baby. Just don’t forget the First Commandment. It’s called a file-like object, it lives in memory, and it is your new best friend: StringIO and BytesIO. Remember our generator code from above? What if we wanted to create a file for use further down the line in our data pipeline?

import io

# yes, it's as simple as that, and you didn't write the file to disk only to read it again later on
file = io.StringIO("The text for our file.")
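
A slightly fuller sketch using the standard library’s csv module (the column names and rows are made up): stage a CSV in memory instead of writing a temp file to disk.

import csv
import io

# build a CSV entirely in memory
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(['id', 'name'])
writer.writerow([1, 'Bob'])

# rewind and hand the file-like object to the next stage of the pipeline
buffer.seek(0)
for row in csv.reader(buffer):
    print(row)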

Thou Shalt Use Async

Arguably one of the greatest and most important features of Python 3 is async support. It does not yet appear to be well adopted in the wider community; hopefully that will change with time. That being said, there are major speed-ups to be had with async IO.

Without rehashing the GIL and the data-sharing and overhead challenges of multiprocessing, what can be done with a sleepy thread that’s wasting its life away doing nothing? Put it to work with async. I guarantee that after making your HTTP calls async you will never go back. I wrote a blog post a while ago where I used asyncio HTTP calls.

loop = asyncio.get_event_loop()
future = asyncio.ensure_future(run())  # run() is the coroutine making the HTTP calls
loop.run_until_complete(future)
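
For a self-contained sketch, here is roughly what that run() coroutine could look like; I’m assuming aiohttp and placeholder URLs here, not necessarily what the original post used.

import asyncio
import aiohttp

# placeholder endpoints; swap in whatever your pipeline actually calls
URLS = ['https://example.com/a', 'https://example.com/b']

async def fetch(session, url):
    # each request yields control while waiting on the network
    async with session.get(url) as response:
        return await response.text()

async def run():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in URLS]
        return await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
results = loop.run_until_complete(run())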

Thou Shalt Not Recreate the Wheel

When it comes to data pipeline code, less is more, of course. When you write code that already exists as a shipped package or function, or build some complex class to track items when you could use a named tuple, you’re being silly. Ever have the simple problem of reading a bunch of files in a directory? It’s called iglob. Need the current working directory? It’s called os.getcwd(). Just remember, before you spend a day writing some code, look for someone who’s already spent that day.

from glob import iglob

# iglob returns an iterator, yielding matching paths lazily
for file in iglob('my/directory/*.csv'):
    do_something(file)
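
And on the named-tuple point, a tiny sketch (the FileJob fields are invented) of what you get for free instead of a hand-rolled tracking class:

import os
from collections import namedtuple

# a lightweight record type instead of a custom class
FileJob = namedtuple('FileJob', ['path', 'size', 'status'])

job = FileJob(path=os.path.join(os.getcwd(), 'data.csv'), size=1024, status='pending')
print(job.path, job.status)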

Thou Shalt Not Over Engineer

There is possibly nothing with longer-lasting consequences than a data pipeline built to bolster a resume. As the landscape changes rapidly, many new and exciting tech stacks arrive on the scene.

I’ve seen streaming services used for no other reason than that streams are cool. I’ve seen a dizzying amount of tech cobbled and webbed together. All this to write something to a database in the end! Remember to use only the tech and code needed to move and transform the data quickly, in the most reliable and performant manner. You, or someone else, will have to configure, maintain, and fix the pipeline.

Thou Shalt Use SQL

I’m not sure when databases became uncool. Remember, there is always a place for relational databases, especially when working on sets of data. I find it funny that people who scoff at relational databases love Spark (as do I), whose greatest feature is its SQL API. SQL was designed to work on uniform sets of data, and 50% of the data you work with in pipelines either comes from a relational database or is going into one. Why pull the data out of the database to transform it, only to put it back into a database?

Yes, there are caveats: when dealing with terabytes of data, a Postgres database might not be the best fit. And I’ve seen tens of GBs of relational data stored in Dynamo (insert me laughing).
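
As a minimal sketch of what I mean (the table and column names are hypothetical), a set-based transform can often be pushed down into the database itself instead of round-tripping rows through Python:

from sqlalchemy import create_engine, text

engine = create_engine('postgresql://my_user@localhost:5432/mydatabase')

# aggregate raw events into a summary table entirely inside the database;
# no rows are ever pulled into Python
with engine.begin() as conn:
    conn.execute(text('''
        INSERT INTO daily_totals (day, total)
        SELECT date_trunc('day', created_at), count(*)
        FROM raw_events
        GROUP BY 1
    '''))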

Thou Shalt Choose The Correct Data Storage

I love JSON and I use it every day, but don’t abuse it. Don’t write a beautiful data pipeline and then output a bunch of large JSON files that I have to read in full just to get what I want. There are a plethora of file types out there, like Parquet, that have revolutionized the way you can read and write data to disk. Understand your options and use them.

I know you’ve been using CSV files your whole life, but there are other ways to solve the problem!

import pyarrow.parquet as pq

# tell me this is not better than reading a whole CSV file into memory
my_columns = pq.read_table('test.parquet', columns=['column1', 'column10'])
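
Writing Parquet is just as easy. A sketch of a one-time conversion (the file names are placeholders), after which every read gets column pruning for free:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# convert an existing CSV to Parquet once, then read just the columns you need forever after
df = pd.read_csv('test.csv')
table = pa.Table.from_pandas(df)
pq.write_table(table, 'test.parquet', compression='snappy')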

Thou Shalt Test Thy Data Pipelines

This pretty much applies to any code, but having tests written for data pipelines is crucial. You will receive unforeseen, new, mutated, broken data. What happens to your pipeline? How are errors handled? Do things continue on? How is duplicate data dealt with? I would argue data pipelines are more prone to failure than your average piece of running code, because data is dirty.

If you have not at least attempted to write tests for your pipelines, then they are weak, unreliable, unscalable pieces of code that should be buried. Not writing tests means you haven’t really thought through or designed your pipelines to be resilient. Give pytest a try!

def hello(x):
    return f'Hello, {x}'

def test_hello():
    assert hello('you') == 'Hello, you'
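
Closer to pipeline reality, test how your transforms behave when the data is dirty, not just when it is clean. The transform_record() function and its rules below are hypothetical:

import pytest

def transform_record(record):
    # hypothetical transform: require an 'id', strip whitespace from 'name'
    if 'id' not in record:
        raise ValueError('record missing id')
    return {'id': record['id'], 'name': record.get('name', '').strip()}

def test_transform_clean_record():
    assert transform_record({'id': 1, 'name': ' Bob '}) == {'id': 1, 'name': 'Bob'}

def test_transform_broken_record():
    # dirty data should fail loudly, not slip through silently
    with pytest.raises(ValueError):
        transform_record({'name': 'no id here'})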

Thou Shalt Make Your Pipelines Configurable

There is nothing worse than having to crack open data pipeline code to change the URL of a RESTful call or a database connection string. Wouldn’t you rather just open up a JSON file and change what you need?

Not making your data pipelines configurable is always going to come back and haunt you. Part of the attraction of a well-configured pipeline is that simply reading the configuration file(s) gives great insight into the data sources, destinations, etc. without reading a single line of code.

{
   "databases" :
    { "url" : "postgres://mydatabase",
      "username" : "my_user",
      "port" : "5432"
    }
}
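
Loading it takes a few lines of standard library code (the file name here is just an example):

import json

# read the pipeline settings once at startup
with open('pipeline_config.json') as f:
    config = json.load(f)

db_url = config['databases']['url']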

Thou Shalt Distribute Your Data Pipelines

Of course not every problem can be solved with distribution, but many can. This is really a question of big data vs. not-so-big data. For example, say you have 1-2 million files you need to process in a reasonable amount of time. You need a distributed system for this.

Luckily, with the rise of systems like Spark and Dask, and tools like Docker and Kubernetes, distributed computation is much more attainable than ever before. They can bring complexity, but with something like AWS Glue you get distributed code without touching or thinking about a cluster. Systems like Dask are simply pip installed, and even a Spark installation is not that hard.

# install: pip install dask distributed --upgrade
from dask.distributed import Client
import dask.dataframe as dd

# connect to a running Dask scheduler
client = Client('45.33.54.XXX:8786')

# lazily read every CSV in the directory as a single distributed dataframe
df = dd.read_csv('./directory/home/*.csv')
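
Nothing actually runs until you ask for a result; a quick follow-up, assuming a made-up column name, to show where the cluster does its work:

# Dask builds a lazy task graph; .compute() runs it on the cluster and pulls the result back
top_values = df['some_column'].value_counts().nlargest(10).compute()
print(top_values)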

There you have it, the 10 Commandments of Data Pipelines. Let me know if you have favorites or think I missed one… or not.