From 02410d2f27f1b06a8f1b677ab2d1b81f88e5c371 Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Fri, 11 Apr 2025 01:54:52 +0000 Subject: [PATCH 1/4] add issue templates --- .github/ISSUE_TEMPLATE/bug_report.md | 20 +++++++++++++++++++ .github/ISSUE_TEMPLATE/feature_request.md | 17 ++++++++++++++++ .github/ISSUE_TEMPLATE/question.md | 7 +++++++ .github/ISSUE_TEMPLATE/typo_doc_issue.md | 7 +++++++ ...t_template.md => PULL_REQUEST_TEMPLATE.md} | 0 matrix/cluster/ray_dashboard_job.py | 6 ++++++ 6 files changed, 57 insertions(+) create mode 100644 .github/ISSUE_TEMPLATE/bug_report.md create mode 100644 .github/ISSUE_TEMPLATE/feature_request.md create mode 100644 .github/ISSUE_TEMPLATE/question.md create mode 100644 .github/ISSUE_TEMPLATE/typo_doc_issue.md rename .github/{pull_request_template.md => PULL_REQUEST_TEMPLATE.md} (100%) diff --git a/.github/ISSUE_TEMPLATE/bug_report.md b/.github/ISSUE_TEMPLATE/bug_report.md new file mode 100644 index 0000000..b7e5f66 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/bug_report.md @@ -0,0 +1,20 @@ +--- +name: Bug Report +about: Submit a report to help us improve. +labels: 'bug, needs triage' +--- + +**Describe the bug:** +A clear and concise description of what the bug is. + +**Describe how to reproduce:** +Steps to reproduce the behavior. Ideally attach a minimal code sample to reproduce the described issue. + +**Describe the expected behavior:** +A clear and concise description of what you expected to happen. + +**Environment:** +At the very least, specify the versions of matrix, PyTorch, Python, and CUDA along with your operating system and, if relevant, GPU model. + +**Additional Context:** +Add any other context about the bug here. diff --git a/.github/ISSUE_TEMPLATE/feature_request.md b/.github/ISSUE_TEMPLATE/feature_request.md new file mode 100644 index 0000000..5e808aa --- /dev/null +++ b/.github/ISSUE_TEMPLATE/feature_request.md @@ -0,0 +1,17 @@ +--- +name: Feature Request +about: Submit a request for a new feature. 
+labels: 'enhancement, needs triage' +--- + +**Is your feature request related to a problem? Please describe:** +A clear and concise description of what the problem is. + +**Describe the solution you would like:** +A clear and concise description of what you want to happen. + +**Describe the alternatives you have considered:** +A clear and concise description of any alternative solutions or features you have considered. + +**Additional Context:** +Add any other context about the feature request here. diff --git a/.github/ISSUE_TEMPLATE/question.md b/.github/ISSUE_TEMPLATE/question.md new file mode 100644 index 0000000..8743b33 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/question.md @@ -0,0 +1,7 @@ +--- +name: Question +about: Ask the users and contributors a question. +labels: 'question, needs triage' +--- + +Please make sure that you first search existing issues and documentation before asking a question. If you cannot find an answer, be clear and concise. Ideally attach a minimal code sample if it is relevant to your question. diff --git a/.github/ISSUE_TEMPLATE/typo_doc_issue.md b/.github/ISSUE_TEMPLATE/typo_doc_issue.md new file mode 100644 index 0000000..a6189b4 --- /dev/null +++ b/.github/ISSUE_TEMPLATE/typo_doc_issue.md @@ -0,0 +1,7 @@ +--- +name: Typo or Documentation Issue +about: Report a typo or an issue related to documentation. +labels: 'documentation, needs triage' +--- + +For typos, please go ahead: fix the typo and submit a PR. For documentation issues, please describe the issue here and wait for approval before submitting a PR. 
diff --git a/.github/pull_request_template.md b/.github/PULL_REQUEST_TEMPLATE.md similarity index 100% rename from .github/pull_request_template.md rename to .github/PULL_REQUEST_TEMPLATE.md diff --git a/matrix/cluster/ray_dashboard_job.py b/matrix/cluster/ray_dashboard_job.py index c2b5f32..9bdb22e 100644 --- a/matrix/cluster/ray_dashboard_job.py +++ b/matrix/cluster/ray_dashboard_job.py @@ -1,3 +1,9 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the license found in the +# LICENSE file in the root directory of this source tree. + import logging import os import shutil From e910d726fb8a29c822f2834c74958eebc4a32995 Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Fri, 11 Apr 2025 02:46:21 +0000 Subject: [PATCH 2/4] add a batch request interface --- matrix/app_server/llm/query_llm.py | 31 ++++++++++ pyproject.toml | 1 + tests/unit/query/test_batch_requests.py | 75 +++++++++++++++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 tests/unit/query/test_batch_requests.py diff --git a/matrix/app_server/llm/query_llm.py b/matrix/app_server/llm/query_llm.py index a278646..e0d0b1b 100644 --- a/matrix/app_server/llm/query_llm.py +++ b/matrix/app_server/llm/query_llm.py @@ -471,6 +471,37 @@ async def make_request( } +def batch_requests( + url: tp.Union[str, tp.Callable[[], tp.Awaitable[str]]], + model: str, + requests: tp.List[tp.Dict[str, tp.Any]], + **kwargs, +) -> tp.List[tp.Dict[str, tp.Any]]: + """ + Process multiple requests by calling make_request_async for each. + This function works whether called from sync or async context. 
+ """ + async def _process_requests(): + """Helper function to process all requests concurrently.""" + return await asyncio.gather(*[make_request(url, model, request, **kwargs) for request in requests]) + + # Get or create an event loop + try: + loop = asyncio.get_event_loop() + except RuntimeError: + # No event loop in this thread, create a new one + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Check if we're already in an async context + if loop.is_running(): + # We're in an async context, create a task for the batch operation + return asyncio.create_task(_process_requests()) + else: + # We're in a sync context, run the event loop until complete + return loop.run_until_complete(_process_requests()) + + async def main( url: tp.Union[str, tp.Callable[[], tp.Awaitable[str]]], output_file: str, diff --git a/pyproject.toml b/pyproject.toml index 0da7a41..a4ebb3e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -42,6 +42,7 @@ classifiers=[ dev = [ # Test "pytest>=4.3.0", + "pytest-asyncio>=0.26.0", "coverage[toml]>=5.1", # Format "black==24.10.0", diff --git a/tests/unit/query/test_batch_requests.py b/tests/unit/query/test_batch_requests.py new file mode 100644 index 0000000..0f68d75 --- /dev/null +++ b/tests/unit/query/test_batch_requests.py @@ -0,0 +1,75 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the license found in the +# LICENSE file in the root directory of this source tree. 
+ +import asyncio +from unittest.mock import MagicMock, patch + +import pytest + +import matrix +from matrix.app_server.llm import query_llm + + +@pytest.mark.asyncio +async def test_batch_requests_in_async_context(): + """Test batch_requests when called from an async context.""" + # Create a mock for make_request_async + mock_response = "mocked_response" + + async def mock_make_request_async(_url, _model, request): + return f"{mock_response}_{request}" + + with patch( + "matrix.app_server.llm.query_llm.make_request", + side_effect=mock_make_request_async, + ): + # Test with a list of requests + requests = [1, 2, 3] + result = await query_llm.batch_requests("", "", requests) + + # Verify results + assert len(result) == 3 + assert result == [ + f"{mock_response}_1", + f"{mock_response}_2", + f"{mock_response}_3", + ] + + +def test_batch_requests_in_sync_context(): + """Test batch_requests when called from a synchronous context.""" + # Create a mock for make_request_async + mock_response = "mocked_response" + + async def mock_make_request_async(_url, _model, request): + return f"{mock_response}_{request}" + + with patch( + "matrix.app_server.llm.query_llm.make_request", + side_effect=mock_make_request_async, + ): + # Test with a list of requests + requests = [1, 2, 3] + result = query_llm.batch_requests("", "", requests) + + # Verify results + assert len(result) == 3 + assert result == [ + f"{mock_response}_1", + f"{mock_response}_2", + f"{mock_response}_3", + ] + + +@pytest.mark.asyncio +async def test_batch_requests_empty_list(): + """Test batch_requests with an empty list.""" + with patch("matrix.app_server.llm.query_llm.make_request") as mock_request: + result = await query_llm.batch_requests("", "", []) + # make_request_async should not be called + mock_request.assert_not_called() + # Result should be an empty list + assert result == [] From 356b6caa29e1b0aa1ca79a90967a4a535924a2cb Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Fri, 11 Apr 2025 04:18:20 +0000 
Subject: [PATCH 3/4] format --- matrix/app_server/llm/query_llm.py | 7 +++++-- matrix/cli.py | 16 +++++++--------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/matrix/app_server/llm/query_llm.py b/matrix/app_server/llm/query_llm.py index e0d0b1b..8b6369f 100644 --- a/matrix/app_server/llm/query_llm.py +++ b/matrix/app_server/llm/query_llm.py @@ -481,9 +481,12 @@ def batch_requests( Process multiple requests by calling make_request_async for each. This function works whether called from sync or async context. """ + async def _process_requests(): """Helper function to process all requests concurrently.""" - return await asyncio.gather(*[make_request(url, model, request, **kwargs) for request in requests]) + return await asyncio.gather( + *[make_request(url, model, request, **kwargs) for request in requests] + ) # Get or create an event loop try: @@ -492,7 +495,7 @@ async def _process_requests(): # No event loop in this thread, create a new one loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) - + # Check if we're already in an async context if loop.is_running(): # We're in an async context, create a task for the batch operation diff --git a/matrix/cli.py b/matrix/cli.py index ace1cf9..7f13ed9 100644 --- a/matrix/cli.py +++ b/matrix/cli.py @@ -357,15 +357,13 @@ def check_health( else: if not use_chat: data_payload = {"prompt": prompt} - response = asyncio.run( - query_llm.make_request( - metadata["endpoints"]["head"], - metadata["model_name"], - data_payload, - app_name=app_name, - **kwargs, - ) - ) + response = query_llm.batch_requests( + metadata["endpoints"]["head"], + metadata["model_name"], + [data_payload], + app_name=app_name, + **kwargs, + )[0] print(response) return "error" not in response["response"] From 479cbe1de544df35b5a08be371f5658c407f00ab Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Fri, 11 Apr 2025 04:50:11 +0000 Subject: [PATCH 4/4] fix batch_requests for both sync and async caller --- 
matrix/app_server/llm/query_llm.py | 24 ++++++++++--- tests/unit/query/test_batch_requests.py | 45 +++++++++++++------------ 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/matrix/app_server/llm/query_llm.py b/matrix/app_server/llm/query_llm.py index 8b6369f..80405da 100644 --- a/matrix/app_server/llm/query_llm.py +++ b/matrix/app_server/llm/query_llm.py @@ -478,7 +478,7 @@ def batch_requests( **kwargs, ) -> tp.List[tp.Dict[str, tp.Any]]: """ - Process multiple requests by calling make_request_async for each. + Process multiple requests by calling make_request for each. This function works whether called from sync or async context. """ @@ -488,7 +488,7 @@ async def _process_requests(): *[make_request(url, model, request, **kwargs) for request in requests] ) - # Get or create an event loop + # Get the event loop try: loop = asyncio.get_event_loop() except RuntimeError: @@ -498,10 +498,24 @@ async def _process_requests(): # Check if we're already in an async context if loop.is_running(): - # We're in an async context, create a task for the batch operation - return asyncio.create_task(_process_requests()) + # We're in an async context and can't use run_until_complete + # Create a new thread to run our async code + import concurrent.futures + import threading + + def run_in_new_loop(): + # Create a new event loop for this thread + new_loop = asyncio.new_event_loop() + try: + return new_loop.run_until_complete(_process_requests()) + finally: + new_loop.close() + + # Run in an executor to avoid blocking the current event loop + with concurrent.futures.ThreadPoolExecutor() as pool: + return pool.submit(run_in_new_loop).result() else: - # We're in a sync context, run the event loop until complete + # We're in a sync context, use the current loop return loop.run_until_complete(_process_requests()) diff --git a/tests/unit/query/test_batch_requests.py b/tests/unit/query/test_batch_requests.py index 0f68d75..3a6f40c 100644 --- 
a/tests/unit/query/test_batch_requests.py +++ b/tests/unit/query/test_batch_requests.py @@ -13,30 +13,34 @@ from matrix.app_server.llm import query_llm -@pytest.mark.asyncio -async def test_batch_requests_in_async_context(): - """Test batch_requests when called from an async context.""" - # Create a mock for make_request_async +def test_batch_requests_from_async_run(): + """Test batch_requests called from within an asyncio.run context.""" mock_response = "mocked_response" async def mock_make_request_async(_url, _model, request): return f"{mock_response}_{request}" - with patch( - "matrix.app_server.llm.query_llm.make_request", - side_effect=mock_make_request_async, - ): - # Test with a list of requests - requests = [1, 2, 3] - result = await query_llm.batch_requests("", "", requests) + async def async_wrapper(): + with patch( + "matrix.app_server.llm.query_llm.make_request", + side_effect=mock_make_request_async, + ): + requests = [1, 2, 3] + # batch_requests should handle the async context internally + # and return a list directly, not a task + result = query_llm.batch_requests("", "", requests) - # Verify results - assert len(result) == 3 - assert result == [ - f"{mock_response}_1", - f"{mock_response}_2", - f"{mock_response}_3", - ] + # Verify it returned a list, not a task + assert isinstance(result, list) + assert len(result) == 3 + assert result == [ + f"{mock_response}_1", + f"{mock_response}_2", + f"{mock_response}_3", + ] + + # Use asyncio.run to execute the async wrapper + asyncio.run(async_wrapper()) def test_batch_requests_in_sync_context(): @@ -64,11 +68,10 @@ async def mock_make_request_async(_url, _model, request): ] -@pytest.mark.asyncio -async def test_batch_requests_empty_list(): +def test_batch_requests_empty_list(): """Test batch_requests with an empty list.""" with patch("matrix.app_server.llm.query_llm.make_request") as mock_request: - result = await query_llm.batch_requests("", "", []) + result = query_llm.batch_requests("", "", []) # 
make_request_async should not be called mock_request.assert_not_called() # Result should be an empty list