Skip to content

Improving regex oring #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 42 additions & 32 deletions sigma/backends/splunk/splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,41 +41,53 @@ class SplunkDeferredORRegularExpression(DeferredTextQueryExpression):
}

def __init__(self, state, field, arg) -> None:
SplunkDeferredORRegularExpression.add_field(field)
index_suffix = SplunkDeferredORRegularExpression.get_index_suffix(field)
self.template = (
'rex field={field} "(?<{field}Match'
+ index_suffix
+ '>{value})"\n| eval {field}Condition'
+ index_suffix
+ "=if(isnotnull({field}Match"
+ index_suffix
+ '), "true", "false")'
self.add_field(field)
field_condition = self.get_field_condition(field)
field_match = self.get_field_match(field)
self.template = 'rex field={{field}} "(?<{field_match}>{{value}})"\n| eval {field_condition}=if(isnotnull({field_match}), "true", "false")'.format(
field_match=field_match, field_condition=field_condition
)
return super().__init__(state, field, arg)

@staticmethod
def clean_field(field):
# splunk does not allow dots in regex group, so we need to clean variables
return re.sub(".*\\.", "", field)

@classmethod
def add_field(cls, field):
cls.field_counts[field] = (
cls.field_counts.get(field, 0) + 1
) # increment the field count

@classmethod
def get_index_suffix(cls, field):

index_suffix = cls.field_counts.get(field, 0)
def get_field_suffix(cls, field):
index_suffix = cls.field_counts.get(field, "")
if index_suffix == 1:
# return nothing for the first field use
return ""
return str(index_suffix)
index_suffix = ""
return index_suffix

@classmethod
def construct_field_variable(cls, field, variable):
cleaned_field = cls.clean_field(field)
index_suffix = cls.get_field_suffix(field)
return f"{cleaned_field}{variable}{index_suffix}"

@classmethod
def get_field_match(cls, field):
return cls.construct_field_variable(field, "Match")

@classmethod
def get_field_condition(cls, field):
return cls.construct_field_variable(field, "Condition")

@classmethod
def reset(cls):
cls.field_counts = {}


class SplunkDeferredCIDRExpression(DeferredTextQueryExpression):
template = 'where {op}cidrmatch("{value}", {field})'
class SplunkDeferredFieldRefExpression(DeferredTextQueryExpression):
template = "where {op}'{field}'='{value}'"
operators = {
True: "NOT ",
False: "",
Expand Down Expand Up @@ -107,6 +119,7 @@ class SplunkBackend(TextQueryBackend):
)
group_expression: ClassVar[str] = "({expr})"

bool_values = {True: "true", False: "false"}
or_token: ClassVar[str] = "OR"
and_token: ClassVar[str] = " "
not_token: ClassVar[str] = "NOT"
Expand All @@ -125,7 +138,7 @@ class SplunkBackend(TextQueryBackend):
re_escape_char: ClassVar[str] = "\\"
re_escape: ClassVar[Tuple[str]] = ('"',)

cidr_expression: ClassVar[str] = "{value}"
cidr_expression: ClassVar[str] = '{field}="{value}"'

compare_op_expression: ClassVar[str] = "{field}{operator}{value}"
compare_operators: ClassVar[Dict[SigmaCompareExpression.CompareOperators, str]] = {
Expand All @@ -135,6 +148,7 @@ class SplunkBackend(TextQueryBackend):
SigmaCompareExpression.CompareOperators.GTE: ">=",
}

field_equals_field_expression: ClassVar[str] = "{field2}"
field_null_expression: ClassVar[str] = "NOT {field}=*"

convert_or_as_in: ClassVar[bool] = True
Expand Down Expand Up @@ -248,9 +262,7 @@ def convert_condition_field_eq_val_re(
).postprocess(None, cond)

cond_true = ConditionFieldEqualsValueExpression(
cond.field
+ "Condition"
+ str(SplunkDeferredORRegularExpression.get_index_suffix(cond.field)),
SplunkDeferredORRegularExpression.get_field_condition(cond.field),
SigmaString("true"),
)
# returning fieldX=true
Expand All @@ -259,19 +271,19 @@ def convert_condition_field_eq_val_re(
state, cond.field, super().convert_condition_field_eq_val_re(cond, state)
).postprocess(None, cond)

def convert_condition_field_eq_val_cidr(
def convert_condition_field_eq_field(
self,
cond: ConditionFieldEqualsValueExpression,
state: "sigma.conversion.state.ConversionState",
) -> SplunkDeferredCIDRExpression:
"""Defer CIDR network range matching to pipelined where cidrmatch command after main search expression."""
) -> SplunkDeferredFieldRefExpression:
"""Defer FieldRef matching to pipelined with `where` command after main search expression."""
if cond.parent_condition_chain_contains(ConditionOR):
raise SigmaFeatureNotSupportedByBackendError(
"ORing CIDR matching is not yet supported by Splunk backend",
"ORing FieldRef matching is not yet supported by Splunk backend",
source=cond.source,
)
return SplunkDeferredCIDRExpression(
state, cond.field, super().convert_condition_field_eq_val_cidr(cond, state)
return SplunkDeferredFieldRefExpression(
state, cond.field, super().convert_condition_field_eq_field(cond, state)
).postprocess(None, cond)

def finalize_query(
Expand Down Expand Up @@ -381,13 +393,11 @@ def finalize_query_data_model(
cim_fields = " ".join(
splunk_sysmon_process_creation_cim_mapping.values()
)

elif rule.logsource.category == "proxy":
data_model = "Web"
data_set = "Proxy"
cim_fields = " ".join(
splunk_web_proxy_cim_mapping.values()
)
cim_fields = " ".join(splunk_web_proxy_cim_mapping.values())

try:
data_model_set = state.processing_state["data_model_set"]
Expand Down
90 changes: 88 additions & 2 deletions tests/test_backend_splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import pytest
from sigma.backends.splunk import SplunkBackend
from sigma.collection import SigmaCollection
from sigma.processing.pipeline import ProcessingPipeline
from sigma.pipelines.splunk import splunk_cim_data_model


Expand Down Expand Up @@ -224,6 +225,43 @@ def test_splunk_regex_query_explicit_or(splunk_backend: SplunkBackend):
)


def test_splunk_regex_query_explicit_or_with_nested_fields():

pipeline = ProcessingPipeline.from_yaml(
"""
name: Test
priority: 100
transformations:
- id: field_mapping
type: field_name_mapping
mapping:
fieldA: Event.EventData.fieldA
fieldB: Event.EventData.fieldB
"""
)
splunk_backend = SplunkBackend(pipeline)

collection = SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel1:
fieldA|re: foo.*bar
sel2:
fieldB|re: boo.*foo
condition: sel1 or sel2
"""
)

assert splunk_backend.convert(collection) == [
'\n| rex field=Event.EventData.fieldA "(?<fieldAMatch>foo.*bar)"\n| eval fieldACondition=if(isnotnull(fieldAMatch), "true", "false")\n| rex field=Event.EventData.fieldB "(?<fieldBMatch>boo.*foo)"\n| eval fieldBCondition=if(isnotnull(fieldBMatch), "true", "false")\n| search fieldACondition="true" OR fieldBCondition="true"'
]


def test_splunk_single_regex_query(splunk_backend: SplunkBackend):
assert (
splunk_backend.convert(
Expand Down Expand Up @@ -264,12 +302,12 @@ def test_splunk_cidr_query(splunk_backend: SplunkBackend):
"""
)
)
== ['fieldB="foo" fieldC="bar"\n| where cidrmatch("192.168.0.0/16", fieldA)']
== ['fieldA="192.168.0.0/16" fieldB="foo" fieldC="bar"']
)


