Skip to content

Add some support for CEF-Style logging for A10 Thunder appliances. (#19 / #17) #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 95 additions & 2 deletions cgn_ec_consumer/handlers/a10.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
# Session Creation: Tstamp AX_hostname NAT-TCP-C: fwd_src_ip:fwd_src_port<->fwd_dest_ip:fwd_dest_port, rev_src_ip:rev_src_port<-->rev_dest_ip:rev_dest_port
# Fixed-NAT: FIXED-NAT-PORTS 10.10.10.172->192.168.9.173:3000-4000

# CEF Formats:
# cgn0.thn2/cgn-demo CEF:0|A10|CFW|6.0.0-P1|CGN 100|Nat Port Allocated|5|proto=ICMP src=100.65.161.0 spt=48856 sourceTranslatedAddress=99.99.99.13 sourceTranslatedPort=48856 dst=8.8.8.8 dpt=0
# cgn0.thn2/cgn-demo CEF:0|A10|CFW|6.0.0-P1|CGN 101|Nat Port Freed|5|proto=ICMP src=100.65.150.5 spt=37892 sourceTranslatedAddress=99.99.99.8 sourceTranslatedPort=37892
# cgn0.thn2/cgn-demo CEF:0|A10|CFW|6.0.0-P1|CGN 106|Nat Port Batch Pool Allocated|5|proto=UDP src=100.65.140.73 sourceTranslatedAddress=99.99.99.12 sourceTranslatedPort=22016 cn3=22271 cn3Label=Last Nat Port
# cgn0.thn2/cgn-demo CEF:0|A10|CFW|6.0.0-P1|CGN 107|Nat Port Batch Pool Freed|5|proto=UDP src=100.65.140.73 sourceTranslatedAddress=99.99.99.12 sourceTranslatedPort=22016 cn3=22271 cn3Label=Last Nat Port

# Need to handle ASCII, COmpact and Binary logging formats
import regex as re
from typing import Any
Expand All @@ -11,6 +17,8 @@

from cgn_ec_consumer.handlers.generic import GenericSyslogHandler

import pycef

logger = get_logger("cgn-ec.handlers.a10_thunder")


Expand All @@ -19,14 +27,99 @@ class A10ThunderSyslogHandler(GenericSyslogHandler):

REGEX_PORT_MAPPING = r"NAT-(\w+)-(\w+): (\d+.\d+.\d+.\d+):(\d+)<-->(\d+.\d+.\d+.\d+):(\d+) to (\d+.\d+.\d+.\d+):(\d+)$"
REGEX_SESSION_MAPPING = r"NAT-(\w+)-(\w+): (\d+.\d+.\d+.\d+):(\d+)<->(\d+.\d+.\d+.\d+):(\d+),(\d+.\d+.\d+.\d+):(\d+)<-->(\d+.\d+.\d+.\d+):(\d+)$"
REGEX_PARSE_CEF = r"^((CEF:)?0|A10|CFW|.*)$"

PATTERNS = [
(re.compile(REGEX_SESSION_MAPPING), "parse_session_mapping"),
(re.compile(REGEX_PORT_MAPPING), "parse_port_mapping"),
(re.compile(REGEX_PARSE_CEF), "parse_cef_message"),
]

def parse_message(self, data: dict) -> dict:
syslog_message = data["message"]
host_ip = data["ip"]
host_name = data["host"]
timestamp = data["timestamp"]

for compiled_pattern, parse_func in self.PATTERNS:
has_match = compiled_pattern.search(syslog_message)
if not has_match:
continue

parse_method = getattr(self, parse_func)
event_data = has_match.groups()

result = parse_method(event_data, host_ip, host_name, timestamp)
return result

logger.debug(f"Could not find a valid regex pattern to parse syslog message: {data}")

def parse_cef_message(
self, data: tuple[str | Any], host_ip: str, host_name: str, timestamp: datetime
) -> dict:
cef = pycef.parse(f"CEF:{data[0]}")

vrf_name = host_name.split("/", 1)[1] or ""

__event_type__, __event__ = self._cef_event_map(cef['DeviceEventClassID'])

metric = None
if __event_type__ == NATEventEnum.PORT_MAPPING:
metric = {
"type": __event_type__,
"timestamp": timestamp,
"host": host_ip,
"event": __event__,
"vrf_id": 0,
"vrf_name": vrf_name,
"protocol": NATProtocolEnum.from_string(cef["proto"]),
"src_ip": cef["src"],
"src_port": cef["spt"],
"x_ip": cef["sourceTranslatedAddress"],
"x_port": int(cef["sourceTranslatedPort"]),
}
elif __event_type__ == NATEventEnum.PORT_BLOCK_MAPPING:
metric = {
"type": __event_type__,
"timestamp": timestamp,
"host": host_ip,
"event": __event__,
"vrf_id": 0,
"vrf_name": vrf_name,
"protocol": NATProtocolEnum.from_string(cef["proto"]),
"src_ip": cef["src"],
"x_ip": cef["sourceTranslatedAddress"],
"start_port": int(cef["sourceTranslatedPort"]),
"end_port": int(cef["Last Nat Port"]),
}

if metric is not None:
return metric

def _cef_event_map(
self, tag: str
) -> list:
match tag:
case "CGN 100":
__event_type__ = NATEventEnum.PORT_MAPPING
__event__ = NATEventTypeEnum.CREATED
case "CGN 101":
__event_type__ = NATEventEnum.PORT_MAPPING
__event__ = NATEventTypeEnum.DELETED
case "CGN 106":
__event_type__ = NATEventEnum.PORT_BLOCK_MAPPING
__event__ = NATEventTypeEnum.CREATED
case "CGN 107":
__event_type__ = NATEventEnum.PORT_BLOCK_MAPPING
__event__ = NATEventTypeEnum.DELETED
case _:
__event_type__ = None
__event__ = None

return [__event_type__, __event__]

def parse_port_mapping(
self, data: tuple[str | Any], host_ip: str, timestamp: datetime
self, data: tuple[str | Any], host_ip: str, host_name: str, timestamp: datetime
) -> dict:
__event_type__ = NATEventEnum.PORT_MAPPING
if len(data) != 8:
Expand All @@ -48,7 +141,7 @@ def parse_port_mapping(
return metric

def parse_session_mapping(
self, data: tuple[str | Any], host_ip: str, timestamp: datetime
self, data: tuple[str | Any], host_ip: str, host_name: str, timestamp: datetime
) -> dict:
__event_type__ = NATEventEnum.SESSION_MAPPING
if len(data) != 10:
Expand Down
Loading