-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutils.py
123 lines (97 loc) · 3.97 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from datetime import datetime, timezone
from re import search
import os
from csv import writer
# ANSI escape sequences for text colors.
# Emit one of the COLOR_* codes before a message and COLOR_RESET after it
# so that the color does not leak into later terminal output.
COLOR_ERROR = '\033[91m'  # SGR 91: bright red foreground
COLOR_SUCCESS = '\033[92m'  # SGR 92: bright green foreground
COLOR_WARNING = '\033[93m'  # SGR 93: bright yellow foreground
COLOR_INFO = '\033[94m'  # SGR 94: bright blue foreground
COLOR_RESET = '\033[0m'  # SGR 0: reset all text attributes
def parse_log(log_file_path, start_at=None, format='UE'):
    """Collect the timestamped lines of a log file.

    Only lines that begin with a timestamp in the selected layout are
    returned; lines without a leading timestamp are skipped.

    Args:
        log_file_path: Path of the log file to read.
        start_at: Optional offset-aware datetime. When given, lines are
            dropped until the first one whose timestamp is at or after
            ``start_at``; that line and every timestamped line after it are
            kept, even if a later timestamp is earlier than ``start_at``.
            It must be offset-aware because it is compared against
            UTC-tagged timestamps. When ``None`` every timestamped line is
            kept.
        format: Timestamp layout of the file. ``"UE"`` expects lines that
            start with ``[YYYY-MM-DD HH:MM:SS.mmm]``; ``"AMF"`` expects lines
            that start with ``YYYY-MM-DDTHH:MM:SSZ``.

    Returns:
        A list of ``(timestamp, line)`` tuples in file order. ``timestamp``
        is the parsed datetime with ``tzinfo`` set to UTC and ``line`` is the
        line exactly as read from the file.

    Raises:
        ValueError: If ``format`` is neither ``"UE"`` nor ``"AMF"``.
    """
    if format == "UE":
        pattern = r'^\[(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3})\]'
        expr = "%Y-%m-%d %H:%M:%S.%f"
    elif format == "AMF":
        pattern = r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z)"
        expr = "%Y-%m-%dT%H:%M:%SZ"
    else:
        # Fail early with a clear message instead of hitting an unbound
        # local variable on the first line of the file.
        raise ValueError(
            f"Unsupported log format {format!r}; expected 'UE' or 'AMF'"
        )

    logs = []
    # Without a start time the window is open from the very first line.
    time_window = start_at is None
    with open(log_file_path, 'r') as file:
        for line in file:
            match = search(pattern, line)
            if not match:
                continue
            timestamp = datetime.strptime(match.group(1), expr)
            # The timestamp is parsed as naive; tag it as UTC so it can be
            # compared with offset-aware datetimes.
            timestamp_utc = timestamp.replace(tzinfo=timezone.utc)
            # Once opened, the window stays open for the rest of the file.
            if not time_window and timestamp_utc >= start_at:
                time_window = True
            if time_window:
                logs.append((timestamp_utc, line))
    return logs
def parse_log_with_prefix(log_key_prefix, start_at=None, folder_path='logs'):
    """Parse every matching log file in a folder.

    A file is selected when ``log_key_prefix`` occurs anywhere in its joined
    path (``folder_path`` plus file name), so despite the parameter name the
    match is a substring test, not a strict prefix test. Within each selected
    file only lines starting with ``[YYYY-MM-DD HH:MM:SS.mmm]`` are kept.

    Args:
        log_key_prefix: Substring that selects which files to parse.
        start_at: Accepted for interface compatibility; this function does
            not read it, so no time filtering is applied.
        folder_path: Directory to scan (not recursive). Defaults to
            ``'logs'`` relative to the current working directory.

    Returns:
        A dict mapping each selected file's joined path to a list of
        ``(timestamp, line)`` tuples in file order. ``timestamp`` is the
        parsed datetime with ``tzinfo`` set to UTC. A selected file with no
        timestamped lines maps to an empty list.
    """
    pattern = r'^\[(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3})\]'
    logs = {}
    for filename in os.listdir(folder_path):
        log_file_path = os.path.join(folder_path, filename)
        # Skip sub-directories and files that do not carry the key.
        if not (os.path.isfile(log_file_path) and log_key_prefix in log_file_path):
            continue
        one_log_output = []
        with open(log_file_path, 'r') as file:
            for line in file:
                match = search(pattern, line)
                if match:
                    timestamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S.%f")
                    # Tag the naive timestamp as UTC (offset-aware).
                    timestamp_utc = timestamp.replace(tzinfo=timezone.utc)
                    one_log_output.append((timestamp_utc, line))
        logs[log_file_path] = one_log_output
    return logs
