@@ -21,8 +21,15 @@ def convert_to_csv(self, txt_dfs):
2121 # If file is empty or has no valid records, skip
2222 continue
2323
24- # Normalize nested dicts into flat columns so downstream QC stays unchanged
25- flattened_df = pd .json_normalize (records )
24+ normalized_records = [
25+ self ._flatten_record (record ) for record in records if isinstance (record , dict )
26+ ]
27+ if not normalized_records :
28+ continue
29+
30+ flattened_df = pd .json_normalize (normalized_records , sep = "_" )
31+ flattened_df = self ._harmonize_columns (flattened_df )
32+ flattened_df = self ._normalize_semantics (flattened_df )
2633 new_dfs .append (flattened_df )
2734
2835 return new_dfs
@@ -82,6 +89,117 @@ def _collect_records(self, payload):
8289
8390 return []
8491
def _flatten_record(self, record):
    """
    Collapse one nested record into a single-level dict.

    Dict values stored under a wrapper key (``data`` / ``trialdata``, matched
    case-insensitively) are merged straight into the result, so their fields keep
    their own names (e.g. ``block``, ``correct``). Dict values under any other key
    are flattened recursively and each resulting field is renamed to
    ``<key>_<field>``. Non-dict values are copied through unchanged.

    When two sources produce the same key, the value written last wins.
    """
    wrapper_keys = ("data", "trialdata")
    merged = {}
    for name, payload in record.items():
        if not isinstance(payload, dict):
            merged[name] = payload
            continue

        is_wrapper = name.lower() in wrapper_keys
        inner = self._flatten_record(payload)
        if not is_wrapper:
            # Ordinary nesting: keep the parent key as a prefix.
            inner = {f"{name}_{field}": item for field, item in inner.items()}
        merged.update(inner)
    return merged
109+
def _harmonize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Strip known wrapper prefixes, map aliases to canonical names, and merge duplicates.

    Steps:
      1. Remove leading wrapper prefixes (``trialdata_``, ``data_``, ``payload_`` and
         the listed case variants) from each column name. Stripping repeats until no
         prefix matches, so the result does not depend on the order in which wrappers
         were nested. A prefix is only removed when something remains after it, so a
         column never ends up with an empty name.
      2. Rename known aliases (``Block``, ``isCorrect``, ``Session``, ...) to the
         canonical names ``block``, ``condition``, ``correct``, ``session_number``
         and ``subject_id``.
      3. If renaming produced duplicate column names, collapse each group into one
         column holding, per row, the first non-null value in column order.

    Args:
        df: Frame whose columns should be harmonized. It is not modified.

    Returns:
        A new frame with harmonized, unique column names.
    """
    wrapper_prefixes = ("trialdata_", "data_", "payload_", "TrialData_", "trialData_")
    canonical_map = {
        "Block": "block",
        "BlockName": "block",
        "blockName": "block",
        "Block_Type": "block",
        "block_type": "block",
        "Condition": "condition",
        "Cond": "condition",
        "stim_condition": "condition",
        "Correct": "correct",
        "isCorrect": "correct",
        "Session": "session_number",
        "session": "session_number",
        "SessionID": "session_number",
        "Subject": "subject_id",
        "subject": "subject_id",
    }

    def _strip_wrappers(name):
        # Non-string labels cannot carry a textual wrapper prefix.
        if not isinstance(name, str):
            return name
        stripped = True
        while stripped:
            stripped = False
            for prefix in wrapper_prefixes:
                # Require a non-empty remainder so "data_" is not reduced to "".
                if len(name) > len(prefix) and name.startswith(prefix):
                    name = name[len(prefix):]
                    stripped = True
        return name

    def _canonical(col):
        bare = _strip_wrappers(col)
        return canonical_map.get(bare, bare)

    harmonized = df.rename(columns=_canonical)

    # If both original and harmonized columns exist, keep the first non-null values.
    if harmonized.columns.duplicated().any():
        deduped = {}
        for col in harmonized.columns.unique():
            # A boolean mask selects each matching column exactly once; indexing with
            # a list of repeated labels would return every match once per label.
            same_name = harmonized.loc[:, harmonized.columns == col]
            if same_name.shape[1] == 1:
                deduped[col] = same_name.iloc[:, 0]
            else:
                deduped[col] = same_name.bfill(axis=1).iloc[:, 0]
        harmonized = pd.DataFrame(deduped)

    return harmonized
157+
def _normalize_semantics(self, df: pd.DataFrame) -> pd.DataFrame:
    """
    Coerce the critical columns into a consistent dtype and label space.

    Works on a copy of ``df``; only columns that are present are touched:

      * ``block``          - each value is passed through ``self._standardize_block``.
      * ``condition``      - string values are stripped of surrounding whitespace.
      * ``correct``        - converted with ``pd.to_numeric``; unparsable values
                             become NaN.
      * ``session_number`` - converted with ``pd.to_numeric``; unparsable values
                             become NaN.
      * ``subject_id``     - non-null values become stripped strings. Floats with an
                             integral value are rendered without the trailing ``.0``
                             (``101.0`` -> ``"101"``): pandas upcasts integer IDs to
                             float when some records lack the field, and the same
                             subject must not get different labels because of that.

    Args:
        df: Frame with harmonized column names. It is not modified.

    Returns:
        A normalized copy of ``df``.
    """

    def _subject_label(val):
        if not pd.notna(val):
            return val
        # Undo the int -> float upcast caused by missing values in the column.
        if isinstance(val, float) and val.is_integer():
            val = int(val)
        return str(val).strip()

    normalized = df.copy()

    if "block" in normalized.columns:
        normalized["block"] = normalized["block"].map(self._standardize_block)

    if "condition" in normalized.columns:
        normalized["condition"] = normalized["condition"].apply(
            lambda val: val.strip() if isinstance(val, str) else val
        )

    if "correct" in normalized.columns:
        normalized["correct"] = pd.to_numeric(normalized["correct"], errors="coerce")

    if "session_number" in normalized.columns:
        normalized["session_number"] = pd.to_numeric(
            normalized["session_number"], errors="coerce"
        )

    if "subject_id" in normalized.columns:
        normalized["subject_id"] = normalized["subject_id"].apply(_subject_label)

    return normalized
187+
188+ @staticmethod
189+ def _standardize_block (value ):
190+ if isinstance (value , str ):
191+ cleaned = value .strip ().lower ()
192+ if cleaned .startswith ("test" ):
193+ return "test"
194+ if cleaned .startswith (("prac" , "practice" )):
195+ return "prac"
196+ if cleaned in {"training" , "train" }:
197+ return "prac"
198+ if cleaned == "" :
199+ return np .nan
200+ return cleaned
201+ return value
202+
def save_csv(self):
    """Return ``None``; as written, this method performs no other work."""
    return None
87205
0 commit comments