Skip to content

Commit 3feeb1f

Browse files
authored
Add configurable ClickHouse query timeout (#542)
* fix: add configurable ClickHouse query timeout * test: assert default client query timeout in ClickHouseHelperSuite
1 parent c21e25b commit 3feeb1f

48 files changed

Lines changed: 269 additions & 106 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

clickhouse-core/src/main/scala/com/clickhouse/spark/client/ClusterClient.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ import scala.collection.JavaConverters._
2323
import scala.util.Random._
2424

2525
object ClusterClient {
26-
def apply(cluster: ClusterSpec) = new ClusterClient(cluster)
26+
def apply(cluster: ClusterSpec, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS): ClusterClient =
27+
new ClusterClient(cluster, queryTimeoutMs)
2728
}
2829

29-
class ClusterClient(cluster: ClusterSpec) extends AutoCloseable with Logging {
30+
class ClusterClient(cluster: ClusterSpec, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS)
31+
extends AutoCloseable with Logging {
3032

3133
@transient lazy val cache = new ConcurrentHashMap[(Int, Int), NodeClient]
3234

@@ -53,7 +55,7 @@ class ClusterClient(cluster: ClusterSpec) extends AutoCloseable with Logging {
5355
val replicaSpec = shardSpec.replicas.find(_.num == r).get
5456
val nodeSpec = replicaSpec.node
5557
log.info(s"Create client to $nodeSpec, shard $s replica $r")
56-
new NodeClient(nodeSpec)
58+
new NodeClient(nodeSpec, queryTimeoutMs)
5759
}
5860
)
5961
}

clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodeClient.scala

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,14 @@ import java.util.UUID
4242
import scala.util.{Failure, Success, Try}
4343

4444
object NodeClient {
45-
def apply(node: NodeSpec): NodeClient = new NodeClient(node)
45+
val DEFAULT_QUERY_TIMEOUT_MS: Long = 60000L
46+
47+
def apply(node: NodeSpec, queryTimeoutMs: Long = DEFAULT_QUERY_TIMEOUT_MS): NodeClient =
48+
new NodeClient(node, queryTimeoutMs)
4649
}
4750

48-
class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
49-
// TODO: add configurable timeout
50-
private val timeout: Int = 60000
51+
class NodeClient(val nodeSpec: NodeSpec, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS)
52+
extends AutoCloseable with Logging {
5153

5254
private lazy val userAgent: String = {
5355
val title = getClass.getPackage.getImplementationTitle
@@ -199,7 +201,7 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
199201
querySettings.setFormat(clickHouseFormat)
200202
querySettings.setQueryId(queryId)
201203
settings.foreach { case (k, v) => querySettings.setOption(k, v) }
202-
Try(client.query(sql, querySettings).get(timeout, TimeUnit.MILLISECONDS)) match {
204+
Try(client.query(sql, querySettings).get(queryTimeoutMs, TimeUnit.MILLISECONDS)) match {
203205
case Success(response: QueryResponse) => Right(deserializer(response.getInputStream))
204206
case Failure(se: ServerException) => Left(CHServerException(se.getCode, se.getMessage, Some(nodeSpec), Some(se)))
205207
case Failure(ex: Exception) => Left(CHClientException(ex.getMessage, Some(nodeSpec), Some(ex)))
@@ -233,7 +235,7 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
233235
querySettings.setQueryId(queryId)
234236
settings.foreach { case (k, v) => querySettings.setOption(k, v) }
235237

236-
Try(client.query(sql, querySettings).get(timeout, TimeUnit.MILLISECONDS)) match {
238+
Try(client.query(sql, querySettings).get(queryTimeoutMs, TimeUnit.MILLISECONDS)) match {
237239
case Success(response: QueryResponse) => response
238240
case Failure(se: ServerException) => throw CHServerException(se.getCode, se.getMessage, Some(nodeSpec), Some(se))
239241
case Failure(ex: Exception) => throw CHClientException(ex.getMessage, Some(nodeSpec), Some(ex))
@@ -249,6 +251,6 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
249251
|$sql
250252
|""".stripMargin
251253
)
252-
def ping(timeout: Int = timeout) =
253-
client.ping(timeout)
254+
def ping(timeoutMs: Long = queryTimeoutMs) =
255+
client.ping(timeoutMs)
254256
}

clickhouse-core/src/main/scala/com/clickhouse/spark/client/NodesClient.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,12 @@ import scala.collection.JavaConverters._
2222
import scala.util.Random.shuffle
2323

2424
object NodesClient {
25-
def apply(nodes: Nodes) = new NodesClient(nodes)
25+
def apply(nodes: Nodes, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS): NodesClient =
26+
new NodesClient(nodes, queryTimeoutMs)
2627
}
2728

28-
class NodesClient(nodes: Nodes) extends AutoCloseable with Logging {
29+
class NodesClient(nodes: Nodes, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS)
30+
extends AutoCloseable with Logging {
2931
assert(nodes.nodes.nonEmpty)
3032

3133
@transient lazy val cache = new ConcurrentHashMap[NodeSpec, NodeClient]
@@ -37,7 +39,7 @@ class NodesClient(nodes: Nodes) extends AutoCloseable with Logging {
3739
nodeSpec,
3840
{ nodeSpec =>
3941
log.info(s"Create client of $nodeSpec")
40-
new NodeClient(nodeSpec)
42+
new NodeClient(nodeSpec, queryTimeoutMs)
4143
}
4244
)
4345
}

docs/configurations/02_sql_configurations.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ license: |
1616
<!--begin-include-->
1717
|Key | Default | Description | Since
1818
|--- | ------- | ----------- | -----
19+
spark.clickhouse.client.queryTimeout|60s|The maximum time the ClickHouse client will wait for a single query or ping operation to complete on a NodeClient. For queries, it bounds the wait on the future returned by every client.query(...) call; for pings, it is the default timeout passed to client.ping(...) when the caller does not supply one.|0.10.1
1920
spark.clickhouse.ignoreUnsupportedTransform|true|ClickHouse supports using complex expressions as sharding keys or partition values, e.g. `cityHash64(col_1, col_2)`, and those can not be supported by Spark now. If `true`, ignore the unsupported expressions and log a warning, otherwise fail fast w/ an exception. Note, when `spark.clickhouse.write.distributed.convertLocal` is enabled, ignoring unsupported sharding keys may corrupt the data.|0.4.0
2021
spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
2122
spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0

spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCatalog.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ class ClickHouseCatalog extends TableCatalog
6767
this.catalogName = name
6868
this.nodeSpec = buildNodeSpec(options)
6969
this.currentDb = nodeSpec.database
70-
this.nodeClient = NodeClient(nodeSpec)
70+
this.nodeClient = NodeClient(nodeSpec, clientQueryTimeoutMs)
7171

7272
this.nodeClient.syncQueryAndCheckOutputJSONEachRow("SELECT 1")
7373

spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseCommandRunner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import com.clickhouse.spark.client.NodeClient
2121
class ClickHouseCommandRunner extends ExternalCommandRunner with ClickHouseHelper {
2222

2323
override def executeCommand(sql: String, options: CaseInsensitiveStringMap): Array[String] =
24-
Utils.tryWithResource(NodeClient(buildNodeSpec(options))) { nodeClient =>
24+
Utils.tryWithResource(NodeClient(buildNodeSpec(options), clientQueryTimeoutMs)) { nodeClient =>
2525
nodeClient.syncQueryAndCheckOutputJSONEachRow(sql).records.map(_.toString).toArray
2626
}
2727
}

spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseHelper.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ package com.clickhouse.spark
1717
import com.clickhouse.client.ClickHouseProtocol
1818
import com.fasterxml.jackson.databind.JsonNode
1919
import com.fasterxml.jackson.databind.node.NullNode
20+
import org.apache.spark.sql.catalyst.SQLConfHelper
2021
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
22+
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.CLIENT_QUERY_TIMEOUT
2123
import org.apache.spark.sql.clickhouse.SchemaUtils
2224
import org.apache.spark.sql.connector.catalog.Identifier
2325
import org.apache.spark.sql.types.StructType
@@ -32,14 +34,16 @@ import java.time.{LocalDateTime, ZoneId}
3234
import java.util.{HashMap => JHashMap}
3335
import scala.collection.JavaConverters._
3436

35-
trait ClickHouseHelper extends Logging {
37+
trait ClickHouseHelper extends SQLConfHelper with Logging {
3638

3739
@volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_DATABASE: String => Unit =
3840
(db: String) => throw NoSuchNamespaceException(db)
3941

4042
@volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_TABLE: (String, String) => Unit =
4143
(database, table) => throw NoSuchTableException(s"$database.$table")
4244

45+
def clientQueryTimeoutMs: Long = conf.getConf(CLIENT_QUERY_TIMEOUT)
46+
4347
def unwrap(ident: Identifier): Option[(String, String)] = ident.namespace() match {
4448
case Array(database) => Some((database, ident.name()))
4549
case _ => None

spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/ClickHouseTable.scala

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,11 +68,12 @@ case class ClickHouseTable(
6868

6969
lazy val (localTableSpec, localTableEngineSpec): (Option[TableSpec], Option[MergeTreeFamilyEngineSpec]) =
7070
engineSpec match {
71-
case distSpec: DistributedEngineSpec => Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
72-
val _localTableSpec = queryTableSpec(distSpec.local_db, distSpec.local_table)
73-
val _localTableEngineSpec =
74-
TableEngineUtils.resolveTableEngine(_localTableSpec).asInstanceOf[MergeTreeFamilyEngineSpec]
75-
(Some(_localTableSpec), Some(_localTableEngineSpec))
71+
case distSpec: DistributedEngineSpec => Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) {
72+
implicit nodeClient =>
73+
val _localTableSpec = queryTableSpec(distSpec.local_db, distSpec.local_table)
74+
val _localTableEngineSpec =
75+
TableEngineUtils.resolveTableEngine(_localTableSpec).asInstanceOf[MergeTreeFamilyEngineSpec]
76+
(Some(_localTableSpec), Some(_localTableEngineSpec))
7677
}
7778
case _ => (None, None)
7879
}
@@ -105,8 +106,9 @@ case class ClickHouseTable(
105106
ACCEPT_ANY_SCHEMA // TODO check schema and handle extra columns before writing
106107
).asJava
107108

108-
override lazy val schema: StructType = Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
109-
queryTableSchema(database, table)
109+
override lazy val schema: StructType = Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) {
110+
implicit nodeClient =>
111+
queryTableSchema(database, table)
110112
}
111113

112114
/**
@@ -192,7 +194,7 @@ case class ClickHouseTable(
192194
}
193195
}.mkString("(", ",", ")")
194196

195-
Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
197+
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
196198
engineSpec match {
197199
case DistributedEngineSpec(_, cluster, local_db, local_table, _, _) =>
198200
dropPartition(local_db, local_table, partitionExpr, Some(cluster))
@@ -235,12 +237,13 @@ case class ClickHouseTable(
235237
val partitionSpecs: Seq[PartitionSpec] = engineSpec match {
236238
case DistributedEngineSpec(_, _, local_db, local_table, _, _) =>
237239
cluster.get.shards.flatMap { shardSpec =>
238-
Utils.tryWithResource(NodeClient(shardSpec.nodes.head)) { implicit nodeClient: NodeClient =>
239-
queryPartitionSpec(local_db, local_table)
240+
Utils.tryWithResource(NodeClient(shardSpec.nodes.head, clientQueryTimeoutMs)) {
241+
implicit nodeClient: NodeClient =>
242+
queryPartitionSpec(local_db, local_table)
240243
}
241244
}
242245
case _ =>
243-
Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
246+
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
244247
queryPartitionSpec(database, table)
245248
}
246249
}
@@ -274,7 +277,7 @@ case class ClickHouseTable(
274277

275278
override def deleteWhere(filters: Array[Filter]): Unit = {
276279
val deleteExpr = compileFilters(AlwaysTrue :: filters.toList)
277-
Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
280+
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
278281
engineSpec match {
279282
case DistributedEngineSpec(_, cluster, local_db, local_table, _, _) =>
280283
delete(local_db, local_table, deleteExpr, Some(cluster))
@@ -285,7 +288,7 @@ case class ClickHouseTable(
285288
}
286289

287290
override def truncateTable(): Boolean =
288-
Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
291+
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
289292
engineSpec match {
290293
case DistributedEngineSpec(_, cluster, local_db, local_table, _, _) =>
291294
truncateTable(local_db, local_table, Some(cluster))

spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseRead.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ class ClickHouseScanBuilder(
9797
|$groupByClause
9898
|""".stripMargin
9999
try {
100-
_readSchema = Utils.tryWithResource(NodeClient(scanJob.node)) { implicit nodeClient: NodeClient =>
100+
val queryTimeoutMs = scanJob.readOptions.clientQueryTimeout
101+
_readSchema = Utils.tryWithResource(NodeClient(scanJob.node, queryTimeoutMs)) { implicit nodeClient: NodeClient =>
101102
val fields = (getQueryOutputSchema(aggQuery) zip compiledSelectItems)
102103
.map { case (structField, colExpr) => structField.copy(name = colExpr) }
103104
StructType(fields)
@@ -139,10 +140,12 @@ class ClickHouseBatchScan(scanJob: ScanJobDescription) extends Scan with Batch
139140
val database: String = scanJob.database
140141
val table: String = scanJob.table
141142

143+
private val queryTimeoutMs: Long = scanJob.readOptions.clientQueryTimeout
144+
142145
lazy val inputPartitions: Array[ClickHouseInputPartition] = scanJob.tableEngineSpec match {
143146
case DistributedEngineSpec(_, _, local_db, local_table, _, _) if scanJob.readOptions.convertDistributedToLocal =>
144147
scanJob.cluster.get.shards.flatMap { shardSpec =>
145-
Utils.tryWithResource(NodeClient(shardSpec.nodes.head)) { implicit nodeClient: NodeClient =>
148+
Utils.tryWithResource(NodeClient(shardSpec.nodes.head, queryTimeoutMs)) { implicit nodeClient: NodeClient =>
146149
queryPartitionSpec(local_db, local_table).map { partitionSpec =>
147150
ClickHouseInputPartition(
148151
scanJob.localTableSpec.get,
@@ -166,7 +169,7 @@ class ClickHouseBatchScan(scanJob: ScanJobDescription) extends Scan with Batch
166169
scanJob.node
167170
))
168171
case _: TableEngineSpec =>
169-
Utils.tryWithResource(NodeClient(scanJob.node)) { implicit nodeClient: NodeClient =>
172+
Utils.tryWithResource(NodeClient(scanJob.node, queryTimeoutMs)) { implicit nodeClient: NodeClient =>
170173
queryPartitionSpec(database, table).map { partitionSpec =>
171174
ClickHouseInputPartition(
172175
scanJob.tableSpec,

spark-3.3/clickhouse-spark/src/main/scala/com/clickhouse/spark/read/ClickHouseReader.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ abstract class ClickHouseReader[Record](
4444
// val codec: ClickHouseCompression = scanJob.readOptions.compressionCodec
4545
val readSchema: StructType = scanJob.readSchema
4646

47-
private lazy val nodesClient = NodesClient(part.candidateNodes)
47+
private lazy val nodesClient = NodesClient(part.candidateNodes, scanJob.readOptions.clientQueryTimeout)
4848

4949
def nodeClient: NodeClient = nodesClient.node
5050

0 commit comments

Comments
 (0)