Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions superset/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2257,3 +2257,8 @@ class ExtraDynamicQueryFilters(TypedDict, total=False):
if env_var in os.environ:
config_var = env_var.replace("SUPERSET__", "")
globals()[config_var] = os.environ[env_var]


ASYNC_TASK_MAX_RETRIES = 3
ASYNC_TASK_RETRY_BACKOFF = True
ASYNC_TASK_RETRY_BACKOFF_MAX = 60
38 changes: 34 additions & 4 deletions superset/tasks/async_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@
import logging
from typing import Any, cast, TYPE_CHECKING

import celery
from celery.exceptions import SoftTimeLimitExceeded
from flask import current_app, g
from flask_appbuilder.security.sqla.models import User
from marshmallow import ValidationError
from requests.exceptions import ConnectionError, Timeout
from sqlalchemy.exc import OperationalError

from superset.charts.schemas import ChartDataQueryContextSchema
from superset.exceptions import SupersetVizException
Expand All @@ -37,6 +40,9 @@
from superset.utils.core import override_user
from superset.views.utils import get_datasource_info, get_viz

# Only retry transient issues
RETRYABLE_EXCEPTIONS = (OperationalError, ConnectionError, Timeout)

if TYPE_CHECKING:
from superset.common.query_context import QueryContext

Expand Down Expand Up @@ -79,10 +85,17 @@ def _load_user_from_job_metadata(job_metadata: dict[str, Any]) -> User:
return user


@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
@celery_app.task(
name="load_chart_data_into_cache",
soft_time_limit=query_timeout,
bind=True,
autoretry_for=RETRYABLE_EXCEPTIONS,
retry_backoff=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF", True),
retry_backoff_max=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF_MAX", 60),
max_retries=current_app.config.get("ASYNC_TASK_MAX_RETRIES", 3),
)
def load_chart_data_into_cache(
job_metadata: dict[str, Any],
form_data: dict[str, Any],
self: celery.Task, job_metadata: dict[str, Any], form_data: dict[str, Any]
) -> None:
# pylint: disable=import-outside-toplevel
from superset.commands.chart.data.get_data_command import ChartDataCommand
Expand Down Expand Up @@ -110,11 +123,24 @@ def load_chart_data_into_cache(
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
attempt = self.request.retries + 1
logger.warning(
"Retrying load_chart_data_into_cache (attempt {%s}): {%s}", attempt, ex
)
raise


@celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout)
@celery_app.task(
name="load_explore_json_into_cache",
soft_time_limit=query_timeout,
bind=True,
autoretry_for=RETRYABLE_EXCEPTIONS,
retry_backoff=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF", True),
retry_backoff_max=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF_MAX", 60),
max_retries=current_app.config.get("ASYNC_TASK_MAX_RETRIES", 3),
)
def load_explore_json_into_cache( # pylint: disable=too-many-locals
self: celery.Task,
job_metadata: dict[str, Any],
form_data: dict[str, Any],
response_type: str | None = None,
Expand Down Expand Up @@ -179,4 +205,8 @@ def load_explore_json_into_cache( # pylint: disable=too-many-locals
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
attempt = self.request.retries + 1
logger.warning(
"Retrying load_chart_data_into_cache (attempt {%s}): {%s}", attempt, ex
)
raise