Implementing Filters

The Parsing Lifecycle

  1. Ingestion: A log enters the system (via Input Plugin) with basic metadata: dataSource, dataType, and tenantId.
  2. Draft Creation: The engine creates a Draft object. The original log is stored in the raw field.
  3. Pipeline Matching: The engine iterates through the pipeline configuration.
    • Stages are evaluated in order.
    • A stage executes if the log's dataType is included in the stage's dataTypes array.
    • Multiple Matches: A log can match and run through multiple stages if they all contain its dataType (illustrated in the sketch after this list).
  4. Step Execution: Within a stage, steps run sequentially. Each step modifies the Draft's internal JSON string.
  5. Finalization: Once all matching stages finish, the final Draft is converted into a structured Event and sent to the Analysis stage.
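
The stage matching in steps 3–5 can be sketched with the configuration layout used in the annotated example later on this page. The dataTypes and step contents below are illustrative, not real parser names:

pipeline:
  # Stage 1: runs because "syslog" appears in its dataTypes
  - dataTypes: [syslog]
    steps:
      - json:
          source: raw

  # Stage 2: also lists "syslog", so the same log runs through this
  # stage as well, after Stage 1 (the Multiple Matches case above)
  - dataTypes: [syslog, firewall-generic]
    steps:
      - rename:
          from: [log.src]
          to: origin.ip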

Pipeline Architecture

A well-designed pipeline follows these four phases:

Phase 1: Extraction

Use json, csv, kv, or grok to pull data out of the raw string.

- json:
    source: raw
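
When the raw string is unstructured text instead of JSON, a grok step can do the extraction. The parameters below match the grok usage shown under Technical Details; the pattern itself is an illustrative sketch and assumes standard grok pattern names:

- grok:
    source: raw
    patterns: ['%{IP:src_ip} %{WORD:action}']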

Phase 2: Normalization

Map extracted fields to the Standard Event Schema.

- rename:
    from: [log.source_ip, log.src]
    to: origin.ip

Phase 3: Enrichment & Logic

Add context or transform data using dynamic plugins or transformation steps such as reformat and cast.

- dynamic:
    plugin: com.utmstack.geolocation
    params: { source: origin.ip, destination: origin.geolocation }
- cast:
    fields: [origin.port]
    to: int

Phase 4: Cleanup

Remove temporary fields to optimize storage and indexing.

- delete:
    fields: [log.temp_id, log.unnecessary_meta]

Note: The raw field is protected for auditing purposes and cannot be deleted.


Technical Details

Conditional Execution (where)

Every step can be made conditional using CEL expressions. If the expression returns false, the step is skipped. For a full list of available comparison functions, see the CEL Overloads Guide.

  • Example: Only run a grok if a certain field exists.
    - grok:
        source: log.message
        patterns: [...]
        where: 'exists("log.message")'
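
The same guard works on any step. For instance, a cast can be skipped when the field was never extracted; this sketch simply combines the cast and exists() syntax already shown on this page:

- cast:
    fields: [origin.port]
    to: int
    where: 'exists("origin.port")'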

Stopping Processing (drop)

If you encounter "noise" (logs that shouldn't be stored or analyzed), use the drop step. It immediately stops the pipeline and discards the log.

- drop:
    where: 'contains(raw, "HealthCheck")'
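
drop can also key off field checks rather than the raw string. A sketch that discards logs where extraction produced no message field, assuming standard CEL negation (!) is available:

- drop:
    where: '!exists("log.message")'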

Annotated Full Example

This example processes a firewall log that arrives as a Key-Value string.

pipeline:
  - dataTypes: [firewall-fortigate-traffic]
    steps:
      # 1. Extract KV pairs (automatically goes into log.*)
      - kv:
          source: raw
          fieldSplit: " "
          valueSplit: "="

      # 2. Normalize to standard schema
      - rename:
          from: [log.src, log.s_ip]
          to: origin.ip
      - rename:
          from: [log.dst, log.d_ip]
          to: target.ip

      # 3. Handle data types for better indexing
      - cast:
          fields: [origin.port, target.port]
          to: int

      # 4. Logical enrichment: Determine action
      - add:
          function: string
          params: { key: action, value: denied }
          where: 'equals("log.policy", "block")'

      # 5. Cleanup temporary metadata
      - delete:
          fields: [log.policy]

For a full list of step parameters, see the Filter Steps Reference.
