Thursday, 25 June 2015

Shuffling and repartitioning of RDDs in Apache Spark

To write an optimized Spark application you should choose transformations and actions carefully; the wrong transformation or action can make your application slow. So while writing an application, keep the following points in mind to make it more efficient.
1. Number of partitions when creating an RDD
By default Spark creates one partition for each block of the file in HDFS (blocks are 64 MB by default). You can also pass the number of partitions as a second argument when creating an RDD. Let's see an example of creating an RDD from a text file:
val rdd = sc.textFile("file.txt", 5)
The above statement creates an RDD from the text file with 5 partitions. Now suppose we have a cluster with 4 cores and each partition takes 5 minutes to process: 4 partitions are processed in parallel, and the 5th is processed only after a core becomes free. The job therefore finishes in 10 minutes, and resources sit idle while the last partition is processed alone.
To overcome this problem, create the RDD with a number of partitions equal to the number of cores in the cluster, so that all partitions are processed in parallel and resources are used equally.
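A minimal sketch of this idea (assuming a SparkContext named sc is in scope; sc.defaultParallelism usually reflects the total number of cores available to the application):

val numPartitions = sc.defaultParallelism                  // typically the total number of cores
val balancedRDD = sc.textFile("file.txt", numPartitions)   // one partition per core
println(balancedRDD.partitions.length)                     // verify the partition count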
2. reduceByKey vs. groupByKey
Let's take word count as an example: you can process an RDD and find the frequency of each word using either of the transformations groupByKey and reduceByKey.
Word count using reduceByKey:
val wordPairsRDD = rdd.map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)   // sums counts within each partition before shuffling
  .collect()
The diagram below shows how the RDD is processed and shuffled over the network:
[Diagram: word count using reduceByKey]
As the diagram shows, each worker node first processes its own partition and counts the words locally, and only then shuffles the partial counts for the final result.
On the other hand, if we use groupByKey for word count, it looks as follows:
val wordCountsWithGroup = wordPairsRDD
  .groupByKey()                  // shuffles every (word, 1) pair across the network
  .map(t => (t._1, t._2.sum))    // sums the counts only after the shuffle
  .collect()
The diagram below shows how the RDD is processed and shuffled over the network using groupByKey:
[Diagram: word count using groupByKey]
As you can see above, all the worker nodes shuffle their raw data and the words are counted only at the final node, so with groupByKey a lot of unnecessary data is transferred over the network.
So avoid groupByKey as much as possible.
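The same principle applies beyond word count. As a hedged sketch, assuming a pair RDD named pairRDD of type RDD[(String, Int)] (not defined in this post), here is a per-key average computed without groupByKey; aggregateByKey combines values map-side, so only small (sum, count) pairs cross the network:

val avgByKey = pairRDD
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),    // combine within a partition
    (a, b) => (a._1 + b._1, a._2 + b._2))    // merge partial results across partitions
  .mapValues { case (sum, count) => sum.toDouble / count }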
3. Hash-partition before transformations over pair RDDs
Before performing transformations over a pair RDD, we can shuffle records with the same key to the same worker. For that we use a HashPartitioner, which partitions the data by the key of the pair RDD. Let's see an example of hash-partitioning the data:
import org.apache.spark.HashPartitioner

val wordPairsRDD = rdd.map(word => (word, 1))
                      .partitionBy(new HashPartitioner(4))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)
  .collect()
When we use hash-partitioning, the data is shuffled so that all records with the same key end up on the same worker. See the diagram:
[Diagram: hash-partitioned pair RDD — all records for a given key on one worker]
In the above diagram you can see that all the data for the key "c" is shuffled to the same worker node. So when applying transformations over a pair RDD, we should use hash-partitioning, as the short check below illustrates.
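A sketch of this behaviour (the partition count 4 matches the example above): a HashPartitioner maps every record with the same key to the same partition index, and therefore to the same worker.

import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(4)
// "c" always hashes to the same partition index, so all of its
// records land on the same worker after partitionBy.
println(partitioner.getPartition("c"))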
4. Do not use collect() on a big dataset
The collect() action gathers all elements of the RDD and sends them to the driver, so using it on a big dataset can cause an out-of-memory error when the data does not fit in the driver's memory. Filter the data before calling collect(), or use the take or takeSample actions instead.
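A minimal sketch of those safer alternatives, assuming a large RDD[String] named bigRDD (a hypothetical example, not from the original post):

val first100 = bigRDD.take(100)              // only 100 elements reach the driver
val sample = bigRDD.takeSample(false, 1000)  // random sample of 1000 elements, no replacement
val errors = bigRDD.filter(_.contains("ERROR")).collect()  // shrink the data first, then collect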
5. Use coalesce to decrease the number of partitions
Use coalesce instead of repartition when you want to decrease the number of partitions of an RDD. coalesce is useful because it does not shuffle data over the network; it merges existing partitions in place.
rdd.coalesce(1)
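For comparison, a short sketch of the two options (the partition counts here are illustrative):

val shrunk = rdd.coalesce(10)        // merges down to 10 partitions without a shuffle
val rebalanced = rdd.repartition(10) // full shuffle; use it to increase or rebalance partitions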
