Skip to content

Commit 7027bf6

Browse files
author
rockyyin
committed
[KYUUBI #XXXX] Support IP client allowlist for connection access control
### _Why are the changes needed?_ Currently, Kyuubi supports IP deny list (ip.deny.list) to block specific IPs from connecting. However, in some security-sensitive environments, administrators need the opposite approach - only allowing specific trusted IPs to connect (allowlist/whitelist pattern). This is a common security requirement for production deployments. ### _How was this patch tested?_ - Added 5 unit test cases in SessionLimiterSuite: - test session limiter with ip allowlist - test session limiter ip allowlist with multiple ips - test session limiter empty ip allowlist allows all ips - test session limiter ip deny list has higher priority than ip allowlist - test refresh ip allowlist ### _Was this patch authored or co-authored using generative AI tooling?_ No ### Changes: - Add SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST config in KyuubiConf - Add ipAllowlist field in SessionLimiterWithAccessControlListImpl - Add ip allowlist check in SessionLimiter.increment() - Add getIpAllowlist/refreshIpAllowlist in KyuubiSessionManager - Add refreshIpAllowlist() in KyuubiServer - Add REST API endpoint POST /api/v1/admin/refresh/ip_allowlist - When ip.deny.list and ip.allowlist both contain the same IP, deny list takes higher priority
1 parent 3a529eb commit 7027bf6

6 files changed

Lines changed: 202 additions & 8 deletions

File tree

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3283,6 +3283,19 @@ object KyuubiConf {
32833283
.toSet()
32843284
.createWithDefault(Set.empty)
32853285

3286+
val SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST: ConfigEntry[Set[String]] =
3287+
buildConf("kyuubi.server.limit.connections.ip.allowlist")
3288+
.doc("When this list is not empty, only the client ip in the allow list will be" +
3289+
" permitted to connect to kyuubi server, all other ips will be denied." +
3290+
" If this list is empty (default), no ip allowlist restriction is applied." +
3291+
" Note: if a client ip is in both ip.allowlist and ip.deny.list," +
3292+
" the deny list takes higher priority.")
3293+
.version("1.10.0")
3294+
.serverOnly
3295+
.stringConf
3296+
.toSet()
3297+
.createWithDefault(Set.empty)
3298+
32863299
val SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER: OptionalConfigEntry[Int] =
32873300
buildConf("kyuubi.server.limit.batch.connections.per.user")
32883301
.doc("Maximum kyuubi server batch connections per user." +

kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ object KyuubiServer extends Logging {
175175
info(s"Refreshed deny client ips from $existingDenyIps to $refreshedDenyIps")
176176
}
177177

178+
private[kyuubi] def refreshIpAllowlist(): Unit = synchronized {
179+
val sessionMgr = kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
180+
val existingIpAllowlist = sessionMgr.getIpAllowlist
181+
sessionMgr.refreshIpAllowlist(createKyuubiConf())
182+
val refreshedIpAllowlist = sessionMgr.getIpAllowlist
183+
info(s"Refreshed ip allowlist from $existingIpAllowlist to $refreshedIpAllowlist")
184+
}
185+
178186
private def createKyuubiConf(): KyuubiConf = {
179187
KyuubiConf().loadFileDefaults().loadFromArgs(commandArgs)
180188
}

kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,25 @@ private[v1] class AdminResource extends ApiRequestContext with Logging {
161161
Response.ok(s"Refresh the deny ips successfully.").build()
162162
}
163163

164+
@ApiResponse(
165+
responseCode = "200",
166+
content = Array(new Content(mediaType = MediaType.APPLICATION_JSON)),
167+
description = "refresh the ip allowlist")
168+
@POST
169+
@Path("refresh/ip_allowlist")
170+
def refreshIpAllowlist(): Response = {
171+
val userName = fe.getSessionUser(Map.empty[String, String])
172+
val ipAddress = fe.getIpAddress
173+
info(s"Receive refresh ip allowlist request from $userName/$ipAddress")
174+
if (!fe.isAdministrator(userName)) {
175+
throw new ForbiddenException(
176+
s"$userName is not allowed to refresh the ip allowlist")
177+
}
178+
info(s"Reloading ip allowlist")
179+
KyuubiServer.refreshIpAllowlist()
180+
Response.ok(s"Refresh the ip allowlist successfully.").build()
181+
}
182+
164183
@ApiResponse(
165184
responseCode = "200",
166185
content = Array(new Content(

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -396,13 +396,15 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
396396
conf.get(SERVER_LIMIT_CONNECTIONS_USER_UNLIMITED_LIST).filter(_.nonEmpty)
397397
val userDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_USER_DENY_LIST).filter(_.nonEmpty)
398398
val ipDenyList = conf.get(SERVER_LIMIT_CONNECTIONS_IP_DENY_LIST).filter(_.nonEmpty)
399+
val ipAllowlist = conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty)
399400
limiter = applySessionLimiter(
400401
userLimit,
401402
ipAddressLimit,
402403
userIpAddressLimit,
403404
userUnlimitedList,
404405
userDenyList,
405-
ipDenyList)
406+
ipDenyList,
407+
ipAllowlist)
406408

407409
val userBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_USER).getOrElse(0)
408410
val ipAddressBatchLimit = conf.get(SERVER_LIMIT_BATCH_CONNECTIONS_PER_IPADDRESS).getOrElse(0)
@@ -414,7 +416,8 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
414416
userIpAddressBatchLimit,
415417
userUnlimitedList,
416418
userDenyList,
417-
ipDenyList)
419+
ipDenyList,
420+
ipAllowlist)
418421
}
419422

