Source code for train.feature

import os
import sys
from string import ascii_lowercase as alc
from typing import List

import math
import polars as pl

sys.path.append(os.getcwd())
from src.base.log_config import get_logger

logger = get_logger("train.feature")


[docs] class Processor: """Extracts statistical and linguistic features from domain name datasets. Computes comprehensive feature sets including domain label statistics, character frequencies, entropy measures, and domain structure analysis for machine learning model training and DGA detection tasks. """
[docs] def __init__(self, features_to_drop: List) -> None: """ Args: features_to_drop (List): List of column names to exclude from final features. """ self.features_to_drop = features_to_drop
[docs] def transform(self, x: pl.DataFrame) -> pl.DataFrame: """Extracts comprehensive feature set from domain name dataset. Computes domain label statistics, character frequencies for all letters, character type ratios, and entropy measures for different domain levels. Handles missing values and removes specified columns from final output. Args: x (pl.DataFrame): Input dataset with domain structure columns. Returns: pl.DataFrame: Feature-engineered dataset ready for ML model training. """ logger.debug("Start data transformation") x = x.with_columns( [ (pl.col("query").str.split(".").list.len().alias("label_length")), ( pl.col("query") .str.split(".") .list.max() .str.len_chars() .alias("label_max") ), ( pl.col("query") .str.strip_chars(".") .str.len_chars() .alias("label_average") ), ] ) logger.debug("Get letter frequency") for i in alc: x = x.with_columns( [ ( pl.col("query") .str.to_lowercase() .str.count_matches(rf"{i}") .truediv(pl.col("query").str.len_chars()) ).alias(f"freq_{i}"), ] ) logger.debug("Get full, alpha, special, and numeric count.") for level in ["thirdleveldomain", "secondleveldomain", "fqdn"]: x = x.with_columns( [ ( pl.when(pl.col(level).str.len_chars().eq(0)) .then(pl.lit(0)) .otherwise( pl.col(level) .str.len_chars() .truediv(pl.col(level).str.len_chars()) ) ).alias(f"{level}_full_count"), ( pl.when(pl.col(level).str.len_chars().eq(0)) .then(pl.lit(0)) .otherwise( pl.col(level) .str.count_matches(r"[a-zA-Z]") .truediv(pl.col(level).str.len_chars()) ) ).alias(f"{level}_alpha_count"), ( pl.when(pl.col(level).str.len_chars().eq(0)) .then(pl.lit(0)) .otherwise( pl.col(level) .str.count_matches(r"[0-9]") .truediv(pl.col(level).str.len_chars()) ) ).alias(f"{level}_numeric_count"), ( pl.when(pl.col(level).str.len_chars().eq(0)) .then(pl.lit(0)) .otherwise( pl.col(level) .str.count_matches(r"[^\w\s]") .truediv(pl.col(level).str.len_chars()) ) ).alias(f"{level}_special_count"), ] ) logger.debug("Start entropy calculation") for ent in ["fqdn", "thirdleveldomain", "secondleveldomain"]: x = x.with_columns( [ ( pl.col(ent).map_elements( lambda x: [ float(str(x).count(c)) / len(str(x)) for c in dict.fromkeys(list(str(x))) ], return_dtype=pl.List(pl.Float64), ) ).alias("prob"), ] ) t = math.log(2.0) x = x.with_columns( [ ( pl.col("prob") .list.eval(-pl.element() * pl.element().log() / t) .list.sum() ).alias(f"{ent}_entropy"), ] ) x = x.drop("prob") logger.debug("Finished entropy calculation") logger.debug("Fill NaN.") x = x.fill_nan(0) logger.debug("Drop features that are not useful.") x = x.drop(self.features_to_drop) logger.debug("Finished data transformation") logger.info("Finished data transformation") return x