Configuring Kafka

The Flink sample project stores its data in Kafka. A user with Kafka permissions can send data to Kafka and receive data from it.

  1. Ensure that the clusters, including HDFS, Yarn, Flink, and Kafka, are installed.

  2. Create a topic.

    • Create the topic from the Linux command line. Before running the commands, authenticate with kinit, for example, kinit flinkuser.

      Note

      The Flink user, for example flinkuser, must have the permission to create Kafka topics.

      The command format is as follows, where {zkQuorum} is the ZooKeeper cluster information in IP:port format and {Topic} is the topic name.

      bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 5 --topic {Topic}

      Assume the topic name is topic1. The command for creating this topic is as follows:

      /opt/client/Kafka/kafka/bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --replication-factor 1 --partitions 5 --topic topic1
      
    • Configure the topic permissions on the server.

      Set the allow.everyone.if.no.acl.found parameter of the Kafka Broker to true.
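
The topic-creation command in step 2 can be assembled from a list of ZooKeeper hosts. The following is a minimal sketch, not part of the Kafka client: build_zk_quorum and topic_create_cmd are hypothetical helper names, and the script only prints the command so that it can be reviewed before it is run after kinit.

```shell
#!/bin/sh
# Hypothetical helper: join ZooKeeper hosts into the {zkQuorum} string,
# that is, host:port,host:port,... with the same port for every host.
build_zk_quorum() {
    port="$1"; shift
    quorum=""
    for host in "$@"; do
        if [ -n "$quorum" ]; then
            quorum="$quorum,"
        fi
        quorum="$quorum$host:$port"
    done
    printf '%s\n' "$quorum"
}

# Hypothetical helper: print (do not run) the topic-creation command,
# using the replication factor and partition count from the example above.
topic_create_cmd() {
    topic="$1"; zk="$2"
    printf '%s\n' "bin/kafka-topics.sh --create --zookeeper $zk/kafka --replication-factor 1 --partitions 5 --topic $topic"
}

ZK=$(build_zk_quorum 2181 10.96.101.32 10.96.101.251 10.96.101.177 10.91.8.160)
topic_create_cmd topic1 "$ZK"
```

Printing the command instead of executing it keeps the sketch safe to run on a machine without a Kafka client.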

  3. Perform security authentication.

    One of the following modes can be used: Kerberos authentication, SSL encryption, or Kerberos+SSL encryption.

    • Kerberos authentication

      • Client configuration

        In the Flink configuration file flink-conf.yaml, add the Kerberos authentication settings. For example, add KafkaClient to contexts as follows:

        security.kerberos.login.keytab: /home/demo//keytab/flinkuser.keytab
        security.kerberos.login.principal: flinkuser
        security.kerberos.login.contexts: Client,KafkaClient
        security.kerberos.login.use-ticket-cache: false
        
      • Running parameters

        The running parameters for the SASL_PLAINTEXT protocol are as follows:

        --topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka

        In this example, 10.96.101.32:21007 is the IP:port of the Kafka server.
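
Before submitting the job, the Kerberos client settings described above can be checked with a small script. This is a sketch under assumptions: check_kerberos_conf is a hypothetical helper, it takes the path to flink-conf.yaml as its only argument, and it only checks that the keys are present and that KafkaClient is listed in the contexts.

```shell
#!/bin/sh
# Hypothetical helper: verify that flink-conf.yaml carries the Kerberos
# settings shown above. Prints "ok" on success, or the first problem found.
check_kerberos_conf() {
    conf="$1"
    for key in security.kerberos.login.keytab \
               security.kerberos.login.principal \
               security.kerberos.login.contexts; do
        # The dots in $key match any character here; loose, but adequate for a sanity check.
        if ! grep -q "^[[:space:]]*$key:" "$conf"; then
            echo "missing: $key"
            return 1
        fi
    done
    # KafkaClient must be one of the login contexts.
    if ! grep "^[[:space:]]*security.kerberos.login.contexts:" "$conf" | grep -q "KafkaClient"; then
        echo "KafkaClient not in security.kerberos.login.contexts"
        return 1
    fi
    echo "ok"
}
```

For example, after loading the function, run check_kerberos_conf followed by the path to the flink-conf.yaml of your Flink client.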
        
    • SSL encryption

      • Configuration on the server

        Log in to FusionInsight Manager, choose Cluster > Services > Kafka > Configurations, and set Type to All. Search for ssl.mode.enable and set it to true.

      • Configuration on the client

        1. Log in to FusionInsight Manager and choose Cluster > Name of the desired cluster > Services > Kafka > More > Download Client to download the Kafka client.

        2. Use the ca.crt certificate file in the client root directory to generate the truststore file for the client.

          Run the following command:

          keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks
          

          The command execution result is similar to the following:

          [Figure: output of the keytool command]

        3. Set the running parameters.

          The value of ssl.truststore.password must be the same as the password you entered when creating the truststore. The running parameters are as follows:

          --topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
          
    • Kerberos+SSL encryption

      After completing the preceding Kerberos and SSL configurations on the client and server, change the port number and protocol type in the running parameters to enable the Kerberos+SSL encryption mode.

      --topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password XXX
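
The three modes differ only in the port, the protocol, and which extra options are passed. The following sketch puts them side by side. run_params is a hypothetical helper, and the ports (21007, 9093, 21009) are simply those used in the examples above; the ports on your cluster may differ.

```shell
#!/bin/sh
# Hypothetical helper: print the running parameters for one security mode.
# Arguments: mode topic host [truststore-path truststore-password]
run_params() {
    mode="$1"; topic="$2"; host="$3"; truststore="$4"; password="$5"
    case "$mode" in
        kerberos)
            printf '%s\n' "--topic $topic --bootstrap.servers $host:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka"
            ;;
        ssl)
            printf '%s\n' "--topic $topic --bootstrap.servers $host:9093 --security.protocol SSL --ssl.truststore.location $truststore --ssl.truststore.password $password"
            ;;
        kerberos+ssl)
            printf '%s\n' "--topic $topic --bootstrap.servers $host:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location $truststore --ssl.truststore.password $password"
            ;;
        *)
            echo "unknown mode: $mode" >&2
            return 1
            ;;
    esac
}

run_params kerberos topic1 10.96.101.32
```

Keeping the mapping in one place makes it harder to combine a port with the wrong protocol, for example the SSL port with SASL_PLAINTEXT.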