Common Flink Shell Commands
This section applies to MRS 3.x or later clusters.
Before running the Flink shell commands, perform the following steps:
Install the Flink client in a directory, for example, /opt/client.
Run the following command to initialize environment variables:
source /opt/client/bigdata_env
If Kerberos authentication is enabled for the current cluster, run the following command to authenticate the user. If Kerberos authentication is disabled, skip this step.
kinit Service user
Run the related commands according to Table 1.
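The preparation steps above can be wrapped in a small helper. This is a minimal sketch, not part of the MRS client: the function name prepare_flink_client is hypothetical, and the client directory and service user are supplied by the caller.

```shell
# Hypothetical helper that performs the preparation steps in one call.
# $1 = client installation directory (for example, /opt/client)
# $2 = optional Kerberos service user; omit it when Kerberos is disabled
prepare_flink_client() {
  krb_user="${2:-}"
  if [ ! -f "$1/bigdata_env" ]; then
    echo "bigdata_env not found in $1" >&2
    return 1
  fi
  # "." is the portable spelling of "source"; it loads the variables
  # into the current shell.
  . "$1/bigdata_env"
  # Authenticate only when a service user was given.
  if [ -n "$krb_user" ]; then
    kinit "$krb_user"
  fi
}

# Example call (commented out because it needs an installed client):
# prepare_flink_client /opt/client my_service_user
```

The helper must be defined or sourced in the shell that later runs the Flink commands. If it runs in a separate process, the variables are not available to the caller.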
Command
Parameter
Description
yarn-session.sh
-at,--applicationType <arg>: Defines the Yarn application type.
-D <property=value>: Configures a dynamic parameter.
-d,--detached: Disables the interactive mode and starts a separate Flink Yarn session.
-h,--help: Displays the help information about the Yarn session CLI.
-id,--applicationId <arg>: Binds to a running Yarn session.
-j,--jar <arg>: Sets the path of the user's JAR file.
-jm,--jobManagerMemory <arg>: Sets the JobManager memory.
-m,--jobmanager <arg>: Specifies the address of the JobManager (master) to connect to.
-nl,--nodeLabel <arg>: Specifies the nodeLabel of the Yarn application.
-nm,--name <arg>: Customizes a name for the application on Yarn.
-q,--query: Queries available Yarn resources.
-qu,--queue <arg>: Specifies a Yarn queue.
-s,--slots <arg>: Sets the number of slots for each TaskManager.
-t,--ship <arg>: Specifies the directory of the file to be sent.
-tm,--taskManagerMemory <arg>: Sets the TaskManager memory.
-yd,--yarndetached: Starts Yarn in the detached mode.
-z,--zookeeperNamespace <args>: Specifies the namespace of ZooKeeper.
-h: Gets help information.
Start a resident Flink cluster to receive tasks from the Flink client.
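As an illustration of how these options combine, the following sketch prints (it does not execute) a command line that starts a detached Yarn session. The function name yarn_session_cmd and all values are hypothetical. Check the memory units expected by your client with yarn-session.sh -h.

```shell
# Dry-run helper: prints a yarn-session.sh command line instead of running it.
# $1 = application name on Yarn (-nm)
# $2 = JobManager memory (-jm)
# $3 = TaskManager memory (-tm)
# $4 = number of slots per TaskManager (-s)
yarn_session_cmd() {
  printf 'yarn-session.sh -d -nm %s -jm %s -tm %s -s %s\n' "$1" "$2" "$3" "$4"
}

# Hypothetical values for illustration only.
yarn_session_cmd flink-session 1024 4096 2
# prints: yarn-session.sh -d -nm flink-session -jm 1024 -tm 4096 -s 2
```

Run the printed line in a shell where the client environment has been initialized to start the session.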
flink run
-c,--class <classname>: Specifies a class as the entry for running programs.
-C,--classpath <url>: Specifies the classpath.
-d,--detached: Runs a job in the detached mode.
-files,--dependencyFiles <arg>: Specifies the files on which the Flink program depends.
-n,--allowNonRestoredState: Allows restoration from a savepoint to skip state that cannot be restored. For example, if an operator has been deleted from the program, add this parameter when restoring from the savepoint.
-m,--jobmanager <host:port>: Specifies the JobManager.
-p,--parallelism <parallelism>: Specifies the degree of parallelism (DOP) of the job, which overrides the DOP setting in the configuration file.
-q,--sysoutLogging: Disables the function of outputting Flink logs to the console.
-s,--fromSavepoint <savepointPath>: Specifies a savepoint path for recovering jobs.
-z,--zookeeperNamespace <zookeeperNamespace>: Specifies the namespace of ZooKeeper.
-yat,--yarnapplicationType <arg>: Defines the Yarn application type.
-yD <arg>: Configures a dynamic parameter.
-yd,--yarndetached: Starts Yarn in the detached mode.
-yh,--yarnhelp: Obtains the Yarn help.
-yid,--yarnapplicationId <arg>: Binds a job to a Yarn session.
-yj,--yarnjar <arg>: Sets the path to the Flink JAR file.
-yjm,--yarnjobManagerMemory <arg>: Sets the JobManager memory (MB).
-ynm,--yarnname <arg>: Customizes a name for the application on Yarn.
-yq,--yarnquery: Queries available Yarn resources (memory and CPUs).
-yqu,--yarnqueue <arg>: Specifies a Yarn queue.
-ys,--yarnslots <arg>: Sets the number of slots for each TaskManager.
-yt,--yarnship <arg>: Specifies the path of the file to be sent.
-ytm,--yarntaskManagerMemory <arg>: Sets the TaskManager memory (MB).
-yz,--yarnzookeeperNamespace <arg>: Specifies the namespace of ZooKeeper. The value must be the same as the value of yarn-session.sh -z.
-h: Gets help information.
Submit a Flink job.
The -y* parameters are used in the yarn-cluster mode.
If you do not use the -y* parameters, run the yarn-session command to start a Flink cluster before running this command to submit a job.
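The two submission modes can be sketched as follows. The helper flink_run_cmd is hypothetical and only prints the command line. The class name, JAR path, and resource values are placeholders, and the -m yarn-cluster form follows the upstream Flink CLI of this generation rather than anything stated in the table, so confirm it with flink run -h on your client.

```shell
# Dry-run helper: prints a flink run command line instead of running it.
# $1 = mode ("yarn-cluster" or "session")
# $2 = entry class (-c)
# $3 = path to the job JAR
flink_run_cmd() {
  case "$1" in
    yarn-cluster)
      # The -y* options apply in this mode; the values are placeholders.
      printf 'flink run -m yarn-cluster -yjm 1024 -ytm 4096 -ys 2 -c %s %s\n' "$2" "$3"
      ;;
    session)
      # Assumes yarn-session.sh has already started a Flink cluster.
      printf 'flink run -c %s %s\n' "$2" "$3"
      ;;
    *)
      echo "unknown mode: $1" >&2
      return 1
      ;;
  esac
}

# Hypothetical class and JAR path.
flink_run_cmd yarn-cluster com.example.WordCount /tmp/WordCount.jar
flink_run_cmd session com.example.WordCount /tmp/WordCount.jar
```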
flink info
-c,--class <classname>: Specifies a class as the entry for running programs.
-p,--parallelism <parallelism>: Specifies the DOP for running programs.
-h: Gets help information.
Display the execution plan (JSON) of the running program.
flink list
-a,--all: Displays all jobs.
-m,--jobmanager <host:port>: Specifies the JobManager.
-r,--running: Displays only jobs in the running state.
-s,--scheduled: Displays only jobs in the scheduled state.
-z,--zookeeperNamespace <zookeeperNamespace>: Specifies the namespace of ZooKeeper.
-yid,--yarnapplicationId <arg>: Binds a job to a Yarn session.
-h: Gets help information.
Query running programs in the cluster.
flink stop
-d,--drain: Sends MAX_WATERMARK before the savepoint is triggered and the job is stopped.
-p,--savepointPath <savepointPath>: Specifies the path for storing savepoints. The default value is the value of state.savepoints.dir.
-m,--jobmanager <host:port>: Specifies the JobManager.
-z,--zookeeperNamespace <zookeeperNamespace>: Specifies the namespace of ZooKeeper.
-yid,--yarnapplicationId <arg>: Binds a job to a Yarn session.
-h: Gets help information.
Forcibly stop a running job. Only streaming jobs are supported, and the source in the service code must implement StoppableFunction.
flink cancel
-m,--jobmanager <host:port>: Specifies the JobManager.
-s,--withSavepoint <targetDirectory>: Triggers a savepoint when a job is canceled. The default directory is the value of state.savepoints.dir.
-z,--zookeeperNamespace <zookeeperNamespace>: Specifies the namespace of ZooKeeper.
-yid,--yarnapplicationId <arg>: Binds a job to a Yarn session.
-h: Gets help information.
Cancel a running job.
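For comparison, the following sketch prints a flink stop and a flink cancel command line that each write a savepoint first. The helper names, job ID, and savepoint directory are hypothetical. Passing the job ID as the trailing positional argument follows the upstream Flink CLI and is an assumption here, because the table does not show it.

```shell
# Dry-run helpers: each prints a command line instead of running it.
# $1 = job ID, $2 = savepoint directory
flink_stop_cmd() {
  printf 'flink stop -p %s %s\n' "$2" "$1"
}

flink_cancel_cmd() {
  printf 'flink cancel -s %s %s\n' "$2" "$1"
}

# Hypothetical job ID (as reported by "flink list") and directory.
JOB_ID=5e20cb6b0f357591171dfcca2eea09de
SAVEPOINT_DIR=hdfs:///flink/savepoints

flink_stop_cmd "$JOB_ID" "$SAVEPOINT_DIR"
flink_cancel_cmd "$JOB_ID" "$SAVEPOINT_DIR"
```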
flink savepoint
-d,--dispose <arg>: Specifies the path of the savepoint to be disposed of (deleted).
-m,--jobmanager <host:port>: Specifies the JobManager.
-z,--zookeeperNamespace <zookeeperNamespace>: Specifies the namespace of ZooKeeper.
-yid,--yarnapplicationId <arg>: Binds a job to a Yarn session.
-h: Gets help information.
Trigger a savepoint.
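Triggering a savepoint might look like the sketch below, which prints the command line rather than running it. The helper name and values are hypothetical. The positional form (job ID followed by an optional target directory) is taken from the upstream Flink CLI and is not stated in the table, so verify it with flink savepoint -h.

```shell
# Dry-run helper: prints a flink savepoint command line instead of running it.
# $1 = job ID
# $2 = optional target directory; when omitted, the configured default applies
savepoint_trigger_cmd() {
  if [ -n "${2:-}" ]; then
    printf 'flink savepoint %s %s\n' "$1" "$2"
  else
    printf 'flink savepoint %s\n' "$1"
  fi
}

# Hypothetical job ID and directory.
savepoint_trigger_cmd 5e20cb6b0f357591171dfcca2eea09de hdfs:///flink/savepoints
savepoint_trigger_cmd 5e20cb6b0f357591171dfcca2eea09de
```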
source Client installation directory/bigdata_env
None
Import client environment variables.
Restriction: If the user uses a custom script (for example, A.sh) and runs this command in the script, variables cannot be imported to the A.sh script. If variables need to be imported to the custom script A.sh, the user needs to use the secondary calling method.
For example, first call the B.sh script in the A.sh script, and then run this command in the B.sh script. Parameters can be imported to the A.sh script but cannot be imported to the B.sh script.
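The general shell behavior that likely underlies this restriction can be demonstrated without a Flink client: variables loaded with source (or its portable form ".") exist only in the shell process that loaded them. The sketch below uses a temporary stand-in file instead of the real bigdata_env. The file name demo_env and the variable DEMO_CLIENT_HOME are hypothetical.

```shell
# Stand-in for bigdata_env with hypothetical contents.
unset DEMO_CLIENT_HOME
tmpdir=$(mktemp -d)
printf 'export DEMO_CLIENT_HOME=/opt/client\n' > "$tmpdir/demo_env"

# 1. Load the file inside a child shell: the variable is gone
#    once the child process exits.
sh -c ". '$tmpdir/demo_env'"
after_child="${DEMO_CLIENT_HOME:-unset}"
echo "after child shell: $after_child"
# prints: after child shell: unset

# 2. Load the file in the current shell: the variable remains available.
. "$tmpdir/demo_env"
after_source="${DEMO_CLIENT_HOME:-unset}"
echo "after sourcing here: $after_source"
# prints: after sourcing here: /opt/client

rm -rf "$tmpdir"
```

When a custom script does not see the client variables, check which process actually ran the source command.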
start-scala-shell.sh
local | remote <host> <port> | yarn: Specifies the running mode.
Start the Scala shell.
sh generate_keystore.sh
-
Run the generate_keystore.sh script to generate the security cookie, flink.keystore, and flink.truststore. You need to enter a user-defined password that does not contain number signs (#).