Source code for base.utils
import ipaddress
import os
import sys
from typing import Optional
import yaml
from confluent_kafka import KafkaError, Message
sys.path.append(os.getcwd())
from src.base.log_config import get_logger
logger = get_logger()
CONFIG_FILEPATH = os.path.join(os.path.dirname(__file__), "../../config.yaml")
[docs]
def setup_config():
"""Load and return the application configuration from the YAML configuration file.
Reads the configuration file from the predefined CONFIG_FILEPATH and parses
it as a YAML document. This function provides centralized configuration
loading for the entire application.
Returns:
dict: Configuration data as a Python dictionary containing all
application settings and parameters.
Raises:
FileNotFoundError: If the configuration file does not exist at the
expected path.
yaml.YAMLError: If the configuration file contains invalid YAML syntax.
"""
try:
logger.debug(f"Opening configuration file at {CONFIG_FILEPATH}...")
with open(CONFIG_FILEPATH, "r") as file:
config = yaml.safe_load(file)
except FileNotFoundError:
logger.critical(f"File {CONFIG_FILEPATH} does not exist. Aborting...")
raise
logger.debug("Configuration file successfully opened and information returned.")
return config
[docs]
def validate_host(
host: int | str | bytes | ipaddress.IPv4Address | ipaddress.IPv6Address,
) -> ipaddress.IPv4Address | ipaddress.IPv6Address:
"""Validate and normalize a host IP address.
Accepts various input formats for IP addresses and validates them using
the ipaddress module. Returns a properly typed IP address object for
further processing.
Args:
host (int | str | bytes | IPv4Address | IPv6Address): Host IP address
in any supported format (string, integer, bytes, or existing
IP address object).
Returns:
ipaddress.IPv4Address | ipaddress.IPv6Address: Validated IP address
object with the appropriate type.
Raises:
ValueError: If the provided host is not a valid IP address format.
"""
logger.debug(f"Validating host IP address {host}...")
try:
host = ipaddress.ip_address(host)
except Exception as err:
raise ValueError(f"Invalid host: {host}, {err=}")
logger.debug(f"Host {host} is valid.")
return host
[docs]
def validate_port(port: int) -> int:
"""Validate that a port number is within the valid range.
Checks if the provided port number is an integer and falls within
the valid TCP/UDP port range (1-65535). Returns the validated port
number if valid.
Args:
port (int): Port number to validate.
Returns:
int: Validated port number.
Raises:
TypeError: If port is not an integer.
ValueError: If port number is not in the valid range (1-65535).
"""
logger.debug(f"Validating port {port}...")
if not isinstance(port, int):
raise TypeError
if not (1 <= port <= 65535):
raise ValueError(f"Invalid port: {port}")
logger.debug(f"Port {port} is valid.")
return port
[docs]
def kafka_delivery_report(err: Optional[KafkaError], msg: Optional[Message]):
"""Callback function for Kafka message delivery reports
Used by Kafka Producers to handle delivery confirmations and errors.
Logs successful deliveries with topic and partition information, and
warns about delivery failures.
Args:
err (Optional[KafkaError]): Error object if delivery failed, None if successful.
msg (Optional[Message]): Message object containing delivery details, None if error.
"""
if err:
logger.warning("Message delivery failed: {}".format(err))
else:
logger.debug(
"Message delivered to topic={} [partition={}]".format(
msg.topic(), msg.partition()
)
)
[docs]
def normalize_ipv4_address(
address: ipaddress.IPv4Address, prefix_length: int
) -> tuple[ipaddress.IPv4Address, int]:
"""Extract the network portion of an IPv4 address using the specified prefix length.
Creates a subnet identifier by zeroing out the host portion of the IP address
based on the provided prefix length. This is useful for network analysis
and grouping IP addresses by subnet.
Args:
address (ipaddress.IPv4Address): The IPv4 address to normalize.
prefix_length (int): CIDR prefix length (0-32) defining the network portion.
Returns:
tuple[ipaddress.IPv4Address, int]: A tuple containing:
- Network address with host bits set to zero.
- The prefix length used for normalization.
Raises:
ValueError: If prefix_length is not in the valid range (0-32).
"""
if not (0 <= prefix_length <= 32):
raise ValueError("Invalid prefix length for IPv4. Must be between 0 and 32.")
net = ipaddress.IPv4Network((address, prefix_length), strict=False)
return net.network_address, prefix_length
[docs]
def normalize_ipv6_address(
address: ipaddress.IPv6Address, prefix_length: int
) -> tuple[ipaddress.IPv6Address, int]:
"""Extract the network portion of an IPv6 address using the specified prefix length.
Creates a subnet identifier by zeroing out the host portion of the IP address
based on the provided prefix length. This is useful for network analysis
and grouping IPv6 addresses by subnet.
Args:
address (ipaddress.IPv6Address): The IPv6 address to normalize.
prefix_length (int): CIDR prefix length (0-128) defining the network portion.
Returns:
tuple[ipaddress.IPv6Address, int]: A tuple containing:
- Network address with host bits set to zero.
- The prefix length used for normalization.
Raises:
ValueError: If prefix_length is not in the valid range (0-128).
"""
if not (0 <= prefix_length <= 128):
raise ValueError("Invalid prefix length for IPv6. Must be between 0 and 128.")
net = ipaddress.IPv6Network((address, prefix_length), strict=False)
return net.network_address, prefix_length