-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparsers.py
More file actions
251 lines (193 loc) · 9.18 KB
/
Copy pathparsers.py
File metadata and controls
251 lines (193 loc) · 9.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import pandas as pd
import numpy as np
import json
from pathlib import Path
from datetime import datetime, timedelta
from src.config import ACTIGRAPH_PATH, BANGLE_PATH, EMOTIBIT_PATH, CALORIMETRY_PATH
class ActigraphParser:
    """Static helpers that load ActiGraph CSV exports into pandas DataFrames."""

    @staticmethod
    def parse_raw_file(file_path):
        """Load a raw accelerometer export and append the vector magnitude.

        The first 11 lines of the file are skipped; pandas then consumes the
        next line as the header row, after which the four columns are renamed
        positionally. Numeric fields are parsed with ``,`` as decimal mark.

        NOTE(review): an earlier comment here spoke of 10 metadata lines while
        11 lines are skipped. If an export has 10 metadata lines followed by a
        column-header line, the first sample row is consumed as the header and
        lost -- confirm against a real file.

        Args:
            file_path: Location of the CSV export (anything ``pd.read_csv``
                accepts).

        Returns:
            DataFrame with columns ``timestamp`` (converted with
            ``pd.to_datetime``), ``acc_x``, ``acc_y``, ``acc_z`` and
            ``acc_magnitude``.
        """
        samples = pd.read_csv(file_path, decimal=',', skiprows=11)
        samples.columns = ['timestamp', 'acc_x', 'acc_y', 'acc_z']
        samples['timestamp'] = pd.to_datetime(samples['timestamp'])

        # Euclidean norm of the three axes, accumulated in x, y, z order.
        sum_of_squares = sum(
            samples[axis] ** 2 for axis in ('acc_x', 'acc_y', 'acc_z')
        )
        samples['acc_magnitude'] = np.sqrt(sum_of_squares)
        return samples

    @staticmethod
    def parse_5sec_file(file_path):
        """Load a 5-second epoch export and synthesise per-row timestamps.

        The first 11 lines are skipped when reading the table. The same 11
        lines are read again as text: the third one must contain
        ``'Start Time '`` and the fourth ``'Start Date '``; the text following
        those markers is combined into the start datetime. Row ``i`` is then
        stamped ``start + 5 * i`` seconds.

        Args:
            file_path: Location of the CSV export.

        Returns:
            DataFrame with columns ``axis1``, ``axis2``, ``axis3``, ``steps``
            and ``timestamp`` (names are assigned positionally, so the table
            is expected to contain exactly four data columns).

        Raises:
            IndexError: If a start-time/start-date marker is missing from the
                expected metadata line.
        """
        epochs = pd.read_csv(file_path, skiprows=11)

        # Re-read the preamble as plain text to recover the recording start.
        with open(file_path, 'r') as handle:
            preamble = [handle.readline() for _ in range(11)]

        clock_text = preamble[2].split('Start Time ')[1].strip()
        date_text = preamble[3].split('Start Date ')[1].strip()
        first_epoch = pd.to_datetime(f"{date_text} {clock_text}")

        # One timestamp per row, spaced five seconds apart.
        stamps = []
        for epoch_index in range(len(epochs)):
            stamps.append(first_epoch + timedelta(seconds=5 * epoch_index))
        epochs['timestamp'] = stamps

        epochs.columns = ['axis1', 'axis2', 'axis3', 'steps', 'timestamp']
        return epochs
class BangleParser:
    """Static helpers that load Bangle activity CSV logs."""

    @staticmethod
    def parse_file(file_path):
        """Load a Bangle CSV and derive elapsed time and acceleration magnitude.

        The ``Time`` column is treated as the delta in milliseconds between
        consecutive readings (per the recording app's behaviour as described
        by the original author), so its cumulative sum is the elapsed time
        since the start of the recording. Rows whose ``Time`` is not numeric
        are dropped before accumulating; rows whose accelerometer values are
        not numeric are dropped afterwards, so their deltas still count
        towards the elapsed time of later rows.

        Args:
            file_path: Location of the CSV file.

        Returns:
            DataFrame holding the original columns plus
            ``cumulative_time_ms``, ``timestamp`` and ``acc_magnitude``.
            ``timestamp`` is a UTC-aware placeholder measured from the Unix
            epoch; it is meant to be replaced once the recording is aligned
            to a reference clock. The column is UTC-aware for empty input as
            well, so results from different files share one dtype family.
        """
        df = pd.read_csv(file_path)

        # Coerce rather than fail: unparsable Time cells become NaN and the
        # affected rows are discarded. ``.copy()`` detaches the filtered frame
        # so the column assignments below never target a view of the original.
        df['Time'] = pd.to_numeric(df.get('Time'), errors='coerce')
        df = df.dropna(subset=['Time']).copy()

        if len(df) > 0:
            # Deltas -> elapsed milliseconds since the first reading.
            df['cumulative_time_ms'] = df['Time'].cumsum()
            df['timestamp'] = pd.to_datetime(
                df['cumulative_time_ms'], unit='ms', utc=True
            )
        else:
            df['cumulative_time_ms'] = pd.Series(dtype=float)
            # Same tz-aware dtype family as the non-empty branch.
            df['timestamp'] = pd.Series(dtype='datetime64[ns, UTC]')

        # Same coercion policy for the three accelerometer axes.
        for axis_col in ('Acc_x', 'Acc_y', 'Acc_z'):
            df[axis_col] = pd.to_numeric(df.get(axis_col), errors='coerce')
        df = df.dropna(subset=['Acc_x', 'Acc_y', 'Acc_z']).copy()

        # Euclidean norm of the acceleration vector.
        df['acc_magnitude'] = np.sqrt(
            df['Acc_x']**2 + df['Acc_y']**2 + df['Acc_z']**2
        )
        return df
