11import logging
2- from abc import ABCMeta , abstractmethod , ABC
3- from collections import defaultdict
4- from datetime import datetime
2+ from abc import ABCMeta
53from typing import Union , List , Dict , Tuple , Any , Optional
64
75import astropy .units as u
8- import numpy as np
96from astropy .coordinates import SkyCoord
107
118from pyobs .images import Image
129from pyobs .interfaces import IAutoGuiding , IFitsHeaderBefore , IFitsHeaderAfter
1310from pyobs .utils .time import Time
1411from ._base import BasePointing
15- from ...images .meta import PixelOffsets
12+ from .guidingstatistics import GuidingStatisticsUptime , GuidingStatisticsPixelOffset
13+ from .guidingstatistics .guidingstatistics import GuidingStatistics
1614from ...interfaces import ITelescope
15+ from ...object import get_object
1716
1817log = logging .getLogger (__name__ )
1918
2019
21- class _GuidingStatistics (ABC ):
22- """Calculates statistics for guiding."""
23-
24- def __init__ (self ):
25- self ._sessions : Dict [str , List [Any ]] = defaultdict (list )
26-
27- def init_stats (self , client : str , default : Any = None ) -> None :
28- """
29- Inits a stat measurement session for a client.
30-
31- Args:
32- client: name/id of the client
33- default: first entry in session
34- """
35-
36- self ._sessions [client ] = []
37-
38- if default is not None :
39- self ._sessions [client ].append (self ._get_session_data (default ))
40-
41- @abstractmethod
42- def _build_header (self , data : Any ) -> Dict [str , Tuple [Any , str ]]:
43- raise NotImplementedError
44-
45- def add_to_header (self , client : str , header : Dict [str , Tuple [Any , str ]]) -> Dict [str , Tuple [Any , str ]]:
46- """
47- Add statistics to given header.
48-
49- Args:
50- client: id/name of the client
51- header: Header dict to add statistics to.
52- """
53-
54- data = self ._sessions .pop (client )
55- session_header = self ._build_header (data )
56-
57- return header | session_header
58-
59- @abstractmethod
60- def _get_session_data (self , input_data : Any ) -> Any :
61- raise NotImplementedError
62-
63- def add_data (self , input_data : Any ) -> None :
64- """
65- Adds data to all client measurement sessions.
66- Args:
67- input_data: Image witch metadata
68- """
69-
70- data = self ._get_session_data (input_data )
71-
72- for k in self ._sessions .keys ():
73- self ._sessions [k ].append (data )
74-
75-
76- class _GuidingStatisticsPixelOffset (_GuidingStatistics ):
77- @staticmethod
78- def _calc_rms (data : List [Tuple [float , float ]]) -> Optional [Tuple [float , float ]]:
79- """
80- Calculates RMS of data.
81-
82- Args:
83- data: Data to calculate RMS for.
84-
85- Returns:
86- Tuple of RMS.
87- """
88- if len (data ) < 3 :
89- return None
90-
91- flattened_data = np .array (list (map (list , zip (* data ))))
92- data_len = len (flattened_data [0 ])
93- rms = np .sqrt (np .sum (np .power (flattened_data , 2 ), axis = 1 ) / data_len )
94- return tuple (rms )
95-
96- def _build_header (self , data : List [Tuple [float , float ]]) -> Dict [str , Tuple [Any , str ]]:
97- header = {}
98- rms = self ._calc_rms (data )
99-
100- if rms is not None :
101- header ["GUIDING RMS1" ] = (float (rms [0 ]), "RMS for guiding on axis 1" )
102- header ["GUIDING RMS2" ] = (float (rms [1 ]), "RMS for guiding on axis 2" )
103-
104- return header
105-
106- def _get_session_data (self , data : Image ) -> Tuple [float , float ]:
107- if data .has_meta (PixelOffsets ):
108- meta = data .get_meta (PixelOffsets )
109- primitive = tuple (meta .__dict__ .values ())
110- return primitive
111- else :
112- log .warning ("Image is missing the necessary meta information!" )
113- raise KeyError ("Unknown meta." )
114-
115-
116- class _GuidingStatisticsUptime (_GuidingStatistics ):
117- @staticmethod
118- def _calc_uptime (states : List [Tuple [bool , datetime ]]) -> int :
119- uptimes : List [int ] = []
120- for i , entry in enumerate (states ):
121- state , timestamp = entry
122- # is not closed?
123- if not state or i + 1 == len (states ):
124- continue
125-
126- uptime = (states [i + 1 ][1 ] - timestamp ).seconds
127- uptimes .append (uptime )
128-
129- return sum (uptimes )
130-
131- @staticmethod
132- def _calc_total_time (states : List [Tuple [bool , datetime ]]) -> int :
133- initial_time = states [0 ][1 ]
134- end_time = states [- 1 ][1 ]
135- return (end_time - initial_time ).seconds
136-
137- @staticmethod
138- def _calc_uptime_percentage (states : List [Tuple [bool , datetime ]]) -> float :
139- uptime = _GuidingStatisticsUptime ._calc_uptime (states )
140- total_time = _GuidingStatisticsUptime ._calc_total_time (states )
141-
142- """
143- If no time has passed, return 100 if the loop is closed, 0 else.
144- We have to take the second last value in the state array, since the last value is the stop value.
145- """
146- if total_time == 0 :
147- return int (states [- 2 ][0 ]) * 100.0
148-
149- return uptime / total_time * 100.0
150-
151- def _build_header (self , data : List [Tuple [bool , datetime ]]) -> Dict [str , Tuple [Any , str ]]:
152- now = datetime .now ()
153- data .append ((None , now ))
154-
155- uptime_percentage = self ._calc_uptime_percentage (data )
156- return {"GUIDING UPTIME" : (uptime_percentage , "Time the guiding loop was closed [%]" )}
157-
158- def _get_session_data (self , input_data : bool ) -> Tuple [bool , datetime ]:
159- now = datetime .now ()
160- return input_data , now
161-
162-
16320class BaseGuiding (BasePointing , IAutoGuiding , IFitsHeaderBefore , IFitsHeaderAfter , metaclass = ABCMeta ):
16421 """Base class for guiding modules."""
16522
@@ -174,6 +31,7 @@ def __init__(
17431 pid : bool = False ,
17532 reset_at_focus : bool = True ,
17633 reset_at_filter : bool = True ,
34+ guiding_statistic : Optional [Union [Dict [str , Any ], GuidingStatistics ]] = None ,
17735 ** kwargs : Any ,
17836 ):
17937 """Initializes a new science frame auto guiding system.
@@ -205,8 +63,11 @@ def __init__(
20563 self ._ref_header = None
20664
20765 # stats
208- self ._statistics = _GuidingStatisticsPixelOffset ()
209- self ._uptime = _GuidingStatisticsUptime ()
66+ if guiding_statistic is None :
67+ guiding_statistic = GuidingStatisticsPixelOffset ()
68+
69+ self ._statistics = get_object (guiding_statistic , GuidingStatistics )
70+ self ._uptime = GuidingStatisticsUptime ()
21071
21172 async def start (self , ** kwargs : Any ) -> None :
21273 """Starts/resets auto-guiding."""
0 commit comments