Skip to content

Commit 4e5f7c5

Browse files
author
rockyyin
committed
Support user allowlist for connection access control
Add user allowlist feature (kyuubi.server.limit.connections.user.allowlist) that restricts which users can connect to Kyuubi server. Changes: - Add SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST config in KyuubiConf - Add userAllowlist field in SessionLimiterWithAccessControlListImpl - Add user allowlist check in SessionLimiter.increment() - Add getUserAllowlist/refreshUserAllowlist in KyuubiSessionManager - Add refreshUserAllowlist() in KyuubiServer - Add REST API endpoint POST /api/v1/admin/refresh/user_allowlist - Add 5 test cases in SessionLimiterSuite - When user.deny.list and user.allowlist both contain the same user, deny list takes higher priority
1 parent 7027bf6 commit 4e5f7c5

6 files changed

Lines changed: 209 additions & 8 deletions

File tree

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3296,6 +3296,19 @@ object KyuubiConf {
32963296
.toSet()
32973297
.createWithDefault(Set.empty)
32983298

3299+
val SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST: ConfigEntry[Set[String]] =
3300+
buildConf("kyuubi.server.limit.connections.user.allowlist")
3301+
.doc("When this list is not empty, only the user in the allow list will be" +
3302+
" permitted to connect to kyuubi server, all other users will be denied." +
3303+
" If this list is empty (default), no user allowlist restriction is applied." +
3304+
" Note: if a user is in both user.allowlist and user.deny.list," +
3305+
" the deny list takes higher priority.")
3306+
.version("1.10.0")
3307+
.serverOnly
3308+
.stringConf
3309+
.toSet()
3310+
.createWithDefault(Set.empty)
3311+
32993312
val SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
33003313
buildConf("kyuubi.server.limit.batch.connections.per.user")
33013314
.doc("Maximum kyuubi server batch connections per user." +

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,14 @@ object KyuubiServer extends Logging {
183183
info(s"Refreshed ip allowlist from $existingIpAllowlist to $refreshedIpAllowlist")
184184
}
185185

186+
private[kyuubi] def refreshUserAllowlist(): Unit = synchronized {
187+
val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
188+
val existingUserAllowlist = sessionMgr.getUserAllowlist
189+
sessionMgr.refreshUserAllowlist(createKyuubiConf())
190+
val refreshedUserAllowlist = sessionMgr.getUserAllowlist
191+
info(s"Refreshed user allowlist from $existingUserAllowlist to $refreshedUserAllowlist")
192+
}
193+
186194
private def createKyuubiConf(): KyuubiConf = {
187195
KyuubiConf().loadFileDefaults().loadFromArgs(commandArgs)
188196
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
180180
Response.ok(s"Refresh the ip allowlist successfully.").build()
181181
}
182182

183+
@ApiResponse(
184+
responseCode = "200",
185+
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
186+
description = "refresh the user allowlist")
187+
@POST
188+
@Path("refresh/user_allowlist")
189+
def refreshUserAllowlist(): Response = {
190+
val userName = fe.getSessionUser(Map.empty[String, String])
191+
val ipAddress = fe.getIpAddress
192+
info(s"Receive refresh user allowlist request from $userName/$ipAddress")
193+
if (!fe.isAdministrator(userName)) {
194+
throw new ForbiddenException(
195+
s"$userName is not allowed to refresh the user allowlist")
196+
}
197+
info(s"Reloading user allowlist")
198+
KyuubiServer.refreshUserAllowlist()
199+
Response.ok(s"Refresh the user allowlist successfully.").build()
200+
}
201+
183202
@ApiResponse(
184203
responseCode = "200",
185204
content = Array(new Content(

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,16 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
397397
val userDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_USER_DENY_LIST).filter(_.nonEmpty)
398398
val ipDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_IP_DENY_LIST).filter(_.nonEmpty)
399399
val ipAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty)
400+
val userAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty)
400401
limiter = applySessionLimiter(
401402
userLimit,
402403
ipAddressLimit,
403404
userIpAddressLimit,
404405
userUnlimitedList,
405406
userDenyList,
406407
ipDenyList,
407-
ipAllowlist)
408+
ipAllowlist,
409+
userAllowlist)
408410

409411
val userBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER).getOrElse(0)
410412
val ipAddressBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_IPADDRESS).getOrElse(0)
@@ -417,7 +419,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
417419
userUnlimitedList,
418420
userDenyList,
419421
ipDenyList,
420-
ipAllowlist)
422+
ipAllowlist,
423+
userAllowlist)
421424
}
422425

423426
private[kyuubi] def getUnlimitedUsers: Set[String] = {
@@ -462,24 +465,38 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
462465
batchLimiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist))
463466
}
464467

