Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import scala.collection.JavaConverters._
import scala.util.Random._

object ClusterClient {
def apply(cluster: ClusterSpec) = new ClusterClient(cluster)
def apply(cluster: ClusterSpec, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS): ClusterClient =
new ClusterClient(cluster, queryTimeoutMs)
}

class ClusterClient(cluster: ClusterSpec) extends AutoCloseable with Logging {
class ClusterClient(cluster: ClusterSpec, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS)
extends AutoCloseable with Logging {

@transient lazy val cache = new ConcurrentHashMap[(Int, Int), NodeClient]

Expand All @@ -53,7 +55,7 @@ class ClusterClient(cluster: ClusterSpec) extends AutoCloseable with Logging {
val replicaSpec = shardSpec.replicas.find(_.num == r).get
val nodeSpec = replicaSpec.node
log.info(s"Create client to $nodeSpec, shard $s replica $r")
new NodeClient(nodeSpec)
new NodeClient(nodeSpec, queryTimeoutMs)
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ import java.util.UUID
import scala.util.{Failure, Success, Try}

object NodeClient {
def apply(node: NodeSpec): NodeClient = new NodeClient(node)
val DEFAULT_QUERY_TIMEOUT_MS: Long = 60000L

def apply(node: NodeSpec, queryTimeoutMs: Long = DEFAULT_QUERY_TIMEOUT_MS): NodeClient =
new NodeClient(node, queryTimeoutMs)
}

class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
// TODO: add configurable timeout
private val timeout: Int = 60000
class NodeClient(val nodeSpec: NodeSpec, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS)
extends AutoCloseable with Logging {

private lazy val userAgent: String = {
val title = getClass.getPackage.getImplementationTitle
Expand Down Expand Up @@ -199,7 +201,7 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
querySettings.setFormat(clickHouseFormat)
querySettings.setQueryId(queryId)
settings.foreach { case (k, v) => querySettings.setOption(k, v) }
Try(client.query(sql, querySettings).get(timeout, TimeUnit.MILLISECONDS)) match {
Try(client.query(sql, querySettings).get(queryTimeoutMs, TimeUnit.MILLISECONDS)) match {
case Success(response: QueryResponse) => Right(deserializer(response.getInputStream))
case Failure(se: ServerException) => Left(CHServerException(se.getCode, se.getMessage, Some(nodeSpec), Some(se)))
case Failure(ex: Exception) => Left(CHClientException(ex.getMessage, Some(nodeSpec), Some(ex)))
Expand Down Expand Up @@ -233,7 +235,7 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
querySettings.setQueryId(queryId)
settings.foreach { case (k, v) => querySettings.setOption(k, v) }

Try(client.query(sql, querySettings).get(timeout, TimeUnit.MILLISECONDS)) match {
Try(client.query(sql, querySettings).get(queryTimeoutMs, TimeUnit.MILLISECONDS)) match {
case Success(response: QueryResponse) => response
case Failure(se: ServerException) => throw CHServerException(se.getCode, se.getMessage, Some(nodeSpec), Some(se))
case Failure(ex: Exception) => throw CHClientException(ex.getMessage, Some(nodeSpec), Some(ex))
Expand All @@ -249,6 +251,6 @@ class NodeClient(val nodeSpec: NodeSpec) extends AutoCloseable with Logging {
|$sql
|""".stripMargin
)
def ping(timeout: Int = timeout) =
client.ping(timeout)
def ping(timeoutMs: Long = queryTimeoutMs) =
client.ping(timeoutMs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import scala.collection.JavaConverters._
import scala.util.Random.shuffle

object NodesClient {
def apply(nodes: Nodes) = new NodesClient(nodes)
def apply(nodes: Nodes, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS): NodesClient =
new NodesClient(nodes, queryTimeoutMs)
}

class NodesClient(nodes: Nodes) extends AutoCloseable with Logging {
class NodesClient(nodes: Nodes, queryTimeoutMs: Long = NodeClient.DEFAULT_QUERY_TIMEOUT_MS)
extends AutoCloseable with Logging {
assert(nodes.nodes.nonEmpty)

@transient lazy val cache = new ConcurrentHashMap[NodeSpec, NodeClient]
Expand All @@ -37,7 +39,7 @@ class NodesClient(nodes: Nodes) extends AutoCloseable with Logging {
nodeSpec,
{ nodeSpec =>
log.info(s"Create client of $nodeSpec")
new NodeClient(nodeSpec)
new NodeClient(nodeSpec, queryTimeoutMs)
}
)
}
Expand Down
1 change: 1 addition & 0 deletions docs/configurations/02_sql_configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ license: |
<!--begin-include-->
|Key | Default | Description | Since
|--- | ------- | ----------- | -----
spark.clickhouse.client.queryTimeout|60s|The maximum time the ClickHouse client will wait for a single query or ping operation to complete on a NodeClient. Applied as a future-handle timeout on every client.query(...) and client.ping(...) call.|0.10.1
spark.clickhouse.ignoreUnsupportedTransform|true|ClickHouse supports using complex expressions as sharding keys or partition values, e.g. `cityHash64(col_1, col_2)`, and those can not be supported by Spark now. If `true`, ignore the unsupported expressions and log a warning, otherwise fail fast w/ an exception. Note, when `spark.clickhouse.write.distributed.convertLocal` is enabled, ignoring unsupported sharding keys may corrupt the data.|0.4.0
spark.clickhouse.read.compression.codec|lz4|The codec used to decompress data for reading. Supported codecs: none, lz4.|0.5.0
spark.clickhouse.read.distributed.convertLocal|true|When reading Distributed table, read local table instead of itself. If `true`, ignore `spark.clickhouse.read.distributed.useClusterNodes`.|0.1.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ClickHouseCatalog extends TableCatalog
this.catalogName = name
this.nodeSpec = buildNodeSpec(options)
this.currentDb = nodeSpec.database
this.nodeClient = NodeClient(nodeSpec)
this.nodeClient = NodeClient(nodeSpec, clientQueryTimeoutMs)

this.nodeClient.syncQueryAndCheckOutputJSONEachRow("SELECT 1")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.clickhouse.spark.client.NodeClient
class ClickHouseCommandRunner extends ExternalCommandRunner with ClickHouseHelper {

override def executeCommand(sql: String, options: CaseInsensitiveStringMap): Array[String] =
Utils.tryWithResource(NodeClient(buildNodeSpec(options))) { nodeClient =>
Utils.tryWithResource(NodeClient(buildNodeSpec(options), clientQueryTimeoutMs)) { nodeClient =>
nodeClient.syncQueryAndCheckOutputJSONEachRow(sql).records.map(_.toString).toArray
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package com.clickhouse.spark
import com.clickhouse.client.ClickHouseProtocol
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.CLIENT_QUERY_TIMEOUT
import org.apache.spark.sql.clickhouse.SchemaUtils
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType
Expand All @@ -32,14 +34,16 @@ import java.time.{LocalDateTime, ZoneId}
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

trait ClickHouseHelper extends Logging {
trait ClickHouseHelper extends SQLConfHelper with Logging {

@volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_DATABASE: String => Unit =
(db: String) => throw NoSuchNamespaceException(db)

@volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_TABLE: (String, String) => Unit =
(database, table) => throw NoSuchTableException(s"$database.$table")

/**
 * Client query timeout, in milliseconds, read from the active SQLConf
 * through the `CLIENT_QUERY_TIMEOUT` entry (`spark.clickhouse.client.queryTimeout`).
 *
 * Declared as a `def`, so the configuration is re-read on every call.
 */
def clientQueryTimeoutMs: Long = conf.getConf(CLIENT_QUERY_TIMEOUT)

def unwrap(ident: Identifier): Option[(String, String)] = ident.namespace() match {
case Array(database) => Some((database, ident.name()))
case _ => None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ case class ClickHouseTable(

lazy val (localTableSpec, localTableEngineSpec): (Option[TableSpec], Option[MergeTreeFamilyEngineSpec]) =
engineSpec match {
case distSpec: DistributedEngineSpec => Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
val _localTableSpec = queryTableSpec(distSpec.local_db, distSpec.local_table)
val _localTableEngineSpec =
TableEngineUtils.resolveTableEngine(_localTableSpec).asInstanceOf[MergeTreeFamilyEngineSpec]
(Some(_localTableSpec), Some(_localTableEngineSpec))
case distSpec: DistributedEngineSpec => Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) {
implicit nodeClient =>
val _localTableSpec = queryTableSpec(distSpec.local_db, distSpec.local_table)
val _localTableEngineSpec =
TableEngineUtils.resolveTableEngine(_localTableSpec).asInstanceOf[MergeTreeFamilyEngineSpec]
(Some(_localTableSpec), Some(_localTableEngineSpec))
}
case _ => (None, None)
}
Expand Down Expand Up @@ -105,8 +106,9 @@ case class ClickHouseTable(
ACCEPT_ANY_SCHEMA // TODO check schema and handle extra columns before writing
).asJava

override lazy val schema: StructType = Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
queryTableSchema(database, table)
override lazy val schema: StructType = Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) {
implicit nodeClient =>
queryTableSchema(database, table)
}

/**
Expand Down Expand Up @@ -192,7 +194,7 @@ case class ClickHouseTable(
}
}.mkString("(", ",", ")")

Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
engineSpec match {
case DistributedEngineSpec(_, cluster, local_db, local_table, _, _) =>
dropPartition(local_db, local_table, partitionExpr, Some(cluster))
Expand Down Expand Up @@ -235,12 +237,13 @@ case class ClickHouseTable(
val partitionSpecs: Seq[PartitionSpec] = engineSpec match {
case DistributedEngineSpec(_, _, local_db, local_table, _, _) =>
cluster.get.shards.flatMap { shardSpec =>
Utils.tryWithResource(NodeClient(shardSpec.nodes.head)) { implicit nodeClient: NodeClient =>
queryPartitionSpec(local_db, local_table)
Utils.tryWithResource(NodeClient(shardSpec.nodes.head, clientQueryTimeoutMs)) {
implicit nodeClient: NodeClient =>
queryPartitionSpec(local_db, local_table)
}
}
case _ =>
Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
queryPartitionSpec(database, table)
}
}
Expand Down Expand Up @@ -274,7 +277,7 @@ case class ClickHouseTable(

override def deleteWhere(filters: Array[Filter]): Unit = {
val deleteExpr = compileFilters(AlwaysTrue :: filters.toList)
Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
engineSpec match {
case DistributedEngineSpec(_, cluster, local_db, local_table, _, _) =>
delete(local_db, local_table, deleteExpr, Some(cluster))
Expand All @@ -285,7 +288,7 @@ case class ClickHouseTable(
}

override def truncateTable(): Boolean =
Utils.tryWithResource(NodeClient(node)) { implicit nodeClient =>
Utils.tryWithResource(NodeClient(node, clientQueryTimeoutMs)) { implicit nodeClient =>
engineSpec match {
case DistributedEngineSpec(_, cluster, local_db, local_table, _, _) =>
truncateTable(local_db, local_table, Some(cluster))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ class ClickHouseScanBuilder(
|$groupByClause
|""".stripMargin
try {
_readSchema = Utils.tryWithResource(NodeClient(scanJob.node)) { implicit nodeClient: NodeClient =>
val queryTimeoutMs = scanJob.readOptions.clientQueryTimeout
_readSchema = Utils.tryWithResource(NodeClient(scanJob.node, queryTimeoutMs)) { implicit nodeClient: NodeClient =>
val fields = (getQueryOutputSchema(aggQuery) zip compiledSelectItems)
.map { case (structField, colExpr) => structField.copy(name = colExpr) }
StructType(fields)
Expand Down Expand Up @@ -139,10 +140,12 @@ class ClickHouseBatchScan(scanJob: ScanJobDescription) extends Scan with Batch
val database: String = scanJob.database
val table: String = scanJob.table

private val queryTimeoutMs: Long = scanJob.readOptions.clientQueryTimeout

lazy val inputPartitions: Array[ClickHouseInputPartition] = scanJob.tableEngineSpec match {
case DistributedEngineSpec(_, _, local_db, local_table, _, _) if scanJob.readOptions.convertDistributedToLocal =>
scanJob.cluster.get.shards.flatMap { shardSpec =>
Utils.tryWithResource(NodeClient(shardSpec.nodes.head)) { implicit nodeClient: NodeClient =>
Utils.tryWithResource(NodeClient(shardSpec.nodes.head, queryTimeoutMs)) { implicit nodeClient: NodeClient =>
queryPartitionSpec(local_db, local_table).map { partitionSpec =>
ClickHouseInputPartition(
scanJob.localTableSpec.get,
Expand All @@ -166,7 +169,7 @@ class ClickHouseBatchScan(scanJob: ScanJobDescription) extends Scan with Batch
scanJob.node
))
case _: TableEngineSpec =>
Utils.tryWithResource(NodeClient(scanJob.node)) { implicit nodeClient: NodeClient =>
Utils.tryWithResource(NodeClient(scanJob.node, queryTimeoutMs)) { implicit nodeClient: NodeClient =>
queryPartitionSpec(database, table).map { partitionSpec =>
ClickHouseInputPartition(
scanJob.tableSpec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ abstract class ClickHouseReader[Record](
// val codec: ClickHouseCompression = scanJob.readOptions.compressionCodec
val readSchema: StructType = scanJob.readSchema

private lazy val nodesClient = NodesClient(part.candidateNodes)
private lazy val nodesClient = NodesClient(part.candidateNodes, scanJob.readOptions.clientQueryTimeout)

def nodeClient: NodeClient = nodesClient.node

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ class ClickHouseBatchWrite(

log.info(s"Truncating table ${writeJob.targetDatabase(false)}.${writeJob.targetTable(false)} for overwrite mode")

Utils.tryWithResource(NodeClient(writeJob.node)) { implicit nodeClient =>
val queryTimeoutMs = writeJob.writeOptions.clientQueryTimeout
Utils.tryWithResource(NodeClient(writeJob.node, queryTimeoutMs)) { implicit nodeClient =>
writeJob.tableEngineSpec match {
case DistributedEngineSpec(_, cluster, local_db, local_table, _, _) =>
val sql = s"TRUNCATE TABLE IF EXISTS `$local_db`.`$local_table` ON CLUSTER `$cluster`"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
.filter(_ => writeJob.writeOptions.convertDistributedToLocal)
.map(expr => SafeProjection.create(Seq(expr)))

private val queryTimeoutMs: Long = writeJob.writeOptions.clientQueryTimeout

// put the node select strategy in executor side because we need to calculate shard and don't know the records
// util DataWriter#write(InternalRow) invoked.
protected lazy val client: Either[ClusterClient, NodeClient] =
Expand All @@ -85,11 +87,11 @@ abstract class ClickHouseWriter(writeJob: WriteJobDescription)
val clusterSpec = writeJob.cluster.get
log.info(s"Connect to cluster ${clusterSpec.name}, which has ${clusterSpec.shards.length} shards and " +
s"${clusterSpec.nodes.length} nodes.")
Left(ClusterClient(clusterSpec))
Left(ClusterClient(clusterSpec, queryTimeoutMs))
case _ =>
val nodeSpec = writeJob.node
log.info(s"Connect to single node: $nodeSpec")
Right(NodeClient(nodeSpec))
Right(NodeClient(nodeSpec, queryTimeoutMs))
}

def nodeClient(shardNum: Option[Int]): NodeClient = client match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,4 +231,14 @@ object ClickHouseSQLConf {
.transform(_.toLowerCase)
.createOptional

/**
 * Config entry for `spark.clickhouse.client.queryTimeout`.
 *
 * The value is a time string converted to milliseconds, must be strictly
 * positive, and defaults to `60s`.
 */
val CLIENT_QUERY_TIMEOUT: ConfigEntry[Long] =
  buildConf("spark.clickhouse.client.queryTimeout")
    .doc("The maximum time the ClickHouse client will wait for a single query or ping " +
      "operation to complete on a NodeClient. Applied as a future-handle timeout on every " +
      "client.query(...) and client.ping(...) call.")
    .version("0.10.1")
    .timeConf(TimeUnit.MILLISECONDS)
    .checkValue(_ > 0, "`spark.clickhouse.client.queryTimeout` should be positive.")
    .createWithDefaultString("60s")

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ trait SparkOptions extends SQLConfHelper with Serializable {

protected def eval[T](key: String, entry: ConfigEntry[T]): T =
Option(options.get(key)).map(entry.valueConverter).getOrElse(conf.getConf(entry))

/**
 * Client query timeout in milliseconds.
 *
 * Resolved through `eval`: a value present in `options` under
 * `CLIENT_QUERY_TIMEOUT.key` takes precedence; otherwise the SQLConf value
 * of `CLIENT_QUERY_TIMEOUT` is used.
 */
def clientQueryTimeout: Long =
  eval(CLIENT_QUERY_TIMEOUT.key, CLIENT_QUERY_TIMEOUT)
}

class ReadOptions(_options: JMap[String, String]) extends SparkOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

package org.apache.spark.sql.clickhouse

import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.CLIENT_QUERY_TIMEOUT
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.scalatest.funsuite.AnyFunSuite
import com.clickhouse.spark.ClickHouseHelper
Expand All @@ -33,4 +35,15 @@ class ClickHouseHelperSuite extends AnyFunSuite with ClickHouseHelper {
assert(nodeSpec.database === "testing")
assert(nodeSpec.options.get("ssl") === "true")
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I missed it, but should we add a test that verifies the default timeout value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could, I'll add it.

// Checks that `clientQueryTimeoutMs` follows the session SQLConf: first the
// 60s default, then an explicit override.
test("client query timeout uses SQLConf") {
  val conf = SQLConf.get
  // SQLConf.get is shared state: remember whether the key was explicitly set so
  // the cleanup below restores the exact prior state instead of leaving an
  // explicit value behind for later tests.
  val wasExplicitlySet = conf.contains(CLIENT_QUERY_TIMEOUT.key)
  val original = conf.getConf(CLIENT_QUERY_TIMEOUT)
  assert(original === 60000L)
  try {
    conf.setConfString(CLIENT_QUERY_TIMEOUT.key, "1234ms")
    assert(clientQueryTimeoutMs === 1234L)
  } finally {
    if (wasExplicitlySet) {
      conf.setConfString(CLIENT_QUERY_TIMEOUT.key, s"${original}ms")
    } else {
      conf.unsetConf(CLIENT_QUERY_TIMEOUT.key)
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class ClickHouseCatalog extends TableCatalog
this.catalogName = name
this.nodeSpec = buildNodeSpec(options)
this.currentDb = nodeSpec.database
this.nodeClient = NodeClient(nodeSpec)
this.nodeClient = NodeClient(nodeSpec, clientQueryTimeoutMs)

this.nodeClient.syncQueryAndCheckOutputJSONEachRow("SELECT 1")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import com.clickhouse.spark.client.NodeClient
class ClickHouseCommandRunner extends ExternalCommandRunner with ClickHouseHelper {

override def executeCommand(sql: String, options: CaseInsensitiveStringMap): Array[String] =
Utils.tryWithResource(NodeClient(buildNodeSpec(options))) { nodeClient =>
Utils.tryWithResource(NodeClient(buildNodeSpec(options), clientQueryTimeoutMs)) { nodeClient =>
nodeClient.syncQueryAndCheckOutputJSONEachRow(sql).records.map(_.toString).toArray
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package com.clickhouse.spark
import com.clickhouse.client.ClickHouseProtocol
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.NullNode
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException}
import org.apache.spark.sql.clickhouse.ClickHouseSQLConf.CLIENT_QUERY_TIMEOUT
import org.apache.spark.sql.clickhouse.SchemaUtils
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.StructType
Expand All @@ -32,14 +34,16 @@ import java.time.{LocalDateTime, ZoneId}
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

trait ClickHouseHelper extends Logging {
trait ClickHouseHelper extends SQLConfHelper with Logging {

@volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_DATABASE: String => Unit =
(db: String) => throw new NoSuchNamespaceException(db)

@volatile lazy val DEFAULT_ACTION_IF_NO_SUCH_TABLE: (String, String) => Unit =
(database, table) => throw new NoSuchTableException(s"$database.$table")

/**
 * Client query timeout, in milliseconds, read from the active SQLConf
 * through the `CLIENT_QUERY_TIMEOUT` entry (`spark.clickhouse.client.queryTimeout`).
 *
 * Declared as a `def`, so the configuration is re-read on every call.
 */
def clientQueryTimeoutMs: Long = conf.getConf(CLIENT_QUERY_TIMEOUT)

def unwrap(ident: Identifier): Option[(String, String)] = ident.namespace() match {
case Array(database) => Some((database, ident.name()))
case _ => None
Expand Down
Loading
Loading