Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions backend/serverless.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ provider:
Resource: '*'
- Effect: Allow
Action:
- sqs:ListQueues
- sqs:ReceiveMessage
- sqs:DeleteMessage
- sqs:SendMessage
Expand Down
1 change: 1 addition & 0 deletions backend/src/xfd_django/xfd_api/schema_models/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class GenericMessageResponseModel(BaseModel):
cpu="1024",
memory="8192",
description="Fetch passive port, banner, and vulnerability data from shodan",
maxConcurrentTasks=10,
),
"shodan_sync": ScanSchema(
type="fargate",
Expand Down
14 changes: 12 additions & 2 deletions backend/src/xfd_django/xfd_api/tasks/ecs_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def run_command(self, command_options):
"CENSYS_API_ID": os.getenv("CENSYS_API_ID"),
"CENSYS_API_SECRET": os.getenv("CENSYS_API_SECRET"),
"WORKER_USER_AGENT": os.getenv("WORKER_USER_AGENT"),
"SHODAN_API_KEY": os.getenv("SHODAN_API_KEY"),
"SHODAN_API_KEY": command_options["SHODAN_API_KEY"],
"SIXGILL_CLIENT_ID": os.getenv("SIXGILL_CLIENT_ID"),
"SIXGILL_CLIENT_SECRET": os.getenv("SIXGILL_CLIENT_SECRET"),
"PE_SHODAN_API_KEYS": os.getenv("PE_SHODAN_API_KEYS"),
Expand Down Expand Up @@ -152,7 +152,17 @@ def run_command(self, command_options):
if memory
else "",
},
],
]
+ (
[
{
"name": "SHODAN_API_KEY",
"value": command_options["SHODAN_API_KEY"],
}
]
if "SHODAN_API_KEY" in command_options
else []
),
}
],
},
Expand Down
23 changes: 23 additions & 0 deletions backend/src/xfd_django/xfd_api/tasks/helpers/get_ips.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
"""Get Ips per organization."""
# Third-Party Libraries
from xfd_mini_dl.models import Ip


def get_ips_by_cidr(organization_id):
"""
Retrieve a list of IPs associated with CIDRs owned by the specified organization.

Filters:
- The IP's origin_cidr must not be null.
- The CIDR must belong to the given organization via M2M.
- The IP must have Shodan results.
- The IP must be current.
"""
ip_qs = Ip.objects.filter(
origin_cidr__isnull=False,
origin_cidr__organizations__id=organization_id,
has_shodan_results=True,
current=True,
).values_list("ip", flat=True)

return list(ip_qs)
59 changes: 47 additions & 12 deletions backend/src/xfd_django/xfd_api/tasks/scanExecution.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,54 @@ def start_desired_tasks(
scan_type, desired_count, scan_id, organizations, is_pe=False, shodan_api_keys=[]
):
"""Start the desired number of tasks on AWS ECS or local Docker based on configuration."""
# Step 1: Get all Scan instances with this name
# Step 1: Get the scan instance
scans_with_name = Scan.objects.filter(name=scan_type)

# Step 2: Determine the max concurrentTasks among them
max_concurrent = max((scan.concurrentTasks for scan in scans_with_name), default=1)

# Step 3: Get all currently running concurrencyIndexes
# Step 3: Get all currently running concurrencyIndexes across all scans of this type
existing_indexes = list(
ScanTask.objects.filter(
scan__name=scan_type,
status__in=["created", "queued", "requested", "started"],
).values_list("concurrencyIndex", flat=True)
)

# Calculate available indexes to use for new tasks
available_indexes = sorted(
set(range(1, max_concurrent + 1)) - set(existing_indexes)
)
remaining_count = len(available_indexes)

# Step 4: Check how many tasks are already running for this specific scan
this_scan_running = ScanTask.objects.filter(
scan_id=scan_id,
status__in=["created", "queued", "requested", "started"],
).count()

# Step 5: Determine how many *this* scan is allowed to start
remaining_for_this_scan = desired_count - this_scan_running
if scan_type == "shodan" and len(shodan_api_keys) < remaining_for_this_scan:
print(
"Not enough Shodan API keys. Needed: {}, Provided: {}".format(
remaining_for_this_scan, len(shodan_api_keys)
)
)
return
if remaining_for_this_scan <= 0:
print(
"Scan {} already has {} tasks running (desired: {}). Not launching more.".format(
scan_id, this_scan_running, desired_count
)
)
return

# Step 6: Global cap applies too
remaining_count = min(len(available_indexes), remaining_for_this_scan)

if remaining_count == 0:
print(
"Max concurrency already reached for scan '{}': {}".format(
scan_type, max_concurrent
"No available concurrency slots for scan '{}'. Max: {}, Running: {}".format(
scan_type, max_concurrent, len(existing_indexes)
)
)
return
Expand All @@ -94,7 +118,12 @@ def start_desired_tasks(

while remaining_count > 0:
current_batch_count = min(remaining_count, batch_size)
shodan_api_key = shodan_api_keys[remaining_count - 1] if shodan_api_keys else ""
shodan_api_key = (
shodan_api_keys[available_indexes[0] - 1]
if available_indexes and len(shodan_api_keys) >= available_indexes[0]
else ""
)

if is_pe:
if os.getenv("IS_LOCAL"):
# Use local Docker environment (old method)
Expand Down Expand Up @@ -151,6 +180,8 @@ def start_desired_tasks(
"SERVICE_TYPE": scan_type,
"count": current_batch_count,
}
if scan_type == "shodan":
command_options["SHODAN_API_KEY"] = shodan_api_key

result = ecs.run_command(command_options)

Expand All @@ -160,10 +191,11 @@ def start_desired_tasks(
"Failed to start ECS task for scan {}".format(scan_type)
)

# After launching ECS tasks, assign concurrency indexes correctly
for i, task in enumerate(result["tasks"]):
for task in result["tasks"]:
task_arn = task["taskArn"]
index_to_use = available_indexes[i] # Use one of the available indexes
if not available_indexes:
raise Exception("Not enough available concurrency indexes")
index_to_use = available_indexes.pop(0) # Use and remove
create_scan_task(
scan_id,
scan_type,
Expand Down Expand Up @@ -232,7 +264,10 @@ def handler(event, context):
return {"statusCode": 400, "body": "Failed: no scanType provided."}

if scan_type == "shodan":
api_key_list = event.get("apiKeyList", "")
if is_pe:
api_key_list = event.get("apiKeyList", "")
else:
api_key_list = os.getenv("PE_SHODAN_API_KEYS", "")
shodan_api_keys = (
[key.strip() for key in api_key_list.split(",")] if api_key_list else []
)
Expand All @@ -250,7 +285,7 @@ def handler(event, context):
scan_id,
organizations,
is_pe=is_pe,
shodan_api_keys=[],
shodan_api_keys=shodan_api_keys,
)

else:
Expand Down
Loading
Loading