Description
pipecat version
0.0.105
Python version
3.11.5
Operating System
macOS 26.2 (25C56)
Issue description
For Deepgram Flux and AssemblyAI, the ordering of TranscriptionFrame and UserStoppedSpeakingFrame does not seem to be preserved. There is even a comment in the AssemblyAI processor saying that it should be:
# AAI is authoritative — emit UserStoppedSpeakingFrame immediately.
# broadcast_frame pushes downstream (same queue as TranscriptionFrame
# above, so ordering is preserved) and upstream.
However, in testing I find that the UserStoppedSpeakingFrame reaches my subsequent processors first, and the TranscriptionFrame is received later.
I have done some testing on this and I believe it's down to UserStoppedSpeakingFrame being a SystemFrame. I tried the same thing with another frame that I made, EndOfTurnDetectedFrame:
@dataclass
class EndOfTurnDetectedFrame(DataFrame):
    pass
With this frame the ordering seems to be preserved.
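As a toy illustration of the suspected cause, the sketch below models a downstream processor that handles system frames on a priority lane ahead of queued data frames. This is an assumption about the mechanism, not pipecat's actual implementation: `ToyProcessor`, `queue_frame`, `drain`, and the stand-in `Frame` classes are all hypothetical names that only share their spelling with the real ones.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Frame:
    name: str


class DataFrame(Frame):
    """Toy stand-in for a frame that travels on the ordered data lane."""


class SystemFrame(Frame):
    """Toy stand-in for a frame that travels on the priority lane."""


class ToyProcessor:
    """Hypothetical processor that handles system frames before data frames."""

    def __init__(self):
        self.received = []
        self._system_lane = asyncio.Queue()
        self._data_lane = asyncio.Queue()

    async def queue_frame(self, frame):
        # Route by frame type, the way a priority scheme might.
        lane = self._system_lane if isinstance(frame, SystemFrame) else self._data_lane
        await lane.put(frame)

    async def drain(self):
        # The priority lane is emptied first, then the data lane in FIFO order.
        for lane in (self._system_lane, self._data_lane):
            while not lane.empty():
                self.received.append(lane.get_nowait().name)


async def run(stop_frame):
    proc = ToyProcessor()
    await proc.queue_frame(DataFrame("TranscriptionFrame"))  # sent first
    await proc.queue_frame(stop_frame)  # sent second
    await proc.drain()
    return proc.received


system_order = asyncio.run(run(SystemFrame("UserStoppedSpeakingFrame")))
data_order = asyncio.run(run(DataFrame("EndOfTurnDetectedFrame")))
print(system_order)  # system frame overtakes the transcription
print(data_order)  # data frame keeps its place behind the transcription
```

In this model the send order is identical in both runs; only the frame's type decides whether it overtakes the TranscriptionFrame, which matches what I observe with UserStoppedSpeakingFrame versus EndOfTurnDetectedFrame.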
Here is a snippet from the DeepgramFluxProcessor. You can see that the TranscriptionFrame is pushed first and the UserStoppedSpeakingFrame is broadcast afterwards:
if not self._settings.min_confidence or average_confidence > self._settings.min_confidence:
    # EndOfTurn means Flux has determined the turn is complete,
    # so this TranscriptionFrame is always finalized
    await self.push_frame(
        TranscriptionFrame(
            transcript,
            self._user_id,
            time_now_iso8601(),
            self._settings.language,
            result=data,
            finalized=True,
        )
    )
else:
    logger.warning(
        f"Transcription confidence below min_confidence threshold: {average_confidence}"
    )

await self._handle_transcription(transcript, True, self._settings.language)
await self.stop_processing_metrics()
await self.broadcast_frame(UserStoppedSpeakingFrame)
await self._call_event_handler("on_end_of_turn", transcript)
Reproduction steps
- Make a test processor that logs both frame types as they arrive. In my case I called it OrderingProcessor:
from loguru import logger

from pipecat.frames.frames import UserStoppedSpeakingFrame, Frame, TranscriptionFrame
from pipecat.processors.frame_processor import FrameProcessor, FrameDirection


class OrderingProcessor(FrameProcessor):
    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        if isinstance(frame, TranscriptionFrame):
            logger.info("Received TranscriptionFrame")
        if isinstance(frame, UserStoppedSpeakingFrame):
            logger.info("Received UserStoppedSpeakingFrame")

        await self.push_frame(frame, direction)
- Then build a simple pipeline containing both the Deepgram Flux service and the OrderingProcessor, for example:
pipeline = Pipeline(
    [
        transport.input(),
        DeepgramFluxSTTService(....),
        OrderingProcessor(),
        transport.output(),
    ]
)
- Check the logs and you will see that UserStoppedSpeakingFrame arrives first:
{"timestamp": 1773314786.697549, "level": "INFO", "message": "Received UserStoppedSpeakingFrame"}
{"timestamp": 1773314786.698449, "level": "INFO", "message": "Received TranscriptionFrame"}
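To check the ordering automatically rather than by eye, the JSON log lines can be parsed and sorted by timestamp. The following is only a sketch, assuming the log format shown above with one "Received <FrameName>" message per frame; `arrival_order` is a hypothetical helper, not part of pipecat or loguru.

```python
import json


def arrival_order(log_lines):
    """Return frame names in timestamp order from 'Received <Frame>' messages."""
    events = []
    for line in log_lines:
        record = json.loads(line)
        message = record["message"]
        if message.startswith("Received "):
            events.append((record["timestamp"], message.removeprefix("Received ")))
    # Sort by timestamp so the result does not depend on the order of the lines.
    return [name for _, name in sorted(events)]


# The two log lines from the reproduction above.
logs = [
    '{"timestamp": 1773314786.697549, "level": "INFO", "message": "Received UserStoppedSpeakingFrame"}',
    '{"timestamp": 1773314786.698449, "level": "INFO", "message": "Received TranscriptionFrame"}',
]

order = arrival_order(logs)
transcription_first = order.index("TranscriptionFrame") < order.index("UserStoppedSpeakingFrame")
print(order)
print("TranscriptionFrame first:", transcription_first)
```

For the logs above, `transcription_first` is False, which is the bug being reported.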
Expected behavior
The TranscriptionFrame should reach the downstream processors before the UserStoppedSpeakingFrame, matching the order in which the STT service emits them.
Actual behavior
The UserStoppedSpeakingFrame reaches the downstream processors first, and the TranscriptionFrame arrives after it.
Workaround
Deepgram has an on_end_of_turn event that I can use to send my own EndOfTurnDetectedFrame, which isn't a system frame, and that works. However, AssemblyAI sadly does not have an equivalent event.
await self._call_event_handler("on_end_of_turn", transcript)