
Intro to Spark ML Pipelines for Data Engineers

Don’t you like stuff for free? Don’t you like it when stuff is just handed to you? I mean, when is the last time you didn’t want a free t-shirt, or 20 bucks in the mail from your Grandma? That’s kinda what Pipelines are in Spark ML. The Apache Spark ML library is probably one of the easiest ways to get started with Machine Learning. Leaving all the fancy stuff to the Data Scientists is fine; Data Engineers are more interested in the end-to-end. The Pipeline, and the Spark ML APIs, provide a straightforward path to building ML Pipelines that lowers the bar for entry into ML. So, step right up, come get your free ML Pipeline.

Overview of Spark ML

Of course Apache Spark needs no introduction, but I’m guessing the Spark ML library might. The popularity of Spark is exploding and it seems to be the tool of choice for big data processing, but the adoption of Spark ML itself hasn’t kept up with the rest of the pack. Spark ML provides a great offering of the out-of-the-box ML model methodologies that are used most often: classification, regression, clustering, etc.
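If you want a quick sense of what ships in the box, the algorithms live in submodules of pyspark.ml. A few representative imports (far from exhaustive):

from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.regression import LinearRegression
from pyspark.ml.clustering import KMeans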

Why Spark ML?

Well, mostly because it’s a great high-level set of APIs that lowers the bar and makes it easier to develop Machine Learning models and Pipelines in a straightforward manner. If you’ve worked in or around ML for any amount of time, in the real world, you quickly learn that 80% of the work and complexity is usually around productionizing the model and all the work that goes into the data BEFORE you even train a model.

Enter Spark ML Pipelines.

Spark ML Pipelines

I think the best place to start with Spark ML is the concept of a Pipeline, and yes, a Pipeline is an actual part of the code.

“A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.”

Spark ML documentation

Now, we will get into Transformers and Estimators later, but what you really need to know is that a Pipeline is the actual Machine Learning workflow. This is what makes it such a powerful and great tool to use when tackling ML problems. And of course it solves some of the major problems mentioned above that plague many projects: productionization and data prep work.

So basically, a Pipeline in Spark ML is a way for you to define your ETL pipeline to create a Machine Learning model. This solves a few problems.

  • it takes away the ambiguity by giving you an actual, well-defined ETL pipeline for your ML project.
  • it solves 80% of the problem of “productionizing” your pipeline: because it’s Spark, it will scale to infinity, and since it’s well defined, the code is probably better than home-baked stuff.

Spark ML Pipeline Example

But enough babbling, let’s walk through a simple example to get a better understanding of what we are dealing with. Code is on GitHub.

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark import keyword_only


class CustomTransform(Transformer):
    @keyword_only
    def __init__(self):
        """Initialize."""
        super(CustomTransform, self).__init__()

    def _transform(self, dataframe):
        df = dataframe.drop(*['start_station_name', 'end_station_name', 'started_at', 'ended_at'])
        df = df.filter(df.start_station_id.isNotNull())
        df = df.filter(df.end_station_id.isNotNull())
        df = df.withColumn("end_station_id", df["end_station_id"].cast("int"))
        return df


def read_input_files(local_files: str):
    # read every CSV under the given path/glob into a single DataFrame
    df = spark.read.csv(local_files, recursiveFileLookup=True, header=True)
    return df

# note: columns_to_drop isn't used below; the same columns get dropped inside CustomTransform
columns_to_drop = ['start_station_name', 'end_station_name', 'started_at', 'ended_at']
# the string columns we will index into numeric features
features = ["rideable_type", "start_station_id", "member_casual"]

spark = SparkSession.builder.appName("my ml pipeline").master("local[3]").getOrCreate()
df = read_input_files('*.csv')

steps = [CustomTransform()]
indexer_steps = [StringIndexer(inputCol=column, outputCol=f"{column}_index") for column in features]
steps.extend(indexer_steps)


assembler = VectorAssembler(
    inputCols=["rideable_type_index", "start_station_id_index", "member_casual_index"],
    outputCol="features")
steps.extend([assembler])


model_ready_data_pipeline = Pipeline(stages=steps)

output = model_ready_data_pipeline.fit(df).transform(df)


# define the Estimator and split the model-ready data
lr = LogisticRegression(featuresCol='features', labelCol='end_station_id', maxIter=20)
(trainingData, testData) = output.randomSplit([0.7, 0.3], seed=100)
pipeline = Pipeline(stages=[lr])
# fit on the training split only, then predict on the held-out test data
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol="end_station_id", predictionCol="prediction")
print(evaluator.evaluate(predictions))
predictions.select("end_station_id", "prediction").show(10)

Spark ML Transform … what makes the Pipeline go!

Let’s step back for a second before we dive in. To make this easy let’s ask the question… What am I looking for?

  • I want a Pipeline that will take raw data and output model-ready data… data ready to be put into an ML model, and then to create the model.

Making a basic Spark ML Pipeline is really a straightforward process… it’s hard to mess up. If you’ve seen PySpark scripts before, most everything will look familiar, with only a few new pieces. Of course you will see the creation of the Pipeline itself… nothing earth shattering here…

model_ready_data_pipeline = Pipeline(stages=steps)

Of course it’s really what’s going on inside the Pipeline that matters… the steps. Let’s dig in. I mentioned before that the Pipeline is usually just a set of Transformers and Estimators all chained together. Let’s start with Transformers.

Transformers are easy; simply put, they take a DataFrame as input and output a DataFrame. Transformers used in your Spark ML Pipeline can be either…

  • a custom Transform written by you.
  • one of the many out-of-the-box Transforms provided by Spark.

In the above example I showed you one of each.
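Just to make that DataFrame-in, DataFrame-out contract concrete, here is a tiny standalone sketch using Spark’s built-in SQLTransformer on a toy DataFrame (the ride_id and ride_minutes columns are invented purely for illustration):

from pyspark.sql import SparkSession
from pyspark.ml.feature import SQLTransformer

spark = SparkSession.builder.master("local[1]").getOrCreate()

# toy data, purely for illustration
toy = spark.createDataFrame([(1, 5.0), (2, 12.5)], ["ride_id", "ride_minutes"])

# __THIS__ is SQLTransformer's placeholder for whatever DataFrame you pass to transform()
sql_step = SQLTransformer(statement="SELECT *, ride_minutes * 60 AS ride_seconds FROM __THIS__")
sql_step.transform(toy).show()

DataFrame goes in, DataFrame with one extra column comes out. That is the whole contract.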

Custom PySpark Transform for Spark ML Pipeline.

Yes, custom Transforms can get a little complicated depending on inputs and outputs, but at the basic level one just needs to take a DataFrame, return a DataFrame, and implement a _transform() method that the Pipeline will call for you (via the public transform() method). Here is my example.

class CustomTransform(Transformer):
    @keyword_only
    def __init__(self):
        """Initialize."""
        super(CustomTransform, self).__init__()

    def _transform(self, dataframe):
        df = dataframe.drop(*['start_station_name', 'end_station_name', 'started_at', 'ended_at'])
        df = df.filter(df.start_station_id.isNotNull())
        df = df.filter(df.end_station_id.isNotNull())
        df = df.withColumn("end_station_id", df["end_station_id"].cast("int"))
        return df

Not too hard, eh? My CustomTransform just inherits from and super-inits the Spark Transformer class, and of course implements a _transform method that does the work I want on my DataFrame.
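If you do need the inputs and outputs to be configurable, Spark ships shared Param mixins you can lean on. Here is a minimal sketch, assuming you want a reusable cast-a-column step; ColumnCaster is a made-up name, not something the pipeline above uses:

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import functions as F


class ColumnCaster(Transformer, HasInputCol, HasOutputCol,
                   DefaultParamsReadable, DefaultParamsWritable):
    """Hypothetical Transformer that casts inputCol to int as outputCol."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(ColumnCaster, self).__init__()
        # keyword_only stashes the kwargs in self._input_kwargs
        self._set(**self._input_kwargs)

    def _transform(self, dataframe):
        return dataframe.withColumn(
            self.getOutputCol(), F.col(self.getInputCol()).cast("int"))

Something like ColumnCaster(inputCol="end_station_id", outputCol="end_station_id_int") could then be dropped into the stages list like any other step.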

Out of the box Transforms in your PySpark ML Pipeline

There are many Spark-provided methods that take a DataFrame and return a DataFrame; I used two of them, VectorAssembler and StringIndexer. Let’s look at one of them.

assembler = VectorAssembler(
    inputCols=["rideable_type_index", "start_station_id_index", "member_casual_index"],
    outputCol="features")

A VectorAssembler in Spark is a very basic Transform: it takes a list of input columns and appends a column to the output DataFrame that is a vector of all the input columns (smash the input columns into a single output column).
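To see it in isolation, here’s a tiny sketch on a made-up DataFrame (the numeric values are placeholders standing in for the indexed columns, not real values from my data):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.master("local[1]").getOrCreate()

# placeholder values standing in for the indexed feature columns
toy = spark.createDataFrame(
    [(1.0, 7.0, 0.0), (2.0, 3.0, 1.0)],
    ["rideable_type_index", "start_station_id_index", "member_casual_index"])

assembler = VectorAssembler(
    inputCols=["rideable_type_index", "start_station_id_index", "member_casual_index"],
    outputCol="features")

# each row now carries a "features" vector like [1.0, 7.0, 0.0]
assembler.transform(toy).show(truncate=False)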

I have a bunch of Transforms … now what?

This is an even easier part. Now that you have your set of Transforms, built by yourself or otherwise… just stick ’em in a list, in the order you want them run. In my example you can see me creating a list with my CustomTransform and extending the list thereafter until I have a complete list of steps I would like to add to my model-ready-data Pipeline.

steps = [CustomTransform()]
indexer_steps = [StringIndexer(inputCol=column, outputCol=f"{column}_index") for column in features]
steps.extend(indexer_steps)


assembler = VectorAssembler(
    inputCols=["rideable_type_index", "start_station_id_index", "member_casual_index"],
    outputCol="features")
steps.extend([assembler])


model_ready_data_pipeline = Pipeline(stages=steps)

Of course calling the below code will give me a DataFrame called output that is my model-ready data, all transformed with my steps. Ready to go!

output = model_ready_data_pipeline.fit(df).transform(df)
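If you want to sanity check the output before training (purely optional, not part of the Pipeline itself), a quick peek does the trick:

# optional: peek at the model-ready data
output.printSchema()
output.select("features", "end_station_id").show(5, truncate=False)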

Training Models using PySpark ML Pipelines

This all turns out to be fairly easy too, once you’ve gotten the hang of Spark ML Pipelines and using Transforms to get all your data ready to go into a model. Remember at the beginning when we talked about how Transformers and Estimators are what make up a Spark ML Pipeline? Here is a very simple Estimator example that is almost exactly like what we did before with Transforms.

The only difference here is that an Estimator must have a fit() method. This fit() method is going to produce a model… which is actually a Transformer itself (because you can load a model and call transform() on it). Trippy, I know. I think the documentation says it well…

An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

PySpark ML documentation.

It will make sense when you see my simple example.

lr = LogisticRegression(featuresCol='features', labelCol='end_station_id', maxIter=20)
(trainingData, testData) = output.randomSplit([0.7, 0.3], seed=100)
pipeline = Pipeline(stages=[lr])
model = pipeline.fit(trainingData)
predictions = model.transform(testData)

evaluator = MulticlassClassificationEvaluator(labelCol="end_station_id", predictionCol="prediction")
print(evaluator.evaluate(predictions))
predictions.select("end_station_id", "prediction").show(10)
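One quick aside on the evaluator at the end of that block: MulticlassClassificationEvaluator reports f1 by default, and you can ask for other metrics via metricName. A small sketch, not something the pipeline above depends on:

# ask the evaluator for accuracy instead of the default f1
accuracy = MulticlassClassificationEvaluator(
    labelCol="end_station_id", predictionCol="prediction",
    metricName="accuracy").evaluate(predictions)
print(f"accuracy: {accuracy}")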

As you can see, lr = LogisticRegression is my definition of an Estimator. Then I can add my single Estimator to my Pipeline.

pipeline = Pipeline(stages=[lr])

Getting the model is easy now.

model = pipeline.fit(trainingData)

Want some predictions off your model that is a Transformer? Call the transform() method of course! (Remember… put a DataFrame in, get a DataFrame back.)

predictions = model.transform(testData)
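And because that fitted PipelineModel is itself just a Transformer, you can save it and load it back later to call transform() again. A minimal sketch, with a placeholder path:

from pyspark.ml import PipelineModel

# persist the fitted model, then reload it and score data again
model.write().overwrite().save("/tmp/end_station_model")
loaded_model = PipelineModel.load("/tmp/end_station_model")
loaded_model.transform(testData).show(5)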

Musings

Well, wasn’t that easy? Leave it to Spark to give us stuff for free; we all like free stuff, especially a free way to build awesome Machine Learning pipelines! As a Data Engineer I love PySpark ML Pipelines because they are so easy to use and they make it hard to build an ML pipeline incorrectly. Usually the code will just blow up.

It takes what can be a complex and complicated topic and breaks it down into small steps that can just be passed to a Pipeline. The way they have implemented Transformers and Estimators is easy to follow and understand as well. And the best part? You can run your ML Pipeline at massive scale and it won’t break; it was made for big data!

Not many tools you can say that about.