Skip to content

Commit 2be753a

Browse files
authored
Merge pull request #149 from aPureBase/temp-solution-for-data-loader-issues
Use new timed auto dispatcher for dataloaders
2 parents 6e3cdc4 + 571c323 commit 2be753a

31 files changed

+881
-61
lines changed

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# KGraphQL version
2-
version=0.17.8
2+
version=0.17.9-alpha
33

44
# Dependencies
55
coroutine_version=1.3.9

kgraphql-ktor/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
plugins {
22
base
3-
kotlin("jvm") version "1.4.31"
4-
kotlin("plugin.serialization") version "1.4.31"
3+
kotlin("jvm") version "1.5.10"
4+
kotlin("plugin.serialization") version "1.5.0"
55
id("org.jetbrains.dokka") version "0.10.1"
66
signing
77
}

kgraphql/build.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
plugins {
33
base
4-
kotlin("jvm") version "1.4.31"
4+
kotlin("jvm") version "1.5.10"
55
id("org.jetbrains.dokka") version "0.10.1"
66
signing
77
}
@@ -28,7 +28,7 @@ dependencies {
2828
implementation("com.fasterxml.jackson.core:jackson-databind:$jackson_version")
2929
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:$jackson_version")
3030
implementation("com.github.ben-manes.caffeine:caffeine:$caffeine_version")
31-
api("de.nidomiro:KDataLoader:$kDataLoader_version")
31+
// api("de.nidomiro:KDataLoader:$kDataLoader_version")
3232
implementation("org.jetbrains.kotlinx:kotlinx-serialization-core:$serialization_version") // JVM dependency
3333

3434

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/dsl/DataLoaderPropertyDSL.kt

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,10 @@ import com.apurebase.kgraphql.Context
44
import com.apurebase.kgraphql.schema.model.FunctionWrapper
55
import com.apurebase.kgraphql.schema.model.InputValueDef
66
import com.apurebase.kgraphql.schema.model.PropertyDef
7-
import nidomiro.kdataloader.BatchLoader
8-
import nidomiro.kdataloader.dsl.dataLoaderFactory
7+
import com.apurebase.kgraphql.test.BatchLoader
8+
import com.apurebase.kgraphql.test.TimedAutoDispatcherDataLoaderOptions
9+
import com.apurebase.kgraphql.test.dsl.dataLoaderFactory
10+
import com.apurebase.kgraphql.test.factories.TimedAutoDispatcherDataLoaderFactory
911
import kotlin.reflect.KType
1012

1113
class DataLoaderPropertyDSL<T, K, R>(
@@ -80,7 +82,11 @@ class DataLoaderPropertyDSL<T, K, R>(
8082
inputValues = inputValues,
8183
returnType = returnType,
8284
prepare = prepareWrapper!!,
83-
loader = dataLoaderFactory(dataLoader!!)
85+
loader = TimedAutoDispatcherDataLoaderFactory(
86+
{ TimedAutoDispatcherDataLoaderOptions() },
87+
mapOf(),
88+
dataLoader!!
89+
)
8490
)
8591
}
8692

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/DataLoaderPreparedRequestExecutor.kt

Lines changed: 29 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,8 @@ import com.apurebase.kgraphql.schema.structure.Field
1515
import com.apurebase.kgraphql.schema.structure.InputValue
1616
import com.apurebase.kgraphql.schema.structure.Type
1717
import kotlinx.coroutines.*
18-
import kotlinx.coroutines.sync.Mutex
19-
import kotlinx.coroutines.sync.withLock
2018
import kotlinx.serialization.json.*
21-
import nidomiro.kdataloader.DataLoader
22-
import nidomiro.kdataloader.factories.DataLoaderFactory
23-
import java.util.concurrent.ConcurrentHashMap
24-
import java.util.concurrent.atomic.AtomicLong
19+
import com.apurebase.kgraphql.test.DataLoader
2520
import kotlin.reflect.KProperty1
2621

2722

@@ -32,43 +27,29 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
3227
inner class ExecutionContext(
3328
val variables: Variables,
3429
val requestContext: Context,
35-
private val dataCounters: ConcurrentHashMap<DataLoader<*, *>, AtomicLong> = ConcurrentHashMap(),
36-
private val loaders: ConcurrentHashMap<DataLoaderFactory<Any?, *>, DataLoader<Any?, *>> = ConcurrentHashMap()
37-
) : Mutex by Mutex() {
38-
39-
40-
suspend fun get(loader: DataLoader<*, *>): Long = withLock {
41-
dataCounters[loader]?.get() ?: throw IllegalArgumentException("Something went wrong with execution")
42-
}
43-
suspend fun add(loader: DataLoader<*, *>, check: Pair<*, String>, count: Long) = withLock {
44-
if (dataCounters[loader] == null) {
45-
/** TODO: There shouldn't be a need for Atomic here as we are using [withLock] */
46-
dataCounters[loader] = AtomicLong(count)
47-
} else {
48-
val counter = dataCounters[loader]!!
49-
counter.getAndUpdate {
50-
it + count
30+
val loaders: Map<Field.DataLoader<*, *, *>, DataLoader<Any?, *>>,
31+
)
32+
33+
private suspend fun ExecutionPlan.constructLoaders(job: Job): Map<Field.DataLoader<*, *, *>, DataLoader<Any?, *>> {
34+
val loaders = mutableMapOf<Field.DataLoader<*, *, *>, DataLoader<Any?, *>>()
35+
36+
suspend fun Collection<Execution>.look() {
37+
forEach { ex ->
38+
ex.selectionNode
39+
when (ex) {
40+
is Execution.Fragment -> ex.elements.look()
41+
is Execution.Node -> {
42+
ex.children.look()
43+
if (ex.field is Field.DataLoader<*, *, *>) {
44+
loaders[ex.field] = ex.field.loader.constructNew(job) as DataLoader<Any?, *>
45+
}
46+
}
5147
}
5248
}
5349
}
5450

55-
suspend fun load(builder: DeferredJsonMap, loaderfactory: DataLoaderFactory<Any?, Any?>, node: Execution.Node, preparedValue: Any?): Deferred<Any?> {
56-
val loader = loaders.getOrPut(loaderfactory) { loaderfactory.constructNew() }
57-
add(loader, preparedValue to node.selectionNode.fullPath, 1) // parentCount)
58-
59-
val value = loader.loadAsync(preparedValue)
60-
61-
builder.deferredLaunch {
62-
val count = get(loader)
63-
val stats = loader.createStatisticsSnapshot()
64-
if (stats.objectsRequested >= count) {
65-
loader.dispatch()
66-
} // else if (stats.objectsRequested > count) throw TODO("This should never happen!!!")
67-
}
68-
69-
70-
return value
71-
}
51+
operations.look()
52+
return loaders
7253
}
7354

7455

@@ -325,8 +306,7 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
325306
ctx = ctx
326307
) // ?: TODO("Nullable prepare functions isn't supported")
327308

328-
329-
val value = ctx.load(this, field.loader as DataLoaderFactory<Any?, Any?>, node, preparedValue)
309+
val value = ctx.loaders[field]!!.loadAsync(preparedValue)
330310

331311

332312
applyKeyToElement(ctx, value, node, field.returnType, parentCount)
@@ -360,19 +340,23 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
360340
}
361341

362342
override suspend fun suspendExecute(plan: ExecutionPlan, variables: VariablesJson, context: Context) = coroutineScope {
363-
deferredJsonBuilder(timeout = plan.options.timeout ?: schema.configuration.timeout) {
343+
val result = deferredJsonBuilder(timeout = plan.options.timeout ?: schema.configuration.timeout) {
364344
val ctx = ExecutionContext(
365345
Variables(schema, variables, plan.firstOrNull { it.variables != null }?.variables),
366-
context
346+
context,
347+
plan.constructLoaders(job),
367348
)
368349

369-
370350
"data" toDeferredObj {
371351
plan.forEach { node ->
372352
if (shouldInclude(ctx, node)) writeOperation(ctx, node, node.field as Field.Function<*, *>)
373353
}
374354
}
375-
}.toString()
355+
ctx.loaders.values.map { it.dispatch() }
356+
}
357+
358+
359+
result.toString()
376360
}
377361

378362
private fun createNullNode(node: Execution.Node, returnType: Type): JsonNull = if (returnType !is Type.NonNull) {

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/execution/ParallelRequestExecutor.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import com.fasterxml.jackson.databind.node.JsonNodeFactory
1818
import com.fasterxml.jackson.databind.node.NullNode
1919
import com.fasterxml.jackson.databind.node.ObjectNode
2020
import kotlinx.coroutines.*
21-
import nidomiro.kdataloader.DataLoader
21+
import com.apurebase.kgraphql.test.DataLoader
2222
import kotlin.reflect.KProperty1
2323

2424

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/model/PropertyDef.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.apurebase.kgraphql.schema.model
22

33
import com.apurebase.kgraphql.Context
4-
import nidomiro.kdataloader.BatchLoader
4+
import com.apurebase.kgraphql.test.factories.DataLoaderFactory
55
import kotlin.reflect.KProperty1
66
import kotlin.reflect.KType
77

@@ -29,7 +29,7 @@ interface PropertyDef<T> : Depreciable, DescribedDef {
2929
*/
3030
open class DataLoadedFunction<T, K, R>(
3131
override val name: String,
32-
val loader: nidomiro.kdataloader.factories.DataLoaderFactory<K, R>,
32+
val loader: DataLoaderFactory<K, R>,
3333
val prepare: FunctionWrapper<K>,
3434
val returnType: KType,
3535
override val description: String? = null,

kgraphql/src/main/kotlin/com/apurebase/kgraphql/schema/structure/Field.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package com.apurebase.kgraphql.schema.structure
33
import com.apurebase.kgraphql.Context
44
import com.apurebase.kgraphql.schema.introspection.*
55
import com.apurebase.kgraphql.schema.model.*
6-
import nidomiro.kdataloader.DataLoader
6+
import com.apurebase.kgraphql.test.factories.DataLoaderFactory
77
import kotlin.reflect.full.findAnnotation
88

99

@@ -44,7 +44,7 @@ sealed class Field : __Field {
4444

4545
class DataLoader<T, K, R>(
4646
val kql: PropertyDef.DataLoadedFunction<T, K, R>,
47-
val loader: nidomiro.kdataloader.factories.DataLoaderFactory<K, R>,
47+
val loader: DataLoaderFactory<K, R>,
4848
override val returnType: Type,
4949
override val arguments: List<InputValue<*>>
5050
): Field() {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.apurebase.kgraphql.test
2+
3+
sealed class BatchMode {
4+
/**
5+
* Load data in batches of [batchSize]
6+
*/
7+
data class LoadInBatch(val batchSize: Int? = null) : BatchMode()
8+
9+
/**
10+
* Load everything immediately
11+
*/
12+
object LoadImmediately : BatchMode()
13+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.apurebase.kgraphql.test
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
5+
/**
6+
* A "Threadsafe" Cache
7+
* (Coroutine-Save)
8+
*/
9+
interface Cache<K, V> {
10+
11+
suspend fun store(key: K, value: CompletableDeferred<V>): CompletableDeferred<V>
12+
13+
suspend fun get(key: K): CompletableDeferred<V>?
14+
15+
suspend fun getOrCreate(
16+
key: K,
17+
generator: suspend (key: K) -> CompletableDeferred<V>,
18+
callOnCacheHit: suspend () -> Unit
19+
): CompletableDeferred<V>
20+
21+
suspend fun clear(key: K): CompletableDeferred<V>?
22+
23+
suspend fun clear()
24+
}
25+
26+
suspend fun <K, V> Cache<K, V>.getOrCreate(
27+
key: K,
28+
generator: suspend (key: K) -> CompletableDeferred<V>
29+
): CompletableDeferred<V> =
30+
getOrCreate(key, generator, {})
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package com.apurebase.kgraphql.test
2+
3+
import kotlinx.coroutines.CompletableDeferred
4+
import kotlinx.coroutines.sync.Mutex
5+
import kotlinx.coroutines.sync.withLock
6+
7+
class CoroutineMapCache<K, V>(
8+
private val cacheMap: MutableMap<K, CompletableDeferred<V>> = mutableMapOf()
9+
) : Cache<K, V> {
10+
private val mutex = Mutex()
11+
12+
13+
override suspend fun store(key: K, value: CompletableDeferred<V>): CompletableDeferred<V> {
14+
mutex.withLock {
15+
cacheMap[key] = value
16+
}
17+
return value
18+
}
19+
20+
override suspend fun get(key: K): CompletableDeferred<V>? =
21+
mutex.withLock {
22+
cacheMap[key]
23+
}
24+
25+
override suspend fun getOrCreate(
26+
key: K,
27+
generator: suspend (key: K) -> CompletableDeferred<V>,
28+
callOnCacheHit: suspend () -> Unit
29+
): CompletableDeferred<V> =
30+
mutex.withLock {
31+
val currentVal = cacheMap[key]
32+
if (currentVal == null) {
33+
val generated = generator(key)
34+
cacheMap[key] = generated
35+
return@withLock generated
36+
} else {
37+
callOnCacheHit()
38+
return@withLock currentVal
39+
}
40+
}
41+
42+
43+
override suspend fun clear(key: K): CompletableDeferred<V>? =
44+
mutex.withLock {
45+
cacheMap.remove(key)
46+
}
47+
48+
override suspend fun clear() =
49+
mutex.withLock {
50+
cacheMap.clear()
51+
}
52+
53+
54+
}

0 commit comments

Comments
 (0)