Skip to content

Commit d3fad45

Browse files
authored
Simplify misk-redis implementation (#3198)
Simplifies the RealRedis implementation by always deferring single operations to a pipeline. This incurs a little overhead since we have to create, sync, and close a pipeline for every command, but significantly simplifies the client. This change also removes the old timing proxy, so hopefully that compensates for the added overhead of the pipelines.
1 parent af3731f commit d3fad45

File tree

9 files changed

+587
-586
lines changed

9 files changed

+587
-586
lines changed

Diff for: .palantir/revapi.yml

+32
Original file line numberDiff line numberDiff line change
@@ -6192,6 +6192,38 @@ acceptedBreaks:
61926192
old: "parameter void misk.redis.RealPipelinedRedis::<init>(===misk.redis.JedisPipeline===)"
61936193
new: "parameter void misk.redis.RealPipelinedRedis::<init>(===redis.clients.jedis.AbstractPipeline===)"
61946194
justification: "RealPipelinedRedis is internal"
6195+
"2024.03.26.112919-01b20b7":
6196+
com.squareup.misk:misk-redis:
6197+
- code: "java.method.addedToInterface"
6198+
new: "method java.util.function.Supplier<java.lang.Double> misk.redis.DeferredRedis::zscore(java.lang.String,\
6199+
\ java.lang.String)"
6200+
justification: "Additive change only"
6201+
- code: "java.method.addedToInterface"
6202+
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zadd(java.lang.String,\
6203+
\ double, java.lang.String, misk.redis.Redis.ZAddOptions[])"
6204+
justification: "Additive change only"
6205+
- code: "java.method.addedToInterface"
6206+
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zadd(java.lang.String,\
6207+
\ java.util.Map<java.lang.String, java.lang.Double>, misk.redis.Redis.ZAddOptions[])"
6208+
justification: "Additive change only"
6209+
- code: "java.method.addedToInterface"
6210+
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zcard(java.lang.String)"
6211+
justification: "Additive change only"
6212+
- code: "java.method.addedToInterface"
6213+
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zremRangeByRank(java.lang.String,\
6214+
\ misk.redis.Redis.ZRangeRankMarker, misk.redis.Redis.ZRangeRankMarker)"
6215+
justification: "Additive change only"
6216+
- code: "java.method.addedToInterface"
6217+
new: "method java.util.function.Supplier<java.util.List<kotlin.Pair<okio.ByteString,\
6218+
\ java.lang.Double>>> misk.redis.DeferredRedis::zrangeWithScores(java.lang.String,\
6219+
\ misk.redis.Redis.ZRangeType, misk.redis.Redis.ZRangeMarker, misk.redis.Redis.ZRangeMarker,\
6220+
\ boolean, misk.redis.Redis.ZRangeLimit)"
6221+
justification: "Additive change only"
6222+
- code: "java.method.addedToInterface"
6223+
new: "method java.util.function.Supplier<java.util.List<okio.ByteString>> misk.redis.DeferredRedis::zrange(java.lang.String,\
6224+
\ misk.redis.Redis.ZRangeType, misk.redis.Redis.ZRangeMarker, misk.redis.Redis.ZRangeMarker,\
6225+
\ boolean, misk.redis.Redis.ZRangeLimit)"
6226+
justification: "Additive change only"
61956227
misk-0.18.0:
61966228
com.squareup.misk:misk-gcp:
61976229
- code: "java.method.numberOfParametersChanged"

Diff for: misk-redis/api/misk-redis.api

