Skip to content

Commit 9331701

Browse files
authored
Revert "Simplify misk-redis implementation (#3198)" (#3218)
This reverts commit d3fad45.
1 parent 656112e commit 9331701

File tree

9 files changed

+586
-587
lines changed

9 files changed

+586
-587
lines changed

.palantir/revapi.yml

-32
Original file line numberDiff line numberDiff line change
@@ -6192,38 +6192,6 @@ acceptedBreaks:
61926192
old: "parameter void misk.redis.RealPipelinedRedis::<init>(===misk.redis.JedisPipeline===)"
61936193
new: "parameter void misk.redis.RealPipelinedRedis::<init>(===redis.clients.jedis.AbstractPipeline===)"
61946194
justification: "RealPipelinedRedis is internal"
6195-
"2024.03.26.112919-01b20b7":
6196-
com.squareup.misk:misk-redis:
6197-
- code: "java.method.addedToInterface"
6198-
new: "method java.util.function.Supplier<java.lang.Double> misk.redis.DeferredRedis::zscore(java.lang.String,\
6199-
\ java.lang.String)"
6200-
justification: "Additive change only"
6201-
- code: "java.method.addedToInterface"
6202-
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zadd(java.lang.String,\
6203-
\ double, java.lang.String, misk.redis.Redis.ZAddOptions[])"
6204-
justification: "Additive change only"
6205-
- code: "java.method.addedToInterface"
6206-
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zadd(java.lang.String,\
6207-
\ java.util.Map<java.lang.String, java.lang.Double>, misk.redis.Redis.ZAddOptions[])"
6208-
justification: "Additive change only"
6209-
- code: "java.method.addedToInterface"
6210-
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zcard(java.lang.String)"
6211-
justification: "Additive change only"
6212-
- code: "java.method.addedToInterface"
6213-
new: "method java.util.function.Supplier<java.lang.Long> misk.redis.DeferredRedis::zremRangeByRank(java.lang.String,\
6214-
\ misk.redis.Redis.ZRangeRankMarker, misk.redis.Redis.ZRangeRankMarker)"
6215-
justification: "Additive change only"
6216-
- code: "java.method.addedToInterface"
6217-
new: "method java.util.function.Supplier<java.util.List<kotlin.Pair<okio.ByteString,\
6218-
\ java.lang.Double>>> misk.redis.DeferredRedis::zrangeWithScores(java.lang.String,\
6219-
\ misk.redis.Redis.ZRangeType, misk.redis.Redis.ZRangeMarker, misk.redis.Redis.ZRangeMarker,\
6220-
\ boolean, misk.redis.Redis.ZRangeLimit)"
6221-
justification: "Additive change only"
6222-
- code: "java.method.addedToInterface"
6223-
new: "method java.util.function.Supplier<java.util.List<okio.ByteString>> misk.redis.DeferredRedis::zrange(java.lang.String,\
6224-
\ misk.redis.Redis.ZRangeType, misk.redis.Redis.ZRangeMarker, misk.redis.Redis.ZRangeMarker,\
6225-
\ boolean, misk.redis.Redis.ZRangeLimit)"
6226-
justification: "Additive change only"
62276195
misk-0.18.0:
62286196
com.squareup.misk:misk-gcp:
62296197
- code: "java.method.numberOfParametersChanged"

misk-redis/api/misk-redis.api

