"""
The Write-Exactly-Once-Semantics used by the :class:`KafkaHandler` is shown by
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/eos-transactions.py,
parts of which are similar to the code in this module.
"""
import ast
import os
import sys
import time
from abc import abstractmethod
from typing import Optional
import marshmallow_dataclass
from confluent_kafka import (
Consumer,
KafkaError,
KafkaException,
Producer,
)
from confluent_kafka.admin import AdminClient, NewTopic
sys.path.append(os.getcwd())
from src.base.data_classes.batch import Batch
from src.base.log_config import get_logger
from src.base.utils import kafka_delivery_report, setup_config
logger = get_logger()
HOSTNAME = os.getenv("HOSTNAME", "default_tid")
CONSUMER_GROUP_ID = os.getenv("GROUP_ID", "default_gid")
NUMBER_OF_INSTANCES = int(os.getenv("NUMBER_OF_INSTANCES", 1))
config = setup_config()
KAFKA_BROKERS = config["environment"]["kafka_brokers"]
[docs]
class TooManyFailedAttemptsError(Exception):
"""Exception raised when operations exceed the maximum number of retry attempts
This exception is typically raised during Kafka topic creation or connection
establishment when the maximum number of retry attempts has been exceeded.
"""
pass
[docs]
class KafkaMessageFetchException(Exception):
"""Exception raised when Kafka message consumption fails
This exception is raised when there are errors during the process of fetching
or consuming messages from Kafka topics, including network issues, timeout
errors, or malformed message data.
"""
pass
[docs]
class KafkaHandler:
"""Base class for all Kafka wrappers and handlers
Provides common initialization and configuration setup for Kafka producers
and consumers. This abstract base class establishes the foundation for
specific Kafka handling implementations.
"""
[docs]
def __init__(self) -> None:
"""
Sets up the initial configuration and initializes the consumer attribute
to None. Specific implementations should override this method to establish
their respective Kafka clients.
"""
self.consumer = None
[docs]
class KafkaProduceHandler(KafkaHandler):
"""Abstract base class for Kafka Producer wrappers
Extends KafkaHandler to provide producer-specific functionality. This class
establishes the interface for Kafka message production with different
semantic guarantees (simple vs exactly-once).
"""
[docs]
def __init__(self, conf):
"""
Args:
conf (dict): Configuration dictionary for the Kafka producer.
Should contain broker settings and producer-specific options.
"""
super().__init__()
self.producer = Producer(conf)
[docs]
@abstractmethod
def produce(self, *args, **kwargs):
"""Abstract method for producing messages to Kafka topics
Encodes the given data for transport and sends it to the specified topic.
Implementations must define the specific behavior for message production.
Args:
*args: Variable arguments depending on implementation.
**kwargs: Keyword arguments depending on implementation.
Raises:
NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError
[docs]
def __del__(self) -> None:
"""Cleanup method called when the object is destroyed
Ensures that all pending messages are flushed before the producer
is destroyed, preventing message loss.
"""
self.producer.flush()
[docs]
class SimpleKafkaProduceHandler(KafkaProduceHandler):
"""Simple Kafka Producer wrapper without Write-Exactly-Once semantics
Provides basic message production capabilities with at-least-once delivery
guarantees. This implementation prioritizes simplicity and performance over
strict consistency guarantees.
"""
[docs]
def __init__(self):
"""
Sets up a Kafka producer with standard configuration for simple message
production without transactional guarantees. Broker addresses are
automatically configured from the global KAFKA_BROKERS setting.
"""
self.brokers = ",".join(
[f"{broker['hostname']}:{broker['port']}" for broker in KAFKA_BROKERS]
)
conf = {
"bootstrap.servers": self.brokers,
"enable.idempotence": False,
"acks": "1",
}
super().__init__(conf)
[docs]
def produce(self, topic: str, data: str, key: None | str = None) -> None:
"""Produce a message to the specified Kafka topic.
Encodes and sends the provided data to the specified topic. The producer
is flushed before sending to ensure message delivery. Empty data is
silently ignored.
Args:
topic (str): Target Kafka topic name.
data (str): Message data to send (ignored if empty).
key (str, optional): Optional message key for partitioning.
Default: None.
Raises:
KafkaException: If message production fails.
BufferError: If the producer's message buffer is full.
"""
if not data:
return
self.producer.flush()
self.producer.produce(
topic=topic,
key=key,
value=data,
callback=kafka_delivery_report,
)
[docs]
class ExactlyOnceKafkaProduceHandler(KafkaProduceHandler):
"""Kafka Producer wrapper with Write-Exactly-Once semantics
Provides transactional message production with exactly-once delivery
guarantees. This implementation ensures that messages are delivered
exactly once, even in the presence of failures and retries.
Configuration:
- transactional.id: Set to HOSTNAME for unique transaction identification
- enable.idempotence: True (required for exactly-once semantics)
Note:
Each instance must have a unique transactional.id to avoid conflicts.
"""
[docs]
def __init__(self):
"""
Sets up a Kafka producer with transactional capabilities for exactly-once
semantics. The producer is initialized with transactions enabled and
configured with a unique transactional ID based on the hostname.
Raises:
KafkaException: If transaction initialization fails.
"""
self.brokers = ",".join(
[f"{broker['hostname']}:{broker['port']}" for broker in KAFKA_BROKERS]
)
conf = {
"bootstrap.servers": self.brokers,
"transactional.id": HOSTNAME,
"enable.idempotence": True,
}
super().__init__(conf)
self.producer.init_transactions()
[docs]
def produce(self, topic: str, data: str, key: None | str = None) -> None:
"""Produce a message to the specified Kafka topic with exactly-once semantics.
Sends the provided data within a Kafka transaction to ensure exactly-once
delivery. The transaction is automatically committed on success or aborted
on failure. Empty data is silently ignored.
Args:
topic (str): Target Kafka topic name.
data (str): Message data to send (ignored if empty).
key (str, optional): Optional message key for partitioning.
Default: None.
Raises:
KafkaException: If message production or transaction handling fails.
RuntimeError: If transaction commit fails after retries.
"""
if not data:
return
self.producer.flush()
self.producer.begin_transaction()
try:
self.producer.produce(
topic=topic,
key=key,
value=data,
callback=kafka_delivery_report,
)
self.commit_transaction_with_retry()
except Exception:
self.producer.abort_transaction()
logger.error("Transaction aborted.")
raise
[docs]
def commit_transaction_with_retry(
self, max_retries: int = 3, retry_interval_ms: int = 1000
) -> None:
"""Commit a Kafka transaction with automatic retry logic.
Attempts to commit the current transaction with built-in retry mechanism
for handling transient failures. If committing fails due to conflicting
API calls, the method will retry after the specified interval.
Args:
max_retries (int): Maximum number of commit retry attempts. Default: 3.
retry_interval_ms (int): Time to wait between retries in milliseconds.
Default: 1000.
Raises:
KafkaException: If transaction commit fails for reasons other than
conflicting API calls.
RuntimeError: If transaction commit fails after all retry attempts.
"""
committed = False
retry_count = 0
while not committed and retry_count < max_retries:
try:
self.producer.commit_transaction()
committed = True
except KafkaException as e:
if (
"Conflicting commit_transaction API call is already in progress"
in str(e)
):
retry_count += 1
logger.debug(
"Conflicting commit_transaction API call is already in progress: Retrying"
)
time.sleep(retry_interval_ms / 1000.0)
else:
raise e
if not committed:
raise RuntimeError("Failed to commit transaction after retries.")
[docs]
class KafkaConsumeHandler(KafkaHandler):
"""Abstract base class for Kafka Consumer wrappers
Provides common functionality for Kafka message consumption including
topic creation, subscription management, and consumer configuration.
All consumer implementations should extend this class.
Attributes:
brokers (str): Comma-separated list of Kafka broker addresses.
consumer (Consumer): Confluent Kafka Consumer instance.
"""
[docs]
def __init__(self, topics: str | list[str]) -> None:
"""
Creates a Kafka consumer, ensures the specified topics exist, and
subscribes to them. Topics are automatically created if they don't exist.
Args:
topics (str | list[str]): Topic name(s) to subscribe to.
Can be a single topic string or list of topics.
Raises:
TooManyFailedAttemptsError: If topic creation fails after retries.
KafkaException: If consumer creation or subscription fails.
"""
super().__init__()
# get brokers
self.brokers = ",".join(
[f"{broker['hostname']}:{broker['port']}" for broker in KAFKA_BROKERS]
)
# create consumer
conf = {
"bootstrap.servers": self.brokers,
"group.id": CONSUMER_GROUP_ID,
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
"enable.partition.eof": True,
}
self.consumer = Consumer(conf)
if isinstance(topics, str):
topics = [topics]
# create topics
admin_client = AdminClient(
{
"bootstrap.servers": self.brokers,
}
)
admin_client.create_topics(
[NewTopic(topic, NUMBER_OF_INSTANCES, 1) for topic in topics]
)
# check if topics are created
if not self._all_topics_created(topics):
raise TooManyFailedAttemptsError("Not all topics were created.")
# subscribe to the topics
self.consumer.subscribe(topics)
[docs]
@abstractmethod
def consume(self, *args, **kwargs):
"""Abstract method for consuming messages from Kafka topics
Implementations must define the specific behavior for message consumption,
including how to handle message polling, error handling, and data decoding.
Args:
*args: Variable arguments depending on implementation.
**kwargs: Keyword arguments depending on implementation.
Raises:
NotImplementedError: This method must be implemented by subclasses.
"""
raise NotImplementedError
[docs]
def consume_as_json(self) -> tuple[Optional[str], dict]:
"""Consume messages and return them in JSON format.
Consumes available messages from subscribed topics, decodes the data,
and returns the contents as a JSON dictionary. This method blocks
until a message is available.
Returns:
tuple[Optional[str], dict]: A tuple containing:
- Message key (str or None)
- Message value as dictionary (empty dict if no message)
Raises:
ValueError: If the message data format is invalid or cannot be parsed.
"""
key, value, topic = self.consume()
if not key and not value:
return None, {}
try:
eval_data = ast.literal_eval(value)
if isinstance(eval_data, dict):
return key, eval_data
else:
raise
except Exception:
raise ValueError("Unknown data format")
def _all_topics_created(self, topics: list[str]) -> bool:
"""Verify that all specified topics have been created successfully.
Polls the Kafka cluster to check if each topic in the provided list
has been created. Retries for a maximum duration if topics are not
immediately available.
Args:
topics (list[str]): List of topic names to verify.
Returns:
bool: True if all topics are created, False if timeout exceeded.
"""
number_of_retries_left = 30
all_topics_created = False
while not all_topics_created: # try for 15 seconds
assigned_topics = self.consumer.list_topics(timeout=10)
all_topics_created = True
for topic in topics:
if topic not in assigned_topics.topics:
all_topics_created = False
if not all_topics_created:
number_of_retries_left -= 1
if not number_of_retries_left > 0:
return False
time.sleep(0.5)
return True
[docs]
def __del__(self) -> None:
"""Cleanup method called when the object is destroyed
Properly closes the Kafka consumer connection to release resources
and ensure graceful shutdown.
"""
if self.consumer:
self.consumer.close()
[docs]
class SimpleKafkaConsumeHandler(KafkaConsumeHandler):
"""Simple Kafka Consumer wrapper without Write-Exactly-Once semantics
Provides basic message consumption capabilities with at-least-once delivery
semantics. Messages are not automatically committed, allowing for manual
offset management by the application.
"""
[docs]
def __init__(self, topics: str | list[str]) -> None:
"""
Args:
topics (str | list[str]): Topic name(s) to subscribe to.
"""
super().__init__(topics)
[docs]
def consume(self) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""
Consume messages from subscribed Kafka topics.
Polls for available messages and decodes them. This method blocks
until a message is available or a keyboard interrupt is received.
The consumer does not automatically commit offsets.
Returns:
tuple[Optional[str], Optional[str], Optional[str]]: A tuple containing:
- Message key (str or None)
- Message value (str or None)
- Topic name (str or None)
Returns (None, None, None) if no valid message is retrieved.
Raises:
ValueError: If the received message is invalid.
KeyboardInterrupt: If consumption is interrupted by user.
KafkaException: If message commit fails.
"""
empty_data_retrieved = False
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
if not empty_data_retrieved:
logger.info("Waiting for messages...")
empty_data_retrieved = True
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
logger.error(f"Consumer error: {msg.error()}")
raise ValueError("Message is invalid")
# unpack message
key = msg.key().decode("utf-8") if msg.key() else None
value = msg.value().decode("utf-8") if msg.value() else None
topic = msg.topic() if msg.topic() else None
return key, value, topic
except KeyboardInterrupt:
logger.info("Stopping KafkaConsumeHandler...")
[docs]
class ExactlyOnceKafkaConsumeHandler(KafkaConsumeHandler):
"""Kafka Consumer wrapper with Write-Exactly-Once semantics
Provides message consumption with exactly-once processing guarantees.
Messages are automatically committed after successful processing to
ensure each message is processed exactly once.
"""
[docs]
def __init__(self, topics: str | list[str]) -> None:
"""
Args:
topics (str | list[str]): Topic name(s) to subscribe to.
"""
super().__init__(topics)
[docs]
def consume(self) -> tuple[Optional[str], Optional[str], Optional[str]]:
"""
Consume messages from subscribed Kafka topics with exactly-once semantics.
Polls for available messages, decodes them, and automatically commits
the message offset after successful processing. This ensures each
message is processed exactly once.
Returns:
tuple[Optional[str], Optional[str], Optional[str]]: A tuple containing:
- Message key (str or None)
- Message value (str or None)
- Topic name (str or None)
Returns (None, None, None) if no valid message is retrieved.
Raises:
ValueError: If the received message is invalid.
KeyboardInterrupt: If consumption is interrupted by user.
KafkaException: If message commit fails.
"""
empty_data_retrieved = False
try:
while True:
msg = self.consumer.poll(timeout=1.0)
if msg is None:
if not empty_data_retrieved:
logger.info("Waiting for messages...")
empty_data_retrieved = True
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
logger.error(f"Consumer error: {msg.error()}")
raise ValueError("Message is invalid")
# unpack message
key = msg.key().decode("utf-8") if msg.key() else None
value = msg.value().decode("utf-8") if msg.value() else None
topic = msg.topic() if msg.topic() else None
self.consumer.commit(msg)
return key, value, topic
except KeyboardInterrupt:
logger.info("Shutting down KafkaConsumeHandler...")
@staticmethod
def _is_dicts(obj):
"""Check if the provided object is a list containing only dictionaries.
Args:
obj: Object to check.
Returns:
bool: True if obj is a list of dictionaries, False otherwise.
"""
return isinstance(obj, list) and all(isinstance(item, dict) for item in obj)
[docs]
def consume_as_object(self) -> tuple[Optional[str], Batch]:
"""
Consume messages and return them as Batch objects.
Consumes available messages from subscribed topics, decodes the data,
and converts it to a structured Batch object using marshmallow schema
validation. This method provides type-safe message consumption.
Returns:
tuple[Optional[str], Batch]: A tuple containing:
- Message key (str or None).
- Batch object containing the deserialized message data.
Raises:
ValueError: If the message data format is invalid or cannot be
converted to a Batch object.
marshmallow.ValidationError: If data doesn't conform to Batch schema.
"""
key, value, topic = self.consume()
if not key and not value:
# TODO: Change return value to fit the type, maybe switch to raise
return None, {}
eval_data: dict = ast.literal_eval(value)
if self._is_dicts(eval_data.get("data")):
eval_data["data"] = eval_data.get("data")
else:
eval_data["data"] = [
ast.literal_eval(item) for item in eval_data.get("data")
]
batch_schema = marshmallow_dataclass.class_schema(Batch)()
eval_data: Batch = batch_schema.load(eval_data)
if isinstance(eval_data, Batch):
return key, eval_data
else:
raise ValueError("Unknown data format.")