My cluster:<ul><li>5 data nodes</li> <li>each data node has 8 CPUs and 45 GB of memory</li> </ul>
Due to another configuration limit, I can only start 5 executors on each data node, so I ran:
spark-submit --num-executors 30 --executor-memory 2G ...
Each executor therefore uses 1 core.
I have two data sets, each about 20 GB. In my code, I did:
val rdd1 = sc...cache()
val rdd2 = sc...cache()
val x = rdd1.cartesian(rdd2).repartition(30).map(...)
In the Spark UI, I saw that the repartition step took more than 30 minutes and caused a shuffle of more than 150 GB.
I don't think this is right, but I could not figure out what went wrong.

Answer 1:
Did you really mean "cartesian"?
You are pairing every row in RDD1 with every row in RDD2. If your rows were about 1 MB each, you had roughly 20,000 rows per RDD. The cartesian product then returns a set with 20,000 × 20,000 = 400 million records. And note that each output record now carries both rows, so it is double in width (about 2 MB), which puts RDD3 at roughly 800 TB, whereas RDD1 and RDD2 were only 20 GB each.
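A back-of-the-envelope sketch of that blow-up. The ~1 MB row size here is an assumption chosen to illustrate the arithmetic, not something stated in the question:

```scala
// Rough sizing of rdd1.cartesian(rdd2), assuming ~1 MB rows (hypothetical).
object CartesianSizing {
  def main(args: Array[String]): Unit = {
    val rowBytes = 1L << 20               // ~1 MB per row (assumed)
    val rddBytes = 20L * (1L << 30)       // 20 GB per RDD, as in the question
    val rows     = rddBytes / rowBytes    // about 20,000 rows per RDD
    val pairs    = rows * rows            // ~400 million cartesian records
    val outBytes = pairs * (2 * rowBytes) // each record carries both rows (~2 MB)
    println(s"rows per RDD: $rows")
    println(s"cartesian records: $pairs")
    println(s"output size: ${outBytes / (1L << 40)} TB")
  }
}
```

Shuffling even a fraction of that is far more than the 150 GB you observed, so the UI numbers are entirely consistent with a cartesian product.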
Perhaps you wanted:
val x = rdd1.union(rdd2).repartition(30).map(...)
or maybe even:
val x = rdd1.zip(rdd2).repartition(30).map(...)
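For intuition, the difference between the three operations can be sketched with local Scala collections (a simplification: unlike local `zip`, Spark's `RDD.zip` requires both RDDs to have the same number of partitions and the same number of elements per partition, and `RDD.union` keeps duplicates rather than deduplicating):

```scala
// Local-collection sketch of how each operation scales (not Spark code).
object CombineOps {
  def main(args: Array[String]): Unit = {
    val a = List("a1", "a2")
    val b = List("b1", "b2")

    // cartesian: every pairing -- size grows multiplicatively (2 * 2 = 4)
    val cart = for (x <- a; y <- b) yield (x, y)

    // union: concatenation -- size grows additively (2 + 2 = 4 here, but
    // 20 GB + 20 GB = 40 GB in your case, not 800 TB)
    val uni = a ++ b

    // zip: element-wise pairing -- one output row per input row (2)
    val zipped = a zip b

    println(s"cartesian: ${cart.size}, union: ${uni.size}, zip: ${zipped.size}")
  }
}
```

So if what you actually need is one combined record per matching pair of rows, `zip` (or a keyed `join`) keeps the data at the original scale, while `cartesian` explodes it quadratically.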