Intro to Apache Beam for Data Engineers

What is this thing? What’s it good for? Who’s using it and why? That’s pretty much what I ask myself once a month when I actually see the name Apache Beam pop up in some feed I’m scrolling through. I figured it has to be legit to be Apache incubated, but I’ve never run across anyone in the wild using it yet. On the surface it appears to be semi-pointless since it runs on top of other distributed systems like Spark, but I’m sure there is more to it. Today, I’m going to run through an overview of Apache Beam and then try installing and running some data through it, kick the tires as it were, and see if my mind changes about the pointless bit.

Overview of Apache Beam.

The first thing I usually do is actually read the Wiki page about a product; many times it can be more forthcoming about the history and the product in general than the documentation itself. I noticed that this project apparently originated out of Google, which honestly raises my opinion about what I’m dealing with by a few notches. I’m a firm believer that GCP is the best cloud out there from a developer experience standpoint. Let’s continue on …. (code available on GitHub)

So what exactly is Beam anyways?

So what is it, what purpose does it serve?

“Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines.”

– Apache Beam website.

Honestly, I don’t think this description is very helpful and might give you the wrong impression, as it did for me. I mean unless you know what “unified” means, you just assume this is a replacement for Spark, which it isn’t. How would I describe it?

“Apache Beam is to distributed data pipelines, what Apache Airflow is to every-day pipelines…. sorta.”

– Me

Beam is a “model” … aka a programming interface/API for defining distributed data pipelines. Unlike Spark/Hadoop/Kafka/Presto… it isn’t a cluster computing framework. Beam doesn’t run its own cluster; it runs on top of an existing distributed compute environment. I think of Beam like I do an Airflow DAG: it’s a way for me to define, in an approachable and coherent way, an ETL pipeline. Beam will do the work of translating my pipeline into work on the backend cluster of my choice.

It kinda begs the question of why? At least with Airflow I get a nice UI and it can actually execute the workloads for me as well. Does it just make writing distributed ETL easier? I mean a PySpark script isn’t exactly rocket science either…. I guess we will find out when we try it out.

Apache Beam Concepts

  • Runners – the backend of the Beam pipelines (Spark, Flink, etc.)
    • Defined via the pipeline’s configuration (PipelineOptions) when the Pipeline is executed.
  • Pipeline – describes the entire data flow from end to end.
    • Abstraction of the entire end to end ETL and data-flow.
  • PCollection – the distributed dataset/data that the pipeline will be working on.
    • Immutable, unbounded or bounded in size, in memory or not, can be distributed by nature (split into records or elements).
  • PTransform – performing an operation/transform upon the data… aka the PCollection.
    • Applied against a PCollection (via apply() in the Java SDK, or the | operator in Python) and returns a new PCollection.

At a very high level, that is pretty much it. Of course things get more complicated as you dive into each topic.

You will have to define a Runner, the backend system you are planning on running your Beam pipeline on. Your Pipeline will just be written in your SDK of choice (Python, Java, whatever); it’s the code that determines what happens. Your Pipeline will probably read in some PCollection at the beginning, act upon that PCollection with one or more PTransforms, and output some final PCollection to a data sink.
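To make that concrete, here is a minimal sketch of how those pieces fit together. The file names here are just placeholders for illustration, not from a real pipeline.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Runner: DirectRunner is the default; a Spark or Flink runner would be
# selected via PipelineOptions instead.
options = PipelineOptions()

with beam.Pipeline(options=options) as p:            # the Pipeline
    (p
     | beam.io.ReadFromText("input.csv")             # source -> a PCollection
     | beam.Map(lambda line: line.split(",")[0])     # a PTransform -> new PCollection
     | beam.io.WriteToText("output"))                # sink for the final PCollection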

Probably the best way to dig deeper is just to try it out!

Installing/Setting up Apache Beam

I already have a running HDFS/Spark cluster from my previous excursions. This will act as the backend Runner for my Beam pipeline.

I also already have Python 3 installed, as well as pip; you will need both.

pip3 install apache-beam

I have to say, it’s nice when a pip install finishes and doesn’t die with a bunch of errors…. eeehmmm (Kafka). So knowing that I want to run Beam on top of my already running Spark cluster… I was a little caught off guard at this next part. This wasn’t covered well in most of the documentation, but I figured it out eventually.

Gotchas of using Beam with a Spark cluster (Spark Runner).

I found the setup for using Spark as a backend runner too complicated for my first foray into Beam. But here are some things I learned about trying to use Spark with Beam.

  • You will need Docker running on your master/executor. It will run the… JobService

“The JobService will create a Spark job for the pipeline and execute the job…”

docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://localhost:7077
// output
20/11/16 01:51:17 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: ArtifactStagingService started on localhost:8098
20/11/16 01:51:17 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: Java ExpansionService started on localhost:8097
20/11/16 01:51:17 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: JobService started on localhost:8099
20/11/16 01:51:17 INFO org.apache.beam.runners.jobsubmission.JobServerDriver: Job server now running, terminate with Ctrl+C

It wasn’t bad, just surprising; I can see how keeping this running and connected to a production Spark cluster might be a little strange. Also, there are a number of other configurations that have to be passed to the Beam pipeline, things like environment_type, job_endpoint (the job server URI), runner, etc.
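For reference, here is roughly what those extra options look like when submitting against that JobService. This is just a sketch assuming the job server container above is listening on localhost:8099, not something I ended up running my pipeline with.

from apache_beam.options.pipeline_options import PipelineOptions

# Options for the portable Spark runner... assumes the beam_spark_job_server
# container from above is reachable on localhost:8099.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=LOOPBACK",
])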

But I’m going to skip all that and use the default DirectRunner, meaning the pipeline runs locally on the machine I’m running the Beam commands on.

Writing your first Beam Python script.

Ok, so let’s review and talk a few basics.

  • You will need to create a PCollection … the distributed dataset being worked on …. maybe a file(s)?
  • When working on your pipeline think about what you want to do to each element of the PCollection.
  • What you do to each element is called a PTransform. Your PTransform will take a PCollection and return a new PCollection.
  • You can write your own PTransform or use an out-of-the-box PTransform (a quick sketch of a couple of these follows the list), such as:
    • ParDo
    • GroupByKey
    • CoGroupByKey
    • Combine
    • Flatten
    • Partition
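Just to show a couple of those built-ins in action before the real example, here is a tiny, hypothetical snippet (the data is made up):

import apache_beam as beam

# GroupByKey on some made-up (key, value) pairs, run on the default DirectRunner.
with beam.Pipeline() as p:
    (p
     | beam.Create([("docked_bike", 1), ("electric_bike", 1), ("docked_bike", 1)])
     | beam.GroupByKey()                              # ('docked_bike', [1, 1]) ...
     | beam.Map(lambda kv: (kv[0], sum(kv[1])))       # sum the grouped values per key
     | beam.Map(print))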

Blah, blah, let’s just try something. I have another Divvy trips csv downloaded. Let’s just read in the CSV file and split each element on the comma. Here is a sample of what my PCollection .. aka … csv file contains. I want to maybe count the number of each rideable_type?

ride_id,rideable_type,started_at,ended_at,start_station_name,start_station_id,end_station_name,end_station_id,start_lat,start_lng,end_lat,end_lng,member_casual
762198876D69004D,docked_bike,2020-07-09 15:22:02,2020-07-09 15:25:52,Ritchie Ct & Banks St,180,Wells St & Evergreen Ave,291,41.906866,-87.626217,41.906724,-87.63483,member

So here is the Beam pipeline.

from __future__ import absolute_import
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
import argparse
from apache_beam.io import WriteToText, ReadFromText


class SplitRecords(beam.DoFn):
    """Spilt the element into records, return rideable_type record."""
    def process(self, element):
        records = element.split(",")
        return [records[1]]

argv = None
parser = argparse.ArgumentParser()
parser.add_argument(
  '--input',
  dest='input',
  help='Input file(s) to process.')
parser.add_argument(
  '--output',
  dest='output',
  required=True,
  help='Output file to write results to.')

opts, pipeline_args = parser.parse_known_args(argv)

pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True

p = beam.Pipeline(options=pipeline_options)
lines = (p | ReadFromText(opts.input, skip_header_lines=1))
records = (lines | beam.ParDo(SplitRecords()))
groups = (records | beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
groups | 'Write' >> WriteToText(opts.output)
p.run()

Then run the pipeline from the command line…

python3 my_first_beam_pipeline.py --output output.txt --input 202007-divvy-tripdata.csv

And here is the output (note that WriteToText shards its output, so the results actually land in a file named something like output.txt-00000-of-00001)…

('docked_bike', 549545)
('electric_bike', 1935)

Well… it worked so that’s one thing.

Let’s talk about it.

So I’m not going to talk much about the arguments… that’s pretty straightforward. In this case I’m just getting the input and output files for the pipeline. It’s just important to note that I’m using the arguments to build the PipelineOptions() for the Beam pipeline.

pipeline_options = PipelineOptions(pipeline_args)

And then of course instantiating the pipeline itself is pretty straightforward.

p = beam.Pipeline(options=pipeline_options)

The next lines are nothing special but are really how Beam works. Each line below produces a new PCollection, by applying a PTransform to the previous PCollection. I’m using a collection of transforms in my code… some out-of-the-box transforms like CombinePerKey, and my own ParDo (“Beam transform for generic parallel processing”).

lines = (p | ReadFromText(opts.input, skip_header_lines=1))
records = (lines | beam.ParDo(SplitRecords()))
groups = (records | beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))
groups | 'Write' >> WriteToText(opts.output)

In the words of Gollum …. What’s a ParDo precious???

This is a word that comes up all the time with Beam… ParDo. It’s just a strange name for….

“ParDo is a Beam transform for generic parallel processing. The ParDo processing paradigm is similar to the “Map” phase of a Map/Shuffle/Reduce-style algorithm”

– Apache Beam documentation.

Here is my ParDo…

class SplitRecords(beam.DoFn):
    """Spilt the element into records, return rideable_type record."""
    def process(self, element):
        records = element.split(",")
        return [records[1]]

So basically this function gets applied to each element in the distributed data collection … the PCollection.
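One thing worth noting (and a reason ParDo is more general than a plain Map) is that process() can emit zero, one, or many output elements per input element. A quick hypothetical sketch:

import apache_beam as beam

class ExplodeWords(beam.DoFn):
    """Hypothetical DoFn: emits one output element per word, so a single
    input element can fan out into many outputs."""
    def process(self, element):
        for word in element.split(" "):
            yield word

with beam.Pipeline() as p:
    (p
     | beam.Create(["docked bike", "electric bike"])
     | beam.ParDo(ExplodeWords())
     | beam.Map(print))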

What each PTransform looks like.

records = (lines | beam.ParDo(SplitRecords()))

So records is going to be the new output PCollection; the parentheses just encapsulate the expression that produces it. The pipe | is telling Beam to take the previously produced lines PCollection and apply a PTransform to it, in this case my SplitRecords(). You can also chain them together as needed, as below.

groups = (records | beam.Map(lambda x: (x, 1)) | beam.CombinePerKey(sum))

What is happening above is that records is a PCollection of just rideable_type elements. I’m then mapping each element into a (rideable_type, 1) tuple so I can sum them per key using the Beam-provided beam.CombinePerKey(sum). That gives me the count per rideable_type.

('docked_bike', 549545)
('electric_bike', 1935)

Thoughts on Apache Beam

I’m a little torn on this one. Honestly… it seems like a nice tool. I still think it harkens back a lot to Airflow DAGs, just a nice way to write approachable pipelines. I mean it is extra nice in this case that the pipelines are built for running on huge datasets in a distributed environment.

But…. to actually do that… run a Beam pipeline on a big data set… you will need a pre-existing cluster to do so. Could be Spark, Flink, maybe GCP Dataflow… and here is what I don’t get. Why would I use Beam when I could just write a PySpark script to do the work, if it’s going to run on a Spark cluster anyways? It doesn’t seem any more or less complicated than a Spark script.
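For what it’s worth, here is roughly what the same rideable_type count might look like in PySpark. This is a hypothetical sketch purely to ground the comparison, not something from this post’s repo.

# Rough PySpark equivalent of the Beam pipeline above.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("divvy_counts").getOrCreate()

counts = (
    spark.read.option("header", True).csv("202007-divvy-tripdata.csv")
    .groupBy("rideable_type")
    .count()
)
counts.write.csv("output_pyspark")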

I haven’t figured that part out yet. I would love to hear from people who have used Apache Beam in production, what problems did it solve, what do you love about it? It’s probably just because this is my first interaction with Beam, and I’m sure I’m only scratching the surface. What am I missing?