PitchHut logo
kafka-manager
by cruel_pink_merrill
Simplifying Kafka management for developers with a user-friendly Python interface.
Pitch

kafka-manager is a Python library designed to simplify interactions with Kafka Producers, Consumers, and Topics. By providing a high-level abstraction, it streamlines the development process and promotes better maintainability of Kafka applications, enabling developers to focus on building features without getting bogged down in Kafka's intricacies.

Description

kafka-manager is a Python library designed to simplify interactions with Apache Kafka by providing a high-level abstraction for managing Producers, Consumers, and Topics. This utility empowers developers to effectively incorporate Kafka within their applications while minimizing the complexities that typically arise from using the kafka-python library. This abstraction accelerates development and facilitates easier maintenance of Kafka-related applications.

Key Features

Producer Management

The library offers functionality to manage Kafka Producers, including the ability to start and stop producers, send messages to specified topics, and monitor the running status of producers. The following example demonstrates how to initialize a Kafka producer, send a message to a topic, and ensure the successful transmission of the message:

import json
from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace with your topic name
group_id = 'example_group'  # Replace with your consumer group ID

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# Start the Kafka producer
kafka_manager.start_producer()

# Send Kafka message
try:
    message_payload = json.dumps({"message_key": "message_value"})
    metadata = kafka_manager.send_message(topic=topic_name, value=message_payload)
    if metadata:
        print(f'Message sent successfully to Kafka topic: "{topic_name}"')
    else:
        print(f'Failed to send message to Kafka topic: "{topic_name}"')
except Exception as e:
    print(f'Error in sending message to Kafka topic: {e}')

# Stop Kafka producer
kafka_manager.stop_producer()

Consumer Management

Kafka-manager provides tools for configuring and managing consumers, allowing developers to create consumers tailored to their application's needs, including different deserialization methods and offset management strategies. The code below shows how to set up a consumer and handle incoming messages using a user-defined callback function:

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace with your topic name
group_id = 'example_group'  # Replace with your consumer group ID

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# Create a Kafka Consumer
consumer = kafka_manager.create_consumer(topics=[topic_name], group_id=group_id, auto_offset_reset='earliest')

# Start the Kafka Consumer
kafka_manager.start_consumer(consumer_id=group_id)

def message_handler(message):
    """
    Callback function to handle received messages.
    In a production application, this logic could include:
    - Business logic processing
    - Storing messages in a database
    - Message deserialization
    """
    print(f'Received message: Partition={message.partition}, Offset={message.offset}, Value={message.value}')

# Consume Messages
kafka_manager.consume_messages(consumer_id=group_id, message_handler=message_handler)

Topic Management

With kafka-manager, developers can dynamically create and delete topics as needed to manage data streams effectively. The following example shows how to create a topic using the library:

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses
topic_name = 'example_topic'  # Replace with your topic name

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# Connect to Kafka admin client
kafka_manager.connect_admin_client()

# Create a topic if it doesn't exist
kafka_manager.create_topic(topic_name=topic_name, num_partitions=1, replication_factor=1)

Admin Client

The library includes an interface to connect to the Kafka Admin client, enabling administrative operations such as creating and deleting topics. Below is an example:

from kafka_manager.kafka_manager import KafkaManager

bootstrap_servers = ['localhost:9092']  # Replace with your Kafka broker addresses

# Create a KafkaManager instance
kafka_manager = KafkaManager(bootstrap_servers=bootstrap_servers)

# Connect to Kafka admin client
admin_client = kafka_manager.connect_admin_client()

# List Consumer Groups
consumers_groups = admin_client.list_consumer_groups()
print(consumers_groups)

# Describe Consumer Groups
admin_client.describe_consumer_groups(list(consumers_groups))

Error Handling

kafka-manager effectively addresses potential errors from network failures, broker issues, or misconfigurations, ensuring stable application performance.

Resource Management

The library ensures all Kafka connections are properly closed with a dedicated close() function, protecting against resource leaks and maintaining data integrity within the Kafka cluster.

For more details, please refer to the full documentation at Kafka Manager Documentation.

The library requires Python 3.7 or higher and the kafka-python package.

0 comments

No comments yet.

Sign in to be the first to comment.