JobManager & TaskManager
Scenarios
JobManager and TaskManager are the main components of Flink. On the client, you can configure their parameters for different security and performance scenarios.
Configuration Description
The main configuration items cover communication ports, memory management, and connection retries, among others.
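Several values in the tables below are written as port ranges (for example, 32326-32390), durations with a time unit (500 ms, 5 min) or memory sizes with a unit (1024mb, 1 GB). The following Python sketch shows one way to interpret these formats. The helper names are hypothetical and not part of Flink or MRS, and treating KB/MB/GB as binary multiples (1 MB = 1024 × 1024 bytes) is an assumption to check against your Flink version.

```python
import re

# Hypothetical helpers (not part of Flink or MRS) that illustrate the value
# formats used in the parameter tables.

# Milliseconds per time unit; "m" and "min" are both treated as minutes.
TIME_UNITS_MS = {"ms": 1, "s": 1_000, "m": 60_000, "min": 60_000,
                 "h": 3_600_000, "d": 86_400_000}
# Bytes per size unit, assuming binary multiples (1 KB = 1024 B).
SIZE_UNITS = {"b": 1, "kb": 1024, "mb": 1024 ** 2, "gb": 1024 ** 3, "tb": 1024 ** 4}

# A non-negative integer, optional spaces, then an optional unit.
VALUE_WITH_UNIT = re.compile(r"\s*(\d+)\s*([a-z]*)\s*")


def _split(text: str, default_unit: str) -> tuple[int, str]:
    """Split '500 ms' or '1024mb' into (number, lowercase unit)."""
    match = VALUE_WITH_UNIT.fullmatch(text.lower())
    if not match:
        raise ValueError(f"cannot parse value: {text!r}")
    return int(match.group(1)), match.group(2) or default_unit


def port_range_size(spec: str) -> int:
    """Number of ports in a range such as '32326-32390' (both ends inclusive)."""
    low, high = (int(part) for part in spec.split("-"))
    if low > high:
        raise ValueError(f"invalid port range: {spec!r}")
    return high - low + 1


def parse_duration_ms(text: str) -> int:
    """'500 ms', '5 min' or '30s' -> milliseconds; a bare number counts as ms."""
    value, unit = _split(text, "ms")
    if unit not in TIME_UNITS_MS:
        raise ValueError(f"unknown time unit: {unit!r}")
    return value * TIME_UNITS_MS[unit]


def parse_size_bytes(text: str) -> int:
    """'1024mb', '64 MB' or '1 GB' -> bytes; a bare number counts as bytes."""
    value, unit = _split(text, "b")
    if unit not in SIZE_UNITS:
        raise ValueError(f"unknown size unit: {unit!r}")
    return value * SIZE_UNITS[unit]


print(port_range_size("32326-32390"))  # 65
print(parse_duration_ms("5 min"))      # 300000
print(parse_size_bytes("1024mb"))      # 1073741824
```

For example, the default TaskManager RPC port range 32326-32390 contains 65 ports.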
For versions earlier than MRS 3.x, see Table 1.
Parameter | Mandatory | Default Value | Description |
---|---|---|---|
taskmanager.rpc.port | No | 32326-32390 | IPC port range of TaskManager |
taskmanager.data.port | No | 32391-32455 | Data exchange port range of TaskManager |
taskmanager.data.ssl.enabled | No | false | Whether to enable secure sockets layer (SSL) encryption for data transfer between TaskManagers. This parameter is valid only when the global switch security.ssl is enabled. |
taskmanager.numberOfTaskSlots | No | 3 | Number of slots provided by each TaskManager. Generally, set the value to the number of CPU cores of the physical machine. In yarn-session mode, the value can be passed only through the -s parameter. In yarn-cluster mode, the value can be passed only through the -ys parameter. |
parallelism.default | No | 1 | Default degree of parallelism of job operators. |
taskmanager.memory.size | No | 0 | Amount of JVM heap memory that TaskManager reserves for sorting, hash tables, and caching of intermediate results. The unit is MB. If this parameter is not specified (0), the memory manager reserves a fixed fraction of the JVM memory, as specified by taskmanager.memory.fraction. |
taskmanager.memory.fraction | No | 0.7 | Ratio of JVM heap memory that TaskManager reserves for sorting, hash tables, and caching of intermediate results. |
taskmanager.memory.off-heap | Yes | false | Whether TaskManager uses off-heap memory for sorting, hash tables, and caching of intermediate results. Enabling this item is recommended when large amounts of memory are needed, as it improves the efficiency of memory operations. |
taskmanager.memory.segment-size | No | 32768 | Size of a memory segment on TaskManager. The unit is bytes. A memory segment is the basic unit of the reserved memory space and also determines the size of the network buffers. |
taskmanager.memory.preallocate | No | false | Whether TaskManager allocates the reserved memory space at startup. Enabling this item is recommended when off-heap memory is used. |
taskmanager.registration.initial-backoff | No | 500 ms | Initial interval between two consecutive registration attempts. The unit is ms/s/m/h/d. Note: The time value and unit are separated by a half-width space. ms, s, m, h, and d indicate millisecond, second, minute, hour, and day, respectively. |
taskmanager.registration.refused-backoff | No | 5 min | Retry interval after JobManager rejects a registration attempt. |
task.cancellation.interval | No | 30000 | Interval between two successive task cancellation attempts. The unit is millisecond. |
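Table 1 describes two ways of sizing the memory that TaskManager reserves (a fixed taskmanager.memory.size in MB, or taskmanager.memory.fraction when the size is 0) and recommends setting the slot count to the number of cores. The Python sketch below models both under simplifying assumptions: the fraction is applied to a JVM heap size you pass in (the exact base Flink uses depends on the version and on whether off-heap memory is enabled), and all operators share slots, so a job needs as many slots as its highest operator parallelism. The function names are hypothetical.

```python
import math

# Hypothetical helpers (not part of Flink or MRS) modelling the pre-MRS 3.x
# parameters in Table 1 under simplifying assumptions.


def managed_memory_bytes(jvm_heap_bytes: int, memory_size_mb: int = 0,
                         memory_fraction: float = 0.7) -> int:
    """Memory reserved for sorting, hash tables and intermediate results.

    A positive taskmanager.memory.size (MB) is used as is; otherwise
    taskmanager.memory.fraction is applied to the given JVM heap size.
    """
    if memory_size_mb > 0:
        return memory_size_mb * 1024 * 1024
    if not 0.0 < memory_fraction < 1.0:
        raise ValueError("taskmanager.memory.fraction must be between 0 and 1")
    return int(jvm_heap_bytes * memory_fraction)


def task_managers_needed(max_operator_parallelism: int,
                         slots_per_task_manager: int = 3) -> int:
    """TaskManagers needed if all operators share slots, so that the job
    needs as many slots as its highest operator parallelism."""
    if max_operator_parallelism < 1 or slots_per_task_manager < 1:
        raise ValueError("parallelism and slot count must be positive")
    return math.ceil(max_operator_parallelism / slots_per_task_manager)


heap = 2048 * 1024 * 1024  # example JVM heap of 2 GB
print(managed_memory_bytes(heap))                      # 0.7 of the heap
print(managed_memory_bytes(heap, memory_size_mb=512))  # 536870912
print(task_managers_needed(10))                        # 4
```

With the Table 1 default of 3 slots per TaskManager, a job whose largest operator parallelism is 10 needs four TaskManagers under this model.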
For MRS 3.x or later, see Table 2.
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
taskmanager.rpc.port | IPC port range of TaskManager | 32326-32390 | No |
client.rpc.port | Port range on which the Akka system of the Flink client listens. | 32651-32720 | No |
taskmanager.data.port | Data exchange port range of TaskManager | 32391-32455 | No |
taskmanager.data.ssl.enabled | Whether to enable secure sockets layer (SSL) encryption for data transfer between TaskManagers. This parameter is valid only when the global switch security.ssl is enabled. | false | No |
jobmanager.heap.size | Size of the heap memory of JobManager. In yarn-session mode, the value can be passed only through the -jm parameter. In yarn-cluster mode, the value can be passed only through the -yjm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. The unit is B/KB/MB/GB/TB. | 1024mb | No |
taskmanager.heap.size | Size of the heap memory of TaskManager. In yarn-session mode, the value can be passed only through the -tm parameter. In yarn-cluster mode, the value can be passed only through the -ytm parameter. If the value is smaller than yarn.scheduler.minimum-allocation-mb in the Yarn configuration file, the Yarn configuration value is used. The unit is B/KB/MB/GB/TB. | 1024mb | No |
taskmanager.numberOfTaskSlots | Number of slots provided by each TaskManager. Generally, set the value to the number of CPU cores of the physical machine. In yarn-session mode, the value can be passed only through the -s parameter. In yarn-cluster mode, the value can be passed only through the -ys parameter. | 1 | No |
parallelism.default | Default degree of parallelism, used for jobs that do not specify their own degree of parallelism. | 1 | No |
taskmanager.network.numberOfBuffers | Number of network buffers of TaskManager. If an error indicates that the system does not have enough buffers, increase this value. | 2048 | No |
taskmanager.memory.fraction | Ratio of JVM heap memory that TaskManager reserves for sorting, hash tables, and caching of intermediate results. | 0.7 | No |
taskmanager.memory.off-heap | Whether TaskManager uses off-heap memory for sorting, hash tables, and caching of intermediate results. Enabling this item is recommended when large amounts of memory are needed, as it improves the efficiency of memory operations. | false | Yes |
taskmanager.memory.segment-size | Size of the memory buffers used by the memory manager and the network stack. The unit is bytes. | 32768 | No |
taskmanager.memory.preallocate | Whether TaskManager allocates the reserved memory space at startup. Enabling this item is recommended when off-heap memory is used. | false | No |
taskmanager.debug.memory.startLogThread | Whether TaskManager periodically collects memory and garbage collection (GC) statistics, including the current utilization of the heap and off-heap memory pools and the GC time. Enable this item to debug Flink memory and GC problems. | false | No |
taskmanager.debug.memory.logIntervalMs | Interval at which TaskManager collects memory and GC statistics. The unit is millisecond. | 0 | No |
taskmanager.maxRegistrationDuration | Maximum duration of TaskManager registration on JobManager. If registration does not succeed within this duration, TaskManager is terminated. | 5 min | No |
taskmanager.initial-registration-pause | Initial interval between two consecutive registration attempts. The value must contain a time unit (ms/s/min/h/d), for example, 5 s. Note: The time value and unit are separated by a half-width space. ms, s, min, h, and d indicate millisecond, second, minute, hour, and day, respectively. | 500 ms | No |
taskmanager.max-registration-pause | Maximum registration retry interval in case of TaskManager registration failures. The unit is ms/s/m/h/d. | 30s | No |
taskmanager.refused-registration-pause | Retry interval when a TaskManager registration connection is rejected by JobManager. The unit is ms/s/m/h/d. | 10s | No |
task.cancellation.interval | Interval between two successive task cancellation attempts. The unit is millisecond. | 30000 | No |
classloader.resolve-order | Class resolution order used when loading classes from user code, that is, whether the user code JAR file (child-first) or the application class path (parent-first) is checked first. With the default setting, classes are loaded from the user code JAR file first, so the user code JAR file can contain and load dependencies that differ from those used by Flink. | child-first | No |
slot.idle.timeout | Timeout for an idle slot in Slot Pool, in milliseconds. | 50000 | No |
slot.request.timeout | Timeout for requesting a slot from Slot Pool, in milliseconds. | 300000 | No |
task.cancellation.timeout | Timeout of task cancellation, in milliseconds. If a task cancellation times out, a fatal TaskManager error may occur. If this parameter is set to 0, no error is reported when a task cancellation times out. | 180000 | No |
taskmanager.network.detailed-metrics | Whether to enable detailed metrics monitoring of network queue lengths. | false | No |
taskmanager.network.memory.buffers-per-channel | Maximum number of network buffers used by each output/input channel (sub-partition/incoming channel). In credit-based flow control mode, this is the amount of credit in each input channel. Configure at least 2 buffers for good performance: one receives in-flight data in the sub-partition, and the other is used for parallel serialization. | 2 | No |
taskmanager.network.memory.floating-buffers-per-gate | Number of extra network buffers used by each output gate (result partition) or input gate, indicating the amount of floating credit shared among all input channels in credit-based flow control mode. Floating buffers are distributed based on the backlog feedback (real-time output buffers in sub-partitions) and can help mitigate back pressure caused by unbalanced data distribution among sub-partitions. Increase this value if the round-trip time between nodes is long and/or the number of machines in the cluster is large. | 8 | No |
taskmanager.network.memory.fraction | Ratio of JVM memory used for network buffers. It determines how many streaming data exchange channels a TaskManager can have at the same time and how much buffering each channel has. If a job is rejected or a warning indicates that the system does not have enough buffers, increase this value or the values of taskmanager.network.memory.min and taskmanager.network.memory.max. Note that the resulting memory size is bounded by taskmanager.network.memory.min and taskmanager.network.memory.max, which may therefore override this value. | 0.1 | No |
taskmanager.network.memory.max | Maximum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB). | 1 GB | No |
taskmanager.network.memory.min | Minimum memory size of the network buffer. The value must contain a unit (B/KB/MB/GB/TB). | 64 MB | No |
taskmanager.network.request-backoff.initial | Minimum backoff for partition requests of input channels, in milliseconds. | 100 | No |
taskmanager.network.request-backoff.max | Maximum backoff for partition requests of input channels, in milliseconds. | 10000 | No |
taskmanager.registration.timeout | Timeout for TaskManager registration. TaskManager will be terminated if it is not successfully registered within the specified time. The value must contain a time unit (ms/s/min/h/d). | 5 min | No |
resourcemanager.taskmanager-timeout | Timeout interval for releasing an idle TaskManager, in milliseconds. | 30000 | No |
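The network parameters in Table 2 interact: the network memory is a fraction of the JVM memory bounded by taskmanager.network.memory.min and taskmanager.network.memory.max, it is split into buffers of taskmanager.memory.segment-size bytes, and in credit-based flow control each input gate needs exclusive buffers for every channel plus its floating buffers. The following Python sketch models these relationships, together with the Yarn minimum-allocation fallback described for taskmanager.heap.size. It is a simplification: the function names are hypothetical, and because the JVM memory size Flink uses as the base for the fraction varies between versions, the sketch takes it as an input and, in the usage lines, simply treats the heap size as that base.

```python
MB = 1024 * 1024

# Hypothetical helpers (not part of Flink or MRS) modelling the network
# buffer parameters in Table 2 under simplifying assumptions.


def effective_heap_mb(requested_mb: int, yarn_min_allocation_mb: int) -> int:
    """Heap size actually used: the Yarn minimum allocation wins if larger."""
    return max(requested_mb, yarn_min_allocation_mb)


def network_memory_bytes(jvm_memory_bytes: int, fraction: float = 0.1,
                         min_bytes: int = 64 * MB, max_bytes: int = 1024 * MB) -> int:
    """taskmanager.network.memory.fraction of the JVM memory, clamped to
    [taskmanager.network.memory.min, taskmanager.network.memory.max]."""
    return min(max_bytes, max(min_bytes, int(fraction * jvm_memory_bytes)))


def network_buffer_count(network_bytes: int, segment_size: int = 32768) -> int:
    """How many buffers of taskmanager.memory.segment-size bytes fit."""
    return network_bytes // segment_size


def buffers_per_input_gate(channels: int, buffers_per_channel: int = 2,
                           floating_buffers_per_gate: int = 8) -> int:
    """Exclusive buffers for every input channel plus the gate's floating buffers."""
    return channels * buffers_per_channel + floating_buffers_per_gate


heap_mb = effective_heap_mb(requested_mb=4096, yarn_min_allocation_mb=1024)
network = network_memory_bytes(heap_mb * MB)
print(heap_mb)                        # 4096
print(network_buffer_count(network))  # 13107
print(buffers_per_input_gate(100))    # 208
```

For example, with the defaults, a TaskManager with 4096 MB of JVM memory gets about 409.6 MB of network memory (within the 64 MB to 1 GB bounds), which is 13107 buffers of 32768 bytes; an input gate that reads from 100 channels needs 100 × 2 + 8 = 208 of them.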