forked from CoplayDev/unity-mcp
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathport_discovery.py
More file actions
330 lines (282 loc) · 12.5 KB
/
port_discovery.py
File metadata and controls
330 lines (282 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""
Port discovery utility for MCP for Unity Server.
What changed and why:
- Unity now writes a per-project port file named like
`~/.unity-mcp/unity-mcp-port-<hash>.json` to avoid projects overwriting
each other's saved port. The legacy file `unity-mcp-port.json` may still
exist.
- This module now scans for both patterns, prefers the most recently
modified file, and verifies that the port is actually a MCP for Unity listener
(quick socket connect + ping) before choosing it.
"""
import glob
import json
import logging
import os
from datetime import datetime
from pathlib import Path
import socket
import struct
from models.models import UnityInstanceInfo
logger = logging.getLogger("mcp-for-unity-server")
class PortDiscovery:
"""Handles port discovery from Unity Bridge registry"""
REGISTRY_FILE = "unity-mcp-port.json" # legacy single-project file
DEFAULT_PORT = 6400
CONNECT_TIMEOUT = 0.3 # seconds, keep this snappy during discovery
@staticmethod
def get_registry_path() -> Path:
"""Get the path to the port registry file"""
return PortDiscovery.get_registry_dir() / PortDiscovery.REGISTRY_FILE
@staticmethod
def get_registry_dir() -> Path:
env_dir = os.environ.get("UNITY_MCP_STATUS_DIR")
if env_dir:
return Path(env_dir)
return Path.home() / ".unity-mcp"
@staticmethod
def list_candidate_files() -> list[Path]:
"""Return candidate registry files, newest first.
Includes hashed per-project files and the legacy file (if present).
"""
base = PortDiscovery.get_registry_dir()
hashed = sorted(
(Path(p) for p in glob.glob(str(base / "unity-mcp-port-*.json"))),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
legacy = PortDiscovery.get_registry_path()
if legacy.exists():
# Put legacy at the end so hashed, per-project files win
hashed.append(legacy)
return hashed
@staticmethod
def _try_probe_unity_mcp(port: int) -> bool:
"""Quickly check if a MCP for Unity listener is on this port.
Uses Unity's framed protocol: receives handshake, sends framed ping, expects framed pong.
"""
try:
with socket.create_connection(("127.0.0.1", port), PortDiscovery.CONNECT_TIMEOUT) as s:
s.settimeout(PortDiscovery.CONNECT_TIMEOUT)
try:
# 1. Receive handshake from Unity
handshake = s.recv(512)
if not handshake or b"FRAMING=1" not in handshake:
# Try legacy mode as fallback
s.sendall(b"ping")
data = s.recv(512)
return data and b'"message":"pong"' in data
# 2. Send framed ping command
# Frame format: 8-byte length header (big-endian uint64) + payload
payload = b"ping"
header = struct.pack('>Q', len(payload))
s.sendall(header + payload)
# 3. Receive framed response
# Helper to receive exact number of bytes
def _recv_exact(expected: int) -> bytes | None:
chunks = bytearray()
while len(chunks) < expected:
chunk = s.recv(expected - len(chunks))
if not chunk:
return None
chunks.extend(chunk)
return bytes(chunks)
response_header = _recv_exact(8)
if response_header is None:
return False
response_length = struct.unpack('>Q', response_header)[0]
if response_length > 10000: # Sanity check
return False
response = _recv_exact(response_length)
if response is None:
return False
return b'"message":"pong"' in response
except Exception as e:
logger.debug(f"Port probe failed for {port}: {e}")
return False
except Exception as e:
logger.debug(f"Connection failed for port {port}: {e}")
return False
@staticmethod
def _read_latest_status() -> dict | None:
try:
base = PortDiscovery.get_registry_dir()
status_files = sorted(
(Path(p)
for p in glob.glob(str(base / "unity-mcp-status-*.json"))),
key=lambda p: p.stat().st_mtime,
reverse=True,
)
if not status_files:
return None
with status_files[0].open('r') as f:
return json.load(f)
except Exception:
return None
@staticmethod
def discover_unity_port() -> int:
"""
Discover Unity port by scanning per-project and legacy registry files.
Prefer the newest file whose port responds; fall back to first parsed
value; finally default to 6400.
Returns:
Port number to connect to
"""
# Prefer the latest heartbeat status if it points to a responsive port
status = PortDiscovery._read_latest_status()
if status:
port = status.get('unity_port')
if isinstance(port, int) and PortDiscovery._try_probe_unity_mcp(port):
logger.info(f"Using Unity port from status: {port}")
return port
candidates = PortDiscovery.list_candidate_files()
first_seen_port: int | None = None
for path in candidates:
try:
with open(path, 'r') as f:
cfg = json.load(f)
unity_port = cfg.get('unity_port')
if isinstance(unity_port, int):
if first_seen_port is None:
first_seen_port = unity_port
if PortDiscovery._try_probe_unity_mcp(unity_port):
logger.info(
f"Using Unity port from {path.name}: {unity_port}")
return unity_port
except Exception as e:
logger.warning(f"Could not read port registry {path}: {e}")
if first_seen_port is not None:
logger.info(
f"No responsive port found; using first seen value {first_seen_port}")
return first_seen_port
# Fallback to default port
logger.info(
f"No port registry found; using default port {PortDiscovery.DEFAULT_PORT}")
return PortDiscovery.DEFAULT_PORT
@staticmethod
def get_port_config() -> dict | None:
"""
Get the most relevant port configuration from registry.
Returns the most recent hashed file's config if present,
otherwise the legacy file's config. Returns None if nothing exists.
Returns:
Port configuration dict or None if not found
"""
candidates = PortDiscovery.list_candidate_files()
if not candidates:
return None
for path in candidates:
try:
with open(path, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(
f"Could not read port configuration {path}: {e}")
return None
@staticmethod
def _extract_project_name(project_path: str) -> str:
"""Extract project name from Assets path.
Examples:
/Users/sakura/Projects/MyGame/Assets -> MyGame
C:\\Projects\\TestProject\\Assets -> TestProject
"""
if not project_path:
return "Unknown"
try:
# Remove trailing /Assets or \Assets
path = project_path.rstrip('/\\')
if path.endswith('Assets'):
path = path[:-6].rstrip('/\\')
# Get the last directory name
name = os.path.basename(path)
return name if name else "Unknown"
except Exception:
return "Unknown"
@staticmethod
def discover_all_unity_instances() -> list[UnityInstanceInfo]:
"""
Discover all running Unity Editor instances by scanning status files.
Returns:
List of UnityInstanceInfo objects for all discovered instances
"""
instances_by_port: dict[int, tuple[UnityInstanceInfo, datetime]] = {}
base = PortDiscovery.get_registry_dir()
# Scan all status files
status_pattern = str(base / "unity-mcp-status-*.json")
status_files = glob.glob(status_pattern)
for status_file_path in status_files:
try:
status_path = Path(status_file_path)
file_mtime = datetime.fromtimestamp(
status_path.stat().st_mtime)
with status_path.open('r') as f:
data = json.load(f)
# Extract hash from filename: unity-mcp-status-{hash}.json
filename = os.path.basename(status_file_path)
hash_value = filename.replace(
'unity-mcp-status-', '').replace('.json', '')
# Extract information
project_path = data.get('project_path', '')
project_name = PortDiscovery._extract_project_name(
project_path)
port = data.get('unity_port')
is_reloading = data.get('reloading', False)
# Parse last_heartbeat
last_heartbeat = None
heartbeat_str = data.get('last_heartbeat')
if heartbeat_str:
try:
last_heartbeat = datetime.fromisoformat(
heartbeat_str.replace('Z', '+00:00'))
except Exception:
pass
# Verify port is actually responding
is_alive = PortDiscovery._try_probe_unity_mcp(
port) if isinstance(port, int) else False
if not is_alive:
# If Unity says it's reloading and the status is fresh, don't drop the instance.
freshness = last_heartbeat or file_mtime
now = datetime.now()
if freshness.tzinfo:
from datetime import timezone
now = datetime.now(timezone.utc)
age_s = (now - freshness).total_seconds()
if is_reloading and age_s < 60:
pass # keep it, status="reloading"
else:
logger.debug(
f"Instance {project_name}@{hash_value} has heartbeat but port {port} not responding")
continue
freshness = last_heartbeat or file_mtime
existing = instances_by_port.get(port)
if existing:
_, existing_time = existing
if existing_time >= freshness:
logger.debug(
f"Skipping stale status entry {status_path.name} in favor of more recent data for port {port}")
continue
# Create instance info
instance = UnityInstanceInfo(
id=f"{project_name}@{hash_value}",
name=project_name,
path=project_path,
hash=hash_value,
port=port,
status="reloading" if is_reloading else "running",
last_heartbeat=last_heartbeat,
# May not be available in current version
unity_version=data.get('unity_version'),
project_scoped_tools=data.get('project_scoped_tools', False),
)
instances_by_port[port] = (instance, freshness)
logger.debug(
f"Discovered Unity instance: {instance.id} on port {instance.port}")
except Exception as e:
logger.debug(
f"Failed to parse status file {status_file_path}: {e}")
continue
deduped_instances = [entry[0] for entry in sorted(
instances_by_port.values(), key=lambda item: item[1], reverse=True)]
logger.info(
f"Discovered {len(deduped_instances)} Unity instances (after de-duplication by port)")
return deduped_instances