Skip to content

Introduce MongoEngine #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: refactoring-mongodb-constants
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 12 additions & 17 deletions dev_utils/mongo_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@
mongo_update_many,
mongo_update_one,
)
from modules.reporting.mongodb_constants import ANALYSIS_COLL, FILE_KEY, FILE_REF_KEY, FILES_COLL, ID_KEY, TASK_IDS_KEY

log = logging.getLogger(__name__)

FILES_COLL = "files"
FILE_KEY = "sha256"
TASK_IDS_KEY = "_task_ids"
FILE_REF_KEY = "file_ref"


def normalize_file(file_dict, task_id):
"""Pull out the detonation-independent attributes of the given file and
Expand Down Expand Up @@ -65,12 +61,12 @@ def normalize_file(file_dict, task_id):
except KeyError:
pass

new_dict["_id"] = key
new_dict[ID_KEY] = key
file_dict[FILE_REF_KEY] = key
return UpdateOne({"_id": key}, {"$set": new_dict, "$addToSet": {TASK_IDS_KEY: task_id}}, upsert=True, hint=[("_id", 1)])
return UpdateOne({ID_KEY: key}, {"$set": new_dict, "$addToSet": {TASK_IDS_KEY: task_id}}, upsert=True, hint=[(ID_KEY, 1)])


@mongo_hook((mongo_insert_one, mongo_update_one), "analysis")
@mongo_hook((mongo_insert_one, mongo_update_one), ANALYSIS_COLL)
def normalize_files(report):
"""Take the detonation-independent file data from various parts of
the report and extract them out to a separate collection, keeping a
Expand All @@ -88,7 +84,7 @@ def normalize_files(report):
return report


@mongo_hook(mongo_find, "analysis")
@mongo_hook(mongo_find, ANALYSIS_COLL)
def denormalize_files_from_reports(reports):
"""Pull the file info from the FILES_COLL collection in to associated parts of
the reports.
Expand Down Expand Up @@ -116,8 +112,8 @@ def denormalize_files_from_reports(reports):
while batch := tuple(itertools.islice(file_ref_iter, batch_size)):
# Reduce the size of the $in clause when there are large numbers of file refs by
# making multiple requests, passing batches of refs in.
for file_doc in mongo_find(FILES_COLL, {"_id": {"$in": batch}}, {TASK_IDS_KEY: 0}):
file_docs[file_doc.pop("_id")] = file_doc
for file_doc in mongo_find(FILES_COLL, {ID_KEY: {"$in": batch}}, {TASK_IDS_KEY: 0}):
file_docs[file_doc.pop(ID_KEY)] = file_doc

for file_dict in file_dicts:
if file_dict[FILE_REF_KEY] not in file_docs:
Expand All @@ -129,7 +125,7 @@ def denormalize_files_from_reports(reports):
return reports


@mongo_hook(mongo_find_one, "analysis")
@mongo_hook(mongo_find_one, ANALYSIS_COLL)
def denormalize_files(report):
"""Pull the file info from the FILES_COLL collection in to associated parts of
the report.
Expand All @@ -138,7 +134,7 @@ def denormalize_files(report):
return report


@mongo_hook(mongo_delete_data, "analysis")
@mongo_hook(mongo_delete_data, ANALYSIS_COLL)
def remove_task_references_from_files(task_ids):
"""Remove the given task_ids from the TASK_IDS_KEY field on "files"
documents that were referenced by those tasks that are being deleted.
Expand All @@ -158,11 +154,10 @@ def delete_unused_file_docs():
return mongo_delete_many(FILES_COLL, {TASK_IDS_KEY: {"$size": 0}})


NORMALIZED_FILE_FIELDS = ("target.file", "dropped", "CAPE.payloads", "procdump", "procmemory")


def collect_file_dicts(report) -> itertools.chain:
"""Return an iterable containing all of the candidates for files
"""Collect file dictionaries based on NORMALIZED_FILE_FIELDS.

Return an iterable containing all the candidates for files
from various parts of the report to be normalized.
"""
file_dicts = []
Expand Down
109 changes: 73 additions & 36 deletions dev_utils/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,44 +4,75 @@
import time
from typing import Callable, Sequence, Union

import mongoengine

from lib.cuckoo.common.config import Config
from modules.reporting.mongodb_constants import ANALYSIS_COLL, CALLS_COLL, DB_ALIAS, ID_KEY, INFO_ID_KEY

log = logging.getLogger(__name__)
logging.getLogger("pymongo").setLevel(logging.ERROR)
repconf = Config("reporting")

mdb = repconf.mongodb.get("db", "cuckoo")
mdb = None
conn = None


