Authentication and Encryption

Security Authentication

Flink uses the following three authentication modes:

  • Kerberos authentication: It is used between the Flink Yarn client and Yarn ResourceManager, JobManager and ZooKeeper, JobManager and HDFS, TaskManager and HDFS, Kafka and TaskManager, as well as TaskManager and ZooKeeper.

  • Security cookie authentication: Security cookie authentication is used between Flink Yarn client and JobManager, JobManager and TaskManager, as well as TaskManager and TaskManager.

  • Internal authentication of Yarn: The Internal authentication mechanism of Yarn is used between Yarn ResourceManager and ApplicationMaster (AM).

    Note

    • Flink JobManager and Yarn ApplicationMaster are in the same process.

    • If Kerberos authentication is enabled for the user's cluster, Kerberos authentication is required.

    Table 1 Authentication modes

    Authentication Mode

    Description

    Configuration Method

    Kerberos authentication

    Currently, only keytab authentication mode is supported.

    1. Download the user keytab from the KDC server, and place the keytab to a directory on the host of the Flink client.

    2. Configure the following parameters in the flink-conf.yaml file:

      1. Keytab path

        security.kerberos.login.keytab: /home/flinkuser/keytab/abc222.keytab
        

        Note:

        /home/flinkuser/keytab/abc222.keytab indicates the user directory.

      2. Principal name

        security.kerberos.login.principal: abc222
        
      3. In HA mode, if ZooKeeper is configured, the Kerberos authentication configuration items must be configured as follows:

        zookeeper.sasl.disable: false
        security.kerberos.login.contexts: Client
        
      4. If you want to perform Kerberos authentication between Kafka client and Kafka broker, set the value as follows:

        security.kerberos.login.contexts: Client,KafkaClient
        

    Security cookie authentication

    -

    1. Generate the generate_keystore.sh script by referring to Example of Issuing a Certificate and place it in the bin directory of the Flink client. In the bin directory of the Flink client, run the generate_keystore.sh script to generate security cookie, flink.keystore, and flink.truststore.

      Run the sh generate_keystore.sh command and enter the user-defined password. The password cannot contain #.

      Note

      After the script is executed, the flink.keystore and flink.truststore files are generated in the conf directory on the Flink client. In the flink-conf.yaml file, default values are specified for following parameters:

      • Set security.ssl.keystore to the absolute path of the flink.keystore file.

      • Set security.ssl.truststore to the absolute path of the flink.truststore file.

      • Set security.cookie to a random password automatically generated by the generate_keystore.sh script.

      • By default, security.ssl.encrypt.enabled: false is set in the flink-conf.yaml file by default. The generate_keystore.sh script sets security.ssl.key-password, security.ssl.keystore-password, and security.ssl.truststore-password to the password entered when the generate_keystore.sh script is called.

      • If ciphertext is required and security.ssl.encrypt.enabled: true, is set in the flink-conf.yaml file, the generate_keystore.sh script does not set security.ssl.key-password, security.ssl.keystore-password, and security.ssl.truststore-password. To obtain the values, use the Manager plaintext encryption API by running the following command: curl -k -i -u Username:Password -X POST -HContent-type:application/json -d '{"plainText":"Password"}' 'https://x.x.x.x:28443/web/api/v2/tools/encrypt'

        In the preceding command, Username:Password indicates the user name and password for logging in to the system. The password of "plainText" indicates the one used to call the generate_keystore.sh script. x.x.x.x indicates the floating IP address of Manager.

    2. Set security.enable: true in the flink-conf.yaml file and check whether security cookie is configured successfully. Example:

      security.cookie: ae70acc9-9795-4c48-ad35-8b5adc8071744f605d1d-2726-432e-88ae-dd39bfec40a9
      

    Note

    Obtain the SSL certificate and save it to the Flink client. For details, see Example of Issuing a Certificate.

    Internal authentication of Yarn

    This authentication mode does not need to be configured by the user.

    -

    Note

    One Flink cluster supports only one user. One user can create multiple Flink clusters.

Encrypted Transmission

