Source code for base.clickhouse_kafka_sender

"""
The ClickHouseKafkaSender serves as the sender for all inserts into ClickHouse. Whenever a class wants to insert
into a ClickHouse table, the ClickHouseKafkaSender is used to send the respective insert via Kafka.
"""

import os
import sys

import marshmallow_dataclass

sys.path.append(os.getcwd())
from src.base.data_classes.clickhouse_connectors import TABLE_NAME_TO_TYPE
from src.base.kafka_handler import SimpleKafkaProduceHandler
from src.base.log_config import get_logger

# Module-level logger obtained from the project's logging configuration
# (src.base.log_config.get_logger).
# NOTE(review): not referenced by ClickHouseKafkaSender as shown here; confirm
# whether other code relies on it before removing.
logger = get_logger()


class ClickHouseKafkaSender:
    """Sends insert operations for the specified table via Kafka to the MonitoringAgent.

    The ClickHouseKafkaSender serves as a Kafka producer that encapsulates
    database insert operations into Kafka messages. Each message is serialized
    with the marshmallow schema derived from the table's data class in
    ``TABLE_NAME_TO_TYPE`` and published to the topic ``clickhouse_<table_name>``.
    """

    def __init__(self, table_name: str) -> None:
        """
        Args:
            table_name (str): Name of the ClickHouse table to send insert
                operations for. Must be a key of ``TABLE_NAME_TO_TYPE``.

        Raises:
            KeyError: If the specified table name is not found in the
                ``TABLE_NAME_TO_TYPE`` mapping.
        """
        # Resolve the table's data class first: an unknown table name must
        # raise KeyError (the documented contract) instead of handing None to
        # class_schema, and should fail before a Kafka producer is constructed.
        try:
            table_type = TABLE_NAME_TO_TYPE[table_name]
        except KeyError:
            raise KeyError(
                f"No data class registered for ClickHouse table {table_name!r}"
            ) from None

        self.table_name = table_name
        self.data_schema = marshmallow_dataclass.class_schema(table_type)()
        self.kafka_producer = SimpleKafkaProduceHandler()

    def insert(self, data: dict) -> None:
        """Produces the insert operation to Kafka for ClickHouse insertion.

        Serializes ``data`` with the table's marshmallow schema and sends the
        resulting JSON string to the topic ``clickhouse_<table_name>`` for
        processing by the MonitoringAgent.

        Note:
            marshmallow's ``dumps`` only serializes; it does not run schema
            validation (that happens on ``load``). Errors raised by individual
            field serializers propagate unchanged.

        Args:
            data (dict): Dictionary containing the data to insert into
                ClickHouse. Keys are expected to match the fields of the
                table's data class.

        Raises:
            Exception: Any exception raised while serializing ``data`` or by
                ``SimpleKafkaProduceHandler.produce`` propagates to the caller.
        """
        self.kafka_producer.produce(
            topic=f"clickhouse_{self.table_name}",
            data=self.data_schema.dumps(data),
        )