The world is facing a data deluge. With this enormous growth in data volumes, there is a compelling need to process data with speed, scale and performance. During the design phase, intense meetings and conversations revolve around known parameters such as data size and core count, but things change dramatically once the data keeps growing. The challenge then is to achieve performance together with scalability. At DataRPM, we have been using Spark for quite a while now, and it seems a good time to pen down some of my findings on optimizing Spark performance. In this blog post, I will describe how we optimized Spark for Predictive Maintenance, especially with data coming in from multiple sensors.
Choosing the right resource manager:
The first thing to plan carefully when starting out with Spark is which resource manager (standalone, YARN or Mesos) makes the most sense for your infrastructure management needs. All three resource managers have their pros and cons, but for a production setup YARN and Mesos are definitely more mature and stable. A lot can be written about the different reasons YARN and Mesos exist, but I will leave that for now and concentrate on Spark. As our Cognitive Data Science Platform's technology stack is built on the Hadoop ecosystem, we chose YARN over Mesos.
Data ingested into Spark is stored as a distributed collection of elements called an RDD (Resilient Distributed Dataset), which is divided into a set of partitions spread across the nodes of the Spark cluster. The number of partitions determines the parallelism of computation on the RDD. An RDD therefore needs enough partitions to make full use of the resources available to Spark.
Let's assume a Spark cluster has 30 nodes, with each node hosting one executor with one computational thread. If an RDD in the cluster has 10 partitions, then at any point of the computation 20 nodes will remain unutilized. To fully utilize the computational resources, the RDD should have been divided into at least 30 partitions.
In a real use case, an executor can hold multiple partitions, depending on the memory configured for it. As a general rule, a cluster of n executors, each with c computational threads (cores), should have n*c partitions. Note, however, that this calculation may not suit every input size for a given cluster size. Another way to arrive at a good partition count is to keep each partition between 50 MB and 200 MB. It is also advised that a task should not take more than 5 seconds to complete: if the average task completion time exceeds 5 seconds, the RDD should be repartitioned into smaller partitions. Keep in mind that repartitioning an RDD causes a shuffle and therefore adds to execution time.
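The two rules of thumb above can be sketched as a small calculation. This is only an illustration in plain Python: the function names, the 128 MB target (picked from inside the 50 MB to 200 MB range) and the choice to take the larger of the two counts are assumptions, not part of Spark.

```python
import math

MB = 1024 ** 2

def partitions_by_cores(num_executors, cores_per_executor):
    """Core-based rule: one partition per computational thread (n * c)."""
    return num_executors * cores_per_executor

def partitions_by_size(input_bytes, target_partition_bytes=128 * MB):
    """Size-based rule: enough partitions to keep each near the target size."""
    return max(1, math.ceil(input_bytes / target_partition_bytes))

def suggest_partitions(input_bytes, num_executors, cores_per_executor,
                       target_partition_bytes=128 * MB):
    """Assumed policy: take the larger count, so every core has work and
    no partition grows far beyond the target size."""
    return max(
        partitions_by_cores(num_executors, cores_per_executor),
        partitions_by_size(input_bytes, target_partition_bytes),
    )

if __name__ == "__main__":
    # 30 executors with 1 core each, 10 GB of input.
    print(suggest_partitions(10 * 1024 ** 3, 30, 1))
```

For small inputs the core-based count wins and partitions may end up below 50 MB, which is one reason to treat the result as a starting point and check task durations in the Spark UI.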
The number of partitions of a base RDD depends on the input data source. If the RDD was created with the textFile method of SparkContext, the number of partitions is determined either by the number of blocks on the filesystem or by the number given as the method's second argument. The number of partitions in an RDD produced by the parallelize method comes from the argument given by the user, or from the configuration property spark.default.parallelism if no argument is provided. RDDs produced by a transformation generally have the same number of partitions as the parent RDD, except for transformations that change the partitioning or require a data shuffle. For example, coalesce generates an RDD with fewer partitions than its parent. All operations that require a data shuffle have an option to specify the number of partitions of the resulting RDD.
Resource Allocation For Spark Applications:
So far we have seen how optimal parallel processing in Spark can be achieved by having the right number of RDD partitions. Now let's look at resource allocation for a Spark application. At present, the available Spark cluster managers are capable of managing cores and memory. Other resources, such as disk and network I/O, are not actively managed, although they also play a significant role in Spark performance.
Every executor in a Spark application has the same number of cores and the same heap size. The number of cores per executor can be specified with the --executor-cores flag when invoking spark-submit, spark-shell or pyspark from the command line, or by setting spark.executor.cores in the spark-defaults.conf file or on the SparkConf object. The executor heap size can be configured with the --executor-memory flag or the spark.executor.memory property. The cores property determines the number of concurrent tasks an executor can run, while the memory property controls how much data can be cached and how much memory is available for shuffle data. The number of executors for a Spark application can be controlled with the --num-executors flag or the spark.executor.instances property.
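As a rough sketch of how the flags and properties above line up, the helper below builds a spark-submit argument list and the equivalent property map. The helper names, the application file name and the concrete values are hypothetical; only the flag and property names come from the text.

```python
def spark_submit_args(app, num_executors, executor_cores, executor_memory):
    """Build a spark-submit command line using the three sizing flags."""
    return [
        "spark-submit",
        "--num-executors", str(num_executors),
        "--executor-cores", str(executor_cores),
        "--executor-memory", executor_memory,
        app,
    ]

def executor_properties(num_executors, executor_cores, executor_memory):
    """Same settings expressed as properties, e.g. for spark-defaults.conf."""
    return {
        "spark.executor.instances": str(num_executors),
        "spark.executor.cores": str(executor_cores),
        "spark.executor.memory": executor_memory,
    }

if __name__ == "__main__":
    # Hypothetical application and sizing values, for illustration only.
    print(" ".join(spark_submit_args("my_job.py", 10, 4, "8g")))
    for key, value in executor_properties(10, 4, "8g").items():
        print(key, value)
```

Either form describes the same allocation; the command-line flags are convenient for one-off runs, while the properties suit shared defaults.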
By default, 60% of the memory allocated to a Spark application is used for RDD caching, 20% is used for data shuffle, and the remaining 20% is used for storing objects created during task execution. It is highly recommended not to configure an executor with more than 40G of memory, as a larger allocation can lead to long Garbage Collection (GC) pauses.
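To make the split concrete, here is a small arithmetic sketch that applies the 60/20/20 breakdown described above to an executor heap. The function name is hypothetical and the calculation is a simplification: it ignores any safety margins Spark applies internally. In Spark 1.5 the two fractions correspond to the spark.storage.memoryFraction and spark.shuffle.memoryFraction properties.

```python
def executor_memory_split(heap_gb, storage_fraction=0.6, shuffle_fraction=0.2):
    """Split an executor heap (in GB) into cache, shuffle and task-object shares."""
    if storage_fraction + shuffle_fraction > 1.0:
        raise ValueError("fractions must not add up to more than 1.0")
    storage = heap_gb * storage_fraction
    shuffle = heap_gb * shuffle_fraction
    return {
        "rdd_cache_gb": storage,
        "shuffle_gb": shuffle,
        # Whatever is left is available for objects created by tasks.
        "task_objects_gb": heap_gb - storage - shuffle,
    }

if __name__ == "__main__":
    # A 40 GB heap, the upper bound recommended above.
    for name, gigabytes in executor_memory_split(40).items():
        print(f"{name}: {gigabytes:.1f}")
```

With a 40 GB heap this works out to roughly 24 GB for caching and 8 GB each for shuffle and task objects.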
The number of cores per executor should also be chosen judiciously. A high core count per executor can adversely affect Hadoop Distributed File System (HDFS) I/O, potentially reducing Spark performance. It's recommended that an executor be configured with no more than 4 cores.
Elasticity in Spark Applications:
It's also possible to bring elasticity to a Spark application by enabling dynamic allocation. In this case, the number of executors is scaled up or down at run time depending on demand. This feature is currently (Spark 1.5) available with YARN only.
Dynamic allocation can be enabled by setting the property spark.dynamicAllocation.enabled to true. The application will start with the initial number of executors specified by spark.dynamicAllocation.initialExecutors, with a lower bound of spark.dynamicAllocation.minExecutors and an upper bound of spark.dynamicAllocation.maxExecutors. A request for new executors is made when tasks have been pending for the number of seconds specified in spark.dynamicAllocation.schedulerBacklogTimeout.
An executor will be removed if it has been idle for the duration specified by spark.dynamicAllocation.executorIdleTimeout.
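The properties above can be collected into one configuration, sketched below as a plain Python helper that renders spark-defaults.conf style lines. The helper names and the numeric values are illustrative assumptions. The sketch also sets spark.shuffle.service.enabled, since dynamic allocation on YARN relies on the external shuffle service; that property is not mentioned in the text above.

```python
def dynamic_allocation_conf(initial, minimum, maximum,
                            backlog_timeout="1s", idle_timeout="60s"):
    """Return dynamic allocation properties after a basic sanity check."""
    if not minimum <= initial <= maximum:
        raise ValueError("expected minExecutors <= initialExecutors <= maxExecutors")
    return {
        "spark.dynamicAllocation.enabled": "true",
        # Assumption: the external shuffle service is set up on the cluster.
        "spark.shuffle.service.enabled": "true",
        "spark.dynamicAllocation.initialExecutors": str(initial),
        "spark.dynamicAllocation.minExecutors": str(minimum),
        "spark.dynamicAllocation.maxExecutors": str(maximum),
        "spark.dynamicAllocation.schedulerBacklogTimeout": backlog_timeout,
        "spark.dynamicAllocation.executorIdleTimeout": idle_timeout,
    }

def to_defaults_conf(properties):
    """Render properties as 'key value' lines, one per property."""
    return "\n".join(f"{key} {value}" for key, value in properties.items())

if __name__ == "__main__":
    # Illustrative bounds: start with 5 executors, scale between 2 and 20.
    print(to_defaults_conf(dynamic_allocation_conf(5, 2, 20)))
```

Validating the bounds up front is a design choice of this sketch; it catches an inconsistent configuration before the job is submitted.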
Good luck with your Apache Spark 1.5 performance tuning, and do share your best practices with us.