Resource Balancing Across Applications In Spark

Spark, a fast, distributed data processing engine, has replaced traditional MapReduce in many use cases. With continuous support from the community, the improvements in every release are taking Spark to new heights. At DataRPM we leverage the computation power of Spark for our Cognitive Predictive Maintenance (CPdM) Platform. In this blog, I would like to share an experience from working with Spark on our CPdM Platform that truly made our lives easier.

Every Spark application has its own dedicated, independent set of executors, and that same set of executors is used for all computation and for caching RDDs or DataFrames. In our CPdM Platform, however, multiple applications run in parallel: for example, DataLake data loads, Machine Learning algorithms, and a number of Data Discovery and transformation applications. When several applications run concurrently, cluster resources (memory and cores) can get exhausted, because each application keeps its executors running for as long as the application runs. That is a big problem to tackle.

Executor resources are measured in terms of cores and memory. If we know in advance which types of applications will run in our deployment, we can control executor resources by configuring spark.executor.memory and spark.executor.cores. But this only works for a lab-style deployment where one or two users are experimenting with Spark on small data. With big data, such contention has to be handled so that no application ends up in a Denial of Service (DoS). Many distributed computation platforms run on YARN, where Spark applications share the cluster with many other types of applications. Putting all of this together, it is evident that managing resources is as important as coding an efficient transformation function: poor resource management leads to under- or over-utilization, and applications might starve for resources for a long time, bringing the situation to STOP THE WORLD!

 

Dynamic Resource Allocation

Spark 1.2 introduced Dynamic Resource Allocation, which can scale the number of executors up and down based on the workload. Up to Spark 1.5.2, this feature was only available on YARN. With Spark 1.6, it is available for all types of deployments: Standalone, Mesos, and YARN.

How to enable this?

Let us see how simple it is to enable it on YARN.

  • Set the following two properties to true in the Spark configuration (SparkConf):

spark.dynamicAllocation.enabled
spark.shuffle.service.enabled
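As a minimal sketch, assuming jobs are launched with spark-submit, the two properties can be passed as --conf arguments. The helper name dynamic_allocation_conf_args is hypothetical; only the property names come from this post.

```python
# Build spark-submit "--conf key=value" arguments that turn on
# dynamic allocation together with the external shuffle service.
# The helper name is hypothetical; the property names are Spark's.

REQUIRED_PROPERTIES = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.shuffle.service.enabled": "true",
}


def dynamic_allocation_conf_args(extra=None):
    """Return a flat list like ["--conf", "k=v", ...] for spark-submit."""
    props = dict(REQUIRED_PROPERTIES)
    if extra:
        props.update(extra)  # caller-supplied properties are added or override
    args = []
    for key, value in sorted(props.items()):
        args.extend(["--conf", f"{key}={value}"])
    return args


if __name__ == "__main__":
    print(" ".join(["spark-submit", *dynamic_allocation_conf_args()]))
```

The same key/value pairs can also be applied programmatically, for example with PySpark's SparkConf().setAll(REQUIRED_PROPERTIES.items()) before the SparkContext is created.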

In addition, to enable the external shuffle service (explained later) on YARN:

  • Locate spark-<version>-yarn-shuffle.jar and add it to the classpath of every YARN NodeManager.
  • In yarn-site.xml, add spark_shuffle to yarn.nodemanager.aux-services (alongside mapreduce_shuffle, if it is already present).
  • Set yarn.nodemanager.aux-services.spark_shuffle.class to org.apache.spark.network.yarn.YarnShuffleService, which implements Spark's shuffle service for YARN.
  • Restart the YARN NodeManagers.
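The yarn-site.xml step can be scripted. Below is a minimal sketch using Python's standard xml.etree.ElementTree, assuming the usual <configuration><property><name/><value/></property></configuration> layout; the function names are hypothetical. It keeps any existing aux-services (such as mapreduce_shuffle) and appends spark_shuffle only if it is missing.

```python
# Sketch: register Spark's shuffle service in a yarn-site.xml document.
import xml.etree.ElementTree as ET

AUX_SERVICES = "yarn.nodemanager.aux-services"
SPARK_SHUFFLE_CLASS = "yarn.nodemanager.aux-services.spark_shuffle.class"
SHUFFLE_IMPL = "org.apache.spark.network.yarn.YarnShuffleService"


def _find_property(root, name):
    """Return the <property> element whose <name> equals `name`, or None."""
    for prop in root.findall("property"):
        if prop.findtext("name") == name:
            return prop
    return None


def _set_property(root, name, value):
    """Update an existing property's <value>, or append a new <property>."""
    prop = _find_property(root, name)
    if prop is None:
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
    value_el = prop.find("value")
    if value_el is None:
        value_el = ET.SubElement(prop, "value")
    value_el.text = value


def enable_spark_shuffle(xml_text):
    """Return yarn-site.xml text with spark_shuffle registered (idempotent)."""
    root = ET.fromstring(xml_text)
    prop = _find_property(root, AUX_SERVICES)
    current = (prop.findtext("value") or "") if prop is not None else ""
    services = [s.strip() for s in current.split(",") if s.strip()]
    if "spark_shuffle" not in services:
        services.append("spark_shuffle")  # keep mapreduce_shuffle etc.
    _set_property(root, AUX_SERVICES, ",".join(services))
    _set_property(root, SPARK_SHUFFLE_CLASS, SHUFFLE_IMPL)
    return ET.tostring(root, encoding="unicode")
```

Note that the default ElementTree parser drops XML comments, so review the result before overwriting the real file. Adding the jar to the classpath and restarting the NodeManagers are still separate steps.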

How does it work?

The number of executors an application needs is derived from the number of pending and running tasks and from how many tasks each executor can run at once, i.e. spark.executor.cores divided by spark.task.cpus (spark.executor.memory sets the size of each executor's container rather than the count). The driver then requests that many executors from the Application Master, which in turn asks the YARN Resource Manager to allocate the desired number of containers in which the executors run. Requests are made in rounds: the first is triggered once tasks have been pending for spark.dynamicAllocation.schedulerBacklogTimeout seconds, and further rounds follow every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout seconds while the backlog persists. When the work is done, executors that have been idle for spark.dynamicAllocation.executorIdleTimeout seconds are shut down and their containers are released, so that other running applications can request more executors.
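To make the arithmetic concrete, here is a simplified model, not Spark's actual ExecutorAllocationManager code. It assumes the behavior described in Spark's documentation: the target is enough executor slots for every pending or running task, and each request round asks for twice as many new executors as the previous one (1, 2, 4, ...), capped by the target and spark.dynamicAllocation.maxExecutors. The function names are hypothetical.

```python
# Simplified model of how dynamic allocation sizes an application.
import math


def tasks_per_executor(executor_cores, task_cpus=1):
    """Concurrent tasks per executor: spark.executor.cores // spark.task.cpus."""
    slots = executor_cores // task_cpus
    if slots < 1:
        raise ValueError("spark.task.cpus must not exceed spark.executor.cores")
    return slots


def executors_needed(pending_and_running_tasks, executor_cores, task_cpus=1):
    """Executors required so every pending or running task gets a slot."""
    slots = tasks_per_executor(executor_cores, task_cpus)
    return math.ceil(pending_and_running_tasks / slots)


def ramp_up(target, current=0, max_executors=None):
    """Yield the executor total after each request round.

    Each round adds twice as many executors as the previous round,
    never exceeding the target (or maxExecutors, if given).
    """
    if max_executors is not None:
        target = min(target, max_executors)
    to_add = 1
    while current < target:
        current = min(current + to_add, target)
        yield current
        to_add *= 2
```

For example, 100 pending tasks with spark.executor.cores=4 and spark.task.cpus=1 give executors_needed(100, 4) == 25, and list(ramp_up(25)) is [1, 3, 7, 15, 25]: five rounds, spaced by the backlog timeouts above, instead of one large request.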

Important and Interesting to Know

  • Executors may be decommissioned while the application is still running, so something else has to maintain their state. The most important case is the shuffle phase: once an executor has written its shuffle output to disk, the executor might be removed, yet other executors still need to read that output. The external shuffle service enabled above solves exactly this problem.

External Shuffle Service: a long-running process that runs on each node independently of the applications and serves shuffle files to executors, so an executor's shuffle output can still be served after the executor has been removed.


  • Decommissioning an executor removes all the cached data it holds. This does not happen by default, however: executors holding cached data are only considered for removal once they have been idle for longer than spark.dynamicAllocation.cachedExecutorIdleTimeout, which defaults to infinity.
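The two idle timeouts can be summarized in a small model. This is a sketch, not Spark's implementation: it assumes the documented defaults (60 seconds for spark.dynamicAllocation.executorIdleTimeout, infinity for spark.dynamicAllocation.cachedExecutorIdleTimeout) and ignores other constraints such as the spark.dynamicAllocation.minExecutors floor. The function name removable is hypothetical.

```python
# Simplified model of when dynamic allocation may remove an idle executor.
import math

# Documented defaults (check the configuration docs for your Spark version).
EXECUTOR_IDLE_TIMEOUT_S = 60.0             # spark.dynamicAllocation.executorIdleTimeout
CACHED_EXECUTOR_IDLE_TIMEOUT_S = math.inf  # spark.dynamicAllocation.cachedExecutorIdleTimeout


def removable(idle_seconds, holds_cached_blocks,
              idle_timeout=EXECUTOR_IDLE_TIMEOUT_S,
              cached_idle_timeout=CACHED_EXECUTOR_IDLE_TIMEOUT_S):
    """True if an executor idle for `idle_seconds` is eligible for removal.

    Executors holding cached RDD/DataFrame blocks use the cached timeout,
    which is infinite by default, so they are kept unless it is configured.
    """
    timeout = cached_idle_timeout if holds_cached_blocks else idle_timeout
    return idle_seconds > timeout


if __name__ == "__main__":
    print(removable(90, holds_cached_blocks=False))     # prints True
    print(removable(10**6, holds_cached_blocks=True))   # prints False
```

Setting cachedExecutorIdleTimeout to a finite value trades cached data for freed cluster resources; whether that is worthwhile depends on how expensive the cached data is to recompute.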

What’s next?

Given this rich set of configurations, we can combine spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.dynamicAllocation.maxExecutors, setting them separately for each application, to divide the cluster resources among the applications so that their resources do not overlap and they can keep running in parallel.
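One way to pick those per-application values is simple arithmetic over the cluster's capacity. The sketch below is illustrative only: the application names and shares are hypothetical, and it assumes each YARN executor container needs executor memory plus an overhead of max(384 MB, 10% of executor memory), which matches Spark's documented default overhead but should be checked for your version.

```python
# Sketch: split a cluster's capacity into per-application maxExecutors caps.
import math


def container_memory_mb(executor_memory_mb, overhead_fraction=0.10, min_overhead_mb=384):
    """Memory YARN reserves per executor container (assumed overhead model)."""
    overhead = max(min_overhead_mb, int(executor_memory_mb * overhead_fraction))
    return executor_memory_mb + overhead


def max_executors_per_app(cluster_cores, cluster_memory_mb, executor_cores,
                          executor_memory_mb, shares):
    """Return {app: maxExecutors} so the apps' caps together fit the cluster.

    `shares` maps a (hypothetical) application name to its cluster fraction.
    """
    if sum(shares.values()) > 1.0 + 1e-9:
        raise ValueError("shares must not add up to more than 1.0")
    per_container = container_memory_mb(executor_memory_mb)
    caps = {}
    for app, share in shares.items():
        by_cores = math.floor(cluster_cores * share / executor_cores)
        by_memory = math.floor(cluster_memory_mb * share / per_container)
        caps[app] = min(by_cores, by_memory)  # the scarcer resource wins
    return caps


def allocation_conf(max_executors, min_executors=0, initial_executors=None):
    """Dynamic allocation properties for one application."""
    if initial_executors is None:
        initial_executors = min_executors
    if not 0 <= min_executors <= initial_executors <= max_executors:
        raise ValueError("need 0 <= min <= initial <= max executors")
    return {
        "spark.dynamicAllocation.minExecutors": str(min_executors),
        "spark.dynamicAllocation.initialExecutors": str(initial_executors),
        "spark.dynamicAllocation.maxExecutors": str(max_executors),
    }
```

For a hypothetical 96-core, 384 GB (393216 MB) cluster with 4-core, 8192 MB executors, shares of 0.5, 0.25 and 0.25 yield caps of 12, 6 and 6 executors: cores, not memory, are the limiting resource here, and the three caps together use exactly 96 cores.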

 


Conclusion

Dynamic Resource Allocation removes most of the headaches a user has to go through to solve resource-related issues in a distributed computation environment, so that they can focus mainly on application development. Realizing the importance of such a hook provided by Spark has truly made the lives of our engineers easier.

Happy Sparking .. :)