diff --git a/.gitignore b/.gitignore index 2cbcfa0..3ce3c39 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,9 @@ - +.vscode *.pyc *.png +notebooks +*.csv +*.txt +*.txt +*.ipynb + diff --git a/argchecker.py b/argchecker.py index c45ab25..c83de95 100644 --- a/argchecker.py +++ b/argchecker.py @@ -17,9 +17,10 @@ def check_arguments(self, args): assert not(args.is_test == 1 and args.future_bars < 2), "You want to test but the future bars are less than 2. That does not give us enough data to test the model properly. Please use a value larger than 2.\nExiting now..." - assert not(args.history_to_use != "all" and int(args.history_to_use_int) < + assert not(args.history_to_use != "all" and int(args.history_to_use) < args.future_bars), "It is a good idea to use more history and less future bars. Please change these two values and try again.\nExiting now..." args.market_index = str(args.market_index).upper() if args.history_to_use != "all": args.history_to_use = int(args.history_to_use) + return args diff --git a/backtester.py b/backtester.py index 1f479f5..9270c34 100644 --- a/backtester.py +++ b/backtester.py @@ -1,117 +1,129 @@ -# Basic libraries -import os -import sys -import math -import scipy -import random -import collections -import numpy as np -import pandas as pd -import matplotlib.pyplot as plt -import warnings -warnings.filterwarnings("ignore") - -# Styling for plots -plt.style.use('seaborn-white') -plt.rc('grid', linestyle="dotted", color='#a0a0a0') -plt.rcParams['axes.edgecolor'] = "#04383F" - - -class BackTester: - """ - Backtester module that does both backward and forward testing for our portfolios. - """ - def __init__(self): - print("\n--# Backtester has been initialized") - - def calculate_percentage_change(self, old, new): - """ - Percentage change - """ - return ((new - old) * 100) / old - - def portfolio_weight_manager(self, weight, is_long_only): - """ - Manage portfolio weights. If portfolio is long only, set the negative weights to zero. 
- """ - if is_long_only == 1: - weight = max(weight, 0) - else: - weight = weight - return weight - - def back_test(self, symbol_names, portfolio_weights_dictionary, portfolio_data_dictionary, historical_price_market, is_long_only, market_chart, strategy_name): - """ - Main backtest function. Takes in the portfolio weights and compares the portfolio returns with a market index of your choice. - """ - - # Get market returns during the backtesting time - historical_price_market = list(historical_price_market["Close"]) - market_returns = [self.calculate_percentage_change(historical_price_market[i - 1], historical_price_market[i]) for i in range(1, len(historical_price_market))] - market_returns_cumulative = np.cumsum(market_returns) - - # Get invidiual returns for each stock in our portfolio - normal_returns_matrix = [] - for symbol in symbol_names: - symbol_historical_prices = list(portfolio_data_dictionary[symbol]["historical_prices"]["Close"]) - symbol_historical_returns = [self.calculate_percentage_change(symbol_historical_prices[i - 1], symbol_historical_prices[i]) for i in range(1, len(symbol_historical_prices))] - normal_returns_matrix.append(symbol_historical_returns) - - # Get portfolio returns - normal_returns_matrix = np.array(normal_returns_matrix).transpose() - portfolio_weights_vector = np.array([self.portfolio_weight_manager(portfolio_weights_dictionary[symbol], is_long_only) for symbol in portfolio_weights_dictionary]).transpose() - portfolio_returns = np.dot(normal_returns_matrix, portfolio_weights_vector) - portfolio_returns_cumulative = np.cumsum(portfolio_returns) - - # Plot returns - x = np.arange(len(portfolio_returns_cumulative)) - plt.plot(x, portfolio_returns_cumulative, linewidth = 2.0, label = strategy_name) - plt.axhline(y = 0, linestyle = 'dotted', alpha = 0.3, color = 'black') - if market_chart: - x = np.arange(len(market_returns_cumulative)) - plt.plot(x, market_returns_cumulative, linewidth = 2.0, color = '#282828', label = 'Market 
Index', linestyle = '--') - - # Plotting styles - plt.title("Backtest Results", fontsize = 14) - plt.xlabel("Bars (Time Sorted)", fontsize = 14) - plt.ylabel("Cumulative Percentage Return", fontsize = 14) - plt.xticks(fontsize = 14) - plt.yticks(fontsize = 14) - - def future_test(self, symbol_names, portfolio_weights_dictionary, portfolio_data_dictionary, future_price_market, is_long_only, market_chart, strategy_name): - """ - Main future test function. If future data is available i.e is_test is set to 1 and future_bars set to > 0, this takes in the portfolio weights and compares the portfolio returns with a market index of your choice in the future. - """ - - # Get future prices - future_price_market = [item[4] for item in list(future_price_market)] - market_returns = [self.calculate_percentage_change(future_price_market[i - 1], future_price_market[i]) for i in range(1, len(future_price_market))] - market_returns_cumulative = np.cumsum(market_returns) - - # Get invidiual returns for each stock in our portfolio - normal_returns_matrix = [] - for symbol in symbol_names: - symbol_historical_prices = [item[4] for item in list(portfolio_data_dictionary[symbol]["future_prices"])] - symbol_historical_returns = [self.calculate_percentage_change(symbol_historical_prices[i - 1], symbol_historical_prices[i]) for i in range(1, len(symbol_historical_prices))] - normal_returns_matrix.append(symbol_historical_returns) - - # Get portfolio returns - normal_returns_matrix = np.array(normal_returns_matrix).transpose() - portfolio_weights_vector = np.array([self.portfolio_weight_manager(portfolio_weights_dictionary[symbol], is_long_only) for symbol in portfolio_weights_dictionary]).transpose() - portfolio_returns = np.dot(normal_returns_matrix, portfolio_weights_vector) - portfolio_returns_cumulative = np.cumsum(portfolio_returns) - - # Plot - x = np.arange(len(portfolio_returns_cumulative)) - plt.axhline(y = 0, linestyle = 'dotted', alpha = 0.3, color = 'black') - plt.plot(x, 
class BackTester:
    """
    Backtester module that does both backward and forward testing for our
    portfolios.

    Every member is a static helper, so the class is only used as a
    namespace and never needs to be instantiated.
    """

    # Style sheet names tried in order. Matplotlib 3.6 renamed the bundled
    # seaborn styles to "seaborn-v0_8-*" and later releases dropped the old
    # names, so the second entry is the fallback for newer installs.
    _PLOT_STYLES = ("seaborn-white", "seaborn-v0_8-white")

    @staticmethod
    def filter_short(weights, long_only):
        """
        Manage portfolio weights. If the portfolio is long only, set the
        negative weights to zero.

        :param weights: array-like of weights (1-D vector, or a 2-D table
            such as a DataFrame with one column per strategy)
        :param long_only: truthy to clip negative weights at zero
        :returns: a numpy array with negatives replaced by 0 when
            ``long_only`` is truthy, otherwise ``weights`` itself, untouched
        """
        if long_only:
            # Element-wise maximum works for any dimensionality; iterating
            # the input directly would walk the column labels of a DataFrame.
            return np.maximum(np.asarray(weights), 0)
        return weights

    @staticmethod
    def plot_market(market_returns):
        """
        Draw the market index series as a dashed dark line labelled
        'Market Index' on the current pyplot axes.

        :param market_returns: sequence of values, plotted against 0..n-1
        """
        x = np.arange(len(market_returns))
        plt.plot(x, market_returns, linewidth=2.0,
                 color='#282828', label='Market Index', linestyle='--')

    @staticmethod
    def plot_test(**kwargs):
        """
        Plot a table of test results with the project's plot styling.

        Expected keyword arguments (read below): ``df``, ``title``,
        ``xlabel`` and ``ylabel``. ``df`` must offer a pandas-style
        ``.plot(...)`` method -- presumably a DataFrame with one column per
        strategy; confirm against the caller.
        """
        kwargs = dotdict(kwargs)

        # Styling for plots: use the first style sheet this Matplotlib knows.
        for style_name in BackTester._PLOT_STYLES:
            try:
                plt.style.use(style_name)
                break
            except OSError:
                continue
        plt.rc('grid', linestyle="dotted", color='#a0a0a0')
        plt.rcParams['figure.figsize'] = (18, 6)
        plt.rcParams['axes.edgecolor'] = "#04383F"
        plt.rcParams['axes.titlesize'] = "large"
        plt.rcParams['axes.labelsize'] = "medium"
        plt.rcParams['lines.linewidth'] = 2

        # Plot the data first and add the zero line to the axes pandas
        # returns; calling plt.axhline() beforehand puts the line on a
        # different figure from the one pandas creates for the frame.
        axes = kwargs.df.plot(fontsize=14, title=kwargs.title,
                              xlabel=kwargs.xlabel, ylabel=kwargs.ylabel,)
        axes.axhline(y=0, linestyle='dotted', alpha=0.3, color='black')

    @staticmethod
    def get_test(p_weights, data, direction: str, long_only: bool):
        """
        Main backtest function. Takes in the portfolio weights and returns
        the cumulative portfolio returns for one price table.

        :param p_weights: portfolio weights; rows are assumed to follow the
            column order of ``data[direction]`` -- TODO confirm with caller
        :param data: mapping holding a price table under ``direction``
        :param direction: one of 'historical', 'future' or 'sim'
        :param long_only: when truthy, negative weights are zeroed first
        :returns: ``np.dot`` of the cumulative price deltas and the weights
        :raises AssertionError: if ``direction`` is not a known key
        """

        assert (direction in ["historical", "future", "sim"]
                ), "direction must be 'historical', 'future' or 'sim'"

        # Honour the long-only switch: without this the argument was ignored
        # and short positions leaked into long-only tests.
        weights = BackTester.filter_short(p_weights, long_only)

        # Cumulative per-symbol returns, then weight them into portfolios.
        cumulative_returns = get_price_deltas(data[direction]).cumsum()
        return np.dot(cumulative_returns, weights)

    @staticmethod
    def get_market_returns(market_data, direction):
        """
        Return the cumulative sum of the market index price deltas.

        :param market_data: mapping holding a price table under ``direction``
        :param direction: one of 'historical', 'future' or 'sim'
        :raises AssertionError: if ``direction`` is not a known key
        """
        assert (direction in ["historical", "future", "sim"]
                ), "direction must be 'historical', 'future' or 'sim'"
        market_returns = get_price_deltas(market_data[direction])
        return np.cumsum(market_returns)

    @staticmethod
    def simulate_future_prices(data: dict, r_func,
                               simulation_timesteps: int = 30,
                               n_simulations: int = 100) -> pd.DataFrame:
        """Simulates the price of a collection of stocks in the future

        For every column, returns computed by ``r_func`` are binned into a
        32-bin histogram, random returns are drawn from that distribution,
        exponentiated and compounded from the last historical price. The
        result is the mean path over ``n_simulations`` runs.

        :param data: a dictionary with the loaded data; reads
            ``data["historical"]`` and, if present, ``data["future"]``
        :type data: dict
        :param r_func: callable turning the historical price table into a
            table of returns; the draws are passed through ``exp`` so log
            returns are presumably expected -- confirm against the caller
        :param simulation_timesteps: number of steps, defaults to 30;
            capped at the number of future rows when a non-empty future
            set is available
        :type simulation_timesteps: number, optional
        :param n_simulations: number of random paths averaged per symbol,
            defaults to 100
        :type n_simulations: int, optional
        :returns: A dataframe of simulated prices with one row for the last
            historical price plus one row per simulated step
        :rtype: pd.Dataframe
        :raises ValueError: if ``n_simulations`` is smaller than 1
        """
        if n_simulations < 1:
            raise ValueError("n_simulations must be at least 1")

        close_prices = data["historical"]
        returns = r_func(close_prices)

        # Only cap by the future window when there is one; an empty future
        # table used to force zero steps and made the simulation a no-op.
        steps = max(simulation_timesteps, 0)
        future = data.get("future")
        if future is not None and future.shape[0] > 0:
            steps = min(steps, future.shape[0])

        symbol_simulations = []
        for col in returns.columns:
            # Empirical distribution of this symbol's returns.
            hist_dist = st.rv_histogram(np.histogram(returns[col], bins=32))
            last_price = close_prices[col].values[-1]

            # Draw with the `random` module in run-major order so that
            # random.seed() reproduces the same paths as a nested loop,
            # then evaluate the inverse CDF once for all draws.
            draws = [random.uniform(0, 1)
                     for _ in range(n_simulations * steps)]
            if draws:
                growth = np.round(np.exp(hist_dist.ppf(draws)), 5)
                growth = growth.reshape(n_simulations, steps)
            else:
                growth = np.empty((n_simulations, 0))

            # Each row: last price followed by its compounded evolution.
            start = np.full((n_simulations, 1), last_price, dtype=float)
            paths = np.cumprod(np.hstack([start, growth]), axis=1)
            symbol_simulations.append(paths.mean(axis=0))

        return pd.DataFrame(columns=returns.columns,
                            data=np.array(symbol_simulations).T)
} -] \ No newline at end of file +] \ No newline at end of file diff --git a/data_loader.py b/data_loader.py index 4a04b73..3448875 100644 --- a/data_loader.py +++ b/data_loader.py @@ -1,180 +1,159 @@ -# Basic libraries -import os -import collections -import pandas as pd -import yfinance as yf -from tqdm import tqdm -import warnings -warnings.filterwarnings("ignore") - - -class DataEngine: - def __init__(self, args): - print("\n--> Data engine has been initialized...") - self.args = args - - # Stocks list - self.directory_path = str(os.path.dirname(os.path.abspath(__file__))) - self.stocks_file_path = f"{self.directory_path}/{self.args.stocks_file_path}" - self.stocks_list = [] - - # Load stock names in a list - self.load_stocks_from_file() - - # Dictionary to store data. This will only store and save data if the argument is_save_dictionary is 1. - self.data_dictionary = {} - - # Data length - self.stock_data_length = [] - - def load_stocks_from_file(self): - """ - Load stock names from the file - """ - print("Loading all stocks from file...") - stocks_list = open(self.stocks_file_path, "r").readlines() - stocks_list = [str(item).strip("\n") for item in stocks_list] - - # Load symbols - stocks_list = list(sorted(set(stocks_list))) - print("Total number of stocks: %d" % len(stocks_list)) - self.stocks_list = stocks_list - - def get_most_frequent_key(self, input_list): - counter = collections.Counter(input_list) - return list(counter.keys())[0] - - def get_data(self, symbol): - """ - Get stock data from yahoo finance. 
- """ - future_prices = None - historical_prices = None - # Find period - if self.args.data_granularity_minutes == 1: - period = "7d" - interval = str(self.args.data_granularity_minutes) + "m" - if self.args.data_granularity_minutes == 3600: - period = "5y" - interval = "1d" - else: - period = "30d" - interval = str(self.args.data_granularity_minutes) + "m" - - # Get stock price - try: - # Stock price - stock_prices = yf.download( - tickers=symbol, - period=period, - interval=interval, - auto_adjust=False, - progress=False) - stock_prices = stock_prices.reset_index() - try: - stock_prices = stock_prices.drop(columns=["Adj Close"]) - except Exception as e: - print("Exception", e) - - data_length = stock_prices.shape[0] - self.stock_data_length.append(data_length) - - # After getting some data, ignore partial data from yfinance - # based on number of data samples - if len(self.stock_data_length) > 5: - most_frequent_key = self.get_most_frequent_key( - self.stock_data_length) - if (data_length != most_frequent_key and - data_length != self.args.history_to_use and - symbol != self.args.market_index): # Needs index - return [], [], True - - if self.args.history_to_use == "all": - # For some reason, yfinance gives some 0 - # values in the first index - stock_prices = stock_prices.iloc[1:] - else: - stock_prices = stock_prices.iloc[-self.args.history_to_use:] - - if self.args.is_test == 1: - future_prices = stock_prices.iloc[-self.args.future_bars:] - historical_prices = stock_prices.iloc[:-self.args.future_bars] - else: - historical_prices = stock_prices - - if stock_prices.shape[0] == 0: - return [], [], True - except Exception as e: - print("Exception", e) - return [], [], True - - return historical_prices, future_prices.values.tolist(), False - - def get_market_index_price(self): - """ - Gets market index price e.g SPY. 
One can change it to some other index - """ - symbol = self.args.market_index - stock_price_data, future_prices, not_found = self.get_data(symbol) - if not_found: - return None, None - - return stock_price_data, future_prices - - def collect_data_for_all_tickers(self): - """ - Iterates over all symbols and collects their data - """ - - print("Loading data for all stocks...") - symbol_names = [] - historical_price = [] - future_price = [] - - # Any stock with very low volatility is ignored. - # You can change this line to address that. - for i in tqdm(range(len(self.stocks_list))): - symbol = self.stocks_list[i] - try: - stock_price_data, future_prices, not_found = self.get_data( - symbol) - if not not_found: - # Add to lists - symbol_names.append(symbol) - historical_price.append(stock_price_data) - future_price.append(future_prices) - except Exception as e: - print("Exception", e) - continue - - # Sometimes, there are some errors in feature generation or price - # extraction, let us remove that stuff - historical_price_info, future_price_info, symbol_names = self.remove_bad_data( - historical_price, future_price, symbol_names) - for i in range(0, len(symbol_names)): - self.data_dictionary[symbol_names[i]] = { - "historical_prices": historical_price_info[i], - "future_prices": future_price_info[i]} - - return self.data_dictionary - - def remove_bad_data(self, historical_price, future_price, symbol_names): - """ - Remove bad data i.e data that had some errors while scraping or feature generation - - *** This can be much more improved with dicts and filter function. 
class DataEngine:
    """
    Loads the ticker list from a text file and downloads "Adj Close" price
    series for every ticker through yfinance.
    """

    def __init__(self, args):
        """
        Store the run arguments and load the ticker list.

        :param args: argument object; this class reads ``stocks_file_path``,
            ``data_granularity_minutes``, ``history_to_use``, ``is_test``
            and ``future_bars`` from it
        """
        print("\n--> Data engine has been initialized...")
        self.args = args

        # Stocks list, resolved relative to the directory of this module
        self.directory_path = str(os.path.dirname(os.path.abspath(__file__)))
        str_path = f"{self.directory_path}/{self.args.stocks_file_path}"
        self.stocks_file_path = str_path
        self.stocks_list = []

        # Load stock names in a list
        self.load_stocks_from_file()

        # Dictionary to store data. This will only store and save data if
        # the argument is_save_dictionary is 1.
        self.data_dictionary = {}

        # Row count of the first successful download; later downloads must
        # match it (0 means "nothing downloaded yet").
        self.stock_data_length = 0

    def load_stocks_from_file(self):
        """
        Load stock names from the file into ``self.stocks_list`` as a
        sorted list of unique, whitespace-stripped symbols.
        """
        print("Loading all stocks from file...")
        with open(self.stocks_file_path, "r") as f:
            symbols = {str(item).strip() for item in f}

        # Blank lines would otherwise end up as an empty ticker symbol.
        symbols.discard("")

        stocks_list = sorted(symbols)
        print("Total number of stocks: %d" % len(stocks_list))
        self.stocks_list = stocks_list

    def get_most_frequent_count(self, input_list):
        """
        Return the value that occurs most often in ``input_list``.

        Ties go to the value seen first. Raises ``IndexError`` when the
        list is empty.
        """
        counter = collections.Counter(input_list)
        return counter.most_common(1)[0][0]

    def _split_data(self, data):
        """
        Split the "Adj Close" column into (historical, future) arrays.

        When ``args.is_test`` is truthy and ``args.future_bars`` is positive
        the last ``future_bars`` values form the future part; otherwise the
        whole series is historical and the future part is ``None``.
        """
        closes = data["Adj Close"].values
        bars = self.args.future_bars if self.args.is_test else 0
        # Guard against 0: slicing with [:-0] would return an empty history.
        if bars and bars > 0:
            return closes[:-bars], closes[-bars:]
        return closes, None

    def _format_symbol(self, s):
        """
        Normalise a raw ticker: upper-case it, turn a ".VN" suffix into
        ".V" and, when more than one dot remains, replace the first dot
        with a dash.
        """
        x = s.upper()
        x = x.replace(".VN", ".V")
        if len(x.split(".")) > 2:
            x = x.replace(".", "-", 1)
        return x

    def _period_and_interval(self):
        """
        Map ``args.data_granularity_minutes`` to the yfinance
        ``(period, interval)`` pair used for downloads.
        """
        minutes = self.args.data_granularity_minutes
        if minutes == 1:
            return "7d", "1m"
        if minutes == 3600:
            # 3600 is the value this project uses to request daily bars.
            return "5y", "1d"
        return "30d", str(minutes) + "m"

    def get_data(self, symbol_raw):
        """
        Get stock data from yahoo finance.

        :param symbol_raw: ticker as written in the stocks file
        :returns: ``(historical_prices, future_prices)`` as produced by
            ``_split_data``, or ``(None, None)`` when the download fails,
            is empty, or has a different length than the first download
        """
        symbol = self._format_symbol(symbol_raw)
        period, interval = self._period_and_interval()

        # Best effort: any failure is reported and the symbol is skipped.
        try:
            stock_prices = yf.download(
                tickers=symbol,
                period=period,
                interval=interval,
                auto_adjust=False,
                progress=False)

            n_rows = stock_prices.shape[0]
            if n_rows == 0:
                raise ValueError(f"{symbol}: No data returned")

            # All series must have the same number of bars so they can be
            # stored side by side as columns.
            if self.stock_data_length == 0:
                self.stock_data_length = n_rows
            elif n_rows != self.stock_data_length:
                raise ValueError(f"{symbol}: Invalid Stock Length")

            if self.args.history_to_use == "all":
                # For some reason, yfinance gives some 0
                # values in the first index
                stock_prices = stock_prices.iloc[1:]
            else:
                stock_prices = stock_prices.iloc[-self.args.history_to_use:]

            return self._split_data(stock_prices)

        except Exception as e:
            print("Exception", e)
            return None, None

    def collect_data_for_all_tickers(self):
        """
        Iterates over all symbols and collects their data

        :returns: dict with two DataFrames, "historical" and "future", each
            holding one column per successfully downloaded symbol. Missing
            values are filled with 1 and both tables are also written to
            ``historical.csv`` / ``future.csv`` in the working directory.
        """

        print("Loading data for all stocks...")
        data_dict = {"historical": pd.DataFrame(),
                     "future": pd.DataFrame()
                     }

        for symbol in tqdm(self.stocks_list):
            try:
                historical_data, future_data = self.get_data(symbol)
                if historical_data is not None:
                    data_dict["historical"][symbol] = historical_data
                if future_data is not None:
                    data_dict["future"][symbol] = future_data
            except Exception as e:
                # e.g. a column whose length does not fit the table
                print("Exception", e)
                continue
        data_dict["historical"] = data_dict["historical"].fillna(1)
        data_dict["future"] = data_dict["future"].fillna(1)

        # Saving a copy is optional; a failed write must not abort the run.
        try:
            data_dict["historical"].to_csv("historical.csv")
            data_dict["future"].to_csv("future.csv")
        except Exception as e:
            print("Exception: ", e)

        return data_dict
self.data_dictionary = {} - - print('\n') - - def calculate_percentage_change(self, old, new): - """ - Calculate percentage change - """ - return ((new - old) * 100) / old - - def create_returns(self, historical_price_info): - """ - Create log return matrix, percentage return matrix, and mean return - vector - """ - - returns_matrix = [] - returns_matrix_percentages = [] - predicted_return_vectors = [] - for i in range(0, len(historical_price_info)): - close_prices = list(historical_price_info[i]["Close"]) - log_returns = [math.log(close_prices[i] / close_prices[i - 1]) - for i in range(1, len(close_prices))] - percentage_returns = [self.calculate_percentage_change( - close_prices[i - 1], close_prices[i]) for i in range(1, len(close_prices))] - - total_data = len(close_prices) - - # Expected returns in future. We can either use historical returns as future returns on try to simulate future returns and take the mean. For simulation, you can modify the functions in simulator to use here. - future_expected_returns = np.mean([(self.calculate_percentage_change(close_prices[i - 1], close_prices[i])) / ( - total_data - i) for i in range(1, len(close_prices))]) # More focus on recent returns - - # Add to matrices - returns_matrix.append(log_returns) - returns_matrix_percentages.append(percentage_returns) - - # Add returns to vector - # Assuming that future returns are similar to past returns - predicted_return_vectors.append(future_expected_returns) - - # Convert to numpy arrays for one liner calculations - predicted_return_vectors = np.array(predicted_return_vectors) - returns_matrix = np.array(returns_matrix) - returns_matrix_percentages = np.array(returns_matrix_percentages) - - return predicted_return_vectors, returns_matrix, returns_matrix_percentages - - def load_data(self): - """ - Loads data needed for analysis - """ - # Gather data for all stocks in a dictionary format - # Dictionary keys will be -> historical_prices, future_prices - self.data_dictionary = 
self.dataEngine.collect_data_for_all_tickers() - - # Add data to lists - symbol_names = list(sorted(self.data_dictionary.keys())) - historical_price_info, future_prices = [], [] - for symbol in symbol_names: - historical_price_info.append( - self.data_dictionary[symbol]["historical_prices"]) - future_prices.append(self.data_dictionary[symbol]["future_prices"]) - - # Get return matrices and vectors - predicted_return_vectors, returns_matrix, returns_matrix_percentages = self.create_returns( - historical_price_info) - return historical_price_info, future_prices, symbol_names, predicted_return_vectors, returns_matrix, returns_matrix_percentages - - def run_strategies(self): - """ - Run strategies, back and future test them, and simulate the returns. - """ - historical_price_info, future_prices, symbol_names, predicted_return_vectors, returns_matrix, returns_matrix_percentages = self.load_data() - historical_price_market, future_prices_market = self.dataEngine.get_market_index_price() - - # Calculate covariance matrix - covariance_matrix = np.cov(returns_matrix) - - # Use random matrix theory to filter out the noisy eigen values - if self.args.apply_noise_filtering: - print( - "\n** Applying random matrix theory to filter out noise in the covariance matrix...\n") - covariance_matrix = self.strategyManager.random_matrix_theory_based_cov( - returns_matrix) - - # Get weights for the portfolio - eigen_portfolio_weights_dictionary = self.strategyManager.calculate_eigen_portfolio( - symbol_names, covariance_matrix, self.args.eigen_portfolio_number) - mvp_portfolio_weights_dictionary = self.strategyManager.calculate_minimum_variance_portfolio( - symbol_names, covariance_matrix) - msr_portfolio_weights_dictionary = self.strategyManager.calculate_maximum_sharpe_portfolio( - symbol_names, covariance_matrix, predicted_return_vectors) - ga_portfolio_weights_dictionary = self.strategyManager.calculate_genetic_algo_portfolio( - symbol_names, returns_matrix_percentages) - - # Print 
weights - print("\n*% Printing portfolio weights...") - self.print_and_plot_portfolio_weights( - eigen_portfolio_weights_dictionary, 'Eigen Portfolio', plot_num=1) - self.print_and_plot_portfolio_weights( - mvp_portfolio_weights_dictionary, 'Minimum Variance Portfolio (MVP)', plot_num=2) - self.print_and_plot_portfolio_weights( - msr_portfolio_weights_dictionary, 'Maximum Sharpe Portfolio (MSR)', plot_num=3) - self.print_and_plot_portfolio_weights( - ga_portfolio_weights_dictionary, 'Genetic Algo (GA)', plot_num=4) - self.draw_plot("output/weights.png") - - # Back test - print("\n*& Backtesting the portfolios...") - self.backTester.back_test(symbol_names, eigen_portfolio_weights_dictionary, - self.data_dictionary, - historical_price_market, - self.args.only_long, - market_chart=True, - strategy_name='Eigen Portfolio') - self.backTester.back_test(symbol_names, - mvp_portfolio_weights_dictionary, - self.data_dictionary, historical_price_market, - self.args.only_long, - market_chart=False, - strategy_name='Minimum Variance Portfolio (MVP)') - self.backTester.back_test(symbol_names, msr_portfolio_weights_dictionary, - self.data_dictionary, - historical_price_market, - self.args.only_long, - market_chart=False, - strategy_name='Maximum Sharpe Portfolio (MSR)') - self.backTester.back_test(symbol_names, - ga_portfolio_weights_dictionary, - self.data_dictionary, - historical_price_market, - self.args.only_long, - market_chart=False, - strategy_name='Genetic Algo (GA)') - self.draw_plot("output/backtest.png") - - if self.args.is_test: - print("\n#^ Future testing the portfolios...") - # Future test - self.backTester.future_test(symbol_names, - eigen_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.only_long, - market_chart=True, - strategy_name='Eigen Portfolio') - self.backTester.future_test(symbol_names, - mvp_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.only_long, - market_chart=False, 
- strategy_name='Minimum Variance Portfolio (MVP)') - self.backTester.future_test(symbol_names, - msr_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.only_long, - market_chart=False, - strategy_name='Maximum Sharpe Portfolio (MSR)') - self.backTester.future_test(symbol_names, - ga_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.only_long, - market_chart=False, - strategy_name='Genetic Algo (GA)') - self.draw_plot("output/future_tests.png") - - # Simulation - print("\n+$ Simulating future prices using monte carlo...") - self.simulator.simulate_portfolio(symbol_names, - eigen_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.is_test, - market_chart=True, - strategy_name='Eigen Portfolio') - self.simulator.simulate_portfolio(symbol_names, - eigen_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.is_test, - market_chart=False, - strategy_name='Minimum Variance Portfolio (MVP)') - self.simulator.simulate_portfolio(symbol_names, - eigen_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.is_test, - market_chart=False, - strategy_name='Maximum Sharpe Portfolio (MSR)') - self.simulator.simulate_portfolio(symbol_names, - ga_portfolio_weights_dictionary, - self.data_dictionary, - future_prices_market, - self.args.is_test, - market_chart=False, - strategy_name='Genetic Algo (GA)') - self.draw_plot("output/monte_carlo.png") - - def draw_plot(self, filename="output/graph.png"): - """ - Draw plots - """ - # Styling for plots - - plt.grid() - plt.legend(fontsize=14) - plt.tight_layout() - plt.show() - - """if self.args.save_plot: - plt.savefig(filename) - else: - plt.tight_layout() - plt.show()""" # Plots were not being generated properly. Need to fix this. 
def __init__(self, args: dict = None):
    """
    Set up the Eiten runner.

    :param args: argument object with attribute access. When ``None`` the
        defaults are read from ``commands.json`` in the current working
        directory: every entry's ``comm`` (minus the leading ``--``)
        becomes a key and its ``default`` string is converted according
        to the entry's ``type`` ("str", "int" or "bool").
    """
    if args is None:
        def _to_bool(text):
            # bool("False") is True for any non-empty string, so the
            # textual default has to be parsed explicitly.
            return str(text).strip().lower() in ("1", "true", "yes", "y", "on")

        arg_types = {"str": str, "int": int, "bool": _to_bool}
        # Context manager so the file handle is closed right after parsing.
        with open("commands.json", "r") as handle:
            specs = json.load(handle)
        args = dotdict(
            {spec["comm"][2:]: arg_types[spec["type"]](spec["default"])
             for spec in specs})

    print("\n--* Eiten has been initialized...")
    self.args = args
    # "all" is kept as a string; anything else is a number of bars.
    if self.args.history_to_use != "all":
        self.args.history_to_use = int(self.args.history_to_use)

    # Data dictionary
    self.data_dict = {}  # {"market": args.market_index}
    self.market_data = {}

    print('\n')