def test_splunk_cidr_or(splunk_backend: SplunkBackend):
with pytest.raises(SigmaFeatureNotSupportedByBackendError, match="ORing CIDR"):
assert (
splunk_backend.convert(
SigmaCollection.from_yaml(
"""
Expand All @@ -289,6 +327,54 @@ def test_splunk_cidr_or(splunk_backend: SplunkBackend):
"""
)
)
== ['fieldA="192.168.0.0/16" OR fieldA="10.0.0.0/8" fieldB="foo" fieldC="bar"']
)


def test_splunk_fieldref_query(splunk_backend: SplunkBackend):
assert (
splunk_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref: fieldD
fieldB: foo
fieldC: bar
condition: sel
"""
)
)
== ["fieldB=\"foo\" fieldC=\"bar\"\n| where 'fieldA'='fieldD'"]
)


def test_splunk_fieldref_or(splunk_backend: SplunkBackend):
with pytest.raises(SigmaFeatureNotSupportedByBackendError, match="ORing FieldRef"):
splunk_backend.convert(
SigmaCollection.from_yaml(
"""
title: Test
status: test
logsource:
category: test_category
product: test_product
detection:
sel:
fieldA|fieldref:
- fieldD
- fieldE
fieldB: foo
fieldC: bar
condition: sel
"""
)
)


def test_splunk_fields_output(splunk_backend: SplunkBackend):
Expand Down