forked from pst-group/pysystemtrade
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtimed_storage.py
More file actions
210 lines (155 loc) · 7.59 KB
/
Copy pathtimed_storage.py
File metadata and controls
210 lines (155 loc) · 7.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
"""
Generic timed storage; more bullet proof than a data frame
"""
from syscore.objects import (
missing_data,
success,
failure,
resolve_function,
)
from sysdata.base_data import baseData
from syslogdiag.log import logtoscreen
from sysobjects.production.timed_storage import listOfEntriesAsListOfDicts, listOfEntries, timedEntry
class classWithListOfEntriesAsListOfDicts(object):
    """
    Holds an entry-list class together with a series stored as a list of dicts

    The stored list of dicts is expected to offer
    as_list_of_entries(class_of_entry_list)
    """

    def __init__(
        self,
        class_of_entry_list,
        list_of_entries_as_list_of_dicts: listOfEntriesAsListOfDicts,
    ):
        """
        :param class_of_entry_list: class object handed to as_list_of_entries
        :param list_of_entries_as_list_of_dicts: the series in list-of-dicts form
        """
        self.list_of_entries_as_list_of_dicts = list_of_entries_as_list_of_dicts
        self.class_of_entry_list = class_of_entry_list

    def as_list_of_entries(self):
        """
        Convert the stored list of dicts using the stored class

        :return: result of list_of_entries_as_list_of_dicts.as_list_of_entries
        """
        entry_list_class = self.class_of_entry_list
        series_as_dicts = self.list_of_entries_as_list_of_dicts

        return series_as_dicts.as_list_of_entries(entry_list_class)
class classStrWithListOfEntriesAsListOfDicts(object):
    """
    Holds the *name* of an entry-list class (as a string) together with a
    series stored as a list of dicts

    Use with_class_object() to swap the string for whatever
    resolve_function returns for it
    """

    def __init__(
        self,
        class_of_entry_list_as_str: str,
        list_of_entries_as_list_of_dicts: listOfEntriesAsListOfDicts,
    ):
        """
        :param class_of_entry_list_as_str: string passed to resolve_function
        :param list_of_entries_as_list_of_dicts: the series in list-of-dicts form
        """
        self.list_of_entries_as_list_of_dicts = list_of_entries_as_list_of_dicts
        self.class_of_entry_list_as_str = class_of_entry_list_as_str

    def with_class_object(self):
        """
        :return: classWithListOfEntriesAsListOfDicts built from the resolved
                 class name and the same list of dicts
        """
        resolved_class = resolve_function(self.class_of_entry_list_as_str)
        series_as_dicts = self.list_of_entries_as_list_of_dicts

        return classWithListOfEntriesAsListOfDicts(resolved_class, series_as_dicts)

    def as_list_of_entries(self):
        """
        Resolve the class name, then convert the list of dicts with it

        :return: result of as_list_of_entries() on the resolved wrapper
        """
        return self.with_class_object().as_list_of_entries()

    def entry_list_as_plain_list(self):
        """
        :return: result of as_plain_list() on the stored list of dicts
        """
        series_as_dicts = self.list_of_entries_as_list_of_dicts

        return series_as_dicts.as_plain_list()
