logcollector package
Submodules
logcollector.batch_handler module
- class logcollector.batch_handler.BufferedBatch
Bases: object
Data structure for managing batches, buffers, and timestamps in the log collection pipeline.
Manages two data structures: a current batch that collects incoming messages and a buffer that stores the messages of the previously completed batch. The batch groups messages by key (typically the subnet ID) and handles automatic sending when size or timeout limits are reached. Messages are sorted by timestamp to ensure chronological processing. Batch metadata, including IDs, timestamps, and fill levels, is tracked for monitoring.
- add_message(key: str, logline_id: UUID, message: str) → None
Adds a message to the batch associated with the given key.
If the key does not exist in the current batch, a new batch entry is created with a unique batch ID. For existing keys, the message is appended to the existing batch. Logs the association between the logline and batch ID, updates batch timestamps, and tracks fill levels for monitoring purposes.
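A minimal sketch of the key and batch-ID bookkeeping described above, assuming the batch and buffer are plain dictionaries keyed by subnet ID that map to lists of message strings. The class name `MiniBufferedBatch` and its attribute names are hypothetical, and timestamp and fill-level tracking are left out:

```python
import uuid


class MiniBufferedBatch:
    """Hypothetical, simplified stand-in for BufferedBatch (not the real class)."""

    def __init__(self) -> None:
        self.batch = {}             # key -> messages in the current batch
        self.buffer = {}            # key -> messages of the previously completed batch
        self.batch_id = {}          # key -> UUID of the current batch
        self.logline_to_batch = {}  # logline UUID -> batch UUID (logged in the real class)

    def add_message(self, key: str, logline_id: uuid.UUID, message: str) -> None:
        if key not in self.batch:
            # New key: open a batch entry with a fresh, unique batch ID
            self.batch[key] = [message]
            self.batch_id[key] = uuid.uuid4()
        else:
            # Existing key: append to that key's batch
            self.batch[key].append(message)
        # Associate the logline with the batch it was added to
        self.logline_to_batch[logline_id] = self.batch_id[key]
```

In this sketch a key receives a batch ID only when its entry is created, so every logline added for that key until the batch is completed shares the same ID.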
- complete_batch(key: str) → dict
Completes the batch for a specific key and returns a formatted data packet.
Handles four scenarios depending on which data is available:
  - Variant 1: only the batch has entries; the current batch data is sent.
  - Variant 2: both the buffer and the batch have entries; both are combined in chronological order.
  - Variant 3: only the buffer has entries; the old buffer data is cleaned up.
  - Variant 4: no data exists; a ValueError is raised.
The method sorts all messages by timestamp, creates a data packet with batch metadata, logs completion timestamps, and moves the current batch data to the buffer for the next iteration.
- Parameters:
key (str) – Key for which to complete the current batch and return data packet.
- Returns:
Dictionary containing batch_id, begin_timestamp, end_timestamp, and chronologically sorted message data combining buffer and batch messages.
- Raises:
ValueError – No data is available for sending.
- get_message_count_for_batch() → int
Returns the total number of messages across all batches.
Calculates the sum of message counts from all key-specific batches currently stored.
- Returns:
Total number of messages in all batches.
- get_message_count_for_batch_key(key: str) → int
Returns the number of messages in the batch for a specific key.
- Parameters:
key (str) – Key for which message count is returned.
- Returns:
Number of messages in the batch for the given key, or 0 if key doesn’t exist.
- get_message_count_for_buffer() → int
Returns the total number of messages across all buffers.
Calculates the sum of message counts from all key-specific buffers currently stored.
- Returns:
Total number of messages in all buffers.
- get_message_count_for_buffer_key(key: str) → int
Returns the number of messages in the buffer for a specific key.
- Parameters:
key (str) – Key for which message count is returned.
- Returns:
Number of messages in the buffer for the given key, or 0 if key doesn’t exist.
- get_stored_keys() → set
Retrieves all keys stored in either the batch or the buffer.
Combines keys from both the current batch dictionary and the buffer dictionary to provide a complete set of all keys that have associated data.
- Returns:
Set of all unique keys stored in either batch or buffer dictionaries.
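The counting and key-lookup methods are thin aggregations over the two dictionaries. The following sketch uses hypothetical function names and takes the batch or buffer dictionary as an argument instead of reading it from an instance, which is how one function covers both the batch and the buffer variants:

```python
def message_count_total(store: dict[str, list[str]]) -> int:
    # Works for either the batch dict or the buffer dict
    return sum(len(messages) for messages in store.values())


def message_count_for_key(store: dict[str, list[str]], key: str) -> int:
    # Unknown keys count as 0 instead of raising KeyError
    return len(store.get(key, []))


def stored_keys(batch: dict[str, list[str]], buffer: dict[str, list[str]]) -> set[str]:
    # Union of the keys that have data in either structure
    return set(batch) | set(buffer)
```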
- class logcollector.batch_handler.BufferedBatchSender
Bases: object
Main component for managing batch collection and dispatch in the log collection stage.
Coordinates the addition of messages to batches and handles automatic sending based on two triggers: size-based (when a batch reaches the configured size limit) or time-based (when a timeout expires). Manages a timer that ensures batches are sent even if they don’t reach the size threshold, preventing data from being held indefinitely. Sends completed batches to the next pipeline stage via Kafka and tracks message timestamps for monitoring and debugging purposes.
- add_message(key: str, message: str) → None
Adds a message to the batch and triggers sending if batch size limit is reached.
Extracts the logline ID from the JSON message, logs the processing timestamps, and adds the message to the appropriate batch. If the batch reaches the configured size limit, it is immediately sent. On the first message, starts a timer that will trigger sending of all batches when the timeout expires.
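A minimal sketch of the two send triggers, using `threading.Timer` for the timeout. The class `MiniBatchSender`, the `logline_id` JSON field name, and the `send_fn` callback (standing in for the Kafka producer) are assumptions; restarting the timer on the first message after a timeout flush is also a design choice of this sketch:

```python
import json
import threading
from typing import Callable, Optional


class MiniBatchSender:
    """Hypothetical sketch of size- and time-triggered sending; not the real class."""

    def __init__(self, batch_size: int, timeout_s: float,
                 send_fn: Callable[[str, list], None]) -> None:
        self.batch_size = batch_size
        self.timeout_s = timeout_s
        self.send_fn = send_fn              # stands in for the Kafka producer
        self.batches: dict = {}             # key -> pending messages
        self.seen_logline_ids: list = []    # stands in for timestamp logging
        self.timer: Optional[threading.Timer] = None
        self.lock = threading.Lock()        # the timer fires on another thread

    def add_message(self, key: str, message: str) -> None:
        logline_id = json.loads(message)["logline_id"]  # assumed field name
        with self.lock:
            self.seen_logline_ids.append(logline_id)
            self.batches.setdefault(key, []).append(message)
            if len(self.batches[key]) >= self.batch_size:
                # Size trigger: send this key's batch right away
                self.send_fn(key, self.batches.pop(key))
            if self.timer is None:
                # First message since the last flush: start the timeout
                self.timer = threading.Timer(self.timeout_s, self._on_timeout)
                self.timer.daemon = True
                self.timer.start()

    def _on_timeout(self) -> None:
        # Time trigger: send every pending batch, then allow a new timer
        with self.lock:
            for key in list(self.batches):
                self.send_fn(key, self.batches.pop(key))
            self.timer = None
```

The lock matters because `threading.Timer` runs `_on_timeout` on a separate thread, which could otherwise race with `add_message` on the shared `batches` dictionary.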
logcollector.collector module
- class logcollector.collector.LogCollector
Bases: object
Main component of the Log Collection stage to pre-process and format data.
Consumes incoming loglines from the LogServer. Validates all data fields by type and value; invalid loglines are discarded. All valid loglines are sent to the BatchSender.
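Field validation by type and value could look like the following sketch. The field names (`timestamp`, `client_ip`, `status_code`, `domain_name`) and the allowed status codes are hypothetical placeholders, not the module's actual logline format:

```python
import datetime
import ipaddress
from typing import Optional

# Hypothetical allowed values; the real format and values are defined elsewhere
ALLOWED_STATUS_CODES = {"NOERROR", "NXDOMAIN"}


def validate_logline(fields: dict) -> Optional[dict]:
    """Return type-converted fields, or None if the logline must be discarded."""
    raw_ts = fields.get("timestamp")
    raw_ip = fields.get("client_ip")
    status = fields.get("status_code")
    domain = fields.get("domain_name")
    # Type checks: every field must be present and a string
    if not all(isinstance(v, str) for v in (raw_ts, raw_ip, status, domain)):
        return None
    try:
        # Value checks: parseable timestamp and IPv4/IPv6 address
        timestamp = datetime.datetime.fromisoformat(raw_ts)
        client_ip = ipaddress.ip_address(raw_ip)
    except ValueError:
        return None
    # Value checks: allowed status code and non-empty domain
    if status not in ALLOWED_STATUS_CODES or not domain:
        return None
    return {"timestamp": timestamp, "client_ip": client_ip,
            "status_code": status, "domain_name": domain}
```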
- async fetch() → None
Fetches data from the configured Kafka topic in a loop.
Starts an asynchronous loop that continuously listens on the configured Kafka topic and fetches new messages. Each consumed message is decoded and passed to send().
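The fetch loop can be sketched with an `asyncio.Queue` standing in for the Kafka consumer, so the example runs without a broker. The `None` sentinel that ends the loop is an addition for the sketch; the documented loop runs continuously:

```python
import asyncio
import datetime
from typing import Callable, Optional


async def fetch(queue: asyncio.Queue,
                send: Callable[[datetime.datetime, str], None]) -> None:
    """Hypothetical fetch loop; `queue` stands in for the Kafka consumer."""
    while True:
        raw: Optional[bytes] = await queue.get()
        if raw is None:
            # Sentinel so the sketch can stop; the real loop runs indefinitely
            break
        timestamp_in = datetime.datetime.now()   # time the message entered the stage
        send(timestamp_in, raw.decode("utf-8"))  # decode, then hand off to send()
```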
- send(timestamp_in: datetime, message: str) → None
Adds a message to the BatchSender to be stored temporarily.
The subnet ID is added to the message, which is then passed in JSON format to the BatchSender and stored in a temporary batch before being sent to the Prefilter. If a message does not have a valid logline format, it is logged as a failed logline, together with the timestamps at which it entered the module and was detected as invalid. For a valid message, the logline’s fields and an “in_process” event are logged with the timestamp at which it entered the module. After processing, a “finished” event is logged for it.
- Parameters:
timestamp_in (datetime.datetime) – Timestamp of entering the pipeline.
message (str) – Message to be stored.
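A sketch of `send()` under stated assumptions: the message is JSON with a `client_ip` field, the subnet ID is the client's network address plus a hypothetical /24 prefix length (formatted as `192.168.1.0_24`), and the `add_message` and `failed` arguments stand in for the BatchSender and the failed-logline logging. Field validation and event logging are omitted:

```python
import datetime
import ipaddress
import json
from typing import Callable

SUBNET_PREFIX_LENGTH = 24  # hypothetical; the real prefix length is configurable


def get_subnet_id(ip: str, prefix: int = SUBNET_PREFIX_LENGTH) -> str:
    # e.g. "192.168.1.77" -> "192.168.1.0_24"
    network = ipaddress.ip_network(f"{ip}/{prefix}", strict=False)
    return f"{network.network_address}_{network.prefixlen}"


def send(timestamp_in: datetime.datetime, message: str,
         add_message: Callable[[str, str], None], failed: list) -> None:
    try:
        fields = json.loads(message)
        subnet_id = get_subnet_id(fields["client_ip"])
    except (ValueError, KeyError, TypeError):
        # Invalid logline: keep entry and detection timestamps (the real class logs these)
        failed.append((message, timestamp_in, datetime.datetime.now()))
        return
    fields["subnet_id"] = subnet_id             # add the subnet ID to the message
    add_message(subnet_id, json.dumps(fields))  # the subnet ID is the batch key
```

Using the subnet ID as the batch key groups all loglines from one subnet into the same batch, matching the key description of BufferedBatch.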
- logcollector.collector.main() → None
Creates the LogCollector instance and starts it.