if repconf.mongodb.enabled:
from pymongo import MongoClient, version_tuple
try:
import pymongo
from pymongo.database import Database
from pymongo.errors import AutoReconnect, ConnectionFailure, OperationFailure, ServerSelectionTimeoutError

if version_tuple[0] < 4:
log.warning("You using old version of PyMongo, upgrade: poetry install")
MONGO_AVAILABLE = True

def connect_to_mongo() -> MongoClient:
def connect_to_mongo() -> pymongo.MongoClient:
"""Create the connection to MongoDB."""
global mdb
try:
return MongoClient(
log.info("Creating MongoClient connection.")
repconf = Config("reporting")
mdb = repconf.mongodb.get("db", "cuckoo")

mongoengine.register_connection(
db=mdb,
alias=DB_ALIAS,
host=repconf.mongodb.get("host", "127.0.0.1"),
port=repconf.mongodb.get("port", 27017),
username=repconf.mongodb.get("username"),
password=repconf.mongodb.get("password"),
authSource=repconf.mongodb.get("authsource", "cuckoo"),
authentication_source=repconf.mongodb.get("authsource", "cuckoo"),
uuidRepresentation="standard",
tlsCAFile=repconf.mongodb.get("tlscafile", None),
connect=False,
)

return mongoengine.get_db(alias=DB_ALIAS).client
except (ConnectionFailure, ServerSelectionTimeoutError):
log.error("Cannot connect to MongoDB")
except Exception as e:
log.warning("Unable to connect to MongoDB database: %s, %s", mdb, e)
log.warning("Unable to connect to MongoDB database, %s", e)

def init_mongo():
"""Initialize MongoDB connection."""
global conn
if conn is not None:
# Already initialized.
return
log.info("Initializing MongoDB connection.")
repconf = Config("reporting")

if repconf.mongodb.enabled:
if pymongo.version_tuple[0] < 4:
log.warning("You are using an old version of PyMongo, upgrade: poetry install")
conn = connect_to_mongo()

def get_results_db() -> Database:
"""Initialize mongodb, if needed. Return a handle to the database."""
if conn is None:
init_mongo()
return conn[mdb]

# code.interact(local=dict(locals(), **globals()))
# q = results_db.analysis.find({"info.id": 26}, {"memory": 1})
# https://pymongo.readthedocs.io/en/stable/changelog.html
except ImportError:
MONGO_AVAILABLE = False

conn = connect_to_mongo()
results_db = conn[mdb]

MAX_AUTO_RECONNECT_ATTEMPTS = 5

Expand Down Expand Up @@ -91,30 +122,30 @@ def inner(*args, **kwargs):

@graceful_auto_reconnect
def mongo_bulk_write(collection: str, requests, **kwargs):
return getattr(results_db, collection).bulk_write(requests, **kwargs)
return getattr(get_results_db(), collection).bulk_write(requests, **kwargs)


@graceful_auto_reconnect
def mongo_create_index(collection: str, index, background: bool = True, name: str = False):
if name:
getattr(results_db, collection).create_index(index, background=background, name=name)
getattr(get_results_db(), collection).create_index(index, background=background, name=name)
else:
getattr(results_db, collection).create_index(index, background=background)
getattr(get_results_db(), collection).create_index(index, background=background)


@graceful_auto_reconnect
def mongo_insert_one(collection: str, doc):
for hook in hooks[mongo_insert_one][collection]:
doc = hook(doc)
return getattr(results_db, collection).insert_one(doc)
return getattr(get_results_db(), collection).insert_one(doc)


@graceful_auto_reconnect
def mongo_find(collection: str, query, projection=False, sort=None, limit=None):
if sort is None:
sort = [("_id", -1)]
sort = [(ID_KEY, -1)]

find_by = functools.partial(getattr(results_db, collection).find, query, sort=sort)
find_by = functools.partial(getattr(get_results_db(), collection).find, query, sort=sort)
if projection:
find_by = functools.partial(find_by, projection=projection)
if limit:
Expand All @@ -130,11 +161,11 @@ def mongo_find(collection: str, query, projection=False, sort=None, limit=None):
@graceful_auto_reconnect
def mongo_find_one(collection: str, query, projection=False, sort=None):
if sort is None:
sort = [("_id", -1)]
sort = [(ID_KEY, -1)]
if projection:
result = getattr(results_db, collection).find_one(query, projection, sort=sort)
result = getattr(get_results_db(), collection).find_one(query, projection, sort=sort)
else:
result = getattr(results_db, collection).find_one(query, sort=sort)
result = getattr(get_results_db(), collection).find_one(query, sort=sort)
if result:
for hook in hooks[mongo_find_one][collection]:
result = hook(result)
Expand All @@ -143,46 +174,50 @@ def mongo_find_one(collection: str, query, projection=False, sort=None):

@graceful_auto_reconnect
def mongo_delete_one(collection: str, query):
return getattr(results_db, collection).delete_one(query)
return getattr(get_results_db(), collection).delete_one(query)


@graceful_auto_reconnect
def mongo_delete_many(collection: str, query):
return getattr(results_db, collection).delete_many(query)
return getattr(get_results_db(), collection).delete_many(query)


@graceful_auto_reconnect
def mongo_update_many(collection: str, query, update):
return getattr(results_db, collection).update_many(query, update)
return getattr(get_results_db(), collection).update_many(query, update)


@graceful_auto_reconnect
def mongo_update_one(collection: str, query, projection, bypass_document_validation: bool = False):
if query.get("$set", None):
for hook in hooks[mongo_find_one][collection]:
query["$set"] = hook(query["$set"])
return getattr(results_db, collection).update_one(query, projection, bypass_document_validation=bypass_document_validation)
return getattr(get_results_db(), collection).update_one(
query, projection, bypass_document_validation=bypass_document_validation
)


@graceful_auto_reconnect
def mongo_aggregate(collection: str, query):
return getattr(results_db, collection).aggregate(query)
return getattr(get_results_db(), collection).aggregate(query)


@graceful_auto_reconnect
def mongo_collection_names() -> list:
return results_db.list_collection_names()
return get_results_db().list_collection_names()


@graceful_auto_reconnect
def mongo_find_one_and_update(collection, query, update, projection=None):
if projection is None:
projection = {"_id": 1}
return getattr(results_db, collection).find_one_and_update(query, update, projection)
projection = {ID_KEY: 1}
return getattr(get_results_db(), collection).find_one_and_update(query, update, projection)


@graceful_auto_reconnect
def mongo_drop_database(database: str):
"""Drop the mongo database!"""
init_mongo()
conn.drop_database(database)


Expand All @@ -193,28 +228,30 @@ def mongo_delete_data(task_ids: Union[int, Sequence[int]]):

analyses_tmp = []
found_task_ids = []
tasks = mongo_find("analysis", {"info.id": {"$in": task_ids}}, {"behavior.processes.calls": 1, "info.id": 1})
tasks = mongo_find(ANALYSIS_COLL, {INFO_ID_KEY: {"$in": task_ids}}, {"behavior.processes.calls": 1, INFO_ID_KEY: 1})

for task in tasks or []:
for process in task.get("behavior", {}).get("processes", []):
if process.get("calls"):
mongo_delete_many("calls", {"_id": {"$in": process["calls"]}})
mongo_delete_many(CALLS_COLL, {ID_KEY: {"$in": process["calls"]}})
analyses_tmp.append(task["_id"])
task_id = task.get("info", {}).get("id", None)
if task_id is not None:
found_task_ids.append(task_id)

if analyses_tmp:
mongo_delete_many("analysis", {"_id": {"$in": analyses_tmp}})
mongo_delete_many(ANALYSIS_COLL, {ID_KEY: {"$in": analyses_tmp}})
if found_task_ids:
for hook in hooks[mongo_delete_data]["analysis"]:
for hook in hooks[mongo_delete_data][ANALYSIS_COLL]:
hook(found_task_ids)
except Exception as e:
log.error(e, exc_info=True)


def mongo_is_cluster():
"""Detect if we are connected to a MongoDB cluster."""
# This is only useful at the moment for clean to prevent destruction of cluster database
init_mongo()
try:
conn.admin.command("listShards")
return True
Expand Down
3 changes: 2 additions & 1 deletion lib/cuckoo/common/cape_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

if repconf.mongodb.enabled:
from dev_utils.mongodb import mongo_find_one
from modules.reporting.mongodb_constants import ANALYSIS_COLL, ID_KEY, INFO_ID_KEY

if repconf.elasticsearchdb.enabled:
from dev_utils.elasticsearchdb import elastic_handler, get_analysis_index
Expand Down Expand Up @@ -303,7 +304,7 @@ def static_config_lookup(file_path, sha256=False):

if repconf.mongodb.enabled:
document_dict = mongo_find_one(
"analysis", {"target.file.sha256": sha256}, {"CAPE.configs": 1, "info.id": 1, "_id": 0}, sort=[("_id", -1)]
ANALYSIS_COLL, {"target.file.sha256": sha256}, {"CAPE.configs": 1, INFO_ID_KEY: 1, ID_KEY: 0}, sort=[(ID_KEY, -1)]
)
elif repconf.elasticsearchdb.enabled:
document_dict = es.search(
Expand Down
Loading