Merged
5 changes: 4 additions & 1 deletion README.md
@@ -34,7 +34,8 @@
 MER-Factory is under active development with new features being added regularly - check our [roadmap](https://github.com/Lum1104/MER-Factory/wiki) and welcome contributions!

 <div style="text-align: center;">
-<img src="docs/assets/framework.svg" style="border: none; width: 100%; max-width: 1000px;">
+<img src="docs/assets/mer-factory.jpeg" style="border: none; width: 100%; max-width: 1000px;">
+<!-- The figure was generated by Gemini 3, many thanks! -->
 </div>

 ## Table of Contents
@@ -72,6 +73,7 @@ Remove for now, call the (print(app.get_graph().draw_mermaid())) graph.py to vie
 - **Video Analysis Pipeline**: Generates comprehensive descriptions of video content and context.
 - **Image Analysis Pipeline**: Provides end-to-end emotion recognition for static images, complete with visual descriptions and emotional synthesis.
 - **Full MER Pipeline**: An end-to-end multimodal pipeline that identifies peak emotional moments, analyzes all modalities (visual, audio, facial), and synthesizes a holistic emotional reasoning summary.
+- **Gate Agent (Experimental)**: An optional quality control layer that reviews intermediate analysis results. Following the "garbage in, garbage out" principle, it rejects low-quality or conflicting outputs and prompts sub-agents to refine their analysis before final synthesis. Enable with `--use-gate-agent`.

 Check out example outputs here:
 - [llava-llama3_llama3.2_merr_data.json](examples/llava-llama3_llama3.2_merr_data.json)
@@ -167,6 +169,7 @@ python dashboard.py
 | `--ollama-text-model` | `-otm` | Ollama text model name | None |
 | `--chatgpt-model` | `-cgm` | ChatGPT model name (e.g., gpt-4o) | None |
 | `--huggingface-model` | `-hfm` | Hugging Face model ID | None |
+| `--use-gate-agent` | `-uga` | Enable Gate Agent for quality control (Dev Feature) | False |

 ### Processing Types

4 changes: 3 additions & 1 deletion README_zh.md
@@ -29,7 +29,7 @@
 MER-Factory 正在积极开发中,新功能会定期添加 - 查看我们的[路线图](https://github.com/Lum1104/MER-Factory/wiki),欢迎贡献!

 <div style="text-align: center;">
-<img src="docs/assets/framework.svg" style="border: none; width: 100%; max-width: 1000px;">
+<img src="docs/assets/mer-factory.jpeg" style="border: none; width: 100%; max-width: 1000px;">
 </div>

 ## 目录
@@ -66,6 +66,7 @@ MER-Factory 正在积极开发中,新功能会定期添加 - 查看我们的[
 - **视频分析流程**:生成视频内容与上下文的全面描述。
 - **图像分析流程**:为静态图像提供端到端情感识别,包含视觉描述与情感综合。
 - **完整 MER 流程**:端到端多模态流程,定位情感峰值时刻,分析所有模态(视觉、音频、面部),并综合生成整体情感推理总结。
+- **Gate Agent (实验性功能)**:一个可选的质量控制层,用于审查中间分析结果。遵循“垃圾进,垃圾出”的原则,它会拒绝低质量或相互冲突的输出,并提示子代理在最终合成之前完善其分析。使用 `--use-gate-agent` 启用。

 查看示例输出:
 - [llava-llama3_llama3.2_merr_data.json](examples/llava-llama3_llama3.2_merr_data.json)
@@ -161,6 +162,7 @@ python dashboard.py
 | `--ollama-text-model` | `-otm` | Ollama 文本模型名称 | None |
 | `--chatgpt-model` | `-cgm` | ChatGPT 模型名称(例如 gpt-4o) | None |
 | `--huggingface-model` | `-hfm` | Hugging Face 模型 ID | None |
+| `--use-gate-agent` | `-uga` | 启用 Gate Agent 进行质量控制(开发功能) | False |

 ### 处理类型

Binary file added docs/assets/mer-factory.jpeg
9 changes: 8 additions & 1 deletion main.py
@@ -88,7 +88,7 @@ def main_orchestrator(config: AppConfig):
     # --- Phase 2: Main Processing ---
     console.rule("[bold blue]Phase 2: Main Processing[/bold blue]")
     is_sync_model = models.model_type == "huggingface"
-    graph_app = create_graph(use_sync_nodes=is_sync_model)
+    graph_app = create_graph(use_sync_nodes=is_sync_model, use_gate_agent=config.use_gate_agent)

     initial_state_builder = functools.partial(
         build_initial_state,
@@ -185,6 +185,12 @@ def process(
         "-ca",
         help="Reuse existing audio/video/AU results from previous pipeline runs & cache LLM calls.",
     ),
+    use_gate_agent: bool = typer.Option(
+        False,
+        "--use-gate-agent",
+        "-uga",
+        help="Enable the Gate Agent for quality control and refinement (Dev Feature).",
+    ),
 ):
     """Processes media files for Multimodal Emotion Recognition and Reasoning (MERR)."""
     try:
@@ -200,6 +206,7 @@ def process(
             silent=silent,
             cache=cache,
             concurrency=concurrency,
+            use_gate_agent=use_gate_agent,
             ollama_vision_model=ollama_vision_model,
             ollama_text_model=ollama_text_model,
             chatgpt_model=chatgpt_model,
46 changes: 44 additions & 2 deletions mer_factory/graph.py
@@ -73,7 +73,25 @@ def route_after_video_generation(state: MERRState) -> str:
     return "handle_error"


-def create_graph(use_sync_nodes: bool = False):
+def route_gate_agent(state: MERRState) -> str:
+    """Routes flow based on the Gate Agent's decision."""
+    if state.get("error"):
+        return "handle_error"
+
+    decision = state.get("gate_decision", "pass")
+    if decision == "pass":
+        return "synthesize_summary"
+
+    retry_target = state.get("retry_target")
+    if retry_target:
+        if state.get("verbose", True):
+            console.log(f"[yellow]Gate Agent triggering retry at: {retry_target}[/yellow]")
+        return retry_target
+
+    return "synthesize_summary"  # Fallback
+
+
+def create_graph(use_sync_nodes: bool = False, use_gate_agent: bool = False):
     """
     Creates and compiles the modular MERR construction graph.
     It can create either an asynchronous or a synchronous graph based on the flag.
@@ -86,6 +104,8 @@ def create_graph(use_sync_nodes: bool = False):
     else:
         console.log("Creating an [bold green]asynchronous[/bold green] graph.")
         from .nodes import async_nodes as nodes
+        if use_gate_agent:
+            from .nodes.gate_agent import GateAgent

     workflow = StateGraph(MERRState)

@@ -116,6 +136,11 @@ def create_graph(use_sync_nodes: bool = False):
         "generate_peak_frame_visual_description",
         nodes.generate_peak_frame_visual_description,
     )
+
+    if use_gate_agent and not use_sync_nodes:
+        gate_agent_node = GateAgent()
+        workflow.add_node("gate_agent", gate_agent_node.run)
+
     workflow.add_node("synthesize_summary", nodes.synthesize_summary)
     workflow.add_node("save_mer_results", nodes.save_mer_results)

@@ -191,7 +216,24 @@ def create_graph(use_sync_nodes: bool = False):

     # 3. Define MER pipeline sequence
     workflow.add_edge("extract_peak_image", "generate_audio_description")
-    workflow.add_edge("generate_peak_frame_visual_description", "synthesize_summary")
+
+    if use_gate_agent and not use_sync_nodes:
+        workflow.add_edge("generate_peak_frame_visual_description", "gate_agent")
+        # Gate Agent routing
+        workflow.add_conditional_edges(
+            "gate_agent",
+            route_gate_agent,
+            {
+                "synthesize_summary": "synthesize_summary",
+                "generate_audio_description": "generate_audio_description",
+                "generate_video_description": "generate_video_description",
+                "generate_peak_frame_visual_description": "generate_peak_frame_visual_description",
+                "handle_error": "handle_error",
+            },
+        )
+    else:
+        workflow.add_edge("generate_peak_frame_visual_description", "synthesize_summary")
+
     workflow.add_edge("synthesize_summary", "save_mer_results")

     # 4. Define Image pipeline sequence
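
To make the routing rule in `route_gate_agent` easier to reason about, here is a minimal standalone sketch that operates on plain dictionaries instead of `MERRState`. The `"reject"` decision value and the `ALLOWED_RETRY_TARGETS` guard are assumptions for illustration: the diff only shows the `"pass"` value and performs no such check. The guard mirrors the retry keys of the mapping passed to `add_conditional_edges`, since a retry target outside that mapping would presumably have no edge to follow.

```python
from typing import Any, Dict

# Assumption: mirrors the retry keys of the conditional-edge mapping in this diff.
ALLOWED_RETRY_TARGETS = {
    "generate_audio_description",
    "generate_video_description",
    "generate_peak_frame_visual_description",
}


def route_gate_agent(state: Dict[str, Any]) -> str:
    """Pick the next node name from the Gate Agent's decision (logging omitted)."""
    if state.get("error"):
        return "handle_error"

    # A missing decision is treated as a pass, as in the diff.
    if state.get("gate_decision", "pass") == "pass":
        return "synthesize_summary"

    retry_target = state.get("retry_target")
    # Hypothetical hardening: only follow targets that have a mapped edge.
    if retry_target in ALLOWED_RETRY_TARGETS:
        return retry_target

    return "synthesize_summary"  # Fallback, as in the diff


if __name__ == "__main__":
    samples = (
        {"error": "boom"},
        {},
        {"gate_decision": "reject", "retry_target": "generate_audio_description"},
        {"gate_decision": "reject", "retry_target": "unknown_node"},
    )
    for sample in samples:
        print(sample, "->", route_gate_agent(sample))
```

Whether an unknown target should fall back to synthesis or surface as an error is a design choice; the sketch picks the fallback only because the PR's own last line does the same.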
70 changes: 56 additions & 14 deletions mer_factory/nodes/async_nodes.py
@@ -64,6 +64,12 @@ def _save():


 async def generate_audio_description(state):
+    # Skip if already passed (output exists) and no new prompt (not a retry target)
+    if state.get("audio_analysis_results") and not state.get("dynamic_prompts", {}).get("audio"):
+        if state.get("verbose", True):
+            console.log("[dim]Skipping Audio Analysis (already passed).[/dim]")
+        return {}
+
     # reuse existing audio analysis results if available
     if state.get("processing_type") == "MER" and state.get("cache"):
         output_path = (
@@ -97,12 +103,19 @@ async def generate_audio_description(state):
     processing_type = state.get("processing_type")
     ground_truth_label = state.get("ground_truth_label")

-    # if processing_type is audio we pass the label to the prompt
-    # otherwise (MER), we do not, because emotion cannot be inferred from audio alone.
-    has_label = bool(ground_truth_label) if processing_type == "audio" else False
-    prompt = prompts.get_audio_prompt(has_label)
-    if has_label:
-        prompt = prompt.format(label=ground_truth_label)
+    # Check for dynamic prompt from Gate Agent

Bug: Cache logic blocks Gate Agent refinement retries

When cache is enabled, the function checks for and loads existing results before checking for dynamic_prompts. This causes the node to return the cached (rejected) output instead of generating a new analysis with the refined prompt, effectively disabling the Gate Agent's retry mechanism when caching is active.
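
One possible fix for the ordering issue described in the comment above is to read the dynamic prompt first and consult the cache only when no retry was requested. The sketch below is a simplified illustration, not the PR's code: `load_cached_audio` and `analyze_audio` are hypothetical stand-ins for the on-disk cache lookup and for `model.analyze_audio`, and the `_cached_audio` key and default prompt string are invented for the demo.

```python
import asyncio
from typing import Any, Dict, Optional


def load_cached_audio(state: Dict[str, Any]) -> Optional[str]:
    """Hypothetical stand-in for the node's on-disk cache lookup."""
    return state.get("_cached_audio")


async def analyze_audio(prompt: str) -> str:
    """Hypothetical stand-in for model.analyze_audio."""
    return f"analysis using: {prompt}"


async def generate_audio_description(state: Dict[str, Any]) -> Dict[str, Any]:
    dynamic_prompt = state.get("dynamic_prompts", {}).get("audio")

    # Consult the cache only when the Gate Agent has NOT requested a retry;
    # otherwise the rejected result would be returned again.
    if not dynamic_prompt and state.get("cache"):
        cached = load_cached_audio(state)
        if cached is not None:
            return {"audio_analysis_results": cached}

    prompt = dynamic_prompt or "default audio prompt"
    return {"audio_analysis_results": await analyze_audio(prompt)}


if __name__ == "__main__":
    base = {"cache": True, "_cached_audio": "old analysis"}
    print(asyncio.run(generate_audio_description(base)))
    retry = {**base, "dynamic_prompts": {"audio": "refined prompt"}}
    print(asyncio.run(generate_audio_description(retry)))
```

The same reordering would apply to `generate_video_description`, which has the same cache check ahead of its dynamic-prompt lookup.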

+    dynamic_prompt = state.get("dynamic_prompts", {}).get("audio")
+    if dynamic_prompt:
+        prompt = dynamic_prompt
+        if verbose:
+            console.log("[yellow]Using dynamic prompt for Audio Analysis[/yellow]")
+    else:
+        # if processing_type is audio we pass the label to the prompt
+        # otherwise (MER), we do not, because emotion cannot be inferred from audio alone.
+        has_label = bool(ground_truth_label) if processing_type == "audio" else False
+        prompt = prompts.get_audio_prompt(has_label)
+        if has_label:
+            prompt = prompt.format(label=ground_truth_label)

     audio_analysis = await model.analyze_audio(audio_path, prompt)
     if verbose:
@@ -135,6 +148,11 @@ def _save():


 async def generate_video_description(state):
+    # Skip if already passed (output exists) and no new prompt (not a retry target)
+    if state.get("video_description") and not state.get("dynamic_prompts", {}).get("video"):
+        if state.get("verbose", True):
+            console.log("[dim]Skipping Video Analysis (already passed).[/dim]")
+        return {}
     if state.get("processing_type") == "MER" and state.get("cache"):
         output_path = (
             Path(state["video_output_dir"]) / f"{state['video_id']}_video_analysis.json"
@@ -158,12 +176,19 @@ async def generate_video_description(state):

     processing_type = state.get("processing_type")
     ground_truth_label = state.get("ground_truth_label")
-    # if processing_type is video, we pass the label to the prompt
-    # otherwise (MER), we do not, because emotion cannot be inferred from video alone.
-    has_label = bool(ground_truth_label) if processing_type == "video" else False
-    prompt = prompts.get_video_prompt(has_label)
-    if has_label:
-        prompt = prompt.format(label=ground_truth_label)
+    # Check for dynamic prompt from Gate Agent
+    dynamic_prompt = state.get("dynamic_prompts", {}).get("video")
+    if dynamic_prompt:
+        prompt = dynamic_prompt
+        if verbose:
+            console.log("[yellow]Using dynamic prompt for Video Description[/yellow]")
+    else:
+        # if processing_type is video, we pass the label to the prompt
+        # otherwise (MER), we do not, because emotion cannot be inferred from video alone.
+        has_label = bool(ground_truth_label) if processing_type == "video" else False
+        prompt = prompts.get_video_prompt(has_label)
+        if has_label:
+            prompt = prompt.format(label=ground_truth_label)

     video_description = await model.describe_video(video_path, prompt)
     if verbose and video_description:
@@ -328,22 +353,39 @@ async def extract_peak_image(state):

 async def generate_peak_frame_visual_description(state):
     """Generates a visual description for the peak frame image."""
+    # Skip if already passed (output exists) and no new prompt (not a retry target)
+    if state.get("image_visual_description") and not state.get("dynamic_prompts", {}).get("peak_frame"):
+        if state.get("verbose", True):
+            console.log("[dim]Skipping Peak Frame Analysis (already passed).[/dim]")
+        return {}
     verbose = state.get("verbose", True)
     if verbose:
         console.log("Generating visual description for peak frame...")
     model: LLMModels = state["models"].model_instance
     prompts: PromptTemplates = state["prompts"]
     peak_frame_path = Path(state["peak_frame_path"])

-    # No label for peak frame, since this is used for MER.
-    prompt = prompts.get_image_prompt()
+    # Check for dynamic prompt from Gate Agent
+    dynamic_prompt = state.get("dynamic_prompts", {}).get("peak_frame")
+    if dynamic_prompt:
+        prompt = dynamic_prompt
+        if verbose:
+            console.log("[yellow]Using dynamic prompt for Peak Frame Visual Description[/yellow]")
+    else:
+        # No label for peak frame, since this is used for MER.
+        prompt = prompts.get_image_prompt()

     visual_obj_desc = await model.describe_image(peak_frame_path, prompt)

     if verbose:
         console.log(f"Peak Frame Visual Description: [cyan]{visual_obj_desc}[/cyan]")
     return {"image_visual_description": visual_obj_desc}

+
+
+
+
+
 async def synthesize_summary(state):
     verbose = state.get("verbose", True)
     if verbose:
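
The three node functions touched in this file repeat the same two checks: skip when a result already exists and no retry prompt was issued, otherwise prefer the Gate Agent's dynamic prompt over the default one. A minimal sketch of that shared logic follows. `should_skip` and `select_prompt` are hypothetical helper names that do not appear in the PR, and plain dictionaries stand in for the graph state.

```python
from typing import Any, Callable, Dict


def should_skip(state: Dict[str, Any], result_key: str, prompt_key: str) -> bool:
    """True when the node already produced output and is not a retry target."""
    has_result = bool(state.get(result_key))
    has_retry_prompt = bool(state.get("dynamic_prompts", {}).get(prompt_key))
    return has_result and not has_retry_prompt


def select_prompt(
    state: Dict[str, Any], prompt_key: str, default_factory: Callable[[], str]
) -> str:
    """Prefer the Gate Agent's refined prompt; otherwise build the default lazily."""
    dynamic_prompt = state.get("dynamic_prompts", {}).get(prompt_key)
    return dynamic_prompt if dynamic_prompt else default_factory()


if __name__ == "__main__":
    passed = {"video_description": "a person smiling"}
    retried = {**passed, "dynamic_prompts": {"video": "focus on body language"}}
    print(should_skip(passed, "video_description", "video"))
    print(should_skip(retried, "video_description", "video"))
    print(select_prompt(retried, "video", lambda: "default video prompt"))
```

Taking the default as a callable keeps the label-dependent prompt construction (`get_audio_prompt`, `get_video_prompt`) out of the retry path, which matches the `else` branches in the diff.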