Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 100 additions & 46 deletions application/api/answer/services/stream_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
self._required_tool_actions: Optional[Dict[str, Set[Optional[str]]]] = None
self.compressed_summary: Optional[str] = None
self.compressed_summary_tokens: int = 0
self._agent_data: Optional[Dict[str, Any]] = None

def initialize(self):
"""Initialize all required components for processing"""
Expand Down Expand Up @@ -359,22 +360,29 @@
return data

def _configure_source(self):
"""Configure the source based on agent data"""
api_key = self.data.get("api_key") or self.agent_key
"""Configure the source based on agent data.

if api_key:
agent_data = self._get_data_from_api_key(api_key)
The literal string ``"default"`` is a placeholder meaning "no
ingested source" and is normalized to an empty source so that no
retrieval is attempted.
"""
if self._agent_data:
agent_data = self._agent_data

if agent_data.get("sources") and len(agent_data["sources"]) > 0:
source_ids = [
source["id"] for source in agent_data["sources"] if source.get("id")
source["id"]
for source in agent_data["sources"]
if source.get("id") and source["id"] != "default"
]
if source_ids:
self.source = {"active_docs": source_ids}
else:
self.source = {}
self.all_sources = agent_data["sources"]
elif agent_data.get("source"):
self.all_sources = [
s for s in agent_data["sources"] if s.get("id") != "default"
]
elif agent_data.get("source") and agent_data["source"] != "default":
self.source = {"active_docs": agent_data["source"]}
self.all_sources = [
{
Expand All @@ -387,11 +395,24 @@
self.all_sources = []
return
if "active_docs" in self.data:
self.source = {"active_docs": self.data["active_docs"]}
active_docs = self.data["active_docs"]
if active_docs and active_docs != "default":
self.source = {"active_docs": active_docs}
else:
self.source = {}
return
self.source = {}
self.all_sources = []

def _has_active_docs(self) -> bool:
"""Return True if a real document source is configured for retrieval."""
active_docs = self.source.get("active_docs") if self.source else None
if not active_docs:
return False
if active_docs == "default":
return False
return True

def _resolve_agent_id(self) -> Optional[str]:
"""Resolve agent_id from request, then fall back to conversation context."""
request_agent_id = self.data.get("agent_id")
Expand Down Expand Up @@ -433,48 +454,39 @@
effective_key = self.data.get("api_key") or self.agent_key

if effective_key:
data_key = self._get_data_from_api_key(effective_key)
if data_key.get("_id"):
self.agent_id = str(data_key.get("_id"))
self._agent_data = self._get_data_from_api_key(effective_key)
if self._agent_data.get("_id"):
self.agent_id = str(self._agent_data.get("_id"))

self.agent_config.update(
{
"prompt_id": data_key.get("prompt_id", "default"),
"agent_type": data_key.get("agent_type", settings.AGENT_NAME),
"prompt_id": self._agent_data.get("prompt_id", "default"),
"agent_type": self._agent_data.get("agent_type", settings.AGENT_NAME),
"user_api_key": effective_key,
"json_schema": data_key.get("json_schema"),
"default_model_id": data_key.get("default_model_id", ""),
"models": data_key.get("models", []),
"json_schema": self._agent_data.get("json_schema"),
"default_model_id": self._agent_data.get("default_model_id", ""),
"models": self._agent_data.get("models", []),
"allow_system_prompt_override": self._agent_data.get(
"allow_system_prompt_override", False
),
}
)

# Set identity context
if self.data.get("api_key"):
# External API key: use the key owner's identity
self.initial_user_id = data_key.get("user")
self.decoded_token = {"sub": data_key.get("user")}
self.initial_user_id = self._agent_data.get("user")
self.decoded_token = {"sub": self._agent_data.get("user")}
elif self.is_shared_usage:
# Shared agent: keep the caller's identity
pass
else:
# Owner using their own agent
self.decoded_token = {"sub": data_key.get("user")}

if data_key.get("source"):
self.source = {"active_docs": data_key["source"]}
if data_key.get("workflow"):
self.agent_config["workflow"] = data_key["workflow"]
self.agent_config["workflow_owner"] = data_key.get("user")
if data_key.get("retriever"):
self.retriever_config["retriever_name"] = data_key["retriever"]
if data_key.get("chunks") is not None:
try:
self.retriever_config["chunks"] = int(data_key["chunks"])
except (ValueError, TypeError):
logger.warning(
f"Invalid chunks value: {data_key['chunks']}, using default value 2"
)
self.retriever_config["chunks"] = 2
self.decoded_token = {"sub": self._agent_data.get("user")}

if self._agent_data.get("workflow"):
self.agent_config["workflow"] = self._agent_data["workflow"]
self.agent_config["workflow_owner"] = self._agent_data.get("user")
else:
# No API key — default/workflow configuration
agent_type = settings.AGENT_NAME
Expand All @@ -497,14 +509,45 @@
)

def _configure_retriever(self):
"""Assemble retriever config with precedence: request > agent > default."""
doc_token_limit = calculate_doc_token_budget(model_id=self.model_id)

# Start with defaults
retriever_name = "classic"
chunks = 2

# Layer agent-level config (if present)
if self._agent_data:
if self._agent_data.get("retriever"):
retriever_name = self._agent_data["retriever"]
if self._agent_data.get("chunks") is not None:
try:
chunks = int(self._agent_data["chunks"])
except (ValueError, TypeError):
logger.warning(
f"Invalid agent chunks value: {self._agent_data['chunks']}, "
"using default value 2"
)

# Explicit request values win over agent config
if "retriever" in self.data:
retriever_name = self.data["retriever"]
if "chunks" in self.data:
try:
chunks = int(self.data["chunks"])
except (ValueError, TypeError):
logger.warning(
f"Invalid request chunks value: {self.data['chunks']}, "
"using default value 2"

Check failure

Code scanning / CodeQL

Clear-text logging of sensitive information High

This expression logs
sensitive data (password)
as clear text.
This expression logs
sensitive data (password)
as clear text.
Comment thread
dartpain marked this conversation as resolved.
Dismissed
)

self.retriever_config = {
"retriever_name": self.data.get("retriever", "classic"),
"chunks": int(self.data.get("chunks", 2)),
"retriever_name": retriever_name,
"chunks": chunks,
"doc_token_limit": doc_token_limit,
}

# isNoneDoc without an API key forces no retrieval
api_key = self.data.get("api_key") or self.agent_key
if not api_key and "isNoneDoc" in self.data and self.data["isNoneDoc"]:
self.retriever_config["chunks"] = 0
Expand All @@ -528,6 +571,9 @@
if self.data.get("isNoneDoc", False) and not self.agent_id:
logger.info("Pre-fetch skipped: isNoneDoc=True")
return None, None
if not self._has_active_docs():
logger.info("Pre-fetch skipped: no active docs configured")
return None, None
try:
retriever = self.create_retriever()
logger.info(
Expand Down Expand Up @@ -910,15 +956,23 @@
raw_prompt = get_prompt(prompt_id, self.prompts_collection)
self._prompt_content = raw_prompt

rendered_prompt = self.prompt_renderer.render_prompt(
prompt_content=raw_prompt,
user_id=self.initial_user_id,
request_id=self.data.get("request_id"),
passthrough_data=self.data.get("passthrough"),
docs=docs,
docs_together=docs_together,
tools_data=tools_data,
)
# Allow API callers to override the system prompt when the agent
# has opted in via allow_system_prompt_override.
if (
self.agent_config.get("allow_system_prompt_override", False)
and self.data.get("system_prompt_override")
):
rendered_prompt = self.data["system_prompt_override"]
else:
rendered_prompt = self.prompt_renderer.render_prompt(
prompt_content=raw_prompt,
user_id=self.initial_user_id,
request_id=self.data.get("request_id"),
passthrough_data=self.data.get("passthrough"),
docs=docs,
docs_together=docs_together,
tools_data=tools_data,
)

provider = (
get_provider_from_model_id(self.model_id)
Expand Down
30 changes: 30 additions & 0 deletions application/api/user/agents/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
"token_limit",
"limited_request_mode",
"request_limit",
"allow_system_prompt_override",
"createdAt",
"updatedAt",
"lastUsedAt",
Expand All @@ -96,6 +97,7 @@
"token_limit",
"limited_request_mode",
"request_limit",
"allow_system_prompt_override",
"createdAt",
"updatedAt",
"lastUsedAt",
Expand Down Expand Up @@ -220,6 +222,12 @@ def build_agent_document(
base_doc["request_limit"] = int(
data.get("request_limit", settings.DEFAULT_AGENT_LIMITS["request_limit"])
)
if "allow_system_prompt_override" in allowed_fields:
base_doc["allow_system_prompt_override"] = (
data.get("allow_system_prompt_override") == "True"
if isinstance(data.get("allow_system_prompt_override"), str)
else bool(data.get("allow_system_prompt_override", False))
)
return {k: v for k, v in base_doc.items() if k in allowed_fields}


Expand Down Expand Up @@ -292,6 +300,9 @@ def get(self):
"default_model_id": agent.get("default_model_id", ""),
"folder_id": agent.get("folder_id"),
"workflow": agent.get("workflow"),
"allow_system_prompt_override": agent.get(
"allow_system_prompt_override", False
),
}
return make_response(jsonify(data), 200)
except Exception as e:
Expand Down Expand Up @@ -373,6 +384,9 @@ def get(self):
"default_model_id": agent.get("default_model_id", ""),
"folder_id": agent.get("folder_id"),
"workflow": agent.get("workflow"),
"allow_system_prompt_override": agent.get(
"allow_system_prompt_override", False
),
}
for agent in agents
if "source" in agent
Expand Down Expand Up @@ -450,6 +464,10 @@ class CreateAgent(Resource):
"folder_id": fields.String(
required=False, description="Folder ID to organize the agent"
),
"allow_system_prompt_override": fields.Boolean(
required=False,
description="Allow API callers to override the system prompt via the v1 endpoint",
),
},
)

