Python

This section uses CentOS Linux as an example to describe how to access a Kafka instance with a Python Kafka client, including how to install the client and how to produce and consume messages.

Before getting started, ensure that you have collected the information listed in Collecting Connection Information.

Preparing the Environment

  • Python

    Generally, Python is pre-installed in the system. Enter python3 in a CLI. If information similar to the following is displayed, Python has been installed.

    [root@ecs-test python-kafka]# python3
    Python 3.7.1 (default, Jul  5 2020, 14:37:24)
    [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
    Type "help", "copyright", "credits" or "license" for more information.
    >>>
    

    If Python 3 is not installed, run the following command:

    yum install python3

  • Kafka clients in Python

    Run the following command to install the recommended version of the kafka-python client:

    pip3 install kafka-python==2.0.1
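    To confirm which kafka-python version is installed, a minimal sketch like the following can be run. It assumes Python 3.8 or later, where importlib.metadata is available; on Python 3.7, pip3 show kafka-python reports the same information. The function names are illustrative and not part of kafka-python.

```python
from importlib import metadata  # available in Python 3.8+
from typing import Optional

RECOMMENDED_VERSION = "2.0.1"  # version recommended in this section


def installed_version(dist_name: str = "kafka-python") -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def check_client(dist_name: str = "kafka-python",
                 expected: str = RECOMMENDED_VERSION) -> str:
    """Describe whether the client is installed and matches the recommended version."""
    version = installed_version(dist_name)
    if version is None:
        return "%s is not installed" % dist_name
    if version != expected:
        return "%s %s is installed (recommended: %s)" % (dist_name, version, expected)
    return "%s %s is installed" % (dist_name, version)


if __name__ == "__main__":
    print(check_client())
```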

Producing Messages

  • With SASL

    from kafka import KafkaProducer
    import ssl
    
    # Connection information
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_username': 'username',
        'sasl_password': 'password'
    }
    
    # SSL context used when Security Protocol is SASL_SSL.
    # The broker certificate is verified against the CA file; host names are not checked.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    # The certificate file. Obtain the SSL certificate by referring to "Collecting Connection Information".
    # If Security Protocol is set to SASL_PLAINTEXT, comment out the following line:
    context.load_verify_locations("phy_ca.crt")
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
                            sasl_mechanism="PLAIN",  # or "SCRAM-SHA-512", as shown on the console
                            ssl_context=context,
                            security_protocol='SASL_SSL',  # or 'SASL_PLAINTEXT'
                            sasl_plain_username=conf['sasl_username'],
                            sasl_plain_password=conf['sasl_password'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    # Block until all buffered messages have been sent, then release resources.
    producer.flush()
    producer.close()
    print('end producer')
    

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port

    • topic_name: topic name

    • sasl_plain_username/sasl_plain_password: the username and password set when ciphertext access was enabled for the first time, or those set when the user was created. For security purposes, encrypt the username and password instead of storing them in plaintext.

    • context.load_verify_locations: certificate file. This parameter is mandatory when Security Protocol is set to SASL_SSL. Python clients use a certificate in CRT format.

    • sasl_mechanism: SASL authentication mechanism. View it on the Basic Information page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, either can be used in the connection configuration. For earlier instances whose details page does not display SASL Mechanism, PLAIN is used by default.

    • security_protocol: Kafka security protocol. Obtain it from the Basic Information page on the Kafka console. For earlier instances whose details page does not display Security Protocol, SASL_SSL is used by default.

      • When Security Protocol is set to SASL_SSL, SASL is used for authentication and data is encrypted with SSL certificates for secure transmission. Configure the instance username, password, and certificate file.

      • When Security Protocol is set to SASL_PLAINTEXT, SASL is used for authentication and data is transmitted in plaintext, which offers higher performance. Configure the instance username and password, set security_protocol to SASL_PLAINTEXT, and comment out the certificate settings.
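    The protocol-dependent settings described above can be gathered in one place. The following is a minimal sketch, not part of kafka-python: build_sasl_config is a hypothetical helper that returns keyword arguments for KafkaProducer or KafkaConsumer, attaches an SSL context only for SASL_SSL, and assumes the console shows PLAIN or SCRAM-SHA-512 as the mechanism. kafka-python reads sasl_plain_username and sasl_plain_password for SCRAM mechanisms as well as for PLAIN.

```python
import ssl
from typing import Any, Dict, Optional

# Mechanisms and protocols mentioned in this section.
SUPPORTED_MECHANISMS = ("PLAIN", "SCRAM-SHA-512")
SUPPORTED_PROTOCOLS = ("SASL_SSL", "SASL_PLAINTEXT")


def build_sasl_config(security_protocol: str, sasl_mechanism: str,
                      username: str, password: str,
                      cafile: Optional[str] = None) -> Dict[str, Any]:
    """Return SASL keyword arguments for KafkaProducer/KafkaConsumer (hypothetical helper)."""
    if security_protocol not in SUPPORTED_PROTOCOLS:
        raise ValueError("unsupported security protocol: %s" % security_protocol)
    if sasl_mechanism not in SUPPORTED_MECHANISMS:
        raise ValueError("unsupported SASL mechanism: %s" % sasl_mechanism)

    config: Dict[str, Any] = {
        "security_protocol": security_protocol,
        "sasl_mechanism": sasl_mechanism,
        # kafka-python uses these two settings for both PLAIN and SCRAM.
        "sasl_plain_username": username,
        "sasl_plain_password": password,
    }

    if security_protocol == "SASL_SSL":
        # SASL_SSL needs the CA certificate to verify the broker.
        if not cafile:
            raise ValueError("SASL_SSL requires a CA certificate file such as phy_ca.crt")
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.check_hostname = False  # host names are not checked, as in the sample
        context.verify_mode = ssl.CERT_REQUIRED
        context.load_verify_locations(cafile)
        config["ssl_context"] = context

    # With SASL_PLAINTEXT no SSL context is attached; data travels unencrypted.
    return config
```

    For example, KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], **build_sasl_config('SASL_SSL', 'PLAIN', conf['sasl_username'], conf['sasl_password'], cafile='phy_ca.crt')) creates a producer equivalent to the sample, and switching the first argument to 'SASL_PLAINTEXT' drops the certificate requirement.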

  • Without SASL

    from kafka import KafkaProducer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
    }
    
    print('start producer')
    producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'])
    
    data = bytes("hello kafka!", encoding="utf-8")
    producer.send(conf['topic_name'], data)
    producer.close()
    print('end producer')
    

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port

    • topic_name: topic name
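    The samples encode the payload by hand with bytes(..., encoding="utf-8"). kafka-python's KafkaProducer also accepts a value_serializer callable (and KafkaConsumer a value_deserializer), which is convenient for structured payloads. A minimal sketch, assuming JSON message bodies; the two function names are illustrative:

```python
import json
from typing import Any, Optional


def json_serializer(value: Any) -> bytes:
    """Encode a Python object as UTF-8 JSON bytes (for value_serializer)."""
    return json.dumps(value, ensure_ascii=False).encode("utf-8")


def json_deserializer(raw: Optional[bytes]) -> Any:
    """Decode UTF-8 JSON bytes (for value_deserializer); None values pass through."""
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


if __name__ == "__main__":
    payload = json_serializer({"msg": "hello kafka!"})
    print(payload)
    print(json_deserializer(payload))
```

    For example, a producer created with KafkaProducer(bootstrap_servers=conf['bootstrap_servers'], value_serializer=json_serializer) can call producer.send(conf['topic_name'], {'msg': 'hello kafka!'}) with a dict, and a consumer created with value_deserializer=json_deserializer receives the value back as a dict.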

Consuming Messages

  • With SASL

    from kafka import KafkaConsumer
    import ssl
    
    # Connection information
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic_name',
        'sasl_username': 'username',
        'sasl_password': 'password',
        'consumer_id': 'consumer_id'
    }
    
    # SSL context used when Security Protocol is SASL_SSL.
    # The broker certificate is verified against the CA file; host names are not checked.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_REQUIRED
    # The certificate file. Obtain the SSL certificate by referring to "Collecting Connection Information".
    # If Security Protocol is set to SASL_PLAINTEXT, comment out the following line:
    context.load_verify_locations("phy_ca.crt")
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'],
                            sasl_mechanism="PLAIN",  # or "SCRAM-SHA-512", as shown on the console
                            ssl_context=context,
                            security_protocol='SASL_SSL',  # or 'SASL_PLAINTEXT'
                            sasl_plain_username=conf['sasl_username'],
                            sasl_plain_password=conf['sasl_password'])
    
    # Iteration blocks while waiting for new messages; press Ctrl+C to stop.
    try:
        for message in consumer:
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                  message.offset, message.key, message.value))
    except KeyboardInterrupt:
        pass
    finally:
        # Close the consumer and release its resources.
        consumer.close()
    
    print('end consumer')
    

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port

    • topic_name: topic name

    • sasl_plain_username/sasl_plain_password: the username and password set when ciphertext access was enabled for the first time, or those set when the user was created. For security purposes, encrypt the username and password instead of storing them in plaintext.

    • consumer_id: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.

    • context.load_verify_locations: certificate file. This parameter is mandatory when Security Protocol is set to SASL_SSL. Python clients use a certificate in CRT format.

    • sasl_mechanism: SASL authentication mechanism. View it on the Basic Information page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, either can be used in the connection configuration. For earlier instances whose details page does not display SASL Mechanism, PLAIN is used by default.

    • security_protocol: Kafka security protocol. Obtain it from the Basic Information page on the Kafka console. For earlier instances whose details page does not display Security Protocol, SASL_SSL is used by default.

      • When Security Protocol is set to SASL_SSL, SASL is used for authentication and data is encrypted with SSL certificates for secure transmission. Configure the instance username, password, and certificate file.

      • When Security Protocol is set to SASL_PLAINTEXT, SASL is used for authentication and data is transmitted in plaintext, which offers higher performance. Configure the instance username and password, set security_protocol to SASL_PLAINTEXT, and comment out the certificate settings.

  • Without SASL

    from kafka import KafkaConsumer
    
    conf = {
        'bootstrap_servers': ["ip1:port1","ip2:port2","ip3:port3"],
        'topic_name': 'topic-name',
        'consumer_id': 'consumer-id'
    }
    
    print('start consumer')
    consumer = KafkaConsumer(conf['topic_name'],
                            bootstrap_servers=conf['bootstrap_servers'],
                            group_id=conf['consumer_id'])
    
    # Iteration blocks while waiting for new messages; press Ctrl+C to stop.
    try:
        for message in consumer:
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                  message.offset, message.key, message.value))
    except KeyboardInterrupt:
        pass
    finally:
        # Close the consumer and release its resources.
        consumer.close()
    
    print('end consumer')
    

    The parameters in the example code are described as follows. For details about how to obtain the parameter values, see Collecting Connection Information.

    • bootstrap_servers: instance connection address and port

    • topic_name: topic name

    • consumer_id: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.
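    By default, iterating over a KafkaConsumer blocks indefinitely while waiting for new messages. kafka-python's consumer_timeout_ms parameter ends the iteration after the given idle time, and the number of processed messages can also be capped. A minimal sketch, assuming a hypothetical consume helper that accepts any iterable, so it can be tried without a broker:

```python
import itertools
from typing import Callable, Iterable, Optional, TypeVar

T = TypeVar("T")


def consume(records: Iterable[T], handle: Callable[[T], None],
            max_messages: Optional[int] = None) -> int:
    """Pass each record to handle(); stop after max_messages (None means no limit).

    Returns the number of records handled. Works with a KafkaConsumer or any iterable.
    """
    if max_messages is not None:
        # islice stops pulling from the iterator once the limit is reached.
        records = itertools.islice(records, max_messages)
    count = 0
    for record in records:
        handle(record)
        count += 1
    return count


if __name__ == "__main__":
    handled = consume(range(5), print, max_messages=3)
    print("handled", handled)
```

    For example, with a consumer created with consumer_timeout_ms=10000, consume(consumer, print, max_messages=100) processes at most 100 messages and returns early once no message has arrived for 10 seconds; call consumer.close() afterwards.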