-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathmain.py
More file actions
243 lines (215 loc) · 10.3 KB
/
main.py
File metadata and controls
243 lines (215 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#!/usr/bin/env python3
"""Mediatech CLI.
Usage:
main.py download_files (--all | --source=<source>) [--debug]
main.py download_and_process_files (--all | --source=<source>) [--model=<model_name>] [--debug]
main.py create_tables [--model=<model_name>] [--delete-existing] [--debug]
main.py process_files (--all | --source=<source>) [--folder=<path>] [--model=<model_name>] [--debug]
main.py split_table (--table=<name>) [--debug]
main.py export_table (--table=<name>) [--output=<path>] [--split] [--debug]
main.py upload_dataset (--all | --dataset-name=<name>) [--input=<path>] [--repository=<name>] [--private] [--debug]
main.py -h | --help
Commands:
download_files Download files from sources
download_and_process_files Download and process files from sources
create_tables Create database tables (with option to delete existing ones)
process_files Process data from specific source or all sources and insert into database
split_table Split a table into multiple smaller tables based on source and criteria
export_table Export table to Parquet files
upload_dataset Upload dataset to Hugging Face
Options:
--delete-existing Delete existing tables before creating new ones
--all Select all data sources from the data configuration file
--model=<model_name> Embedding model name [default: BAAI/bge-m3]. It is mandatory to specify the same model for all commands.
--source=<source> Source to process (service_public, travail_emploi, legi, cnil,
state_administrations_directory, local_administrations_directory, constit, dole)
--table=<name> Table name to export or split (legi, service_public, etc.)
--folder=<path> Folder containing unprocessed data
--input=<path> Input path of the dataset to upload
--dataset-name=<name> Name of the dataset to upload to Hugging Face
--repository=<name> Hugging Face repository name [default: AgentPublic]
--output=<path> Output folder for Parquet files
--split Split the table into smaller tables before exporting
--private Upload dataset as private on Hugging Face
--debug Enable debug logging
-h --help Show this help message
Examples:
main.py create_tables --model BAAI/bge-m3 --delete-existing
main.py download_files --all
main.py download_and_process_files --source service_public --model BAAI/bge-m3 --debug
main.py download_and_process_files --all --model BAAI/bge-m3
main.py process_files --source service_public --model BAAI/bge-m3
main.py process_files --all --folder data/unprocessed --model BAAI/bge-m3
main.py split_table --table legi
main.py export_table --table legi --split
main.py export_table --table all --output data/parquet
main.py upload_dataset --input data/parquet/service_public.parquet --dataset-name service-public --repository AgentPublic --private
main.py upload_dataset --all --repository AgentPublic
"""
import os
import sys
from docopt import docopt
from config import (
BASE_PATH,
HF_TOKEN,
SOURCE_MAP,
config_file_path,
data_history_path,
get_logger,
parquet_files_folder,
setup_logging,
)
from database import create_all_tables, export_table_to_parquet, split_legi_table
from download_and_processing import (
download_and_optionally_process_all_files,
download_and_optionally_process_files,
process_all_data,
process_data,
)
def main():
    """Run the Mediatech command-line interface.

    Parses the command line with docopt against the module docstring,
    configures logging, then dispatches to the handler of the selected
    command (download, process, create tables, split, export, upload).

    Returns:
        int: 0 when the selected command completes; 1 when the requested
        source is not in ``SOURCE_MAP``, when ``split_table`` is asked for a
        table other than ``legi``, or when any exception is raised.
    """
    default_model = "BAAI/bge-m3"
    default_repository = "AgentPublic"
    # Bound before the ``try`` so the ``except`` handler can still report a
    # failure that happens before logging has been configured.
    logger = None
    try:
        args = docopt(__doc__)

        # Setup logging
        debug_mode = args.get("--debug", False)
        setup_logging(debug=debug_mode)
        logger = get_logger(__name__)

        # Fall back to the default model when none is given on the command line.
        model = args["--model"] or default_model

        # Download files, optionally processing them right after download.
        # download_and_process_files has a better storage optimization
        # compared to download_files + process_files.
        if args["download_files"] or args["download_and_process_files"]:
            process = bool(args["download_and_process_files"])
            action = "Downloading and processing" if process else "Downloading"
            if args["--all"]:
                logger.info(
                    f"{action} all files using config: {config_file_path} and history: {data_history_path}"
                )
                download_and_optionally_process_all_files(
                    process=process,
                    model=model,
                )
            else:
                source = args["--source"]
                if source not in SOURCE_MAP:
                    logger.error(f"Unknown source: {source}")
                    return 1
                logger.info(
                    f"{action} {source} files using config: {config_file_path} and history: {data_history_path}"
                )
                download_and_optionally_process_files(
                    table_name=source,
                    process=process,
                    model=model,
                )

        # Create tables
        elif args["create_tables"]:
            delete_existing = bool(args["--delete-existing"])
            logger.info(
                f"Creating tables with model {model} (delete_existing={delete_existing})"
            )
            create_all_tables(delete_existing=delete_existing, model=model)

        # Process data
        elif args["process_files"]:
            if args["--all"]:
                folder = args["--folder"] or os.path.join(BASE_PATH, "data/unprocessed")
                logger.info(f"Processing all unprocessed data from folder: {folder}")
                process_all_data(unprocessed_data_folder=folder, model=model)
            else:
                source = args["--source"]
                if source not in SOURCE_MAP:
                    logger.error(f"Unknown source: {source}")
                    return 1
                logger.info(f"Processing data from source: {source}")
                process_data(table_name=source, model=model, streaming=True)

        # Split table into smaller tables based on several criteria
        elif args["split_table"]:
            table = args["--table"] or "unknown"
            if table != "legi":
                logger.error(f"Splitting is not implemented for the {table} table.")
                return 1
            logger.info(f"Splitting {table.upper()} table into smaller tables")
            split_legi_table(source_table=table, export_to_parquet=False)

        # Export tables to parquet
        elif args["export_table"]:
            output = args["--output"] or parquet_files_folder
            table = args["--table"] or None
            if table is not None:
                logger.info(
                    f"Exporting {table} PgVector tables to Parquet in folder: {output}"
                )
                if args["--split"] and table == "legi":
                    split_legi_table(source_table=table, export_to_parquet=True)
                else:
                    if args["--split"]:
                        # Only the legi table can be split; export the whole
                        # table rather than silently doing nothing.
                        logger.info(
                            f"Splitting is not implemented for the {table} table; exporting it without splitting."
                        )
                    export_table_to_parquet(table_name=table, parquet_folder=output)

        # Upload dataset to Hugging Face
        elif args["upload_dataset"]:
            # Imported only when this command is selected, so the other
            # commands do not load the Hugging Face helpers.
            from utils.hugging_face import HuggingFace, upload_dataset_task

            repository = args["--repository"] or default_repository
            private = bool(args["--private"])
            if args["--all"]:
                logger.info("Uploading all datasets to Hugging Face")
                hf = HuggingFace(hugging_face_repo=repository, token=HF_TOKEN)
                hf.upload_all_datasets(
                    config_file_path=config_file_path, private=private
                )
            else:
                # The name of the dataset to upload (e.g., service-public, travail-emploi, etc.)
                dataset_name = args["--dataset-name"]
                # Default folder path for the dataset (e.g., ./data/parquet/service_public)
                input_path = args["--input"] or os.path.join(
                    parquet_files_folder,
                    dataset_name.lower().replace("-", "_"),
                )
                logger.info(
                    f"Uploading dataset {dataset_name} from {input_path} to Hugging Face (private={private})"
                )
                upload_dataset_task(
                    dataset_name=dataset_name,
                    token=HF_TOKEN,
                    repository=repository,
                    private=private,
                    local_folder_path=input_path,
                )
        return 0
    except Exception as e:
        message = f"An error occurred: {e}"
        if logger is None:
            # Logging was never set up: report on stderr instead of raising
            # UnboundLocalError from inside the handler.
            print(message, file=sys.stderr)
        else:
            logger.error(message)
        return 1
if __name__ == "__main__":
    # Hand main()'s return value to the shell as the process exit status.
    sys.exit(main())