-
Notifications
You must be signed in to change notification settings - Fork 442
Expand file tree
/
Copy pathapi_server.py
More file actions
2764 lines (2465 loc) · 117 KB
/
Copy pathapi_server.py
File metadata and controls
2764 lines (2465 loc) · 117 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
FastAPI proxy server for OpenClaw RL training data collection.
Adapted from OpenClaw-RL/openclaw-rl/openclaw_api_server.py.
Key changes vs. the original:
- SGLang HTTP forward replaced with Tinker SamplingClient.sample_async
- slime.utils.types.Sample replaced with ConversationSample
- Skills injection via SkillManager
- update_sampling_client() for hot-swapping after weight updates
Architecture mirrors OpenClaw-RL exactly:
- threading.Event for submission gating (passed in from rollout worker)
- queue.Queue for output (passed in from rollout worker)
- start() / stop() threading lifecycle
- Data production is request-driven in FastAPI handlers
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import queue
import re
import threading
import time
from datetime import datetime
from itertools import count
from typing import Any, Optional
import uvicorn
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse, StreamingResponse
from openai import OpenAI
from .config import MetaClawConfig
from .verification import build_verifier_safe
from .verification.base import VerificationResult, VerificationVerdict
from .data_formatter import ConversationSample
from .memory.scope import base_scope, derive_memory_scope
from .prm_scorer import PRMScorer
from .skill_manager import SkillManager
from .utils import run_llm
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ANSI SGR escape sequences for coloured terminal output; _RESET restores
# the default rendition.
_GREEN = "\033[32m"
_YELLOW = "\033[33m"
_RED = "\033[31m"
_CYAN = "\033[36m"
_RESET = "\033[0m"
# Request-body keys that are proxy-specific rather than part of the standard
# chat-completions payload.  The /v1/chat/completions handler reads each of
# them from the body as a fallback for the matching X-* request header.
# NOTE(review): presumably these are stripped before the body is forwarded
# upstream; the consumer of this set is not visible in this part of the file.
_NON_STANDARD_BODY_KEYS = {
"session_id",
"session_done",
"turn_type",
"memory_scope",
"user_id",
"workspace_id",
}
# ------------------------------------------------------------------ #
# Helper utilities #
# ------------------------------------------------------------------ #
def _ensure_reasoning_content(messages: list[dict]) -> list[dict]:
"""Ensure every assistant message carries a ``reasoning_content`` field.
Models that support extended thinking expect *all* assistant turns
(including tool-call turns) to include ``reasoning_content``. If the
original conversation history omits it for a given assistant turn, we
fill it in with an empty string so the downstream API does not reject
the request.
"""
out: list[dict] = []
for msg in messages:
if msg.get("role") == "assistant":
if "reasoning_content" in msg and msg["reasoning_content"]:
msg["reasoning"] = msg["reasoning_content"]
elif "reasoning" in msg and msg["reasoning"]:
msg["reasoning_content"] = msg["reasoning"]
else:
msg["reasoning_content"] = ""
msg["reasoning"] = ""
out.append(msg)
return out
def _flatten_message_content(content) -> str:
if isinstance(content, str):
return content
if isinstance(content, list):
parts = [
item.get("text", "")
for item in content
if isinstance(item, dict) and item.get("type") == "text"
]
return " ".join(parts) if parts else ""
return str(content) if content is not None else ""
def _normalize_assistant_content_parts(content: list[dict]) -> tuple[str, list[dict]]:
"""Extract plain text and OpenAI-style tool_calls from assistant content parts."""
text_parts: list[str] = []
tool_calls: list[dict] = []
for i, item in enumerate(content):
if not isinstance(item, dict):
continue
item_type = item.get("type")
if item_type == "text":
text = item.get("text")
if isinstance(text, str) and text:
text_parts.append(text)
elif item_type == "toolCall":
name = item.get("name")
args = item.get("arguments", {})
if not isinstance(args, str):
try:
args = json.dumps(args, ensure_ascii=False)
except Exception:
args = "{}"
tc_id = item.get("id") or f"call_{i}"
tool_calls.append(
{
"id": tc_id,
"type": "function",
"function": {
"name": name or "unknown_tool",
"arguments": args,
},
}
)
return (" ".join(text_parts).strip(), tool_calls)
_THINK_RE = re.compile(r"<think>.*?</think>", re.DOTALL)
_TOOL_HANDLE_RE = re.compile(r"^call_?(?:kimi|xml)_?\d?+$")
_TRAILING_DIGITS_RE = re.compile(r"\d+$")
_FUNCTIONS_PREFIX_RE = re.compile(r"^functions[._]?")
_KIMI_TOOL_CALL_RE = re.compile(
r"<\|tool_call_begin\|>\s*([a-zA-Z0-9_.-]+)(?::\d+)?\s*"
r"<\|tool_call_argument_begin\|>\s*(\{.*?\})\s*"
r"<\|tool_call_end\|>",
re.DOTALL,
)
_QWEN_TOOL_CALL_RE = re.compile(r"<tool_call>\s*(.*?)\s*</tool_call>", re.DOTALL)
def _normalize_tool_name(raw_name: str, args_raw: str) -> str:
    """Normalize a tool name taken from model output.

    The raw name is cleaned by removing a leading ``functions`` prefix,
    dropping purely numeric dotted segments, keeping the last remaining
    segment, and stripping trailing digits and ``_``/``-``/``.`` characters.
    If that yields a non-empty name that is not a synthetic call handle
    (``_TOOL_HANDLE_RE``), it is returned.

    Otherwise the name is inferred, in order, from the JSON arguments
    (string ``command`` -> ``"exec"``; string ``sessionId`` -> ``"process"``;
    string ``file_path`` -> ``"write"`` when a non-empty string ``content``
    is present, else ``"read"``) and then from the raw name containing
    ``"read"`` or ``"write"``.

    Args:
        raw_name: Tool name as emitted by the model; may be empty or ``None``.
        args_raw: Raw JSON text of the call arguments; input that cannot be
            parsed is treated as an empty object.

    Returns:
        The normalized tool name, or ``"unknown_tool"`` when nothing can be
        determined.  Never an empty string.
    """
    name = (raw_name or "").strip()
    if not name:
        return "unknown_tool"

    name = _FUNCTIONS_PREFIX_RE.sub("", name)
    segments = [seg for seg in name.split(".") if not seg.isdigit()]
    # An all-numeric name (e.g. "functions.0") leaves no segment.  This used
    # to return "" outright; fall through to inference instead so callers
    # never receive an empty tool name.
    name = segments[-1] if segments else ""
    name = _TRAILING_DIGITS_RE.sub("", name).strip("_-.")
    if name and not _TOOL_HANDLE_RE.fullmatch(name):
        return name

    # Best-effort parse: model output is untrusted, so any failure simply
    # disables argument-based inference.
    try:
        args_obj = json.loads(args_raw or "{}")
    except Exception:
        args_obj = {}

    if isinstance(args_obj, dict):
        if isinstance(args_obj.get("command"), str) and args_obj.get("command"):
            return "exec"
        elif isinstance(args_obj.get("sessionId"), str) and args_obj.get("sessionId"):
            return "process"
        elif isinstance(args_obj.get("file_path"), str) and args_obj.get("file_path"):
            if isinstance(args_obj.get("content"), str) and args_obj.get("content"):
                return "write"
            else:
                return "read"

    # Last resort: hints embedded in the raw (un-normalized) name.
    if "read" in raw_name:
        return "read"
    elif "write" in raw_name:
        return "write"
    return "unknown_tool"
