forked from J-Quants/JPXTokyoStockExchangePrediction
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFeatures.py
More file actions
201 lines (148 loc) · 5.24 KB
/
Features.py
File metadata and controls
201 lines (148 loc) · 5.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import numpy as np
import pandas as pd
from enum import Enum
class FeatureType(Enum):
GLOBAL = 0
LOCAL = 1
class Feature:
def __init__(self, feature_type, name):
self.feature_type = feature_type
self.name = name
def add_feature_pandas(self, df):
raise NotImplementedError
def update_row(self, row):
raise NotImplementedError
def reset(self):
raise NotImplementedError
def copy(self):
raise NotImplementedError
class SMA(Feature):
def __init__(self, col, period):
super().__init__(FeatureType.LOCAL, col + "SMA" + str(period))
self.col = col
self.period = period
self.elements = [np.nan] * period
self.ptr = 0
self.mean = np.nan
def add_feature_pandas(self, df):
df[self.name] = df[self.col].rolling(self.period).mean()
for k in range(self.period):
df[self.name].iloc[k] = np.mean(df[self.col].iloc[:k+1])
return df
def update_row(self, row):
dequeue = self.elements[self.ptr % self.period]
enqueue = row[self.col]
self.elements[self.ptr % self.period] = enqueue
self.ptr += 1
mean = 0
if self.ptr < self.period: # We have not yet seen enough elements
mean = np.mean(self.elements[:self.ptr])
elif self.ptr == self.period: # We have the value for the first time
self.mean = np.mean(self.elements)
mean = self.mean
else:
mean = self.mean + (- dequeue + enqueue) / self.period # Simple and efficient updates
self.mean = mean
row[self.name] = mean
return row
def reset(self):
self.elements = [np.nan] * self.period
self.ptr = 0
self.mean = np.nan
def copy(self):
return SMA(self.col, self.period)
class Amplitude(Feature):
def __init__(self):
super().__init__(FeatureType.LOCAL, "Amplitude")
def add_feature_pandas(self, df):
df[self.name] = df["High"] - df["Low"]
return df
def update_row(self, row):
row[self.name] = row["High"] - row["Low"]
return row
def reset(self):
return
def copy(self):
return Amplitude()
class OpenCloseReturn(Feature):
def __init__(self):
super().__init__(FeatureType.LOCAL, "OpenCloseReturn")
def add_feature_pandas(self, df):
df[self.name] = (df["Close"] - df["Open"]) / df["Open"]
return df
def update_row(self, row):
row[self.name] = (row["Close"] - row["Open"]) / row["Open"]
return row
def reset(self):
return
def copy(self):
return OpenCloseReturn()
class Return(Feature):
def __init__(self):
super().__init__(FeatureType.LOCAL, "Return")
self.last_close = None
def add_feature_pandas(self, df):
df[self.name] = ((df["Close"] - df["Close"].shift(1)) / df["Close"].shift(1)).fillna(0)
return df
def update_row(self, row):
if self.last_close is None:
row[self.name] = 0
else:
row[self.name] = (row["Close"] - self.last_close) / self.last_close
self.last_close = row["Close"]
return row
def reset(self):
self.last_close = None
def copy(self):
return Return()
class Volatility(Feature):
def __init__(self, n=30):
super().__init__(FeatureType.LOCAL, "Volatility" + str(n))
self.n = n
self.returns = np.ones(n) * np.nan
self.ptr = 0
self.index = 0
def volatility_row_function(self, df, row):
l = max(0, self.index + 1 - self.n)
r = self.index + 1
self.index += 1
return np.std(df["Return"].to_numpy()[l:r], ddof=1)
def add_feature_pandas(self, df):
df[self.name] = df.apply(lambda row: self.volatility_row_function(df, row), axis=1)
return df.fillna(0)
def update_row(self, row):
self.returns[self.ptr % self.n] = row["Return"]
if self.ptr == 0:
row[self.name] = 0
elif self.ptr < self.n - 1:
vec = self.returns[:(self.ptr + 1) % self.n]
row[self.name] = np.std(vec, ddof=1)
else:
row[self.name] = np.std(self.returns, ddof=1)
self.ptr += 1
return row
def reset(self):
self.returns = np.ones(self.n) * np.nan
self.ptr = 0
self.index = 0
def copy(self):
return Volatility(self.n)
class FeatureChecker:
def verify(feature, df_prices):
df_pandas = feature.add_feature_pandas(df_prices)
df_online = df_prices.apply(lambda row: feature.update_row(row), axis=1)
return np.isclose(df_pandas[feature.name].to_numpy(),
df_online[feature.name].to_numpy(), equal_nan=True).all()
def verify_features(features, df_prices):
verifications = []
all_verified = True
for feature in features:
this = FeatureChecker.verify(feature, df_prices)
verifications.append((feature.name, this))
all_verified = all_verified and this
if all_verified:
print("All features passed the check.")
else:
print("Some features failed the check.")
print(verifications)
return verifications