-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfix_snapshot_config.py
95 lines (76 loc) · 3.54 KB
/
fix_snapshot_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import os
import json
import tarfile
import shutil
from pathlib import Path
def _replace_mmap(obj):
    """Rewrite, in place, every dict value equal to 'mmap' as 'on_disk'.

    Nested dicts and lists are walked recursively. Only dict *values* are
    rewritten: a bare 'mmap' string stored directly inside a list is left
    untouched.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if isinstance(value, str) and value == 'mmap':
                obj[key] = 'on_disk'
            elif isinstance(value, (dict, list)):
                _replace_mmap(value)
    elif isinstance(obj, list):
        for item in obj:
            if isinstance(item, (dict, list)):
                _replace_mmap(item)


def fix_segment_config(segment_path):
    """Fix the configuration in a segment file to use on_disk storage.

    The tar archive at ``segment_path`` is unpacked into a fresh temporary
    directory. If it contains ``snapshot/files/segment.json``, every dict
    value equal to 'mmap' in that JSON document is replaced by 'on_disk'.
    All regular files are then packed into a new archive that takes the
    place of the original; the original is kept beside it under the same
    name with a '.backup' suffix.

    Args:
        segment_path: Location of the tar archive (``str`` or path-like).

    Raises:
        OSError, tarfile.TarError: If the archive cannot be read or written.
        json.JSONDecodeError: If segment.json is not valid JSON.
    """
    # Function-scope import so the block does not depend on the file header.
    import tempfile

    # Callers may pass a plain string; the Path methods below need a Path.
    segment_path = Path(segment_path)
    print(f"Processing segment: {os.path.basename(segment_path)}")

    # A unique scratch directory per call: leftovers of an earlier failed run
    # can never end up in the new archive, and cleanup happens on errors too.
    with tempfile.TemporaryDirectory(prefix="temp_segment_") as scratch:
        temp_dir = Path(scratch)

        # Extract the tar file
        with tarfile.open(segment_path, 'r') as tar:
            if hasattr(tarfile, "data_filter"):
                # Where the interpreter supports extraction filters, refuse
                # members that would be written outside the scratch directory.
                tar.extractall(temp_dir, filter="data")
            else:
                tar.extractall(temp_dir)

        # Fix the segment configuration
        segment_json_path = temp_dir / "snapshot" / "files" / "segment.json"
        if segment_json_path.exists():
            with open(segment_json_path, 'r', encoding="utf-8") as f:
                config = json.load(f)

            print("Original config:", json.dumps(config, indent=2))

            # Replace 'mmap' with 'on_disk' in the configuration
            if isinstance(config, dict):
                _replace_mmap(config)

            print("\nModified config:", json.dumps(config, indent=2))

            # Save the modified configuration
            with open(segment_json_path, 'w', encoding="utf-8") as f:
                json.dump(config, f, indent=2)

        # Build the replacement next to the original first, so a failure while
        # packing never leaves the segment path without an archive.
        new_archive = segment_path.with_name(segment_path.name + ".tmp")
        packed = False
        try:
            with tarfile.open(new_archive, 'w') as tar:
                # Sorted for a reproducible member order; only regular files
                # are stored (directory entries are not written).
                for file_path in sorted(temp_dir.rglob('*')):
                    if file_path.is_file():
                        arcname = file_path.relative_to(temp_dir).as_posix()
                        tar.add(str(file_path), arcname=arcname)
            packed = True
        finally:
            if not packed and new_archive.exists():
                new_archive.unlink()

        # Keep the original as a backup, then put the new archive in place.
        backup_path = Path(str(segment_path) + '.backup')
        if segment_path.exists():
            shutil.move(str(segment_path), str(backup_path))
        shutil.move(str(new_archive), str(segment_path))

    print(f"Processed {os.path.basename(segment_path)}")
def main():
    """Patch the extracted snapshot data of both collections.

    For each collection directory, every ``*.tar`` file found in
    ``<collection>/0/segments`` is passed to ``fix_segment_config``. After
    that, ``<collection>/config.json`` (when it exists) is rewritten, with a
    ``params.on_disk`` value of 'mmap' turned into ``True``. A collection
    whose segments directory is missing is reported and skipped entirely.
    """
    for collection in ('extracted_users_data', 'extracted_elements_data'):
        print(f"\nProcessing collection: {collection}")

        collection_root = Path(collection)
        segments_dir = collection_root / "0" / "segments"
        if not segments_dir.exists():
            print(f"Directory not found: {segments_dir}")
            continue

        for segment_file in segments_dir.glob('*.tar'):
            fix_segment_config(segment_file)

        # The collection-level config is handled after its segments.
        config_path = collection_root / "config.json"
        if not config_path.exists():
            continue

        config = json.loads(config_path.read_text())
        print("\nOriginal collection config:", json.dumps(config, indent=2))

        # Switch the storage setting only when it is literally 'mmap'.
        uses_mmap = (
            'params' in config
            and config['params'].get('on_disk', None) == 'mmap'
        )
        if uses_mmap:
            config['params']['on_disk'] = True

        print("\nModified collection config:", json.dumps(config, indent=2))
        # The file is written back whether or not anything changed.
        config_path.write_text(json.dumps(config, indent=2))
# Script entry point: run main() only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()