-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcollector.py
189 lines (162 loc) · 8.04 KB
/
collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
'''
This module collects local file system metadata from the Mac local file
system.
Indaleko Mac Local Collector
Copyright (C) 2024-2025 Tony Mason
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
import argparse
import datetime
import inspect
import os
import logging
import platform
import sys
import uuid
from pathlib import Path
from typing import Union
from icecream import ic
# Locate the Indaleko source root when the environment does not already name
# it: walk upward from this file's directory until a directory containing
# 'Indaleko.py' is found, then publish that directory through INDALEKO_ROOT
# and make it importable.
if os.environ.get('INDALEKO_ROOT') is None:
    current_path = os.path.dirname(os.path.abspath(__file__))
    while not os.path.exists(os.path.join(current_path, 'Indaleko.py')):
        parent_path = os.path.dirname(current_path)
        if parent_path == current_path:
            # dirname() of the filesystem root is the root itself; without
            # this check a missing marker file would make the loop spin
            # forever instead of reporting the problem.
            raise RuntimeError(
                'Unable to locate Indaleko.py in any parent directory of '
                f'{os.path.abspath(__file__)}; set INDALEKO_ROOT explicitly.'
            )
        current_path = parent_path
    os.environ['INDALEKO_ROOT'] = current_path
    sys.path.append(current_path)
# pylint: disable=wrong-import-position
from db.service_manager import IndalekoServiceManager
from platforms.mac.machine_config import IndalekoMacOSMachineConfig
from storage.collectors.base import BaseStorageCollector
from storage.collectors.data_model import IndalekoStorageCollectorDataModel
from storage.collectors.local.local_base import BaseLocalStorageCollector
from utils.misc.file_name_management import generate_file_name, extract_keys_from_file_name, find_candidate_files
# pylint: enable=wrong-import-position
class IndalekoMacLocalStorageCollector(BaseLocalStorageCollector):
    '''
    This is the class that indexes Mac local file systems.

    It supplies the Mac-specific service registration data, output file
    naming defaults, per-file stat collection, and the mixin used by the
    command line runner to find and load Mac machine configuration files.
    '''

    # Platform and collector names used as defaults when generating file names.
    mac_platform = 'Mac'
    mac_local_collector_name = 'fs_collector'

    # Service registration values for this collector.
    indaleko_mac_local_collector_uuid = '14d6c989-0d1e-4ccc-8aea-a75688a6bb5f'
    indaleko_mac_local_collector_service_name = 'Mac Local Storage Collector'
    indaleko_mac_local_collector_service_description = \
        'This service collects metadata from the local filesystems of a Mac machine.'
    indaleko_mac_local_collector_service_version = '1.0'
    indaleko_mac_local_collector_service_type = IndalekoServiceManager.service_type_storage_collector

    # Keyword arguments forwarded to the base class constructor.
    indaleko_mac_local_collector_service = {
        'service_name': indaleko_mac_local_collector_service_name,
        'service_description': indaleko_mac_local_collector_service_description,
        'service_version': indaleko_mac_local_collector_service_version,
        'service_type': indaleko_mac_local_collector_service_type,
        'service_identifier': indaleko_mac_local_collector_uuid,
    }

    # Data-model description of this collector.
    collector_data = IndalekoStorageCollectorDataModel(
        CollectorPlatformName=mac_platform,
        CollectorServiceName=mac_local_collector_name,
        CollectorServiceDescription=indaleko_mac_local_collector_service_description,
        CollectorServiceUUID=uuid.UUID(indaleko_mac_local_collector_uuid),
        CollectorServiceVersion=indaleko_mac_local_collector_service_version,
    )

    def __init__(self, **kwargs):
        '''
        Initialize the collector.

        All keyword arguments are forwarded to the base class together with
        the Mac service registration values; the directory and file counters
        start at zero.
        '''
        super().__init__(
            **kwargs,
            **IndalekoMacLocalStorageCollector.indaleko_mac_local_collector_service
        )
        self.dir_count = 0
        self.file_count = 0

    def generate_mac_collector_file_name(self, **kwargs) -> str:
        '''
        Generate a collector file name, filling in Mac defaults.

        'platform', 'collector_name' and 'machine_id' are only defaulted when
        the caller did not supply them; everything is then passed on to
        BaseStorageCollector.generate_collector_file_name.
        '''
        if 'platform' not in kwargs:
            kwargs['platform'] = IndalekoMacLocalStorageCollector.mac_platform
        if 'collector_name' not in kwargs:
            kwargs['collector_name'] = IndalekoMacLocalStorageCollector.mac_local_collector_name
        if 'machine_id' not in kwargs:
            # Use the hex form (no dashes) of the machine UUID.
            kwargs['machine_id'] = uuid.UUID(self.machine_config.machine_id).hex
        ic(kwargs)
        return BaseStorageCollector.generate_collector_file_name(**kwargs)

    def build_stat_dict(self, name: str, root: str, last_uri=None) -> Union[dict, None]:
        '''
        Given a file name and a root directory, build a dict from the file
        system metadata ("stat") for that file.

        Args:
            name: the entry name within root.
            root: the directory containing the entry.
            last_uri: accepted for interface compatibility; not used here.

        Returns:
            A dict holding every 'st_*' attribute of the os.stat() result plus
            the keys 'Name', 'Path', 'URI' and 'Collector', or None when the
            entry could not be stat'ed (the failure is logged and
            self.error_count is incremented).
        '''
        file_path = os.path.join(root, name)
        try:
            stat_data = os.stat(file_path)
        except Exception as e:  # pylint: disable=broad-except
            # at least for now, we just skip errors
            logging.warning('Unable to stat %s : %s', file_path, e)
            self.error_count += 1
            return None
        stat_dict = {
            key: getattr(stat_data, key)
            for key in dir(stat_data) if key.startswith('st_')
        }
        stat_dict['Name'] = name
        stat_dict['Path'] = root
        stat_dict['URI'] = file_path
        stat_dict['Collector'] = str(self.get_collector_service_identifier())
        return stat_dict

    class local_collector_mixin(BaseLocalStorageCollector.local_collector_mixin):
        '''Mac-specific overrides of the local collector command line mixin.'''

        @staticmethod
        def load_machine_config(keys: dict[str, str]) -> IndalekoMacOSMachineConfig:
            '''
            Load the machine configuration.

            Args:
                keys: must contain 'machine_config_file'; may contain 'debug'
                    and 'offline' (defaults to False).

            Raises:
                ValueError: if 'machine_config_file' is not in keys.
            '''
            if keys.get('debug'):
                ic(f'local_collector_mixin.load_machine_config: {keys}')
            if 'machine_config_file' not in keys:
                raise ValueError(
                    f'{inspect.currentframe().f_code.co_name}: machine_config_file must be specified'
                )
            offline = keys.get('offline', False)
            return IndalekoMacOSMachineConfig.load_config_from_file(
                config_file=str(keys['machine_config_file']),
                offline=offline
            )

        @staticmethod
        def find_machine_config_files(
            config_dir: Union[str, Path],
            platform: str,
            debug: bool = False
        ) -> Union[list[str], None]:
            '''
            Find the machine configuration files.

            Args:
                config_dir: directory to search.
                platform: part of the shared signature, but ignored for the
                    search: Mac machine config files are always looked up
                    with the 'macos' prefix.
                debug: when True, trace the arguments and the search prefix.

            Returns:
                The candidate file names ending in '.json', or None when
                config_dir does not exist.
            '''
            if debug:
                ic(f'find_machine_config_files: config_dir = {config_dir}')
                ic(f'find_machine_config_files: platform = {platform}')
            if not Path(config_dir).exists():
                ic(f'Warning: did not find any config files in {config_dir}')
                return None
            # The mac uses non-standard naming for machine config files
            # (e.g. macos-hardware-info-<uuid>-<timestamp>.json), so the
            # caller-supplied platform is deliberately not used.
            search_platform = 'macos'
            if debug:
                ic(search_platform)
            return [
                fname
                for fname, _ in find_candidate_files(
                    [search_platform, '-hardware-info'], str(config_dir)
                )
                if fname.endswith('.json')
            ]

        @staticmethod
        def extract_filename_metadata(file_name):
            '''
            Extract the metadata encoded in a file name.

            Names that do not start with the Mac machine config prefix are
            handed to the base class implementation. Mac machine config names
            are parsed here, e.g.
            macos-hardware-info-f6ff7c7f-b4d7-484f-9b58-1ad2820a8d85-2024-12-04T00-44-25.583891Z.json

            Returns:
                For Mac machine config names, a dict with the keys 'platform',
                'service', 'machine', 'timestamp' and 'suffix'.

            Raises:
                ValueError: if a Mac machine config name does not end in
                    '.json' or does not carry a valid UUID after the prefix.
            '''
            # the mac uses non-standard naming for machine config files, so we have to handle that here.
            prefix = IndalekoMacOSMachineConfig.macos_machine_config_file_prefix
            if not file_name.startswith(prefix):
                return BaseLocalStorageCollector.local_collector_mixin.extract_filename_metadata(file_name)
            suffix = '.json'
            if not file_name.endswith(suffix):
                # Explicit check instead of assert: asserts are stripped under
                # python -O, which would let a bad name be sliced into garbage.
                raise ValueError(
                    f'extract_filename_metadata: expected a {suffix} file, got {file_name}'
                )
            # Layout after the prefix: one separator character, the 36
            # character dashed UUID, one separator character, the timestamp,
            # and finally the suffix.
            uuid_start = len(prefix) + 1
            uuid_end = uuid_start + 36
            machine_id = uuid.UUID(file_name[uuid_start:uuid_end]).hex
            timestamp = file_name[uuid_end + 1:-len(suffix)]
            keys = {
                'platform': platform.system(),
                'service': 'macos_machine_config',
                'machine': machine_id,
                'timestamp': timestamp,
                'suffix': suffix,
            }
            return keys

    # Alias under which the mixin is also exposed on the class.
    cli_handler_mixin = local_collector_mixin
def main():
    '''Command line entry point for the mac local storage collector.'''
    # Hand the collector class and its machine configuration class to the
    # shared local collector runner.
    collector_class = IndalekoMacLocalStorageCollector
    machine_config_class = IndalekoMacOSMachineConfig
    BaseLocalStorageCollector.local_collector_runner(collector_class, machine_config_class)


# Only run the collector when this file is executed as a script.
if __name__ == '__main__':
    main()