Pipeline

Overview

The core component of the software’s architecture is its data pipeline. It consists of five stages (modules), and data is passed between them via Apache Kafka.

(Figure: overview of the pipeline stages)

Stage 1: Log Storage

This stage is the central entry point for all data: data is read here and entered into the pipeline.

Overview

The LogServer class is the core component of this stage. It reads data from one or more data sources and enters it into the pipeline by sending it to Kafka, from which the next module consumes it. For monitoring, it logs every incoming log line together with the timestamps at which it entered and left the module.

Main Class

class src.logserver.server.LogServer

Main component of the Log Storage stage to enter data into the pipeline

Receives and sends single log lines. It simultaneously listens for messages via Kafka and reads newly added lines from an input file, and sends every log line to a Kafka topic from which the next stage consumes it.
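One way to picture "reading newly added lines" is to poll the file from the last known byte offset. The following is a minimal polling sketch under that assumption; the LogServer may well use a different mechanism (for example, asynchronous file I/O). The function name read_new_lines is hypothetical, an incomplete trailing line is left for the next call, and forwarding each line to Kafka is omitted.

```python
from typing import List, Tuple


def read_new_lines(path: str, offset: int) -> Tuple[List[str], int]:
    """Return the complete lines appended since `offset` and the new byte offset.

    Hypothetical helper: a caller would invoke it periodically and forward each
    returned line to the pipeline's Kafka topic.
    """
    with open(path, "rb") as f:
        f.seek(offset)
        data = f.read()
    # Only consume up to the last newline; a partial line stays for the next poll.
    complete, separator, _partial = data.rpartition(b"\n")
    if not separator:
        return [], offset
    lines = [line for line in complete.decode("utf-8").split("\n") if line]
    return lines, offset + len(complete) + 1
```

Keeping the offset outside the function makes each poll stateless, so the caller decides how often to poll and where to persist the position.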

Usage and configuration

The LogServer simultaneously listens on a Kafka topic and reads from an input file. The configuration allows changing the Kafka topic to listen on, as well as the file name to read from.

The Kafka topic to listen on can be changed by setting the environment.kafka_topics.pipeline.logserver_in field in config.yaml. How to change the input file depends on your environment:

  • Without Docker:

    • To change the input file path, change pipeline.log_storage.logserver.input_file in the config.yaml. The default setting is "/opt/file.txt".

  • With Docker:

    • Docker mounts the file specified in MOUNT_PATH in the file docker/.env. By default, this is set to ../../default.txt, which refers to the file docker/default.txt.

    • Change this variable to mount a different file. Note that in this case, the input file path in config.yaml (pipeline.log_storage.logserver.input_file) must keep its default value.

Stage 2: Log Collection

The Log Collection stage validates and processes incoming loglines from the Log Storage stage, organizes them into batches based on subnet IDs, and forwards them to the next pipeline stage for further analysis.

Core Functionality

The Log Collection stage is responsible for retrieving loglines from the Log Storage, parsing their information fields, and validating the data. Each field is checked for the correct type and format. Because only well-formed data leaves this stage, subsequent stages need little further verification.

Data Processing and Validation

Any loglines that do not meet the required format are immediately discarded to maintain data integrity. The validation process includes data type verification and value range checks (e.g., verifying that IP addresses are valid). Only validated loglines proceed to the batching phase.

Batching and Performance Optimization

Valid loglines are buffered and transmitted in batches after a pre-defined timeout or when the buffer reaches its capacity. This minimizes the number of messages sent to the next stage and optimizes performance. The client’s IP address is retrieved from the logline and used to create the subnet_id with the number of subnet bits specified in the configuration.

Advanced Features

Each batch is sent together with the previous batch for the same key (the buffer). This helps detect errors or attacks that may occur at the boundary between two batches when they are analyzed in later pipeline stages. The buffer system is detailed in the subsection Buffer Functionality.

Overview

The Log Collection stage comprises three main classes:

  1. LogCollector: Connects to the LogServer to retrieve and parse loglines, validating their format and content. Derives a subnet_id from the client’s IP address in each logline and adds it to the logline.

  2. BufferedBatch: Buffers validated loglines by their subnet_id, maintains timestamps per key (subnet_id) for accurate processing and analysis, and returns sorted batches.

  3. BufferedBatchSender: Adds messages to the data structure BufferedBatch, maintains the timer and checks the fill level of the key-specific batches. Sends the key’s batches if full, sends all batches at timeout.

Main Classes

class src.logcollector.collector.LogCollector

Main component of the Log Collection stage to pre-process and format data

Consumes incoming loglines from the LogServer and validates all data fields by type and value. Invalid loglines are discarded; all valid loglines are sent to the BufferedBatchSender.

class src.logcollector.batch_handler.BufferedBatch

Data structure for managing batches, buffers, and timestamps in the log collection pipeline

Manages two data structures: a current batch that collects incoming messages and a buffer that stores previously processed batch messages. The batch groups messages by key (typically subnet ID) and handles automatic sending when size or timeout limits are reached. All batches are sorted by timestamp to ensure chronological processing. Tracks batch metadata including IDs, timestamps, and fill levels for monitoring.

class src.logcollector.batch_handler.BufferedBatchSender

Main component for managing batch collection and dispatch in the log collection stage

Coordinates the addition of messages to batches and handles automatic sending based on two triggers: size-based (when a batch reaches the configured size limit) or time-based (when a timeout expires). Manages a timer that ensures batches are sent even if they don’t reach the size threshold, preventing data from being held indefinitely. Sends completed batches to the next pipeline stage via Kafka and tracks message timestamps for monitoring and debugging purposes.

Usage

LogCollector

The LogCollector connects to the LogServer and retrieves loglines one at a time, then processes and validates each one. The logline is parsed into its respective fields, each checked for correct type and format:

  • Field Validation:

    • Checks include data type verification and value range checks (e.g., verifying that an IP address is valid).

    • Only loglines meeting the criteria are forwarded to the BufferedBatchSender.

  • Subnet Identification:

    • The configuration file specifies the number n of subnet bits, i.e. the prefix length (e.g. 24). The client’s IP address serves as the basis for the subnet_id: the address is truncated after n bits, the remaining bits are set to zero, and _n is appended. For example:

      Client IP address    Subnet ID
      171.154.4.17         171.154.4.0_24

  • Connection to LogServer:

    • The LogCollector establishes a connection to the LogServer and retrieves loglines when they become available.

  • Log Line Format:

    • By default, log lines have the following format:

      TIMESTAMP STATUS CLIENT_IP DNS_IP HOST_DOMAIN_NAME RECORD_TYPE RESPONSE_IP SIZE
      

      Field             Description
      TIMESTAMP         The date and time when the log entry was recorded. The format is configurable through logline_format in config.yaml.
                          • Default format: %Y-%m-%dT%H:%M:%S.%fZ (ISO 8601 with microseconds, UTC)
                          • Example: 2024-07-28T14:45:30.123456Z
      STATUS            The status of the DNS query, e.g., NOERROR or NXDOMAIN.
      CLIENT_IP         The IP address of the client that made the request.
      DNS_IP            The IP address of the DNS server processing the request.
      HOST_DOMAIN_NAME  The domain name being queried.
      RECORD_TYPE       The type of DNS record requested, such as A or AAAA.
      RESPONSE_IP       The IP address returned in the DNS response.
      SIZE              The size of the DNS query response in bytes, written in a form like 150b, where the number is the size and b denotes bytes.

    • Users can change the format and field types, as described in the Logline format configuration section.
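The parsing, validation, and subnet steps for the default format can be illustrated with the Python standard library. This is a sketch, not the LogCollector's code. Apart from status_code and record_type, which appear in the configuration, the snake_case field keys are hypothetical, as are the function names and the check that SIZE consists of digits followed by b. Invalid loglines raise ValueError, which a caller would catch before discarding the line.

```python
import ipaddress
from datetime import datetime

# Hypothetical keys for the default field order listed in the table above.
FIELDS = ["timestamp", "status_code", "client_ip", "dns_server_ip",
          "domain_name", "record_type", "response_ip", "size"]
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"  # default timestamp format


def get_subnet_id(client_ip: str, subnet_bits: int) -> str:
    """Keep the first `subnet_bits` bits, zero the rest, and append _n."""
    network = ipaddress.ip_network(f"{client_ip}/{subnet_bits}", strict=False)
    return f"{network.network_address}_{subnet_bits}"


def parse_logline(line: str, subnet_bits: int = 24) -> dict:
    """Split a default-format logline into fields; raise ValueError if malformed."""
    values = line.split()
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    fields = dict(zip(FIELDS, values))
    datetime.strptime(fields["timestamp"], TIMESTAMP_FORMAT)  # raises on bad format
    for key in ("client_ip", "dns_server_ip", "response_ip"):
        ipaddress.ip_address(fields[key])  # raises ValueError for invalid addresses
    size = fields["size"]
    if not (size.endswith("b") and size[:-1].isdigit()):
        raise ValueError(f"invalid size: {size}")
    fields["subnet_id"] = get_subnet_id(fields["client_ip"], subnet_bits)
    return fields
```

With subnet_bits=24, the client address 171.154.4.17 yields the subnet_id 171.154.4.0_24, matching the example above.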

BufferedBatch

The BufferedBatch manages the buffering of validated loglines as well as their timestamps and batch metadata:

  • Batching Logic and Buffering Strategy:

    • Collects log entries into a batch dictionary, with the subnet_id as key.

    • Uses a buffer per key to concatenate and send both the current and previous batches together.

    • This approach helps detect errors or attacks that may occur at the boundary between two batches when analyzed in Stage 4: Inspection and Stage 5: Detection.

    • All batches are sorted by timestamp on completion to ensure correct chronological order.

    • A begin_timestamp and end_timestamp per key are extracted and sent as metadata (needed for analysis). These are taken from the chronologically first and last message in a batch.

    • Tracks batch IDs, timestamps, and fill levels for comprehensive monitoring and debugging.

  • Monitoring and Metadata:

    • Each batch is assigned a unique batch ID for tracking purposes.

    • Logs associations between loglines and their respective batches.

    • Maintains fill level statistics for both batches and buffers.

    • Records batch status changes (waiting, completed) with timestamps.

BufferedBatchSender

The BufferedBatchSender manages the sending of validated loglines stored in the BufferedBatch:

  • Timer-based and Size-based Triggers:

    • Starts a timer upon receiving the first log entry.

    • When a batch reaches the configured size (e.g., 1000 entries), the current and previous batches of this key are concatenated and sent to the Kafka topic batch_sender_to_prefilter.

    • Upon timer expiration, the currently stored batches of all keys are sent. This serves as a fallback for batches that do not reach the configured size.

    • If no messages are present when the timer expires, nothing is sent.

  • Message Processing and Monitoring:

    • Extracts logline IDs from JSON messages for tracking purposes.

    • Logs processing timestamps (in_process, batched) for each message.

    • Provides detailed logging about the number of messages and batches sent.

    • Uses the Batch schema for serialization before sending to Kafka.
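The two triggers can be sketched in simplified form. The sketch assumes a send(key, messages) callback in place of the Kafka producer for batch_sender_to_prefilter, and it replaces the background timer with a check_timeout() method that a caller would invoke periodically. It also leaves out the concatenation with the previous batch, which BufferedBatch handles. All names are hypothetical, and restarting the timer only when the next message arrives is our own assumption.

```python
import time
from typing import Callable, Dict, List, Optional


class BatchSenderSketch:
    """Size- and time-triggered dispatch per key (buffering across batches omitted)."""

    def __init__(self, send: Callable[[str, List[str]], None],
                 batch_size: int = 1000, timeout_s: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.send = send              # stand-in for the Kafka producer
        self.batch_size = batch_size
        self.timeout_s = timeout_s
        self.clock = clock            # injectable so the timer can be tested
        self.batches: Dict[str, List[str]] = {}
        self.timer_started_at: Optional[float] = None

    def add_message(self, key: str, message: str) -> None:
        if self.timer_started_at is None:
            self.timer_started_at = self.clock()  # timer starts with the first entry
        self.batches.setdefault(key, []).append(message)
        if len(self.batches[key]) >= self.batch_size:
            self.send(key, self.batches.pop(key))  # size-based trigger for this key only

    def check_timeout(self) -> None:
        """On expiry, send every stored batch; nothing is sent if none are stored."""
        if self.timer_started_at is None:
            return
        if self.clock() - self.timer_started_at < self.timeout_s:
            return
        for key in list(self.batches):
            self.send(key, self.batches.pop(key))
        self.timer_started_at = None  # assumption: restart with the next message
```

Injecting the clock keeps the timing logic deterministic and testable. A thread-based timer, as the text describes, would call the same expiry logic from a background thread.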

Configuration

The LogCollector checks the validity of incoming loglines against the logline_format defined in config.yaml. The section Logline format configuration explains how to customize the logline format and field definitions. The LogCollector uses the following configuration options:

  • LogCollector Validation Criteria:

    • Valid status codes: The accepted status codes for logline validation, defined by the entry named "status_code" in the logline_format list.

    • Valid record types: The accepted DNS record types for logline validation, defined by the entry named "record_type" in the logline_format list.
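As a rough illustration of these criteria, the sketch below reads the allowed values (the third entry) of each ListItem entry from a logline_format list shaped like the Prefilter example in this document. It then checks a parsed logline against those values. The Python list form of the configuration and the function names are assumptions; the real configuration is loaded from config.yaml.

```python
from typing import Dict, List, Set

# Assumed Python form of two logline_format entries:
# [name, type, allowed values, optional relevant values]
LOGLINE_FORMAT: List[list] = [
    ["status_code", "ListItem", ["NOERROR", "NXDOMAIN"], ["NXDOMAIN"]],
    ["record_type", "ListItem", ["A", "AAAA"]],
]


def allowed_values(logline_format: List[list]) -> Dict[str, Set[str]]:
    """Map each ListItem field to its set of valid values (the third entry)."""
    return {entry[0]: set(entry[2]) for entry in logline_format if entry[1] == "ListItem"}


def is_valid(fields: Dict[str, str], allowed: Dict[str, Set[str]]) -> bool:
    """A logline is valid only if every ListItem field holds an allowed value."""
    return all(fields.get(name) in values for name, values in allowed.items())
```

Validity uses only the allowed list. The optional fourth entry is ignored here because relevance is decided later, in the Log Filtering stage.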

Buffer Functionality

The BufferedBatch class manages the batching and buffering of messages associated with specific keys, along with the corresponding timestamps. For each key, it maintains two sets of messages: those currently being batched and those that were part of the previous batch. It also tracks the timestamps needed to manage the timing of message processing.

Class Overview

  • Batch: Stores the latest incoming messages associated with a particular key.

  • Buffer: Stores the previous batch of messages associated with a particular key.

  • Batch ID: Unique identifier assigned to each batch for tracking and monitoring purposes.

  • Monitoring Databases: Tracks logline-to-batch associations, batch timestamps, and fill levels for comprehensive monitoring.

Key Procedures

  1. Message Arrival and Addition:

  • When a new message arrives, the add_message() method is called.

  • If the key already exists in the batch, the message is appended to the list of messages for that key.

  • If the key does not exist, a new entry is created in the batch with a unique batch ID.

  • Batch timestamps and logline-to-batch associations are logged for monitoring.

  • Example:

    • message_1 arrives for key_1 and is added to batch["key_1"].

  2. Retrieving Message Counts:

  • Use get_message_count_for_batch_key(key) to get the count of messages in the current batch for a specific key.

  • Use get_message_count_for_buffer_key(key) to get the count of messages in the buffer for a specific key.

  • Use get_message_count_for_batch() to get the total count across all batches.

  • Use get_message_count_for_buffer() to get the total count across all buffers.

  3. Completing a Batch:

  • The complete_batch() method is called to finalize and retrieve the batch data for a specific key.

  • Scenarios:

    • Variant 1: If only the current batch contains messages (the buffer is empty), the batch is returned sorted by timestamp, together with its timestamps: begin_timestamp is the timestamp of the chronologically first message in the batch, and end_timestamp that of the chronologically last message.

    • Variant 2: If both the batch and buffer contain messages, the buffered messages are included in the returned data. The begin_timestamp now reflects the first message’s timestamp in the buffer instead of the batch.

    • Variant 3: If only the buffer contains messages (no new messages arrived), the buffer data is discarded.

    • Variant 4: If neither the batch nor the buffer contains messages, a ValueError is raised.

  4. Managing Stored Keys:

  • The get_stored_keys() method returns a set of all keys currently stored in either the batch or the buffer, allowing the retrieval of all keys with associated messages or buffered data.
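The procedures above can be condensed into a small, self-contained sketch. It is not the project's BufferedBatch. The following are assumptions made for illustration: the class name BufferedBatchSketch, the (timestamp, logline) message tuples, the UUID batch IDs, the keys of the returned dictionary, and returning None in Variant 3. The monitoring calls are omitted.

```python
import uuid
from datetime import datetime
from typing import Dict, List, Optional, Set, Tuple

Message = Tuple[datetime, str]  # (timestamp, raw logline), assumed representation


class BufferedBatchSketch:
    """Illustrative per-key batch/buffer bookkeeping; not the project's BufferedBatch."""

    def __init__(self) -> None:
        self.batch: Dict[str, List[Message]] = {}   # key -> messages of the current batch
        self.buffer: Dict[str, List[Message]] = {}  # key -> messages of the previous batch
        self.batch_id: Dict[str, str] = {}          # key -> ID of the current batch

    def add_message(self, key: str, timestamp: datetime, message: str) -> None:
        if key not in self.batch:
            self.batch[key] = []
            self.batch_id[key] = str(uuid.uuid4())  # new batch entry, new unique ID
        self.batch[key].append((timestamp, message))

    def get_message_count_for_batch_key(self, key: str) -> int:
        return len(self.batch.get(key, []))

    def get_message_count_for_buffer_key(self, key: str) -> int:
        return len(self.buffer.get(key, []))

    def get_message_count_for_batch(self) -> int:
        return sum(len(messages) for messages in self.batch.values())

    def get_message_count_for_buffer(self) -> int:
        return sum(len(messages) for messages in self.buffer.values())

    def get_stored_keys(self) -> Set[str]:
        return set(self.batch) | set(self.buffer)

    def complete_batch(self, key: str) -> Optional[dict]:
        batch = sorted(self.batch.pop(key, []), key=lambda m: m[0])
        buffer = sorted(self.buffer.pop(key, []), key=lambda m: m[0])
        if not batch and not buffer:
            raise ValueError(f"no messages stored for key {key!r}")  # Variant 4
        if not batch:
            return None  # Variant 3: only the buffer holds data, so it is discarded
        self.buffer[key] = batch  # the completed batch becomes the new buffer
        return {
            "batch_id": self.batch_id.pop(key),
            # Variant 2 begins at the buffer's first message, Variant 1 at the batch's.
            "begin_timestamp": (buffer or batch)[0][0],
            "end_timestamp": batch[-1][0],
            "data": [message for _, message in buffer + batch],
        }
```

Run against the example workflow, the first complete_batch("key_1") returns only the batch. The second returns the buffered and the new messages together, and a third call without new messages discards the buffer.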

Example Workflow

  1. Initial Message:

  • message_1 arrives for key_1, added to batch["key_1"].

  2. Subsequent Message:

  • message_2 arrives for key_1, added to batch["key_1"].

  3. Completing the Batch:

  • complete_batch("key_1") is called. If buffer["key_1"] exists, the result includes both the buffered and the batch messages; otherwise, it contains only the batch.

  • The current batch is moved to the buffer.

  4. Buffer Management:

  • If no new messages arrive, buffer["key_1"] data is discarded upon the next call to complete_batch("key_1").

This design keeps a separate batch and buffer for each key, so messages and their timestamps are tracked independently across different message streams.

Stage 3: Log Filtering

The Log Filtering stage processes batches from the Log Collection stage and filters out irrelevant entries based on configurable relevance criteria, ensuring only meaningful data proceeds to anomaly detection.

Core Functionality

The Log Filtering stage is responsible for processing and refining log data by filtering out entries based on relevance criteria defined in the logline format configuration. This step ensures that only relevant logs are passed on for further analysis, optimizing the performance and accuracy of subsequent pipeline stages.

Data Processing Pipeline

The filtering process operates on complete batches rather than individual loglines, maintaining batch metadata and timestamps throughout the process. Each batch is processed as a unit, preserving the subnet-based grouping established in the previous stage.

Relevance-Based Filtering

The filtering mechanism uses the check_relevance() method from the LoglineHandler to determine which entries should proceed to the next stage. This approach allows for flexible filtering criteria based on field values defined in the configuration.

Main Class

class src.prefilter.prefilter.Prefilter

Main component of the Log Filtering stage to process and filter batches

Consumes batches from the Log Collection stage and applies relevance-based filtering using the LoglineHandler. Filters out irrelevant loglines and forwards only relevant data to the next pipeline stage for anomaly detection.

Usage

Data Flow and Processing

The Prefilter consumes batches from the Kafka topic batch_sender_to_prefilter and processes them through the following workflow:

  1. Batch Reception: Receives complete batches with metadata (batch_id, begin_timestamp, end_timestamp, subnet_id)

  2. Relevance Filtering: Applies relevance checks to each logline within the batch

  3. Monitoring: Tracks filtered and unfiltered data counts for monitoring purposes

  4. Batch Forwarding: Sends filtered batches to the prefilter_to_inspector topic

Filtering Logic

The filtering process:

  • Retains loglines that pass the relevance check defined by ListItem field configurations

  • Discards irrelevant loglines and marks them as “filtered_out” in the monitoring system

  • Preserves batch structure and metadata for filtered data

  • Handles empty batches gracefully (logs info but does not forward empty data)

Error Handling

The implementation handles the following error cases:

  • Empty Data: Logs informational messages when batches contain no data

  • No Filtered Data: Raises ValueError when no relevant data remains after filtering

  • Kafka Exceptions: Continues processing on message fetch exceptions

  • Graceful Shutdown: Supports KeyboardInterrupt for clean termination

Configuration

Filtering behavior is controlled through the logline_format configuration in config.yaml:

  • Relevance Criteria:

    • For fields of type ListItem, the fourth entry (relevant_list) defines which values are considered relevant

    • If no relevant_list is specified, all allowed values are deemed relevant

    • Multiple fields can have relevance criteria, and all must pass for a logline to be retained

  • Example Configuration:

    logline_format:
      - [ "status_code", ListItem, [ "NOERROR", "NXDOMAIN" ], [ "NXDOMAIN" ] ]  # Only NXDOMAIN relevant
      - [ "record_type", ListItem, [ "A", "AAAA" ] ]  # A and AAAA relevant
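The relevance rule can be sketched in a few lines. The sketch assumes the same list form of logline_format as the YAML example above and loglines that have already been parsed into dictionaries. The helper names are illustrative, and so is raising ValueError for a batch that filtering empties, which mirrors the case listed under Error Handling. None of this is the Prefilter's actual code.

```python
from typing import Dict, List

# Assumed Python form of the example logline_format entries above.
LOGLINE_FORMAT: List[list] = [
    ["status_code", "ListItem", ["NOERROR", "NXDOMAIN"], ["NXDOMAIN"]],
    ["record_type", "ListItem", ["A", "AAAA"]],
]


def is_relevant(logline: Dict[str, str]) -> bool:
    """Every ListItem field must hold a relevant value for the logline to be kept."""
    for entry in LOGLINE_FORMAT:
        if entry[1] != "ListItem":
            continue
        # Fourth entry = relevant_list; without it, all allowed values are relevant.
        relevant = entry[3] if len(entry) > 3 else entry[2]
        if logline[entry[0]] not in relevant:
            return False
    return True


def filter_batch(loglines: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Keep relevant loglines; an empty input batch is returned unchanged."""
    kept = [line for line in loglines if is_relevant(line)]
    if loglines and not kept:
        raise ValueError("no relevant data remains after filtering")
    return kept
```

With this configuration, a NOERROR logline is dropped regardless of its record type, while an NXDOMAIN logline with record type A or AAAA is kept.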
    

Monitoring and Metrics

The Prefilter records the following monitoring data:

  • Batch Processing: Tracks batch timestamps and processing status

  • Fill Levels: Monitors data volumes before and after filtering

  • Logline Tracking: Records “filtered_out” status for individual loglines

  • Performance Metrics: Logs processing statistics for each batch

Stage 4: Inspection

Overview

The Inspection stage performs time-series-based anomaly detection on prefiltered DNS request batches. Its primary purpose is to reduce the load on the Detection stage by filtering out non-suspicious traffic early.

This stage uses StreamAD models (univariate, multivariate, or ensemble) to detect unusual patterns in request volume and packet sizes.

Main Class

class src.inspector.inspector.Inspector

Main component of the Data Inspection stage to detect anomalies in request batches

Analyzes batches of DNS requests using configurable streaming anomaly detection models. Supports univariate, multivariate, and ensemble detection modes. Processes time series features from DNS request patterns to identify suspicious network behavior and forwards anomalous batches to the Detector for further analysis.

The Inspector class is responsible for:

  • Loading batches from Kafka

  • Extracting time-series features (e.g., frequency and packet size)

  • Applying anomaly detection models

  • Forwarding suspicious batches to the detector stage

Usage

Data Flow and Processing

The Inspector consumes batches from the Kafka topic prefilter_to_inspector and processes them through the following workflow:

  1. Batch Reception: Receives batches with metadata (batch_id, begin_timestamp, end_timestamp) from the Prefilter

  2. Time Series Construction: Creates time series features based on configurable time windows

  3. Anomaly Detection: Applies StreamAD models to detect suspicious patterns

  4. Threshold Evaluation: Evaluates anomaly scores against configured thresholds

  5. Suspicious Batch Forwarding: Groups and forwards anomalous data by client IP to the Detector

Time Series Feature Extraction

The Inspector creates time series features using consecutive, non-overlapping time windows:

  • Time Window Configuration: Based on time_type (e.g., ms) and time_range (e.g., 20) from configuration

  • Univariate Mode: Counts message occurrences per time step for single-feature anomaly detection

  • Multivariate Mode: Combines message counts and mean packet sizes for two-dimensional feature analysis

  • Ensemble Mode: Uses message counts with multiple models combined through ensemble methods
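A minimal sketch of the window construction follows. It assumes that each logline has already been reduced to a (timestamp, size in bytes) pair and that time_type: ms with time_range: 20 corresponds to a window of timedelta(milliseconds=20). The function name build_series is hypothetical, and giving empty windows a mean size of 0.0 is our own assumption.

```python
import statistics
from datetime import datetime, timedelta
from typing import List, Sequence, Tuple


def build_series(
    messages: Sequence[Tuple[datetime, int]],
    begin: datetime,
    end: datetime,
    step: timedelta,
) -> Tuple[List[int], List[float]]:
    """Count messages and average their sizes per consecutive, non-overlapping window.

    Hypothetical helper: `messages` holds (timestamp, size_in_bytes) pairs lying
    between `begin` and `end` (the batch's begin_timestamp and end_timestamp).
    """
    n_windows = int((end - begin) / step) + 1
    counts = [0] * n_windows
    sizes: List[List[int]] = [[] for _ in range(n_windows)]
    for timestamp, size in messages:
        index = int((timestamp - begin) / step)  # window this message falls into
        counts[index] += 1
        sizes[index].append(size)
    # Assumption: windows without messages get a mean size of 0.0.
    mean_sizes = [statistics.fmean(s) if s else 0.0 for s in sizes]
    return counts, mean_sizes
```

In this sketch, counts alone is the univariate series (also the input in ensemble mode), and list(zip(counts, mean_sizes)) gives the two-dimensional multivariate series.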

Anomaly Detection Logic

The anomaly detection process evaluates suspicious patterns through a two-level threshold system:

  • Score Threshold: Individual time steps are flagged as anomalous when scores exceed score_threshold (default: 0.5)

  • Anomaly Threshold: Batches are considered suspicious when the proportion of anomalous time steps exceeds anomaly_threshold (default: 0.01)

  • Client IP Grouping: Suspicious batches are grouped by client IP and forwarded as separate suspicious batches to the Detector
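The two-level decision and the grouping step can be sketched as follows. The sketch assumes that one anomaly score per time step has already been obtained from the configured StreamAD model and that each logline is a dictionary with a client_ip key. Both function names are hypothetical. The default thresholds are the ones listed above, and the strict comparisons follow the word "exceed".

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def is_suspicious(scores: Sequence[float],
                  score_threshold: float = 0.5,
                  anomaly_threshold: float = 0.01) -> bool:
    """Flag a batch if the share of anomalous time steps exceeds anomaly_threshold."""
    if not scores:
        return False
    anomalous = sum(1 for score in scores if score > score_threshold)
    return anomalous / len(scores) > anomaly_threshold


def group_by_client_ip(loglines: List[Dict[str, str]]) -> Dict[str, List[Dict[str, str]]]:
    """Split a suspicious batch into one group per client IP."""
    groups: Dict[str, List[Dict[str, str]]] = defaultdict(list)
    for line in loglines:
        groups[line["client_ip"]].append(line)
    return dict(groups)
```

With the default anomaly_threshold of 0.01, a single anomalous step among fewer than 100 steps is enough to mark the whole batch as suspicious.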

Error Handling and Monitoring

The implementation includes the following monitoring and error handling:

  • Busy State Management: Prevents new batch consumption while processing current data

  • Model Validation: Validates model compatibility with selected detection mode

  • Fill Level Tracking: Monitors data volumes throughout the processing pipeline

  • Graceful Degradation: Handles empty batches and model loading failures appropriately

Configuration

The Inspector is configured through the data_inspection.inspector section in config.yaml. All StreamAD models are supported, including univariate, multivariate, and ensemble methods.

Detection Modes

Three detection modes are available:

  • Univariate Mode (mode: univariate): Uses message count time series for anomaly detection

  • Multivariate Mode (mode: multivariate): Combines message counts and mean packet sizes

  • Ensemble Mode (mode: ensemble): Uses multiple models with ensemble combination methods

Model Configuration

Univariate Models (streamad.model):

  • ZScoreDetector: Statistical anomaly detection using z-scores

  • KNNDetector: K-nearest neighbors based detection

  • SpotDetector: Streaming peaks-over-threshold detection

  • SRDetector: Spectral residual based detection

  • OCSVMDetector: One-class SVM for anomaly detection

  • MadDetector: Median absolute deviation detection

  • SArimaDetector: Streaming ARIMA-based detection

Multivariate Models (streamad.model):

  • xStreamDetector: Multi-dimensional streaming detection

  • RShashDetector: Random projection hash-based detection

  • HSTreeDetector: Half-space tree based detection

  • LodaDetector: Lightweight online detector of anomalies

  • OCSVMDetector: One-class SVM (supports multivariate)

  • RrcfDetector: Robust random cut forest detection

Ensemble Methods (streamad.process):

  • WeightEnsemble: Weighted combination of multiple detectors

  • VoteEnsemble: Voting-based ensemble prediction

Configuration Parameters

data_inspection:
  inspector:
    mode: univariate                    # Detection mode: univariate, multivariate, ensemble
    models:                             # List of models to use
      - model: ZScoreDetector
        module: streamad.model
        model_args:
          is_global: false
    ensemble:                           # Ensemble configuration (when mode: ensemble)
      model: WeightEnsemble
      module: streamad.process
      model_args: {}
    anomaly_threshold: 0.01            # Proportion of anomalous time steps required
    score_threshold: 0.5               # Individual score threshold for anomaly detection
    time_type: ms                      # Time unit for window creation
    time_range: 20                     # Time range for each window step

Model Arguments: Custom arguments for specific models can be provided via the model_args dictionary. This allows fine-tuning of model parameters for specific deployment requirements.

Time Window Settings: The time_type and time_range parameters control the granularity of the time series analysis. The example configuration above uses 20-millisecond windows for high-resolution anomaly detection.

Stage 5: Detection

Overview

The Detection stage is the core of the heiDGAF pipeline. It consumes suspicious batches passed from the Inspector, applies pre-trained ML models to classify individual DNS requests, and issues alerts based on aggregated probabilities.

The pre-trained models used here are licensed under EUPL‑1.2 and built from the following datasets:

Main Class

class src.detector.detector.Detector

Main component of the Data Analysis stage to perform anomaly detection

Processes suspicious batches from the Inspector using configurable ML models to classify DNS requests as benign or malicious. Downloads and validates models from a remote server, extracts features from domain names, calculates probability scores, and generates alerts when malicious requests are detected above the configured threshold.

The Detector class:

  • Consumes a batch flagged as suspicious.

  • Downloads and validates the ML model (if necessary).

  • Extracts features from domain names (e.g. character distributions, entropy, label statistics).

  • Computes a probability per request and an overall risk score per batch.

  • Emits alerts to ClickHouse and, where applicable, logs them to /tmp/warnings.json.
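To make the feature step concrete, here is a sketch of a few domain-name features of the kinds named above: character distribution, entropy, and label statistics. The Detector's exact feature set, names, and order are not documented here, so the dictionary below is an assumption. A real model expects exactly the features it was trained on.

```python
import math
from collections import Counter
from typing import Dict


def shannon_entropy(text: str) -> float:
    """Entropy of the character distribution of `text`, in bits per character."""
    if not text:
        return 0.0
    n = len(text)
    return -sum(c / n * math.log2(c / n) for c in Counter(text).values())


def domain_features(domain: str) -> Dict[str, float]:
    """Illustrative lexical features for one queried domain name (hypothetical set)."""
    labels = domain.rstrip(".").split(".")
    chars = "".join(labels)
    n = len(chars) or 1  # avoid division by zero for empty names
    return {
        "length": float(len(domain)),
        "label_count": float(len(labels)),
        "max_label_length": float(max(len(label) for label in labels)),
        "entropy": shannon_entropy(chars),
        "digit_ratio": sum(ch.isdigit() for ch in chars) / n,
        "vowel_ratio": sum(ch in "aeiou" for ch in chars.lower()) / n,
    }
```

Algorithmically generated names tend to score higher on entropy and digit ratio than dictionary-based ones, which is why features of this kind are common in DGA detection.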

Usage

  1. The Detector listens on the Kafka topic inspector_to_detector, to which the Inspector publishes suspicious batches.

  2. For each suspicious batch:

    • Extracts features for every domain request.

    • Applies the loaded ML model (after scaling) to compute class probabilities.

    • Marks a request as malicious if its probability exceeds the configured threshold.

  3. Computes an overall score (e.g. median of malicious probabilities) for the batch.

  4. If malicious requests exist, issues an alert record and logs it; otherwise, the batch is filtered out and no alert is issued.

Alerts are recorded in ClickHouse and also appended to a local JSON file (warnings.json) for external monitoring.
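Steps 2 to 4 reduce to a threshold test per request and an aggregate over the malicious probabilities. The sketch below assumes the model has already computed the per-request probabilities of the malicious class. The function name and the returned fields are hypothetical, and the median is only the example aggregate that the text mentions.

```python
import statistics
from typing import List, Optional


def evaluate_batch(probabilities: List[float], threshold: float) -> Optional[dict]:
    """Return alert data if any request exceeds the threshold, else None."""
    malicious = [p for p in probabilities if p > threshold]
    if not malicious:
        return None  # no alert: the batch is filtered out
    return {
        "malicious_count": len(malicious),
        "overall_score": statistics.median(malicious),  # example aggregate
    }
```

Taking the median over only the malicious probabilities keeps a few benign requests in the batch from diluting the risk score.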

Configuration

You may use the provided, pre-trained models or supply your own. To use a custom model, specify:

  • base_url: URL from which to fetch model artifacts

  • model: model name

  • checksum: SHA256 digest for integrity validation

  • threshold: probability threshold for classifying a request as malicious

These parameters are loaded at startup and used to download, verify, and load the model and scaler if they are not already cached locally (in a temporary directory).
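Integrity validation against the configured checksum can be sketched with Python's standard hashlib module. The sketch assumes that the downloaded artifact is available as bytes and that the checksum is a hex-encoded SHA256 digest. The function names are hypothetical.

```python
import hashlib


def sha256_hex(data: bytes) -> str:
    """Hex-encoded SHA256 digest of `data`."""
    return hashlib.sha256(data).hexdigest()


def verify_artifact(data: bytes, expected_checksum: str) -> bytes:
    """Return the artifact unchanged if its SHA256 digest matches, else raise."""
    actual = sha256_hex(data)
    if actual != expected_checksum.strip().lower():
        raise ValueError(f"checksum mismatch: expected {expected_checksum}, got {actual}")
    return data
```

In a setup like this, the model and scaler would be deserialized only after the check succeeds. A mismatch would abort startup rather than load an unverified file.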