
Add documentation to data access functions #81


Draft
wants to merge 11 commits into development
68 changes: 58 additions & 10 deletions lib/aip/data/access.py
@@ -36,52 +36,94 @@
import pandas as pd
import numpy as np

from aip.data.functions import scramble, read_zeek, getrawdata, removerawdata
from joblib import Parallel, delayed
from os import scandir, path
from joblib import Parallel
from joblib import delayed
from os import scandir
from os import path
from pathlib import Path
from aip.data.functions import scramble
from aip.data.functions import read_zeek
from aip.data.functions import get_raw_data
from aip.data.functions import remove_raw_data

project_dir = Path(__file__).resolve().parents[3]

data_path = path.join(project_dir,'data') # Deprecated, do not use
data_dir = path.join(project_dir,'data')


def _get_honeypot_ips(for_date=None):
'''
Filter those honeypots active due date for_date, if there are operation dates in the honeypot file.
Filter the IPs of the honeypots that were active on date for_date, if operation dates are present in the honeypot file.
The honeypots_public_ips.csv has the following format:
# List of IPs to look for to generate the attack files.
public_ip,operation_start_date,operation_end_date
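Example row (illustrative, using a documentation-range address):
192.0.2.10,2022-01-01,2022-06-30
An empty operation_end_date means the honeypot is treated as still active.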
'''
logger = logging.getLogger(__name__)

# Check if the file exists before attempting to read it
honeypot_public_ips = path.join(project_dir, 'data', 'external', 'honeypots_public_ips.csv')

# If the file does not exist raise an exception
if not path.exists(honeypot_public_ips):
logger.error(f"File 'honeypot_public_ips.csv' does not exist. Raising error.")
raise FileNotFoundError("Required file 'honeypots_public_ips.csv' does not exist.")
raise FileNotFoundError("_get_honeypot_ips() required file 'honeypots_public_ips.csv' does not exist.")

# Read the CSV located in data/external/honeypots_public_ips.csv
honeypots = pd.read_csv(honeypot_public_ips, comment='#')

if for_date is not None:
# Convert to datetime object
for_date = pd.to_datetime(for_date)

# Parsing start date
if 'operation_start_date' in honeypots.keys():
honeypots['operation_start_date'] = pd.to_datetime(honeypots['operation_start_date'])

# Parsing end date, filling empty values with today's date
if 'operation_end_date' in honeypots.keys():
honeypots['operation_end_date'] = honeypots['operation_end_date'].fillna(dt.date.today())
honeypots['operation_end_date'] = pd.to_datetime(honeypots['operation_end_date'])

# Keep honeypots active on the specified date
if ('operation_start_date' in honeypots.keys()) and ('operation_end_date' in honeypots.keys()):
honeypots = honeypots[(for_date >= honeypots['operation_start_date']) & (for_date <= honeypots['operation_end_date'])]

ips = honeypots.public_ip.values
return ips
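
# Illustrative usage (a sketch; the addresses are documentation-range
# examples, not real honeypots):
#
#     >>> _get_honeypot_ips(for_date='2022-01-15')
#     array(['192.0.2.10', '198.51.100.7'], dtype=object)
#
# Honeypots whose operation window does not cover for_date are dropped
# before the public_ip column is returned.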

def _process_zeek_files(zeek_files, date):

def _process_zeek_files(list_of_zeek_files, date):
"""
Process a list of Zeek log files to extract all connections
from honeypot IPs for a given date.
"""
# Retrieve the list of honeypot IPs
ips = _get_honeypot_ips()

# Initialise daily, a dataframe that will contain
# all the connections to the honeypot IPs found
# in the input Zeek files
daily = pd.DataFrame()
for z in zeek_files:

# Process each zeek file in the input list
for zeek_file in list_of_zeek_files:
hourly = pd.DataFrame()
zeekdata = read_zeek(z)

# Read the zeek file into a dataframe
zeekdata = read_zeek(zeek_file)

# Select the traffic destined to each honeypot IP
for ip in ips:
hourly = pd.concat([hourly, zeekdata[zeekdata['id.resp_h'] == ip]])

# Append the hourly traffic to the daily dataframe
daily = pd.concat([daily, hourly])

# Return a DataFrame with all the traffic to the honeypot IPs
# seen in the input Zeek files
return daily
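
# Design note (a sketch, not part of this PR): the per-IP loop above can
# be replaced with a single vectorized pandas filter:
#
#     hourly = zeekdata[zeekdata['id.resp_h'].isin(ips)]
#
# Series.isin() avoids one concat per honeypot IP and selects the same
# rows, ordered by log position rather than by IP.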


def _process_argus_files(argus_files, date):
ips = _get_honeypot_ips()
daily = pd.DataFrame()
@@ -93,6 +135,7 @@ def _process_argus_files(argus_files, date):
daily = pd.concat([daily, hourly])
return daily


def _process_raw_files(date):
'''
Create a dataset for the date string date in the data/interim folder
@@ -102,7 +145,7 @@ def _process_raw_files(date):
# if data directory does not exist, execute the magic to get it
if not path.isdir(path.join(project_dir,'data','raw', date)):
logging.debug(f'Downloading data for {date}')
getrawdata(date)
get_raw_data(date)
# after this point, if directory does not exist, we can skip it.
try:
zeek_files = [x.path for x in scandir(path.join(project_dir,'data','raw', date)) if x.name.startswith('conn.')]
@@ -119,6 +162,7 @@ def _process_raw_files(date):
#remove_raw_data(date)
return


def _extract_attacks(date):
'''
Create a dataset for the date string date in the data/interim folder
@@ -153,6 +197,7 @@ def _extract_attacks(date):
#remove_raw_data(date)
return


def process_zeek_files(dates=None):
"""
Creates the dataset or part of it
@@ -170,6 +215,7 @@ def process_zeek_files(dates=None):
Parallel(n_jobs=12, backend='multiprocessing')(delayed(_process_raw_files)(date) for date in dates)
return
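
# Illustrative usage (a sketch): build the interim data for two days.
#
#     process_zeek_files(dates=['2022-01-01', '2022-01-02'])
#
# Each date is handled by _process_raw_files in its own process
# (n_jobs=12, multiprocessing backend), so days are processed in parallel.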


def extract_attacks(dates=None):
"""
Creates the dataset or part of it
@@ -193,6 +239,7 @@ def extract_attacks(dates=None):
Parallel(n_jobs=12, backend='multiprocessing')(delayed(_extract_attacks)(date) for date in dates)
return


def get_attacks(start=None, end=None, dates=None, usecols=None):
'''
Returns a DataFrame with the attacks between the dates start and end or the
@@ -212,6 +259,7 @@ def get_attacks(start=None, end=None, dates=None, usecols=None):
for date in dates]
return dfs
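
# Illustrative usage (a sketch; assumes the per-day attack files were
# already generated by extract_attacks for these dates):
#
#     dfs = get_attacks(start='2022-01-01', end='2022-01-03')
#     attacks = pd.concat(dfs)   # get_attacks returns one DataFrame per day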


if __name__ == '__main__':
log_fmt = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
#logging.basicConfig(level=logging.INFO, format=log_fmt)
70 changes: 58 additions & 12 deletions lib/aip/data/functions.py
@@ -3,28 +3,44 @@
import hashlib
import pandas as pd
import shutil
import subprocess, shlex
import shlex
import subprocess
import zeeklog2pandas as z2p

from dotenv import dotenv_values
from joblib import Parallel, delayed
from os import makedirs, path, access, W_OK
from joblib import Parallel
from joblib import delayed
from os import makedirs
from os import path
from os import access
from os import W_OK
from pathlib import Path

_project_dir = Path(__file__).resolve().parents[3]
_config = {
**dotenv_values(path.join(_project_dir, ".env")), # load sensitive variables
}
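
# Illustrative .env contents (the values are assumptions; the keys are the
# ones this module reads: 'salt', 'magic' and 'remove_raw_data'):
#
#     salt=some-secret-string
#     magic=scp user@storage:/zeek/
#     remove_raw_data=false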


def read_zeek(path, **kwargs):
"""
Reads a Zeek log file into a DataFrame
"""
try:
# Load the Zeek file at path into a DataFrame
df = z2p.read_zeek(path, **kwargs)

# Convert to readable time format
if 'ts' in df.keys():
df['ts'] = pd.to_datetime(df.ts, unit='s')

# Return the resulting DataFrame
return df
except Exception:
raise z2p.NotAZeekFile(path)
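
# Illustrative usage (a sketch; the filename is an assumption based on the
# 'conn.' prefix and hourly naming used in access.py and get_raw_data):
#
#     df = read_zeek('data/raw/2022-01-01/conn.00.log.gz')
#     df['ts'].min()   # 'ts' is already converted to pandas datetimes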


# Currently deprecated: AIP now uses strictly Zeek logs
def read_argus(path, **kwargs):
from os import path as ospath
if ospath.exists(path.path + '.csv'):
@@ -40,21 +56,51 @@ def read_argus(path, **kwargs):
df.rename(columns={'StartTime':'ts', 'SrcAddr':'id.orig_h', 'DstAddr':'id.resp_h', 'Dur': 'duration', 'SrcBytes': 'orig_ip_bytes', 'SrcPkts': 'orig_pkts'}, inplace=True)
return df


# This function is unused right now
def scramble(s):
return hashlib.sha1(_config['salt'].encode() + s.encode()).hexdigest()
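
# Illustrative usage (requires a 'salt' value in .env): scramble() returns
# the 40-character hexadecimal SHA-1 digest of salt + s, e.g.:
#
#     >>> scramble('192.0.2.1')
#     '4f2d...'   # 40 hex characters; the value depends on the salt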

def getrawdata(date):

def get_raw_data(date):
"""
Retrieves Zeek data from a remote? location and stores it
on a directory for AIP to process it. The copy is done in
parallel.
"""

# Validate that the date string is well formatted
dt.datetime.strptime(date, '%Y-%m-%d')
p = path.join(_project_dir,'data','raw', date)
if access(p, W_OK):
makedirs(p, exist_ok=True)
commands = [shlex.split(_config['magic'] + f'{date}/conn.{x:02}* ' + p) for x in range(0,24)]
Parallel(n_jobs=24, backend='threading')(delayed(subprocess.run)(c) for c in commands)

def removerawdata(date, force=False):
raw_data_dir = path.join(_project_dir,'data','raw', date)

# Ensure directory exists and is writable
# Only proceed if the directory is writable (note: os.access returns
# False when the path does not exist yet)
if access(raw_data_dir, W_OK):
# Create the directory, ignoring the error if it already exists
makedirs(raw_data_dir, exist_ok=True)

# Build the commands that retrieve the data from the location configured
# in .env ('magic') and store it in data/raw/YYYY-MM-DD for processing.
commands = [
shlex.split(_config['magic'] + f'{date}/conn.{x:02}* ' + raw_data_dir)
for x in range(0,24)
]

# Run the copy commands in parallel, one thread per hourly file
Parallel(n_jobs=24, backend='threading')(delayed(subprocess.run)(cmd) for cmd in commands)
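
# Illustrative expansion (an assumption; the real prefix lives in .env):
# if magic were 'scp user@storage:/zeek/', the first generated command
# would be equivalent to running
#
#     scp user@storage:/zeek/2022-01-01/conn.00* <project_dir>/data/raw/2022-01-01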


def remove_raw_data(date, force=False):
"""
Remove (delete) the content of the raw data directory
for a given date.
"""
# Validate that the date string is well formatted
dt.datetime.strptime(date, '%Y-%m-%d')
p = path.join(_project_dir,'data','raw', date)

raw_data_dir = path.join(_project_dir,'data','raw', date)

# Only delete raw data if explicitly allowed in the configuration file
# or when force=True (force is checked first, so it works even if the
# config key is missing)
if force or (_config['remove_raw_data'].lower() == 'true'):
shutil.rmtree(p)
shutil.rmtree(raw_data_dir)
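
# Illustrative usage: delete the raw data for a day regardless of the
# 'remove_raw_data' setting in .env.
#
#     remove_raw_data('2022-01-01', force=True)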
