Skip to content

Commit 5f6f475

Browse files
committed
prog: made progress on optimizing downloading query history using background Celery jobs
1 parent 319a207 commit 5f6f475

File tree

6 files changed

+290
-111
lines changed

6 files changed

+290
-111
lines changed

Diff for: backend/ee/onyx/background/celery/apps/primary.py

+25
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
1+
from datetime import datetime
2+
3+
from celery import Task
4+
5+
from ee.onyx.background.celery.tasks.query_history import query_history_report
16
from ee.onyx.background.celery_utils import should_perform_chat_ttl_check
27
from ee.onyx.background.task_name_builders import name_chat_ttl_task
8+
from ee.onyx.background.task_name_builders import name_query_history_report_task
39
from ee.onyx.server.reporting.usage_export_generation import create_new_usage_report
410
from onyx.background.celery.apps.primary import celery_app
511
from onyx.background.task_utils import build_celery_task_wrapper
@@ -66,6 +72,11 @@ def check_ttl_management_task(*, tenant_id: str) -> None:
6672
)
6773

6874

75+
#####
76+
# Non-Periodic Tasks
77+
#####
78+
79+
6980
@celery_app.task(
7081
name="autogenerate_usage_report_task",
7182
ignore_result=True,
@@ -79,3 +90,17 @@ def autogenerate_usage_report_task(*, tenant_id: str) -> None:
7990
user_id=None,
8091
period=None,
8192
)
93+
94+
95+
@build_celery_task_wrapper(name_query_history_report_task)
96+
@celery_app.task(
97+
name="query_history_report_task",
98+
bind=True,
99+
# ignore_result=True,
100+
soft_time_limit=JOB_TIMEOUT,
101+
)
102+
def query_history_report_task(self: Task, *, start: datetime, end: datetime) -> str:
103+
with get_session_with_current_tenant() as db_session:
104+
return query_history_report(
105+
db_session=db_session, request_id=self.request.id, start=start, end=end
106+
)
+78
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import csv
2+
import io
3+
from datetime import datetime
4+
5+
from sqlalchemy.orm import Session
6+
7+
from ee.onyx.server.query_history.models import ChatSessionSnapshot
8+
from ee.onyx.server.query_history.models import QuestionAnswerPairSnapshot
9+
from ee.onyx.server.query_history.utils import fetch_and_process_chat_session_history
10+
from ee.onyx.server.query_history.utils import ONYX_ANONYMIZED_EMAIL
11+
from onyx.configs.app_configs import ONYX_QUERY_HISTORY_TYPE
12+
from onyx.configs.constants import FileOrigin
13+
from onyx.configs.constants import QueryHistoryType
14+
from onyx.file_store.file_store import get_default_file_store
15+
16+
17+
def query_history_report(
18+
db_session: Session, request_id: str, start: datetime, end: datetime
19+
) -> str:
20+
chat_session_history = fetch_chat_session_history(
21+
db_session=db_session, start=start, end=end
22+
)
23+
qa_pairs = construct_qa_pairs(chat_session_history)
24+
persist_chat_session_history(
25+
db_session=db_session,
26+
report_name=f"query_history_report_{request_id}.csv",
27+
qa_pairs=qa_pairs,
28+
)
29+
30+
31+
def fetch_chat_session_history(
32+
db_session: Session, start: datetime, end: datetime
33+
) -> list[ChatSessionSnapshot]:
34+
return fetch_and_process_chat_session_history(
35+
db_session=db_session,
36+
start=start,
37+
end=end,
38+
feedback_type=None,
39+
limit=None,
40+
)
41+
42+
43+
def construct_qa_pairs(
44+
chat_session_history: list[ChatSessionSnapshot],
45+
) -> list[QuestionAnswerPairSnapshot]:
46+
qa_pairs: list[QuestionAnswerPairSnapshot] = []
47+
for chat_session_snapshot in chat_session_history:
48+
if ONYX_QUERY_HISTORY_TYPE == QueryHistoryType.ANONYMIZED:
49+
chat_session_snapshot.user_email = ONYX_ANONYMIZED_EMAIL
50+
51+
qa_pairs.extend(
52+
QuestionAnswerPairSnapshot.from_chat_session_snapshot(chat_session_snapshot)
53+
)
54+
55+
return qa_pairs
56+
57+
58+
def persist_chat_session_history(
59+
db_session: Session, report_name: str, qa_pairs: list[QuestionAnswerPairSnapshot]
60+
):
61+
file_store = get_default_file_store(db_session)
62+
stream = io.StringIO()
63+
writer = csv.DictWriter(
64+
stream, fieldnames=list(QuestionAnswerPairSnapshot.model_fields.keys())
65+
)
66+
67+
writer.writeheader()
68+
for row in qa_pairs:
69+
writer.writerow(row.to_json())
70+
71+
stream.seek(0)
72+
file_store.save_file(
73+
file_name=report_name,
74+
content=stream,
75+
display_name=report_name,
76+
file_origin=FileOrigin.GENERATED_REPORT,
77+
file_type="text/csv",
78+
)