+16
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,20 @@ public abstract interface class misk/redis/DeferredRedis {
3636
public abstract fun rpush (Ljava/lang/String;[Lokio/ByteString;)Ljava/util/function/Supplier;
3737
public abstract fun set (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
3838
public abstract fun setnx (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
39+
public abstract fun zadd (Ljava/lang/String;DLjava/lang/String;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
40+
public abstract fun zadd (Ljava/lang/String;Ljava/util/Map;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
41+
public abstract fun zcard (Ljava/lang/String;)Ljava/util/function/Supplier;
42+
public abstract fun zrange (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
43+
public abstract fun zrangeWithScores (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
44+
public abstract fun zremRangeByRank (Ljava/lang/String;Lmisk/redis/Redis$ZRangeRankMarker;Lmisk/redis/Redis$ZRangeRankMarker;)Ljava/util/function/Supplier;
45+
public abstract fun zscore (Ljava/lang/String;Ljava/lang/String;)Ljava/util/function/Supplier;
3946
}
4047

4148
public final class misk/redis/DeferredRedis$DefaultImpls {
4249
public static synthetic fun set$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;ILjava/lang/Object;)Ljava/util/function/Supplier;
4350
public static synthetic fun setnx$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;ILjava/lang/Object;)Ljava/util/function/Supplier;
51+
public static synthetic fun zrange$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;ILjava/lang/Object;)Ljava/util/function/Supplier;
52+
public static synthetic fun zrangeWithScores$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;ILjava/lang/Object;)Ljava/util/function/Supplier;
4453
}
4554

4655
public final class misk/redis/FakeRedis : misk/redis/Redis {
@@ -592,6 +601,13 @@ public final class misk/redis/testing/FakeRedis$FakePipelinedRedis : misk/redis/
592601
public fun rpush (Ljava/lang/String;[Lokio/ByteString;)Ljava/util/function/Supplier;
593602
public fun set (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
594603
public fun setnx (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
604+
public fun zadd (Ljava/lang/String;DLjava/lang/String;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
605+
public fun zadd (Ljava/lang/String;Ljava/util/Map;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
606+
public fun zcard (Ljava/lang/String;)Ljava/util/function/Supplier;
607+
public fun zrange (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
608+
public fun zrangeWithScores (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
609+
public fun zremRangeByRank (Ljava/lang/String;Lmisk/redis/Redis$ZRangeRankMarker;Lmisk/redis/Redis$ZRangeRankMarker;)Ljava/util/function/Supplier;
610+
public fun zscore (Ljava/lang/String;Ljava/lang/String;)Ljava/util/function/Supplier;
595611
}
596612

597613
public abstract interface annotation class misk/redis/testing/ForFakeRedis : java/lang/annotation/Annotation {

Diff for: misk-redis/src/main/kotlin/misk/redis/DeferredRedis.kt

+44
Original file line numberDiff line numberDiff line change
@@ -97,5 +97,49 @@ interface DeferredRedis {
9797

9898
fun pExpireAt(key: String, timestampMilliseconds: Long): Supplier<Boolean>
9999

100+
fun zadd(
101+
key: String,
102+
score: Double,
103+
member: String,
104+
vararg options: Redis.ZAddOptions
105+
): Supplier<Long>
106+
107+
fun zadd(
108+
key: String,
109+
scoreMembers: Map<String, Double>,
110+
vararg options: Redis.ZAddOptions
111+
): Supplier<Long>
112+
113+
fun zscore(
114+
key: String,
115+
member: String
116+
): Supplier<Double?>
117+
118+
fun zrange(
119+
key: String,
120+
type: Redis.ZRangeType = Redis.ZRangeType.INDEX,
121+
start: Redis.ZRangeMarker,
122+
stop: Redis.ZRangeMarker,
123+
reverse: Boolean = false,
124+
limit: Redis.ZRangeLimit? = null,
125+
): Supplier<List<ByteString?>>
126+
127+
fun zrangeWithScores(
128+
key: String,
129+
type: Redis.ZRangeType = Redis.ZRangeType.INDEX,
130+
start: Redis.ZRangeMarker,
131+
stop: Redis.ZRangeMarker,
132+
reverse: Boolean = false,
133+
limit: Redis.ZRangeLimit? = null,
134+
): Supplier<List<Pair<ByteString?, Double>>>
135+
136+
fun zremRangeByRank(
137+
key: String,
138+
start: Redis.ZRangeRankMarker,
139+
stop: Redis.ZRangeRankMarker,
140+
): Supplier<Long>
141+
142+
fun zcard(key: String): Supplier<Long>
143+
100144
fun close()
101145
}

Diff for: misk-redis/src/main/kotlin/misk/redis/RealPipelinedRedis.kt

+187-3
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import okio.ByteString
44
import okio.ByteString.Companion.toByteString
55
import redis.clients.jedis.AbstractPipeline
66
import redis.clients.jedis.ClusterPipeline
7-
import redis.clients.jedis.Jedis
87
import redis.clients.jedis.Pipeline
9-
import redis.clients.jedis.PipelineBase
108
import redis.clients.jedis.Response
119
import redis.clients.jedis.args.ListDirection
1210
import redis.clients.jedis.params.SetParams
@@ -27,7 +25,7 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
2725
} else {
2826
RuntimeException(
2927
"""
30-
|When using clustered Redis, keys used by one $op command in a pipeline must always map to the same slot, but mapped to slots $slots.
28+
|When using clustered Redis, keys used by one $op command must always map to the same slot, but mapped to slots $slots.
3129
|You can use {hashtags} in your key name to control how Redis hashes keys to slots.
3230
|For example, keys: `{customer9001}.contacts` and `{customer9001}.payments` will hash to the same slot.
3331
|
@@ -54,6 +52,7 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
5452
pipeline.del(*slottedKeys.toTypedArray())
5553
}
5654
}
55+
5756
else -> error("Unknown pipeline type: $pipeline")
5857
}
5958
return Supplier {
@@ -71,6 +70,7 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
7170
val response = pipeline.mget(*keysBytes)
7271
Supplier { response.get().map { it?.toByteString() } }
7372
}
73+
7474
is ClusterPipeline -> {
7575
val responses = keysBytes.groupBy { JedisClusterCRC16.getSlot(it) }
7676
.mapValues { (_, slottedKeys) ->
@@ -90,6 +90,7 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
9090
keys.map { keyToValueMap[it] }
9191
}
9292
}
93+
9394
else -> error("Unknown pipeline type: $pipeline")
9495
}
9596
}
@@ -109,6 +110,7 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
109110
pipeline.mset(*slottedKeyValues.flatten().toTypedArray())
110111
}
111112
}
113+
112114
else -> error("Unknown pipeline type: $pipeline")
113115
}
114116
return Supplier { responses.map { it.get() } }
@@ -370,6 +372,188 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
370372
return Supplier { response.get() == 1L }
371373
}
372374

375+
override fun zadd(
376+
key: String,
377+
score: Double,
378+
member: String,
379+
vararg options: Redis.ZAddOptions,
380+
): Supplier<Long> {
381+
Redis.ZAddOptions.verify(options)
382+
val keyBytes = key.toByteArray(charset)
383+
val memberBytes = member.toByteArray(charset)
384+
val params = Redis.ZAddOptions.getZAddParams(options)
385+
val response = pipeline.zadd(keyBytes, score, memberBytes, params)
386+
return Supplier { response.get() }
387+
}
388+
389+
override fun zadd(
390+
key: String,
391+
scoreMembers: Map<String, Double>,
392+
vararg options: Redis.ZAddOptions,
393+
): Supplier<Long> {
394+
Redis.ZAddOptions.verify(options)
395+
val keyBytes = key.toByteArray(charset)
396+
val scoreMembersBytes = scoreMembers.mapKeys { it.key.toByteArray(charset) }
397+
val params = Redis.ZAddOptions.getZAddParams(options)
398+
val response = pipeline.zadd(keyBytes, scoreMembersBytes, params)
399+
return Supplier { response.get() }
400+
}
401+
402+
override fun zscore(key: String, member: String): Supplier<Double?> {
403+
val keyBytes = key.toByteArray(charset)
404+
val memberBytes = member.toByteArray(charset)
405+
val response = pipeline.zscore(keyBytes, memberBytes)
406+
return Supplier { response.get() }
407+
}
408+
409+
override fun zrange(
410+
key: String,
411+
type: Redis.ZRangeType,
412+
start: Redis.ZRangeMarker,
413+
stop: Redis.ZRangeMarker,
414+
reverse: Boolean,
415+
limit: Redis.ZRangeLimit?
416+
): Supplier<List<ByteString?>> {
417+
val response = zrangeBase(key, type, start, stop, reverse, false, limit).noScore
418+
return Supplier {
419+
response?.get()?.map { bytes -> bytes?.toByteString() } ?: listOf()
420+
}
421+
}
422+
423+
override fun zrangeWithScores(
424+
key: String,
425+
type: Redis.ZRangeType,
426+
start: Redis.ZRangeMarker,
427+
stop: Redis.ZRangeMarker,
428+
reverse: Boolean,
429+
limit: Redis.ZRangeLimit?
430+
): Supplier<List<Pair<ByteString?, Double>>> {
431+
val response = zrangeBase(key, type, start, stop, reverse, true, limit).withScore
432+
return Supplier {
433+
response?.get()?.map { tuple -> Pair(tuple.binaryElement?.toByteString(), tuple.score) } ?: listOf()
434+
}
435+
}
436+
437+
override fun zremRangeByRank(
438+
key: String,
439+
start: Redis.ZRangeRankMarker,
440+
stop: Redis.ZRangeRankMarker
441+
): Supplier<Long> {
442+
val response = pipeline.zremrangeByRank(key, start.longValue, stop.longValue)
443+
return Supplier { response.get() }
444+
}
445+
446+
override fun zcard(key: String): Supplier<Long> {
447+
val response = pipeline.zcard(key)
448+
return Supplier { response.get() }
449+
}
450+
451+
private fun zrangeBase(
452+
key: String,
453+
type: Redis.ZRangeType,
454+
start: Redis.ZRangeMarker,
455+
stop: Redis.ZRangeMarker,
456+
reverse: Boolean,
457+
withScore: Boolean,
458+
limit: Redis.ZRangeLimit?,
459+
): ZRangeResponse {
460+
return when (type) {
461+
Redis.ZRangeType.INDEX ->
462+
zrangeByIndex(
463+
key = key,
464+
start = start as Redis.ZRangeIndexMarker,
465+
stop = stop as Redis.ZRangeIndexMarker,
466+
reverse = reverse,
467+
withScore = withScore
468+
)
469+
470+
Redis.ZRangeType.SCORE ->
471+
zrangeByScore(
472+
key = key,
473+
start = start as Redis.ZRangeScoreMarker,
474+
stop = stop as Redis.ZRangeScoreMarker,
475+
reverse = reverse,
476+
withScore = withScore,
477+
limit = limit
478+
)
479+
}
480+
}
481+
482+
private fun zrangeByIndex(
483+
key: String,
484+
start: Redis.ZRangeIndexMarker,
485+
stop: Redis.ZRangeIndexMarker,
486+
reverse: Boolean,
487+
withScore: Boolean
488+
): ZRangeResponse {
489+
val params = ZRangeParams(
490+
start.intValue,
491+
stop.intValue
492+
)
493+
if (reverse) params.rev()
494+
495+
return if (withScore) {
496+
ZRangeResponse.withScore(pipeline.zrangeWithScores(key.toByteArray(charset), params))
497+
} else {
498+
ZRangeResponse.noScore(pipeline.zrange(key.toByteArray(charset), params))
499+
}
500+
}
501+
502+
private fun zrangeByScore(
503+
key: String,
504+
start: Redis.ZRangeScoreMarker,
505+
stop: Redis.ZRangeScoreMarker,
506+
reverse: Boolean,
507+
withScore: Boolean,
508+
limit: Redis.ZRangeLimit?,
509+
): ZRangeResponse {
510+
val min = start.toString().toByteArray(charset)
511+
val max = stop.toString().toByteArray(charset)
512+
val keyBytes = key.toByteArray(charset)
513+
514+
return if (limit == null && !reverse && !withScore) {
515+
ZRangeResponse.noScore(pipeline.zrangeByScore(keyBytes, min, max))
516+
} else if (limit == null && !reverse) {
517+
ZRangeResponse.withScore(pipeline.zrangeByScoreWithScores(keyBytes, min, max)
518+
)
519+
} else if (limit == null && !withScore) {
520+
ZRangeResponse.noScore(pipeline.zrevrangeByScore(keyBytes, max, min)
521+
)
522+
} else if (limit == null) {
523+
ZRangeResponse.withScore(pipeline.zrevrangeByScoreWithScores(keyBytes, max, min)
524+
)
525+
} else if (!reverse && !withScore) {
526+
ZRangeResponse.noScore(pipeline.zrangeByScore(keyBytes, min, max, limit.offset, limit.count)
527+
)
528+
} else if (!reverse) {
529+
ZRangeResponse.withScore(
530+
pipeline.zrangeByScoreWithScores(keyBytes, min, max, limit.offset, limit.count)
531+
)
532+
} else if (!withScore) {
533+
ZRangeResponse.noScore(
534+
pipeline.zrevrangeByScore(keyBytes, max, min, limit.offset, limit.count)
535+
)
536+
} else {
537+
ZRangeResponse.withScore(
538+
pipeline.zrevrangeByScoreWithScores(keyBytes, max, min, limit.offset, limit.count)
539+
)
540+
}
541+
}
542+
543+
/**
544+
* A wrapper class for handling response from zrange* methods.
545+
*/
546+
private class ZRangeResponse private constructor(
547+
val noScore: Response<List<ByteArray?>>?,
548+
val withScore: Response<List<Tuple>>?
549+
) {
550+
companion object {
551+
fun noScore(ans: Response<List<ByteArray?>>?): ZRangeResponse = ZRangeResponse(ans, null)
552+
553+
fun withScore(ans: Response<List<Tuple>>?): ZRangeResponse = ZRangeResponse(null, ans)
554+
}
555+
}
556+
373557
override fun close() {
374558
pipeline.close()
375559
}

0 commit comments

Comments
 (0)