Configuring Kafka Clients in Java

This section describes how to add Kafka clients in Maven, and use the clients to access Kafka instances and produce and consume messages. To check how the demo project runs in IDEA, see Setting Up the Java Development Environment.

The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.

Adding Kafka Clients in Maven

//Kafka instances are based on Kafka 1.1.0/2.3.0/2.7. Use the same version of clients.
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.1.0/2.3.0/2.7</version>
        </dependency>

Preparing Kafka Configuration Files

The following describes example producer and consumer configuration files. If SASL is not enabled for the Kafka instance, comment out lines regarding SASL. If SASL has been enabled, set SASL configurations for encrypted access.

  • Producer configuration file (the dms.sdk.producer.properties file in the demo project)

    The information in bold is specific to different Kafka instances and must be modified. Other parameters can also be added.

    #The topic name is in the specific production and consumption code.
    #######################
    #Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Producer acknowledgement
    acks=all
    #Method of turning the key into bytes
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Method of turning the value into bytes
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    #Memory available to the producer for buffering
    buffer.memory=33554432
    #Number of retries
    retries=0
    #######################
    #Comment out the following parameters if SASL access is not enabled.
    #######################
    #Configure the JAAS username and password. username and password are set when you enable Kafka SASL_SSL during instance creation or when you create a SASL_SSL user.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL mechanism
    sasl.mechanism=PLAIN
    #Encryption protocol. Currently, only SASL_SSL is supported.
    security.protocol=SASL_SSL
    #Location of ssl.truststore
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file for accessing the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    #Whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
    ssl.endpoint.identification.algorithm=
    
  • Consumer configuration file (the dms.sdk.consumer.properties file in the demo project)

    The information in bold is specific to different Kafka instances and must be modified. Other parameters can also be added.

    #The topic name is in the specific production and consumption code.
    #######################
    #Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section. Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
    bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
    #Unique string to identify the group of consumer processes to which the consumer belongs. Configuring the same group.id for different processes indicates that the processes belong to the same consumer group.
    group.id=1
    #Method of turning the key into bytes
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Method of turning the value into bytes
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #Offset reset policy
    auto.offset.reset=earliest
    #######################
    #Comment out the following parameters if SASL access is not enabled.
    #######################
    #Configure the JAAS username and password. username and password are set when you enable Kafka SASL_SSL during instance creation or when you create a SASL_SSL user.
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
        username="username" \
        password="password";
    #SASL mechanism
    sasl.mechanism=PLAIN
    #Encryption protocol. Currently, only SASL_SSL is supported.
    security.protocol=SASL_SSL
    #Location of ssl.truststore
    ssl.truststore.location=E:\\temp\\client.truststore.jks
    #Password of the SSL truststore file for accessing the JKS file generated by Java.
    ssl.truststore.password=dms@kafka
    # Disable certificate domain name verification.
    ssl.endpoint.identification.algorithm=
    

Producing Messages

  • Test code

    package com.dms.producer;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.junit.Test;
    
    public class DmsProducerTest {
        @Test
        public void testProducer() throws Exception {
            DmsProducer<String, String> producer = new DmsProducer<String, String>();
            int partition = 0;
            try {
                for (int i = 0; i < 10; i++) {
                    String key = null;
                    String data = "The msg is " + i;
                    //Enter the name of the topic you created. There are multiple APIs for producing messages. For details, see the Kafka official website or the following code.
                    producer.produce("topic-0", partition, key, data, new Callback() {
                        public void onCompletion(RecordMetadata metadata,
                            Exception exception) {
                            if (exception != null) {
                                exception.printStackTrace();
                                return;
                            }
                            System.out.println("produce msg completed");
                        }
                    });
                    System.out.println("produce msg:" + data);
                }
            } catch (Exception e) {
                // TODO: Exception handling
                e.printStackTrace();
            } finally {
                producer.close();
            }
        }
    }
    
  • Message production code

    package com.dms.producer;
    
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Enumeration;
    import java.util.List;
    import java.util.Properties;
    
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class DmsProducer<K, V> {
         //Add the producer configurations that have been specified earlier.
        public static final String CONFIG_PRODUCER_FILE_NAME = "dms.sdk.producer.properties";
    
        private Producer<K, V> producer;
    
        DmsProducer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
        DmsProducer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            producer = new KafkaProducer<K,V>(props);
        }
    
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         */
        public void produce(String topic, Integer partition, K key, V data)
        {
            produce(topic, partition, key, data, null, (Callback)null);
        }
    
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         * @param timestamp    timestamp
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp)
        {
            produce(topic, partition, key, data, timestamp, (Callback)null);
        }
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Callback callback)
        {
            produce(topic, partition, key, data, null, callback);
        }
    
        public void produce(String topic, V data)
        {
            produce(topic, null, null, data, null, (Callback)null);
        }
    
        /**
         * Producing messages
         *
         * @param topic        Topic
         * @param partition    partition
         * @param key          Message key
         * @param data         Message data
         * @param timestamp    timestamp
         * @param callback    callback
         */
        public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback)
        {
            ProducerRecord<K, V> kafkaRecord =
                    timestamp == null ? new ProducerRecord<K, V>(topic, partition, key, data)
                            : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
            produce(kafkaRecord, callback);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord)
        {
            produce(kafkaRecord, (Callback)null);
        }
    
        public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback)
        {
            producer.send(kafkaRecord, callback);
        }
    
        public void close()
        {
            producer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = DmsProducer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Load configuration information from classpath.
         *
         * @param configFileName Configuration file name
         * @return Configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }
    

