-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathllm_finance_predictor.py
More file actions
318 lines (260 loc) · 14.2 KB
/
llm_finance_predictor.py
File metadata and controls
318 lines (260 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
# ===============================================================
# llm_finance_predictor.py — Языковая модель для анализа рынка
# ---------------------------------------------------------------
# Основной скрипт эксперимента:
# • Преобразует исторические котировки OHLCV в текстовые описания
# • Передаёт их в DistilBERT / Transformers
# • Предсказывает направление движения цены (рост/падение)
# • Сохраняет метрики (accuracy, f1, auc) и логи
#
# Автор: Михаил Шардин
# Онлайн-визитка: https://shardin.name/?utm_source=python
#
# Репозиторий: https://github.com/empenoso/llm-stock-market-predictor
# ===============================================================
import pandas as pd
import numpy as np
from pathlib import Path
from typing import List, Tuple, Dict
from dataclasses import dataclass
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
TrainingArguments,
Trainer,
EarlyStoppingCallback
)
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, roc_auc_score
from tqdm import tqdm
import warnings
warnings.filterwarnings('ignore')
tqdm.pandas()
@dataclass
class FeatureConfig:
"""Конфигурация для создания признаков"""
short_window: int = 3
medium_window: int = 7
long_window: int = 14
volume_threshold: float = 1.2
class OHLCVFeatureExtractor:
"""Преобразует данные OHLCV в троичные признаки и текстовые паттерны"""
def __init__(self, config: FeatureConfig):
self.config = config
def _ternary_encode(self, series: pd.Series) -> pd.Series:
"""Преобразует изменения цены в троичный формат: +1 (рост), 0 (без изменений), -1 (падение)"""
changes = series.pct_change()
threshold = 0.001
result = pd.Series(0, index=series.index, dtype=np.int8)
result[changes > threshold] = 1
result[changes < -threshold] = -1
return result
def _calculate_patterns(self, df: pd.DataFrame) -> pd.DataFrame:
"""Рассчитывает все числовые признаки для датафрейма"""
features = pd.DataFrame(index=df.index)
features['close_ternary'] = self._ternary_encode(df['close'])
features['short_trend'] = features['close_ternary'].rolling(window=self.config.short_window).sum()
features['medium_trend'] = features['close_ternary'].rolling(window=self.config.medium_window).sum()
features['hl_range'] = ((df['high'] - df['low']) / df['close']).rolling(window=self.config.short_window).mean()
avg_volume = df['volume'].rolling(window=self.config.long_window).mean()
features['volume_momentum'] = df['volume'] / (avg_volume + 1e-9)
features['price_momentum'] = df['close'].pct_change(self.config.short_window)
features['near_high'] = (df['close'] / df['high'].rolling(window=self.config.long_window).max()) > 0.98
features['near_low'] = (df['close'] / df['low'].rolling(window=self.config.long_window).min()) < 1.02
features['target'] = (df['close'].shift(-1) > df['close']).astype(int)
return features
def _features_to_text_vectorized(self, features: pd.DataFrame) -> pd.Series:
"""Векторизованное преобразование признаков в текст."""
text_parts = []
# Short trend
conditions = [features['short_trend'] >= 2, features['short_trend'] >= 1,
features['short_trend'] <= -2, features['short_trend'] <= -1]
choices = ["price rising strongly", "price rising", "price falling strongly", "price falling"]
text_parts.append(pd.Series(np.select(conditions, choices, default="price consolidating"), index=features.index))
# Medium trend
conditions = [features['medium_trend'] >= 3, features['medium_trend'] <= -3]
choices = ["uptrend established", "downtrend established"]
text_parts.append(pd.Series(np.select(conditions, choices, default="sideways movement"), index=features.index))
# Volatility
conditions = [features['hl_range'] > 0.03, features['hl_range'] < 0.01]
choices = ["high volatility", "low volatility"]
text_parts.append(pd.Series(np.select(conditions, choices, default="normal volatility"), index=features.index))
# Volume
conditions = [features['volume_momentum'] > 1.5, features['volume_momentum'] > 1.2,
features['volume_momentum'] < 0.7]
choices = ["volume surging", "volume increasing", "volume declining"]
text_parts.append(pd.Series(np.select(conditions, choices, default="volume stable"), index=features.index))
# Momentum
conditions = [features['price_momentum'] > 0.05, features['price_momentum'] < -0.05]
choices = ["strong momentum", "weak momentum"]
text_parts.append(pd.Series(np.select(conditions, choices, default=""), index=features.index))
# Support/Resistance - чистое pandas решение
support_res_parts = []
support_res_parts.append(features['near_high'].map({True: "near resistance", False: ""}))
support_res_parts.append(features['near_low'].map({True: "near support", False: ""}))
text_parts.extend(support_res_parts)
# Объединяем все части
combined = pd.concat(text_parts, axis=1)
return combined.apply(lambda row: ' '.join([x for x in row if x and str(x).strip()]).strip(), axis=1)
def process_dataframe(self, df: pd.DataFrame) -> pd.DataFrame:
"""Полный конвейер: OHLCV -> признаки -> текст"""
df = df.sort_values(['ticker', 'datetime']).reset_index(drop=True)
# Применяем расчеты к каждой группе тикеров
features = df.groupby('ticker', group_keys=False).apply(self._calculate_patterns)
# Векторизованная генерация текста
features['text'] = self._features_to_text_vectorized(features)
# Объединяем обратно с исходными данными
final_df = df.join(features)
# Удаляем строки с NaN
final_df = final_df.dropna(subset=['text', 'target'])
return final_df
class FinancialTextDataset(Dataset):
"""PyTorch Dataset для финансовых текстовых данных"""
def __init__(self, texts: List[str], labels: List[int], tokenizer, max_length: int = 128):
self.texts = texts
self.labels = labels
self.tokenizer = tokenizer
self.max_length = max_length
def __len__(self):
return len(self.texts)
def __getitem__(self, idx):
text = self.texts[idx]
label = self.labels[idx]
encoding = self.tokenizer(
text,
max_length=self.max_length,
padding='max_length',
truncation=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].flatten(),
'attention_mask': encoding['attention_mask'].flatten(),
'labels': torch.tensor(label, dtype=torch.long)
}
class WalkForwardValidator:
"""Реализация скользящей (walk-forward) валидации для временных рядов"""
def __init__(self, train_size: int = 252, test_size: int = 21, step_size: int = 21):
self.train_size = train_size
self.test_size = test_size
self.step_size = step_size
def split(self, data: pd.DataFrame) -> List[Tuple[pd.DataFrame, pd.DataFrame]]:
"""Генерирует наборы для обучения и теста"""
splits = []
total_length = len(data)
start_idx = 0
while start_idx + self.train_size + self.test_size <= total_length:
train_end = start_idx + self.train_size
test_end = train_end + self.test_size
train_data = data.iloc[start_idx:train_end]
test_data = data.iloc[train_end:test_end]
splits.append((train_data, test_data))
start_idx += self.step_size
return splits
class LLMFinancialPredictor:
"""Основной класс для предиктора"""
def __init__(self, model_name: str = "distilbert-base-uncased", device: str = None):
self.model_name = model_name
self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
print(f"🚀 Инициализация модели: {model_name}")
print(f"💻 Используемое устройство: {self.device}")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = None
self.feature_extractor = OHLCVFeatureExtractor(FeatureConfig())
def load_data(self, file_path: Path) -> pd.DataFrame:
"""Загружает и предварительно обрабатывает данные OHLCV"""
df = pd.read_csv(file_path, sep='\t', parse_dates=['datetime'], dayfirst=True)
df = df.sort_values('datetime').reset_index(drop=True)
return df
def prepare_dataset(self, features_df: pd.DataFrame) -> Tuple[List[str], List[int]]:
"""Извлекает тексты и метки из датафрейма с признаками"""
texts = features_df['text'].tolist()
labels = features_df['target'].astype(int).tolist()
return texts, labels
def train(self, train_texts: List[str], train_labels: List[int],
val_texts: List[str] = None, val_labels: List[int] = None,
epochs: int = 3, batch_size: int = 32, learning_rate: float = 2e-5):
"""Обучает модель"""
# Подавляем предупреждение о неинициализированных весах (это нормально для fine-tuning)
import logging
logging.getLogger("transformers.modeling_utils").setLevel(logging.ERROR)
self.model = AutoModelForSequenceClassification.from_pretrained(
self.model_name,
num_labels=2,
problem_type="single_label_classification"
).to(self.device)
train_dataset = FinancialTextDataset(train_texts, train_labels, self.tokenizer)
eval_dataset = FinancialTextDataset(val_texts, val_labels, self.tokenizer) if val_texts and val_labels else None
training_args = TrainingArguments(
output_dir='./results/training_output',
num_train_epochs=epochs,
per_device_train_batch_size=batch_size,
per_device_eval_batch_size=batch_size * 2,
learning_rate=learning_rate,
weight_decay=0.01,
logging_dir='./logs',
logging_steps=50,
evaluation_strategy="epoch" if eval_dataset else "no",
save_strategy="epoch",
load_best_model_at_end=True if eval_dataset else False,
metric_for_best_model="eval_loss" if eval_dataset else None,
fp16=True,
disable_tqdm=False,
report_to="none",
save_total_limit=1
)
trainer = Trainer(
model=self.model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
compute_metrics=self.compute_metrics if eval_dataset else None,
callbacks=[EarlyStoppingCallback(early_stopping_patience=2)] if eval_dataset else []
)
trainer.train()
def predict(self, texts: List[str], batch_size: int = 64) -> np.ndarray:
"""Предсказывает вероятности для текстов"""
self.model.eval()
dataset = FinancialTextDataset(texts, [0] * len(texts), self.tokenizer)
dataloader = DataLoader(dataset, batch_size=batch_size)
predictions = []
with torch.no_grad():
for batch in dataloader:
input_ids = batch['input_ids'].to(self.device)
attention_mask = batch['attention_mask'].to(self.device)
outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)
probs = torch.softmax(outputs.logits, dim=1)
predictions.extend(probs[:, 1].cpu().numpy())
return np.array(predictions)
@staticmethod
def compute_metrics(eval_pred):
"""Рассчитывает метрики для оценки"""
predictions, labels = eval_pred
probs = torch.softmax(torch.from_numpy(predictions), dim=1).numpy()
preds = np.argmax(predictions, axis=1)
accuracy = accuracy_score(labels, preds)
precision, recall, f1, _ = precision_recall_fscore_support(labels, preds, average='binary')
auc = roc_auc_score(labels, probs[:, 1])
return {'accuracy': accuracy, 'precision': precision, 'recall': recall, 'f1': f1, 'auc': auc}
def evaluate(self, texts: List[str], labels: List[int]) -> Dict[str, float]:
"""Оценивает производительность модели"""
if not labels or not texts:
return {}
probs = self.predict(texts)
predictions = (probs > 0.5).astype(int)
accuracy = accuracy_score(labels, predictions)
precision, recall, f1, _ = precision_recall_fscore_support(
labels, predictions, average='binary', zero_division=0
)
try:
auc = roc_auc_score(labels, probs)
except ValueError:
auc = 0.5
return {
'accuracy': accuracy,
'precision': precision,
'recall': recall,
'f1': f1,
'auc': auc
}