class EmotibitParser:
    """Static helpers that load EmotiBit recordings and pull out signals."""

    @staticmethod
    def parse_info_json(json_path):
        """Return the decoded content of the JSON file at ``json_path``."""
        with open(json_path, 'r') as f:
            return json.load(f)

    @staticmethod
    def parse_csv_file(file_path):
        """Load a variable-width CSV into a fixed 20-column DataFrame.

        Lines carry a varying number of comma-separated fields, so the file is
        split manually: each line is truncated to 20 fields or padded with
        ``None`` up to 20. The first six fields get fixed names; the remaining
        fourteen become ``data_0`` .. ``data_13``.

        Args:
            file_path: Location of the CSV file.

        Returns:
            DataFrame in which ``local_timestamp``, ``emotibit_timestamp`` and
            ``data_length`` are numeric (unparsable cells become NaN) and all
            other columns keep the raw field text.
        """
        n_columns = 20
        rows = []
        with open(file_path, 'r') as f:
            for line in f:
                fields = line.strip().split(',')[:n_columns]
                fields += [None] * (n_columns - len(fields))
                rows.append(fields)

        base_cols = ['local_timestamp', 'emotibit_timestamp', 'data_length',
                     'type_tag', 'packet_number', 'reliability']
        data_cols = [f'data_{i}' for i in range(n_columns - len(base_cols))]
        df = pd.DataFrame(rows, columns=base_cols + data_cols)

        for numeric_col in ('local_timestamp', 'emotibit_timestamp', 'data_length'):
            df[numeric_col] = pd.to_numeric(df[numeric_col], errors='coerce')
        return df

    @staticmethod
    def extract_accelerometer(df):
        """Build a per-timestamp x/y/z acceleration table from parsed rows.

        Rows tagged ``AX``, ``AY`` and ``AZ`` are read; each may hold several
        values (``data_length`` of them, in ``data_0``, ``data_1``, ...). All
        values of one row share that row's ``local_timestamp``, and the pivot
        keeps the first value per timestamp and axis.

        Args:
            df: DataFrame shaped like the output of ``parse_csv_file``.

        Returns:
            DataFrame with columns ``timestamp``, ``acc_x``, ``acc_y``,
            ``acc_z`` and ``acc_magnitude``. It is empty when no usable
            accelerometer value exists. An axis that is absent from the
            recording yields a NaN column (and NaN magnitude) instead of an
            error.
        """
        samples = []
        for axis, tag in (('x', 'AX'), ('y', 'AY'), ('z', 'AZ')):
            for _, row in df[df['type_tag'] == tag].iterrows():
                try:
                    if pd.notna(row['data_length']):
                        num_values = int(row['data_length'])
                    else:
                        num_values = 1
                    timestamp = row['local_timestamp']
                    for i in range(num_values):
                        col_name = f'data_{i}'
                        # Skip slots beyond the parsed width or left empty.
                        if col_name not in row.index or pd.isna(row[col_name]):
                            continue
                        samples.append({
                            'timestamp': timestamp,
                            'axis': axis,
                            'value': float(row[col_name]),
                        })
                except (ValueError, KeyError, TypeError):
                    # Malformed row: keep what was read so far, skip the rest.
                    continue

        if not samples:
            return pd.DataFrame(
                columns=['timestamp', 'acc_x', 'acc_y', 'acc_z', 'acc_magnitude']
            )

        acc_pivot = pd.DataFrame(samples).pivot_table(
            index='timestamp', columns='axis', values='value', aggfunc='first'
        )
        # The pivot only has columns for axes that actually occurred. Force
        # the full x/y/z set so the positional rename below cannot fail with
        # a length mismatch or mislabel an axis when one stream is missing.
        acc_pivot = acc_pivot.reindex(columns=['x', 'y', 'z'])
        acc_pivot.columns = ['acc_x', 'acc_y', 'acc_z']
        acc_pivot = acc_pivot.reset_index()

        acc_pivot['acc_magnitude'] = np.sqrt(
            acc_pivot['acc_x']**2 +
            acc_pivot['acc_y']**2 +
            acc_pivot['acc_z']**2
        )
        return acc_pivot

    @staticmethod
    def extract_ppg(df):
        """Build a PPG table from rows tagged ``PI``, ``PR`` and ``PG``.

        Only the first value of each row (``data_0``) is used, and the three
        streams are paired by position with the timestamps of the ``PI`` rows.
        The streams are therefore assumed to contain the same number of rows;
        pandas raises ``ValueError`` when the lengths differ.

        Args:
            df: DataFrame shaped like the output of ``parse_csv_file``.

        Returns:
            DataFrame with columns ``timestamp``, ``ppg_infrared``,
            ``ppg_red`` and ``ppg_green``.
        """
        pi = df[df['type_tag'] == 'PI']  # Infrared
        pr = df[df['type_tag'] == 'PR']  # Red
        pg = df[df['type_tag'] == 'PG']  # Green
        ppg_data = {
            'timestamp': pi['local_timestamp'].values,
            'ppg_infrared': pi['data_0'].astype(float).values,
            'ppg_red': pr['data_0'].astype(float).values,
            'ppg_green': pg['data_0'].astype(float).values,
        }
        return pd.DataFrame(ppg_data)
