import uuid
from typing import TYPE_CHECKING, Any

from pandas import DataFrame

1314class Pipe :
1415 def __init__ (self , session : "Session" , * , default_rows_limit : int = 1000 ):
15- self .__session = session
16- self .__default_rows_limit = default_rows_limit
16+ self ._session = session
17+ self ._default_rows_limit = default_rows_limit
1718
18- self ._data_materialized = False
1919 self ._data_materialized_rows : int | None = None
2020 self ._data_result : ExecutionResult | None = None
21+
2122 self ._visualization_materialized = False
2223 self ._visualization_result : VisualisationResult | None = None
2324 self ._visualization_request : str | None = None
2425
26+ # N.B. Pipes/Threads are currently append-only and cannot be "forked".
27+ self ._opas_processed_count = 0
2528 self ._opas : list [Opa ] = []
2629 self ._meta : dict [str , Any ] = {}
2730
28- def __materialize_data (self , rows_limit : int | None ) -> "ExecutionResult" :
29- rows_limit = rows_limit if rows_limit else self .__default_rows_limit
30- if not self ._data_materialized or rows_limit != self ._data_materialized_rows :
31- # Execute each opa individually, keeping the last result
32- for opa in self ._opas :
33- self ._data_result = self .__session .executor .execute (
34- self .__session , opa , rows_limit = rows_limit , cache_scope = str (id (self ))
31+ self ._cache_scope = f"{ self ._session .name } /{ uuid .uuid4 ()} "
32+
33+ def _materialize_data (self , rows_limit : int | None ) -> "ExecutionResult" :
34+ # TODO Recompute on rows_limit change without recomputing the last Opa
35+ rows_limit = rows_limit if rows_limit else self ._default_rows_limit
36+ new_opas = self ._opas [self ._opas_processed_count :]
37+ if len (new_opas ) > 0 or rows_limit != self ._data_materialized_rows :
38+ for opa in new_opas :
39+ self ._data_result = self ._session .executor .execute (
40+ self ._session , opa , rows_limit = rows_limit , cache_scope = self ._cache_scope
3541 )
3642 self ._meta .update (self ._data_result .meta )
37- self ._data_materialized = True
43+ self ._opas_processed_count += len ( new_opas )
3844 self ._data_materialized_rows = rows_limit
3945 if self ._data_result is None :
40- raise RuntimeError ("__data_result is None after materialization" )
46+ raise RuntimeError ("_data_result is None after materialization" )
4147 return self ._data_result
4248
43- def __materialize_visualization (self , request : str | None , rows_limit : int | None ) -> "VisualisationResult" :
44- data = self .__materialize_data (rows_limit )
49+ def _materialize_visualization (self , request : str | None , rows_limit : int | None ) -> "VisualisationResult" :
50+ data = self ._materialize_data (rows_limit )
4551 if not self ._visualization_materialized or request != self ._visualization_request :
4652 # TODO Cache visualization results as in Executor.execute()?
47- self ._visualization_result = self .__session .visualizer .visualize (request , data )
53+ self ._visualization_result = self ._session .visualizer .visualize (request , data )
4854 self ._visualization_materialized = True
4955 self ._visualization_request = request
5056 self ._meta .update (self ._visualization_result .meta )
5157 self ._meta ["plot_code" ] = self ._visualization_result .code # maybe worth to expand as a property later
5258 if self ._visualization_result is None :
53- raise RuntimeError ("__visualization_result is None after materialization" )
59+ raise RuntimeError ("_visualization_result is None after materialization" )
5460 return self ._visualization_result
5561
5662 def df (self , * , rows_limit : int | None = None ) -> DataFrame | None :
57- return self .__materialize_data (rows_limit if rows_limit else self ._data_materialized_rows ).df
63+ return self ._materialize_data (rows_limit if rows_limit else self ._data_materialized_rows ).df
5864
5965 def plot (self , request : str | None = None , * , rows_limit : int | None = None ) -> "VisualisationResult" :
6066 # TODO Currently, we can't chain calls or maintain a "plot history": pipe.plot("red").plot("blue").
6167 # We have to do pipe.plot("red"), but then pipe.plot("blue") is independent of the first call.
62- return self .__materialize_visualization (request , rows_limit if rows_limit else self ._data_materialized_rows )
68+ return self ._materialize_visualization (request , rows_limit if rows_limit else self ._data_materialized_rows )
6369
6470 def text (self ) -> str :
65- return self .__materialize_data (self ._data_materialized_rows ).text
71+ return self ._materialize_data (self ._data_materialized_rows ).text
6672
6773 def __str__ (self ) -> str :
6874 return self .text ()
6975
7076 def ask (self , query : str ) -> "Pipe" :
7177 self ._opas .append (Opa (query = query ))
72- self ._data_materialized = False
7378 self ._visualization_materialized = False
7479 return self
7580
@@ -79,4 +84,4 @@ def meta(self) -> dict[str, Any]:
7984
8085 @property
8186 def code (self ) -> str | None :
82- return self .__materialize_data (self ._data_materialized_rows ).code
87+ return self ._materialize_data (self ._data_materialized_rows ).code
0 commit comments