Skip to content

Commit fc5b767

Browse files
authored
Merge pull request #59 from Bobfrat/trajectories
Add trajectories to celery task
2 parents 7fdc784 + 20f1110 commit fc5b767

7 files changed

Lines changed: 144 additions & 64 deletions

File tree

.dockerignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
config.local.yml
22
*.sw[po]
3+
web/static/json/trajectories/

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
*.swp
22
*.pyc
33
logs/
4-
web/static/json/trajectories/*.json
4+
web/static/json/trajectories/
55
web/npm_components/
66
web/node_modules/
77
web/static/app/

app.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,13 @@ def __call__(self, *args, **kwargs):
4242
"task": "status.tasks.get_dac_status",
4343
"schedule": crontab(minute="*/15") # Run every 15 mins
4444
},
45+
"get_dac_trajectories_task": {
46+
"task": "status.tasks.get_trajectory_features",
47+
"schedule": crontab(minute=10, hour="*/1") # Run every hr
48+
},
4549
"get_dac_profile_plots_task": {
4650
"task": "status.tasks.generate_dac_profile_plots",
47-
"schedule": crontab(minute=0, hour=[0, 12]) # Run twice a day
51+
"schedule": crontab(minute=0, hour='*/6') # Run 4 times a day
4852
},
4953
}
5054

requirements/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,4 @@ cmocean==2.0
1717
netCDF4>=1.4.2
1818
netcdftime>=1.0.0a2
1919
boto3==1.13.18
20+
shapely==1.7.0

