Allow async pipe #468

Open · wants to merge 5 commits into main
117 changes: 54 additions & 63 deletions main.py
@@ -1,3 +1,4 @@
+import inspect
 from fastapi import FastAPI, Request, Depends, status, HTTPException, UploadFile, File
 from fastapi.middleware.cors import CORSMiddleware
 from fastapi.concurrency import run_in_threadpool
@@ -27,7 +28,7 @@
 import uuid
 import sys
 import subprocess
 
-import inspect
 
 from config import API_KEY, PIPELINES_DIR
@@ -666,8 +667,19 @@ async def generate_openai_chat_completion(form_data: OpenAIChatCompletionForm):
             status_code=status.HTTP_404_NOT_FOUND,
             detail=f"Pipeline {form_data.model} not found",
         )
 
+    async def execute_pipe(pipe, *args, **kwargs):
+        if inspect.isasyncgenfunction(pipe):
+            async for res in pipe(*args, **kwargs):
+                yield res
+        elif inspect.iscoroutinefunction(pipe):
+            for item in await pipe(*args, **kwargs):
+                yield item
+        else:
+            for item in await run_in_threadpool(pipe, *args, **kwargs):
+                yield item
+
-    def job():
+    async def job():
         print(form_data.model)
 
         pipeline = app.state.PIPELINES[form_data.model]
@@ -683,39 +695,30 @@ def job():
 
         if form_data.stream:
 
-            def stream_content():
-                res = pipe(
+            async def stream_content():
+                res = execute_pipe(pipe,
                     user_message=user_message,
                     model_id=pipeline_id,
                     messages=messages,
                     body=form_data.model_dump(),
                 )
+                async for line in res:
+                    if isinstance(line, BaseModel):
+                        line = line.model_dump_json()
+                        line = f"data: {line}"
 
-                logging.info(f"stream:true:{res}")
-
-                if isinstance(res, str):
-                    message = stream_message_template(form_data.model, res)
-                    logging.info(f"stream_content:str:{message}")
-                    yield f"data: {json.dumps(message)}\n\n"
-
-                if isinstance(res, Iterator):
-                    for line in res:
-                        if isinstance(line, BaseModel):
-                            line = line.model_dump_json()
-                            line = f"data: {line}"
-
-                        try:
-                            line = line.decode("utf-8")
-                        except:
-                            pass
+                    try:
+                        line = line.decode("utf-8")
+                    except:
+                        pass
 
-                        logging.info(f"stream_content:Generator:{line}")
+                    logging.info(f"stream_content:Generator:{line}")
 
-                        if line.startswith("data:"):
-                            yield f"{line}\n\n"
-                        else:
-                            line = stream_message_template(form_data.model, line)
-                            yield f"data: {json.dumps(line)}\n\n"
+                    if line.startswith("data:"):
+                        yield f"{line}\n\n"
+                    else:
+                        line = stream_message_template(form_data.model, line)
+                        yield f"data: {json.dumps(line)}\n\n"
 
                 if isinstance(res, str) or isinstance(res, Generator):
                     finish_message = {
@@ -732,52 +735,40 @@ def stream_content():
                             }
                         ],
                     }
 
                     yield f"data: {json.dumps(finish_message)}\n\n"
                    yield f"data: [DONE]"
 
            return StreamingResponse(stream_content(), media_type="text/event-stream")
         else:
-            res = pipe(
+            res = execute_pipe(pipe,
                 user_message=user_message,
                 model_id=pipeline_id,
                 messages=messages,
                 body=form_data.model_dump(),
             )
-            logging.info(f"stream:false:{res}")
 
-            if isinstance(res, dict):
-                return res
-            elif isinstance(res, BaseModel):
-                return res.model_dump()
-            else:
-
-                message = ""
+            message = ""
+            async for stream in res:
+                message = f"{message}{stream}"
 
-                if isinstance(res, str):
-                    message = res
-
-                if isinstance(res, Generator):
-                    for stream in res:
-                        message = f"{message}{stream}"
-
-                logging.info(f"stream:false:{message}")
-                return {
-                    "id": f"{form_data.model}-{str(uuid.uuid4())}",
-                    "object": "chat.completion",
-                    "created": int(time.time()),
-                    "model": form_data.model,
-                    "choices": [
-                        {
-                            "index": 0,
-                            "message": {
-                                "role": "assistant",
-                                "content": message,
-                            },
-                            "logprobs": None,
-                            "finish_reason": "stop",
-                        }
-                    ],
-                }
+            logging.info(f"stream:false:{message}")
+            return {
+                "id": f"{form_data.model}-{str(uuid.uuid4())}",
+                "object": "chat.completion",
+                "created": int(time.time()),
+                "model": form_data.model,
+                "choices": [
+                    {
+                        "index": 0,
+                        "message": {
+                            "role": "assistant",
+                            "content": message,
+                        },
+                        "logprobs": None,
+                        "finish_reason": "stop",
+                    }
+                ],
+            }
 
-    return await run_in_threadpool(job)
+    return await job()
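
A note on what the diff enables: the new execute_pipe helper inspects the pipeline's pipe callable and dispatches on its kind, so a pipeline may now declare pipe as a plain function (still run via run_in_threadpool), a coroutine, or an async generator. Below is a minimal sketch, not part of this PR, of the three styles; the class names and chunk values are hypothetical, and the keyword parameters mirror the ones job() passes (user_message, model_id, messages, body).

# Hypothetical example pipelines (not in this PR) showing the three pipe
# styles that execute_pipe dispatches between.
import asyncio
from typing import AsyncGenerator, Generator


class SyncPipeline:
    # Plain def: neither inspect check matches, so execute_pipe calls it
    # through run_in_threadpool and iterates whatever it returns
    # (here, a generator of chunks).
    def pipe(self, user_message: str, model_id: str, messages: list, body: dict) -> Generator:
        yield "sync "
        yield "response"


class CoroutinePipeline:
    # async def returning a value: inspect.iscoroutinefunction(pipe) is True,
    # so execute_pipe awaits it once, then iterates the result.
    async def pipe(self, user_message: str, model_id: str, messages: list, body: dict) -> list:
        await asyncio.sleep(0)  # stands in for awaited I/O
        return ["async ", "response"]


class AsyncGenPipeline:
    # async def with yield: inspect.isasyncgenfunction(pipe) is True, so
    # execute_pipe re-yields each chunk as soon as it is produced.
    async def pipe(self, user_message: str, model_id: str, messages: list, body: dict) -> AsyncGenerator[str, None]:
        for chunk in ("async ", "streamed ", "response"):
            await asyncio.sleep(0)
            yield chunk

One behavioral detail worth flagging in review: if a pipe returns a plain str, execute_pipe iterates it character by character. The non-stream branch reassembles the identical string, but the stream branch would emit one SSE event per character rather than one per chunk.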