-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathapp.py
More file actions
184 lines (147 loc) · 5.33 KB
/
app.py
File metadata and controls
184 lines (147 loc) · 5.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import gradio as gr
import asyncio
import threading
from huggingface_hub import InferenceClient
from robot import robot_instance
# ============================================================
# Dedicated async event loop for robot operations
# ============================================================
robot_loop = asyncio.new_event_loop()
def start_robot_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
threading.Thread(target=start_robot_loop, args=(robot_loop,), daemon=True).start()
def run_async_task(coro):
"""Runs an async coroutine on the robot loop and returns the result."""
future = asyncio.run_coroutine_threadsafe(coro, robot_loop)
return future.result()
# ============================================================
# Sync wrappers for Gradio + MCP exposure
# ============================================================
def get_telemetry_sync():
state = robot_instance.get_state()
return (
f"Telemetry: X={state['x']}, Y={state['y']}, "
f"Heading={state['heading']}°, Distance Traveled={state['distance_traveled']:.2f}"
)
def move_forward_sync(distance: float):
if distance <= 0:
return "Error: Distance must be positive."
try:
result = run_async_task(robot_instance.move_forward(distance))
state = result["current_state"]
return (
f"Moved forward {distance} meters.\n"
f"State: X={state['x']}, Y={state['y']}, Heading={state['heading']}°"
)
except Exception as e:
return f"Error: {e}"
def turn_left_sync():
result = run_async_task(robot_instance.turn_left())
state = result["current_state"]
return (
f"Left turn completed.\n"
f"State: X={state['x']}, Y={state['y']}, Heading={state['heading']}°"
)
def turn_right_sync():
result = run_async_task(robot_instance.turn_right())
state = result["current_state"]
return (
f"Right turn completed.\n"
f"State: X={state['x']}, Y={state['y']}, Heading={state['heading']}°"
)
# ============================================================
# Chat function
# ============================================================
def respond(
message,
history: list[dict[str, str]],
system_message,
max_tokens,
temperature,
top_p,
hf_token: gr.OAuthToken,
):
"""
For more information on `huggingface_hub` Inference API support, please check the docs: https://huggingface.co/docs/huggingface_hub/v0.22.2/en/guides/inference
"""
client = InferenceClient(token=hf_token.token, model="openai/gpt-oss-20b")
messages = [{"role": "system", "content": system_message}]
messages.extend(history)
messages.append({"role": "user", "content": message})
buffer = ""
for msg in client.chat_completion(
messages,
max_tokens=max_tokens,
temperature=temperature,
top_p=top_p,
stream=True,
):
token = msg.choices[0].delta.content or ""
buffer += token
yield buffer
# ============================================================
# Gradio UI + MCP tools
# ============================================================
"""
For information on how to customize the ChatInterface, peruse the gradio docs: https://www.gradio.app/docs/gradio/chatinterface
"""
chatbot = gr.ChatInterface(
fn=respond,
# type="messages",
additional_inputs=[
gr.Textbox(value="You are a friendly Chatbot.", label="System message"),
gr.Slider(1, 2048, value=512, step=1, label="Max new tokens"),
gr.Slider(0.1, 4.0, value=0.7, step=0.1, label="Temperature"),
gr.Slider(0.1, 1.0, value=0.95, step=0.05, label="Top-p"),
],
)
with gr.Blocks(title="Robot Control and Chatbot MCP") as demo:
with gr.Sidebar():
gr.LoginButton()
with gr.Tab("Chatbot"):
chatbot.render()
with gr.Tab("Robot Control MCP"):
gr.Markdown("## Robot Control and Telemetry")
robot_output = gr.Textbox(label="Result / Telemetry")
telemetry_btn = gr.Button("Update Telemetry", variant="secondary")
telemetry_btn.click(
fn=get_telemetry_sync,
inputs=[],
outputs=robot_output,
api_name="get_telemetry",
)
distance_input = gr.Number(label="Forward Distance (meters)", value=1.0)
forward_btn = gr.Button("Move Forward", variant="primary")
forward_btn.click(
fn=move_forward_sync,
inputs=[distance_input],
outputs=robot_output,
api_name="move_forward",
)
left_btn = gr.Button("Turn Left (90°)")
right_btn = gr.Button("Turn Right (90°)")
left_btn.click(
fn=turn_left_sync,
inputs=[],
outputs=robot_output,
api_name="turn_left",
)
right_btn.click(
fn=turn_right_sync,
inputs=[],
outputs=robot_output,
api_name="turn_right",
)
# ============================================================
# Launch
# ============================================================
if __name__ == "__main__":
print("Starting Gradio/MCP server...")
print(f"DEBUG: Gradio version: {gr.__version__}")
demo.launch(
mcp_server=True,
# share=False,
# server_name="localhost",
# server_port=7860,
)