|
| 1 | +from json import loads |
| 2 | + |
| 3 | +import panther_event_type_helpers as event_type |
| 4 | +from panther_aws_helpers import lookup_aws_account_name |
| 5 | +from panther_base_helpers import add_parse_delay |
| 6 | +from panther_ipinfo_helpers import PantherIPInfoException, geoinfo_from_ip |
| 7 | + |
| 8 | + |
| 9 | +def rule(event): |
| 10 | + # filter events on unified data model field |
| 11 | + return event.udm("event_type") == event_type.FAILED_LOGIN |
| 12 | + |
| 13 | + |
| 14 | +def title(event): |
| 15 | + # use unified data model field in title |
| 16 | + log_type = event.get("p_log_type") |
| 17 | + title_str = ( |
| 18 | + f"{log_type}: User [{event.udm('actor_user')}] has exceeded the failed logins threshold" |
| 19 | + ) |
| 20 | + if log_type == "AWS.CloudTrail": |
| 21 | + title_str += f" in [{lookup_aws_account_name(event.get('recipientAccountId'))}]" |
| 22 | + return title_str |
| 23 | + |
| 24 | + |
| 25 | +def alert_context(event): |
| 26 | + try: |
| 27 | + geoinfo = geoinfo_from_ip(event=event, match_field=event.udm_path("source_ip")) |
| 28 | + except PantherIPInfoException: |
| 29 | + geoinfo = {} |
| 30 | + if isinstance(geoinfo, str): |
| 31 | + geoinfo = loads(geoinfo) |
| 32 | + context = {} |
| 33 | + context["geolocation"] = ( |
| 34 | + f"{geoinfo.get('city')}, {geoinfo.get('region')} in " f"{geoinfo.get('country')}" |
| 35 | + ) |
| 36 | + context["ip"] = geoinfo.get("ip") |
| 37 | + context["reverse_lookup"] = geoinfo.get("hostname", "No reverse lookup hostname") |
| 38 | + context["ip_org"] = geoinfo.get("org", "No organization listed") |
| 39 | + try: |
| 40 | + context = add_parse_delay(event, context) |
| 41 | + except TypeError: |
| 42 | + pass |
| 43 | + except AttributeError: |
| 44 | + pass |
| 45 | + return context |
0 commit comments