def _extract_tool_calls_from_text(text: str) -> tuple[str, list[dict], str]:
    """Parse tool-call tags embedded in assistant text into OpenAI tool_calls.

    Supports Kimi markers (``<|tool_call_begin|>...<|tool_call_end|>``) and
    Qwen ``<tool_call>`` wrappers, and separates ``<think>`` reasoning from
    the visible text.

    Args:
        text: Raw assistant output; may be empty.

    Returns:
        ``(cleaned_text, tool_calls, reasoning_content)`` where

        * ``cleaned_text`` is ``text`` with think blocks and tool-call markup
          removed, stripped of surrounding whitespace;
        * ``tool_calls`` lists Kimi calls first (ids ``call_kimi_<n>``), then
          Qwen calls (ids ``call_xml_<n>``), each with string arguments;
        * ``reasoning_content`` is the newline-joined, stripped content of
          all reasoning spans.

    Qwen payloads that are not valid JSON, or that are valid JSON but not an
    object, are skipped; a non-dict ``function`` field is ignored.  Both
    previously raised ``AttributeError`` on malformed model output.
    """
    if not text:
        return "", [], ""

    tool_calls: list[dict] = []

    # --- Kimi-style calls ------------------------------------------------ #
    for i, m in enumerate(_KIMI_TOOL_CALL_RE.finditer(text)):
        raw_name = (m.group(1) or "").strip()
        args_raw = (m.group(2) or "{}").strip()
        tool_name = _normalize_tool_name(raw_name, args_raw)
        try:
            # Round-trip to canonicalise the JSON; keep the raw text if the
            # model produced something unparsable.
            args_obj = json.loads(args_raw)
            args_str = json.dumps(args_obj, ensure_ascii=False)
        except Exception:
            args_str = args_raw if args_raw else "{}"
        tool_calls.append(
            {
                "id": f"call_kimi_{i}",
                "type": "function",
                "function": {"name": tool_name or "unknown_tool", "arguments": args_str},
            }
        )

    # --- Qwen-style calls ------------------------------------------------ #
    # Numbering continues after the Kimi calls; skipped payloads still
    # consume an index.
    for i, m in enumerate(_QWEN_TOOL_CALL_RE.finditer(text), start=len(tool_calls)):
        payload_raw = (m.group(1) or "").strip()
        try:
            payload = json.loads(payload_raw)
        except Exception:
            continue
        if not isinstance(payload, dict):
            # e.g. a JSON list or string: not a usable tool call.
            continue
        function = payload.get("function")
        if not isinstance(function, dict):
            function = {}
        name = (
            payload.get("name")
            or payload.get("tool_name")
            or function.get("name")
            or "unknown_tool"
        )
        args = payload.get("arguments") or function.get("arguments") or {}
        if not isinstance(args, str):
            try:
                args = json.dumps(args, ensure_ascii=False)
            except Exception:
                args = "{}"
        name = _normalize_tool_name(str(name), args)
        tool_calls.append(
            {
                "id": f"call_xml_{i}",
                "type": "function",
                "function": {"name": name, "arguments": args},
            }
        )

    # --- Reasoning extraction ------------------------------------------- #
    # Extract reasoning content from <think> blocks before stripping them.
    # Covers:
    #   a) matched <think>X</think>
    #   b) orphan leading: X</think> (no <think>)
    #   c) orphan trailing: <think>X (no </think>)
    reasoning_parts: list[str] = []
    # a) Matched pairs: <think>...</think>
    for m in re.finditer(r"<think>(.*?)</think>", text, re.DOTALL):
        reasoning_parts.append(m.group(1))
    # b) Orphan leading </think> with no preceding <think>: the content
    #    before it is reasoning and goes first.
    if "</think>" in text:
        first_close = text.index("</think>")
        prefix = text[:first_close]
        if "<think>" not in prefix:
            reasoning_parts.insert(0, prefix)
    # c) Trailing unclosed <think>...EOF
    _trailing = re.search(r"<think>((?:(?!</think>).)*)\Z", text, re.DOTALL)
    if _trailing:
        reasoning_parts.append(_trailing.group(1))
    reasoning_content = "\n".join(p.strip() for p in reasoning_parts if p.strip())

    # --- Clean text ------------------------------------------------------ #
    # Strip all think-related markup from the clean text.
    clean = text
    # Remove matched <think>...</think> pairs first.
    clean = _THINK_RE.sub("", clean)
    # Remove an orphan leading </think> together with the text before it
    # (only when that text contains no "<").
    clean = re.sub(r"^[^<]*</think>", "", clean, count=1, flags=re.DOTALL)
    clean = clean.replace("</think>", "")
    # Remove trailing unclosed <think>...
    clean = re.sub(r"<think>(?:(?!</think>).)*\Z", "", clean, flags=re.DOTALL)
    # Keep tool call data only in the structured field; strip markup from
    # the plain text.
    clean = re.sub(r"<\|tool_call_begin\|>.*?<\|tool_call_end\|>", "", clean, flags=re.DOTALL)
    clean = re.sub(r"<\|tool_calls_section_begin\|>.*?<\|tool_calls_section_end\|>", "", clean, flags=re.DOTALL)
    clean = _QWEN_TOOL_CALL_RE.sub("", clean)
    clean = clean.strip()
    return clean, tool_calls, reasoning_content
