Authentication and Encryption

Security Authentication

Flink uses the following three authentication modes:

  • Kerberos authentication: It is used between the Flink Yarn client and Yarn ResourceManager, JobManager and ZooKeeper, JobManager and HDFS, TaskManager and HDFS, Kafka and TaskManager, as well as TaskManager and ZooKeeper.

  • Security cookie authentication: Security cookie authentication is used between Flink Yarn client and JobManager, JobManager and TaskManager, as well as TaskManager and TaskManager.

  • Internal authentication of Yarn: The Internal authentication mechanism of Yarn is used between Yarn ResourceManager and ApplicationMaster (AM).

    Note

    • Flink JobManager and Yarn ApplicationMaster are in the same process.

    • If Kerberos authentication is enabled for the user's cluster, Kerberos authentication is required.

    • For versions earlier than MRS 3.x, Flink does not support security cookie authentication.

    Table 1 Authentication modes

    Authentication Mode

    Description

    Configuration Method

    Kerberos authentication

    Currently, only keytab authentication mode is supported.

    1. Download the user keytab from the KDC server, and place the keytab to a directory on the host of the Flink client.

    2. Configure the following parameters in the flink-conf.yaml file:

      1. Keytab path

        security.kerberos.login.keytab: /home/flinkuser/keytab/abc222.keytab
        

        Note:

        /home/flinkuser/keytab/abc222.keytab indicates the user directory.

      2. Principal name

        security.kerberos.login.principal: abc222
        
      3. In HA mode, if ZooKeeper is configured, the Kerberos authentication configuration items must be configured as follows:

        zookeeper.sasl.disable: false
        security.kerberos.login.contexts: Client
        
      4. If you want to perform Kerberos authentication between Kafka client and Kafka broker, set the value as follows:

        security.kerberos.login.contexts: Client,KafkaClient
        

    Security cookie authentication

    -

    1. In the bin directory of the Flink client, run the generate_keystore.sh script to generate security cookie, flink.keystore, and flink.truststore.

      Run the sh generate_keystore.sh command and enter the user-defined password. The password cannot contain #.

      Note

      After the script is executed, the flink.keystore and flink.truststore files are generated in the conf directory on the Flink client. In the flink-conf.yaml file, default values are specified for following parameters:

      • Set security.ssl.keystore to the absolute path of the flink.keystore file.

      • Set security.ssl.truststore to the absolute path of the flink.truststore file.

      • Set security.cookie to a random password automatically generated by the generate_keystore.sh script.

      • By default, security.ssl.encrypt.enabled: false is set in the flink-conf.yaml file by default. The generate_keystore.sh script sets security.ssl.key-password, security.ssl.keystore-password, and security.ssl.truststore-password to the password entered when the generate_keystore.sh script is called.

      • For MRS 3.x or later, if ciphertext is required and security.ssl.encrypt.enabled is set to true in the flink-conf.yaml file, the generate_keystore.sh script does not set security.ssl.key-password, security.ssl.keystore-password, and security.ssl.truststore-password. To obtain the values, use the Manager plaintext encryption API by running curl -k -i -u Username:Password -X POST -HContent-type:application/json -d '{"plainText":"Password"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt'.

        In the preceding command, Username:Password indicates the user name and password for logging in to the system. The password of "plainText" indicates the one used to call the generate_keystore.sh script. x.x.x.x indicates the floating IP address of Manager.

    2. Set security.enable: true in the flink-conf.yaml file and check whether security cookie is configured successfully. Example:

      security.cookie: ae70acc9-9795-4c48-ad35-8b5adc8071744f605d1d-2726-432e-88ae-dd39bfec40a9
      

    Internal authentication of Yarn

    This authentication mode does not need to be configured by the user.

    -

    Note

    One Flink cluster supports only one user. One user can create multiple Flink clusters.

Encrypted Transmission

Flink uses following encrypted transmission modes:

  • Encrypted transmission inside Yarn: It is used between the Flink Yarn client and Yarn ResourceManager, as well as Yarn ResourceManager and JobManager.

  • SSL transmission: SSL transmission is used between Flink Yarn client and JobManager, JobManager and TaskManager, as well as TaskManager and TaskManager.

  • Encrypted transmission inside Hadoop: The internal encrypted transmission mode of Hadoop used between JobManager and HDFS, TaskManager and HDFS, JobManager and ZooKeeper, as well as TaskManager and ZooKeeper.

Note

Configuration about SSL encrypted transmission is mandatory while configuration about encryption of Yarn and Hadoop is not required.