-16
Original file line numberDiff line numberDiff line change
@@ -36,20 +36,11 @@ public abstract interface class misk/redis/DeferredRedis {
3636
public abstract fun rpush (Ljava/lang/String;[Lokio/ByteString;)Ljava/util/function/Supplier;
3737
public abstract fun set (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
3838
public abstract fun setnx (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
39-
public abstract fun zadd (Ljava/lang/String;DLjava/lang/String;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
40-
public abstract fun zadd (Ljava/lang/String;Ljava/util/Map;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
41-
public abstract fun zcard (Ljava/lang/String;)Ljava/util/function/Supplier;
42-
public abstract fun zrange (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
43-
public abstract fun zrangeWithScores (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
44-
public abstract fun zremRangeByRank (Ljava/lang/String;Lmisk/redis/Redis$ZRangeRankMarker;Lmisk/redis/Redis$ZRangeRankMarker;)Ljava/util/function/Supplier;
45-
public abstract fun zscore (Ljava/lang/String;Ljava/lang/String;)Ljava/util/function/Supplier;
4639
}
4740

4841
public final class misk/redis/DeferredRedis$DefaultImpls {
4942
public static synthetic fun set$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;ILjava/lang/Object;)Ljava/util/function/Supplier;
5043
public static synthetic fun setnx$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;ILjava/lang/Object;)Ljava/util/function/Supplier;
51-
public static synthetic fun zrange$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;ILjava/lang/Object;)Ljava/util/function/Supplier;
52-
public static synthetic fun zrangeWithScores$default (Lmisk/redis/DeferredRedis;Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;ILjava/lang/Object;)Ljava/util/function/Supplier;
5344
}
5445

5546
public final class misk/redis/FakeRedis : misk/redis/Redis {
@@ -601,13 +592,6 @@ public final class misk/redis/testing/FakeRedis$FakePipelinedRedis : misk/redis/
601592
public fun rpush (Ljava/lang/String;[Lokio/ByteString;)Ljava/util/function/Supplier;
602593
public fun set (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
603594
public fun setnx (Ljava/lang/String;Lokio/ByteString;Ljava/time/Duration;)Ljava/util/function/Supplier;
604-
public fun zadd (Ljava/lang/String;DLjava/lang/String;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
605-
public fun zadd (Ljava/lang/String;Ljava/util/Map;[Lmisk/redis/Redis$ZAddOptions;)Ljava/util/function/Supplier;
606-
public fun zcard (Ljava/lang/String;)Ljava/util/function/Supplier;
607-
public fun zrange (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
608-
public fun zrangeWithScores (Ljava/lang/String;Lmisk/redis/Redis$ZRangeType;Lmisk/redis/Redis$ZRangeMarker;Lmisk/redis/Redis$ZRangeMarker;ZLmisk/redis/Redis$ZRangeLimit;)Ljava/util/function/Supplier;
609-
public fun zremRangeByRank (Ljava/lang/String;Lmisk/redis/Redis$ZRangeRankMarker;Lmisk/redis/Redis$ZRangeRankMarker;)Ljava/util/function/Supplier;
610-
public fun zscore (Ljava/lang/String;Ljava/lang/String;)Ljava/util/function/Supplier;
611595
}
612596

613597
public abstract interface annotation class misk/redis/testing/ForFakeRedis : java/lang/annotation/Annotation {

misk-redis/src/main/kotlin/misk/redis/DeferredRedis.kt

-44
Original file line numberDiff line numberDiff line change
@@ -97,49 +97,5 @@ interface DeferredRedis {
9797

9898
fun pExpireAt(key: String, timestampMilliseconds: Long): Supplier<Boolean>
9999

100-
fun zadd(
101-
key: String,
102-
score: Double,
103-
member: String,
104-
vararg options: Redis.ZAddOptions
105-
): Supplier<Long>
106-
107-
fun zadd(
108-
key: String,
109-
scoreMembers: Map<String, Double>,
110-
vararg options: Redis.ZAddOptions
111-
): Supplier<Long>
112-
113-
fun zscore(
114-
key: String,
115-
member: String
116-
): Supplier<Double?>
117-
118-
fun zrange(
119-
key: String,
120-
type: Redis.ZRangeType = Redis.ZRangeType.INDEX,
121-
start: Redis.ZRangeMarker,
122-
stop: Redis.ZRangeMarker,
123-
reverse: Boolean = false,
124-
limit: Redis.ZRangeLimit? = null,
125-
): Supplier<List<ByteString?>>
126-
127-
fun zrangeWithScores(
128-
key: String,
129-
type: Redis.ZRangeType = Redis.ZRangeType.INDEX,
130-
start: Redis.ZRangeMarker,
131-
stop: Redis.ZRangeMarker,
132-
reverse: Boolean = false,
133-
limit: Redis.ZRangeLimit? = null,
134-
): Supplier<List<Pair<ByteString?, Double>>>
135-
136-
fun zremRangeByRank(
137-
key: String,
138-
start: Redis.ZRangeRankMarker,
139-
stop: Redis.ZRangeRankMarker,
140-
): Supplier<Long>
141-
142-
fun zcard(key: String): Supplier<Long>
143-
144100
fun close()
145101
}

misk-redis/src/main/kotlin/misk/redis/RealPipelinedRedis.kt

+3-187
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import okio.ByteString
44
import okio.ByteString.Companion.toByteString
55
import redis.clients.jedis.AbstractPipeline
66
import redis.clients.jedis.ClusterPipeline
7+
import redis.clients.jedis.Jedis
78
import redis.clients.jedis.Pipeline
9+
import redis.clients.jedis.PipelineBase
810
import redis.clients.jedis.Response
911
import redis.clients.jedis.args.ListDirection
1012
import redis.clients.jedis.params.SetParams
@@ -25,7 +27,7 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
2527
} else {
2628
RuntimeException(
2729
"""
28-
|When using clustered Redis, keys used by one $op command must always map to the same slot, but mapped to slots $slots.
30+
|When using clustered Redis, keys used by one $op command in a pipeline must always map to the same slot, but mapped to slots $slots.
2931
|You can use {hashtags} in your key name to control how Redis hashes keys to slots.
3032
|For example, keys: `{customer9001}.contacts` and `{customer9001}.payments` will hash to the same slot.
3133
|
@@ -52,7 +54,6 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
5254
pipeline.del(*slottedKeys.toTypedArray())
5355
}
5456
}
55-
5657
else -> error("Unknown pipeline type: $pipeline")
5758
}
5859
return Supplier {
@@ -70,7 +71,6 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
7071
val response = pipeline.mget(*keysBytes)
7172
Supplier { response.get().map { it?.toByteString() } }
7273
}
73-
7474
is ClusterPipeline -> {
7575
val responses = keysBytes.groupBy { JedisClusterCRC16.getSlot(it) }
7676
.mapValues { (_, slottedKeys) ->
@@ -90,7 +90,6 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
9090
keys.map { keyToValueMap[it] }
9191
}
9292
}
93-
9493
else -> error("Unknown pipeline type: $pipeline")
9594
}
9695
}
@@ -110,7 +109,6 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
110109
pipeline.mset(*slottedKeyValues.flatten().toTypedArray())
111110
}
112111
}
113-
114112
else -> error("Unknown pipeline type: $pipeline")
115113
}
116114
return Supplier { responses.map { it.get() } }
@@ -372,188 +370,6 @@ internal class RealPipelinedRedis(private val pipeline: AbstractPipeline) : Defe
372370
return Supplier { response.get() == 1L }
373371
}
374372

375-
override fun zadd(
376-
key: String,
377-
score: Double,
378-
member: String,
379-
vararg options: Redis.ZAddOptions,
380-
): Supplier<Long> {
381-
Redis.ZAddOptions.verify(options)
382-
val keyBytes = key.toByteArray(charset)
383-
val memberBytes = member.toByteArray(charset)
384-
val params = Redis.ZAddOptions.getZAddParams(options)
385-
val response = pipeline.zadd(keyBytes, score, memberBytes, params)
386-
return Supplier { response.get() }
387-
}
388-
389-
override fun zadd(
390-
key: String,
391-
scoreMembers: Map<String, Double>,
392-
vararg options: Redis.ZAddOptions,
393-
): Supplier<Long> {
394-
Redis.ZAddOptions.verify(options)
395-
val keyBytes = key.toByteArray(charset)
396-
val scoreMembersBytes = scoreMembers.mapKeys { it.key.toByteArray(charset) }
397-
val params = Redis.ZAddOptions.getZAddParams(options)
398-
val response = pipeline.zadd(keyBytes, scoreMembersBytes, params)
399-
return Supplier { response.get() }
400-
}
401-
402-
override fun zscore(key: String, member: String): Supplier<Double?> {
403-
val keyBytes = key.toByteArray(charset)
404-
val memberBytes = member.toByteArray(charset)
405-
val response = pipeline.zscore(keyBytes, memberBytes)
406-
return Supplier { response.get() }
407-
}
408-
409-
override fun zrange(
410-
key: String,
411-
type: Redis.ZRangeType,
412-
start: Redis.ZRangeMarker,
413-
stop: Redis.ZRangeMarker,
414-
reverse: Boolean,
415-
limit: Redis.ZRangeLimit?
416-
): Supplier<List<ByteString?>> {
417-
val response = zrangeBase(key, type, start, stop, reverse, false, limit).noScore
418-
return Supplier {
419-
response?.get()?.map { bytes -> bytes?.toByteString() } ?: listOf()
420-
}
421-
}
422-
423-
override fun zrangeWithScores(
424-
key: String,
425-
type: Redis.ZRangeType,
426-
start: Redis.ZRangeMarker,
427-
stop: Redis.ZRangeMarker,
428-
reverse: Boolean,
429-
limit: Redis.ZRangeLimit?
430-
): Supplier<List<Pair<ByteString?, Double>>> {
431-
val response = zrangeBase(key, type, start, stop, reverse, true, limit).withScore
432-
return Supplier {
433-
response?.get()?.map { tuple -> Pair(tuple.binaryElement?.toByteString(), tuple.score) } ?: listOf()
434-
}
435-
}
436-
437-
override fun zremRangeByRank(
438-
key: String,
439-
start: Redis.ZRangeRankMarker,
440-
stop: Redis.ZRangeRankMarker
441-
): Supplier<Long> {
442-
val response = pipeline.zremrangeByRank(key, start.longValue, stop.longValue)
443-
return Supplier { response.get() }
444-
}
445-
446-
override fun zcard(key: String): Supplier<Long> {
447-
val response = pipeline.zcard(key)
448-
return Supplier { response.get() }
449-
}
450-
451-
private fun zrangeBase(
452-
key: String,
453-
type: Redis.ZRangeType,
454-
start: Redis.ZRangeMarker,
455-
stop: Redis.ZRangeMarker,
456-
reverse: Boolean,
457-
withScore: Boolean,
458-
limit: Redis.ZRangeLimit?,
459-
): ZRangeResponse {
460-
return when (type) {
461-
Redis.ZRangeType.INDEX ->
462-
zrangeByIndex(
463-
key = key,
464-
start = start as Redis.ZRangeIndexMarker,
465-
stop = stop as Redis.ZRangeIndexMarker,
466-
reverse = reverse,
467-
withScore = withScore
468-
)
469-
470-
Redis.ZRangeType.SCORE ->
471-
zrangeByScore(
472-
key = key,
473-
start = start as Redis.ZRangeScoreMarker,
474-
stop = stop as Redis.ZRangeScoreMarker,
475-
reverse = reverse,
476-
withScore = withScore,
477-
limit = limit
478-
)
479-
}
480-
}
481-
482-
private fun zrangeByIndex(
483-
key: String,
484-
start: Redis.ZRangeIndexMarker,
485-
stop: Redis.ZRangeIndexMarker,
486-
reverse: Boolean,
487-
withScore: Boolean
488-
): ZRangeResponse {
489-
val params = ZRangeParams(
490-
start.intValue,
491-
stop.intValue
492-
)
493-
if (reverse) params.rev()
494-
495-
return if (withScore) {
496-
ZRangeResponse.withScore(pipeline.zrangeWithScores(key.toByteArray(charset), params))
497-
} else {
498-
ZRangeResponse.noScore(pipeline.zrange(key.toByteArray(charset), params))
499-
}
500-
}
501-
502-
private fun zrangeByScore(
503-
key: String,
504-
start: Redis.ZRangeScoreMarker,
505-
stop: Redis.ZRangeScoreMarker,
506-
reverse: Boolean,
507-
withScore: Boolean,
508-
limit: Redis.ZRangeLimit?,
509-
): ZRangeResponse {
510-
val min = start.toString().toByteArray(charset)
511-
val max = stop.toString().toByteArray(charset)
512-
val keyBytes = key.toByteArray(charset)
513-
514-
return if (limit == null && !reverse && !withScore) {
515-
ZRangeResponse.noScore(pipeline.zrangeByScore(keyBytes, min, max))
516-
} else if (limit == null && !reverse) {
517-
ZRangeResponse.withScore(pipeline.zrangeByScoreWithScores(keyBytes, min, max)
518-
)
519-
} else if (limit == null && !withScore) {
520-
ZRangeResponse.noScore(pipeline.zrevrangeByScore(keyBytes, max, min)
521-
)
522-
} else if (limit == null) {
523-
ZRangeResponse.withScore(pipeline.zrevrangeByScoreWithScores(keyBytes, max, min)
524-
)
525-
} else if (!reverse && !withScore) {
526-
ZRangeResponse.noScore(pipeline.zrangeByScore(keyBytes, min, max, limit.offset, limit.count)
527-
)
528-
} else if (!reverse) {
529-
ZRangeResponse.withScore(
530-
pipeline.zrangeByScoreWithScores(keyBytes, min, max, limit.offset, limit.count)
531-
)
532-
} else if (!withScore) {
533-
ZRangeResponse.noScore(
534-
pipeline.zrevrangeByScore(keyBytes, max, min, limit.offset, limit.count)
535-
)
536-
} else {
537-
ZRangeResponse.withScore(
538-
pipeline.zrevrangeByScoreWithScores(keyBytes, max, min, limit.offset, limit.count)
539-
)
540-
}
541-
}
542-
543-
/**
544-
* A wrapper class for handling response from zrange* methods.
545-
*/
546-
private class ZRangeResponse private constructor(
547-
val noScore: Response<List<ByteArray?>>?,
548-
val withScore: Response<List<Tuple>>?
549-
) {
550-
companion object {
551-
fun noScore(ans: Response<List<ByteArray?>>?): ZRangeResponse = ZRangeResponse(ans, null)
552-
553-
fun withScore(ans: Response<List<Tuple>>?): ZRangeResponse = ZRangeResponse(null, ans)
554-
}
555-
}
556-
557373
override fun close() {
558374
pipeline.close()
559375
}

0 commit comments

Comments
 (0)