Distributed Coordination (via Akka)
Scenarios
Flink uses the Akka actor model for communication between the Flink client and the JobManager, between the JobManager and TaskManagers, and between TaskManagers. You can configure the Akka connection parameters in the flink-conf.yaml file to suit your network environment or optimization policy.
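A minimal sketch of what Akka overrides in flink-conf.yaml might look like follows. Each parameter is a flat `key: value` line, and duration values carry a time unit. The values shown are illustrative assumptions, not tuning recommendations; choose values that suit your own network environment.

```yaml
# flink-conf.yaml (excerpt): illustrative values only, not recommendations
# Give Akka asynchronous and blocking requests more time (default: 10 s)
akka.ask.timeout: 30 s
# Allow larger messages between the JobManager and TaskManagers
# (default: 10485760b, that is, 10 MB; 20971520b is 20 MB)
akka.framesize: 20971520b
```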
Configuration Description
You can configure the timeouts for sending and waiting for messages, as well as the Akka DeathWatch monitoring mechanism.
For MRS versions earlier than 3.x, see Table 1.
Parameter | Mandatory | Default Value | Description |
---|---|---|---|
akka.ask.timeout | No | 10 s | Timeout for Akka asynchronous and blocking requests. Timeouts typically occur when machines process data slowly or the network is congested; if Flink fails with a timeout error, increase this value. The unit is ms/s/m/h/d. |
akka.lookup.timeout | No | 10 s | Timeout for looking up the JobManager actor object. The unit is ms/s/m/h/d. |
akka.framesize | No | 10485760b | Maximum size of a message sent between the JobManager and TaskManagers. If Flink reports an error because a message exceeds this limit, increase this value. The unit is b/B/KB/MB. |
akka.watch.heartbeat.interval | No | 10 s | Heartbeat interval used by the Akka DeathWatch mechanism to detect disconnected TaskManagers. If TaskManagers are frequently and wrongly marked as disconnected because heartbeats are lost or delayed, increase this value. The unit is ms/s/m/h/d. |
akka.watch.heartbeat.pause | No | 60 s | Acceptable heartbeat pause for the Akka DeathWatch mechanism. A small value means that irregular heartbeats are not tolerated. The unit is ms/s/m/h/d. |
akka.watch.threshold | No | 12 | Failure detection threshold of DeathWatch. A small value is prone to marking healthy TaskManagers as failed, whereas a large value increases the time needed to detect a failure. |
akka.tcp.timeout | No | 20 s | Timeout for Transmission Control Protocol (TCP) connection requests. If TaskManager connections frequently time out because of network congestion, increase this value. The unit is ms/s/m/h/d. |
akka.throughput | No | 15 | Number of messages that Akka processes in one batch before the processing thread is returned to the thread pool. A small value gives fairer scheduling of actor message processing; a large value improves overall performance at the cost of scheduling fairness. |
akka.log.lifecycle.events | No | false | Whether to enable logging of Akka remote events. Enable it for debugging. |
akka.startup-timeout | No | Same as akka.ask.timeout | Timeout after which the startup of a remote component by Akka is considered failed. The unit is ms/s/m/h/d. |
akka.ssl.enabled | Yes | true | Whether to enable SSL for Akka communication. This parameter takes effect only when the global switch security.ssl is enabled. |
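For example, on a congested network where TaskManagers are repeatedly and wrongly marked as disconnected, the table suggests increasing the DeathWatch heartbeat interval, the acceptable heartbeat pause, and the TCP connection timeout. The following sketch uses assumed values chosen only for illustration; like the defaults, it keeps the acceptable pause well above the heartbeat interval.

```yaml
# flink-conf.yaml (excerpt): assumed values for a congested network, for illustration only
# Send DeathWatch heartbeats less often (default: 10 s)
akka.watch.heartbeat.interval: 20 s
# Tolerate longer gaps between heartbeats before a TaskManager is marked as failed (default: 60 s)
akka.watch.heartbeat.pause: 120 s
# Wait longer for TCP connection requests to complete (default: 20 s)
akka.tcp.timeout: 60 s
```

The trade-off is that a TaskManager that has actually failed takes longer to be detected.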
For MRS 3.x or later, see Table 2.
Parameter | Description | Default Value | Mandatory |
---|---|---|---|
akka.ask.timeout | Timeout for Akka asynchronous and blocking requests. Timeouts typically occur when machines process data slowly or the network is congested; if Flink fails with a timeout error, increase this value. The unit is ms/s/m/h/d. | 10s | No |
akka.lookup.timeout | Timeout for looking up the JobManager actor object. The unit is ms/s/m/h/d. | 10s | No |
akka.framesize | Maximum size of a message sent between the JobManager and TaskManagers. If Flink reports an error because a message exceeds this limit, increase this value. The unit is b/B/KB/MB. | 10485760b | No |
akka.watch.heartbeat.interval | Heartbeat interval used by the Akka DeathWatch mechanism to detect disconnected TaskManagers. If TaskManagers are frequently and wrongly marked as disconnected because heartbeats are lost or delayed, increase this value. The unit is ms/s/m/h/d. | 10s | No |
akka.watch.heartbeat.pause | Acceptable heartbeat pause for the Akka DeathWatch mechanism. A small value means that irregular heartbeats are not tolerated. The unit is ms/s/m/h/d. | 60s | No |
akka.watch.threshold | Failure detection threshold of DeathWatch. A small value may mark healthy TaskManagers as failed, whereas a large value increases the time needed to detect a failure. | 12 | No |
akka.tcp.timeout | Timeout for Transmission Control Protocol (TCP) connection requests. If TaskManager connections frequently time out because of network congestion, increase this value. The unit is ms/s/m/h/d. | 20s | No |
akka.throughput | Number of messages that Akka processes in one batch before the processing thread is returned to the thread pool. A small value gives fairer scheduling of actor message processing; a large value improves overall performance at the cost of scheduling fairness. | 15 | No |
akka.log.lifecycle.events | Whether to enable logging of Akka remote events. Enable it for debugging. | false | No |
akka.startup-timeout | Timeout after which the startup of a remote component is considered failed. The value must contain a time unit (ms/s/min/h/d). | Same as akka.ask.timeout | No |
akka.ssl.enabled | Whether to enable SSL for Akka communication. This parameter takes effect only when the global switch security.ssl is enabled. | true | Yes |
akka.client-socket-worker-pool.pool-size-factor | Factor used to determine the size of the client socket worker thread pool. The pool size is calculated as ceil(available processors * factor) and is bounded by the pool-size-min and pool-size-max values. | 1.0 | No |
akka.client-socket-worker-pool.pool-size-max | Upper bound on the number of threads calculated from the factor. | 2 | No |
akka.client-socket-worker-pool.pool-size-min | Lower bound on the number of threads calculated from the factor. | 1 | No |
akka.client.timeout | Timeout for blocking calls on the client side. The value must contain a time unit (ms/s/min/h/d). | 60s | No |
akka.server-socket-worker-pool.pool-size-factor | Factor used to determine the size of the server socket worker thread pool. The pool size is calculated as ceil(available processors * factor) and is bounded by the pool-size-min and pool-size-max values. | 1.0 | No |
akka.server-socket-worker-pool.pool-size-max | Upper bound on the number of threads calculated from the factor. | 2 | No |
akka.server-socket-worker-pool.pool-size-min | Lower bound on the number of threads calculated from the factor. | 1 | No |
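The pool-size formula clamps ceil(available processors * factor) to the range [pool-size-min, pool-size-max]. The following worked example uses hypothetical settings and host sizes for the server socket worker pool. The same arithmetic applies to the client socket worker pool.

```yaml
# flink-conf.yaml (excerpt): hypothetical sizing, for illustration only
# pool size = min(max(ceil(available processors * factor), pool-size-min), pool-size-max)
akka.server-socket-worker-pool.pool-size-factor: 0.5
akka.server-socket-worker-pool.pool-size-min: 1
akka.server-socket-worker-pool.pool-size-max: 4
# 16 available processors: ceil(16 * 0.5) = 8, capped at pool-size-max -> 4 threads
# 4 available processors:  ceil(4 * 0.5) = 2, within [1, 4]          -> 2 threads
# Defaults (factor 1.0, min 1, max 2): any host with 2 or more processors gets 2 threads
```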