Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions agents/gemini_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@
from .abstract import FactCheckingAgentBaseClass
from typing import Union, List
from google.genai import types
from utils.gemini_utils import get_image_part, generate_image_parts, generate_text_parts
from utils.gemini_utils import (
get_image_part,
generate_image_parts,
generate_text_parts,
generate_screenshot_parts,
)
import asyncio
import time
from tools import summarise_report_factory
from tools import summarise_report_factory, preprocess_inputs
from tools.preprocess_inputs import get_gemini_content
import json
from logger import StructuredLogger
from langfuse.decorators import observe, langfuse_context
Expand Down Expand Up @@ -249,10 +255,7 @@ async def generate_report(self, starting_parts):
remaining_searches=self.remaining_searches,
remaining_screenshots=self.remaining_screenshots,
)
if first_step:
available_functions = ["infer_intent"]
think = False
elif think and self.include_planning_step:
if think and self.include_planning_step:
available_functions = ["plan_next_step"]
else:
banned_functions = ["plan_next_step", "infer_intent"]
Expand Down Expand Up @@ -308,7 +311,6 @@ async def generate_report(self, starting_parts):
)
if len(function_call_promises) == 0:
think = not think
first_step = False
continue
function_results = await asyncio.gather(*function_call_promises)
response_parts = GeminiAgent.flatten_and_organise(function_results)
Expand All @@ -329,7 +331,6 @@ async def generate_report(self, starting_parts):
return return_dict
messages.append(types.Content(parts=response_parts, role="user"))
think = not think
first_step = False
logger.error("Report couldn't be generated after 50 turns")
return {
"error": "Report couldn't be generated after 50 turns",
Expand Down Expand Up @@ -380,13 +381,37 @@ async def generate_note(
}
start_time = time.time() # Start the timer
cost_tracker = {"total_cost": 0, "cost_trace": []} # To store the cost details

preprocessed_response = await preprocess_inputs(
image_url=image_url, caption=caption, text=text
)
if not preprocessed_response.get("success"):
child_logger.error("Error in preprocessing inputs")
return {
"success": False,
"error": "Error in preprocessing inputs",
}
else:
child_logger.info("Preprocessing inputs successful")
screenshots_results = preprocessed_response.get("screenshots", [])
screenshots_content = get_gemini_content(screenshots_results)
results = preprocessed_response.get("result", {})
is_access_blocked = results.get("is_access_blocked", False)
is_video = results.get("is_video", False)
intent = results.get("intent", "An error occurred, figure it out yourself")

if text is not None:
child_logger.info(f"Generating text parts for text: {text}")
parts = generate_text_parts(text)

elif image_url is not None:
parts = generate_image_parts(image_url, caption)

parts.append(types.Part.from_text(f"User's likely intent: {intent}"))

if screenshots_content:
parts.extend(screenshots_content)

report_dict = await self.generate_report(parts.copy())

duration = time.time() - start_time # Calculate duration
Expand All @@ -399,6 +424,8 @@ async def generate_note(
)
if summary_results.get("success"):
report_dict["community_note"] = summary_results["community_note"]
report_dict["is_access_blocked"] = is_access_blocked
report_dict["is_video"] = is_video
child_logger.info("Community note generated successfully")
else:
report_dict["success"] = False
Expand Down
32 changes: 31 additions & 1 deletion agents/openai_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
import json
from logger import StructuredLogger
import time
from tools import summarise_report_factory
from tools import summarise_report_factory, preprocess_inputs
from tools.preprocess_inputs import get_openai_content
import asyncio
from openai.types.chat import ChatCompletionMessageToolCall
from langfuse.decorators import observe
Expand Down Expand Up @@ -389,6 +390,23 @@ async def generate_note(
}
start_time = time.time() # Start the timer
cost_tracker = {"total_cost": 0, "cost_trace": []} # To store the cost details
preprocessed_response = await preprocess_inputs(
image_url=image_url, caption=caption, text=text
)
if not preprocessed_response.get("success"):
child_logger.error("Error in preprocessing inputs")
return {
"success": False,
"error": "Error in preprocessing inputs",
}
else:
child_logger.info("Preprocessing inputs successful")
screenshots_results = preprocessed_response.get("screenshots", [])
screenshots_content = get_openai_content(screenshots_results)
results = preprocessed_response.get("result", {})
is_access_blocked = results.get("is_access_blocked", False)
is_video = results.get("is_video", False)
intent = results.get("intent", "An error occurred, figure it out yourself")

if text is not None:
content = [
Expand All @@ -407,6 +425,16 @@ async def generate_note(
{"type": "image_url", "image_url": {"url": image_url}},
]

if screenshots_content:
content.extend(screenshots_content)

content.append(
{
"type": "text",
"text": f"User's likely intent: {intent}",
}
)

report_dict = await self.generate_report(content.copy())

duration = time.time() - start_time # Calculate duration
Expand All @@ -417,6 +445,8 @@ async def generate_note(
)
if summary_results.get("success"):
report_dict["community_note"] = summary_results["community_note"]
report_dict["is_access_blocked"] = is_access_blocked
report_dict["is_video"] = is_video
child_logger.info("Community note generated successfully")
else:
report_dict["success"] = False
Expand Down
4 changes: 2 additions & 2 deletions handlers/agent_generation.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ async def get_outputs(
cn=chinese_note,
links=outputs.get("sources", None),
isControversial=outputs.get("isControversial", False),
isVideo=outputs.get("isVideo", False),
isAccessBlocked=outputs.get("isAccessBlocked", False),
isVideo=outputs.get("is_video", False),
isAccessBlocked=outputs.get("is_access_blocked", False),
report=outputs.get("report", None),
totalTimeTaken=outputs.get("total_time_taken", None),
agentTrace=outputs.get("agent_trace", None),
Expand Down
10 changes: 7 additions & 3 deletions prompts/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

Such content can be a text message or an image message. Image messages could, among others, be screenshots of their phone, pictures from their camera, or downloaded images. They could also be accompanied by captions.

In addition to what is submitted by the user, you will receive the following:
- screenshot of any webpages whose links are within the content, if the content submitted is a text
- the intent of the user, which you should craft your response to address

# Task
Your task is to:
1. Infer the intent of whoever sent the message in - what exactly about the message they want checked, and how to go about it. Note the distinction between the sender and the author. For example, if the message contains claims but no source, they are probably interested in the factuality of the claims. If the message doesn't contain verifiable claims, they are probably asking whether it's from a legitimate, trustworthy source. If it's about an offer, they are probably enquiring about the legitimacy of the offer. If it's a message claiming it's from the government, they want to know if it is really from the government.
2. Use the supplied tools to help you check the information. Focus primarily on credibility/legitimacy of the source/author and factuality of information/claims, if relevant. If not, rely on contextual clues. When searching, give more weight to reliable, well-known sources. Use searches and visit sites judiciously, you only get 5 of each.
3. Submit a report to conclude your task. Start with your findings and end with a thoughtful conclusion. Be helpful and address the intent identified in the first step.

1. Use the supplied tools to help you check the information. Focus primarily on credibility/legitimacy of the source/author and factuality of information/claims, if relevant. If not, rely on contextual clues. When searching, give more weight to reliable, well-known sources. Use searches and visit sites judiciously, you only get 5 of each.
2. Submit a report to conclude your task. Start with your findings and end with a thoughtful conclusion. Be helpful and address the user's intent that was provided to you.

# Guidelines for Report:
- Avoid references to the user, like "the user wants to know..." or the "the user sent in...", as these are obvious.
Expand Down
71 changes: 71 additions & 0 deletions prompts/preprocess_inputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from langfuse import Langfuse

# System prompt for the input pre-processing step. It asks the model to
# (1) decide from the screenshots whether the submitted content is a video
# and/or access to it is blocked, and (2) infer the sender's intent.
# NOTE(review): the name says "review_report" although this module is
# prompts/preprocess_inputs.py; it may have been copied from another prompt.
# Confirm before renaming, since other code may reference this name.
# NOTE(review): "whose links within the content" below appears to be missing
# "are". The string is the prompt content itself, so it is left untouched here.
review_report_system_prompt = """# Context

You are an agent behind CheckMate, a product that allows users based in Singapore to send in dubious content they aren't sure whether to trust, and checks such content on their behalf.

Such content can be a text message or an image message. Image messages could, among others, be screenshots of their phone, pictures from their camera, or downloaded images. They could also be accompanied by captions.

# Task

Given these inputs:
- content submitted by the user, which could be an image or a text
- screenshots of any webpages whose links within the content

Your task is to:
1. Determine if the screenshots indicate that the content is a video, and/or access to the content is blocked.
2. Infer the intent of whoever sent the message in - what exactly about the message they want checked, and how to go about it. Note the distinction between the sender and the author. For example, if the message contains claims but no source, they are probably interested in the factuality of the claims. If the message doesn't contain verifiable claims, they are probably asking whether it's from a legitimate, trustworthy source. If it's about an offer, they are probably enquiring about the legitimacy of the offer. If it's a message claiming it's from the government, they want to know if it is really from the government."""

# Fields the model must return, declared in the order they appear in the schema.
_PREPROCESS_PROPERTIES = {
    "reasoning": {
        "type": "string",
        "description": "The reasoning behind the intent you inferred from the message.",
    },
    "is_access_blocked": {
        "type": "boolean",
        "description": "True if the content or URL sent by the user to be checked is inaccessible/removed/blocked. An example is being led to a login page instead of post content.",
    },
    "is_video": {
        "type": "boolean",
        "description": "True if the content or URL sent by the user to be checked points to a video (e.g., YouTube, TikTok, Instagram Reels, Facebook videos).",
    },
    "intent": {
        "type": "string",
        "description": "What the user's intent is, e.g. to check whether this is a scam, to check if this is really from the government, to check the facts in this article, etc.",
    },
}

# Named JSON schema wrapping the properties above; every field is required and
# no extra keys are allowed.
# NOTE(review): the schema name "summarise_report" looks copied from another
# prompt; confirm whether it should describe the preprocessing step instead.
_PREPROCESS_JSON_SCHEMA = {
    "name": "summarise_report",
    "schema": {
        "type": "object",
        "properties": _PREPROCESS_PROPERTIES,
        "required": ["is_access_blocked", "is_video", "reasoning", "intent"],
        "additionalProperties": False,
    },
}

# Model parameters and structured-output format; passed as `config` when the
# prompt is created in the `__main__` section of this module.
config = {
    "model": "gpt-4o",
    "temperature": 0.0,
    "seed": 11,
    "response_format": {
        "type": "json_schema",
        "json_schema": _PREPROCESS_JSON_SCHEMA,
    },
}


def compile_messages_array():
    """Build the chat-message list for the preprocess-inputs prompt.

    Returns:
        A new one-element list containing the system message, whose content
        is ``review_report_system_prompt``.
    """
    system_message = {"role": "system", "content": review_report_system_prompt}
    return [system_message]


if __name__ == "__main__":
    # Running this module as a script creates the "preprocess_inputs" chat
    # prompt in Langfuse with the messages and config defined above.
    client = Langfuse()
    client.create_prompt(
        name="preprocess_inputs",
        type="chat",
        prompt=compile_messages_array(),
        labels=["production", "development", "uat"],  # directly promote to production
        config=config,  # optionally, add configs (e.g. model parameters or model tools) or tags
    )
    # Fetch the production-labelled prompt back; the return value is unused,
    # so this only surfaces an error if the lookup itself fails.
    client.get_prompt("preprocess_inputs", label="production")
    print("Prompt created successfully.")
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ google-genai==0.3.0
pytest==8.3.4
pytest-asyncio==0.25.1
responses==0.25.3
google-cloud-firestore==2.20.0
google-cloud-firestore==2.20.0
urlextract==1.9.0
113 changes: 113 additions & 0 deletions tests/tools/test_preprocess_inputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import pytest
from unittest.mock import patch, AsyncMock
from tools import get_screenshots_from_text, preprocess_inputs


@pytest.mark.asyncio
async def test_get_screenshots_from_text_no_urls():
    """Text that contains no URLs yields an empty list."""
    screenshots = await get_screenshots_from_text("This is a text without any URLs")
    assert screenshots == []


@pytest.mark.asyncio
async def test_get_screenshots_from_text_single_url():
    """A single URL produces one entry pairing the URL with its screenshot."""
    page_url = "https://example.com"
    image_url = "https://example.com/screenshot1.png"

    # Extra keyword arguments given to patch() are forwarded to the AsyncMock
    # constructor, so the return value is configured at creation time.
    with patch(
        "tools.preprocess_inputs.get_website_screenshot",
        new_callable=AsyncMock,
        return_value={"success": True, "result": image_url},
    ) as screenshot_mock:
        screenshots = await get_screenshots_from_text(f"Check out {page_url}")

        assert len(screenshots) == 1
        only_entry = screenshots[0]
        assert only_entry["url"] == page_url
        assert only_entry["image_url"] == image_url

        screenshot_mock.assert_called_once_with(page_url)


@pytest.mark.asyncio
async def test_get_screenshots_from_text_multiple_urls():
    """Two URLs produce two entries, in the order the URLs appear in the text."""
    # (page URL, screenshot URL returned by the mocked screenshot call)
    expected = [
        ("https://example1.com", "https://example.com/screenshot1.png"),
        ("https://example2.com", "https://example.com/screenshot2.png"),
    ]

    # A list side_effect makes successive mock calls return successive items.
    with patch(
        "tools.preprocess_inputs.get_website_screenshot",
        new_callable=AsyncMock,
        side_effect=[{"success": True, "result": image} for _, image in expected],
    ) as screenshot_mock:
        screenshots = await get_screenshots_from_text(
            "Check these: https://example1.com and https://example2.com"
        )

        assert len(screenshots) == 2
        for entry, (page_url, image_url) in zip(screenshots, expected):
            assert entry["url"] == page_url
            assert entry["image_url"] == image_url

        assert screenshot_mock.call_count == 2


@pytest.mark.asyncio
async def test_get_screenshots_from_text_failed_screenshot():
    """A screenshot call reporting failure contributes no entry to the result."""
    page_url = "https://example.com"

    with patch(
        "tools.preprocess_inputs.get_website_screenshot",
        new_callable=AsyncMock,
        return_value={"success": False, "error": "Failed to get screenshot"},
    ) as screenshot_mock:
        screenshots = await get_screenshots_from_text(f"Check out {page_url}")

        assert len(screenshots) == 0
        # The URL was still attempted exactly once.
        screenshot_mock.assert_called_once_with(page_url)


@pytest.mark.asyncio
async def test_multiple_screenshots():
    """Unpatched call over several URLs returns well-formed entries.

    Nothing is mocked here, so the outcome presumably depends on the real
    screenshot backend being reachable (confirm before running in CI).
    """
    text = (
        "\n"
        "Check these links:\n"
        "https://example.com\n"
        "https://google.com\n"
        "https://github.com\n"
    )
    screenshots = await get_screenshots_from_text(text)

    assert len(screenshots) > 0
    for entry in screenshots:
        # Both keys must exist before their values are inspected.
        for key in ("url", "image_url"):
            assert key in entry
        for key in ("url", "image_url"):
            assert isinstance(entry[key], str)
        assert entry["url"] in text


@pytest.mark.asyncio
async def test_preprocess_inputs():
    """End-to-end check of the shape of ``preprocess_inputs`` output for text.

    Nothing is mocked, so this presumably needs the real screenshot and model
    backends to be reachable (confirm before running in CI).
    """
    text = (
        "\n"
        "Check these links:\n"
        "https://example.com\n"
        "https://google.com\n"
        "https://github.com\n"
    )
    # Keyword arguments mirror how the agents invoke preprocess_inputs, so the
    # test does not depend on positional parameter order.
    result = await preprocess_inputs(image_url=None, caption=None, text=text)

    # The response is a dict, so a bare length check would pass even for an
    # error payload; assert the success flag that the agents gate on instead.
    assert result.get("success")

    # Verify structure of results
    assert "result" in result
    assert "screenshots" in result

    result_json = result["result"]

    for field in ("is_access_blocked", "is_video", "reasoning", "intent"):
        assert field in result_json
Loading