def _normalize_tool_calls_for_template(tool_calls: list) -> list[dict]:
"""Ensure tool_calls are plain OpenAI-compatible dicts with string arguments.
Handles cases where function.arguments is a dict instead of a JSON string,
or where individual tool_call entries are non-dict objects (e.g. Pydantic models).
"""
normalized: list[dict] = []
for i, tc in enumerate(tool_calls):
if not isinstance(tc, dict):
try:
tc = dict(tc) # type: ignore[call-overload]
except Exception:
continue
func = tc.get("function")
if func is None:
continue
if not isinstance(func, dict):
try:
func = dict(func) # type: ignore[call-overload]
except Exception:
func = {"name": str(func), "arguments": "{}"}
else:
func = dict(func) # shallow copy so we don't mutate original
args = func.get("arguments")
if not isinstance(args, str):
func["arguments"] = json.dumps(args, ensure_ascii=False) if args is not None else "{}"
normalized.append({
"id": tc.get("id") or f"call_{i}",
"type": tc.get("type", "function"),
"function": func,
})
return normalized
def _normalize_tools_for_template(tools) -> list | None:
"""Convert tools from Anthropic format to OpenAI format expected by chat templates.
Anthropic format: {"name": ..., "description": ..., "input_schema": {...}}
OpenAI format: {"type": "function", "function": {"name": ..., "parameters": {...}}}
The Qwen3 (and other) chat templates use ``tool.function.parameters | items``
which raises TypeError if the tool is in Anthropic format.
"""
if not tools:
return tools
out: list[dict] = []
for tool in tools:
if not isinstance(tool, dict):
out.append(tool)
continue
# Already in OpenAI format
if tool.get("type") == "function" and "function" in tool:
func = tool["function"]
if isinstance(func, dict):
out.append(tool)
else:
out.append(tool)
continue
# Anthropic format: top-level name + input_schema
name = tool.get("name") or ""
if name:
out.append({
"type": "function",
"function": {
"name": name,
"description": tool.get("description", ""),
"parameters": (
tool.get("input_schema")
or tool.get("parameters")
or {"type": "object", "properties": {}}
),
},
})
else:
out.append(tool)
return out
def _normalize_messages_for_template(messages: list[dict]) -> list[dict]:
    """Normalize OpenClaw-style messages into chat-template-compatible format.

    Each message is shallow-copied and then adjusted:

    * role ``developer`` becomes ``system``;
    * role ``toolResult`` is rebuilt as an OpenAI ``tool`` message with
      flattened content and, when present, ``tool_call_id`` and ``name``;
    * assistant list content is split into text plus ``tool_calls``;
    * any other non-string, non-``None`` content is flattened to text;
    * existing assistant ``tool_calls`` are normalized to plain dicts with
      string arguments.
    """
    normalized = []
    for original in messages:
        entry = dict(original)
        role = entry.get("role")
        if role == "developer":
            role = entry["role"] = "system"

        # OpenClaw tool result message -> OpenAI tool message
        if role == "toolResult":
            converted = {
                "role": "tool",
                "content": _flatten_message_content(entry.get("content")),
            }
            call_id = entry.get("toolCallId") or entry.get("tool_call_id")
            if call_id:
                converted["tool_call_id"] = call_id
            result_name = entry.get("toolName") or entry.get("name")
            if result_name:
                converted["name"] = result_name
            normalized.append(converted)
            continue

        # Assistant content parts may contain text + toolCall blocks.
        content = entry.get("content")
        if role == "assistant" and isinstance(content, list):
            text, parsed_calls = _normalize_assistant_content_parts(content)
            entry["content"] = text
            if parsed_calls:
                entry["tool_calls"] = parsed_calls
        elif content is not None and not isinstance(content, str):
            entry["content"] = _flatten_message_content(content)

        # Ensure tool_calls are plain dicts with string arguments so that
        # Jinja2 chat templates don't fail with
        # "Can only get item pairs from a mapping".
        if role == "assistant":
            current_calls = entry.get("tool_calls")
            if current_calls and isinstance(current_calls, list):
                entry["tool_calls"] = _normalize_tool_calls_for_template(current_calls)

        normalized.append(entry)
    return normalized
