monitoring package
Submodules
monitoring.clickhouse_batch_sender module
- class monitoring.clickhouse_batch_sender.ClickHouseBatchSender
Bases: object
Manages batched insert operations for ClickHouse tables.
Collects insert commands in batches and sends them to ClickHouse when either the batch size limit is reached or a timeout occurs. Provides efficient bulk insertion with automatic schema validation for all monitored tables.
- add(table_name: str, data: dict[str, Any])
Adds the data to the batch for the table.
Verifies the data fields first, then adds the data to the appropriate table batch. Triggers immediate insertion if batch size limit is reached.
- Parameters:
table_name (str) – Name of the table to add the data to.
data (dict) – The values for each cell.
- Raises:
ValueError – If table name is invalid or data format is incorrect.
TypeError – If data types don’t match table schema.
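The batching behaviour described above (verify, append, flush on size or timeout) can be illustrated with a minimal sketch. It is not the actual ClickHouseBatchSender implementation: the class name `BatchSenderSketch`, the `insert` and `verifiers` callables, and the `max_batch_size` and `timeout_seconds` parameters are all assumptions made for illustration, and the real ClickHouse client call is replaced by a plain callable.

```python
from __future__ import annotations

import threading
from typing import Any, Callable


class BatchSenderSketch:
    """Hypothetical stand-in for a batch sender; names and defaults are assumed."""

    def __init__(
        self,
        insert: Callable[[str, list[dict[str, Any]]], None],
        verifiers: dict[str, Callable[[dict[str, Any]], None]],
        max_batch_size: int = 100,      # assumed size limit
        timeout_seconds: float = 5.0,   # assumed timeout
    ) -> None:
        self._insert = insert
        self._verifiers = verifiers
        self._max_batch_size = max_batch_size
        self._timeout_seconds = timeout_seconds
        self._batches: dict[str, list[dict[str, Any]]] = {}
        self._lock = threading.Lock()
        self._timer: threading.Timer | None = None

    def add(self, table_name: str, data: dict[str, Any]) -> None:
        verify = self._verifiers.get(table_name)
        if verify is None:
            raise ValueError(f"Unknown table: {table_name}")
        verify(data)  # expected to raise ValueError or TypeError on bad data

        with self._lock:
            batch = self._batches.setdefault(table_name, [])
            batch.append(data)
            if len(batch) >= self._max_batch_size:
                # Size limit reached: send this table's batch immediately.
                self._flush_table(table_name)
            elif self._timer is None:
                # Otherwise make sure pending rows are sent after the timeout.
                self._timer = threading.Timer(self._timeout_seconds, self.flush_all)
                self._timer.daemon = True
                self._timer.start()

    def _flush_table(self, table_name: str) -> None:
        # Caller must hold the lock.
        rows = self._batches.pop(table_name, [])
        if rows:
            self._insert(table_name, rows)

    def flush_all(self) -> None:
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
            for table_name in list(self._batches):
                self._flush_table(table_name)
```

In this sketch a single timer covers all tables, so a timeout sends every pending batch at once; a per-table timer would be an equally plausible design.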
- class monitoring.clickhouse_batch_sender.Table(name: str, columns: dict[str, type])
Bases: object
Defines the table name and allowed column fields with types.
Stores metadata about ClickHouse table structure including column names and their expected data types for validation during batch insertion.
- verify(data: dict[str, Any])
Verifies if the data has the correct columns and types.
Validates that the provided data dictionary contains the expected columns with correct data types according to the table schema definition.
- Parameters:
data (dict) – The values for each cell.
- Raises:
ValueError – If column count or column names don’t match expected schema.
TypeError – If data types don’t match expected column types.
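The checks listed for verify can be sketched as follows. This is an assumption about how such validation might look, not the Table class itself: `TableSketch` is a hypothetical name, and the use of `isinstance` for type checks is a choice made here for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class TableSketch:
    """Hypothetical stand-in for Table: a name plus a column-to-type mapping."""

    name: str
    columns: dict[str, type]

    def verify(self, data: dict[str, Any]) -> None:
        # Column count must match the schema.
        if len(data) != len(self.columns):
            raise ValueError(
                f"{self.name}: expected {len(self.columns)} columns, got {len(data)}"
            )
        # Column names must match the schema exactly.
        if set(data) != set(self.columns):
            raise ValueError(f"{self.name}: unexpected or missing column names")
        # Every value must be an instance of the declared column type.
        for column, expected_type in self.columns.items():
            if not isinstance(data[column], expected_type):
                raise TypeError(
                    f"{self.name}.{column}: expected {expected_type.__name__}, "
                    f"got {type(data[column]).__name__}"
                )
```

Note that `isinstance` accepts subclasses, so a `bool` value would pass for an `int` column in this sketch; a stricter check would compare `type(value)` directly.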
monitoring.monitoring_agent module
- class monitoring.monitoring_agent.MonitoringAgent
Bases: object
Main component of the Monitoring stage to collect and store pipeline data.
Consumes monitoring data from Kafka topics and batches it for efficient insertion into ClickHouse. Deserializes the data and forwards it to the batch sender for persistent storage.
- __init__()
Sets up consumption from all ClickHouse-related Kafka topics and initializes the batch sender for efficient data insertion.
- async start()
Starts the monitoring agent to consume and process data continuously.
Runs an infinite loop to consume messages from Kafka topics, deserialize the data according to table schemas, and forward it to the batch sender for insertion into ClickHouse.
- Raises:
KeyboardInterrupt – When the agent is manually stopped.
Exception – For any other processing errors (logged as warnings).
- monitoring.monitoring_agent.main()
Creates the MonitoringAgent instance and starts it.
Entry point for the monitoring agent that initializes and runs the asynchronous monitoring process.
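The consume loop of start() and the entry point main() can be approximated with the sketch below. It makes several assumptions that the reference does not confirm: messages arrive through a generic async iterator rather than a real Kafka consumer, payloads are JSON-encoded, and a hypothetical `topic_to_table` mapping selects the target table. The names `run_agent_sketch` and `main_sketch` are illustrative only.

```python
from __future__ import annotations

import asyncio
import json
import logging
from typing import Any, AsyncIterator, Callable

logger = logging.getLogger(__name__)


async def run_agent_sketch(
    messages: AsyncIterator[tuple[str, bytes]],   # assumed (topic, raw value) pairs
    add: Callable[[str, dict[str, Any]], None],   # e.g. a batch sender's add method
    topic_to_table: dict[str, str],               # hypothetical topic-to-table mapping
) -> None:
    # With a real consumer this iterator would never end, giving an infinite loop.
    async for topic, raw in messages:
        try:
            data = json.loads(raw)                 # assumed JSON serialization
            add(topic_to_table[topic], data)
        except Exception as err:
            # Processing errors are logged as warnings and the loop continues.
            # KeyboardInterrupt is not an Exception subclass, so it still propagates.
            logger.warning("Skipping message from %s: %s", topic, err)


def main_sketch(
    messages: AsyncIterator[tuple[str, bytes]],
    add: Callable[[str, dict[str, Any]], None],
    topic_to_table: dict[str, str],
) -> None:
    # Synchronous entry point that drives the asynchronous loop.
    asyncio.run(run_agent_sketch(messages, add, topic_to_table))
```

Catching `Exception` rather than `BaseException` is what keeps a manual stop distinct from ordinary processing errors in this sketch.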
- monitoring.monitoring_agent.prepare_all_tables()
Prepares and creates all ClickHouse tables from SQL files.
Reads all SQL files from the CREATE_TABLES_DIRECTORY and executes them to create the required database tables for monitoring data storage.
- Raises:
Exception – If any CREATE TABLE statement fails to execute.
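A minimal sketch of the file-driven table creation described above, under stated assumptions: the directory is passed in explicitly instead of being read from CREATE_TABLES_DIRECTORY, files are selected by a `.sql` suffix, and the ClickHouse client is replaced by a hypothetical `execute` callable. The function name `prepare_all_tables_sketch` is illustrative.

```python
from __future__ import annotations

import logging
from pathlib import Path
from typing import Callable

logger = logging.getLogger(__name__)


def prepare_all_tables_sketch(
    directory: Path,
    execute: Callable[[str], None],   # stand-in for a client's statement executor
) -> list[str]:
    """Run every .sql file in `directory` and return the processed file names."""
    processed: list[str] = []
    # Sorted order keeps the execution sequence deterministic.
    for sql_file in sorted(Path(directory).glob("*.sql")):
        statement = sql_file.read_text(encoding="utf-8")
        try:
            execute(statement)
        except Exception:
            # Log which file failed, then let the error propagate to the caller.
            logger.error("Failed to execute %s", sql_file.name)
            raise
        processed.append(sql_file.name)
    return processed
```

Re-raising after logging mirrors the documented behaviour that a failing CREATE TABLE statement surfaces as an exception.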