@@ -777,9 +777,6 @@ async def _exec_with_plugin(
777777
778778 async with Aclosing (execute_fn (invocation_context )) as agen :
779779 async for event in agen :
780- _apply_run_config_custom_metadata (
781- event , invocation_context .run_config
782- )
783780 if is_live_call :
784781 if event .partial and _is_transcription (event ):
785782 is_transcribing = True
@@ -806,14 +803,14 @@ async def _exec_with_plugin(
806803 'Appending transcription finished event: %s' , event
807804 )
808805 if self ._should_append_event (event , is_live_call ):
809- await self .session_service . append_event (
810- session = session , event = event
806+ event = await self ._append_event (
807+ invocation_context , session , event
811808 )
812809
813810 for buffered_event in buffered_events :
814811 logger .debug ('Appending buffered event: %s' , buffered_event )
815- await self .session_service . append_event (
816- session = session , event = buffered_event
812+ buffered_event = await self ._append_event (
813+ invocation_context , session , buffered_event
817814 )
818815 yield buffered_event # yield buffered events to caller
819816 buffered_events = []
@@ -822,33 +819,46 @@ async def _exec_with_plugin(
822819 # example, event that stores blob reference, should be appended.
823820 if self ._should_append_event (event , is_live_call ):
824821 logger .debug ('Appending non-buffered event: %s' , event )
825- await self .session_service . append_event (
826- session = session , event = event
822+ event = await self ._append_event (
823+ invocation_context , session , event
827824 )
825+ # Run the on_event callbacks to optionally modify the event.
828826 else :
829- if event .partial is not True :
830- await self .session_service .append_event (
831- session = session , event = event
832- )
833-
834- # Step 3: Run the on_event callbacks to optionally modify the event.
835- modified_event = await plugin_manager .run_on_event_callback (
836- invocation_context = invocation_context , event = event
837- )
838- if modified_event :
839- _apply_run_config_custom_metadata (
840- modified_event , invocation_context .run_config
827+ event = await self ._append_event (
828+ invocation_context , session , event
841829 )
842- yield modified_event
843- else :
844- yield event
830+ yield event
831+ # Run after_run_callbacks to perform global cleanup tasks or finalizing logs and metrics data
832+ # This does NOT emit any event.
833+ await plugin_manager .run_after_run_callback (
834+ invocation_context = invocation_context
835+ )
845836
846- # Step 4: Run the after_run callbacks to perform global cleanup tasks or
847- # finalizing logs and metrics data.
848- # This does NOT emit any event.
849- await plugin_manager .run_after_run_callback (
850- invocation_context = invocation_context
837+ async def _append_event (
838+ self ,
839+ invocation_context : InvocationContext ,
840+ session : Session ,
841+ event : Event ,
842+ ) -> Event :
843+ """Appends an event to the session with plugin callbacks.
844+
845+ Args:
846+ invocation_context: The invocation context.
847+ session: The session to append the event to.
848+ event: The event to process and append to the session.
849+
850+ Returns:
851+ The event after processing by plugins.
852+ """
853+ plugin_manager = invocation_context .plugin_manager
854+ modified_event = await plugin_manager .run_on_event_callback (
855+ invocation_context = invocation_context , event = event
851856 )
857+ if modified_event :
858+ event = modified_event
859+ _apply_run_config_custom_metadata (event , invocation_context .run_config )
860+ await self .session_service .append_event (session = session , event = event )
861+ return event
852862
853863 async def _append_new_message_to_session (
854864 self ,
0 commit comments