Skip to content

feat: async support for process functions #9

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 51 additions & 36 deletions src/f5_ai_gateway_sdk/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
LICENSE file in the root directory of this source tree.
"""

import inspect
import json
import logging
from abc import ABC
Expand Down Expand Up @@ -225,6 +226,13 @@ def __init_subclass__(cls, **kwargs):
"The DEPRECATED 'process' method must not be implemented "
"alongside 'process_input' or 'process_response'."
)
if is_process_overridden and inspect.iscoroutinefunction(cls.process):
# we don't want to add async capabilities to the deprecated function
raise TypeError(
f"Cannot create concrete class {cls.__name__}. "
"The DEPRECATED 'process' method does not support async. "
"Implement 'process_input' and/or 'process_response' instead."
)

return

Expand Down Expand Up @@ -875,15 +883,18 @@ async def _parse_and_process(self, request: Request) -> Response:
prompt_hash, response_hash = (None, None)
if input_direction:
prompt_hash = prompt.hash()
result: Result | Reject = self.process_input(
result = await self._handle_process_function(
self.process_input,
metadata=metadata,
parameters=parameters,
prompt=prompt,
request=request,
)

else:
response_hash = response.hash()
result: Result | Reject = self.process_response(
result = await self._handle_process_function(
self.process_response,
metadata=metadata,
parameters=parameters,
prompt=prompt,
Expand Down Expand Up @@ -1014,7 +1025,16 @@ def _is_method_overridden(self, method_name: str) -> bool:
# the method object directly from the Processor class, then it has been overridden.
return instance_class_method_obj is not base_class_method_obj

def process_input(
async def _process_fallback(self, **kwargs) -> Result | Reject:
warnings.warn(
f"{type(self).__name__} uses the deprecated 'process' method. "
"Implement 'process_input' and/or 'process_response' instead.",
DeprecationWarning,
stacklevel=2,
)
return await self._handle_process_function(self.process, **kwargs)

async def process_input(
self,
prompt: PROMPT,
metadata: Metadata,
Expand Down Expand Up @@ -1043,26 +1063,20 @@ def process_input(self, prompt, response, metadata, parameters, request):

return Result(processor_result=result)
"""
if self._is_method_overridden("process"):
warnings.warn(
f"{type(self).__name__} uses the deprecated 'process' method for input. "
"Implement 'process_input' instead.",
DeprecationWarning,
stacklevel=2, # Points the warning to the caller of process_input
if not self._is_method_overridden("process"):
raise NotImplementedError(
f"{type(self).__name__} must implement 'process_input' or the "
"deprecated 'process' method to handle input."
)
return self.process(
prompt=prompt,
response=None,
metadata=metadata,
parameters=parameters,
request=request,
)
raise NotImplementedError(
f"{type(self).__name__} must implement 'process_input' or the "
"deprecated 'process' method to handle input."
return await self._process_fallback(
prompt=prompt,
response=None,
metadata=metadata,
parameters=parameters,
request=request,
)

def process_response(
async def process_response(
self,
prompt: PROMPT | None,
response: RESPONSE,
Expand Down Expand Up @@ -1096,23 +1110,17 @@ def process_response(self, prompt, response, metadata, parameters, request):
return Result(processor_result=result)
"""

if self._is_method_overridden("process"):
warnings.warn(
f"{type(self).__name__} uses the deprecated 'process' method for response. "
"Implement 'process_response' instead.",
DeprecationWarning,
stacklevel=2, # Points the warning to the caller of process_input
if not self._is_method_overridden("process"):
raise NotImplementedError(
f"{type(self).__name__} must implement 'process_response' or the "
"deprecated 'process' method to handle input."
)
return self.process(
prompt=prompt,
response=response,
metadata=metadata,
parameters=parameters,
request=request,
)
raise NotImplementedError(
f"{type(self).__name__} must implement 'process_response' or the "
"deprecated 'process' method to handle input."
return await self._process_fallback(
prompt=prompt,
response=response,
metadata=metadata,
parameters=parameters,
request=request,
)

def process(
Expand Down Expand Up @@ -1159,6 +1167,13 @@ def process(self, prompt, response, metadata, parameters, request):
"'process_input'/'process_response'."
)

async def _handle_process_function(self, func, **kwargs) -> Result | Reject:
if inspect.iscoroutinefunction(func):
result = await func(**kwargs)
else:
result = func(**kwargs)
return result


def _validation_error_as_messages(err: ValidationError) -> list[str]:
return [_error_details_to_str(e) for e in err.errors()]
Expand Down
Loading