
Pipelines

Pipelines are a core component of Graylog’s log message processing system, providing a structured framework for evaluating, modifying, and routing incoming data. They define the sequence of processing steps applied to messages after ingestion, ensuring consistent, efficient, and customized handling of log data.

Each pipeline consists of a series of rules organized into stages, and can be linked to one or more streams. This connection allows you to determine exactly how and when certain messages are processed, giving you fine-grained control over enrichment, normalization, and routing.

Key Concepts

Pipelines

A pipeline is a collection of pipeline rules organized into stages. When attached to a stream, messages entering that stream are processed through all connected pipelines in the defined order of stages.

Pipeline Rules

Pipeline rules define the logic that determines how messages are processed.

They can:

  • Route messages to different streams

  • Enrich data by adding or modifying fields

  • Transform message content

  • Normalize message formats for consistent search and analysis

A set of related rules that operate together forms a complete pipeline workflow.

Functions

Functions are the building blocks of pipeline rules. Each function performs a specific operation, such as parsing text, checking field values, or changing message content, and can accept parameters to control its behavior. Functions return results that influence how subsequent rules handle a message.
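For example, to_string() converts a field value to text and set_field() writes a field back onto the message. A minimal action combining the two (the field name src_ip and the default value are illustrative):

set_field("client_ip", to_string($message.src_ip, "unknown")); // falls back to "unknown" if src_ip is missing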

Stages

Pipelines are divided into stages, each containing one or more rules. Stages are executed sequentially, in numerical order.

All stages with the same priority execute in parallel across all connected pipelines. This structure makes it possible to build multi-step workflows, for example, parsing a message in one stage, enriching it in another, and routing it in the final stage.

Streams

Every pipeline must be attached to at least one stream, which determines which messages the pipeline processes.

Messages entering a stream trigger the execution of all pipelines connected to it. For more information, see Streams.

Pipeline rule logic

Pipelines in Security Data Lake are built from pipeline rules, which define how log messages are inspected, transformed, and routed before they are indexed or stored. These rules use a dedicated domain-specific language (DSL) that provides a controlled, readable syntax for defining processing logic while maintaining strong runtime performance.

Each rule combines a condition and an action:

  • The condition determines when the rule applies.

  • The action specifies what happens when the condition is met.

Understanding data types is essential when writing rules. Data types define what kind of value a field holds (such as a string, number, or IP address) and how that value can be manipulated within a rule.

Pipeline rules are built using functions—predefined methods that perform specific tasks such as converting data types, manipulating strings, parsing JSON, or retrieving lookup table data. Graylog includes a wide range of built-in functions to help you enrich, transform, and manage log data effectively.

Rules can be created and tested interactively through the rule builder interface, or written manually in the source code editor for advanced users.

Example pipeline

The following example shows how pipelines and their rules are structured internally:

pipeline "My new pipeline"
stage 1 match all
  rule "has firewall fields";
  rule "from firewall subnet";
stage 2 match either
  rule "geocode IPs";
  rule "anonymize source IPs";
end

This pipeline defines two stages:

  • Stage 1 executes only if all listed rules evaluate to true.

  • Stage 2 executes if any of its rules match (similar to an OR condition).

Stages run in ascending numerical order, and each can reference reusable rules. This allows modular design: rules like has firewall fields can be shared across multiple pipelines, avoiding duplication.

Example rules

Here are two example rules referenced in the pipeline above:

Rule 1

rule "has firewall fields"
when
    has_field("src_ip") && has_field("dst_ip")
then
end

Rule 2

rule "from firewall subnet"
when
    cidr_match("10.10.10.0/24", to_ip($message.gl2_remote_ip))
then
end

Both rules use built-in functions to define conditions:

  • has_field() checks for the presence of specific message fields.

  • cidr_match() evaluates whether an IP address falls within a defined subnet.

The to_ip() conversion ensures that the value is interpreted as an IP address rather than a string, demonstrating Security Data Lake’s strong type enforcement for rule validation.

These rules contain no actions (then is empty) because they are used only to control pipeline flow.

Conditions

A rule’s when clause is a Boolean expression evaluated against each message.

It supports logical operators AND (&&), OR (||), and NOT (!), as well as comparison operators such as <, <=, >, >=, ==, and !=.

For example:

has_field("src_ip") && cidr_match("10.0.0.0/8", to_ip($message.src_ip))

If a condition references a function that does not exist, it automatically evaluates to false.

When comparing fields, ensure both are of the same type, for example:

to_string($message.src_ip) == to_string($message.dst_ip)

Actions

The then clause defines what happens when a condition evaluates to true.

Actions can be:

  • Function calls, e.g. set_field("type", "firewall_log");

  • Variable assignments, e.g. let subnet = to_string($message.network);

Variables allow you to store and reuse temporary values, avoid recomputation, and make rules more readable.
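For example, a minimal rule sketch that stores an intermediate value in a variable and reuses it in two actions (the field names are illustrative):

rule "tag network segment"
when
    has_field("network")
then
    // convert once, reuse twice
    let subnet = to_string($message.network);
    set_field("subnet_name", subnet);
    set_field("subnet_info", concat("segment: ", subnet));
end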

Reserved Words

Certain tokens in the rule language are reserved and cannot be used as variable names, including:

  • All

  • Either

  • Pass

  • And

  • Or

  • Not

  • Pipeline

  • Rule

  • During

  • Stage

  • When

  • Then

  • End

  • Let

  • Match

For example:

let match = regex(a,b);

will fail because match is a reserved word.

Data types

Security Data Lake enforces type safety in pipeline rules to prevent invalid operations. The following built-in data types are supported:

  • string - UTF-8 text value

  • double - Floating-point number (Java Double)

  • long - Integer number (Java Long)

  • boolean - True or false value

  • void - Function with no return value

  • ip - IP address (subset of InetAddress)

Plugins may define additional types. Conversion functions prefixed with to_ (e.g., to_string(), to_ip(), to_long()) ensure proper type handling. For a full list of functions, refer to Functions reference.

Note

Always convert message fields to the correct type before using them in comparisons or functions. For example:

set_field("timestamp", to_string(`$message.@extracted_timestamp`));

Build pipeline rules

Pipelines are defined by rules that determine how messages are processed as they pass through Graylog. Each rule specifies conditions and actions that allow you to filter, enrich, transform, or route log data based on specific criteria.

To set up a rule, you define its logic using the rule builder or the source code editor, combining “when” conditions with “then” actions to describe exactly how messages should be handled. Once rules are created, they can be added to pipelines, organized into stages, and connected to streams—enabling flexible, automated control over message processing from ingestion to storage.

This article outlines the processes involved in creating and managing pipeline rules.

Configure the Message Processor

Before you start building pipeline rules, make sure that the message processor is enabled and correctly configured:

  1. Go to System > Configurations.

  2. Select Message Processors.

  3. Select Edit configurations and enable the Pipeline Processor by selecting the checkbox next to it.

  4. Drag the Pipeline Processor so that it comes after Message Filter Chain. Use the six dots to the left to drag.

  5. Click Update configuration.

Create and manage rules

Rules can be built with either the Rule Builder or the Source Code Editor.

The Rule Builder (default view) offers a guided, visual way to create rules.

To switch to manual editing, select Use Source Code Editor from the creation menu.

Warning

You can convert a rule from the Rule Builder to the Source Code Editor, but not back again.

Create a rule using the Rule Builder

The Rule Builder provides a visual, structured method for writing rules directly in the Graylog interface.

Each rule follows a simple when → then pattern:

  • When defines the condition that triggers the rule.

  • Then defines the action to take when that condition is met.

Both boxes feature searchable drop-downs. Typing a few letters of a function name displays suggestions and short descriptions. Refer to the Functions documentation for a complete list.

To build a rule using Rule Builder, follow these steps:

  1. Go to System > Pipelines > Manage Rules.

  2. Select Create Rule.

  3. Create a when statement.

  4. (Optional) Add additional statements and combine them with and or or operators, which you can select in the upper-right corner of the When section.

  5. Create a then statement specifying the action.

If a then statement produces a value, the output variables appear automatically and can be reused in later statements.

Note

Rules can be tested immediately in the Rule Simulation module.

Create a rule with the Source Code Editor

You can also write rules manually using when and then statements in the Source Code Editor. This view supports full syntax editing and includes a quick-reference list of functions with their descriptions.

To create a rule with the Source Code Editor, follow these steps:

  1. Go to System > Pipelines > Manage Rules.

  2. Select Create Rule.

  3. Select Use Source Code Editor from the upper right side of the page.

  4. Configure the rule.

    Note

    Refer to Pipeline Rule Logic for syntax details.

  5. Select Create rule.

As with the Rule Builder, you can validate your rule in the Rule Simulation module before saving.

Simulate a pipeline rule

Simulation lets you test a rule before deployment. You can simulate a full message or just a single field. Enter a raw message string, key-value pair, or JSON payload in the simulation box.

The simulator shows assigned output variables and processed results step by step.

Tip

The last used message is saved with each rule, so it’s always available for simulation.

To run a simulation, follow these steps:

  1. Go to System > Pipelines > Simulator.

  2. Select Run Rule Simulation.

  3. Enter a sample message.

  4. Review the processed output.

  5. Reset or adjust the rule and run again if needed.

Managing pipelines

After creating rules, you can combine them into pipelines that process and enrich messages. Go to System > Pipelines > Manage Pipelines to create, edit, or delete pipelines.

Each pipeline contains one or more stages that define execution order and logic.

Creating a pipeline

To create a new pipeline, follow these steps:

  1. Go to System > Pipelines > Manage Pipelines.

  2. Select the Add new pipeline button on the upper right side of the screen.

  3. Enter a descriptive name and description for the pipeline and select ???

  4. Select Edit connections under the Pipeline connections section.

    The Edit connections window is displayed.

  5. Under the Streams field, select the streams you want to attach.

    Note

    Pipelines act only on messages in the streams they are connected to. Multiple pipelines can process the same stream; their rules run according to stage priority.

    Tip

    The All messages stream is the default entry point for all incoming data and a good place for general pipelines handling routing, filtering, or field enrichment.

    Once selected, they will be added to a list below the menu. You can select Remove to remove them from the list.

  6. Select Add a new stage and configure the stage:

    1. Under Stage, enter the stage priority, which determines at what point in the sequence the stage runs. This number can be any integer, and lower numbers run first.

    2. Select how to continue processing rules on subsequent stages:

      • All rules in this stage match the message - Continue to the next stage only if all conditions are met.

      • At least one of the rules in this stage matches the message - Continue to the next stage if any of the conditions are met.

      • None or more rules in this stage match the message - Continue to the next stage regardless of whether any rules match.

    3. Under Stage rules, select the rules to apply.

    4. Select Add stage to save the information.

  7. If required, add additional stages.

    Note

    Each stage you create adds a new section under the Pipeline menu. Select Edit to modify stage details or Delete to remove the stage.

With all stages added, the pipeline is now complete and will appear on the Pipelines page. Once connected to a stream, it will automatically start processing incoming messages according to the rules and logic you defined.

Stream testing and pipeline simulation

Use the Pipeline Simulator to preview how messages are processed through current pipeline settings. To test a stream, follow these steps:

  1. Go to System > Pipelines > Simulator.

  2. Under Stream, select the stream you want to test.

  3. Under Raw message, provide a raw sample message in the same format as incoming logs (for example, a GELF message).

  4. (Optional) Specify a source IP, input type, and codec (the parsing mechanism for log messages).

After execution, the simulator displays:

  • Changes Summary – Lists modified, added, or removed fields.

  • Results Preview – Shows the fully processed message.

  • Simulation Trace – Details which rules and pipelines executed and how long each took.

Editing pipelines and stages

All pipelines are displayed under the System > Pipelines > Manage Pipelines page. For each pipeline, you can select Delete to remove the pipeline, or Edit to modify its configuration.

Use cases

This article presents practical use cases for creating and applying pipeline rules in Graylog. These examples demonstrate how to filter unwanted logs, enrich message data, and route messages to specific streams or alerting systems. Use these scenarios as a reference to design and implement efficient pipeline rules that optimize how your log data is processed and analyzed.

Each use case below describes the rule's purpose, followed by an example when condition, an example then action, and the complete rule syntax.

Anonymization

Pipeline rules can redact or remove sensitive data before messages are stored or forwarded.

This ensures compliance with privacy standards by masking information such as IP addresses, usernames, or personal identifiers.

When example: Check whether the source_ip field exists.

Then example: Remove the source_ip field from the message to eliminate the sensitive data.

rule "Mask sensitive information"
when
    has_field("source_ip")
then
    remove_single_field("source_ip");
end

Breadcrumb

Breadcrumb rules add metadata to messages so their flow can be traced across systems or stages.

These rules are often used for debugging, tagging, or tracking message processing.

When example: Leave the condition blank (true) so the rule applies to every incoming message.

Then example: Add or update the field rule_demo with a static value.

rule "Set demo field"
when
    true
then
    set_field("rule_demo", "test");
end

Filter

Filter rules help you drop unnecessary messages, reducing data ingestion volume and license usage.

When example: Check if the testing field exists.

Then example: Drop the message entirely to prevent it from being stored or processed further.

rule "Drop Test Messages"
when
    has_field("testing")
then
    drop_message();
end

Modification

Modification rules change message content, for example, by reformatting timestamps or updating field values.

When example: Check if the message contains an event_time field.

Then example: Convert the timestamp from UTC to UK time and write it to a new field called event_time_uk.

rule "convert event_time to UK timezone"
when
    has_field("event_time")
then
    let event_time_date = parse_date(
        value: to_string($message.event_time),
        pattern: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", // Adjust this pattern as needed
        timezone: "UTC"
    );
    let event_time_uk = format_date(
        value: event_time_date,
        date_format: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
        timezone: "Europe/London"
    );
    set_field("event_time_uk", event_time_uk);
end

Enrichment

Enrichment rules enhance messages by adding additional context or updating existing data values.

When example: Check whether the field Src_ip_geo_country exists and equals US.

Then example: Update the field’s value to UniSt.

rule "SrcCountryUnitedStates"
when
    has_field("Src_ip_geo_country") &&
    to_string($message.Src_ip_geo_country) == "US"
then
    set_field(
        field: "Src_ip_geo_country",
        value: "UniSt",
        clean_field: false
    );
end

Routing

Routing rules send specific messages to another stream and optionally remove them from the current stream.

Tip

Create the destination stream in advance before referencing it in your rule.

When example: Check if the gl2_remote_ip field exists and matches a specific string value.

Then example: Route the message to the target stream (My First Stream) and remove it from the default stream.

rule "Route Message to Stream"
when
    has_field("gl2_remote_ip") &&
    to_string($message.gl2_remote_ip) == "66914166ac1d1568bad817f3"
then
    route_to_stream(
        name: "My First Stream",
        remove_from_default: true
    );
end

Pipeline functions

Functions are the building blocks of pipeline rules. Each function is a predefined method that performs a specific action on a log message as it moves through Security Data Lake’s processing pipeline.

Functions can take one or more parameters and return outputs that determine how a message is transformed, enriched, or routed. By combining functions in conditions and actions, you can define powerful processing logic that tailors message handling to your organization’s needs.

For a full list of all supported Security Data Lake functions, their descriptions, and sample syntax, see Functions Reference.

Syntax

Pipeline functions in Graylog are implemented in Java and designed to be pluggable, allowing you to easily extend the platform’s processing capabilities.

Conceptually, a function receives parameters—such as the current message context—and returns a value. The parameter and return data types define where the function can be used within a rule. Graylog automatically validates these types to ensure that all rules are logically and syntactically sound.

Function parameters can be passed either as named key-value pairs or in positional order, provided that any optional parameters are declared and processed last.
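For example, the following two calls to set_field are equivalent; the first passes named key-value pairs, the second relies on positional order:

// named parameters
set_field(field: "vendor", value: "acme");

// positional parameters
set_field("vendor", "acme");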

Java Data Types

Pipeline rules can use certain Java data types when building queries or performing calculations. This is limited to those types that are queried using the GET function.

For example, you can use the .millis property of DateTime and Period objects to retrieve time values in milliseconds.

This allows you to perform precise time-based calculations, such as measuring how old a message is relative to the current time.

rule "time diff calculator millis"
when
    true
then
    let time_diff =
        to_long(
            parse_date(
                value: to_string(now(timezone: "Europe/Berlin")),
                pattern: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
                locale: "de_DE"
            ).millis
        )
        -
        to_long(
            parse_date(
                value: to_string($message.timestamp),
                pattern: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
                locale: "de_DE"
            ).millis
        );

    set_field("scan_age_millis", time_diff);
end

In this example, the rule calculates the difference between the current time (in the “Europe/Berlin” timezone) and the message’s timestamp, converting both values to milliseconds.

The resulting value (scan_age_millis) represents the age of the event in milliseconds and is stored as a new field in the message.

Warning

Security Data Lake does not support the use of any functions that are not officially documented. Exercise caution if you choose to test any unsupported function data types.

Function Types

Built-in Graylog functions can be categorized by the following function types. For a full list of all functions and their descriptions, see Functions Reference.

Anonymization

Anonymization functions obfuscate sensitive data from a dataset or log message.

Asset Enrichment

Asset Enrichment functions enhance, retrieve, or remove asset-related log data. See Asset Enrichment for more information on this Graylog Security feature.

Boolean

Boolean data is primarily associated with conditional statements, which allow different actions by changing control flow depending on whether a condition evaluates to true or false. Boolean functions determine Boolean values or operators.

Conversion

Conversion functions are used to convert a value from one format to another.

Date/Time

Date/time functions perform an action or calculation on a date and time value.

Debug

Debug functions are used to determine the state of your program at any point of execution.

Encoding

Encoding functions enable you to decode and convert strings.

List

List functions create or retrieve a collection that can be manipulated for your analysis.

Lookup

Lookup functions enable you to search a database for a value then return additional information from the same record.

Map

Map functions apply a given action to each or all elements in a collection.

Message Handling

Message Handling functions define what is to be done in response to a message. They are used for various enrichment, removal, retrieval, and routing operations for log data when building pipeline rules.

Pattern Matching

Pattern matching functions specify patterns to which some data should conform and deconstruct the data according to those patterns.

String

String functions are used to manipulate a string or query information about a string.

Watchlist

Watchlist functions perform actions that allow you to retrieve or modify watchlists.

Functions reference

The following list describes the built-in functions that ship with Graylog.

Each entry below lists the function name, its category, a description, and its syntax.

abbreviate

String

Abbreviates a string using ellipses. The width defines the maximum length of the resulting string.

abbreviate(value: string, width: long)

abusech_ransom_lookup_domain

String

Matches a domain name against the abuse.ch Ransomware Domain Blocklist RW_DOMBL.

abusech_ransom_lookup_domain(domain_name) : GenericLookupResult

abusech_ransom_lookup_ip

String

Matches an IPv4 or IPv6 address against the abuse.ch Ransomware Domain Blocklist RW_DOMBL.

abusech_ransom_lookup_ip(ip_address) : GenericLookupResult

add_asset_categories

Asset Enrichment

Adds a list of categories to an asset.

add_asset_categories(asset_name: string, categories: list)

anonymize_ip

Anonymization

Anonymizes an IP address by setting the last octet to 0.

anonymize_ip(ip) : IpAddress

array_contains

Message Handling

Checks if the specified element is contained in the array.

See example

array_contains (elements, value, [case-sensitive]): boolean

array_remove

Message Handling

Removes the specified element from the array.

See example

array_remove (elements, value, [remove_all]) : list

base16_decode

String

Provides base16 decoding of the string that returns lower-case letters. It requires regular hexadecimals, 0-9 A-F.

base16_decode (value, [omit_padding: boolean])

base16_encode

String

Provides standard case-insensitive hexadecimal encoding using a 16-character subset. It requires regular hexadecimals, 0-9 A-F.

base16_encode (value, [omit_padding: boolean])

base32_decode

String

Decodes a string using a 32-character subset. Uses the "numerical" base 32 and is extended from the traditional hexadecimal alphabet, 0-9 A-V.

base32_decode (value, [omit_padding: boolean])

base32_encode

String

Encodes a string using a 32-character subset. Uses the "numerical" base 32 and is extended from the traditional hexadecimal alphabet, 0-9 A-V.

base32_encode (value, [omit_padding: boolean])

base32human_decode

String

Decodes a string in human-readable format using a 32-character subset. It is a "readable" base 32, so there is no possibility of confusing 0/O or 1/I, A-Z 2-7.

base32human_decode (value, [omit_padding: boolean])

base32human_encode

String

Encodes a string in human-readable format using a 32-character subset. It is a "readable" base 32, so there is no possibility of confusing 0/O or 1/I, A-Z 2-7.

base32human_encode (value, [omit_padding: boolean])

base64_decode

String

Decodes a string using a 64-character subset. Regular base64 allows both upper and lowercase letters. It does not need to be human readable.

base64_decode (value, [omit_padding: boolean])

base64_encode

String

Encodes a string using a 64-character subset. Regular base64 allows both upper and lowercase letters. It does not need to be human readable.

base64_encode (value, [omit_padding: boolean])

base64url_decode

String

Provides URL-safe decoding of a string using a 64-character subset. It is safe to use as file names or to pass in URLs without escaping.

base64url_decode (value, [omit_padding: boolean])

base64url_encode

String

Provides URL-safe encoding of the string using a 64-character subset. It is safe to use as file names or to pass in URLs without escaping.

base64url_encode (value, [omit_padding: boolean])

capitalize

String

Capitalizes a string, changing the first letter to title case.

capitalize(value: string)

cidr_match

Boolean/Message Function

Checks whether the given IP address object matches the cidr pattern.

See also: to_ip

cidr_match(cidr: string, ip: IpAddress)

clone_message

Message Handling

Clones a message. If message is omitted, this function uses the currently processed message.

clone_message([message: Message])

concat

String

Returns a new string combining the text of first and second. The concat function only concatenates two strings. If you want to build a string from more than two sub-strings, you must use concat multiple times.

See example

concat(first: string, second: string)

contains

String

Checks if a string contains another string. Case sensitivity can be controlled with the optional ignore_case parameter.

See example

contains(value: string, search: string, [ignore_case: boolean])

crc32

String Function/Encoding

Returns the hex-encoded CRC32 digest of the given string.

crc32(value: string)

crc32c

String Function/Encoding

Returns the hex-encoded CRC32C (RFC 3720, Section 12.1) digest of the given string.

crc32c(value: string)

create_message

Message Handling

Creates a new message from the given parameters. If any of these parameters is omitted, their value is taken from the corresponding fields of the currently processed message. If timestamp is omitted, the timestamp of the created message will be the timestamp at that moment in time.

create_message([message: string], [source: string], [timestamp: DateTime])

csv_to_map

Conversion

Converts a single line of a CSV string into a map usable by set_fields.

See also: set_fields

csv_to_map(value, fieldNames, [separator], [quoteChar], [escapeChar], [strictQuotes], [trimLeadingWhitespace], [ignoreExtraFieldNames])

days

Date/Time

Creates a time period with value number of days.

See also: is_period, period

days(value: long)

debug

Debug

Prints the passed value as a string in the Graylog log. Note that the debug message will only appear in the log of the Graylog node processing the message you are trying to debug.

See example

debug(value: any)

drop_message

Message Handling

Removes the given message after the rule is finished executing. This does not prevent later stages of the same pipeline from being applied to the message. If message is omitted, this function uses the currently processed message. This can be used to implement flexible blacklisting based on various conditions.

See example

drop_message(message: Message)

ends_with

String

Checks if value ends with suffix, optionally ignoring the case of the string.

See example

ends_with(value: string, suffix: string, [ignore_case: boolean])

expand_syslog_priority

Conversion

Converts a syslog priority number to its level and facility.

expand_syslog_priority(value: any)

expand_syslog_priority_as_string

Conversion

Converts the syslog priority number in value to its severity and facility string representations.

expand_syslog_priority_as_string(value: any)

first_non_null

List

Returns the first element in the specified list that is not null. Returns null for an empty list.

first_non_null (value: list)

flatten_json

String

Parses the value string as a JSON tree while flattening all containers to a single level. Parsing of JSON arrays is determined by the array_handler parameter value. Available options for array_handler are:

  • ignore: Ignores all top-level arrays.

  • json: Returns top-level arrays as valid JSON strings.

  • flatten: Explodes all arrays and objects into top-level key/values.

flatten_json(value, array_handler) : JsonNode

flex_parse_date

Date/Time

Uses the Natty date parser to parse a date and time value. If no timezone is detected in the pattern, the optional timezone parameter is used as the assumed timezone. If omitted, the timezone defaults to UTC. If the parser fails to detect a valid date and time, the default date and time is returned if one is provided; otherwise, the expression fails to evaluate and is aborted.

See also: is_date

flex_parse_date(value: string, [default: DateTime], [timezone: string])

format_date

Date/Time

Returns the given date and time value formatted according to the format string. If no timezone is given, it defaults to UTC.

format_date(value: DateTime, format: string, [timezone: string])

from_forwarder_input

Message Handling

Checks whether the currently processed message was received on the given forwarder input. The input can be looked up by either specifying its name (the comparison ignores the case) or the id.

from_forwarder_input(id: string | name: string)

from_input

Message Handling

Checks whether the currently processed message was received on the given (non-forwarder) input. The input can be looked up by either specifying its name (the comparison ignores the case) or the id.

from_input(id: string | name: string)

get_field

Message Handling

Retrieves the value for a field.

get_field(field, [message]) : Object

grok

Pattern Matching

Applies the grok pattern grok to value. Returns a match object, containing a map of field names and values. You can set only_named_captures to true to return only matches using named captures. The result of executing the grok function can be passed as argument for set_fields to set the extracted fields into a message.

See also: set_fields

grok(pattern: string, value: string, [only_named_captures: boolean])

grok_exists

Boolean

Checks if the given Grok pattern exists. log_missing determines whether a log message is generated when no matching pattern is found.

grok_exists (pattern:string, [log_missing:boolean])

has_field

Boolean/Message Function

Checks whether the given message contains a field with the name field. If message is omitted, this function uses the currently processed message.

has_field(field: string, [message: Message])

hours

Date/Time

Creates a time period with value number of hours.

hours(value: long)

in_private_net

Message Handling

Checks if an IP address is in a private network as defined in RFC 1918 (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16) or RFC 4193 (fc00::/7).

in_private_net(ip_address) : Boolean

is_bool

Boolean

Checks whether the given value is a Boolean value (true or false).

is_bool(value: any)

is_collection

Boolean

Checks whether the given value is an iterable collection.

is_collection(value: any)

is_date

Boolean

Checks whether the given value is a date (of type DateTime).

See also: now, parse_date, flex_parse_date, parse_unix_milliseconds

is_date(value: any)

is_double

Boolean

Checks whether the given value is a floating point value (of type double).

See also: to_double

is_double(value: any)

is_ip

Boolean

Checks whether the given value is an IP address (IPv4 or IPv6).

See also: to_ip

is_ip(value: any)

is_json

Boolean

Checks whether the given value is a parsed JSON tree.

See also: parse_json

is_json(value: any)

is_list

Boolean

Checks whether a value is an iterable list.

is_list(value: any)

is_long

Boolean

Checks whether a value is an integer value (of type long).

See also: to_long

is_long(value: any)

is_map

Boolean

Checks whether the given value is a map.

See also: to_map

is_map(value: any)

is_not_null

Boolean

Checks whether a value is not null.

See example

is_not_null(value: any)

is_null

Boolean

Checks whether a value is null.

See example

is_null(value: any)

is_number

Boolean

Checks whether the given value is a numeric value (of type long or double).

See also: is_double, to_double, is_long, to_long

is_number(value: any)

is_period

Boolean

Checks whether the given value is a time period (of type period).

See also: years, months, weeks, days, hours, minutes, seconds, millis, period

is_period(value: any)

is_string

Boolean

Checks whether a value is a string.

See also: to_string

is_string(value: any)

is_url

Boolean

Checks whether the given value is a parsed URL.

See also: to_url

is_url(value: any)

join

String

Joins the specified range of elements of the provided array into a single string. Start index defaults to 0, and end index defaults to the last element index of the list. If specified, the elements are separated by the delimiter in the resulting string.

join (elements: list, [delimiter:string], [start:long], [end:long])

key_value

Boolean

Extracts key-value pairs from the given value and returns them as a map of field names and values. You can optionally specify:

  • delimiters: Characters used to separate pairs. We will use each character in the string, so you do not need to separate them. Default value: <[whitespace]>.

  • kv_delimiters: Characters used to separate keys from values. Again, there is no need to separate each character. Default value: =.

  • ignore_empty_values: Ignores keys containing empty values. Default value: true.

  • allow_dup_keys: Indicates if duplicated keys are allowed. Default value: true.

  • handle_dup_keys: How to handle duplicated keys (if allow_dup_keys is set). It can take the values take_first, which will only use the first value for the key, or take_last, which will only use the last value for the key. Setting this option to any other value will change the handling to concatenate, which will combine all values given to the key, separating them with the value set in this option. For example, setting handle_dup_keys: "," would combine all values given to a key a, separating them with a comma, such as 1,2,foo. Default value: take_first.

  • trim_key_chars: Characters to trim (remove from the beginning and end) from keys. Default value: no trim.

  • trim_value_chars: Characters to trim (remove from the beginning and end) from values. Default value: no trim.

Also note the result of executing the key_value function can be passed as argument for set_fields to set the extracted fields into a message.

See also: set_fields

key_value ( value : string , [ delimiters : string ], [ kv_delimiters : string ], [ ignore_empty_values : boolean ], [ allow_dup_keys : boolean ], [ handle_dup_keys : string ], [ trim_key_chars : string ], [ trim_value_chars : string ] )

length

String

Counts the characters in a string. If bytes=true, it counts the number of bytes instead (assumes UTF-8 encoding).

length (value:string, [bytes: boolean])

list_count

List

Gets number of elements in list.

list_count(list:list) : Long

list_get

List

Gets a value from a list.

list_get(list:list, index:long) : Object

lookup

Lookup

Looks up a multi value in the named lookup table.

See example

lookup(lookup_table: string, key: any, [default: any])

lookup_add_string_list

Lookup

Adds a string list in the named lookup table and returns the updated list on success or returns null on failure. This function only supports the MongoDB Lookup Table at the time of writing.

lookup_add_string_list(lookup_table, key, value,[keep_duplicates])

lookup_all

Lookup

Looks up all provided values in the named lookup table and returns all results as an array.

See example

lookup_all(lookup_table, keys) : list

lookup_assign_ttl

Lookup

Add a time to live to the key in the named lookup table. Returns the updated entry on success and null on failure.

lookup_assign_ttl(lookup_table, key, ttl) : Object

lookup_clear_key

Lookup

Clears (removes) a key in the named lookup table. This function only supports the MongoDB Lookup Table at the time of writing.

lookup_clear_key(lookup_table, key)

lookup_has_value

Lookup

Determines whether a given key is present in a lookup table. Will return true if the key is present and false if the key is not present.

lookup_has_value (lookup_table, key)

lookup_remove_string_list

Lookup

Removes the entries of the given string list from the named lookup table. Returns the updated list on success and returns null on failure. This function only supports the MongoDB Lookup Table at the time of writing.

lookup_remove_string_list(lookup_table, key, value)

lookup_set_string_list

Lookup

Sets a string list in the named lookup table. Returns the new value on success and returns null on failure. This function only supports the MongoDB Lookup Table at the time of writing.

lookup_set_string_list(lookup_table:string, key:string, value:list)

lookup_set_value

Lookup

Sets a single value in the named lookup table. Returns the new value on success and returns null on failure. This function only supports the MongoDB Lookup Table at the time of writing.

lookup_set_value(lookup_table, key, value)

lookup_string_list

Lookup

Looks up a string list value in the named lookup table. This function only supports the MongoDB Lookup Table at the time of writing.

lookup_string_list(lookup_table, key, [default])

lookup_string_list_contains

Boolean

Looks up value in the string list referenced by the key in the named lookup table. Returns true only if the key/value mapping is present, otherwise it returns false.

lookup_string_list_contains (lookup_table, key, value)

lookup_value

Lookup

Looks up a single value in the named lookup table.

See example

lookup_value(lookup_table: string, key: any, [default: any])

lowercase

String

Converts a String to lower case. The locale (IETF BCP 47 language tag) defaults to en.

lowercase(value: string, [locale: string])

machine_asset_lookup

Asset Enrichment

Looks up a single machine asset. If multiple assets match the input parameters, only one will be returned.

machine_asset_lookup(lookup_type, value) : Map

machine_asset_update

Asset Enrichment

Updates the IP or MAC addresses for a machine asset. If multiple assets match the input parameters, only one will be selected.

machine_asset_update(lookup_type, lookup_value, [ip_addresses], [hostnames]) : Void

map_copy

Map

Copies a map to a new map.

map_copy(map) : Map

map_get

Map

Retrieves a value from a map.

map_get(map, key) : Object

map_remove

Map

Removes a key from the map.

map_remove(map, key) : Map

map_set

Map

Sets a key in the map.

map_set(map, key, value) : Map

md5

String

Creates the hex-encoded MD5 digest of the value.

md5(value: string)

metric_counter_inc

Debug

Counts specific metric criteria. The counter metric name will always be prefixed with org.graylog.rulemetrics. The default value is 1 if no increment value is specified.

metric_counter_inc (name, [value]): Void

millis

Date/Time

Creates a time period with a value number of milliseconds.

See also: is_period, period

millis(value: long)

minutes

Date/Time

Creates a time period with value number of minutes.

See also: is_period, period

minutes(value: long)

months

Date/Time

Creates a time period with value number of months.

See also: is_period, period

months(value: long)

multi_grok

Pattern Matching

Applies a list of Grok patterns to a string and returns the first match.

See example

multi_grok(patterns, value, [only_named_captures]) : GrokMatch$GrokResult

murmur3_128

Encoding

Creates the hex-encoded MurmurHash3 (128-bit) digest of the value.

murmur3_128(value: string)

murmur3_32

Encoding

Creates the hex-encoded MurmurHash3 (32-bit) digest of the value.

murmur3_32(value: string)

normalize_fields

Message Handling

Normalizes all field names by setting them to lowercase.

normalize_fields([message]) : Void

now

Date/Time

Returns the current date and time. Uses the default time zone UTC.

See also: is_date

now([timezone: string])

otx_lookup_domain

String

Looks up AlienVault OTX threat intelligence data for a domain name. Requires a configured lookup table named otx-api-domain.

See example

otx_lookup_domain (domain_name: string) : OTXLookupResult

otx_lookup_ip

String

Looks up AlienVault OTX threat intelligence data for an IPv4 or IPv6 address. Requires a configured lookup table named otx-api-ip.

See example

otx_lookup_ip (ip_address: string) : OTXLookupResult

parse_cef

String

Parses any CEF-formatted string into its fields. This is the CEF string (starting with CEF:) without a syslog envelope.

parse_cef(cef_string, use_full_names) : CEFParserResult

parse_date

Date/Time

Parses a date string using the given date format.

parse_date(value: string, pattern: string, [locale: string], [timezone: string])

parse_json

String

Parses the value string as JSON, returning the resulting JSON tree.

See also: to_map

parse_json(value: string)

parse_unix_milliseconds

Date/Time

Attempts to parse a UNIX millisecond timestamp (milliseconds since 1970-01-01T00:00:00.000Z) into a proper DateTime object.

See also: is_date

See example

parse_unix_milliseconds(value: long)

period

Date/Time

Parses an ISO 8601 time period from value.

See also: is_period, years, months, weeks, days, hours, minutes, seconds, millis

period(value: string)

regex

Pattern Matching

Matches a string with a regular expression. Uses Java syntax.

regex(pattern: string, value: string, [group_names: array[string]])

regex_replace

Pattern Matching

Matches the regular expression in pattern against value and replaces it, if matched, with replacement. You can use numbered capturing groups and reuse them in the replacement string. If replace_all is set to true, then all matches will be replaced; otherwise, only the first match will be replaced.

See example

regex_replace(pattern: string, value: string, replacement: string,[replace_all: boolean])

remove_asset_categories

Asset Enrichment

Removes a list of categories from an asset.

remove_asset_categories(asset_name, categories) : Void

remove_field (legacy, deprecated)

Message Handling

Removes the given field with the name field from the given message, unless the field is reserved. If message is omitted, this function uses the currently processed message.

See instead: remove_single_field, remove_multiple_fields

remove_field(field: string, [message: Message])

remove_from_stream

Message Handling

Removes the message from the given stream. The stream can be looked up by either specifying its name or the id. If message is omitted, this function uses the currently processed message. If the message ends up being on no stream, it is implicitly routed back to the default stream “All messages.” This ensures that the message is not lost due to complex stream routing rules.

If you want to discard the message entirely, use the drop_message function. With remove_from_stream, the message continues to be processed in following stages. To abort processing, use drop_message, or structure the stage conditions so that the following stages are not executed after remove_from_stream has been called.

remove_from_stream(id: string | name: string, [message: Message])

remove_multiple_fields

Message Handling

Removes fields matching a regular expression (regex) pattern and/or list of names, unless the field name is reserved.

remove_multiple_fields ([pattern: string],[names: list],[message: Message])

remove_single_field

Message Handling

Removes a single field from a message, unless the field name is reserved.

remove_single_field (field: string, [message: Message])

rename_field

Message Handling

Modifies the field name old_field to new_field in the given message, keeping the field value unchanged.

rename_field(old_field: string, new_field: string, [message: Message])

replace

String

Replaces the first max occurrences, or all occurrences, of a string within another string. max defaults to -1, which replaces all occurrences; use 1 to replace only the first occurrence, 2 for the first two, and so on.

See example

replace(value: string, search: string, [replacement: string], [max: long])

route_to_stream

Message Handling

Sets a stream assignment of the message to the given stream. Functions as 'copy' and does not remove the message from the current stream. If message is omitted, this function uses the currently processed message. This causes the message to be evaluated on the pipelines connected to that stream, unless the stream has already been processed for this message. If remove_from_default is true, the message is also removed from the default stream, “All messages”. remove_from_default will take effect after the current pipeline has finished resolving. This rule does not prevent later stages of the pipeline from being applied to the message. The stream can also be looked up by either specifying its name or the id.

See example

route_to_stream(id: string | name: string, [message: Message], [remove_from_default: boolean])

seconds

Date/Time

Creates a time period with value number of seconds.

See also: is_period, period

seconds(value: long)

select_jsonpath

Map

Evaluates the given paths against the json tree and returns the map of the resulting values.

See also: is_json, parse_json

select_jsonpath(json: JsonNode, paths: Map<string, string>)

set_associated_assets

Asset Enrichment

Adds associated asset information.

set_associated_assets([message]) : Void

set_field

Message Handling

Sets the given field to the new value. The field name must be valid and specifically cannot include a period character. It is trimmed of leading and trailing whitespace. String values are trimmed of whitespace as well. The optional prefix and suffix parameters specify which prefix or suffix should be added to the inserted field name. The optional clean_field parameter replaces invalid field name characters with underscores. If message is omitted, this function uses the currently processed message. Use the default when no value is available (i.e. it is null or throws an exception).

See also: set_fields

set_field(field: string, value: any, [prefix: string], [suffix: string], [message: Message], [default: any], [clean_field: boolean])

set_fields

Message Handling

Sets all of the given name-value pairs in field in the given message. This is a convenience function acting like set_field. It can be helpful for using the result of a function like select_jsonpath or regex in the currently processed message, especially when the key names are the result of a regular expression. The optional prefix and suffix parameters specify which prefix or suffix should be added to the inserted field names. The optional clean_fields parameter replaces invalid field name characters with underscores. If message is omitted, this function uses the currently processed message.

See also: set_field, to_map, grok, key_value

set_fields(fields: Map<string, any>, [prefix: string], [suffix: string], [message: Message], [clean_fields: boolean])

sha1

Encoding

Creates the hex-encoded SHA1 digest of the value.

sha1(value: string)

sha256

Encoding

Creates the hex-encoded SHA256 digest of the value.

sha256(value: string)

sha512

Encoding

Creates the hex-encoded SHA512 digest of the value.

sha512(value: string)

spamhaus_lookup_ip

Lookup

Matches an IP address against the Spamhaus DROP and EDROP lists.

spamhaus_lookup_ip(ip_address) : GenericLookupResult

split

String

Splits a string around matches of this pattern. Uses Java syntax.

split(pattern: string, value: string, [limit: int])

starts_with

String

Checks if value starts with prefix, optionally ignoring the case of the string.

See example

starts_with(value: string, prefix: string, [ignore_case: boolean])

string_array_add

String

Adds the specified string (or string array) value to the supplied string array. Casts the input array and value/value array to strings.

See example

string_array_add(elements, value, [only_unique]) : list

string_entropy

String

Computes Shannon's entropy of the character distribution in the given string.

string_entropy (value: string, [default: double])

substring

String

Returns a substring of value starting at the start offset (zero based indices), optionally ending at the end offset. Both offsets can be negative, indicating positions relative to the end of value.

See example

substring(value: string, start: long, [end: long])

swapcase

String

Swaps the case of a string, changing upper and title case to lower case and lower case to upper case.

swapcase(value: string)

syslog_facility

Conversion

Converts the syslog facility number in value to its string representation.

syslog_facility(value: any)

syslog_level

Conversion

Converts the syslog severity number in value to its string representation.

syslog_level(value: any)

threat_intel_lookup_domain

Lookup

Matches a domain name against all enabled threat intel sources, except OTX.

threat_intel_lookup_domain(domain_name, prefix) : GlobalLookupResult

threat_intel_lookup_ip

Lookup

Matches an IP address against all enabled threat intel sources, except OTX.

threat_intel_lookup_ip(ip_address, prefix) : GlobalLookupResult

to_bool

Conversion

Converts the single parameter to a Boolean value using its string value.

to_bool(value: any)

to_date

Conversion

Converts value to a date. If no timezone is given, it defaults to UTC.

See also: is_date

to_date(value: any, [timezone: string])

to_double

Conversion

Converts the first parameter to a double floating point value.

to_double(value: any, [default: double])

to_ip

Conversion

Converts the given ip string to an IpAddress object.

See also: cidr_match

to_ip(ip: string)

to_long

Conversion

Converts the first parameter to a long integer value.

to_long(value: any, [default: long])

to_map

Conversion

Converts the given map-like value to a valid map. The to_map function currently only supports converting a parsed JSON tree into a map so that it can be used together with set_fields.

See also: set_fields, parse_json

See example

to_map(value: any)

to_string

Conversion

Converts the first parameter to its string representation.

to_string(value: any, [default: string])

to_url

Conversion

Converts the given url to a valid URL.

to_url(url: any, [default: string])

tor_lookup

Lookup

Matches an IP address against known Tor exit nodes to identify connections from the Tor network.

tor_lookup(ip_address) : GenericLookupResult

traffic_accounting_size

Message Handling

Calculates the size of the entire message, including all extra fields. This is also the value used to determine how much the message counts toward license usage.

See example

traffic_accounting_size [(message)]: long

uncapitalize

String

Uncapitalizes a string, changing the first letter to lower case.

uncapitalize(value: string)

uppercase

String

Converts a string to upper case. The locale (IETF BCP 47 language tag) defaults to en.

uppercase(value: string, [locale: string])

urldecode

String

Decodes an application/x-www-form-urlencoded string using a specific encoding scheme.

urldecode(value: string, [charset: string])

urlencode

String

Translates a string into application/x-www-form-urlencoded format using a specific encoding scheme. Valid charsets are, for example, UTF-8, US-ASCII, etc. Default is UTF-8.

urlencode(value, [charset])

user_asset_lookup

Asset Enrichment

Looks up a single user asset. If multiple assets match the input parameters, only one will be returned.

user_asset_lookup(lookup_type, value) : Map

watchlist_add

Watchlist

Adds a value to a watchlist referenced by type. Returns true on success and false on failure and throws an exception if the watchlist is not configured correctly.

watchlist_add(type, value) : Boolean

watchlist_contains

Watchlist

Looks up a value in the watchlist referenced by the type. Returns true on success and false on failure and throws an exception if the watchlist is not configured correctly.

watchlist_contains(type, value) : Boolean

watchlist_remove

Watchlist

Removes a value from a watchlist referenced by type. Returns true on success and false on failure and throws an exception if the watchlist is not configured correctly.

watchlist_remove(type, value) : Boolean

weeks

Date/Time

Creates a time period with value number of weeks.

See also: is_period, period

weeks(value: long)

whois_lookup_ip

Lookup

Retrieves WHOIS information for an IP address.

whois_lookup_ip(ip_address, prefix) : WhoisIpLookupResult

years

Date/Time

Creates a time period with value number of years.

See also: is_period, period

years(value: long)

Examples

The following examples show sample usage for selected functions.

array_contains

rule "array_contains"
when
    true
then
    set_field("contains_number", array_contains([1, 2, 3, 4, 5], 1));
    set_field("does_not_contain_number", array_contains([1, 2, 3, 4, 5], 7));
    set_field("contains_string", array_contains(["test", "test2"], "test"));
    set_field("contains_string_case_insensitive", array_contains(["test", "test2"], "TEST"));
    set_field("contains_string_case_sensitive", array_contains(["test", "test2"], "TEST", true));
end

array_remove

rule "array_remove"
when
    true
then
    set_field("remove_number", array_remove([1, 2, 3], 2));
    set_field("remove_string", array_remove(["one", "two", "three"], "two"));
    set_field("remove_missing", array_remove([1, 2, 3], 4));
    set_field("remove_only_one", array_remove([1, 2, 2], 2));
    set_field("remove_all", array_remove([1, 2, 2], 2, true));
end

concat

let build_message_0 = concat(to_string($message.protocol), " connect from ");
let build_message_1 = concat(build_message_0, to_string($message.src_ip));
let build_message_2 = concat(build_message_1, " to ");
let build_message_3 = concat(build_message_2, to_string($message.dst_ip));
let build_message_4 = concat(build_message_3, " Port ");
let build_message_5 = concat(build_message_4, to_string($message.dst_port));
set_field("message", build_message_5);

contains

contains(to_string($message.hostname), "example.org", true)

debug

// Logs "Dropped message from <source>" to the Graylog node's log file
let debug_message = concat("Dropped message from ", to_string($message.source));
debug(debug_message);

drop_message

rule "drop messages over 16383 characters"
when    
    has_field("message") AND    
    regex(pattern: "^.{16383,}$", value: to_string($message.message)).matches == true
then   
    drop_message();    
    // added debug message to be notified about the dropped message    
    debug( concat("dropped oversized message from ", to_string($message.source)));
end

ends_with

Returns true:

ends_with("Foobar Baz Quux", "quux", true);

Returns false:

ends_with("Foobar Baz Quux", "Baz");

grok_exists

when
  grok_exists("USERNAME")
then
  let parsed = grok("%{USERNAME:username}", to_string($message.message));
  set_field("parsed_username", parsed.username);
end

hex_to_decimal_byte_list

hex_to_decimal_byte_list(value: "0x17B90004");

Returns: [23, 185, 0, 4]

hex_to_decimal_byte_list(value: "0x117B90004");

Returns: [1, 23, 185, 0, 4]

hex_to_decimal_byte_list(value: "17B90004");

Returns: [23, 185, 0, 4]

hex_to_decimal_byte_list(value: "117B90004");

Returns: [1, 23, 185, 0, 4]

hex_to_decimal_byte_list(value: "not_hex");

Returns: null

is_not_null

is_not_null(src_addr)

is_null

is_null(src_addr)

lookup

rule "dst_ip geoip lookup"
when 
    has_field("dst_ip")
then  
    let geo = lookup("geoip-lookup", to_string($message.dst_ip));
    set_field("dst_ip_geolocation", geo["coordinates"]); 
    set_field("dst_ip_geo_country_code", geo["country"].iso_code); 
    set_field("dst_ip_geo_country_name", geo["country"].names.en); 
    set_field("dst_ip_geo_city_name", geo["city"].names.en);
 end

lookup_all

rule "function lookup all"
when
    true
then
    let values = lookup_all("lut_name", ["key1", "key2", "key3"]);
    set_field("values", values); 
end

lookup_value

("ip_lookup", to_string($message.src_addr));

multi_grok

when
  true
then
  set_fields(
    fields: multi_grok(
        patterns: [
            "^ABC %{IPORHOST:msg_ip}: %{GREEDYDATA:abc_message}",
            "^123 %{IPORHOST:msg_ip}: %{GREEDYDATA:123_message}",
            "^ABC2 %{IPORHOST:abc_ip}: %{GREEDYDATA:abc_message}"
            ],
        value: to_string($message.message),
        only_named_captures: true
    )
  );
end

otx_lookup_domain

rule "PARSE IP to DNS"
when
    has_field("source_ip")
    && regex(
        pattern: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
        value: to_string($message.source_ip)
        ).matches == true
then
    let rs = lookup_value("dns_lookups", to_string($message.source_ip));
    set_field("source_ip_dns", to_string(rs));
end

otx_lookup_ip

rule "PARSE source_ip - otx-api-ip"
when
    // validate message has a source_ip field
    has_field("source_ip")
    // validate that source IP is in IPv4 format
    && regex(
        pattern: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
        value: to_string($message.source_ip)
        ).matches == true
then
    let rs = otx_lookup_ip(to_string($message.source_ip));
    set_fields(rs);
end

parse_unix_milliseconds

set_field ( "timestamp" , timestamp );

regex_replace

let username = regex_replace(".*user: (.*)", to_string($message.message), "$1");

replace

// assuming $message.message is "foo rooft oota"
let new_field = replace(to_string($message.message), "oo", "u");    // "fu ruft uta"
let new_field = replace(to_string($message.message), "oo", "u", 1); // "fu rooft oota"

route_to_stream

route_to_stream(id: "512bad1a535b43bd6f3f5e86");

starts_with

Returns true:

starts_with("Foobar Baz Quux", "foo", true);

Returns false:

starts_with("Foobar Baz Quux", "Quux");

string_array_add

rule "string_array_add"
when
    true
then
    set_field("add_number_to_string_array_converted", string_array_add(["1", "2"], 3));
    set_field("add_number_array_to_string_array_converted", string_array_add(["1", "2"], [3, 4]));
    set_field("add_string", string_array_add(["one", "two"], "three"));
    set_field("add_string_again", string_array_add(["one", "two"], "two"));
    set_field("add_string_again_unique", string_array_add(["one", "two"], "two", true));
    set_field("add_array_to_array", string_array_add(["one", "two"], ["three", "four"]));
end

substring

// first 20 characters of the message
let short_message = substring(to_string($message.message), 0, 20);

to_map

let json = parse_json(to_string($message.json_payload));
let map = to_map(json);
set_fields(map);

traffic_accounting_size

set_field(
    field: "license_usage",
    value: traffic_accounting_size() // size in bytes
    //value: traffic_accounting_size() / 1024 // size in kb
    );

Log enrichment

Lookup Tables

Lookup tables allow you to map, translate, or enrich log data by replacing message field values with new ones, or by creating entirely new message fields. For example, you can use a static CSV file to map IP addresses to hostnames or use an external data source to enrich messages with threat intelligence, geolocation, or asset information.

This feature makes it possible to enhance raw log data with context from internal systems or third-party integrations, transforming it into richer, actionable insights.

Components

The lookup table system consists of four components:

Data adapters are used to do the actual lookup for a value. They might read from a CSV file, connect to a database, or execute requests to receive the lookup result.

Data adapter implementations are pluggable and new ones can be added through plugins.

Warning

The CSV file adapter reads the entire contents of the file into HEAP memory. Ensure that you size the HEAP accordingly.
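For example, a minimal CSV file for an IP-to-hostname lookup might look like the following sketch (the column names are illustrative; the key and value columns, separator, and quote character are chosen in the adapter configuration):

"ipaddr","hostname"
"10.0.0.1","server-01.example.org"
"10.0.0.2","server-02.example.org"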

The caches are responsible for caching the lookup results to improve the lookup performance and/or to avoid overloading databases and APIs. They are separate entities to make it possible to reuse a cache implementation for different data adapters. That way, the data adapters do not have to care about caching and do not have to implement it on their own.

Cache implementations are pluggable and new ones can be added through plugins.

Tip

The CSV file adapter refreshes its contents within each check interval if the file was changed. If the cache was purged but the check interval has not elapsed, lookups might return expired values.

The lookup table component ties a data adapter instance and a cache instance together. It is needed to enable the usage of the lookup table in converters, pipeline functions, and decorators.

The lookup result is returned by a lookup table through the data adapter and can contain two types of data: a single value and a multi value.

The single value can be a string, number, or boolean and is used in converters, decorators, and pipeline rules. In the CSV example of looking up host names for IP addresses, this would be the host name string.

A multi value is a map or dictionary-like data structure that can contain several different values. This is useful if the data adapter can provide multiple values for a key. A good example is the geo-ip data adapter, which provides not only the latitude and longitude for an IP address, but also information about the city and country of the location. Currently, the multi value can only be used in a pipeline rule via the lookup() pipeline function.

Example 1: Output for a CSV data adapter including a single value and a multi value.


Example 2: Output for the geo-ip data adapter including a single value and a multi value.

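In a pipeline rule, both result types can be used together. The following is a minimal sketch; the table name "hostnames" is an assumption, while "geoip-lookup" mirrors the lookup() example shown earlier:

rule "enrich with lookup results"
when
    has_field("src_ip")
then
    // single value: the host name string returned by a CSV-backed table (table name "hostnames" is an assumption)
    set_field("src_hostname", lookup_value("hostnames", to_string($message.src_ip)));
    // multi value: the map returned by the geo-ip adapter, accessible only via lookup()
    let geo = lookup("geoip-lookup", to_string($message.src_ip));
    set_field("src_geo_city", geo["city"].names.en);
end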

Setup

You can configure lookup tables in the System > Lookup Tables window.

A lookup table requires at least one data adapter and one cache.

  1. Create a Data Adapter:

    1. Go to System > Lookup Tables > Data Adapters.

    2. Select Create Adapter and select a data adapter type.

    3. Complete the adapter configuration form, which includes built-in documentation for each type.

  2. Create a Cache:

    1. Go to System > Lookup Tables > Caches.

    2. Select Create Cache and choose a cache type.

    3. Complete the cache configuration form. Review the cache-specific documentation included in the form.

      Note

      Null results are cached unless you select Ignore empty results during configuration.

  3. Create a Lookup Table:

    1. Go to System > Lookup Tables.

    2. Select Create Lookup Table.

    3. Select your data adapter and cache instances, and optionally define a default value.

      Note

      The default value is used when a lookup does not return a result. If a key is not found in the lookup table, Security Data Lake automatically returns the defined default value.

Once created, the lookup table can be referenced in extractors, decorators, and pipeline rules.

Usage

Lookup tables can be applied in several areas of Security Data Lake to enrich and contextualize data:

  • Converters – Perform lookups on extracted values during message ingestion.

  • Decorators – Enrich messages at search time without modifying stored data.

  • Pipeline Rules – Apply logic dynamically with the lookup() or lookup_value() functions.

Built-in Data Adapters

Security Data Lake ships with several ready-to-use data adapters. Each type has on-screen documentation in the Edit Data Adapter form.

  • CSV File Adapter – Performs key/value lookups from a static CSV file.

  • DNS Lookup Adapter – Performs hostname and IP resolution (A, AAAA, PTR, and TXT records).

  • DSV File Adapter – Similar to CSV, but supports custom delimiters and configurable key/value columns.

  • HTTPS JSONPath Adapter – Executes GET requests and extracts data using JSONPath expressions.

  • Geo IP (MaxMind) – Provides geolocation data for IP addresses using MaxMind databases.

MongoDB

Security Data Lake adds support for MongoDB Data Adapters, which store lookup data directly in the Graylog configuration database. Entries can be added, updated, or deleted via the API, the GUI, or pipeline functions.

Managing MongoDB Data Adapters via API

Example curl request to add a key:

curl -u <token>:token \
  -H 'X-Requested-By: cli' \
  -H 'Accept: application/json' \
  -H 'Content-Type: application/json' \
  -X POST 'http://127.0.0.1:9000/api/plugins/org.graylog.plugins.lookup/lookup/adapters/mongodb/mongodb-data-name' \
  --data-binary '{
    "key": "myIP",
    "values": ["12.34.42.99"],
    "data_adapter_id": "5e578606cdda4779dd9f2611"
  }'

Note

Entries can also be managed directly from the Security Data Lake UI or modified dynamically via pipeline rules using lookup-related functions.
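For example, a pipeline rule could update the MongoDB-backed table directly. This is only a sketch: the table name "mongodb-data-name" and the field names are assumptions, and it relies on the lookup mutation function lookup_set_value being available in your version:

rule "remember last source IP per user"
when
    has_field("username") && has_field("src_ip")
then
    // store or overwrite the key (the username) with the latest source IP
    let stored = lookup_set_value("mongodb-data-name", to_string($message.username), to_string($message.src_ip));
    set_field("stored_src_ip", stored);
end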

Tip

To add multiple values for a single key in the GUI, separate each value with a newline.

Geolocation

Security Data Lake lets you extract and visualize geolocation information from IP addresses in your logs.

This article provides you with step-by-step instructions on how to configure a geolocation processor and create a map using the extracted geolocation data.

Set Up the Processor

Security Data Lake ships with geolocation capabilities by default but additional configuration is still required. This section explains how to configure the functionality in detail.

Note

You must create an account to obtain a license key to download the MaxMind databases. More information is available in MaxMind's blog post.

Configure the Processor

You need to configure Graylog to start using the geolocation database to resolve IP addresses in your logs.

  1. Navigate to System > Configurations.

  2. Select Plugins > Geo-Location Processor, then click Edit configuration.

  3. Select the Enable Geo-location processor check box.

  4. Choose either MaxMind or IPInfo from the drop-down menu.

  5. Enter the paths to both the city and ASN databases you use. You can also adjust the refresh interval.

  6. Select Update configuration to save the configuration.

Illuminate and Geolocation

Geolocation configuration is available with Graylog Open. Illuminate is not required to use geolocation data.

If you want geolocation data with Illuminate content, you must ensure that the Illuminate Processor runs before the GeoIP Resolver in Message Processors Configuration. Note that this order should be the default.

To check the configuration in your environment:

  1. Navigate to System > Configurations.

  2. Select Message Processors, then confirm the order in the table.

    If you need to change the order:

    1. Select Edit configuration.

    2. Use drag and drop to reorder the items in the list as required.

    3. Select Update configuration.

Enforce Security Data Lake Schema Option

When you configure the geolocation processor, the Enforce default schema option is selected by default. If you disable schema enforcement, all fields containing IP addresses (other than reserved IP addresses) are processed, and the following fields are added with the original field name as a prefix:

  • _geolocation

  • _country_code

  • _city_name

An example of the generated fields for the source_ip field might read:

  • source_ip_city_name: Vienna

  • source_ip_country_code: AT

  • source_ip_geolocation: 48.20849, 16.37208

If schema enforcement is enabled, only the following GIM schema fields are processed (reserved IP addresses are still skipped):

  • destination_ip

  • destination_nat_ip

  • event_observer_ip

  • host_ip

  • network_forwarded_ip

  • source_ip

  • source_nat_ip

An example of the generated fields for the source_ip field might read:

  • source_as_number: AS1853

  • source_as_organization: ACONET

  • source_geo_city: Vienna

  • source_geo_coordinates: 48.20849, 16.37208

  • source_geo_country_iso: AT

  • source_geo_name: Vienna, AT

  • source_geo_region: Vienna

  • source_geo_timezone: Europe/Vienna

Storing Geolocation Database Files in AWS S3

The Pull files from S3 bucket option at the bottom of the configuration page lets you pull geolocation database files from AWS S3 buckets. Enabling this feature allows an S3 bucket URL to be used in the path configuration values.


When enabled, a service runs every refresh interval and polls the files in the provided S3 buckets. If those files have been updated since the last poll, the new files are pulled down onto each node. This service relies on the Default Credentials Provider for credentials to the S3 buckets and does not use any configuration values set in the Security Data Lake AWS Plugin configuration settings.

The geolocation database files retrieved from S3 are stored in the Security Data Lake data_dir directory under the geolocation subdirectory. To change where these files are downloaded, set geo_ip_processor_s3_download_location to the desired location on disk in your Security Data Lake server configuration file.
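For example, the server configuration file could contain a line like the following (the download path shown is only illustrative):

geo_ip_processor_s3_download_location = /var/lib/graylog-server/geolocation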

If the Pull files from S3 bucket option is left disabled, all Security Data Lake nodes read the files from the path on disk, and those files must be updated manually.

Visualize Geolocations in a Map

Security Data Lake can display maps from geolocation data stored in any field, as long as the geo-points use the latitude,longitude format.
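If your log source delivers latitude and longitude in separate fields, a pipeline rule can combine them into this format. A minimal sketch, assuming hypothetical lat and long fields:

rule "build geo point"
when
    has_field("lat") && has_field("long")
then
    // combine the two coordinate fields into the latitude,longitude format expected by the map widget
    set_field("src_geolocation", concat(concat(to_string($message.lat), ","), to_string($message.long)));
end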

Display a Map in the Search Results Page

On any search results page, you can draw a map from the field of your choice in the search sidebar. Click the Create button (+) in the left sidebar and select Aggregation under the Generic menu.

This generates an empty aggregation widget. Click Edit and enter your information. Select World Map as the Visualization Type. You then see a map with all the different points stored in that field.

You can click Update preview to preview your map and make any changes before you click Update widget.


Note

Adding a metric affects the size of the dot on the map. If there is no metric defined, every dot has the same radius.

For additional fields used in Security Data Lake related to different sources of geo coordinates, view the Security Data Lake Schema.

Add a Map to a Dashboard

You can add the map visualization to any dashboard, just as you would other widgets. When the map is displayed on the search results page:

  1. Click the three dots in the upper right corner.

  2. Select Export to Dashboard.

You can then rename, edit, and save the new dashboard.

Data adapters

ThreatFox IOC Tracker Data Adapter

ThreatFox is a project from abuse.ch that tracks indicators of compromise (IOCs) associated with malware. The ThreatFox Data Adapter supports lookups by the following key types:

  • URL

  • Domain

  • IP:port

  • MD5 hash

  • SHA256 hash

When you create the data adapter, the ThreatFox data set is downloaded and stored in MongoDB. The Refresh Interval configuration parameter determines how often new data sets are fetched.

Sample Lookup Data

A lookup for the file hash 923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4 may produce the following output:

{
  "first_seen_utc": "2021-07-07T17:03:57.000+0000",
  "ioc_id": "158365",
  "ioc_value": "923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4",
  "ioc_type": "sha256_hash",
  "threat_type": "payload",
  "fk_malware": "win.agent_tesla",
  "malware_alias": [
    "AgenTesla",
    "AgentTesla",
    "Negasteal"
  ],
  "malware_printable": "Agent Tesla",
  "confidence_level": 50,
  "reference": "https://twitter.com/RedBeardIOCs/status/1412819661419433988",
  "tags": [
    "agenttesla"
  ],
  "anonymous": false,
  "reporter": "Virus_Deck"
}
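In a pipeline rule, a lookup table backed by this adapter can enrich messages with the fields shown above. A sketch, assuming a lookup table named "threatfox" and a file_sha256 message field:

rule "threatfox hash enrichment"
when
    has_field("file_sha256")
then
    let ioc = lookup("threatfox", to_string($message.file_sha256));
    // malware_printable and confidence_level are keys from the sample output above
    set_field("threat_malware", ioc["malware_printable"]);
    set_field("threat_confidence", ioc["confidence_level"]);
end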
Configure the Data Adapter
  • Title

    • A short title for the data adapter.

  • Description

    • A description of the data adapter.

  • Name

    • A unique name for the data adapter.

  • Custom Error TTL

    • Optional custom TTL for caching erroneous results. The default value is 5 seconds.

  • Include IOCs Older Than 90 Days

    • Optional setting that includes IOCs older than 90 days. By default, the data adapter's data set does not include IOCs older than 90 days. To avoid false positives, handle IOCs older than 90 days carefully.

  • Refresh Interval - Determines how often to fetch new data. The minimum refresh interval is 3600 seconds (1 hour), because that is how often the source data updates.

  • Case Insensitive Lookup - Allows the data adapter to perform case-insensitive lookups.

URLhaus Malware URL Data Adapter

URLhaus is a project from abuse.ch that maintains a database of malicious URLs used for malware distribution. When you create the data adapter, the selected URLhaus data set is downloaded and stored in MongoDB. The Refresh Interval configuration parameter determines how often new data sets are fetched.

Sample Lookup Data

A lookup for the URL http://192.168.100.100:35564/Mozi.m might produce the following output:

{
  "single_value": "malware_download",
  "multi_value": {
    "date_added": "2021-06-22T17:53:07.000+0000",
    "url_status": "online",
    "threat_type": "malware_download",
    "tags": "elf,Mozi",
    "url": "http://192.168.100.100:35564/Mozi.m",
    "urlhaus_link": "https://urlhaus.abuse.ch/url/1234567/"
  },
  "string_list_value": null,
  "has_error": false,
  "ttl": 9223372036854776000
}
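As with ThreatFox, a lookup table backed by this adapter can be queried from a pipeline rule. A sketch, assuming a lookup table named "urlhaus" and a url message field:

rule "urlhaus url check"
when
    has_field("url")
then
    // the single value is the threat type, for example "malware_download" (see the sample output above)
    let threat = lookup_value("urlhaus", to_string($message.url));
    set_field("url_threat_type", threat);
end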
Configure the Data Adapter
  • Title

    • A short title for the data adapter.

  • Description

    • A description of the data adapter.

  • Name

    • A unique name to refer to the data adapter.

  • Custom Error TTL

    • Optional custom TTL for caching erroneous results. If no value is specified, the default is 5 seconds.

  • URLhaus Feed Type

    • Determines which URLhaus feed the data adapter will use.

    • Online URLs is the smaller data set and includes only URLs that are currently detected as online.

    • Recently Added URLs is the larger data set and includes all online and offline URLs added in the last 30 days.

  • Refresh Interval - Determines how often new data is fetched. The minimum refresh interval is 300 seconds (5 minutes) because that is how often the source data can be updated.

  • Case Insensitive Lookup - Allows the data adapter to perform case-insensitive lookups.