de.get_data(self.args.market_index) + + self.market_data["historical"] = pd.DataFrame( + columns=[self.args.market_index], data=p) + self.market_data["future"] = pd.DataFrame( + columns=[self.args.market_index], data=f) + # Get return matrices and vectors + return self.data_dict + + def _test(self, direction): + # Back test + print("\n*& Backtesting the portfolios...") + assert direction in ["historical", "future"], "Invalid direction!" + + return pd.DataFrame(columns=self.portfolios.columns, + data=BackTester.get_test( + self.portfolios, + self.data_dict, + direction, + self.args.only_long)) + + def _monte_carlo(self, span): + self.data_dict["sim"] = BackTester.simulate_future_prices( + self.data_dict, get_predicted_returns, span) + return pd.DataFrame(columns=self.portfolios.columns, + data=BackTester.get_test( + self.portfolios, + self.data_dict, + "sim", + self.args.only_long)) + # BackTester.plot_test(title="Simulated Future Returns", + # xlabel="Bars (Time Sorted)", + # ylabel="Cumulative Percentage Return", + # df=df) + + def run_strategies(self): + """ + Run strategies, back and future test them, and simulate the returns. 
+ """ + self.load_data() + + # Calculate covariance matrix + log_returns = get_log_returns(self.data_dict["historical"]) + cov_matrix = log_returns.cov() + + # Use random matrix theory to filter out the noisy eigen values + if self.args.apply_noise_filtering: + print("\nFiltering noise from cov matrix\n") + cov_matrix = random_matrix_theory_based_cov(log_returns) + + pred_returns = get_predicted_returns(self.data_dict["historical"]) + perc_returns = get_price_deltas(self.data_dict["historical"]) + + self.portfolios = {} + # Get weights for the portfolio + for p in portfolios: + name = p.name + weights = p.generate_portfolio( + cov_matrix=cov_matrix, p_number=self.args.eigen_portfolio_number, + pred_returns=pred_returns.T, + perc_returns=perc_returns, + long_only=self.args.only_long) + self.portfolios[name] = weights + self.portfolios = pd.DataFrame.from_dict(self.portfolios) + + # Print weights + print("\n*% Printing portfolio weights...") + p_count = 1 + print(self.portfolios) + self.draw_plot("output/weights.png", (p_count, 6)) + self._test("historical") + self.draw_plot("output/back_test.png") + + if self.args.is_test: + self._test("future") + self.draw_plot("output/future_tests.png") + + # Simulation + print("\n+$ Simulating future prices using monte carlo...") + self._monte_carlo(self.args.future_bars) + self.draw_plot("output/monte_carlo.png") + return + + def draw_plot(self, filename="output/graph.png", figsize=(12, 6)): + """ + Draw plots + """ + # Styling for plots + plt.style.use('seaborn-white') + plt.rc('grid', linestyle="dotted", color='#a0a0a0') + plt.rcParams['axes.edgecolor'] = "#04383F" + plt.rcParams['axes.titlesize'] = "large" + plt.rcParams['axes.labelsize'] = "medium" + plt.rcParams['lines.linewidth'] = 2 + plt.rcParams['figure.figsize'] = figsize + + plt.grid() + plt.legend(fontsize=14) + if self.args.save_plot: + plt.savefig(filename) + else: + plt.tight_layout() + plt.show() + # plt.cla() + plt.clf() + + def 
print_and_plot_portfolio_weights(self, + weights: dict, strategy: str, + plot_num: int, figsize=(12, 6)): + + print("\n-------- Weights for %s --------" % strategy) + symbols = list(weights.keys()) + for k, v in weights.items(): + print(f"Symbol: {k}, Weight: {v:.4f}") + + # Plot + width = 0.1 + x = np.arange(len(weights)) + plt.bar(x + (width * (plot_num - 1)) + 0.05, + list(weights.values()), label=strategy, width=width) + plt.xticks(x, symbols, rotation=90) + plt.yticks(fontsize=14) + plt.xlabel("Symbols", fontsize=14) + plt.ylabel("Weight in Portfolio", fontsize=14) + plt.title("Portfolio Weights for Different Strategies", fontsize=14) diff --git a/portfolio_manager.py b/portfolio_manager.py index 335cce8..8928b6a 100644 --- a/portfolio_manager.py +++ b/portfolio_manager.py @@ -25,8 +25,7 @@ def main(): # Get arguments args = argParser.parse_args() - # Check arguments - ArgChecker(args) + # Run strategies eiten = Eiten(args) diff --git a/simulator.py b/simulator.py deleted file mode 100644 index da1d08b..0000000 --- a/simulator.py +++ /dev/null @@ -1,140 +0,0 @@ -# Basic libraries -import os -import sys -import math -import scipy -import random -import collections -import numpy as np -import pandas as pd -import scipy.stats as st -import matplotlib.pyplot as plt -import warnings -warnings.filterwarnings("ignore") - -# Styling for plots -plt.style.use('seaborn-white') -plt.rc('grid', linestyle="dotted", color='#a0a0a0') -plt.rcParams['axes.edgecolor'] = "#04383F" - - -class MontoCarloSimulator: - """ - Monto carlo simulator that calculates the historical returns distribution and uses it to predict the future returns - """ - - def __init__(self): - print("\n--$ Simulator has been initialized") - - def calculate_percentage_change(self, old, new): - """ - Percentage change - """ - return ((new - old) * 100) / old - - def draw_portfolio_performance_chart(self, returns_matrix, portfolio_weights_dictionary, strategy_name): - """ - Draw returns chart for portfolio 
performance - """ - - # Get portfolio returns - returns_matrix = np.array(returns_matrix).transpose() - portfolio_weights_vector = np.array([portfolio_weights_dictionary[symbol] for symbol in portfolio_weights_dictionary]).transpose() - portfolio_returns = np.dot(returns_matrix, portfolio_weights_vector) - portfolio_returns_cumulative = np.cumsum(portfolio_returns) - - # Plot - x = np.arange(len(portfolio_returns_cumulative)) - plt.axhline(y = 0, linestyle = 'dotted', alpha = 0.3, color = 'black') - plt.plot(x, portfolio_returns_cumulative, linewidth = 2.0, label = "Projected Returns from " + str(strategy_name)) - plt.title("Simulated Future Returns", fontsize = 14) - plt.xlabel("Bars (Time Sorted)", fontsize = 14) - plt.ylabel("Cumulative Percentage Return", fontsize = 14) - plt.xticks(fontsize = 14) - plt.yticks(fontsize = 14) - - def draw_market_performance_chart(self, actual_returns, strategy_name): - """ - Draw actual market returns if future data is available - """ - - # Get market returns - cumulative_returns = np.cumsum(actual_returns) - - # Plot - x = np.arange(len(cumulative_returns)) - plt.axhline(y = 0, linestyle = 'dotted', alpha = 0.3, color = 'black') - plt.plot(x, cumulative_returns, linewidth = 2.0, color = '#282828', linestyle = '--', label = "Market Index Returns") - plt.title("Simulated Future Returns", fontsize = 14) - plt.xlabel("Bars (Time Sorted)", fontsize = 14) - plt.ylabel("Cumulative Percentage Return", fontsize = 14) - plt.xticks(fontsize = 14) - plt.yticks(fontsize = 14) - - def simulate_portfolio(self, symbol_names, portfolio_weights_dictionary, portfolio_data_dictionary, future_prices_market, test_or_predict, market_chart, strategy_name, simulation_timesteps = 25): - """ - Simulate portfolio returns in the future - """ - returns_matrix = [] - actual_returns_matrix = [] - - # Iterate over each symbol to get their returns - for symbol in symbol_names: - - # Get symbol returns using monte carlo - historical_close_prices = 
list(portfolio_data_dictionary[symbol]["historical_prices"]["Close"]) - future_price_predictions, _ = self.simulate_and_get_future_prices(historical_close_prices, simulation_timesteps = max(simulation_timesteps, len(list(portfolio_data_dictionary[symbol]["future_prices"])))) - predicted_future_returns = [self.calculate_percentage_change(future_price_predictions[i - 1], future_price_predictions[i]) for i in range(1, len(future_price_predictions))] - returns_matrix.append(predicted_future_returns) - - - # Get portfolio returns - self.draw_portfolio_performance_chart(returns_matrix, portfolio_weights_dictionary, strategy_name) - - # Check whether we have actual future data available or not - if test_or_predict == 1: - future_prices_market = [item[4] for item in list(future_prices_market)] - actual_future_prices_returns = [self.calculate_percentage_change(future_prices_market[i - 1], future_prices_market[i]) for i in range(1, len(future_prices_market))] - if market_chart == True: - # Also draw the actual future returns - self.draw_market_performance_chart(actual_future_prices_returns, strategy_name) - - def simulate_and_get_future_prices(self, historical_prices, simulation_timesteps = 25): - - # Get log returns from historical data - close_prices = historical_prices - returns = [math.log(close_prices[i] / close_prices[i - 1]) for i in range(1, len(close_prices))] - - # Get distribution of returns - hist = np.histogram(returns, bins = 32) - hist_dist = scipy.stats.rv_histogram(hist) # Distribution function - - predicted_prices = [] - # Do 25 iterations to simulate prices - for iteration in range(25): - new_close_prices = [close_prices[-1]] - new_close_prices_percentages = [] - for i in range(simulation_timesteps): - random_value = random.uniform(0, 1) - return_value = round(np.exp(hist_dist.ppf(random_value)), 5) # Get simulated return - price_last_point = new_close_prices[-1] - price_next_point = price_last_point * return_value - percentage_change = 
self.calculate_percentage_change(price_last_point, price_next_point) - - # Add to list - new_close_prices.append(price_next_point) - - predicted_prices.append(new_close_prices) - - # Calculate confidence intervals and average future returns. Conf intervals are not being used right now - conf_intervals = st.t.interval(0.95, len(predicted_prices), loc=np.mean(predicted_prices, axis = 0), scale=st.sem(predicted_prices, axis = 0)) - predicted_prices_mean = np.mean(predicted_prices, axis = 0) - return predicted_prices_mean, conf_intervals - - def is_nan(self, object): - """ - Check if object is null - """ - return object != object - - diff --git a/strategies/__init__.py b/strategies/__init__.py new file mode 100644 index 0000000..5039f0a --- /dev/null +++ b/strategies/__init__.py @@ -0,0 +1,14 @@ +from os.path import dirname, basename, isfile, join +import glob +import importlib.util +import inspect + +modules = glob.glob(join(dirname(__file__), "*.py")) +__names = [basename(f)[:-3] for f in modules if isfile(f) + and not f.endswith('__init__.py')] +portfolios = [] +for i in __names: + spec = importlib.util.spec_from_file_location("", f"./strategies/{i}.py") + foo = importlib.util.module_from_spec(spec) + spec.loader.exec_module(foo) + portfolios.append(inspect.getmembers(foo, inspect.isclass)[0][1]()) diff --git a/strategies/eigen_portfolio_strategy.py b/strategies/eigen_portfolio_strategy.py index 51f5af0..2ae527b 100644 --- a/strategies/eigen_portfolio_strategy.py +++ b/strategies/eigen_portfolio_strategy.py @@ -2,19 +2,30 @@ import os import warnings import numpy as np +from utils import dotdict, normalize_weights warnings.filterwarnings("ignore") + class EigenPortfolioStrategy: - def __init__(self): - print("Eigen portfolio strategy has been created") - - def generate_portfolio(self, symbols, covariance_matrix, eigen_portfolio_number): - """ - Inspired by: https://srome.github.io/Eigenvesting-I-Linear-Algebra-Can-Help-You-Choose-Your-Stock-Portfolio/ - """ - 
eig_values, eig_vectors = np.linalg.eigh(covariance_matrix) - market_eigen_portfolio = eig_vectors[:,-1] / np.sum(eig_vectors[:,-1]) # We don't need this but in case someone wants to analyze - eigen_portfolio = eig_vectors[:,-eigen_portfolio_number] / np.sum(eig_vectors[:,-eigen_portfolio_number]) # This is a portfolio that is uncorrelated to market and still yields good returns + def __init__(self): + self.name = "Eigen Portfolio" - portfolio_weights_dictionary = dict([(symbols[x], eigen_portfolio[x]) for x in range(0, len(eigen_portfolio))]) - return portfolio_weights_dictionary + def generate_portfolio(self, **kwargs): + """ + Inspired by: https://srome.github.io/Eigenvesting-I-Linear-Algebra-Can-Help-You-Choose-Your-Stock-Portfolio/ + """ + kwargs = dotdict(kwargs) + eigh_values, eigh_vectors = np.linalg.eigh(kwargs.cov_matrix) + # We don't need this but in case someone wants to analyze + # market_eigen_portfolio = eig_vectors[:, -1] / np.sum(eig_vectors[:, -1]) + # This is a portfolio that is uncorrelated to market and still yields good returns + eigen_portfolio = eigh_vectors[:, -kwargs.p_number] / \ + np.sum(eigh_vectors[:, -kwargs.p_number]) + # if kwargs.long_only: + # weights = {kwargs.cov_matrix.columns[i]: max(0, eigen_portfolio[i]) + # for i in range(eigen_portfolio.shape[0])} + # else: + eigen_portfolio = normalize_weights(eigen_portfolio) + weights = {kwargs.cov_matrix.columns[i]: eigen_portfolio[i] + for i in range(eigen_portfolio.shape[0])} + return weights diff --git a/strategies/genetic_algo_strategy.py b/strategies/genetic_algo_strategy.py index 124409a..1fdafdb 100644 --- a/strategies/genetic_algo_strategy.py +++ b/strategies/genetic_algo_strategy.py @@ -1,118 +1,116 @@ # Basic libraries -import os -import random import warnings import numpy as np +from utils import dotdict, normalize_weights warnings.filterwarnings("ignore") + class GeneticAlgoStrategy: - """ - My own custom implementation of genetic algorithms for portfolio - """ - def 
__init__(self): - print("Genetic algo strategy has been created") - self.initial_genes = 100 - self.selection_top = 25 - self.mutation_iterations = 50 - self.weight_update_factor = 0.1 - self.gene_length = None - self.genes_in_each_iteration = 250 - self.iterations = 50 - self.crossover_probability = 0.05 - - def generate_portfolio(self, symbols, return_matrix): - self.gene_length = len(symbols) - - # Create initial genes - initial_genes = self.generate_initial_genes(symbols) - - for i in range(self.iterations): - # Select - top_genes = self.select(return_matrix, initial_genes) - #print("Iteration %d Best Sharpe Ratio: %.3f" % (i, top_genes[0][0])) - top_genes = [item[1] for item in top_genes] - - # Mutate - mutated_genes = self.mutate(top_genes) - initial_genes = mutated_genes - - top_genes = self.select(return_matrix, initial_genes) - best_gene = top_genes[0][1] - transposed_gene = np.array(best_gene).transpose() # Gene is a distribution of weights for different stocks - return_matrix_transposed = return_matrix.transpose() - returns = np.dot(return_matrix_transposed, transposed_gene) - returns_cumsum = np.cumsum(returns) - - ga_portfolio_weights = best_gene - ga_portfolio_weights = dict([(symbols[x], ga_portfolio_weights[x]) for x in range(0, len(ga_portfolio_weights))]) - return ga_portfolio_weights - - def generate_initial_genes(self, symbols): - total_symbols = len(symbols) - - genes = [] - for i in range(self.initial_genes): - gene = [random.uniform(-1, 1) for _ in range(0, total_symbols)] - genes.append(gene) - - return genes - - def mutate(self, genes): - new_genes = [] - - for gene in genes: - for x in range(0, self.mutation_iterations): - mutation = gene + (self.weight_update_factor * np.random.uniform(-1, 1, self.gene_length)) - mutation = list(mutation) - new_genes.append(mutation) - - new_genes = genes + new_genes - random.shuffle(new_genes) - genes_to_keep = new_genes[:self.genes_in_each_iteration] - - # Add crossovers - crossovers = 
self.crossover(new_genes) - genes_to_keep = genes_to_keep + crossovers - - return genes_to_keep - - def select(self, return_matrix, genes): - genes_with_scores = [] - for gene in genes: - transposed_gene = np.array(gene).transpose() # Gene is a distribution of weights for different stocks - return_matrix_transposed = return_matrix.transpose() - returns = np.dot(return_matrix_transposed, transposed_gene) - returns_cumsum = np.cumsum(returns) - - # Get fitness score - fitness = self.fitness_score(returns) - genes_with_scores.append([fitness, gene]) - - # Sort - random_genes = [self.generate_a_gene() for _ in range(5)] - genes_with_scores = list(reversed(sorted(genes_with_scores))) - genes_with_scores = genes_with_scores[:self.selection_top] + random_genes - return genes_with_scores - - def fitness_score(self, returns): - sharpe_returns = np.mean(returns) / np.std(returns) - return sharpe_returns - - def generate_a_gene(self): - gene = [random.uniform(-1, 1) for _ in range(self.gene_length)] - return gene - - def crossover(self, population): - crossover_population = [] - for z in range(0, len(population)): - if random.uniform(0, 1) < self.crossover_probability: - try: - random_gene_first = list(random.sample(population, 1)[0]) - random_gene_second = list(random.sample(population, 1)[0]) - random_split = random.randrange(1, len(random_gene_first) - 1) - crossover_gene = random_gene_first[:random_split] + random_gene_second[random_split:] - crossover_population.append(crossover_gene) - except Exception as e: - continue - - return crossover_population \ No newline at end of file + """ + My own custom implementation of genetic algorithms for portfolio + """ + + def __init__(self): + self.name = "Genetic Algo" + self.initial_genes = 100 + self.selection_top = 10 + self.mutation_iterations = 50 + self.weight_update_factor = 0.01 + self.gene_length = None + self.genes_in_each_iteration = 250 + self.iterations = 100 + self.crossover_probability = 0.1 + + def 
generate_portfolio(self, **kwargs): + kwargs = dotdict(kwargs) + symbols = list(kwargs.cov_matrix.columns) + self.gene_length = len(symbols) + + # Create initial genes + initial_genes = self.generate_initial_genes(symbols) + + for i in range(self.iterations): + # Select + top_genes = self.select(kwargs.sample_returns, initial_genes) + # print("Iteration %d Best Sharpe Ratio: %.3f" % (i, top_genes[0][0])) + top_genes = [item[1] for item in top_genes] + + # Mutate + mutated_genes = self.mutate(top_genes) + initial_genes = mutated_genes + + top_genes = self.select(kwargs.sample_returns, initial_genes) + best_gene = top_genes[0][1] + # Gene is a distribution of weights for different stocks + # transposed_gene = np.array(best_gene).transpose() + # returns = np.dot(return_matrix, transposed_gene) + # returns_cumsum = np.cumsum(returns) + n_best = normalize_weights(best_gene) + weights = {symbols[x]: n_best[x] for x in range(0, len(best_gene))} + return weights + + def generate_initial_genes(self, symbols): + return np.array( + [self.generate_gene() for _ in range(self.gene_length)]) + + def mutate(self, genes): + new_genes = [] + + for gene in genes: + for x in range(0, self.mutation_iterations): + mutation = gene + (self.weight_update_factor * + np.random.uniform(-1, 1, self.gene_length)) + new_genes.append(mutation) + + new_genes = genes + new_genes + np.random.shuffle(new_genes) + genes_to_keep = new_genes[:self.genes_in_each_iteration] + + # Add crossovers + crossovers = self.crossover(new_genes) + genes_to_keep = genes_to_keep + crossovers + + return genes_to_keep + + def select(self, return_matrix, genes): + genes_with_scores = [] + for gene in genes: + # Gene is a distribution of weights for different stocks + transposed_gene = gene.transpose() + returns = np.dot(return_matrix, transposed_gene) + # returns_cumsum = np.cumsum(returns) + + # Get fitness score + fitness = self.fitness_score(returns) + genes_with_scores.append([fitness, gene]) + + # Sort + 
random_genes = [self.generate_gene() for _ in range(5)] + genes_with_scores = sorted( + genes_with_scores, reverse=True, key=lambda x: x[0]) + genes_with_scores = (genes_with_scores[:self.selection_top] + + random_genes) + return genes_with_scores + + def fitness_score(self, returns): + sharpe_returns = np.mean(returns) / np.std(returns) + return sharpe_returns + + def generate_gene(self): + return np.random.uniform(-1, 1, self.gene_length) + + def crossover(self, population): + rng = np.random.default_rng() + crossover_population = [] + + population = np.array( + list(filter(lambda x: type(x) == np.ndarray, population))) + for z in range(0, len(population)): + if np.random.uniform(0, 1) < self.crossover_probability: + a, b = rng.choice(population, 2) + random_split = np.random.randint(1, len(a) - 1) + ab = np.concatenate( + (a[:random_split], b[random_split:]), axis=0) + crossover_population.append(ab) + + return crossover_population diff --git a/strategies/maximum_sharpe_ratio_strategy.py b/strategies/maximum_sharpe_ratio_strategy.py index f47c251..23156af 100644 --- a/strategies/maximum_sharpe_ratio_strategy.py +++ b/strategies/maximum_sharpe_ratio_strategy.py @@ -1,24 +1,32 @@ # Basic libraries -import os -import random import warnings import numpy as np +from utils import dotdict, normalize_weights warnings.filterwarnings("ignore") + class MaximumSharpeRatioStrategy: - def __init__(self): - print("Maximum sharpe ratio strategy has been created") - - def generate_portfolio(self, symbols, covariance_matrix, returns_vector): - """ - Inspired by: Eigen Portfolio Selection: A Robust Approach to Sharpe Ratio Maximization, https://papers.ssrn.com/sol3/papers.cfm?abstract_id=3070416 - """ - inverse_cov_matrix = np.linalg.pinv(covariance_matrix) - ones = np.ones(len(inverse_cov_matrix)) + def __init__(self): + self.name = 'Maximum Sharpe Portfolio (MSR)' + + def generate_portfolio(self, **kwargs): + """ + Inspired by: Eigen Portfolio Selection: + A Robust Approach to 
Sharpe Ratio Maximization, + https://papers.ssrn.com/sol3/papers.cfm?abstract_id=3070416 + """ + kwargs = dotdict(kwargs) + inverse_cov_matrix = np.linalg.pinv(kwargs.cov_matrix) + ones = np.ones(len(inverse_cov_matrix)) + + numerator = np.dot(inverse_cov_matrix, kwargs.pred_returns) + denominator = np.dot( + np.dot(ones.transpose(), inverse_cov_matrix), kwargs.pred_returns) + msr_portfolio_weights = numerator / denominator + + msr_portfolio_weights = normalize_weights(msr_portfolio_weights) + + weights = {kwargs.cov_matrix.columns[i]: msr_portfolio_weights[i] + for i in range(len(msr_portfolio_weights))} - numerator = np.dot(inverse_cov_matrix, returns_vector) - denominator = np.dot(np.dot(ones.transpose(), inverse_cov_matrix), returns_vector) - msr_portfolio_weights = numerator / denominator - - portfolio_weights_dictionary = dict([(symbols[x], msr_portfolio_weights[x]) for x in range(0, len(msr_portfolio_weights))]) - return portfolio_weights_dictionary \ No newline at end of file + return weights diff --git a/strategies/minimum_variance_strategy.py b/strategies/minimum_variance_strategy.py index 53ab176..4f1b26c 100644 --- a/strategies/minimum_variance_strategy.py +++ b/strategies/minimum_variance_strategy.py @@ -1,21 +1,25 @@ # Basic libraries -import os -import random import warnings import numpy as np +from utils import dotdict, normalize_weights warnings.filterwarnings("ignore") + class MinimumVarianceStrategy: - def __init__(self): - print("Minimum Variance strategy has been created") - - def generate_portfolio(self, symbols, covariance_matrix): - """ - Inspired by: https://srome.github.io/Eigenvesting-II-Optimize-Your-Portfolio-With-Optimization/ - """ - inverse_cov_matrix = np.linalg.pinv(covariance_matrix) - ones = np.ones(len(inverse_cov_matrix)) - inverse_dot_ones = np.dot(inverse_cov_matrix, ones) - min_var_weights = inverse_dot_ones / np.dot( inverse_dot_ones, ones) - portfolio_weights_dictionary = dict([(symbols[x], min_var_weights[x]) for x in 
range(0, len(min_var_weights))]) - return portfolio_weights_dictionary + def __init__(self): + self.name = "Minimum Variance Portfolio (MVP)" + + def generate_portfolio(self, **kwargs): + """ + Inspired by: https://srome.github.io/Eigenvesting-II-Optimize-Your-Portfolio-With-Optimization/ + """ + kwargs = dotdict(kwargs) + + inverse_cov_matrix = np.linalg.pinv(kwargs.cov_matrix) + ones = np.ones(len(inverse_cov_matrix)) + inverse_dot_ones = np.dot(inverse_cov_matrix, ones) + min_var_weights = inverse_dot_ones / np.dot(inverse_dot_ones, ones) + min_var_weights = normalize_weights(min_var_weights) + weights = {kwargs.cov_matrix.columns[i]: min_var_weights[i] + for i in range(min_var_weights.shape[0])} + return weights diff --git a/strategies/strategy_helper_functions.py b/strategies/strategy_helper_functions.py deleted file mode 100644 index dd32694..0000000 --- a/strategies/strategy_helper_functions.py +++ /dev/null @@ -1,41 +0,0 @@ -# Basic libraries -import os -import random -import warnings -import numpy as np -warnings.filterwarnings("ignore") - -class StrategyHelperFunctions: - def __init__(self): - print("Helper functions have been created") - - def random_matrix_theory_based_cov(self, returns_matrix): - """ - This is inspired by the excellent post @ https://srome.github.io/Eigenvesting-III-Random-Matrix-Filtering-In-Finance/ - """ - - # Calculate variance and std, will come in handy during reconstruction - variances = np.diag(np.cov(returns_matrix)) - standard_deviations = np.sqrt(variances) - - # Get correlation matrix and compute eigen vectors and values - correlation_matrix = np.corrcoef(returns_matrix) - eig_values, eig_vectors = np.linalg.eigh(correlation_matrix) - - # Get maximum theoretical eigen value for a random matrix - sigma = 1 # The variance for all of the standardized log returns is 1 - Q = len(returns_matrix[0]) / len(returns_matrix) - max_theoretical_eval = np.power(sigma*(1 + np.sqrt(1/Q)),2) - - # Prune random eigen values - 
eig_values_pruned = eig_values[eig_values > max_theoretical_eval] - eig_values[eig_values <= max_theoretical_eval] = 0 - - # Reconstruct the covariance matrix from the correlation matrix and filtered eigen values - temp = np.dot(eig_vectors, np.dot(np.diag(eig_values), np.transpose(eig_vectors))) - np.fill_diagonal(temp, 1) - filtered_matrix = temp - filtered_cov_matrix = np.dot(np.diag(standard_deviations), - np.dot(filtered_matrix,np.diag(standard_deviations))) - - return filtered_cov_matrix \ No newline at end of file diff --git a/strategy_manager.py b/strategy_manager.py deleted file mode 100644 index d32b3d2..0000000 --- a/strategy_manager.py +++ /dev/null @@ -1,60 +0,0 @@ -# Basic libraries -import os -import warnings -from strategies.genetic_algo_strategy import GeneticAlgoStrategy -from strategies.maximum_sharpe_ratio_strategy import MaximumSharpeRatioStrategy -from strategies.eigen_portfolio_strategy import EigenPortfolioStrategy -from strategies.minimum_variance_strategy import MinimumVarianceStrategy -from strategies.strategy_helper_functions import StrategyHelperFunctions -warnings.filterwarnings("ignore") - -class StrategyManager: - """ - Runs and manages all strategies - """ - def __init__(self): - print("\n--= Strategy manager has been created...") - self.geneticAlgoStrategy = GeneticAlgoStrategy() - self.minimumVarianceStrategy = MinimumVarianceStrategy() - self.eigenPortfolioStrategy = EigenPortfolioStrategy() - self.maximumSharpeRatioStrategy = MaximumSharpeRatioStrategy() - self.strategyHelperFunctions = StrategyHelperFunctions() - - def calculate_genetic_algo_portfolio(self, symbols, returns_matrix_percentages): - """ - Genetic algorithm based portfolio that maximizes sharpe ratio. 
This is my own implementation - """ - print("-* Calculating portfolio weights using genetic algorithm...") - portfolio_weights_dictionary = self.geneticAlgoStrategy.generate_portfolio(symbols, returns_matrix_percentages) - return portfolio_weights_dictionary - - def calculate_eigen_portfolio(self, symbols, covariance_matrix, eigen_portfolio_number): - """ - 2nd Eigen Portfolio - """ - print("-$ Calculating portfolio weights using eigen values...") - portfolio_weights_dictionary = self.eigenPortfolioStrategy.generate_portfolio(symbols, covariance_matrix, eigen_portfolio_number) - return portfolio_weights_dictionary - - def calculate_minimum_variance_portfolio(self, symbols, covariance_matrix): - """ - Minimum variance portfolio - """ - print("-! Calculating portfolio weights using minimum variance portfolio algorithm...") - portfolio_weights_dictionary = self.minimumVarianceStrategy.generate_portfolio(symbols, covariance_matrix) - return portfolio_weights_dictionary - - def calculate_maximum_sharpe_portfolio(self, symbols, covariance_matrix, returns_vector): - """ - Maximum sharpe portfolio - """ - print("-# Calculating portfolio weights using maximum sharpe portfolio algorithm...") - portfolio_weights_dictionary = self.maximumSharpeRatioStrategy.generate_portfolio(symbols, covariance_matrix, returns_vector) - return portfolio_weights_dictionary - - def random_matrix_theory_based_cov(self, returns_matrix): - """ - Covariance matrix filtering using random matrix theory - """ - filtered_covariance_matrix = self.strategyHelperFunctions.random_matrix_theory_based_cov(returns_matrix) - return filtered_covariance_matrix \ No newline at end of file diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..3c9ed4a --- /dev/null +++ b/utils.py @@ -0,0 +1,90 @@ +# Basic libraries +import warnings +import numpy as np +import pandas as pd + +warnings.filterwarnings("ignore") + + +class dotdict(dict): + """dot.notation access to dictionary attributes""" + __getattr__ 
= dict.get + __setattr__ = dict.__setitem__ + __delattr__ = dict.__delitem__ + +def normalize_weights(w): + pos_sum = 0 + neg_sum = 0 + for i in w: + if i > 0: + pos_sum += i + else: + neg_sum += i + neg_sum = abs(neg_sum) + for i in range(len(w)): + if w[i] > 0: + w[i] /= pos_sum + else: + w[i] /= neg_sum + return w + + + +def random_matrix_theory_based_cov(log_returns): + """ + This is inspired by the excellent post @ + https://srome.github.io/Eigenvesting-III-Random-Matrix-Filtering-In-Finance/ + """ + returns_matrix = log_returns.T + + # Calculate variance and std, will come in handy during reconstruction + variances = np.diag(np.cov(returns_matrix)) + standard_deviations = np.sqrt(variances) + + # Get correlation matrix and compute eigen vectors and values + correlation_matrix = np.corrcoef(returns_matrix) + eig_values, eig_vectors = np.linalg.eigh(correlation_matrix) + + # Get maximum theoretical eigen value for a random matrix + sigma = 1 # The variance for all of the standardized log returns is 1 + Q = returns_matrix.shape[1] / returns_matrix.shape[0] + max_theoretical_eval = np.power(sigma*(1 + np.sqrt(1/Q)), 2) + + # Prune random eigen values + # eig_values_pruned = eig_values[eig_values > max_theoretical_eval] + eig_values[eig_values <= max_theoretical_eval] = 0 + + # Reconstruct the covariance matrix from the correlation matrix + # and filtered eigen values + temp = np.dot(eig_vectors, np.dot( + np.diag(eig_values), np.transpose(eig_vectors))) + np.fill_diagonal(temp, 1) + filtered_matrix = temp + filtered_cov_matrix = np.dot(np.diag(standard_deviations), + np.dot(filtered_matrix, + np.diag(standard_deviations))) + return pd.DataFrame(columns=log_returns.columns, + data=filtered_cov_matrix) + + +def get_price_deltas(prices: pd.DataFrame): + """ + Calculate ratio of change + """ + return ((prices - prices.shift()) / prices.shift())[1:] + +def get_capm_returns(data:pd.DataFrame) -> pd.DataFrame: + #not correct + return data.std() * 
(get_price_deltas(data).mean() ) + +def get_log_returns(data: pd.DataFrame) -> pd.DataFrame: + return np.log((data / data.shift())[1:]) + + +def get_exp_returns(data: pd.DataFrame) -> pd.DataFrame: + return get_price_deltas(data).ewm(span=len(data)).mean() + + +def get_predicted_returns(data: pd.DataFrame) -> pd.DataFrame: + return get_price_deltas(data).div( + np.array(np.arange((len(data) - 1), 0, -1)), axis=0)