Skip to content

[Flashpoint] Import CCM Alerts as Incidents #4025

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 24 commits into from
Jun 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
e6f726b
[feat] add `import_ccm_alerts` config variable
Powlinett May 13, 2025
511419e
[feat] add get_compromised_credentials_sightings() in flashpoint client
Powlinett May 14, 2025
df212cb
[feat] add retry strategy to flashpoint client
Powlinett May 14, 2025
a0cc2b0
[feat] add compromised credentials models
Powlinett May 16, 2025
f97079f
[feat] move flashpoint client in its own module
Powlinett May 16, 2025
fab349d
[feat] add flashpoint_client unit tests
Powlinett May 21, 2025
1c548e2
[feat]convert CCM alert to STIX Incident and related observables
Powlinett May 22, 2025
67cfae0
[clean] clean up + add some typing
Powlinett May 22, 2025
45e948f
[feat] add import of CCM alert to FlashpointConnector class
Powlinett May 22, 2025
8899493
[lint] black + isort
Powlinett May 22, 2025
d8184df
Merge branch 'master' into feat/2933-flashpoint-add-ccm-alerts-feature
Powlinett May 23, 2025
b0d7b47
[feat] add `FLASHPOINT_FRESH_CCM_ALERTS_ONLY` env var
Powlinett May 27, 2025
c9e7feb
[feat] add filter to fetch only fresh CCM alerts
Powlinett May 27, 2025
bf3b908
[fix] fix undefined variable
Powlinett May 27, 2025
46d8903
[clean] clean up connector's state update
Powlinett May 27, 2025
ed162e9
[lint] fix / improve typing
Powlinett May 27, 2025
17a3bd2
[doc] update README
Powlinett May 27, 2025
556b867
[deps] require fixed versions of dependencies
Powlinett May 27, 2025
df50406
[fix] resolve TODO comment
Powlinett May 27, 2025
41a2d9f
[clean] re-use data samples for ConverterToStix unit tests
Powlinett Jun 2, 2025
f6840ef
[fix] rely on connector_state["last_run"] only for all import start d…
Powlinett Jun 3, 2025
56df6e3
[feat] wrap errors in FlashpointClientError
Powlinett Jun 3, 2025
dc9b4c9
[fix] handle client and converter errors during CCM Alerts import
Powlinett Jun 3, 2025
4f0f888
[test] update client unit tests
Powlinett Jun 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion external-import/flashpoint/.env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@ FLASHPOINT_API_KEY=ChangeMe
FLASHPOINT_IMPORT_START_DATE=P30D
FLASHPOINT_IMPORT_REPORTS=true
FLASHPOINT_INDICATORS_IN_REPORTS=false
FLASHPOINT_GUESS_RELATIONSHIPS_FROM_REPORTS=false
FLASHPOINT_IMPORT_INDICATORS=true
FLASHPOINT_IMPORT_ALERTS=true
FLASHPOINT_ALERT_CREATE_RELATED_ENTITIES=false
FLASHPOINT_IMPORT_COMMUNITIES=false
FLASHPOINT_COMMUNITIES_QUERIES=cybersecurity,cyberattack
FLASHPOINT_GUESS_RELATIONSHIPS_FROM_REPORTS=false
FLASHPOINT_IMPORT_CCM_ALERTS=false
FLASHPOINT_FRESH_CCM_ALERTS_ONLY=true
159 changes: 127 additions & 32 deletions external-import/flashpoint/README.md

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion external-import/flashpoint/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ services:
- FLASHPOINT_IMPORT_START_DATE=P30D
- FLASHPOINT_IMPORT_REPORTS=true
- FLASHPOINT_INDICATORS_IN_REPORTS=false
- FLASHPOINT_GUESS_RELATIONSHIPS_FROM_REPORTS=false
- FLASHPOINT_IMPORT_INDICATORS=true
- FLASHPOINT_IMPORT_ALERTS=true
- FLASHPOINT_ALERT_CREATE_RELATED_ENTITIES=false
- FLASHPOINT_IMPORT_COMMUNITIES=false
- FLASHPOINT_COMMUNITIES_QUERIES=cybersecurity,cyberattack
- FLASHPOINT_GUESS_RELATIONSHIPS_FROM_REPORTS=false
- FLASHPOINT_IMPORT_CCM_ALERTS=false
- FLASHPOINT_FRESH_CCM_ALERTS_ONLY=true
restart: always
4 changes: 3 additions & 1 deletion external-import/flashpoint/src/config.yml.sample
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ flashpoint:
import_start_date: 'P30D'
import_reports: true
indicators_in_reports: false
guess_relationships_from_reports: false
import_indicators: true
import_alerts: true
alert_create_related_entities: false
import_communities: false
communities_queries: 'cybersecurity,cyberattack'
guess_relationships_from_reports: false
import_ccm_alerts: true
fresh_ccm_alerts_only: true
7 changes: 7 additions & 0 deletions external-import/flashpoint/src/flashpoint_client/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .client_api import FlashpointClient, FlashpointClientError

