Skip to content

Use datagouv client #474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 0 additions & 1 deletion config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
# Datagouv
DATAGOUV_SECRET_API_KEY = Variable.get("DATAGOUV_SECRET_API_KEY", "")
DEMO_DATAGOUV_SECRET_API_KEY = Variable.get("DEMO_DATAGOUV_SECRET_API_KEY", "")
DEMO_DATAGOUV_URL = Variable.get("DEMO_DATAGOUV_URL", "")
FILES_BASE_URL = Variable.get("FILES_BASE_URL", "")

# Mattermost
Expand Down
16 changes: 10 additions & 6 deletions data_processing/dvf/task_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
MINIO_BUCKET_DATA_PIPELINE_OPEN,
)
from datagouvfr_data_pipelines.utils.postgres import PostgresClient
from datagouvfr_data_pipelines.utils.datagouv import post_remote_resource, DATAGOUV_URL
from datagouvfr_data_pipelines.utils.datagouv import local_client
from datagouvfr_data_pipelines.utils.filesystem import File
from datagouvfr_data_pipelines.utils.mattermost import send_message
from datagouvfr_data_pipelines.utils.minio import MinIOClient
Expand Down Expand Up @@ -1087,9 +1087,11 @@ def send_distribution_to_minio() -> None:
def publish_stats_dvf(ti) -> None:
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}dvf/config/dgv.json") as fp:
data = json.load(fp)
post_remote_resource(
local_client.resource(
id=data["mensuelles"][AIRFLOW_ENV]["resource_id"],
dataset_id=data["mensuelles"][AIRFLOW_ENV]["dataset_id"],
resource_id=data["mensuelles"][AIRFLOW_ENV]["resource_id"],
fetch=False,
).update(
payload={
"url": (
f"https://object.files.data.gouv.fr/{MINIO_BUCKET_DATA_PIPELINE_OPEN}"
Expand All @@ -1105,9 +1107,11 @@ def publish_stats_dvf(ti) -> None:
},
)
print("Done with stats mensuelles")
post_remote_resource(
local_client.resource(
id=data["totales"][AIRFLOW_ENV]["resource_id"],
dataset_id=data["totales"][AIRFLOW_ENV]["dataset_id"],
resource_id=data["totales"][AIRFLOW_ENV]["resource_id"],
fetch=False,
).update(
payload={
"url": (
f"https://object.files.data.gouv.fr/{MINIO_BUCKET_DATA_PIPELINE_OPEN}"
Expand All @@ -1131,6 +1135,6 @@ def notification_mattermost(ti) -> None:
f"Stats DVF générées :"
f"\n- intégré en base de données"
f"\n- publié [sur {'demo.' if AIRFLOW_ENV == 'dev' else ''}data.gouv.fr]"
f"({DATAGOUV_URL}/fr/datasets/{dataset_id})"
f"({local_client.base_url}/fr/datasets/{dataset_id})"
f"\n- données upload [sur Minio]({MINIO_URL}/buckets/{MINIO_BUCKET_DATA_PIPELINE_OPEN}/browse)"
)
35 changes: 21 additions & 14 deletions data_processing/elections/aggregation/task_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from itertools import chain
from datetime import datetime
import math
from datagouv import Client

from datagouvfr_data_pipelines.config import (
AIRFLOW_DAG_HOME,
Expand All @@ -13,9 +14,7 @@
MINIO_BUCKET_DATA_PIPELINE_OPEN,
)
from datagouvfr_data_pipelines.utils.datagouv import (
post_remote_resource,
get_all_from_api_query,
DATAGOUV_URL
local_client,
)
from datagouvfr_data_pipelines.utils.filesystem import File
from datagouvfr_data_pipelines.utils.mattermost import send_message
Expand Down Expand Up @@ -179,10 +178,10 @@ def process_election_data():
del results

# getting preprocessed resources
resources_url = [r['url'] for r in get_all_from_api_query(
resources_url = [r['url'] for r in Client().get_all_from_api_query(
# due to https://github.com/MongoEngine/mongoengine/issues/2748
# we have to specify a sort parameter for now
'https://www.data.gouv.fr/api/1/datasets/community_resources/'
'api/1/datasets/community_resources/'
'?organization=646b7187b50b2a93b1ae3d45&sort=-created_at_internal'
)]
resources = {
Expand Down Expand Up @@ -244,9 +243,11 @@ def send_results_to_minio():
def publish_results_elections():
with open(f"{AIRFLOW_DAG_HOME}{DAG_FOLDER}elections/aggregation/config/dgv.json") as fp:
data = json.load(fp)
post_remote_resource(
local_client.resource(
id=data["general"][AIRFLOW_ENV]["resource_id"],
dataset_id=data["general"][AIRFLOW_ENV]["dataset_id"],
resource_id=data["general"][AIRFLOW_ENV]["resource_id"],
fetch=False,
).update(
payload={
"url": (
f"https://object.files.data.gouv.fr/{MINIO_BUCKET_DATA_PIPELINE_OPEN}"
Expand All @@ -263,9 +264,11 @@ def publish_results_elections():
},
)
print('Done with general results')
post_remote_resource(
local_client.resource(
id=data["candidats"][AIRFLOW_ENV]["resource_id"],
dataset_id=data["candidats"][AIRFLOW_ENV]["dataset_id"],
resource_id=data["candidats"][AIRFLOW_ENV]["resource_id"],
fetch=False,
).update(
payload={
"url": (
f"https://object.files.data.gouv.fr/{MINIO_BUCKET_DATA_PIPELINE_OPEN}"
Expand All @@ -282,9 +285,11 @@ def publish_results_elections():
},
)
print('Done with candidats results')
post_remote_resource(
local_client.resource(
id=data["general_parquet"][AIRFLOW_ENV]["resource_id"],
dataset_id=data["general_parquet"][AIRFLOW_ENV]["dataset_id"],
resource_id=data["general_parquet"][AIRFLOW_ENV]["resource_id"],
fetch=False,
).update(
payload={
"url": (
f"https://object.files.data.gouv.fr/{MINIO_BUCKET_DATA_PIPELINE_OPEN}"
Expand All @@ -301,9 +306,11 @@ def publish_results_elections():
},
)
print('Done with general results parquet')
post_remote_resource(
local_client.resource(
id=data["candidats_parquet"][AIRFLOW_ENV]["resource_id"],
dataset_id=data["candidats_parquet"][AIRFLOW_ENV]["dataset_id"],
resource_id=data["candidats_parquet"][AIRFLOW_ENV]["resource_id"],
fetch=False,
).update(
payload={
"url": (
f"https://object.files.data.gouv.fr/{MINIO_BUCKET_DATA_PIPELINE_OPEN}"
Expand All @@ -328,7 +335,7 @@ def send_notification():
text=(
":mega: Données élections mises à jour.\n"
f"- Données stockées sur Minio - Bucket {MINIO_BUCKET_DATA_PIPELINE_OPEN}\n"
f"- Données référencées [sur data.gouv.fr]({DATAGOUV_URL}/fr/datasets/"
f"- Données référencées [sur data.gouv.fr]({local_client.base_url}/fr/datasets/"
f"{data['general'][AIRFLOW_ENV]['dataset_id']})"
)
)
9 changes: 5 additions & 4 deletions data_processing/elections/miom_mirroring/task_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from datagouvfr_data_pipelines.utils.filesystem import File
from datagouvfr_data_pipelines.utils.minio import MinIOClient
from datagouvfr_data_pipelines.utils.datagouv import (
post_remote_resource,
demo_client,
)

minio_open = MinIOClient(bucket=MINIO_BUCKET_DATA_PIPELINE_OPEN)
Expand Down Expand Up @@ -309,17 +309,18 @@ def publish_results_elections(ti):
if d["filename"]:
filesize = os.path.getsize(os.path.join(DATADIR, d["filename"]))

post_remote_resource(
demo_client.resource(
id=d[AIRFLOW_ENV]["resource_id"],
dataset_id=d[AIRFLOW_ENV]["dataset_id"],
resource_id=d[AIRFLOW_ENV]["resource_id"],
fetch=False,
).update(
payload={
"url": d['url'],
"filesize": filesize,
"title": d['name'] + complement,
"format": d['format'],
"description": "",
},
on_demo=True,
)


Expand Down
4 changes: 2 additions & 2 deletions data_processing/formation/task_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from datagouvfr_data_pipelines.utils.filesystem import File
from datagouvfr_data_pipelines.utils.minio import MinIOClient
from datagouvfr_data_pipelines.utils.mattermost import send_message
from datagouvfr_data_pipelines.utils.datagouv import DATAGOUV_URL
from datagouvfr_data_pipelines.utils.datagouv import local_client

minio_open = MinIOClient(bucket=MINIO_BUCKET_DATA_PIPELINE_OPEN)

Expand All @@ -22,7 +22,7 @@ def download_latest_data(ti):
download_files(
list_urls=[
File(
url=f"{DATAGOUV_URL}/fr/datasets/r/{config['resource_id']}",
url=f"{local_client.base_url}/fr/datasets/r/{config['resource_id']}",
dest_path=f"{AIRFLOW_DAG_TMP}formation/",
dest_name=f"{config['name']}.csv",
)
Expand Down
38 changes: 0 additions & 38 deletions data_processing/geozones/config/dgv.json

This file was deleted.

Loading