Skip to content

Commit 10785f7

Browse files
authored
Merge pull request #207 from aPureBase/kdataloader-fix
Fix the dataloader dispatcher issue
2 parents 3591a17 + 770efc9 commit 10785f7

27 files changed

+852
-25
lines changed

build.gradle.kts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ val sonatypeUsername: String? = System.getenv("sonatypeUsername")
66
val sonatypePassword: String? = System.getenv("sonatypePassword")
77

88
plugins {
9-
id("com.github.ben-manes.versions") version "0.39.0"
9+
id("com.github.ben-manes.versions") version "0.44.0"
1010
id("io.codearte.nexus-staging") version "0.30.0"
1111
id("de.marcphilipp.nexus-publish") version "0.4.0"
1212
jacoco

gradle.properties

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ version=0.18.1
33

44
# Dependencies
55
coroutine_version=1.6.4
6-
jackson_version=2.13.4
7-
caffeine_version=3.1.1
8-
serialization_version=1.4.0
6+
jackson_version=2.14.1
7+
caffeine_version=3.1.2
8+
serialization_version=1.4.1
99
kDataLoader_version=0.5.1
1010
deferredJsonBuilder_version=1.0.0
11-
ktor_version=2.1.2
11+
ktor_version=2.2.2
1212

1313
# Test-Dependencies
1414
kotlin_html_version=0.7.5
1515
netty_version=4.1.82.Final
16-
junit_version=5.9.0
17-
kluent_version=1.68
16+
junit_version=5.9.2
17+
kluent_version=1.72
1818
hamcrest_version=2.2
1919

2020

@@ -25,7 +25,7 @@ coverallsGradlePlugin_version=2.8.4
2525
jacoco_version=0.8.5
2626

2727
# Example Dependencies
28-
logback_version=1.2.1
28+
logback_version=1.4.5
2929
exposed_version=0.32.1
3030
h2_version=1.4.200
3131
hikari_version=4.0.3

kgraphql-example/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
plugins {
22
base
33
application
4-
kotlin("jvm") version "1.7.10"
5-
id("org.jetbrains.dokka") version "1.7.10"
4+
kotlin("jvm") version "1.8.0"
5+
id("org.jetbrains.dokka") version "1.7.20"
66
signing
77
}
88

kgraphql-ktor/build.gradle.kts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
plugins {
22
base
3-
kotlin("jvm") version "1.6.20"
4-
kotlin("plugin.serialization") version "1.6.20"
5-
id("org.jetbrains.dokka") version "1.4.32"
3+
kotlin("jvm") version "1.8.0"
4+
kotlin("plugin.serialization") version "1.8.0"
5+
id("org.jetbrains.dokka") version "1.7.20"
66
signing
77
}
88

kgraphql/build.gradle.kts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11

22
plugins {
33
base
4-
kotlin("jvm") version "1.7.10"
5-
id("org.jetbrains.dokka") version "1.7.10"
4+
kotlin("jvm") version "1.8.0"
5+
id("org.jetbrains.dokka") version "1.7.20"
66
signing
77
}
88

@@ -26,14 +26,16 @@ dependencies {
2626
implementation(kotlin("reflect"))
2727

2828
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutine_version")
29+
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$coroutine_version")
2930
implementation("org.jetbrains.kotlinx:kotlinx-serialization-json:$serialization_version") // JVM dependency
3031

3132
implementation("com.fasterxml.jackson.core:jackson-databind:$jackson_version")
3233
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jackson_version")
3334

3435
implementation("com.github.ben-manes.caffeine:caffeine:$caffeine_version")
3536
implementation("com.apurebase:DeferredJsonBuilder:$deferredJsonBuilder_version")
36-
api("de.nidomiro:KDataLoader:$kDataLoader_version")
37+
38+
// api("de.nidomiro:KDataLoader:$kDataLoader_version")
3739

3840

3941
testImplementation("io.netty:netty-all:$netty_version")

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/dsl/DataLoaderPropertyDSL.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ class DataLoaderPropertyDSL<T, K, R>(
8787
{ TimedAutoDispatcherDataLoaderOptions() },
8888
mapOf(),
8989
dataLoader!!,
90-
null,
9190
)
9291
)
9392
}

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/DataLoaderPreparedRequestExecutor.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,7 @@ import com.apurebase.kgraphql.schema.scalar.serializeScalar
1616
import com.apurebase.kgraphql.schema.structure.Field
1717
import com.apurebase.kgraphql.schema.structure.InputValue
1818
import com.apurebase.kgraphql.schema.structure.Type
19-
import kotlinx.coroutines.CompletableDeferred
20-
import kotlinx.coroutines.Deferred
21-
import kotlinx.coroutines.coroutineScope
19+
import kotlinx.coroutines.*
2220
import kotlinx.serialization.json.*
2321
import nidomiro.kdataloader.DataLoader
2422
import kotlin.reflect.KProperty1
@@ -34,25 +32,24 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
3432
val loaders: Map<Field.DataLoader<*, *, *>, DataLoader<Any?, *>>
3533
)
3634

