-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsimapi.py
281 lines (240 loc) · 10.6 KB
/
simapi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
import csv
import json
import logging
import polling2
import requests
import pandas as pd
# REST/GraphQL endpoints of the locally running SimApi backend
# (default Django development-server host/port).
user_url = 'http://127.0.0.1:8000/user/'  # POST: create a new user account
login_url = 'http://127.0.0.1:8000/login/'  # POST: exchange credentials for an auth token
init_url = 'http://127.0.0.1:8000/init_model/'  # POST: upload .idf/.epw and generate an FMU
input_url = 'http://127.0.0.1:8000/input/'  # POST: per-time-step input for one model
output_url = 'http://127.0.0.1:8000/output/'  # not referenced in this file — presumably output retrieval; verify against backend
graphql_url = 'http://127.0.0.1:8000/graphql/'  # GET: GraphQL queries for models and outputs
send_fmu = 'http://127.0.0.1:8000/send_fmu/'  # POST: initialize simulation objects from the generated FMU
# TODO add utility method to prepare user csv e.g. add time step column etc.
class SimApi:
    """
    Programming interface exposed to a user of the SimApi system.

    Wraps the REST/GraphQL endpoints of a locally running simulation
    backend: user creation, login, FMU generation, model initialization,
    stepping simulations with CSV-driven inputs, and output retrieval.
    """

    def __init__(self, model_name, model_count, step_size, final_time, idf_path, epw_path, csv):
        """
        :param model_name: (string) name of model, must be unique
        :param model_count: (int) number of models to instantiate
        :param step_size: (int) size of each step per hour, value in seconds,
            e.g. 4 steps per hour = 900 step size (15 minutes in seconds)
        :param final_time: (int) final runtime of model, value in hours. Will be
            changed to accommodate run times over a few days
        :param idf_path: (string) absolute path to .idf
        :param epw_path: (string) absolute path to .epw
        :param csv: (list) absolute path(s) to csv file(s), number of files must
            equal model count. NOTE: the parameter name shadows the stdlib
            ``csv`` module inside this method only.
        """
        self.logger = logging.getLogger('simapi')
        handler = logging.FileHandler('./simapi.log')
        formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        self._header = None  # auth header dict, populated by login()
        self._model_name = model_name
        self._model_count = model_count
        self._step_size = step_size
        self._final_time = final_time
        self._idf_path = idf_path
        self._epw_path = epw_path
        self._csv = csv
        # model initialization parameters sent to the backend
        self._init_data = {
            'model_name': self._model_name,  # change name each time script is run!
            'container_id': None,  # TODO change container_id from hostname to src_simulator_*
            'model_count': self._model_count,
            'step_size': self._step_size,  # step size in seconds. 600 secs = 10 mins
            'final_time': self._final_time  # 24 hours = 86400 secs
        }
        self.sim_names = []  # model names reported by the backend, filled by simulate_models()

    @staticmethod
    def create_user(user_email, user_name, user_password):
        # NOTE(review): the `def` line was corrupted in the original source;
        # name and parameters reconstructed from the docstring — confirm
        # against callers.
        """
        Creates new user.

        :param user_email: (string) user email
        :param user_name: (string) user name
        :param user_password: (string) user password
        :return: requests.Response from the user-creation endpoint
        """
        # TODO add check for existing user
        json_data = {
            "name": user_name,
            "email": user_email,
            "password": user_password
        }
        return requests.post(user_url, data=json_data)

    def login(self, username, password):
        # NOTE(review): the `def` line was corrupted in the original source;
        # name and parameters reconstructed from the docstring — confirm
        # against callers.
        """
        Login as current user and store user token as a header dictionary to be
        used in requests.

        :param username: (string) user name (username = email)
        :param password: (string) user password
        :return: (int) HTTP status code of the login request
        """
        data = {"username": username,  # username = email
                "password": password}
        resp = requests.post(login_url, data=data)
        if resp.status_code == 200:
            token = resp.json()['token']  # get validation token
            self._header = {'Authorization': 'Token ' + token}  # set request header
        return resp.status_code

    def send_and_generate(self):
        """
        Send files needed to generate an fmu. Return when fmu has finished
        generating.

        :return: (int) status code of request, 201 if success
        """
        # `with` guarantees both files are closed even if the POST raises.
        with open(self._idf_path, 'rb') as idf_file, open(self._epw_path, 'rb') as epw_file:
            files = {'idf_file': ('update.idf', idf_file),
                     'epw_file': ('update.epw', epw_file)}
            resp = requests.post(init_url, headers=self._header, data=self._init_data, files=files)
        return resp.status_code

    def send_and_init(self):
        """
        Send data and initialize model as a simulation object; returns when the
        simulation object has finished initializing.

        :return: (int) status code of request, 200 if success
        """
        resp = requests.post(send_fmu, headers=self._header, json=self._init_data)
        # graphql query for all models in db related to initial_model_name.
        model_query = """
        {{
          fmuModels(modelN: "{0}"){{
            modelName
          }}
        }}
        """.format(self._model_name)
        # Result currently unused beyond issuing the query; kept for parity.
        requests.get(url=graphql_url, json={'query': model_query}).json()['data']['fmuModels']
        # TODO check if model count = initialized_model_count and relay to user,
        # account for case when initialized_model_count < model count
        return resp.status_code

    # TODO split into multiple methods giving the user more control over simulations
    def simulate_models(self):
        """
        Starts communication with simulation models and returns when every
        model has reached its final time.

        :return: (int) 200 for success
        """
        def output_count(query, url):
            # Number of output rows currently stored for this model/step.
            resp = requests.get(url=url, json={'query': query})
            return len(resp.json()['data']['outputs'])

        # TODO needs rework asap
        # query for all models in db related to initial_model_name.
        model_query = """
        {{
          fmuModels(modelN: "{0}"){{
            modelName
          }}
        }}
        """.format(self._model_name)
        models = requests.get(url=graphql_url, json={'query': model_query}).json()['data']['fmuModels']
        for entry in models[:self._model_count]:
            name = entry['modelName']  # extract model name from graphql query response
            print(name)
            self.sim_names.append(name)  # store extracted model names

        f_time = 60 * 60 * self._final_time  # final time in seconds
        data_frames = [pd.read_csv(path) for path in self._csv]
        for step in range(0, f_time, self._step_size):
            # TODO process models async client side!
            for j in range(self._model_count):
                # TODO store dataframe in generator method and call next each iter
                # One CSV per model, or a single shared CSV for all models.
                df = data_frames[j] if len(data_frames) > 1 else data_frames[0]
                row = df.loc[df['time_step'] == step]
                input_dict = row.to_dict('records')[0]
                input_data = {
                    'fmu_model': self.sim_names[j],
                    'time_step': step,
                    'input_json': json.dumps(input_dict)
                }
                r = requests.post(input_url, headers=self._header, data=input_data)
                print(r.text + ' ' + str(r.status_code))
            # Wait until every model has produced an output for this step.
            output_query = """
            {{
              outputs(modelN: "{0}", tStep: {1}) {{
                outputJson
              }}
            }}
            """.format(self._model_name, step)
            try:
                polling2.poll(
                    lambda: output_count(query=output_query, url=graphql_url) == self._model_count,
                    step=0.1,
                    timeout=60)
            except polling2.TimeoutException:
                print("Timeout error occurred\nLength of results is: {}".format(
                    output_count(query=output_query, url=graphql_url)))

        # send empty input to kill and restart process in sim container(s)
        for name in self.sim_names[:self._model_count]:
            input_data = {
                'fmu_model': name,
                'time_step': 0,
                'input_json': json.dumps({"end_proc": -1})
            }
            requests.post(input_url, headers=self._header, data=input_data)
        print("\nAll data sent to simulation\n")
        return 200

    def request_model_outputs(self, sim_name):
        """
        Poll until every output for *sim_name* is stored server-side, then
        write them to ``output_csv/<sim_name>.csv`` with columns
        ``time_step`` and ``output``.

        :param sim_name: (string) name of an initialized simulation model
        """
        f_time = 60 * 60 * self._final_time
        # Integer division: f_time is assumed to be an exact multiple of
        # step_size (float division made the equality poll fragile).
        num_of_steps = f_time // self._step_size
        self.logger.info("Expected number of steps: {}".format(num_of_steps))

        def output_count(query, url):
            resp = requests.get(url=url, json={'query': query})
            json_data = resp.json()['data']['outputs']
            self.logger.info("Output current length: {}".format(len(json_data)))
            return len(json_data)

        output_query = """
        {{
          outputs(modelN: "{0}") {{
            timeStep
            outputJson
          }}
        }}
        """.format(sim_name)
        print("Processing remaining inputs...")
        try:
            polling2.poll(
                lambda: output_count(query=output_query, url=graphql_url) == num_of_steps,
                step=0.1,
                poll_forever=True)
        except polling2.TimeoutException:
            # NOTE(review): unreachable while poll_forever=True; kept for
            # parity with the original error path.
            print("Timeout error occurred\nLength of results is: {}".format(
                output_count(query=output_query, url=graphql_url)))
        json_output = requests.get(url=graphql_url, json={'query': output_query}).json()['data']['outputs']
        # TODO store list of output names and use as csv column
        print("Retrieving outputs...")
        csv_columns = ['time_step', 'output']
        try:
            # newline='' per the csv module docs to avoid blank rows on Windows.
            with open(f'output_csv/{sim_name}.csv', 'w', newline='') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
                writer.writeheader()
                for out in json_output:
                    writer.writerow({'time_step': out['timeStep'],
                                     'output': json.loads(out['outputJson'])})
        except IOError:
            print("I/O error")

    def multi_thread_client(self):
        """
        Let user make multi-threaded requests, simulations per thread =
        (number of sims / available threads). Avoid sequential processing of
        container requests client side.

        :raises NotImplementedError: always — placeholder for future work.
        """
        # Bug fix: was decorated @staticmethod while taking `self` (uncallable
        # on an instance) and *returned* the exception class instead of raising.
        raise NotImplementedError