diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8c05cb7..7c0d90b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,7 +16,7 @@ repos: - id: trailing-whitespace - repo: https://github.com/astral-sh/ruff-pre-commit - rev: "v0.12.1" + rev: "v0.12.3" hooks: # id: ruff-check would go here if using both - id: ruff-format diff --git a/src/lightcurves/LC.py b/src/lightcurves/LC.py index c9dead9..f44ff17 100644 --- a/src/lightcurves/LC.py +++ b/src/lightcurves/LC.py @@ -1,5 +1,5 @@ from __future__ import annotations -from typing import Union, Sequence, Self, Optional, Tuple, List + import logging import pickle @@ -22,10 +22,11 @@ ERROR: sth didn't work, abort mission """ -def load_lc(path: str) -> "LightCurve": + +def load_lc(path: str) -> LightCurve: """ Load a pickled LightCurve instance from a file created with `save_lc()`. - + WARNING ------- Uses pickle. Loaded instance reflects the class at save-time. @@ -33,10 +34,11 @@ def load_lc(path: str) -> "LightCurve": with open(path, "rb") as f: return pickle.load(f) -def load_lc_npy(path: str) -> "LightCurve": + +def load_lc_npy(path: str) -> LightCurve: """ Load pickled LightCurve instance from numpy array created with `save_lc()`. - + WARNING ------- Uses pickle. Loaded instance reflects the class at save-time. @@ -44,25 +46,24 @@ def load_lc_npy(path: str) -> "LightCurve": with open(path, "rb") as f: return pickle.load(f) -def load_lc_csv(path: str) -> "LightCurve": + +def load_lc_csv(path: str) -> LightCurve: """ Load a pickled LightCurve instance from a CSV file saved with `save_csv()`. """ a = np.genfromtxt(path) return LightCurve(a[0], a[1], a[2]) + def flux_puffer( - flux: np.ndarray, - flux_error: np.ndarray, - threshold: float, - threshold_error: float - ) -> Tuple[np.ndarray, np.ndarray]: + flux: np.ndarray, flux_error: np.ndarray, threshold: float, threshold_error: float +) -> tuple[np.ndarray, np.ndarray]: """ Set every flux value under a threshold to the threshold, with an error. Use Case -------- - Apply Bayesian blocks to puffered LC to detect significant variations + Apply Bayesian blocks to puffered LC to detect significant variations relative to threshold (e.g. flares). WARNING @@ -83,20 +84,21 @@ def flux_puffer( Returns ------- flux_new : array_like - Modified flux array with threshold applied. + Modified flux array with threshold applied. flux_error_new : array like Associated uncertainties, modified analogously. """ flux_new = np.where(flux > threshold, flux, threshold) flux_error_new = np.where(flux > threshold, flux_error, th_error) - return(flux_new, flux_error_new) + return (flux_new, flux_error_new) + def clean_data( - time: np.ndarray, - flux: np.ndarray, - flux_error: np.ndarray, - ts: Optional[np.ndarray] = None - ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, Optional[np.ndarray]]: + time: np.ndarray, + flux: np.ndarray, + flux_error: np.ndarray, + ts: np.ndarray | None = None, +) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray | None]: """ Remove NaNs from flux and flux_error and drop duplicate time stamps. @@ -128,33 +130,31 @@ def clean_data( TBD --- - this could be a LC method and return a new LC or alter the instance? + this could be a LC method and return a new LC or alter the instance? """ # Mask NaN values of flux and flux_error nan_mask = ~np.isnan(flux) & ~np.isnan(flux_error) flux_ = flux[nan_mask] flux_error_ = flux_error[nan_mask] time_ = time[nan_mask] - logging.info(f'Deleted {len(flux) - len(flux_)} NaN entries.') + logging.info(f"Deleted {len(flux) - len(flux_)} NaN entries.") # Remove duplicate times (keep first occurrence) time_unique, time_unique_id = np.unique(time_, return_index=True) flux_clean = flux_[time_unique_id] flux_error_clean = flux_error_[time_unique_id] - logging.info(f'Deleted {len(time_) - len(time_unique)} time duplicates') + logging.info(f"Deleted {len(time_) - len(time_unique)} time duplicates") if ts is not None: ts_ = ts[nan_mask] ts_clean = ts_[time_unique_id] return (time_unique, flux_clean, flux_error_clean, ts_clean) - elif ts is None: - return(time_unique, flux_clean, flux_error_clean, None) + if ts is None: + return (time_unique, flux_clean, flux_error_clean, None) def get_gti_iis( - time: np.ndarray, - n_gaps: int, - n_pick: Optional[int] - ) -> Tuple[np.ndarray, np.ndarray]: + time: np.ndarray, n_gaps: int, n_pick: int | None +) -> tuple[np.ndarray, np.ndarray]: """ Determine Good Time Intervals (GTIs) of LC, based on largest time gaps. Useful for instruments with seasonal or periodic gaps, e.g., FACT. @@ -177,34 +177,29 @@ def get_gti_iis( """ diff = np.diff(time) diff1 = np.sort(diff) - ii = [x for x in range(len(diff)) if diff[x] in diff1[-n_gaps:]] - #index of the 10 longest gaps... risky business due to precision issues - GTI_start_ii = np.array(ii)+1 - GTI_start_ii = np.insert(GTI_start_ii,0,0) + ii = [x for x in range(len(diff)) if diff[x] in diff1[-n_gaps:]] + # index of the 10 longest gaps... risky business due to precision issues + GTI_start_ii = np.array(ii) + 1 + GTI_start_ii = np.insert(GTI_start_ii, 0, 0) GTI_end_ii = np.array(ii) - GTI_end_ii = np.append(GTI_end_ii, len(time)-1) + GTI_end_ii = np.append(GTI_end_ii, len(time) - 1) if n_pick: - # only consider the n_pick longest GTIs + # only consider the n_pick longest GTIs # TBD: double check, might compute index differences, not time gaps.. - gap_len = np.array([t - s for s,t in zip(GTI_start_ii, GTI_end_ii)]) + gap_len = np.array([t - s for s, t in zip(GTI_start_ii, GTI_end_ii, strict=False)]) gap_len1 = np.sort(gap_len) - ii = [x for x in range(len(gap_len)) if gap_len[x] in gap_len1[-n_pick:]] + ii = [x for x in range(len(gap_len)) if gap_len[x] in gap_len1[-n_pick:]] # n_gaps = considered gaps (longest not gaps) GTI_start_ii_ = GTI_start_ii[ii] GTI_end_ii_ = GTI_end_ii[ii] return GTI_start_ii_, GTI_end_ii_ - else: - return GTI_start_ii, GTI_end_ii + return GTI_start_ii, GTI_end_ii -def make_gti_lcs( - lc: "LightCurve", - n_gaps: int, - n_pick: int = None - ) -> np.ndarray: +def make_gti_lcs(lc: LightCurve, n_gaps: int, n_pick: int = None) -> np.ndarray: """ Divide LC into several LCs (Good Time Intervals = GTIs) based on largest - time gaps. Optionally only pick largest GTIs. + time gaps. Optionally only pick largest GTIs. Parameters ---------- @@ -222,17 +217,18 @@ def make_gti_lcs( """ gti_starts, gti_ends = get_gti_iis(lc.time, n_gaps, n_pick) if n_pick is None: - n_pick = n_gaps + 1 #select all + n_pick = n_gaps + 1 # select all chunks = [] for g in range(n_pick): gti_lc = LightCurve( - lc.time[gti_starts[g]:gti_ends[g]+1], - lc.flux[gti_starts[g]:gti_ends[g]+1], - lc.flux_error[gti_starts[g]:gti_ends[g]+1], + lc.time[gti_starts[g] : gti_ends[g] + 1], + lc.flux[gti_starts[g] : gti_ends[g] + 1], + lc.flux_error[gti_starts[g] : gti_ends[g] + 1], name=lc.name, - z=lc.z) + z=lc.z, + ) chunks.append(gti_lc) - return(np.array(chunks, dtype=object)) + return np.array(chunks, dtype=object) # ----------------------------------------------------------------------------- @@ -293,15 +289,15 @@ class LightCurve: def __init__( self, - time: Union[np.ndarray, list], - flux: Union[np.ndarray, list], - flux_error: Union[np.ndarray, list], - time_format: Optional[str] = None, - name: Optional[str] = None, - z: Optional[float] = None, - telescope: Optional[str] = None, - cadence: Optional[float] = None, - ): + time: np.ndarray | list, + flux: np.ndarray | list, + flux_error: np.ndarray | list, + time_format: str | None = None, + name: str | None = None, + z: float | None = None, + telescope: str | None = None, + cadence: float | None = None, + ): self.time = np.array(time) self.flux = np.array(flux) self.flux_error = np.array(flux_error) @@ -312,10 +308,7 @@ def __init__( self.cadence = cadence if len(time) != len(flux) or len(time) != len(flux_error): raise ValueError("Input arrays do not have same length") - if ( - len(flux[np.isnan(flux)]) > 0 - or len(flux_error[np.isnan(flux_error)]) > 0 - ): + if len(flux[np.isnan(flux)]) > 0 or len(flux_error[np.isnan(flux_error)]) > 0: raise TypeError("flux or flux_error contain np.nan values") if len(time) != len(np.unique(time)): raise ValueError("time contains duplicate values") @@ -331,7 +324,7 @@ def __repr__(self): return ( f"LightCurve (bins = {len(self.flux)}, " f"name = {self.name}, " - f"cadende = {self.cadence}, " + f"cadende = {self.cadence}, " f"telescope = {self.telescope}, " f"z = {self.z})" ) @@ -340,12 +333,11 @@ def __len__(self): return len(self.time) def __getitem__( - self, - inbr: Union[int, slice, List[int]] - ) -> Union[np.ndarray, "LightCurve"]: + self, inbr: int | slice | list[int] + ) -> np.ndarray | LightCurve: """ Access elements or subsets of the LightCurve using indexing or slicing. - + Supports: - Integer index: returns one bin (time, flux, flux_error), eg `lc[i]` - Slice: returns a new LightCurve, eg `lc[i:j]` @@ -371,11 +363,7 @@ def __getitem__( """ if isinstance(inbr, int): - return np.array([ - self.time[inbr], - self.flux[inbr], - self.flux_error[inbr] - ]) + return np.array([self.time[inbr], self.flux[inbr], self.flux_error[inbr]]) if isinstance(inbr, slice): return LightCurve( self.time[inbr], @@ -389,16 +377,11 @@ def __getitem__( ) if isinstance(inbr, list): # can't be implemented with 'int or list' -> confusion with slice - return np.array([ - self.time[inbr], - self.flux[inbr], - self.flux_error[inbr] - ]) - else: - raise TypeError("Index must be int, slice, or list of ints.") + return np.array([self.time[inbr], self.flux[inbr], self.flux_error[inbr]]) + raise TypeError("Index must be int, slice, or list of ints.") - def select_by_time(self, t_min: float, t_max: float) -> "LightCurve": - """ + def select_by_time(self, t_min: float, t_max: float) -> LightCurve: + """ Select a portion of the light curve between two time bounds. Parameters @@ -417,15 +400,15 @@ def select_by_time(self, t_min: float, t_max: float) -> "LightCurve": i_e = np.where(self.time >= t_max)[0][0] return self.__getitem__(slice(i_s, i_e, None)) - def save_npy(self,path: str) -> None: + def save_npy(self, path: str) -> None: """ Save light curve object as .npy file using pickle. - + Parameters ---------- path : str Destination file path. - + Notes ----- Use `load_lc_npy()` to read this file. @@ -450,32 +433,20 @@ def save_csv(self, path: str, bblocks: bool = False) -> None: Use `load_lc_csv()` to read this file and LC.py will be current state. """ if bblocks is True: - data = np.array([ - self.time, self.flux, self.flux_error, self.block_pbin - ]) - np.savetxt( - path, - data, - comments="#time, flux, flux_error, block_pbin" - ) + data = np.array([self.time, self.flux, self.flux_error, self.block_pbin]) + np.savetxt(path, data, comments="#time, flux, flux_error, block_pbin") else: - data = np.array([ - self.time, self.flux, self.flux_error - ]) - np.savetxt( - path, - data, - comments="#time, flux, flux_error" - ) + data = np.array([self.time, self.flux, self.flux_error]) + np.savetxt(path, data, comments="#time, flux, flux_error") def plot_lc( - self, - data_color: str = "k", - ax: Optional[Axes] = None, - new_time_format: Optional[str] = None, - size: float = 1, - **kwargs - ) -> None: + self, + data_color: str = "k", + ax: Axes | None = None, + new_time_format: str | None = None, + size: float = 1, + **kwargs, + ) -> None: """ Plot the light curve with error bars. @@ -542,12 +513,7 @@ def plot_lc( axtop.set_xticklabels(new_labels) plt.sca(ax) # go back to initial bottom axis - def plot_hline( - self, - value: float, - ax: Optional[Axes] = None, - **kwargs - ) -> None: + def plot_hline(self, value: float, ax: Axes | None = None, **kwargs) -> None: """ Plot a horizontal line across the light curve plot. @@ -568,12 +534,7 @@ def plot_hline( ax = plt.gca() ax.hlines(value, xmin=min(self.time), xmax=max(self.time), **kwargs) - def plot_vline( - self, - value: float, - ax: Optional[Axes] = None, - **kwargs - ) -> None: + def plot_vline(self, value: float, ax: Axes | None = None, **kwargs) -> None: """ Plot a vertical line across the light curve plot. @@ -595,12 +556,8 @@ def plot_vline( ax.vlines(value, ymin=min(self.flux), ymax=max(self.flux), **kwargs) def plot_shade( - self, - start_time: float, - end_time: float, - ax: Optional[Axes] = None, - **kwargs - ) -> None: + self, start_time: float, end_time: float, ax: Axes | None = None, **kwargs + ) -> None: """ Plot a shaded region between two time points. @@ -627,11 +584,8 @@ def plot_shade( ax.fill_between(x, y, y1, step="mid", alpha=0.2, zorder=0, **kwargs) def plot_grid( - self, - spacing: float = 10, - ax: Optional[Axes] = None, - **kwargs - ) -> None: + self, spacing: float = 10, ax: Axes | None = None, **kwargs + ) -> None: """ Add a minor grid to the time axis at specified spacing. @@ -652,17 +606,13 @@ def plot_grid( ax = plt.gca() rounded_start = np.round(np.min(self.time) / spacing) * spacing rounded_end = np.round(np.max(self.time) / spacing) * spacing - ax.set_xticks( - np.arange(rounded_start, rounded_end, spacing), minor=True - ) + ax.set_xticks(np.arange(rounded_start, rounded_end, spacing), minor=True) ax.grid(which="minor", **kwargs) # ------------------------------------------------------------------------- def get_bblocks( - self, - gamma_value: Optional[float] = None, - p0_value: Optional[float] = 0.05 - ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + self, gamma_value: float | None = None, p0_value: float | None = 0.05 + ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Apply the Bayesian Blocks algorithm to segment the light curve based on statistically significant change points. @@ -675,13 +625,13 @@ def get_bblocks( Parameters ---------- gamma_value : float, optional - Regularization parameter controlling the number of blocks. + Regularization parameter controlling the number of blocks. Higher values yield fewer blocks. `gamma_value` overwrites `p0_value`. p0_value : float, optional - False alarm probability, used to control the sensitivity of change - point detection. Default is 0.05. Calibrated with white noise. - + False alarm probability, used to control the sensitivity of change + point detection. Default is 0.05. Calibrated with white noise. + Returns ------- block_pbin : np.ndarray @@ -698,7 +648,7 @@ def get_bblocks( # This is relevant for computing bb_i and making Hopjects. Notes - ----- + ----- - See: Scargle et al. 2013, arXiv:1304.2818 - See: Jupyter Notebook on GitHub for illustration. - For constant light curves, a single block is returned. @@ -710,14 +660,12 @@ def get_bblocks( sigma=self.flux_error, fitness="measures", gamma=gamma_value, - p0=p0_value - ) + p0=p0_value, + ) logging.debug("got edges for light curve") if len(self.edges) <= 2: - logging.warning( - "light curve is constant; only one bayesian block found." - ) + logging.warning("light curve is constant; only one bayesian block found.") self.block_pbin = np.ones(len(self.flux)) * np.mean(self.flux) self.block_val = np.array([np.mean(self.flux)]) self.block_val_error = np.array([np.std(self.flux)]) @@ -729,7 +677,7 @@ def get_bblocks( self.block_val_error, self.edge_index, self.edges, - ) + ) # get edge_index self.edge_index = np.array( @@ -748,9 +696,9 @@ def get_bblocks( start = self.edge_index[j] end = self.edge_index[j + 1] self.block_val[j] = np.mean(self.flux[start:end]) - self.block_val_error[j] = ( - np.sqrt(np.sum(self.flux_error[start:end] ** 2)) / (end-start) - ) + self.block_val_error[j] = np.sqrt( + np.sum(self.flux_error[start:end] ** 2) + ) / (end - start) # create block-per-bin array corresponding to flux self.block_pbin = np.zeros(len(self.flux)) @@ -770,9 +718,8 @@ def get_bblocks( # ------------------------------------------------------------------------- def get_bblocks_above( - self, - threshold: float - ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: + self, threshold: float + ) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]: """ Modify Bayesian blocks by replacing all block values below a threshold with the threshold, and merging consecutive blocks (neglecting edges) @@ -817,35 +764,32 @@ def get_bblocks_above( ----- - `lc.get_bblocks()` must be run before calling this function. - Errors are not recomputed after thresholding. TBD: thresholderror. - + """ # Replace all values below threshold try: self.block_pbin = np.where( self.block_pbin > threshold, self.block_pbin, threshold - ) + ) self.block_val = np.where( self.block_val > threshold, self.block_val, threshold - ) + ) except AttributeError: raise AttributeError( "Initialize Bayesian blocks with lc.get_bblocks() first!" - ) + ) # Merge neighbouring threshold blocks and delete edges block_mask = np.ones(len(self.block_val), dtype=bool) edge_mask = np.ones(len(self.edges), dtype=bool) for i in range(len(self.block_val) - 1): - if ( - self.block_val[i] == threshold - and self.block_val[i + 1] == threshold - ): + if self.block_val[i] == threshold and self.block_val[i + 1] == threshold: block_mask[i + 1] = False edge_mask[i + 1] = False self.block_val = self.block_val[block_mask] - self.block_val_error = self.block_val_error[block_mask] - #TBD -> thresholderror + self.block_val_error = self.block_val_error[block_mask] + # TBD -> thresholderror self.edge_index = self.edge_index[edge_mask] self.edges = self.edges[edge_mask] return ( @@ -859,11 +803,11 @@ def get_bblocks_above( # ------------------------------------------------------------------------- def plot_bblocks( self, - ax: Optional[Axes] = None, + ax: Axes | None = None, color: str = "steelblue", linewidth: float = 1, - **kwargs - ) -> None: + **kwargs, + ) -> None: """ Plot the Bayesian block representation of the light curve. @@ -901,7 +845,7 @@ def plot_bblocks( where="mid", linewidth=linewdith, color=color, - **kwargs + **kwargs, ) except AttributeError: raise AttributeError( @@ -912,13 +856,13 @@ def plot_bblocks( def bb_i(self, t: float): """ Get the index of the Bayesian block (in `block_val`) that contains the - given time. + given time. If `t == edge`, return the block *left* of the edge (t = end of block). This correctly identifies times from HOP 'baseline'. Identify times from other HOP methods with `bb_i_start` and `bb_i_end`! - + Parameters ---------- t : float @@ -944,7 +888,7 @@ def bb_i(self, t: float): e for e in range(len(self.edges) - 1) if t > self.edges[e] and t <= self.edges[e + 1] - ] + ] return int(block_index[0]) def bb_i_start(self, t: float): @@ -966,7 +910,7 @@ def bb_i_start(self, t: float): ------- int Index of `lc.block_val` starting with `t`. - + Notes ----- - Works fine with HOP 'flip', 'halfclap', and 'sharp' @@ -976,7 +920,7 @@ def bb_i_start(self, t: float): e for e in range(len(self.edges) - 1) if t >= self.edges[e] and t < self.edges[e + 1] - ] + ] return int(block_index[0]) def bb_i_end(self, t: float): @@ -998,7 +942,7 @@ def bb_i_end(self, t: float): ------- int Index of `lc.block_val` starting with `t`. - + Notes ----- - Works fine with HOP 'flip', 'halfclap', and 'sharp' @@ -1008,7 +952,7 @@ def bb_i_end(self, t: float): e for e in range(len(self.edges) - 1) if t > self.edges[e] and t <= self.edges[e + 1] - ] + ] return int(block_index[0]) # ------------------------------------------------------------------------- @@ -1016,18 +960,18 @@ def find_hop( self, method: str = "half", lc_edges: str = "neglect", - baseline: Optional[float] = None - ) -> List[Hopject]: #this type is not known in LC.py yet + baseline: float | None = None, + ) -> list[Hopject]: # this type is not known in LC.py yet """ - TBD + TBD currently this is kind of an implicit circular renference: - * LC has List of HOPS + * LC has List of HOPS * and HOP as LC as attribute - fix such that: + fix such that: * LC imports Hopjects * find_hop determines HOP parameters and is called by.. * dedicated get_hop function (here) which returns Hopjects with - * time, flux, flux_error, iis, bblock things + * time, flux, flux_error, iis, bblock things * and iis & bblock things are optional in creating a Hopject so spirit will be that HOP is not so standalone, just add-on to LC TBD @@ -1050,18 +994,18 @@ def find_hop( # ------------------------------------------------------------------------- def plot_hop( self, - ax: Optional[Axes] = None, - color: Union[str, List[str]] = ["lightsalmon", "orchid"], + ax: Axes | None = None, + color: str | list[str] = ["lightsalmon", "orchid"], alpha: float = 0.2, - label: Optional[str] = None, + label: str | None = None, zorder: float = 0, - **kwargs - ) -> None: + **kwargs, + ) -> None: """ Plot shaded rectangular area for each HOP group in the light curve. - To visualize HOP segments (e.g. flares) by highlighting them as - rectangular area in the light curve. Multiple colors can be used to + To visualize HOP segments (e.g. flares) by highlighting them as + rectangular area in the light curve. Multiple colors can be used to alternate between consecutive segments for clarity. Parameters @@ -1075,7 +1019,7 @@ def plot_hop( alpha : float, optional Transparency level (0-1) for the shaded area. Default is 0.2. label : str, optional - Label for the first HOP area, used in the plot legend. If None, + Label for the first HOP area, used in the plot legend. If None, defaults to `"hop "`. zorder : float, optional Z-order for the plot layer. Default is 0 (in the background). @@ -1112,8 +1056,8 @@ def plot_hop( alpha=alpha, label=label if i == 0 else None, zorder=zorder, - **kwargs - ) + **kwargs, + ) """ OLD: if i == 0: @@ -1144,10 +1088,10 @@ def plot_hop( ) elif i != 0: ax.fill_between( - x, y, y1, step="mid", color="lightsalmon", alpha=0.2, + x, y, y1, step="mid", color="lightsalmon", alpha=0.2, zorder=0 ) - """ + """ # ------------------------------------------------------------------------- def plot_all_hop(self) -> None: @@ -1186,10 +1130,3 @@ def plot_all_hop(self) -> None: self.plot_hop() plt.ylabel("sharp") fig.subplots_adjust(hspace=0) - - - - - - - diff --git a/src/lightcurves/__init__.py b/src/lightcurves/__init__.py index 42206ab..606cbe1 100644 --- a/src/lightcurves/__init__.py +++ b/src/lightcurves/__init__.py @@ -1,6 +1,7 @@ """ ... """ + from __future__ import annotations from .version import __version__ diff --git a/tests/test_basics.py b/tests/test_basics.py index fc05c4a..22cd3ae 100644 --- a/tests/test_basics.py +++ b/tests/test_basics.py @@ -2,5 +2,4 @@ def test_import(): - assert True