monitoring package

Submodules

monitoring.clickhouse_batch_sender module

class monitoring.clickhouse_batch_sender.ClickHouseBatchSender

Bases: object

Manages batched insert operations for ClickHouse tables.

Collects insert commands in batches and sends them to ClickHouse when either the batch size limit is reached or a timeout occurs. Provides efficient bulk insertion with automatic schema validation for all monitored tables.
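The size-or-timeout policy can be illustrated with a minimal, self-contained sketch. It is not the project's implementation: the `flush` callable stands in for the actual ClickHouse insert, the `batch_size` and `timeout` defaults are made-up values, schema validation is omitted, and a single shared `threading.Timer` that flushes every table when it expires is an assumption about how the timeout works. Only the method names mirror the documented `add`, `insert` and `insert_all`.

```python
from __future__ import annotations

import threading
from collections import defaultdict
from typing import Any, Callable


class SizeOrTimeoutBatcher:
    """Buffers rows per table; flushes a table when its batch is full,
    and flushes every table when the shared timeout expires."""

    def __init__(
        self,
        flush: Callable[[str, list[dict[str, Any]]], None],
        batch_size: int = 1000,  # illustrative default
        timeout: float = 5.0,  # seconds; illustrative default
    ) -> None:
        self._flush = flush  # hypothetical stand-in for the ClickHouse insert
        self.batch_size = batch_size
        self.timeout = timeout
        self.batches: dict[str, list[dict[str, Any]]] = defaultdict(list)
        self._lock = threading.RLock()  # re-entrant: add() calls insert() while holding it
        self._timer: threading.Timer | None = None

    def add(self, table_name: str, data: dict[str, Any]) -> None:
        with self._lock:
            self.batches[table_name].append(data)
            if len(self.batches[table_name]) >= self.batch_size:
                self.insert(table_name)  # size limit reached: insert immediately
            elif self._timer is None:
                # The first pending row since the last timed flush starts the clock.
                self._timer = threading.Timer(self.timeout, self.insert_all)
                self._timer.daemon = True
                self._timer.start()

    def insert(self, table_name: str) -> None:
        with self._lock:
            rows = self.batches.get(table_name)
            if rows:
                self._flush(table_name, list(rows))
                del self.batches[table_name]  # cleared only after a successful flush

    def insert_all(self) -> None:
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()  # harmless if the timer itself is calling us
                self._timer = None
            for table_name in list(self.batches):
                self.insert(table_name)
```

Holding one re-entrant lock around every method keeps the timer thread from flushing a batch while `add()` is still appending to it.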

add(table_name: str, data: dict[str, Any])

Adds the data to the batch for the table.

Verifies the data fields first, then adds the data to the batch for that table. Triggers an immediate insertion if the batch size limit is reached.

Parameters:
  • table_name (str) – Name of the table to add data to.

  • data (dict) – The values for each cell in the table.

Raises:
  • ValueError – If the table name is invalid or the data format is incorrect.

  • TypeError – If data types don’t match the table schema.

insert(table_name: str)

Inserts the batch for the given table.

Executes the accumulated batch insert operation for the specified table and clears the batch after successful insertion.

Parameters:
  • table_name (str) – Name of the table to insert data into.
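The rule that a batch is cleared only after a successful insertion can be shown in isolation. In this hedged sketch, `insert_batch` and its `send` callable are hypothetical: `send` represents whatever ClickHouse client call the project actually makes and is assumed to raise on failure, and converting each dict into a positional row in schema column order is an assumption about the insert format.

```python
from __future__ import annotations

from typing import Any, Callable


def insert_batch(
    table_name: str,
    columns: list[str],
    batches: dict[str, list[dict[str, Any]]],
    send: Callable[[str, list[list[Any]], list[str]], None],
) -> int:
    """Send the pending rows for one table; clear them only if the send succeeds."""
    pending = batches.get(table_name, [])
    if not pending:
        return 0
    # Convert dict records into positional rows in a fixed column order.
    rows = [[record[col] for col in columns] for record in pending]
    send(table_name, rows, columns)  # if this raises, the batch is left untouched
    batches[table_name] = []
    return len(rows)
```

If `send` raises, the rows stay in `batches`, so a later `insert` or `insert_all` call can retry them.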

insert_all()

Inserts the batch for every table.

Executes batch insert operations for all tables with pending data and cancels the current timer if active.

class monitoring.clickhouse_batch_sender.Table(name: str, columns: dict[str, type])

Bases: object

Defines the table name and allowed column fields with types.

Stores metadata about ClickHouse table structure including column names and their expected data types for validation during batch insertion.

columns: dict[str, type]
name: str

verify(data: dict[str, Any])

Verifies that the data has the correct columns and types.

Validates that the provided data dictionary contains the expected columns with correct data types according to the table schema definition.

Parameters:
  • data (dict) – The values for each cell.

Raises:
  • ValueError – If the column count or column names don’t match the expected schema.

  • TypeError – If data types don’t match the expected column types.
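The validation rules listed above can be expressed as a short sketch. Modelling `Table` as a dataclass and checking types with `isinstance` are assumptions; the real `verify()` may treat details such as nullable columns or `bool` values (which `isinstance(x, int)` accepts) differently.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class Table:
    name: str
    columns: dict[str, type]

    def verify(self, data: dict[str, Any]) -> None:
        # Column count and names must match the schema exactly.
        if len(data) != len(self.columns):
            raise ValueError(
                f"{self.name}: expected {len(self.columns)} columns, got {len(data)}"
            )
        unexpected = set(data) - set(self.columns)
        if unexpected:
            raise ValueError(f"{self.name}: unexpected columns {sorted(unexpected)}")
        # Each value must be an instance of its declared column type.
        for column, expected in self.columns.items():
            if not isinstance(data[column], expected):
                raise TypeError(
                    f"{self.name}.{column}: expected {expected.__name__}, "
                    f"got {type(data[column]).__name__}"
                )
```

Usage: `Table("events", {"id": int, "msg": str}).verify({"id": 1, "msg": "ok"})` returns `None`, while `{"id": "1", "msg": "ok"}` raises `TypeError`.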

monitoring.monitoring_agent module

class monitoring.monitoring_agent.MonitoringAgent

Bases: object

Main component of the Monitoring stage to collect and store pipeline data.

Consumes monitoring data from Kafka topics and batches it for efficient insertion into ClickHouse. Handles data deserialization and forwards the result to the batch sender for persistent storage.

__init__()

Sets up consumption from all ClickHouse-related Kafka topics and initializes the batch sender for efficient data insertion.

async start()

Starts the monitoring agent to consume and process data continuously.

Runs an infinite loop to consume messages from Kafka topics, deserialize the data according to table schemas, and forward it to the batch sender for insertion into ClickHouse.

Raises:

monitoring.monitoring_agent.main()

Creates the MonitoringAgent instance and starts it.

Entry point for the monitoring agent that initializes and runs the asynchronous monitoring process.
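The consume, deserialize and forward loop of `start()` can be sketched without a Kafka broker. Here an `asyncio.Queue` of `(topic, payload)` pairs stands in for the Kafka consumer, payloads are assumed to be JSON, and `SketchMonitoringAgent`, `topic_to_table`, `ListSink` and `demo` are hypothetical names. The documented `main()` presumably amounts to running the agent's `start()` coroutine with `asyncio.run`; `demo` does the same but cancels the loop once the queue is drained so the example terminates.

```python
from __future__ import annotations

import asyncio
import json
from typing import Any, Protocol


class BatchSink(Protocol):
    """Anything with the documented add(table_name, data) method."""

    def add(self, table_name: str, data: dict[str, Any]) -> None: ...


class SketchMonitoringAgent:
    """Hypothetical agent: an asyncio.Queue replaces the Kafka consumer."""

    def __init__(self, queue: asyncio.Queue, topic_to_table: dict[str, str], sink: BatchSink) -> None:
        self.queue = queue
        self.topic_to_table = topic_to_table  # hypothetical topic -> table mapping
        self.sink = sink

    async def start(self) -> None:
        while True:  # runs until cancelled, like the documented infinite loop
            topic, payload = await self.queue.get()
            try:
                record = json.loads(payload)  # deserialize the message value
                self.sink.add(self.topic_to_table[topic], record)
            finally:
                self.queue.task_done()


class ListSink:
    """Collects rows in memory instead of batching them for ClickHouse."""

    def __init__(self) -> None:
        self.rows: list[tuple[str, dict[str, Any]]] = []

    def add(self, table_name: str, data: dict[str, Any]) -> None:
        self.rows.append((table_name, data))


async def demo(messages: list[tuple[str, bytes]], topic_to_table: dict[str, str], sink: BatchSink) -> None:
    queue: asyncio.Queue = asyncio.Queue()  # created inside the running loop
    for message in messages:
        queue.put_nowait(message)
    task = asyncio.create_task(SketchMonitoringAgent(queue, topic_to_table, sink).start())
    await queue.join()  # every message has been forwarded to the sink
    task.cancel()  # the real agent keeps running; the sketch stops here
    try:
        await task
    except asyncio.CancelledError:
        pass
```

Usage: `asyncio.run(demo([("alerts", b'{"id": 1}')], {"alerts": "alerts_table"}, ListSink()))`.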

monitoring.monitoring_agent.prepare_all_tables()

Prepares and creates all ClickHouse tables from SQL files.

Reads all SQL files from the CREATE_TABLES_DIRECTORY and executes them to create the required database tables for monitoring data storage.

Raises:
  • Exception – If any CREATE TABLE statement fails to execute.
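A hedged sketch of the table-preparation step follows. Unlike the documented function, which reads `CREATE_TABLES_DIRECTORY`, it takes the directory and an `execute` callable as parameters so it can run on its own; it also assumes one statement per `*.sql` file and runs the files in sorted name order. `prepare_tables_sketch` and `execute` are hypothetical names. Any exception raised by `execute` propagates unchanged, in line with the documented `Raises` entry.

```python
from __future__ import annotations

from pathlib import Path
from typing import Callable


def prepare_tables_sketch(directory: str | Path, execute: Callable[[str], object]) -> list[str]:
    """Run the CREATE TABLE statement in each *.sql file under `directory`.

    `execute` is a hypothetical callable that sends one SQL statement to
    ClickHouse and raises on failure.
    """
    executed = []
    # Sorted so that the execution order is deterministic across runs.
    for sql_file in sorted(Path(directory).glob("*.sql")):
        statement = sql_file.read_text(encoding="utf-8").strip()
        if statement:  # skip empty files
            execute(statement)
            executed.append(sql_file.name)
    return executed
```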