def _extract_last_user_instruction(messages: list[dict]) -> str:
    """Return the text of the most recent user message with non-empty text.

    Messages are scanned from newest to oldest; non-dict entries and user
    messages whose flattened content is empty are skipped.  Returns ``""``
    when no such message exists.
    """
    user_texts = (
        _flatten_message_content(msg.get("content"))
        for msg in reversed(messages)
        if isinstance(msg, dict) and msg.get("role") == "user"
    )
    # The generator is lazy, so scanning stops at the first non-empty text.
    return next((text for text in user_texts if text), "")
def _extract_logprobs_from_chat_response(choice: dict[str, Any]) -> list[float]:
logprobs_obj = choice.get("logprobs")
if not isinstance(logprobs_obj, dict):
return []
content = logprobs_obj.get("content")
if not isinstance(content, list):
return []
return [float(item.get("logprob", 0.0)) for item in content if isinstance(item, dict)]
def _rewrite_new_session_bootstrap_prompt(messages: list[dict]) -> tuple[list[dict], int]:
    """Replace the OpenClaw /new bootstrap user prompt with a safer variant.

    Some upstream providers over-trigger policy filters on the stock
    bootstrap text ("A new session was started via /new or /reset ...").
    User messages whose flattened text contains that phrase
    (case-insensitively) are copied with their content replaced by a short
    neutral greeting request; every other entry is passed through as the
    same object.

    Returns:
        ``(messages, rewritten)``: the new message list and the number of
        user messages that were rewritten.
    """
    marker = "a new session was started via /new or /reset"
    replacement = (
        "A new chat session just started. "
        "Greet the user briefly in 1-3 sentences and ask what they want to do."
    )
    result: list[dict] = []
    hits = 0
    for message in messages:
        is_user = isinstance(message, dict) and message.get("role") == "user"
        # Content is only flattened for user messages (short-circuit).
        if is_user and marker in _flatten_message_content(message.get("content")).lower():
            result.append({**message, "content": replacement})
            hits += 1
        else:
            result.append(message)
    return result, hits
