Skip to content

Commit 3ed4f9f

Browse files
committed
update archiver to process reports day-by-day
1 parent a188f76 commit 3ed4f9f

File tree

7 files changed

+98
-59
lines changed

7 files changed

+98
-59
lines changed

src/riskdb/archiver/config.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,8 @@
11
from os import environ
2+
from datetime import datetime
23

34
REPO_ARCHIVE = 'github.com/O-X-L/risk-db-archive'
4-
ARCHIVE_DEDUPE_FIELDS = ['fp', 'cmt', 'user', 'by', 'cat']
5-
HEADERS_ARCHIVE_CSV = ['time', 'ip', 'an', 'cat', 'cmt', 'by', 'user', 'fp']
5+
ARCHIVE_DEDUPE_FIELDS = ['cmt', 'user', 'by', 'cat']
6+
HEADERS_ARCHIVE_CSV = ['time', 'ip', 'an', 'cat', 'cmt', 'by', 'user']
67
GIT_TOKEN = environ.get('GIT_TOKEN')
8+
ARCHIVE_START_DATE = datetime(year=2024, month=11, day=1)

src/riskdb/archiver/main.py

Lines changed: 56 additions & 50 deletions
Original file line number | Diff line number | Diff line change
@@ -5,29 +5,28 @@
55
from time import time
66
from hashlib import md5
77
from pathlib import Path
8-
from datetime import datetime
98
from os import system as shell
109
from operator import itemgetter
1110
from sys import path as sys_path
1211
from ipaddress import ip_network
12+
from datetime import datetime, timedelta
1313

1414
sys_path.append(str(Path(__file__).parent.parent.parent))
1515

1616
from riskdb.config import NET_SIZE
17-
from riskdb.archiver.config import REPO_ARCHIVE, HEADERS_ARCHIVE_CSV, ARCHIVE_DEDUPE_FIELDS
1817
from riskdb.builder.util import log
1918
from riskdb.builder.load_reports import FileLoader
2019
from riskdb.archiver.util import git_commit_and_push, git_clone, git_check_token
20+
from riskdb.archiver.config import REPO_ARCHIVE, HEADERS_ARCHIVE_CSV, ARCHIVE_DEDUPE_FIELDS, ARCHIVE_START_DATE
2121

2222

23-
# NOTE: de-duplicating raw-report values to make the archive more compact
24-
def _reports_by_day(tmp_dir: str) -> dict[list[dict]]:
25-
reports = {}
26-
tmp_dir_dedupe = f'{tmp_dir}/dedupe'
27-
dedupe_map = {k: [] for k in ARCHIVE_DEDUPE_FIELDS}
28-
shell(f'mkdir -p {tmp_dir_dedupe}')
23+
def _generate_archive_for_day(date: datetime, dedupe_map: dict, tmp_dir: Path) -> dict:
24+
reports = []
25+
for r in FileLoader(sliding_window=False, match_date=date):
26+
rdate = datetime.fromtimestamp(r['time'])
27+
if rdate.year != date.year or rdate.month != date.month or rdate.day != date.day:
28+
continue
2929

30-
for r in FileLoader():
3130
for k, v in r.items():
3231
if v is None:
3332
r[k] = ''
@@ -46,25 +45,19 @@ def _reports_by_day(tmp_dir: str) -> dict[list[dict]]:
4645
if 'v' in r:
4746
r.pop('v')
4847

49-
day = datetime.fromtimestamp(r['time']).strftime('%Y_%m_%d')
50-
if day not in reports:
51-
reports[day] = []
52-
5348
if 'an' not in r:
5449
r['an'] = ''
5550

56-
if 'fp' not in r:
57-
r['fp'] = ''
51+
if r['by'] != '':
52+
if r['by'].find(':') != -1:
53+
cidr = NET_SIZE['6']
5854

59-
if r['by'].find(':') != -1:
60-
cidr = NET_SIZE['6']
55+
else:
56+
cidr = NET_SIZE['4']
6157

62-
else:
63-
cidr = NET_SIZE['4']
64-
65-
r['by'] = str(ip_network(f"{r['by']}/{cidr}", strict=False)).split('/', 1)[0]
66-
if r['by'] in ['::', '::1', '127.0.0.0']:
67-
r['by'] = ''
58+
r['by'] = str(ip_network(f"{r['by']}/{cidr}", strict=False)).split('/', 1)[0]
59+
if r['by'] in ['::', '::1', '127.0.0.0']:
60+
r['by'] = ''
6861

6962
for k in ARCHIVE_DEDUPE_FIELDS:
7063
if r[k] == '':
@@ -77,47 +70,60 @@ def _reports_by_day(tmp_dir: str) -> dict[list[dict]]:
7770

7871
r[k] = dedupe_map[k].index(r[k])
7972

