From ab00b29e68429b8f7764323bade59a0e65733897 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 29 Jan 2026 14:59:32 -0500 Subject: [PATCH 01/13] Add `LLM_PROVIDER=gemini_live` as a supported value for examples. Note that Gemini Live doesn't work yet with PIpecat Flows. --- examples/food_ordering.py | 4 ++-- examples/utils.py | 6 ++++++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/examples/food_ordering.py b/examples/food_ordering.py index 21f908d6..ea7c829f 100644 --- a/examples/food_ordering.py +++ b/examples/food_ordering.py @@ -367,10 +367,10 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): pipeline = Pipeline( [ transport.input(), - stt, + stt, # comment out if using LLM_PROVIDER="gemini_live" context_aggregator.user(), llm, - tts, + tts, # comment out if using LLM_PROVIDER="gemini_live" transport.output(), context_aggregator.assistant(), ] diff --git a/examples/utils.py b/examples/utils.py index 3e962ae0..cb1460b9 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -33,6 +33,7 @@ def create_llm(provider: str = None, model: str = None) -> Any: - google: Requires GOOGLE_API_KEY - aws: Uses AWS default credential chain (SSO, environment variables, or IAM roles) Optionally set AWS_REGION (defaults to us-west-2) + - gemini_live: Requires GOOGLE_API_KEY Usage: # Use default provider (from LLM_PROVIDER env var, defaults to OpenAI) @@ -75,6 +76,11 @@ def create_llm(provider: str = None, model: str = None) -> Any: "default_model": "us.anthropic.claude-haiku-4-5-20251001-v1:0", "region": "us-west-2", }, + "gemini_live": { + "service": "pipecat.services.google.gemini_live.llm.GeminiLiveLLMService", + "api_key_env": "GOOGLE_API_KEY", + "default_model": "models/gemini-2.5-flash-native-audio-preview-12-2025", + } } config = configs.get(provider) From 9915ab79c96e4bd44f3a1e50bace50ed10c0fc46 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 2 Feb 2026 12:12:28 -0500 Subject: [PATCH 02/13] Tweak food ordering 
example's system prompt to work better with Gemini Live --- examples/food_ordering.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/food_ordering.py b/examples/food_ordering.py index ea7c829f..ad3b4118 100644 --- a/examples/food_ordering.py +++ b/examples/food_ordering.py @@ -139,7 +139,7 @@ async def choose_sushi(args: FlowArgs, flow_manager: FlowManager) -> tuple[None, role_messages=[ { "role": "system", - "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", + "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. When you've decided to call a function to progress the conversation, do not also respond; the function call by itself is enough. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", } ], task_messages=[ From e02687433ab5bc43322ecc42b41bda8792dc4e6b Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Mon, 2 Feb 2026 15:09:52 -0500 Subject: [PATCH 03/13] Carry over "deactivated" functions from one node to the next, so that LLMs are still aware of functions called or simply referenced in conversation history. With Gemini Live in particular, if historical context text messages even mention missing functions, the API may return errors when connecting and seeding conversation history. 
--- src/pipecat_flows/manager.py | 148 ++++++++- tests/test_context_strategies.py | 551 +++++++++++++++++++++++++++++++ tests/test_manager.py | 23 +- 3 files changed, 701 insertions(+), 21 deletions(-) diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index a8c61856..225f8670 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -154,6 +154,18 @@ def __init__( self._transport = transport self._global_functions = global_functions or [] + # Cache global function names as an optimization when detecting deactivated functions later + self._global_function_names: Set[str] = set() + for func in self._global_functions: + if callable(func): + wrapper = FlowsDirectFunctionWrapper(function=func) + self._global_function_names.add(wrapper.name) + elif isinstance(func, FlowsFunctionSchema): + self._global_function_names.add(func.name) + else: + name = create_adapter(llm, context_aggregator).get_function_name(func) + self._global_function_names.add(name) + # Set up static or dynamic mode if flow_config: warnings.warn( @@ -170,7 +182,7 @@ def __init__( logger.debug("Initialized in dynamic mode") self._state: Dict[str, Any] = {} # Internal state storage - self._current_functions: Set[str] = set() # Track registered functions + self._current_functions: Dict[str, FlowsFunctionSchema] = {} # Track registered functions self._current_node: Optional[str] = None self._showed_deprecation_warning_for_transition_fields = False @@ -645,15 +657,60 @@ def _lookup_function(self, func_name: str) -> Callable: raise FlowError(error_message) + def _create_deactivated_function_schema( + self, original_schema: FlowsFunctionSchema + ) -> FlowsFunctionSchema: + """Create a function schema for deactivated functions with a dummy handler. + + Creates a copy of the original function schema but replaces the handler + with a dummy that returns an error message indicating the function is + deactivated. 
This is used when carrying over functions from a previous + node during APPEND context strategy transitions. + + Args: + original_schema: The original FlowsFunctionSchema to create a + deactivated version of. + + Returns: + A new FlowsFunctionSchema with the same name/description/properties + but with a dummy handler that returns a deactivation message. + """ + + async def deactivated_handler(args: FlowArgs, flow_manager: "FlowManager") -> FlowResult: + """Dummy handler for deactivated functions.""" + return { + "status": "error", + "error": ( + f"Function '{original_schema.name}' is deactivated and should not be called. " + f"Please use the currently active functions instead." + ), + } + + # Only add prefix if not already deactivated (idempotent) + description = original_schema.description + if not description.startswith("[DEACTIVATED]"): + description = f"[DEACTIVATED] {description}" + + return FlowsFunctionSchema( + name=original_schema.name, + description=description, + properties=original_schema.properties, + required=original_schema.required, + handler=deactivated_handler, + cancel_on_interruption=True, + transition_to=None, + transition_callback=None, + ) + async def _register_function( self, name: str, - new_functions: Set[str], handler: Optional[Callable | FlowsDirectFunctionWrapper], transition_to: Optional[str] = None, transition_callback: Optional[Callable] = None, *, cancel_on_interruption: bool = True, + force: bool = False, ) -> None: """Register a function with the LLM if not already registered. @@ -663,14 +720,18 @@ async def _register_function( If string starts with '__function__:', extracts the function name after the prefix. transition_to: Optional node to transition to (static flows) transition_callback: Optional transition callback (dynamic flows) - new_functions: Set to track newly registered functions for this node cancel_on_interruption: Whether to cancel this function call when an interruption occurs. Defaults to True. 
+ force: If True, register even if function already exists (for re-registering + deactivated functions with new handlers). Defaults to False. Raises: FlowError: If function registration fails """ - if name not in self._current_functions: + should_register = force or ( + name not in self._current_functions.keys() + ) + if should_register: try: # Handle special token format (e.g. "__function__:function_name") if isinstance(handler, str) and handler.startswith("__function__:"): @@ -689,7 +750,6 @@ async def _register_function( cancel_on_interruption=cancel_on_interruption, ) - new_functions.add(name) logger.debug(f"Registered function: {name}") except Exception as e: logger.error(f"Failed to register function {name}: {str(e)}") @@ -790,7 +850,7 @@ async def _set_node(self, node_id: str, node_config: NodeConfig) -> None: # Register functions and prepare tools tools: List[FlowsFunctionSchema | FlowsDirectFunctionWrapper] = [] - new_functions: Set[str] = set() + new_functions: Dict[str, FlowsFunctionSchema] = {} # Get functions list with default empty list if not provided functions_list = node_config.get("functions", []) @@ -798,17 +858,20 @@ async def _set_node(self, node_id: str, node_config: NodeConfig) -> None: # Mix in global functions that should be available at every node functions_list = self._global_functions + functions_list - async def register_function_schema(schema: FlowsFunctionSchema): + async def register_function_schema(schema: FlowsFunctionSchema, is_deactivated: bool = False): """Helper to register a single FlowsFunctionSchema.""" tools.append(schema) await self._register_function( name=schema.name, - new_functions=new_functions, handler=schema.handler, transition_to=schema.transition_to, transition_callback=schema.transition_callback, cancel_on_interruption=schema.cancel_on_interruption, + force=is_deactivated, # Force re-registration for deactivated functions ) + # Store the schema for potential future deactivation (including deactivated ones + # so 
they persist across multiple transitions) + new_functions[schema.name] = schema async def register_direct_function(func): """Helper to register a single direct function.""" @@ -816,12 +879,21 @@ async def register_direct_function(func): tools.append(direct_function) await self._register_function( name=direct_function.name, - new_functions=new_functions, handler=direct_function, transition_to=None, transition_callback=None, cancel_on_interruption=direct_function.cancel_on_interruption, ) + # Store a FlowsFunctionSchema representation for potential future deactivation + schema = FlowsFunctionSchema( + name=direct_function.name, + description=direct_function.description, + properties=direct_function.properties, + required=direct_function.required, + handler=None, # Will be replaced if deactivated + cancel_on_interruption=direct_function.cancel_on_interruption, + ) + new_functions[direct_function.name] = schema for func_config in functions_list: # Handle direct functions @@ -847,6 +919,41 @@ async def register_direct_function(func): ) await register_function_schema(schema) + # Determine effective context strategy for this transition + effective_strategy = ( + node_config.get("context_strategy") or self._context_strategy + ) + + # For APPEND strategy, carry over deactivated functions from + # the previous node, since there may be context messages that call + # or reference them. This helps LLMs better understand their + # context, and also prevents errors: Gemini Live is particularly + # sensitive, erroring out when it has context messages (even text + # messages) referring to missing functions. 
+ deactivated_function_names: List[str] = [] + if ( + self._current_node is not None + and effective_strategy.strategy == ContextStrategy.APPEND + ): + # Find functions from previous node that aren't in the new node + # (excluding global functions, which are always present) + new_function_names = set(new_functions.keys()) + for prev_name, prev_schema in self._current_functions.items(): + if ( + prev_name not in new_function_names + and prev_name not in self._global_function_names + ): + # Create deactivated stub with same schema but dummy handler + deactivated_schema = self._create_deactivated_function_schema(prev_schema) + await register_function_schema(deactivated_schema, is_deactivated=True) + deactivated_function_names.append(prev_name) + + if deactivated_function_names: + logger.debug( + f"Carrying over {len(deactivated_function_names)} deactivated functions: " + f"{', '.join(deactivated_function_names)}" + ) + # Create ToolsSchema with standard function schemas standard_functions = [] for tool in tools: @@ -858,12 +965,25 @@ async def register_direct_function(func): standard_functions, original_configs=functions_list ) + # Prepare task messages, injecting deactivated function warning if needed + task_messages = list(node_config["task_messages"]) # Copy to avoid mutating original + if deactivated_function_names: + deactivated_warning = { + "role": "system", + "content": ( + f"IMPORTANT: The following functions are deactivated and should NOT be " + f"called: {', '.join(deactivated_function_names)}. If you call them, they " + f"will return an error." 
+ ), + } + task_messages.insert(0, deactivated_warning) + # Update LLM context await self._update_llm_context( role_messages=node_config.get("role_messages"), - task_messages=node_config["task_messages"], + task_messages=task_messages, functions=formatted_tools, - strategy=node_config.get("context_strategy"), + strategy=effective_strategy, ) logger.debug("Updated LLM context") @@ -904,7 +1024,7 @@ async def _update_llm_context( role_messages: Optional[List[dict]], task_messages: List[dict], functions: List[dict], - strategy: Optional[ContextStrategyConfig] = None, + strategy: ContextStrategyConfig, ) -> None: """Update LLM context with new messages and functions. @@ -912,7 +1032,7 @@ async def _update_llm_context( role_messages: Optional role messages to add to context. task_messages: Task messages to add to context. functions: New functions to make available. - strategy: Optional context update configuration. + strategy: Context strategy configuration for this transition. Raises: FlowError: If context update fails. @@ -927,7 +1047,7 @@ async def _update_llm_context( if role_messages: messages.extend(role_messages) - update_config = strategy or self._context_strategy + update_config = strategy if ( update_config.strategy == ContextStrategy.RESET_WITH_SUMMARY diff --git a/tests/test_context_strategies.py b/tests/test_context_strategies.py index a9b807d0..f9105912 100644 --- a/tests/test_context_strategies.py +++ b/tests/test_context_strategies.py @@ -308,3 +308,554 @@ async def test_context_structure_after_summary(self): self.assertEqual( messages_frame.messages[1]["content"], new_node["task_messages"][0]["content"] ) + + +class TestDeactivatedFunctions(unittest.IsolatedAsyncioTestCase): + """Test suite for deactivated function carry-over during APPEND transitions. 
+ + Tests functionality including: + - Carrying over deactivated functions with dummy handlers + - Injecting warning task messages + - Skipping deactivation for RESET strategies + - Global functions are not deactivated + """ + + async def asyncSetUp(self): + """Set up test fixtures before each test.""" + self.mock_task = AsyncMock() + self.mock_task.event_handler = Mock() + self.mock_task.set_reached_downstream_filter = Mock() + + self.mock_llm = OpenAILLMService(api_key="") + self.mock_llm.register_function = MagicMock() + + self.mock_context_aggregator = MagicMock() + self.mock_context_aggregator.user = MagicMock() + self.mock_context_aggregator.user.return_value = MagicMock() + self.mock_context_aggregator.user.return_value._context = MagicMock() + + async def test_deactivated_functions_carried_over_on_append(self): + """Test that functions from previous node are carried over as deactivated on APPEND.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + # First node with function_a and function_b + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Function A", + "parameters": {"type": "object", "properties": {}}, + }, + }, + { + "type": "function", + "function": { + "name": "function_b", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Function B", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + + # Verify both functions are in current_functions + self.assertIn("function_a", flow_manager._current_functions) + self.assertIn("function_b", 
flow_manager._current_functions) + + self.mock_task.queue_frames.reset_mock() + self.mock_llm.register_function.reset_mock() + + # Second node with only function_c (function_a and function_b should be deactivated) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_c", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Function C", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("second", second_node) + + # Verify that function_a and function_b were registered as deactivated + registered_function_names = [ + call[0][0] for call in self.mock_llm.register_function.call_args_list + ] + self.assertIn("function_a", registered_function_names) + self.assertIn("function_b", registered_function_names) + self.assertIn("function_c", registered_function_names) + + async def test_deactivated_warning_message_injected(self): + """Test that warning message is injected when functions are deactivated.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + # First node with a function + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "old_function", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Old function", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + self.mock_task.queue_frames.reset_mock() + + # Second node without the old function + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + } + + await 
flow_manager._set_node("second", second_node) + + # Verify warning message was injected + queue_frames_call = self.mock_task.queue_frames.call_args_list[0] + frames = queue_frames_call[0][0] + messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + + # Check that warning message is present + warning_found = any( + "old_function" in str(m.get("content", "")) + and "deactivated" in str(m.get("content", "")).lower() + for m in messages_frame.messages + ) + self.assertTrue(warning_found, "Warning message about deactivated functions not found") + + async def test_no_deactivation_on_reset_strategy(self): + """Test that functions are not carried over as deactivated on RESET strategy.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.RESET), + ) + await flow_manager.initialize() + + # First node with a function + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "old_function", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Old function", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + self.mock_llm.register_function.reset_mock() + + # Second node without the old function + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "new_function", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "New function", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("second", second_node) + + # Verify that old_function was NOT registered in second transition + registered_function_names = [ + call[0][0] 
for call in self.mock_llm.register_function.call_args_list + ] + self.assertNotIn("old_function", registered_function_names) + self.assertIn("new_function", registered_function_names) + + async def test_global_functions_not_deactivated(self): + """Test that global functions are never carried over as deactivated.""" + from pipecat_flows.types import FlowsFunctionSchema + + global_func = FlowsFunctionSchema( + name="global_function", + description="A global function", + properties={}, + required=[], + handler=AsyncMock(return_value={"status": "success"}), + ) + + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + global_functions=[global_func], + ) + await flow_manager.initialize() + + # First node with an additional function + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "node_function", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Node-specific function", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + self.mock_task.queue_frames.reset_mock() + + # Second node without node_function (but global_function should still be active) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + } + + await flow_manager._set_node("second", second_node) + + # Verify warning message mentions node_function but NOT global_function + queue_frames_call = self.mock_task.queue_frames.call_args_list[0] + frames = queue_frames_call[0][0] + messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + + for msg in messages_frame.messages: + content = str(msg.get("content", "")) + if "deactivated" in content.lower(): + 
self.assertIn("node_function", content) + self.assertNotIn("global_function", content) + + async def test_deactivated_function_returns_error(self): + """Test that calling a deactivated function returns an error result.""" + from pipecat_flows.types import FlowsFunctionSchema + + # Create the schema directly to test the helper + original_schema = FlowsFunctionSchema( + name="test_function", + description="Test function", + properties={"param": {"type": "string"}}, + required=["param"], + handler=AsyncMock(return_value={"status": "success"}), + ) + + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + ) + + deactivated_schema = flow_manager._create_deactivated_function_schema(original_schema) + + # Verify schema properties are preserved (with updated description) + self.assertEqual(deactivated_schema.name, original_schema.name) + self.assertTrue(deactivated_schema.description.startswith("[DEACTIVATED]")) + self.assertIn(original_schema.description, deactivated_schema.description) + self.assertEqual(deactivated_schema.properties, original_schema.properties) + self.assertEqual(deactivated_schema.required, original_schema.required) + + # Verify deactivated handler returns error (modern signature with flow_manager) + result = await deactivated_schema.handler({}, flow_manager) + self.assertEqual(result["status"], "error") + self.assertIn("deactivated", result["error"]) + self.assertIn("test_function", result["error"]) + + async def test_node_level_reset_prevents_deactivation(self): + """Test that node-level RESET strategy prevents function deactivation.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + # First node with a function + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": 
"First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "old_function", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Old function", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + self.mock_llm.register_function.reset_mock() + + # Second node with RESET strategy override + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + "context_strategy": ContextStrategyConfig(strategy=ContextStrategy.RESET), + } + + await flow_manager._set_node("second", second_node) + + # Verify that old_function was NOT registered (no deactivation on RESET) + registered_function_names = [ + call[0][0] for call in self.mock_llm.register_function.call_args_list + ] + self.assertNotIn("old_function", registered_function_names) + + async def test_deactivated_functions_persist_across_multiple_transitions(self): + """Test that deactivated functions persist through multiple node transitions.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + # First node with function_a + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Function A", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + self.mock_task.queue_frames.reset_mock() + self.mock_llm.register_function.reset_mock() + + # Second node with function_b only (function_a becomes deactivated) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": 
"Second task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_b", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Function B", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("second", second_node) + + # Verify function_a was deactivated + queue_frames_call = self.mock_task.queue_frames.call_args_list[0] + frames = queue_frames_call[0][0] + messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + warning_found = any( + "function_a" in str(msg.get("content", "")) and "deactivated" in str(msg.get("content", "")).lower() + for msg in messages_frame.messages + ) + self.assertTrue(warning_found, "function_a should be marked as deactivated") + + self.mock_task.queue_frames.reset_mock() + self.mock_llm.register_function.reset_mock() + + # Third node with function_c only (function_a AND function_b should be deactivated) + third_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Third task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_c", + "handler": AsyncMock(return_value={"status": "success"}), + "description": "Function C", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("third", third_node) + + # Verify both function_a and function_b are mentioned as deactivated + queue_frames_call = self.mock_task.queue_frames.call_args_list[0] + frames = queue_frames_call[0][0] + messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + + warning_content = "" + for msg in messages_frame.messages: + content = str(msg.get("content", "")) + if "deactivated" in content.lower(): + warning_content = content + break + + self.assertIn("function_a", warning_content, "function_a should persist as deactivated") + self.assertIn("function_b", warning_content, "function_b should now be deactivated") + + async 
def test_deactivated_prefix_is_idempotent(self): + """Test that [DEACTIVATED] prefix is not added multiple times.""" + from pipecat_flows.types import FlowsFunctionSchema + + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + ) + + # Create an already-deactivated schema + already_deactivated = FlowsFunctionSchema( + name="test_function", + description="[DEACTIVATED] Original description", + properties={}, + required=[], + handler=AsyncMock(return_value={"status": "error", "error": "deactivated"}), + ) + + # Deactivate it again + doubly_deactivated = flow_manager._create_deactivated_function_schema(already_deactivated) + + # Verify description only has one [DEACTIVATED] prefix + self.assertEqual( + doubly_deactivated.description, + "[DEACTIVATED] Original description", + "Should not add multiple [DEACTIVATED] prefixes", + ) + self.assertEqual( + doubly_deactivated.description.count("[DEACTIVATED]"), + 1, + "Should have exactly one [DEACTIVATED] prefix", + ) + + async def test_deactivated_function_reactivated_when_present_in_new_node(self): + """Test that a previously deactivated function becomes active again when included in new node.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + # Create a handler that we can track + function_a_handler = AsyncMock(return_value={"status": "success"}) + + # First node with function_a + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": function_a_handler, + "description": "Function A", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + 
self.mock_task.queue_frames.reset_mock() + + # Second node without function_a (it becomes deactivated) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + } + + await flow_manager._set_node("second", second_node) + + # Verify function_a is in current_functions as deactivated + self.assertIn("function_a", flow_manager._current_functions) + self.assertTrue( + flow_manager._current_functions["function_a"].description.startswith("[DEACTIVATED]") + ) + + self.mock_task.queue_frames.reset_mock() + + # Third node brings back function_a + third_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Third task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": function_a_handler, + "description": "Function A restored", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("third", third_node) + + # Verify function_a is now active (not deactivated) + self.assertIn("function_a", flow_manager._current_functions) + self.assertFalse( + flow_manager._current_functions["function_a"].description.startswith("[DEACTIVATED]"), + "function_a should be active again, not deactivated", + ) + + # Verify no deactivation warning was issued for function_a + queue_frames_call = self.mock_task.queue_frames.call_args_list[0] + frames = queue_frames_call[0][0] + messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + + for msg in messages_frame.messages: + content = str(msg.get("content", "")) + if "deactivated" in content.lower(): + self.assertNotIn( + "function_a", + content, + "function_a should not be in deactivation warning since it's now active", + ) diff --git a/tests/test_manager.py b/tests/test_manager.py index 70b5969a..b4ad0572 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -290,7 +290,13 @@ async def result_callback(result, properties=None): # Test new 
style callback await flow_manager.set_node_from_config(new_style_node) - func = flow_manager._llm.register_function.call_args[0][1] + # Find the registered function by name (not using call_args since deactivated functions may be registered after) + func = None + for call in flow_manager._llm.register_function.call_args_list: + if call[0][0] == "new_style_function": + func = call[0][1] + break + self.assertIsNotNone(func, "new_style_function was not registered") # Reset context_updated callback context_updated_callback = None @@ -338,7 +344,7 @@ async def test_node_validation(self): await flow_manager.set_node_from_config(valid_config) self.assertEqual(flow_manager._current_node, "test") - self.assertEqual(flow_manager._current_functions, set()) + self.assertEqual(flow_manager._current_functions, {}) async def test_function_registration(self): """Test function registration with LLM.""" @@ -779,7 +785,7 @@ async def test_register_function_error_handling(self): # Mock LLM to raise error on register_function flow_manager._llm.register_function.side_effect = Exception("Registration error") - new_functions = set() + new_functions = {} with self.assertRaises(FlowError): await flow_manager._register_function("test", new_functions, None) @@ -823,11 +829,14 @@ async def test_update_llm_context_error_handling(self): # Mock task to raise error on queue_frames flow_manager._task.queue_frames.side_effect = Exception("Queue error") + from pipecat_flows.types import ContextStrategy, ContextStrategyConfig + with self.assertRaises(FlowError): await flow_manager._update_llm_context( role_messages=[], task_messages=[{"role": "system", "content": "Test"}], functions=[], + strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), ) async def test_function_declarations_processing(self): @@ -1383,8 +1392,8 @@ async def test_node_without_functions(self): # Set node and verify it works without error await flow_manager.set_node_from_config(node_config) - # Verify current_functions is 
empty set - self.assertEqual(flow_manager._current_functions, set()) + # Verify current_functions is empty dict + self.assertEqual(flow_manager._current_functions, {}) # Verify LLM tools were still set (with empty or placeholder functions) tools_frames_call = [ @@ -1412,8 +1421,8 @@ async def test_node_with_empty_functions(self): # Set node and verify it works without error await flow_manager.set_node_from_config(node_config) - # Verify current_functions is empty set - self.assertEqual(flow_manager._current_functions, set()) + # Verify current_functions is empty dict + self.assertEqual(flow_manager._current_functions, {}) # Verify LLM tools were still set (with empty or placeholder functions) tools_frames_call = [ From a7f27fcaea6d1592906f0a136602a9679395c20d Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 4 Feb 2026 10:15:58 -0500 Subject: [PATCH 04/13] Make it so that nodes with the RESET_WITH_SUMMARY strategy fall back to the APPEND strategy when we're using an LLM that don't support summarization (like Gemini Live). I opted to do the simple thing of just checking the LLM name rather than build a more general system where each LLM reports whether it's capable of summarization. --- src/pipecat_flows/adapters.py | 32 ++++++++++++++++++++++++++++---- src/pipecat_flows/manager.py | 15 ++++++++++++++- src/pipecat_flows/types.py | 10 +++------- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/src/pipecat_flows/adapters.py b/src/pipecat_flows/adapters.py index f58623af..dfe6d463 100644 --- a/src/pipecat_flows/adapters.py +++ b/src/pipecat_flows/adapters.py @@ -47,6 +47,16 @@ class LLMAdapter: - AWS Bedrock: Uses Anthropic-compatible format """ + def __init__(self, supports_summarization: bool = True) -> None: + """Initialize the adapter. + + Args: + supports_summarization: Indicates if the LLM supports generating + summaries (which determines whether generate_summary() can be + used). 
+ """ + self.supports_summarization = supports_summarization + def get_function_name(self, function_def: Union[Dict[str, Any], FlowsFunctionSchema]) -> str: """Extract function name from provider-specific function definition or schema. @@ -171,6 +181,9 @@ async def generate_summary( Generated summary text, or None if generation fails. """ try: + if not self.supports_summarization: + raise RuntimeError("This LLM does not support generating summaries.") + if isinstance(context, LLMContext): messages = context.get_messages() else: @@ -217,6 +230,16 @@ class UniversalLLMAdapter(LLMAdapter): be in the standard FlowsFunctionSchema format. """ + def __init__(self, supports_summarization: bool) -> None: + """Initialize the adapter. + + Args: + supports_summarization: Indicates if the LLM supports generating + summaries (which determines whether generate_summary() can be + used). + """ + super().__init__(supports_summarization=supports_summarization) + def _get_function_name_from_dict(self, function_def: Dict[str, Any]) -> str: raise RuntimeError( "Provider-specific function definitions are not supported in flows using universal LLMContext. Use FlowsFunctionSchemas or direct functions instead." @@ -664,13 +687,14 @@ def create_adapter(llm, context_aggregator) -> LLMAdapter: Raises: ValueError: If LLM type is not supported or required dependency not installed. 
""" + llm_type = type(llm).__name__ + llm_class = type(llm) + if isinstance(context_aggregator, LLMContextAggregatorPair): # Universal LLMContext is in use, so we need the universal adapter logger.debug("Creating universal adapter") - return UniversalLLMAdapter() - - llm_type = type(llm).__name__ - llm_class = type(llm) + llm_supports_summarization = llm_type not in ["GeminiLiveLLMService"] + return UniversalLLMAdapter(supports_summarization=llm_supports_summarization) if llm_type == "OpenAILLMService": logger.debug("Creating OpenAI adapter") diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 225f8670..801bdecc 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -924,6 +924,19 @@ async def register_direct_function(func): node_config.get("context_strategy") or self._context_strategy ) + # If RESET_WITH_SUMMARY is not supported for this LLM, fall back to + # APPEND and log a warning. In the future, we may want to add + # support for specifying a summarization LLM. + if ( + effective_strategy.strategy == ContextStrategy.RESET_WITH_SUMMARY + and not self._adapter.supports_summarization + ): + logger.warning( + f"Context strategy RESET_WITH_SUMMARY is not supported by the current LLM. " + f"Falling back to APPEND strategy for node {node_id}." + ) + effective_strategy = ContextStrategyConfig(strategy=ContextStrategy.APPEND) + # For APPEND strategy, carry over deactivated functions from # the previous node, since there may be context messages that call # or reference them. This helps LLMs better understand their @@ -971,7 +984,7 @@ async def register_direct_function(func): deactivated_warning = { "role": "system", "content": ( - f"IMPORTANT: The following functions are deactivated and should NOT be " + f"IMPORTANT: The following functions are now DEACTIVATED and should NO LONGER be " f"called: {', '.join(deactivated_function_names)}. If you call them, they " f"will return an error." 
), diff --git a/src/pipecat_flows/types.py b/src/pipecat_flows/types.py index be9b7a66..6dce145d 100644 --- a/src/pipecat_flows/types.py +++ b/src/pipecat_flows/types.py @@ -313,15 +313,11 @@ def flows_direct_function(*, cancel_on_interruption: bool = True) -> Callable[[C Returns: A decorator that attaches the metadata to the function. - Example: + Example:: + @flows_direct_function(cancel_on_interruption=False) async def long_running_task(flow_manager: FlowManager, query: str): - '''Perform a long-running task that should not be cancelled on interruption. - - Args: - query: The query to process. - ''' - # ... implementation + # ... docstring and implementation return {"status": "complete"}, None """ From 499accc559b2683eb35e1dfa9d7e4cb3fa99ff40 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Tue, 3 Feb 2026 14:08:53 -0500 Subject: [PATCH 05/13] Update examples to work with Gemini Live: - Conditionally add STT and TTS to the pipeline, based on whether a traditional LLM or a speech-to-speech service like Gemini Live is being used - Tweak system prompt wording to: - prevent unnecessary responses before node transitions - nudge the model to convey the results of a previous function call to the user, even when it's not the most recent message in context --- examples/food_ordering.py | 41 ++++++++---- examples/food_ordering_direct_functions.py | 39 ++++++----- examples/insurance_quote.py | 65 +++++++++++-------- examples/patient_intake.py | 41 +++++++----- examples/podcast_interview.py | 39 ++++++----- examples/restaurant_reservation.py | 46 ++++++++----- ...restaurant_reservation_direct_functions.py | 50 ++++++++------ examples/utils.py | 46 ++++++------- examples/warm_transfer.py | 51 ++++++++++----- src/pipecat_flows/manager.py | 12 ++-- tests/test_context_strategies.py | 3 +- 11 files changed, 262 insertions(+), 171 deletions(-) diff --git a/examples/food_ordering.py b/examples/food_ordering.py index ad3b4118..fc35ab68 100644 --- a/examples/food_ordering.py +++ 
b/examples/food_ordering.py @@ -52,7 +52,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import ( FlowArgs, @@ -168,6 +168,8 @@ async def select_pizza_order( size = args["size"] pizza_type = args["type"] + print("[pk] Selected pizza:", size, pizza_type) + # Simple pricing base_price = {"small": 10.00, "medium": 15.00, "large": 20.00} price = base_price[size] @@ -233,6 +235,8 @@ async def select_sushi_order( count = args["count"] roll_type = args["type"] + print("[pk] Selected sushi:", count, roll_type) + # Simple pricing: $8 per roll price = count * 8.00 @@ -345,10 +349,14 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the food ordering bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + tts = ( + CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman + ) + if needs_stt_tts() + else None ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -365,15 +373,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - [ - transport.input(), - stt, # comment out if using LLM_PROVIDER="gemini_live" - context_aggregator.user(), - llm, - tts, # comment out if using LLM_PROVIDER="gemini_live" - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + 
context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/food_ordering_direct_functions.py b/examples/food_ordering_direct_functions.py index bf9b7e75..b44fecba 100644 --- a/examples/food_ordering_direct_functions.py +++ b/examples/food_ordering_direct_functions.py @@ -52,7 +52,7 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import FlowManager, FlowResult, NodeConfig @@ -193,7 +193,7 @@ def create_initial_node() -> NodeConfig: role_messages=[ { "role": "system", - "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", + "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. When you've decided to call a function to progress the conversation, do not also respond; the function call by itself is enough. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. 
Avoid outputting special characters and emojis.", } ], task_messages=[ @@ -288,10 +288,14 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the food ordering bot with direct functions.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + tts = ( + CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman + ) + if needs_stt_tts() + else None ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -308,15 +312,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/insurance_quote.py b/examples/insurance_quote.py index 13c18625..83b2f65a 100644 --- a/examples/insurance_quote.py +++ b/examples/insurance_quote.py @@ -57,7 +57,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import FlowArgs, FlowManager, FlowResult, FlowsFunctionSchema, NodeConfig @@ -115,12 +115,12 @@ class CoverageUpdateResult(FlowResult, InsuranceQuote): # Function handlers -async def collect_age( +async def record_age( args: 
FlowArgs, flow_manager: FlowManager ) -> tuple[AgeCollectionResult, NodeConfig]: """Process age collection.""" age = args["age"] - logger.debug(f"collect_age handler executing with age: {age}") + logger.debug(f"record_age handler executing with age: {age}") flow_manager.state["age"] = age result = AgeCollectionResult(age=age) @@ -130,12 +130,12 @@ async def collect_age( return result, next_node -async def collect_marital_status( +async def record_marital_status( args: FlowArgs, flow_manager: FlowManager ) -> tuple[MaritalStatusResult, NodeConfig]: """Process marital status collection.""" status = args["marital_status"] - logger.debug(f"collect_marital_status handler executing with status: {status}") + logger.debug(f"record_marital_status handler executing with status: {status}") result = MaritalStatusResult(marital_status=status) @@ -208,23 +208,25 @@ def create_initial_node() -> NodeConfig: "content": ( "You are a friendly insurance agent. Your responses will be " "converted to audio, so avoid special characters. Always use " - "the available functions to progress the conversation naturally." + "the available functions to progress the conversation naturally. " + "When you've decided to call a function, do not also respond; " + "the function call by itself is enough." 
), } ], "task_messages": [ { "role": "system", - "content": "Start by asking for the customer's age.", + "content": "Start by asking for the customer's age, then record their response using the record_age function.", } ], "functions": [ FlowsFunctionSchema( - name="collect_age", + name="record_age", description="Record customer's age", properties={"age": {"type": "integer"}}, required=["age"], - handler=collect_age, + handler=record_age, ) ], } @@ -242,11 +244,11 @@ def create_marital_status_node() -> NodeConfig: ], "functions": [ FlowsFunctionSchema( - name="collect_marital_status", + name="record_marital_status", description="Record marital status after customer provides it", properties={"marital_status": {"type": "string", "enum": ["single", "married"]}}, required=["marital_status"], - handler=collect_marital_status, + handler=record_marital_status, ) ], } @@ -295,11 +297,11 @@ def create_quote_results_node( f"Monthly Premium: ${quote['monthly_premium']:.2f}\n" f"Coverage Amount: ${quote['coverage_amount']:,}\n" f"Deductible: ${quote['deductible']:,}\n\n" - "Explain these quote details to the customer. When they request changes, " - "use update_coverage to recalculate their quote. Explain how their " + "Explain these quote details to the customer. If they then request changes, " + "use update_coverage to recalculate their quote. If this is an updated quote (from a previous quote), explain how their " "changes affected the premium and compare it to their previous quote. " "Ask if they'd like to make any other adjustments or if they're ready " - "to end the quote process." + "to end the quote process. 
" ), } ], @@ -344,10 +346,14 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the insurance quote bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + tts = ( + CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + if needs_stt_tts() + else None ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -364,15 +370,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/patient_intake.py b/examples/patient_intake.py index ecf769aa..de481ab4 100644 --- a/examples/patient_intake.py +++ b/examples/patient_intake.py @@ -54,7 +54,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import ( ContextStrategy, @@ -319,7 +319,7 @@ def create_allergies_node() -> NodeConfig: task_messages=[ { "role": "system", - "content": "Collect allergy information. Ask about any allergies they have. 
After recording allergies (or confirming none), proceed to medical conditions.", + "content": "Your job now is to collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions.", } ], functions=[record_allergies_func], @@ -355,7 +355,7 @@ def create_conditions_node() -> NodeConfig: task_messages=[ { "role": "system", - "content": "Collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.", + "content": "Your job now is to collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.", } ], functions=[record_conditions_func], @@ -479,10 +479,14 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the patient intake bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + tts = ( + CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + if needs_stt_tts() + else None ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -499,15 +503,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + 
context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/podcast_interview.py b/examples/podcast_interview.py index ddbbe722..8126b1bc 100644 --- a/examples/podcast_interview.py +++ b/examples/podcast_interview.py @@ -63,7 +63,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import ( FlowArgs, @@ -246,11 +246,15 @@ def create_final_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + tts = ( + CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + if needs_stt_tts() + else None ) # LLM service is created using the create_llm function from utils.py @@ -270,16 +274,21 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): rtvi = RTVIProcessor(config=RTVIConfig(config=[])) pipeline = Pipeline( - [ - transport.input(), - rtvi, - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask( diff --git a/examples/restaurant_reservation.py b/examples/restaurant_reservation.py index 39c2c848..95583f50 100644 --- 
a/examples/restaurant_reservation.py +++ b/examples/restaurant_reservation.py @@ -53,7 +53,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import FlowArgs, FlowManager, FlowResult, FlowsFunctionSchema, NodeConfig @@ -199,7 +199,7 @@ def create_initial_node(wait_for_user: bool) -> NodeConfig: "role_messages": [ { "role": "system", - "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis.", + "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis. When you've decided to call a function, do not also respond; the function call by itself is enough.", } ], "task_messages": [ @@ -221,7 +221,7 @@ def create_time_selection_node() -> NodeConfig: "task_messages": [ { "role": "system", - "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM.", + "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. When they provide a time, check availability by calling the appropriate function.", } ], "functions": [availability_schema], @@ -235,7 +235,7 @@ def create_confirmation_node() -> NodeConfig: "task_messages": [ { "role": "system", - "content": "Confirm the reservation details and ask if they need anything else.", + "content": "Confirm the reservation details and ask if they need anything else. If they don't, go ahead and end the conversation by calling the appropriate function.", } ], "functions": [end_conversation_schema], @@ -253,7 +253,8 @@ def create_no_availability_node(alternative_times: list[str]) -> NodeConfig: "content": ( f"Apologize that the requested time is not available. 
" f"Suggest these alternative times: {times_list}. " - "Ask if they'd like to try one of these times." + "Ask if they'd like to try one of these times. " + "If not, end the conversation by calling the appropriate function." ), } ], @@ -280,10 +281,14 @@ async def run_bot( transport: BaseTransport, runner_args: RunnerArguments, wait_for_user: bool = False ): """Run the restaurant reservation bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + tts = ( + CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + if needs_stt_tts() + else None ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -300,15 +305,20 @@ async def run_bot( ) pipeline = Pipeline( - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/restaurant_reservation_direct_functions.py b/examples/restaurant_reservation_direct_functions.py index 064c3362..745f8df5 100644 --- a/examples/restaurant_reservation_direct_functions.py +++ b/examples/restaurant_reservation_direct_functions.py @@ -55,7 +55,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import FlowManager, 
FlowResult, NodeConfig @@ -150,7 +150,7 @@ async def check_availability( Check availability for requested time. Args: - time (str): Requested reservation time in "HH:MM AM/PM" format. Must be between 5 PM and 10 PM. + time (str): Requested reservation time in "HH:MM AM/PM" format, with NO leading 0s (e.g. "6:00 PM"). Must be between 5 PM and 10 PM. party_size (int): Number of people in the party. """ # Check availability with mock API @@ -163,8 +163,10 @@ async def check_availability( # Next node: confirmation or no availability if is_available: + logger.debug("Time is available, transitioning to confirmation node") next_node = create_confirmation_node() else: + logger.debug(f"Time not available, storing alternatives: {alternative_times}") next_node = create_no_availability_node(alternative_times) return result, next_node @@ -183,7 +185,7 @@ def create_initial_node(wait_for_user: bool) -> NodeConfig: "role_messages": [ { "role": "system", - "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis.", + "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis. When you've decided to call a function, do not also respond; the function call by itself is enough.", } ], "task_messages": [ @@ -205,7 +207,7 @@ def create_time_selection_node() -> NodeConfig: "task_messages": [ { "role": "system", - "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM.", + "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. 
When they provide a time, check availability by calling the appropriate function.", } ], "functions": [check_availability], @@ -219,7 +221,7 @@ def create_confirmation_node() -> NodeConfig: "task_messages": [ { "role": "system", - "content": "Confirm the reservation details and ask if they need anything else.", + "content": "Confirm the reservation details and ask if they need anything else. If they don't, go ahead and end the conversation by calling the appropriate function.", } ], "functions": [end_conversation], @@ -237,7 +239,8 @@ def create_no_availability_node(alternative_times: list[str]) -> NodeConfig: "content": ( f"Apologize that the requested time is not available. " f"Suggest these alternative times: {times_list}. " - "Ask if they'd like to try one of these times." + "Ask if they'd like to try one of these times. " + "If not, end the conversation by calling the appropriate function." ), } ], @@ -264,10 +267,14 @@ async def run_bot( transport: BaseTransport, runner_args: RunnerArguments, wait_for_user: bool = False ): """Run the restaurant reservation bot with direct functions.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + tts = ( + CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady + ) + if needs_stt_tts() + else None ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -284,15 +291,20 @@ async def run_bot( ) pipeline = Pipeline( - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), 
+ stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/utils.py b/examples/utils.py index cb1460b9..4daf82b8 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -14,13 +14,9 @@ from typing import Any -def create_llm(provider: str = None, model: str = None) -> Any: +def create_llm() -> Any: """Create an LLM service instance based on environment configuration. - Args: - provider: LLM provider name. If None, uses LLM_PROVIDER env var (defaults to 'openai') - model: Model name. If None, uses provider's default model - Returns: Configured LLM service instance @@ -36,22 +32,10 @@ def create_llm(provider: str = None, model: str = None) -> Any: - gemini_live: Requires GOOGLE_API_KEY Usage: - # Use default provider (from LLM_PROVIDER env var, defaults to OpenAI) + # Use provider from LLM_PROVIDER env var (defaults to OpenAI) llm = create_llm() - - # Use specific provider - llm = create_llm("anthropic") - - # Use specific provider and model - llm = create_llm("openai", "gpt-4o-mini") - - # Use AWS Bedrock (requires AWS credentials via SSO, env vars, or IAM) - llm = create_llm("aws") """ - if provider is None: - provider = os.getenv("LLM_PROVIDER", "openai").lower() - else: - provider = provider.lower() + provider = _resolve_provider() # Provider configurations configs = { @@ -80,7 +64,7 @@ def create_llm(provider: str = None, model: str = None) -> Any: "service": "pipecat.services.google.gemini_live.llm.GeminiLiveLLMService", "api_key_env": "GOOGLE_API_KEY", "default_model": "models/gemini-2.5-flash-native-audio-preview-12-2025", - } + }, } config = configs.get(provider) @@ -101,8 +85,8 @@ def create_llm(provider: str = None, model: str = None) -> Any: if not api_key: raise ValueError(f"Missing API key: {config['api_key_env']} for provider: {provider}") - # Use provided model or default - selected_model = 
model or config["default_model"] + # Use default model + selected_model = config["default_model"] # Build kwargs kwargs = {"api_key": api_key, "model": selected_model} @@ -115,3 +99,21 @@ def create_llm(provider: str = None, model: str = None) -> Any: del kwargs["api_key"] return service_class(**kwargs) + + +def needs_stt_tts() -> bool: + """Return True when STT/TTS should be enabled for the LLM service specified by environment configuration. + + Returns: + True if STT/TTS should be enabled, False if the provider handles audio natively. + """ + return _resolve_provider() != "gemini_live" + + +def _resolve_provider() -> str: + """Resolve the LLM provider name from environment configuration. + + Returns: + Lower-cased provider name. + """ + return os.getenv("LLM_PROVIDER", "openai").lower() diff --git a/examples/warm_transfer.py b/examples/warm_transfer.py index 5cb307af..5b7f5668 100644 --- a/examples/warm_transfer.py +++ b/examples/warm_transfer.py @@ -74,7 +74,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm +from utils import create_llm, needs_stt_tts from pipecat_flows import ContextStrategyConfig, FlowManager, FlowResult, NodeConfig from pipecat_flows.types import ActionConfig, ContextStrategy, FlowArgs, FlowsFunctionSchema @@ -277,7 +277,7 @@ def create_initial_customer_interaction_node() -> NodeConfig: role_messages=[ { "role": "system", - "content": "You are an assistant for ABC Widget Company. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", + "content": "You are an assistant for ABC Widget Company. You must ALWAYS use the available functions to progress the conversation. 
When you've decided to call a function to progress the conversation, do not also respond; the function call by itself is enough. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", } ], task_messages=[ @@ -329,7 +329,10 @@ def create_continued_customer_interaction_node() -> NodeConfig: task_messages=[ { "role": "system", - "content": """Ask the customer there's anything else you could help them with today, or if they'd like to end the conversation. If they need more help, re-offer the two choices you offered before: you could provide store location and hours of operation, or begin placing an order. + "content": """ + Finish helping the customer with their previous request, if you haven't already. + + Then ask the customer there's anything else you could help them with today, or if they'd like to end the conversation. If they need more help, re-offer the two choices you offered before: you could provide store location and hours of operation, or begin placing an order. 
To help the customer: - Use the check_store_location_and_hours_of_operation function to check store location and hours of operation to provide to the customer @@ -637,10 +640,14 @@ async def main(): vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), ), ) - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = CartesiaHttpTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="d46abd1d-2d02-43e8-819f-51fb652c1c61", # Newsman + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + tts = ( + CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="d46abd1d-2d02-43e8-819f-51fb652c1c61", # Newsman + ) + if needs_stt_tts() + else None ) llm = create_llm() @@ -659,15 +666,20 @@ async def main(): # Create pipeline pipeline = Pipeline( - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ] + list( + filter( + None, + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ], + ) + ) ) task = PipelineTask(pipeline=pipeline, params=PipelineParams(allow_interruptions=True)) @@ -732,8 +744,13 @@ async def on_participant_left( # Prepare hold music args flow_manager.state["hold_music_args"] = { - "script_path": Path(__file__).parent.parent / "assets" / "hold_music" / "hold_music.py", + "script_path": Path(__file__).parent.parent + / "examples" + / "assets" + / "hold_music" + / "hold_music.py", "wav_file_path": Path(__file__).parent.parent + / "examples" / "assets" / "hold_music" / "hold_music.wav", diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 801bdecc..7340b691 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -728,9 +728,7 @@ async def _register_function( Raises: FlowError: If function registration fails """ - should_register = force or ( - name not in 
self._current_functions.keys() - ) + should_register = force or (name not in self._current_functions.keys()) if should_register: try: # Handle special token format (e.g. "__function__:function_name") @@ -858,7 +856,9 @@ async def _set_node(self, node_id: str, node_config: NodeConfig) -> None: # Mix in global functions that should be available at every node functions_list = self._global_functions + functions_list - async def register_function_schema(schema: FlowsFunctionSchema, is_deactivated: bool = False): + async def register_function_schema( + schema: FlowsFunctionSchema, is_deactivated: bool = False + ): """Helper to register a single FlowsFunctionSchema.""" tools.append(schema) await self._register_function( @@ -920,9 +920,7 @@ async def register_direct_function(func): await register_function_schema(schema) # Determine effective context strategy for this transition - effective_strategy = ( - node_config.get("context_strategy") or self._context_strategy - ) + effective_strategy = node_config.get("context_strategy") or self._context_strategy # If RESET_WITH_SUMMARY is not supported for this LLM, fall back to # APPEND and log a warning. 
In the future, we may want to add diff --git a/tests/test_context_strategies.py b/tests/test_context_strategies.py index f9105912..77f043a3 100644 --- a/tests/test_context_strategies.py +++ b/tests/test_context_strategies.py @@ -698,7 +698,8 @@ async def test_deactivated_functions_persist_across_multiple_transitions(self): frames = queue_frames_call[0][0] messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) warning_found = any( - "function_a" in str(msg.get("content", "")) and "deactivated" in str(msg.get("content", "")).lower() + "function_a" in str(msg.get("content", "")) + and "deactivated" in str(msg.get("content", "")).lower() for msg in messages_frame.messages ) self.assertTrue(warning_found, "function_a should be marked as deactivated") From 5bd2c1c2d88d70516ccce759bd108f5d2f81c81f Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Wed, 4 Feb 2026 16:59:32 -0500 Subject: [PATCH 06/13] Fix a bug where "deactivated" functions carried over from previous nodes wouldn't be properly reactivated when transitioning to a node where those functions are active again. 
--- src/pipecat_flows/manager.py | 99 +++++----- tests/test_context_strategies.py | 308 +++++++++++++++++++++++++++++++ 2 files changed, 362 insertions(+), 45 deletions(-) diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 7340b691..0765cb3a 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -657,6 +657,15 @@ def _lookup_function(self, func_name: str) -> Callable: raise FlowError(error_message) + # Prefix used to mark deactivated function descriptions + _DEACTIVATED_PREFIX = "[DEACTIVATED] " + + def _add_deactivated_prefix(self, description: str) -> str: + """Add the [DEACTIVATED] prefix to a description if not already present.""" + if not description.startswith(self._DEACTIVATED_PREFIX): + return f"{self._DEACTIVATED_PREFIX}{description}" + return description + def _create_deactivated_function_schema( self, original_schema: FlowsFunctionSchema ) -> FlowsFunctionSchema: @@ -687,9 +696,7 @@ async def deactivated_handler(args: FlowArgs, flow_manager: "FlowManager") -> Fl } # Only add prefix if not already deactivated (idempotent) - description = original_schema.description - if not description.startswith("[DEACTIVATED]"): - description = f"[DEACTIVATED] {description}" + description = self._add_deactivated_prefix(original_schema.description) return FlowsFunctionSchema( name=original_schema.name, @@ -710,9 +717,12 @@ async def _register_function( transition_callback: Optional[Callable] = None, *, cancel_on_interruption: bool = True, - force: bool = False, ) -> None: - """Register a function with the LLM if not already registered. + """Register a function with the LLM. + + Always registers the function, replacing any existing registration. + This ensures that handlers are properly updated when a function is + deactivated or reactivated upon transitioning nodes. 
Args: name: Name of the function to register @@ -722,36 +732,32 @@ async def _register_function( transition_callback: Optional transition callback (dynamic flows) cancel_on_interruption: Whether to cancel this function call when an interruption occurs. Defaults to True. - force: If True, register even if function already exists (for re-registering - deactivated functions with new handlers). Defaults to False. Raises: FlowError: If function registration fails """ - should_register = force or (name not in self._current_functions.keys()) - if should_register: - try: - # Handle special token format (e.g. "__function__:function_name") - if isinstance(handler, str) and handler.startswith("__function__:"): - func_name = handler.split(":")[1] - handler = self._lookup_function(func_name) - - # Create transition function - transition_func = await self._create_transition_func( - name, handler, transition_to, transition_callback - ) + try: + # Handle special token format (e.g. "__function__:function_name") + if isinstance(handler, str) and handler.startswith("__function__:"): + func_name = handler.split(":")[1] + handler = self._lookup_function(func_name) + + # Create transition function + transition_func = await self._create_transition_func( + name, handler, transition_to, transition_callback + ) - # Register function with LLM (or LLMSwitcher) - self._llm.register_function( - name, - transition_func, - cancel_on_interruption=cancel_on_interruption, - ) + # Register function with LLM (or LLMSwitcher) + self._llm.register_function( + name, + transition_func, + cancel_on_interruption=cancel_on_interruption, + ) - logger.debug(f"Registered function: {name}") - except Exception as e: - logger.error(f"Failed to register function {name}: {str(e)}") - raise FlowError(f"Function registration failed: {str(e)}") from e + logger.debug(f"Registered function: {name}") + except Exception as e: + logger.error(f"Failed to register function {name}: {str(e)}") + raise FlowError(f"Function 
registration failed: {str(e)}") from e async def set_node_from_config(self, node_config: NodeConfig) -> None: """Set up a new conversation node and transition to it. @@ -856,9 +862,7 @@ async def _set_node(self, node_id: str, node_config: NodeConfig) -> None: # Mix in global functions that should be available at every node functions_list = self._global_functions + functions_list - async def register_function_schema( - schema: FlowsFunctionSchema, is_deactivated: bool = False - ): + async def register_function_schema(schema: FlowsFunctionSchema): """Helper to register a single FlowsFunctionSchema.""" tools.append(schema) await self._register_function( @@ -867,10 +871,10 @@ async def register_function_schema( transition_to=schema.transition_to, transition_callback=schema.transition_callback, cancel_on_interruption=schema.cancel_on_interruption, - force=is_deactivated, # Force re-registration for deactivated functions ) - # Store the schema for potential future deactivation (including deactivated ones - # so they persist across multiple transitions) + # Track that we registered this function in the current node. + # This will be useful if we need to carry it over as a + # "deactivated" function into the next node. new_functions[schema.name] = schema async def register_direct_function(func): @@ -884,7 +888,10 @@ async def register_direct_function(func): transition_callback=None, cancel_on_interruption=direct_function.cancel_on_interruption, ) - # Store a FlowsFunctionSchema representation for potential future deactivation + # Track that we registered this function in the current node. + # This will be useful if we need to carry it over as a + # "deactivated" function into the next node. Track it as a + # FlowsFunctionSchema for ease of editing to be "deactivated". 
schema = FlowsFunctionSchema( name=direct_function.name, description=direct_function.description, @@ -935,12 +942,13 @@ async def register_direct_function(func): ) effective_strategy = ContextStrategyConfig(strategy=ContextStrategy.APPEND) - # For APPEND strategy, carry over deactivated functions from - # the previous node, since there may be context messages that call - # or reference them. This helps LLMs better understand their - # context, and also prevents errors: Gemini Live is particularly - # sensitive, erroring out when it has context messages (even text - # messages) referring to missing functions. + # For APPEND strategy, carry over functions from the previous node + # that aren't in the current node (but marking them as + # "deactivated"), since there may be historical context messages + # that call or reference them. This helps LLMs better understand + # their context, and also prevents errors: Gemini Live is + # particularly sensitive, erroring out when it has context messages + # (even text messages) referring to missing functions. deactivated_function_names: List[str] = [] if ( self._current_node is not None @@ -956,7 +964,7 @@ async def register_direct_function(func): ): # Create deactivated stub with same schema but dummy handler deactivated_schema = self._create_deactivated_function_schema(prev_schema) - await register_function_schema(deactivated_schema, is_deactivated=True) + await register_function_schema(deactivated_schema) deactivated_function_names.append(prev_name) if deactivated_function_names: @@ -982,9 +990,10 @@ async def register_direct_function(func): deactivated_warning = { "role": "system", "content": ( - f"IMPORTANT: The following functions are now DEACTIVATED and should NO LONGER be " + f"IMPORTANT: The following functions are currently DEACTIVATED and should NOT be " f"called: {', '.join(deactivated_function_names)}. If you call them, they " - f"will return an error." + f"will return an error. 
ALL OTHER AVAILABLE FUNCTIONS should be considered " + "currently ACTIVE and callable." ), } task_messages.insert(0, deactivated_warning) diff --git a/tests/test_context_strategies.py b/tests/test_context_strategies.py index 77f043a3..66441a79 100644 --- a/tests/test_context_strategies.py +++ b/tests/test_context_strategies.py @@ -860,3 +860,311 @@ async def test_deactivated_function_reactivated_when_present_in_new_node(self): content, "function_a should not be in deactivation warning since it's now active", ) + + async def test_reactivated_function_is_re_registered_with_llm(self): + """Test that reactivated functions are re-registered with the LLM.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + function_a_handler = AsyncMock(return_value={"status": "success"}) + + # First node with function_a + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": function_a_handler, + "description": "Function A", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + + # Count initial registrations for function_a + initial_registrations = sum( + 1 + for call in self.mock_llm.register_function.call_args_list + if call[0][0] == "function_a" + ) + self.assertEqual(initial_registrations, 1, "function_a should be registered once initially") + + # Second node without function_a (it becomes deactivated) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + } + + await flow_manager._set_node("second", second_node) + + # Count registrations after deactivation + deactivation_registrations = sum( + 1 + for call in 
self.mock_llm.register_function.call_args_list + if call[0][0] == "function_a" + ) + self.assertEqual( + deactivation_registrations, 2, "function_a should be re-registered when deactivated" + ) + + # Third node brings back function_a + third_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Third task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": function_a_handler, + "description": "Function A restored", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("third", third_node) + + # Count registrations after reactivation + final_registrations = sum( + 1 + for call in self.mock_llm.register_function.call_args_list + if call[0][0] == "function_a" + ) + self.assertEqual( + final_registrations, + 3, + "function_a should be re-registered when reactivated (total 3 registrations)", + ) + + async def test_reactivated_function_handler_works(self): + """Test that a reactivated function's handler actually works (not the deactivated dummy).""" + from pipecat.services.llm_service import FunctionCallParams + + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + # Track actual handler calls + real_handler_called = [] + + async def real_handler(args, flow_manager): + real_handler_called.append(args) + return {"status": "real_success", "data": args.get("input", "none")} + + # First node with function_a + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": real_handler, + "description": "Function A", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + + # 
Second node without function_a (deactivates it) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + } + + await flow_manager._set_node("second", second_node) + + # Third node brings back function_a + third_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Third task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": real_handler, + "description": "Function A restored", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("third", third_node) + + # Get the registered function handler from the most recent registration + # The last call for function_a should be the reactivated one + function_a_calls = [ + call + for call in self.mock_llm.register_function.call_args_list + if call[0][0] == "function_a" + ] + last_registered_handler = function_a_calls[-1][0][1] + + # Create mock params to call the handler + mock_result_callback = AsyncMock() + mock_params = MagicMock(spec=FunctionCallParams) + mock_params.arguments = {"input": "test_value"} + mock_params.result_callback = mock_result_callback + + # Call the registered handler + await last_registered_handler(mock_params) + + # Verify the real handler was called, not the deactivated dummy + self.assertEqual(len(real_handler_called), 1, "Real handler should have been called") + self.assertEqual(real_handler_called[0], {"input": "test_value"}) + + # Verify result callback was called with success (not error) + mock_result_callback.assert_called_once() + result = mock_result_callback.call_args[0][0] + self.assertEqual(result.get("status"), "real_success") + self.assertNotIn("error", result) + + async def test_reactivated_function_description_no_deactivated_prefix(self): + """Test that reactivated function's stored schema has no [DEACTIVATED] prefix.""" + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + 
context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + handler = AsyncMock(return_value={"status": "success"}) + + # First node with function_a + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": handler, + "description": "Original description", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + self.assertEqual( + flow_manager._current_functions["function_a"].description, "Original description" + ) + + # Second node without function_a (deactivates it) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [], + } + + await flow_manager._set_node("second", second_node) + self.assertTrue( + flow_manager._current_functions["function_a"].description.startswith("[DEACTIVATED]") + ) + + # Third node reactivates function_a + third_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Third task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": handler, + "description": "Restored description", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("third", third_node) + + # Verify stored schema has clean description + self.assertEqual( + flow_manager._current_functions["function_a"].description, + "Restored description", + "Reactivated function should have clean description without [DEACTIVATED] prefix", + ) + self.assertFalse( + flow_manager._current_functions["function_a"].description.startswith("[DEACTIVATED]") + ) + + async def test_functions_always_registered_on_each_transition(self): + """Test that functions are always re-registered on each node transition.""" + 
flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + handler = AsyncMock(return_value={"status": "success"}) + + # Node with function_a and function_b + node_with_both: NodeConfig = { + "task_messages": [{"role": "system", "content": "Task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": handler, + "description": "Function A", + "parameters": {"type": "object", "properties": {}}, + }, + }, + { + "type": "function", + "function": { + "name": "function_b", + "handler": handler, + "description": "Function B", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + # First transition + await flow_manager._set_node("first", node_with_both) + + first_transition_registrations = { + call[0][0] for call in self.mock_llm.register_function.call_args_list + } + self.assertIn("function_a", first_transition_registrations) + self.assertIn("function_b", first_transition_registrations) + + initial_call_count = len(self.mock_llm.register_function.call_args_list) + + # Second transition with same functions + await flow_manager._set_node("second", node_with_both) + + # Verify functions were registered again + new_call_count = len(self.mock_llm.register_function.call_args_list) + new_registrations = new_call_count - initial_call_count + + self.assertGreaterEqual( + new_registrations, 2, "Both functions should be re-registered on second transition" + ) From 2008ec418edcdf90f9237c59073104ebb094cf82 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 5 Feb 2026 11:47:14 -0500 Subject: [PATCH 07/13] Add CHANGELOG entry for changes to support Gemini Live --- CHANGELOG.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a8461d0..e1854616 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@
-34,6 +34,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Non-decorated direct functions continue to work with `cancel_on_interruption=True` (the default behavior). +- Added support for using Gemini Live (`GeminiLiveLLMService`) with Pipecat + Flows. Updated examples to support `LLM_PROVIDER=gemini_live`, accordingly. + Note that using Gemini Live requires that you use `LLMContext` and + `LLMContextAggregatorPair` rather than the deprecated `OpenAILLMContext` and + associated aggregators. + +### Changed + +- When transitioning from one node to the next, any functions in the previous + node that are absent in the new node now get "carried over", in that they're + still passed to the LLM, but in a "deactivated" state. Deactivation involves: + - Adding a special prefix to the function description + - Adding special context messages telling the LLM which functions to avoid + calling + + Providing LLMs with deactivated functions helps them understand historical + context that might contain references to previously-active functions. Gemini + Live is particularly sensitive, erroring out when its context (even the text + messages) refer to missing functions. + ## [0.0.22] - 2025-11-18 ### Added From 0c08bc96f5d34d0f6086c27197af69b5b7f8690e Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 5 Feb 2026 11:50:49 -0500 Subject: [PATCH 08/13] Ruff format --- src/pipecat_flows/manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 0765cb3a..1a2af1dc 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -890,7 +890,7 @@ async def register_direct_function(func): ) # Track that we registered this function in the current node. # This will be useful if we need to carry it over as a - # "deactivated" function into the next node. Track it as a + # "deactivated" function into the next node. 
Track it as a # FlowsFunctionSchema for ease of editing to be "deactivated". schema = FlowsFunctionSchema( name=direct_function.name, From 3650560f39449d32e7e39d79c8faf7e094fa56a6 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 5 Feb 2026 12:38:39 -0500 Subject: [PATCH 09/13] Remove stray debugging print statements --- examples/food_ordering.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/food_ordering.py b/examples/food_ordering.py index fc35ab68..c8c58330 100644 --- a/examples/food_ordering.py +++ b/examples/food_ordering.py @@ -168,8 +168,6 @@ async def select_pizza_order( size = args["size"] pizza_type = args["type"] - print("[pk] Selected pizza:", size, pizza_type) - # Simple pricing base_price = {"small": 10.00, "medium": 15.00, "large": 20.00} price = base_price[size] @@ -235,8 +233,6 @@ async def select_sushi_order( count = args["count"] roll_type = args["type"] - print("[pk] Selected sushi:", count, roll_type) - # Simple pricing: $8 per roll price = count * 8.00 From aed0e1a097cad8ebd436800a5dce6e73ee94a2a3 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 5 Feb 2026 17:00:29 -0500 Subject: [PATCH 10/13] Keep only the latest "functions deactivated" message in context, rather than letting them build up. 
--- src/pipecat_flows/manager.py | 66 +++++++--- tests/test_context_strategies.py | 199 ++++++++++++++++++++++++++----- tests/test_manager.py | 27 +++-- 3 files changed, 235 insertions(+), 57 deletions(-) diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 1a2af1dc..17fd46f4 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -35,13 +35,14 @@ from pipecat.frames.frames import ( FunctionCallResultProperties, LLMMessagesAppendFrame, + LLMMessagesTransformFrame, LLMMessagesUpdateFrame, LLMRunFrame, LLMSetToolsFrame, ) from pipecat.pipeline.llm_switcher import LLMSwitcher from pipecat.pipeline.task import PipelineTask -from pipecat.processors.aggregators.llm_context import LLMContext +from pipecat.processors.aggregators.llm_context import LLMContext, LLMContextMessage from pipecat.processors.aggregators.openai_llm_context import OpenAILLMContext from pipecat.services.llm_service import FunctionCallParams from pipecat.transports.base_transport import BaseTransport @@ -658,14 +659,35 @@ def _lookup_function(self, func_name: str) -> Callable: raise FlowError(error_message) # Prefix used to mark deactivated function descriptions - _DEACTIVATED_PREFIX = "[DEACTIVATED] " + _DEACTIVATED_FUNCTION_PREFIX = "[DEACTIVATED] " + + # Prefix used to mark "deactivated functions" messages in context + _DEACTIVATED_FUNCTIONS_MESSAGE_PREFIX = "[DEACTIVATED_FUNCTIONS_MESSAGE] " def _add_deactivated_prefix(self, description: str) -> str: """Add the [DEACTIVATED] prefix to a description if not already present.""" - if not description.startswith(self._DEACTIVATED_PREFIX): - return f"{self._DEACTIVATED_PREFIX}{description}" + if not description.startswith(self._DEACTIVATED_FUNCTION_PREFIX): + return f"{self._DEACTIVATED_FUNCTION_PREFIX}{description}" return description + def _remove_deactivated_functions_messages( + self, messages: List[LLMContextMessage] + ) -> List[LLMContextMessage]: + """Transform function that removes deactivated 
function warning messages. + + Suitable for use with LLMMessagesTransformFrame. Filters out any messages + with the deactivated functions message prefix, ensuring only the most + recent warning is present in the context. + """ + return [ + msg + for msg in messages + if not ( + isinstance(msg.get("content"), str) + and msg["content"].startswith(self._DEACTIVATED_FUNCTIONS_MESSAGE_PREFIX) + ) + ] + def _create_deactivated_function_schema( self, original_schema: FlowsFunctionSchema ) -> FlowsFunctionSchema: @@ -942,13 +964,13 @@ async def register_direct_function(func): ) effective_strategy = ContextStrategyConfig(strategy=ContextStrategy.APPEND) - # For APPEND strategy, carry over functions from the previous node - # that aren't in the current node (but marking them as - # "deactivated"), since there may be historical context messages - # that call or reference them. This helps LLMs better understand - # their context, and also prevents errors: Gemini Live is - # particularly sensitive, erroring out when it has context messages - # (even text messages) referring to missing functions. + # For APPEND strategy on non-first nodes, carry over functions from + # the previous node that aren't in the current node (but marking + # them as "deactivated"), since there may be historical context + # messages that call or reference them. This helps LLMs better + # understand their context, and also prevents errors: Gemini Live + # is particularly sensitive, erroring out when it has context + # messages (even text messages) referring to missing functions. deactivated_function_names: List[str] = [] if ( self._current_node is not None @@ -984,19 +1006,35 @@ async def register_direct_function(func): standard_functions, original_configs=functions_list ) - # Prepare task messages, injecting deactivated function warning if needed + # Remove any prior "deactivated functions" messages from context + # to prevent buildup (only one up-to-date one is needed). 
Only + # needed for APPEND strategy on non-first nodes, since + # RESET/RESET_WITH_SUMMARY replace the entire context. + if ( + self._current_node is not None + and effective_strategy.strategy == ContextStrategy.APPEND + ): + await self._task.queue_frames( + [ + LLMMessagesTransformFrame( + transform=self._remove_deactivated_functions_messages + ) + ] + ) + task_messages = list(node_config["task_messages"]) # Copy to avoid mutating original if deactivated_function_names: - deactivated_warning = { + deactivated_functions_message = { "role": "system", "content": ( + f"{self._DEACTIVATED_FUNCTIONS_MESSAGE_PREFIX}" f"IMPORTANT: The following functions are currently DEACTIVATED and should NOT be " f"called: {', '.join(deactivated_function_names)}. If you call them, they " f"will return an error. ALL OTHER AVAILABLE FUNCTIONS should be considered " "currently ACTIVE and callable." ), } - task_messages.insert(0, deactivated_warning) + task_messages.insert(0, deactivated_functions_message) # Update LLM context await self._update_llm_context( diff --git a/tests/test_context_strategies.py b/tests/test_context_strategies.py index 66441a79..bcf0957b 100644 --- a/tests/test_context_strategies.py +++ b/tests/test_context_strategies.py @@ -80,6 +80,14 @@ async def test_context_strategy_config_validation(self): with self.assertRaises(ValueError): ContextStrategyConfig(strategy=ContextStrategy.RESET_WITH_SUMMARY) + def _get_all_queued_frames(self): + """Helper to collect all frames from all queue_frames calls.""" + all_frames = [] + for call in self.mock_task.queue_frames.call_args_list: + frames = call[0][0] + all_frames.extend(frames) + return all_frames + async def test_default_strategy(self): """Test default context strategy (APPEND).""" flow_manager = FlowManager( @@ -91,8 +99,7 @@ async def test_default_strategy(self): # First node should use UpdateFrame regardless of strategy await flow_manager._set_node("first", self.sample_node) - first_call = 
self.mock_task.queue_frames.call_args_list[0] - first_frames = first_call[0][0] + first_frames = self._get_all_queued_frames() self.assertTrue(any(isinstance(f, LLMMessagesUpdateFrame) for f in first_frames)) # Reset mock @@ -100,8 +107,7 @@ async def test_default_strategy(self): # Subsequent node should use AppendFrame with default strategy await flow_manager._set_node("second", self.sample_node) - second_call = self.mock_task.queue_frames.call_args_list[0] - second_frames = second_call[0][0] + second_frames = self._get_all_queued_frames() self.assertTrue(any(isinstance(f, LLMMessagesAppendFrame) for f in second_frames)) async def test_reset_strategy(self): @@ -120,8 +126,7 @@ async def test_reset_strategy(self): # Second node should use UpdateFrame with RESET strategy await flow_manager._set_node("second", self.sample_node) - second_call = self.mock_task.queue_frames.call_args_list[0] - second_frames = second_call[0][0] + second_frames = self._get_all_queued_frames() self.assertTrue(any(isinstance(f, LLMMessagesUpdateFrame) for f in second_frames)) async def test_reset_with_summary_success(self): @@ -148,8 +153,7 @@ async def test_reset_with_summary_success(self): await flow_manager._set_node("second", self.sample_node) # Verify summary was included in context update - second_call = self.mock_task.queue_frames.call_args_list[0] - second_frames = second_call[0][0] + second_frames = self._get_all_queued_frames() update_frame = next(f for f in second_frames if isinstance(f, LLMMessagesUpdateFrame)) self.assertTrue(any(mock_summary in str(m) for m in update_frame.messages)) @@ -175,9 +179,8 @@ async def test_reset_with_summary_timeout(self): await flow_manager._set_node("second", self.sample_node) - # Verify UpdateFrame was used (APPEND behavior) - second_call = self.mock_task.queue_frames.call_args_list[0] - second_frames = second_call[0][0] + # Verify AppendFrame was used (fallback to APPEND behavior on timeout) + second_frames = self._get_all_queued_frames() 
self.assertTrue(any(isinstance(f, LLMMessagesAppendFrame) for f in second_frames)) async def test_provider_specific_summary_formatting(self): @@ -234,8 +237,7 @@ async def test_node_level_strategy_override(self): await flow_manager._set_node("second", node_with_strategy) # Verify UpdateFrame was used (RESET behavior) despite global APPEND - second_call = self.mock_task.queue_frames.call_args_list[0] - second_frames = second_call[0][0] + second_frames = self._get_all_queued_frames() self.assertTrue(any(isinstance(f, LLMMessagesUpdateFrame) for f in second_frames)) async def test_summary_generation_content(self): @@ -299,9 +301,11 @@ async def test_context_structure_after_summary(self): await flow_manager._set_node("second", new_node) # Verify context structure - update_call = self.mock_task.queue_frames.call_args_list[0] - update_frames = update_call[0][0] - messages_frame = next(f for f in update_frames if isinstance(f, LLMMessagesUpdateFrame)) + all_frames = self._get_all_queued_frames() + messages_frame = next( + (f for f in all_frames if isinstance(f, LLMMessagesUpdateFrame)), None + ) + self.assertIsNotNone(messages_frame, "LLMMessagesUpdateFrame should be queued") # Verify order: summary message, then new task messages self.assertTrue(mock_summary in str(messages_frame.messages[0])) @@ -334,6 +338,14 @@ async def asyncSetUp(self): self.mock_context_aggregator.user.return_value = MagicMock() self.mock_context_aggregator.user.return_value._context = MagicMock() + def _get_all_queued_frames(self): + """Helper to collect all frames from all queue_frames calls.""" + all_frames = [] + for call in self.mock_task.queue_frames.call_args_list: + frames = call[0][0] + all_frames.extend(frames) + return all_frames + async def test_deactivated_functions_carried_over_on_append(self): """Test that functions from previous node are carried over as deactivated on APPEND.""" flow_manager = FlowManager( @@ -442,9 +454,11 @@ async def 
test_deactivated_warning_message_injected(self): await flow_manager._set_node("second", second_node) # Verify warning message was injected - queue_frames_call = self.mock_task.queue_frames.call_args_list[0] - frames = queue_frames_call[0][0] - messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + all_frames = self._get_all_queued_frames() + messages_frame = next( + (f for f in all_frames if isinstance(f, LLMMessagesAppendFrame)), None + ) + self.assertIsNotNone(messages_frame, "LLMMessagesAppendFrame should be queued") # Check that warning message is present warning_found = any( @@ -557,9 +571,11 @@ async def test_global_functions_not_deactivated(self): await flow_manager._set_node("second", second_node) # Verify warning message mentions node_function but NOT global_function - queue_frames_call = self.mock_task.queue_frames.call_args_list[0] - frames = queue_frames_call[0][0] - messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + all_frames = self._get_all_queued_frames() + messages_frame = next( + (f for f in all_frames if isinstance(f, LLMMessagesAppendFrame)), None + ) + self.assertIsNotNone(messages_frame, "LLMMessagesAppendFrame should be queued") for msg in messages_frame.messages: content = str(msg.get("content", "")) @@ -694,9 +710,11 @@ async def test_deactivated_functions_persist_across_multiple_transitions(self): await flow_manager._set_node("second", second_node) # Verify function_a was deactivated - queue_frames_call = self.mock_task.queue_frames.call_args_list[0] - frames = queue_frames_call[0][0] - messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + all_frames = self._get_all_queued_frames() + messages_frame = next( + (f for f in all_frames if isinstance(f, LLMMessagesAppendFrame)), None + ) + self.assertIsNotNone(messages_frame, "LLMMessagesAppendFrame should be queued") warning_found = any( "function_a" in str(msg.get("content", "")) and "deactivated" in 
str(msg.get("content", "")).lower() @@ -726,9 +744,11 @@ async def test_deactivated_functions_persist_across_multiple_transitions(self): await flow_manager._set_node("third", third_node) # Verify both function_a and function_b are mentioned as deactivated - queue_frames_call = self.mock_task.queue_frames.call_args_list[0] - frames = queue_frames_call[0][0] - messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + all_frames = self._get_all_queued_frames() + messages_frame = next( + (f for f in all_frames if isinstance(f, LLMMessagesAppendFrame)), None + ) + self.assertIsNotNone(messages_frame, "LLMMessagesAppendFrame should be queued") warning_content = "" for msg in messages_frame.messages: @@ -848,9 +868,11 @@ async def test_deactivated_function_reactivated_when_present_in_new_node(self): ) # Verify no deactivation warning was issued for function_a - queue_frames_call = self.mock_task.queue_frames.call_args_list[0] - frames = queue_frames_call[0][0] - messages_frame = next(f for f in frames if isinstance(f, LLMMessagesAppendFrame)) + all_frames = self._get_all_queued_frames() + messages_frame = next( + (f for f in all_frames if isinstance(f, LLMMessagesAppendFrame)), None + ) + self.assertIsNotNone(messages_frame, "LLMMessagesAppendFrame should be queued") for msg in messages_frame.messages: content = str(msg.get("content", "")) @@ -1168,3 +1190,118 @@ async def test_functions_always_registered_on_each_transition(self): self.assertGreaterEqual( new_registrations, 2, "Both functions should be re-registered on second transition" ) + + async def test_deactivated_warnings_transform_frame_queued(self): + """Test that an LLMMessagesTransformFrame is queued to remove old warnings.""" + from pipecat.frames.frames import LLMMessagesTransformFrame + + from pipecat_flows.manager import FlowManager + + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + 
context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + handler = AsyncMock(return_value={"status": "success"}) + + # First node with function_a + first_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "First task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_a", + "handler": handler, + "description": "Function A", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("first", first_node) + self.mock_task.queue_frames.reset_mock() + + # Second node with only function_b (function_a becomes deactivated) + second_node: NodeConfig = { + "task_messages": [{"role": "system", "content": "Second task."}], + "functions": [ + { + "type": "function", + "function": { + "name": "function_b", + "handler": handler, + "description": "Function B", + "parameters": {"type": "object", "properties": {}}, + }, + }, + ], + } + + await flow_manager._set_node("second", second_node) + + # Verify LLMMessagesTransformFrame was queued + all_frames = [] + for call in self.mock_task.queue_frames.call_args_list: + frames = call[0][0] + all_frames.extend(frames) + + transform_frames = [f for f in all_frames if isinstance(f, LLMMessagesTransformFrame)] + self.assertGreater( + len(transform_frames), + 0, + "LLMMessagesTransformFrame should be queued to remove old warnings", + ) + + async def test_deactivated_warnings_transform_function_removes_warnings(self): + """Test that the transform function correctly removes deactivated warning messages.""" + from pipecat_flows.manager import FlowManager + + flow_manager = FlowManager( + task=self.mock_task, + llm=self.mock_llm, + context_aggregator=self.mock_context_aggregator, + context_strategy=ContextStrategyConfig(strategy=ContextStrategy.APPEND), + ) + await flow_manager.initialize() + + warning_marker = FlowManager._DEACTIVATED_FUNCTIONS_MESSAGE_PREFIX + + # Test messages with warnings 
mixed in + input_messages = [ + {"role": "system", "content": f"{warning_marker}Old warning 1"}, + {"role": "user", "content": "Hello"}, + {"role": "system", "content": f"{warning_marker}Old warning 2"}, + {"role": "assistant", "content": "Hi there"}, + {"role": "system", "content": f"{warning_marker}Old warning 3"}, + {"role": "system", "content": "Regular system message"}, + ] + + # Apply the transform method directly + result = flow_manager._remove_deactivated_functions_messages(input_messages) + + # Verify all warnings were removed + warning_count = sum( + 1 + for msg in result + if isinstance(msg.get("content", ""), str) and msg["content"].startswith(warning_marker) + ) + self.assertEqual(warning_count, 0, "All warnings should be removed") + + # Verify non-warning messages are preserved + self.assertEqual(len(result), 3, "Should have 3 messages remaining") + user_messages = [msg for msg in result if msg.get("role") == "user"] + assistant_messages = [msg for msg in result if msg.get("role") == "assistant"] + system_messages = [msg for msg in result if msg.get("role") == "system"] + self.assertEqual(len(user_messages), 1, "User message should be preserved") + self.assertEqual(len(assistant_messages), 1, "Assistant message should be preserved") + self.assertEqual(len(system_messages), 1, "Non-warning system message should be preserved") + self.assertEqual( + system_messages[0]["content"], + "Regular system message", + "Regular system message content should be preserved", + ) diff --git a/tests/test_manager.py b/tests/test_manager.py index b4ad0572..84b62d70 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -97,6 +97,14 @@ async def asyncSetUp(self): }, } + def _get_all_queued_frames(self): + """Helper to collect all frames from all queue_frames calls.""" + all_frames = [] + for call in self.mock_task.queue_frames.call_args_list: + frames = call[0][0] + all_frames.extend(frames) + return all_frames + async def test_static_flow_initialization(self): 
"""Test initialization of a static flow configuration.""" flow_manager = FlowManager( @@ -175,12 +183,11 @@ async def test_static_flow_transitions(self, mock_llm_run_frame): # The first call should be for the context update self.assertTrue(self.mock_task.queue_frames.called) - # Get the first call (context update) - first_call = self.mock_task.queue_frames.call_args_list[0] - first_frames = first_call[0][0] + # Collect all queued frames + all_frames = self._get_all_queued_frames() # For subsequent nodes, should use AppendFrame by default - append_frames = [f for f in first_frames if isinstance(f, LLMMessagesAppendFrame)] + append_frames = [f for f in all_frames if isinstance(f, LLMMessagesAppendFrame)] self.assertTrue(len(append_frames) > 0, "Should have at least one AppendFrame") # Verify that LLM completion was triggered by checking LLMRunFrame instantiation @@ -1040,8 +1047,7 @@ async def test_role_message_inheritance(self): # Set first node and verify UpdateFrame await flow_manager.set_node_from_config(first_node) - first_call = self.mock_task.queue_frames.call_args_list[0] # Get first call - first_frames = first_call[0][0] + first_frames = self._get_all_queued_frames() update_frames = [f for f in first_frames if isinstance(f, LLMMessagesUpdateFrame)] self.assertEqual(len(update_frames), 1) @@ -1054,8 +1060,7 @@ async def test_role_message_inheritance(self): await flow_manager.set_node_from_config(second_node) # Verify AppendFrame for second node - first_call = self.mock_task.queue_frames.call_args_list[0] # Get first call - second_frames = first_call[0][0] + second_frames = self._get_all_queued_frames() append_frames = [f for f in second_frames if isinstance(f, LLMMessagesAppendFrame)] self.assertEqual(len(append_frames), 1) @@ -1078,8 +1083,7 @@ async def test_frame_type_selection(self): # First node should use UpdateFrame await flow_manager.set_node_from_config(test_node) - first_call = self.mock_task.queue_frames.call_args_list[0] # Get first call - 
first_frames = first_call[0][0] + first_frames = self._get_all_queued_frames() self.assertTrue( any(isinstance(f, LLMMessagesUpdateFrame) for f in first_frames), "First node should use UpdateFrame", @@ -1094,8 +1098,7 @@ async def test_frame_type_selection(self): # Second node should use AppendFrame await flow_manager.set_node_from_config(test_node) - first_call = self.mock_task.queue_frames.call_args_list[0] # Get first call - second_frames = first_call[0][0] + second_frames = self._get_all_queued_frames() self.assertTrue( any(isinstance(f, LLMMessagesAppendFrame) for f in second_frames), "Subsequent nodes should use AppendFrame", From 8aa949814e526d949b84759a1f921ba188fb1783 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Thu, 5 Feb 2026 17:06:02 -0500 Subject: [PATCH 11/13] Bump pipecat dependency to 0.0.102, to pick up needed changes: - `LLMMessagesTransformFrame` - Changes to `GeminiLiveLLMService` that allow its context (messages, tools) to be programmatically updated at runtime --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 51d849dc..fff86f20 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,7 +19,7 @@ classifiers = [ "Topic :: Multimedia :: Sound/Audio", ] dependencies = [ - "pipecat-ai>=0.0.85", + "pipecat-ai>=0.0.102", "loguru~=0.7.2", "docstring_parser~=0.16" ] From a237c1bd4afb5efd87c8600458552ac225bacb67 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Fri, 6 Feb 2026 10:15:12 -0500 Subject: [PATCH 12/13] Tweak CHANGELOG --- CHANGELOG.md | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e1854616..48e5b237 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,12 +34,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 Non-decorated direct functions continue to work with `cancel_on_interruption=True` (the default behavior). 
-- Added support for using Gemini Live (`GeminiLiveLLMService`) with Pipecat - Flows. Updated examples to support `LLM_PROVIDER=gemini_live`, accordingly. - Note that using Gemini Live requires that you use `LLMContext` and - `LLMContextAggregatorPair` rather than the deprecated `OpenAILLMContext` and - associated aggregators. - ### Changed - When transitioning from one node to the next, any functions in the previous @@ -50,9 +44,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 calling Providing LLMs with deactivated functions helps them understand historical - context that might contain references to previously-active functions. Gemini - Live is particularly sensitive, erroring out when its context (even the text - messages) refer to missing functions. + context that might contain references to previously-active functions. ## [0.0.22] - 2025-11-18 From fbb44a3e2dcac9528996f1eb7f7f1ef7c4a173b4 Mon Sep 17 00:00:00 2001 From: Paul Kompfner Date: Fri, 6 Feb 2026 10:29:07 -0500 Subject: [PATCH 13/13] Back out changes, for now, that are specific only to Gemini Live support --- examples/food_ordering.py | 37 ++++------- examples/food_ordering_direct_functions.py | 39 +++++------ examples/insurance_quote.py | 65 ++++++++----------- examples/patient_intake.py | 41 +++++------- examples/podcast_interview.py | 39 +++++------ examples/restaurant_reservation.py | 44 +++++-------- ...restaurant_reservation_direct_functions.py | 44 +++++-------- examples/utils.py | 50 ++++++-------- examples/warm_transfer.py | 44 +++++-------- src/pipecat_flows/adapters.py | 32 ++------- src/pipecat_flows/manager.py | 17 +---- 11 files changed, 163 insertions(+), 289 deletions(-) diff --git a/examples/food_ordering.py b/examples/food_ordering.py index c8c58330..b61cf4ee 100644 --- a/examples/food_ordering.py +++ b/examples/food_ordering.py @@ -52,7 +52,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies 
-from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import ( FlowArgs, @@ -345,14 +345,10 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the food ordering bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = ( - CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman - ) - if needs_stt_tts() - else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -369,20 +365,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/food_ordering_direct_functions.py b/examples/food_ordering_direct_functions.py index b44fecba..bf9b7e75 100644 --- a/examples/food_ordering_direct_functions.py +++ b/examples/food_ordering_direct_functions.py @@ -52,7 +52,7 @@ from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import FlowManager, FlowResult, NodeConfig @@ 
-193,7 +193,7 @@ def create_initial_node() -> NodeConfig: role_messages=[ { "role": "system", - "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. When you've decided to call a function to progress the conversation, do not also respond; the function call by itself is enough. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", + "content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", } ], task_messages=[ @@ -288,14 +288,10 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the food ordering bot with direct functions.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = ( - CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman - ) - if needs_stt_tts() - else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -312,20 +308,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + stt, + 
context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/insurance_quote.py b/examples/insurance_quote.py index 83b2f65a..13c18625 100644 --- a/examples/insurance_quote.py +++ b/examples/insurance_quote.py @@ -57,7 +57,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import FlowArgs, FlowManager, FlowResult, FlowsFunctionSchema, NodeConfig @@ -115,12 +115,12 @@ class CoverageUpdateResult(FlowResult, InsuranceQuote): # Function handlers -async def record_age( +async def collect_age( args: FlowArgs, flow_manager: FlowManager ) -> tuple[AgeCollectionResult, NodeConfig]: """Process age collection.""" age = args["age"] - logger.debug(f"record_age handler executing with age: {age}") + logger.debug(f"collect_age handler executing with age: {age}") flow_manager.state["age"] = age result = AgeCollectionResult(age=age) @@ -130,12 +130,12 @@ async def record_age( return result, next_node -async def record_marital_status( +async def collect_marital_status( args: FlowArgs, flow_manager: FlowManager ) -> tuple[MaritalStatusResult, NodeConfig]: """Process marital status collection.""" status = args["marital_status"] - logger.debug(f"record_marital_status handler executing with status: {status}") + logger.debug(f"collect_marital_status handler executing with status: {status}") result = MaritalStatusResult(marital_status=status) @@ -208,25 +208,23 @@ def create_initial_node() -> NodeConfig: "content": ( "You are a friendly insurance agent. Your responses will be " "converted to audio, so avoid special characters. Always use " - "the available functions to progress the conversation naturally. 
" - "When you've decided to call a function, do not also respond; " - "the function call by itself is enough." + "the available functions to progress the conversation naturally." ), } ], "task_messages": [ { "role": "system", - "content": "Start by asking for the customer's age, then record their response using the record_age function.", + "content": "Start by asking for the customer's age.", } ], "functions": [ FlowsFunctionSchema( - name="record_age", + name="collect_age", description="Record customer's age", properties={"age": {"type": "integer"}}, required=["age"], - handler=record_age, + handler=collect_age, ) ], } @@ -244,11 +242,11 @@ def create_marital_status_node() -> NodeConfig: ], "functions": [ FlowsFunctionSchema( - name="record_marital_status", + name="collect_marital_status", description="Record marital status after customer provides it", properties={"marital_status": {"type": "string", "enum": ["single", "married"]}}, required=["marital_status"], - handler=record_marital_status, + handler=collect_marital_status, ) ], } @@ -297,11 +295,11 @@ def create_quote_results_node( f"Monthly Premium: ${quote['monthly_premium']:.2f}\n" f"Coverage Amount: ${quote['coverage_amount']:,}\n" f"Deductible: ${quote['deductible']:,}\n\n" - "Explain these quote details to the customer. If they then request changes, " - "use update_coverage to recalculate their quote. If this is an updated quote (from a previous quote), explain how their " + "Explain these quote details to the customer. When they request changes, " + "use update_coverage to recalculate their quote. Explain how their " "changes affected the premium and compare it to their previous quote. " "Ask if they'd like to make any other adjustments or if they're ready " - "to end the quote process. " + "to end the quote process." 
), } ], @@ -346,14 +344,10 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the insurance quote bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = ( - CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - if needs_stt_tts() - else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -370,20 +364,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/patient_intake.py b/examples/patient_intake.py index de481ab4..ecf769aa 100644 --- a/examples/patient_intake.py +++ b/examples/patient_intake.py @@ -54,7 +54,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import ( ContextStrategy, @@ -319,7 +319,7 @@ def create_allergies_node() -> NodeConfig: task_messages=[ { "role": "system", - "content": "Your job now is to collect allergy information. Ask about any allergies they have. 
After recording allergies (or confirming none), proceed to medical conditions.", + "content": "Collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions.", } ], functions=[record_allergies_func], @@ -355,7 +355,7 @@ def create_conditions_node() -> NodeConfig: task_messages=[ { "role": "system", - "content": "Your job now is to collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.", + "content": "Collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.", } ], functions=[record_conditions_func], @@ -479,14 +479,10 @@ def create_end_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): """Run the patient intake bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = ( - CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - if needs_stt_tts() - else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -503,20 +499,15 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): ) pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + 
context_aggregator.assistant(), + ] ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/podcast_interview.py b/examples/podcast_interview.py index 8126b1bc..ddbbe722 100644 --- a/examples/podcast_interview.py +++ b/examples/podcast_interview.py @@ -63,7 +63,7 @@ from pipecat.services.deepgram.stt import DeepgramSTTService from pipecat.transports.base_transport import BaseTransport, TransportParams from pipecat.transports.daily.transport import DailyParams -from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import ( FlowArgs, @@ -246,15 +246,11 @@ def create_final_node() -> NodeConfig: async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): logger.info(f"Starting bot") - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) - tts = ( - CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - if needs_stt_tts() - else None + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) # LLM service is created using the create_llm function from utils.py @@ -274,21 +270,16 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments): rtvi = RTVIProcessor(config=RTVIConfig(config=[])) pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - rtvi, - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + rtvi, + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] ) task = PipelineTask( diff --git a/examples/restaurant_reservation.py b/examples/restaurant_reservation.py index 95583f50..8c945928 100644 --- 
a/examples/restaurant_reservation.py +++ b/examples/restaurant_reservation.py @@ -53,7 +53,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import FlowArgs, FlowManager, FlowResult, FlowsFunctionSchema, NodeConfig @@ -199,7 +199,7 @@ def create_initial_node(wait_for_user: bool) -> NodeConfig: "role_messages": [ { "role": "system", - "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis. When you've decided to call a function, do not also respond; the function call by itself is enough.", + "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis.", } ], "task_messages": [ @@ -221,7 +221,7 @@ def create_time_selection_node() -> NodeConfig: "task_messages": [ { "role": "system", - "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. When they provide a time, check availability by calling the appropriate function.", + "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM.", } ], "functions": [availability_schema], @@ -253,8 +253,7 @@ def create_no_availability_node(alternative_times: list[str]) -> NodeConfig: "content": ( f"Apologize that the requested time is not available. " f"Suggest these alternative times: {times_list}. " - "Ask if they'd like to try one of these times. " - "If not, end the conversation by calling the appropriate function." + "Ask if they'd like to try one of these times." 
), } ], @@ -281,14 +280,10 @@ async def run_bot( transport: BaseTransport, runner_args: RunnerArguments, wait_for_user: bool = False ): """Run the restaurant reservation bot.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = ( - CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - if needs_stt_tts() - else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -305,20 +300,15 @@ async def run_bot( ) pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/restaurant_reservation_direct_functions.py b/examples/restaurant_reservation_direct_functions.py index 745f8df5..2321b88b 100644 --- a/examples/restaurant_reservation_direct_functions.py +++ b/examples/restaurant_reservation_direct_functions.py @@ -55,7 +55,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import FlowManager, FlowResult, NodeConfig @@ -185,7 +185,7 @@ def create_initial_node(wait_for_user: bool) -> NodeConfig: "role_messages": [ { "role": "system", - "content": "You are a restaurant reservation assistant for La Maison, an upscale 
French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis. When you've decided to call a function, do not also respond; the function call by itself is enough.", + "content": "You are a restaurant reservation assistant for La Maison, an upscale French restaurant. Be casual and friendly. This is a voice conversation, so avoid special characters and emojis.", } ], "task_messages": [ @@ -207,7 +207,7 @@ def create_time_selection_node() -> NodeConfig: "task_messages": [ { "role": "system", - "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM. When they provide a time, check availability by calling the appropriate function.", + "content": "Ask what time they'd like to dine. Restaurant is open 5 PM to 10 PM.", } ], "functions": [check_availability], @@ -239,8 +239,7 @@ def create_no_availability_node(alternative_times: list[str]) -> NodeConfig: "content": ( f"Apologize that the requested time is not available. " f"Suggest these alternative times: {times_list}. " - "Ask if they'd like to try one of these times. " - "If not, end the conversation by calling the appropriate function." + "Ask if they'd like to try one of these times." 
), } ], @@ -267,14 +266,10 @@ async def run_bot( transport: BaseTransport, runner_args: RunnerArguments, wait_for_user: bool = False ): """Run the restaurant reservation bot with direct functions.""" - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = ( - CartesiaTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady - ) - if needs_stt_tts() - else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady ) # LLM service is created using the create_llm function from utils.py # Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable @@ -291,20 +286,15 @@ async def run_bot( ) pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] ) task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/examples/utils.py b/examples/utils.py index 4daf82b8..3e962ae0 100644 --- a/examples/utils.py +++ b/examples/utils.py @@ -14,9 +14,13 @@ from typing import Any -def create_llm() -> Any: +def create_llm(provider: str = None, model: str = None) -> Any: """Create an LLM service instance based on environment configuration. + Args: + provider: LLM provider name. If None, uses LLM_PROVIDER env var (defaults to 'openai') + model: Model name. 
If None, uses provider's default model + Returns: Configured LLM service instance @@ -29,13 +33,24 @@ def create_llm() -> Any: - google: Requires GOOGLE_API_KEY - aws: Uses AWS default credential chain (SSO, environment variables, or IAM roles) Optionally set AWS_REGION (defaults to us-west-2) - - gemini_live: Requires GOOGLE_API_KEY Usage: - # Use provider from LLM_PROVIDER env var (defaults to OpenAI) + # Use default provider (from LLM_PROVIDER env var, defaults to OpenAI) llm = create_llm() + + # Use specific provider + llm = create_llm("anthropic") + + # Use specific provider and model + llm = create_llm("openai", "gpt-4o-mini") + + # Use AWS Bedrock (requires AWS credentials via SSO, env vars, or IAM) + llm = create_llm("aws") """ - provider = _resolve_provider() + if provider is None: + provider = os.getenv("LLM_PROVIDER", "openai").lower() + else: + provider = provider.lower() # Provider configurations configs = { @@ -60,11 +75,6 @@ def create_llm() -> Any: "default_model": "us.anthropic.claude-haiku-4-5-20251001-v1:0", "region": "us-west-2", }, - "gemini_live": { - "service": "pipecat.services.google.gemini_live.llm.GeminiLiveLLMService", - "api_key_env": "GOOGLE_API_KEY", - "default_model": "models/gemini-2.5-flash-native-audio-preview-12-2025", - }, } config = configs.get(provider) @@ -85,8 +95,8 @@ def create_llm() -> Any: if not api_key: raise ValueError(f"Missing API key: {config['api_key_env']} for provider: {provider}") - # Use default model - selected_model = config["default_model"] + # Use provided model or default + selected_model = model or config["default_model"] # Build kwargs kwargs = {"api_key": api_key, "model": selected_model} @@ -99,21 +109,3 @@ def create_llm() -> Any: del kwargs["api_key"] return service_class(**kwargs) - - -def needs_stt_tts() -> bool: - """Return True when STT/TTS should be enabled for the LLM service specified by environment configuration. 
- - Returns: - True if STT/TTS should be enabled, False if the provider handles audio natively. - """ - return _resolve_provider() != "gemini_live" - - -def _resolve_provider() -> str: - """Resolve the LLM provider name from environment configuration. - - Returns: - Lower-cased provider name. - """ - return os.getenv("LLM_PROVIDER", "openai").lower() diff --git a/examples/warm_transfer.py b/examples/warm_transfer.py index 5b7f5668..37ff46d4 100644 --- a/examples/warm_transfer.py +++ b/examples/warm_transfer.py @@ -74,7 +74,7 @@ TurnAnalyzerUserTurnStopStrategy, ) from pipecat.turns.user_turn_strategies import UserTurnStrategies -from utils import create_llm, needs_stt_tts +from utils import create_llm from pipecat_flows import ContextStrategyConfig, FlowManager, FlowResult, NodeConfig from pipecat_flows.types import ActionConfig, ContextStrategy, FlowArgs, FlowsFunctionSchema @@ -277,7 +277,7 @@ def create_initial_customer_interaction_node() -> NodeConfig: role_messages=[ { "role": "system", - "content": "You are an assistant for ABC Widget Company. You must ALWAYS use the available functions to progress the conversation. When you've decided to call a function to progress the conversation, do not also respond; the function call by itself is enough. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", + "content": "You are an assistant for ABC Widget Company. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.", } ], task_messages=[ @@ -329,10 +329,7 @@ def create_continued_customer_interaction_node() -> NodeConfig: task_messages=[ { "role": "system", - "content": """ - Finish helping the customer with their previous request, if you haven't already. 
- - Then ask the customer there's anything else you could help them with today, or if they'd like to end the conversation. If they need more help, re-offer the two choices you offered before: you could provide store location and hours of operation, or begin placing an order. + "content": """Ask the customer there's anything else you could help them with today, or if they'd like to end the conversation. If they need more help, re-offer the two choices you offered before: you could provide store location and hours of operation, or begin placing an order. To help the customer: - Use the check_store_location_and_hours_of_operation function to check store location and hours of operation to provide to the customer @@ -640,14 +637,10 @@ async def main(): vad_analyzer=SileroVADAnalyzer(params=VADParams(stop_secs=0.2)), ), ) - stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None - tts = ( - CartesiaHttpTTSService( - api_key=os.getenv("CARTESIA_API_KEY"), - voice_id="d46abd1d-2d02-43e8-819f-51fb652c1c61", # Newsman - ) - if needs_stt_tts() - else None + stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) + tts = CartesiaHttpTTSService( + api_key=os.getenv("CARTESIA_API_KEY"), + voice_id="d46abd1d-2d02-43e8-819f-51fb652c1c61", # Newsman ) llm = create_llm() @@ -666,20 +659,15 @@ async def main(): # Create pipeline pipeline = Pipeline( - list( - filter( - None, - [ - transport.input(), - stt, - context_aggregator.user(), - llm, - tts, - transport.output(), - context_aggregator.assistant(), - ], - ) - ) + [ + transport.input(), + stt, + context_aggregator.user(), + llm, + tts, + transport.output(), + context_aggregator.assistant(), + ] ) task = PipelineTask(pipeline=pipeline, params=PipelineParams(allow_interruptions=True)) diff --git a/src/pipecat_flows/adapters.py b/src/pipecat_flows/adapters.py index dfe6d463..f58623af 100644 --- a/src/pipecat_flows/adapters.py +++ b/src/pipecat_flows/adapters.py @@ -47,16 +47,6 @@ class 
LLMAdapter: - AWS Bedrock: Uses Anthropic-compatible format """ - def __init__(self, supports_summarization: bool = True) -> None: - """Initialize the adapter. - - Args: - supports_summarization: Indicates if the LLM supports generating - summaries (which determines whether generate_summary() can be - used). - """ - self.supports_summarization = supports_summarization - def get_function_name(self, function_def: Union[Dict[str, Any], FlowsFunctionSchema]) -> str: """Extract function name from provider-specific function definition or schema. @@ -181,9 +171,6 @@ async def generate_summary( Generated summary text, or None if generation fails. """ try: - if not self.supports_summarization: - raise RuntimeError("This LLM does not support generating summaries.") - if isinstance(context, LLMContext): messages = context.get_messages() else: @@ -230,16 +217,6 @@ class UniversalLLMAdapter(LLMAdapter): be in the standard FlowsFunctionSchema format. """ - def __init__(self, supports_summarization: bool) -> None: - """Initialize the adapter. - - Args: - supports_summarization: Indicates if the LLM supports generating - summaries (which determines whether generate_summary() can be - used). - """ - super().__init__(supports_summarization=supports_summarization) - def _get_function_name_from_dict(self, function_def: Dict[str, Any]) -> str: raise RuntimeError( "Provider-specific function definitions are not supported in flows using universal LLMContext. Use FlowsFunctionSchemas or direct functions instead." @@ -687,14 +664,13 @@ def create_adapter(llm, context_aggregator) -> LLMAdapter: Raises: ValueError: If LLM type is not supported or required dependency not installed. 
""" - llm_type = type(llm).__name__ - llm_class = type(llm) - if isinstance(context_aggregator, LLMContextAggregatorPair): # Universal LLMContext is in use, so we need the universal adapter logger.debug("Creating universal adapter") - llm_supports_summarization = llm_type not in ["GeminiLiveLLMService"] - return UniversalLLMAdapter(supports_summarization=llm_supports_summarization) + return UniversalLLMAdapter() + + llm_type = type(llm).__name__ + llm_class = type(llm) if llm_type == "OpenAILLMService": logger.debug("Creating OpenAI adapter") diff --git a/src/pipecat_flows/manager.py b/src/pipecat_flows/manager.py index 17fd46f4..0fd9f63f 100644 --- a/src/pipecat_flows/manager.py +++ b/src/pipecat_flows/manager.py @@ -951,26 +951,11 @@ async def register_direct_function(func): # Determine effective context strategy for this transition effective_strategy = node_config.get("context_strategy") or self._context_strategy - # If RESET_WITH_SUMMARY is not supported for this LLM, fall back to - # APPEND and log a warning. In the future, we may want to add - # support for specifying a summarization LLM. - if ( - effective_strategy.strategy == ContextStrategy.RESET_WITH_SUMMARY - and not self._adapter.supports_summarization - ): - logger.warning( - f"Context strategy RESET_WITH_SUMMARY is not supported by the current LLM. " - f"Falling back to APPEND strategy for node {node_id}." - ) - effective_strategy = ContextStrategyConfig(strategy=ContextStrategy.APPEND) - # For APPEND strategy on non-first nodes, carry over functions from # the previous node that aren't in the current node (but marking # them as "deactivated"), since there may be historical context # messages that call or reference them. This helps LLMs better - # understand their context, and also prevents errors: Gemini Live - # is particularly sensitive, erroring out when it has context - # messages (even text messages) referring to missing functions. + # understand their context. 
deactivated_function_names: List[str] = [] if ( self._current_node is not None