# ------------------------------------------------------------------ #
# MetaClawAPIServer #
# ------------------------------------------------------------------ #
class MetaClawAPIServer:
"""Proxy between OpenClaw and Tinker for RL training data collection.
OpenClaw sends ``X-Session-Id`` and ``X-Turn-Type`` headers with every
request. The proxy forwards to Tinker SamplingClient, and when
``turn_type`` is ``"main"`` it tokenises the full prompt+response and
submits a training sample. Side tasks (``turn_type != "main"``) are
forwarded but produce no training data.
Parameters
----------
config:
MetaClawConfig instance.
output_queue:
Thread-safe queue for (group_index, [ConversationSample]) tuples.
Created and owned by the rollout worker.
submission_enabled:
threading.Event that gates sample submission.
Set = accepting samples; clear = paused for weight update.
sampling_client:
Tinker SamplingClient. Can be None and set later via
update_sampling_client().
skill_manager:
Optional SkillManager for injecting skills into system prompts.
prm_scorer:
Optional PRMScorer. If None, all samples get reward=0.
"""
def __init__(
    self,
    config: MetaClawConfig,
    output_queue: queue.Queue,
    submission_enabled: threading.Event,
    sampling_client=None,
    skill_manager: Optional[SkillManager] = None,
    prm_scorer: Optional[PRMScorer] = None,
    skill_evolver=None,
    last_request_tracker=None,
    memory_manager=None,
):
    """Store collaborators, initialise per-session state and build the app.

    Side effects: creates ``config.record_dir``; when recording is enabled,
    truncates ``conversations.jsonl`` and ``prm_scores.jsonl`` in it; when
    OPD is enabled with a teacher URL, creates the teacher ``OpenAI``
    client; loads the tokenizer and builds the FastAPI app.
    """
    # --- Collaborators supplied by the caller ---------------------------
    self.config = config
    self.output_queue = output_queue
    self.submission_enabled = submission_enabled
    self._sampling_client = sampling_client
    self.skill_manager = skill_manager
    self.prm_scorer = prm_scorer
    self.skill_evolver = skill_evolver
    self.memory_manager = memory_manager
    self._verifier = build_verifier_safe(config, logger=logger)
    # Optional LastRequestTracker for scheduler idle detection.
    self._last_request_tracker = last_request_tracker

    # --- Values copied from the config ----------------------------------
    self._served_model = config.served_model_name
    self._expected_api_key = config.api_key
    os.makedirs(config.record_dir, exist_ok=True)
    self._system_prompt_cache_file = os.path.join(
        config.record_dir, "system_prompt_cache.json"
    )

    # --- Counters and per-session state ---------------------------------
    self._index_counter = count(0)
    self._group_counter = count(0)
    self._turn_counts: dict[str, int] = {}
    # session -> {turn -> data}
    self._pending_turn_data: dict[str, dict[int, dict]] = {}
    # session -> {turn -> task}
    self._prm_tasks: dict[str, dict[int, asyncio.Task]] = {}
    # session -> {turn -> task} (OPD)
    self._teacher_tasks: dict[str, dict[int, asyncio.Task]] = {}
    # For record logging.
    self._pending_records: dict[str, dict] = {}
    # At-least-one guarantee.
    self._session_effective: dict[str, int] = {}
    # Buffer turns per session for skill evolution (cleared on evolution trigger).
    self._session_turns: dict[str, list] = {}
    # Buffer turns per session for memory ingestion (only cleared on session_done).
    self._session_memory_turns: dict[str, list] = {}
    self._session_memory_scopes: dict[str, str] = {}

    # --- OPD teacher model client ---------------------------------------
    self._teacher_client: Optional[OpenAI] = None
    if config.use_opd:
        if config.teacher_url:
            self._teacher_client = OpenAI(
                base_url=config.teacher_url,
                api_key=config.teacher_api_key or "unused",
            )
            logger.info(
                "[OpenClaw] OPD teacher client ready: url=%s model=%s",
                config.teacher_url,
                config.teacher_model,
            )
        else:
            logger.warning("[OpenClaw] use_opd=True but teacher_url is empty β teacher logprobs disabled")

    # --- Record files ---------------------------------------------------
    self._record_file = ""
    self._prm_record_file = ""
    if config.record_enabled:
        os.makedirs(config.record_dir, exist_ok=True)
        self._record_file = os.path.join(config.record_dir, "conversations.jsonl")
        self._prm_record_file = os.path.join(config.record_dir, "prm_scores.jsonl")
        # Opening in "w" mode creates the file or truncates an existing one.
        for fresh_path in (self._record_file, self._prm_record_file):
            with open(fresh_path, "w"):
                pass

    # Tokenizer is used in both modes for prompt length accounting/truncation,
    # and in RL mode additionally for training sample tokenization.
    self._tokenizer = self._load_tokenizer()
    self.app = self._build_app()

    # Threading lifecycle (set by start()).
    self._server: Optional[uvicorn.Server] = None
    self._thread: Optional[threading.Thread] = None

    # External trainer reference + event loop for admin train-step endpoint.
    # Set via set_trainer() after the trainer is constructed.
    self._trainer = None
    self._main_loop = None
# ------------------------------------------------------------------ #
# Tokenizer #
# ------------------------------------------------------------------ #
def _load_tokenizer(self):
    """Load the tokenizer for ``self.config.model_name``.

    Best effort: any failure (including ``transformers`` not being
    importable) is logged as a warning and ``None`` is returned.
    """
    tokenizer = None
    try:
        # Imported lazily so a missing ``transformers`` package is handled
        # like any other load failure.
        from transformers import AutoTokenizer

        # NOTE(review): trust_remote_code=True presumably allows tokenizer
        # code shipped with the model repository to run; confirm that
        # model_name always comes from a trusted source.
        tokenizer = AutoTokenizer.from_pretrained(
            self.config.model_name, trust_remote_code=True
        )
    except Exception as exc:
        logger.warning("[OpenClaw] could not load tokenizer: %s", exc)
    return tokenizer
# ------------------------------------------------------------------ #
# FastAPI app #
# ------------------------------------------------------------------ #
def set_trainer(self, trainer, main_loop) -> None:
    """Attach the trainer and the event loop it runs on.

    Both references are required by the ``/v1/admin/train_step`` endpoint,
    which answers 503 while either is still ``None``.  Called by the
    launcher after the trainer is constructed.
    """
    self._trainer, self._main_loop = trainer, main_loop
