-
Notifications
You must be signed in to change notification settings - Fork 12
Expand file tree
/
Copy pathnova-chatmix.py
More file actions
executable file
·236 lines (193 loc) · 7.78 KB
/
nova-chatmix.py
File metadata and controls
executable file
·236 lines (193 loc) · 7.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
#!/usr/bin/python3
# Licensed under the 0BSD
from signal import SIGINT, SIGTERM, signal
from subprocess import Popen, check_output
from hid import device
from hid import enumerate as hidenumerate
CMD_PACTL = "pactl"
CMD_PWLOOPBACK = "pw-loopback"
class ChatMix:
# Create virtual pipewire sinks
def __init__(self, output_sink: str, main_sink: str, chat_sink: str):
self.main_sink = main_sink
self.chat_sink = chat_sink
self.main_sink_process = self._create_virtual_sink(main_sink, output_sink)
self.chat_sink_process = self._create_virtual_sink(chat_sink, output_sink)
def set_main_volume(self, volume: int):
self._set_volume(self.main_sink, volume)
def set_chat_volume(self, volume: int):
self._set_volume(self.chat_sink, volume)
def set_volumes(self, main_volume: int, chat_volume: int):
self.set_main_volume(main_volume)
self.set_chat_volume(chat_volume)
def close(self):
self.main_sink_process.terminate()
self.chat_sink_process.terminate()
def _create_virtual_sink(self, name: str, output_sink: str) -> Popen:
return Popen(
[
CMD_PWLOOPBACK,
"-P",
output_sink,
"--capture-props=media.class=Audio/Sink",
"-n",
name,
]
)
def _set_volume(self, sink: str, volume: int):
Popen([CMD_PACTL, "set-sink-volume", f"input.{sink}", f"{volume}%"])
class NovaProWireless:
# USB IDs
VID = 0x1038
PID = 0x12E0
# bInterfaceNumber
INTERFACE = 0x4
# HID Message length
MSGLEN = 63
# Message read timeout
READ_TIMEOUT = 1000
# First byte controls data direction.
TX = 0x6 # To base station.
RX = 0x7 # From base station.
# Second Byte
# This is a very limited list of options, you can control way more. I just haven't implemented those options (yet)
## As far as I know, this only controls the icon.
OPT_SONAR_ICON = 0x8D
## Enabling this option enables the ability to switch between volume and ChatMix.
OPT_CHATMIX_ENABLE = 0x49
## Volume controls, 1 byte
OPT_VOLUME = 0x25
## ChatMix controls, 2 bytes show and control game and chat volume.
OPT_CHATMIX = 0x45
## EQ controls, 2 bytes show and control which band and what value.
OPT_EQ = 0x31
## EQ preset controls, 1 byte sets and shows enabled preset. Preset 4 is the custom preset required for OPT_EQ.
OPT_EQ_PRESET = 0x2E
# PipeWire Names
## String used to automatically select output sink
PW_OUTPUT_SINK_AUTODETECT = "SteelSeries_Arctis_Nova_Pro_Wireless"
## Names of virtual sound devices
PW_GAME_SINK = "NovaGame"
PW_CHAT_SINK = "NovaChat"
# Keeps track of enabled features for when close() is called
CHATMIX_CONTROLS_ENABLED = False
SONAR_ICON_ENABLED = False
# Stops processes when program exits
CLOSE = False
# Device not found error string
ERR_NOTFOUND = "Device not found"
# Selects correct device, and makes sure we can control it
def __init__(self, output_sink=None):
# Find HID device path
devpath = None
for hiddev in hidenumerate(self.VID, self.PID):
if hiddev["interface_number"] == self.INTERFACE:
devpath = hiddev["path"]
break
if not devpath:
raise DeviceNotFoundException
# Try to automatically detect output sink, this is skipped if output_sink is given
if not output_sink:
sinks = (
check_output([CMD_PACTL, "list", "sinks", "short"]).decode().split("\n")
)
for sink in sinks[:-1]:
sink_name = sink.split("\t")[1]
if self.PW_OUTPUT_SINK_AUTODETECT in sink_name:
output_sink = sink_name
self.dev = device()
self.dev.open_path(devpath)
self.dev.set_nonblocking(True)
self.output_sink = output_sink
# Enables/Disables chatmix controls
def set_chatmix_controls(self, state: bool):
assert self.dev, self.ERR_NOTFOUND
self.dev.write(
self._create_msgdata((self.TX, self.OPT_CHATMIX_ENABLE, int(state))),
)
self.CHATMIX_CONTROLS_ENABLED = state
# Enables/Disables Sonar Icon
def set_sonar_icon(self, state: bool):
assert self.dev, self.ERR_NOTFOUND
self.dev.write(
self._create_msgdata((self.TX, self.OPT_SONAR_ICON, int(state))),
)
self.SONAR_ICON_ENABLED = state
# Sets Volume
def set_volume(self, attenuation: int):
assert self.dev, self.ERR_NOTFOUND
self.dev.write(
self._create_msgdata((self.TX, self.OPT_VOLUME, attenuation)),
)
# Sets EQ preset
def set_eq_preset(self, preset: int):
assert self.dev, self.ERR_NOTFOUND
self.dev.write(
self._create_msgdata((self.TX, self.OPT_EQ_PRESET, preset)),
)
# ChatMix implementation
# Continuously read from base station and ignore everything but ChatMix messages (OPT_CHATMIX)
def chatmix_volume_control(self, chatmix: ChatMix):
assert self.dev, self.ERR_NOTFOUND
while not self.CLOSE:
try:
msg = self.dev.read(self.MSGLEN, self.READ_TIMEOUT)
if not msg or msg[1] is not self.OPT_CHATMIX:
continue
# 4th and 5th byte contain ChatMix data
gamevol = msg[2]
chatvol = msg[3]
# Actually change volume. Everytime you turn the dial, both volumes are set to the correct level
chatmix.set_volumes(gamevol, chatvol)
except OSError:
print("Device was probably disconnected, exiting.")
self.CLOSE = True
# Remove virtual sinks on exit
chatmix.close()
# Prints output from base station. `debug` argument enables raw output.
def print_output(self, debug: bool = False):
assert self.dev
while not self.CLOSE:
msg = self.dev.read(self.MSGLEN, self.READ_TIMEOUT)
if debug:
print(msg)
match msg[1]:
case self.OPT_VOLUME:
print(f"Volume: -{msg[2]}")
case self.OPT_CHATMIX:
print(f"Game Volume: {msg[2]} - Chat Volume: {msg[3]}")
case self.OPT_EQ:
print(f"EQ: Bar: {msg[2]} - Value: {(msg[3] - 20) / 2}")
case self.OPT_EQ_PRESET:
print(f"EQ Preset: {msg[2]}")
case _:
print("Unknown Message")
# Terminates processes and disables features
def close(self, signum, frame):
self.CLOSE = True
if self.CHATMIX_CONTROLS_ENABLED:
self.set_chatmix_controls(False)
if self.SONAR_ICON_ENABLED:
self.set_sonar_icon(False)
# Takes a tuple of ints and turns it into bytes with the correct length padded with zeroes
def _create_msgdata(self, data: tuple[int, ...]) -> bytes:
return bytes(data).ljust(self.MSGLEN, b"\0")
class DeviceNotFoundException(Exception):
pass
# When run directly, just start the ChatMix implementation. (And activate the icon, just for fun)
if __name__ == "__main__":
try:
nova = NovaProWireless()
nova.set_sonar_icon(state=True)
nova.set_chatmix_controls(state=True)
signal(SIGINT, nova.close)
signal(SIGTERM, nova.close)
assert nova.output_sink, "Output sink not set"
chatmix = ChatMix(
output_sink=nova.output_sink,
main_sink=nova.PW_GAME_SINK,
chat_sink=nova.PW_CHAT_SINK,
)
nova.chatmix_volume_control(chatmix=chatmix)
except DeviceNotFoundException:
print("Device not found, exiting.")