-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathcc_notifier.py
More file actions
988 lines (814 loc) · 32.9 KB
/
cc_notifier.py
File metadata and controls
988 lines (814 loc) · 32.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
#!/usr/bin/env python3
"""
cc-notifier - macOS notification system for Claude Code hooks
Note to AI: YOU MUST READ ./cc_notifier.context.md BEFORE ANALYZING OR WORKING WITH THIS FILE. It contains important details about this file.
"""
import fcntl
import json
import os
import re
import shlex
import socket
import subprocess
import sys
import time
import urllib.error
import urllib.parse
import urllib.request
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Callable, Optional
# Constants and configuration
VERSION = "0.3.0"
# Directory holding one state file per session (file name is the session ID)
SESSION_DIR = Path("/tmp/cc_notifier")
# Session files whose mtime is older than this (5 days) are deleted by cleanup
CLEANUP_AGE_SECONDS = 5 * 24 * 60 * 60
# A notify request arriving within this many seconds of the previous one is skipped
NOTIFICATION_DEDUPLICATION_THRESHOLD_SECONDS = 2.0
MAX_LOG_LINES = 2250 # Trigger trim when exceeded
TRIM_TO_LINES = 1250 # Keep newest lines after trim
# Absolute paths of the external helper binaries this script shells out to
HAMMERSPOON_CLI = "/Applications/Hammerspoon.app/Contents/Frameworks/hs/hs"
TERMINAL_NOTIFIER = "/opt/homebrew/bin/terminal-notifier"
# Cumulative elapsed seconds at which idle time is sampled before sending a push;
# one list per mode (desktop, remote SSH, attached tmux session)
PUSH_IDLE_CHECK_INTERVALS_DESKTOP = [3, 20]
PUSH_IDLE_CHECK_INTERVALS_REMOTE = [4]
PUSH_IDLE_CHECK_INTERVALS_ATTACHED = [3, 20]
# Debug configuration
# Set to True by main() when --debug is passed on the command line
DEBUG = False
# Global state for threading app path to error handler
# Written by cmd_notify(); read by log_error() to choose the click action
_CURRENT_APP_PATH: Optional[str] = None
def handle_command_errors(
    command_name: str,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """Build a decorator that converts any command failure into a logged exit.

    Args:
        command_name: Name of the command, used in the logged error message.

    Returns:
        A decorator. The decorated function returns its normal result on
        success; if it raises any ``Exception`` the error is passed to
        ``log_error`` and the process exits with status 1.
    """
    # Local import keeps this block self-contained with respect to the
    # file-level import list.
    from functools import wraps

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        # wraps() preserves the command's __name__/__doc__ on the wrapper so
        # tracebacks and introspection still point at the real function.
        @wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except Exception as e:
                log_error(f"Command '{command_name}' failed", e)
                sys.exit(1)

        return wrapper

    return decorator
# ============================================================================
# COMMAND LINE INTERFACE - Main Entry Point and Command Dispatch
# ============================================================================
def main() -> None:
    """Entry point: validate the launch context, then dispatch the command.

    Exits with status 1 when not launched through the wrapper (the
    CC_NOTIFIER_WRAPPER environment variable is unset or empty) and when the
    command is not recognised (after printing help).
    """
    global DEBUG

    # Guard against direct execution in hooks
    if not os.getenv("CC_NOTIFIER_WRAPPER"):
        guard_lines = (
            "ERROR: cc_notifier.py should not be run directly in Claude Code hooks.",
            "Use: cc-notifier wrapper instead",
            "Running directly will block Claude Code execution!",
        )
        for guard_line in guard_lines:
            print(guard_line, file=sys.stderr)
        sys.exit(1)

    # --debug may appear anywhere; strip it so it never shifts the command
    if "--debug" in sys.argv:
        DEBUG = True
        sys.argv.remove("--debug")

    remaining = sys.argv[1:]
    command = remaining[0] if remaining else "help"
    debug_log(f"Command: {command}")

    if command in ("--version", "-v"):
        print(f"cc-notifier {VERSION}")
        return

    handlers = {
        "init": cmd_init,
        "notify": cmd_notify,
        "cleanup": cmd_cleanup,
    }
    handler = handlers.get(command)
    if handler is None:
        show_help()
        sys.exit(1)
    handler()
@handle_command_errors("init")
def cmd_init() -> None:
    """Record the focused window, app path and terminal session for this session.

    Remote sessions store the placeholder "REMOTE"; a failed window capture
    stores "UNAVAILABLE" so that later commands can still run.
    """
    hook_data = HookData.from_stdin()
    iterm2_session_id = ""

    if not is_remote_session():
        try:
            window_id, app_path = get_focused_window_id()
            if is_iterm2_app(app_path):
                iterm2_session_id = get_iterm2_focused_session_id()
        except (RuntimeError, OSError) as e:
            # Window capture is best-effort; carry on without it
            window_id = app_path = "UNAVAILABLE"
            debug_log(f"Window capture failed, continuing without: {e}")
    else:
        window_id = app_path = "REMOTE"
        debug_log("Remote session detected, skipping window capture")

    save_window_id(
        hook_data.session_id,
        window_id,
        app_path,
        get_tmux_session_id() or "",
        iterm2_session_id,
    )
@handle_command_errors("notify")
def cmd_notify() -> None:
    """Notify the user locally and/or by push once a hook event fires.

    Reads the session file written by ``init``, skips duplicate requests,
    sends a local notification in desktop mode, and then runs the push idle
    checks when push credentials are configured.
    """
    global _CURRENT_APP_PATH

    hook_data = HookData.from_stdin()
    session_file = SESSION_DIR / hook_data.session_id
    if check_deduplication(session_file):
        return

    # Session file layout: window_id, app_path, timestamp, tmux id, iTerm2 id
    fields = session_file.read_text().strip().split("\n")
    original_window_id, app_path = fields[0], fields[1]
    tmux_session_id = fields[3] if len(fields) > 3 else ""
    iterm2_session_id = fields[4] if len(fields) > 4 else ""

    # Set global app path for error handling
    _CURRENT_APP_PATH = app_path

    # Local notifications only in desktop mode
    if not is_remote_session():
        try:
            send_local_notification_if_needed(
                hook_data,
                original_window_id,
                app_path,
                tmux_session_id,
                iterm2_session_id,
            )
        except (RuntimeError, OSError) as e:
            log_error("Local notification failed, continuing to push", e)

    # Push notifications if configured
    if not PushConfig.from_env():
        return

    if tmux_session_id and is_tmux_session_attached(tmux_session_id):
        debug_log(
            f"tmux session {tmux_session_id} attached - using extended idle check"
        )
        intervals = PUSH_IDLE_CHECK_INTERVALS_ATTACHED
    elif is_remote_session():
        intervals = PUSH_IDLE_CHECK_INTERVALS_REMOTE
    else:
        intervals = PUSH_IDLE_CHECK_INTERVALS_DESKTOP

    debug_log(f"Push idle check intervals: {intervals}")
    check_idle_and_notify_push(hook_data, intervals)
@handle_command_errors("cleanup")
def cmd_cleanup() -> None:
    """Read the hook payload from stdin and run session-file maintenance."""
    session_id = HookData.from_stdin().session_id
    cleanup_session(session_id)
def show_help() -> None:
    """Print the usage summary to stdout."""
    # Doubled braces in the usage line are f-string escapes for literal { }
    help_text = f"""cc-notifier {VERSION}
Usage: cc-notifier [--debug] {{init|notify|cleanup|--version}}
Commands:
init - Initialize session (capture focused window)
notify - Send notification if user switched away (local + push)
cleanup - Clean up session files
--version - Show version information
Options:
--debug - Enable debug logging with timestamps
macOS notification system for Claude Code hooks with push notification support.
Set PUSHOVER_API_TOKEN and PUSHOVER_USER_KEY to enable push notifications."""
    print(help_text)
# ============================================================================
# CORE UTILITIES - Session Management and Data Structures
# ============================================================================
@dataclass
class HookData:
    """Data structure for Claude Code hook events."""

    # Identifier of the session; also used as the session file name
    session_id: str
    # Working directory reported by the hook ("" when absent)
    cwd: str = ""
    # Hook event name; defaults to "Stop" when the payload omits it
    hook_event_name: str = "Stop"
    # Optional message text carried by the hook event
    message: str = ""

    @classmethod
    def from_stdin(cls) -> "HookData":
        """Parse hook data from the JSON document on stdin.

        Only the known fields are kept, and fields with empty/falsy values
        are dropped so the dataclass defaults apply.

        Returns:
            The populated HookData instance.

        Raises:
            ValueError: if stdin is not valid JSON, is not a JSON object, or
                lacks a non-empty ``session_id``.
        """
        try:
            data = json.loads(sys.stdin.read())
        except json.JSONDecodeError as err:
            raise ValueError("Invalid JSON input from stdin") from err

        # A JSON array/string/number would otherwise fail with AttributeError
        if not isinstance(data, dict):
            raise ValueError("Hook input from stdin must be a JSON object")

        valid_fields = {"session_id", "cwd", "hook_event_name", "message"}
        filtered_data = {k: v for k, v in data.items() if k in valid_fields and v}

        # Without this check the constructor raises an opaque TypeError
        if "session_id" not in filtered_data:
            raise ValueError("Hook input is missing a non-empty 'session_id'")

        hook_data = cls(**filtered_data)
        debug_log(f"Hook: {hook_data.session_id}, {hook_data.hook_event_name}")
        return hook_data
def check_deduplication(session_file: Path) -> bool:
    """Decide whether this notification is a duplicate and should be skipped.

    Takes a non-blocking exclusive lock on the session file. If another
    process holds the lock, or the stored timestamp is more recent than the
    deduplication threshold, returns True (skip). Otherwise rewrites the
    file with the current time as the new timestamp and returns False.
    """
    try:
        with open(session_file, "r+") as handle:
            fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
            # Fields: [0]=window_id, [1]=app_path, [2]=timestamp,
            # [3]=tmux_session_id (optional), [4]=iterm2_session_id (optional)
            fields = handle.read().strip().split("\n")

            seconds_since_last = time.time() - float(fields[2])
            if seconds_since_last < NOTIFICATION_DEDUPLICATION_THRESHOLD_SECONDS:
                return True

            app_path = fields[1] if len(fields) > 1 else ""
            tmux_id = fields[3] if len(fields) > 3 else ""
            iterm2_session_id = fields[4] if len(fields) > 4 else ""

            handle.seek(0)
            updated_content = f"{fields[0]}\n{app_path}\n{time.time()}\n{tmux_id}"
            if iterm2_session_id:
                updated_content = f"{updated_content}\n{iterm2_session_id}"
            handle.write(updated_content)
            # Drop leftover bytes when the new content is shorter
            handle.truncate()
            return False
    except BlockingIOError:
        # Another notify process holds the lock; let it handle this event
        return True
def send_local_notification_if_needed(
    hook_data: HookData,
    original_window_id: str,
    app_path: str,
    tmux_session_id: str = "",
    iterm2_session_id: str = "",
) -> None:
    """Send a local notification when the user has left the original window.

    "Left" covers three cases: a different window has focus, a different
    iTerm2 session is focused inside the same window, or the recorded tmux
    session is no longer attached while the same window has focus.

    When the window ID is "UNAVAILABLE" only the tmux attachment state is
    consulted, and the notification carries no click-to-focus action.
    """
    if original_window_id == "UNAVAILABLE":
        # Without Hammerspoon, check tmux session before sending
        if tmux_session_id and is_tmux_session_attached(tmux_session_id):
            debug_log(
                f"Window tracking unavailable but tmux session {tmux_session_id} is attached - suppressing notification"
            )
            return
        debug_log("Window tracking unavailable, sending notification unconditionally")
        title, subtitle, message = create_notification_data(hook_data)
        send_notification(title=title, subtitle=subtitle, message=message)
        return

    current_window_id, current_app_path = get_focused_window_id()
    same_window = original_window_id == current_window_id

    # Tab comparison only makes sense inside one and the same iTerm2 window
    can_compare_tabs = (
        same_window
        and bool(iterm2_session_id)
        and is_iterm2_app(app_path)
        and is_iterm2_app(current_app_path)
    )
    iterm2_tab_switched = False
    if can_compare_tabs:
        current_iterm2_session_id = get_iterm2_focused_session_id()
        if not current_iterm2_session_id:
            debug_log(
                "Unable to read current iTerm2 session ID - falling back to window/tmux detection"
            )
        elif current_iterm2_session_id != iterm2_session_id:
            iterm2_tab_switched = True
            debug_log(
                "Same iTerm2 window but different session ID - user switched tabs"
            )

    if same_window and not iterm2_tab_switched:
        # Same window, but check if user switched tmux sessions within it
        tmux_detached = bool(tmux_session_id) and not is_tmux_session_attached(
            tmux_session_id
        )
        if not tmux_detached:
            debug_log("User still on original window - no local notification needed")
            return
        debug_log(
            f"Same window but tmux session {tmux_session_id} detached - user switched tmux sessions"
        )

    # User switched away - send local notification
    title, subtitle, message = create_notification_data(hook_data)
    debug_log(
        f"Sending local notification: original_window={original_window_id}, current_window={current_window_id}, notification='{title}' | '{subtitle}' | '{message}'"
    )
    restore_session_id = iterm2_session_id if is_iterm2_app(app_path) else None
    send_notification(
        title=title,
        subtitle=subtitle,
        message=message,
        focus_window_id=original_window_id,
        focus_iterm2_session_id=restore_session_id,
    )
def save_window_id(
    session_id: str,
    window_id: str,
    app_path: str,
    tmux_session_id: str = "",
    iterm2_session_id: str = "",
) -> None:
    """Write the session file for ``session_id``.

    The file holds one value per line: window ID, app path, a timestamp
    initialised to 0, the tmux session ID, and - only when non-empty - the
    iTerm2 session ID.
    """
    SESSION_DIR.mkdir(exist_ok=True)
    session_file = SESSION_DIR / session_id

    # The iTerm2 line is omitted entirely when there is no session ID
    iterm2_suffix = f"\n{iterm2_session_id}" if iterm2_session_id else ""
    session_file.write_text(
        f"{window_id}\n{app_path}\n0\n{tmux_session_id}{iterm2_suffix}"
    )

    debug_log(
        f"Session initialized: window_id={window_id}, app_path={app_path}, tmux={tmux_session_id}, iterm2_session={iterm2_session_id}, session_file={session_file}"
    )
def load_window_id(session_id: str) -> str:
    """Return the window ID stored on the first line of the session file."""
    session_file = SESSION_DIR / session_id
    stored = session_file.read_text().strip()
    window_id = stored.split("\n")[0]
    debug_log(f"Session restored: window_id={window_id}, session_file={session_file}")
    return window_id
def cleanup_session(_: str) -> None:
    """Delete session files older than CLEANUP_AGE_SECONDS.

    The session ID argument is accepted but ignored: session-specific
    deletion is skipped due to Claude Code bug #7911 (session ID mismatch).
    """
    cutoff_time = time.time() - CLEANUP_AGE_SECONDS
    removed = 0

    for entry in SESSION_DIR.glob("*"):
        if entry.is_file():
            try:
                if entry.stat().st_mtime < cutoff_time:
                    entry.unlink(missing_ok=True)
                    removed += 1
            except OSError:
                # A file that cannot be inspected or removed is simply skipped
                continue

    cleaned_files = removed
    if cleaned_files > 0 or DEBUG:
        debug_log(
            f"Session cleanup completed: removed {cleaned_files} old session files"
        )
# Log file written by _write_log_entry() and trimmed by _trim_log_if_needed()
LOG_FILE = Path.home() / ".cc-notifier" / "cc-notifier.log"
def _trim_log_if_needed() -> None:
    """Cut the log back to its newest TRIM_TO_LINES lines once it exceeds MAX_LOG_LINES."""
    if LOG_FILE.exists():
        log_lines = LOG_FILE.read_text().splitlines()
        if len(log_lines) > MAX_LOG_LINES:
            newest = log_lines[-TRIM_TO_LINES:]
            LOG_FILE.write_text("\n".join(newest) + "\n")
def _write_log_entry(
    level: str, message: str, exception: Optional[Exception] = None
) -> None:
    """Append one timestamped line to the log file, trimming the file first.

    When ``exception`` is given, its type name and text are appended to the
    message after a " - " separator.
    """
    LOG_FILE.parent.mkdir(exist_ok=True)
    _trim_log_if_needed()

    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    segments = [f"[{timestamp}] [{level}] {message}"]
    if exception:
        segments.append(f"{type(exception).__name__}: {exception}")

    with open(LOG_FILE, "a") as log_handle:
        log_handle.write(" - ".join(segments) + "\n")
def debug_log(message: str) -> None:
    """Write ``message`` to the log at DEBUG level; a no-op unless DEBUG is set."""
    if not DEBUG:
        return
    _write_log_entry("DEBUG", message)
def log_error(error_msg: str, exception: Optional[Exception] = None) -> None:
    """Log an error to the log file and show a desktop error notification.

    Args:
        error_msg: Human-readable description; used as the notification text.
        exception: Optional exception whose type and text are added to the
            log entry (not to the notification).

    The notification is sent with terminal-notifier; if launching it fails,
    an AppleScript ``display notification`` is used instead.
    """
    _write_log_entry("ERROR", error_msg, exception)

    # Determine click action: focus app if available, otherwise open log.
    # The path is shell-quoted because the -execute value is a command line
    # (presumably run through a shell by terminal-notifier), so paths with
    # spaces or quotes would otherwise be split or misparsed.
    click_target = _CURRENT_APP_PATH or str(LOG_FILE)
    execute_action = f"open {shlex.quote(click_target)}"

    # Send error notification with fallback
    try:
        run_background_command(
            [
                TERMINAL_NOTIFIER,
                "-title",
                "cc-notifier Error",
                "-message",
                error_msg,
                "-sound",
                "Basso",
                "-execute",
                execute_action,
            ]
        )
    except Exception:
        # Escape backslashes and double quotes so the message cannot terminate
        # the AppleScript string literal and break the script.
        escaped_msg = error_msg.replace("\\", "\\\\").replace('"', '\\"')
        run_background_command(
            [
                "osascript",
                "-e",
                f'display notification "{escaped_msg}" with title "cc-notifier Error" sound name "Basso"',
            ]
        )
# ============================================================================
# ENVIRONMENT DETECTION - Remote vs Desktop Mode
# ============================================================================
def is_remote_session() -> bool:
    """Report whether any SSH environment variable marks this as a remote session.

    Checks SSH_CONNECTION, SSH_CLIENT and SSH_TTY; a non-empty value in any
    of them counts as remote.
    """
    ssh_vars = ("SSH_CONNECTION", "SSH_CLIENT", "SSH_TTY")
    detected_by = [
        f"{name}={value}"
        for name, value in ((var, os.getenv(var)) for var in ssh_vars)
        if value
    ]
    if detected_by:
        debug_log(f"Remote session detected: {', '.join(detected_by)}")
    return bool(detected_by)
def get_tmux_session_id() -> Optional[str]:
    """Return the current tmux session ID (e.g. '$20').

    Returns None when tmux is not installed, the query times out, tmux
    reports failure, or the output is empty.
    """
    query = ["tmux", "display-message", "-p", "#{session_id}"]
    try:
        result = subprocess.run(query, capture_output=True, text=True, timeout=2)
        session_id = result.stdout.strip()
        if result.returncode != 0 or not session_id:
            return None
        debug_log(f"tmux session ID: {session_id}")
        return session_id
    except (FileNotFoundError, subprocess.TimeoutExpired):
        return None
def is_tmux_session_attached(session_id: str) -> bool:
    """Report whether the given tmux session currently has attached clients.

    Args:
        session_id: tmux session ID (e.g. '$20')

    Returns:
        True if tmux reports an attached count above zero. False when the
        count is zero, tmux is missing, the call times out or fails, or the
        output is not a single integer.
    """
    query = [
        "tmux",
        "list-sessions",
        "-f",
        f"#{{==:#{{session_id}},{session_id}}}",
        "-F",
        "#{session_attached}",
    ]
    try:
        result = subprocess.run(query, capture_output=True, text=True, timeout=2)
        reported = result.stdout.strip()
        if result.returncode != 0 or not reported:
            return False
        attached_count = int(reported)
        debug_log(f"tmux session {session_id} attached count: {attached_count}")
        return attached_count > 0
    except (FileNotFoundError, subprocess.TimeoutExpired, ValueError):
        return False
# ============================================================================
# HAMMERSPOON INTEGRATION - Cross-Space Window Management
# ============================================================================
def get_focused_window_id() -> tuple[str, str]:
    """Ask the Hammerspoon CLI for the focused window's ID and app path.

    Returns:
        Tuple of (window_id, app_path). The app path is 'UNKNOWN' when
        Hammerspoon cannot resolve the window's application.

    Raises:
        RuntimeError: if the command times out or its output is not of the
            form ``id|path``.
    """
    lua_snippet = "local w=hs.window.focusedWindow(); if w then local app=w:application(); print(w:id()..'|'..(app and app:path() or 'UNKNOWN')) else print('ERROR') end"
    try:
        output = run_command([HAMMERSPOON_CLI, "-c", lua_snippet])
    except subprocess.TimeoutExpired as e:
        raise RuntimeError(
            f"Hammerspoon command timed out after {e.timeout} seconds"
        ) from e

    # Empty output and the literal 'ERROR' both lack the '|' separator
    window_id, separator, app_path = output.partition("|")
    if not separator:
        raise RuntimeError("Failed to get focused window ID from Hammerspoon")
    return window_id, app_path
def is_iterm2_app(app_path: str) -> bool:
    """Report whether ``app_path`` ends with an iTerm2 application bundle name."""
    iterm2_bundles = ("/iTerm.app", "/iTerm2.app")
    return app_path.endswith(iterm2_bundles)
def get_iterm2_focused_session_id() -> str:
    """Return the ID of iTerm2's focused session via osascript.

    Returns an empty string when the command fails or times out; the
    AppleScript itself also returns "" when iTerm2 is not running or the
    session lookup errors.
    """
    script_lines = (
        'tell application "iTerm2"',
        'if not running then return ""',
        "try",
        "return id of current session of current window as text",
        "on error",
        'return ""',
        "end try",
        "end tell",
    )
    # osascript takes one -e flag per script line
    cmd = ["osascript"] + [
        arg for script_line in script_lines for arg in ("-e", script_line)
    ]
    try:
        session_id = run_command(cmd, timeout=5)
    except (RuntimeError, subprocess.TimeoutExpired):
        return ""
    return session_id
def _build_iterm2_restore_script(iterm2_session_id: str) -> str:
    """Return AppleScript source that selects the iTerm2 session with this ID.

    Backslashes and double quotes in the ID are escaped so the value stays
    inside its AppleScript string literal.
    """
    escape_table = str.maketrans({"\\": "\\\\", '"': '\\"'})
    escaped_session_id = iterm2_session_id.translate(escape_table)
    script = f"""tell application "iTerm2"
if not running then return
repeat with w in windows
repeat with t in tabs of w
repeat with s in sessions of t
if (id of s as text) is "{escaped_session_id}" then
tell w to select
tell t to select
tell s to select
activate
return
end if
end repeat
end repeat
end repeat
end tell"""
    return script
def create_focus_command(
    window_id: str, iterm2_session_id: Optional[str] = None
) -> list[str]:
    """
    Build the command that focuses a window across macOS Spaces.

    The Hammerspoon script gathers windows from the current space and from
    the other spaces with two separate filters, then focuses the one whose
    ID matches. Two filters are used instead of setCurrentSpace(nil), which
    causes hangs. If no window matches, the script posts an error
    notification.

    When iterm2_session_id is provided, the Hammerspoon command is chained
    with an osascript command that restores the specific iTerm2 session, and
    both are wrapped in a single /bin/sh invocation.

    Args:
        window_id: The window ID to focus
        iterm2_session_id: Optional iTerm2 session ID for tab restoration

    Returns:
        List of command arguments for subprocess execution
    """
    # Doubled braces below are f-string escapes for the Lua table literal
    focus_script = f"""local current = require('hs.window.filter').new():setCurrentSpace(true):getWindows()
local other = require('hs.window.filter').new():setCurrentSpace(false):getWindows()
for _,w in pairs(other) do table.insert(current, w) end
for _,w in pairs(current) do
if w:id()=={window_id} then
w:focus()
require('hs.timer').usleep(300000)
return
end
end
require('hs.notify').new({{title="cc-notifier", informativeText="Could not restore window focus. Try reopening your terminal or IDE.", soundName="Basso"}}):send()"""

    hs_cmd = [HAMMERSPOON_CLI, "-c", focus_script]
    if not iterm2_session_id:
        return hs_cmd

    osascript_cmd = ["osascript", "-e", _build_iterm2_restore_script(iterm2_session_id)]
    # Quote each argument, then run both commands in sequence in one shell
    quoted_commands = [
        " ".join(shlex.quote(arg) for arg in command)
        for command in (hs_cmd, osascript_cmd)
    ]
    return ["/bin/sh", "-c", "; ".join(quoted_commands)]
# ============================================================================
# NOTIFICATION SYSTEM - macOS Notifications with Click-to-Focus
# ============================================================================
def resolve_title_tokens(hook_data: HookData, template: str) -> dict[str, str]:
    """Build the dict of built-in tokens used to format titles and URLs.

    Tokens: {hostname}, {tmux_session}, {dir}, {cwd}.

    {cwd}, {dir} and {hostname} are always present. {tmux_session} is only
    resolved when that exact token appears in the template, which avoids an
    unnecessary tmux subprocess call; it resolves to "" when tmux is
    missing, times out, or reports failure.
    """
    cwd = hook_data.cwd
    tokens: dict[str, str] = {
        "cwd": cwd or "",
        "dir": Path(cwd).name if cwd else "",
    }

    try:
        hostname = socket.gethostname()
    except Exception:
        hostname = ""
    tokens["hostname"] = hostname

    if "{tmux_session}" not in template:
        return tokens

    tmux_session = ""
    try:
        result = subprocess.run(
            ["tmux", "display-message", "-p", "#S"],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode == 0:
            tmux_session = result.stdout.strip()
    except (FileNotFoundError, subprocess.TimeoutExpired):
        pass
    tokens["tmux_session"] = tmux_session
    return tokens
def format_title(hook_data: HookData) -> Optional[str]:
    """Format the notification title from CC_NOTIFIER_TITLE_FORMAT.

    Supports built-in tokens ({hostname}, {tmux_session}, {dir}, {cwd})
    and generic env var access via {env:VAR_NAME}.

    Returns:
        The formatted title, or None when CC_NOTIFIER_TITLE_FORMAT is not
        set, allowing callers to fall back to their own defaults.

    Raises:
        KeyError, IndexError, ValueError: from ``str.format`` when the
            template itself contains unknown tokens or malformed braces.
    """
    template = os.getenv("CC_NOTIFIER_TITLE_FORMAT")
    if not template:
        return None

    def _expand_env(match: "re.Match[str]") -> str:
        # The value is spliced into the template before .format() runs, so
        # its braces must be doubled or they would be parsed as format fields.
        value = os.getenv(match.group(1), "")
        return value.replace("{", "{{").replace("}", "}}")

    # Pre-pass: resolve {env:VAR_NAME} tokens before .format()
    template = re.sub(r"\{env:([^}]+)\}", _expand_env, template)
    tokens = resolve_title_tokens(hook_data, template)
    return template.format(**tokens)
def create_notification_data(
    hook_data: HookData, for_push: bool = False
) -> tuple[str, str, str]:
    """Return the (title, subtitle, message) triple for a notification.

    The subtitle is the working directory's base name. The message is the
    hook's own text for "Notification" events, otherwise a fixed completion
    text. A custom title format, when configured, replaces the default
    title; in debug mode the title additionally gets a timestamp (push) or
    a DEBUG prefix (local).
    """
    cwd = hook_data.cwd
    subtitle = Path(cwd).name if cwd else "Task Completed"

    is_notification_event = hook_data.hook_event_name == "Notification"
    if is_notification_event and hook_data.message:
        message = hook_data.message
    else:
        message = "Completed task"

    # Custom format takes over when set, otherwise use original defaults
    title = format_title(hook_data)
    if title is None:
        title = subtitle if for_push else "Claude Code 🔔"

    # Apply debug decorations
    if DEBUG:
        if not for_push:
            title = f"\\[DEBUG] {title}"
        else:
            now = time.time()
            clock = time.strftime('%H:%M:%S', time.localtime(now))
            milliseconds = int((now % 1) * 1000)
            timestamp = f"{clock}.{milliseconds:03d}"
            title = f"{title} [{timestamp}]"

    return title, subtitle, message
def send_notification(
    title: str,
    subtitle: str,
    message: str,
    focus_window_id: Optional[str] = None,
    focus_iterm2_session_id: Optional[str] = None,
) -> None:
    """Launch terminal-notifier to show a macOS notification.

    When ``focus_window_id`` is given, the notification gets an -execute
    action that runs the focus command for that window (and, optionally,
    the iTerm2 session). Any error launching the notifier is logged at
    debug level and re-raised.
    """
    cmd = [TERMINAL_NOTIFIER]
    cmd += ["-title", title]
    cmd += ["-subtitle", subtitle]
    cmd += ["-message", message]
    cmd += ["-sound", "Glass"]
    cmd.append("-ignoreDnD")

    # Add click-to-focus functionality if window ID provided
    if focus_window_id:
        focus_cmd = create_focus_command(focus_window_id, focus_iterm2_session_id)
        quoted_args = [shlex.quote(arg) for arg in focus_cmd]
        cmd += ["-execute", " ".join(quoted_args)]

    # Send notification in background
    try:
        run_background_command(cmd)
        if DEBUG:
            debug_log(f"Notification sent: focus_window_id={focus_window_id}")
    except Exception as e:
        debug_log(f"Notification failed: {type(e).__name__}")
        raise
# ============================================================================
# PUSH NOTIFICATIONS - Idle Detection and Pushover Integration
# API Documentation: https://pushover.net/api
# ============================================================================
@dataclass
class PushConfig:
    """Credentials for the push notification service."""

    # Value of PUSHOVER_API_TOKEN
    token: str
    # Value of PUSHOVER_USER_KEY
    user: str

    @classmethod
    def from_env(cls) -> Optional["PushConfig"]:
        """Build a PushConfig from the environment.

        Returns None unless both PUSHOVER_API_TOKEN and PUSHOVER_USER_KEY
        are set to non-empty values.
        """
        token = os.getenv("PUSHOVER_API_TOKEN")
        user = os.getenv("PUSHOVER_USER_KEY")
        if not (token and user):
            return None
        return cls(token=token, user=user)
def build_push_url(hook_data: HookData) -> Optional[str]:
    """Build the push notification URL from the CC_NOTIFIER_PUSH_URL template.

    Substitutes {cwd}, {session_id}, and all title tokens ({hostname},
    {tmux_session}, {dir}, {env:VAR}) with actual values. Values are
    inserted as-is; no URL encoding is applied.

    Returns:
        URL with placeholders substituted, or None if not configured.

    Raises:
        KeyError, IndexError, ValueError: from ``str.format`` when the
            template itself contains unknown tokens or malformed braces.
    """
    url_template = os.getenv("CC_NOTIFIER_PUSH_URL")
    if not url_template:
        return None

    def _expand_env(match: "re.Match[str]") -> str:
        # The value is spliced into the template before .format() runs, so
        # its braces must be doubled or they would be parsed as format fields.
        value = os.getenv(match.group(1), "")
        return value.replace("{", "{{").replace("}", "}}")

    # Pre-pass: resolve {env:VAR_NAME} tokens
    url_template = re.sub(r"\{env:([^}]+)\}", _expand_env, url_template)
    tokens = resolve_title_tokens(hook_data, url_template)
    tokens["session_id"] = hook_data.session_id
    url = url_template.format(**tokens)
    debug_log(f"Push URL built: {url}")
    return url
def send_pushover_notification(
    config: PushConfig, title: str, message: str, url: Optional[str] = None
) -> bool:
    """Send a notification via the Pushover API.

    Args:
        config: Pushover API configuration
        title: Notification title (truncated to 250 characters)
        message: Notification message (truncated to 1024 characters)
        url: Optional URL to open when notification is tapped

    Returns:
        True if Pushover API returned {"status":1}, False otherwise.
        Network errors (including timeouts), undecodable or non-JSON
        responses, and API failures all yield False rather than raising.
    """
    # Enforce Pushover API limits: 250 char title, 1024 char message.
    # Slicing is already a no-op on shorter strings.
    title = title[:250]
    message = message[:1024]

    data_dict = {
        "token": config.token,
        "user": config.user,
        "title": title,
        "message": message,
    }
    if url:
        data_dict["url"] = url

    req = urllib.request.Request(
        "https://api.pushover.net/1/messages.json",
        data=urllib.parse.urlencode(data_dict).encode("utf-8"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )

    try:
        with urllib.request.urlopen(req, timeout=10) as response:
            status = response.status
            raw_body = response.read() if status == 200 else b""
        response_data = json.loads(raw_body.decode("utf-8")) if status == 200 else None
    except (OSError, ValueError, KeyError) as e:
        # OSError covers URLError/HTTPError as well as timeouts raised while
        # reading the body; ValueError covers JSONDecodeError and
        # UnicodeDecodeError.
        debug_log(f"Push notification result: error={type(e).__name__}, success=False")
        return False

    # A JSON body that is not an object cannot carry a status field
    success = (
        status == 200
        and isinstance(response_data, dict)
        and response_data.get("status") == 1
    )
    debug_log(f"Push notification result: status={status}, success={success}")
    return success
def get_macos_idle_time() -> int:
    """Return the macOS system idle time in whole seconds, read from ioreg.

    Raises:
        RuntimeError: if ioreg times out or its output has no HIDIdleTime line.
    """
    try:
        output = run_command(["ioreg", "-c", "IOHIDSystem"], timeout=5)
    except subprocess.TimeoutExpired as e:
        raise RuntimeError(f"ioreg command timed out after {e.timeout} seconds") from e

    # Only the first line mentioning HIDIdleTime is used
    idle_line = next(
        (line for line in output.splitlines() if "HIDIdleTime" in line), None
    )
    if idle_line is None:
        raise RuntimeError("HIDIdleTime not found in ioreg output")

    # The value after '=' is converted from nanoseconds to seconds
    idle_nanoseconds = int(idle_line.split("=", 1)[1].strip())
    return idle_nanoseconds // 1_000_000_000
def get_tty_idle_time() -> int:
    """Return seconds since the TTY's last access time (st_atime).

    The TTY path comes from the CC_NOTIFIER_TTY environment variable, which
    the wrapper captures before backgrounding so that the actual terminal
    device is used (works with tmux/screen).

    Raises:
        RuntimeError: if CC_NOTIFIER_TTY is unset or the device cannot be
            stat'ed.
    """
    tty_path = os.getenv("CC_NOTIFIER_TTY")
    if not tty_path:
        raise RuntimeError("CC_NOTIFIER_TTY not set by wrapper")

    try:
        debug_log(f"TTY detection: CC_NOTIFIER_TTY={tty_path!r}")
        last_read_time = os.stat(tty_path).st_atime
        current_time = time.time()
        idle_seconds = int(current_time - last_read_time)
        debug_log(
            f"TTY idle: path={tty_path}, st_atime={last_read_time:.1f}, current={current_time:.1f}, idle={idle_seconds}s"
        )
        return idle_seconds
    except (OSError, ValueError) as e:
        debug_log(f"TTY idle error: {type(e).__name__}: {e}")
        raise RuntimeError("Unable to get TTY idle time") from e
def get_idle_time() -> int:
    """Return idle seconds from the TTY when remote, otherwise from macOS."""
    probe = get_tty_idle_time if is_remote_session() else get_macos_idle_time
    return probe()
def check_idle_and_notify_push(hook_data: HookData, check_times: list[int]) -> None:
    """Send a push notification only if the user stays idle through every check.

    ``check_times`` are cumulative elapsed seconds. At each one the idle
    time is sampled; an idle time shorter than the elapsed time means the
    user was active during the wait, and the function returns without
    pushing. An idle detection error also ends the check without pushing.

    Raises:
        ValueError: if ``check_times`` is empty (and push is configured).
    """
    push_config = PushConfig.from_env()
    if not push_config:
        return
    if not check_times:
        raise ValueError("check_times cannot be empty")

    mode = "remote" if is_remote_session() else "desktop"
    debug_log(f"Push check started: mode={mode}")

    # Pair every checkpoint with the one before it to get the sleep duration
    for previous_time, check_time in zip([0] + check_times, check_times):
        time.sleep(check_time - previous_time)
        try:
            idle_time = get_idle_time()
        except RuntimeError as e:
            debug_log(f"Push check exit: idle detection error ({e})")
            return

        # If idle time < elapsed time, user was active during check period
        user_active = idle_time < check_time
        debug_log(
            f"Push check: elapsed={check_time}s, idle={idle_time}s, "
            f"user_active={user_active}"
        )
        if user_active:
            debug_log("Push check exit: User is active")
            return

    # User has been idle through all checks, send push notification
    title, _, message = create_notification_data(hook_data, for_push=True)
    push_url = build_push_url(hook_data)
    debug_log(f"Sending push notification: '{title}'")
    send_pushover_notification(push_config, title, message, url=push_url)
# ============================================================================
# SUBPROCESS UTILITIES - Common patterns for external command execution
# ============================================================================
def run_command(cmd: list[str], timeout: int = 10) -> str:
    """Run ``cmd`` to completion and return its stripped stdout.

    Raises:
        RuntimeError: if the command exits with a non-zero status.
        subprocess.TimeoutExpired: if it runs longer than ``timeout`` seconds.
    """
    completed = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    if completed.returncode == 0:
        return completed.stdout.strip()
    raise RuntimeError(f"Command failed: {' '.join(cmd)}")
def run_background_command(cmd: list[str]) -> None:
    """Start ``cmd`` as a child process and return without waiting for it."""
    subprocess.Popen(args=cmd)
# Run the CLI only when executed as a script, not when imported
if __name__ == "__main__":
    main()