def _build_app(self) -> FastAPI:
app = FastAPI(title="MetaClaw Proxy")
app.state.owner = self
@app.get("/healthz")
async def healthz():
return {"ok": True}
@app.post("/v1/chat/completions")
async def chat_completions(
request: Request,
authorization: Optional[str] = Header(default=None),
x_session_id: Optional[str] = Header(default=None),
x_turn_type: Optional[str] = Header(default=None),
x_session_done: Optional[str] = Header(default=None),
x_memory_scope: Optional[str] = Header(default=None),
x_user_id: Optional[str] = Header(default=None),
x_workspace_id: Optional[str] = Header(default=None),
):
owner: MetaClawAPIServer = request.app.state.owner
# Update idle tracker so the scheduler knows the user is active
if owner._last_request_tracker is not None:
owner._last_request_tracker.touch()
await owner._check_auth(authorization)
if not owner.submission_enabled.is_set():
# Queue requests while submission is paused instead of returning 503.
# Use a bounded wait so clients don't hang forever on abnormal stalls.
resumed = await asyncio.to_thread(owner.submission_enabled.wait, 300.0)
if not resumed:
raise HTTPException(
status_code=503,
detail="submission paused for weight update (wait timeout)",
)
body = await request.json()
incoming_messages = body.get("messages", [])
if isinstance(incoming_messages, list):
rewritten_messages, rewritten = _rewrite_new_session_bootstrap_prompt(
incoming_messages
)
body["messages"] = rewritten_messages
else:
rewritten = 0
_raw_sid = x_session_id or body.get("session_id") or ""
# TUI mode: OpenClaw does not send X-Session-Id/X-Turn-Type.
# Fall back to a model-derived session ID and treat as "main" so
# TUI conversations are collected as training data.
if _raw_sid:
session_id = _raw_sid
turn_type = (x_turn_type or body.get("turn_type") or "side").strip().lower()
else:
session_id = f"tui-{body.get('model', 'default')}"
turn_type = (x_turn_type or body.get("turn_type") or "main").strip().lower()
session_done = (
(x_session_done and x_session_done.strip().lower() in {"1", "true", "yes", "on"})
or str(body.get("session_done", "")).strip().lower() in {"1", "true", "yes", "on"}
)
# Do not infer session_done from bootstrap text β only explicit X-Session-Done or body session_done trigger evolution.
# Reuse cached scope for the session to avoid re-deriving
# (which would nest |session:X repeatedly).
_explicit_scope = (x_memory_scope or "") or str(body.get("memory_scope", "") or "")
_explicit_user = (x_user_id or "") or str(body.get("user_id", "") or "")
_explicit_workspace = (x_workspace_id or "") or str(body.get("workspace_id", "") or "")
_cached = owner._session_memory_scopes.get(session_id, "")
if _cached and not _explicit_scope and not _explicit_user and not _explicit_workspace:
memory_scope = _cached
else:
memory_scope = derive_memory_scope(
default_scope=owner.memory_manager.scope_id if owner.memory_manager else "default",
session_id=session_id,
memory_scope=_explicit_scope,
user_id=_explicit_user,
workspace_id=_explicit_workspace,
)
stream = bool(body.get("stream", False))
result = await owner._handle_request(
body,
session_id=session_id,
turn_type=turn_type,
session_done=session_done,
memory_scope=memory_scope,
)
if stream:
return StreamingResponse(
owner._stream_response(result), media_type="text/event-stream"
)
return JSONResponse(content=result["response"])
@app.post("/v1/admin/train_step")
async def admin_train_step(request: Request):
    """Trigger a single RL training step using queued samples.

    Intended to be called by ``metaclaw train-step`` CLI or any
    external orchestrator (e.g. benchmark scripts).

    The trainer runs on the main event loop while this server runs
    on a separate uvicorn thread. We schedule the coroutine onto
    the main loop via run_coroutine_threadsafe, then wait for the
    result in a thread-pool worker (asyncio.to_thread) so that the
    uvicorn event loop stays free to process inference requests
    during training.

    Responses:
        200 with the trainer's result as JSON on success;
        503 when no trainer / main loop has been registered;
        504 when the step does not finish within 600 seconds;
        500 when the step itself raises.
    """
    # NOTE(review): unlike the other endpoints in this app, this one does
    # not call owner._check_auth(); confirm whether that is intentional.
    owner: MetaClawAPIServer = request.app.state.owner
    if owner._trainer is None or owner._main_loop is None:
        raise HTTPException(
            status_code=503,
            detail="trainer not available (not in RL mode or not yet initialised)",
        )
    import concurrent.futures

    future = asyncio.run_coroutine_threadsafe(
        owner._trainer.train_step_external(),
        owner._main_loop,
    )
    try:
        # Block a worker thread, not the uvicorn event loop.
        result = await asyncio.to_thread(future.result, 600)
    except concurrent.futures.TimeoutError as exc:
        # Only a wait timeout maps to 504.  Previously the timeout was
        # re-raised as RuntimeError and every RuntimeError was caught here,
        # so ordinary RuntimeErrors raised by the training step were
        # misreported as timeouts.
        # The scheduled step is left running; it is not cancelled.
        raise HTTPException(
            status_code=504, detail="train step timed out (600s)"
        ) from exc
    except Exception as exc:
        raise HTTPException(
            status_code=500, detail=f"train step failed: {exc}"
        ) from exc
    return JSONResponse(content=result)
