2 changes: 2 additions & 0 deletions README.md
@@ -72,6 +72,7 @@ Remove for now, call the (print(app.get_graph().draw_mermaid())) graph.py to view
- **Video Analysis Pipeline**: Generates comprehensive descriptions of video content and context.
- **Image Analysis Pipeline**: Provides end-to-end emotion recognition for static images, complete with visual descriptions and emotional synthesis.
- **Full MER Pipeline**: An end-to-end multimodal pipeline that identifies peak emotional moments, analyzes all modalities (visual, audio, facial), and synthesizes a holistic emotional reasoning summary.
- **Gate Agent (Experimental)**: An optional quality control layer that reviews intermediate analysis results. Following the "garbage in, garbage out" principle, it rejects low-quality or conflicting outputs and prompts sub-agents to refine their analysis before final synthesis. Enable with `--use-gate-agent`.
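
  The `GateAgent` implementation itself is not included in this diff, so the following is only a rough sketch of the contract it appears to satisfy, inferred from the routing and skip logic added in `mer_factory/graph.py` and `mer_factory/nodes/async_nodes.py`: it returns a `gate_decision`, an optional `retry_target` (the node to re-run), and `dynamic_prompts` (refined prompts the sub-agents pick up on retry). The class name, quality check, and prompt text below are hypothetical.

  ```python
  # Hypothetical sketch, not the real GateAgent (mer_factory/nodes/gate_agent.py is not shown here).
  # It illustrates the state fields the graph routing expects:
  #   gate_decision   -> "pass" routes to synthesize_summary, anything else triggers a retry
  #   retry_target    -> name of the node to re-run
  #   dynamic_prompts -> refined prompts keyed by modality, consumed by the sub-agents
  class GateAgentSketch:
      MIN_LEN = 40  # hypothetical minimum length for an acceptable description

      # state field -> (retry node name, dynamic_prompts key)
      CHECKS = {
          "audio_analysis_results": ("generate_audio_description", "audio"),
          "video_description": ("generate_video_description", "video"),
          "image_visual_description": ("generate_peak_frame_visual_description", "peak_frame"),
      }

      async def run(self, state: dict) -> dict:
          for field, (node_name, prompt_key) in self.CHECKS.items():
              output = state.get(field)
              if not output or len(str(output)) < self.MIN_LEN:
                  # Reject this output and ask the sub-agent to refine its analysis.
                  return {
                      "gate_decision": "retry",
                      "retry_target": node_name,
                      "dynamic_prompts": {
                          prompt_key: "The previous analysis was too brief; describe the emotional cues in more detail."
                      },
                  }
          # All outputs look usable; continue to final synthesis.
          return {"gate_decision": "pass"}
  ```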

Check out example outputs here:
- [llava-llama3_llama3.2_merr_data.json](examples/llava-llama3_llama3.2_merr_data.json)
@@ -167,6 +168,7 @@ python dashboard.py
| `--ollama-text-model` | `-otm` | Ollama text model name | None |
| `--chatgpt-model` | `-cgm` | ChatGPT model name (e.g., gpt-4o) | None |
| `--huggingface-model` | `-hfm` | Hugging Face model ID | None |
| `--use-gate-agent` | `-uga` | Enable Gate Agent for quality control (Dev Feature) | False |

### Processing Types

2 changes: 2 additions & 0 deletions README_zh.md
@@ -66,6 +66,7 @@ MER-Factory is under active development, with new features added regularly - check out our [
- **Video Analysis Pipeline**: Generates comprehensive descriptions of video content and context.
- **Image Analysis Pipeline**: Provides end-to-end emotion recognition for static images, complete with visual descriptions and emotional synthesis.
- **Full MER Pipeline**: An end-to-end multimodal pipeline that identifies peak emotional moments, analyzes all modalities (visual, audio, facial), and synthesizes a holistic emotional reasoning summary.
- **Gate Agent (Experimental)**: An optional quality control layer that reviews intermediate analysis results. Following the "garbage in, garbage out" principle, it rejects low-quality or conflicting outputs and prompts sub-agents to refine their analysis before final synthesis. Enable with `--use-gate-agent`.

Check out example outputs here:
- [llava-llama3_llama3.2_merr_data.json](examples/llava-llama3_llama3.2_merr_data.json)
@@ -161,6 +162,7 @@ python dashboard.py
| `--ollama-text-model` | `-otm` | Ollama text model name | None |
| `--chatgpt-model` | `-cgm` | ChatGPT model name (e.g., gpt-4o) | None |
| `--huggingface-model` | `-hfm` | Hugging Face model ID | None |
| `--use-gate-agent` | `-uga` | Enable Gate Agent for quality control (Dev Feature) | False |

### Processing Types

9 changes: 8 additions & 1 deletion main.py
@@ -88,7 +88,7 @@ def main_orchestrator(config: AppConfig):
# --- Phase 2: Main Processing ---
console.rule("[bold blue]Phase 2: Main Processing[/bold blue]")
is_sync_model = models.model_type == "huggingface"
graph_app = create_graph(use_sync_nodes=is_sync_model)
graph_app = create_graph(use_sync_nodes=is_sync_model, use_gate_agent=config.use_gate_agent)

initial_state_builder = functools.partial(
build_initial_state,
@@ -185,6 +185,12 @@ def process(
"-ca",
help="Reuse existing audio/video/AU results from previous pipeline runs & cache LLM calls.",
),
use_gate_agent: bool = typer.Option(
False,
"--use-gate-agent",
"-uga",
help="Enable the Gate Agent for quality control and refinement (Dev Feature).",
),
):
"""Processes media files for Multimodal Emotion Recognition and Reasoning (MERR)."""
try:
@@ -200,6 +206,7 @@
silent=silent,
cache=cache,
concurrency=concurrency,
use_gate_agent=use_gate_agent,
ollama_vision_model=ollama_vision_model,
ollama_text_model=ollama_text_model,
chatgpt_model=chatgpt_model,
46 changes: 44 additions & 2 deletions mer_factory/graph.py
@@ -73,7 +73,25 @@ def route_after_video_generation(state: MERRState) -> str:
return "handle_error"


def create_graph(use_sync_nodes: bool = False):
def route_gate_agent(state: MERRState) -> str:
"""Routes flow based on the Gate Agent's decision."""
if state.get("error"):
return "handle_error"

decision = state.get("gate_decision", "pass")
if decision == "pass":
return "synthesize_summary"

retry_target = state.get("retry_target")
if retry_target:
if state.get("verbose", True):
console.log(f"[yellow]Gate Agent triggering retry at: {retry_target}[/yellow]")
return retry_target

return "synthesize_summary" # Fallback


def create_graph(use_sync_nodes: bool = False, use_gate_agent: bool = False):
"""
Creates and compiles the modular MERR construction graph.
It can create either an asynchronous or a synchronous graph based on the flag.
@@ -86,6 +104,8 @@ def create_graph(use_sync_nodes: bool = False):
else:
console.log("Creating an [bold green]asynchronous[/bold green] graph.")
from .nodes import async_nodes as nodes
if use_gate_agent:
from .nodes.gate_agent import GateAgent

workflow = StateGraph(MERRState)

@@ -116,6 +136,11 @@
"generate_peak_frame_visual_description",
nodes.generate_peak_frame_visual_description,
)

if use_gate_agent and not use_sync_nodes:
gate_agent_node = GateAgent()
workflow.add_node("gate_agent", gate_agent_node.run)

workflow.add_node("synthesize_summary", nodes.synthesize_summary)
workflow.add_node("save_mer_results", nodes.save_mer_results)

@@ -191,7 +216,24 @@

# 3. Define MER pipeline sequence
workflow.add_edge("extract_peak_image", "generate_audio_description")
workflow.add_edge("generate_peak_frame_visual_description", "synthesize_summary")

if use_gate_agent and not use_sync_nodes:
workflow.add_edge("generate_peak_frame_visual_description", "gate_agent")
# Gate Agent routing
workflow.add_conditional_edges(
"gate_agent",
route_gate_agent,
{
"synthesize_summary": "synthesize_summary",
"generate_audio_description": "generate_audio_description",
"generate_video_description": "generate_video_description",
"generate_peak_frame_visual_description": "generate_peak_frame_visual_description",
"handle_error": "handle_error",
},
)
else:
workflow.add_edge("generate_peak_frame_visual_description", "synthesize_summary")

workflow.add_edge("synthesize_summary", "save_mer_results")

# 4. Define Image pipeline sequence
70 changes: 56 additions & 14 deletions mer_factory/nodes/async_nodes.py
@@ -64,6 +64,12 @@ def _save():


async def generate_audio_description(state):
# Skip if already passed (output exists) and no new prompt (not a retry target)
if state.get("audio_analysis_results") and not state.get("dynamic_prompts", {}).get("audio"):
if state.get("verbose", True):
console.log("[dim]Skipping Audio Analysis (already passed).[/dim]")
return {}

# reuse existing audio analysis results if available
if state.get("processing_type") == "MER" and state.get("cache"):
output_path = (
@@ -97,12 +103,19 @@ async def generate_audio_description(state):
processing_type = state.get("processing_type")
ground_truth_label = state.get("ground_truth_label")

# if processing_type is audio we pass the label to the prompt
# otherwise (MER), we do not, because emotion cannot be inferred from audio alone.
has_label = bool(ground_truth_label) if processing_type == "audio" else False
prompt = prompts.get_audio_prompt(has_label)
if has_label:
prompt = prompt.format(label=ground_truth_label)
# Check for dynamic prompt from Gate Agent
[Review comment] Bug: Cache logic blocks Gate Agent refinement retries

When cache is enabled, the function checks for and loads existing results before checking for dynamic_prompts. This causes the node to return the cached (rejected) output instead of generating a new analysis with the refined prompt, effectively disabling the Gate Agent's retry mechanism when caching is active.

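A possible direction, sketched here rather than taken from the PR (the cache-loading body is elided in this diff view, so the surrounding structure is an assumption): treat a pending dynamic prompt as a cache-buster so the refined analysis is actually regenerated.

```python
# Sketch of a fix: read the Gate Agent's dynamic prompt first and only use the cached
# result when no refinement was requested. Elided parts are marked with ellipses.
async def generate_audio_description(state):
    dynamic_prompt = state.get("dynamic_prompts", {}).get("audio")

    # Skip only when an output already exists AND no retry was requested.
    if state.get("audio_analysis_results") and not dynamic_prompt:
        return {}

    # Bypass the cache whenever the Gate Agent asked for a refined analysis,
    # otherwise the rejected output would just be returned again.
    if state.get("processing_type") == "MER" and state.get("cache") and not dynamic_prompt:
        ...  # load and return the cached audio analysis (as in the existing code)

    ...  # prompt selection and model.analyze_audio(...) continue as in the PR
```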

dynamic_prompt = state.get("dynamic_prompts", {}).get("audio")
if dynamic_prompt:
prompt = dynamic_prompt
if verbose:
console.log("[yellow]Using dynamic prompt for Audio Analysis[/yellow]")
else:
# if processing_type is audio we pass the label to the prompt
# otherwise (MER), we do not, because emotion cannot be inferred from audio alone.
has_label = bool(ground_truth_label) if processing_type == "audio" else False
prompt = prompts.get_audio_prompt(has_label)
if has_label:
prompt = prompt.format(label=ground_truth_label)

audio_analysis = await model.analyze_audio(audio_path, prompt)
if verbose:
@@ -135,6 +148,11 @@ def _save():


async def generate_video_description(state):
# Skip if already passed (output exists) and no new prompt (not a retry target)
if state.get("video_description") and not state.get("dynamic_prompts", {}).get("video"):
if state.get("verbose", True):
console.log("[dim]Skipping Video Analysis (already passed).[/dim]")
return {}
if state.get("processing_type") == "MER" and state.get("cache"):
output_path = (
Path(state["video_output_dir"]) / f"{state['video_id']}_video_analysis.json"
@@ -158,12 +176,19 @@ async def generate_video_description(state):

processing_type = state.get("processing_type")
ground_truth_label = state.get("ground_truth_label")
# if processing_type is video, we pass the label to the prompt
# otherwise (MER), we do not, because emotion cannot be inferred from video alone.
has_label = bool(ground_truth_label) if processing_type == "video" else False
prompt = prompts.get_video_prompt(has_label)
if has_label:
prompt = prompt.format(label=ground_truth_label)
# Check for dynamic prompt from Gate Agent
dynamic_prompt = state.get("dynamic_prompts", {}).get("video")
if dynamic_prompt:
prompt = dynamic_prompt
if verbose:
console.log("[yellow]Using dynamic prompt for Video Description[/yellow]")
else:
# if processing_type is video, we pass the label to the prompt
# otherwise (MER), we do not, because emotion cannot be inferred from video alone.
has_label = bool(ground_truth_label) if processing_type == "video" else False
prompt = prompts.get_video_prompt(has_label)
if has_label:
prompt = prompt.format(label=ground_truth_label)

video_description = await model.describe_video(video_path, prompt)
if verbose and video_description:
@@ -328,22 +353,39 @@ async def extract_peak_image(state):

async def generate_peak_frame_visual_description(state):
"""Generates a visual description for the peak frame image."""
# Skip if already passed (output exists) and no new prompt (not a retry target)
if state.get("image_visual_description") and not state.get("dynamic_prompts", {}).get("peak_frame"):
if state.get("verbose", True):
console.log("[dim]Skipping Peak Frame Analysis (already passed).[/dim]")
return {}
verbose = state.get("verbose", True)
if verbose:
console.log("Generating visual description for peak frame...")
model: LLMModels = state["models"].model_instance
prompts: PromptTemplates = state["prompts"]
peak_frame_path = Path(state["peak_frame_path"])

# No label for peak frame, since this is used for MER.
prompt = prompts.get_image_prompt()
# Check for dynamic prompt from Gate Agent
dynamic_prompt = state.get("dynamic_prompts", {}).get("peak_frame")
if dynamic_prompt:
prompt = dynamic_prompt
if verbose:
console.log("[yellow]Using dynamic prompt for Peak Frame Visual Description[/yellow]")
else:
# No label for peak frame, since this is used for MER.
prompt = prompts.get_image_prompt()

visual_obj_desc = await model.describe_image(peak_frame_path, prompt)

if verbose:
console.log(f"Peak Frame Visual Description: [cyan]{visual_obj_desc}[/cyan]")
return {"image_visual_description": visual_obj_desc}






async def synthesize_summary(state):
verbose = state.get("verbose", True)
if verbose: