
PySpark SQLContext… tired of your decades-old ETL process?

Seriously. Haven’t you had enough of SSIS, SAP Data Services, Informatica, blah blah blah? It’s been the same old ETL process for the last 20 years. CSV files appear somewhere, some poor, aged, and angry Developer soul in a cubicle pulls up the same old GUI ETL tool and maps a bunch of columns to some SQL Server, or, if you’re in a forward-thinking shop… maybe Postgres. This is after painstakingly designing the Data Warehouse with good ole’ Kimball in mind. Data flows from some staging table into facts and dimensions. Eventually some SQL queries are run and a Data Mart is produced, summarizing a year’s worth of data for a crabby Sales or Product department. Brings a tear to my eye. And all this because Apache Spark sounds scary to some people?

Where ETL is today

Anyone who has worked around ETL, Data Warehousing, and Data Developer land knows that things haven’t really changed in 20 years at 90%+ of companies. There are data teams everywhere stuck dragging and dropping connections to dump some sort of flat file into some database schema after minor transformations. If someone working in that shop is above average, you will have a wonderful Kimball-designed Data Warehouse on SQL Server with facts and dimensions. All done to make the queries of “Big Data” efficient and fast.

Write some SQL, and there you have it: a great Data Mart with summarized data from some number of records. The warehouse data still looks remarkably like the raw data. Sure, some data from other sources is there, joins are done. But it does beg the question: as the rest of Tech speeds on into the future, why have the data warehouses and analytics at 90% of companies stayed the same?

Why has Data Warehousing and ETL not changed?

Because of one thing, of course: SQL. The concept of ANSI SQL and its application to storing and working with data sets is impossible to beat. This is why the same old ETL has not changed in most places… because it works, it’s comfortable, there are tons of people to do it, it’s approachable, and it doesn’t require an expert programmer to accomplish. But all that same old ETL comes with a price.

All those fancy ETL GUI tools that never change are expensive… SSIS and Data Services… yikes. They are nearly impossible to test and put in source control. Yeah, I know all about TFS. The whole system is bloated and unchanging. As the complexity and type of data coming out of IOT systems change, data grows, and streams take over, these classic ETL tools and systems, and the whole architecture around them, struggle to keep up.

I lived in that world for years, and yes, I can still appreciate a classic RDBMS Data Warehouse. It can solve problems and give business insights that are almost impossible to answer otherwise. A good data warehouse is key for companies making data-driven decisions. But it’s time to realize newer technology is here, and it’s time for the data warehouse ETL world to come along.

Apache Spark and SQLContext.

I believe one of the best answers today for the growing size of data, data types, and streams… while still providing the core SQL functionality that drives data warehousing and analytical capabilities… is Apache Spark, and more specifically the SQLContext it provides.

“Spark SQL is a Spark module for structured data processing……One use of Spark SQL is to execute SQL queries…”

the essence of SQLContext

Why Spark? Well, because it has been built as a scalable data processing system and is the clear leader today. It provides DataFrames and DataSets, APIs that map to how most of us “see” data in our minds. Yes, Spark probably comes with a bit of a learning curve and may be a challenge at first for the classic ETL Developer. But it isn’t that bad. First of all, PySpark makes it approachable for everyone. Anyone can learn Python, so anyone can write PySpark ETL scripts.

More specifically, the SQLContext makes it possible to take traditional, architecture-heavy processes built on tools like SSIS and SQL Server and turn them into a single PySpark script. I’m running a $30-a-month Spark/Hadoop cluster myself… try doing that with SQL Server and SSIS. Not to mention AWS Glue and GCP Dataproc offer totally managed Spark clusters for your use.

Enough chit chat… let’s write some PySpark with SQLContext.

Lest you accuse me of not using real data, let’s fall back on the Divvy Bike trips data sets. Sets of CSV files with plenty of real-life data.

So let’s approach this classic data warehousing problem. Say we have a bunch of historical data stored in CSV files; maybe the CSV files show up every day on some file system. The classic approach would be…

  • Design some staging table in an RDBMS system.
  • Maybe a final table with changed column names and other slight transforms.
  • A data mart that will hold the summary information.
  • Design some drag-and-drop ETL package for the source file into the RDBMS, then another from the staging table into a normalized table.
  • Probably another ETL package to build the Data Mart with summarized information.

Maybe Sales and Product just want to know the average bike ride duration per month, historically.

Solving the ETL/Analytical problem with a new approach.

The above steps are what 90%+ of data warehousing and ETL Developer teams are doing today. It’s been done like this for the last 20 years. But as we discussed earlier, this style is starting to become an island in a sea of Streams, IOT, Big Data, and DevOps. I still think simple is better; not every new and shiny tech is for every job. But as we look at what it would be like to solve the above ETL problem with the PySpark SQLContext, I think we will see the reduction in complexity and the powerful scalability this solution provides.

PySpark ETL

The first thing we need to do, as always, is gather the csv files and get them into HDFS.

From my Spark master node I pulled down the csv file(s). You could of course just pull files from some cloud bucket instead…

curl -O https://divvy-tripdata.s3.amazonaws.com/Divvy_Trips_2020_Q1.zip
unzip Divvy_Trips_2020_Q1.zip

hdfs dfs -mkdir /tripdata
hdfs dfs -copyFromLocal Divvy_Trips_2020_Q1.csv /tripdata

I basically did that a few times and got a bunch of data from 2019.
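
If you’d rather script those repeats than type them out by hand, something like the following works. This is a rough sketch: the 2019 quarterly zip names are my assumption (check the bucket listing for the exact file names), and it assumes the hdfs CLI is on the path of the machine running it.

import subprocess
import urllib.request
import zipfile

# Assumed quarterly file names on the Divvy S3 bucket -- verify before running.
quarters = ["Divvy_Trips_2019_Q2", "Divvy_Trips_2019_Q3", "Divvy_Trips_2019_Q4"]

for q in quarters:
    zip_name = f"{q}.zip"
    urllib.request.urlretrieve(
        f"https://divvy-tripdata.s3.amazonaws.com/{zip_name}", zip_name
    )
    with zipfile.ZipFile(zip_name) as z:
        # extracted csv names don't always match the zip name, so grab whatever csv(s) are inside
        csv_names = [n for n in z.namelist() if n.endswith(".csv") and not n.startswith("__")]
        z.extractall(".")
    for name in csv_names:
        # push each extracted csv into the same HDFS directory as before
        subprocess.run(["hdfs", "dfs", "-copyFromLocal", name, "/tripdata"], check=True)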

Now that my files are ready, time to write some PySpark! First I want to create a SparkSession and a SQLContext…

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession \
    .builder \
    .appName("Learn SQLContext") \
    .getOrCreate()
sqlc = SQLContext(spark.sparkContext)  # SQLContext wraps the underlying SparkContext

Next I want to read all the CSV files into a DataFrame.

df = spark.read.csv(
    path="hdfs://master:9000/tripdata/*.csv",
    sep=",",
    header=True
)
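
One quick note: with header=True and no schema, Spark reads every column in as a string, which is workable for this walkthrough. If you want real types up front, a small variation on the same read does it (an optional tweak, not something the rest of this post depends on):

# Same read as above, but asking Spark to infer column types instead of
# leaving everything as strings (this costs an extra pass over the files).
df_typed = spark.read.csv(
    path="hdfs://master:9000/tripdata/*.csv",
    sep=",",
    header=True,
    inferSchema=True
)
df_typed.printSchema()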

I just want to pause here for a minute. Reading flat files is something that ETL Developers and Data Engineers do all day long; it’s probably the first thing you learn to do. This is repeated constantly in GUI ETL tools by dragging some Flat File Source onto a Table destination of some kind that puts the data in an RDBMS “staging” table. I want to point out how intuitive and uncomplicated it is to load structured data into PySpark and get a DataFrame.

These DataFrames make the data accessible and actionable in a logical way, probably how you’re seeing the data in your head, a bunch of rows and columns.

A Spark DataFrame has a lot of functionality available right out of the box. I encourage you to look through the list; it’s amazing what you can do. I would argue that tasks more complex than most ETL GUIs will give you can be done on a DataFrame with a simple call.
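
To give a feel for what “a simple call” means, here are a few DataFrame operations run straight against the raw column names we just loaded. This is just a quick sketch on the same df; nothing here is required for the SQL approach below, and the "Subscriber" value is an assumption about what the User Type column contains.

from pyspark.sql.functions import col

# count the distinct bikes in the data set
df.select("01 - Rental Details Bike ID").distinct().count()

# keep only one user type (assuming "Subscriber" is a value that appears in the data)
subscribers = df.filter(col("User Type") == "Subscriber")

# trips per start station, largest first
df.groupBy("03 - Rental Start Station Name").count().orderBy(col("count").desc()).show(10)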

SQLContext

Still skeptical, huh? It is a different way of thinking about ETL and transformations if you are used to the classic tools. But I have an option for you… the SQLContext.

# switch over to SQL context so we can easily change column names
sqlc.registerDataFrameAsTable(df, "trips_data")

df2 = sqlc.sql("""SELECT 
                `01 - Rental Details Rental ID`  AS rental_id,
                `01 - Rental Details Local Start Time`  AS rental_start_time,
                `01 - Rental Details Local End Time`  AS rental_end_time,
                `01 - Rental Details Bike ID`  AS bike_id,
                `01 - Rental Details Duration In Seconds Uncapped`  AS ride_duration,
                `03 - Rental Start Station ID`  AS start_station_id,
                `03 - Rental Start Station Name`  AS start_station_name,
                `02 - Rental End Station ID`  AS end_station_id,
                `02 - Rental End Station Name`  AS end_station_name,
                `User Type`  AS user_type,
                `Member Gender`  AS member_gender,
                `05 - Member Details Member Birthday Year`  AS member_birth_year
                FROM trips_data""")
df2.show()

Here we go, this probably looks familiar: just plain old SQL. This is what the SQLContext in Spark provides. I can register my DataFrame as a table and run normal SQL against it. This is amazing. Look how easily I can rename the columns. If I run this code with spark-submit, df2.show() prints out the DataFrame with the new column names….

/usr/local/spark/bin/spark-submit test.py
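
By the way, if you ever want to do the same renaming without SQL, the DataFrame API can get you there too. Here is a minimal sketch of the equivalent, shown for just a few of the columns:

# The same renames done through the DataFrame API instead of SQL (only a few columns shown).
renamed_df = (
    df.withColumnRenamed("01 - Rental Details Rental ID", "rental_id")
      .withColumnRenamed("01 - Rental Details Local Start Time", "rental_start_time")
      .withColumnRenamed("01 - Rental Details Duration In Seconds Uncapped", "ride_duration")
)
renamed_df.show()

Same result either way; the SQL version just reads more naturally if you come from the warehouse world.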

So easy. Now, what about the Data Mart info: the average trip duration by month, historically? All you SQL people learned this on your first day; the next step is nothing more than a simple GROUP BY statement to get the average trip duration for each month of the year.

sqlc.registerDataFrameAsTable(df2, "trip_duration_data_mart")

data_mart_df = sqlc.sql("""SELECT
                YEAR(to_date(rental_start_time)) as rental_year,
                MONTH(to_date(rental_start_time)) as rental_month,
                AVG(ride_duration) as avg_duration
                FROM trip_duration_data_mart
                GROUP BY YEAR(to_date(rental_start_time)), MONTH(to_date(rental_start_time))
                ORDER BY rental_year, rental_month
                """)

data_mart_df.show()

Well look at that, works like a charm! I’m hoping you’re starting to see the possibilities here. Don’t forget that Spark can easily load and save these DataFrames to and from sources like…

  • JDBC drivers (other RDBMS databases)
  • Hive
  • Parquet
  • Flat files
  • ORC
  • Avro
  • the list goes on.

So saving our data is as easy as something like…

data_mart_df.write.parquet("hdfs://master:9000/tripdata/trip_duration_2019.parquet")
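
And since JDBC made the list above, pushing the same data mart into an existing Postgres (or any RDBMS) table is just another write call. A sketch, assuming you have a reachable Postgres instance and the JDBC driver jar available on the cluster; the hostname, database, table, and credentials below are placeholders:

# Hypothetical JDBC write -- host, database, table name, and credentials are placeholders.
data_mart_df.write.jdbc(
    url="jdbc:postgresql://warehouse-host:5432/analytics",
    table="trip_duration_by_month",
    mode="overwrite",
    properties={"user": "etl_user", "password": "secret", "driver": "org.postgresql.Driver"}
)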

A Different Approach to the Old ETL Problem.

So think about doing this over hundreds of gigabytes of files… terabytes of data… that is what Spark was built for. Imagine a data warehouse where you just dump the input data into some cloud bucket store. The backbone of your data warehouse and analytics can become a simple PySpark script. Obviously, in the real world things would be a little more complicated… but as someone who did this for years… not much.

It’s just a different way of thinking about big data, warehousing, and ETL. It’s sorta like playing on the edges of NoSQL, because in theory we aren’t really ingesting any data into a traditional RDBMS database with all the underlying ETL systems that usually implies. That is the essence of the rise of NoSQL-like tech stacks.

It’s very easy to envision an ETL and Data Warehouse world built on Spark as the backbone, and the SQLContext makes it possible: storing files in a big data store, running PySpark scripts to transform and analyze the data, and storing results to other files or data stores for user-facing consumption. Next time we will dive a lot deeper into PySpark and see what other fun things we can do.

All code above is on GitHub.