420423
private[kyuubi] def getUnlimitedUsers: Set[String] = {
@@ -448,22 +451,35 @@ class KyuubiSessionManager private (name: String) extends SessionManager(name) {
448451
batchLimiter.foreach(SessionLimiter.resetDenyIps(_, denyIps))
449452
}
450453

454+
private[kyuubi] def getIpAllowlist: Set[String] = {
455+
limiter.orElse(batchLimiter).map(SessionLimiter.getIpAllowlist).getOrElse(Set.empty)
456+
}
457+
458+
private[kyuubi] def refreshIpAllowlist(conf: KyuubiConf): Unit = {
459+
val ipAllowlist =
460+
conf.get(SERVER_LIMIT_CONNECTIONS_IP_ALLOWLIST).filter(_.nonEmpty)
461+
limiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist))
462+
batchLimiter.foreach(SessionLimiter.resetIpAllowlist(_, ipAllowlist))
463+
}
464+
451465
private def applySessionLimiter(
452466
userLimit: Int,
453467
ipAddressLimit: Int,
454468
userIpAddressLimit: Int,
455469
userUnlimitedList: Set[String],
456470
userDenyList: Set[String],
457-
ipDenyList: Set[String]): Option[SessionLimiter] = {
471+
ipDenyList: Set[String],
472+
ipAllowlist: Set[String] = Set.empty): Option[SessionLimiter] = {
458473
if (Seq(userLimit, ipAddressLimit, userIpAddressLimit).exists(_ > 0) ||
459-
userDenyList.nonEmpty || ipDenyList.nonEmpty) {
474+
userDenyList.nonEmpty || ipDenyList.nonEmpty || ipAllowlist.nonEmpty) {
460475
Some(SessionLimiter(
461476
userLimit,
462477
ipAddressLimit,
463478
userIpAddressLimit,
464479
userUnlimitedList,
465480
userDenyList,
466-
ipDenyList))
481+
ipDenyList,
482+
ipAllowlist))
467483
} else {
468484
None
469485
}