Expand Down Expand Up @@ -674,6 +692,10 @@ class UpdateAgent(Resource):
"folder_id": fields.String(
required=False, description="Folder ID to organize the agent"
),
"allow_system_prompt_override": fields.Boolean(
required=False,
description="Allow API callers to override the system prompt via the v1 endpoint",
),
},
)

Expand Down Expand Up @@ -765,6 +787,7 @@ def put(self, agent_id):
"default_model_id",
"folder_id",
"workflow",
"allow_system_prompt_override",
]

for field in allowed_fields:
Expand Down Expand Up @@ -983,6 +1006,13 @@ def put(self, agent_id):
if workflow_error:
return workflow_error
update_fields[field] = workflow_id
elif field == "allow_system_prompt_override":
raw_value = data.get("allow_system_prompt_override", False)
update_fields[field] = (
raw_value == "True"
if isinstance(raw_value, str)
else bool(raw_value)
)
else:
value = data[field]
if field in ["name", "description", "prompt_id", "agent_type"]:
Expand Down
22 changes: 20 additions & 2 deletions application/api/v1/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,18 @@ def chat_completions():
if usage_error:
return usage_error

should_save_conversation = bool(internal_data.get("save_conversation", False))

if is_stream:
return Response(
_stream_response(
helper, question, agent, processor, model_name, continuation
helper,
question,
agent,
processor,
model_name,
continuation,
should_save_conversation,
),
mimetype="text/event-stream",
headers={
Expand All @@ -151,7 +159,13 @@ def chat_completions():
)
else:
return _non_stream_response(
helper, question, agent, processor, model_name, continuation
helper,
question,
agent,
processor,
model_name,
continuation,
should_save_conversation,
)

except ValueError as e:
Expand Down Expand Up @@ -181,6 +195,7 @@ def _stream_response(
processor: StreamProcessor,
model_name: str,
continuation: Optional[Dict],
should_save_conversation: bool,
) -> Generator[str, None, None]:
"""Generate translated SSE chunks for streaming response."""
completion_id = f"chatcmpl-{int(time.time())}"
Expand All @@ -193,6 +208,7 @@ def _stream_response(
decoded_token=processor.decoded_token,
agent_id=processor.agent_id,
model_id=processor.model_id,
should_save_conversation=should_save_conversation,
_continuation=continuation,
)

Expand Down Expand Up @@ -225,6 +241,7 @@ def _non_stream_response(
processor: StreamProcessor,
model_name: str,
continuation: Optional[Dict],
should_save_conversation: bool,
) -> Response:
"""Collect full response and return as single JSON."""
stream = helper.complete_stream(
Expand All @@ -235,6 +252,7 @@ def _non_stream_response(
decoded_token=processor.decoded_token,
agent_id=processor.agent_id,
model_id=processor.model_id,
should_save_conversation=should_save_conversation,
_continuation=continuation,
)

Expand Down
Loading
Loading