@@ -15,26 +15,46 @@ def mask_email(email: str | None) -> str | None:
15
15
if not email :
16
16
return None
17
17
18
- local , domain = email .split ("@" )
19
- domain_parts = domain .split ("." )
20
- limit = 2
21
-
22
- # Mask local part (keep first char)
23
- if len (local ) <= limit :
24
- masked_local = "*" * len (local )
25
- else :
26
- masked_local = local [0 ] + "*" * (len (local ) - 1 )
27
-
28
- # Mask each domain part except the last one (TLD)
29
- masked_domain_parts = []
30
- for _i , part in enumerate (domain_parts [:- 1 ]): # Process all parts except TLD
31
- if len (part ) <= limit :
32
- masked_part = "*" * len (part )
33
- else :
34
- masked_part = part [0 ] + "*" * (len (part ) - 1 )
35
- masked_domain_parts .append (masked_part )
36
18
37
- # Add TLD unchanged
38
- masked_domain_parts .append (domain_parts [- 1 ])
19
+ try :
20
+ # Basic email format validation
21
+ if email .count ("@" ) != 1 :
22
+ raise ValueError ("Invalid email format: Must contain exactly one '@' symbol" )
23
+
24
+ local , domain = email .split ("@" )
25
+ if not local or not domain :
26
+ raise ValueError ("Invalid email format: Local and domain parts cannot be empty" )
27
+
28
+ domain_parts = domain .split ("." )
29
+ if len (domain_parts ) < 2 : # noqa: PLR2004
30
+ raise ValueError ("Invalid email format: Domain must contain at least one dot" )
39
31
40
- return f"{ masked_local } @{ '.' .join (masked_domain_parts )} "
32
+ limit = 2
33
+
34
+ # Mask local part (keep first char)
35
+ if len (local ) <= limit :
36
+ masked_local = "*" * len (local )
37
+ else :
38
+ masked_local = local [0 ] + "*" * (len (local ) - 1 )
39
+
40
+ # Mask each domain part except the last one (TLD)
41
+ masked_domain_parts = []
42
+ for _i , part in enumerate (domain_parts [:- 1 ]): # Process all parts except TLD
43
+ if not part : # Check for empty parts (consecutive dots)
44
+ raise ValueError ("Invalid email format: Domain parts cannot be empty" )
45
+ if len (part ) <= limit :
46
+ masked_part = "*" * len (part )
47
+ else :
48
+ masked_part = part [0 ] + "*" * (len (part ) - 1 )
49
+ masked_domain_parts .append (masked_part )
50
+
51
+ # Add TLD unchanged
52
+ if not domain_parts [- 1 ]: # Check if TLD is empty
53
+ raise ValueError ("Invalid email format: TLD cannot be empty" )
54
+ masked_domain_parts .append (domain_parts [- 1 ])
55
+
56
+ return f"{ masked_local } @{ '.' .join (masked_domain_parts )} "
57
+ except ValueError :
58
+ raise
59
+ except Exception as e :
60
+ raise ValueError (f"Invalid email format: { str (e )} " ) from e
0 commit comments