80-
reports[day].append(r)
73+
reports.append(r)
8174

82-
for k in ARCHIVE_DEDUPE_FIELDS:
83-
with open(f'{tmp_dir_dedupe}/field_{k}.csv', 'w', encoding='utf-8') as f:
84-
f.write('Key,Value\n')
85-
f.write('\n'.join([f'{i},{v}' for i, v in enumerate(dedupe_map[k])]))
75+
reports = sorted(reports, key=itemgetter('time'))
76+
77+
if len(reports) == 0:
78+
return dedupe_map
8679

87-
for day in reports:
88-
reports[day] = sorted(reports[day], key=itemgetter('time'))
80+
y = str(date.year).zfill(2)
81+
m = str(date.month).zfill(2)
82+
d = str(date.day).zfill(2)
83+
tmp_dir_mon = tmp_dir / y / m
84+
shell(f'mkdir -p {tmp_dir_mon}')
85+
with open(f'{tmp_dir_mon}/{y}_{m}_{d}.csv', 'w', encoding='utf-8') as f:
86+
f.write(f"{','.join(HEADERS_ARCHIVE_CSV)}\n")
87+
for r in reports:
88+
f.write(
89+
f"{r['time']},"
90+
f"{r['ip']},{r['an']},{r['cat']},{r['cmt']},"
91+
f"{r['by']},{r['user']}\n"
92+
)
8993

90-
return reports
94+
return dedupe_map
9195

9296

93-
def _write_reports(reports: dict[list[dict]], tmp_dir: str):
94-
for y_m_d in reports:
95-
y, m, _ = y_m_d.split('_')
96-
tmp_dir_mon = f'{tmp_dir}/{y}/{m}'
97-
shell(f'mkdir -p {tmp_dir_mon}')
98-
with open(f'{tmp_dir_mon}/{y_m_d}.csv', 'w', encoding='utf-8') as f:
99-
f.write(f"{','.join(HEADERS_ARCHIVE_CSV)}\n")
100-
for r in reports[y_m_d]:
101-
f.write(
102-
f"{r['time']},"
103-
f"{r['ip']},{r['an']},{r['cat']},{r['cmt']},"
104-
f"{r['by']},{r['user']},{r['fp']}\n"
105-
)
97+
# todo: multi-threading
98+
def _generate_archive(tmp_dir: Path):
99+
today = datetime.now()
100+
date = ARCHIVE_START_DATE
101+
dedupe_map = {k: [] for k in ARCHIVE_DEDUPE_FIELDS}
102+
103+
while date.year < today.year or date.month < today.month or date.day <= today.day:
104+
log(f'Generating archive for day: '
105+
f'{str(date.year).zfill(2)}-{str(date.month).zfill(2)}-{str(date.day).zfill(2)}')
106+
dedupe_map = _generate_archive_for_day(date=date, dedupe_map=dedupe_map, tmp_dir=tmp_dir)
107+
date += timedelta(days=1)
108+
109+
log('Writing dedupe-maps')
110+
tmp_dir_dedupe = tmp_dir / 'dedupe'
111+
shell(f'mkdir -p {tmp_dir_dedupe}')
112+
for k in ARCHIVE_DEDUPE_FIELDS:
113+
with open(f'{tmp_dir_dedupe}/field_{k}.csv', 'w', encoding='utf-8') as f:
114+
f.write('Key,Value\n')
115+
f.write('\n'.join([f'{i},{v}' for i, v in enumerate(dedupe_map[k])]))
106116

107117
git_commit_and_push(user='Report Updater', cmt='Report updates', repo=REPO_ARCHIVE, tmp_dir=tmp_dir)
108118

109119

110120
def main():
111121
log('Prepare Repository')
112122
git_check_token()
113-
tmp_dir = f'/tmp/risk_db_archive_{int(time())}'
123+
tmp_dir = Path(f'/tmp/risk_db_archive_{int(time())}')
114124
git_clone(repo=REPO_ARCHIVE, tmp_dir=tmp_dir)
115125

116-
log('Loading & Sorting Reports by Day')
117-
reports_by_day = _reports_by_day(tmp_dir)
118-
119-
log('Write Reports')
120-
_write_reports(reports_by_day, tmp_dir)
126+
_generate_archive(tmp_dir)
121127

122128

123129
if __name__ == '__main__':

src/riskdb/archiver/util.py

Lines changed: 11 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,19 @@
1-
from os import system as shell
1+
from pathlib import Path
22
from datetime import datetime
3+
from os import system as shell
34

5+
from riskdb.config import MODE_TEST
46
from riskdb.archiver.config import GIT_TOKEN
57

68

7-
def git_clone(repo: str, tmp_dir: str):
9+
def git_clone(repo: str, tmp_dir: Path):
810
shell(f'git clone https://{repo} {tmp_dir} >/dev/null')
911

