Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Non-decorated direct functions continue to work with `cancel_on_interruption=True`
(the default behavior).

- Added support for using Gemini Live (`GeminiLiveLLMService`) with Pipecat
Flows. Updated examples to support `LLM_PROVIDER=gemini_live` accordingly.
Note that using Gemini Live requires that you use `LLMContext` and
`LLMContextAggregatorPair` rather than the deprecated `OpenAILLMContext` and
associated aggregators.

### Changed

- When transitioning from one node to the next, any functions in the previous
node that are absent in the new node now get "carried over", in that they're
still passed to the LLM, but in a "deactivated" state. Deactivation involves:
- Adding a special prefix to the function description
- Adding special context messages telling the LLM which functions to avoid
calling

Providing LLMs with deactivated functions helps them understand historical
context that might contain references to previously-active functions. Gemini
Live is particularly sensitive, erroring out when its context (even the text
messages) refers to missing functions.

## [0.0.22] - 2025-11-18

### Added
Expand Down
39 changes: 24 additions & 15 deletions examples/food_ordering.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from utils import create_llm
from utils import create_llm, needs_stt_tts

from pipecat_flows import (
FlowArgs,
Expand Down Expand Up @@ -139,7 +139,7 @@ async def choose_sushi(args: FlowArgs, flow_manager: FlowManager) -> tuple[None,
role_messages=[
{
"role": "system",
"content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.",
"content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. When you've decided to call a function to progress the conversation, do not also respond; the function call by itself is enough. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.",
}
],
task_messages=[
Expand Down Expand Up @@ -345,10 +345,14 @@ def create_end_node() -> NodeConfig:

async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Run the food ordering bot."""
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None
tts = (
CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman
)
if needs_stt_tts()
else None
)
# LLM service is created using the create_llm function from utils.py
# Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable
Expand All @@ -365,15 +369,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)

pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
list(
filter(
None,
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
],
)
)
)

task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
Expand Down
39 changes: 24 additions & 15 deletions examples/food_ordering_direct_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from pipecat.transports.websocket.fastapi import FastAPIWebsocketParams
from pipecat.turns.user_stop import TurnAnalyzerUserTurnStopStrategy
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from utils import create_llm
from utils import create_llm, needs_stt_tts

from pipecat_flows import FlowManager, FlowResult, NodeConfig

Expand Down Expand Up @@ -193,7 +193,7 @@ def create_initial_node() -> NodeConfig:
role_messages=[
{
"role": "system",
"content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.",
"content": "You are an order-taking assistant. You must ALWAYS use the available functions to progress the conversation. When you've decided to call a function to progress the conversation, do not also respond; the function call by itself is enough. This is a phone conversation and your responses will be converted to audio. Keep the conversation friendly, casual, and polite. Avoid outputting special characters and emojis.",
}
],
task_messages=[
Expand Down Expand Up @@ -288,10 +288,14 @@ def create_end_node() -> NodeConfig:

async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Run the food ordering bot with direct functions."""
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None
tts = (
CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="820a3788-2b37-4d21-847a-b65d8a68c99a", # Salesman
)
if needs_stt_tts()
else None
)
# LLM service is created using the create_llm function from utils.py
# Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable
Expand All @@ -308,15 +312,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)

pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
list(
filter(
None,
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
],
)
)
)

task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
Expand Down
65 changes: 38 additions & 27 deletions examples/insurance_quote.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from utils import create_llm
from utils import create_llm, needs_stt_tts

from pipecat_flows import FlowArgs, FlowManager, FlowResult, FlowsFunctionSchema, NodeConfig

Expand Down Expand Up @@ -115,12 +115,12 @@ class CoverageUpdateResult(FlowResult, InsuranceQuote):