# Flatten imports
__all__ = [
"FlashpointClient",
"FlashpointClientError",
]
247 changes: 247 additions & 0 deletions external-import/flashpoint/src/flashpoint_client/client_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
import base64
from datetime import datetime, timedelta
from typing import Generator

import requests
from pydantic import ValidationError
from requests.adapters import HTTPAdapter, Retry

from .models import CompromisedCredentialSighting


class FlashpointClientError(Exception):
"""
Custom exception for Flashpoint client errors
"""


class FlashpointClient:

def __init__(
self,
api_base_url: str,
api_key: str,
retry: int = 3,
backoff: timedelta = timedelta(seconds=1),
):
"""
Initialize the client with necessary configurations
"""
self.api_base_url = api_base_url

# Define headers in session and update when needed
headers = {
"Content-Type": "application/json",
"Authorization": "Bearer " + api_key,
}
self.session = requests.Session()
self.session.headers.update(headers)

retry_strategy = Retry(
total=retry,
backoff_factor=backoff.total_seconds(),
status_forcelist=[429, 500, 502, 503, 504],
raise_on_status=False, # do not raise MaxRetryError - let response.raise_for_status() raise exceptions
)
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount(self.api_base_url, adapter)

def get_communities_doc(self, doc_id):
"""
:param doc_id:
:return:
"""
url = self.api_base_url + "/sources/v2/communities/" + doc_id
params = {}
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()

def communities_search(self, query: str, start_date: datetime) -> list[dict]:
"""
:param query:
:param start_date:
:return:
"""
url = self.api_base_url + "/sources/v2/communities"
page = 0
body_params = {
"query": query,
"include": {
"date": {
"start": start_date.strftime(
"%Y-%m-%dT%H:%M:%SZ"
), # UTC as +00:00 offset leads to 400 Bad Request
"end": "",
}
},
"size": "1000",
"sort": {"date": "asc"},
"page": page,
}
results = []
has_more = True
while has_more:
response = self.session.post(url, json=body_params)
response.raise_for_status()
data = response.json()
results.extend(data.get("items"))
if len(results) < data.get("total").get("value"):
page += 1
else:
has_more = False
return results

def get_media_doc(self, doc_id):
"""
:param doc_id:
:return:
"""
url = self.api_base_url + "/sources/v2/media/" + doc_id
params = {}
response = self.session.get(url, params=params)
response.raise_for_status()
return response.json()

def get_media(self, media_id):
"""
:return:
"""
url = self.api_base_url + "/sources/v1/media"
params = {"cdn": False, "asset_id": media_id}
response = self.session.get(url, params=params)
response.raise_for_status()
return base64.b64encode(response.content), response.headers.get("Content-Type")

