📜 Kafka Consumer Python Script 📜

This script demonstrates how to consume messages from a Kafka topic using the Confluent Kafka Python client (confluent-kafka). It connects to a Kafka broker, subscribes to a topic, and prints each message it receives.

Script Overview

from confluent_kafka import Consumer, KafkaError
import sys

# Kafka broker address
bootstrap_servers = 'kafka.example.in:9092'

# Kafka topic to consume from
topic = 'web_logs_topic'  # Update this with your topic name

# Consumer configuration
conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my_consumer_group',  # Consumer group ID
    'auto.offset.reset': 'earliest'   # Used only when the group has no committed offset (default: 'latest')
}

# Create Kafka Consumer
consumer = Consumer(conf)

# Subscribe to topic
consumer.subscribe([topic])

try:
    while True:
        # Poll for messages
        msg = consumer.poll(1.0)

        if msg is None:
            continue
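        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Reached the current end of a partition (only reported when
                # 'enable.partition.eof' is True); not an error, keep polling
                continue
            else:
                # Any other error: report it on stderr and stop consuming
                print(msg.error(), file=sys.stderr)
                break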

        # Print message value; tombstone messages have a value of None
        value = msg.value()
        if value is not None:
            print('Received message: {}'.format(value.decode('utf-8')))

except KeyboardInterrupt:
    pass

finally:
    # Close the consumer
    consumer.close()

Configuration

Kafka Broker Address

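  • bootstrap_servers: The address of the Kafka broker the client contacts first to discover the rest of the cluster (e.g., 'kafka.example.in:9092'). Several brokers can be listed as a comma-separated string so startup still works if one is down.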

Kafka Topic

  • topic: The Kafka topic you want to consume from (e.g., 'web_logs_topic').
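Kafka only accepts topic names made of ASCII letters, digits, '.', '_' and '-', at most 249 characters long, and not exactly '.' or '..'. A minimal sketch of checking names before subscribing follows; `build_subscription` is a hypothetical helper, not part of confluent-kafka. Entries beginning with '^' are passed through untouched because `Consumer.subscribe()` treats them as regular-expression patterns rather than literal names.

```python
import re
from typing import Iterable, List

# Kafka's legal topic-name characters and maximum name length.
_LEGAL_TOPIC = re.compile(r'[a-zA-Z0-9._-]+')
_MAX_TOPIC_LEN = 249


def build_subscription(names: Iterable[str]) -> List[str]:
    """Return a list for consumer.subscribe(), rejecting invalid topic names.

    Hypothetical helper: entries starting with '^' are left as-is because
    the client interprets them as regex subscriptions.
    """
    topics = []
    for name in names:
        name = name.strip()
        if not name.startswith('^'):
            if (name in ('', '.', '..')
                    or len(name) > _MAX_TOPIC_LEN
                    or not _LEGAL_TOPIC.fullmatch(name)):
                raise ValueError(f'invalid topic name: {name!r}')
        topics.append(name)
    return topics
```

In the script this would wrap the literal list: `consumer.subscribe(build_subscription([topic]))`.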

Consumer Configuration

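  • group.id: The ID of the consumer group. Consumers that share a group ID split the topic's partitions among themselves, so each partition is read by only one consumer in the group at a time.
  • auto.offset.reset: Where to start reading when the group has no committed offset for a partition (or the committed offset is no longer available). 'earliest' starts from the oldest retained message; the default, 'latest', reads only messages produced after the consumer starts.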
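The settings above can be assembled by a small helper. This is a sketch under assumptions: `make_consumer_conf` is hypothetical, it deliberately accepts only 'earliest' and 'latest' (librdkafka also understands aliases such as 'smallest' and 'largest'), and it exposes 'enable.partition.eof', without which the script's `_PARTITION_EOF` branch never runs.

```python
from typing import Dict, Sequence

# Deliberately restricted subset of auto.offset.reset values.
_OFFSET_RESET_CHOICES = ('earliest', 'latest')


def make_consumer_conf(brokers: Sequence[str], group_id: str,
                       offset_reset: str = 'earliest',
                       partition_eof: bool = False) -> Dict[str, object]:
    """Build a confluent-kafka Consumer config dict (hypothetical helper)."""
    if not brokers:
        raise ValueError('at least one broker is required')
    if offset_reset not in _OFFSET_RESET_CHOICES:
        raise ValueError(f'unsupported auto.offset.reset: {offset_reset!r}')
    return {
        # bootstrap.servers takes one comma-separated string of host:port pairs.
        'bootstrap.servers': ','.join(brokers),
        'group.id': group_id,
        # Consulted only when the group has no valid committed offset.
        'auto.offset.reset': offset_reset,
        # Must be True for _PARTITION_EOF events to be delivered by poll().
        'enable.partition.eof': partition_eof,
    }
```

The returned dict can be passed to `Consumer(...)` in place of the literal `conf`.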

Handling Messages

Polling Messages

  • consumer.poll(1.0): Waits up to 1 second for a message or error event and returns None if nothing arrives in that time.
  • msg: The returned Message object; when it is None the loop simply polls again.
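Since `poll()` returns None whenever its timeout expires, a loop can treat several consecutive empty polls as a sign that the topic has gone quiet, which suits batch jobs that should exit instead of running forever. A minimal sketch, assuming a hypothetical `drain` helper and `max_idle_polls` limit; it accepts any object with a `poll(timeout)` method, so it can be exercised with a fake consumer instead of a live broker.

```python
from typing import Any, Callable, Protocol


class Pollable(Protocol):
    def poll(self, timeout: float) -> Any: ...


def drain(consumer: Pollable, handle: Callable[[Any], None],
          timeout: float = 1.0, max_idle_polls: int = 5) -> int:
    """Poll until max_idle_polls consecutive polls return None.

    Returns the number of data messages passed to `handle`. Error events
    are skipped here for brevity; a real loop should inspect them.
    """
    handled = 0
    idle = 0
    while idle < max_idle_polls:
        msg = consumer.poll(timeout)   # None means the timeout expired
        if msg is None:
            idle += 1
            continue
        idle = 0                       # activity resets the quiet-period counter
        if msg.error():
            continue
        handle(msg)
        handled += 1
    return handled
```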

Error Handling

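  • msg.error(): Returns a KafkaError when the polled object is an error or event rather than data, otherwise None.
  • KafkaError._PARTITION_EOF: Signals that the consumer has reached the current end of a partition. It is informational, not a failure, so the loop keeps polling. The client emits it only when 'enable.partition.eof' is set to True, which is not the default.
  • Other Errors: Print the error to stderr and exit the loop, after which the finally block closes the consumer.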
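The script treats every error other than `_PARTITION_EOF` as a reason to stop. A more tolerant policy, shown below as an assumption rather than the script's behavior, stops only on fatal errors and logs the rest, since the client generally recovers from non-fatal consumer errors on its own. It relies on `KafkaError.code()` and `KafkaError.fatal()`; `classify_error` and its return labels are hypothetical.

```python
from typing import Any

# Hypothetical action labels for the consume loop.
SKIP, LOG_AND_CONTINUE, STOP = 'skip', 'log', 'stop'


def classify_error(err: Any, partition_eof_code: int) -> str:
    """Decide what to do with msg.error() under a hypothetical policy.

    `err` is expected to behave like confluent_kafka.KafkaError, which
    provides code() and fatal(). In real code pass
    KafkaError._PARTITION_EOF as `partition_eof_code`.
    """
    if err.code() == partition_eof_code:
        return SKIP               # informational: end of a partition reached
    if err.fatal():
        return STOP               # the client cannot recover; shut down
    return LOG_AND_CONTINUE       # non-fatal: log it and keep polling
```

In the loop this would read `action = classify_error(msg.error(), KafkaError._PARTITION_EOF)`, followed by `continue`, a log call, or `break` depending on `action`.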

Message Processing

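  • msg.value().decode('utf-8'): Decodes the message value from bytes to a str, assuming the producer wrote UTF-8. The value is None for tombstone (null-value) messages, so calling decode() on it directly would fail.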
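Calling `decode('utf-8')` directly on `msg.value()` raises AttributeError for a tombstone (value None) and UnicodeDecodeError for a payload that is not valid UTF-8; inside the script's loop either exception would end consumption. A minimal sketch of a more forgiving decoder follows; `decode_value` is a hypothetical helper, and replacing bad bytes with U+FFFD is a design choice that favors keeping the consumer running over preserving malformed data exactly.

```python
from typing import Optional


def decode_value(raw: Optional[bytes], encoding: str = 'utf-8') -> Optional[str]:
    """Decode a message value, tolerating tombstones and invalid bytes.

    Returns None for tombstones (value is None) and substitutes U+FFFD for
    undecodable bytes instead of raising UnicodeDecodeError.
    """
    if raw is None:
        return None
    return raw.decode(encoding, errors='replace')
```

Usage in the loop: `text = decode_value(msg.value())`, then print or process `text` only when it is not None.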

Graceful Shutdown

Keyboard Interrupt

  • KeyboardInterrupt: Raised when you press Ctrl+C. Catching it lets the script leave the loop without a traceback and run the finally block.
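Ctrl+C raises KeyboardInterrupt, but `docker stop` and systemd send SIGTERM, which Python does not turn into an exception; by default the process is terminated without running the finally block, so `consumer.close()` is skipped. A minimal sketch of handling both signals with a stop flag; `StopFlag` is a hypothetical helper, and the loop would check `flag.stop` instead of relying on `except KeyboardInterrupt`.

```python
import signal
import types
from typing import Optional


class StopFlag:
    """Set `stop` to True when SIGINT or SIGTERM arrives (hypothetical helper).

    install() must be called from the main thread, as signal.signal requires.
    """

    def __init__(self) -> None:
        self.stop = False

    def _handle(self, signum: int, frame: Optional[types.FrameType]) -> None:
        # Only record the request; the consume loop exits on its next check.
        self.stop = True

    def install(self) -> None:
        signal.signal(signal.SIGINT, self._handle)
        signal.signal(signal.SIGTERM, self._handle)


# Sketch of use in the script:
#   flag = StopFlag()
#   flag.install()
#   try:
#       while not flag.stop:
#           msg = consumer.poll(1.0)
#           ...
#   finally:
#       consumer.close()
```

Because the handler only sets a flag, the loop finishes its current `poll()` (at most about 1 second) before exiting cleanly.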

Finally Block

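  • consumer.close(): Runs on every exit path (Ctrl+C, a consumer error, or an unexpected exception). It commits final offsets when 'enable.auto.commit' is on (the default) and leaves the consumer group, so the group can reassign its partitions promptly.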
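The script's try/finally can also be expressed with `contextlib.closing()`, which calls `close()` when the `with` block exits, including when an exception is raised. A minimal sketch follows; `consume_n` is a hypothetical helper that handles exactly `n` data messages, and it takes the consumer as a parameter so it can be tested with a fake object.

```python
import contextlib
from typing import Any, Callable


def consume_n(consumer: Any, handle: Callable[[Any], None], n: int,
              timeout: float = 1.0) -> None:
    """Handle n data messages, then close the consumer no matter what.

    contextlib.closing() guarantees consumer.close() runs on exit, even if
    `handle` raises, mirroring the script's finally block.
    """
    with contextlib.closing(consumer) as c:
        handled = 0
        while handled < n:
            msg = c.poll(timeout)
            if msg is None or msg.error():
                continue                # nothing to process on this poll
            handle(msg)
            handled += 1
```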

Running the Script

  1. Install Dependencies: Install the confluent-kafka package.
    pip install confluent-kafka

  2. Run the Script: Save the script (for example, as kafka_consumer.py) and run it with Python.
    python kafka_consumer.py
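To avoid editing the file for each environment, the hard-coded settings could be read from the command line. A minimal sketch with argparse follows; the flag names and `parse_args` are assumptions, not options the script currently supports. The parsed values would replace `bootstrap_servers`, `topic`, and the `group.id` and `auto.offset.reset` entries in `conf`.

```python
import argparse
from typing import List, Optional


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Command-line options for the consumer script (hypothetical flags).

    Defaults mirror the values hard-coded in the script.
    """
    parser = argparse.ArgumentParser(description='Consume messages from a Kafka topic.')
    parser.add_argument('--bootstrap-servers', default='kafka.example.in:9092',
                        help='comma-separated host:port list')
    parser.add_argument('--topic', default='web_logs_topic')
    parser.add_argument('--group-id', default='my_consumer_group')
    parser.add_argument('--offset-reset', choices=['earliest', 'latest'],
                        default='earliest')
    return parser.parse_args(argv)
```

Example invocation under this assumption: `python kafka_consumer.py --topic other_topic --offset-reset latest`.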
    

This page covers the consumer script's configuration, message loop, error handling, and shutdown behavior.