22Contains the command to convert XML files to JSON files. 
33""" 
44
5- import  json 
5+ import  re 
6+ from  itertools  import  groupby 
67from  pathlib  import  Path 
8+ from  typing  import  Iterator , Literal 
79
810import  typer 
9- from  pydantic  import  RootModel 
1011from  typing_extensions  import  Annotated 
1112
1213from  fundamend  import  AhbReader , Anwendungshandbuch , MessageImplementationGuide , MigReader 
14+ from  fundamend .commands .app  import  app 
15+ from  fundamend .sanitize  import  sanitize_ahb 
1316
14- app  =  typer . Typer ( )
# Matches the leading part of an XML file name of the form
# `<FORMAT>_<AHB|MIG>_[<Gas|Strom>_]...`:
#   group(1): the format, one or more upper-case ASCII letters
#   group(2): the document type, either "AHB" or "MIG"
#   group(3): "Gas" or "Strom" if present, otherwise None
# The pattern is anchored at the start only; whatever follows the matched prefix is not inspected.
FORMAT_AND_TYPE_REGEX = re.compile(r"^([A-Z]+)_(AHB|MIG)_(?:(Gas|Strom)_)?")
1518
1619
17- def  _convert_to_json_file (xml_file_path : Path ) ->  Path :
def _write_ahb_models_splitted(
    model: Anwendungshandbuch,
    ahb_dir: Path,
    *,
    compressed: bool = False,
) -> None:
    """
    Write the given Anwendungshandbuch to multiple JSON files inside `ahb_dir`.

    One file named `<pruefidentifikator>.json` is written for each entry of `model.anwendungsfaelle`.
    In addition, a `meta.json` file is written that contains the dump of `model` with the
    `anwendungsfaelle` field excluded.

    :param model: the Anwendungshandbuch to write
    :param ahb_dir: target directory; it is created (including missing parents) if it does not exist yet
    :param compressed: if True the JSON is dumped without indentation, otherwise with an indentation of 2
    """
    indent = None if compressed else 2
    ahb_dir.mkdir(parents=True, exist_ok=True)

    for anwendungsfall in model.anwendungsfaelle:
        json_file_path = ahb_dir / f"{anwendungsfall.pruefidentifikator}.json"
        with open(json_file_path, encoding="utf-8", mode="w") as outfile:
            outfile.write(anwendungsfall.model_dump_json(indent=indent))

    # Write meta file: everything of the model except the Anwendungsfaelle written above
    ahb_meta_file_path = ahb_dir / "meta.json"
    with open(ahb_meta_file_path, encoding="utf-8", mode="w") as outfile:
        outfile.write(model.model_dump_json(exclude={"anwendungsfaelle"}, indent=indent))
37+ 
38+ 
def _write_model_to_json_file(
    model: Anwendungshandbuch | MessageImplementationGuide,
    xml_file_path: Path,
    *,
    compressed: bool = False,
    split_ahb: bool = False,
) -> None:
    """
    Write the given model as JSON next to `xml_file_path` and report the result via `typer.echo`.

    The output location is derived from `xml_file_path` by swapping its suffix:
    * without `split_ahb` a single file with the suffix `.json` is written,
    * with `split_ahb` a directory with the suffix removed is filled with one file per Anwendungsfall
      plus a `meta.json`.

    :param model: the model to serialize
    :param xml_file_path: path whose suffix is replaced to obtain the output path
    :param compressed: if True the JSON is dumped without indentation, otherwise with an indentation of 2
    :param split_ahb: if True, write the model splitted into multiple files
    :raises ValueError: if `split_ahb` is set but `model` is not an Anwendungshandbuch
    """
    if split_ahb:
        if not isinstance(model, Anwendungshandbuch):
            raise ValueError("split_ahb can only be used with Anwendungshandbuch models")
        ahb_dir = xml_file_path.with_suffix("")
        _write_ahb_models_splitted(model, ahb_dir, compressed=compressed)
        typer.echo(f"Successfully converted {xml_file_path} to JSON files in {ahb_dir}")
        return

    json_file_path = xml_file_path.with_suffix(".json")
    with open(json_file_path, encoding="utf-8", mode="w") as outfile:
        outfile.write(model.model_dump_json(indent=None if compressed else 2))
    typer.echo(f"Successfully converted {xml_file_path} to {json_file_path}")
58+ 
59+ 
def _convert_to_json_files(
    mig_xml_file_path: Path, ahb_xml_file_path: Path, sanitize: bool = False
) -> tuple[MessageImplementationGuide, Anwendungshandbuch]:
    """
    Read a MIG XML file and an AHB XML file and return the two resulting models.

    Despite its name this function does not write anything itself; it only returns the models.

    :param mig_xml_file_path: path of the MIG XML file, read with `MigReader`
    :param ahb_xml_file_path: path of the AHB XML file, read with `AhbReader`
    :param sanitize: if True, `sanitize_ahb` is called with both models before they are returned
    :return: the tuple `(mig_model, ahb_model)`
    :raises ValueError: if one of the given paths is not an existing file
    """
    if not mig_xml_file_path.is_file():  # pragma: no cover
        raise ValueError(f"The given path {mig_xml_file_path.absolute()} is not a file")
    if not ahb_xml_file_path.is_file():  # pragma: no cover
        raise ValueError(f"The given path {ahb_xml_file_path.absolute()} is not a file")

    mig_model = MigReader(mig_xml_file_path).read()
    ahb_model = AhbReader(ahb_xml_file_path).read()

    # Do sanitization if requested.
    # The return value of sanitize_ahb (if any) is not used; the models read above are returned.
    if sanitize:
        sanitize_ahb(mig_model, ahb_model)

    return mig_model, ahb_model
77+ 
78+ 
def xml2json_dir_mode(
    xml_path: Path, sanitize: bool = False, compressed: bool = False, split_ahb: bool = False
) -> None:
    """
    Converts all XML files in the given directory (searched recursively) to JSON files.
    The function expects to find pairs of MIG and AHB XML files in the directory.
    The XML file names must match the pattern `<FORMAT>_<AHB|MIG>_[<Gas|Strom>_]*.xml`.

    Files are paired by their format and (optional) Gas/Strom part; every pair must consist of exactly
    one AHB and one MIG file. A directory without any XML file is a no-op.

    :param xml_path: directory to search for `*.xml` files
    :param sanitize: forwarded to the reading step for every pair
    :param compressed: if True the JSON is written without indentation
    :param split_ahb: if True each AHB is written as one file per Anwendungsfall; MIGs are never split
    :raises ValueError: if an XML file name does not match the expected pattern
    :raises AssertionError: if a group does not consist of exactly one AHB and one MIG file
    """

    def groupby_key(path_and_match: tuple[Path, re.Match[str]]) -> str:
        # format followed by "Gas"/"Strom" (or nothing): identifies the files that belong together
        xml_match = path_and_match[1]
        return xml_match.group(1) + (xml_match.group(3) or "")

    def sort_key(path_and_match: tuple[Path, re.Match[str]]) -> tuple[str, str]:
        # groupby only merges *adjacent* items. Sorting by a tuple (instead of one concatenated string)
        # keeps all members of a group next to each other even if one group key is a prefix of another
        # one, and because "AHB" < "MIG" the AHB comes first inside each group.
        return groupby_key(path_and_match), path_and_match[1].group(2)

    def xmls_and_matches() -> Iterator[tuple[Path, re.Match[str]]]:
        for _xml_path in xml_path.rglob("*.xml"):
            match = FORMAT_AND_TYPE_REGEX.match(_xml_path.name)
            if match is None:  # pragma: no cover
                raise ValueError("XML file name does not match expected format: " + str(_xml_path))
            yield _xml_path, match

    for _, _xmls_and_matches in groupby(sorted(xmls_and_matches(), key=sort_key), key=groupby_key):
        _xmls_and_matches_list = list(_xmls_and_matches)
        # Raised explicitly instead of using `assert` so that the checks are not stripped when
        # Python runs with -O; the exception type stays the same.
        if len(_xmls_and_matches_list) != 2:
            raise AssertionError(
                "Expected exactly two XML files (AHB + MIG) for each format and powert type, but found: "
                f"{_xmls_and_matches_list}"
            )
        (ahb_path, ahb_match), (mig_path, mig_match) = _xmls_and_matches_list
        if ahb_match.group(2) != "AHB" or mig_match.group(2) != "MIG":
            raise AssertionError(
                f"Expected AHB on first and a MIG on second position, but found: {_xmls_and_matches_list}"
            )
        mig, ahb = _convert_to_json_files(mig_path, ahb_path, sanitize=sanitize)
        # Hand over the XML paths themselves: the writer derives the output path by replacing the suffix,
        # so the result is the same as for a `*.json` path, but the XML file is reported as the source.
        _write_model_to_json_file(mig, mig_path, compressed=compressed)
        _write_model_to_json_file(ahb, ahb_path, compressed=compressed, split_ahb=split_ahb)
117+ 
118+ 
def xml2json_file_mode(
    xml_path: Path, sanitize: bool = False, compressed: bool = False, split_ahb: bool = False
) -> None:
    """
    Converts a single XML file to JSON.
    The function expects to find the corresponding AHB or MIG file in the same directory.

    The XML file names must match the pattern `<FORMAT>_<AHB|MIG>_[<Gas|Strom>_]*.xml`.

    Both files are read, but only the one given as `xml_path` is written as JSON.

    :param xml_path: path of the AHB or MIG XML file to convert
    :param sanitize: forwarded to the reading step
    :param compressed: if True the JSON is written without indentation
    :param split_ahb: if True and `xml_path` is an AHB, it is written as one file per Anwendungsfall;
        not used when `xml_path` is a MIG
    :raises ValueError: if the file name does not match the expected pattern or if not exactly one
        counterpart file is found in the same directory
    """
    match = FORMAT_AND_TYPE_REGEX.match(xml_path.name)
    if match is None:  # pragma: no cover
        raise ValueError("XML file name does not match expected format: " + str(xml_path))
    match_type: Literal["MIG", "AHB"] = match.group(2)  # type: ignore[assignment]
    match_type_other: Literal["MIG", "AHB"] = "AHB" if match_type == "MIG" else "MIG"

    # Glob pattern for the counterpart: same format, the other type and, if present, the same Gas/Strom part
    pattern_other = f"{match.group(1)}_{match_type_other}_"
    if match.group(3) is not None:
        pattern_other += f"{match.group(3)}_"
    pattern_other += "*.xml"

    other_matches = list(xml_path.parent.glob(pattern_other))
    if not other_matches:  # pragma: no cover
        raise ValueError(
            f"No other XML file found in the same directory as {xml_path} matching {pattern_other}"
        )
    if len(other_matches) > 1:  # pragma: no cover
        raise ValueError(
            f"Multiple other XML files found in the same directory as {xml_path} "
            f"matching {pattern_other}: {other_matches}"
        )
    other_path = other_matches[0]

    # The XML path itself is handed to the writer: it derives the output path by replacing the suffix,
    # so the result is the same as for a `*.json` path, but the XML file is reported as the source.
    if match_type == "MIG":
        mig, _ = _convert_to_json_files(xml_path, other_path, sanitize=sanitize)
        _write_model_to_json_file(mig, xml_path, compressed=compressed)
    else:
        _, ahb = _convert_to_json_files(other_path, xml_path, sanitize=sanitize)
        _write_model_to_json_file(ahb, xml_path, compressed=compressed, split_ahb=split_ahb)
40154
41155
42156@app .command () 
43157def  xml2json (
44158    xml_path : Annotated [
45159        Path ,
46160        typer .Option (
161+             ...,
162+             "--xml-path" ,
163+             "-p" ,
47164            exists = True ,
48165            file_okay = True ,
49166            dir_okay = True ,
@@ -52,13 +169,50 @@ def xml2json(
52169            resolve_path = True ,
53170        ),
54171    ],
172+     sanitize : Annotated [
173+         bool ,
174+         typer .Option (
175+             ...,
176+             "--sanitize" ,
177+             "-s" ,
178+             help = "Sanitize the MIG or AHB before writing the resulting JSON. As of now, it does two things:\n " 
179+             '1) Data elements or groups which are stated as "unused" in the MIG are missing in the AHB. ' 
180+             "The sanitization will add them to the AHB to enable easy parallel iteration over MIG and AHB. \n " 
181+             "2) The five data elements C_C080 D_3036 model names. But in AHB there is only one D_3036 with " 
182+             'description "Name". The sanitization will add four extra D_3036 data elements to prevent reading' 
183+             "raster errors." ,
184+         ),
185+     ] =  False ,
186+     compressed : Annotated [
187+         bool ,
188+         typer .Option (
189+             ...,
190+             "--compressed" ,
191+             "-c" ,
192+             help = "If set, the output JSON files will contain no whitespace outside of strings. If not set" 
193+             " (default), the output JSON files will be pretty-printed with an indentation of one space." ,
194+         ),
195+     ] =  False ,
196+     split_ahb : Annotated [
197+         bool ,
198+         typer .Option (
199+             ...,
200+             "--split-ahb" ,
201+             "-a" ,
202+             help = "If set, the AHB will be split into multiple files, one for each Anwendungsfall. " 
203+             "The files will be named `<Prüfidentifikator>.json` in a directory named after the AHB file's " 
204+             "name (without the extension). It will contain an additional `meta.json` file containing the fields of " 
205+             "`Anwendungshandbuch` except for `anwendungsfaelle`." ,
206+         ),
207+     ] =  False ,
55208) ->  None :
56209    """ 
57-     converts the xml file from xml_in_path to a json file next to the .xml 
210+     Converts the xml file(s) from `xml_in_path` to a json file next to the `*.xml`. 
211+     If `xml_in_path` is a directory, it will search for all XML files in the directory and its subdirectories. 
212+ 
213+     All xml files must follow the naming convention `/^(?P<FORMAT>[A-Z]+)_(AHB|MIG)_((Gas|Strom)_)?.*\\ .xml$/` 
58214    """ 
59-     assert  xml_path .exists ()  # ensured by typer 
60215    if  xml_path .is_dir ():
61-         for  _xml_path  in  xml_path .rglob ("*.xml" ):
62-             _convert_to_json_file (_xml_path )
216+         xml2json_dir_mode (xml_path , sanitize = sanitize , compressed = compressed , split_ahb = split_ahb )
63217    else :
64-         _convert_to_json_file (xml_path )
218+         xml2json_file_mode (xml_path ,  sanitize = sanitize ,  compressed = compressed ,  split_ahb = split_ahb )
0 commit comments