diff --git a/airflow-core/src/airflow/airflow/docs/apache-airflow/logging-configuration.rst b/airflow-core/src/airflow/airflow/docs/apache-airflow/logging-configuration.rst
new file mode 100644
index 0000000000000..279e7c33aef8f
--- /dev/null
+++ b/airflow-core/src/airflow/airflow/docs/apache-airflow/logging-configuration.rst
@@ -0,0 +1,70 @@
+Default Logging in Apache Airflow
+=================================
+
+Apache Airflow uses several loggers for its different components, which can be confusing for new users.
+This section explains the default loggers, what each one is for, and how to modify their behavior.
+
+Default Loggers
+---------------
+
+.. list-table::
+   :header-rows: 1
+
+   * - Logger Name
+     - Component
+     - Output
+     - Notes
+   * - ``root``
+     - Webserver
+     - stdout / webserver log
+     - Default root logger, used primarily by the webserver.
+   * - ``airflow.task``
+     - Scheduler / Worker
+     - ``logs/<dag_id>/<task_id>/<execution_date>/<try_number>.log``
+     - A new log file is created per task instance and per try. Shown in the Web UI.
+   * - ``airflow.processor``
+     - Scheduler / Worker
+     - ``logs/<dag_file>.log``
+     - Logs DAG parsing for the scheduler and workers.
+   * - ``airflow.processor_manager``
+     - Scheduler
+     - ``logs/dag_processor_manager/dag_processor_manager.log``
+     - Logs the DAG file processor manager, which controls which DAG files are parsed and when.
+   * - ``flask_appbuilder``
+     - Webserver
+     - webserver log
+     - Used to filter verbose Flask AppBuilder (FAB) output; no configuration is needed by most users.
+
+Logging by Airflow Component
+----------------------------
+
+- **Webserver**: Uses the root logger. Logs to stdout and the webserver log file.
+- **Worker**: Uses ``airflow.task`` and ``airflow.processor``. Task logs are stored per task instance; DAG parsing logs are stored per DAG file.
+- **Scheduler**: Uses ``airflow.processor``, ``airflow.processor_manager``, and the root logger.
+
+Customizing Logging
+-------------------
+
+You can influence the logging configuration using the following methods:
+
+1. **Configuration via airflow.cfg**
+
+   - The ``[logging]`` section allows changing:
+
+     - the base log folder (``base_log_folder``)
+     - remote logging settings (``remote_logging``, ``remote_base_log_folder``)
+     - the logging format (``log_format``)
+
+2. **Custom Python logging configuration**
+
+   - Airflow loads its logging setup through the ``airflow.logging_config`` module; the default
+     configuration dictionary, ``DEFAULT_LOGGING_CONFIG``, lives in
+     ``airflow.config_templates.airflow_local_settings``.
+   - You can provide your own ``LOGGING_CONFIG`` (for example in ``airflow_local_settings.py`` or
+     another module on the Python path) and point the ``[logging] logging_config_class`` option at it.
+   - Example:
+
+     .. code-block:: python
+
+        from copy import deepcopy
+
+        from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
+
+        # Deep-copy so that nested handler/logger dictionaries are not mutated in place.
+        LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
+        LOGGING_CONFIG["handlers"]["console"]["level"] = "INFO"
+
+3. **Environment variables**
+
+   - Some logging options can also be set via environment variables, e.g.:
+
+     - ``AIRFLOW__LOGGING__BASE_LOG_FOLDER``
+     - ``AIRFLOW__LOGGING__REMOTE_LOGGING``
+
+Recommendations
+---------------
+
+- Use ``airflow.task`` logs to debug task failures (see the example below).
+- Use ``airflow.processor`` logs to debug DAG parsing issues.
+- For production, consider remote logging (S3, GCS, Elasticsearch) for durability and scalability.
+- Do **not** modify the ``flask_appbuilder`` logger unless you have a specific need.
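+
+For example, code that runs inside a task can write to the per-try task log simply by using the
+standard ``airflow.task`` logger. The sketch below is illustrative only (``my_callable`` is a
+hypothetical task callable, not part of Airflow):
+
+.. code-block:: python
+
+   import logging
+
+   log = logging.getLogger("airflow.task")
+
+
+   def my_callable(**context):
+       # Anything logged here is written to the task's per-try log file
+       # and is visible in the Web UI.
+       log.info("Debugging information for this task instance.")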
+
+References
+----------
+
+- :doc:`/configuration/logging`
+- :ref:`task-logs`
diff --git a/airflow-core/src/airflow/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py b/airflow-core/src/airflow/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
new file mode 100644
index 0000000000000..92a9d653b8b24
--- /dev/null
+++ b/airflow-core/src/airflow/airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
@@ -0,0 +1,107 @@
+# airflow/providers/cncf/kubernetes/operators/spark_kubernetes.py
+
+from kubernetes.client import V1EnvVar
+
+from airflow.models import BaseOperator
+
+
+class SparkKubernetesOperator(BaseOperator):
+    """
+    SparkKubernetesOperator launches Spark driver and executor pods in Kubernetes.
+
+    This version adds a SPARK_APPLICATION_NAME environment variable to both pods.
+    """
+
+    def __init__(
+        self,
+        *,
+        application_name: str,
+        namespace: str = "default",
+        # other arguments
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.application_name = application_name
+        self.namespace = namespace
+        # Initialize other needed fields
+
+    def execute(self, context):
+        """
+        Build and submit the driver and executor pods.
+        """
+        dag_run = context.get("dag_run")
+        self.dag_run = dag_run  # store the DAG run for use in _get_spark_app_name
+
+        # --- Example: create driver pod spec ---
+        driver_spec = self._build_driver_pod_spec()
+
+        # Add the SPARK_APPLICATION_NAME env variable to the driver pod.
+        driver_container = driver_spec["spec"]["containers"][0]
+        driver_env = driver_container.get("env") or []
+        driver_env.append(
+            V1EnvVar(
+                name="SPARK_APPLICATION_NAME",
+                value=self._get_spark_app_name(),
+            )
+        )
+        driver_container["env"] = driver_env
+
+        # --- Example: create executor pod spec ---
+        executor_spec = self._build_executor_pod_spec()
+
+        # Add the SPARK_APPLICATION_NAME env variable to the executor pod.
+        executor_container = executor_spec["spec"]["containers"][0]
+        executor_env = executor_container.get("env") or []
+        executor_env.append(
+            V1EnvVar(
+                name="SPARK_APPLICATION_NAME",
+                value=self._get_spark_app_name(),
+            )
+        )
+        executor_container["env"] = executor_env
+
+        # Submit driver and executor pods
+        self._submit_driver(driver_spec)
+        self._submit_executors(executor_spec)
+
+        # Other existing logic...
+
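+    # A worked example of the naming scheme implemented by _get_spark_app_name below
+    # (all values are illustrative only):
+    #   application_name="spark-pi", run_id="manual__2024-01-01T00:00:00+00:00"
+    #   -> "spark-pi-manual__2024-01-01T00:00:00+00:00"
+    # Note: Kubernetes object names must be valid DNS-1123 names, so a production
+    # implementation may need to sanitize characters such as ":" or "+" in run_id.
+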
+ """ + suffix = getattr(self, "dag_run", None).run_id if hasattr(self, "dag_run") else "manual" + return f"{self.application_name}-{suffix}" + + # Placeholder methods for building and submitting pods + def _build_driver_pod_spec(self): + # Existing logic to create driver pod spec + return { + "spec": { + "containers": [ + { + "name": "spark-driver", + "env": [] + } + ] + } + } + + def _build_executor_pod_spec(self): + # Existing logic to create executor pod spec + return { + "spec": { + "containers": [ + { + "name": "spark-executor", + "env": [] + } + ] + } + } + + def _submit_driver(self, driver_spec): + # Existing submission logic + pass + + def _submit_executors(self, executor_spec): + # Existing submission logic + pass diff --git a/airflow-core/src/airflow/ui/src/components/DataTable/DataTable.tsx b/airflow-core/src/airflow/ui/src/components/DataTable/DataTable.tsx index 5ad64589b092f..d7d91be93316c 100644 --- a/airflow-core/src/airflow/ui/src/components/DataTable/DataTable.tsx +++ b/airflow-core/src/airflow/ui/src/components/DataTable/DataTable.tsx @@ -38,7 +38,14 @@ import { ToggleTableDisplay } from "src/components/DataTable/ToggleTableDisplay" import { createSkeletonMock } from "src/components/DataTable/skeleton"; import type { CardDef, MetaColumn, TableState } from "src/components/DataTable/types"; import { ProgressBar, Pagination, Toaster } from "src/components/ui"; - +const loadFromStorage = (key: string, fallback: T): T => { + try { + const raw = localStorage.getItem(key); + return raw ? JSON.parse(raw) : fallback; + } catch { + return fallback; + } +}; type DataTableProps = { readonly allowFiltering?: boolean; readonly cardDef?: CardDef; @@ -83,6 +90,7 @@ export const DataTable = ({ skeletonCount = 10, total = 0, }: DataTableProps) => { + const storageKey = `airflow.datatable.${modelName}`; "use no memo"; // remove if https://github.com/TanStack/table/issues/5567 is resolved const { t: translate } = useTranslation(["common"]); @@ -109,9 +117,21 @@ export const DataTable = ({ [onStateChange], ); - const [columnVisibility, setColumnVisibility] = useState( + const [columnVisibility, setColumnVisibility] = useState(() => + loadFromStorage( + `${storageKey}.columnVisibility`, initialState?.columnVisibility ?? {}, + ), +); + + useEffect(() => { + + localStorage.setItem( + `${storageKey}.columnVisibility`, + JSON.stringify(columnVisibility), ); + }, [columnVisibility, storageKey]); + const rest = Boolean(isLoading) ? createSkeletonMock(displayMode, skeletonCount, columns) : {};