You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Copy file name to clipboardExpand all lines: content/articles/2025/2025-05-27_taradata_extract_load_mapillary.md
+13-13Lines changed: 13 additions & 13 deletions
Display the source diff
Display the rich diff
Original file line number
Diff line number
Diff line change
@@ -72,7 +72,7 @@ Plusieurs options sont également disponibles pour créer un _DAG_ mais avec _Ta
72
72
73
73
Ci-dessous, un exemple de _DAG_ pour récupérer chaque heure la hauteur d'eau du Gardon à Anduze grâce à l'[API Hydrométrie de Hubeau](https://hubeau.eaufrance.fr/page/api-hydrometrie).
74
74
75
-
```py
75
+
```py title="_DAG_ d'extraction/chargement de données depuis Hubeau"
76
76
import requests
77
77
from airflow.decorators import dag, task
78
78
from datetime import datetime
@@ -149,7 +149,7 @@ Après cette entrée en matière sur Apache Airflow, voyons maintenant le script
149
149
150
150
La première tâche de notre _DAG_ consiste en la création du schéma d'accueil dans l'entrepôt PostgreSQL.
151
151
152
-
```py
152
+
```py title="Tâche de création du schéma PostgreSQL"
@@ -199,7 +199,7 @@ La transformation de ces données JSON en quelque chose d'exploitable, avec [QGI
199
199
200
200
Avant de poursuivre, nous devons chaîner les tâches. En effet, il ne faut pas essayer de créer la table avant d'avoir terminé la création du schéma. Ceci est fait grâce à l'opérateur `>>`.
201
201
202
-
```py
202
+
```py title="Chaînage des tâches"
203
203
create_schema_task >> create_table_task
204
204
```
205
205
@@ -253,7 +253,7 @@ repartition_aleatoire as (
253
253
254
254
Ne reste plus qu'à insérer cette répartition aléatoire dans notre table de chargement, ce qui donne la requête globale suivante.
255
255
256
-
```sql
256
+
```sql title="Requête de calcul de l'emprise d'extraction et de répartition du travail"
257
257
with emprise as (
258
258
select ST_Collect(geom) as geom
259
259
from troncons_wgs84
@@ -287,7 +287,7 @@ Nous pouvons alors afficher le résultat de cette répartition dans QGIS. De l'a
287
287
288
288
Tout est prêt pour extraire et charger les données. Histoire de ne pas rentrer directement dans le dur, analysons d'abord l'entête et le pseudo-code de la tâche.
289
289
290
-
```py
290
+
```py title="Entête et pseudo-code de la tâche d'extraction/chargement"
@@ -323,7 +323,7 @@ De nouvelles cellules sont donc potentiellement créées à la sortie de la bouc
323
323
324
324
Cette étape consiste simplement à exécuter la requête suivante.
325
325
326
-
```sql
326
+
```sql title="Requête de récupération de la liste des cellules à extraire/charger"
327
327
select
328
328
geom,
329
329
ST_XMin(geom) as x_min,
@@ -348,7 +348,7 @@ Sur chaque cellule, l'extraction se fait via un appel HTTP à l'API en passant e
348
348
349
349
Nous avons encapsulé cet appel dans la fonction ci-dessous.
350
350
351
-
```py
351
+
```py title="Fonction d'appel à l'API Mapillary"
352
352
defcall_map_features_api(cell: dict):
353
353
"""
354
354
Appel à l'API d'extraction des "features" au format JSON pour une cellule donnée.
@@ -373,7 +373,7 @@ Par ailleurs, Apache Airflow propose [la gestion de connexions](https://airflow.
373
373
374
374
Le chargement des données consiste en la mise à jour du champs `ìnformations` (type `jsonb`) de la table de chargement pour la géometrie correspondante.
375
375
376
-
```sql
376
+
```sql title="Requête de chargement des données"
377
377
update tmp_features
378
378
set informations = %(informations)s
379
379
where ST_Equals(geom, (%(geom)s)::geometry);
@@ -385,7 +385,7 @@ Les cellules pour lesquelles 2000 _features_ ont été retournées sont supprim
385
385
386
386
La suppression et l'ajout sont réalisés en un seul ordre SQL grâce au mot-clé [`returning`](https://www.postgresql.org/docs/current/dml-returning.html) qui permet de récupérer tout ou partie des champs des lignes modifiées.
387
387
388
-
```sql
388
+
```sql title="Requête de découpage des cellules contenant plus de 2000 _features_"
389
389
with cellules_a_diviser as (
390
390
delete
391
391
from tmp_features
@@ -410,7 +410,7 @@ La tâche 4 est construite de sorte à traiter une sous-partie des quelques 4000
410
410
411
411
Il faut donc invoquer autant de fois que souhaité la tâche pour chacune des sous-parties. Pour cela, nous mettons ces invocations dans une liste Python.
412
412
413
-
```py
413
+
```py title="Création de N tâches d'extraction/chargement"
414
414
extract_load_features_tasks = []
415
415
for extract_load_task_id inrange(1, extract_load_tasks_count +1):
Après chargement, la table définitive de stockage des données est écrasée avec la table de chargement. Seul le champ `informations` est conservé, les autres champs n'étant utiles que pour la phase d'EL.
430
430
431
-
```py
431
+
```py title="Tâche de remplacement de la table destination par la table temporaire de chargement"
0 commit comments