5 changes: 5 additions & 0 deletions superset/config.py
@@ -2257,3 +2257,8 @@ class ExtraDynamicQueryFilters(TypedDict, total=False):
if env_var in os.environ:
config_var = env_var.replace("SUPERSET__", "")
globals()[config_var] = os.environ[env_var]


ASYNC_TASK_MAX_RETRIES = 3
ASYNC_TASK_RETRY_BACKOFF = True
ASYNC_TASK_RETRY_BACKOFF_MAX = 60
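
These defaults can be tuned per deployment through Superset's usual config override mechanism. A minimal sketch, assuming overrides live in a superset_config.py file; the values are illustrative:

```python
# superset_config.py (illustrative override values, not part of this PR)
ASYNC_TASK_MAX_RETRIES = 5          # allow more attempts for flaky backends
ASYNC_TASK_RETRY_BACKOFF = True     # keep exponential backoff between attempts
ASYNC_TASK_RETRY_BACKOFF_MAX = 120  # cap any single delay at two minutes
```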
38 changes: 34 additions & 4 deletions superset/tasks/async_queries.py
@@ -20,10 +20,13 @@
import logging
from typing import Any, cast, TYPE_CHECKING

import celery
from celery.exceptions import SoftTimeLimitExceeded
from flask import current_app, g
from flask_appbuilder.security.sqla.models import User
from marshmallow import ValidationError
from requests.exceptions import ConnectionError, Timeout
from sqlalchemy.exc import OperationalError

from superset.charts.schemas import ChartDataQueryContextSchema
from superset.exceptions import SupersetVizException
@@ -37,6 +40,9 @@
from superset.utils.core import override_user
from superset.views.utils import get_datasource_info, get_viz

# Only retry transient issues
RETRYABLE_EXCEPTIONS = (OperationalError, ConnectionError, Timeout)

if TYPE_CHECKING:
from superset.common.query_context import QueryContext

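For context, the decorator options used below lean on Celery's built-in automatic retry support: any exception listed in autoretry_for triggers a retry with exponential backoff, up to max_retries, after which the exception propagates and the existing error handling runs. A standalone sketch of the same pattern, not Superset code; the app name, broker URL, and task body are placeholders:

```python
from celery import Celery
from sqlalchemy.exc import OperationalError

app = Celery("example", broker="memory://")  # placeholder broker


@app.task(
    bind=True,
    autoretry_for=(OperationalError,),
    retry_backoff=True,    # base delays of roughly 1s, 2s, 4s, ... (jitter applies)
    retry_backoff_max=60,  # never wait more than 60s between attempts
    max_retries=3,         # at most four executions, then the error propagates
)
def flaky_task(self) -> str:
    # self.request.retries is 0 on the first run and increments on each retry
    if self.request.retries < 2:
        # simulate a transient database failure on the first two executions
        raise OperationalError("SELECT 1", None, Exception("connection dropped"))
    return "ok"
```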
@@ -79,10 +85,17 @@ def _load_user_from_job_metadata(job_metadata: dict[str, Any]) -> User:
return user


@celery_app.task(name="load_chart_data_into_cache", soft_time_limit=query_timeout)
@celery_app.task(
name="load_chart_data_into_cache",
soft_time_limit=query_timeout,
bind=True,
autoretry_for=RETRYABLE_EXCEPTIONS,
retry_backoff=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF", True),
retry_backoff_max=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF_MAX", 60),
max_retries=current_app.config.get("ASYNC_TASK_MAX_RETRIES", 3),
)
def load_chart_data_into_cache(
job_metadata: dict[str, Any],
form_data: dict[str, Any],
self: celery.Task, job_metadata: dict[str, Any], form_data: dict[str, Any]
) -> None:
# pylint: disable=import-outside-toplevel
from superset.commands.chart.data.get_data_command import ChartDataCommand
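
Because the task is now bound, Celery injects the task instance as the first argument; existing call sites keep passing job_metadata and form_data exactly as before. A hypothetical invocation for illustration only; the payload keys shown are assumptions, not the exact structure AsyncQueryManager builds:

```python
from superset.tasks.async_queries import load_chart_data_into_cache

# Hypothetical call; the metadata and form_data keys below are placeholders.
load_chart_data_into_cache.delay(
    {"job_id": "abc123", "user_id": 1, "channel_id": "chan-1"},  # job_metadata
    {"datasource": "1__table", "viz_type": "table"},             # form_data
)
```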
@@ -110,11 +123,24 @@ def load_chart_data_into_cache(
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
attempt = self.request.retries + 1
logger.warning(
"Retrying load_chart_data_into_cache (attempt {%s}): {%s}", attempt, ex
)
raise


@celery_app.task(name="load_explore_json_into_cache", soft_time_limit=query_timeout)
@celery_app.task(
name="load_explore_json_into_cache",
soft_time_limit=query_timeout,
bind=True,
autoretry_for=RETRYABLE_EXCEPTIONS,
retry_backoff=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF", True),
retry_backoff_max=current_app.config.get("ASYNC_TASK_RETRY_BACKOFF_MAX", 60),
max_retries=current_app.config.get("ASYNC_TASK_MAX_RETRIES", 3),
)
def load_explore_json_into_cache( # pylint: disable=too-many-locals
self: celery.Task,
job_metadata: dict[str, Any],
form_data: dict[str, Any],
response_type: str | None = None,
@@ -179,4 +205,8 @@ def load_explore_json_into_cache( # pylint: disable=too-many-locals
async_query_manager.update_job(
job_metadata, async_query_manager.STATUS_ERROR, errors=errors
)
attempt = self.request.retries + 1
logger.warning(
"Retrying load_explore_json_into_cache (attempt {%s}): {%s}", attempt, ex
)
raise
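
With the defaults added in config.py (3 retries, exponential backoff capped at 60 seconds), a task hitting a transient error runs at most four times. A rough sketch of the base delay schedule, assuming Celery's usual 2**retries backoff formula; Celery also applies jitter by default, so the real countdowns are randomized up to these values:

```python
# Rough illustration of the default backoff schedule, not Superset code.
MAX_RETRIES = 3   # ASYNC_TASK_MAX_RETRIES default
BACKOFF_MAX = 60  # ASYNC_TASK_RETRY_BACKOFF_MAX default

for retries in range(MAX_RETRIES):
    base_delay = min(2 ** retries, BACKOFF_MAX)  # 1s, 2s, 4s
    print(f"retry {retries + 1}: scheduled up to ~{base_delay}s after the failure")
```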