# analysis.py
# Technical-analysis helpers: pivot-based support/resistance detection and a
# multi-indicator (RSI, MACD, Bollinger Bands, SMA) trading-signal scorer.
import pandas as pd
import pandas_ta as ta
from scipy.signal import find_peaks
def find_support_resistance(data, prominence=1):
    """
    Identifies the nearest support and resistance levels around the latest
    close, using price pivot points (peaks in 'High', valleys in 'Low').

    Args:
        data (pd.DataFrame): DataFrame with historical stock data. It must
            contain 'High', 'Low' and 'Close' columns.
        prominence (float): Minimum prominence forwarded to
            scipy.signal.find_peaks for both the peak and the valley search.
            Larger values keep only more pronounced pivots. Defaults to 1,
            the value that was previously hard-coded.

    Returns:
        dict: A dictionary with two string entries:
            "recent_support": the highest pivot low strictly below the latest
                close, formatted with two decimals, or "None" if there is none.
            "recent_resistance": the lowest pivot high strictly above the
                latest close, formatted with two decimals, or "None" if there
                is none.
    """
    # Without any rows there is no latest close to compare against.
    if data is None or data.empty:
        return {"recent_support": "None", "recent_resistance": "None"}

    highs = data['High']
    lows = data['Low']

    # Peaks of 'High' are resistance pivots; peaks of the negated 'Low'
    # series are valleys, i.e. support pivots.
    resistance_indices, _ = find_peaks(highs, prominence=prominence)
    support_indices, _ = find_peaks(-lows, prominence=prominence)

    resistance_levels = highs.iloc[resistance_indices].to_list()
    support_levels = lows.iloc[support_indices].to_list()

    latest_price = data['Close'].iloc[-1]

    # Nearest level on each side of the current price. `default=None` marks
    # "no such level" without reserving a real price (0 / inf) as a sentinel.
    recent_support = max(
        (level for level in support_levels if level < latest_price),
        default=None,
    )
    recent_resistance = min(
        (level for level in resistance_levels if level > latest_price),
        default=None,
    )

    return {
        "recent_support": (
            f"{recent_support:.2f}" if recent_support is not None else "None"
        ),
        "recent_resistance": (
            f"{recent_resistance:.2f}" if recent_resistance is not None else "None"
        ),
    }
def _resolve_indicator_column(frame, name):
    """Return the column in `frame` that holds indicator `name`.

    Prefers an exact match; otherwise falls back to the first column whose
    name starts with `name + "_"` (e.g. 'BBL_20_2.0_2.0' for 'BBL_20_2.0').
    Raises KeyError(name) when neither exists.
    """
    if name in frame.columns:
        return name
    prefix = f"{name}_"
    for column in frame.columns:
        if str(column).startswith(prefix):
            return column
    raise KeyError(name)


