1. What is a Spark Join?
When Spark joins two tables, it:
- Looks at the join key (for example, user_id or order_id)
- Sends rows with the same key to the same worker (executor)
- That worker matches the rows and creates the result
This is how Spark runs fast: by dividing work across many machines
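A minimal sketch of such a join, using hypothetical users and orders tables keyed by user_id:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical example data: two tiny tables sharing a user_id key
users = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["user_id", "name"])
orders = spark.createDataFrame([(1, 101), (1, 102), (2, 103)], ["user_id", "order_id"])

# Rows with the same user_id end up on the same worker, where they are matched
result = users.join(orders, "user_id")
result.show()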
2. What is Data Skew?
Data skew means some keys have much more data than others
Example: suppose you are joining data on country:
| Country | Rows |
|---|---|
| India | 5,000 |
| UK | 5,000 |
| Canada | 5,000 |
| USA | 1,000,000 |
Spark sends:
- Small countries → many workers
- USA → mostly one worker
So:
- One worker becomes very slow
- Others finish early and wait
- The whole job becomes slow or may fail
This problem is called data skew
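To make this concrete, here is a small sketch (with the made-up row counts from the table, reusing the spark session from the earlier example) that builds such a skewed DataFrame:
from pyspark.sql.functions import lit

# Made-up data matching the table above: 5,000 rows per small country,
# 1,000,000 rows for USA
small = spark.range(5000)
skewed_df = (
    small.withColumn("country", lit("India"))
    .union(small.withColumn("country", lit("UK")))
    .union(small.withColumn("country", lit("Canada")))
    .union(spark.range(1000000).withColumn("country", lit("USA")))
)
skewed_df.groupBy("country").count().show()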
3. Why Skew is Dangerous
Spark works best when all workers get equal work
With skew:
- One task runs much longer (called a straggler)
- Memory may overflow
- CPU on other machines stays idle
- Job appears stuck at 95–99%
4. How to Detect Data Skew
A) Simple Query Check
SELECT key, COUNT(*)
FROM table
GROUP BY key
ORDER BY COUNT(*) DESC
LIMIT 10;
If top keys have huge counts, skew exists.
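The same check in PySpark, assuming a DataFrame named df whose join column is key:
from pyspark.sql.functions import desc

# Show the 10 heaviest keys; a big gap between the top key and the rest means skew
df.groupBy("key").count().orderBy(desc("count")).show(10)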
B) Spark UI Signs
Look for:
- One task running much longer than others
- One executor reading much more shuffle data
- High memory usage on one node
5. How to Fix Data Skew
Broadcast Join
When to Use
If one table is small (usually <100–200MB)
Idea
Spark copies the small table to every worker, so no data needs to move around during the join.
Code
from pyspark.sql.functions import broadcast

# Ship the small table to every executor; the big table never shuffles
result = big_df.join(broadcast(small_df), "id")
Why It Works
No shuffle → no heavy worker → skew disappears
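A related knob: Spark can also broadcast automatically when a table's estimated size is under spark.sql.autoBroadcastJoinThreshold (default 10MB). A sketch of raising it, with 100MB as an assumed value:
# Let Spark auto-broadcast any table smaller than ~100MB (default is 10MB)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)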
Salting
Simple Idea
Break one heavy key into multiple smaller fake keys so Spark can distribute them.
Example
If:
user_id = 1 appears 1,000,000 times
Convert to:
1_1, 1_2, 1_3, 1_4, 1_5
Now Spark sends these to different workers
How It Works (Steps)
1️⃣ Add a random number (salt) to the big table
2️⃣ Copy the small table for each salt value
3️⃣ Join using both key + salt
Sample Code
from pyspark.sql.functions import rand, floor, explode, array, lit

# 1) Add a random salt (0-4) to every row of the big table
big_salted = big_df.withColumn("salt", floor(rand() * 5).cast("int"))

# 2) Duplicate every row of the small table once per salt value
small_salted = small_df.withColumn("salt", explode(array([lit(i) for i in range(5)])))

# 3) Join on both the key and the salt
result = big_salted.join(
    small_salted,
    (big_salted.id == small_salted.id) &
    (big_salted.salt == small_salted.salt)
)
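A design note: the salt range (5 here) decides how many pieces the heavy key is split into. A larger range spreads the load better but makes more copies of the small table, so it is a trade-off. After the join, drop the salt column (and one of the duplicated id columns) before using the result.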
Let Spark Fix It Automatically (AQE)
Requires Spark 3.0 or later
Turn It On
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
What Spark Does
- Finds heavy partitions
- Breaks them into smaller parts
- Runs them in parallel
Why It’s Great
No code change needed, just configuration
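Two more real settings control what AQE counts as "skewed"; the values shown below match Spark's defaults:
# A partition is treated as skewed if it is 5x the median partition size...
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
# ...and also bigger than this absolute size (default 256MB)
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")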
Separate Heavy Keys
Idea
- Process skewed keys separately
- Process normal keys normally
- Combine results
Example
from pyspark.sql.functions import col

# Hypothetical known-heavy keys
hot_keys = [1, 2]

# Split the big table into the skewed rows and everything else
skewed = big_df.filter(col("id").isin(hot_keys))
normal = big_df.filter(~col("id").isin(hot_keys))

# Join each part separately, then combine the results
result = skewed.join(small_df, "id") \
    .union(normal.join(small_df, "id"))
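Since the hot-key branch touches only a couple of keys, it often pairs well with a broadcast join (a sketch reusing the names above; it assumes small_df is small enough to broadcast):
from pyspark.sql.functions import broadcast

# Broadcast the small table for the hot keys so that branch never shuffles
result = skewed.join(broadcast(small_df), "id") \
    .union(normal.join(small_df, "id"))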
Increase Shuffle Partitions
When to Use
For mild skew
spark.conf.set("spark.sql.shuffle.partitions", 500)
This gives Spark more buckets to spread data.
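One caveat: too many partitions can mean lots of tiny tasks. On Spark 3+, AQE can coalesce small shuffle partitions back together, which makes over-provisioning safer:
# AQE merges tiny shuffle partitions after the shuffle,
# so setting a high partition count is low-risk
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")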
6. Which Method Should I Use? (Decision Table)
| Situation | Best Choice |
|---|---|
| One table is small | Broadcast Join |
| Both tables are big | Salting |
| Spark 3+ | Enable AQE |
| Few known heavy keys | Separate them |
| Mild skew | Increase partitions |
7. Real Production Example
Case
- Fact table: 2 billion rows
- Dimension table: 5GB
- Key: country = "US" accounted for 40% of all data
Fix
- Enabled AQE
- Salted only the "US" key
Result
Job time dropped from 90 minutes to 12 minutes
8. Easy Memory Trick
B-A-S
- B → Broadcast small table
- A → AQE automatic fix
- S → Salt heavy keys