To configure SSL encrypted transmission, configure the following parameters in the flink-conf.yaml file on the client:

  1. Enable SSL and configure the SSL encryption algorithm. For MRS 3.x or later, see Table 2. Modify the parameters as required.

    Table 2 Parameter description

    Parameter

    Example Value

    Description

    security.ssl.enabled

    true

    Enable SSL.

    akka.ssl.enabled

    true

    Enable Akka SSL.

    blob.service.ssl.enabled

    true

    Enable SSL for the Blob channel.

    taskmanager.data.ssl.enabled

    true

    Enable SSL transmissions between TaskManagers.

    security.ssl.algorithms

    TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

    Configure the SSL encryption algorithm.

    For versions earlier than MRS 3.x, see Table 3.

    Table 3 Parameter description

    Parameter

    Example Value

    Description

    security.ssl.internal.enabled

    true

    Enable internal SSL.

    akka.ssl.enabled

    true

    Enable Akka SSL.

    blob.service.ssl.enabled

    true

    Enable SSL for the Blob channel.

    taskmanager.data.ssl.enabled

    true

    Enable SSL transmissions between TaskManagers.

    security.ssl.algorithms

    TLS_RSA_WITH_AES128CBC_SHA256

    Configure the SSL encryption algorithm.

    For versions earlier than MRS 3.x, the following parameters in Table 4 do not exist in the default Flink configuration of MRS. If you want to enable SSL for external connections, add the following parameters. After SSL for external connection is enabled, the native Flink page cannot be accessed using a Yarn proxy, because the Yarn open-source version cannot process HTTPS requests using a proxy. However, you can create a Windows VM in the same VPC of the cluster and access the native Flink page from the VM.

    Table 4 Parameter description

    Parameter

    Example Value

    Description

    security.ssl.rest.enabled

    true

    Enable external SSL. If this parameter is set to true, set the related parameters by referring to Table 4.

    security.ssl.rest.keystore

    ${path}/flink.keystore

    Path for storing the keystore.

    security.ssl.rest.keystore-password

    -

    A user-defined password of keystore.

    security.ssl.rest.key-password

    -

    A user-defined password of the SSL key.

    security.ssl.rest.truststore

    ${path}/flink.truststore

    Path for storing the truststore.

    security.ssl.rest.truststore-password

    -

    A user-defined password of truststore.

    Note

    Enabling SSL for data transmission between TaskManagers may pose great impact on the system performance.

  2. In the bin directory of the Flink client, run the sh generate_keystore.sh <password> command. For details, see Authentication and Encryption. The configuration items in Table 5 are set by default for MRS 3.x or later. You can also configure them manually.

    Table 5 Parameter description

    Parameter

    Example Value

    Description

    security.ssl.keystore

    ${path}/flink.keystore

    Path for storing the keystore. flink.keystore indicates the name of the keystore file generated by the generate_keystore.sh* tool.

    security.ssl.keystore-password

    -

    A user-defined password of keystore.

    security.ssl.key-password

    -

    A user-defined password of the SSL key.

    security.ssl.truststore

    ${path}/flink.truststore

    Path for storing the truststore. flink.truststore indicates the name of the truststore file generated by the generate_keystore.sh* tool.

    security.ssl.truststore-password

    -

    A user-defined password of truststore.

    For versions earlier than MRS 3.x, the generate_keystore.sh command is generated automatically, and the configuration items in Table 6 are set by default. You can also configure them manually.

    Table 6 Parameter description

    Parameter

    Example Value

    Description

    security.ssl.internal.keystore

    ${path}/flink.keystore

    Path for storing the keystore. flink.keystore indicates the name of the keystore file generated by the generate_keystore.sh* tool.

    security.ssl.internal.keystore-password

    -

    A user-defined password of keystore.

    security.ssl.internal.key-password

    -

    A user-defined password of the SSL key.

    security.ssl.internal.truststore

    ${path}/flink.truststore

    Path for storing the truststore. flink.truststore indicates the name of the truststore file generated by the generate_keystore.sh* tool.

    security.ssl.internal.truststore-password

    -

    A user-defined password of truststore.

    For versions earlier than MRS 3.x, if SSL for external connections is enabled, that is, security.ssl.rest.enabled is set to true, you need to configure the parameters listed in Table 7.

    Table 7 Parameters

    Parameter

    Example Value

    Description

    security.ssl.rest.enabled

    true

    Enable external SSL. If this parameter is set to true, set the related parameters by referring to Table 7.

    security.ssl.rest.keystore

    ${path}/flink.keystore

    Path for storing the keystore.

    security.ssl.rest.keystore-password

    -

    A user-defined password of keystore.

    security.ssl.rest.key-password

    -

    A user-defined password of the SSL key.

    security.ssl.rest.truststore

    ${path}/flink.truststore

    Path for storing the truststore.

    security.ssl.rest.truststore-password

    -

    A user-defined password of truststore.

    Note

    The path directory is a user-defined directory for storing configuration files of the SSL keystore and truststore. The commands vary according to the relative path and absolute path. For details, see 3 and 4.

  3. If the keystore or truststore file path is a relative path, the Flink client directory where the command is executed needs to access this relative path directly. Either of the following method can be used to transmit the keystore and truststore file:

    • Add -t option to the CLI yarn-session.sh command to transfer the keystore and truststore file to execution nodes. Example:

      ./bin/yarn-session.sh -t ssl/
      
    • Add -yt option to the flink run command to transfer the keystore and truststore file to execution nodes. Example:

      ./bin/flink run -yt ssl/ -ys 3  -m yarn-cluster -c org.apache.flink.examples.java.wordcount.WordCount /opt/client/Flink/flink/examples/batch/WordCount.jar
      

      Note

      • In the preceding example, ssl/ is the sub-directory of the Flink client directory. It is used to store configuration files of the SSL keystore and truststore.

      • The relative path of ssl/ must be accessible from the current path where the Flink client command is run.

  4. If the keystore or truststore file path is an absolute path, the keystore and truststore files must exist in the absolute path on Flink Client and all nodes.

    Note

    For versions earlier than MRS 3.x, the user who submits the job must have the permission to read the keystore and truststore files.

    Either of the following methods can be used to execute applications. The -t or -yt option does not need to be added to transmit the keystore and truststore files.

    • Run the CLI yarn-session.sh command of Flink to execute applications. Example:

      ./bin/yarn-session.sh
      
    • Run the Flink run command to execute applications. Example:

      ./bin/flink run  -ys 3 -m yarn-cluster -c org.apache.flink.examples.java.wordcount.WordCount /opt/client/Flink/flink/examples/batch/WordCount.jar