4 changes: 4 additions & 0 deletions README.md
@@ -109,6 +109,10 @@ and others would not be possible without your help.

Ready? [Getting Started](https://kyuubi.readthedocs.io/en/master/quick_start/) with Kyuubi.

## Security & Guard

- [Dangerous Join Watchdog](./docs/watchdog/dangerous-join.md)

## [Contributing](./CONTRIBUTING.md)

## Project & Community Status
12 changes: 12 additions & 0 deletions docs/configuration/settings.md
@@ -580,6 +580,18 @@ jdbc:hive2://localhost:10009/default;#spark.sql.shuffle.partitions=2;spark.execu

Please refer to the Spark official online documentation for [SET Command](https://spark.apache.org/docs/latest/sql-ref-syntax-aux-conf-mgmt-set.html)

### Dangerous Join Watchdog

You can enable dangerous join detection in the Kyuubi Spark SQL extension with the following settings:

| Name | Default | Description |
|------------------------------------------------|---------|------------------------------------------------------------------------------------|
| `kyuubi.watchdog.dangerousJoin.enabled` | `false` | Enable dangerous join detection |
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold to identify oversized broadcast fallback |
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs warning diagnostics, `REJECT` throws exception with error code `41101` |

Please see [Dangerous Join Watchdog](../watchdog/dangerous-join.md) for rules and examples.

## Flink Configurations

### Via flink-conf.yaml
3 changes: 2 additions & 1 deletion docs/deployment/index.rst
@@ -27,6 +27,7 @@ Basics
:glob:

kyuubi_on_kubernetes
settings
hive_metastore
high_availability_guide
migration-guide
@@ -42,4 +43,4 @@ Engines
engine_on_kubernetes
engine_share_level
engine_lifecycle
spark/index
spark/index
34 changes: 34 additions & 0 deletions docs/deployment/settings.md
@@ -0,0 +1,34 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Deployment Settings for Dangerous Join Watchdog

## Spark SQL Extensions

```properties
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
```

## Dangerous Join Configurations

| Name | Default | Description |
|------------------------------------------------|---------|-----------------------------------------------------------|
| `kyuubi.watchdog.dangerousJoin.enabled` | `false` | Enable dangerous join watchdog |
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Broadcast threshold coefficient |
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` only logs diagnostics, `REJECT` throws error 41101 |

For detailed rules and examples, see [Dangerous Join Watchdog](../watchdog/dangerous-join.md).
57 changes: 30 additions & 27 deletions docs/extensions/engines/spark/rules.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions docs/index.rst
@@ -181,6 +181,7 @@ What's Next

quick_start/index
configuration/settings
watchdog/dangerous-join
deployment/index
Security <security/index>
monitor/index
105 changes: 105 additions & 0 deletions docs/watchdog/dangerous-join.md
@@ -0,0 +1,105 @@
<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-->

# Dangerous Join Watchdog

Kyuubi Dangerous Join Watchdog detects risky join planning patterns before query execution.
It helps reduce accidental Cartesian products, oversized broadcast attempts, and long-running nested loop joins.

## Background

In shared SQL gateway environments, a single risky join can consume excessive driver memory or create very slow jobs.
The Dangerous Join Watchdog adds planning-time checks for these high-risk patterns.

## Risk Rules

### Equi-Join

- Rule 1: Equi-join is marked dangerous when it degrades to a Cartesian pattern.
- Rule 2: Equi-join is marked dangerous when the estimated build side exceeds the configured broadcast ratio threshold.

### Non-Equi Join

- Rule 1: Non-equi join is marked dangerous when both sides exceed the broadcast threshold, which effectively makes the join a Cartesian-product risk.
- Rule 2: Non-equi join is marked dangerous when the build side is not selectable and the plan falls back to a second broadcast nested loop join (BNLJ) pattern.
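
Equi-join Rule 2 can be read as a size comparison against a scaled broadcast threshold. The sketch below is one hedged interpretation, not the watchdog's actual implementation: it assumes the limit is Spark's broadcast threshold multiplied by `kyuubi.watchdog.dangerousJoin.broadcastRatio`, and the names `BroadcastRatioSketch`, `effectiveLimit`, and `exceedsLimit` are hypothetical.

```scala
// Hypothetical helper: illustrates one possible reading of the broadcast ratio rule.
object BroadcastRatioSketch {
  // Assumption: the limit is the Spark broadcast threshold scaled by the ratio.
  def effectiveLimit(broadcastThreshold: Long, broadcastRatio: Double): BigInt =
    (BigDecimal(broadcastThreshold) * BigDecimal(broadcastRatio)).toBigInt

  // True when the estimated build-side size is strictly above the scaled limit.
  def exceedsLimit(
      estimatedSize: BigInt,
      broadcastThreshold: Long,
      broadcastRatio: Double): Boolean =
    estimatedSize > effectiveLimit(broadcastThreshold, broadcastRatio)
}
```

Under this assumption, a 10485760-byte threshold with the default ratio of `0.8` gives a limit of 8388608 bytes, so a side estimated at 15728640 bytes would be flagged.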

## Configurations

| Name | Default | Meaning |
|------------------------------------------------|---------|---------------------------------------------------------------------------|
| `kyuubi.watchdog.dangerousJoin.enabled` | `false` | Enable or disable dangerous join detection |
| `kyuubi.watchdog.dangerousJoin.broadcastRatio` | `0.8` | Ratio against Spark broadcast threshold for warning/reject decision |
| `kyuubi.watchdog.dangerousJoin.action` | `WARN` | `WARN` logs diagnostics; `REJECT` throws exception and rejects submission |
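
The `KyuubiSQLConf` entries added in this change constrain these values: the ratio must lie in (0, 1], and the action is upper-cased before being checked against `WARN` and `REJECT`. The standalone sketch below mirrors those checks so a config file could be validated before deployment. `WatchdogConfSketch` and its methods are hypothetical names and are not part of Kyuubi.

```scala
import java.util.Locale

// Hypothetical pre-flight validator mirroring the constraints in KyuubiSQLConf.
object WatchdogConfSketch {
  private val allowedActions = Set("WARN", "REJECT")

  // The ratio must be in (0, 1], matching the checkValue on broadcastRatio.
  def validateRatio(ratio: Double): Either[String, Double] =
    if (ratio > 0 && ratio <= 1) Right(ratio)
    else Left(s"broadcastRatio must be in (0, 1], got $ratio")

  // The action is normalized to upper case, then checked against the allowed set.
  def validateAction(action: String): Either[String, String] = {
    val normalized = action.toUpperCase(Locale.ROOT)
    if (allowedActions.contains(normalized)) Right(normalized)
    else Left(s"action must be one of WARN, REJECT, got $action")
  }
}
```

For example, `WatchdogConfSketch.validateAction("reject")` normalizes the value to `REJECT`, while a ratio of `1.5` is reported as invalid.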

## Usage

1. Put the Kyuubi Spark extension jar on the Spark classpath.
2. Configure SQL extensions:

```properties
spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension,org.apache.kyuubi.sql.watchdog.KyuubiDangerousJoinExtension
```

3. Configure action:

```properties
kyuubi.watchdog.dangerousJoin.action=WARN
```

or

```properties
kyuubi.watchdog.dangerousJoin.action=REJECT
```

## Sample WARN Log

When action is `WARN`, Kyuubi writes a structured JSON payload:

```text
KYUUBI_LOG_KEY={"sql":"SELECT ...","joinType":"INNER","reason":"Cartesian","leftSize":10485760,"rightSize":15728640,"broadcastThreshold":10485760,"broadcastRatio":0.8}
```
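
A log consumer could pull fields out of such a line as sketched below. This is a hedged illustration only: `WarnLogSketch` and its methods are hypothetical, the regex handles just the bare numeric fields shown in the sample, and a real JSON library would be the safer choice for anything beyond quick inspection.

```scala
import java.util.regex.Pattern

// Hypothetical log consumer: extracts selected fields from a WARN line.
object WarnLogSketch {
  private val Prefix = "KYUUBI_LOG_KEY="

  // Returns the JSON payload that follows the prefix, if the line carries one.
  def payload(line: String): Option[String] = {
    val idx = line.indexOf(Prefix)
    if (idx >= 0) Some(line.substring(idx + Prefix.length)) else None
  }

  // Extracts a bare numeric field such as "leftSize":10485760.
  def numericField(json: String, key: String): Option[BigDecimal] = {
    val pattern = ("\"" + Pattern.quote(key) + "\":(-?[0-9][0-9.eE+-]*)").r
    pattern.findFirstMatchIn(json).map(m => BigDecimal(m.group(1)))
  }
}
```

Given the sample line above, `WarnLogSketch.payload(line).flatMap(WarnLogSketch.numericField(_, "rightSize"))` would yield the right-side size as a `BigDecimal`.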

## Sample REJECT Error

When action is `REJECT`, query submission fails with:

```text
errorCode=41101
Query rejected due to dangerous join strategy: {...details...}
```

## Disable or Tune

- Disable watchdog:

```properties
kyuubi.watchdog.dangerousJoin.enabled=false
```

- Increase tolerance:

```properties
kyuubi.watchdog.dangerousJoin.broadcastRatio=0.95
```

## FAQ

### What if `spark.sql.adaptive.enabled=true`?

The Dangerous Join Watchdog runs in the planner strategy phase and evaluates pre-execution plan statistics.
AQE may still re-optimize the plan at runtime, but watchdog decisions are made before query execution starts.
@@ -123,6 +123,30 @@ object KyuubiSQLConf {
      .bytesConf(ByteUnit.BYTE)
      .createOptional

  val DANGEROUS_JOIN_ENABLED =
    buildConf("kyuubi.watchdog.dangerousJoin.enabled")
      .doc("Enable dangerous join condition detection.")
      .version("1.11.0")
      .booleanConf
      .createWithDefault(false)

  val DANGEROUS_JOIN_BROADCAST_RATIO =
    buildConf("kyuubi.watchdog.dangerousJoin.broadcastRatio")
      .doc("The threshold ratio to mark oversized broadcast fallback.")
      .version("1.11.0")
      .doubleConf
      .checkValue(v => v > 0 && v <= 1, "must be in (0, 1]")
      .createWithDefault(0.8)

  val DANGEROUS_JOIN_ACTION =
    buildConf("kyuubi.watchdog.dangerousJoin.action")
      .doc("Action when dangerous join is detected, one of WARN and REJECT.")
      .version("1.11.0")
      .stringConf
      .transform(_.toUpperCase(java.util.Locale.ROOT))
      .checkValues(Set("WARN", "REJECT"))
      .createWithDefault("WARN")

  val DROP_IGNORE_NONEXISTENT =
    buildConf("spark.sql.optimizer.dropIgnoreNonExistent")
      .doc("Do not report an error if DROP DATABASE/TABLE/VIEW/FUNCTION/PARTITION specifies " +
@@ -19,7 +19,7 @@ package org.apache.kyuubi.sql

import org.apache.spark.sql.{FinalStageResourceManager, InjectCustomResourceProfile, SparkSessionExtensions}

import org.apache.kyuubi.sql.watchdog.{KyuubiUnsupportedOperationsCheck, MaxScanStrategy}
import org.apache.kyuubi.sql.watchdog.{DangerousJoinInterceptor, KyuubiUnsupportedOperationsCheck, MaxScanStrategy}

// scalastyle:off line.size.limit
/**
@@ -39,6 +39,7 @@ class KyuubiSparkSQLExtension extends (SparkSessionExtensions => Unit) {
    // watchdog extension
    extensions.injectCheckRule(_ => KyuubiUnsupportedOperationsCheck)
    extensions.injectPlannerStrategy(MaxScanStrategy)
    extensions.injectPlannerStrategy(DangerousJoinInterceptor(_))

    extensions.injectQueryStagePrepRule(FinalStageResourceManager(_))
    extensions.injectQueryStagePrepRule(InjectCustomResourceProfile)
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.sql.watchdog

import scala.collection.mutable.ArrayBuffer

object DangerousJoinCounter {
  // A single dangerous-join detection record.
  case class Entry(
      sqlText: String,
      joinType: String,
      reason: String,
      leftSize: BigInt,
      rightSize: BigInt,
      broadcastThreshold: Long,
      broadcastRatio: Double) {
    // Serializes the entry as a one-line JSON object.
    def toJson: String = {
      // Keys whose values are emitted as bare JSON numbers rather than quoted strings.
      val numericKeys = Set("leftSize", "rightSize", "broadcastThreshold", "broadcastRatio")
      val pairs = Seq(
        "sql" -> escape(sqlText),
        "joinType" -> escape(joinType),
        "reason" -> escape(reason),
        "leftSize" -> leftSize.toString,
        "rightSize" -> rightSize.toString,
        "broadcastThreshold" -> broadcastThreshold.toString,
        "broadcastRatio" -> broadcastRatio.toString)
      pairs.map { case (k, v) =>
        if (numericKeys.contains(k)) {
          s""""$k":$v"""
        } else {
          s""""$k":"$v""""
        }
      }.mkString("{", ",", "}")
    }
  }

  // Recorded entries; every access is guarded by `synchronized` on this object.
  private val entries = ArrayBuffer.empty[Entry]

  def add(entry: Entry): Unit = synchronized {
    entries += entry
  }

  def count: Int = synchronized {
    entries.size
  }

  def latest: Option[Entry] = synchronized {
    entries.lastOption
  }

  // Returns an immutable copy of the entries recorded so far.
  def snapshot: Seq[Entry] = synchronized {
    entries.toSeq
  }

  def reset(): Unit = synchronized {
    entries.clear()
  }

  // Escapes backslashes, double quotes, and common control characters for JSON strings.
  private def escape(raw: String): String = {
    raw
      .replace("\\", "\\\\")
      .replace("\"", "\\\"")
      .replace("\n", "\\n")
      .replace("\r", "\\r")
      .replace("\t", "\\t")
  }
}