
2 Useful PySpark Functions

I’ve come to have a great love for PySpark; it’s such an easy and powerful tool to use. I use it every day to crunch tens to hundreds of terabytes of data without even blinking an eye, and all with the ease of Python. It’s almost too good to be true. I have to say, though, that things get a little dicey when you need to do something “out of the box”, say, strange text manipulations; something that is easy in plain Python becomes a challenge with PySpark’s DataFrame API functionality.

Sure, you could use a udf written in Python for that, but we all know the performance penalty that comes with it. Many times I just try to get creative with a combination of built-in PySpark functions to accomplish the same task others would use a udf for.
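
Just to ground that a bit, here is a minimal sketch of the same kind of string cleanup done both ways; the column name and prefix are made up for illustration, but the pattern is the same: the udf ships every row out to a Python worker and back, while the built-in stays inside the JVM.

import re
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# a Python udf: every row gets serialized out to a Python worker and back
@udf(returnType=StringType())
def strip_prefix(s):
    return re.sub(r'^file_', '', s)

# hypothetical toy data just for this comparison
df = spark.createDataFrame([('file_report.parquet',)], ['file_name'])

# udf version vs. the built-in equivalent that never leaves the JVM
with_udf = df.withColumn('clean', strip_prefix(F.col('file_name')))
with_builtin = df.withColumn('clean', F.regexp_replace('file_name', '^file_', ''))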

I want to talk about two wonderful PySpark functions I find myself using a lot. I rarely see them used by others, but they come in handy; hopefully they come in handy for you too!

regexp_extract

I’m not sure I could function (pun intended) as a data engineer without the regexp_extract PySpark function. Basically, it lets you pull out the part of a string that matches a regex pattern, say the text sitting between two other strings. This is a fairly common problem to solve in Python and other languages.

The annoying part about the regexp_extract PySpark method is that it uses Java regex pattern matching. Is there honestly anything worse than regex?

The PySpark regexp_extract() method takes three arguments …

  • the input string or column
  • the regex pattern
  • the group to return (0 returns the entire match)

Let’s look at an example. It’s common to pull dates out of file names, so let’s tackle that problem with regexp_extract().

import pyspark.sql.functions as F

df = spark.createDataFrame([('s3://some-bucket/my_folder/file_2021-12-01_part.parquet',)],['file_uri'])
>> df.head()
Row(file_uri='s3://some-bucket/my_folder/file_2021-12-01_part.parquet')

transformed = df.withColumn('date', F.to_date(F.regexp_extract(F.col('file_uri'),'(?<=file_).*(?=_part)', 0), 'yyyy-MM-dd'))
>> transformed.head()
Row(file_uri='s3://some-bucket/my_folder/file_2021-12-01_part.parquet', date=datetime.date(2021, 12, 1))

The heart of what we are doing is here …

transformed = df.withColumn('date', F.to_date(
    F.regexp_extract(F.col('file_uri'), '(?<=file_).*(?=_part)', 0),
    'yyyy-MM-dd'
))

If you inspect the F.regexp_extract() call you can see we first pass in the string, which in our case is the column F.col('file_uri').

Second, we pass in our regex pattern matching string … '(?<=file_).*(?=_part)'.

I know it looks a little cryptic, but the lookbehind (?<=file_) anchors us right after file_ and the lookahead (?=_part) stops us right before _part, so we grab everything in between.

I’m then using that value with the to_date() function to actually turn it into a valid date.

>> transformed.show()
+--------------------+----------+
|            file_uri|      date|
+--------------------+----------+
|s3://some-bucket/...|2021-12-01|
+--------------------+----------+
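
If the lookarounds feel too cryptic, the same pull works with a plain capture group and a group index of 1 instead of 0. A quick sketch, using the same df as above:

alternative = df.withColumn('date', F.to_date(
    # group 0 is the whole match, group 1 is just what's inside the parentheses
    F.regexp_extract(F.col('file_uri'), r'file_(\d{4}-\d{2}-\d{2})_part', 1),
    'yyyy-MM-dd'
))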

expr

Another useful PySpark function is expr, the expression function from the SQL API.

I enjoy this method because it allows you to run transformations more like you would if you were writing SQL, without going full tilt and using the spark.sql() feature.

expr also has a feature or two that make possible things that are otherwise nearly (and annoyingly) impossible in PySpark, like passing the result of another function where the DataFrame API version only accepts a literal.

Let’s take for example the simple substring method. Typically substring takes a string, a position, and a length. But what if you just want to start at a specific position and grab everything to the end of the string?

Example.

import pyspark.sql.functions as F

df = spark.createDataFrame([('junk junk 123 APPROVED',)],['response_description'])
>> df.head()
Row(response_description='junk junk 123 APPROVED')

transformed = df.withColumn('response_code', F.expr("SUBSTRING(response_description, 14, LENGTH(response_description))"))
transformed.head()
Row(response_description='junk junk 123 APPROVED', response_code=' APPROVED')

Here we simply call F.expr() with our string expression.

We SUBSTRING the column response_description starting from the 14th position to the LENGTH of the response_description column. It’s simple, concise, easy to read, and flows well.

The sky is the limit with the expr method.
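
For instance, nothing stops the expression from computing its own positions. Here is a small sketch (same df as above) that uses LOCATE to find where APPROVED starts instead of hard-coding 14:

# LOCATE returns the 1-based position of 'APPROVED' in the string (0 if it's missing)
dynamic = df.withColumn('response_code',
    F.expr("SUBSTRING(response_description, LOCATE('APPROVED', response_description), LENGTH(response_description))"))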

Musings

I think there are few things in life as fun as writing PySpark code, especially when you promise yourself you will not simply rely on writing udfs for everything. I’ve found in my experience writing Spark that using too many udfs causes a number of problems …

  • poor performance
  • poor code readability (when too many udfs are used)
  • you don’t learn Spark’s built-in functionality very well
  • it isn’t as challenging

Learning how to solve the problems you have with the tools at hand makes you a better PySpark developer; you learn more about what built-in functions and methods are available. regexp_extract and expr are two very powerful functions that solve problems common to most people writing Spark transforms, but for some reason I don’t see them used much.

Are there PySpark functions and methods you find yourself using and loving a lot? Share them in the comments, and help each other out!