468+
private[kyuubi] def getUserAllowlist: Set[String] = {
469+
limiter.orElse(batchLimiter).map(SessionLimiter.getUserAllowlist).getOrElse(Set.empty)
470+
}
471+
472+
private[kyuubi] def refreshUserAllowlist(conf: KyuubiConf): Unit = {
473+
val userAllowlist =
474+
conf.get(SERVER_LIMIT_CONNECTIONS_USER_ALLOWLIST).filter(_.nonEmpty)
475+
limiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist))
476+
batchLimiter.foreach(SessionLimiter.resetUserAllowlist(_, userAllowlist))
477+
}
478+
465479
private def applySessionLimiter(
466480
userLimit: Int,
467481
ipAddressLimit: Int,
468482
userIpAddressLimit: Int,
469483
userUnlimitedList: Set[String],
470484
userDenyList: Set[String],
471485
ipDenyList: Set[String],
472-
ipAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = {
486+
ipAllowlist: Set[String] = Set.empty,
487+
userAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = {
473488
if (Seq(userLimit, ipAddressLimit, userIpAddressLimit).exists(_ > 0) ||
474-
userDenyList.nonEmpty || ipDenyList.nonEmpty || ipAllowlist.nonEmpty) {
489+
userDenyList.nonEmpty || ipDenyList.nonEmpty ||
490+
ipAllowlist.nonEmpty || userAllowlist.nonEmpty) {
475491
Some(SessionLimiter(
476492
userLimit,
477493
ipAddressLimit,
478494
userIpAddressLimit,
479495
userUnlimitedList,
480496
userDenyList,
481497
ipDenyList,
482-
ipAllowlist))
498+
ipAllowlist,
499+
userAllowlist))
483500
} else {
484501
None
485502
}

kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,8 @@ class SessionLimiterWithAccessControlListImpl(
109109
var unlimitedUsers: Set[String],
110110
var denyUsers: Set[String],
111111
var denyIps: Set[String],
112-
var ipAllowlist: Set[String] = Set.empty)
112+
var ipAllowlist: Set[String] = Set.empty,
113+
var userAllowlist: Set[String] = Set.empty)
113114
extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) {
114115
override def increment(userIpAddress: UserIpAddress): Unit = {
115116
val user = userIpAddress.user
@@ -124,6 +125,13 @@ class SessionLimiterWithAccessControlListImpl(
124125
s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)"
125126
throw KyuubiSQLException(errorMsg)
126127
}
128+
// user allowlist check: when allowlist is not empty, only allowed users can connect
129+
if (userAllowlist.nonEmpty && StringUtils.isNotBlank(user) &&
130+
!userAllowlist.contains(user)) {
131+
val errorMsg =
132+
s"Connection denied because the user is not in the user allowlist. (user: $user)"
133+
throw KyuubiSQLException(errorMsg)
134+
}
127135
// ip allowlist check: when allowlist is not empty, only allowed ips can connect
128136
if (ipAllowlist.nonEmpty && StringUtils.isNotBlank(ip) && !ipAllowlist.contains(ip)) {
129137
val errorMsg =
@@ -151,6 +159,10 @@ class SessionLimiterWithAccessControlListImpl(
151159
private[kyuubi] def setIpAllowlist(ipAllowlist: Set[String]): Unit = {
152160
this.ipAllowlist = ipAllowlist
153161
}
162+
163+
private[kyuubi] def setUserAllowlist(userAllowlist: Set[String]): Unit = {
164+
this.userAllowlist = userAllowlist
165+
}
154166
}
155167

156168
object SessionLimiter {
@@ -162,15 +174,17 @@ object SessionLimiter {
162174
unlimitedUsers: Set[String] = Set.empty,
163175
denyUsers: Set[String] = Set.empty,
164176
denyIps: Set[String] = Set.empty,
165-
ipAllowlist: Set[String] = Set.empty): SessionLimiter = {
177+
ipAllowlist: Set[String] = Set.empty,
178+
userAllowlist: Set[String] = Set.empty): SessionLimiter = {
166179
new SessionLimiterWithAccessControlListImpl(
167180
userLimit,
168181
ipAddressLimit,
169182
userIpAddressLimit,
170183
unlimitedUsers,
171184
denyUsers,
172185
denyIps,
173-
ipAllowlist)
186+
ipAllowlist,
187+
userAllowlist)
174188
}
175189

176190
def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit =
@@ -216,4 +230,15 @@ object SessionLimiter {
216230
case l: SessionLimiterWithAccessControlListImpl => l.ipAllowlist
217231
case _ => Set.empty
218232
}
233+
234+
def resetUserAllowlist(limiter: SessionLimiter, userAllowlist: Set[String]): Unit =
235+
limiter match {
236+
case l: SessionLimiterWithAccessControlListImpl => l.setUserAllowlist(userAllowlist)
237+
case _ =>
238+
}
239+
240+
def getUserAllowlist(limiter: SessionLimiter): Set[String] = limiter match {
241+
case l: SessionLimiterWithAccessControlListImpl => l.userAllowlist
242+
case _ => Set.empty
243+
}
219244
}

kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,4 +328,123 @@ class SessionLimiterSuite extends KyuubiFunSuite {
328328
SessionLimiter.resetIpAllowlist(limiter, Set.empty)
329329
limiter.increment(UserIpAddress("user003", "172.16.0.1"))
330330
}
331+
332+
test("test session limiter with user allowlist") {
333+
val allowedUser = "user001"
334+
val blockedUser = "user002"
335+
val userAllowlist = Set(allowedUser)
336+
val limiter = SessionLimiter(
337+
100,
338+
100,
339+
100,
340+
Set.empty,
341+
Set.empty,
342+
Set.empty,
343+
Set.empty,
344+
userAllowlist)
345+
346+
// allowed user should be able to connect
347+
limiter.increment(UserIpAddress(allowedUser, "10.0.0.1"))
348+
349+
// blocked user should be denied
350+
val caught = intercept[KyuubiSQLException] {
351+
limiter.increment(UserIpAddress(blockedUser, "10.0.0.1"))
352+
}
353+
assert(caught.getMessage.equals(
354+
s"Connection denied because the user is not in the user allowlist." +
355+
s" (user: $blockedUser)"))
356+
}
357+
358+
test("test session limiter user allowlist with multiple users") {
359+
val allowedUser1 = "user001"
360+
val allowedUser2 = "user002"
361+
val blockedUser = "user003"
362+
val userAllowlist = Set(allowedUser1, allowedUser2)
363+
val limiter = SessionLimiter(
364+
100,
365+
100,
366+
100,
367+
Set.empty,
368+
Set.empty,
369+
Set.empty,
370+
Set.empty,
371+
userAllowlist)
372+
373+
// both allowed users should be able to connect
374+
limiter.increment(UserIpAddress(allowedUser1, "10.0.0.1"))
375+
limiter.increment(UserIpAddress(allowedUser2, "10.0.0.2"))
376+
377+
// blocked user should be denied
378+
val caught = intercept[KyuubiSQLException] {
379+
limiter.increment(UserIpAddress(blockedUser, "192.168.1.100"))
380+
}
381+
assert(caught.getMessage.contains("not in the user allowlist"))
382+
}
383+
384+
test("test session limiter empty user allowlist allows all users") {
385+
val limiter = SessionLimiter(
386+
100,
387+
100,
388+
100,
389+
Set.empty,
390+
Set.empty,
391+
Set.empty,
392+
Set.empty,
393+
Set.empty)
394+
395+
// when allowlist is empty, all users should be allowed
396+
limiter.increment(UserIpAddress("user001", "10.0.0.1"))
397+
limiter.increment(UserIpAddress("user002", "192.168.1.100"))
398+
limiter.increment(UserIpAddress("user003", "172.16.0.1"))
399+
}
400+
401+
test("test session limiter user deny list has higher priority than user allowlist") {
402+
val user = "user001"
403+
val denyUsers = Set(user)
404+
val userAllowlist = Set(user)
405+
val limiter = SessionLimiter(
406+
100,
407+
100,
408+
100,
409+
Set.empty,
410+
denyUsers,
411+
Set.empty,
412+
Set.empty,
413+
userAllowlist)
414+
415+
// deny user list check happens before allowlist check
416+
val caught = intercept[KyuubiSQLException] {
417+
limiter.increment(UserIpAddress(user, "10.0.0.1"))
418+
}
419+
assert(caught.getMessage.equals(
420+
s"Connection denied because the user is in the deny user list. (user: $user)"))
421+
}
422+
423+
test("test refresh user allowlist") {
424+
val allowedUser = "user001"
425+
val blockedUser = "user002"
426+
val limiter = SessionLimiter(
427+
100,
428+
100,
429+
100,
430+
Set.empty,
431+
Set.empty,
432+
Set.empty,
433+
Set.empty,
434+
Set(allowedUser))
435+
436+
// initially only allowedUser can connect
437+
limiter.increment(UserIpAddress(allowedUser, "10.0.0.1"))
438+
intercept[KyuubiSQLException] {
439+
limiter.increment(UserIpAddress(blockedUser, "10.0.0.1"))
440+
}
441+
442+
// refresh allowlist to include blockedUser
443+
SessionLimiter.resetUserAllowlist(limiter, Set(allowedUser, blockedUser))
444+
limiter.increment(UserIpAddress(blockedUser, "10.0.0.1"))
445+
446+
// refresh allowlist to empty (allow all)
447+
SessionLimiter.resetUserAllowlist(limiter, Set.empty)
448+
limiter.increment(UserIpAddress("user003", "172.16.0.1"))
449+
}
331450
}

0 commit comments

Comments
 (0)