Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.client.api.v1.dto
import org.apache.kyuubi.client.api.v1.dto.{OperationData, OperationProgress, ServerData, SessionData}
import org.apache.kyuubi.config.KyuubiConf.SERVER_SECRET_REDACTION_PATTERN
import org.apache.kyuubi.ha.client.ServiceNodeInfo
import org.apache.kyuubi.operation.KyuubiOperation
import org.apache.kyuubi.session.KyuubiSession

object ApiUtils extends Logging {
def sessionEvent(session: KyuubiSession): dto.KyuubiSessionEvent = {
session.getSessionEvent.map(event =>
session.getSessionEvent.map { event =>
val redactionPattern = session.sessionManager.getConf.get(SERVER_SECRET_REDACTION_PATTERN)
val redactedConf = Utils.redact(redactionPattern, event.conf.toSeq).toMap.asJava
dto.KyuubiSessionEvent.builder()
.sessionId(event.sessionId)
.clientVersion(event.clientVersion)
Expand All @@ -37,7 +40,7 @@ object ApiUtils extends Logging {
.user(event.user)
.clientIp(event.clientIP)
.serverIp(event.serverIP)
.conf(event.conf.asJava)
.conf(redactedConf)
.remoteSessionId(event.remoteSessionId)
.engineId(event.engineId)
.engineName(event.engineName)
Expand All @@ -48,17 +51,20 @@ object ApiUtils extends Logging {
.endTime(event.endTime)
.totalOperations(event.totalOperations)
.exception(event.exception.orNull)
.build()).orNull
.build()
}.orNull
}

def sessionData(session: KyuubiSession): SessionData = {
val sessionEvent = session.getSessionEvent
val redactionPattern = session.sessionManager.getConf.get(SERVER_SECRET_REDACTION_PATTERN)
val redactedConf = Utils.redact(redactionPattern, session.conf.toSeq).toMap.asJava
new SessionData(
session.handle.identifier.toString,
sessionEvent.map(_.remoteSessionId).getOrElse(""),
session.user,
session.ipAddress,
session.conf.asJava,
redactedConf,
session.createTime,
session.lastAccessTime - session.createTime,
session.getNoOperationTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,14 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
throw new ForbiddenException(
s"$userName is not allowed to close the session $sessionHandleStr")
}
fe.be.closeSession(SessionHandle.fromUUID(sessionHandleStr))
Response.ok(s"Session $sessionHandleStr is closed successfully.").build()
val sessionHandle = SessionHandle.fromUUID(sessionHandleStr)
fe.be.sessionManager.getSessionOption(sessionHandle) match {
case Some(_) =>
fe.be.closeSession(sessionHandle)
Response.ok(s"Session $sessionHandleStr is closed successfully.").build()
case None =>
throw new NotFoundException(s"Invalid session handle: $sessionHandleStr")
}
}

@ApiResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
content = Array(new Content(
mediaType = MediaType.APPLICATION_JSON,
array = new ArraySchema(schema = new Schema(implementation = classOf[SessionData])))),
description = "get the list of all live sessions")
description = "get the list of live sessions for the current user")
@GET
def sessions(): Seq[SessionData] = {
sessionManager.allSessions()
.map(session => ApiUtils.sessionData(session.asInstanceOf[KyuubiSession])).toSeq
val userName = fe.getSessionUser(Map.empty[String, String])
sessionManager
.allSessions()
.filter(session => session.user == userName)
.map(session => ApiUtils.sessionData(session.asInstanceOf[KyuubiSession]))
.toSeq
}

@ApiResponse(
Expand Down Expand Up @@ -157,8 +161,14 @@ private[v1] class SessionsResource extends ApiRequestContext with Logging {
@Path("{sessionHandle}")
def closeSession(@PathParam("sessionHandle") sessionHandleStr: String): Response = {
info(s"Received request of closing $sessionHandleStr")
fe.be.closeSession(sessionHandleStr)
Response.ok().build()
val sessionHandle = SessionHandle.fromUUID(sessionHandleStr)
fe.be.sessionManager.getSessionOption(sessionHandle) match {
case Some(_) =>
fe.be.closeSession(sessionHandle)
Response.ok().build()
case None =>
throw new NotFoundException(s"Invalid session handle: $sessionHandleStr")
}
}

@ApiResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(sessions2.isEmpty)
}

test("delete non-existent admin session returns 404") {
val nonExistentSessionId = java.util.UUID.randomUUID().toString
val response = webTarget.path(s"api/v1/admin/sessions/$nonExistentSessionId").request()
.header(AUTHORIZATION_HEADER, HttpAuthUtils.basicAuthorizationHeader(Utils.currentUser))
.delete()
assert(response.getStatus === 404)
assert(response.readEntity(classOf[String]).contains("Invalid"))
}

test("list sessions/operations with filter") {
fe.be.openSession(
HIVE_CLI_SERVICE_PROTOCOL_V2,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ import org.apache.kyuubi.session.SessionType

class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {

override protected lazy val conf: KyuubiConf = {
val c = KyuubiConf()
c.set(KyuubiConf.SERVER_SECRET_REDACTION_PATTERN, "(?i)password".r)
c
}

override protected def beforeEach(): Unit = {
super.beforeEach()
eventually(timeout(10.seconds), interval(200.milliseconds)) {
Expand Down Expand Up @@ -389,4 +395,27 @@ class SessionsResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
assert(operations.size == 1)
assert(sessionHandle.toString.equals(operations.head.getSessionId))
}

test("get /sessions returns redacted spark confs") {
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test does not configure SERVER_SECRET_REDACTION_PATTERN (via conf.set(SERVER_SECRET_REDACTION_PATTERN, ...)) before asserting that the sensitive config value is redacted. Since this config entry has no default value (createOptional), the redaction mechanism will not activate, and sensitiveValue will be returned unredacted. The test needs to set the redaction pattern (e.g., "(?i)password".r) as part of the test setup or override the server config in WithKyuubiServer.

Suggested change
test("get /sessions returns redacted spark confs") {
test("get /sessions returns redacted spark confs") {
// Configure redaction so that sensitive configs like spark.password are masked
conf.set(KyuubiConf.SERVER_SECRET_REDACTION_PATTERN, "(?i)password".r)

Copilot uses AI. Check for mistakes.
val sensitiveKey = "spark.password"
val sensitiveValue = "superSecret123"
val requestObj = new SessionOpenRequest(Map(sensitiveKey -> sensitiveValue).asJava)

val response = webTarget.path("api/v1/sessions")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(requestObj, MediaType.APPLICATION_JSON_TYPE))
assert(200 == response.getStatus)
val sessionHandle = response.readEntity(classOf[SessionHandle]).getIdentifier

val response2 = webTarget.path("api/v1/sessions").request().get()
assert(200 == response2.getStatus)
val sessions = response2.readEntity(new GenericType[Seq[SessionData]]() {})
val sessionConf = sessions.find(_.getIdentifier == sessionHandle.toString).get.getConf

assert(sessionConf.get(sensitiveKey) != sensitiveValue)
assert(sessionConf.get(sensitiveKey) == "*********(redacted)")

val delResp = webTarget.path(s"api/v1/sessions/$sessionHandle").request().delete()
assert(200 == delResp.getStatus)
}
}
Loading