def get_alerts(self, start_date: datetime) -> list[dict]:
"""
:return:
"""
alerts = []
url = self.api_base_url + "/alert-management/v1/notifications"
params = {
"created_after": start_date.strftime(
"%Y-%m-%dT%H:%M:%SZ"
) # UTC as +00:00 offset leads to 400 Bad Request
}
has_more = True
while has_more:
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
if data.get("pagination").get("next"):
url = data.get("pagination").get("next")
else:
has_more = False
alerts.extend(data.get("items"))
return alerts

def get_reports(self, start_date: datetime) -> list[dict]:
"""
:return:
"""
url = self.api_base_url + "/finished-intelligence/v1/reports"
limit = 100
params = {
"since": start_date.strftime(
"%Y-%m-%dT%H:%M:%SZ"
), # UTC as +00:00 offset leads to 400 Bad Request
"limit": limit,
"skip": 0,
"sort": "updated_at:asc",
"embed": "asset",
}
has_more = True
reports = []
while has_more:
response = self.session.get(url, params=params)
response.raise_for_status()
response_json = response.json()
total = response_json.get("total")
reports.extend(response_json.get("data", []))
params["skip"] += limit
if len(reports) == total:
has_more = False
return reports

def get_misp_feed_manifest(self):
"""
:return:
"""
url = self.api_base_url + "/technical-intelligence/v1/misp-feed/manifest.json"
response = self.session.get(url)
response.raise_for_status()
data = response.json()
return data

def get_misp_event_file(self, filename):
"""
:return:
"""
url = self.api_base_url + "/technical-intelligence/v1/misp-feed/" + filename
response = self.session.get(url)
response.raise_for_status()
data = response.json()
return data

def get_compromised_credential_sightings(
self, since: datetime | None = None, fresh_only: bool = True
) -> Generator[CompromisedCredentialSighting, None, None]:
"""
Get Compromised Credentials Sightings from Flashpoint API.

:param since: The minimum date to search for Compromised Credentials Sightings.
:param fresh_only: If True (default), only return fresh sightings, otherwise return all sightings.
:return: Found Compromised Credentials Sightings

Doc: https://docs.flashpoint.io/flashpoint/reference/common-use-cases-2#retrieve-compromised-credential-sightings-for-the-last-24-hours-or-a-specified-time-interval
"""
url = self.api_base_url + "/sources/v1/noncommunities/search"

since_timestamp = int(datetime.timestamp(since)) if since else None
body = {
"query": (
"+basetypes:(credential-sighting)"
+ (f" +header_.indexed_at: [{since_timestamp} TO now]" if since else "")
+ (" +is_fresh:(true)" if fresh_only else "")
),
"sort": ["header_.indexed_at:asc"],
"size": 25,
"scroll": "2m",
}

try:
sightings_count = 0
while True:
response = self.session.post(url, json=body)
response.raise_for_status()
data: dict = response.json()

# /search endpoint returns total hits as an integer
total_hits: int = data["hits"]["total"]
# /scroll endpoint returns total hits as a dict {'relation': str, 'value': int}
if isinstance(total_hits, dict):
total_hits: int = data["hits"]["total"]["value"] # type: ignore[no-redef]

results: list[dict] = data["hits"]["hits"]
for result in results:
try:
sighting = CompromisedCredentialSighting.model_validate(
result["_source"]
)
yield sighting
except ValidationError as err:
raise FlashpointClientError(
"Invalid Compromised Credential Sighting data"
) from err

sightings_count += len(results)
if sightings_count == total_hits:
break

url = self.api_base_url + "/sources/v1/noncommunities/scroll?scroll=2m"
body = {"scroll_id": data["_scroll_id"]}
except requests.HTTPError as err:
raise FlashpointClientError(
"Failed to fetch Compromised Credential Sightings"
) from err
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .compromised_credential_sighting import CompromisedCredentialSighting

# Flatten imports

__all__ = [
"CompromisedCredentialSighting",
]
Loading