Diff for: backend/ee/onyx/background/task_name_builders.py

+9
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,11 @@
1+
from datetime import datetime
2+
3+
14
def name_chat_ttl_task(retention_limit_days: int, tenant_id: str | None = None) -> str:
25
return f"chat_ttl_{retention_limit_days}_days"
6+
7+
8+
def name_query_history_report_task(start: datetime, end: datetime) -> str:
9+
start_epoch = int(start.timestamp())
10+
end_epoch = int(end.timestamp())
11+
return f"query_history_report_{start_epoch}_{end_epoch}"

Diff for: backend/ee/onyx/server/query_history/api.py

+84-111
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import csv
21
import io
32
from datetime import datetime
43
from datetime import timezone
@@ -9,107 +8,33 @@
98
from fastapi import Depends
109
from fastapi import HTTPException
1110
from fastapi import Query
11+
from fastapi import Response
1212
from fastapi.responses import StreamingResponse
1313
from sqlalchemy.orm import Session
1414

15-
from ee.onyx.db.query_history import fetch_chat_sessions_eagerly_by_time
15+
from ee.onyx.background.celery.apps.primary import query_history_report_task
1616
from ee.onyx.db.query_history import get_page_of_chat_sessions
1717
from ee.onyx.db.query_history import get_total_filtered_chat_sessions_count
1818
from ee.onyx.server.query_history.models import ChatSessionMinimal
1919
from ee.onyx.server.query_history.models import ChatSessionSnapshot
20-
from ee.onyx.server.query_history.models import MessageSnapshot
21-
from ee.onyx.server.query_history.models import QuestionAnswerPairSnapshot
20+
from ee.onyx.server.query_history.utils import ONYX_ANONYMIZED_EMAIL
21+
from ee.onyx.server.query_history.utils import snapshot_from_chat_session
2222
from onyx.auth.users import current_admin_user
23-
from onyx.auth.users import get_display_email
24-
from onyx.chat.chat_utils import create_chat_chain
2523
from onyx.configs.app_configs import ONYX_QUERY_HISTORY_TYPE
26-
from onyx.configs.constants import MessageType
2724
from onyx.configs.constants import QAFeedbackType
2825
from onyx.configs.constants import QueryHistoryType
29-
from onyx.configs.constants import SessionType
3026
from onyx.db.chat import get_chat_session_by_id
3127
from onyx.db.chat import get_chat_sessions_by_user
3228
from onyx.db.engine import get_session
33-
from onyx.db.models import ChatSession
29+
from onyx.db.enums import TaskStatus
3430
from onyx.db.models import User
31+
from onyx.db.tasks import get_task_by_task_id
3532
from onyx.server.documents.models import PaginatedReturn
3633
from onyx.server.query_and_chat.models import ChatSessionDetails
3734
from onyx.server.query_and_chat.models import ChatSessionsResponse
3835

3936
router = APIRouter()
4037

