Pipelines
Pipelines are a core component of Graylog’s log message processing system, providing a structured framework for evaluating, modifying, and routing incoming data. They define the sequence of processing steps applied to messages after ingestion, ensuring consistent, efficient, and customized handling of log data.
Each pipeline consists of a series of rules organized into stages and can be linked to one or more streams. This connection determines exactly how and when certain messages are processed, giving you fine-grained control over enrichment, normalization, and routing.
Key Concepts
Pipelines
A pipeline is a collection of pipeline rules organized into stages. When attached to a stream, messages entering that stream are processed through all connected pipelines in the defined order of stages.
Pipeline Rules
Pipeline rules define the logic that determines how messages are processed.
They can:
Route messages to different streams
Enrich data by adding or modifying fields
Transform message content
Normalize message formats for consistent search and analysis
A set of related rules that operate together forms a complete pipeline workflow.
Functions
Functions are the building blocks of pipeline rules. Each function performs a specific operation, such as parsing text, checking field values, or changing message content, and can accept parameters to control its behavior. Functions return results that influence how subsequent rules handle a message.
Stages
Pipelines are divided into stages, each containing one or more rules. Stages are executed sequentially, in numerical order.
All stages with the same priority execute in parallel across all connected pipelines. This structure makes it possible to build multi-step workflows, for example, parsing a message in one stage, enriching it in another, and routing it in the final stage.
Streams
Every pipeline must be attached to at least one stream, which determines which messages the pipeline processes.
Messages entering a stream trigger the execution of all pipelines connected to it. For more information, see Streams.
Pipeline rule logic
Pipelines in Security Data Lake are built from pipeline rules, which define how log messages are inspected, transformed, and routed before they are indexed or stored. These rules use a dedicated domain-specific language (DSL) that provides a controlled, readable syntax for defining processing logic while maintaining strong runtime performance.
Each rule combines a condition and an action:
The condition determines when the rule applies.
The action specifies what happens when the condition is met.
Understanding data types is essential when writing rules. Data types define what kind of value a field holds (such as a string, number, or IP address) and how that value can be manipulated within a rule.
Pipeline rules are built using functions—predefined methods that perform specific tasks such as converting data types, manipulating strings, parsing JSON, or retrieving lookup table data. Graylog includes a wide range of built-in functions to help you enrich, transform, and manage log data effectively.
Rules can be created and tested interactively through the rule builder interface, or written manually in the source code editor for advanced users.
Example pipeline
The following example shows how pipelines and their rules are structured internally:
pipeline "My new pipeline" stage 1 match all rule "has firewall fields"; rule "from firewall subnet"; stage 2 match either rule "geocode IPs"; rule "anonymize source IPs"; end
This pipeline defines two stages:
Stage 1 executes only if all listed rules evaluate to true.
Stage 2 executes if any of its rules match (similar to an OR condition).
Stages run in ascending numerical order, and each can reference reusable rules. This allows modular design: rules like has firewall fields can be shared across multiple pipelines, avoiding duplication.
Example rules
Here are two example rules referenced in the pipeline above:
Rule 1
rule "has firewall fields"
when
has_field("src_ip") && has_field("dst_ip")
then
end
Rule 2
rule "from firewall subnet"
when
cidr_match("10.10.10.0/24", to_ip($message.gl2_remote_ip))
then
end
Both rules use built-in functions to define conditions:
has_field() checks for the presence of specific message fields.
cidr_match() evaluates whether an IP address falls within a defined subnet.
The to_ip() conversion ensures that the value is interpreted as an IP address rather than a string, demonstrating Security Data Lake’s strong type enforcement for rule validation.
These rules contain no actions (then is empty) because they are used only to control pipeline flow.
Conditions
A rule’s when clause is a Boolean expression evaluated against each message.
It supports logical operators AND (&&), OR (||), and NOT (!), as well as comparison operators such as <, <=, >, >=, ==, and !=.
For example:
has_field("src_ip") && cidr_match("10.0.0.0/8", to_ip($message.src_ip))If a condition references a function that does not exist, it automatically evaluates to false.
When comparing fields, ensure both are of the same type, for example:
to_string($message.src_ip) == to_string($message.dst_ip)
Actions
The then clause defines what happens when a condition evaluates to true.
Actions can be:
Function calls, e.g.
set_field("type", "firewall_log");Variable assignments, e.g.
let subnet = to_string($message.network);
Variables allow you to store and reuse temporary values, avoid recomputation, and make rules more readable.
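For example, a minimal rule that stores an intermediate value in a variable and reuses it in later actions might look like the following sketch; the field names network and network_id are illustrative:
rule "tag network"
when
  has_field("network")
then
  let subnet = to_string($message.network);
  set_field("network_id", subnet);
  debug(concat("tagged network ", subnet));
end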
Reserved Words
Certain tokens in the rule language are reserved and cannot be used as variable names, including:
All, Either, Pass, And, Or, Not, Pipeline, Rule, During, Stage, When, Then, End, Let, Match
For example:
let match = regex(a,b);
will fail because match is a reserved word.
Data types
Security Data Lake enforces type safety in pipeline rules to prevent invalid operations. The following built-in data types are supported:
Data type | Description |
|---|---|
string | UTF-8 text value |
double | Floating-point number (Java Double) |
long | Integer number (Java Long) |
boolean | True or false value |
void | Function with no return value |
ip | IP address (subset of InetAddress) |
Plugins may define additional types. Conversion functions prefixed with to_ (e.g., to_string(), to_ip(), to_long()) ensure proper type handling. For a full list of functions, refer to Functions reference.
Note
Always convert message fields to the correct type before using them in comparisons or functions. For example:
set_field("timestamp", to_string(`$message.@extracted_timestamp`));Build pipeline rules
Pipelines are defined by rules that determine how messages are processed as they pass through Graylog. Each rule specifies conditions and actions that allow you to filter, enrich, transform, or route log data based on specific criteria.
To set up a rule, you define its logic using the rule builder or the source code editor, combining “when” conditions with “then” actions to describe exactly how messages should be handled. Once rules are created, they can be added to pipelines, organized into stages, and connected to streams—enabling flexible, automated control over message processing from ingestion to storage.
This article outlines the processes involved in creating and managing pipeline rules.
Configure the Message Processor
Before you start building pipeline rules, make sure that the message processor is enabled and correctly configured:
Go to System > Configurations.
Select Message Processors.
Select Edit configuration and enable the Pipeline Processor by selecting the checkbox next to it.
Drag the Pipeline Processor so that it comes after Message Filter Chain. Use the six dots to the left to drag.
Click Update configuration.
Create and manage rules
Rules can be built with either the Rule Builder or the Source Code Editor.
The Rule Builder (default view) offers a guided, visual way to create rules.
To switch to manual editing, select Use Source Code Editor from the creation menu.
Warning
You can convert a rule from the Rule Builder to the Source Code Editor, but not back again.
Create a rule using the Rule Builder
The Rule Builder provides a visual, structured method for writing rules directly in the Graylog interface.
Each rule follows a simple when → then pattern:
When defines the condition that triggers the rule.
Then defines the action to take when that condition is met.
Both boxes feature searchable drop-downs. Typing a few letters of a function name displays suggestions and short descriptions. Refer to the Functions documentation for a complete list.
To build a rule using Rule Builder, follow these steps:
Go to System > Pipelines > Manage Rules.
Select Create Rule.
Create a when statement.
(Optional) Add additional statements and combine them with AND or OR operators, which you can select in the upper-right corner of the When section.
Create a then statement specifying the action.
If a then statement produces a value, the output variables appear automatically and can be reused in later statements.
Note
Rules can be tested immediately in the Rule Simulation module.
Create a rule with the Source Code Editor
You can also write rules manually using when and then statements in the Source Code Editor. This view supports full syntax editing and includes a quick-reference list of functions with their descriptions.
To create a rule with the Source Code Editor, follow these steps:
Go to System > Pipelines > Manage Rules.
Select Create Rule.
Select Use Source Code Editor from the upper right side of the page.
Configure the rule.
Note
Refer to Pipeline Rule Logic for syntax details.
Select Create rule.
As with the Rule Builder, you can validate your rule in the Rule Simulation module before saving.
Simulate a pipeline rule
Simulation lets you test a rule before deployment. You can simulate a full message or just a single field. Enter a raw message string, key-value pair, or JSON payload in the simulation box.
The simulator shows assigned output variables and processed results step by step.
Tip
The last used message is saved with each rule, so it’s always available for simulation.
To run a simulation, follow these steps:
Go to System > Pipelines > Simulator.
Select Run Rule Simulation.
Enter a sample message.
Review the processed output.
Reset or adjust the rule and run again if needed.
Managing pipelines
After creating rules, you can combine them into pipelines that process and enrich messages. Go to System > Pipelines > Manage Pipelines to create, edit, or delete pipelines.
Each pipeline contains one or more stages that define execution order and logic.
Creating a pipeline
To create a new pipeline, follow these steps:
Go to System > Pipelines > Manage Pipelines.
Select the Add new pipeline button on the upper right side of the screen.
Enter a descriptive name and description for the pipeline, then create the pipeline.
Select Edit connections under the Pipeline connections section.
The Edit connections window is displayed.
Under the Streams field, select the streams you want to attach.
Note
Pipelines act only on messages in the streams they are connected to. Multiple pipelines can process the same stream; their rules run according to stage priority.
Tip
The All messages stream is the default entry point for all incoming data and a good place for general pipelines handling routing, filtering, or field enrichment.
Once selected, the streams are added to a list below the menu. Select Remove to remove a stream from the list.
Select Add a new stage and configure the stage:
Under Stage, enter the stage priority, which determines at what point in the sequence the stage is applied. This number can be any integer, and lower numbers run first.
Select how to continue processing rules on subsequent stages:
All rules in this stage match the message - Continue to the next stage only if all conditions are met.
At least one of the rules in this stage matches the message - Continue to the next stage if any of the conditions are met.
None or more rules on this stage match - Continue to the next stage regardless of whether any of the conditions are met.
Under Stage rules, select the rules to apply.
Select Add stage to save the information.
If required, add additional stages.
Note
Each stage you create adds a new section under the Pipeline menu. Select Edit to modify stage details or Delete to remove the stage.
With all stages added, the pipeline is now complete and will appear on the Pipelines page. Once connected to a stream, it will automatically start processing incoming messages according to the rules and logic you defined.
Stream testing and pipeline simulation
Use the Pipeline Simulator to preview how messages are processed through current pipeline settings. To test a stream, follow these steps:
Go to System > Pipelines > Simulator.
Under Stream, select the stream you want to test.
Under Raw message, provide a raw sample message in the same format as incoming logs (for example, a GELF message like the sample shown after these steps).
(Optional) Specify a source IP, input type, and codec (the parsing mechanism for log messages).
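For example, a minimal GELF payload that could be pasted into the Raw message field might look like the following; the host name and field values are illustrative:
{
  "version": "1.1",
  "host": "web01.example.org",
  "short_message": "Accepted password for admin from 10.10.10.42",
  "level": 5,
  "_src_ip": "10.10.10.42"
}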
After execution, the simulator displays:
Changes Summary – Lists modified, added, or removed fields.
Results Preview – Shows the fully processed message.
Simulation Trace – Details which rules and pipelines executed and how long each took.
Editing pipelines and pipeline stages
All pipelines are displayed under the System > Pipelines > Manage Pipelines page. For each pipeline, you can select Delete to remove the pipeline, or Edit to modify its configuration.
Use cases
This article presents practical use cases for creating and applying pipeline rules in Graylog. These examples demonstrate how to filter unwanted logs, enrich message data, and route messages to specific streams or alerting systems. Use these scenarios as a reference to design and implement efficient pipeline rules that optimize how your log data is processed and analyzed.
Rule | When example | Then example | Rule syntax |
|---|---|---|---|
Anonymization Pipeline rules can redact or remove sensitive data before messages are stored or forwarded. This ensures compliance with privacy standards by masking information such as IP addresses, usernames, or personal identifiers. | Check whether the source_ip field exists. | Remove the source_ip field so its value is not stored or forwarded. | rule "Mask sensitive information"
when
has_field("source_ip")
then
remove_single_field("source_ip");
end
|
Breadcrumb Breadcrumb rules add metadata to messages so their flow can be traced across systems or stages. These rules are often used for debugging, tagging, or tracking message processing. | Leave blank. This ensures the rule applies to every incoming message. | Add or update the field rule_demo with the value test. | rule "Set demo field"
when
true
then
set_field("rule_demo", "test");
end
|
Filter Filter rules help you drop unnecessary messages, reducing data ingestion volume and license usage. | Check if the testing field exists. | Drop the message entirely to prevent it from being stored or processed further. | rule "Drop Test Messages"
when
has_field("testing")
then
drop_message();
end
|
Modification Modification rules change message content, for example, by reformatting timestamps or updating field values. | Check if the message contains an event_time field. | Convert the timestamp from UTC to UK time and write it to a new field called event_time_uk. | rule "convert event_time to UK timezone"
when
has_field("event_time")
then
let event_time_date = parse_date(
value: to_string($message.event_time),
pattern: "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'", // Adjust this pattern as needed
timezone: "UTC"
);
let event_time_uk = format_date(
value: event_time_date,
date_format: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
timezone: "Europe/London"
);
set_field("event_time_uk", event_time_uk
);
end
|
Enrichment Enrichment rules enhance messages by adding additional context or updating existing data values. | Check whether the field Src_ip_geo_country exists and its value is US. | Update the field’s value to UniSt. | rule "SrcCountryUnitedStates"
when
has_field("Src_ip_geo_country") &&
to_string($message.Src_ip_geo_country) == "US"
then
set_field(
field: "Src_ip_geo_country",
value: "UniSt",
clean_field: false
);
end |
Routing Routing rules send specific messages to another stream and optionally remove them from the current stream. Tip: Create the destination stream in advance before referencing it in your rule. | Check if the gl2_remote_ip field matches a specific value. | Route the message to the target stream (My First Stream) and remove it from the default stream. | rule "Route Message to Stream"
when
has_field("gl2_remote_ip") &&
to_string($message.gl2_remote_ip) == "66914166ac1d1568bad817f3"
then
route_to_stream(
name: "My First Stream",
remove_from_default: true
);
end |
Pipeline functions
Functions are the building blocks of pipeline rules. Each function is a predefined method that performs a specific action on a log message as it moves through Security Data Lake’s processing pipeline.
Functions can take one or more parameters and return outputs that determine how a message is transformed, enriched, or routed. By combining functions in conditions and actions, you can define powerful processing logic that tailors message handling to your organization’s needs.
For a full list of all supported Security Data Lake functions, their descriptions, and sample syntax, see Functions Reference.
Syntax
Pipeline functions in Graylog are implemented in Java and designed to be pluggable, allowing you to easily extend the platform’s processing capabilities.
Conceptually, a function receives parameters—such as the current message context—and returns a value. The parameter and return data types define where the function can be used within a rule. Graylog automatically validates these types to ensure that all rules are logically and syntactically sound.
Function parameters can be passed either as named key-value pairs or in positional order, provided that any optional parameters are declared and processed last.
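For example, the following two calls to the built-in set_field function are equivalent; the first passes parameters positionally and the second by name:
set_field("type", "firewall_log");
set_field(field: "type", value: "firewall_log");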
Java Data Types
Pipeline rules can use certain Java data types when building queries or performing calculations. This is limited to those types that are queried using the GET function.
For example, you can use the .millis property of DateTime and Period objects to retrieve time values in milliseconds.
This allows you to perform precise time-based calculations, such as measuring how old a message is relative to the current time.
rule "time diff calculator millis"
when
true
then
let time_diff =
to_long(
parse_date(
value: to_string(now(timezone: "Europe/Berlin")),
pattern: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
locale: "de_DE"
).millis
)
-
to_long(
parse_date(
value: to_string($message.timestamp),
pattern: "yyyy-MM-dd'T'HH:mm:ss.SSSZ",
locale: "de_DE"
).millis
);
set_field("scan_age_millis", time_diff);
end
In this example, the rule calculates the difference between the current time (in the “Europe/Berlin” timezone) and the message’s timestamp, converting both values to milliseconds.
The resulting value (scan_age_millis) represents the age of the event in milliseconds and is stored as a new field in the message.
Warning
Security Data Lake does not support the use of any functions that are not officially documented. Exercise caution if you choose to test any unsupported function data types.
Function Types
Built-in Graylog functions can be categorized by the following function types. For a full list of all functions and their descriptions, see Functions Reference.
Anonymization
Anonymization functions obfuscate sensitive data from a dataset or log message.
Asset Enrichment
Asset Enrichment functions enhance, retrieve, or remove asset-related log data. See Asset Enrichment for more information on this Graylog Security feature.
Boolean
Boolean data is primarily associated with conditional statements, which allow different actions by changing control flow depending on whether a condition evaluates to true or false. Boolean functions determine Boolean values or operators.
Conversion
Conversion functions are used to convert a value from one format to another.
Date/Time
Date/time functions perform an action or calculation on a date and time value.
Debug
Debug functions are used to determine the state of your program at any point of execution.
Encoding
Encoding functions enable you to decode and convert strings.
List
List functions create or retrieve a collection that can be manipulated for your analysis.
Lookup
Lookup functions enable you to search a database for a value then return additional information from the same record.
Map
Map functions apply a given action to each or all elements in a collection.
Message Handling
Message Handling functions define what is to be done in response to a message. They are used for various enrichment, removal, retrieval, and routing operations for log data when building pipeline rules.
Pattern Matching
Pattern matching functions specify patterns to which some data should conform and deconstruct the data according to those patterns.
String
String functions are used to manipulate a string or query information about a string.
Watchlist
Watchlist functions perform actions that allow you to retrieve or modify watchlists.
Functions reference
The following list describes the built-in functions that ship with Graylog.
Function | Category | Description | Syntax |
|---|---|---|---|
abbreviate | String | Abbreviates a string using ellipses. The width defines the maximum length of the resulting string. |
|
abusech_ransom_lookup_domain | String | Matches a domain name against the abuse.ch Ransomware Domain Blocklist |
|
abusech_ransom_lookup_ip | String | Matches an IPv4 or IPv6 address against the abuse.ch Ransomware Blocklist |
|
add_asset_categories | Asset Enrichment | Adds a list of categories to an asset. |
|
anonymize_ip | Anonymization | Anonymizes an IP address by setting the last octet to |
|
array_contains | Message Handling | Checks if the specified element is contained in the array. See example |
|
array_remove | Message Handling | Removes the specified element from the array. See example |
|
base16_decode | String | Provides base16 decoding of the string that returns lower-case letters. It requires regular hexadecimals, 0-9 A-F. |
|
base16_encode | String | Provides standard case-insensitive hexadecimal encoding using a 16-character subset. It requires regular hexadecimals, 0-9 A-F. |
|
base32_decode | String | Decodes a string using a 32-character subset. Uses the "numerical" base 32 and is extended from the traditional hexadecimal alphabet, 0-9 A-V. |
|
base32_encode | String | Encodes a string using a 32-character subset. Uses the "numerical" base 32 and is extended from the traditional hexadecimal alphabet, 0-9 A-V. |
|
base32human_decode | String | Decodes a string in human-readable format using a 32-character subset. It is a "readable" base 32, so there is no possibility of confusing 0/O or 1/I, A-Z 2-7. |
|
base32human_encode | String | Encodes a string in human-readable format using a 32-character subset. It is a "readable" base 32, so there is no possibility of confusing 0/O or 1/I, A-Z 2-7. |
|
base64_decode | String | Decodes a string using a 64-character subset. Regular base64 allows both upper and lowercase letters. It does not need to be human readable. |
|
base64_encode | String | Decodes a string using a 64-character subset. Regular base64 allows both upper and lowercase letters. It does not need to be human readable. |
|
base64url_decode | String | Provides URL-safe decoding of a string using a 64-character subset. It is safe to use as file names or to pass in URLs without escaping. |
|
base64url_encode | String | Provides URL-safe encoding of the string using a 64-character subset. It is safe to use as file names or to pass in URLs without escaping. |
|
capitalize | String | Capitalizes a string, changing the first letter to title case. |
|
cidr_match | Boolean/Message Function | Checks whether the given IP address object matches the cidr pattern. See also: to_ip |
|
clone_message | Message Handling | Clones a message. If |
|
concat | String | Returns a new string combining the text of See example |
|
contains | String | Checks if a string contains another string. It ignores the case. See example |
|
crc32 | String Function/Encoding | Returns the hex-encoded CRC32 digest of the given string. |
|
crc32c | String Function/Encoding | Returns the hex-encoded CRC32C (RFC 3720, Section 12.1) digest of the given string. |
|
create_message | Message Handling | Creates a new message from the given parameters. If any of these parameters is omitted, their value is taken from the corresponding fields of the currently processed message. If |
|
csv_to_map | Conversion | Converts a single line of a CSV string into a map usable by See also: set_fields |
|
days | Date/Time | Creates a time period with |
|
debug | Debug | Prints the passed value as a string in the Graylog log. Note that the debug message will only appear in the log of the Graylog node processing the message you are trying to debug. See example |
|
drop_message | Message Handling | Removes the given See example |
|
ends_with | String | Checks if See example |
|
expand_syslog_priority | Conversion | Converts a syslog priority number to its level and facility. |
|
expand_syslog_priority_as_string | Conversion | Converts the syslog priority number in value to its severity and facility string representations. |
|
first_non_null | List | Returns first element found in the specified list that is not |
|
flatten_json | String | Parses the
|
|
flex_parse_date | Date/Time | Uses the Natty date parser to parse a date and time See also: is_date |
|
format_date | Date/Time | Returns the given date and time |
|
from_forwarder_input | Message Handling | Checks whether the currently processed message was received on the given forwarder input. The input can be looked up by either specifying its |
|
from_input | Message Handling | Checks whether the currently processed message was received on the given (non-forwarder) input. The input can be looked up by either specifying its |
|
get_field | Message Handling | Retrieves the |
|
grok | Pattern Matching | Applies the grok pattern See also: set_fields |
|
grok_exists | Boolean | Checks if the given Grok pattern exists. |
|
has_field | Boolean/Message Function | Checks whether the given |
|
hours | Date/Time | Creates a time period with |
|
in_private_net | Message Handling | Checks if an IP address is in a private network as defined in RFC 1918 (10.0.0.0/8, 172.16.0.0/12, 192.168.0.0/16) or RFC 4193 (fc00::/7). |
|
is_bool | Boolean | Checks whether the given |
|
is_collection | Boolean | Checks whether the given |
|
is_date | Boolean | Checks whether the given See also: now, parse_date, flex_parse_date, parse_unix_milliseconds |
|
is_double | Boolean | Checks whether the given See also: to_double |
|
is_ip | Boolean | Checks whether the given See also: to_ip |
|
is_json | Boolean | Checks whether the given See also: parse_json |
|
is_list | Boolean | Checks whether a |
|
is_long | Boolean | Checks whether a See also: to_long |
|
is_map | Boolean | Checks whether the given See also: to_map |
|
is_not_null | Boolean | Checks whether a See example |
|
is_null | Boolean | Checks whether a See example |
|
is_number | Boolean | Checks whether the given |
|
is_period | Boolean | Checks whether the given See also: years, months, weeks, days, hours, minutes, seconds, millis, period |
|
is_string | Boolean | Checks whether a See also: to_string |
|
is_url | Boolean | Checks whether the given See also: to_url |
|
join | String | Joins the specified range of elements of the provided array into a single string. Start index defaults to |
|
key_value | Boolean | Extracts key-value pairs from the given
Also note the result of executing the See also: set_fields |
|
length | String | Counts the characters in a string. If bytes=true, it counts the number of bytes instead (assumes UTF-8 encoding). |
|
list_count | List | Gets number of elements in list. |
|
list_get | List | Gets a value from a list. |
|
lookup | Lookup | Looks up a multi value in the named lookup table. See example |
|
lookup_add_string_list | Lookup | Adds a string list in the named lookup table and returns the updated list on success or returns |
|
lookup_all | Lookup | Looks up all provided values in the named lookup table and returns all results as an array. See example |
|
lookup_assign_ttl | Lookup | Add a time to live to the key in the named lookup table. Returns the updated entry on success and |
|
lookup_clear_key | Lookup | Clears (removes) a key in the named lookup table. This function only supports the MongoDB Lookup Table at the time of writing. |
|
lookup_has_value | Lookup | Determines whether a given |
|
lookup_remove_string_list | Lookup | Removes the entries of the given string list from the named lookup table. Returns the updated list on success and returns |
|
lookup_set_string_list | Lookup | Sets a string list in the named lookup table. Returns the new value on success and returns |
|
lookup_set_value | Lookup | Sets a single value in the named lookup table. Returns the new value on success and returns |
|
lookup_string_list | Lookup | Looks up a string list value in the named lookup table. This function only supports the MongoDB Lookup Table at the time of writing. |
|
lookup_string_list_contains | Boolean | Looks up |
|
lookup_value | Lookup | Looks up a single See example |
|
lowercase | String | Converts a |
|
machine_asset_lookup | Asset Enrichment | Looks up a single machine asset. If multiple assets match the input parameters, only one will be returned. |
|
machine_asset_update | Asset Enrichment | Updates the IP or MAC addresses for a machine asset. If multiple assets match the input parameters, only one will be selected. |
|
map_copy | Map | Copies a map to a new map. |
|
map_get | Map | Retrieves a value from a map. |
|
map_remove | Map | Removes a key from the map. |
|
map_set | Map | Sets a key in the map. |
|
md5 | String | Creates the hex-encoded MD5 digest of the |
|
metric_counter_inc | Debug | Counts specific metric criteria. The counter metric |
|
millis | Date/Time | Creates a time period with a |
|
minutes | Date/Time | Creates a time period with |
|
months | Date/Time | Creates a time period with |
|
multi_grok | Pattern Matching | Applies a list of Grok patterns to a string and returns the first match. See example |
|
murmur3_128 | Encoding | Creates the hex-encoded MurmurHash3 (128-bit) digest of the |
|
murmur3_32 | Encoding | Creates the hex-encoded MurmurHash3 (32-bit) digest of the |
|
normalize_fields | Message Handling | Normalizes all field names by setting them to lowercase. |
|
now | Date/Time | Returns the current See also: is_date |
|
otx_lookup_domain | String | Looks up AlienVault OTX threat intelligence data for a domain name. Requires a configured lookup table named See example |
|
otx_lookup_ip | String | Looks up AlienVault OTX threat intelligence data for an IPv4 or IPv6 address. Requires a configured lookup table named See example |
|
parse_cef | String | Parses any CEF-formatted string into its fields. This is the CEF string (starting with |
|
parse_date | Date/Time | Parses a date string using the given date format. |
|
parse_json | String | Parses the See also: to_map |
|
parse_unix_milliseconds | Date/Time | Attempts to parse a UNIX millisecond timestamp (milliseconds since 1970-01-01T00:00:00.000Z) into a proper See also: is_date See example |
|
period | Date/Time | Parses an ISO 8601 time period from See also: is_period, years, months, weeks, days, hours, minutes, seconds, millis |
|
regex | Pattern Matching | Matches a string with a regular expression. Uses Java syntax. |
|
regex_replace | Pattern Matching | Matches the regular expression in pattern against value and replaces it, if matched, with See example |
|
remove_asset_categories | Asset Enrichment | Remove a list of categories from an asset. |
|
remove_field (legacyDeprecated) | Message Handling | Removes the given field with the name See instead: remove_single_field, remove_multiple_fields |
|
remove_from_stream | Message Handling | Removes the If you want to discard the message entirely, use the |
|
remove_multiple_fields | Message Handling | Removes fields matching a regular expression (regex) pattern and/or list of names, unless the field name is reserved. |
|
remove_single_field | Message Handling | Removes a single field from a message, unless the field name is reserved. |
|
rename_field | Message Handling | Modifies the field name |
|
replace | String | Replaces the first See example |
|
route_to_stream | Message Handling | Sets a stream assignment of the message to the given stream. Functions as 'copy' and does not remove the message from the current stream. If See example |
|
seconds | Date/Time | Creates a time period with |
|
select_jsonpath | Map | Evaluates the given See also: is_json, parse_json |
|
set_associated_assets | Asset Enrichment | Adds associated asset information. |
|
set_field | Message Handling | Sets the given See also: set_fields |
|
set_fields | Message Handling | Sets all of the given name-value pairs in |
|
sha1 | Encoding | Creates the hex-encoded SHA1 digest of the |
|
sha256 | Encoding | Creates the hex-encoded SHA256 digest of the |
|
sha512 | Encoding | Creates the hex-encoded SHA512 digest of the |
|
spamhaus_lookup_ip | Lookup | Matches an IP address against the Spamhaus DROP and EDROP lists. |
|
split | String | Splits a string around matches of this pattern. Uses Java syntax. |
|
starts_with | String | Checks if See example |
|
string_array_add | String | Adds the specified string (or string array) See example |
|
string_entropy | String | Computes Shannon's entropy of the character distribution in the given string. |
|
substring | String | Returns a substring of See example |
|
swapcase | String | Swaps the case of a |
|
syslog_facility | Conversion | Converts the syslog facility number in |
|
syslog_level | Conversion | Converts the syslog severity number in |
|
threat_intel_lookup_domain | Lookup | Matches a domain name against all enabled threat intel sources, except OTX. |
|
threat_intel_lookup_ip | Lookup | Matches an IP address against all enabled threat intel sources, except OTX. |
|
to_bool | Conversion | Converts the single parameter to a Boolean value using its string value. |
|
to_date | Conversion | Converts See also: is_date |
|
to_double | Conversion | Converts the first parameter to a double floating point value. |
|
to_ip | Conversion | Converts the given See also: cidr_match |
|
to_long | Conversion | Converts the first parameter to a long integer value. |
|
to_map | Conversion | Converts the given map-like value to a valid map. The See also: set_fields, parse_json See example |
|
to_string | Conversion | Converts the first parameter to its string representation. |
|
to_url | Conversion | Converts the given |
|
tor_lookup | Lookup | Matches an IP address against known Tor exit nodes to identify connections from the Tor network. |
|
traffic_accounting_size | Message Handling | Calculates the size of the entire message, including all extra fields. This is also the value used to determine how much the message counts toward license usage. See example |
|
uncapitalize | String | Uncapitalizes a string, changing the first letter to lower case. |
|
uppercase | String | Converts a string to upper case. The locale (IETF BCP 47 language tag) defaults to |
|
urldecode | String | Decodes an application/x-www-form-urlencoded string using a specific encoding scheme. |
|
urlencode | String | Translates a string into application/x-www-form-urlencoded format using a specific encoding scheme. Valid charsets are, for example, |
|
user_asset_lookup | Asset Enrichment | Looks up a single user asset. If multiple assets match the input parameters, only one will be returned. |
|
watchlist_add | Watchlist | Adds a value to a watchlist referenced by type. Returns |
|
watchlist_contains | Watchlist | Looks up a value in the watchlist referenced by the type. Returns |
|
watchlist_remove | Watchlist | Removes a value from a watchlist referenced by type. Returns |
|
weeks | Date/Time | Creates a time period with |
|
whois_lookup_ip | Lookup | Retrieves WHOIS information for an IP address |
|
years | Date/Time | Creates a time period with |
|
Examples
Function | Example |
|---|---|
array_contains | rule "array_contains"
when
true
then
set_field("contains_number", array_contains([1, 2, 3, 4, 5], 1));
set_field("does_not_contain_number", array_contains([1, 2, 3, 4, 5], 7));
set_field("contains_string", array_contains(["test", "test2"], "test"));
set_field("contains_string_case_insensitive", array_contains(["test", "test2"], "TEST"));
set_field("contains_string_case_sensitive", array_contains(["test", "test2"], "TEST", true));
end |
array_remove | rule "array_remove"
when
true
then
set_field("remove_number", array_remove([1, 2, 3], 2));
set_field("remove_string", array_remove(["one", "two", "three"], "two"));
set_field("remove_missing", array_remove([1, 2, 3], 4));
set_field("remove_only_one", array_remove([1, 2, 2], 2));
set_field("remove_all", array_remove([1, 2, 2], 2, true));
end |
concat | let build_message_0 = concat(to_string($message.protocol), " connect from ");
let build_message_1 = concat(build_message_0, to_string($message.src_ip));
let build_message_2 = concat(build_message_1, " to ");
let build_message_3 = concat(build_message_2, to_string($message.dst_ip));
let build_message_4 = concat(build_message_3, " Port ");
let build_message_5 = concat(build_message_4, to_string($message.dst_port));
set_field("message", build_message_5); |
contains | contains(to_string($message.hostname), "example.org", true) |
debug | // logs "Dropped message from <source>" in the Graylog server log
let debug_message = concat("Dropped message from ", to_string($message.source));
debug(debug_message); |
drop_message | rule "drop messages over 16383 characters"
when
has_field("message") AND
regex(pattern: "^.{16383,}$", value: to_string($message.message)).matches == true
then
drop_message();
// added debug message to be notified about the dropped message
debug( concat("dropped oversized message from ", to_string($message.source)));
end |
ends_with | Returns true: ends_with("Foobar Baz Quux", "quux", true); Returns false: ends_with("Foobar Baz Quux", "Baz"); |
grok_exists | when
grok_exists("USERNAME")
then
let parsed = grok("%{USERNAME:username}", to_string($message.message));
set_field("parsed_username", parsed.username);
end |
hex_to_decimal_byte_list | hex_to_decimal_byte_list(value: "0x17B90004"); Returns: [23, 185, 0, 4] hex_to_decimal_byte_list(value: "0x117B90004"); Returns: [1, 23, 185, 0, 4] hex_to_decimal_byte_list(value: "17B90004"); Returns: [23, 185, 0, 4] hex_to_decimal_byte_list(value: "117B90004"); Returns: [1, 23, 185, 0, 4] hex_to_decimal_byte_list(value: "not_hex"); Returns: null |
is_not_null | is_not_null(src_addr) |
lookup | rule "dst_ip geoip lookup"
when
has_field("dst_ip")
then
let geo = lookup("geoip-lookup", to_string($message.dst_ip));
set_field("dst_ip_geolocation", geo["coordinates"]);
set_field("dst_ip_geo_country_code", geo["country"].iso_code);
set_field("dst_ip_geo_country_name", geo["country"].names.en);
set_field("dst_ip_geo_city_name", geo["city"].names.en);
end |
lookup_all | rule "function lookup all"
when
true
then
let values = lookup_all("lut_name", ["key1", "key2", "key3"]);
set_field("values", values);
end |
lookup_value | ("ip_lookup", to_string($message.src_addr)); |
multi_grok | when
true
then
set_fields(
fields: multi_grok(
patterns: [
"^ABC %{IPORHOST:msg_ip}: %{GREEDYDATA:abc_message}",
"^123 %{IPORHOST:msg_ip}: %{GREEDYDATA:123_message}",
"^ABC2 %{IPORHOST:abc_ip}: %{GREEDYDATA:abc_message}"
],
value: to_string($message.message),
only_named_captures: true
)
);
end |
otx_lookup_domain | rule "PARSE IP to DNS"
when
has_field("source_ip")
&& regex(
pattern: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
value: to_string($message.source_ip)
).matches == true
then
let rs = lookup_value("dns_lookups", to_string($message.source_ip));
set_field("source_ip_dns", to_string(rs));
end |
otx_lookup_ip | rule "PARSE source_ip - otx-api-ip"
when
// validate message has a source_ip field
has_field("source_ip")
// validate that source IP is IPv4 format
&& regex(
pattern: "^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$",
value: to_string($message.source_ip)
).matches == true
then
let rs = otx_lookup_ip(to_string($message.source_ip));
set_fields(rs);
end |
parse_unix_milliseconds | set_field("timestamp", timestamp); |
regex_replace | let username = regex_replace(".*user: (.*)", to_string($message.message), "$1"); |
replace | let new_field = replace(to_string($message.message), "oo", "u"); // "fu ruft uta"
let new_field = replace(to_string($message.message), "oo", "u", 1); // "fu rooft oota" |
route_to_stream | route_to_stream(id: "512bad1a535b43bd6f3f5e86"); |
starts_with | Returns true: starts_with("Foobar Baz Quux", "foo", true); Returns false: starts_with("Foobar Baz Quux", "Quux"); |
string_array_add | rule "string_array_add"
when
true
then
set_field("add_number_to_string_array_converted", string_array_add(["1", "2"], 3));
set_field("add_number_array_to_string_array_converted", string_array_add(["1", "2"], [3, 4]));
set_field("add_string", string_array_add(["one", "two"], "three"));
set_field("add_string_again", string_array_add(["one", "two"], "two"));
set_field("add_string_again_unique", string_array_add(["one", "two"], "two", true));
set_field("add_array_to_array", string_array_add(["one", "two"], ["three", "four"]));
end |
substring | substring(to_string($message.message), 0, 20); |
to_map | let json = parse_json(to_string($message.json_payload)); let map = to_map(json); set_fields(map); |
traffic_accounting_size | set_field(
field: "license_usage",
value: traffic_accounting_size() // size in bytes
//value: traffic_accounting_size() / 1024 // size in kb
); |
Log enrichment
Lookup Tables
Lookup tables allow you to map, translate, or enrich log data by replacing message field values with new ones, or by creating entirely new message fields. For example, you can use a static CSV file to map IP addresses to hostnames or use an external data source to enrich messages with threat intelligence, geolocation, or asset information.
This feature makes it possible to enhance raw log data with context from internal systems or third-party integrations, transforming it into richer, actionable insights.
Components
The lookup table system consists of four components:
Data adapters are used to do the actual lookup for a value. They might read from a CSV file, connect to a database, or execute requests to receive the lookup result.
Data adapter implementations are pluggable and new ones can be added through plugins.
Warning
The CSV file adapter reads the entire contents of the file into HEAP memory. Ensure that you size the HEAP accordingly.
The caches are responsible for caching the lookup results to improve the lookup performance and/or to avoid overloading databases and APIs. They are separate entities to make it possible to reuse a cache implementation for different data adapters. That way, the data adapters do not have to care about caching and do not have to implement it on their own.
Cache implementations are pluggable and new ones can be added through plugins.
Tip
The CSV file adapter refreshes its contents within each check interval if the file was changed. If the cache was purged but the check interval has not elapsed, lookups might return expired values.
The lookup table component ties a data adapter instance and a cache instance together. It is needed to enable the usage of the lookup table in converters, pipeline functions, and decorators.
The lookup result is returned by a lookup table through the data adapter and can contain two types of data: a single value and a multi value.
The single value can be a string, number or boolean and will be used in converters, decorators and pipeline rules. In our CSV example to lookup host names for IP addresses, this would be the host name string.
A multi value is a map or dictionary-like data structure and can contain several different values. This is useful if the data adapter can provide multiple values for a key. A good example for this would be the geo-ip data adapter which does not only provide the latitude and longitude for an IP address, but also information about the city and country of the location. Currently, the multi value can only be used in a pipeline rule when using the lookup() pipeline function.
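The following sketch shows how the two result types are typically used in a pipeline rule; the lookup table name ip-to-hostname is a hypothetical table for the CSV example above, and geoip-lookup is the geolocation table used elsewhere in this documentation:
rule "lookup result types"
when
  has_field("src_ip")
then
  // lookup_value() returns the single value (the host name in the CSV example)
  set_field("src_hostname", lookup_value("ip-to-hostname", to_string($message.src_ip)));
  // lookup() returns the multi value, a map that can be indexed by key
  let geo = lookup("geoip-lookup", to_string($message.src_ip));
  set_field("src_ip_geolocation", geo["coordinates"]);
end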
Example 1: Output for a CSV data adapter including a single value and a multi value.

Example 2: Output for the geo-ip data adapter including a single value and a multi value.

Setup
You can configure lookup tables in the System > Lookup Tables window.
A lookup table requires at least one data adapter and one cache.
Create a Data Adapter:
Go to System > Lookup Tables > Data Adapters.
Select Create Adapter and select a data adapter type.
Complete the adapter configuration form, which includes built-in documentation for each type.
Create a Cache:
Go to System > Lookup Tables > Caches.
Click Create Cache and choose a cache type.
Complete the cache configuration form. Review the cache-specific documentation included in the form.
Note
Null results are cached unless you select Ignore empty results during configuration.
Create a Lookup Table:
Go to System > Lookup Tables.
Select Create Lookup Table.
Select your data adapter and cache instances, and optionally define a default value.
Note
The default value is used when a lookup does not return a result. If a key is not found in the lookup table, Security Data Lake automatically returns the defined default value.
Once created, the lookup table can be referenced in extractors, decorators, and pipeline rules.
Usage
Lookup tables can be applied in several areas of Security Data Lake to enrich and contextualize data:
Converters – Perform lookups on extracted values during message ingestion.
Decorators – Enrich messages at search time without modifying stored data.
Pipeline Rules – Apply logic dynamically with the lookup() or lookup_value() functions.
Built-in Data Adapters
Security Data Lake ships with several ready-to-use data adapters. Each type has on-screen documentation in the Edit Data Adapter form.
Adapter | Description |
|---|---|
CSV File Adapter | Performs key/value lookups from a static CSV file (see the sample file after this table). |
DNS Lookup Adapter | Performs hostname and IP resolution (A, AAAA, PTR, and TXT records). |
DSV File Adapter | Similar to CSV, but supports custom delimiters and configurable key/value columns. |
HTTPS JSONPath Adapter | Executes GET requests and extracts data using JSONPath expressions. |
Geo IP – MaxMind | Provides geolocation data for IP addresses using MaxMind databases. |
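For reference, a small CSV file for the CSV File Adapter that maps IP addresses to host names might look like the following; the column names (ipaddr, hostname) are illustrative and must match the key and value columns configured on the adapter:
"ipaddr","hostname"
"10.10.10.5","fw01.example.org"
"10.10.10.6","web01.example.org"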
MongoDB
Security Data Lake adds support for MongoDB Data Adapters, which store lookup data directly in the Graylog configuration database. Entries can be added, updated, or deleted via the API, the GUI, or pipeline functions.
Managing MongoDB Data Adapters via API
Example curl request to add a key:
curl -u <token>:token \
-H 'X-Requested-By: cli' \
-H 'Accept: application/json' \
-H 'Content-Type: application/json' \
-X POST 'http://127.0.0.1:9000/api/plugins/org.graylog.plugins.lookup/lookup/adapters/mongodb/mongodb-data-name' \
--data-binary '{
"key": "myIP",
"values": ["12.34.42.99"],
"data_adapter_id": "5e578606cdda4779dd9f2611"
}'
Note
Entries can also be managed directly from the Security Data Lake UI or modified dynamically via pipeline rules using lookup-related functions.
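As a sketch, a pipeline rule could write entries into a MongoDB-backed lookup table with the lookup functions listed in the Functions Reference; the table name watched-ips and the key last_remote_ip are illustrative:
rule "remember remote IP"
when
  has_field("gl2_remote_ip")
then
  // stores the remote IP as the value for a fixed key in the MongoDB-backed table
  lookup_set_value("watched-ips", "last_remote_ip", to_string($message.gl2_remote_ip));
end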
Tip
To add multiple values for a single key in the GUI, separate each value with a newline.
Geolocation
Security Data Lake lets you extract and visualize geolocation information from IP addresses in your logs.
This article provides you with step-by-step instructions on how to configure a geolocation processor and create a map using the extracted geolocation data.
Set Up the Processor
Security Data Lake ships with geolocation capabilities by default but additional configuration is still required. This section explains how to configure the functionality in detail.
Note
You must create an account to obtain a license key to download the MaxMind databases. More information is available in MaxMind’s blog post.
Configure the Processor
You need to configure Graylog to start using the geolocation database to resolve IP addresses in your logs.
Navigate to System > Configurations.
Select Plugins > Geo-Location Processor, then click Edit configuration.
Select the Enable Geo-location processor check box.
Choose either MaxMind or IPInfo from the drop-down menu.
Enter the paths to both the city and ASN databases you use. You can also adjust the refresh interval.
Select Update configuration to save the configuration.
Illuminate and Geolocation
Geolocation configuration is available with Graylog Open. Illuminate is not required to use geolocation data.
If you want geolocation data with Illuminate content, you must ensure that the Illuminate Processor runs before the GeoIP Resolver in Message Processors Configuration. Note that this order should be the default.
To check the configuration in your environment:
Navigate to System > Configurations.
Select Message Processors, then confirm the order in the table.
If you need to change the order:
Select Edit configuration.
Use drag and drop to reorder the items in the list as required.
Select Update configuration.
Enforce Security Data Lake Schema Option
When you configure the geolocation processor, the Enforce default schema option is selected by default. If you disable schema enforcement, all IP fields that are not reserved IP addresses are processed and have the following fields added with the field name as a prefix:
_geolocation
_country_code
_city_name
An example of the generated fields for the source_ip field might read:
source_ip_city_name: Vienna
source_ip_country_code: AT
source_ip_geolocation: 48.20849, 16.37208
If schema enforcement is enabled, only the following GIM schema fields that are not reserved IP addresses are processed:
destination_ip
destination_nat_ip
event_observer_ip
host_ip
network_forwarded_ip
source_ip
source_nat_ip
An example of the generated fields for the source_ip field might read:
source_as_number: AS1853
source_as_organization: ACONET
source_geo_city: Vienna
source_geo_coordinates: 48.20849, 16.37208
source_geo_country_iso: AT
source_geo_name: Vienna, AT
source_geo_region: Vienna
source_geo_timezone: Europe/Vienna
Storing Geolocation Database Files in AWS S3
A configuration option for Pull files from S3 bucket at the bottom of the configuration page lets you pull geolocation database files from AWS S3 buckets. Enabling this feature allows for an S3 bucket URL to be added to the path configuration values.

When enabled, a service runs every refresh interval and polls the files in the S3 buckets provided. If those files have been updated since the last poll, the new files are pulled down onto each node. This service relies on the Default Credentials Provider for credentials to the S3 buckets and does not use any configuration values that may be set in the Security Data Lake AWS Plugin configuration settings.
The geolocation database files retrieved from S3 are stored in the Security Data Lake data_dir directory under the geolocation subdirectory. To change where these files are downloaded, set geo_ip_processor_s3_download_location to the desired location on disk in your Security Data Lake server configuration file.
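For example, the setting in the server configuration file might look like this; the path is illustrative:
geo_ip_processor_s3_download_location = /var/lib/graylog-server/geolocation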
If the Pull files from S3 bucket option is left disabled, all Security Data Lake nodes read the files from the path on disk, and those files must be updated manually.
Visualize Geolocations in a Map
Security Data Lake can display maps from geolocation data stored in any field, as long as the geo-points use the latitude,longitude format.
Display a Map in the Search Results Page
On any search result page, you can expand the field you want to use to draw a map in the search sidebar. Click the Create button (+) in the left sidebar and select aggregation under the Generic menu.
This generates an empty aggregation widget. Click Edit and enter your information. Select World Map as the Visualization Type. You then see a map with all the different points stored in that field.
You can click Update preview to check your map and make any changes before you click Update widget.

Note
Adding a metric affects the size of the dot on the map. If there is no metric defined, every dot has the same radius.
For additional fields used in Security Data Lake related to different sources of geo coordinates, view the Security Data Lake Schema.
Add a Map to a Dashboard
You can add the map visualization into any dashboards as you do with other widgets. When you display a map in the search result page:
Click the three dots in the upper right corner.
Select Export to Dashboard.
You can then rename, edit, and save the new dashboard.
Data adapters
ThreatFox IOC Tracker Data Adapter
ThreatFox is a project from abuse.ch that tracks indicators of compromise (IOCs) associated with malware. The ThreatFox Data Adapter supports lookups by the following key types:
URL
Domain
IP:port
MD5 hash
SHA256 hash
When you create the data adapter, ThreatFox downloads and stores the data set in MongoDB. The Refresh Interval configuration parameter identifies when to fetch new sets.
Sample Lookup Data
A lookup for the file hash 923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4 may produce the following output:
{
"first_seen_utc": "2021-07-07T17:03:57.000+0000",
"ioc_id": "158365",
"ioc_value": "923fa80da84e45636a62f779913559a07420a1c6e21f093d87ddfe04bda683c4",
"ioc_type": "sha256_hash",
"threat_type": "payload",
"fk_malware": "win.agent_tesla",
"malware_alias": [
"AgenTesla",
"AgentTesla",
"Negasteal"
],
"malware_printable": "Agent Tesla",
"confidence_level": 50,
"reference": "https://twitter.com/RedBeardIOCs/status/1412819661419433988",
"tags": [
"agenttesla"
],
"anonymous": false,
"reporter": "Virus_Deck"
}
Configure the Data Adapter
Title – A short title for the data adapter.
Description – A description of the data adapter.
Name – A unique name for the data adapter.
Custom Error TTL – Optional custom TTL for caching erroneous results. The default value is 5 seconds.
Include IOCs Older Than 90 Days – Optional setting that includes IOCs older than 90 days. By default, the data adapter's data does not include IOCs older than 90 days. To avoid false positives, handle IOCs older than 90 days carefully.
Refresh Interval – Determines how often to fetch new data. The minimum refresh interval is 3600 seconds (1 hour), because that is how often the source data updates.
Case Insensitive Lookup – Allows the data adapter to perform case-insensitive lookups.
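As a sketch, the resulting data could be used in a pipeline rule through a lookup table backed by this data adapter; the table name threatfox-iocs and the field file_sha256 are illustrative:
rule "flag ThreatFox IOC match"
when
  has_field("file_sha256")
then
  let ioc = lookup("threatfox-iocs", to_string($message.file_sha256));
  set_field("threat_type", ioc["threat_type"]);
  set_field("malware_printable", ioc["malware_printable"]);
end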
URLhaus Malware URL Data Adapter
URLhaus is a project from abuse.ch that maintains a database of malicious URLs used for malware distribution. When you create the data adapter, URLhaus downloads and stores the appropriate data set in MongoDB. Refresh Interval configuration identifies when to fetch new sets.
Sample Lookup Data
A lookup for the URL http://192.168.100.100:35564/Mozi.m might produce the following output:
{
"single_value": "malware_download",
"multi_value": {
"date_added": "2021-06-22T17:53:07.000+0000",
"url_status": "online",
"threat_type": "malware_download",
"tags": "elf,Mozi",
"url": "http://192.168.100.100:35564/Mozi.m",
"urlhaus_link": "https://urlhaus.abuse.ch/url/1234567/"
},
"string_list_value": null,
"has_error": false,
"ttl": 9223372036854776000
}
Configure the Data Adapter
TitleA short title for the data adapter.
DescriptionA description of the data adapter.
NameA unique name to refer to the data adapter.
Custom Error TTLOptional custom TTL for caching erroneous results. If no value is specified, the default is 5 seconds.
URLhaus Feed TypeDetermines which URLhaus feed the data adapter will use.
Online URLsis the smaller data set and includes only URLs that have been currently detected online.Recently Added URLsis the larger data set and includes all online and offline URLs added in the last 30 days.
Refresh Interval- Determines how often new data is fetched. The minimum refresh interval is 300 seconds (5 minutes) because that is how often the source data can be updated.Case Insensitive Lookup- allows the data adapter to perform case-insensitive lookups.