# ---------------------------------------------------------- #
# Memory management REST API #
# ---------------------------------------------------------- #
@app.get("/v1/memory/stats")
async def memory_stats(
request: Request,
scope: str = "",
authorization: Optional[str] = Header(default=None),
):
owner: MetaClawAPIServer = request.app.state.owner
await owner._check_auth(authorization)
if not owner.memory_manager:
raise HTTPException(status_code=503, detail="memory not enabled")
stats = await asyncio.to_thread(
owner.memory_manager.get_scope_stats, scope or None
)
return JSONResponse(content=stats)
@app.get("/v1/memory/search")
async def memory_search(
request: Request,
q: str = "",
scope: str = "",
limit: int = 20,
authorization: Optional[str] = Header(default=None),
):
owner: MetaClawAPIServer = request.app.state.owner
await owner._check_auth(authorization)
if not owner.memory_manager:
raise HTTPException(status_code=503, detail="memory not enabled")
results = await asyncio.to_thread(
owner.memory_manager.search_memories, q, scope or None, limit
)
return JSONResponse(content=results)
@app.get("/v1/memory/health")
async def memory_health(
request: Request,
scope: str = "",
authorization: Optional[str] = Header(default=None),
):
owner: MetaClawAPIServer = request.app.state.owner
await owner._check_auth(authorization)
if not owner.memory_manager:
raise HTTPException(status_code=503, detail="memory not enabled")
health = await asyncio.to_thread(
owner.memory_manager.run_system_health_check, scope or None
)
return JSONResponse(content=health)
@app.get("/v1/memory/summary")
async def memory_summary(
request: Request,
authorization: Optional[str] = Header(default=None),
):
owner: MetaClawAPIServer = request.app.state.owner
await owner._check_auth(authorization)
if not owner.memory_manager:
raise HTTPException(status_code=503, detail="memory not enabled")
summary = await asyncio.to_thread(
owner.memory_manager.get_system_summary
)
return JSONResponse(content=summary)
@app.post("/v1/memory/action-plan")
async def memory_action_plan(
    request: Request,
    scope: str = "",
    authorization: Optional[str] = Header(default=None),
):
    """Return ``memory_manager.generate_action_plan`` for the scope as JSON.

    An empty ``scope`` is passed to the manager as ``None``.
    """
    owner: MetaClawAPIServer = request.app.state.owner
    await owner._check_auth(authorization)
    if not owner.memory_manager:
        raise HTTPException(status_code=503, detail="memory not enabled")
    plan = await asyncio.to_thread(
        owner.memory_manager.generate_action_plan, scope or None
    )
    return JSONResponse(content=plan)

@app.post("/v1/memory/maintenance")
async def memory_maintenance(
    request: Request,
    scope: str = "",
    authorization: Optional[str] = Header(default=None),
):
    """Run ``memory_manager.run_maintenance`` and return its result as JSON.

    An empty ``scope`` is passed to the manager as ``None``.
    """
    owner: MetaClawAPIServer = request.app.state.owner
    await owner._check_auth(authorization)
    if not owner.memory_manager:
        raise HTTPException(status_code=503, detail="memory not enabled")
    result = await asyncio.to_thread(
        owner.memory_manager.run_maintenance, scope or None
    )
    return JSONResponse(content=result)

@app.get("/v1/memory/feedback-analysis")
async def memory_feedback_analysis(
    request: Request,
    scope: str = "",
    authorization: Optional[str] = Header(default=None),
):
    """Return ``memory_manager.analyze_feedback_patterns`` as JSON.

    An empty ``scope`` is passed to the manager as ``None``.
    """
    owner: MetaClawAPIServer = request.app.state.owner
    await owner._check_auth(authorization)
    if not owner.memory_manager:
        raise HTTPException(status_code=503, detail="memory not enabled")
    result = await asyncio.to_thread(
        owner.memory_manager.analyze_feedback_patterns, scope or None
    )
    return JSONResponse(content=result)

@app.get("/v1/memory/operator-report")
async def memory_operator_report(
    request: Request,
    scope: str = "",
    authorization: Optional[str] = Header(default=None),
):
    """Return ``memory_manager.generate_operator_report`` as JSON.

    An empty ``scope`` is passed to the manager as ``None``.
    """
    owner: MetaClawAPIServer = request.app.state.owner
    await owner._check_auth(authorization)
    if not owner.memory_manager:
        raise HTTPException(status_code=503, detail="memory not enabled")
    report = await asyncio.to_thread(
        owner.memory_manager.generate_operator_report, scope or None
    )
    return JSONResponse(content=report)

