Fix memory leak in AsyncCompletions.parse() with dynamically created models #2148
src/openai/lib/_parsing/_completions.py:

```diff
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import json
+import weakref
 from typing import TYPE_CHECKING, Any, Iterable, cast
 from typing_extensions import TypeVar, TypeGuard, assert_never
 
@@ -28,6 +29,9 @@
 from ...types.chat.completion_create_params import ResponseFormat as ResponseFormatParam
 from ...types.chat.chat_completion_message_tool_call import Function
 
+# Cache to store weak references to schema objects
+_schema_cache = weakref.WeakKeyDictionary()
+
 ResponseFormatT = TypeVar(
     "ResponseFormatT",
     # if it isn't given then we don't do any parsing
@@ -243,6 +247,10 @@ def type_to_response_format_param(
     # can only be a `type`
     response_format = cast(type, response_format)
 
+    # Check if we already have a schema for this type in the cache
+    if response_format in _schema_cache:
+        return _schema_cache[response_format]
+
     json_schema_type: type[pydantic.BaseModel] | pydantic.TypeAdapter[Any] | None = None
 
     if is_basemodel_type(response_format):
@@ -254,11 +262,16 @@ def type_to_response_format_param(
     else:
         raise TypeError(f"Unsupported response_format type - {response_format}")
 
-    return {
+    schema_param = {
         "type": "json_schema",
         "json_schema": {
             "schema": to_strict_json_schema(json_schema_type),
             "name": name,
             "strict": True,
         },
     }
+
+    # Store a weak reference to the schema parameter
+    _schema_cache[response_format] = schema_param
+
+    return schema_param
```
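As context for the change above, here is a minimal, SDK-independent sketch (not part of the diff) of the behavior a `weakref.WeakKeyDictionary` keyed by the model class provides: the cache entry goes away once the last strong reference to the class does, which is what the new tests below check against the real `_schema_cache`.

```python
# Minimal, SDK-independent sketch of the behavior the WeakKeyDictionary cache
# relies on: the entry disappears once the last strong reference to the key
# (here, a plain class standing in for a dynamically created pydantic model) is gone.
import gc
import weakref

_cache: "weakref.WeakKeyDictionary[type, dict]" = weakref.WeakKeyDictionary()

def cached_schema(model_type: type) -> dict:
    # Reuse the cached entry when the exact same class is seen again.
    if model_type in _cache:
        return _cache[model_type]
    schema = {"name": model_type.__name__}  # placeholder for the real schema params
    _cache[model_type] = schema
    return schema

class Demo:  # stands in for a model produced by pydantic.create_model
    pass

cached_schema(Demo)
assert len(_cache) == 1

del Demo      # drop the only strong reference to the class
gc.collect()  # type objects live in reference cycles, so a collection pass frees them
assert len(_cache) == 0
```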
New test file (+101 lines):

```python
import unittest
import gc
import sys
from unittest.mock import AsyncMock, patch, MagicMock
from typing import List

import pytest
from pydantic import Field, create_model

from openai.resources.beta.chat.completions import AsyncCompletions
from openai.lib._parsing import type_to_response_format_param
from openai.lib._parsing._completions import _schema_cache


class TestMemoryLeak(unittest.TestCase):
    def setUp(self):
        # Clear the schema cache before each test
        _schema_cache.clear()

    def test_schema_cache_with_models(self):
        """Test if schema cache properly handles dynamic models and prevents memory leak"""
        StepModel = create_model(
            "Step",
            explanation=(str, Field()),
            output=(str, Field()),
        )

        # Create several models and ensure they're cached properly
        models = []
        for i in range(5):
            model = create_model(
                f"MathResponse{i}",
                steps=(List[StepModel], Field()),
                final_answer=(str, Field()),
            )
            models.append(model)

            # Convert model to response format param
            param = type_to_response_format_param(model)

            # Check if the model is in the cache
            self.assertIn(model, _schema_cache)

        # Test that all models are in the cache
        self.assertEqual(len(_schema_cache), 5)

        # Let the models go out of scope and trigger garbage collection
        models = None
        gc.collect()

        # After garbage collection, the cache should be empty or reduced
        # since we're using weakref.WeakKeyDictionary
        self.assertLess(len(_schema_cache), 5)


@pytest.mark.asyncio
async def test_async_completions_parse_memory():
    """Test if AsyncCompletions.parse() doesn't leak memory with dynamic models"""
    StepModel = create_model(
        "Step",
        explanation=(str, Field()),
        output=(str, Field()),
    )

    # Clear the cache and record initial state
    _schema_cache.clear()
    initial_cache_size = len(_schema_cache)

    # Create a mock client
    mock_client = MagicMock()
    mock_client.chat.completions.create = AsyncMock()

    # Create the AsyncCompletions instance with our mock client
    completions = AsyncCompletions(mock_client)

    # Simulate the issue by creating multiple models and making calls
    models = []
    for i in range(10):
        # Create a new dynamic model each time
        new_model = create_model(
            f"MathResponse{i}",
            steps=(List[StepModel], Field()),
            final_answer=(str, Field()),
        )
        models.append(new_model)

        # Convert to response format and check if it's in the cache
        type_to_response_format_param(new_model)
        assert new_model in _schema_cache

    # Record cache size with all models referenced
    cache_size_with_references = len(_schema_cache)

    # Let the models go out of scope and trigger garbage collection
    models = None
    gc.collect()

    # After garbage collection, the cache should be significantly reduced
    cache_size_after_gc = len(_schema_cache)
    assert cache_size_after_gc < cache_size_with_references
    # The cache size should be close to the initial size (with some tolerance)
    assert cache_size_after_gc < cache_size_with_references / 2
```

Review comment on the `TestMemoryLeak` class: we do not use unittest, can you delete this? It seems to be the same as the pytest version anyway.

Review comment on `completions = AsyncCompletions(mock_client)`: this seems to be unused?
Comment from @anteverse:

Hi @mousberg,

Thanks for digging into this. As I understand it, this cache helps optimize `type_to_response_format_param` by not recomputing a schema it has already seen, and I guess it solves issue #2146 for the given example. But I don't see how it stops memory from rising when the models we submit are always different. Could we put some kind of maximum size on this cache, something like an LRU?

Your first test case actually brings a very good insight: if we keep generating new models, the cache size grows indefinitely.
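A size-bounded cache along the lines floated here might look like the following sketch; the LRU policy, the key type, and the limit of 128 are assumptions for illustration, not anything agreed in this thread.

```python
# Hypothetical sketch of a size-bounded (LRU-style) schema cache. The key
# function is left open on purpose, since choosing the key is the open question
# discussed below; the maxsize of 128 is an arbitrary placeholder.
from __future__ import annotations

from collections import OrderedDict
from typing import Any, Hashable

class LRUSchemaCache:
    def __init__(self, maxsize: int = 128) -> None:
        self._maxsize = maxsize
        self._entries: "OrderedDict[Hashable, Any]" = OrderedDict()

    def get(self, key: Hashable) -> Any | None:
        if key not in self._entries:
            return None
        self._entries.move_to_end(key)  # mark as most recently used
        return self._entries[key]

    def put(self, key: Hashable, value: Any) -> None:
        self._entries[key] = value
        self._entries.move_to_end(key)
        if len(self._entries) > self._maxsize:
            self._entries.popitem(last=False)  # evict the least recently used entry
```

Keyed by something structural rather than by the class object itself, a bound like this keeps memory flat even when every request brings a brand-new model.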
Reply from @mousberg:

Thanks for the feedback, @anteverse. That's a good point about unbounded memory growth when continuously generating new models.

I did look into an LRU cache, but ran into challenges with picking the right cache key strategy (model identity vs. schema-based hashing) while also making sure weak references were properly maintained.

I see the problem as twofold:

Your insight on the test case really highlights where things get tricky. If anyone's up for digging into an LRU-based approach, I'd love to collaborate!
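To make the key-strategy trade-off concrete, a small illustration (not from the PR, assumes pydantic v2): structurally identical dynamic models are still distinct classes, so an identity-keyed cache such as the `WeakKeyDictionary` stores one entry per class even though their generated schemas match.

```python
# Two dynamically created models with identical fields are distinct classes,
# so identity-based keys cache them separately even though the schemas are equal.
from pydantic import Field, create_model

A = create_model("MathResponse", final_answer=(str, Field()))
B = create_model("MathResponse", final_answer=(str, Field()))

assert A is not B                                      # different identity keys
assert A.model_json_schema() == B.model_json_schema()  # identical structure
```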
Reply from @anteverse:

Hey,

Thanks for narrowing down the problems; I think you did most of the job. I'd love to collaborate as well.

By any chance, do you have any connections at Pydantic? The folks there could help come up with a common key.

By the way, if we end up with a cache that has a limited size, what would be a good measure for the limit? Should it be frozen, or accessible to the end user? Should we have a heuristic based on the machine's capacity?

I'll run a few tests on my end as well!
Reply from @mousberg:

@anteverse Yeah, I reached out to Samuel (Pydantic's founder), and he suggested an approach for the cache key: build a string from the fields and types, optionally hash it, and use that as the cache key.

He also mentioned that if we need more input, it might be worth posting on Pydantic's GitHub to see if the community has additional insights.

Thoughts? I can prototype this approach and see if it addresses the issue!
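A rough sketch of what that suggestion could look like (hypothetical; the helper name and details are not part of the PR): derive a stable string from the model's field names and annotations and hash it, so structurally identical dynamic models share one cache entry.

```python
# Hypothetical key derivation along the lines suggested above: a stable string
# built from the model name, field names, and field annotations, then hashed.
# Uses pydantic v2's `model_fields`; everything here is illustrative, not an agreed design.
from __future__ import annotations

import hashlib

from pydantic import BaseModel

def schema_cache_key(model_type: type[BaseModel]) -> str:
    # The model name is included because the cached response-format param also
    # carries a "name" derived from it.
    parts = [model_type.__name__]
    for field_name, field in sorted(model_type.model_fields.items()):
        # repr() of the annotation covers simple and generic types; nested models
        # would need recursive handling to be fully structural.
        parts.append(f"{field_name}:{field.annotation!r}")
    return hashlib.sha256("|".join(parts).encode("utf-8")).hexdigest()
```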
Comment:

Maybe I'm missing something, but I don't see how this is a concern: if you're continuously creating new models, they should be getting GC'd at some point, and if they're not, then you're still holding a reference to the model type yourself. If that is the case, can you share more about your use case?