In this tutorial you will learn how to interact with Kafka from Python. This will help you write your own Ansible module to manage Kafka, or a monitoring script to check Kafka.

There are two main Kafka libraries for Python, pykafka and kafka-python. This tutorial uses pykafka.

Install pykafka

pip install pykafka

Kafka configuration

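Besides the usual settings needed to run a Kafka server, make sure the following entry is set on your Kafka server. The broker returns this address to clients in the cluster metadata, and clients then connect to it. It must therefore be an IP address or hostname the clients can reach: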

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092
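Clients connect to whatever address the broker advertises, so an Ansible module or monitoring script could sanity-check this value before anything else. The following is a minimal sketch. It assumes the value uses the comma-separated `NAME://host:port` form shown above. `parse_listeners` and `find_unreachable_listeners` are hypothetical helpers and are not part of Kafka or pykafka.

```python
def parse_listeners(value):
    """Split an advertised.listeners value into (name, host, port) tuples.

    Hypothetical helper: assumes entries look like NAME://host:port.
    """
    listeners = []
    for entry in value.split(","):
        entry = entry.strip()
        if not entry:
            continue
        name, _, address = entry.partition("://")
        # rpartition so the port is taken from the last ':' in the address
        host, _, port = address.rpartition(":")
        listeners.append((name, host, int(port)))
    return listeners


def find_unreachable_listeners(value):
    """Return listeners advertised with a wildcard (bind-all) host.

    0.0.0.0 and [::] mean "all interfaces" to the server; a remote client
    cannot use them to reach the broker.
    """
    wildcard_hosts = {"0.0.0.0", "[::]"}
    return [l for l in parse_listeners(value) if l[1] in wildcard_hosts]
```

For example, `find_unreachable_listeners("PLAINTEXT://0.0.0.0:9092")` flags the single listener. A real server address such as `PLAINTEXT://10.0.0.5:9092` returns an empty list.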

Connecting to Kafka

You can use IPython to run the following examples.

from pykafka import KafkaClient
# replace "ip" with your Kafka server's IP address or hostname
client = KafkaClient(hosts="ip:9092")
print(client.topics)

You should see the topic names in the output. Otherwise, you may run into a connection problem like this one:

NoBrokersAvailableError: Unable to connect to a broker to fetch metadata. See logs

This happens when the Kafka port is not open (for example, it is blocked by a firewall) or when an ACL/Ranger policy is misconfigured on Kafka.
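To tell a closed port apart from an ACL/Ranger problem, it can help to first check whether the port accepts TCP connections at all. The following is a minimal sketch that uses only the standard library. `port_is_open` is a hypothetical helper name. A `True` result only shows that something is listening on the port; it does not show that Kafka will authorize your client.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        # create_connection resolves the host and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # refused connections, timeouts and DNS failures are all OSError subclasses
        return False
```

For example, `port_is_open("ip", 9092)` returning `False` points to a network or firewall issue rather than a Kafka policy issue.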

You can enable debug logging with:

import logging as log
log.basicConfig(level=log.DEBUG)
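If DEBUG output from other libraries gets too noisy, you can enable DEBUG for pykafka only. The log output later in this tutorial shows logger names such as `pykafka.cluster` and `pykafka.connection`. This sketch assumes that all of pykafka's loggers live under the `pykafka` prefix in the same way, so setting the level on that parent logger covers them all.

```python
import logging

# Everything else stays at WARNING; the format matches the log lines below.
logging.basicConfig(level=logging.WARNING,
                    format="%(levelname)s:%(name)s:%(message)s")
# Child loggers such as "pykafka.cluster" inherit this level from "pykafka".
logging.getLogger("pykafka").setLevel(logging.DEBUG)
```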

Then run the code again:

from pykafka import KafkaClient
client = KafkaClient(hosts="ip:9092")
print(client.topics)

Log output:

INFO:pykafka.cluster:Discovered 1 brokers
DEBUG:pykafka.cluster:Discovered broker id 0: 0.0.0.0:9092
DEBUG:pykafka.connection:Connecting to 0.0.0.0:9092
INFO:pykafka.connection:Failed to connect to 0.0.0.0:9092
WARNING:pykafka.broker:Failed to connect newly created broker for 0.0.0.0:9092

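It failed because I used 0.0.0.0 instead of the Kafka server's IP address or hostname. The broker advertised itself as 0.0.0.0:9092 (see the "Discovered broker id 0" line), and the client could not connect to that address.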
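For the monitoring use case mentioned at the start, connection failures like these can be turned into an exit code. The following is a minimal sketch that assumes the common Nagios convention (0 = OK, 2 = CRITICAL). `check_kafka` and its `client_factory` parameter are hypothetical. The factory defaults to pykafka's `KafkaClient` and can be swapped out for testing. The broad `except` is a deliberate choice so that any connection error, such as `NoBrokersAvailableError`, is reported instead of crashing the check.

```python
import sys


def check_kafka(hosts, client_factory=None):
    """Return (exit_code, message): 0 = OK, 2 = CRITICAL (Nagios convention)."""
    if client_factory is None:
        # imported lazily so the function can be exercised without pykafka installed
        from pykafka import KafkaClient
        client_factory = KafkaClient
    try:
        client = client_factory(hosts=hosts)
        topics = list(client.topics)
    except Exception as exc:  # e.g. NoBrokersAvailableError
        return 2, "CRITICAL: cannot reach Kafka at %s (%s)" % (hosts, exc)
    return 0, "OK: %d topic(s) visible at %s" % (len(topics), hosts)


if __name__ == "__main__":
    # usage: python check_kafka.py ip:9092
    code, message = check_kafka(sys.argv[1])
    print(message)
    sys.exit(code)
```

This only shows that cluster metadata could be fetched. In the 0.0.0.0 case above, the metadata request may still succeed while connections to the advertised broker fail. A stricter check could therefore also produce a test message.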

Kafka Producer

The PyKafka client does not support topic creation, so the topic must already exist. Then you can use the following producer example:

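from pykafka import KafkaClient
client = KafkaClient(hosts="ip:9092")
# select topic (it must already exist)
topic = client.topics['mytopic']
# the sync producer waits for each message to be sent before returning
with topic.get_sync_producer() as producer:
    # pykafka expects the message payload as bytes
    producer.produce(b"Test message")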
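Because pykafka expects the payload as bytes on Python 3, a small wrapper can take care of encoding text messages. The following is a minimal sketch that assumes UTF-8 text. `produce_messages` is a hypothetical helper that relies only on the `get_sync_producer()` and `produce()` calls used above.

```python
def produce_messages(topic, messages, encoding="utf-8"):
    """Send each message with a sync producer; str values are encoded first.

    Returns the number of messages passed to produce().
    """
    count = 0
    with topic.get_sync_producer() as producer:
        for message in messages:
            if isinstance(message, str):
                # convert text to the bytes payload pykafka expects
                message = message.encode(encoding)
            producer.produce(message)
            count += 1
    return count
```

Usage: `produce_messages(client.topics['mytopic'], ["first", "second"])`.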

Kafka Consumer

from pykafka import KafkaClient
client = KafkaClient(hosts="ip:9092")
# select topic
topic = client.topics['mytopic']
# stop iterating when no message arrives for 5000 ms
consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)
for m in consumer:
    # m.value is the message payload (bytes)
    print(m.value)

In the example above, consumer_timeout_ms=5000 makes the consumer stop iterating when no message has arrived for 5 seconds. If you don't set it, the loop runs forever until you stop the consumer.
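A monitoring script usually wants a bounded read that always releases the consumer. The following is a minimal sketch that assumes UTF-8 payloads. `read_messages` and its `limit` parameter are hypothetical additions; `consumer_timeout_ms` and `stop()` are the pykafka simple consumer's own option and method.

```python
def read_messages(topic, timeout_ms=5000, limit=None, encoding="utf-8"):
    """Read until no message arrives for timeout_ms, or until limit is reached."""
    consumer = topic.get_simple_consumer(consumer_timeout_ms=timeout_ms)
    values = []
    try:
        for message in consumer:
            value = message.value
            # a message can carry no payload, so keep None as-is
            values.append(value.decode(encoding) if value is not None else None)
            if limit is not None and len(values) >= limit:
                break
    finally:
        # release the consumer's connections and background workers
        consumer.stop()
    return values
```

Usage: `read_messages(client.topics['mytopic'], limit=10)` returns at most ten decoded messages.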