Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions experimenter/experimenter/experiments/api/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@
from django.conf import settings
from django.core.cache import cache
from django.http import HttpResponse
from rest_framework.compat import SHORT_SEPARATORS
from rest_framework.renderers import JSONRenderer
from rest_framework.utils.encoders import JSONEncoder as DRFJSONEncoder

logger = logging.getLogger(__name__)

DEFAULT_STREAM_CHUNK_SIZE = 25


def get_api_cache_key(view_name, query_params=None):
"""Build a deterministic cache key from the view name and query parameters."""
Expand All @@ -20,15 +24,68 @@ def get_api_cache_key(view_name, query_params=None):
return f"nimbus:api:{view_name}"


class _StreamArray(list[object]):
"""Lets ``stream_render_queryset`` actually stream the queryset, rather than
holding the whole serialised graph in memory.
"""

def __init__(self, gen):
super().__init__()
self._iter = iter(gen)
try:
self._first = next(self._iter)
self._empty = False
except StopIteration:
self._empty = True

def __iter__(self):
yield self._first
yield from self._iter

def __len__(self):
return 0 if self._empty else 1


def _drf_compatible_encoder():
"""Keeps the warm cache and a fresh on-miss render byte-identical for the
same data.
"""
return DRFJSONEncoder(
ensure_ascii=JSONRenderer.ensure_ascii,
allow_nan=not JSONRenderer.strict,
separators=SHORT_SEPARATORS,
)


def stream_render_queryset(
queryset,
serializer_class,
chunk_size=DEFAULT_STREAM_CHUNK_SIZE,
):
"""Streams the warm-cache JSON in bounded memory — materialising it all at
once OOM-kills the worker (#15621).
"""
items = (
serializer_class(obj).data for obj in queryset.iterator(chunk_size=chunk_size)
)
encoder = _drf_compatible_encoder()
return b"".join(
chunk.encode("utf-8") for chunk in encoder.iterencode(_StreamArray(items))
)


def warm_api_cache(key_prefix, queryset, serializer_class, renderer=None, sort_key=None):
"""Query the DB, serialize, and store the rendered response in the cache."""
if renderer is None:
renderer = JSONRenderer()
qs = queryset.all()

if sort_key is not None:
qs = sorted(qs, key=sort_key, reverse=True)
data = serializer_class(qs, many=True).data
rendered = renderer.render(data)
qs = sorted(queryset.all(), key=sort_key, reverse=True)
data = serializer_class(qs, many=True).data
rendered = renderer.render(data)
else:
rendered = stream_render_queryset(queryset, serializer_class)

Comment on lines 82 to +88
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So will the v5:csv still fail since it has a sort_key defined? But it just won't kill the tasks for the other endpoints now?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No v5:csv wasn't failing, it was just succeeding before proceeding to the next ones which were then OOMing. It should still proceed healthily.

cache_key = get_api_cache_key(key_prefix)
cache.set(cache_key, rendered, timeout=settings.API_CACHE_WARMING_TTL)
logger.info("Warmed cache for %s (%d bytes)", key_prefix, len(rendered))
Expand Down
40 changes: 28 additions & 12 deletions experimenter/experimenter/experiments/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,36 @@ def _get_warm_cache_endpoints():
]


@app.task
@metrics.timer_decorator("warm_api_caches")
def warm_api_caches():
"""Pre-populate the API list cache so requests are always served instantly."""
metrics.incr("warm_api_caches.started")
def _get_warm_cache_endpoint(key_prefix):
for entry in _get_warm_cache_endpoints():
if entry[0] == key_prefix:
return entry
return None

try:
for key_prefix, queryset, serializer_class, kwargs in _get_warm_cache_endpoints():
warm_api_cache(key_prefix, queryset, serializer_class, **kwargs)
logger.info("Warmed %s", key_prefix)

metrics.incr("warm_api_caches.completed")
@app.task
@metrics.timer_decorator("warm_api_cache_endpoint")
def warm_api_cache_endpoint(key_prefix):
metrics.incr(f"warm_api_cache_endpoint.{key_prefix}.started")
entry = _get_warm_cache_endpoint(key_prefix)
if entry is None:
metrics.incr(f"warm_api_cache_endpoint.{key_prefix}.unknown")
logger.error("Unknown cache endpoint: %s", key_prefix)
return

_, queryset, serializer_class, kwargs = entry
try:
warm_api_cache(key_prefix, queryset, serializer_class, **kwargs)
logger.info("Warmed %s", key_prefix)
metrics.incr(f"warm_api_cache_endpoint.{key_prefix}.completed")
except Exception as e:
metrics.incr("warm_api_caches.failed")
logger.exception("warm_api_caches failed: %s", e)
metrics.incr(f"warm_api_cache_endpoint.{key_prefix}.failed")
logger.exception("Failed to warm %s: %s", key_prefix, e)
raise