Flink uses following encrypted transmission modes:

  • Encrypted transmission inside Yarn: It is used between the Flink Yarn client and Yarn ResourceManager, as well as Yarn ResourceManager and JobManager.

  • SSL transmission: SSL transmission is used between Flink Yarn client and JobManager, JobManager and TaskManager, as well as TaskManager and TaskManager.

  • Encrypted transmission inside Hadoop: The internal encrypted transmission mode of Hadoop used between JobManager and HDFS, TaskManager and HDFS, JobManager and ZooKeeper, as well as TaskManager and ZooKeeper.

Note

Configuration about SSL encrypted transmission is mandatory while configuration about encryption of Yarn and Hadoop is not required.

To configure SSL encrypted transmission, configure the following parameters in the flink-conf.yaml file on the client:

  1. Enable SSL and configure the SSL encryption algorithm. see Table 2. Modify the parameters as required.

    Table 2 Parameter description

    Parameter

    Example Value

    Description

    security.ssl.enabled

    true

    Enable SSL.

    akka.ssl.enabled

    true

    Enable Akka SSL.

    blob.service.ssl.enabled

    true

    Enable SSL for the Blob channel.

    taskmanager.data.ssl.enabled

    true

    Enable SSL transmissions between TaskManagers.

    security.ssl.algorithms

    TLS_DHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_DHE_RSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384

    Configure the SSL encryption algorithm.

    Note

    Enabling SSL for data transmission between TaskManagers may pose great impact on the system performance.

  2. Generate the generate_keystore.sh script by referring to Example of Issuing a Certificate and place it in the bin directory of the Flink client. In the bin directory of the Flink client, run the sh generate_keystore.sh <password> command. For details, see Authentication and Encryption. The configuration items in Table 3 are set by default. You can also configure them manually.

    Table 3 Parameter description

    Parameter

    Example Value

    Description

    security.ssl.keystore

    ${path}/flink.keystore

    Path for storing the keystore. flink.keystore indicates the name of the keystore file generated by the generate_keystore.sh* tool.

    security.ssl.keystore-password

    123456

    Password of the keystore. 123456 indicates a user-defined password is required.

    security.ssl.key-password

    123456

    Password of the SSL key. 123456 indicates a user-defined password is required.

    security.ssl.truststore

    ${path}/flink.truststore

    Path for storing the truststore. flink.truststore indicates the name of the truststore file generated by the generate_keystore.sh* tool.

    security.ssl.truststore-password

    123456

    Password of the truststore. 123456 indicates a user-defined password is required.

    Note

    The path directory is a user-defined directory for storing configuration files of the SSL keystore and truststore. The commands vary according to the relative path and absolute path. For details, see 3 and 4.

  3. If the keystore or truststore file path is a relative path, the Flink client directory where the command is executed needs to access this relative path directly. Either of the following method can be used to transmit the keystore and truststore file:

    • Add -t option to the CLI yarn-session.sh command to transfer the keystore and truststore file to execution nodes. Example:

      ./bin/yarn-session.sh -t ssl/
      
    • Add -yt option to the flink run command to transfer the keystore and truststore file to execution nodes. Example:

      ./bin/flink run -yt ssl/ -ys 3  -m yarn-cluster -c org.apache.flink.examples.java.wordcount.WordCount /opt/client/Flink/flink/examples/batch/WordCount.jar
      

      Note

      • In the preceding example, ssl/ is the sub-directory of the Flink client directory. It is used to store configuration files of the SSL keystore and truststore.

      • The relative path of ssl/ must be accessible from the current path where the Flink client command is run.

  4. If the keystore or truststore file path is an absolute path, the keystore and truststore files must exist in the absolute path on Flink Client and all nodes.

    Either of the following methods can be used to execute applications. The -t or -yt option does not need to be added to transmit the keystore and truststore files.

    • Run the CLI yarn-session.sh command of Flink to execute applications. Example:

      ./bin/yarn-session.sh
      
    • Run the Flink run command to execute applications. Example:

      ./bin/flink run  -ys 3 -m yarn-cluster -c org.apache.flink.examples.java.wordcount.WordCount /opt/client/Flink/flink/examples/batch/WordCount.jar