status/controller.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,34 @@
66
routes and logic API
77
'''
88

9+
import requests
10+
from flask import jsonify, current_app
911
from status import api
10-
from status.tasks import get_trajectory
11-
12-
from flask import jsonify, request, current_app
12+
from status.trajectories import get_trajectory
1313

14-
import requests
1514

1615
@api.route('/test')
1716
def test():
1817
return jsonify(message="Running")
1918

20-
#--------------------------------------------------------------------------------
19+
20+
# --------------------------------------------------------------------------------
2121
# Proxies - Use with caution
22-
#--------------------------------------------------------------------------------
22+
# --------------------------------------------------------------------------------
2323
@api.route('/deployment', methods=['GET'])
2424
def get_deployments():
2525
url = current_app.config.get('DAC_API')
2626
response = requests.get(url)
2727
return response.content, response.status_code, dict(response.headers)
2828

29+
2930
@api.route('/deployment/<string:username>/<string:deployment_name>', methods=['GET'])
3031
def get_deployment(username, deployment_name):
3132
url = current_app.config.get('DAC_API')
3233
url += '/%s/%s' % (username, deployment_name)
3334
response = requests.get(url)
3435
return response.content, response.status_code, dict(response.headers)
35-
#--------------------------------------------------------------------------------
36+
3637

3738
@api.route('/track/<string:username>/<string:deployment_name>')
3839
def track(username, deployment_name):
@@ -45,4 +46,3 @@ def track(username, deployment_name):
4546
erddap_url = deployment['erddap']
4647
geo_data = get_trajectory(erddap_url)
4748
return jsonify(**geo_data)
48-

status/tasks.py

Lines changed: 2 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from datetime import datetime
77
from celery.utils.log import get_task_logger
88
from status.profile_plots import generate_profile_plots
9+
from status.trajectories import generate_trajectories
910
from urllib.parse import urlencode
1011
import status.clocks as clock
1112
import json
@@ -42,42 +43,6 @@ def write_json(data):
4243
return True
4344

4445

45-
def get_trajectory(erddap_url):
46-
'''
47-
Reads the trajectory information from ERDDAP and returns a GEOJSON like
48-
structure.
49-
'''
50-
# https://gliders.ioos.us/erddap/tabledap/ru01-20140104T1621.json?latitude,longitude&time&orderBy(%22time%22)
51-
url = erddap_url.replace('html', 'json')
52-
# ERDDAP requires the variable being sorted to be present in the variable
53-
# list. The time variable will be removed before converting to GeoJSON
54-
url += '?longitude,latitude,time&orderBy(%22time%22)'
55-
response = requests.get(url, timeout=180)
56-
if response.status_code != 200:
57-
raise IOError("Failed to get trajectories")
58-
data = response.json()
59-
geo_data = {
60-
'type': 'LineString',
61-
'coordinates': [c[0:2] for c in data['table']['rows']]
62-
}
63-
return geo_data
64-
65-
66-
def write_trajectory(deployment, geo_data):
67-
'''
68-
Writes a geojson like python structure to the appropriate data file
69-
'''
70-
trajectory_dir = app.config.get('TRAJECTORY_DIR')
71-
username = deployment['username']
72-
name = deployment['name']
73-
dir_path = os.path.join(trajectory_dir, username)
74-
if not os.path.exists(dir_path):
75-
os.makedirs(dir_path)
76-
file_path = os.path.join(dir_path, name + '.json')
77-
with open(file_path, 'wb') as f:
78-
f.write(json.dumps(geo_data))
79-
80-
8146
@celery.task()
8247
def generate_dac_profile_plots():
8348
profile_plot_dir = app.config.get('PROFILE_PLOT_DIR')
@@ -86,23 +51,7 @@ def generate_dac_profile_plots():
8651

8752
@celery.task(time_limit=300)
8853
def get_trajectory_features():
89-
deployments_url = app.config.get('DAC_API')
90-
91-
response = requests.get(deployments_url, timeout=60)
92-
if response.status_code != 200:
93-
raise IOError("Failed to get response from DAC ACPI")
94-
data = response.json()
95-
deployments = data['results']
96-
for d in deployments:
97-
logger.info("Reading deployment %s", d['deployment_dir'])
98-
try:
99-
geo_data = get_trajectory(d['erddap'])
100-
write_trajectory(d, geo_data)
101-
except IOError:
102-
logger.exception("Failed to get trajectory for %s",
103-
d['deployment_dir'])
104-
105-
return deployments
54+
return generate_trajectories()
10655

10756

10857
@celery.task()
@@ -309,7 +258,6 @@ def get_dac_status(time_limit=300):
309258
meta['tds'] = None
310259
if tds_request.status_code == 200:
311260
meta['tds'] = tds_das_url.replace('.das', '.html')
312-
313261
# Add the deployment metadata to the return object
314262
deployments['datasets'].append(collections.OrderedDict(
315263
sorted(list(meta.items()), key=lambda t: t[0])))

status/trajectories.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
import json
4+
import requests
5+
import os
6+
import sys
7+
from app import app
8+
from shapely.geometry import LineString
9+
from status.profile_plots import iter_deployments, is_recent_data, is_recent_update
10+
11+
12+
def get_trajectory(erddap_url):
13+
'''
14+
Reads the trajectory information from ERDDAP and returns a GEOJSON like
15+
structure.
16+
'''
17+
# https://gliders.ioos.us/erddap/tabledap/ru01-20140104T1621.json?latitude,longitude&time&orderBy(%22time%22)
18+
url = erddap_url.replace('html', 'json')
19+
# ERDDAP requires the variable being sorted to be present in the variable
20+
# list. The time variable will be removed before converting to GeoJSON
21+
url += '?longitude,latitude,time&orderBy(%22time%22)'
22+
response = requests.get(url, timeout=180)
23+
if response.status_code != 200:
24+
raise IOError("Failed to fetch trajectories: {}".format(erddap_url))
25+
data = response.json()
26+
geo_data = {
27+
'type': 'LineString',
28+
'coordinates': [c[0:2] for c in data['table']['rows']]
29+
}
30+
31+
geometry = parse_geometry(geo_data)
32+
coords = LineString(geometry['coordinates'])
33+
trajectory = coords.simplify(0.02, preserve_topology=False)
34+
geometry = {
35+
'type': 'LineString',
36+
'coordinates': list(trajectory.coords),
37+
'properties': {
38+
'oceansmap_type': 'glider'
39+
}
40+
}
41+
return geometry
42+
43+
44+
def get_path(deployment):
45+
'''
46+
Returns the path to the trajectory file
47+
48+
:param dict deployment: Dictionary containing the deployment metadata
49+
'''
50+
trajectory_dir = app.config.get('TRAJECTORY_DIR')
51+
username = deployment['username']
52+
name = deployment['name']
53+
dir_path = os.path.join(trajectory_dir, username)
54+
if not os.path.exists(dir_path):
55+
os.makedirs(dir_path)
56+
file_path = os.path.join(dir_path, name + '.json')
57+
return file_path
58+
59+
60+
def write_trajectory(deployment, geo_data):
61+
'''
62+
Writes a geojson like python structure to the appropriate data file
63+
64+
:param dict deployment: Dictionary containing the deployment metadata
65+
:param dict geometry: A GeoJSON Geometry object
66+
'''
67+
file_path = get_path(deployment)
68+
with open(file_path, 'w') as f:
69+
f.write(json.dumps(geo_data))
70+
71+
72+
def parse_geometry(geometry):
73+
'''
74+
Filters out potentially bad coordinate pairs as returned from
75+
GliderDAC. Returns a safe geometry object.
76+
77+
:param dict geometry: A GeoJSON Geometry object
78+
'''
79+
coords = []
80+
for lon, lat in geometry['coordinates']:
81+
if lon is None or lat is None:
82+
continue
83+
coords.append([lon, lat])
84+
return {'coordinates': coords}
85+
86+
87+
def trajectory_exists(deployment):
88+
'''
89+
Returns True if the data is within the last week
90+
91+
:param dict deployment: Dictionary containing the deployment metadata
92+
'''
93+
94+
file_path = get_path(deployment)
95+
return os.path.exists(file_path)
96+
97+
98+
def generate_trajectories(deployments=None):
99+
'''
100+
Determine which trajectories need to be built, and write geojson to file
101+
'''
102+
for deployment in iter_deployments():
103+
try:
104+
# Only add if the deployment has been recently updated or the data is recent
105+
recent_update = is_recent_update(deployment['updated'])
106+
recent_data = is_recent_data(deployment)
107+
existing_trajectory = trajectory_exists(deployment)
108+
if (recent_update or recent_data or not existing_trajectory):
109+
geo_data = get_trajectory(deployment['erddap'])
110+
write_trajectory(deployment, geo_data)
111+
except Exception:
112+
from traceback import print_exc
113+
print_exc()
114+
return 0
115+
116+
117+
if __name__ == '__main__':
118+
from argparse import ArgumentParser
119+
parser = ArgumentParser(description=generate_trajectories.__doc__)
120+
parser.add_argument(
121+
'-d', '--deployment',
122+
action='append',
123+
help='Which deployment to build'
124+
)
125+
args = parser.parse_args()
126+
sys.exit(generate_trajectories(args.deployment))

0 commit comments

Comments
 (0)