37-
private suspend fun ExecutionPlan.constructLoaders(): Map<Field.DataLoader<*, *, *>, DataLoader<Any?, *>> {
35+
private suspend fun ExecutionPlan.constructLoaders(): Map<Field.DataLoader<*, *, *>, DataLoader<Any?, *>> = coroutineScope {
3836
val loaders = mutableMapOf<Field.DataLoader<*, *, *>, DataLoader<Any?, *>>()
3937

4038
suspend fun Collection<Execution>.look() {
4139
forEach { ex ->
42-
ex.selectionNode
4340
when (ex) {
4441
is Execution.Fragment -> ex.elements.look()
4542
is Execution.Node -> {
4643
ex.children.look()
4744
if (ex.field is Field.DataLoader<*, *, *>) {
48-
loaders[ex.field] = ex.field.loader.constructNew() as DataLoader<Any?, *>
45+
loaders[ex.field] = ex.field.loader.constructNew(coroutineContext.job) as DataLoader<Any?, *>
4946
}
5047
}
5148
}
5249
}
5350
}
5451
operations.look()
55-
return loaders
52+
loaders
5653
}
5754

5855
private suspend fun <T> DeferredJsonMap.writeOperation(

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/ParallelRequestExecutor.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ class ParallelRequestExecutor(val schema: DefaultSchema) : RequestExecutor {
292292
)
293293

294294
// as this isn't the DataLoaderPreparedRequestExecutor. We'll use this instant workaround instead.
295-
val loader = field.loader.constructNew() as DataLoader<Any?, Any?>
295+
val loader = field.loader.constructNew(null) as DataLoader<Any?, Any?>
296296
val value = loader.loadAsync(preparedValue)
297297
loader.dispatch()
298298

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package nidomiro.kdataloader
2+
3+
sealed class BatchMode {
4+
/**
5+
* Load data in batches of [batchSize]
6+
*/
7+
data class LoadInBatch(val batchSize: Int? = null) : BatchMode()
8+
9+
/**
10+
* Load everything immediately
11+
*/
12+
object LoadImmediately : BatchMode()
13+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package nidomiro.kdataloader
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
5+
/**
6+
* A "Threadsafe" Cache
7+
* (Coroutine-Save)
8+
*/
9+
interface Cache<K, V> {
10+
11+
suspend fun store(key: K, value: CompletableDeferred<V>): CompletableDeferred<V>
12+
13+
suspend fun get(key: K): CompletableDeferred<V>?
14+
15+
suspend fun getOrCreate(
16+
key: K,
17+
generator: suspend (key: K) -> CompletableDeferred<V>,
18+
callOnCacheHit: suspend () -> Unit
19+
): CompletableDeferred<V>
20+
21+
suspend fun clear(key: K): CompletableDeferred<V>?
22+
23+
suspend fun clear()
24+
}
25+
26+
suspend fun <K, V> Cache<K, V>.getOrCreate(
27+
key: K,
28+
generator: suspend (key: K) -> CompletableDeferred<V>
29+
): CompletableDeferred<V> =
30+
getOrCreate(key, generator, {})
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package nidomiro.kdataloader
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
import kotlinx.coroutines.sync.Mutex
5+
import kotlinx.coroutines.sync.withLock
6+
7+
class CoroutineMapCache<K, V>(
8+
private val cacheMap: MutableMap<K, CompletableDeferred<V>> = mutableMapOf()
9+
) : Cache<K, V> {
10+
private val mutex = Mutex()
11+
12+
13+
override suspend fun store(key: K, value: CompletableDeferred<V>): CompletableDeferred<V> {
14+
mutex.withLock {
15+
cacheMap[key] = value
16+
}
17+
return value
18+
}
19+
20+
override suspend fun get(key: K): CompletableDeferred<V>? =
21+
mutex.withLock {
22+
cacheMap[key]
23+
}
24+
25+
override suspend fun getOrCreate(
26+
key: K,
27+
generator: suspend (key: K) -> CompletableDeferred<V>,
28+
callOnCacheHit: suspend () -> Unit
29+
): CompletableDeferred<V> =
30+
mutex.withLock {
31+
val currentVal = cacheMap[key]
32+
if (currentVal == null) {
33+
val generated = generator(key)
34+
cacheMap[key] = generated
35+
return@withLock generated
36+
} else {
37+
callOnCacheHit()
38+
return@withLock currentVal
39+
}
40+
}
41+
42+
43+
override suspend fun clear(key: K): CompletableDeferred<V>? =
44+
mutex.withLock {
45+
cacheMap.remove(key)
46+
}
47+
48+
override suspend fun clear() =
49+
mutex.withLock {
50+
cacheMap.clear()
51+
}
52+
53+
54+
}
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package nidomiro.kdataloader
2+
3+
import kotlinx.coroutines.CoroutineScope
4+
import kotlinx.coroutines.Deferred
5+
import kotlinx.coroutines.Job
6+
import kotlinx.coroutines.joinAll
7+
import kotlin.jvm.JvmName
8+
import nidomiro.kdataloader.statistics.DataLoaderStatistics
9+
10+
typealias BatchLoader<K, R> = suspend (ids: List<K>) -> List<ExecutionResult<R>>
11+
12+
interface DataLoader<K, R> {
13+
14+
val options: DataLoaderOptions<K, R>
15+
16+
/**
17+
* Loads the value for the given Key.
18+
* The returned [Deferred] completes with the finish of [dispatch] in case of [DataLoaderOptions.batchLoadEnabled] = true.
19+
* If [DataLoaderOptions.batchLoadEnabled] = false it calls the BatchLoader immediately and returns the retrieved value.
20+
*/
21+
suspend fun loadAsync(key: K): Deferred<R>
22+
23+
/**
24+
* The same as [loadAsync] but for multiple Keys at once.
25+
*/
26+
suspend fun loadManyAsync(vararg keys: K): Deferred<List<R>>
27+
28+
/**
29+
* Executes all stored requests via the given batchLoader.
30+
* After this function finishes all [Deferred] created before are completed.
31+
*/
32+
suspend fun dispatch()
33+
34+
/**
35+
* Removes the value of the given Key from the cache
36+
*/
37+
suspend fun clear(key: K)
38+
39+
/**
40+
* Removes all values from the cache
41+
*/
42+
suspend fun clearAll()
43+
44+
/**
45+
* Primes the cache with the given values.
46+
* After priming the [BatchLoader] will not be called with this key.
47+
*/
48+
suspend fun prime(key: K, value: R)
49+
50+
/**
51+
* Primes the cache with the given [Throwable].
52+
* After priming the [BatchLoader] will not be called with this key, if [DataLoaderOptions.cacheExceptions] = true.
53+
*/
54+
suspend fun prime(key: K, value: Throwable)
55+
56+
57+
/**
58+
* Returns a snapshot of the statistics at the point of calling
59+
*/
60+
suspend fun createStatisticsSnapshot(): DataLoaderStatistics
61+
62+
}
63+
64+
/**
65+
* @see DataLoader.prime(K, R)
66+
*/
67+
suspend fun <K, R> SimpleDataLoaderImpl<K, R>.prime(cacheEntry: Pair<K, R>) {
68+
prime(cacheEntry.first, cacheEntry.second)
69+
}
70+
71+
/**
72+
* @see DataLoader.prime(K, Throwable)
73+
*/
74+
@JvmName("primeFailure")
75+
suspend fun <K, R> SimpleDataLoaderImpl<K, R>.prime(cacheEntry: Pair<K, Throwable>) {
76+
prime(cacheEntry.first, cacheEntry.second)
77+
}
78+
79+
internal suspend fun <K, R> DataLoader<K, R>.prime(key: K, value: ExecutionResult<R>) =
80+
when (value) {
81+
is ExecutionResult.Success -> prime(key, value.value)
82+
is ExecutionResult.Failure -> prime(key, value.throwable)
83+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package nidomiro.kdataloader
2+
3+
open class DataLoaderOptions<K, R>(
4+
/**
5+
* The cache implementation
6+
*/
7+
val cache: Cache<K, R>? = CoroutineMapCache(),
8+
9+
/**
10+
* Cache Exceptional States?
11+
*/
12+
val cacheExceptions: Boolean = true,
13+
14+
/**
15+
* The batch-mode
16+
*/
17+
val batchMode: BatchMode = BatchMode.LoadInBatch()
18+
)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package nidomiro.kdataloader
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
import kotlinx.coroutines.sync.Mutex
5+
import kotlinx.coroutines.sync.withLock
6+
7+
class DefaultLoaderQueueImpl<K, V> : LoaderQueue<K, V> {
8+
9+
private val mutex = Mutex()
10+
private var queue: MutableList<LoaderQueueEntry<K, CompletableDeferred<V>>> = mutableListOf()
11+
12+
13+
override suspend fun enqueue(key: K, deferred: CompletableDeferred<V>) {
14+
mutex.withLock {
15+
queue.add(LoaderQueueEntry(key, deferred))
16+
}
17+
}
18+
19+
override suspend fun getAllItemsAsList(): List<LoaderQueueEntry<K, CompletableDeferred<V>>> =
20+
mutex.withLock {
21+
val currentQueue = queue
22+
queue = mutableListOf()
23+
return@withLock currentQueue
24+
}
25+
26+
27+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package nidomiro.kdataloader
2+
3+
sealed class ExecutionResult<out T> {
4+
data class Success<out T>(val value: T) : ExecutionResult<T>()
5+
data class Failure(val throwable: Throwable) : ExecutionResult<Nothing>()
6+
}

0 commit comments

Comments
 (0)