41-
ONYX_ANONYMIZED_EMAIL = "[email protected]"
42-
43-
44-
def fetch_and_process_chat_session_history(
45-
db_session: Session,
46-
start: datetime,
47-
end: datetime,
48-
feedback_type: QAFeedbackType | None,
49-
limit: int | None = 500,
50-
) -> list[ChatSessionSnapshot]:
51-
# observed to be slow a scale of 8192 sessions and 4 messages per session
52-
53-
# this is a little slow (5 seconds)
54-
chat_sessions = fetch_chat_sessions_eagerly_by_time(
55-
start=start, end=end, db_session=db_session, limit=limit
56-
)
57-
58-
# this is VERY slow (80 seconds) due to create_chat_chain being called
59-
# for each session. Needs optimizing.
60-
chat_session_snapshots = [
61-
snapshot_from_chat_session(chat_session=chat_session, db_session=db_session)
62-
for chat_session in chat_sessions
63-
]
64-
65-
valid_snapshots = [
66-
snapshot for snapshot in chat_session_snapshots if snapshot is not None
67-
]
68-
69-
if feedback_type:
70-
valid_snapshots = [
71-
snapshot
72-
for snapshot in valid_snapshots
73-
if any(
74-
message.feedback_type == feedback_type for message in snapshot.messages
75-
)
76-
]
77-
78-
return valid_snapshots
79-
80-
81-
def snapshot_from_chat_session(
82-
chat_session: ChatSession,
83-
db_session: Session,
84-
) -> ChatSessionSnapshot | None:
85-
try:
86-
# Older chats may not have the right structure
87-
last_message, messages = create_chat_chain(
88-
chat_session_id=chat_session.id, db_session=db_session
89-
)
90-
messages.append(last_message)
91-
except RuntimeError:
92-
return None
93-
94-
flow_type = SessionType.SLACK if chat_session.onyxbot_flow else SessionType.CHAT
95-
96-
return ChatSessionSnapshot(
97-
id=chat_session.id,
98-
user_email=get_display_email(
99-
chat_session.user.email if chat_session.user else None
100-
),
101-
name=chat_session.description,
102-
messages=[
103-
MessageSnapshot.build(message)
104-
for message in messages
105-
if message.message_type != MessageType.SYSTEM
106-
],
107-
assistant_id=chat_session.persona_id,
108-
assistant_name=chat_session.persona.name if chat_session.persona else None,
109-
time_created=chat_session.time_created,
110-
flow_type=flow_type,
111-
)
112-
11338

