@@ -2,8 +2,9 @@

 import io
 from contextlib import contextmanager, nullcontext
+from io import TextIOBase
 from pathlib import Path
-from typing import IO, Any, Dict, List
+from typing import IO, Any, ContextManager, Dict, List

 import chardet
 import pandas as pd
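Note on the `chardet` import: when callers pass bytes with `encoding=None`, the module detects the encoding before decoding. A minimal sketch of that pattern, assuming a hypothetical `detect_encoding` helper (only `chardet.detect` itself is known to be real here):

```python
import chardet

def detect_encoding(raw: bytes) -> str:
    # chardet.detect returns a dict like {'encoding': 'utf-8', 'confidence': 0.99, ...};
    # fall back to UTF-8 when detection comes back empty.
    result = chardet.detect(raw)
    return result["encoding"] or "utf-8"

raw = '"**PROJ"\r\n'.encode("cp1252")
text = raw.decode(detect_encoding(raw))
```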
@@ -104,7 +105,9 @@ def _read_from_path(path: Path):
     raise TypeError(f"Unsupported input type for encoding detection: {type(source)}")


-def read_ags_source(source: str | Path | IO[str] | IO[bytes] | bytes, encoding=None):
+def read_ags_source(
+    source: str | Path | IO[str] | IO[bytes] | bytes, encoding=None
+) -> ContextManager[TextIOBase]:
     """Opens or wraps a given source for reading AGS (text-based) data.

     Args:
@@ -113,8 +116,7 @@ def read_ags_source(source: str | Path | IO[str] | IO[bytes] | bytes, encoding=N
             - IO[str]: A file-like text stream.
             - IO[bytes]: Byte stream
             - bytes: Binary content or stream (will be decoded).
-        encoding (str | None): Encoding to use for decoding bytes. Defaults if None, encoding will be
-            determined by `chardet.detect`.
+        encoding (str | None): Encoding to use for decoding bytes. Default is None.

     Returns:
         ContextManager[TextIOBase]: A context manager yielding a text stream.
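The new `ContextManager[TextIOBase]` annotation makes the contract explicit: whatever the input type, callers get something usable in a `with` block. A minimal sketch of the pattern the `contextmanager`/`nullcontext` imports suggest (illustrative, not the module's actual dispatch):

```python
import io
from contextlib import nullcontext

# An already-open text stream needs no setup or teardown, so it can be
# passed through nullcontext and still satisfy the `with` protocol:
stream = io.StringIO('"**PROJ"\n"*PROJ_ID"\n')
with nullcontext(stream) as f:
    print(f.readline())  # '"**PROJ"\n'
```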
@@ -165,8 +167,8 @@ def ags_to_dfs(
     """Converts AGS 3 or AGS 4 file to a dictionary of pandas DataFrames.

     Args:
-        source (str | Path | IO[str] | IO[bytes] | bytes): The AGS file (str or Path) or a file-like
-            object that represents and AGS file.
+        source (str | Path | IO[str] | IO[bytes] | bytes): The AGS file (str or Path)
+            or a file-like object that represents the AGS file.
         encoding (str): default=None
             Encoding of text file, an attempt at detecting the encoding will be made if `None`

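For reference, a hypothetical call site for `ags_to_dfs` (the file name is illustrative; `HOLE` is the AGS 3 borehole group):

```python
dfs = ags_to_dfs("site_investigation.ags")  # encoding detected when None
for group, df in dfs.items():
    print(group, df.shape)
holes = dfs["HOLE"]  # one DataFrame per AGS group
```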
@@ -215,12 +217,14 @@ def ags_to_dfs(
     return ags_dfs


-def ags3_to_dfs(source: str, encoding: str) -> Dict[str, pd.DataFrame]:
+def ags3_to_dfs(
+    source: str | Path | IO[str] | IO[bytes] | bytes, encoding: str
+) -> Dict[str, pd.DataFrame]:
     """Converts AGS 3 data to a dictionary of pandas DataFrames.

     Args:
-        source (str | Path | IO[str] | IO[bytes] | bytes): The AGS3 file (str or Path) or a file-like
-            object that represents and AGS3 file.
+        source (str | Path | IO[str] | IO[bytes] | bytes): The AGS 3 file (str or Path)
+            or a file-like object that represents the AGS 3 file.
         encoding (str): Encoding of file or object.

     Returns:
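`coerce_string` is called throughout the parser below but isn't shown in this diff. A plausible sketch of its contract, inferred from how the `<CONT>` branch relies on empty fields becoming `None` (an assumption, not the module's actual implementation):

```python
def coerce_string(value: str):
    # ASSUMPTION: empty fields become None so a later "<CONT>" row can fill them in.
    if value == "":
        return None
    # Try numeric types first, otherwise keep the original string.
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value
```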
@@ -235,76 +239,75 @@ def ags3_to_dfs(source: str, encoding: str) -> Dict[str, pd.DataFrame]:
     group_data: List[List[Any]] = [[], [], []]

     with read_ags_source(source, encoding=encoding) as file:
-        ags3_data = file.read().splitlines()
-
-    for i, line in enumerate(ags3_data):
-        last_line_type = line_type
-
-        # In AGS 3.1 group names are prefixed with **
-        if line.startswith('"**'):
-            line_type = "group_name"
-            if group:
-                ags3_dfs[group] = pd.DataFrame(group_data, columns=headers)
-
-            group = line.strip(' ,"*')
-            group_data = []
-
-        # In AGS 3 header names are prefixed with "*
-        elif line.startswith('"*'):
-            line_type = "headers"
-            new_headers = line.split('","')
-            new_headers = [h.strip(' ,"*') for h in new_headers]
-
-            # Some groups have so many headers that they span multiple lines.
-            # Therefore we need to check whether the new headers are
-            # a continuation of the previous headers from the last line.
-            if line_type == last_line_type:
-                headers = headers + new_headers
-            else:
-                headers = new_headers
-
-        # Skip lines where group units are defined, these are defined in the AGS 3 data dictionary.
-        elif line.startswith('"<UNITS>"'):
-            line_type = "units"
-            continue
-
-        # The rest of the lines contain:
-        # 1. GI data
-        # 2. a continuation of the previous line. These lines contain "<CONT>" in the first column.
-        # 3. are empty or contain worthless data
-        else:
-            line_type = "data_row"
-            data_row = line.split('","')
-            if len("".join(data_row)) == 0:
-                # print(f"Line {i} is empty. Last Group: {group}")
-                continue
-            elif len(data_row) != len(headers):
-                print(
-                    f"\n🚨 CAUTION: The number of columns on line {i + 1} ({len(data_row)}) doesn't match the number of columns of group {group} ({len(headers)})!",
-                    f"{group} headers: {headers}",
-                    f"Line {i + 1}: {data_row}",
-                    sep="\n",
-                    end="\n\n",
-                )
+        for i, line in enumerate(file):
+            line = line.strip()
+            last_line_type = line_type
+
+            # In AGS 3.1 group names are prefixed with **
+            if line.startswith('"**'):
+                line_type = "group_name"
+                if group:
+                    ags3_dfs[group] = pd.DataFrame(group_data, columns=headers)
+
+                group = line.strip(' ,"*')
+                group_data = []
+
+            # In AGS 3 header names are prefixed with "*
+            elif line.startswith('"*'):
+                line_type = "headers"
+                new_headers = line.split('","')
+                new_headers = [h.strip(' ,"*') for h in new_headers]
+
+                # Some groups have so many headers that they span multiple lines.
+                # Therefore we need to check whether the new headers are
+                # a continuation of the previous headers from the last line.
+                if line_type == last_line_type:
+                    headers = headers + new_headers
+                else:
+                    headers = new_headers
+
+            # Skip lines where group units are defined, these are defined in the AGS 3 data dictionary.
+            elif line.startswith('"<UNITS>"'):
+                line_type = "units"
                 continue
-            # Append continued lines (<CONT>) to the last data_row
-            elif data_row[0] == '"<CONT>':
-                last_data_row = group_data[-1]
-                for j, data in enumerate(data_row):
-                    data = data.strip(' "')
-                    if data and data != "<CONT>":
-                        if last_data_row[j] is None:
-                            # Last data row didn't contain data for this column
-                            last_data_row[j] = coerce_string(data)
-                        else:
-                            # Last data row already contains data for this column
-                            last_data_row[j] = str(last_data_row[j]) + data
-            # Lines that are assumed to contain valid data are added to the group data
+
+            # The rest of the lines contain:
+            # 1. GI data
+            # 2. a continuation of the previous line. These lines contain "<CONT>" in the first column.
+            # 3. are empty or contain worthless data
             else:
-                cleaned_data_row = []
-                for data in data_row:
-                    cleaned_data_row.append(coerce_string(data.strip(' "')))
-                group_data.append(cleaned_data_row)
+                line_type = "data_row"
+                data_row = line.split('","')
+                if len("".join(data_row)) == 0:
+                    # print(f"Line {i} is empty. Last Group: {group}")
+                    continue
+                elif len(data_row) != len(headers):
+                    print(
+                        f"\n🚨 CAUTION: The number of columns on line {i + 1} ({len(data_row)}) doesn't match the number of columns of group {group} ({len(headers)})!",
+                        f"{group} headers: {headers}",
+                        f"Line {i + 1}: {data_row}",
+                        sep="\n",
+                        end="\n\n",
+                    )
+                    continue
+                # Append continued lines (<CONT>) to the last data_row
+                elif data_row[0] == '"<CONT>':
+                    last_data_row = group_data[-1]
+                    for j, data in enumerate(data_row):
+                        data = data.strip(' "')
+                        if data and data != "<CONT>":
+                            if last_data_row[j] is None:
+                                # Last data row didn't contain data for this column
+                                last_data_row[j] = coerce_string(data)
+                            else:
+                                # Last data row already contains data for this column
+                                last_data_row[j] = str(last_data_row[j]) + data
+                # Lines that are assumed to contain valid data are added to the group data
+                else:
+                    cleaned_data_row = []
+                    for data in data_row:
+                        cleaned_data_row.append(coerce_string(data.strip(' "')))
+                    group_data.append(cleaned_data_row)

     # Also add the last group's df to the dictionary of AGS dfs
     ags3_dfs[group] = pd.DataFrame(group_data, columns=headers).dropna(
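The core change in this hunk: instead of materializing every line up front with `file.read().splitlines()`, the loop now iterates the stream directly, so only one line is held at a time; the added `line.strip()` takes over the newline removal that `splitlines()` used to provide. For a sense of what the branches dispatch on, here is a made-up AGS 3 fragment (illustrative data only):

```python
sample = (
    '"**HOLE"\n'               # group name, prefixed with **
    '"*HOLE_ID","*HOLE_GL"\n'  # headers, prefixed with *
    '"<UNITS>","m"\n'          # units line, skipped
    '"BH1","12.5"\n'           # data row
    '"<CONT>","0"\n'           # continuation, merged into the previous row
)
# If the loop behaves as written, this should produce a single "HOLE"
# DataFrame whose HOLE_GL value ends up as "12.50" (12.5 with "0"
# appended by the <CONT> branch).
```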
@@ -320,14 +323,13 @@ def ags3_to_dfs(source: str, encoding: str) -> Dict[str, pd.DataFrame]:


 def ags4_to_dfs(
-    source: str | Path | IO[str] | IO[bytes] | bytes
+    source: str | Path | IO[str] | IO[bytes] | bytes,
 ) -> Dict[str, pd.DataFrame]:
     """Converts AGS 4 data to a dictionary of pandas DataFrames.

     Args:
         source (str | Path | IO[str] | IO[bytes] | bytes): The AGS4 file (str or Path) or a file-like
             object that represents and AGS4 file.
-        encoding (str): Encoding of file or object.

     Returns:
         Dict[str, pd.DataFrame]: A dictionary of pandas DataFrames, where each key represents a group name from AGS 4 data,