1. What is a Spark Join?

When Spark joins two tables, it:

  1. Looks at the join key (example: user_id, order_id)
  2. Sends rows with the same key to the same worker (executor)
  3. That worker matches rows and creates the result

This is how Spark runs fast: by dividing the work across many machines.


2. What is Data Skew?

Data skew means some keys have much more data than others.

Example: you are joining two tables on country:

Country     Rows
India       5,000
UK          5,000
Canada      5,000
USA         1,000,000

Spark sends:

  - the India, UK, and Canada rows to workers that each get 5,000 rows
  - all 1,000,000 USA rows to a single worker

So:

  - most workers finish in seconds and then sit idle
  - one worker must process 1,000,000 rows alone

This problem is called data skew.


3. Why Skew is Dangerous

Spark works best when all workers get equal work.

With skew:

  - One worker receives far more data than the rest
  - The whole job waits for that single slow task
  - The overloaded worker can run out of memory and fail the job

4. How to Detect Data Skew

A) Simple Query Check

SELECT key, COUNT(*) 
FROM table
GROUP BY key
ORDER BY COUNT(*) DESC
LIMIT 10;

If top keys have huge counts, skew exists.


B) Spark UI Signs

Look for:

  - A few tasks in a stage taking far longer than the rest
  - Shuffle read/write sizes that are much larger for a handful of tasks
  - A stage stuck near the end (e.g. 199/200 tasks complete)


5. How to Fix Data Skew


Broadcast Join

When to Use

If one table is small (usually <100–200MB)

Idea

Spark copies the small table to every worker, so no data needs to move around during join.

Code

from pyspark.sql.functions import broadcast

result = big_df.join(broadcast(small_df), "id")

Why It Works

No shuffle → no heavy worker → skew disappears


Salting

Simple Idea

Break one heavy key into multiple smaller fake keys so Spark can distribute them.


Example

If:

user_id = 1 appears 1,000,000 times

Convert to:

1_1, 1_2, 1_3, 1_4, 1_5

Now Spark sends these to different workers


How It Works (Steps)

1️⃣ Add a random number (salt) to the big table
2️⃣ Copy the small table for each salt value
3️⃣ Join using both key + salt


Sample Code

from pyspark.sql.functions import rand, floor, explode, array, lit

N_SALTS = 5

# Add a random salt (0 to N_SALTS-1) to every row of the big table
big_salted = big_df.withColumn("salt", floor(rand() * N_SALTS).cast("int"))

# Duplicate every small-table row once per salt value
small_salted = small_df.withColumn(
    "salt", explode(array(*[lit(i) for i in range(N_SALTS)]))
)

# Join on both the key and the salt, then drop the helper column
result = big_salted.join(
    small_salted,
    (big_salted.id == small_salted.id) &
    (big_salted.salt == small_salted.salt)
).drop("salt")

Let Spark Fix It Automatically (AQE)

Best for Spark 3+

Turn It On

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

What Spark Does

  - Measures shuffle partition sizes at runtime
  - Detects partitions that are much larger than the others
  - Splits those oversized partitions into smaller sub-tasks and joins them in parallel

Why It’s Great

No code change needed — just configuration


Separate Heavy Keys

Idea

Pull the few known heavy ("hot") keys into their own DataFrame, join them separately, and union the result with the join of the remaining keys.
Example

from pyspark.sql.functions import col

hot_keys = [1, 2]

skewed = big_df.filter(col("id").isin(hot_keys))
normal = big_df.filter(~col("id").isin(hot_keys))

result = skewed.join(small_df, "id") \
         .union(normal.join(small_df, "id"))

Increase Shuffle Partitions

When to Use

For mild skew

spark.conf.set("spark.sql.shuffle.partitions", 500)

This gives Spark more buckets to spread data.


6. Which Method Should I Use? (Decision Table)

Situation               Best Choice
One table is small      Broadcast Join
Both tables are big     Salting
Spark 3+                Enable AQE
Few known heavy keys    Separate them
Mild skew               Increase partitions

7. Real Production Example

Case

Fix

Result

Job time dropped from 90 minutes to 12 minutes

Easy Memory Trick

B-A-S: Broadcast, AQE, Salting