11439
@router.get("/admin/chat-sessions")
11540
def get_user_chat_sessions(
@@ -238,52 +163,100 @@ def get_chat_session_admin(
238163
return snapshot
239164

240165

241-
@router.get("/admin/query-history-csv")
242-
def get_query_history_as_csv(
243-
_: User | None = Depends(current_admin_user),
166+
@router.post("/admin/query-history-csv")
167+
def post_query_history_as_csv(
168+
response: Response,
244169
start: datetime | None = None,
245170
end: datetime | None = None,
171+
_: User | None = Depends(current_admin_user),
246172
db_session: Session = Depends(get_session),
247-
) -> StreamingResponse:
173+
) -> None:
248174
if ONYX_QUERY_HISTORY_TYPE == QueryHistoryType.DISABLED:
249175
raise HTTPException(
250176
status_code=HTTPStatus.FORBIDDEN,
251177
detail="Query history has been disabled by the administrator.",
252178
)
253179

254-
# this call is very expensive and is timing out via endpoint
255-
# TODO: optimize call and/or generate via background task
256-
complete_chat_session_history = fetch_and_process_chat_session_history(
257-
db_session=db_session,
258-
start=start or datetime.fromtimestamp(0, tz=timezone.utc),
259-
end=end or datetime.now(tz=timezone.utc),
260-
feedback_type=None,
261-
limit=None,
180+
start = start or datetime.fromtimestamp(0, tz=timezone.utc)
181+
end = end or datetime.now(tz=timezone.utc)
182+
task = query_history_report_task.delay(
183+
start=start,
184+
end=end,
262185
)
263186

264-
question_answer_pairs: list[QuestionAnswerPairSnapshot] = []
265-
for chat_session_snapshot in complete_chat_session_history:
266-
if ONYX_QUERY_HISTORY_TYPE == QueryHistoryType.ANONYMIZED:
267-
chat_session_snapshot.user_email = ONYX_ANONYMIZED_EMAIL
187+
response.status_code = HTTPStatus.ACCEPTED
188+
response.headers[
189+
"Location"
190+
] = f"/admin/query-history-csv/status?request_id={task.id}"
191+
return {"request_id": task.id}
192+
193+
194+
@router.get("/admin/query-history-csv/status")
195+
def get_query_history_csv_status(
196+
request_id: str,
197+
response: Response,
198+
_: User | None = Depends(current_admin_user),
199+
db_session: Session = Depends(get_session),
200+
) -> dict[str, TaskStatus]:
201+
if ONYX_QUERY_HISTORY_TYPE == QueryHistoryType.DISABLED:
202+
raise HTTPException(
203+
status_code=HTTPStatus.FORBIDDEN,
204+
detail="Query history has been disabled by the administrator.",
205+
)
268206

269-
question_answer_pairs.extend(
270-
QuestionAnswerPairSnapshot.from_chat_session_snapshot(chat_session_snapshot)
207+
task_queue_state = get_task_by_task_id(request_id, db_session)
208+
if task_queue_state is None:
209+
raise HTTPException(
210+
status_code=HTTPStatus.NOT_FOUND,
211+
detail="Task queue state not found for task id.",
271212
)
272213

273-
# Create an in-memory text stream
274-
stream = io.StringIO()
275-
writer = csv.DictWriter(
276-
stream, fieldnames=list(QuestionAnswerPairSnapshot.model_fields.keys())
277-
)
278-
writer.writeheader()
279-
for row in question_answer_pairs:
280-
writer.writerow(row.to_json())
214+
return {
215+
"status": task_queue_state.status,
216+
}
217+
218+
219+
@router.get("/admin/query-history-csv/download")
220+
def download_query_history_csv(
221+
request_id: str,
222+
_: User | None = Depends(current_admin_user),
223+
db_session: Session = Depends(get_session),
224+
) -> StreamingResponse:
225+
if ONYX_QUERY_HISTORY_TYPE == QueryHistoryType.DISABLED:
226+
raise HTTPException(
227+
status_code=HTTPStatus.FORBIDDEN,
228+
detail="Query history has been disabled by the administrator.",
229+
)
281230

282-
# Reset the stream's position to the start
283-
stream.seek(0)
231+
task_queue_state = get_task_by_task_id(request_id, db_session)
232+
if task_queue_state is None:
233+
raise HTTPException(
234+
status_code=HTTPStatus.NOT_FOUND,
235+
detail="Task queue state not found for task id.",
236+
)
237+
elif task_queue_state.status == TaskStatus.FAILURE:
238+
raise HTTPException(
239+
status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
240+
detail="Task failed to complete.",
241+
)
242+
elif task_queue_state.status != TaskStatus.SUCCESS:
243+
raise HTTPException(
244+
status_code=HTTPStatus.NO_CONTENT,
245+
detail="Task is still pending.",
246+
)
284247

248+
# TODO: change this to read from the file store with the file name
249+
# `query_history_report_{request_id}.csv`
250+
test_csv = "user_message,assistant_message,date\n"
251+
test_csv += "Hello, how are you?,I am fine,2021-01-01\n"
252+
test_csv += (
253+
"What is the weather in Tokyo?,The weather in Tokyo is sunny,2021-01-02\n"
254+
)
255+
test_csv += (
256+
"What is the capital of France?,The capital of France is Paris,2021-01-03\n"
257+
)
285258
return StreamingResponse(
286-
iter([stream.getvalue()]),
259+
io.StringIO(test_csv),
287260
media_type="text/csv",
288-
headers={"Content-Disposition": "attachment;filename=onyx_query_history.csv"},
261+
headers={"Content-Disposition": "attachment;filename=query_history.csv"},
289262
)

0 commit comments

Comments
 (0)