1012

11-
def git_commit_and_push(user: str, cmt: str, repo: str, tmp_dir: str):
13+
def git_commit_and_push(user: str, cmt: str, repo: str, tmp_dir: Path):
14+
if MODE_TEST == '1':
15+
return
16+
1217
today = datetime.now().strftime('%Y-%m-%d')
1318
shell(
1419
f"cd {tmp_dir} && "
@@ -22,5 +27,8 @@ def git_commit_and_push(user: str, cmt: str, repo: str, tmp_dir: str):
2227

2328

2429
def git_check_token():
30+
if MODE_TEST == '1':
31+
return
32+
2533
if GIT_TOKEN is None or not GIT_TOKEN.startswith('ghp_'):
2634
raise PermissionError('Required GIT-Token was not supplied!')

src/riskdb/builder/config.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,5 @@
11
from os import environ
2-
# from datetime import timedelta
2+
from datetime import timedelta
33

44
from riskdb.config import DL_DIR
55

@@ -15,7 +15,7 @@
1515

1616
MMDB_DESCRIPTION = 'OXL RISK-Database - risk.oxl.app (BSD-3-Clause)'
1717
REPORT_COOLDOWN = 10 # sec
18-
# REPORT_DAYS = timedelta(days=30) # sliding window
18+
REPORT_DAYS = timedelta(days=30) # sliding window
1919
TOR_EXIT_NODE_LIST = 'https://check.torproject.org/torbulkexitlist'
2020

2121
PTR_LOOKUP_THREADS = 50
File renamed without changes.

src/riskdb/builder/load_reports.py

Lines changed: 23 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,14 +1,15 @@
11
# pylint: disable=R0915
22

33
from os import listdir
4+
from datetime import datetime
45
from json import JSONDecodeError
56
from json import loads as json_loads
67
from ipaddress import ip_address, AddressValueError
78

89
from maxminddb import open_database as mmdb_database
910

1011
from riskdb.config import EXCLUDE_NETS_IP4, EXCLUDE_NETS_IP6, REPORT_DIR, USER_TOKENS
11-
from riskdb.builder.config import REPORT_COOLDOWN, ASN_MMDB_FILE_IP4, ASN_MMDB_FILE_IP6
12+
from riskdb.builder.config import REPORT_COOLDOWN, ASN_MMDB_FILE_IP4, ASN_MMDB_FILE_IP6, REPORT_DAYS
1213
from riskdb.builder.obj.ip import IP
1314
from riskdb.builder.obj.asn import ASN
1415
from riskdb.builder.obj.report import Report
@@ -17,6 +18,7 @@
1718
from riskdb.builder.util import log
1819

1920
SKIP_REASONS_DEFAULT = {'no_cat': 0, 'bad_ip': 0, 'cooldown': 0, 'ignored': 0, 'bad_json': 0}
21+
SLIDING_WINDOW_START = datetime.now() - REPORT_DAYS
2022

2123

2224
class ReportLoader:
@@ -85,12 +87,31 @@ def __iter__(self):
8587

8688

8789
class FileLoader:
88-
def __init__(self, path: str = REPORT_DIR):
90+
def __init__(self, path: str = REPORT_DIR, sliding_window: bool = False, match_date: datetime = None):
8991
self.path = path
92+
self.match_date = match_date
93+
self.sliding_window = sliding_window
9094
self.skip_reasons = SKIP_REASONS_DEFAULT.copy()
9195

9296
def load(self):
9397
for file in listdir(REPORT_DIR):
98+
file_path = REPORT_DIR / file
99+
if self.sliding_window or self.match_date is not None:
100+
ct = datetime.fromtimestamp(file_path.stat().st_ctime)
101+
102+
if self.sliding_window and ct < SLIDING_WINDOW_START:
103+
continue
104+
105+
# only get reports of that day (even if we created the file later on)
106+
if self.match_date is not None:
107+
y = str(self.match_date.year).zfill(2)
108+
m = str(self.match_date.month).zfill(2)
109+
d = str(self.match_date.day).zfill(2)
110+
ys1, ys2 = f'{y}-{m}-{d}', f'{y}_{m}_{d}'
111+
if (ct.year != y or ct.month != m or ct.day != d) and \
112+
file.find(ys1) == -1 and file.find(ys2) == -1:
113+
continue
114+
94115
loader = ReportLoader(f'{REPORT_DIR}/{file}')
95116
yield from loader
96117

src/riskdb/config.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@
22
from pathlib import Path
33
from ipaddress import ip_network
44

5+
MODE_TEST = environ.get('RISKDB_TEST', '0')
6+
57
USER_TOKENS = [
68
'ceaf6e70-71c7-4415-92c0-2be6ea5f743b', # dummy test-token
79
]

0 commit comments

Comments (0)