Configuring Kafka Clients in Java
This section describes how to add the Kafka client dependency in Maven and how to use the client to connect to a Kafka instance and produce and consume messages. To check how the demo project runs in IntelliJ IDEA, see Setting Up the Java Development Environment.
The Kafka instance connection addresses, topic name, and user information used in the following examples are available in Collecting Connection Information.
Adding Kafka Clients in Maven
<!-- Kafka instances are based on Kafka 1.1.0, 2.3.0, 2.7, or 3.x. Use the client version that matches your instance: 1.1.0, 2.3.0, 2.7.2, or 3.4.0. Version 2.7.2 is shown here as an example. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.7.2</version>
</dependency>
Preparing Kafka Configuration Files
The following are example producer and consumer configuration files. If ciphertext access is not enabled for the Kafka instance, comment out the encryption-related lines. Otherwise, set the parameters for encrypted access.
Producer configuration file (the dms.sdk.producer.properties file in the message production code)
#The topic name is set in the production and consumption code.
#######################
#Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section.
#Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#Producer acknowledgement
acks=all
#Method of turning the key into bytes
key.serializer=org.apache.kafka.common.serialization.StringSerializer
#Method of turning the value into bytes
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#Memory available to the producer for buffering
buffer.memory=33554432
#Number of retries
retries=0
#######################
#Comment out the following parameters if ciphertext access is not enabled.
#######################
# Set the SASL authentication mechanism, username, and password.
# sasl.mechanism is the SASL mechanism. username and password are the SASL username and password. Obtain them by referring to section "Collecting Connection Information". For security purposes, you are advised to encrypt the username and password.
# Keep only the mechanism that matches your instance and comment out the other one.
# If the SASL mechanism is PLAIN, the configuration is as follows:
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
# If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="username" \
    password="password";
# Set security.protocol.
# Keep only the protocol that matches your instance and comment out the other one.
# If the security protocol is SASL_SSL, the configuration is as follows:
security.protocol=SASL_SSL
# ssl.truststore.location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
ssl.truststore.location=E:\\temp\\client.jks
# ssl.truststore.password is the password of the server certificate. This password is used for accessing the JKS file generated by Java.
ssl.truststore.password=dms@kafka
# ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
ssl.endpoint.identification.algorithm=
# If the security protocol is SASL_PLAINTEXT, the configuration is as follows:
security.protocol=SASL_PLAINTEXT
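The sample file lists both SASL mechanisms and both security protocols, so it helps to know how `java.util.Properties` treats a key that appears twice: the later line overwrites the earlier one. The following is a minimal sketch of that behavior using only the JDK. The class name `DuplicateKeyDemo` and the inline sample text are illustrative assumptions and are not part of the demo project.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the demo project.
class DuplicateKeyDemo {

    // Parses properties-file text with the same rules Properties.load() applies to a file.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String sample =
                "sasl.mechanism=PLAIN\n"
              + "sasl.mechanism=SCRAM-SHA-512\n"
              + "security.protocol=SASL_SSL\n"
              + "security.protocol=SASL_PLAINTEXT\n";
        Properties props = parse(sample);
        // The last occurrence of each key wins.
        System.out.println(props.getProperty("sasl.mechanism"));    // prints SCRAM-SHA-512
        System.out.println(props.getProperty("security.protocol")); // prints SASL_PLAINTEXT
    }
}
```

If both alternatives are left uncommented, the client would therefore use the settings that appear last in the file, which may not match the instance.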
Consumer configuration file (the dms.sdk.consumer.properties file in the message consumption code)
#The topic name is set in the production and consumption code.
#######################
#Information about Kafka brokers. ip:port are the connection addresses and ports used by the instance. The values can be obtained by referring to the "Collecting Connection Information" section.
#Example: bootstrap.servers=100.xxx.xxx.87:909x,100.xxx.xxx.69:909x,100.xxx.xxx.155:909x
bootstrap.servers=ip1:port1,ip2:port2,ip3:port3
#Unique string to identify the group of consumer processes to which the consumer belongs. Configuring the same group.id for different processes indicates that the processes belong to the same consumer group.
group.id=1
#Method of turning bytes into the key
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Method of turning bytes into the value
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Offset reset policy
auto.offset.reset=earliest
#######################
#Comment out the following parameters if ciphertext access is not enabled.
#######################
# Set the SASL authentication mechanism, username, and password.
# sasl.mechanism is the SASL mechanism. username and password are the SASL username and password. Obtain them by referring to section "Collecting Connection Information". For security purposes, you are advised to encrypt the username and password.
# Keep only the mechanism that matches your instance and comment out the other one.
# If the SASL mechanism is PLAIN, the configuration is as follows:
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="username" \
    password="password";
# If the SASL mechanism is SCRAM-SHA-512, the configuration is as follows:
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="username" \
    password="password";
# Set security.protocol.
# Keep only the protocol that matches your instance and comment out the other one.
# If the security protocol is SASL_SSL, the configuration is as follows:
security.protocol=SASL_SSL
# ssl.truststore.location is the path for storing the SSL certificate. The following code uses the path format in Windows as an example. Change the path format based on the actual running environment.
ssl.truststore.location=E:\\temp\\client.jks
# ssl.truststore.password is the password of the server certificate. This password is used for accessing the JKS file generated by Java.
ssl.truststore.password=dms@kafka
# ssl.endpoint.identification.algorithm indicates whether to verify the certificate domain name. This parameter must be left blank, which indicates disabling domain name verification.
ssl.endpoint.identification.algorithm=
# If the security protocol is SASL_PLAINTEXT, the configuration is as follows:
security.protocol=SASL_PLAINTEXT
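As an alternative to commenting out the encryption-related lines by hand, the same effect can be approximated in code by dropping those keys after the file is loaded and before the client is created. The sketch below assumes that the encryption-related parameters are exactly the six keys used in the sample files. The class name `SecuritySettings` and its method are hypothetical and are not part of the demo project.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the demo project.
class SecuritySettings {

    // The encryption-related keys that appear in the sample configuration files.
    static final List<String> ENCRYPTION_KEYS = Arrays.asList(
            "sasl.mechanism",
            "sasl.jaas.config",
            "security.protocol",
            "ssl.truststore.location",
            "ssl.truststore.password",
            "ssl.endpoint.identification.algorithm");

    // Returns a copy of the given properties without the encryption-related keys.
    // The source object is left unchanged.
    static Properties withoutEncryption(Properties source) {
        Properties copy = new Properties();
        for (String name : source.stringPropertyNames()) {
            if (!ENCRYPTION_KEYS.contains(name)) {
                copy.setProperty(name, source.getProperty(name));
            }
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "ip1:port1,ip2:port2,ip3:port3");
        props.setProperty("group.id", "1");
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");

        Properties plain = withoutEncryption(props);
        System.out.println(plain.containsKey("security.protocol")); // prints false
        System.out.println(plain.getProperty("group.id"));          // prints 1
    }
}
```

Keeping one configuration file and filtering it at startup avoids maintaining two nearly identical files, at the cost of hiding which settings are actually in effect.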
Producing Messages
Test code
package com.dms.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

public class DmsProducerTest {
    @Test
    public void testProducer() throws Exception {
        DmsProducer<String, String> producer = new DmsProducer<String, String>();
        int partition = 0;
        try {
            for (int i = 0; i < 10; i++) {
                String key = null;
                String data = "The msg is " + i;
                //Enter the name of the topic you created. There are multiple APIs for producing messages. For details, see the Kafka official website or the following code.
                producer.produce("topic-0", partition, key, data, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            exception.printStackTrace();
                            return;
                        }
                        System.out.println("produce msg completed");
                    }
                });
                System.out.println("produce msg:" + data);
            }
        } catch (Exception e) {
            // TODO: Exception handling
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}
Message production code
package com.dms.producer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DmsProducer<K, V> {
    //Name of the producer configuration file prepared earlier.
    public static final String CONFIG_PRODUCER_FILE_NAME = "dms.sdk.producer.properties";

    private Producer<K, V> producer;

    //Loads the configuration from a file path.
    DmsProducer(String path) {
        Properties props = new Properties();
        //try-with-resources closes the stream after the file is read.
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<K, V>(props);
    }

    //Loads the configuration from the classpath.
    DmsProducer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_PRODUCER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        producer = new KafkaProducer<K, V>(props);
    }

    /**
     * Produces a message.
     *
     * @param topic     Topic
     * @param partition Partition
     * @param key       Message key
     * @param data      Message data
     */
    public void produce(String topic, Integer partition, K key, V data) {
        produce(topic, partition, key, data, null, (Callback) null);
    }

    /**
     * Produces a message.
     *
     * @param topic     Topic
     * @param partition Partition
     * @param key       Message key
     * @param data      Message data
     * @param timestamp Timestamp
     */
    public void produce(String topic, Integer partition, K key, V data, Long timestamp) {
        produce(topic, partition, key, data, timestamp, (Callback) null);
    }

    /**
     * Produces a message.
     *
     * @param topic     Topic
     * @param partition Partition
     * @param key       Message key
     * @param data      Message data
     * @param callback  Callback
     */
    public void produce(String topic, Integer partition, K key, V data, Callback callback) {
        produce(topic, partition, key, data, null, callback);
    }

    public void produce(String topic, V data) {
        produce(topic, null, null, data, null, (Callback) null);
    }

    /**
     * Produces a message.
     *
     * @param topic     Topic
     * @param partition Partition
     * @param key       Message key
     * @param data      Message data
     * @param timestamp Timestamp
     * @param callback  Callback
     */
    public void produce(String topic, Integer partition, K key, V data, Long timestamp, Callback callback) {
        ProducerRecord<K, V> kafkaRecord =
                timestamp == null
                        ? new ProducerRecord<K, V>(topic, partition, key, data)
                        : new ProducerRecord<K, V>(topic, partition, timestamp, key, data);
        produce(kafkaRecord, callback);
    }

    public void produce(ProducerRecord<K, V> kafkaRecord) {
        produce(kafkaRecord, (Callback) null);
    }

    public void produce(ProducerRecord<K, V> kafkaRecord, Callback callback) {
        producer.send(kafkaRecord, callback);
    }

    public void close() {
        producer.close();
    }

    /**
     * Gets the class loader from the thread context. If no class loader is found in the
     * thread context, returns the class loader that loaded this class.
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = DmsProducer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Loads configuration information from the classpath.
     *
     * @param configFileName Configuration file name
     * @return Configuration information
     * @throws IOException
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();

        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }

        for (URL url : properties) {
            InputStream is = null;
            try {
                is = url.openStream();
                config.load(is);
            } finally {
                if (is != null) {
                    is.close();
                    is = null;
                }
            }
        }
        return config;
    }
}
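The configuration file comments advise against keeping the SASL username and password in plaintext. One possible approach, sketched below under stated assumptions, is to leave the credentials out of the file and overlay `sasl.jaas.config` on the loaded `Properties` before the `KafkaProducer` is created. The class `SaslCredentials`, its methods, and the environment variable names `KAFKA_USERNAME` and `KAFKA_PASSWORD` are hypothetical. The sketch covers only the PLAIN mechanism and does not escape double quotes inside values.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the demo project.
class SaslCredentials {

    // Builds the sasl.jaas.config value for the PLAIN mechanism, in the same
    // form as the sample configuration file.
    static String plainJaasConfig(String username, String password) {
        if (username.contains("\"") || password.contains("\"")) {
            // This sketch does not attempt to escape quotes inside values.
            throw new IllegalArgumentException("double quotes are not supported in this sketch");
        }
        return "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + username + "\" "
                + "password=\"" + password + "\";";
    }

    // Overlays the PLAIN mechanism and credentials on already loaded properties.
    static void apply(Properties props, String username, String password) {
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", plainJaasConfig(username, password));
    }

    public static void main(String[] args) {
        // KAFKA_USERNAME and KAFKA_PASSWORD are assumed variable names.
        String username = System.getenv("KAFKA_USERNAME");
        String password = System.getenv("KAFKA_PASSWORD");
        if (username == null || password == null) {
            System.out.println("KAFKA_USERNAME and KAFKA_PASSWORD are not set");
            return;
        }
        Properties props = new Properties();
        apply(props, username, password);
        System.out.println("sasl.jaas.config set for user " + username);
    }
}
```

In the producer class, `apply` would be called on `props` after the file is loaded and before `new KafkaProducer<K, V>(props)`.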
Consuming Messages
Test code
package com.dms.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.junit.Test;

import java.util.Arrays;

public class DmsConsumerTest {
    @Test
    public void testConsumer() throws Exception {
        DmsConsumer consumer = new DmsConsumer();
        //Enter the name of the topic you created.
        consumer.consume(Arrays.asList("topic-0"));
        try {
            for (int i = 0; i < 10; i++) {
                //Wait up to 1000 ms for records.
                ConsumerRecords<Object, Object> records = consumer.poll(1000);
                System.out.println("the number of records:" + records.count());
                for (ConsumerRecord<Object, Object> record : records) {
                    System.out.println(record.toString());
                }
            }
        } catch (Exception e) {
            // TODO: Exception handling
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Message consumption code
package com.dms.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.*;

public class DmsConsumer {
    //Name of the consumer configuration file prepared earlier.
    public static final String CONFIG_CONSUMER_FILE_NAME = "dms.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    //Loads the configuration from a file path.
    DmsConsumer(String path) {
        Properties props = new Properties();
        //try-with-resources closes the stream after the file is read.
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    //Loads the configuration from the classpath.
    DmsConsumer() {
        Properties props = new Properties();
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    //Subscribes to the given topics.
    public void consume(List<String> topics) {
        consumer.subscribe(topics);
    }

    //Fetches records, waiting up to the given timeout in milliseconds.
    public ConsumerRecords<Object, Object> poll(long timeout) {
        return consumer.poll(timeout);
    }

    public void close() {
        consumer.close();
    }

    /**
     * Gets the class loader from the thread context. If no class loader is found in the
     * thread context, returns the class loader that loaded this class.
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = DmsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Loads configuration information from the classpath.
     *
     * @param configFileName Configuration file name
     * @return Configuration information
     * @throws IOException
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();

        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }

        for (URL url : properties) {
            InputStream is = null;
            try {
                is = url.openStream();
                config.load(is);
            } finally {
                if (is != null) {
                    is.close();
                    is = null;
                }
            }
        }
        return config;
    }
}
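The constructors above print the stack trace and return when the configuration file cannot be read, which leaves the `consumer` field unset and postpones the failure to the first `consume` or `poll` call. A fail-fast alternative could look like the following sketch, which uses only the JDK. The class `ConfigLoader`, its methods, and the choice of required keys (the four non-commented consumer settings from the sample file that have no default deserializer or address) are assumptions for illustration and are not part of the demo project.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the demo project.
class ConfigLoader {

    // Loads a properties file and closes the stream even if loading fails.
    // An unreadable file is reported immediately as an unchecked exception.
    static Properties load(String path) {
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(path))) {
            props.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot read " + path, e);
        }
        return props;
    }

    // Throws IllegalStateException if any of the given keys is missing or blank.
    static void require(Properties props, String... keys) {
        for (String key : keys) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing configuration: " + key);
            }
        }
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.out.println("usage: ConfigLoader <path-to-properties-file>");
            return;
        }
        Properties props = load(args[0]);
        require(props, "bootstrap.servers", "group.id",
                "key.deserializer", "value.deserializer");
        System.out.println("Configuration looks complete");
    }
}
```

With this approach, a missing file or an incomplete configuration stops the program at startup with a message that names the problem.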