
Commit cb0e5f6

Committed Mar 4, 2022

✅ Update : version 0.0.3

1 parent cb175a1 commit cb0e5f6

5 files changed: +71 −7 lines changed
ParquetLoader/__init__.py (+2 −2)

@@ -1,10 +1,10 @@
 __name__ = 'parquet-loader'
 __description__ = 'Parquet file Load and Read from minio & S3'
-__version__ = '0.0.2'
+__version__ = '0.0.3'
 __url__ = 'https://github.com/Keunyoung-Jung/ParquetLoader'
 __download_url__ = 'https://github.com/Keunyoung-Jung/ParquetLoader'
 __install_requires__ = [
-    "pandas==1.2.0",
+    "pandas<=1.2.0",
     "fastparquet==0.8.0",
     "s3fs"
     ],

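Two small changes here: the version string is bumped to 0.0.3, and the pandas requirement is loosened from the exact pin `==1.2.0` to `<=1.2.0`, so any pandas release up to and including 1.2.0 now satisfies it. For a quick sanity check, the version is exposed as a plain module attribute; a minimal sketch, assuming the package imports as `ParquetLoader` the same way `tests/minio.py` in this commit does:

```python
import ParquetLoader

# Prints '0.0.3' once this commit is installed.
print(ParquetLoader.__version__)
```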
ParquetLoader/loader.py (+2 −1)

@@ -39,8 +39,9 @@ def __init__(self,
         try :
             self.initialize()
         except IndexError as e :
-            print(e)
+            print("IndexError :",e)
             print(f'"{self.root_path}" may be incorrect or it may be an empty folder.')
+            exit()

     def initialize(self) :
         path = f'{self.root_path}/{self.folder}{"/*"*self.depth}/*.parquet'

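This change only affects how a bad root path is reported: instead of printing the raw IndexError and carrying on, the constructor now prints a labelled error plus a hint and exits. A hedged illustration of the call-site behaviour; it assumes `DataLoader` is importable from the package the same way `S3Loader` is, and the folder name is a placeholder:

```python
from ParquetLoader import DataLoader  # assumed export, mirroring S3Loader

# Pointing the loader at a folder with no parquet files now terminates with a hint
# instead of continuing in a half-initialized state. Expected output (illustrative):
#   IndexError : list index out of range
#   "." may be incorrect or it may be an empty folder.
dl = DataLoader(folder='no-such-folder')
```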
ParquetLoader/s3.py (+33 −3)

@@ -7,6 +7,9 @@
 class S3Loader(DataLoader) :
     def __init__(self,
                  chunk_size : int =100000,
+                 s3_endpoint_url: str = '',
+                 s3_access_key: str ='',
+                 s3_secret_key: str ='',
                  bucket : str = '.',
                  folder : str = 'data',
                  shuffle : bool = True,
@@ -15,6 +18,13 @@ def __init__(self,
                  depth : int = 0,
                  std_out: bool = True
                  ):
+        self.client_dict = None
+        if s3_endpoint_url != '' or \
+            s3_access_key != '' or \
+            s3_secret_key != '' :
+            self.client_dict = self.create_s3_client_dict(s3_endpoint_url,
+                                                          s3_access_key,
+                                                          s3_secret_key)
         super().__init__(
             chunk_size=chunk_size,
             root_path=bucket,
@@ -29,8 +39,12 @@ def __init__(self,
     def initialize(self) :
         path = f'{self.root_path}/{self.folder}{"/*"*self.depth}/*.parquet'

-        s3 = s3fs.S3FileSystem()
-        fs = s3fs.core.S3FileSystem()
+        if self.client_dict != None :
+            s3 = s3fs.S3FileSystem(client_kwargs=self.client_dict)
+            fs = s3fs.core.S3FileSystem(client_kwargs=self.client_dict)
+        else :
+            s3 = s3fs.S3FileSystem()
+            fs = s3fs.core.S3FileSystem()

         all_paths = fs.glob(path=path)
         if self.std_out :
@@ -49,4 +63,20 @@ def initialize(self) :
         if self.std_out :
             print(f'{len(all_paths)}files Initialize ... complete {round(time()-start_init_time,2)}sec')
         self.total_num = self.fp_obj.info['row_groups']
-        self.shuffle_seed_list = random.Random(self.random_seed).sample(range(100000+self.total_num*10),self.total_num)
+        self.shuffle_seed_list = random.Random(self.random_seed).sample(range(100000+self.total_num*10),self.total_num)
+
+    def create_s3_client_dict(self,
+                              s3_endpoint_url,
+                              s3_access_key,
+                              s3_secret_key
+                              ):
+        import os
+        client_dict = {}
+        if s3_endpoint_url != '' :
+            client_dict['endpoint_url'] = s3_endpoint_url
+        if s3_access_key != '' :
+            os.environ['AWS_ACCESS_KEY_ID'] = s3_access_key
+        if s3_secret_key != '' :
+            os.environ['AWS_SECRET_ACCESS_KEY'] = s3_secret_key
+
+        return client_dict

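Put differently, the new `create_s3_client_dict` helper splits the configuration in two: the endpoint travels through s3fs's `client_kwargs`, while the access and secret keys are exported as the standard AWS environment variables that s3fs (via botocore) reads automatically. A minimal standalone sketch of the same wiring, with a placeholder MinIO endpoint and credentials:

```python
import os
import s3fs

# Placeholder credentials; in practice these come from your MinIO/S3 account.
os.environ['AWS_ACCESS_KEY_ID'] = 'minio'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'minio123'

# client_kwargs is forwarded to the underlying boto client, so endpoint_url
# redirects s3fs from AWS to an S3-compatible service such as MinIO.
fs = s3fs.S3FileSystem(client_kwargs={'endpoint_url': 'http://localhost:9000'})

# The loader's initialize() runs essentially this kind of glob over the bucket.
print(fs.glob('test/data/*.parquet'))
```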
README.md (+19 −1)

@@ -130,4 +130,22 @@ print(data_loader.schema) # get data schema
 print(data_loader.columns) # get data columns
 print(data_loader.count) # get total count
 print(data_loader.info) # get metadata infomation
-```
+```
+
+## 6. Customize S3 Path
+If you use MinIO or another object storage, you can use the S3 parameters
+```python
+dl = S3Loader(
+    s3_endpoint_url : str = '',
+    s3_access_key : str = '',
+    s3_secret_key : str = '',
+    bucket : str = '.',
+    folder : str = 'data',
+)
+```
+* `s3_endpoint_url`
+    * Storage endpoint url you want to use
+    * example : "http://mino-service.kubeflow:9000"
+* `s3_access_key` and `s3_secret_key`
+    * you can set s3_access_key and s3_secret_key here, but I don't recommend it
+    * it is recommended to use environment variables instead.

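In line with that recommendation, a hedged usage sketch that passes only the endpoint to `S3Loader` and leaves the credentials to environment variables (bucket and folder names are placeholders):

```python
import os
from ParquetLoader import S3Loader

# Keep secrets out of code: s3fs picks these up automatically.
os.environ['AWS_ACCESS_KEY_ID'] = 'your-access-key'        # placeholder
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your-secret-key'    # placeholder

dl = S3Loader(
    s3_endpoint_url='http://mino-service.kubeflow:9000',   # endpoint from the README example
    bucket='my-bucket',                                     # placeholder bucket
    folder='data')

for df in dl:          # iterating yields pandas DataFrames, one chunk at a time
    print(df.head())
```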
tests/minio.py (+15)

@@ -0,0 +1,15 @@
+from ParquetLoader import S3Loader
+
+def s3_data_loading():
+    sl = S3Loader(
+        s3_endpoint_url='http://localhost:9000',
+        s3_access_key='minio',
+        s3_secret_key='minio123',
+        bucket='test',
+        folder="test-data")
+    print(sl.schema)
+    for df in sl :
+        print(df.head())
+
+if __name__ == '__main__':
+    s3_data_loading()

0 commit comments