def read_subprocess(process, line_limit=200):
    """Read timestamped log lines from a subprocess's standard output.

    Lines are pulled from ``process.stdout`` one at a time until the stream
    is exhausted or ``line_limit`` lines have been read, whichever comes
    first. Every line read counts toward the limit, whether or not it
    carries a timestamp.

    Args:
        process: Object whose ``stdout`` attribute is a binary stream with a
            ``readline()`` method that returns ``b''`` at end of stream.
        line_limit: Maximum number of lines to read from the stream.
            ``None`` reads until end of stream. Defaults to 200.

    Returns:
        A list of ``(timestamp, line)`` tuples, in the order read, for the
        lines that start with ``[YYYY-MM-DD HH:MM:SS.mmm]``. ``timestamp`` is
        the parsed datetime with ``tzinfo`` set to UTC and ``line`` is the
        decoded text of the line.
    """
    from itertools import islice

    pattern = r'^\[(\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\.\d{3})\]'
    logs = []
    # islice stops after exactly line_limit lines and never requests the next
    # one, so no extra (possibly blocking) readline() call is made.
    for line_raw in islice(iter(process.stdout.readline, b''), line_limit):
        # Replace undecodable bytes rather than aborting the whole read on a
        # single malformed line.
        line = line_raw.decode('utf-8', errors='replace')
        match = search(pattern, line)
        if match:
            timestamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S.%f")
            # Tag the naive timestamp as UTC (offset-aware).
            timestamp_utc = timestamp.replace(tzinfo=timezone.utc)
            logs.append((timestamp_utc, line))
    return logs
def find_message(logs, ue_id, MESSAGE):
    """Return the timestamp of the first NAS log line containing a message.

    Args:
        logs: Iterable of ``(timestamp, line)`` pairs.
        ue_id: Identifier formatted into the ``[<ue_id>|nas]`` tag that a
            line must contain.
        MESSAGE: Text that must also appear in the line.

    Returns:
        The timestamp paired with the first line containing both the tag and
        ``MESSAGE``, or ``None`` when no line qualifies.
    """
    nas_tag = f'[{ue_id}|nas]'
    hits = (
        stamp
        for stamp, text in logs
        if nas_tag in text and MESSAGE in text
    )
    # next() stops at the first hit; None is the "not found" result.
    return next(hits, None)
def save_data_points(data_points, filename):
    """Write paired x/y values to a CSV file and print a confirmation.

    Args:
        data_points: A pair ``(x, y)`` of indexable sequences. One row
            ``x[i],y[i]`` is written for every index of ``x``, so ``y`` must
            be at least as long as ``x``.
        filename: Path of the CSV file. An existing file is overwritten.
    """
    x, y = data_points
    # "w" truncates any existing file; newline="" lets the csv module control
    # line endings itself.
    with open(filename, "w", newline="") as file:
        rows = ([x[idx], y[idx]] for idx in range(len(x)))
        writer(file).writerows(rows)
    print(f' Data saved as csv ({filename})')
def delete_files_in_folder(folder_path, key=''):
    """Delete the regular files located directly inside a folder.

    Args:
        folder_path: Directory whose entries are examined (not recursive).
        key: Optional substring filter. When non-empty, only entries whose
            name contains ``key`` are deleted; when empty, every regular file
            is deleted. Sub-directories are never removed.
    """
    for entry in os.listdir(folder_path):
        # An empty key disables the name filter.
        if key and key not in entry:
            continue
        candidate = os.path.join(folder_path, entry)
        if not os.path.isfile(candidate):
            continue
        os.remove(candidate)