📜 Kafka Consumer Python Script 📜

This script demonstrates how to consume messages from a Kafka topic using the Confluent Kafka Python client. It connects to a Kafka broker, subscribes to a topic, and processes incoming messages.

Script Overview

from confluent_kafka import Consumer, KafkaError
 
# Kafka broker address
bootstrap_servers = 'kafka.example.in:9092'
 
# Kafka topic to consume from
topic = 'web_logs_topic'  # Update this with your topic name
 
# Consumer configuration
conf = {
    'bootstrap.servers': bootstrap_servers,
    'group.id': 'my_consumer_group',  # Consumer group ID
    'auto.offset.reset': 'earliest'   # Start from the earliest message when the group has no committed offset
}
 
# Create Kafka Consumer
consumer = Consumer(conf)
 
# Subscribe to topic
consumer.subscribe([topic])
 
try:
    while True:
        # Poll for messages
        msg = consumer.poll(1.0)
 
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # Reached the end of a partition (reported only when
                # 'enable.partition.eof' is enabled); keep polling
                continue
            else:
                # Other errors
                print(msg.error())
                break
 
        # Print message value (value() is None for a message without a payload)
        value = msg.value()
        print('Received message: {}'.format(value.decode('utf-8') if value is not None else None))
 
except KeyboardInterrupt:
    pass
 
finally:
    # Close the consumer
    consumer.close()

Configuration

Kafka Broker Address

  • bootstrap_servers: The host:port address of the Kafka broker used for the initial connection (e.g., 'kafka.example.in:9092'). Separate multiple brokers with commas.

Kafka Topic

  • topic: The Kafka topic you want to consume from (e.g., 'web_logs_topic').

Consumer Configuration

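  • group.id: The ID of the consumer group. The topic's partitions are divided among the consumers that share a group ID, so each message is delivered to only one consumer in the group.
  • auto.offset.reset: Defines where to start consuming when the group has no committed offset for a partition, or when the committed offset is out of range. 'earliest' starts from the oldest available message; 'latest' starts with messages produced after the consumer joins.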
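
The settings above can be assembled by a small helper. The following is a minimal sketch, not part of the original script: build_consumer_config is a hypothetical name, and restricting auto.offset.reset to three values is an assumption made for this example.

```python
def build_consumer_config(bootstrap_servers, group_id, offset_reset="earliest"):
    """Return a configuration dict in the shape the script passes to Consumer.

    Hypothetical helper for illustration; not part of the original script.
    """
    # Assumption: this sketch accepts only these three values.
    allowed = {"earliest", "latest", "error"}
    if offset_reset not in allowed:
        raise ValueError(
            "auto.offset.reset must be one of {}, got {!r}".format(sorted(allowed), offset_reset)
        )
    if not group_id:
        raise ValueError("group.id must be a non-empty string")
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "auto.offset.reset": offset_reset,
    }


# Same values as the script's conf dictionary
conf = build_consumer_config("kafka.example.in:9092", "my_consumer_group")
print(conf)
```

Validating the values before creating the consumer surfaces a typo such as 'earliset' as a clear Python error at startup.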

Handling Messages

Polling Messages

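  • consumer.poll(1.0): Waits up to 1 second for a message. Returns None if nothing arrives within the timeout, in which case the loop polls again.
  • msg: The object returned by poll(). It carries either a message or an error, so check msg.error() before reading the value.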

Error Handling

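  • msg.error(): Returns a KafkaError if the polled object carries an error or event instead of a message, otherwise None.
  • KafkaError._PARTITION_EOF: An informational event meaning the consumer has reached the current end of a partition. It is not a failure, so the loop keeps polling. The client reports it only when 'enable.partition.eof' is set to true in the configuration.
  • Other Errors: The script prints the error and breaks out of the loop, which ends consumption.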
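
The branches above can be isolated in one function so they can be exercised without a broker. This is a sketch under stated assumptions: classify_poll_result, FakeError, FakeMessage, and FAKE_EOF are hypothetical names introduced here. In real code the msg argument comes from consumer.poll() and the second argument is KafkaError._PARTITION_EOF.

```python
def classify_poll_result(msg, partition_eof_code):
    """Map the result of one poll() call to 'idle', 'message', 'eof', or 'error'.

    Hypothetical helper; pass KafkaError._PARTITION_EOF as partition_eof_code
    when using it with confluent_kafka.
    """
    if msg is None:
        return "idle"      # poll() timed out with nothing to deliver
    err = msg.error()
    if err is None:
        return "message"   # a normal message; safe to read msg.value()
    if err.code() == partition_eof_code:
        return "eof"       # end of partition reached; keep polling
    return "error"         # anything else; the script prints it and stops


# Stand-ins that mimic only the methods used above (not the real classes)
class FakeError:
    def __init__(self, code):
        self._code = code

    def code(self):
        return self._code


class FakeMessage:
    def __init__(self, error=None):
        self._error = error

    def error(self):
        return self._error


FAKE_EOF = -1  # placeholder for this demo, not the real constant's value

print(classify_poll_result(None, FAKE_EOF))
print(classify_poll_result(FakeMessage(), FAKE_EOF))
print(classify_poll_result(FakeMessage(FakeError(FAKE_EOF)), FAKE_EOF))
print(classify_poll_result(FakeMessage(FakeError(-2)), FAKE_EOF))
```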

Message Processing

  • msg.value().decode('utf-8'): Decodes the message value from bytes to a UTF-8 string. msg.value() returns None for a message without a payload, so check for None before decoding.
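
A defensive version of the decoding step might look like the sketch below. decode_value is a hypothetical helper, and replacing invalid bytes instead of raising is a design choice assumed for this example, not something the original script does.

```python
def decode_value(raw, encoding="utf-8"):
    """Decode a message payload, tolerating missing or malformed data.

    Hypothetical helper; 'raw' is what msg.value() returns (bytes or None).
    """
    if raw is None:
        # A message can have no payload at all
        return None
    # errors="replace" substitutes U+FFFD for bytes that are not valid
    # in the given encoding instead of raising UnicodeDecodeError
    return raw.decode(encoding, errors="replace")


print(decode_value(b"GET /index.html 200"))
print(decode_value(None))
```

With errors="replace", one malformed message does not stop the consumer. If corrupt payloads should be rejected instead, drop the errors argument and catch UnicodeDecodeError.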

Graceful Shutdown

Keyboard Interrupt

  • KeyboardInterrupt: Raised when the user presses Ctrl+C. The script catches it so the loop exits without a traceback.

Finally Block

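  • consumer.close(): Runs in the finally block, so the consumer is closed whether the loop ends through Ctrl+C, an error, or a break. Closing commits final offsets (when auto-commit is enabled) and leaves the consumer group.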
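
KeyboardInterrupt covers Ctrl+C only. A process stopped with SIGTERM (for example by a container runtime) needs a signal handler to reach the same cleanup path. The sketch below shows one way to do this. StopFlag, install_signal_handlers, consume_until_stopped, and the fake classes are hypothetical names, and the fakes stand in for a real Consumer so the loop can run without a broker.

```python
import signal


class StopFlag:
    """Records that a shutdown was requested."""

    def __init__(self):
        self.requested = False

    def request(self, signum=None, frame=None):
        # Signature matches what signal.signal() passes to a handler
        self.requested = True


def install_signal_handlers(flag):
    # Route both Ctrl+C (SIGINT) and SIGTERM to the same flag
    signal.signal(signal.SIGINT, flag.request)
    signal.signal(signal.SIGTERM, flag.request)


def consume_until_stopped(consumer, flag, handle, timeout=1.0):
    try:
        while not flag.requested:
            msg = consumer.poll(timeout)
            if msg is None or msg.error():
                continue  # error handling omitted to keep the sketch short
            handle(msg)
    finally:
        consumer.close()  # always runs, as in the original script


# Stand-ins that mimic only the methods used above (not the real classes)
class FakeMessage:
    def __init__(self, value):
        self._value = value

    def error(self):
        return None

    def value(self):
        return self._value


class FakeConsumer:
    def __init__(self, flag, messages):
        self._flag, self._messages, self.closed = flag, list(messages), False

    def poll(self, timeout):
        if self._messages:
            return self._messages.pop(0)
        self._flag.request()  # simulate a shutdown signal once drained
        return None

    def close(self):
        self.closed = True


flag = StopFlag()
fake = FakeConsumer(flag, [FakeMessage(b"a"), FakeMessage(b"b")])
seen = []
consume_until_stopped(fake, flag, lambda m: seen.append(m.value()))
print(seen, fake.closed)
```

With a real consumer, call install_signal_handlers(flag) once before entering the loop. The handler only sets a flag, so the loop finishes handling the current message before closing.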

Running the Script

  1. Install Dependencies: Make sure you have confluent-kafka installed.

    pip install confluent-kafka
  2. Run the Script: Execute the script with Python.

    python kafka_consumer.py
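
The script hard-codes the broker, topic, and group ID. One option is to read them from the command line, as in the following sketch. The flag names --bootstrap-servers, --topic, and --group-id are assumptions for illustration and are not defined by the original script. The defaults are the values the script already uses.

```python
import argparse


def parse_args(argv=None):
    """Parse hypothetical command-line options for the consumer script."""
    parser = argparse.ArgumentParser(description="Consume messages from a Kafka topic.")
    parser.add_argument("--bootstrap-servers", default="kafka.example.in:9092",
                        help="Kafka broker address as host:port")
    parser.add_argument("--topic", default="web_logs_topic",
                        help="topic to consume from")
    parser.add_argument("--group-id", default="my_consumer_group",
                        help="consumer group ID")
    # argv=None makes argparse read sys.argv; a list is passed below for the demo
    return parser.parse_args(argv)


args = parse_args(["--topic", "other_topic"])

# Build the same configuration dictionary the script uses
conf = {
    "bootstrap.servers": args.bootstrap_servers,
    "group.id": args.group_id,
    "auto.offset.reset": "earliest",
}
print(args.topic, conf)
```

If the script adopted these options, it could be run with a different topic by appending --topic other_topic to the python command above.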

This documentation covers the Kafka consumer script's configuration, message handling, error handling, and shutdown behavior.

