Guide for Apache Spark Setup, Job Optimisation, AWS EMR Cluster Configuration, S3, YARN and HDFS Optimisation

How do you tune an Apache Spark job? How do you perform joins efficiently? How do you tune an AWS EMR cluster, S3, YARN and HDFS? And how do you fix Apache Spark job, AWS EMR cluster, S3, YARN and HDFS errors?

We answer all of those questions in this super long technical blog. 😅 Since this is an optimisation guide, I will assume you are already familiar with the basics.

Blog repo link for requesting changes: https://devendraap.github.io/Spark-job-and-AWS-EMR-cluster-S3-YARN-and-HDFS-tuning/

We will process our data in an AWS environment using Spark (on EMR) and use object storage (S3) for storage in the examples below.

While processing billions of records or terabytes of data, we faced multiple hurdles. This wiki documents in detail the errors the team faced while processing large datasets with Spark jobs and how to resolve them. The Spark job and cluster optimisations for processing large datasets are also explained below.

We should prefer Datasets for storing and processing data in memory for the following reasons:

  • Static typing and compile-time type safety. With a DataFrame, you can select a nonexistent column and notice your mistake only when you run your code. With a Dataset, you get a compile-time error.
  • Datasets get Catalyst optimisation and benefit from Tungsten’s efficient bytecode generation, thanks to the Encoders used for Datasets.
  • Datasets have helpers called encoders, which are smart and efficient encoding utilities that convert the data inside each user-defined object into a compact binary format. Because Spark understands the structure of the data in Datasets, it can create a more optimal layout in memory when caching them. This translates into lower memory usage when a Dataset is cached, as well as fewer bytes for Spark to transfer over the network during shuffles.
  • Spark’s internal encoders store every row of the Dataset as a flat binary object, and are reported to be more than 10x faster than Kryo serialization.

Ref: Heather Miller’s Course

Note: Please go through the reference links provided to fully understand how Spark options affect data processing.

Optimising Spark job for Joins and other data feature based optimisation:

Let’s assume we have two tables whose raw/CSV file sizes are 3 TB and 500 GB respectively, and which need to be joined on particular columns. The following are ways to optimise the joins and prevent job failures as the data grows gradually after each refresh.

  1. Set spark.sql.files.maxPartitionBytes to 128 MB, which repartitions the files after reading so that the resulting partitions are 128 MB each.
  2. If the fill rate of the joining column is not 100%, filter out the records whose join key is null and perform the join on the remaining records only. Then union the join output with the records that contain null values.
  3. Set spark.sql.shuffle.partitions to repartition the data and increase the number of tasks during the join operation, which increases parallelism. The partition size should be ~128 MB, corresponding to the block size in EMRFS (ref. AWS docs).
  4. To see the amount of data processed and the time taken by each task, open the stage summary metrics in the Application Master.
  5. In the Application Master, if the MAX time taken by a task is greater than 5 min, try increasing the number of partitions so that each task processes less data.
  6. In the Application Master, if a few tasks are taking too long to execute, you are probably performing near-cartesian joins due to NULL or repeated values in the column used for the join.
  7. In the Application Master, if the 25th percentile takes <100 ms but the MAX time is > 5 min for a task, the data is skewed. The data can be distributed evenly by adding a salt column:

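import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType

// n = number of salt buckets; groupByFields and aggFields stand in for your own columns
df.withColumn("salt", (rand() * n).cast(IntegerType))
  .groupBy("salt", groupByFields)  // stage 1: partial aggregate per (salt, key)
  .agg(aggFields)
  .groupBy(groupByFields)          // stage 2: final aggregate per key
  .agg(aggFields)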
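To see why the two-stage aggregation gives the same answer as a single groupBy, here is a minimal sketch in plain Python (no Spark needed). The function name `salted_sum` and the sample rows are made up for illustration. It assumes the aggregate is a sum, which can safely be computed in two stages.

```python
import random
from collections import defaultdict

def salted_sum(rows, n_salts, seed=0):
    """Sum values per key in two stages: per (salt, key), then per key."""
    rng = random.Random(seed)
    partial = defaultdict(int)
    for key, value in rows:
        salt = rng.randrange(n_salts)   # plays the role of (rand() * n).cast(IntegerType)
        partial[(salt, key)] += value   # stage 1: groupBy("salt", key)
    final = defaultdict(int)
    for (_salt, key), subtotal in partial.items():
        final[key] += subtotal          # stage 2: groupBy(key)
    return dict(final), len(partial)

# One heavily repeated ("hot") key and one rare key, as in a skewed dataset.
rows = [("hot", 1)] * 1000 + [("cold", 1)] * 10
totals, groups = salted_sum(rows, n_salts=8)
print(totals, groups)
```

The hot key is spread over up to 8 intermediate groups, so no single task has to process all of its rows. The same trick works for count, min and max. An average needs its sum and count carried through both stages separately.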

If the data processed in only a few ETL steps is too large, try the following options:

  1. Turn on the cluster's auto-scaling option to allocate more core nodes while processing those few large ETL steps.
  2. Partition the larger table by a column that distributes the records evenly (like year or quarter) and persist it to EMRFS.
  3. Read one partition at a time using a filter, perform the join and write the output to EMRFS.
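The ~128 MB partition target from the join checklist can be turned into a starting value for spark.sql.shuffle.partitions. This is a rough sketch in plain Python. The helper name `partitions_for` is hypothetical, and it assumes the 3 TB and 500 GB figures are binary units of uncompressed data.

```python
MIB = 1024 ** 2
GIB = 1024 ** 3
TIB = 1024 ** 4

def partitions_for(data_bytes, target_partition_bytes=128 * MIB):
    """Smallest partition count that keeps each partition at or below the target size."""
    return max(1, (data_bytes + target_partition_bytes - 1) // target_partition_bytes)

large_table = partitions_for(3 * TIB)    # the 3 TB table
small_table = partitions_for(500 * GIB)  # the 500 GB table
print(large_table, small_table)
```

Treat the result as a first guess only. Shuffle data is usually smaller than the raw CSV once it is compressed and serialized, so check the stage summary metrics and adjust.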

Executor resource calculation:

The r-series memory-optimised instances for EMR have a 1:8 core-to-memory (GB) ratio. The optimal CPU count per executor is 5. So, to prevent under-utilisation of CPU or memory, the optimal allocation per executor is 40 GB of memory and 5 CPUs. The following is an example for an r5.12xlarge instance:

By assigning 1 core and 1 GB to YARN, we are left with 47 cores per node for an r5.12xlarge instance.

We allocate 5 cores per executor for maximum HDFS throughput.

MemoryStore and BlockManagerMaster consume 12 GB per node.

  • Number of executors = (48 - 1) / 5 ~= 9
  • Memory per executor = (374 - 12 - 12) / 9 = 350 / 9 ~= 39 GB, i.e. roughly 40 GB

Specs per CORE or TASK node of the r5.12xlarge instance type:

  • Cores = 48
  • Memory = (384 GiB * 1000) / 1024 = 375 GB
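The executor arithmetic above can be written as a small function, so it is easy to redo for another instance type. This is a sketch of the article's own rule of thumb, not an official formula. The function name `executor_layout` is made up, and the 24 GB reserve is simply the two 12 GB terms from the calculation above.

```python
def executor_layout(node_vcpus, node_mem_gb, cores_per_executor=5,
                    yarn_cores=1, yarn_mem_gb=1, reserved_mem_gb=24):
    """Return (executors per node, memory per executor in GB)."""
    usable_cores = node_vcpus - yarn_cores              # 48 - 1 = 47
    executors = usable_cores // cores_per_executor      # 47 // 5 = 9
    usable_mem_gb = node_mem_gb - yarn_mem_gb - reserved_mem_gb
    return executors, usable_mem_gb / executors

# r5.12xlarge: 48 vCPUs and 384 GiB, converted the same way as in the specs above.
node_mem_gb = 384 * 1000 // 1024
executors, mem_per_executor_gb = executor_layout(48, node_mem_gb)
print(executors, round(mem_per_executor_gb, 1))
```

The memory per executor includes both the JVM heap (spark.executor.memory) and the memory overhead, so the heap itself has to be set lower than this figure.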

Note: If the EMR cluster is configured to use task nodes, do not exceed a CORE node to TASK node ratio of 2:1, as task nodes do not have HDFS storage. Also, allocate more HDFS storage to compensate for the lack of HDFS storage on task nodes.

Spark submit options:

Spark executor memory allocation layout and calculations:

spark.yarn.executor.memoryOverhead = 3 * 1024 = 3072 MB

spark.executor.memory = 33 * 1024 = 33792 MB

The fractions below are worked out on a 34 GB (34 * 1024 = 34816 MB) base:

spark.memory.fraction = 0.8 * 34816 = 27852.8 MB

spark.memory.storageFraction (cache, broadcast, accumulator) = 0.4 * 34816 = 13926.4 MB

User memory = (1.0 - 0.8) * 34816 = 6963.2 MB

yarn.nodemanager.resource.memory-mb stays around ~40 GB
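The figures above apply the fractions directly to the heap. Spark's tuning guide describes a slightly different formula: it first sets aside 300 MiB of reserved memory, then applies spark.memory.fraction, then takes spark.memory.storageFraction as a share of that unified region. Here is a sketch of that formula in plain Python. The function name and the dictionary keys are made up for illustration.

```python
RESERVED_MB = 300  # reserved memory, as described in the Spark tuning guide

def memory_regions(heap_mb, memory_fraction=0.8, storage_fraction=0.5):
    """Split an executor heap into Spark's unified, storage, execution and user regions."""
    usable = heap_mb - RESERVED_MB
    unified = usable * memory_fraction      # execution + storage
    storage = unified * storage_fraction    # storage share that is protected from eviction
    return {
        "unified_mb": unified,
        "storage_mb": storage,
        "execution_mb": unified - storage,
        "user_mb": usable - unified,
    }

heap_mb = 33 * 1024       # spark.executor.memory = 33g
overhead_mb = 3 * 1024    # executor memory overhead = 3g
regions = memory_regions(heap_mb)
container_mb = heap_mb + overhead_mb  # what YARN has to grant per executor
print({name: round(mb, 1) for name, mb in regions.items()}, container_mb)
```

With a 33 GB heap and 3 GB overhead, each executor container needs 36 GB from YARN, which fits within the ~40 GB per-executor budget worked out earlier.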

Parameter: spark.executor.memory

Value: 33g

Explanation:

Benefits:

Reference: Link

Parameter: spark.executor.cores

Value: 5

Explanation:

Benefits:

Reference: (Explained above)

Parameter: spark.memory.fraction

Value: 0.8

Explanation: Approx. (spark.memory.fraction * spark.executor.memory) memory for task execution, shuffle, join, sort, aggregate

Benefits:

Reference:

Parameter: spark.memory.storageFraction

Value: 0.5

Explanation: Approx. fraction of the unified memory region (spark.memory.fraction * spark.executor.memory) set aside for cache, broadcast and accumulator

Benefits:

Reference:

Parameter: spark.dynamicAllocation.enabled

Value: TRUE

Explanation: Allocates executors dynamically, based on yarn.scheduler.capacity.resource-calculator = org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

Benefits: Scales number of executors based on CPU and memory requirements.

Reference: Link

Parameter: spark.shuffle.service.enabled

Value: TRUE

Explanation: The Spark shuffle service maintains the shuffle files generated by all Spark executors that ran on that node. Spark executors write the shuffle data and manage it

Benefits: The Spark shuffle service preserves the shuffle files written by executors so the executors can be safely removed. Resolves error: “java.io.IOException: All datanodes are bad.”

Reference:

Parameter: spark.executor.extraJavaOptions

Value: -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'

Explanation: The parameter -XX:+UseG1GC specifies that the G1GC garbage collector should be used. (The default is -XX:+UseParallelGC.) To understand the frequency and execution time of the garbage collection, use the parameters -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps. To initiate garbage collection sooner, set InitiatingHeapOccupancyPercent to 35 (the default is 45). Doing this helps avoid potential garbage collection for the total memory, which can take a significant amount of time.

Benefits: Better garbage collection, as G1 is suitable for large heaps. Helps resolve out-of-memory issues and reduces GC pause time, high latency and low throughput

Reference: Link

Parameter: spark.driver.maxResultSize

Value: 20G

Explanation: spark.sql.autoBroadcastJoinThreshold < spark.driver.maxResultSize < spark.driver.memory

Benefits: Resolves error: serialized results of x tasks is bigger than spark.driver.maxResultSize

Reference:

Parameter: spark.yarn.maxAppAttempts

Value: 2

Explanation: Maximum number of attempts for running the application

Benefits:

Reference:

Parameter: spark.rpc.message.maxSize

Value: 2048

Explanation: Increases the remote procedure call message size

Benefits: Resolves error: exceeds max allowed: spark.rpc.message.maxSize

Reference:

Parameter: spark.worker.timeout

Value: 240

Explanation: Allows tasks working on skewed data more time for execution. Proper re-partitioning (with salting) on the join or groupBy column reduces the execution time

Benefits: Resolves errors:

Lost executor xx on slave1.cluster: Executor heartbeat timed out after xxxxx ms

WARN TransportChannelHandler: Exception in connection from /172.31.3.245:46014

Reference: Link

Parameter: spark.network.timeout

Value: 9999s

Explanation:

Benefits: Resolves error: java.io.IOException: Connection reset by peer

Reference:

Parameter: spark.shuffle.file.buffer

Value: 1024k

Explanation: Reduces the number of times the buffer overflows to the disk file during the shuffle write process, which reduces disk IO and improves performance

Benefits:

Reference:

Parameter: spark.locality.wait

Value: 15s

Explanation: Reduces large amounts of data transfer over the network (shuffling)

Benefits:

Reference:

Parameter: spark.shuffle.io.connectionTimeout

Value: 3000

Explanation:

Benefits: Resolves error: “org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]”

Reference: Link

Parameter: spark.shuffle.io.retryWait

Value: 60s

Explanation:

Benefits: Resolves error: org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 1

Reference:

Parameter: spark.reducer.maxReqsInFlight

Value: 1

Explanation:

Benefits:

Reference:

Parameter: spark.shuffle.io.maxRetries

Value: 10

Explanation:

Benefits:

Reference:

Parameter: spark.scheduler.maxRegisteredResourcesWaitingTime

Value: 180s

Explanation: Controls the maximum amount of time to wait for resources to register before scheduling begins

Benefits: Resolves error: Application_xxxxx_xxx failed 2 times due to AM container for appattempt_xxxx_xxxxx. Exception from container-launch.

Reference:

Parameter: spark.dynamicAllocation.executorIdleTimeout

Value: 60s

Explanation: Removes an executor if it has been idle for more than this duration

Benefits:

Reference:

Parameter: spark.dynamicAllocation.cachedExecutorIdleTimeout

Value: 36000s

Explanation: Remove executor with cached data blocks if idle for more than this duration

Benefits:

Reference:

Parameter: spark.sql.broadcastTimeout

Value: 72000

Explanation: Timeout in seconds for the broadcast wait time in broadcast joins

Benefits: Resolves error: ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after

Reference:

Parameter: spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version

Value: 2

Explanation: The major difference between mapreduce.fileoutputcommitter.algorithm.version=1 and 2 is whether the AM or the reducers do the mergePaths().

Benefits: Allows the reducers to do mergePaths() to move their files to the final output directory

Reference: Link

Parameter: spark.sql.autoBroadcastJoinThreshold

Value: 0

Explanation: The maximum size of a broadcast table is limited by Spark, i.e. 8 GB

Benefits:

Reference: Link

Parameter: spark.io.compression.codec

Value: zstd

Explanation: Reduces serialized data size by 50%, resulting in smaller spills (memory and disk) and less storage IO and network IO, but increases CPU overhead by 2–5%, which is acceptable while processing large datasets

Benefits: Used by spark.sql.inMemoryColumnarStorage.compressed, spark.rdd.compress, spark.shuffle.compress, spark.shuffle.spill.compress, spark.checkpoint.compress and spark.broadcast.compress. This allows us to broadcast a table with 2x the records, spill less (memory and disk), and reduce disk and network IO.

Reference: Link

Parameter: spark.io.compression.zstd.level

Value: 6

Explanation:

Benefits:

Reference:

Parameter: spark.sql.inMemoryColumnarStorage.compressed

Value: TRUE

Explanation: Enables compression. Reduces network IO and memory usage using the Spark compression codec set in spark.io.compression.codec

Benefits:

Reference:

Parameter: spark.rdd.compress

Value: TRUE

Explanation:

Benefits:

Reference:

Parameter: spark.shuffle.compress

Value: TRUE

Explanation:

Benefits:

Reference:

Parameter: spark.shuffle.spill.compress

Value: TRUE

Explanation:

Benefits:

Reference:

Parameter: spark.checkpoint.compress

Value: TRUE

Explanation:

Benefits:

Reference:

Parameter: spark.broadcast.compress

Value: TRUE

Explanation:

Benefits:

Reference:

Parameter: spark.storage.level

Value: MEMORY_AND_DISK_SER

Explanation: Spills partitions that don’t fit in executor memory to disk. Serialized storage uses less space (i.e. memory in RAM or storage on SSD)

Benefits:

Reference: StackOverflow

Parameter: spark.serializer

Value: org.apache.spark.serializer.KryoSerializer

Explanation: Better than default spark serializer

Benefits:

Reference:

Parameter: spark.hadoop.s3.multipart.committer.conflict-mode

Value: replace

Explanation: Setting for new Hadoop parquet magic committer

Benefits:

Reference:

Parameter: spark.shuffle.consolidateFiles

Value: TRUE

Explanation: Optimization for a custom ShuffleHash join implementation. Note that the MergeSort join is the default method, which is better for large datasets because of memory limitations

Benefits:

Reference:

Parameter: spark.reducer.maxSizeInFlight

Value: 96

Explanation: Reducers request data from “map” task outputs in bigger chunks, which can improve performance

Benefits:

Reference:

Parameter: spark.kryoserializer.buffer.max

Value: 1024m

Explanation:

Benefits: Resolves error: com.esotericsoftware.kryo.KryoException: Buffer overflow. Available: 0, required: 57197

Reference:

Parameter: spark.sql.shuffle.partitions

Value: 10000

Explanation: Number of partitions during join operation

Benefits:

Reference:

Parameter: spark.sql.files.maxPartitionBytes

Value: 134217728

Explanation: Repartitions files after reading into partitions of 128 MB each

Benefits:

Reference:

Parameter: spark.scheduler.listenerbus.eventqueue.capacity

Value: 20000

Explanation: Resolves error: ERROR scheduler.LiveListenerBus: Dropping SparkListenerEvent because no remaining room in event queue. This likely means one of the SparkListeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler
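The spark.driver.maxResultSize entry above states an ordering: spark.sql.autoBroadcastJoinThreshold < spark.driver.maxResultSize < spark.driver.memory. That ordering can be checked before submitting a job. This is a small illustrative sketch in plain Python. The helpers `to_bytes` and `sizes_are_ordered` are made up, and they treat a plain number as bytes, which is an assumption (Spark applies its own default unit for each property).

```python
UNITS = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}

def to_bytes(size):
    """Parse sizes such as '20G', '1024m' or '10485760' (plain numbers are read as bytes)."""
    text = str(size).strip().lower()
    if text.endswith("b"):
        text = text[:-1]
    if text and text[-1] in UNITS:
        return int(float(text[:-1]) * UNITS[text[-1]])
    return int(text)

def sizes_are_ordered(broadcast_threshold, max_result_size, driver_memory):
    """True when broadcast threshold < max result size < driver memory."""
    return to_bytes(broadcast_threshold) < to_bytes(max_result_size) < to_bytes(driver_memory)

ok = sizes_are_ordered("10m", "20G", "30G")
print(ok)
```

A value of 0 for spark.driver.maxResultSize means unlimited in Spark, so handle that case separately instead of passing it to this check.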

EMR cluster tuning:

Configuration Property: dfs.replication

Classification: hdfs-site

Value: 2

Usage: HDFS data replication factor for EMR with auto scaling enabled for core nodes

Reference: (blank)

Configuration Property: fs.s3.enableServerSideEncryption

Classification: emrfs-site

Value: TRUE

Usage: Enables S3 AES256 data encryption

Reference: (blank)

Configuration Property: fs.s3a.attempts.maximum

Classification: emrfs-site

Value: 100

Usage: Workaround for missing-file errors caused by S3’s eventual consistency, due to replication across multiple AZs (availability zones)

Reference: (blank)

Configuration Property: fs.s3a.committer.magic.enabled

Classification: emrfs-site

Value: TRUE

Usage: Setting for new Hadoop parquet magic committer

Reference: (blank)

Configuration Property: fs.s3a.connection.maximum

Classification: emrfs-site

Value: 250

Usage: Increases S3 IO speed

Reference: Link

Configuration Property: fs.s3a.fast.upload

Classification: emrfs-site

Value: TRUE

Usage: Increases S3 IO speed

Reference: Link

Configuration Property: fs.s3a.server-side-encryption-algorithm

Classification: core-site

Value: AES256

Usage: Enables S3 AES256 data encryption

Reference: (blank)

Configuration Property: fs.s3a.threads.core

Classification: emrfs-site

Value: 250

Usage: Increases S3 IO speed

Reference: Link

Configuration Property: yarn.log-aggregation.retain-seconds

Classification: yarn-site

Value: -1

Usage: (blank)

Reference: (blank)

Configuration Property: yarn.log-aggregation-enable

Classification: yarn-site

Value: TRUE

Usage: Aggregates logs at driver node

Reference: (blank)

Configuration Property: yarn.nm.liveness-monitor.expiry-interval-ms

Classification: yarn-site

Value: 360000

Usage: Increases time to wait until a node manager is considered dead

Reference: (blank)

Configuration Property: yarn.nodemanager.pmem-check-enabled

Classification: yarn-site

Value: FALSE

Usage: (Note: Re-partition data in job based on size)

Reference: (blank)

Configuration Property: yarn.nodemanager.vmem-check-enabled

Classification: yarn-site

Value: FALSE

Usage: To disable hard memory restriction causing OOM (out of memory) JVM error

Reference: (blank)

Configuration Property: yarn.resourcemanager.decommissioning.timeout

Classification: yarn-site

Value: 3600

Usage: Increases timeout interval to blacklist node

Reference: (blank)

Configuration Property: yarn.scheduler.capacity.resource-calculator

Classification: capacity-scheduler

Value: org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

Usage: The default resource calculator, i.e. org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator, uses only memory information for allocating containers, and CPU scheduling is not enabled by default

Reference: Link

Configuration Property: yarn.scheduler.capacity.root.default.capacity

Classification: capacity-scheduler

Value: 100

Usage: Uses all resources of dedicated cluster

Reference: (blank)

Configuration Property: yarn.scheduler.capacity.root.default.maximum-capacity

Classification: capacity-scheduler

Value: 100

Usage: Uses all resources of dedicated cluster

Reference: (blank)
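EMR accepts cluster settings as a JSON list of objects, each with a "Classification" and a "Properties" map. The entries above can be grouped into that shape with a few lines of Python. This is only a sketch. It uses a handful of the properties listed above, and the helper name `to_emr_configurations` is made up.

```python
import json

# (classification, property, value) triples taken from the entries above.
SETTINGS = [
    ("hdfs-site", "dfs.replication", "2"),
    ("yarn-site", "yarn.log-aggregation-enable", "true"),
    ("yarn-site", "yarn.nodemanager.vmem-check-enabled", "false"),
    ("capacity-scheduler", "yarn.scheduler.capacity.resource-calculator",
     "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"),
]

def to_emr_configurations(settings):
    """Group flat settings into EMR's [{"Classification": ..., "Properties": {...}}] form."""
    grouped = {}
    for classification, prop, value in settings:
        grouped.setdefault(classification, {})[prop] = value
    return [{"Classification": name, "Properties": props}
            for name, props in grouped.items()]

configurations = to_emr_configurations(SETTINGS)
print(json.dumps(configurations, indent=2))
```

The printed JSON can be saved to a file and supplied when creating the cluster. All property values are written as strings.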

Spark Option summary

If the data size is in the range of GBs, i.e. millions of records:

/usr/lib/spark/bin/spark-submit \
  --deploy-mode cluster \
  --driver-memory 30G \
  --executor-memory 34G \
  --executor-cores 5

If the data size is in the range of TBs, i.e. billions of records (note: not tested above 15 billion records or above 15 TB):

--conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:+G1SummarizeConcMark -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p'"
--conf spark.executor.memoryOverhead=3g
--conf spark.sql.files.maxPartitionBytes=134217728
--conf spark.yarn.maxAppAttempts=1
--conf spark.driver.maxResultSize=0
--conf spark.rpc.message.maxSize=2047
--conf spark.network.timeout=10000000s
--conf spark.executor.heartbeatInterval=10000000
--conf spark.authenticate=true
--conf spark.files.fetchTimeout=600s
--conf spark.shuffle.file.buffer=1024k
--conf spark.locality.wait=15s
--conf spark.shuffle.io.connectionTimeout=3000
--conf spark.shuffle.io.retryWait=60s
--conf spark.shuffle.io.maxRetries=10
--conf spark.reducer.maxReqsInFlight=1
--conf spark.scheduler.maxRegisteredResourcesWaitingTime=60s
--conf spark.dynamicAllocation.enabled=true
--conf spark.dynamicAllocation.executorIdleTimeout=60s
--conf spark.dynamicAllocation.cachedExecutorIdleTimeout=36000s
--conf spark.sql.broadcastTimeout=720000
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem
--conf spark.speculation=false
--conf spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=1
--conf spark.sql.inMemoryColumnarStorage.compressed=true
--conf spark.sql.autoBroadcastJoinThreshold=0
--conf spark.sql.parquet.fs.optimized.committer.optimization-enabled=true
--conf spark.sql.parquet.output.committer.class=com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
--conf spark.sql.sources.commitProtocolClass=org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
--conf spark.sql.hive.convertMetastoreParquet=true
--conf spark.rdd.compress=true
--conf spark.shuffle.compress=true
--conf spark.shuffle.spill.compress=true
--conf spark.checkpoint.compress=true
--conf spark.broadcast.compress=true
--conf yarn.nodemanager.vmem-check-enabled=false
--conf yarn.nodemanager.pmem-check-enabled=false
--conf spark.sql.parquet.writeLegacyFormat=false
--conf spark.memory.fraction=0.80
--conf spark.memory.storageFraction=0.50
--conf spark.storage.level=MEMORY_AND_DISK_SER
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer
--conf spark.hadoop.s3.multipart.committer.conflict-mode=replace
--conf spark.shuffle.consolidateFiles=true
--conf spark.reducer.maxSizeInFlight=96
--conf spark.kryoserializer.buffer.max=1024m
--conf spark.sql.shuffle.partitions=10000
--conf spark.worker.timeout=240
--conf spark.ui.showConsoleProgress=true
--conf spark.scheduler.listenerbus.eventqueue.capacity=20000
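A long list of --conf flags is easier to maintain as a dictionary that is turned into the command programmatically. This is a minimal sketch under stated assumptions: the helper `build_spark_submit` is made up, and the jar path is a placeholder, not a real location.

```python
import shlex

def build_spark_submit(conf, app, deploy_mode="cluster",
                       spark_submit="/usr/lib/spark/bin/spark-submit"):
    """Return the spark-submit command as an argument list."""
    cmd = [spark_submit, "--deploy-mode", deploy_mode]
    for key, value in conf.items():
        cmd += ["--conf", f"{key}={value}"]
    cmd.append(app)
    return cmd

conf = {
    "spark.executor.memoryOverhead": "3g",
    "spark.sql.shuffle.partitions": 10000,
    "spark.executor.extraJavaOptions": "-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35",
}
# Placeholder application path, for illustration only.
cmd = build_spark_submit(conf, "s3://example-bucket/jobs/etl.jar")
print(" ".join(shlex.quote(part) for part in cmd))
```

Passing the list straight to subprocess.run avoids shell quoting problems with values that contain spaces, such as the Java options. The shlex.quote call is only there to print a version that can be copied into a shell.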

Spark development setup using containers:

  • Using Docker images: Setting up Kubernetes or Docker for Spark, HDFS, YARN, Hue, MapReduce, Hive and WebHCat development
  • Using Kubernetes and Helm charts: install Docker, Kubernetes and Helm in the cluster and run the following commands:

$ helm repo add bitnami https://charts.bitnami.com/bitnami

$ helm install bitnami/spark

________________________________________

Functional programmer who likes to process state with reactive distributed systems