Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/memos/mem_feedback/feedback.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,6 @@ def _single_add_operation(
datetime.now().isoformat()
)
to_add_memory.metadata.background = new_memory_item.metadata.background
to_add_memory.metadata.sources = []

added_ids = self._retry_db_operation(
lambda: self.memory_manager.add([to_add_memory], user_name=user_name, use_batch=False)
Expand Down
114 changes: 103 additions & 11 deletions src/memos/mem_reader/multi_modal_struct.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,9 @@ def _process_multi_modal_data(
# Use MultiModalParser to parse the scene data
# If it's a list, parse each item; otherwise parse as single message
if isinstance(scene_data_info, list):
# Pre-expand multimodal messages
expanded_messages = self._expand_multimodal_messages(scene_data_info)

# Parse each message in the list
all_memory_items = []
# Use thread pool to parse each message in parallel, but keep the original order
Expand All @@ -996,7 +999,7 @@ def _process_multi_modal_data(
need_emb=False,
**kwargs,
)
for msg in scene_data_info
for msg in expanded_messages
]
# collect results in original order
for future in futures:
Expand All @@ -1014,20 +1017,23 @@ def _process_multi_modal_data(
if mode == "fast":
return fast_memory_items
else:
non_file_url_fast_items = [
item for item in fast_memory_items if not self._is_file_url_only_item(item)
]

# Part A: call llm in parallel using thread pool
fine_memory_items = []

with ContextThreadPoolExecutor(max_workers=4) as executor:
future_string = executor.submit(
self._process_string_fine, fast_memory_items, info, custom_tags, **kwargs
self._process_string_fine, non_file_url_fast_items, info, custom_tags, **kwargs
)
future_tool = executor.submit(
self._process_tool_trajectory_fine, fast_memory_items, info, **kwargs
self._process_tool_trajectory_fine, non_file_url_fast_items, info, **kwargs
)
# Use general_llm for skill memory extraction (not fine-tuned for this task)
future_skill = executor.submit(
process_skill_memory_fine,
fast_memory_items=fast_memory_items,
fast_memory_items=non_file_url_fast_items,
info=info,
searcher=self.searcher,
graph_db=self.graph_db,
Expand All @@ -1039,7 +1045,7 @@ def _process_multi_modal_data(
)
future_pref = executor.submit(
process_preference_fine,
fast_memory_items,
non_file_url_fast_items,
info,
self.llm,
self.embedder,
Expand Down Expand Up @@ -1094,19 +1100,21 @@ def _process_transfer_multi_modal_data(
**(raw_nodes[0].metadata.info or {}),
}

# Filter out file-URL-only items for Part A fine processing (same as _process_multi_modal_data)
non_file_url_nodes = [node for node in raw_nodes if not self._is_file_url_only_item(node)]

fine_memory_items = []
# Part A: call llm in parallel using thread pool
with ContextThreadPoolExecutor(max_workers=4) as executor:
future_string = executor.submit(
self._process_string_fine, raw_nodes, info, custom_tags, **kwargs
self._process_string_fine, non_file_url_nodes, info, custom_tags, **kwargs
)
future_tool = executor.submit(
self._process_tool_trajectory_fine, raw_nodes, info, **kwargs
self._process_tool_trajectory_fine, non_file_url_nodes, info, **kwargs
)
# Use general_llm for skill memory extraction (not fine-tuned for this task)
future_skill = executor.submit(
process_skill_memory_fine,
raw_nodes,
non_file_url_nodes,
info,
searcher=self.searcher,
llm=self.general_llm,
Expand All @@ -1118,7 +1126,7 @@ def _process_transfer_multi_modal_data(
)
# Add preference memory extraction
future_pref = executor.submit(
process_preference_fine, raw_nodes, info, self.general_llm, self.embedder, **kwargs
process_preference_fine, non_file_url_nodes, info, self.llm, self.embedder, **kwargs
)

# Collect results
Expand Down Expand Up @@ -1148,6 +1156,90 @@ def _process_transfer_multi_modal_data(
fine_memory_items.extend(items)
return fine_memory_items

@staticmethod
def _expand_multimodal_messages(messages: list) -> list:
"""
Expand messages whose ``content`` is a list into individual
sub-messages so that each modality is routed to its specialised
parser during fast-mode parsing.

For a message like::

{
"content": [
{"type": "text", "text": "Analyze this file"},
{"type": "file", "file": {"file_data": "https://...", ...}},
{"type": "image_url", "image_url": {"url": "https://..."}},
],
"role": "user",
"chat_time": "03:14 PM on 13 March, 2026",
}

The result will be::

[
{"content": "Analyze this file", "role": "user", "chat_time": "..."},
{"type": "file", "file": {"file_data": "https://...", ...}},
{"type": "image_url", "image_url": {"url": "https://..."}},
]

Messages whose ``content`` is already a plain string (or that are
not dicts) are passed through unchanged.
"""
expanded: list = []
for msg in messages:
if not isinstance(msg, dict):
expanded.append(msg)
continue

content = msg.get("content")
if not isinstance(content, list):
expanded.append(msg)
continue

# ---- content is a list: split by modality ----
text_parts: list[str] = []
for part in content:
if not isinstance(part, dict):
text_parts.append(str(part))
continue

part_type = part.get("type", "")
if part_type == "text":
text_parts.append(part.get("text", ""))
elif part_type in ("file", "image", "image_url"):
# Extract as a standalone message for its specialised parser
expanded.append(part)
else:
text_parts.append(f"[{part_type}]")

# Reconstruct a text-only version of the original message
# (preserving role, chat_time, message_id, etc.)
text_content = "\n".join(t for t in text_parts if t.strip())
if text_content.strip():
text_msg = {k: v for k, v in msg.items() if k != "content"}
text_msg["content"] = text_content
expanded.append(text_msg)

return expanded

@staticmethod
def _is_file_url_only_item(item: TextualMemoryItem) -> bool:
    """
    Determine whether a fast memory item carries nothing but file-URL
    sources (i.e. it is metadata-only and needs no fine processing).

    Args:
        item: TextualMemoryItem to inspect.

    Returns:
        True when the item has at least one source and every source is
        of type "file" with a truthy ``file_info``; False otherwise
        (including when there are no sources at all).
    """
    source_list = item.metadata.sources or []
    if not source_list:
        return False
    for src in source_list:
        is_file = getattr(src, "type", None) == "file"
        has_info = bool(getattr(src, "file_info", None))
        if not (is_file and has_info):
            return False
    return True

def get_scene_data_info(self, scene_data: list, type: str) -> list[list[Any]]:
"""
Convert normalized MessagesType scenes into scene data info.
Expand Down
Loading