base package


Submodules

base.clickhouse_kafka_sender module

The ClickHouseKafkaSender is the sender for all inserts into ClickHouse. Any class that needs to insert into a ClickHouse table uses a ClickHouseKafkaSender to send that insert via Kafka.

class base.clickhouse_kafka_sender.ClickHouseKafkaSender(table_name: str)

Bases: object

Sends insert operations for the specified table via Kafka to the MonitoringAgent.

The ClickHouseKafkaSender serves as a Kafka producer that encapsulates database insert operations into Kafka messages. It automatically handles data schema validation and serialization for the specified ClickHouse table.

__init__(table_name: str)
Parameters:

table_name (str) – Name of the ClickHouse table to send insert operations for.

Raises:

KeyError – If the specified table name is not found in TABLE_NAME_TO_TYPE mapping.

insert(data: dict)

Produces the insert operation to Kafka for ClickHouse insertion.

Validates the provided data against the table schema, serializes it, and sends it to the appropriate Kafka topic for processing by the MonitoringAgent.

Parameters:

data (dict) – Dictionary containing the data to insert into ClickHouse. Must conform to the table’s schema structure.

Raises:
  • marshmallow.ValidationError – If the data does not conform to the table schema.

  • KafkaException – If the Kafka message cannot be produced.

base.kafka_handler module

The Write-Exactly-Once semantics used by the Kafka handlers are demonstrated in the confluentinc/confluent-kafka-python repository. Parts of the code in this module are similar to that example.

class base.kafka_handler.ExactlyOnceKafkaConsumeHandler(topics: str | list[str])

Bases: KafkaConsumeHandler

Kafka Consumer wrapper with Write-Exactly-Once semantics

Provides message consumption with exactly-once processing guarantees. Messages are automatically committed after successful processing to ensure each message is processed exactly once.

__init__(topics: str | list[str]) -> None
Parameters:

topics (str | list[str]) – Topic name(s) to subscribe to.

consume() -> tuple[str | None, str | None, str | None]

Consume messages from subscribed Kafka topics with exactly-once semantics.

Polls for available messages, decodes them, and automatically commits the message offset after successful processing. This ensures each message is processed exactly once.

Returns:

tuple[Optional[str], Optional[str], Optional[str]]

A tuple containing:
  • Message key (str or None)

  • Message value (str or None)

  • Topic name (str or None)

Returns (None, None, None) if no valid message is retrieved.

Raises:
  • ValueError – If the received message is invalid.

  • KeyboardInterrupt – If consumption is interrupted by user.

  • KafkaException – If message commit fails.
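
The poll, decode, and commit sequence described for consume() can be sketched as follows. This is a minimal illustration under stated assumptions, not the module's implementation: `consume_once`, `FakeMessage`, and `FakeConsumer` are hypothetical names, and the stand-ins only mimic the `poll()`, `commit()`, `error()`, `key()`, `value()`, and `topic()` calls of a confluent-kafka consumer so that the snippet runs without a broker.

```python
from typing import Optional


class FakeMessage:
    """Hypothetical stand-in for a Kafka message (not part of the base package)."""

    def __init__(self, key: bytes, value: bytes, topic: str):
        self._key, self._value, self._topic = key, value, topic

    def error(self):
        return None  # no transport error in this sketch

    def key(self):
        return self._key

    def value(self):
        return self._value

    def topic(self):
        return self._topic


class FakeConsumer:
    """Hypothetical stand-in that records which messages were committed."""

    def __init__(self, messages):
        self._messages = list(messages)
        self.committed = []

    def poll(self, timeout: float):
        return self._messages.pop(0) if self._messages else None

    def commit(self, message):
        self.committed.append(message)


def consume_once(consumer) -> tuple[Optional[str], Optional[str], Optional[str]]:
    """Poll one message, decode it, and commit its offset afterwards."""
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        return None, None, None  # nothing available
    if msg.error():
        raise ValueError("Received message is invalid")
    key = msg.key().decode("utf-8") if msg.key() is not None else None
    value = msg.value().decode("utf-8") if msg.value() is not None else None
    consumer.commit(message=msg)  # commit only after decoding succeeded
    return key, value, msg.topic()


consumer = FakeConsumer([FakeMessage(b"k1", b'{"a": 1}', "demo-topic")])
result = consume_once(consumer)
```

Committing only after the message has been decoded means a failure before the commit leaves the offset untouched, so the message can be read again.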

consume_as_object() -> tuple[str | None, Batch]

Consume messages and return them as Batch objects.

Consumes available messages from subscribed topics, decodes the data, and converts it to a structured Batch object using marshmallow schema validation. This method provides type-safe message consumption.

Returns:

tuple[Optional[str], Batch]

A tuple containing:
  • Message key (str or None).

  • Batch object containing the deserialized message data.

Raises:
  • ValueError – If the message data format is invalid or cannot be converted to a Batch object.

  • marshmallow.ValidationError – If data doesn’t conform to Batch schema.

class base.kafka_handler.ExactlyOnceKafkaProduceHandler

Bases: KafkaProduceHandler

Kafka Producer wrapper with Write-Exactly-Once semantics

Provides transactional message production with exactly-once delivery guarantees. This implementation ensures that messages are delivered exactly once, even in the presence of failures and retries.

Configuration:
  • transactional.id: Set to HOSTNAME for unique transaction identification

  • enable.idempotence: True (required for exactly-once semantics)

Note

Each instance must have a unique transactional.id to avoid conflicts.

__init__()

Sets up a Kafka producer with transactional capabilities for exactly-once semantics. The producer is initialized with transactions enabled and configured with a unique transactional ID based on the hostname.

Raises:

KafkaException – If transaction initialization fails.

commit_transaction_with_retry(max_retries: int = 3, retry_interval_ms: int = 1000) -> None

Commit a Kafka transaction with automatic retry logic.

Attempts to commit the current transaction with built-in retry mechanism for handling transient failures. If committing fails due to conflicting API calls, the method will retry after the specified interval.

Parameters:
  • max_retries (int) – Maximum number of commit retry attempts. Default: 3.

  • retry_interval_ms (int) – Time to wait between retries in milliseconds. Default: 1000.

Raises:
  • KafkaException – If transaction commit fails for reasons other than conflicting API calls.

  • RuntimeError – If transaction commit fails after all retry attempts.
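
The retry behaviour described above can be sketched as a bounded loop. This is an illustration only: `ConflictError`, `FlakyProducer`, and `commit_with_retry` are hypothetical names, the stand-in exception represents the "conflicting API call" case, and the sketch assumes that `max_retries` counts the total number of commit attempts.

```python
import time


class ConflictError(Exception):
    """Hypothetical marker for a 'conflicting API call' failure."""


class FlakyProducer:
    """Hypothetical producer whose commit fails a fixed number of times."""

    def __init__(self, failures: int):
        self.failures = failures
        self.attempts = 0

    def commit_transaction(self):
        self.attempts += 1
        if self.attempts <= self.failures:
            raise ConflictError("commit already in progress")


def commit_with_retry(producer, max_retries: int = 3, retry_interval_ms: int = 1000) -> None:
    """Retry the commit on conflicts; any other exception propagates unchanged."""
    for _ in range(max_retries):
        try:
            producer.commit_transaction()
            return
        except ConflictError:
            time.sleep(retry_interval_ms / 1000.0)  # wait before the next attempt
    raise RuntimeError("Failed to commit transaction after retries")


producer = FlakyProducer(failures=2)
commit_with_retry(producer, max_retries=3, retry_interval_ms=0)
```

Only the conflict case is retried here, which mirrors the documented split between KafkaException (raised immediately) and RuntimeError (raised once the attempts are exhausted).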

produce(topic: str, data: str, key: None | str = None) -> None

Produce a message to the specified Kafka topic with exactly-once semantics.

Sends the provided data within a Kafka transaction to ensure exactly-once delivery. The transaction is automatically committed on success or aborted on failure. Empty data is silently ignored.

Parameters:
  • topic (str) – Target Kafka topic name.

  • data (str) – Message data to send (ignored if empty).

  • key (str, optional) – Optional message key for partitioning. Default: None.

Raises:
  • KafkaException – If message production or transaction handling fails.

  • RuntimeError – If transaction commit fails after retries.
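
The begin, produce, commit-or-abort flow described for produce() can be sketched like this. It is a hedged illustration, not the module's code: `RecordingProducer` and `produce_in_transaction` are hypothetical, and the stand-in only mirrors the `begin_transaction()`, `produce()`, `commit_transaction()`, and `abort_transaction()` method names of a transactional confluent-kafka producer.

```python
class RecordingProducer:
    """Hypothetical stand-in that records the transactional calls it receives."""

    def __init__(self, fail_on_produce: bool = False):
        self.calls = []
        self.fail_on_produce = fail_on_produce

    def begin_transaction(self):
        self.calls.append("begin")

    def produce(self, topic, key, value):
        if self.fail_on_produce:
            raise RuntimeError("simulated produce failure")
        self.calls.append(("produce", topic, key, value))

    def commit_transaction(self):
        self.calls.append("commit")

    def abort_transaction(self):
        self.calls.append("abort")


def produce_in_transaction(producer, topic: str, data: str, key=None) -> None:
    """Send one message inside a transaction; abort if anything fails."""
    if not data:
        return  # empty data is silently ignored
    producer.begin_transaction()
    try:
        producer.produce(topic=topic, key=key, value=data.encode("utf-8"))
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()
        raise


ok = RecordingProducer()
produce_in_transaction(ok, "demo-topic", "hello")
```

Wrapping a single message in its own transaction keeps the example short; whether the real handler batches several messages per transaction is not stated in this reference.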

class base.kafka_handler.KafkaConsumeHandler(topics: str | list[str])

Bases: KafkaHandler

Abstract base class for Kafka Consumer wrappers

Provides common functionality for Kafka message consumption including topic creation, subscription management, and consumer configuration. All consumer implementations should extend this class.

brokers

Comma-separated list of Kafka broker addresses.

Type:

str

consumer

Confluent Kafka Consumer instance.

Type:

Consumer

__del__() -> None

Cleanup method called when the object is destroyed

Properly closes the Kafka consumer connection to release resources and ensure graceful shutdown.

__init__(topics: str | list[str]) -> None

Creates a Kafka consumer, ensures the specified topics exist, and subscribes to them. Topics are automatically created if they don’t exist.

Parameters:

topics (str | list[str]) – Topic name(s) to subscribe to. Can be a single topic string or list of topics.

Raises:
  • TooManyFailedAttemptsError – If topic creation fails after retries.

  • KafkaException – If consumer creation or subscription fails.

abstract consume(*args, **kwargs)

Abstract method for consuming messages from Kafka topics

Implementations must define the specific behavior for message consumption, including how to handle message polling, error handling, and data decoding.

Parameters:
  • *args – Variable arguments depending on implementation.

  • **kwargs – Keyword arguments depending on implementation.

Raises:

NotImplementedError – This method must be implemented by subclasses.

consume_as_json() -> tuple[str | None, dict]

Consume messages and return them in JSON format.

Consumes available messages from subscribed topics, decodes the data, and returns the contents as a JSON dictionary. This method blocks until a message is available.

Returns:

tuple[Optional[str], dict]

A tuple containing:
  • Message key (str or None)

  • Message value as dictionary (empty dict if no message)

Raises:

ValueError – If the message data format is invalid or cannot be parsed.

class base.kafka_handler.KafkaHandler

Bases: object

Base class for all Kafka wrappers and handlers

Provides common initialization and configuration setup for Kafka producers and consumers. This abstract base class establishes the foundation for specific Kafka handling implementations.

__init__() -> None

Sets up the initial configuration and initializes the consumer attribute to None. Specific implementations should override this method to establish their respective Kafka clients.

exception base.kafka_handler.KafkaMessageFetchException

Bases: Exception

Exception raised when Kafka message consumption fails

This exception is raised when there are errors during the process of fetching or consuming messages from Kafka topics, including network issues, timeout errors, or malformed message data.

class base.kafka_handler.KafkaProduceHandler(conf)

Bases: KafkaHandler

Abstract base class for Kafka Producer wrappers

Extends KafkaHandler to provide producer-specific functionality. This class establishes the interface for Kafka message production with different semantic guarantees (simple vs exactly-once).

__del__() -> None

Cleanup method called when the object is destroyed

Ensures that all pending messages are flushed before the producer is destroyed, preventing message loss.

__init__(conf)
Parameters:

conf (dict) – Configuration dictionary for the Kafka producer. Should contain broker settings and producer-specific options.

abstract produce(*args, **kwargs)

Abstract method for producing messages to Kafka topics

Encodes the given data for transport and sends it to the specified topic. Implementations must define the specific behavior for message production.

Parameters:
  • *args – Variable arguments depending on implementation.

  • **kwargs – Keyword arguments depending on implementation.

Raises:

NotImplementedError – This method must be implemented by subclasses.

class base.kafka_handler.SimpleKafkaConsumeHandler(topics: str | list[str])

Bases: KafkaConsumeHandler

Simple Kafka Consumer wrapper without Write-Exactly-Once semantics

Provides basic message consumption capabilities with at-least-once delivery semantics. Messages are not automatically committed, allowing for manual offset management by the application.

__init__(topics: str | list[str]) -> None
Parameters:

topics (str | list[str]) – Topic name(s) to subscribe to.

consume() -> tuple[str | None, str | None, str | None]

Consume messages from subscribed Kafka topics.

Polls for available messages and decodes them. This method blocks until a message is available or a keyboard interrupt is received. The consumer does not automatically commit offsets.

Returns:

tuple[Optional[str], Optional[str], Optional[str]]

A tuple containing:
  • Message key (str or None)

  • Message value (str or None)

  • Topic name (str or None)

Returns (None, None, None) if no valid message is retrieved.

Raises:
  • ValueError – If the received message is invalid.

  • KeyboardInterrupt – If consumption is interrupted by user.

  • KafkaException – If message commit fails.

class base.kafka_handler.SimpleKafkaProduceHandler

Bases: KafkaProduceHandler

Simple Kafka Producer wrapper without Write-Exactly-Once semantics

Provides basic message production capabilities with at-least-once delivery guarantees. This implementation prioritizes simplicity and performance over strict consistency guarantees.

__init__()

Sets up a Kafka producer with standard configuration for simple message production without transactional guarantees. Broker addresses are automatically configured from the global KAFKA_BROKERS setting.

produce(topic: str, data: str, key: None | str = None) -> None

Produce a message to the specified Kafka topic.

Encodes and sends the provided data to the specified topic. The producer is flushed before sending to ensure message delivery. Empty data is silently ignored.

Parameters:
  • topic (str) – Target Kafka topic name.

  • data (str) – Message data to send (ignored if empty).

  • key (str, optional) – Optional message key for partitioning. Default: None.

Raises:
  • KafkaException – If message production fails.

  • BufferError – If the producer’s message buffer is full.

exception base.kafka_handler.TooManyFailedAttemptsError

Bases: Exception

Exception raised when operations exceed the maximum number of retry attempts

This exception is typically raised during Kafka topic creation or connection establishment when the maximum number of retry attempts has been exceeded.

base.log_config module

class base.log_config.CustomHandler(stream=None)

Bases: StreamHandler

Custom logging handler that applies different formatting based on log level

Provides level-specific message formatting where INFO and WARNING messages use a simplified format, while DEBUG, ERROR, and CRITICAL messages include detailed context information such as module name, line number, and function name.

format(record) -> str

Format log records with level-appropriate detail.

Applies simple formatting for INFO and WARNING messages, and detailed formatting (including module, line number, and function name) for all other log levels.

Parameters:

record – The log record to format.

Returns:

str – Formatted log message string.
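
Level-dependent formatting of this kind can be sketched with two `logging.Formatter` instances. The class name `LevelAwareHandler`, the helper `make_record`, and both format strings are assumptions made for illustration; the actual formats used by CustomHandler are not given in this reference.

```python
import logging


class LevelAwareHandler(logging.StreamHandler):
    """Hypothetical handler; the format strings below are assumptions."""

    SIMPLE = logging.Formatter("[%(levelname)s] %(message)s")
    DETAILED = logging.Formatter(
        "[%(levelname)s] %(module)s:%(lineno)d %(funcName)s: %(message)s"
    )

    def format(self, record: logging.LogRecord) -> str:
        # INFO and WARNING get the short form, everything else the detailed one.
        if record.levelno in (logging.INFO, logging.WARNING):
            return self.SIMPLE.format(record)
        return self.DETAILED.format(record)


def make_record(level: int, message: str) -> logging.LogRecord:
    """Build a log record by hand so the output is deterministic."""
    return logging.LogRecord(
        name="demo", level=level, pathname="demo.py", lineno=7,
        msg=message, args=(), exc_info=None, func="run",
    )


handler = LevelAwareHandler()
info_line = handler.format(make_record(logging.INFO, "started"))
error_line = handler.format(make_record(logging.ERROR, "failed"))
print(info_line)   # [INFO] started
print(error_line)  # [ERROR] demo:7 run: failed
```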

base.log_config.get_logger(module_name: str = 'base') -> Logger

Create or retrieve a configured logger for a specific module.

Sets up a logger with custom formatting and debug level configuration based on the module-specific settings in config.yaml. If no module-specific configuration exists, falls back to the base module settings.

Parameters:

module_name (str) – Name of the module to create a logger for. Must match a module defined in config.yaml logging.modules. Default: “base”.

Returns:

logging.Logger – Configured logger instance for the specified module.

base.log_config.load_config() -> Dict[str, Any]

Load the application configuration from the YAML configuration file.

Returns:

Dict[str, Any] – Parsed configuration data as a dictionary.

Raises:
  • FileNotFoundError – If the configuration file cannot be found at the expected path.

  • yaml.YAMLError – If the configuration file contains invalid YAML syntax.

base.logline_handler module

class base.logline_handler.FieldType(name: str)

Bases: object

Base class for all field validation types in the logline format configuration

Provides the common interface for field validation. All specific field types inherit from this class and implement their own validation logic in the validate() method.

validate(value) -> bool

Validates the input value according to the field type’s rules.

This method must be implemented by all inheriting field type classes. Each implementation defines specific validation logic appropriate for the field type.

Parameters:

value – The value to be validated.

Returns:

True if the value is valid according to the field type’s rules, False otherwise.

Raises:

NotImplementedError – This base method must be overridden by subclasses.

class base.logline_handler.IpAddress(name)

Bases: FieldType

Field type for IP address validation

Validates both IPv4 and IPv6 addresses using the utility validation functions. No additional configuration parameters are required beyond the field name.

validate(value) -> bool

Validates the input value as a valid IP address.

Parameters:

value – The IP address string to be validated.

Returns:

True if the value is a valid IPv4 or IPv6 address, False otherwise.

class base.logline_handler.ListItem(name: str, allowed_list: list, relevant_list: list)

Bases: FieldType

Field type for list-based validation with optional relevance filtering

Validates field values against an allowed list and optionally defines which values are considered relevant for filtering in later pipeline stages. The allowed_list contains all valid values, while the optional relevant_list defines a subset used for relevance-based filtering in the Log Filtering stage.

check_relevance(value) -> bool

Checks if the given value is considered relevant for filtering.

Parameters:

value – Value to be checked for relevance.

Returns:

True if the value is relevant (in relevant_list or if no relevant_list is defined), False otherwise.

validate(value) -> bool

Validates the input value against the allowed list.

Parameters:

value – The value to be validated.

Returns:

True if the value is in the allowed_list, False otherwise.
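
The relationship between allowed_list and relevant_list can be shown with a small stand-in class. `ListItemSketch` and the example field values are hypothetical; the sketch follows the documented rule that every value counts as relevant when no relevant_list is defined.

```python
class ListItemSketch:
    """Hypothetical stand-in mirroring the documented ListItem behaviour."""

    def __init__(self, name: str, allowed_list: list, relevant_list: list = None):
        self.name = name
        self.allowed_list = allowed_list
        self.relevant_list = relevant_list

    def validate(self, value) -> bool:
        # A value is valid only if it appears in the allowed list.
        return value in self.allowed_list

    def check_relevance(self, value) -> bool:
        # Without a relevant list, every value is treated as relevant.
        if not self.relevant_list:
            return True
        return value in self.relevant_list


status = ListItemSketch("status_code", ["NOERROR", "NXDOMAIN"], ["NXDOMAIN"])
record_type = ListItemSketch("record_type", ["A", "AAAA"])
```

Here `status.validate("NOERROR")` is True while `status.check_relevance("NOERROR")` is False, so the value passes validation but would be dropped by relevance-based filtering.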

class base.logline_handler.LoglineHandler

Bases: object

Main handler for logline validation and processing

Manages the configuration-based validation of loglines according to the format specified in the configuration file. Provides validation, field extraction, and relevance checking functionality for the log processing pipeline.

check_relevance(logline_dict: dict) -> bool

Checks if a logline is relevant based on configured relevance criteria.

Iterates through all ListItem fields and checks their relevance using the check_relevance method. A logline is considered relevant only if all ListItem fields pass their relevance checks.

Parameters:

logline_dict (dict) – Logline fields as dictionary to be checked for relevance.

Returns:

True if the logline is relevant according to all configured criteria, False otherwise.

validate_logline(logline: str) -> bool

Validates a complete logline according to the configured format.

Checks if the number of fields is correct and validates each field using the appropriate field type validator. Provides detailed error logging with visual indicators for incorrect fields.

Parameters:

logline (str) – Logline string to be validated.

Returns:

True if the logline contains correct fields in the configured format, False otherwise.

validate_logline_and_get_fields_as_json(logline: str) -> dict

Validates a logline and returns the fields as a dictionary.

Combines validation and field extraction in a single operation. First validates the logline format, then extracts and returns the fields.

Parameters:

logline (str) – Logline string to be validated and parsed.

Returns:

Dictionary with field names as keys and field values as values.

Raises:

ValueError – If logline validation fails.
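
The combination of field-count check, per-field validation, and field extraction can be sketched as below. Everything specific in this snippet is an assumption for illustration: the whitespace separator, the three-field format in `FIELDS`, and the function name `validate_and_get_fields`. The real format comes from the configuration file.

```python
import re
from datetime import datetime


def is_timestamp(value: str) -> bool:
    """Return True if the value parses with the assumed timestamp format."""
    try:
        datetime.strptime(value, "%Y-%m-%dT%H:%M:%S")
        return True
    except ValueError:
        return False


# Hypothetical format: (field name, validator) pairs in logline order.
FIELDS = [
    ("timestamp", is_timestamp),
    ("status", lambda v: v in ("NOERROR", "NXDOMAIN")),
    ("domain", lambda v: re.fullmatch(r"[a-z0-9.-]+", v) is not None),
]


def validate_and_get_fields(logline: str) -> dict:
    """Validate every field, then return them keyed by field name."""
    parts = logline.split()
    if len(parts) != len(FIELDS):
        raise ValueError("Incorrect number of fields")
    for (name, validator), value in zip(FIELDS, parts):
        if not validator(value):
            raise ValueError(f"Invalid value for field {name!r}")
    return {name: value for (name, _), value in zip(FIELDS, parts)}


fields = validate_and_get_fields("2024-01-31T12:00:00 NXDOMAIN example.org")
```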

class base.logline_handler.RegEx(name: str, pattern: str)

Bases: FieldType

Field type for regular expression pattern validation

Validates field values against a specified regular expression pattern. Useful for validating structured text fields like domain names, sizes, or custom formats.

validate(value) -> bool

Validates the input value against the configured regular expression pattern.

Parameters:

value – The value to be validated against the regex pattern.

Returns:

True if the value matches the pattern, False otherwise.

class base.logline_handler.Timestamp(name: str, timestamp_format: str)

Bases: FieldType

Field type for timestamp validation and parsing

Validates timestamp fields according to a specified format string and provides functionality to convert valid timestamps to ISO format for internal processing.

get_timestamp_as_str(value) -> str

Converts a valid timestamp to ISO format string.

Parameters:

value – Correctly formatted timestamp according to self.timestamp_format.

Returns:

ISO formatted timestamp string for internal processing.

validate(value) -> bool

Validates the input value against the configured timestamp format.

Parameters:

value – The timestamp string to be validated.

Returns:

True if the value matches the timestamp format, False otherwise.
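
Format-string validation and ISO conversion can be sketched with `datetime.strptime` and `isoformat`. `TimestampSketch` is a hypothetical stand-in, and the example format string is an assumption; whether the real class uses these exact calls is not stated here.

```python
from datetime import datetime


class TimestampSketch:
    """Hypothetical stand-in mirroring the documented Timestamp behaviour."""

    def __init__(self, name: str, timestamp_format: str):
        self.name = name
        self.timestamp_format = timestamp_format

    def validate(self, value) -> bool:
        # strptime raises ValueError on a mismatch and TypeError on non-strings.
        try:
            datetime.strptime(value, self.timestamp_format)
        except (TypeError, ValueError):
            return False
        return True

    def get_timestamp_as_str(self, value) -> str:
        # Parse with the configured format, then emit ISO 8601.
        return datetime.strptime(value, self.timestamp_format).isoformat()


ts = TimestampSketch("timestamp", "%Y-%m-%dT%H:%M:%S.%fZ")
iso = ts.get_timestamp_as_str("2024-01-31T12:00:00.250Z")
print(iso)  # 2024-01-31T12:00:00.250000
```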

base.utils module

base.utils.kafka_delivery_report(err: KafkaError | None, msg: Message | None)

Callback function for Kafka message delivery reports

Used by Kafka Producers to handle delivery confirmations and errors. Logs successful deliveries with topic and partition information, and warns about delivery failures.

Parameters:
  • err (Optional[KafkaError]) – Error object if delivery failed, None if successful.

  • msg (Optional[Message]) – Message object containing delivery details, None if error.
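
A delivery callback of this shape can be sketched as follows. The names `delivery_report`, `FakeDelivered`, and `ListHandler` are hypothetical, the log levels and message texts are assumptions, and the fake message only mimics the `topic()` and `partition()` accessors so the snippet runs without Kafka.

```python
import logging


class ListHandler(logging.Handler):
    """Collects formatted log lines in memory so they can be inspected."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(f"{record.levelname}: {record.getMessage()}")


class FakeDelivered:
    """Hypothetical stand-in for a delivered Kafka message."""

    def topic(self):
        return "demo-topic"

    def partition(self):
        return 0


logger = logging.getLogger("delivery_demo")
logger.setLevel(logging.DEBUG)
logger.propagate = False
capture = ListHandler()
logger.addHandler(capture)


def delivery_report(err, msg) -> None:
    """Warn on failure, otherwise log where the message was delivered."""
    if err is not None:
        logger.warning("Message delivery failed: %s", err)
    else:
        logger.debug("Message delivered to %s [%s]", msg.topic(), msg.partition())


delivery_report(None, FakeDelivered())
delivery_report("broker down", None)
```

With confluent-kafka, a callback like this is typically passed to the producer's `produce()` call as its delivery callback and is invoked once per message when the producer is polled or flushed.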

base.utils.normalize_ipv4_address(address: IPv4Address, prefix_length: int) -> tuple[IPv4Address, int]

Extract the network portion of an IPv4 address using the specified prefix length.

Creates a subnet identifier by zeroing out the host portion of the IP address based on the provided prefix length. This is useful for network analysis and grouping IP addresses by subnet.

Parameters:
  • address (ipaddress.IPv4Address) – The IPv4 address to normalize.

  • prefix_length (int) – CIDR prefix length (0-32) defining the network portion.

Returns:

tuple[ipaddress.IPv4Address, int]

A tuple containing:
  • Network address with host bits set to zero.

  • The prefix length used for normalization.

Raises:

ValueError – If prefix_length is not in the valid range (0-32).
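
Zeroing the host bits amounts to a bitwise AND with the network mask. The sketch below shows that arithmetic explicitly; `normalize_ipv4` is a hypothetical name, and the real function may use a different approach to reach the same result.

```python
from ipaddress import IPv4Address


def normalize_ipv4(address: IPv4Address, prefix_length: int) -> tuple[IPv4Address, int]:
    """Return the network address for the given prefix length."""
    if not 0 <= prefix_length <= 32:
        raise ValueError("prefix_length must be between 0 and 32")
    # Build a 32-bit mask with prefix_length leading ones, e.g. /24 -> 0xFFFFFF00.
    mask = (0xFFFFFFFF << (32 - prefix_length)) & 0xFFFFFFFF
    return IPv4Address(int(address) & mask), prefix_length


subnet_24 = normalize_ipv4(IPv4Address("192.168.1.77"), 24)
subnet_20 = normalize_ipv4(IPv4Address("10.20.30.40"), 20)
print(subnet_24)  # (IPv4Address('192.168.1.0'), 24)
print(subnet_20)  # (IPv4Address('10.20.16.0'), 20)
```

In the /20 case the third octet 30 (binary 00011110) keeps only its top four bits, which gives 16.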

base.utils.normalize_ipv6_address(address: IPv6Address, prefix_length: int) -> tuple[IPv6Address, int]

Extract the network portion of an IPv6 address using the specified prefix length.

Creates a subnet identifier by zeroing out the host portion of the IP address based on the provided prefix length. This is useful for network analysis and grouping IPv6 addresses by subnet.

Parameters:
  • address (ipaddress.IPv6Address) – The IPv6 address to normalize.

  • prefix_length (int) – CIDR prefix length (0-128) defining the network portion.

Returns:

tuple[ipaddress.IPv6Address, int]

A tuple containing:
  • Network address with host bits set to zero.

  • The prefix length used for normalization.

Raises:

ValueError – If prefix_length is not in the valid range (0-128).
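
For IPv6, the same normalization can be delegated to the standard library: building an `IPv6Network` with `strict=False` discards the host bits. `normalize_ipv6` is a hypothetical name, and this is one possible way to obtain the documented result, not necessarily the one the module uses.

```python
from ipaddress import IPv6Address, IPv6Network


def normalize_ipv6(address: IPv6Address, prefix_length: int) -> tuple[IPv6Address, int]:
    """Return the network address for the given prefix length."""
    if not 0 <= prefix_length <= 128:
        raise ValueError("prefix_length must be between 0 and 128")
    # strict=False allows host bits to be set and masks them off.
    network = IPv6Network((address, prefix_length), strict=False)
    return network.network_address, prefix_length


subnet_48 = normalize_ipv6(IPv6Address("2001:db8:abcd:12::1"), 48)
subnet_64 = normalize_ipv6(IPv6Address("2001:db8:abcd:12::1"), 64)
print(subnet_48)  # (IPv6Address('2001:db8:abcd::'), 48)
print(subnet_64)  # (IPv6Address('2001:db8:abcd:12::'), 64)
```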

base.utils.setup_config()

Load and return the application configuration from the YAML configuration file.

Reads the configuration file from the predefined CONFIG_FILEPATH and parses it as a YAML document. This function provides centralized configuration loading for the entire application.

Returns:

dict – Configuration data as a Python dictionary containing all application settings and parameters.

Raises:
  • FileNotFoundError – If the configuration file does not exist at the expected path.

  • yaml.YAMLError – If the configuration file contains invalid YAML syntax.

base.utils.validate_host(host: int | str | bytes | IPv4Address | IPv6Address) -> IPv4Address | IPv6Address

Validate and normalize a host IP address.

Accepts various input formats for IP addresses and validates them using the ipaddress module. Returns a properly typed IP address object for further processing.

Parameters:

host (int | str | bytes | IPv4Address | IPv6Address) – Host IP address in any supported format (string, integer, bytes, or existing IP address object).

Returns:

ipaddress.IPv4Address | ipaddress.IPv6Address – Validated IP address object with the appropriate type.

Raises:

ValueError – If the provided host is not a valid IP address format.
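
The standard-library function `ipaddress.ip_address` already accepts strings, integers, packed bytes, and existing address objects, so a validator of this kind can be a thin wrapper around it. `validate_host_sketch` is a hypothetical name, and the error message is an assumption.

```python
import ipaddress


def validate_host_sketch(host):
    """Return an IPv4Address or IPv6Address, or raise ValueError."""
    try:
        return ipaddress.ip_address(host)
    except ValueError as exc:
        raise ValueError(f"Invalid host: {host!r}") from exc


from_str = validate_host_sketch("192.168.0.1")
from_int = validate_host_sketch(3232235521)
from_bytes = validate_host_sketch(b"\x7f\x00\x00\x01")
from_v6 = validate_host_sketch("::1")
print(from_str, from_int, from_bytes, from_v6)
# 192.168.0.1 192.168.0.1 127.0.0.1 ::1
```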

base.utils.validate_port(port: int) -> int

Validate that a port number is within the valid range.

Checks if the provided port number is an integer and falls within the valid TCP/UDP port range (1-65535). Returns the validated port number if valid.

Parameters:

port (int) – Port number to validate.

Returns:

int – Validated port number.

Raises:
  • TypeError – If port is not an integer.

  • ValueError – If port number is not in the valid range (1-65535).
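
The type and range checks can be sketched in a few lines. `validate_port_sketch` is a hypothetical name. Rejecting `bool` is an extra assumption of this sketch, made because `bool` is a subclass of `int` in Python; the reference does not say how the real function treats it.

```python
def validate_port_sketch(port: int) -> int:
    """Return the port if it is an integer in 1-65535, otherwise raise."""
    # Assumption: True/False are rejected even though bool subclasses int.
    if isinstance(port, bool) or not isinstance(port, int):
        raise TypeError("port must be an integer")
    if not 1 <= port <= 65535:
        raise ValueError("port must be in the range 1-65535")
    return port


kafka_port = validate_port_sketch(9092)
```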