-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathpythondcs.py
More file actions
490 lines (475 loc) · 25.7 KB
/
pythondcs.py
File metadata and controls
490 lines (475 loc) · 25.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
from datetime import datetime, date, time, timedelta, timezone
from threading import RLock
import requests, logging
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
try:
import ijson, gzip
IJSONAVAILABLE = True
except ImportError:
IJSONAVAILABLE = False
class DcsWebApi:
    """
    The DcsWebApi class can be used to login and interface with a
    Coherent Research DCS system using its built in json based public
    web API. You can then download data using class methods which map
    to the DCS API but with some convenient python data conversions.
    The class can be used in an unauthenticated or authenticated regime.
    The official API methods supported are detailed here, but all of
    the semantics of the API, including the login cookies are
    handled by the class and its methods:
    https://github.com/coherent-research/dcs-documentation/blob/master/DcsPublicApiDescription.md
    """

    # Reading fields converted to float, keyed by the (lower case) format name.
    # Formats not listed here only get their timestamps converted.
    _FLOATFIELDS = {
        "standard": ("value",),
        "complete": ("totalValue", "periodValue"),
    }

    # Nominal length of each supported periodType. "month" is only used for
    # sizing/limit checks; month boundaries are computed with calendar maths.
    _PERIODDELTAS = {
        "halfhour": timedelta(minutes=30),
        "hour": timedelta(hours=1),
        "day": timedelta(days=1),
        "week": timedelta(days=7),
        "month": timedelta(days=31),
    }

    if hasattr(datetime, "fromisoformat"):
        @staticmethod
        def _fromisoformat(isostr):
            """Converts ISO formatted datetime strings to datetime objects
            using built in methods but with added support for "Z" timezones.
            Generally supported by Python 3.7+"""
            return datetime.fromisoformat(isostr.replace('Z', '+00:00', 1))
    else:
        @staticmethod
        def _fromisoformat(isostr):
            """Converts ISO formatted datetime strings to datetime objects
            using string manipulations for Python 3.6 and lower where the built in
            "fromisoformat" method isn't available. This function is slower but
            much faster than "strptime", but not as forgiving if the strings are
            incorrectly formatted.
            Expected format: YYYY*MM*DD*HH*MM*SS[.f][Z|[{+|-}HH*MM]] where * can
            match any single character, and "f" can be up to 6 digits"""
            isostr = isostr.replace('Z', '+00:00', 1)
            strlen = len(isostr)
            # Index of the timezone sign after the seconds, or strlen if none.
            # find() returns -1 when missing, so "+1 ... or ..." falls through.
            tz_pos = (isostr.find("+", 19)+1 or isostr.find("-", 19)+1 or strlen+1)-1
            if tz_pos == strlen:
                tz = None
            else:
                tz_parts = (
                    int(isostr[tz_pos+1:tz_pos+3]),
                    int(isostr[tz_pos+4:tz_pos+6]),
                )
                if not any(tz_parts):
                    tz = timezone.utc
                else:
                    tz = timezone(
                        (1 if isostr[tz_pos] == "+" else -1)
                        * timedelta(
                            hours=tz_parts[0],
                            minutes=tz_parts[1],
                        )
                    )
            return datetime(
                int(isostr[0:4]),    # Year
                int(isostr[5:7]),    # Month
                int(isostr[8:10]),   # Day
                int(isostr[11:13]),  # Hour
                int(isostr[14:16]),  # Minute
                int(isostr[17:19]),  # Second
                (
                    int(isostr[20:tz_pos].ljust(6, "0"))
                    if strlen > 19 and isostr[19] == "."
                    else 0
                ),                   # Microsecond
                tz,                  # Timezone
            )

    @staticmethod
    def _rawstream(reply):
        """Returns the binary stream of the http response to feed a streaming
        json parser, wrapped in a gzip reader when the response declares a
        'gzip' or 'deflate' content-encoding."""
        # NOTE(review): the gzip reader is also applied to "deflate" responses,
        # exactly as before; confirm the server never actually sends deflate.
        if reply.headers.get("content-encoding") in ("gzip", "deflate"):
            return gzip.open(reply.raw)
        return reply.raw

    @staticmethod
    def _isoparam(moment):
        """Converts a datetime (shifted to UTC) or a date (taken as midnight UTC)
        to the ISO string sent to the server. Anything else, including None,
        is returned unchanged."""
        # datetime must be tested first as it is a subclass of date
        if isinstance(moment, datetime):
            return moment.astimezone(timezone.utc).isoformat().replace("+00:00", "Z", 1)
        if isinstance(moment, date):
            return moment.isoformat() + "T00:00:00Z"
        return moment

    @staticmethod
    def _isaligned(moment, periodType):
        """Returns True if the datetime falls exactly on a boundary of the given
        (lower case) periodType: a clean half hour, hour, midnight, midnight
        on a Monday for "week", or midnight on the 1st for "month"."""
        if moment.microsecond or moment.second:
            return False
        if periodType == "halfhour":
            return moment.minute in (0, 30)
        if moment.minute:
            return False
        if periodType == "hour":
            return True
        if moment.hour:
            return False
        if periodType == "week":
            return moment.weekday() == 0
        if periodType == "month":
            return moment.day == 1
        return True  # "day"

    @staticmethod
    def _addmonths(moment, months):
        """Returns the datetime moved forward by a whole number of calendar
        months. Only intended for datetimes on the 1st of a month."""
        year, month0 = divmod(moment.year * 12 + (moment.month - 1) + months, 12)
        return moment.replace(year=year, month=month0 + 1)

    @classmethod
    def _readingsgenerator(cls, parse_events, format):
        """Provides an iterator of element from the 'readings' object in 'standard' or
        'complete' format. Converts timestamps to datetime objects and values to floats.
        Any other format only has its timestamps converted."""
        floatfields = cls._FLOATFIELDS.get(format, ())
        n = 0
        for item in ijson.items(parse_events, "readings.item", use_float=True):
            item["timestamp"] = cls._fromisoformat(item["timestamp"])
            for field in floatfields:
                item[field] = float(item[field])
            yield item
            n += 1
        logging.info(f"All {n} readings retreived")

    @classmethod
    def _iterjson_reads(cls, reply, format):
        """Takes the http response and decodes the json payload by streaming it,
        decompressing it if required, and decoding it into a dictionary with an
        iterator in place of the 'readings' using the _readingsgenerator method.
        Note that any items appearing after the 'readings' object will be lost."""
        results = dict()
        parse_events = ijson.parse(cls._rawstream(reply))
        # Consume the header items one key/value pair at a time and stop as
        # soon as the 'readings' key is met, leaving the parser positioned
        # at the start of the readings for the generator to continue from.
        for path, name, value in parse_events:
            if name != "map_key":
                continue
            if value == "readings":
                break
            path, name, value = next(parse_events)
            results[path] = cls._fromisoformat(value) if path in ("startTime", "endTime") else value
        results["readings"] = cls._readingsgenerator(parse_events, format)
        return results

    @classmethod
    def _json_reads(cls, reply, format):
        """Takes the http response and decodes the json payload as one object
        converting timestamps to datetime objects and all reading values to floats"""
        results = reply.json()
        for key in ("startTime", "endTime"):
            results[key] = cls._fromisoformat(results[key])
        floatfields = cls._FLOATFIELDS.get(format, ())
        for item in results["readings"]:
            # Convert to datetimes and floats
            item["timestamp"] = cls._fromisoformat(item["timestamp"])
            for field in floatfields:
                item[field] = float(item[field])
        logging.info(f"All {len(results['readings'])} readings retreived")
        return results

    @classmethod
    def _raise_for_status(cls, reply):
        """Raises :class:`HTTPError`, if one occurred with the human readable error message included.
        For 4xx replies any json payload is appended to the message, one
        "key : value" pair per line; all other replies fall back on the
        requests library's own raise_for_status."""
        if not 400 <= reply.status_code < 500:
            reply.raise_for_status()  # Fallback on the requests library exception
            return
        http_error_msg = f"\n{reply.status_code} Client Error: {reply.reason} for url: {reply.url}"
        try:
            payload = reply.json()
        except ValueError:
            # The json decode errors raised by requests derive from ValueError
            payload = None
        if isinstance(payload, dict):
            # Values are formatted rather than joined so non-string values
            # (numbers, nested objects) cannot mask the real error
            detail = "\n".join(f"{key} : {value}" for key, value in payload.items())
            http_error_msg += f"\n{detail}"
        elif payload is not None:
            http_error_msg += f"\n{payload}"
        raise requests.exceptions.HTTPError(http_error_msg, response=reply)

    def __enter__(self):
        """Context Manager Enter; returns this object"""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Context Manager Exit; does no clean up and never suppresses exceptions"""
        return None

    def __init__(self, rooturl, username=None, password=None):
        """
        Creates a Public API Session object with the rooturl and logs in if
        credentials are provided. Returns this object for future use.
        If either the username or password is None, a warning is logged and
        the object stays in Unauthenticated mode.
        """
        # Lock used to limit sessions to 1 transaction at a time to avoid
        # accidental flooding of the server if used within multithreaded loops
        self.lock = RLock()
        self.timeout = (3.05, 120)  # Connect and Read timeouts
        # Set before anything that can fail so that __del__ is always safe
        self.username = None
        self.role = None
        self.rooturl = rooturl.rstrip(" /")
        self.s = requests.Session()
        self.s.stream = True
        # Attempt up to 5 increasingly delayed retries for recoverable errors.
        # Mounted on the stripped url so the prefix matches the request urls.
        self.s.mount(self.rooturl, HTTPAdapter(
            max_retries=Retry(
                total=5, backoff_factor=0.5, status_forcelist=[502, 503, 504]
            )))
        if None not in (username, password):
            self.signin(username, password)
        else:
            logging.warning("Incomplete credentials given - Unauthenticated mode will be used; Please use the signin method for Authenticated mode")

    def status(self):
        """
        Gets the Status of the API
        """
        subpath = "/status"
        with self.lock:
            reply = self.s.get(self.rooturl+subpath, timeout=self.timeout)
            self._raise_for_status(reply)
            return reply.json()

    def signin(self, username, password):
        """
        Signs in to the DCS server for the current Session object.
        An authentication token (cookie) will be stored for this session.
        Must be provided with a username and password.
        Any existing sign in is signed out first. An HTTP error reply is
        logged rather than raised, leaving the object signed out.
        Returns None.
        """
        subpath = "/authentication/signin"
        if len(self.s.cookies) > 0 or self.username is not None:
            self.signout()
        try:
            with self.lock:
                reply = self.s.post(
                    self.rooturl+subpath,
                    json={"username": username, "password": password},
                    timeout=self.timeout,
                )
                self._raise_for_status(reply)
                result = reply.json()
                self.username = result['username']
                self.role = result['role']
                logging.info(f"Successfully signed in to DCS as '{self.username}' with {self.role} privileges")
        except requests.exceptions.HTTPError as err:
            r = err.response
            logging.error(f"{r.status_code}: {r.reason}, '{r.text}'\n{r.url}")

    def signout(self):
        """
        Signs out of the current session and expires the authentication cookie.
        Returns None.
        """
        subpath = "/authentication/signout"
        with self.lock:
            self.s.post(self.rooturl+subpath, timeout=self.timeout)
            self.username = None
            self.role = None
            logging.info("Signed Out of DCS")

    def __del__(self):
        """Signs out of DCS upon deletion and garbage collection of this object"""
        # getattr guards against objects whose __init__ never completed
        if getattr(self, "username", None) is not None:
            self.signout()

    def _getcollection(self, subpath, iterator):
        """Gets a json list from the given subpath, either fully decoded or,
        when an iterator is asked for AND ijson is available, as an iterator
        which streams and yields each top level item."""
        with self.lock:
            reply = self.s.get(self.rooturl+subpath, timeout=self.timeout)
            self._raise_for_status(reply)
            if iterator and IJSONAVAILABLE:
                # The user must ask for an iterator AND the module must be available
                return ijson.items(self._rawstream(reply), "item", use_float=True)
            return reply.json()

    def meters(self, iterator=False):
        """
        Returns a list of available meters defined in DCS including the registers for each meter.
        What is available depends on permissions.
        Structure is approximately a List containing Dicts for each meter
        where Registers are another List containing Dicts for each register.
        With iterator=True (and the ijson module) an iterator over the meters
        is returned instead of a list.
        """
        subpath = "/public/meters" if self.username is None else "/meters"
        return self._getcollection(subpath, iterator)

    def virtualmeters(self, iterator=False):
        """
        Returns a list of available virtual meters defined in DCS. Each virtual meter object will contain 1 or more register alias objects.
        What is available depends on permissions.
        Structure is approximately a List containing Dicts for each virtual meter
        where register aliases are another List containing Dict for each alias.
        With iterator=True (and the ijson module) an iterator over the virtual
        meters is returned instead of a list.
        """
        subpath = "/public/virtualMeters" if self.username is None else "/virtualMeters"
        return self._getcollection(subpath, iterator)

    def readings(self, id, startTime=None, endTime=None, periodCount=None,
                 calibrated=True, interpolated=True, periodType="halfhour", format="standard", iterator=False):
        """
        Returns a list or iterator of readings for the specified register or virtual meter and timespan.
        Structure is approximately a Dict containing header information with a nested list of readings
        where readings are a list/iterator containing a Dict for each reading.
        If very large queries are needed, or server limits are met, use the 'largereadings' method.
        Using an iterator (iterator=True with ijson module) will yield one
        reading at a time which may be more memory efficient for large data sets
        but the values are not retained after consumption.
        If memory usage is not a concern or you need to retain the data, then
        the iterator=False (default) will simply return one single list of reads.
        In both cases, each element of the list or iterator will consist of a
        dictionary with values as floats and dates as timezone aware datetime objects.
        It is possible for the floats to represent positive and negative infinities or nan.
        Parameters are as required by DCS:
        - "id" - string of the register or virtual meter prepended by R or VM (Required)
        - "startTime" - datetime or date object (Optional, see note)
        - "endTime" - datetime or date object (Optional, see note)
        - "periodCount" - integer number of periodTypes (Optional, see note)
        - "calibrated" - boolean for whether values should be calibrated (Optional, default True)
        - "interpolated" - boolean for whether gaps should be linearly filled (Optional, default True)
        - "periodType" - string of "halfHour", or "hour",
            "day", "week", "month" (Optional, default halfhour)
        - "format" - string of "standard", or "complete" (Optional, default "standard")
        - "iterator" - False to return a single potentially large nested list, or
            True to return an iterator which streams and yields each item.
            If the ijson module is not available, this option does nothing and
            is always equivalent to False.
        Note: The timespan covered by the request can be specified by including any 2 of
        startTime, endTime or periodCount. It is an error to specify anything other than 2
        and a TypeError is raised in that case.
        """
        if (startTime, endTime, periodCount).count(None) != 1:
            raise TypeError("Only two parameters are permitted from startTime, endTime and periodCount")
        # Normalised once so the request and the decoding below always agree
        format = format.lower()
        dataparams = {
            'id': id,                            # String, such as "R123" or "VM456"
            'format': format,                    # Enum: "standard" "complete"
            # Datetimes or dates become ISO strings; None is left out of the url
            'startTime': self._isoparam(startTime),
            'endTime': self._isoparam(endTime),
            'periodCount': periodCount,          # If None, it is left out of the url
            'calibrated': calibrated,            # Boolean
            'interpolated': interpolated,        # Boolean
            'periodType': periodType.lower(),    # Enum: "halfHour" "hour" "day" "week" "month"
        }
        subpath = "/public/readings" if self.username is None else "/readings"
        # Actually get the data and hand it to the chosen json decoder
        with self.lock:
            reply = self.s.get(self.rooturl+subpath, params=dataparams,
                               timeout=self.timeout)
            self._raise_for_status(reply)
            logging.info(f"Got readings for {id}, server response time: {reply.elapsed.total_seconds()}s")
            if iterator and IJSONAVAILABLE:
                # The user must ask for an iterator AND the module must be available
                return self._iterjson_reads(reply, format)
            return self._json_reads(reply, format)

    def largereadings(self, *args, maxwindow=timedelta(days=365), periodCount=None,
                      startTime=None, endTime=None, iterator=False, periodType="halfHour", **kwargs):
        """
        Returns a list or iterator of readings for the specified register or virtual meter and timespan.
        Structure is approximately a Dict containing header information with a nested list of readings
        where readings are a list/iterator containing a Dict for each reading.
        A potentially very large query will be chunked into numerous smaller transactions
        using the 'readings' method based on a maxwindow size (a timedelta defaulting
        to about 12 months) and provides the result as if one single transaction was completed;
        either a very large list of readings or a single iterator over all readings from all
        constituent underlying transactions.
        This method utilises the simpler 'readings' method and aside from maxwindow,
        all other arguments are as per 'readings' and are passed through, however,
        a startTime and endTime must be provided explicitly (not using periodCount;
        the periodCount parameter is accepted but not used).
        The startTime and endTime are converted to UTC, swapped if reversed, and
        must be aligned with the periodType. The sign of maxwindow is ignored
        and it is never taken as less than 1 day.
        Using an iterator (iterator=True with ijson module) will yield one
        reading at a time which may be more memory efficient for large data sets
        but the values are not retained after consumption.
        If memory usage is not a concern or you need to retain the data, then
        the iterator=False (default) will simply return one single list of reads.
        In both cases, each element of the list or iterator will consist of a
        dictionary with values as floats and dates as timezone aware datetime objects.
        It is possible for the floats to represent positive and negative infinities or nan.
        Raises TypeError for a maxwindow smaller than the periodType, missing,
        wrongly typed, identical or misaligned startTime/endTime, and KeyError
        for an unknown periodType.
        """
        periodType = periodType.lower()
        ptd = self._PERIODDELTAS[periodType]
        maxwindow = abs(maxwindow)  # Strip negative durations before any checks
        if maxwindow < ptd:
            raise TypeError("Max window is smaller than the periodType")
        if None in (startTime, endTime):
            raise TypeError("You must explicitly provide a startTime and endTime if you want batching")
        if isinstance(startTime, datetime):
            startTime = startTime.astimezone(timezone.utc)
        elif isinstance(startTime, date):
            startTime = datetime.combine(startTime, time(), timezone.utc)
        else:
            raise TypeError("The startTime isn't a 'datetime' or 'date' instance")
        if isinstance(endTime, datetime):
            endTime = endTime.astimezone(timezone.utc)
        elif isinstance(endTime, date):
            endTime = datetime.combine(endTime, time(), timezone.utc)
        else:
            raise TypeError("The endTime isn't a 'datetime' or 'date' instance")
        if endTime < startTime:  # Swap dates if reversed
            startTime, endTime = endTime, startTime
        if startTime == endTime:
            raise TypeError("The startTime and endTime are the same")
        if not (self._isaligned(startTime, periodType) and self._isaligned(endTime, periodType)):
            raise TypeError("The startTime and endTime must be aligned with the periodType")
        if maxwindow < timedelta(days=1):  # Dont go too small
            maxwindow = timedelta(days=1)
        reqwindow = endTime - startTime  # Requested window/duration
        logging.info(f"{reqwindow} requested and the maximum limit is {maxwindow}")
        if reqwindow <= maxwindow:  # If the period is smaller than max, use directly
            logging.info("Only 1 transaction is needed")
            return self.readings(*args, startTime=startTime, endTime=endTime, periodType=periodType, iterator=iterator, **kwargs)

        # The period is larger than max, so break it down
        if periodType == "month":
            # Requested duration counted in months
            reqperiods = (endTime.year-startTime.year)*12 + (endTime.month-startTime.month)
        else:
            reqperiods = reqwindow // ptd  # Requested duration in periods
        maxperiods = maxwindow // ptd      # Maximum duration in periods
        # Smallest number of pieces (at least 2, since 1 was ruled out above)
        # for which the largest piece still fits: ceil(reqperiods / maxperiods)
        pieces = max(2, -(-reqperiods // maxperiods))
        size, extra = divmod(reqperiods, pieces)
        # Make a list of sample sizes, with the remainders added onto the
        # first sets. Such as 11, 11, 10 for a total of 32.
        periodsBlocks = [size+1]*extra + [size]*(pieces-extra)
        logging.info(f"{len(periodsBlocks)} transactions will be used")
        intervals = []
        intervalstart = startTime  # The first starttime is the original start
        for count in periodsBlocks:
            # Add the calculated number of periods to the start; months vary
            # in length so they are stepped by calendar month, not by days
            if periodType == "month":
                intervalend = self._addmonths(intervalstart, count)
            else:
                intervalend = intervalstart + count * ptd
            # Define each sample window and start the next one after the last
            intervals.append({"startTime": intervalstart, "endTime": intervalend})
            intervalstart = intervalend

        if iterator:
            result = {
                "startTime": intervals[0]["startTime"],
                "endTime": intervals[-1]["endTime"],
            }
            # Lazy: each transaction is only made once its readings are needed
            iterresults = (
                self.readings(*args, periodType=periodType, iterator=iterator, periodCount=None, **chunk, **kwargs)
                for chunk in intervals
            )
            firstchunk = next(iterresults)

            def concatreadings(firstone, otherones):
                """Yields the readings of the first chunk then of all the others"""
                yield from firstone["readings"]
                for chunk in otherones:
                    yield from chunk["readings"]

            # Header items come from the first chunk only
            for item in firstchunk:
                if item not in ("startTime", "endTime", "readings"):
                    result[item] = firstchunk[item]
                elif item == "readings":
                    result["readings"] = concatreadings(firstchunk, iterresults)
            return result

        # Gather results: header items from the first chunk, the endTime
        # from the last chunk and the readings from every chunk
        result = {"readings": []}
        for chunk in intervals:
            chunkresult = self.readings(*args, periodType=periodType, iterator=iterator, **chunk, **kwargs)
            for item in chunkresult:
                if item not in result or item == "endTime":
                    result[item] = chunkresult[item]
                elif item == "readings":
                    result["readings"].extend(chunkresult["readings"])
        return result