-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathEMT_API.py
More file actions
69 lines (51 loc) · 2.14 KB
/
Copy pathEMT_API.py
File metadata and controls
69 lines (51 loc) · 2.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
import os

import requests
from dotenv import load_dotenv
# Load the variables from the .env file on the fly (this runs at import time)
load_dotenv()
def _format_arrival(bus):
    """Render one arrival entry as ``"Linea <line>: <time>"``.

    ``bus`` is one element of the ``Arrive`` list returned by the stop
    query; its ``line`` and ``estimateArrive`` keys are read. The estimate
    is treated as a number of seconds and floored to whole minutes.
    """
    line = bus["line"]
    minutes = bus["estimateArrive"] // 60
    if minutes <= 0:
        eta = "Llegando"
    elif minutes > 45:
        # Anything beyond 45 minutes is collapsed into a single label.
        eta = "+45 min"
    else:
        eta = f"{minutes} min"
    return f"Linea {line}: {eta}"


def get_emt_bus(stop_id="5036", *, timeout=5):
    """Query the upcoming EMT buses for a stop via MobilityLabs and .env keys.

    The credentials are read from the ``EMT_CLIENT_ID`` and ``EMT_PASS_KEY``
    environment variables. The function logs in to obtain an access token
    and then requests the arrival estimates for ``stop_id``.

    Args:
        stop_id: Identifier of the stop, interpolated into the request URL.
        timeout: Timeout passed to each of the two HTTP requests
            (keyword-only; defaults to the previously hard-coded 5).

    Returns:
        A list of strings. On success there is one ``"Linea <line>: <time>"``
        entry per arrival, ordered by ascending ``estimateArrive``. Otherwise
        a two-element status message is returned:
        ``["Faltan llaves", "en el .env"]`` when a credential is missing,
        ``["Error de", "credenciales"]`` when the login code is not accepted,
        ``["No hay buses", "cerca"]`` when there are no arrivals, and
        ``["Error red", "EMT Madrid"]`` when any exception occurs.

    This function never raises: every exception from the network calls or
    from parsing the responses is logged and mapped to the generic error
    message.
    """
    # Pull the secret keys out of the environment.
    client_id = os.getenv("EMT_CLIENT_ID")
    pass_key = os.getenv("EMT_PASS_KEY")
    if not client_id or not pass_key:
        return ["Faltan llaves", "en el .env"]

    try:
        # 1. Log in.
        login = requests.get(
            "https://openapi.emtmadrid.es/v1/mobilitylabs/user/login/",
            headers={"X-ClientId": client_id, "passKey": pass_key},
            timeout=timeout,
        ).json()
        # Only these response codes are treated as a usable login.
        if login.get("code") not in ("00", "01", "02"):
            return ["Error de", "credenciales"]
        token = login["data"][0]["accessToken"]

        # 2. Query the stop.
        payload = {
            "cultureInfo": "ES",
            "Text_StopRequired_YN": "N",
            "Text_EstimationsRequired_YN": "Y",
            "Text_IncidencesRequired_YN": "N",
            # Fixed date, per the EMT docs (original author's note);
            # incidences are not requested above.
            "DateTime_Referenced_Incidencies_YYYYMMDD": "20240101",
        }
        stop = requests.post(
            f"https://openapi.emtmadrid.es/v2/transport/busemtmad/stops/{stop_id}/arrives/",
            headers={"accessToken": token},
            json=payload,
            timeout=timeout,
        ).json()

        arrivals = stop["data"][0].get("Arrive", [])
        if not arrivals:
            return ["No hay buses", "cerca"]

        soonest_first = sorted(arrivals, key=lambda bus: bus["estimateArrive"])
        return [_format_arrival(bus) for bus in soonest_first]
    except Exception:
        # Deliberate best-effort boundary: callers always get a displayable
        # list. The failure is logged so that network errors and unexpected
        # response shapes can be told apart when debugging.
        logging.getLogger(__name__).exception(
            "EMT arrivals lookup failed for stop %s", stop_id
        )
        return ["Error red", "EMT Madrid"]