Commit ad4bc7b

Merge pull request #1 from Keunyoung-Jung/dev
✅ Update : version 0.0.4

2 parents cb0e5f6 + 886a9ee

File tree: 5 files changed, +169 −7 lines

ParquetLoader/__init__.py (+1 −1)

@@ -1,6 +1,6 @@
 __name__ = 'parquet-loader'
 __description__ = 'Parquet file Load and Read from minio & S3'
-__version__ = '0.0.3'
+__version__ = '0.0.4'
 __url__ = 'https://github.com/Keunyoung-Jung/ParquetLoader'
 __download_url__ = 'https://github.com/Keunyoung-Jung/ParquetLoader'
 __install_requires__ = [

ParquetLoader/loader.py (+55 −3)

@@ -14,7 +14,8 @@ def __init__(self,
                 random_seed : int = int((time() - int(time()))*100000),
                 columns : list = None,
                 depth : int = 0,
-                std_out: bool = True
+                std_out: bool = True,
+                filters: list = None
                 ):
         self.chunk_size = chunk_size
         self.cache = None
@@ -30,6 +31,8 @@ def __init__(self,
         self.fp_obj = None
         self.depth = depth
         self.random_seed = random_seed
+        self.filters = filters
+        self.check_filter()
 
         if root_path == '.' :
             self.root_path = os.getcwd()
@@ -84,7 +87,9 @@ def close(self):
         else:
             raise RuntimeError("generator ignored GeneratorExit")
     def generator(self):
-        for df in self.fp_obj.iter_row_groups(columns=self.select_columns):
+        for df in self.fp_obj.iter_row_groups(filters=self.filters, columns=self.select_columns):
+            if self.filters is not None :
+                df = self.filtering(df)
             if self.dataset is None :
                 self.dataset = df
             else :
@@ -110,4 +115,51 @@ def generator(self):
             yield self.dataset
             self.dataset = None
         if self.std_out :
-            print(self.counter,'data loaded complete!',end='\n')
+            print(self.counter,'data loaded complete!',end='\n')
+
+    def filtering(self, df) :
+        # Apply the 2-dim filter list: tuples inside an inner list are ANDed,
+        # and the inner lists themselves are ORed together.
+        df_store = []
+        for or_part in self.filters :
+            tmp_df = df.copy()
+            for and_part in or_part :
+                col = and_part[0]
+                op = and_part[1]
+                val = and_part[2]
+                if op == '==' or op == '=' :
+                    tmp_df = tmp_df[tmp_df[col] == val]
+                elif op == '>' :
+                    tmp_df = tmp_df[tmp_df[col] > val]
+                elif op == '>=' :
+                    tmp_df = tmp_df[tmp_df[col] >= val]
+                elif op == '<' :
+                    tmp_df = tmp_df[tmp_df[col] < val]
+                elif op == '<=' :
+                    tmp_df = tmp_df[tmp_df[col] <= val]
+                elif op == '!=' :
+                    tmp_df = tmp_df[tmp_df[col] != val]
+                elif op == 'in' :
+                    # Series membership needs .isin(); a bare `in` raises ValueError.
+                    tmp_df = tmp_df[tmp_df[col].isin(val)]
+                elif op == 'not in' :
+                    tmp_df = tmp_df[~tmp_df[col].isin(val)]
+            df_store.append(tmp_df)
+        # Union the OR branches, dropping rows matched by more than one branch.
+        concat_df = pd.concat(df_store)
+        return concat_df.drop_duplicates()
+
+    def check_filter(self) :
+        # Validate filter shape and operators up front, before any data is read.
+        try :
+            op = ''
+            if self.filters is not None :
+                for or_part in self.filters :
+                    for and_part in or_part :
+                        if len(and_part) != 3 :
+                            raise IndexError
+                        op = and_part[1]
+                        if op not in ['==','=','>','>=','<','<=','!=','in','not in'] :
+                            raise ValueError
+        except ValueError :
+            print("ValueError :", f'"{op}" is not a valid operator')
+            exit()
+        except IndexError :
+            print("IndexError :", f'{self.filters} is not a valid filter')
+            exit()
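A note on what the new `filtering()` method computes: each inner list of tuples narrows a copy of the chunk (AND), and the per-branch results are concatenated with duplicates dropped (OR). A minimal standalone pandas sketch of those semantics, using a toy DataFrame whose `tf` column mirrors the filter in tests/filters.py below:

```python
import pandas as pd

df = pd.DataFrame({'tf': [1, 5, 11, 12, 13]})

# filters = [[("tf", ">", 10), ("tf", "<", 13)], [("tf", "==", 1)]]
# reads as: (tf > 10 AND tf < 13) OR (tf == 1)
and_branch = df[(df['tf'] > 10) & (df['tf'] < 13)]   # keeps tf = 11, 12
or_branch = df[df['tf'] == 1]                        # keeps tf = 1
result = pd.concat([and_branch, or_branch]).drop_duplicates()
print(result.sort_index())                           # tf = 1, 11, 12
```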

ParquetLoader/s3.py (+4 −2)

@@ -16,7 +16,8 @@ def __init__(self,
                 random_seed : int = int((time() - int(time()))*100000),
                 columns : list = None,
                 depth : int = 0,
-                std_out: bool = True
+                std_out: bool = True,
+                filters: list = None
                 ):
         self.client_dict = None
         if s3_endpoint_url != '' or \
@@ -33,7 +34,8 @@ def __init__(self,
             random_seed=random_seed,
             columns=columns,
             depth=depth,
-            std_out=std_out
+            std_out=std_out,
+            filters=filters
         )
 
     def initialize(self) :

README.md (+39 −1)

@@ -67,7 +67,8 @@ dl = DataLoader(
     random_seed : int = int((time() - int(time()))*100000),
     columns : list = None,
     depth : int = 0,
-    std_out: bool = True
+    std_out: bool = True,
+    filters: list = None
 )
 ```
 * `chunk_size`
@@ -94,6 +95,9 @@ dl = DataLoader(
 * `std_out`
     * default : True
     * You can turn off output.
+* `filters`
+    * Used when you want to get a filtered dataframe; it must be a 2-dimensional list.
+    * example : `[[("column","==",10)]]`
 
 ### 4.1. Select Columns
 `columns` param is taken as a list.
@@ -149,3 +153,37 @@ dl = S3Loader(
 * `s3_access_key` and `s3_secret_key`
     * you can set s3_access_key and s3_secret_key, but I don't recommend using it
     * it is recommended to use environment variables.
+
+## 7. Get Filtered Dataframe
+Use `filters` when you want to get a filtered dataframe; it must be a two-dimensional list.
+The construction is the same as a fastparquet filter.
+```python
+dl = S3Loader(
+    bucket = 'test',
+    folder = 'data',
+    filters = [[("col1",">",10)]]
+)
+```
+The outer list consists of OR operations.
+```python
+# col1 > 10 or col2 in ["children","kids"]
+filters = [
+    [("col1",">",10)],
+    [("col2","in",["children","kids"])]
+]
+```
+Each inner list consists of AND operations.
+```python
+# col1 > 10 and col2 == "male"
+filters = [
+    [("col1",">",10),("col2","==","male")]
+]
+```
+You can also mix the two to build a filter.
+```python
+# (col1 > 10 and col2 == "male") or col3 in ["children","kids"]
+filters = [
+    [("col1",">",10),("col2","==","male")],
+    [("col3","in",["children","kids"])]
+]
+```
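To see the mixed filter in context, a hypothetical end-to-end use might look as follows; the bucket, folder, and column names are placeholders rather than values from this commit:

```python
from ParquetLoader import S3Loader

dl = S3Loader(
    bucket = 'test',
    folder = 'data',
    filters = [
        [("col1",">",10),("col2","==","male")],
        [("col3","in",["children","kids"])]
    ]
)
for df in dl :
    # each chunk satisfies (col1 > 10 and col2 == "male") or col3 in ["children","kids"]
    print(df.head())
    break
```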

tests/filters.py (+70)

@@ -0,0 +1,70 @@
+from ParquetLoader import S3Loader
+
+def s3_data_loading():
+    sl = S3Loader(
+        chunk_size=10000,
+        s3_endpoint_url='http://localhost:9000',
+        s3_access_key='minio',
+        s3_secret_key='minio123',
+        bucket='test',
+        folder="data",
+        shuffle=False)
+    print(sl.schema)
+    for df in sl :
+        print(df.tail(10))
+        break
+
+def filter_data():
+    # Keeps rows where (tf > 10 and tf < 13) or tf == 1.
+    sl = S3Loader(
+        chunk_size=10000,
+        s3_endpoint_url='http://localhost:9000',
+        s3_access_key='minio',
+        s3_secret_key='minio123',
+        bucket='test',
+        folder="data",
+        filters=[
+            [("tf",">",10),("tf","<",13)],
+            [("tf","==",1)]
+        ],
+        shuffle=False)
+    print(sl.schema)
+    for df in sl :
+        print(df.tail(10))
+        break
+
+def wrong_op():
+    # "%" is not a supported operator, so check_filter() reports a ValueError and exits.
+    sl = S3Loader(
+        chunk_size=10000,
+        s3_endpoint_url='http://localhost:9000',
+        s3_access_key='minio',
+        s3_secret_key='minio123',
+        bucket='test',
+        folder="data",
+        filters=[[("tf","%",10)]],
+        shuffle=False)
+    print(sl.schema)
+    for df in sl :
+        print(df.tail(10))
+        break
+
+def wrong_filter():
+    # ("tf",">") is missing its value, so check_filter() reports an IndexError and exits.
+    sl = S3Loader(
+        chunk_size=10000,
+        s3_endpoint_url='http://localhost:9000',
+        s3_access_key='minio',
+        s3_secret_key='minio123',
+        bucket='test',
+        folder="data",
+        filters=[
+            [("tf",">")]],
+        shuffle=False)
+    print(sl.schema)
+    for df in sl :
+        print(df.tail(10))
+        break
+
+if __name__ == '__main__':
+    s3_data_loading()
+    filter_data()
+    # Note: wrong_op() exits the process, so wrong_filter() only runs if wrong_op() is skipped.
+    wrong_op()
+    wrong_filter()
