Source code for monitoring.monitoring_agent

import asyncio
import os
import sys
from dataclasses import asdict

import marshmallow_dataclass

sys.path.append(os.getcwd())
from src.monitoring.clickhouse_batch_sender import *
from src.base.kafka_handler import SimpleKafkaConsumeHandler
from src.base.data_classes.clickhouse_connectors import TABLE_NAME_TO_TYPE
from src.base.log_config import get_logger
from src.base.utils import setup_config

logger = get_logger()

CONFIG = setup_config()
CREATE_TABLES_DIRECTORY = "docker/create_tables"  # TODO: Get from config
CLICKHOUSE_HOSTNAME = CONFIG["environment"]["monitoring"]["clickhouse_server"][
    "hostname"
]


[docs] def prepare_all_tables(): """Prepares and creates all ClickHouse tables from SQL files. Reads all SQL files from the CREATE_TABLES_DIRECTORY and executes them to create the required database tables for monitoring data storage. Raises: Exception: If any CREATE TABLE statement fails to execute. """ def _load_contents(file_name: str) -> str: with open(file_name, "r") as file: return file.read() for filename in os.listdir(CREATE_TABLES_DIRECTORY): if filename.endswith(".sql"): file_path = os.path.join(CREATE_TABLES_DIRECTORY, filename) sql_content = _load_contents(file_path) with clickhouse_connect.get_client(host=CLICKHOUSE_HOSTNAME) as client: try: client.command(sql_content) except Exception as e: logger.critical("Error in CREATE TABLE statement") raise e
[docs] class MonitoringAgent: """Main component of the Monitoring stage to collect and store pipeline data Consumes monitoring data from Kafka topics and batches them for efficient insertion into ClickHouse. Handles data deserialization and forwards it to the batch sender for persistent storage. """
[docs] def __init__(self): """ Sets up consumption from all ClickHouse-related Kafka topics and initializes the batch sender for efficient data insertion. """ self.table_names = [ "server_logs", "server_logs_timestamps", "failed_dns_loglines", "logline_to_batches", "dns_loglines", "logline_timestamps", "batch_timestamps", "suspicious_batches_to_batch", "suspicious_batch_timestamps", "alerts", "fill_levels", ] self.topics = [f"clickhouse_{table_name}" for table_name in self.table_names] self.kafka_consumer = SimpleKafkaConsumeHandler(self.topics) self.batch_sender = ClickHouseBatchSender()
[docs] async def start(self): """Starts the monitoring agent to consume and process data continuously. Runs an infinite loop to consume messages from Kafka topics, deserialize the data according to table schemas, and forward it to the batch sender for insertion into ClickHouse. Raises: KeyboardInterrupt: When the agent is manually stopped. Exception: For any other processing errors (logged as warnings). """ loop = asyncio.get_running_loop() while True: try: key, value, topic = await loop.run_in_executor( None, self.kafka_consumer.consume ) logger.debug(f"From Kafka: {value}") table_name = topic.replace("clickhouse_", "") data_schema = marshmallow_dataclass.class_schema( TABLE_NAME_TO_TYPE.get(table_name) )() data = data_schema.loads(value) self.batch_sender.add(table_name, asdict(data)) except KeyboardInterrupt: logger.info("Stopped MonitoringAgent.") break except Exception as e: logger.warning(e)
[docs] def main(): """Creates the :class:`MonitoringAgent` instance and starts it. Entry point for the monitoring agent that initializes and runs the asynchronous monitoring process. """ clickhouse_consumer = MonitoringAgent() asyncio.run(clickhouse_consumer.start())
if __name__ == "__main__": # pragma: no cover main()