
New to PySpark? Do this, not that.

Photo by Aziz Acharki on Unsplash

Do this, not that. Well, I’ve got my own list. With everyone jumping on the PySpark / Databricks / EMR / Glue / Whatever bandwagon, I thought a post on what to do, and what not to do, when working with Spark / PySpark was long overdue. I take the pragmatic approach to working with Spark; it’s honestly very forgiving well into the tens of terabytes of data. Once you wander past that point, things tend to get a little spicy if you don’t have everything dialed in. As with most things in life, getting a few things right, and avoiding a few others, will get you a long way, and the same applies to Spark.

Do this, not that … Spark style.

I want to keep this simple and to the point, because that’s the best way to keep your PySpark running hot. People get weird when it comes to Big Data sometimes, trying to be clever or whatever; don’t do that with Spark, just keep it real. Use the standard library of functions, look around and see what features it has to offer, and resist the temptation to go outside of Spark to solve a problem, because 99% of the time you don’t need to.

I’ve found there is a wide range of Spark developers.

  • Beginners ( treat it like Pandas, don’t understand the distributed concepts )
  • Intermediate ( daily use, comfortable and understand functions )
  • Expert ( able to use the Spark UI and understand query execution and plans, optimizations, etc. )

Contrary to what you read and see every day, it is probably a waste of your time to dream of being that Spark expert who can open up the UI, dig into the query and execution plans, and be a general genius where all who meet you bow at your feet. Why? Because you probably need a few years of just being a good intermediate Spark developer, learning most of the standard functions and starting to understand concepts like partitions, clusters, and the few basic knobs you can touch without ending the world, before you try to go all in.

Why do I say this? Because I’ve been using Spark for years, crunching hundreds of TBs of data, and almost all the problems can be solved and overcome by simply knowing what Spark has available to you out of the box, and putting basic concepts and good practices in place.

With all my complaining out of the way, I suppose we can get to the list of do this, don’t do that, Spark style. Not all of my tips are “do this to make it faster” tips; some are “do this … because it makes your life easier” tips.

Encapsulate your code in functions.

Reusability, readability, sanity, unit testability, etc.

Photo by Dawid Zawiła on Unsplash

Do This.

import logging

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession


def sample_transform(input_df: DataFrame) -> DataFrame:
    inter_df = (
        input_df
        .where(input_df['that_column'] == F.lit('hobbit'))
        .groupBy('another_column')
        .agg(F.sum('yet_another').alias('new_column'))
    )
    output_df = (
        inter_df
        .select(
            'another_column',
            'new_column',
            F.when(F.col('new_column') > 10, 'yes').otherwise('no').alias('indicator'),
        )
        .where(F.col('indicator') == F.lit('yes'))
    )
    return output_df


def main() -> None:
    spark = SparkSession.builder.appName('SampleTransform').enableHiveSupport().getOrCreate()
    logging.basicConfig(format='%(asctime)s %(levelname)s - Sample Transform - %(message)s', level=logging.INFO)
    logging.getLogger().setLevel(logging.INFO)
    test_df = spark.createDataFrame(
        [
            ('hobbit', 'Samwise', 5),
            ('hobbit', 'Bilbo', 50),
            ('hobbit', 'Bilbo', 20),
            ('wizard', 'Gandalf', 1000)
        ],
        ['that_column', 'another_column', 'yet_another']
    )
    new_df = sample_transform(test_df)


if __name__ == '__main__':
    main()

Not That.

import logging

import pyspark.sql.functions as F
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.appName('SampleTransform').enableHiveSupport().getOrCreate()
logging.basicConfig(format='%(asctime)s %(levelname)s - Sample Transform - %(message)s', level=logging.INFO)
logging.getLogger().setLevel(logging.INFO)
test_df = spark.createDataFrame(
    [
        ('hobbit', 'Samwise', 5),
        ('hobbit', 'Bilbo', 50),
        ('hobbit', 'Bilbo', 20),
        ('wizard', 'Gandalf', 1000)
    ],
    ['that_column', 'another_column', 'yet_another']
)
inter_df = test_df.where(test_df['that_column'] == F.lit('hobbit')).groupBy('another_column').agg(F.sum('yet_another').alias('new_column'))
output_df = inter_df.select('another_column', 'new_column',
                            F.when(F.col('new_column') > 10, 'yes').otherwise('no').alias('indicator')).where(
    F.col('indicator') == F.lit('yes'))
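One payoff of the “Do This” version that’s easy to miss: because sample_transform takes a DataFrame and hands one back, it can be unit tested against a tiny local SparkSession. Here’s a minimal pytest-style sketch; the fixture, module name, and expected values are my own assumptions, not part of the original code.

# test_sample_transform.py -- minimal sketch, assuming pytest and a local Spark install.
import pytest
from pyspark.sql import SparkSession

from sample_job import sample_transform  # hypothetical module holding the function above


@pytest.fixture(scope='session')
def spark():
    return SparkSession.builder.master('local[1]').appName('tests').getOrCreate()


def test_sample_transform_keeps_only_big_hobbits(spark):
    test_df = spark.createDataFrame(
        [
            ('hobbit', 'Samwise', 5),
            ('hobbit', 'Bilbo', 50),
            ('wizard', 'Gandalf', 1000),
        ],
        ['that_column', 'another_column', 'yet_another'],
    )
    rows = sample_transform(test_df).collect()
    # Only Bilbo's summed value (50) clears the > 10 threshold among the hobbits.
    assert [(r['another_column'], r['new_column'], r['indicator']) for r in rows] == [('Bilbo', 50, 'yes')]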

Use That Which Spark Gives You, you Knave.

Spark can do that, my friend; no need for your UDF or custom code.

Photo by Joshua Earle on Unsplash

Do This.

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F

# df = spark.createDataFrame([('s3://bucket/2022/01/01',)],['file_uri'])

def pull_string(input_df: DataFrame) -> DataFrame:
    output_df = input_df.withColumn('file_date', F.to_date(F.col('file_uri').substr(13, 10), 'yyyy/MM/dd'))
    return output_df

Not That.

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.functions as F

# df = spark.createDataFrame([('s3://bucket/2022/01/01',)],['file_uri'])

def pull_date(s: str) -> str:
    return s[12:22]

pull_date_Udf = F.udf(pull_date)

def pull_string(input_df: DataFrame) -> DataFrame:
    output_df = input_df.withColumn('file_date', F.to_date(pull_date_Udf('file_uri'), 'yyyy/MM/dd'))
    return output_df
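If you want to see for yourself why the UDF loses, a quick check (my own suggestion, not from the original post) is to compare the physical plans with explain(): the native substr / to_date version stays entirely inside Spark’s built-in expressions, while the Python UDF shows up as a BatchEvalPython step, meaning rows get serialized out to Python worker processes and back.

# Quick comparison of the two plans -- my own illustration.
df = spark.createDataFrame([('s3://bucket/2022/01/01',)], ['file_uri'])

df.withColumn('file_date', F.to_date(F.col('file_uri').substr(13, 10), 'yyyy/MM/dd')).explain()
# plan contains only native expressions (substring, to_date)

df.withColumn('file_date', F.to_date(pull_date_Udf('file_uri'), 'yyyy/MM/dd')).explain()
# plan contains a BatchEvalPython step -- rows are shipped to Python workers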

Take Only What You Need.

Photo by Thomas Griesbeck on Unsplash

I’m fairly certain your mother told you this a time or two. So does Spark.

Do This.

df = spark.read.parquet('s3://my-bucket-is-better-than-yours/data/2022/*/*').select('column_1', 'column_4', 'column_10')

Not That.

df = spark.read.parquet('s3://my-bucket-is-better-than-yours/data/2022/*/*')
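The reason this matters for Parquet: it’s a columnar format, so a select applied right after the read lets Spark prune the unneeded columns at scan time instead of pulling every column off S3. You can verify the pruning yourself with explain(); this snippet is my own illustration, not part of the original post.

# My own illustration: confirm that only the selected columns are scanned.
df = spark.read.parquet('s3://my-bucket-is-better-than-yours/data/2022/*/*') \
        .select('column_1', 'column_4', 'column_10')
df.explain()
# The FileScan line's ReadSchema should list only column_1, column_4, and column_10.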

Filter Early and Often.

Photo by Tyler Nix on Unsplash

Cut early and often.

Do This.

df = filter_records(input_df)
df2 = some_transform(df)
df3 = another_transform(df2)
df4 = another_transform(df3)

Not That.


df = some_transform(input_df)
df2 = another_transform(df)
df3 = another_transform(df2)
df4 = filter_records(df3)
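For the sake of a concrete example, filter_records could be as simple as a where() on the columns that bound your data; the column names below are hypothetical, just to show the shape of it.

import pyspark.sql.functions as F
from pyspark.sql import DataFrame


def filter_records(input_df: DataFrame) -> DataFrame:
    # Hypothetical example: trim down to last month's active rows before any heavy transforms.
    return input_df.where(
        (F.col('event_date') >= F.lit('2022-01-01')) & (F.col('status') == F.lit('active'))
    )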

cache() for reuse of DataFrames.

Photo by Wells Hall on Unsplash

Spark is lazy, like you. Understand when to cache() or persist() a dataset.

Do This.

df = spark.read.parquet('/some/data/set/*')
df.cache()
thing_1 = do_thing_1(df)
thing_4 = do_thing_4(df)
thing_2 = do_thing_2(thing_1)
thing_3 = do_thing_3(thing_2)

Not That.

df = spark.read.parquet('/some/data/set/*')

thing_1 = do_thing_1(df)
thing_2 = do_thing_2(thing_1)
thing_3 = do_thing_3(thing_2)
thing_4 = do_thing_4(df)
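A small follow-up, not from the original post: cache() is lazy, so nothing is actually stored until the first action runs over df, and it’s good manners to release the memory with unpersist() once the reuse is over.

# Sketch: same pattern as above, with the cache released at the end.
df = spark.read.parquet('/some/data/set/*')
df.cache()                 # lazy -- materialized by the first action that touches df
thing_1 = do_thing_1(df)
thing_4 = do_thing_4(df)   # second use of df hits the cache instead of re-reading Parquet
thing_2 = do_thing_2(thing_1)
thing_3 = do_thing_3(thing_2)
df.unpersist()             # free the cached blocks once df is no longer needed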

Understand Your Cluster Size and spark.sql.shuffle.partitions

Photo by Amanda Jones on Unsplash

Don’t treat every single Spark cluster the same, because they are not the same. Understand the RAM and CPU available, and set spark.sql.shuffle.partitions to match, at a minimum.

Do This.

# Cluster = c4.8xlarge (36 vCPU, 60GB memory) x 12 workers
spark = (
    SparkSession.builder.appName('DingDong')
    # somewhere between 1 and 2 times the total number of cores
    .config('spark.sql.shuffle.partitions', '500')
    .enableHiveSupport()
    .getOrCreate()
)

Not That.

# Cluster = c4.8xlarge (36 vCPU, 60GB memory) x 12 workers
spark = SparkSession.builder.appName('DingDong').enableHiveSupport().getOrCreate()
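To make the arithmetic explicit (my own back-of-the-envelope, using the cluster from the comment): 36 cores per worker times 12 workers is 432 cores, so the 1-2x rule of thumb puts spark.sql.shuffle.partitions somewhere between 432 and 864, which is why 500 is a sane starting point.

# Back-of-the-envelope for the cluster in the comment above.
cores_per_worker = 36
workers = 12
total_cores = cores_per_worker * workers            # 432
partition_range = (total_cores, total_cores * 2)    # (432, 864) -- 500 sits comfortably inside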

Musings

I know I’m probably on the lower end of the Spark smart folks, even after using Spark for years. Mostly because I just stick to the basics: I try to write clean and straightforward code and use only Spark functionality. I’ve learned when to cache(), how to select only what I need, to filter early and often, and to understand my cluster size and how it affects the few easy parameters you can configure without blowing things up. Even with billions of records being processed, keeping it simple works 95% of the time. Spark is good, let it be good for you.

3 replies
  1. Saikat Dutta says:

    Are the code blocks right for “Filter Early and Often”?

    Seems like you might have got the DO’s and Don’ts reversed for this one?

    • Dane says:

      It seems good to me; the DO part shows filtering the input data first and then performing the transformations, instead of the DO NOT’s transform-first, filter-at-the-end approach.

      I.e. we have a large input table with dozens of columns but we need only a subset of those, and only the data from the last month. So, instead of doing the heavy work on the entire dataset, the idea is to limit it to the minimum data we need.

  2. Michael Jeffrey Gardner says:

    Coming from pandas myself, the best thing you can do is learn to chain your code vs. doing frequent variable assignments like df1, df2, df3, etc.

    In terms of your comment on filtering often, it really doesn’t matter as long as you don’t trigger an action that disrupts the lazy evaluation. The optimized plan that Spark creates will essentially do the filtering for you as early as it can.

    Finally, in terms of shuffle partitions, always make sure you are using Spark 3.0 or above and have AQE turned on. If you set the shuffle partitions to anything greater than the number of cores, it will coalesce the partitions down to the optimal number needed for the shuffle.