Consuming Messages

  • Test code

    package com.dms.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.junit.Test;
    import java.util.Arrays;
    
    public class DmsConsumerTest {
        @Test
        public void testConsumer() throws Exception {
            DmsConsumer consumer = new DmsConsumer();
            consumer.consume(Arrays.asList("topic-0"));
            try {
                for (int i = 0; i < 10; i++){
                    ConsumerRecords<Object, Object> records = consumer.poll(1000);
                    System.out.println("the numbers of topic:" + records.count());
                    for (ConsumerRecord<Object, Object> record : records)
                    {
                        System.out.println(record.toString());
                    }
                }
            }catch (Exception e)
            {
                // TODO: Exception handling
                e.printStackTrace();
            }finally {
                consumer.close();
            }
        }
    }
    
  • Message consumption code

    package com.dms.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.*;
    
    public class DmsConsumer {
    
        public static final String CONFIG_CONSUMER_FILE_NAME = "dms.sdk.consumer.properties";
    
        private KafkaConsumer<Object, Object> consumer;
    
        DmsConsumer(String path)
        {
            Properties props = new Properties();
            try {
                InputStream in = new BufferedInputStream(new FileInputStream(path));
                props.load(in);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
    
        DmsConsumer()
        {
            Properties props = new Properties();
            try {
                props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
            }catch (IOException e)
            {
                e.printStackTrace();
                return;
            }
            consumer = new KafkaConsumer<Object, Object>(props);
        }
        public void consume(List topics)
        {
            consumer.subscribe(topics);
        }
    
        public ConsumerRecords<Object, Object> poll(long timeout)
        {
            return consumer.poll(timeout);
        }
    
        public void close()
        {
            consumer.close();
        }
    
        /**
         * get classloader from thread context if no classloader found in thread
         * context return the classloader which has loaded this class
         *
         * @return classloader
         */
        public static ClassLoader getCurrentClassLoader()
        {
            ClassLoader classLoader = Thread.currentThread()
                    .getContextClassLoader();
            if (classLoader == null)
            {
                classLoader = DmsConsumer.class.getClassLoader();
            }
            return classLoader;
        }
    
        /**
         * Load configuration information from classpath.
         *
         * @param configFileName Configuration file name
         * @return Configuration information
         * @throws IOException
         */
        public static Properties loadFromClasspath(String configFileName) throws IOException
        {
            ClassLoader classLoader = getCurrentClassLoader();
            Properties config = new Properties();
    
            List<URL> properties = new ArrayList<URL>();
            Enumeration<URL> propertyResources = classLoader
                    .getResources(configFileName);
            while (propertyResources.hasMoreElements())
            {
                properties.add(propertyResources.nextElement());
            }
    
            for (URL url : properties)
            {
                InputStream is = null;
                try
                {
                    is = url.openStream();
                    config.load(is);
                }
                finally
                {
                    if (is != null)
                    {
                        is.close();
                        is = null;
                    }
                }
            }
    
            return config;
        }
    }