kyuubi-server/src/main/scala/org/apache/kyuubi/session/SessionLimiter.scala

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ class SessionLimiterWithAccessControlListImpl(
108108
userIpAddressLimit: Int,
109109
var unlimitedUsers: Set[String],
110110
var denyUsers: Set[String],
111-
var denyIps: Set[String])
111+
var denyIps: Set[String],
112+
var ipAllowlist: Set[String] = Set.empty)
112113
extends SessionLimiterImpl(userLimit, ipAddressLimit, userIpAddressLimit) {
113114
override def increment(userIpAddress: UserIpAddress): Unit = {
114115
val user = userIpAddress.user
@@ -123,6 +124,12 @@ class SessionLimiterWithAccessControlListImpl(
123124
s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)"
124125
throw KyuubiSQLException(errorMsg)
125126
}
127+
// ip allowlist check: when allowlist is not empty, only allowed ips can connect
128+
if (ipAllowlist.nonEmpty && StringUtils.isNotBlank(ip) && !ipAllowlist.contains(ip)) {
129+
val errorMsg =
130+
s"Connection denied because the client ip is not in the ip allowlist. (ipAddress: $ip)"
131+
throw KyuubiSQLException(errorMsg)
132+
}
126133

127134
if (!unlimitedUsers.contains(user)) {
128135
super.increment(userIpAddress)
@@ -140,6 +147,10 @@ class SessionLimiterWithAccessControlListImpl(
140147
private[kyuubi] def setDenyIps(denyIps: Set[String]): Unit = {
141148
this.denyIps = denyIps
142149
}
150+
151+
private[kyuubi] def setIpAllowlist(ipAllowlist: Set[String]): Unit = {
152+
this.ipAllowlist = ipAllowlist
153+
}
143154
}
144155

145156
object SessionLimiter {
@@ -150,14 +161,16 @@ object SessionLimiter {
150161
userIpAddressLimit: Int,
151162
unlimitedUsers: Set[String] = Set.empty,
152163
denyUsers: Set[String] = Set.empty,
153-
denyIps: Set[String] = Set.empty): SessionLimiter = {
164+
denyIps: Set[String] = Set.empty,
165+
ipAllowlist: Set[String] = Set.empty): SessionLimiter = {
154166
new SessionLimiterWithAccessControlListImpl(
155167
userLimit,
156168
ipAddressLimit,
157169
userIpAddressLimit,
158170
unlimitedUsers,
159171
denyUsers,
160-
denyIps)
172+
denyIps,
173+
ipAllowlist)
161174
}
162175

163176
def resetUnlimitedUsers(limiter: SessionLimiter, unlimitedUsers: Set[String]): Unit =
@@ -192,4 +205,15 @@ object SessionLimiter {
192205
case l: SessionLimiterWithAccessControlListImpl => l.denyIps
193206
case _ => Set.empty
194207
}
208+
209+
def resetIpAllowlist(limiter: SessionLimiter, ipAllowlist: Set[String]): Unit =
210+
limiter match {
211+
case l: SessionLimiterWithAccessControlListImpl => l.setIpAllowlist(ipAllowlist)
212+
case _ =>
213+
}
214+
215+
def getIpAllowlist(limiter: SessionLimiter): Set[String] = limiter match {
216+
case l: SessionLimiterWithAccessControlListImpl => l.ipAllowlist
217+
case _ => Set.empty
218+
}
195219
}

kyuubi-server/src/test/scala/org/apache/kyuubi/session/SessionLimiterSuite.scala

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,4 +214,118 @@ class SessionLimiterSuite extends KyuubiFunSuite {
214214
limiter.asInstanceOf[SessionLimiterImpl].counters().asScala.values
215215
.foreach(c => assert(c.get() == 0))
216216
}
217+
218+
test("test session limiter with ip allowlist") {
219+
val allowedIp = "10.0.0.1"
220+
val blockedIp = "192.168.1.100"
221+
val ipAllowlist = Set(allowedIp)
222+
val limiter = SessionLimiter(
223+
100,
224+
100,
225+
100,
226+
Set.empty,
227+
Set.empty,
228+
Set.empty,
229+
ipAllowlist)
230+
231+
// allowed ip should be able to connect
232+
limiter.increment(UserIpAddress("user001", allowedIp))
233+
234+
// blocked ip should be denied
235+
val caught = intercept[KyuubiSQLException] {
236+
limiter.increment(UserIpAddress("user001", blockedIp))
237+
}
238+
assert(caught.getMessage.equals(
239+
s"Connection denied because the client ip is not in the ip allowlist." +
240+
s" (ipAddress: $blockedIp)"))
241+
}
242+
243+
test("test session limiter ip allowlist with multiple ips") {
244+
val allowedIp1 = "10.0.0.1"
245+
val allowedIp2 = "10.0.0.2"
246+
val blockedIp = "192.168.1.100"
247+
val ipAllowlist = Set(allowedIp1, allowedIp2)
248+
val limiter = SessionLimiter(
249+
100,
250+
100,
251+
100,
252+
Set.empty,
253+
Set.empty,
254+
Set.empty,
255+
ipAllowlist)
256+
257+
// both allowed ips should be able to connect
258+
limiter.increment(UserIpAddress("user001", allowedIp1))
259+
limiter.increment(UserIpAddress("user002", allowedIp2))
260+
261+
// blocked ip should be denied
262+
val caught = intercept[KyuubiSQLException] {
263+
limiter.increment(UserIpAddress("user003", blockedIp))
264+
}
265+
assert(caught.getMessage.contains("not in the ip allowlist"))
266+
}
267+
268+
test("test session limiter empty ip allowlist allows all ips") {
269+
val limiter = SessionLimiter(
270+
100,
271+
100,
272+
100,
273+
Set.empty,
274+
Set.empty,
275+
Set.empty,
276+
Set.empty)
277+
278+
// when allowlist is empty, all ips should be allowed
279+
limiter.increment(UserIpAddress("user001", "10.0.0.1"))
280+
limiter.increment(UserIpAddress("user002", "192.168.1.100"))
281+
limiter.increment(UserIpAddress("user003", "172.16.0.1"))
282+
}
283+
284+
test("test session limiter ip deny list has higher priority than ip allowlist") {
285+
val ip = "10.0.0.1"
286+
val denyIps = Set(ip)
287+
val ipAllowlist = Set(ip)
288+
val limiter = SessionLimiter(
289+
100,
290+
100,
291+
100,
292+
Set.empty,
293+
Set.empty,
294+
denyIps,
295+
ipAllowlist)
296+
297+
// deny ip list check happens before allowlist check
298+
val caught = intercept[KyuubiSQLException] {
299+
limiter.increment(UserIpAddress("user001", ip))
300+
}
301+
assert(caught.getMessage.equals(
302+
s"Connection denied because the client ip is in the deny ip list. (ipAddress: $ip)"))
303+
}
304+
305+
test("test refresh ip allowlist") {
306+
val allowedIp = "10.0.0.1"
307+
val blockedIp = "192.168.1.100"
308+
val limiter = SessionLimiter(
309+
100,
310+
100,
311+
100,
312+
Set.empty,
313+
Set.empty,
314+
Set.empty,
315+
Set(allowedIp))
316+
317+
// initially only allowedIp can connect
318+
limiter.increment(UserIpAddress("user001", allowedIp))
319+
intercept[KyuubiSQLException] {
320+
limiter.increment(UserIpAddress("user002", blockedIp))
321+
}
322+
323+
// refresh allowlist to include blockedIp
324+
SessionLimiter.resetIpAllowlist(limiter, Set(allowedIp, blockedIp))
325+
limiter.increment(UserIpAddress("user002", blockedIp))
326+
327+
// refresh allowlist to empty (allow all)
328+
SessionLimiter.resetIpAllowlist(limiter, Set.empty)
329+
limiter.increment(UserIpAddress("user003", "172.16.0.1"))
330+
}
217331
}

0 commit comments

Comments
 (0)