Implementing Filters
- Ingestion: A log enters the system (via Input Plugin) with basic metadata: `dataSource`, `dataType`, and `tenantId`.
- Draft Creation: The engine creates a Draft object. The original log is stored in the `raw` field.
- Pipeline Matching: The engine iterates through the `pipeline` configuration.
  - Stages are evaluated in order.
  - A stage executes if the log's `dataType` is included in the stage's `dataTypes` array.
- Multiple Matches: A log can match and run through multiple stages if they all contain its `dataType`.
- Step Execution: Within a stage, steps run sequentially. Each step modifies the Draft's internal JSON string.
- Finalization: Once all matching stages finish, the final Draft is converted into a structured Event and sent to the Analysis stage.
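The matching and execution flow above can be sketched in Python. This is a simplified, hypothetical model (the `Draft` class, `run_pipeline`, and `tag_step` names are illustrative, not the engine's real internals):

```python
import json

class Draft:
    def __init__(self, raw, data_type):
        # The original log is kept in the protected "raw" field.
        self.fields = {"raw": raw, "dataType": data_type}

def run_pipeline(draft, pipeline):
    # Stages are evaluated in order; a stage runs only when the log's
    # dataType appears in the stage's dataTypes array, and a log may
    # match several stages.
    for stage in pipeline:
        if draft.fields["dataType"] in stage["dataTypes"]:
            for step in stage["steps"]:
                step(draft)  # steps run sequentially, mutating the Draft
    return draft.fields      # finalized into a structured Event

# Example: one stage with a single step that adds a field.
def tag_step(draft):
    draft.fields["tag"] = "seen"

pipeline = [{"dataTypes": ["syslog"], "steps": [tag_step]}]
event = run_pipeline(Draft("hello", "syslog"), pipeline)
print(json.dumps(event))
```

The key point the sketch captures is that stage selection is driven entirely by `dataType` membership, while step order inside a stage is fixed.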
A well-designed pipeline follows these four phases:
1. Use `json`, `csv`, `kv`, or `grok` to pull data out of the raw string.

```yaml
- json:
    source: raw
```

2. Map extracted fields to the Standard Event Schema.

```yaml
- rename:
    from: [log.source_ip, log.src]
    to: origin.ip
```

3. Add context or transform data using dynamic plugins or `reformat`.

```yaml
- dynamic:
    plugin: com.utmstack.geolocation
    params: { source: origin.ip, destination: origin.geolocation }
- cast:
    fields: [origin.port]
    to: int
```

4. Remove temporary fields to optimize storage and indexing.

```yaml
- delete:
    fields: [log.temp_id, log.unnecessary_meta]
```

Note: The `raw` field is protected for auditing purposes and cannot be deleted.
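A detail worth noting in the `rename` step is that `from` accepts a list of candidate fields. A minimal Python sketch of one plausible interpretation, assuming the first candidate that exists in the draft wins (this first-match behavior is an assumption, not documented semantics):

```python
# Hypothetical sketch of rename with multiple "from" candidates.
# Assumption: the first candidate present in the draft is moved to "to".
def rename(fields, sources, target):
    for key in sources:
        if key in fields:
            fields[target] = fields.pop(key)
            break
    return fields

fields = {"log.src": "10.0.0.1"}
rename(fields, ["log.source_ip", "log.src"], "origin.ip")
print(fields)  # {'origin.ip': '10.0.0.1'}
```

Listing several candidates this way lets one stage normalize logs from vendors that use different field names for the same concept.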
Every step can be made conditional using CEL expressions. If the expression returns false, the step is skipped. For a full list of available comparison functions, see the CEL Overloads Guide.
Example: Only run a `grok` if a certain field exists.

```yaml
- grok:
    source: log.message
    patterns: [...]
    where: 'exists("log.message")'
```
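The skip-on-false behavior can be sketched as follows. The tiny `exists` helper below stands in for the real CEL evaluator and is purely illustrative:

```python
def exists(fields, path):
    # Stand-in for the CEL exists() check: true when the field is present.
    return path in fields

def run_step(fields, step, condition=None):
    # A step executes only when its "where" condition evaluates to true;
    # otherwise the draft passes through unchanged.
    if condition is not None and not condition(fields):
        return fields
    return step(fields)

fields = {"raw": "boot ok"}  # no log.message, so the step is skipped
result = run_step(fields,
                  lambda f: {**f, "parsed": True},
                  condition=lambda f: exists(f, "log.message"))
print(result)
```

Because a skipped step leaves the draft untouched, conditions are a safe way to guard expensive parsers like `grok` against logs that lack the expected field.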
If you encounter "noise" (logs that shouldn't be stored or analyzed), use the drop step. It immediately stops the pipeline and discards the log.
```yaml
- drop:
    where: 'contains(raw, "HealthCheck")'
```

This example processes a firewall log that arrives as a Key-Value string.
```yaml
pipeline:
  - dataTypes: [firewall-fortigate-traffic]
    steps:
      # 1. Extract KV pairs (automatically goes into log.*)
      - kv:
          source: raw
          fieldSplit: " "
          valueSplit: "="
      # 2. Normalize to standard schema
      - rename:
          from: [log.src, log.s_ip]
          to: origin.ip
      - rename:
          from: [log.dst, log.d_ip]
          to: target.ip
      # 3. Handle data types for better indexing
      - cast:
          fields: [origin.port, target.port]
          to: int
      # 4. Logical enrichment: Determine action
      - add:
          function: string
          params: { key: action, value: denied }
          where: 'equals("log.policy", "block")'
      # 5. Cleanup temporary metadata
      - delete:
          fields: [log.policy]
```

For a full list of step parameters, see the Filter Steps Reference.
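To see what the `kv` step in the pipeline above produces, here is a sketch of its splitting behavior. The implementation is an assumption modeled on the `fieldSplit`/`valueSplit` parameters and on the comment that results land under `log.*`:

```python
# Hypothetical sketch of the kv step: split the raw string on fieldSplit,
# split each pair on valueSplit, and prefix keys with "log.".
def kv(raw, field_split=" ", value_split="="):
    fields = {}
    for pair in raw.split(field_split):
        if value_split in pair:
            key, value = pair.split(value_split, 1)
            fields["log." + key] = value
    return fields

raw = "src=10.0.0.5 dst=8.8.8.8 policy=block"
fields = kv(raw)
print(fields)
# {'log.src': '10.0.0.5', 'log.dst': '8.8.8.8', 'log.policy': 'block'}
```

Note that every extracted value starts life as a string, which is why the pipeline follows up with `cast` to turn ports into integers before indexing.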