# Routes are matched in registration order, so this catch-all
# ``/v1/memory/{memory_id}`` GET route must be registered AFTER the
# fixed-path GET routes above.  It used to precede
# ``/v1/memory/feedback-analysis`` and ``/v1/memory/operator-report``,
# which were therefore routed here and answered 404 "memory not found".
# NOTE(review): any fixed-path GET route under /v1/memory/ that is
# registered later in this function must likewise be moved above this one.
@app.get("/v1/memory/{memory_id}")
async def memory_get(
    request: Request,
    memory_id: str,
    authorization: Optional[str] = Header(default=None),
):
    """Return a single memory record by id, or 404 when it does not exist."""
    owner: MetaClawAPIServer = request.app.state.owner
    await owner._check_auth(authorization)
    if not owner.memory_manager:
        raise HTTPException(status_code=503, detail="memory not enabled")
    mem = await asyncio.to_thread(
        owner.memory_manager.get_memory, memory_id
    )
    if not mem:
        raise HTTPException(status_code=404, detail="memory not found")
    return JSONResponse(content={
        "memory_id": mem.memory_id,
        "scope_id": mem.scope_id,
        "type": mem.memory_type.value,
        "content": mem.content,
        "summary": mem.summary,
        "importance": mem.importance,
        "entities": mem.entities,
        "topics": mem.topics,
        "tags": mem.tags,
        # A memory is reported as pinned when its importance is >= 0.99.
        "pinned": mem.importance >= 0.99,
        "created_at": mem.created_at,
        "updated_at": mem.updated_at,
        "expires_at": mem.expires_at,
    })
@app.post("/v1/memory/ingest")
async def memory_ingest(
request: Request,
authorization: Optional[str] = Header(default=None),
):
"""Manually trigger memory ingestion for buffered sessions.
Request body: {"session_id": "...", "scope": "..."}
If session_id is provided, ingest that specific session.
If session_id is empty or omitted, ingest ALL buffered sessions.
"""
owner: MetaClawAPIServer = request.app.state.owner
await owner._check_auth(authorization)
if not owner.memory_manager:
raise HTTPException(status_code=503, detail="memory not enabled")
body = await request.json()
sid = str(body.get("session_id", "")).strip()
explicit_scope = str(body.get("scope", "")).strip()
if sid:
# Ingest a specific session.
sessions_to_ingest = {sid: owner._session_memory_turns.pop(sid, [])}
else:
# Ingest ALL buffered sessions.
sessions_to_ingest = dict(owner._session_memory_turns)
owner._session_memory_turns.clear()
total_added = 0
total_turns = 0
results = []
for s_id, turns in sessions_to_ingest.items():
if not turns:
continue
raw_scope = explicit_scope or owner._session_memory_scopes.pop(s_id, "")
scope = base_scope(raw_scope) if raw_scope else None
logger.info("[Memory] manual ingest session=%s scope=%s β %d buffered turns", s_id, scope, len(turns))
added = await asyncio.to_thread(
owner.memory_manager.ingest_session_turns, s_id, turns, scope,
)
total_added += added
total_turns += len(turns)
results.append({"session_id": s_id, "added": added, "turns": len(turns)})
if not results:
buffered_keys = list(owner._session_memory_turns.keys())
logger.info("[Memory] manual ingest β no buffered turns (buffered_sessions=%s)", buffered_keys)
return JSONResponse(content={"added": 0, "buffered_turns": 0, "sessions": []})
logger.info("[Memory] manual ingest complete: %d sessions, %d turns, %d units added",
len(results), total_turns, total_added)
return JSONResponse(content={
"added": total_added,
"buffered_turns": total_turns,
"sessions": results,
})
@app.post("/v1/memory/buffer_turn")
async def memory_buffer_turn(
request: Request,
authorization: Optional[str] = Header(default=None),
):
"""Append one turn to a session's incremental buffer.
Request body: {"session_id": "...", "turn": {"prompt_text": "...", "response_text": "..."}, "scope_id": "..."}
Auto-flushes when the buffer reaches flush_every turns.
Returns {"flushed": bool, "added": int | null}.
"""
owner: MetaClawAPIServer = request.app.state.owner
await owner._check_auth(authorization)
if not owner.memory_manager:
raise HTTPException(status_code=503, detail="memory not enabled")
body = await request.json()
session_id = str(body.get("session_id", "")).strip()
turn = body.get("turn", {})
raw_scope = str(body.get("scope_id", "")).strip()
scope = base_scope(raw_scope) if raw_scope else None
result = await asyncio.to_thread(
owner.memory_manager.buffer_turn, session_id, turn, scope,
)
return JSONResponse(content={"flushed": result is not None, "added": result})
@app.post("/v1/memory/flush_session")
async def memory_flush_session(
request: Request,
authorization: Optional[str] = Header(default=None),
):
"""Flush buffered turns for a session.
Request body: {"session_id": "...", "scope_id": "...", "final": true}
When final=true (default), emits a working_summary and clears session state.
Returns {"added": int}.
"""
owner: MetaClawAPIServer = request.app.state.owner
await owner._check_auth(authorization)
if not owner.memory_manager:
raise HTTPException(status_code=503, detail="memory not enabled")
body = await request.json()
session_id = str(body.get("session_id", "")).strip()
raw_scope = str(body.get("scope_id", "")).strip()
scope = base_scope(raw_scope) if raw_scope else None
final = bool(body.get("final", True))
added = await asyncio.to_thread(
owner.memory_manager.flush_session, session_id, scope, final,