2525from typing import Any , Dict , Optional , Tuple
2626
2727import boto3
28+ from botocore .exceptions import BotoCoreError , ClientError
2829
2930from src .utils .trace_logging import log_payload_at_trace
3031from src .writers .writer import Writer
@@ -52,17 +53,31 @@ class WriterPostgres(Writer):
5253
5354 def __init__ (self , config : Dict [str , Any ]) -> None :
5455 super ().__init__ (config )
55- secret_name = os .environ .get ("POSTGRES_SECRET_NAME" , "" )
56- secret_region = os .environ .get ("POSTGRES_SECRET_REGION" , "" )
57-
58- if secret_name and secret_region :
59- aws_secrets = boto3 .Session ().client (service_name = "secretsmanager" , region_name = secret_region )
60- postgres_secret = aws_secrets .get_secret_value (SecretId = secret_name )["SecretString" ]
61- self ._db_config : Dict [str , Any ] = json .loads (postgres_secret )
62- else :
56+ self ._secret_name = os .environ .get ("POSTGRES_SECRET_NAME" , "" )
57+ self ._secret_region = os .environ .get ("POSTGRES_SECRET_REGION" , "" )
58+ self ._db_config : Optional [Dict [str , Any ]] = None
59+ logger .debug ("Initialized PostgreSQL writer" )
60+
61+ def _load_db_config (self ) -> None :
62+ """
63+ Load database config from AWS Secrets Manager.
64+ """
65+ if not self ._secret_name or not self ._secret_region :
6366 self ._db_config = {"database" : "" }
67+ return
6468
65- logger .debug ("Initialized PostgreSQL writer" )
69+ aws_secrets = boto3 .Session ().client (service_name = "secretsmanager" , region_name = self ._secret_region )
70+ postgres_secret = aws_secrets .get_secret_value (SecretId = self ._secret_name )["SecretString" ]
71+ self ._db_config = json .loads (postgres_secret )
72+ logger .debug ("Loaded PostgreSQL config from Secrets Manager" )
73+
74+ def _ensure_db_config (self ) -> Dict [str , Any ]:
75+ """
76+ Ensure database config is loaded and return it.
77+ """
78+ if self ._db_config is None :
79+ self ._load_db_config ()
80+ return self ._db_config # type: ignore[return-value]
6681
6782 def _postgres_edla_write (self , cursor : Any , table : str , message : Dict [str , Any ]) -> None :
6883 """
@@ -260,7 +275,9 @@ def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optiona
260275 Tuple of (success: bool, error_message: Optional[str]).
261276 """
262277 try :
263- if not self ._db_config .get ("database" ):
278+ db_config = self ._ensure_db_config ()
279+
280+ if not db_config .get ("database" ):
264281 logger .debug ("No Postgres - skipping Postgres writer" )
265282 return True , None
266283 if psycopg2 is None :
@@ -270,11 +287,11 @@ def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optiona
270287 log_payload_at_trace (logger , "Postgres" , topic_name , message )
271288
272289 with psycopg2 .connect ( # type: ignore[attr-defined]
273- database = self . _db_config ["database" ],
274- host = self . _db_config ["host" ],
275- user = self . _db_config ["user" ],
276- password = self . _db_config ["password" ],
277- port = self . _db_config ["port" ],
290+ database = db_config ["database" ],
291+ host = db_config ["host" ],
292+ user = db_config ["user" ],
293+ password = db_config ["password" ],
294+ port = db_config ["port" ],
278295 ) as connection :
279296 with connection .cursor () as cursor :
280297 if topic_name == "public.cps.za.dlchange" :
@@ -290,7 +307,7 @@ def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optiona
290307
291308 connection .commit ()
292309 except (RuntimeError , PsycopgError ) as e :
293- err_msg = f"The Postgres writer with failed unknown error: { str (e )} "
310+ err_msg = f"The Postgres writer failed with unknown error: { str (e )} "
294311 logger .exception (err_msg )
295312 return False , err_msg
296313
@@ -303,16 +320,22 @@ def check_health(self) -> Tuple[bool, str]:
303320 Returns:
304321 Tuple of (is_healthy: bool, message: str).
305322 """
306- if not self ._db_config .get ("database" ):
323+ # Checking if Postgres intentionally disabled
324+ if not self ._secret_name or not self ._secret_region :
307325 return True , "not configured"
308326
309- if not self ._db_config .get ("host" ):
310- return False , "host not configured"
311- if not self ._db_config .get ("user" ):
312- return False , "user not configured"
313- if not self ._db_config .get ("password" ):
314- return False , "password not configured"
315- if not self ._db_config .get ("port" ):
316- return False , "port not configured"
327+ try :
328+ db_config = self ._ensure_db_config ()
329+ logger .debug ("PostgreSQL config loaded during health check" )
330+ except (BotoCoreError , ClientError ) as err :
331+ return False , str (err )
332+
333+ # Validate database configuration fields
334+ if not db_config .get ("database" ):
335+ return True , "database not configured"
336+
337+ missing_fields = [field for field in ("host" , "user" , "password" , "port" ) if not db_config .get (field )]
338+ if missing_fields :
339+ return False , f"{ missing_fields [0 ]} not configured"
317340
318341 return True , "ok"
0 commit comments