@@ -2,6 +2,8 @@ use pyo3::prelude::*;
22use pyo3:: types:: { PyDict , PyList , PyBool } ;
33use parquet:: file:: reader:: { FileReader , SerializedFileReader } ;
44use std:: { fs:: File , path:: Path } ;
5+ use parquet:: record:: Row ;
6+ use pyo3:: exceptions:: PyStopIteration ;
57use serde_json:: Value ;
68
79struct PyValue ( Value ) ;
@@ -74,6 +76,48 @@ fn to_json_str(path: &str) -> PyResult<String> {
7476 }
7577}
7678
79+ #[ pyclass]
80+ struct ParquetRowIterator {
81+ // FIXME: This should be a RowIter instead
82+ iter : std:: vec:: IntoIter < Row >
83+ }
84+
85+ #[ pymethods]
86+ impl ParquetRowIterator {
87+ #[ new]
88+ fn new ( path : & str ) -> PyResult < Self > {
89+ let file_path = Path :: new ( path) ;
90+ let file = File :: open ( & file_path) . map_err ( |e| pyo3:: exceptions:: PyIOError :: new_err ( e. to_string ( ) ) ) ?;
91+ let reader = SerializedFileReader :: new ( file) . map_err ( |e| pyo3:: exceptions:: PyValueError :: new_err ( e. to_string ( ) ) ) ?;
92+
93+ // FIXME: This should be a RowIter instead of rendering vec and then providing an iterator!!!
94+ Ok ( Self { iter : reader. get_row_iter ( None ) . unwrap ( ) . map ( |r| r. unwrap ( ) ) . collect :: < Vec < _ > > ( ) . into_iter ( ) } )
95+ }
96+
97+ fn __iter__ ( slf : PyRef < Self > ) -> PyRef < Self > {
98+ slf
99+ }
100+
101+ fn __next__ ( mut slf : PyRefMut < Self > ) -> PyResult < PyObject > {
102+ let row = slf. iter . next ( ) . ok_or_else ( || PyErr :: new :: < PyStopIteration , _ > ( "End of iterator" ) ) ?;
103+ let row_dict = row. to_json_value ( ) ;
104+ let dict = PyDict :: new_bound ( slf. py ( ) ) ;
105+ for ( key, value) in row_dict. as_object ( ) . unwrap ( ) {
106+ dict. set_item ( key, PyValue ( value. clone ( ) ) ) ?;
107+ }
108+ Ok ( dict. into ( ) )
109+ }
110+ }
111+
112+ #[ pyfunction]
113+ fn to_iter ( path : & str ) -> PyResult < ParquetRowIterator > {
114+ let file_path = Path :: new ( path) ;
115+ let file = File :: open ( & file_path) . map_err ( |e| pyo3:: exceptions:: PyIOError :: new_err ( e. to_string ( ) ) ) ?;
116+ let reader = SerializedFileReader :: new ( file) . map_err ( |e| pyo3:: exceptions:: PyValueError :: new_err ( e. to_string ( ) ) ) ?;
117+
118+ Ok ( ParquetRowIterator { iter : reader. get_row_iter ( None ) . unwrap ( ) . map ( |r| r. unwrap ( ) ) . collect :: < Vec < _ > > ( ) . into_iter ( ) } )
119+ }
120+
77121#[ pyfunction]
78122fn to_list ( path : & str , py : Python ) -> PyResult < PyObject > {
79123 let file_path = Path :: new ( path) ;
@@ -101,5 +145,7 @@ fn to_list(path: &str, py: Python) -> PyResult<PyObject> {
101145fn parq ( m : & Bound < ' _ , PyModule > ) -> PyResult < ( ) > {
102146 m. add_function ( wrap_pyfunction ! ( to_json_str, m) ?) ?;
103147 m. add_function ( wrap_pyfunction ! ( to_list, m) ?) ?;
148+ m. add_function ( wrap_pyfunction ! ( to_iter, m) ?) ?;
149+ m. add_class :: < ParquetRowIterator > ( ) ?;
104150 Ok ( ( ) )
105151}
0 commit comments