def analyze_stock_data(data):
    """
    Performs a comprehensive technical analysis using multiple indicators
    (RSI, MACD, Bollinger Bands, 50/200-day SMAs and a 50-day-high breakout).

    Note: the indicator columns and a 'high_50d' column are appended to
    `data` in place.

    Args:
        data (pd.DataFrame): DataFrame with historical stock data.
            It must contain 'Open', 'High', 'Low', 'Close' columns and at
            least 200 rows.

    Returns:
        dict: On success, a dictionary with "signal" ("Strong Buy", "Buy",
            "Neutral", "Sell" or "Strong Sell"), "buy_score", "sell_score",
            "is_breakout", "summary" and a nested "indicators" dict of
            formatted values. On failure (too little data, missing columns,
            NaN in the latest values, or any unexpected exception), a
            dictionary with "signal" set to "Error" and a "summary" message.
    """
    if data is None or data.empty or len(data) < 200:
        return {
            "signal": "Error",
            "summary": "Not enough data for comprehensive analysis (requires at least 200 days)."
        }

    try:
        # --- 1. Calculate all indicators (appended to `data` by pandas_ta) ---
        data.ta.rsi(append=True)
        data.ta.macd(append=True, fast=12, slow=26, signal=9)
        data.ta.bbands(append=True, length=20)
        data.ta.sma(length=50, append=True)
        data.ta.sma(length=200, append=True)

        # Breakout reference: highest high over the previous 50 rows.
        # .shift(1) excludes the current row's own high from the window.
        data['high_50d'] = data['High'].rolling(window=50).max().shift(1)

        # Column names generated by pandas_ta for the parameters used above.
        rsi_col = 'RSI_14'
        macd_col = 'MACD_12_26_9'
        macds_col = 'MACDs_12_26_9'
        # NOTE(review): some pandas_ta releases are believed to suffix the
        # Bollinger columns differently (e.g. 'BBL_20_2.0_2.0'); resolve them
        # tolerantly instead of failing with KeyError. Confirm against the
        # installed pandas_ta version.
        bbl_col = _resolve_indicator_column(data, 'BBL_20_2.0')
        bbu_col = _resolve_indicator_column(data, 'BBU_20_2.0')
        sma50_col = 'SMA_50'
        sma200_col = 'SMA_200'

        # Get the latest row of data
        latest = data.iloc[-1]

        # Every comparison against NaN is False, which would silently yield a
        # "Neutral" signal; report incomplete data explicitly instead.
        required_cols = [
            rsi_col, macd_col, macds_col, bbl_col, bbu_col,
            sma50_col, sma200_col, 'Close',
        ]
        nan_cols = [col for col in required_cols if pd.isna(latest[col])]
        if nan_cols:
            return {
                "signal": "Error",
                "summary": (
                    "Latest values are missing (NaN) for: "
                    f"{', '.join(str(col) for col in nan_cols)}. "
                    "The data might be incomplete."
                ),
            }

        # --- 2. Implement Scoring System ---
        buy_score = 0
        sell_score = 0
        reasons = []

        # RSI Logic
        is_oversold = latest[rsi_col] < 30
        is_overbought = latest[rsi_col] > 70
        if is_oversold:
            buy_score += 1
            reasons.append(f"RSI is Oversold ({latest[rsi_col]:.2f})")
        elif is_overbought:
            sell_score += 1
            reasons.append(f"RSI is Overbought ({latest[rsi_col]:.2f})")

        # MACD Logic. NOTE: this compares only the latest MACD and signal
        # values; it does not check that a crossover occurred on this row.
        if latest[macd_col] > latest[macds_col]:
            buy_score += 1
            reasons.append("MACD crossed above Signal Line (Bullish)")
        elif latest[macd_col] < latest[macds_col]:
            sell_score += 1
            reasons.append("MACD crossed below Signal Line (Bearish)")

        # Bollinger Bands Logic (close strictly outside the band)
        price_below_bbl = latest['Close'] < latest[bbl_col]
        price_above_bbu = latest['Close'] > latest[bbu_col]
        if price_below_bbl:
            buy_score += 1
            reasons.append("Price touched lower Bollinger Band")
        elif price_above_bbu:
            sell_score += 1
            reasons.append("Price touched upper Bollinger Band")

        # High-probability reversal: RSI extreme and band breach agree.
        strong_reversal_buy = is_oversold and price_below_bbl
        strong_reversal_sell = is_overbought and price_above_bbu

        # Breakout is reported but does not contribute to either score.
        is_breakout = latest['Close'] > latest['high_50d']
        if is_breakout:
            reasons.append(f"Price broke above 50-day high ({latest['high_50d']:.2f})")

        # Trend Logic
        is_uptrend = latest[sma50_col] > latest[sma200_col]
        is_downtrend = latest[sma50_col] < latest[sma200_col]
        if is_uptrend:
            reasons.append("Primary Trend: Uptrend (50-day MA > 200-day MA)")
        elif is_downtrend:
            reasons.append("Primary Trend: Downtrend (50-day MA < 200-day MA)")

        # --- 3. Determine Final Signal (Hybrid Model) ---
        signal = "Neutral"
        if is_uptrend:
            # In an uptrend, look for buys; sell only on a strong reversal.
            if buy_score >= 2:
                signal = "Strong Buy" if buy_score >= 3 else "Buy"
            elif strong_reversal_sell:
                signal = "Sell"
        elif is_downtrend:
            # In a downtrend, look for sells; buy only on a strong reversal.
            if sell_score >= 2:
                signal = "Strong Sell" if sell_score >= 3 else "Sell"
            elif strong_reversal_buy:
                signal = "Buy"
        else:
            # Neither trend (the two MAs are equal): score-only decision.
            if buy_score >= 2:
                signal = "Strong Buy" if buy_score >= 3 else "Buy"
            elif sell_score >= 2:
                signal = "Strong Sell" if sell_score >= 3 else "Sell"

        return {
            "signal": signal,
            "buy_score": buy_score,
            "sell_score": sell_score,
            "is_breakout": bool(is_breakout),
            "summary": ", ".join(reasons) if reasons else "No strong technical signals detected.",
            "indicators": {
                "RSI": f"{latest[rsi_col]:.2f}",
                "MACD": f"{latest[macd_col]:.2f}",
                "MACD Signal": f"{latest[macds_col]:.2f}",
                "SMA 50": f"{latest[sma50_col]:.2f}",
                "SMA 200": f"{latest[sma200_col]:.2f}",
                "Close Price": f"{latest['Close']:.2f}"
            }
        }
    except KeyError as e:
        return {"signal": "Error", "summary": f"A required indicator column was not found: {e}. The data might be incomplete."}
    except Exception as e:
        # Top-level boundary: callers receive an error result rather than an exception.
        return {"signal": "Error", "summary": f"An unexpected error occurred during analysis: {e}"}