|
11 | 11 | from pydantic import BaseModel |
12 | 12 | from sentry_sdk.scrubber import DEFAULT_DENYLIST, DEFAULT_PII_DENYLIST, EventScrubber |
13 | 13 | from sentry_sdk.types import Event, Hint |
| 14 | +import json |
14 | 15 |
|
15 | 16 | DSN = "https://d14d2ee82f26a3585b2a892fab7fffaa@o4508256143540224.ingest.us.sentry.io/4508256153042944" |
16 | 17 |
|
@@ -79,24 +80,66 @@ def my_before_send(event: Event, hint: Hint) -> Event | None: |
79 | 80 | sentry_sdk.set_user(None) |
80 | 81 |
|
81 | 82 |
|
82 | | -def get_installation_id(file_path="installation_id.txt") -> str: |
83 | | - """Generate or load a unique installation ID.""" |
| 83 | +def get_installation_id( |
| 84 | + file_path: str = "installation_id.txt", is_manual_run: bool = True |
| 85 | +) -> Dict[str, Any]: |
| 86 | + """Generate or load installation data. |
| 87 | +
|
| 88 | + If the file exists and contains a dict, return it. |
| 89 | + If the file exists and contains only the installation ID, return a dict with default values. |
| 90 | + If the file does not exist: |
| 91 | + - If triggered manually by the user (is_manual_run is True), prompt for user_email. |
| 92 | + - Generate a new installation ID and save all data as a dict. |
| 93 | + - If not triggered manually, use default values for user_email. |
| 94 | + """ |
84 | 95 | if os.path.exists(file_path): |
| 96 | + data = { |
| 97 | + "user_email": "old_email@example.com", |
| 98 | + } |
| 99 | + rewrite_data = False |
85 | 100 | with open(file_path, "r") as file: |
86 | | - return file.read().strip() |
| 101 | + content = file.read().strip() |
| 102 | + installation_id = content |
| 103 | + data["installation_id"] = installation_id |
| 104 | + try: |
| 105 | + data = json.loads(content) |
| 106 | + if isinstance(data, dict) and "installation_id" in data: |
| 107 | + return data |
| 108 | + except json.JSONDecodeError: |
| 109 | + rewrite_data = True |
| 110 | + if rewrite_data: |
| 111 | + with open(file_path, "w") as file: |
| 112 | + json.dump(data, file) |
87 | 113 | else: |
88 | 114 | installation_id = str(uuid.uuid4()) |
| 115 | + user_email = "new_email@example.com" |
| 116 | + if is_manual_run: |
| 117 | + print( |
| 118 | + "We need your email to inform you about any urgent security patches or issues detected in testzeus-hercules." |
| 119 | + ) |
| 120 | + n_user_email = input( |
| 121 | + "Please provide your email (or press Enter to skip with empty email): " |
| 122 | + ) |
| 123 | + user_email = n_user_email if n_user_email else user_email |
| 124 | + |
| 125 | + data = { |
| 126 | + "user_email": user_email, |
| 127 | + "installation_id": installation_id, |
| 128 | + } |
89 | 129 | with open(file_path, "w") as file: |
90 | | - file.write(installation_id) |
91 | | - return installation_id |
| 130 | + json.dump(data, file) |
| 131 | + return data |
92 | 132 |
|
93 | 133 |
|
94 | 134 | # Initialize the installation_id |
95 | | -installation_id = get_installation_id() |
| 135 | +installation_data = get_installation_id( |
| 136 | + is_manual_run=os.environ.get("AUTO_MODE", "0") == "0" |
| 137 | +) |
96 | 138 |
|
97 | 139 | # Global event collector with event_type buckets |
98 | 140 | event_collector = { |
99 | | - "installation_id": installation_id, |
| 141 | + "installation_id": installation_data["installation_id"], |
| 142 | + "user_email": installation_data["user_email"], |
100 | 143 | "buckets": {}, |
101 | 144 | "start_time": datetime.now().isoformat(), |
102 | 145 | } |
@@ -144,6 +187,7 @@ def build_final_message() -> Dict[str, Any]: |
144 | 187 | """ |
145 | 188 | message = { |
146 | 189 | "installation_id": event_collector["installation_id"], |
| 190 | + "user_email": event_collector["user_email"], |
147 | 191 | "session_start": event_collector["start_time"], |
148 | 192 | "buckets": { |
149 | 193 | event_type_s: events |
|
0 commit comments