88
99from . ingredient import *
1010from . dag import DAG , IngredientNode , ProcedureNode
11+ from . helpers import read_opt
1112from .. import config
1213from . procedure import *
13- from .. str import format_float_digits
1414
1515import logging
1616
@@ -26,9 +26,30 @@ def _loadfile(f):
2626
2727
2828# functions for reading/running recipe
29- def build_recipe (recipe_file , to_disk = False ):
30- """build a complete recipe file if there are includes in
31- recipe file, if no includes found than return the file as is.
29+ def build_recipe (recipe_file , to_disk = False , ** kwargs ):
30+ """build a complete recipe object.
31+
32+ This function will check each part of recipe, convert string (the ingredient ids,
33+ dictionaries file names) into actual objects.
34+
35+ If there are includes in recipe file, this function will run recursively.
36+ If no includes found then return the parsed object as is.
37+
38+ Parameters
39+ ----------
40+ recipe_file : `str`
41+ path to recipe file
42+
43+ Keyword Args
44+ ------------
45+ to_disk : bool
46+ if true, save the parsed result to a yaml file in working dir
47+
48+ Other Parameters
49+ ----------------
50+ ddf_dir : `str`
51+ path to search for DDF datasets, will overwrite the config in recipe
52+
3253 """
3354 recipe = _loadfile (recipe_file )
3455
@@ -64,6 +85,12 @@ def build_recipe(recipe_file, to_disk=False):
6485
6586 recipe ['cooking' ][p ][i ]['options' ]['dictionary' ] = _loadfile (path )
6687
88+ # setting ddf search path if option is provided
89+ if 'ddf_dir' in kwargs .keys ():
90+ if 'config' not in recipe .keys ():
91+ recipe ['config' ] = AttrDict ()
92+ recipe .config .ddf_dir = kwargs ['ddf_dir' ]
93+
6794 if 'include' not in recipe .keys ():
6895 return recipe
6996 else : # append sub-recipe entities into main recipe
@@ -156,6 +183,10 @@ def check_dataset_availability(recipe):
156183
157184
158185def build_dag (recipe ):
186+ """build a DAG model for the recipe.
187+
188+ For more detail on the DAG model, see :py:mod:`ddf_utils.chef.dag`.
189+ """
159190
160191 def add_dependency (dag , upstream_id , downstream ):
161192 if not dag .has_task (upstream_id ):
@@ -203,9 +234,11 @@ def add_dependency(dag, upstream_id, downstream):
203234 if not dag .has_task (i ):
204235 raise ValueError ('Ingredient not found: ' + i )
205236 if 'serving' in recipe .keys ():
237+ if len (serving ) > 0 :
238+ raise ValueError ('can not have serve procedure and serving section at same time!' )
206239 for i in recipe ['serving' ]:
207- if not dag .has_task (i ):
208- raise ValueError ('Ingredient not found: ' + i )
240+ if not dag .has_task (i [ 'id' ] ):
241+ raise ValueError ('Ingredient not found: ' + i [ 'id' ] )
209242 # display the tree
210243 # dag.tree_view()
211244 return dag
@@ -215,7 +248,25 @@ def run_recipe(recipe):
215248 """run the recipe.
216249
217250 returns a dictionary. keys are `concepts`, `entities` and `datapoints`,
218- and values are ingredients return by the procedures
251+ and values are ingredients defined in the `serve` procedures or `serving` section.
252+ for example:
253+
254+ .. code-block:: python
255+
256+ {
257+ "concepts": [{"ingredient": DataFrame1, "options": None}],
258+ "datapoints": [
259+ {
260+ "ingredient": DataFrame2,
261+ "options": {"digits": 5}
262+ },
263+ {
264+ "ingredient": DataFrame3,
265+ "options": {"digits": 1}
266+ },
267+ ]
268+ }
269+
219270 """
220271 try :
221272 config .DDF_SEARCH_PATH = recipe ['config' ]['ddf_dir' ]
@@ -242,62 +293,28 @@ def run_recipe(recipe):
242293 func = p ['procedure' ]
243294 if func == 'serve' :
244295 ingredients = [dag .get_task (x ).evaluate () for x in p ['ingredients' ]]
245- [dishes [k ].append (i ) for i in ingredients ]
296+ opts = read_opt (p , 'options' , default = dict ())
297+ [dishes [k ].append ({'ingredient' : i , 'options' : opts }) for i in ingredients ]
246298 continue
247299 out = dag .get_task (p ['result' ]).evaluate ()
248300 # if there is no serving procedures/section, use the last output Ingredient object as final result.
249301 if len (dishes [k ]) == 0 and 'serving' not in recipe .keys ():
250302 logger .warning ('serving last procedure output for {}: {}' .format (k , out .ingred_id ))
251- dishes [k ].append (out )
303+ dishes [k ].append ({ 'ingredient' : out , 'options' : dict ()} )
252304 # update dishes when there is serving section
253305 if 'serving' in recipe .keys ():
254306 for i in recipe ['serving' ]:
255- ing = dag .get_task (i ).evaluate ()
307+ opts = read_opt (i , 'options' , default = dict ())
308+ ing = dag .get_task (i ['id' ]).evaluate ()
256309 if ing .dtype in dishes .keys ():
257- dishes [ing .dtype ].append (ing )
310+ dishes [ing .dtype ].append ({ 'ingredient' : ing , 'options' : opts } )
258311 else :
259- dishes [ing .dtype ] = [ing ]
312+ dishes [ing .dtype ] = [{ 'ingredient' : ing , 'options' : opts } ]
260313 return dishes
261314
262315
263316def dish_to_csv (dishes , outpath ):
317+ """save the recipe output to disk"""
264318 for t , ds in dishes .items ():
265319 for dish in ds :
266- all_data = dish .get_data ()
267- if isinstance (all_data , dict ):
268- for k , df in all_data .items ():
269- # change boolean into string
270- for i , v in df .dtypes .iteritems ():
271- if v == 'bool' :
272- df [i ] = df [i ].map (lambda x : str (x ).upper ())
273- if t == 'datapoints' :
274- by = dish .key_to_list ()
275- path = os .path .join (outpath , 'ddf--{}--{}--by--{}.csv' .format (t , k , '--' .join (by )))
276- elif t == 'concepts' :
277- path = os .path .join (outpath , 'ddf--{}.csv' .format (t ))
278- elif t == 'entities' :
279- domain = dish .key [0 ]
280- if k == domain :
281- path = os .path .join (outpath , 'ddf--{}--{}.csv' .format (t , k ))
282- else :
283- path = os .path .join (outpath , 'ddf--{}--{}--{}.csv' .format (t , domain , k ))
284- else :
285- raise ValueError ('Not a correct collection: ' + t )
286-
287- if t == 'datapoints' :
288- df = df .set_index (by )
289- if not np .issubdtype (df [k ].dtype , np .number ):
290- try :
291- df [k ] = df [k ].astype (float )
292- # TODO: make floating precision an option
293- df [k ] = df [k ].map (lambda x : format_float_digits (x , 5 ))
294- except ValueError :
295- logging .warning ("data not numeric: " + k )
296- else :
297- df [k ] = df [k ].map (lambda x : format_float_digits (x , 5 ))
298- df [[k ]].to_csv (path , encoding = 'utf8' )
299- else :
300- df .to_csv (path , index = False , encoding = 'utf8' )
301- else :
302- path = os .path .join (outpath , 'ddf--{}.csv' .format (t ))
303- all_data .to_csv (path , index = False , encoding = 'utf8' )
320+ dish ['ingredient' ].serve (outpath , ** dish ['options' ])
0 commit comments