11from time import sleep
22from typing import Any
33from enum import StrEnum
4- from utils import send_command
4+ from utils import send_command , is_empty
55from redis import Redis
66from redis .retry import Retry
77from redis .backoff import FullJitterBackoff
1111
1212
1313class HAProxyOutput (StrEnum ):
14+ """
15+ Enum for HAProxy output messages.
16+ """
1417 SERVER_REGISTERED = "New server registered."
1518 SERVER_DELETED = "Server deleted."
1619 SERVER_NOT_FOUND = "No such server."
1720 BACKEND_NOT_FOUND = 'No such backend.'
1821
1922
2023class Handler (object ):
24+ """
25+ Handler class for managing HAProxy and Redis Sentinel.
26+ """
27+
2128 def __init__ (
2229 self ,
2330 sentinel_host : str = "127.0.0.1" ,
@@ -28,6 +35,16 @@ def __init__(
2835 haproxy_backend : str = "redis_master" ,
2936 haproxy_server_name : str = "current_master" ,
3037 ) -> None :
38+ """
39+ Initialize the Handler class.
40+ :param sentinel_host: Redis Sentinel host
41+ :param sentinel_port: Redis Sentinel port
42+ :param sentinel_password: Redis Sentinel password
43+ :param master_name: Redis master name
44+ :param haproxy_socket: HAProxy socket path
45+ :param haproxy_backend: HAProxy backend name
46+ :param haproxy_server_name: HAProxy server name
47+ """
3148 self .conn = Redis (
3249 host = sentinel_host ,
3350 port = sentinel_port ,
@@ -41,6 +58,10 @@ def __init__(
4158 self .haproxy_server_name = haproxy_server_name
4259
4360 def get_master_address (self ) -> str :
61+ """
62+ Get the address of the current master from Redis Sentinel.
63+ :return: Address of the current master
64+ """
4465 address = None
4566 sentinel_info : dict [str , Any ] = self .conn .info () # type: ignore
4667 try :
@@ -56,12 +77,22 @@ def get_master_address(self) -> str:
5677 return address
5778
5879 def send_command (self , commands : str | list [str ], log_data = True ) -> str :
80+ """
81+ Send a command to the HAProxy socket and return the response.
82+ :param commands: Command to send (string or list of strings)
83+ :param log_data: Whether to log the command and output
84+ :return: Response from the HAProxy socket
85+ """
5986 out = send_command (self .haproxy_socket , commands )
6087 if log_data :
6188 info (f"HAProxy command: { commands } , Output: { out } " )
6289 return out
6390
6491 def shutdown_current_server (self ) -> str :
92+ """
93+ Shuts down the current sesions of the server and sets it to maintenance mode.
94+ :return: Response from the HAProxy socket
95+ """
6596 return self .send_command (
6697 [
6798 f"set server { self .haproxy_backend } /{ self .haproxy_server_name } state maint" , # noqa: E501
@@ -74,6 +105,11 @@ def remove_current_server(
74105 ignore_notfound : bool = True ,
75106 shutdown : bool = True ,
76107 ) -> None :
108+ """
109+ Remove the current server from HAProxy.
110+ :param ignore_notfound: Whether to ignore "not found" errors
111+ :param shutdown: Whether to shut down the current server
112+ """
77113 out = ""
78114 if shutdown :
79115 out = self .shutdown_current_server ()
@@ -91,6 +127,10 @@ def remove_current_server(
91127 raise Exception (f"Error while removing old server: { out } " )
92128
93129 def add_server (self , address : str ):
130+ """
131+ Add a new server to HAProxy (Used for initial sets).
132+ :param address: Address of the new server
133+ """
94134 out = self .send_command (
95135 f"add server { self .haproxy_backend } /{ self .haproxy_server_name } { address } " # noqa: E501
96136 )
@@ -101,6 +141,11 @@ def add_server(self, address: str):
101141 )
102142
103143 def set_server_address (self , host : str , port : int ):
144+ """
145+ Set the address of the current server in HAProxy.
146+ :param host: Host of the new server
147+ :param port: Port of the new server
148+ """
104149 self .send_command (
105150 [
106151 f"set server { self .haproxy_backend } /{ self .haproxy_server_name } addr { host } " , # noqa: E501
@@ -110,6 +155,10 @@ def set_server_address(self, host: str, port: int):
110155 )
111156
112157 def subscriber (self ):
158+ """
159+ Subscribe to Redis Sentinel events and handle master failover (Blocking).
160+ :return: None
161+ """
113162 pubsub = self .conn .pubsub ()
114163 pubsub .subscribe ("+switch-master" )
115164 for message in pubsub .listen ():
@@ -123,38 +172,58 @@ def subscriber(self):
123172 self .set_server_address (master_info [3 ], int (master_info [4 ]))
124173
125174 def set_initial_server (self ):
175+ """
176+ Set the initial server in HAProxy.
177+ :return: None
178+ """
179+ info ("Setting initial server..." )
126180 self .remove_current_server ()
127181 return self .add_server (self .get_master_address ())
128182
129183 def haproxy_server_checker (self ):
184+ """
185+ Check the HAProxy server status and set the initial server if needed.
186+ :return: None
187+ """
130188 stats : list [list [dict | None ] | None ] | None = orjson .loads (
131189 self .send_command (
132190 f"show stat { self .haproxy_backend } 4 -1 json" ,
133191 log_data = False ,
134192 )
135193 )
136- if stats in (None , [], {}):
194+ if is_empty (stats ):
195+ info ("Empty data in stats, Setting initial server..." )
137196 return self .set_initial_server ()
138- for group in stats :
139- if group in (None , [], {}):
197+ for group in stats : # type: ignore
198+ if is_empty (group ):
199+ info ("Empty group in stats, Setting initial server..." )
140200 return self .set_initial_server ()
141- for item in group :
142- if (item is None ) or (not isinstance (item , dict )):
201+ for item in group : # type: ignore
202+ if is_empty (item ) or (not isinstance (item , dict )):
143203 continue
144204 field = item .get ("field" , {})
145- if (field is None ) or (not isinstance (field , dict )):
205+ if is_empty (field ) or (not isinstance (field , dict )):
146206 continue
147207 if field .get ("name" ) == "addr" :
148208 addr_value = item .get ("value" , {}).get ("value" , "" )
149- if not addr_value or len (addr_value ) < 0 :
209+ if is_empty (addr_value ):
210+ info ("Empty addr value in stats, Setting initial server..." ) # noqa: E501
150211 return self .set_initial_server ()
151212
152213 def haproxy_server_checker_worker (self ):
214+ """
215+ Worker for checking the HAProxy server status (Blocking).
216+ :return: None
217+ """
153218 while True :
154219 self .haproxy_server_checker ()
155220 sleep (2 )
156221
157222 def start_worker (self ):
223+ """
224+ Start the worker processes for subscriber and HAProxy server checker (Blocking).
225+ :return: None
226+ """
158227 subscriber_process = Process (target = self .subscriber , name = "subscriber" )
159228 haproxy_server_checker_process = Process (
160229 target = self .haproxy_server_checker_worker ,
0 commit comments