📜 Kafka Consumer Python Script 📜
This script demonstrates how to consume messages from a Kafka topic using the Confluent Kafka Python client. It connects to a Kafka broker, subscribes to a topic, and processes incoming messages.
Script Overview
from confluent_kafka import Consumer, KafkaException, KafkaError
import sys

# Kafka broker address
bootstrap_servers = 'kafka.example.in:9092'

# Kafka topic to consume from
topic = 'web_logs_topic'  # Update this with your topic name

# Consumer configuration
conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my_consumer_group',  # Consumer group ID
    'auto.offset.reset': 'earliest'   # Start consuming from the earliest message (optional)
}

# Create Kafka Consumer
consumer = Consumer(conf)

# Subscribe to topic
consumer.subscribe([topic])

try:
    while True:
        # Poll for messages
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition, continue to next
                continue
            else:
                # Other errors
                print(msg.error())
                break
        # Print message value
        print('Received message: {}'.format(msg.value().decode('utf-8')))
except KeyboardInterrupt:
    pass
finally:
    # Close the consumer
    consumer.close()
Configuration
Kafka Broker Address
bootstrap_servers: The address of your Kafka broker (e.g., 'kafka.example.in:9092').
Kafka Topic
topic: The Kafka topic you want to consume from (e.g., 'web_logs_topic').
Consumer Configuration
group.id: The ID of the consumer group. Consumers with the same group ID split the topic's partitions among themselves, so each message is handled by only one member of the group.
auto.offset.reset: Defines where to start consuming when the group has no committed offset. 'earliest' starts from the beginning of each partition.
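To show how these settings fit together, here is a minimal sketch that builds the configuration dictionary in one place. The helper name `build_consumer_config` and its parameters are hypothetical and not part of the original script; the dictionary keys are the ones the script already uses.

```python
def build_consumer_config(bootstrap_servers, group_id, offset_reset="earliest"):
    """Return a config dict suitable for confluent_kafka.Consumer.

    Hypothetical helper for illustration; the script above builds the
    same dictionary inline.
    """
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        # Only consulted when the group has no committed offset yet.
        "auto.offset.reset": offset_reset,
    }


conf = build_consumer_config("kafka.example.in:9092", "my_consumer_group")
```

Passing `offset_reset="latest"` instead would make a new group start from messages produced after it joins, rather than from the beginning.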
Handling Messages
Polling Messages
consumer.poll(1.0): Polls for a message, waiting up to 1 second. Returns None if no message arrives within that time.
msg: The fetched message object.
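The polling behavior described above can be sketched as a small function that collects a bounded batch instead of looping forever. The name `poll_batch` and the `max_empty_polls` cutoff are assumptions made for this example; the function accepts any object with a `poll(timeout)` method, such as the `Consumer` created in the script, and does not inspect `msg.error()`.

```python
def poll_batch(consumer, max_messages=10, timeout=1.0, max_empty_polls=3):
    """Collect up to max_messages by calling consumer.poll(timeout) repeatedly.

    Stops early after max_empty_polls consecutive polls return None.
    Hypothetical helper; error checking is left to the caller.
    """
    messages = []
    empty_polls = 0
    while len(messages) < max_messages and empty_polls < max_empty_polls:
        msg = consumer.poll(timeout)
        if msg is None:
            # Timeout expired without a message.
            empty_polls += 1
            continue
        empty_polls = 0
        messages.append(msg)
    return messages
```

With the script's consumer this would be called as `poll_batch(consumer, max_messages=100)`.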
Error Handling
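msg.error(): Returns the error attached to the message, or None if the message was delivered normally.
KafkaError._PARTITION_EOF: Indicates that the consumer has reached the end of a partition. This is not a failure, so the script keeps polling.
Other Errors: Print the error message and stop processing.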
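The three cases above can be separated with a small classification function. This is a sketch under stated assumptions: `classify` is a hypothetical name, and the end-of-partition code is passed in by the caller (normally `KafkaError._PARTITION_EOF`) so the function itself has no dependency on the client library. Recent client versions appear to emit the end-of-partition event only when `enable.partition.eof` is enabled in the configuration; verify this against the version you use.

```python
def classify(msg, eof_code):
    """Classify the result of consumer.poll().

    eof_code is expected to be KafkaError._PARTITION_EOF.
    Returns one of: "empty", "message", "eof", "error".
    """
    if msg is None:
        return "empty"      # poll timed out
    err = msg.error()
    if err is None:
        return "message"    # normal message, safe to read msg.value()
    if err.code() == eof_code:
        return "eof"        # reached end of a partition, keep polling
    return "error"          # anything else: report and stop
```

In the script's loop this would be used as `classify(consumer.poll(1.0), KafkaError._PARTITION_EOF)`.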
Message Processing
msg.value().decode('utf-8'): Decodes the message value from bytes to a UTF-8 string.
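The decoding step can fail if a message has no value or contains bytes that are not valid UTF-8. The following sketch shows one defensive variant; `decode_value` is a hypothetical helper, and replacing invalid bytes (rather than raising) is a design choice made for this example, not something the original script does.

```python
def decode_value(raw, encoding="utf-8"):
    """Decode a message payload to str.

    Returns None when the payload is None (a message without a value).
    Invalid byte sequences are replaced with U+FFFD instead of raising.
    """
    if raw is None:
        return None
    return raw.decode(encoding, errors="replace")
```

In the script this would replace the inline call as `decode_value(msg.value())`.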
Graceful Shutdown
Keyboard Interrupt
KeyboardInterrupt: Handles interruption (e.g., pressing Ctrl+C).
Finally Block
consumer.close(): Ensures the consumer is properly closed when the script ends.
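The shutdown pattern can be isolated into a function so that the `finally` block is easy to see and test. This is a sketch, not the script's own structure: `run` and `handle` are hypothetical names, and `consumer` is any object with `poll(timeout)` and `close()` methods.

```python
def run(consumer, handle, timeout=1.0):
    """Poll until interrupted, then always close the consumer."""
    try:
        while True:
            msg = consumer.poll(timeout)
            if msg is None:
                continue
            handle(msg)
    except KeyboardInterrupt:
        # Ctrl+C: fall through to the finally block without a traceback.
        pass
    finally:
        # Runs on Ctrl+C and on any other exception raised by handle().
        consumer.close()
```

Because `close()` sits in `finally`, the consumer is also closed when `handle` raises an unexpected exception, which then propagates to the caller.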
Running the Script
- Install Dependencies: Make sure you have confluent-kafka installed.
  pip install confluent-kafka
- Run the Script: Execute the script with Python.
  python kafka_consumer.py
This documentation covers the Kafka consumer script's configuration, message handling, error handling, and shutdown behavior.