|
2 | 2 | from typing import Union |
3 | 3 | from numpy.typing import ArrayLike |
4 | 4 | from pyhctsa.Operations.Correlation import FirstCrossing, AutoCorr |
| 5 | +from pyhctsa.Operations.Stationarity import SlidingWindow |
| 6 | +from scipy.stats import ansari, gaussian_kde |
| 7 | +from statsmodels.sandbox.stats.runs import runstest_1samp |
| 8 | + |
| 9 | + |
| 10 | +def Walker(y : ArrayLike, walkerRule : str = 'prop', walkerParams : Union[None, float, int, list] = None) -> dict: |
| 11 | + """ |
| 12 | + Simulates a hypothetical walker moving through the time domain. |
| 13 | +
|
| 14 | + Note: due to differences in how the kde is implemented, exepct a discrepancy in the |
| 15 | + `sw_distdiff` feature. |
| 16 | + """ |
| 17 | + N = len(y) |
| 18 | + |
| 19 | + # Define default values and type requirements for each rule |
| 20 | + WALKER_CONFIGS = { |
| 21 | + 'prop': { |
| 22 | + 'default': 0.5, |
| 23 | + 'valid_types': (int, float), |
| 24 | + 'error_msg': 'must be float or integer' |
| 25 | + }, |
| 26 | + 'biasprop': { |
| 27 | + 'default': [0.1, 0.2], |
| 28 | + 'valid_types': (list,), |
| 29 | + 'error_msg': 'must be a list' |
| 30 | + }, |
| 31 | + 'momentum': { |
| 32 | + 'default': 2, |
| 33 | + 'valid_types': (int, float), |
| 34 | + 'error_msg': 'must be float or integer' |
| 35 | + } |
| 36 | + } |
| 37 | + |
| 38 | + if walkerRule not in WALKER_CONFIGS: |
| 39 | + valid_rules = ", ".join(f"'{rule}'" for rule in WALKER_CONFIGS.keys()) |
| 40 | + raise ValueError(f"Unknown walker_rule: '{walkerRule}'. Choose from: {valid_rules}") |
| 41 | + |
| 42 | + # get configuration for the specified rule |
| 43 | + config = WALKER_CONFIGS[walkerRule] |
| 44 | + |
| 45 | + # use the default value if no parameters provided |
| 46 | + if walkerParams is None: |
| 47 | + walkerParams = config['default'] |
| 48 | + |
| 49 | + if not isinstance(walkerParams, config["valid_types"]): |
| 50 | + raise ValueError( |
| 51 | + f"walkerParams {config['error_msg']} for walker rule: '{walkerRule}'" |
| 52 | + ) |
| 53 | + |
| 54 | + # Do the walk |
| 55 | + w = np.zeros(N) |
| 56 | + |
| 57 | + if walkerRule == 'prop': |
| 58 | + # % walker starts at zero and narrows the gap between its position |
| 59 | + #and the time series value at that point by the proportion given |
| 60 | + #in walkerParams, to give the value at the subsequent time step |
| 61 | + p = walkerParams |
| 62 | + w[0] = 0 # start at zero |
| 63 | + for i in range(1, N): |
| 64 | + w[i] = w[i-1] + p * (y[i-1] - w[i-1]) |
| 65 | + |
| 66 | + elif walkerRule == 'biasprop': |
| 67 | + #walker is biased in one or the other direction (i.e., prefers to |
| 68 | + # go up, or down). Requires a vector of inputs: [p_up, p_down] |
| 69 | + pup, pdown = walkerParams |
| 70 | + |
| 71 | + w[0] = 0 |
| 72 | + for i in range(1, N): |
| 73 | + if y[i] > y[i-1]: # time series inceases |
| 74 | + w[i] = w[i-1] + pup*(y[i-1]-w[i-1]) |
| 75 | + else: |
| 76 | + w[i] = w[i-1] + pdown*(y[i-1]-w[i-1]) |
| 77 | + elif walkerRule == 'momentum': |
| 78 | + # % walker moves as if it had inertia from the previous time step, |
| 79 | + # i.e., it 'wants' to move the same amount; the time series acts as |
| 80 | + # a force changing its motion |
| 81 | + m = walkerParams # 'inertial mass' |
| 82 | + |
| 83 | + w[0] = y[0] |
| 84 | + w[1] = y[1] |
| 85 | + for i in range(2, N): |
| 86 | + w_inert = w[i-1] + (w[i-1]-w[i-2]) |
| 87 | + w[i] = w_inert + (y[i]-w_inert)/m # dissipative term |
| 88 | + # % equation of motion (s-s_0=ut+F/m*t^2) |
| 89 | + # where the 'force' F is the change in the original time series |
| 90 | + # at that point |
| 91 | + else: |
| 92 | + raise ValueError(f"Unknown rule : {walkerRule}") |
| 93 | + |
| 94 | + # Get statistics on the walk |
| 95 | + out = {} |
| 96 | + # the walk itself |
| 97 | + out['w_mean'] = np.mean(w) |
| 98 | + out['w_median'] = np.median(w) |
| 99 | + out['w_std'] = np.std(w, ddof=1) |
| 100 | + out['w_ac1'] = AutoCorr(w, 1, 'Fourier')[0] # lag 1 autocorr |
| 101 | + out['w_ac2'] = AutoCorr(w, 2, 'Fourier')[0] # lag 2 autocorr |
| 102 | + out['w_tau'] = FirstCrossing(w, 'ac', 0, 'continuous') |
| 103 | + out['w_min'] = np.min(w) |
| 104 | + out['w_max'] = np.max(w) |
| 105 | + # fraction of time series length that walker crosses time series |
| 106 | + out['w_propzcross'] = (np.sum((w[:-1] * w[1:]) < 0)) / (N-1) |
| 107 | + |
| 108 | + # Differences between the walk at signal |
| 109 | + out['sw_meanabsdiff'] = np.mean(np.abs(y - w)) |
| 110 | + out['sw_taudiff'] = FirstCrossing(y, 'ac', 0, 'continuous') - FirstCrossing(w, 'ac', 0 , 'continuous') |
| 111 | + out['sw_stdrat'] = np.std(w, ddof=1)/np.std(y, ddof=1) |
| 112 | + out['sw_ac1rat'] = out['w_ac1']/AutoCorr(y, 1)[0] |
| 113 | + out['sw_minrat'] = np.min(w)/np.min(y) |
| 114 | + out['sw_maxrat'] = np.max(w)/np.max(y) |
| 115 | + out['sw_propcross'] = np.sum((w[:-1] - y[:-1]) * (w[1:] - y[1:]) < 0)/(N-1) |
| 116 | + |
| 117 | + #% test from same distribution: Ansari-Bradley test |
| 118 | + _, pval = ansari(w, y) |
| 119 | + out['sw_ansarib_pval'] = pval |
| 120 | + |
| 121 | + r = np.linspace( |
| 122 | + min(min(y), min(w)), |
| 123 | + max(max(y), max(w)), |
| 124 | + 200 |
| 125 | + ) |
| 126 | + |
| 127 | + kde_y = gaussian_kde(y) |
| 128 | + kde_w = gaussian_kde(w) |
| 129 | + dy = kde_y(r) |
| 130 | + dw = kde_w(r) |
| 131 | + out['sw_distdiff'] = np.sum(np.abs(dy - dw)) |
| 132 | + |
| 133 | + #% (iii) Looking at residuals between time series and walker |
| 134 | + res = w - y |
| 135 | + _, runs_pval = runstest_1samp(res, cutoff='mean') |
| 136 | + out['res_runstest'] = runs_pval |
| 137 | + out['res_swss5_1'] = SlidingWindow(res, 'std', 'std', 5, 1) # sliding window stationarity |
| 138 | + out['res_ac1'] = AutoCorr(res, 1)[0] # auto correlation at lag-1 |
| 139 | + |
| 140 | + return out |
5 | 141 |
|
6 | 142 |
|
7 | 143 | def ForcePotential(y : ArrayLike, whatPotential : str = 'dblwell', params : Union[list, None] = None) -> dict: |
|
0 commit comments