Common Parameters
Overview
This section describes common configuration items used in Spark, grouped into sub-sections by feature so that you can quickly find the items you need. If you use an MRS cluster, most of the parameters described in this section have already been adapted, and you do not need to configure them again. For details about the parameters that need to be configured based on site requirements, see Configuring Parameters Rapidly.
Configuring the Number of Stage Retries
When a FetchFailedException occurs in a Spark task, a stage retry is triggered. To prevent infinite stage retries, the number of retries is limited. You can adjust the maximum number of retries based on site requirements.
Configure the following parameters in the spark-defaults.conf file on the Spark client.
Parameter | Description | Default Value |
---|---|---|
spark.stage.maxConsecutiveAttempts | Indicates the maximum number of stage retries. | 4 |
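In spark-defaults.conf, each property goes on its own line, with the key and the value separated by whitespace. The following Python is a minimal sketch of that format: it renders a dictionary of properties as file content. The helper name render_spark_defaults and the example value 8 are illustrative assumptions. They are not part of Spark or MRS.

```python
def render_spark_defaults(props: dict) -> str:
    """Render Spark properties as spark-defaults.conf lines ("key value")."""
    lines = []
    for key in sorted(props):
        # Key and value are separated by a single space (any whitespace is accepted).
        lines.append(f"{key} {props[key]}")
    return "\n".join(lines) + "\n"


# Example: allow up to 8 consecutive attempts per stage instead of the default 4.
conf_text = render_spark_defaults({"spark.stage.maxConsecutiveAttempts": 8})
print(conf_text, end="")
```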
Configuring Whether to Use Cartesian Product
To enable the Cartesian product function, configure the following parameter in the spark-defaults.conf configuration file of Spark.
Parameter | Description | Default Value |
---|---|---|
spark.sql.crossJoin.enabled | Indicates whether to allow implicit Cartesian product execution. | true |
Note
For JDBC applications, configure this parameter in the spark-defaults.conf configuration file of the server.
For tasks submitted by the Spark client, configure this parameter in the spark-defaults.conf configuration file of the client.
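A Cartesian product pairs every row of one input with every row of the other, so its output size is the product of the two input sizes. The plain-Python sketch below shows the effect that this parameter permits. It is not Spark code, and the table contents are made up.

```python
from itertools import product

# Two small illustrative "tables" (made-up data).
orders = [("o1",), ("o2",), ("o3",)]
regions = [("east",), ("west",)]

# A Cartesian product joins every row of `orders` with every row of `regions`.
cross = [left + right for left, right in product(orders, regions)]

for row in cross:
    print(row)
print(len(cross), "rows =", len(orders), "x", len(regions))
```

With two inputs of one million rows each, the same operation would produce 10^12 rows. That growth is why implicit Cartesian products deserve caution.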
Configuring Security Authentication for Long-Running Spark Tasks
In security mode, suppose you authenticate with the kinit command before using the Spark CLI (such as spark-shell, spark-sql, or spark-submit). A task that runs for a long time then fails when the authentication expires.
To avoid this, set the following parameters in the spark-defaults.conf configuration file on the client. After the configuration is complete, run the Spark CLI again.
Parameter | Description | Default Value |
---|---|---|
spark.kerberos.principal | Indicates the principal user who has the Spark operation permission. Contact the system administrator to obtain the principal user. | |
spark.kerberos.keytab | Indicates the name and path of the keytab file used to configure Spark operation permissions. Contact the system administrator to obtain the keytab file. | |
spark.security.bigdata.loginOnce | Indicates whether the principal user logs in to the system only once. true: single login; false: multiple logins. The two modes differ as follows. In the community version of Spark, the Kerberos user logs in multiple times, but the TGT or token may expire and cause long-running applications to fail. DataSight modifies the Kerberos login mode so that users log in only once, which effectively resolves the expiration problem. Restriction: the principal and keytab configuration items of Hive must be the same as those of Spark. Note: If this parameter is set to true, ensure that the values of keytab and principal in spark-defaults.conf and hive-site.xml are the same. | true |
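The restriction on spark.security.bigdata.loginOnce requires the principal and keytab in spark-defaults.conf and hive-site.xml to match. The following Python sketch compares the two files before you run the Spark CLI.

It assumes that the Hive side stores these values under hive.server2.authentication.kerberos.principal and hive.server2.authentication.kerberos.keytab. The property names your cluster actually compares may differ, so treat KEY_MAP as a hypothetical mapping to adjust. The principal and paths are placeholders.

```python
import xml.etree.ElementTree as ET

# Hypothetical mapping: Spark key -> Hive key expected to hold the same value.
KEY_MAP = {
    "spark.kerberos.principal": "hive.server2.authentication.kerberos.principal",
    "spark.kerberos.keytab": "hive.server2.authentication.kerberos.keytab",
}


def parse_spark_defaults(text):
    """Parse spark-defaults.conf text: "key value" per line, '#' starts a comment line."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split(None, 1)
        props[parts[0]] = parts[1].strip() if len(parts) > 1 else ""
    return props


def parse_hadoop_xml(text):
    """Parse <property><name>..</name><value>..</value></property> entries."""
    root = ET.fromstring(text)
    return {p.findtext("name"): (p.findtext("value") or "") for p in root.iter("property")}


def kerberos_mismatches(spark_props, hive_props, key_map=KEY_MAP):
    """Return (spark_key, hive_key) pairs whose values differ."""
    return [(s, h) for s, h in key_map.items() if spark_props.get(s) != hive_props.get(h)]


spark_text = """
# Placeholder values
spark.kerberos.principal spark_user@EXAMPLE.COM
spark.kerberos.keytab /opt/client/user.keytab
spark.security.bigdata.loginOnce true
"""
hive_text = """<configuration>
  <property><name>hive.server2.authentication.kerberos.principal</name><value>spark_user@EXAMPLE.COM</value></property>
  <property><name>hive.server2.authentication.kerberos.keytab</name><value>/opt/client/other.keytab</value></property>
</configuration>"""

print(kerberos_mismatches(parse_spark_defaults(spark_text), parse_hadoop_xml(hive_text)))
```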
Python Spark
Python is the third programming language supported by Spark, in addition to Scala and Java. Java and Scala applications run only on the JVM. A Python Spark application, by contrast, runs its own Python processes in addition to the JVM process. The following configuration items apply only to Python Spark scenarios; the other configuration items in this section also take effect in Python Spark scenarios.
Parameter | Description | Default Value |
---|---|---|
spark.python.profile | Indicates whether to enable profiling on the Python workers. You can display the results with sc.show_profiles(); otherwise, they are displayed automatically before the driver exits. You can use sc.dump_profiles(path) to dump the results to a disk. Results that have already been displayed manually are not displayed again automatically before the driver exits. By default, pyspark.profiler.BasicProfiler is used. You can pass a different profiler class when initializing SparkContext to override the default profiler. | false |
spark.python.worker.memory | Indicates the amount of memory that each Python worker process can use during aggregation. The value uses the same format as JVM memory strings, for example, 512m or 2g. If the memory used by a process during aggregation exceeds this value, data is spilled to disk. | 512m |
spark.python.worker.reuse | Indicates whether to reuse Python workers. If reuse is enabled, a fixed number of Python workers is reused by subsequent tasks instead of forking a new Python process for each task. This is especially useful for large broadcasts, because the broadcast data does not need to be transferred from the JVM to the Python workers again for each task. | true |
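The following Python sketch makes the memory-string format concrete. It converts values such as 512m or 2g to bytes using binary units (1k = 1024 bytes). It also includes a simplified model of the spill decision described for spark.python.worker.memory. The function names are hypothetical, and this is not PySpark's implementation.

```python
import re

# Binary multipliers for JVM-style memory strings (1k = 1024 bytes).
_MULTIPLIERS = {
    "b": 1,
    "k": 1024, "kb": 1024,
    "m": 1024 ** 2, "mb": 1024 ** 2,
    "g": 1024 ** 3, "gb": 1024 ** 3,
    "t": 1024 ** 4, "tb": 1024 ** 4,
}


def parse_memory(value: str) -> int:
    """Convert a memory string such as '512m' or '2g' to a number of bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([A-Za-z]+)\s*", value)
    if not match or match.group(2).lower() not in _MULTIPLIERS:
        raise ValueError(f"expected a number followed by a unit, got {value!r}")
    number, unit = match.groups()
    return int(number) * _MULTIPLIERS[unit.lower()]


def exceeds_worker_memory(used_bytes: int, limit: str = "512m") -> bool:
    """Simplified model: a Python worker spills to disk once aggregation memory exceeds the limit."""
    return used_bytes > parse_memory(limit)


print(parse_memory("512m"))
print(parse_memory("2g"))
```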
Dynamic Allocation
Dynamic resource scheduling is specific to Spark on YARN mode and can be used only after the YARN external shuffle service is enabled. When Spark runs as a resident service, dynamic resource scheduling greatly improves resource utilization. For example, the JDBCServer process receives no JDBC requests most of the time. Releasing its resources during these idle periods greatly reduces the waste of cluster resources.
Parameter | Description | Default Value |
---|---|---|
spark.dynamicAllocation.enabled | Indicates whether to use dynamic resource scheduling, which scales the number of executors registered with the application up and down based on the workload. Currently, this parameter is valid only in YARN mode. To enable dynamic resource scheduling, also set spark.shuffle.service.enabled to true. Related parameters: spark.dynamicAllocation.minExecutors, spark.dynamicAllocation.maxExecutors, and spark.dynamicAllocation.initialExecutors. | |
spark.dynamicAllocation.minExecutors | Indicates the minimum number of executors. | 0 |
spark.dynamicAllocation.initialExecutors | Indicates the number of initial executors. | spark.dynamicAllocation.minExecutors |
spark.dynamicAllocation.maxExecutors | Indicates the maximum number of executors. | 2048 |
spark.dynamicAllocation.schedulerBacklogTimeout | Indicates how long tasks must remain pending (backlogged) before the first request for new executors is sent. The unit is second. | 1s |
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout | Indicates the interval between the second and subsequent executor requests while the backlog of pending tasks persists. | 1s |
spark.dynamicAllocation.executorIdleTimeout | Indicates how long an executor without cached data blocks can stay idle before it is removed. The unit is second. | 60 |
spark.dynamicAllocation.cachedExecutorIdleTimeout | Indicates how long an executor with cached data blocks can stay idle before it is removed. | |
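The following Python sketch is a simplified model of how the executor target grows while tasks remain pending. It is an assumption based on the documented behavior of Apache Spark, not the exact ExecutorAllocationManager logic.

In this model, the first request fires after schedulerBacklogTimeout and later requests fire every sustainedSchedulerBacklogTimeout. Each round requests twice as many executors as the previous one, bounded by what the pending tasks need and by maxExecutors. The function name ramp_up_targets is hypothetical.

```python
def ramp_up_targets(initial, min_execs, max_execs, needed, rounds):
    """Simulate the executor target over successive backlog rounds (simplified model).

    Each round adds twice as many executors as the previous round (1, 2, 4, ...),
    capped by the number of executors the pending tasks need and by max_execs.
    """
    if not min_execs <= initial <= max_execs:
        raise ValueError("require minExecutors <= initialExecutors <= maxExecutors")
    cap = min(max_execs, max(needed, min_execs))
    target, to_add = initial, 1
    history = [target]
    for _ in range(rounds):
        if target >= cap:
            break  # enough executors requested; no further rounds needed
        target = min(target + to_add, cap)
        history.append(target)
        to_add *= 2
    return history


# Default bounds from the table above (min 0, max 2048), 100 executors' worth of pending tasks.
print(ramp_up_targets(0, 0, 2048, needed=100, rounds=5))
```

The exponential ramp keeps requests small when a burst of work is short and reaches a large target quickly when the backlog persists.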
Spark Streaming
Spark Streaming is a stream processing function built on the Spark batch processing platform. It processes data received from external systems in micro-batches.
Configure the following parameters in the spark-defaults.conf file on the Spark client.
Parameter | Description | Default Value |
---|---|---|
spark.streaming.receiver.writeAheadLog.enable | Indicates whether to enable the write-ahead log (WAL) function. After this function is enabled, all input data received by the receiver is saved in the WAL, so that the data can be restored if the driver fails. | false |
spark.streaming.unpersist | Indicates whether to automatically remove the RDDs generated and persisted by Spark Streaming from Spark memory. If this function is enabled, the raw data received by Spark Streaming is also cleared automatically. If it is disabled, neither the raw data nor the RDDs are cleared automatically. External applications can then still access the data in Spark Streaming, but more Spark memory is occupied. | true |
Spark Streaming Kafka
The receiver is an important component of Spark Streaming. It receives external data, encapsulates the data into blocks, and provides the blocks for Streaming to consume. The most common data source is Kafka. Spark Streaming integrates Kafka to ensure reliability and can directly use Kafka as the RDD input.
Parameter | Description | Default Value |
---|---|---|
spark.streaming.kafka.maxRatePerPartition | Indicates the maximum rate (number of records per second) at which data is read from each Kafka partition when the Kafka direct stream API is used. | |
spark.streaming.blockInterval | Indicates the interval at which data received by a Spark Streaming receiver is grouped into a data block before being stored in Spark. A minimum value of 50 ms is recommended. | 200ms |
spark.streaming.receiver.maxRate | Indicates the maximum rate (number of records per second) at which each receiver receives data. The value 0 or a negative value indicates no limit. | |
spark.streaming.receiver.writeAheadLog.enable | Indicates whether to use ReliableKafkaReceiver. This receiver ensures the integrity of streaming data. | false |
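The rate and block parameters translate into simple per-batch arithmetic. With a direct stream, the maximum number of records read per batch is maxRatePerPartition x partitions x batch interval (in seconds). A receiver creates roughly batch interval / blockInterval blocks, and therefore tasks, per batch. The Python sketch below computes both. The function names and the 12-partition, 5-second figures are illustrative assumptions.

```python
def max_records_per_batch(max_rate_per_partition, num_partitions, batch_interval_s):
    """Upper bound on records read per batch by a Kafka direct stream."""
    return max_rate_per_partition * num_partitions * batch_interval_s


def blocks_per_batch(batch_interval_ms, block_interval_ms=200):
    """Approximate number of blocks (and tasks) one receiver produces per batch."""
    return batch_interval_ms // block_interval_ms


# 1000 records/s per partition, 12 partitions, 5-second batches.
print(max_records_per_batch(1000, 12, 5))
# 2-second batches with the default 200 ms block interval.
print(blocks_per_batch(2000))
```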
Netty/NIO and Hash/Sort Configuration
Shuffle is critical to big data processing, and the network is critical to the entire shuffle process. Currently, Spark supports two shuffle modes (hash and sort) and two network modes (Netty and NIO).
Parameter | Description | Default Value |
---|---|---|
spark.shuffle.manager | Indicates the shuffle implementation. There are two implementations: sort and hash. Sort-based shuffle uses memory more efficiently and is the default option in Spark 1.2 and later versions. | SORT |
spark.shuffle.consolidateFiles | (Only in hash mode) Set this parameter to true to consolidate the intermediate files created during shuffle. Creating fewer files can improve file system performance for shuffles with a large number of Reduce tasks. If the ext4 or xfs file system is used, you are advised to set this parameter to true. On ext3, this setting may degrade performance on servers with more than eight cores due to file system restrictions. | false |
spark.shuffle.sort.bypassMergeThreshold | This parameter is valid only when spark.shuffle.manager is set to sort. If there is no map-side aggregation and the number of Reduce partitions is less than or equal to this value, data is not merge-sorted. This avoids the performance cost of unnecessary sorting. | 200 |
spark.shuffle.io.maxRetries | (Only in Netty mode) If this parameter is set to a non-zero value, fetches that fail because of I/O-related exceptions are automatically retried. This retry logic helps keep large shuffles stable during long GC pauses or intermittent network disconnections. | 12 |
spark.shuffle.io.numConnectionsPerPeer | (Only in Netty mode) Connections between hosts are reused to reduce the number of connections in large clusters. In a cluster with many disks but few hosts, this may leave too little concurrency to keep all disks busy. In that case, you can increase the value of this parameter. | 1 |
spark.shuffle.io.preferDirectBufs | (Only in Netty mode) Off-heap buffers are used to reduce GC during shuffle and cache block transfer. In an environment where off-heap memory is strictly limited, you can disable this parameter to force all allocations from Netty to use heap memory. | true |
spark.shuffle.io.retryWait | (Only in Netty mode) Specifies the time to wait between fetch retries, in seconds. The maximum delay caused by retries is maxRetries x retryWait, which is 60 seconds (12 x 5) with the default values in this table. | 5 |
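Two rules from this table can be written down directly. The Python sketch below expresses the bypassMergeThreshold condition and the worst-case retry delay maxRetries x retryWait, using the defaults listed above. The function names are hypothetical, and the code simplifies Spark's internal checks.

```python
def bypass_merge_sort(map_side_combine: bool, num_reduce_partitions: int,
                      threshold: int = 200) -> bool:
    """Sort shuffle skips merge-sorting when there is no map-side aggregation
    and the number of Reduce partitions is at most the threshold."""
    return (not map_side_combine) and num_reduce_partitions <= threshold


def max_fetch_retry_delay_s(max_retries: int = 12, retry_wait_s: int = 5) -> int:
    """Worst-case extra delay caused by fetch retries: maxRetries x retryWait."""
    return max_retries * retry_wait_s


print(bypass_merge_sort(False, 150))  # small shuffle without aggregation
print(max_fetch_retry_delay_s())      # defaults from the table above
```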
Common Shuffle Configuration
Parameter | Description | Default Value |
---|---|---|
spark.shuffle.spill | If this parameter is set to true, data is spilled to disk to limit memory usage during Reduce tasks. | true |
spark.shuffle.spill.compress | Indicates whether to compress the data spilled during shuffle. The algorithm specified by spark.io.compression.codec is used for compression. | true |
spark.shuffle.file.buffer | Specifies the size of the in-memory buffer for each shuffle file output stream, in KB. These buffers reduce the number of disk seeks and system calls made when creating intermediate shuffle files. You can also set this parameter through spark.shuffle.file.buffer.kb. | 32KB |
spark.shuffle.compress | Indicates whether to compress the output files of Map tasks. Compression is generally recommended and uses the codec specified by spark.io.compression.codec. | true |
spark.reducer.maxSizeInFlight | Specifies the maximum size of Map task output that each Reduce task fetches simultaneously, in MB. Each output requires a buffer to receive it, which is a fixed memory overhead per Reduce task. Therefore, keep the value small unless you have a large amount of memory. You can also set this parameter through spark.reducer.maxMbInFlight. | 48MB |
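The deprecated names in this table (spark.shuffle.file.buffer.kb and spark.reducer.maxMbInFlight) take plain numbers in KB or MB. The current names accept size strings. The Python sketch below normalizes the old names to the new ones. It assumes that the old value simply receives the matching unit suffix and that the current key wins when both are set. DEPRECATED_ALIASES and normalize_shuffle_keys are hypothetical names, not Spark's implementation.

```python
# Deprecated key -> (current key, unit suffix for the old numeric value).
DEPRECATED_ALIASES = {
    "spark.shuffle.file.buffer.kb": ("spark.shuffle.file.buffer", "k"),
    "spark.reducer.maxMbInFlight": ("spark.reducer.maxSizeInFlight", "m"),
}


def normalize_shuffle_keys(props):
    """Rewrite deprecated keys to current names; current keys take precedence."""
    result = {k: v for k, v in props.items() if k not in DEPRECATED_ALIASES}
    for old, (new, suffix) in DEPRECATED_ALIASES.items():
        if old in props and new not in result:
            result[new] = f"{props[old]}{suffix}"
    return result


print(normalize_shuffle_keys({"spark.shuffle.file.buffer.kb": "64",
                              "spark.reducer.maxMbInFlight": "96"}))
```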
Driver Configuration
The Spark driver can be considered the client of a Spark application. All code parsing is completed in this process, so its parameters are especially important. Driver parameters fall into the following categories:
JavaOptions: parameters that follow -D on the java command line, which can be read with System.getProperty
ClassPath: paths for loading Java classes and native libraries
Java Memory and Cores: memory and CPU usage of the Java process
Spark Configuration: Spark internal parameters, which are independent of the Java process
Parameter | Description | Default Value |
---|---|---|
spark.driver.extraJavaOptions | Indicates a string of extra JVM options passed to the driver, for example, GC settings and logging. Note: In client mode, this configuration cannot be set through SparkConf directly in the application, because the driver JVM has already started. Set it with the --driver-java-options command line option or in the default properties file instead. | For details, see Configuring Parameters Rapidly. |
spark.driver.extraClassPath | Indicates extra class path entries added to the class path of the driver. Note: In client mode, this configuration cannot be set through SparkConf directly in the application, because the driver JVM has already started. Set it with the --driver-class-path command line option or in the default properties file instead. | For details, see Configuring Parameters Rapidly. |
spark.driver.userClassPathFirst | (Experimental) Indicates whether user-added JAR files take precedence over Spark's own JAR files when classes are loaded in the driver. This feature can be used to mitigate conflicts between Spark dependencies and user dependencies. It is currently experimental and is used only in cluster mode. | false |
spark.driver.extraLibraryPath | Sets a special library path to use when launching the driver JVM. Note: In client mode, this configuration cannot be set through SparkConf directly in the application, because the driver JVM has already started. Set it with the --driver-library-path command line option or in the default properties file instead. | |
spark.driver.cores | Specifies the number of cores used by the driver process. This parameter is available only in cluster mode. | 1 |
spark.driver.memory | Indicates the amount of memory used by the driver process, that is, the process where SparkContext is initialized (for example, 512m or 2g). Note: In client mode, this configuration cannot be set through SparkConf directly in the application, because the driver JVM has already started. Set it with the --driver-memory command line option or in the default properties file instead. | 4G |
spark.driver.maxResultSize | Indicates the limit on the total size of serialized results of all partitions for each Spark action (for example, collect). The value must be at least 1 MB, and 0 means no limit. If the total size exceeds this limit, the job is aborted. A very large value may cause the driver to run out of memory (depending on spark.driver.memory and the memory overhead of objects in the JVM). Set a proper limit to protect the driver from out-of-memory errors. | 1G |
spark.driver.host | Specifies the host name or IP address for the driver to listen on, which is used for the driver to communicate with the executor. | (local hostname) |
spark.driver.port | Specifies the port for the driver to listen on, which is used for the driver to communicate with the executor. | (random) |
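In client mode, the driver JVM is already running by the time SparkConf is evaluated, so driver options have to be supplied at launch. The Python sketch below builds a spark-submit argument list using the standard --driver-memory, --driver-java-options, --driver-class-path, and --driver-library-path options. The application name my_app.py, the GC flag, and the helper name are illustrative assumptions.

```python
import shlex


def client_mode_submit_args(app, driver_memory="4g", java_opts=None,
                            class_path=None, library_path=None):
    """Build spark-submit arguments that set driver options at launch (yarn-client mode)."""
    args = ["spark-submit", "--master", "yarn", "--deploy-mode", "client",
            "--driver-memory", driver_memory]
    if java_opts:
        args += ["--driver-java-options", java_opts]
    if class_path:
        args += ["--driver-class-path", class_path]
    if library_path:
        args += ["--driver-library-path", library_path]
    args.append(app)  # the application file always comes after the options
    return args


args = client_mode_submit_args("my_app.py", java_opts="-XX:+UseG1GC")
print(" ".join(shlex.quote(a) for a in args))
```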
ExecutorLauncher Configuration
ExecutorLauncher exists only in yarn-client mode, where ExecutorLauncher and the driver run in different processes. Therefore, you need to configure parameters for ExecutorLauncher separately.
Parameter | Description | Default Value |
---|---|---|
spark.yarn.am.extraJavaOptions | Indicates a string of extra JVM options to pass to the YARN ApplicationMaster in client mode. Use spark.driver.extraJavaOptions in cluster mode. | For details, see Configuring Parameters Rapidly. |
spark.yarn.am.memory | Indicates the amount of memory to use for the YARN ApplicationMaster in client mode, in the same format as JVM memory strings (for example, 512m or 2g). In cluster mode, use spark.driver.memory instead. | 1G |
spark.yarn.am.memoryOverhead | Same as spark.yarn.driver.memoryOverhead, but applies only to the ApplicationMaster in client mode. | |
spark.yarn.am.cores | Indicates the number of cores to use for the YARN ApplicationMaster in client mode. Use spark.driver.cores in cluster mode. | 1 |
Executor Configuration
An executor is also a Java process. Unlike the driver and ApplicationMaster, however, an application can have multiple executor processes. Spark supports only a uniform configuration, that is, all executor processes must use the same parameters.
Parameter | Description | Default Value |
---|---|---|
spark.executor.extraJavaOptions | Indicates extra JVM options passed to the executors, for example, GC settings and logging. Do not set Spark properties or the heap size with this option. Set Spark properties through the SparkConf object or the spark-defaults.conf file used by the spark-submit script, and set the heap size with spark.executor.memory. | For details, see Configuring Parameters Rapidly. |
spark.executor.extraClassPath | Indicates extra class path entries added to the class path of the executors. This parameter exists mainly for compatibility with earlier versions of Spark; generally, you do not need to set it. | |
spark.executor.extraLibraryPath | Sets a special library path to use when launching the executor JVM. | For details, see Configuring Parameters Rapidly. |
spark.executor.userClassPathFirst | (Experimental) Same function as spark.driver.userClassPathFirst, but applies to executor instances. | false |
spark.executor.memory | Indicates the amount of memory used by each executor process, in the same format as JVM memory strings (for example, 512m or 2g). | 4G |
spark.executorEnv.[EnvironmentVariableName] | Adds the environment variable specified by EnvironmentVariableName to the executor process. You can specify multiple environment variables. | |
spark.executor.logs.rolling.maxRetainedFiles | Sets the number of most recent log files retained by the system during rolling; older log files are deleted. This function is disabled by default. | |
spark.executor.logs.rolling.size.maxBytes | Sets the maximum size, in bytes, that an executor log file can reach before it is rolled over. This function is disabled by default. To automatically clear old logs, see spark.executor.logs.rolling.maxRetainedFiles. | |
spark.executor.logs.rolling.strategy | Sets the executor log rolling policy. Rolling is disabled by default. The value can be time (time-based rolling) or size (size-based rolling). If it is set to time, spark.executor.logs.rolling.time.interval specifies the rolling interval. If it is set to size, spark.executor.logs.rolling.size.maxBytes specifies the maximum file size for rolling. | |
spark.executor.logs.rolling.time.interval | Sets the time interval for executor log rolling. This function is disabled by default. The value can be daily, hourly, minutely, or any number of seconds. To automatically clear old logs, see spark.executor.logs.rolling.maxRetainedFiles. | daily |
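The rolling parameters depend on each other: the strategy decides whether time.interval or size.maxBytes is used. The following Python sketch builds a consistent set of properties, with validation rules taken from the descriptions above. The helper name is hypothetical, and the 128 MB size and five retained files are example values.

```python
import re

PREFIX = "spark.executor.logs.rolling."
NAMED_INTERVALS = {"daily", "hourly", "minutely"}


def rolling_log_conf(strategy, *, interval="daily", max_bytes=None, max_retained=None):
    """Build spark.executor.logs.rolling.* properties for a time or size strategy."""
    conf = {PREFIX + "strategy": strategy}
    if strategy == "time":
        # Named intervals, or a plain number of seconds such as "3600".
        if str(interval) not in NAMED_INTERVALS and not re.fullmatch(r"\d+", str(interval)):
            raise ValueError("interval must be daily, hourly, minutely, or a number of seconds")
        conf[PREFIX + "time.interval"] = str(interval)
    elif strategy == "size":
        if max_bytes is None or int(max_bytes) <= 0:
            raise ValueError("size-based rolling needs a positive max_bytes")
        conf[PREFIX + "size.maxBytes"] = str(int(max_bytes))
    else:
        raise ValueError("strategy must be 'time' or 'size'")
    if max_retained is not None:
        # Keep only the newest N rolled files; older ones are deleted.
        conf[PREFIX + "maxRetainedFiles"] = str(int(max_retained))
    return conf


size_conf = rolling_log_conf("size", max_bytes=128 * 1024 * 1024, max_retained=5)
for key, value in size_conf.items():
    print(key, value)
```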
WebUI
The Web UI displays the running process and status of the Spark application.
Parameter | Description | Default Value |
---|---|---|
spark.ui.killEnabled | Allows stages and jobs to be stopped on the web UI. Note: For security purposes, the default value of this parameter is set to false to prevent misoperations. To enable this function, set this parameter to true in the spark-defaults.conf configuration file. Exercise caution when performing this operation. | true |
spark.ui.port | Specifies the port for your application's dashboard, which displays memory and workload data. | |
spark.ui.retainedJobs | Specifies the number of jobs recorded by the Spark UI and status API before GC. | 1000 |
spark.ui.retainedStages | Specifies the number of stages recorded by the Spark UI and status API before GC. | 1000 |
HistoryServer
A History Server reads the EventLog file in the file system and displays the running status of the Spark application.
Parameter | Description | Default Value |
---|---|---|
spark.history.fs.logDirectory | Specifies the log directory of a History Server. | |
spark.history.ui.port | Specifies the port on which JobHistory listens for connections. | 18080 |
spark.history.fs.updateInterval | Specifies the update interval of the information displayed on a History Server, in seconds. Each update checks for changes made to the event logs in the persistent store. | 10s |
spark.history.fs.update.interval.seconds | Specifies the interval for checking the update of each event log. This parameter has the same function as spark.history.fs.updateInterval. spark.history.fs.updateInterval is recommended. | 10s |
spark.history.updateInterval | This parameter has the same function as spark.history.fs.update.interval.seconds and spark.history.fs.updateInterval. spark.history.fs.updateInterval is recommended. | 10s |
History Server UI Timeout and Maximum Number of Concurrent Requests
Parameter | Description | Default Value |
---|---|---|
spark.session.maxAge | Specifies the session timeout interval, in seconds. This parameter applies only to security mode and cannot be set in normal mode. | 600 |
spark.connection.maxRequest | Specifies the maximum number of concurrent client access requests to JobHistory. | 5000 |
EventLog
During the running of Spark applications, the running status is written into the file system in JSON format in real time for the History Server service to read and reproduce the application running status.
Parameter | Description | Default Value |
---|---|---|
spark.eventLog.enabled | Indicates whether to log Spark events, which are used to reconstruct the web UI after the application execution is complete. | true |
spark.eventLog.dir | Indicates the directory for logging Spark events if spark.eventLog.enabled is set to true. In this directory, Spark creates a subdirectory for each application and logs the events of that application in it. You can also set this parameter to a unified location, such as an HDFS directory, so that the History Server can read the history files. | hdfs://hacluster/spark2xJobHistory2x |
spark.eventLog.compress | Indicates whether to compress logged events when spark.eventLog.enabled is set to true. | false |
Periodic Clearing of Event Logs
The number of event log files on JobHistory grows as more tasks are submitted. To keep it under control, Spark can periodically clear old event logs. You can enable this function and set the clearing interval using the following parameters.
Parameter | Description | Default Value |
---|---|---|
spark.history.fs.cleaner.enabled | Indicates whether to enable the clearing function. | true |
spark.history.fs.cleaner.interval | Indicates the check interval of the clearing function. | 1d |
spark.history.fs.cleaner.maxAge | Indicates the maximum retention period of event logs. Logs older than this are deleted when the clearing function runs. | 4d |
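The following Python sketch selects the event logs that would be removed. It assumes a simplified clearing rule: each time the cleaner runs (every spark.history.fs.cleaner.interval), it deletes any event log older than spark.history.fs.cleaner.maxAge. The log names, timestamps, and function names are made up for illustration, and the duration parser handles only the s, min, m, h, and d suffixes.

```python
import re
from datetime import datetime, timedelta

_SECONDS_PER_UNIT = {"s": 1, "min": 60, "m": 60, "h": 3600, "d": 86400}


def parse_duration(value: str) -> timedelta:
    """Parse a duration such as '1d', '4d', or '10s' (subset of Spark time suffixes)."""
    match = re.fullmatch(r"\s*(\d+)\s*(s|min|m|h|d)\s*", value)
    if not match:
        raise ValueError(f"unsupported duration: {value!r}")
    number, unit = match.groups()
    return timedelta(seconds=int(number) * _SECONDS_PER_UNIT[unit])


def logs_to_delete(log_mtimes, now, max_age="4d"):
    """Return the names of logs last modified before now - max_age."""
    cutoff = now - parse_duration(max_age)
    return sorted(name for name, mtime in log_mtimes.items() if mtime < cutoff)


now = datetime(2024, 1, 10)
logs = {
    "app-1": datetime(2024, 1, 1),
    "app-2": datetime(2024, 1, 7),
    "app-3": datetime(2024, 1, 9, 12),
}
print(logs_to_delete(logs, now))
```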
Kryo
Kryo is a highly efficient Java serialization framework that is integrated into Spark by default. Almost all Spark performance tuning involves replacing Spark's default serializer with the Kryo serializer. Kryo serialization applies only to serialization at the Spark data layer. To use Kryo serialization, set spark.serializer to org.apache.spark.serializer.KryoSerializer and configure the following parameters to optimize Kryo serialization performance:
Parameter | Description | Default Value |
---|---|---|
spark.kryo.classesToRegister | Specifies the names of the classes to register with Kryo when Kryo serialization is used. Separate multiple classes with commas (,). | |
spark.kryo.referenceTracking | Indicates whether to track references to the same object when serializing data with Kryo. Tracking is necessary if the object graph has circular references or contains multiple copies of the same object. Otherwise, you can disable this function to improve performance. | true |
spark.kryo.registrationRequired | Indicates whether registration with Kryo is required. If this parameter is set to true, Kryo throws an exception when it serializes an object of an unregistered class. If it is set to false (default value), Kryo writes the unregistered class name together with each serialized object, which causes significant performance overhead. Enabling this option therefore ensures that no class has been omitted from registration. | false |
spark.kryo.registrator | If Kryo serialization is used, specifies the class that registers your custom classes with Kryo. Use this property if you need to register classes in a custom way, for example, to specify a custom field serializer. Otherwise, use spark.kryo.classesToRegister, which is simpler. Set this parameter to a class that extends KryoRegistrator. | |
spark.kryoserializer.buffer.max | Specifies the maximum size of the Kryo serialization buffer, in MB. The value must be larger than any object you attempt to serialize. If a "buffer limit exceeded" exception occurs in Kryo, increase the value of this parameter. You can also set this parameter through spark.kryoserializer.buffer.max.mb. | 64MB |
spark.kryoserializer.buffer | Specifies the initial size of the Kryo serialization buffer, in KB. Each core of each worker has one buffer. If necessary, the buffer grows up to the value of spark.kryoserializer.buffer.max. You can also set this parameter through spark.kryoserializer.buffer.mb. | 64KB |
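The Python sketch below assembles the Kryo properties described in this section and checks that the initial buffer does not exceed the maximum. The class names are hypothetical placeholders, and the size parser handles only values of the form 64k, 64m, or 1g.

```python
_UNIT = {"k": 1024, "m": 1024 ** 2, "g": 1024 ** 3}


def _to_bytes(size: str) -> int:
    """Convert '64k', '64m', or '1g' to bytes (binary units; other forms not handled)."""
    return int(size[:-1]) * _UNIT[size[-1].lower()]


def kryo_conf(classes, buffer="64k", buffer_max="64m", registration_required=False):
    """Return Spark properties that switch serialization to Kryo."""
    if _to_bytes(buffer) > _to_bytes(buffer_max):
        raise ValueError("spark.kryoserializer.buffer must not exceed spark.kryoserializer.buffer.max")
    return {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.kryo.classesToRegister": ",".join(classes),
        "spark.kryo.registrationRequired": "true" if registration_required else "false",
        "spark.kryoserializer.buffer": buffer,
        "spark.kryoserializer.buffer.max": buffer_max,
    }


conf = kryo_conf(["com.example.Order", "com.example.Customer"], registration_required=True)
for key, value in conf.items():
    print(key, value)
```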
Broadcast
Broadcast is used to transmit data blocks between Spark processes. In Spark, broadcast can be used for JAR packages, files, closures, and returned results. Broadcast supports two modes: Torrent and HTTP. The Torrent mode divides data into small pieces and distributes them across the cluster, and the pieces can be fetched remotely as needed. The HTTP mode saves files to the local disk and transfers entire files to the remote end over HTTP as needed. The Torrent mode is more stable than the HTTP mode and is therefore the default broadcast mode.
Parameter | Description | Default Value |
---|---|---|
spark.broadcast.factory | Indicates the broadcast mode. | org.apache.spark.broadcast.TorrentBroadcastFactory |
spark.broadcast.blockSize | Indicates the size of each block in TorrentBroadcastFactory. A value that is too large reduces parallelism during broadcast (making it slower). A value that is too small may degrade BlockManager performance. | 4096 |
spark.broadcast.compress | Indicates whether to compress broadcast variables before sending them. You are advised to compress the broadcast variables. | true |
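The following Python sketch shows how spark.broadcast.blockSize affects the number of pieces. It computes how many blocks a serialized broadcast value is split into (the value is also compressed if spark.broadcast.compress is enabled). It assumes that the default 4096 is expressed in KB, that is, 4 MB per block. The function name is hypothetical.

```python
import math


def torrent_block_count(data_bytes: int, block_size_kb: int = 4096) -> int:
    """Number of pieces a broadcast value of data_bytes is split into (simplified)."""
    block_bytes = block_size_kb * 1024
    # At least one block, even for an empty value.
    return max(1, math.ceil(data_bytes / block_bytes))


# A 100 MB serialized broadcast value with the default block size.
print(torrent_block_count(100 * 1024 ** 2))
```

Halving the block size doubles the number of pieces. More pieces allow more parallel fetching, but each piece adds BlockManager bookkeeping.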
Storage
Spark features in-memory computing, and Spark Storage manages its memory resources. Storage stores the data blocks generated when RDDs are cached. Because the JVM heap memory is managed as a whole, the storage memory size is an important concept in Spark Storage management.
Parameter | Description | Default Value |
---|---|---|
spark.storage.memoryMapThreshold | Specifies the block size above which Spark memory-maps a block when reading it from disk. This prevents Spark from memory-mapping very small blocks, because memory mapping generally has high overhead for blocks whose size is close to or below the page size of the operating system. | 2m |
PORT
Parameter | Description | Default Value |
---|---|---|
spark.ui.port | Specifies the port for your application's dashboard, which displays memory and workload data. | |
spark.blockManager.port | Specifies the port on which all BlockManagers listen. BlockManagers exist on both the driver and the executors. | |
spark.driver.port | Specifies the port for the driver to listen on, which is used for the driver to communicate with the executors. | |
Range of Random Ports
All random ports must be within the range specified by the following parameters.
Parameter | Description | Default Value |
---|---|---|
spark.random.port.min | Sets the minimum random port. | 22600 |
spark.random.port.max | Sets the maximum random port. | 22899 |
TIMEOUT
By default, Spark is configured for computing tasks that process medium-scale data. If the data volume is very large, tasks may fail due to timeouts. In scenarios with large amounts of data, increase the values of the timeout parameters in Spark.
Parameter | Description | Default Value |
---|---|---|
spark.files.fetchTimeout | Specifies the communication timeout (in seconds) when fetching files added through SparkContext.addFile() on the driver. | 60s |
spark.network.timeout | Specifies the default timeout for all network interactions, in seconds. This value is used in place of spark.core.connection.ack.wait.timeout, spark.akka.timeout, spark.storage.blockManagerSlaveTimeoutMs, and spark.shuffle.io.connectionTimeout when they are not configured. | 360s |
spark.core.connection.ack.wait.timeout | Specifies how long a connection waits for an acknowledgment before timing out, in seconds. To avoid timeouts caused by long pauses such as GC, you can set this parameter to a larger value. | 60 |
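The fallback behavior of spark.network.timeout can be sketched as follows. The sketch assumes that a specific timeout, when configured, takes precedence, and that a bare number means seconds. The helper names are hypothetical, and only a few time suffixes are parsed.

```python
FALLBACK_KEY = "spark.network.timeout"


def parse_seconds(value) -> float:
    """Parse '360s', '500ms', '2min', or a bare number (treated as seconds, an assumption)."""
    text = str(value).strip().lower()
    if text.endswith("ms"):
        return int(text[:-2]) / 1000
    if text.endswith("min"):
        return int(text[:-3]) * 60.0
    if text.endswith("s"):
        return float(int(text[:-1]))
    return float(int(text))


def effective_timeout(props, key, network_default="360s"):
    """Use the specific timeout if set; otherwise fall back to spark.network.timeout."""
    value = props.get(key)
    if value is None:
        value = props.get(FALLBACK_KEY, network_default)
    return parse_seconds(value)


print(effective_timeout({}, "spark.shuffle.io.connectionTimeout"))
print(effective_timeout({"spark.network.timeout": "600s"}, "spark.shuffle.io.connectionTimeout"))
```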
Encryption
Spark supports SSL for Akka and HTTP (for the broadcast and file server) protocols, but does not support SSL for the web UI and block transfer service.
SSL must be configured on each node and configured for each component involved in communication using a particular protocol.
Parameter | Description | Default Value |
---|---|---|
spark.ssl.enabled | Indicates whether to enable SSL connections for all supported protocols. All SSL settings similar to spark.ssl.xxx indicate the global configuration of all supported protocols. To override the global configuration of a particular protocol, you must override the property in the namespace specified by the protocol. Use spark.ssl.YYY.XXX to overwrite the global configuration of the particular protocol specified by YYY. YYY can be either akka for Akka-based connections or fs for the broadcast and file server. | false |
spark.ssl.enabledAlgorithms | Specifies a comma-separated list of ciphers. The specified ciphers must be supported by the JVM. | |
spark.ssl.keyPassword | Specifies the password of the private key in the keystore. | |
spark.ssl.keyStore | Specifies the path of the keystore file. The path can be absolute or relative to the directory where the component is started. | |
spark.ssl.keyStorePassword | Specifies the password of the keystore. | |
spark.ssl.protocol | Specifies the protocol name. The protocol must be supported by the JVM; see the JVM documentation for the reference list of protocols. | |
spark.ssl.trustStore | Specifies the path of the truststore file. The path can be absolute or relative to the directory where the component is started. | |
spark.ssl.trustStorePassword | Specifies the password of the truststore. | |
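The override rule for spark.ssl.YYY.XXX can be sketched as a lookup that prefers the protocol namespace and falls back to the global spark.ssl.XXX value. The function `ssl_setting` and the sample paths are hypothetical; only the property names and the akka and fs namespaces come from the table above.

```python
def ssl_setting(conf: dict, protocol: str, name: str, default=None):
    """Resolve spark.ssl.<name> for one protocol namespace.

    spark.ssl.<protocol>.<name> overrides the global spark.ssl.<name>;
    `protocol` is "akka" or "fs", as described for spark.ssl.enabled.
    """
    if protocol not in ("akka", "fs"):
        raise ValueError(f"unknown protocol namespace: {protocol}")
    specific = f"spark.ssl.{protocol}.{name}"
    if specific in conf:
        return conf[specific]
    return conf.get(f"spark.ssl.{name}", default)


conf = {
    "spark.ssl.enabled": "true",
    "spark.ssl.keyStore": "/opt/ssl/keystore.jks",        # hypothetical path
    "spark.ssl.fs.keyStore": "/opt/ssl/fs-keystore.jks",  # override for fs only
}

for proto in ("akka", "fs"):
    print(proto, ssl_setting(conf, proto, "keyStore"))
# akka /opt/ssl/keystore.jks
# fs /opt/ssl/fs-keystore.jks
```

Keeping shared values such as spark.ssl.enabled global and overriding only what differs per protocol keeps the configuration short.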
Security¶
Spark supports shared key-based authentication, which is configured using spark.authenticate. This parameter controls whether Spark communication protocols authenticate using a shared key. The authentication is a basic handshake that verifies that both sides hold the same shared key and are therefore allowed to communicate; if the shared keys differ, communication is refused. Shared keys are set up as follows:
For Spark on Yarn deployments, set spark.authenticate to true. Shared keys are then generated and distributed automatically, and each application uses its own unique key.
For other types of Spark deployments, configure the spark.authenticate.secret parameter on each node. All masters, workers, and applications use this key.
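To illustrate what a shared-key handshake checks, the sketch below uses an HMAC challenge-response: each side proves it holds the same secret without sending the secret itself. This is a conceptual illustration only, not Spark's actual authentication protocol, and the functions `make_secret`, `respond`, and `handshake` are hypothetical.

```python
import hashlib
import hmac
import secrets


def make_secret() -> bytes:
    """Generate a random shared key (on Yarn, Spark generates keys itself)."""
    return secrets.token_bytes(32)


def respond(secret: bytes, challenge: bytes) -> bytes:
    """Answer a challenge by computing an HMAC over it with the shared key."""
    return hmac.new(secret, challenge, hashlib.sha256).digest()


def handshake(server_secret: bytes, client_secret: bytes) -> bool:
    """Return True only if both parties hold the same shared key."""
    challenge = secrets.token_bytes(16)          # random nonce sent by the server
    answer = respond(client_secret, challenge)   # computed by the client
    expected = respond(server_secret, challenge)
    return hmac.compare_digest(answer, expected)


key = make_secret()
print(handshake(key, key))            # True: same shared key
print(handshake(key, make_secret()))  # False: different keys
```

A fresh random challenge per handshake prevents a recorded answer from being replayed.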
Parameter | Description | Default Value |
---|---|---|
spark.acls.enable | Indicates whether to enable Spark ACLs. If Spark ACLs are enabled, the system checks whether the user has permission to view and modify jobs. Note that this requires the user to be identifiable: if the user cannot be identified, no check is performed. Filters can be used to authenticate and set users on the UI. | true |
spark.admin.acls | Specifies a comma-separated list of users (Spark administrators) who have permission to view and modify all Spark jobs. This is useful on a shared cluster where Spark administrators or developers help debug jobs. | admin |
spark.authenticate | Indicates whether Spark authenticates its internal connections. If the application is not running on Yarn, see also spark.authenticate.secret. | true |
spark.authenticate.secret | Specifies the key used for authentication between Spark components. This parameter must be set if Spark does not run on Yarn and authentication is enabled. | |
spark.modify.acls | Specifies a comma-separated list of users who have permission to modify Spark jobs. By default, only the user who started a Spark job can modify it (for example, kill it). | |
spark.ui.view.acls | Specifies a comma-separated list of users who have permission to access the Spark web UI. By default, only the user who started a Spark job has access. | |
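The view and modify rules above can be summarized as a permission check. This is a simplified model, not Spark's own access-control code: the function `can_access` is hypothetical, and it assumes that the job owner always has access, that spark.admin.acls grants both view and modify permissions, and that an unidentified user (`None`) bypasses the check as described for spark.acls.enable.

```python
from typing import Optional


def _users(value: str) -> set:
    """Split a comma-separated ACL value into a set of user names."""
    return {name.strip() for name in value.split(",") if name.strip()}


def can_access(conf: dict, owner: str, user: Optional[str], action: str) -> bool:
    """Check whether `user` may 'view' or 'modify' a job started by `owner`."""
    # ACLs disabled: no check is performed.
    if conf.get("spark.acls.enable", "true").strip().lower() != "true":
        return True
    # User cannot be identified: the check is not performed.
    if user is None:
        return True
    acl_key = {"view": "spark.ui.view.acls", "modify": "spark.modify.acls"}[action]
    allowed = {owner}
    allowed |= _users(conf.get("spark.admin.acls", "admin"))
    allowed |= _users(conf.get(acl_key, ""))
    return user in allowed


conf = {"spark.ui.view.acls": "alice, bob", "spark.modify.acls": "alice"}
print(can_access(conf, "carol", "bob", "view"))      # True
print(can_access(conf, "carol", "bob", "modify"))    # False
print(can_access(conf, "carol", "admin", "modify"))  # True
```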
Enabling the Authentication Mechanism Between Spark Processes¶
Spark processes support shared key-based authentication. You can configure spark.authenticate to control whether Spark performs authentication during communication. In this mode, authentication is a simple handshake that verifies that both communicating parties hold the same shared key.
Configure the following parameter in the spark-defaults.conf file on the Spark client.
Parameter | Description | Default Value |
---|---|---|
spark.authenticate | For Spark on Yarn deployments, set this parameter to true. Then, keys are automatically generated and distributed, and each application uses a unique key. | true |
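Because spark-defaults.conf holds one property per line, with the key and value separated by whitespace, the setting above can also be applied with a small script. The helpers `update_conf_text` and `set_property` are hypothetical; they also tolerate `key=value` lines, on the assumption that the file follows Java properties syntax.

```python
import re
from pathlib import Path


def update_conf_text(text: str, key: str, value: str) -> str:
    """Return `text` with `key` set to `value`.

    Existing entries for `key` are replaced; comments ('#' or '!') and other
    properties are kept. If `key` is absent, it is appended at the end.
    """
    lines = text.splitlines()
    new_line = f"{key} {value}"
    found = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        if not stripped or stripped[0] in "#!":
            continue
        # The key ends at the first whitespace, '=' or ':' character.
        current_key = re.split(r"[\s=:]", stripped, maxsplit=1)[0]
        if current_key == key:
            lines[i] = new_line
            found = True
    if not found:
        lines.append(new_line)
    return "\n".join(lines) + "\n"


def set_property(conf_file: Path, key: str, value: str) -> None:
    """Apply update_conf_text to a spark-defaults.conf file in place."""
    text = conf_file.read_text() if conf_file.exists() else ""
    conf_file.write_text(update_conf_text(text, key, value))
```

For example, `set_property(Path(".../spark-defaults.conf"), "spark.authenticate", "true")`, with the path adjusted to the client installation directory.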
Compression¶
Data compression is a policy that trades CPU time for lower memory usage. Therefore, when Spark memory is severely insufficient (which is common given the characteristics of in-memory computing), data compression can greatly improve performance. Spark supports three compression algorithms: Snappy, LZ4, and LZF. The default codec is LZ4 (see spark.io.compression.codec below). Snappy invokes native methods to compress and decompress data, so in Yarn mode, take its non-heap memory usage into account when sizing the container process.
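The memory-for-CPU trade-off can be observed with any codec. The sketch below uses Python's built-in zlib as a stand-in, because LZ4, LZF, and Snappy are not in the Python standard library; absolute numbers will differ from Spark's codecs, and the sample records are synthetic.

```python
import time
import zlib


def measure(data: bytes, level: int = 1) -> tuple:
    """Compress `data` and return (compressed_size, seconds_spent)."""
    start = time.perf_counter()
    packed = zlib.compress(data, level)
    elapsed = time.perf_counter() - start
    # Round-trip check: compression must be lossless.
    assert zlib.decompress(packed) == data
    return len(packed), elapsed


# Repetitive records, similar in spirit to serialized rows sharing one schema.
records = b"".join(b"user_%05d,click,2024-01-01\n" % (i % 1000) for i in range(100_000))
size, secs = measure(records)
print(f"raw={len(records)} bytes, compressed={size} bytes, cpu={secs:.4f}s")
```

The fast compression level is used here because Spark's codecs are likewise designed for speed rather than maximum ratio.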
Parameter | Description | Default Value |
---|---|---|
spark.io.compression.codec | Specifies the codec used to compress internal data, such as RDD partitions, broadcast variables, and shuffle output. Spark provides three compression algorithms: LZ4, LZF, and Snappy. You can specify a codec by its short name (lz4, lzf, or snappy) or by its fully qualified class name, such as org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, or org.apache.spark.io.SnappyCompressionCodec. | lz4 |
spark.io.compression.lz4.block.size | Specifies the block size, in bytes, used by LZ4 compression. When LZ4 is used, reducing the block size also reduces shuffle memory usage. | 32768 |
spark.io.compression.snappy.block.size | Specifies the block size, in bytes, used by Snappy compression. When Snappy is used, reducing the block size also reduces shuffle memory usage. | 32768 |
spark.shuffle.compress | Indicates whether to compress map output files using spark.io.compression.codec. Compression is generally recommended. | true |
spark.shuffle.spill.compress | Indicates whether to compress data spilled during shuffles using spark.io.compression.codec. | true |
spark.eventLog.compress | Indicates whether to compress logged events when spark.eventLog.enabled is set to true. | false |
spark.broadcast.compress | Indicates whether to compress broadcast variables before sending them. You are advised to compress the broadcast variables. | true |
spark.rdd.compress | Indicates whether to compress serialized RDD partitions (for example, partitions stored with StorageLevel.MEMORY_ONLY_SER). This can save substantial space at the cost of some extra CPU time. | false |
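Since spark.io.compression.codec accepts either a short name or a fully qualified class name, a configuration check can normalize both forms. The function `codec_class` below is hypothetical, and it only knows the three codecs described in this section.

```python
SHORT_NAMES = {
    "lz4": "org.apache.spark.io.LZ4CompressionCodec",
    "lzf": "org.apache.spark.io.LZFCompressionCodec",
    "snappy": "org.apache.spark.io.SnappyCompressionCodec",
}


def codec_class(conf: dict) -> str:
    """Return the codec class for spark.io.compression.codec (default lz4)."""
    value = conf.get("spark.io.compression.codec", "lz4").strip()
    # Short names are matched case-insensitively in this sketch.
    if value.lower() in SHORT_NAMES:
        return SHORT_NAMES[value.lower()]
    if value in SHORT_NAMES.values():
        return value
    raise ValueError(f"codec not covered by this sketch: {value}")


print(codec_class({}))  # org.apache.spark.io.LZ4CompressionCodec
print(codec_class({"spark.io.compression.codec": "snappy"}))
```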
Reducing the Probability of Abnormal Client Application Operations When Resources Are Insufficient¶
When resources are insufficient, ApplicationMaster tasks must wait and are not processed until enough resources become available. If the actual waiting time exceeds the configured waiting time, the ApplicationMaster tasks are deleted. To reduce the probability of abnormal client application operations, adjust the following parameters.
Configure the following parameters in the spark-defaults.conf file on the client.
Parameter | Description | Default Value |
---|---|---|
spark.yarn.applicationMaster.waitTries | Specifies the number of times ApplicationMaster waits for the Spark master, which is also the number of times it waits for SparkContext initialization. Increase this value to prevent ApplicationMaster tasks from being deleted and to reduce the probability of abnormal client application operations. | 10 |
spark.yarn.am.memory | Specifies the ApplicationMaster memory. Increase this value to prevent ResourceManager from deleting ApplicationMaster tasks due to insufficient memory, and to reduce the probability of abnormal client application operations. | 1G |
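When raising these values, it can help to validate them before writing them to spark-defaults.conf. The sketch below parses memory strings such as 1G or 512m and renders the two client properties; the functions `to_mib` and `am_settings` and the example values (20 tries, 2G) are hypothetical, and refusing values below the documented defaults is a design choice of this sketch, not a Spark rule.

```python
import re

_FACTORS_MIB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def to_mib(size: str) -> float:
    """Convert a size string such as '1G' or '512m' to MiB (case-insensitive)."""
    match = re.fullmatch(r"\s*(\d+)\s*([kmgt])b?\s*", size.lower())
    if match is None:
        raise ValueError(f"unsupported size: {size!r}")
    number, unit = match.groups()
    return int(number) * _FACTORS_MIB[unit]


def am_settings(wait_tries: int, am_memory: str, current: str = "1G") -> str:
    """Render spark-defaults.conf lines, refusing values below the defaults."""
    if wait_tries < 10:
        raise ValueError("waitTries should not be lower than the default of 10")
    if to_mib(am_memory) < to_mib(current):
        raise ValueError("spark.yarn.am.memory should be increased, not reduced")
    return (
        f"spark.yarn.applicationMaster.waitTries {wait_tries}\n"
        f"spark.yarn.am.memory {am_memory}\n"
    )


print(am_settings(20, "2G"), end="")
# spark.yarn.applicationMaster.waitTries 20
# spark.yarn.am.memory 2G
```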