Lessons Learned from MERGE operations with Billions of Records on Databricks Spark

Something happens when you start working with tens of billions of records and data sets that are hundreds of TBs in size. Do you know what happens? Things stop working, that’s what. I miss the days when 1-10 TBs were considered large and in charge. The good ole days.

I want to talk about lessons learned from working with MERGE INTO on Databricks Spark. The suggestions, the marketing material, the internet, and what you actually need to do to gain reasonable performance. It’s easy to say … “here … use this new feature, you will get a 50% speed improvement.” Yeah right. Honestly, new features and fancy tricks always help, but typically it comes down to the fundamentals. The “boring” stuff, if you will, that makes or breaks Big Data operations.

The Big Problem.

It’s usually the same problems that plague many large Data Lakes / Data Warehouses / Lake Houses … whatever, I get tired of all those names. Eventually, there just isn’t enough compute to solve every single problem. Size matters. Size matters when talking about data. The new world of Data Lakes poses many new challenges for the ingestion and processing of hundreds of terabytes and billions of records.

When you get to a certain size, things just become a little harder; every decision matters and can have large side effects.

One problem I recently ran into when building a large Data Lake on Databricks was the issue of populating new and updated records into massive Fact tables. Many tens of billions of records.

“How the devil do I merge 35 million records from one Delta Lake table into another table that has tens of billions of records? And do this a few times a day, for crying out loud.”

– me

In a classic SQL database environment, on which many an old-school Data Warehouse was built, this would be a terribly common problem. We have a set of new records that need to be merged from one table to another, typically staging into some final resting place, a fact or dimension.

Some records might be new, aka INSERT, and some might be existing records that need to be UPDATED.

Databricks offers a solution for this problem. MERGE INTO

MERGE INTO on Databricks Spark.

You can read the official Databricks MERGE INTO documentation here. This wonderful MERGE statement is made to simplify the common task of combining and updating records between two tables. Such a common Data Warehouse task since the days of old.

Here is a simple MERGE statement.

MERGE INTO some_fact_table as F
USING some_staging_table as S
ON F.some_key = S.some_key
WHEN MATCHED THEN UPDATE .....
WHEN NOT MATCHED THEN INSERT ....

Basically, you have some JOIN on one or more columns between two Delta tables. If there is a match, then UPDATE; if there is NO match, then INSERT.
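If you prefer to build the merge in code rather than SQL, the Delta Lake Python API expresses the same thing. A minimal sketch, reusing the placeholder table and column names from the SQL above and the ambient Databricks spark session:

# Minimal sketch of the same MERGE via the Delta Lake Python API.
# "some_fact_table", "some_staging_table", and "some_key" are the placeholder
# names from the SQL example above.
from delta.tables import DeltaTable

fact = DeltaTable.forName(spark, "some_fact_table")
staging = spark.table("some_staging_table")

(
    fact.alias("F")
    .merge(staging.alias("S"), "F.some_key = S.some_key")
    .whenMatchedUpdateAll()      # WHEN MATCHED THEN UPDATE (all columns)
    .whenNotMatchedInsertAll()   # WHEN NOT MATCHED THEN INSERT (full row)
    .execute()
)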

Yet another problem.

This is where the problem comes in. Google it if you want … “Databricks MERGE statement poor performance.” Slowness. Waiting forever. Large Spark clusters, sitting there spinning and not making any reasonable progress.

Oh, what oh what is the problem?

To understand the problem, we have to look at what is behind the two Delta tables: Parquet files. And when you have billions of records and many hundreds of terabytes of data, this means lots of files, of course!

Of course, this MERGE operation is going to be slow. You have 30-50 million staging records, and you need to find out whether or not they exist among billions of records in another spot (lots of files), and take the appropriate action. Yikes.
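If you want to see the scale of the file problem for yourself, Delta can report the file count and size behind a table. A rough sketch, using the placeholder table names from this post:

# Rough sketch: peek at how many Parquet files sit behind each Delta table.
# Table names are the placeholders used in this post.
for tbl in ["some_fact_table", "some_staging_table"]:
    spark.sql(f"DESCRIBE DETAIL {tbl}") \
        .select("numFiles", "sizeInBytes", "partitionColumns") \
        .show(truncate=False)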

Common snake-oil recommendations to fix slow MERGE statement performance on Databricks.

Ok, first off, probably not every recommendation is snake oil, but boy, sometimes it sure feels like it.

The problem is all the marketing material that will say “Hey, gain huge performance improvements by using this new fancy feature.” Many times this information takes your eye off the prize, away from the simple things that usually solve your problem.

Here are some of the recommendations that you find touted as solving your MERGE performance issues.

  • low-shuffle-merge. https://databricks.com/blog/2021/09/08/announcing-public-preview-of-low-shuffle-merge.html
  • new Photon engine. https://databricks.com/blog/2021/06/17/announcing-photon-public-preview-the-next-generation-query-engine-on-the-databricks-lakehouse-platform.html
  • file-size. (file compaction, which you should be doing anyway.)
  • z-order on join key.

So easy! Want low-shuffle-merge to solve all your MERGE issues? Easy, it’s just a Spark config: spark.databricks.delta.merge.enableLowShuffle = true
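In PySpark that amounts to a one-liner, something like this sketch (the config key is the one from the low-shuffle-merge announcement above):

# Sketch: enable low shuffle merge for the current Spark session.
spark.conf.set("spark.databricks.delta.merge.enableLowShuffle", "true")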

Problems solved! Not.

Here, try the new Photon engine, it will blow you out of the water. All you have to do is swap your Spark runtime version, 'spark_version': '9.0.x-photon-scala2.12',

update the instance type you’re using on your Databricks cluster, 'node_type_id': 'i3.4xlarge', and BAM! Things are running twice as fast. Not.
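For reference, here is roughly where those two settings live in a cluster spec, e.g. when defining a job cluster. A sketch only; the runtime string and instance type are the ones mentioned above, and the worker count is purely illustrative:

# Rough sketch of a Photon-enabled cluster spec (e.g. for the Databricks Jobs API).
# spark_version and node_type_id are from the text above; num_workers is made up.
new_cluster = {
    "spark_version": "9.0.x-photon-scala2.12",  # Photon runtime
    "node_type_id": "i3.4xlarge",               # Photon-supported instance type
    "num_workers": 8,                           # illustrative only
}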

(By the way, I suggest you use all these features, I’m sure they work great in some situations).

Back to the basics.

So if all these fancy features can’t just solve your problem, what will?

The basics of Big Data, of course. The number one thing, the pinnacle of Big Data engineering and of getting reasonable performance from your Spark scripts, is … partitions, and specifically in our case, “partition pruning.”

When it comes to Big Data, especially with Delta Lake and Spark, and probably many other technologies, once you start accumulating terabytes of data, and then hundreds of terabytes of data, partitioning is the most important topic. Without good partitioning strategies, the data is going to be close to useless; even massive compute can’t overcome this issue.

Partitioning is the way to break big data up into chunks, logically and physically. The most common form of partitioning in data is usually a date or some variation of date.

Year, Month, Day is probably one of the most common partitioning schemes. Think of a bunch of folders like year=2021, and inside each of those a bunch more folders like month=12, and inside those a bunch of day folders like day=25.
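For what it’s worth, this is roughly what writing a Delta table partitioned that way looks like. A sketch, assuming a DataFrame df that already carries year, month, and day columns; the storage path is a made-up placeholder:

# Sketch: write a Delta table physically partitioned by year/month/day.
# df is assumed to already have year, month, and day columns; the path is hypothetical.
(
    df.write.format("delta")
    .partitionBy("year", "month", "day")
    .mode("append")
    .save("s3://some-bucket/some_fact_table")
)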

Now how does this relate to partition pruning your MERGE statements for Databricks?

When identifying your JOIN ON key for the MERGE statement, you should also add an AND operator and put a bunch of partition columns and values in.

This will give you the greatest performance boost, far and away above any of the above-mentioned features. The more partitions you can “prune,” or hone in on, the fewer files and less data must be scanned for matches.

Here would be a simple example …

ON fact_table.primary_key = staging_data.primary_key AND fact_table.partition_1 = 'some_value' 
                    AND fact_table.year in ({years}) AND fact_table.month in ({months})
                    AND fact_table.day in ({days})

If you have a table with billions of records you are trying to MERGE records into, identifying which partitions you are interested in, and therefore which ones will actually contain the data in question, is the number one rule.

Sure, this might take a few steps of pre-processing and logic to identify which partitions are of interest based on the incoming data, but the performance and cost savings from not scanning unused partitions are going to be mind-boggling.
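Here is a sketch of what that pre-processing plus the pruned MERGE could look like in PySpark. The table, key, and partition names are the placeholders from the example above, and the 'some_value' literal is just a stand-in:

# Sketch: derive the partitions touched by the staging data, then splice them
# into the MERGE predicate so untouched partitions can be pruned.
# Table, key, and partition names are the placeholders from the example above.
staging = spark.table("some_staging_table")

rows = staging.select("year", "month", "day").distinct().collect()
years = ", ".join(sorted({str(r["year"]) for r in rows}))
months = ", ".join(sorted({str(r["month"]) for r in rows}))
days = ", ".join(sorted({str(r["day"]) for r in rows}))

spark.sql(f"""
    MERGE INTO some_fact_table AS fact_table
    USING some_staging_table AS staging_data
    ON fact_table.primary_key = staging_data.primary_key
       AND fact_table.partition_1 = 'some_value'
       AND fact_table.year IN ({years})
       AND fact_table.month IN ({months})
       AND fact_table.day IN ({days})
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")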

All this assumes you or someone else knew what they were doing when data partitions were being created.

Musings

It just is never the fancy stuff that answers our hardest and most basic problems. You have to go back to the basics of what you know about Big Data and computing. No amount of large clusters or resources is going to help you if you have to read billions of records every time you MERGE or mess with data.

Data partitioning is one of the most basic building blocks of Big Data and Delta Tables on Databricks. You have to understand how your data is used and come up with at least 3 or more data partitions, in my opinion, if you are going to have data of any size that will be useful in the end.

The simple step of saying “I am interested in data from 2021 and month 12” in your queries and MERGE statements will make the difference between something running for 12 hours versus 20 minutes.

I know, it’s so disappointing, isn’t it? You kinda just wish you could switch to Photon and be done with it, everything fast and quick, no thinking required. You can’t ignore the fundamentals or the new features will be of no use to you, that is the lesson.

2 replies
  1. Steve says:

    For everything you lament, and so much more to follow, Databricks’ MERGE is trash.
    BUT FIRST, LET’S UNDERSTAND THE TRUTH. THE CONCEPT OF MERGE IS NOT ORIGINAL AND IS NOT DATABRICKS’.

    Have fun with it like this.
    Over a join of two DELTA tables just add an `AND` clause matching two, three, or more columns on the left to columns on the right.

    LET’S SAY YOU ARE JOINING ON FIVE COLUMNS.
    Because of the columnstore nature you just caused your query to reduce to five separate table scans.

    In my heart of hearts I truly believe Databricks went for the, “Hey. That’ll be COOL. And who wants to do an update on a join anyway?”
