State Backend

Scenarios

Flink enables HA and job exception, as well as job pause and recovery during version upgrade. Flink depends on state backend to store job states and on the restart strategy to restart a job. You can configure state backend and the restart strategy.

Configuration Description

Configuration items include the state backend type, storage path, and restart strategy.

Table 1 Parameters

Parameter

Description

Default Value

Mandatory

state.backend.fs.checkpointdir

Path when the backend is set to filesystem. The path must be accessible by JobManager. Only the local mode is supported. In the cluster mode, use an HDFS path.

hdfs:///flink/checkpoints

No

state.savepoints.dir

Savepoint storage directory used by Flink to restore and update jobs. When a savepoint is triggered, the metadata of the savepoint is saved to this directory.

hdfs:///flink/savepoint

Mandatory in security mode

restart-strategy

Default restart policy, which is used for jobs for which no restart policy is specified. The options are as follows:

  • fixed-delay

  • failure-rate

  • none

none

No

restart-strategy.fixed-delay.attempts

Number of retry times when the fixed-delay restart strategy is used.

  • If the checkpoint is enabled, the default value is the value of Integer.MAX_VALUE.

  • If the checkpoint is disabled, the default value is 3.

No

restart-strategy.fixed-delay.delay

Retry interval when the fixed-delay strategy is used. The unit is ms/s/m/h/d.

  • If the checkpoint is enabled, the default value is 10s.

  • If the checkpoint is disabled, the default value is the value of akka.ask.timeout.

No

restart-strategy.failure-rate.max-failures-per-interval

Maximum number of restart times in a specified period before a job fails when the fault rate policy is used.

1

No

restart-strategy.failure-rate.failure-rate-interval

Retry interval when the failure-rate strategy is used. The unit is ms/s/m/h/d.

60 s

No

restart-strategy.failure-rate.delay

Retry interval when the failure-rate strategy is used. The unit is ms/s/m/h/d.

The default value is the same as the value of akka.ask.timeout. For details, see Distributed Coordination (via Akka).

No