class CalorimetryParser:
    """Static helper that loads calorimetry spreadsheets."""

    @staticmethod
    def parse_xls_file(file_path):
        """Read a spreadsheet, trying the ``xlrd`` engine then ``openpyxl``.

        Args:
            file_path: Location of the spreadsheet.

        Returns:
            The DataFrame produced by the first engine that succeeds, or
            ``None`` when both engines fail (the error reported by the last
            engine is printed).
        """
        last_error = None
        for engine in ('xlrd', 'openpyxl'):
            try:
                return pd.read_excel(file_path, engine=engine)
            except Exception as e:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit still propagate.
                last_error = e
        print(f"Error reading {file_path}: {last_error}")
        return None
def calculate_magnitude_5s_windows(df, timestamp_col='timestamp', window='5s'):
    """Average ``acc_magnitude`` over fixed-width time windows.

    Args:
        df: DataFrame containing ``timestamp_col`` (datetime-like values) and
            an ``acc_magnitude`` column. It is not modified.
        timestamp_col: Name of the column to resample on.
        window: pandas offset alias for the window width. The legacy
            upper-case seconds spelling (for example ``'5S'``) is still
            accepted and translated to the lower-case form.

    Returns:
        DataFrame with two columns: ``timestamp_col`` (window start) and
        ``acc_magnitude_5s`` (mean magnitude in that window). The value column
        keeps this name whatever ``window`` is.
    """
    # pandas 2.2 deprecated the upper-case 'S' alias for seconds in favour of
    # 's'. Translate plain '<n>S' so existing callers keep working, while
    # leaving aliases such as 'MS' (month start) untouched.
    if isinstance(window, str) and window.endswith('S'):
        if window == 'S' or window[:-1].isdigit():
            window = window[:-1] + 's'

    magnitude = df.set_index(timestamp_col)['acc_magnitude']
    result = magnitude.resample(window).mean().reset_index()
    result.columns = [timestamp_col, 'acc_magnitude_5s']
    return result
# Example usage: smoke-test each parser on the first matching data file.
if __name__ == "__main__":
    print("Testing parsers...")

    # Test Actigraph
    print("\n1. Testing Actigraph parser...")
    actigraph_files = list(ACTIGRAPH_PATH.glob("*RAW.csv"))
    # Only files below 10 MiB are considered for this quick check.
    small_files = [f for f in actigraph_files if f.stat().st_size < 10*1024*1024]
    if small_files:
        df_acti = ActigraphParser.parse_raw_file(small_files[0])
        print(f" Loaded {len(df_acti)} rows")
        print(f" Columns: {list(df_acti.columns)}")
        print(f" Time range: {df_acti['timestamp'].min()} to {df_acti['timestamp'].max()}")

    # Test Bangle
    print("\n2. Testing Bangle parser...")
    bangle_files = list(BANGLE_PATH.glob("*activity.csv"))
    if bangle_files:
        df_bangle = BangleParser.parse_file(bangle_files[0])
        print(f" Loaded {len(df_bangle)} rows")
        print(f" Columns: {list(df_bangle.columns)}")
        print(df_bangle[['Time', 'Acc_x', 'Acc_y', 'Acc_z', 'acc_magnitude']].head())

    # Test Emotibit
    print("\n3. Testing Emotibit parser...")
    emotibit_dirs = [d for d in EMOTIBIT_PATH.iterdir() if d.is_dir()]
    if emotibit_dirs:
        csv_files = list(emotibit_dirs[0].glob("*.csv"))
        # A session directory may contain no CSV at all; indexing [0] on an
        # empty list would abort the whole run with IndexError.
        if csv_files:
            csv_file = csv_files[0]
            df_emoti = EmotibitParser.parse_csv_file(csv_file)
            print(f" Loaded {len(df_emoti)} rows")
            print(f" Type tags found: {df_emoti['type_tag'].unique()}")
            acc_data = EmotibitParser.extract_accelerometer(df_emoti)
            print(f" Extracted {len(acc_data)} accelerometer samples")
            print(acc_data.head())

    print("\nAll parsers tested!")