class listOfEntriesData(baseData):
    """
    base data class for list of entries

    Highly generic, an args dict is any set of labels eg strategy & instrument for positions

    Most of the methods are private because when we inherit we don't want to see them

    Child classes have to implement:
       _get_series_dict_with_data_class_for_args_dict
       _write_series_dict_for_args_dict
       _get_list_of_args_dict
    """

    def _data_class_name(self) -> str:
        ## The type of storage we are putting in here
        # NOTE(review): this file imports listOfEntries from
        # sysobjects.production.timed_storage - confirm the path below still resolves
        return "sysdata.production.generic_timed_storage.listOfEntries"

    @property
    def _data_class(self):
        ## Whatever resolve_function returns for the default class name
        class_name = self._data_class_name()

        return resolve_function(class_name)

    @property
    def _empty_data_series(self):
        ## Build an instance from an empty list, then ask it for its empty form
        data_class = self._data_class
        data_class_instance = data_class([])
        empty_entry_series = data_class_instance.as_empty()

        return empty_entry_series

    def __init__(self, log=logtoscreen("listOfEntriesData")):
        super().__init__(log=log)

    def _delete_all_data_for_args_dict(
        self, args_dict: dict, are_you_really_sure: bool = False
    ):
        """
        Overwrite the stored series for args_dict with an empty series

        :param args_dict: labels identifying the series
        :param are_you_really_sure: must be True, otherwise nothing is deleted
        :return: failure if not confirmed, otherwise success
        """
        if not are_you_really_sure:
            self.log.warn("To delete all data, need to set are_you_really_sure=True")
            return failure

        empty_entry_series = self._empty_data_series
        self._write_series_for_args_dict(args_dict, empty_entry_series)

        # Report the outcome explicitly, as the other mutating methods do
        return success

    def _update_entry_for_args_dict(self, new_entry: timedEntry, args_dict: dict):
        """
        Append new_entry to the series stored for args_dict and write it back

        :param new_entry: entry to append
        :param args_dict: labels identifying the series
        :return: success, or failure if the entry type does not match the
                 existing data or the append raised
        """
        existing_series = self._get_series_for_args_dict(args_dict)

        if len(existing_series) > 0:
            # Check types match; do not write anything if they don't
            check_result = self._check_class_name_matches_for_new_entry(
                args_dict, new_entry
            )
            if check_result is failure:
                return failure

        try:
            existing_series.append(new_entry)
        except Exception as e:
            self.log.warn(
                "Error %s when updating for %s with %s"
                % (str(e), str(args_dict), str(new_entry))
            )
            return failure

        self._write_series_for_args_dict(args_dict, existing_series)

        return success

    def _check_class_name_matches_for_new_entry(
        self, args_dict: dict, new_entry: timedEntry
    ):
        """
        Compare the final dotted component of the new entry's containing data
        class name with that of the class name used for args_dict

        :param args_dict: labels identifying the series
        :param new_entry: entry about to be added
        :return: success if the names match, failure (after a warning) if not
        """
        entry_class_name_new_entry = new_entry.containing_data_class_name
        entry_class_name_existing = self._get_class_of_entry_list_as_str(args_dict)

        # Only the class name itself is compared, not the module path
        split_new_name = entry_class_name_new_entry.split(".")[-1]
        split_existing_name = entry_class_name_existing.split(".")[-1]

        # Explicit comparison rather than assert, which is removed under python -O
        if split_new_name != split_existing_name:
            self.log.warn(
                "You tried to add an entry of type %s to existing data type %s"
                % (entry_class_name_new_entry, entry_class_name_existing)
            )
            return failure

        return success

    def _delete_last_entry_for_args_dict(self, args_dict, are_you_sure=False):
        """
        Remove the last entry of the series stored for args_dict and write it back

        :param args_dict: labels identifying the series
        :param are_you_sure: must be True, otherwise nothing is deleted
        :return: success, or failure if not confirmed or no entry to delete
        """
        if not are_you_sure:
            self.log.warn("Have to set are_you_sure to True when deleting")
            return failure

        entry_series = self._get_series_for_args_dict(args_dict)
        try:
            entry_series.delete_last_entry()
        except IndexError:
            self.log.warn(
                "Can't delete last entry for %s, as none present" % str(args_dict)
            )
            return failure

        self._write_series_for_args_dict(args_dict, entry_series)

        return success

    def _get_current_entry_for_args_dict(self, args_dict):
        """
        :param args_dict: labels identifying the series
        :return: final_entry() of the series stored for args_dict
        """
        entry_series = self._get_series_for_args_dict(args_dict)
        current_entry = entry_series.final_entry()

        return current_entry

    def _get_series_for_args_dict(self, args_dict) -> listOfEntries:
        """
        :param args_dict: labels identifying the series
        :return: stored series as a list of entries; an empty series of the
                 default data class if nothing is stored
        """
        class_with_series_as_list_of_dicts = (
            self._get_series_dict_and_class_for_args_dict(args_dict)
        )
        if class_with_series_as_list_of_dicts is missing_data:
            return self._empty_data_series

        entry_series = class_with_series_as_list_of_dicts.as_list_of_entries()

        return entry_series

    def _get_series_dict_and_class_for_args_dict(
        self, args_dict: dict
    ) -> classWithListOfEntriesAsListOfDicts:
        """
        :param args_dict: labels identifying the series
        :return: stored data with its class name resolved to a class object,
                 or missing_data if nothing is stored
        """
        class_str_with_series_as_list_of_dicts = (
            self._get_series_dict_with_data_class_for_args_dict(args_dict)
        )

        if class_str_with_series_as_list_of_dicts is missing_data:
            return missing_data

        class_with_series_as_list_of_dicts = (
            class_str_with_series_as_list_of_dicts.with_class_object()
        )

        return class_with_series_as_list_of_dicts

    def _write_series_for_args_dict(self, args_dict: dict, entry_series: listOfEntries):
        """
        Convert entry_series to a list of dicts, pair it with the class name
        for args_dict, and hand it to the child class writer

        :param args_dict: labels identifying the series
        :param entry_series: series to store
        :return: success
        """
        entry_series_as_list_of_dicts = entry_series.as_list_of_dict()
        class_of_entry_list_as_str = self._get_class_of_entry_list_as_str(args_dict)
        class_str_with_series_as_list_of_dicts = classStrWithListOfEntriesAsListOfDicts(
            class_of_entry_list_as_str, entry_series_as_list_of_dicts
        )

        self._write_series_dict_for_args_dict(
            args_dict, class_str_with_series_as_list_of_dicts
        )

        return success

    def _get_class_of_entry_list_as_str(
        self,
        args_dict: dict,
    ) -> str:
        ## Use existing data, or if not available use the default for this object
        class_str_with_series_as_list_of_dicts = (
            self._get_series_dict_with_data_class_for_args_dict(args_dict)
        )

        if class_str_with_series_as_list_of_dicts is missing_data:
            return self._data_class_name()
        else:
            return class_str_with_series_as_list_of_dicts.class_of_entry_list_as_str

    def _get_series_dict_with_data_class_for_args_dict(
        self, args_dict: dict
    ) -> classStrWithListOfEntriesAsListOfDicts:
        # return data_class, series_as_list_of_dicts
        ## return missing_data if unavailable
        raise NotImplementedError("Need to use child class")

    def _write_series_dict_for_args_dict(
        self,
        args_dict: dict,
        class_str_with_series_as_list_of_dicts: classStrWithListOfEntriesAsListOfDicts,
    ):
        ## Child classes store the class name and the list of dicts for args_dict
        raise NotImplementedError("Need to use child class")

    def _get_list_of_args_dict(self) -> list:
        ## Child classes supply the list of args dicts
        raise NotImplementedError("Need to use child class")