# Function handlers
async def collect_age(
async def record_age(
args: FlowArgs, flow_manager: FlowManager
) -> tuple[AgeCollectionResult, NodeConfig]:
"""Process age collection."""
age = args["age"]
logger.debug(f"collect_age handler executing with age: {age}")
logger.debug(f"record_age handler executing with age: {age}")

flow_manager.state["age"] = age
result = AgeCollectionResult(age=age)
Expand All @@ -130,12 +130,12 @@ async def collect_age(
return result, next_node


async def collect_marital_status(
async def record_marital_status(
args: FlowArgs, flow_manager: FlowManager
) -> tuple[MaritalStatusResult, NodeConfig]:
"""Process marital status collection."""
status = args["marital_status"]
logger.debug(f"collect_marital_status handler executing with status: {status}")
logger.debug(f"record_marital_status handler executing with status: {status}")

result = MaritalStatusResult(marital_status=status)

Expand Down Expand Up @@ -208,23 +208,25 @@ def create_initial_node() -> NodeConfig:
"content": (
"You are a friendly insurance agent. Your responses will be "
"converted to audio, so avoid special characters. Always use "
"the available functions to progress the conversation naturally."
"the available functions to progress the conversation naturally. "
"When you've decided to call a function, do not also respond; "
"the function call by itself is enough."
),
}
],
"task_messages": [
{
"role": "system",
"content": "Start by asking for the customer's age.",
"content": "Start by asking for the customer's age, then record their response using the record_age function.",
}
],
"functions": [
FlowsFunctionSchema(
name="collect_age",
name="record_age",
description="Record customer's age",
properties={"age": {"type": "integer"}},
required=["age"],
handler=collect_age,
handler=record_age,
)
],
}
Expand All @@ -242,11 +244,11 @@ def create_marital_status_node() -> NodeConfig:
],
"functions": [
FlowsFunctionSchema(
name="collect_marital_status",
name="record_marital_status",
description="Record marital status after customer provides it",
properties={"marital_status": {"type": "string", "enum": ["single", "married"]}},
required=["marital_status"],
handler=collect_marital_status,
handler=record_marital_status,
)
],
}
Expand Down Expand Up @@ -295,11 +297,11 @@ def create_quote_results_node(
f"Monthly Premium: ${quote['monthly_premium']:.2f}\n"
f"Coverage Amount: ${quote['coverage_amount']:,}\n"
f"Deductible: ${quote['deductible']:,}\n\n"
"Explain these quote details to the customer. When they request changes, "
"use update_coverage to recalculate their quote. Explain how their "
"Explain these quote details to the customer. If they then request changes, "
"use update_coverage to recalculate their quote. If this is an updated quote (from a previous quote), explain how their "
"changes affected the premium and compare it to their previous quote. "
"Ask if they'd like to make any other adjustments or if they're ready "
"to end the quote process."
"to end the quote process. "
),
}
],
Expand Down Expand Up @@ -344,10 +346,14 @@ def create_end_node() -> NodeConfig:

async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Run the insurance quote bot."""
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None
tts = (
CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
if needs_stt_tts()
else None
)
# LLM service is created using the create_llm function from utils.py
# Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable
Expand All @@ -364,15 +370,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)

pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
list(
filter(
None,
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
],
)
)
)

task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
Expand Down
41 changes: 25 additions & 16 deletions examples/patient_intake.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
TurnAnalyzerUserTurnStopStrategy,
)
from pipecat.turns.user_turn_strategies import UserTurnStrategies
from utils import create_llm
from utils import create_llm, needs_stt_tts

from pipecat_flows import (
ContextStrategy,
Expand Down Expand Up @@ -319,7 +319,7 @@ def create_allergies_node() -> NodeConfig:
task_messages=[
{
"role": "system",
"content": "Collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions.",
"content": "Your job now is to collect allergy information. Ask about any allergies they have. After recording allergies (or confirming none), proceed to medical conditions.",
}
],
functions=[record_allergies_func],
Expand Down Expand Up @@ -355,7 +355,7 @@ def create_conditions_node() -> NodeConfig:
task_messages=[
{
"role": "system",
"content": "Collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.",
"content": "Your job now is to collect medical condition information. Ask about any medical conditions they have. After recording conditions (or confirming none), proceed to visit reasons.",
}
],
functions=[record_conditions_func],
Expand Down Expand Up @@ -479,10 +479,14 @@ def create_end_node() -> NodeConfig:

async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
"""Run the patient intake bot."""
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY"))
tts = CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
stt = DeepgramSTTService(api_key=os.getenv("DEEPGRAM_API_KEY")) if needs_stt_tts() else None
tts = (
CartesiaTTSService(
api_key=os.getenv("CARTESIA_API_KEY"),
voice_id="71a7ad14-091c-4e8e-a314-022ece01c121", # British Reading Lady
)
if needs_stt_tts()
else None
)
# LLM service is created using the create_llm function from utils.py
# Default is OpenAI; can be changed by setting LLM_PROVIDER environment variable
Expand All @@ -499,15 +503,20 @@ async def run_bot(transport: BaseTransport, runner_args: RunnerArguments):
)

pipeline = Pipeline(
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
]
list(
filter(
None,
[
transport.input(),
stt,
context_aggregator.user(),
llm,
tts,
transport.output(),
context_aggregator.assistant(),
],
)
)
)

task = PipelineTask(pipeline, params=PipelineParams(allow_interruptions=True))
Expand Down
Loading
Loading