@app.task
def warm_api_caches():
metrics.incr("warm_api_caches.dispatched")
for key_prefix, _, _, _ in _get_warm_cache_endpoints():
warm_api_cache_endpoint.delay(key_prefix)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
},
CELERY_TASK_ALWAYS_EAGER=True,
CELERY_TASK_EAGER_PROPAGATES=False,
)
class CachedViewSetTest(TestCase):
def setUp(self):
Expand Down
85 changes: 80 additions & 5 deletions experimenter/experimenter/experiments/tests/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,22 @@

from django.core.cache import cache
from django.test import TestCase, override_settings
from rest_framework.renderers import JSONRenderer

from experimenter.experiments.api.cache import get_api_cache_key
from experimenter.experiments.tasks import _get_warm_cache_endpoints, warm_api_caches
from experimenter.experiments.api.cache import (
get_api_cache_key,
stream_render_queryset,
)
from experimenter.experiments.api.cache import warm_api_cache as real_warm_api_cache
from experimenter.experiments.api.v8.serializers import (
NimbusExperimentSerializer as V8NimbusExperimentSerializer,
)
from experimenter.experiments.api.v8.views import NimbusExperimentViewSet as V8ViewSet
from experimenter.experiments.tasks import (
_get_warm_cache_endpoints,
warm_api_cache_endpoint,
warm_api_caches,
)
from experimenter.experiments.tests.factories import NimbusExperimentFactory


Expand All @@ -14,7 +27,9 @@
"default": {
"BACKEND": "django.core.cache.backends.locmem.LocMemCache",
}
}
},
CELERY_TASK_ALWAYS_EAGER=True,
CELERY_TASK_EAGER_PROPAGATES=False,
)
class TestWarmApiCaches(TestCase):
def setUp(self):
Expand Down Expand Up @@ -126,12 +141,72 @@ def test_warm_api_caches_v5_csv_endpoint(self):
content = cached.decode("utf-8") if isinstance(cached, bytes) else cached
self.assertIn("csv-experiment", content)

def test_warm_api_caches_raises_on_error(self):
def test_warm_api_caches_dispatches_one_subtask_per_endpoint(self):
with mock.patch(
"experimenter.experiments.tasks.warm_api_cache_endpoint.delay"
) as mock_delay:
warm_api_caches()

dispatched_keys = [call.args[0] for call in mock_delay.call_args_list]
expected_keys = [entry[0] for entry in _get_warm_cache_endpoints()]
self.assertEqual(sorted(dispatched_keys), sorted(expected_keys))

def test_warm_api_caches_failed_subtask_does_not_block_other_subtasks(self):
NimbusExperimentFactory.create_with_lifecycle(
NimbusExperimentFactory.Lifecycles.LIVE_ENROLLING,
slug="live-experiment",
)

def fake_warm(key_prefix, *args, **kwargs):
if key_prefix == "v6:experiments":
raise RuntimeError("simulated OOM for v6")
return real_warm_api_cache(key_prefix, *args, **kwargs)

with mock.patch(
"experimenter.experiments.tasks.warm_api_cache", side_effect=fake_warm
):
warm_api_caches()

self.assertIsNone(cache.get(get_api_cache_key("v6:experiments")))
for key_prefix in (
"v5:csv",
"v6:draft-experiments",
"v7:experiments",
"v8:experiments",
"v8:draft-experiments",
):
self.assertIsNotNone(
cache.get(get_api_cache_key(key_prefix)),
f"{key_prefix} should still be warmed when v6:experiments fails",
)

def test_warm_api_cache_endpoint_unknown_key_logs_and_returns(self):
with self.assertLogs("experimenter.experiments.tasks", level="ERROR") as captured:
warm_api_cache_endpoint("does-not-exist")
self.assertTrue(any("Unknown cache endpoint" in line for line in captured.output))

def test_warm_api_cache_endpoint_raises_on_warm_failure(self):
with (
mock.patch(
"experimenter.experiments.tasks.warm_api_cache",
side_effect=Exception("serialization failed"),
),
self.assertRaises(Exception),
):
warm_api_caches()
warm_api_cache_endpoint("v6:experiments")

def test_stream_render_queryset_matches_drf_json_renderer(self):
for slug in ("eq-experiment-1", "eq-experiment-2", "eq-experiment-3"):
NimbusExperimentFactory.create_with_lifecycle(
NimbusExperimentFactory.Lifecycles.LIVE_ENROLLING,
slug=slug,
)

queryset = V8ViewSet.queryset
streamed = stream_render_queryset(queryset, V8NimbusExperimentSerializer)

rendered = JSONRenderer().render(
V8NimbusExperimentSerializer(queryset.all(), many=True).data
)

self.assertEqual(streamed, rendered)
Loading