1
1
package com.apurebase.kgraphql.schema.execution
2
2
3
+ import com.apurebase.deferredJson.DeferredJsonMap
4
+ import com.apurebase.deferredJson.deferredJsonBuilder
3
5
import com.apurebase.kgraphql.Context
4
6
import com.apurebase.kgraphql.ExecutionException
5
7
import com.apurebase.kgraphql.GraphQLError
@@ -14,14 +16,11 @@ import com.apurebase.kgraphql.schema.scalar.serializeScalar
14
16
import com.apurebase.kgraphql.schema.structure.Field
15
17
import com.apurebase.kgraphql.schema.structure.InputValue
16
18
import com.apurebase.kgraphql.schema.structure.Type
17
- import kotlinx.coroutines.*
18
- import kotlinx.coroutines.sync.Mutex
19
- import kotlinx.coroutines.sync.withLock
19
+ import kotlinx.coroutines.CompletableDeferred
20
+ import kotlinx.coroutines.Deferred
21
+ import kotlinx.coroutines.coroutineScope
20
22
import kotlinx.serialization.json.*
21
23
import nidomiro.kdataloader.DataLoader
22
- import nidomiro.kdataloader.factories.DataLoaderFactory
23
- import java.util.concurrent.ConcurrentHashMap
24
- import java.util.concurrent.atomic.AtomicLong
25
24
import kotlin.reflect.KProperty1
26
25
27
26
@@ -32,46 +31,30 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
32
31
inner class ExecutionContext (
33
32
val variables : Variables ,
34
33
val requestContext : Context ,
35
- private val dataCounters : ConcurrentHashMap <DataLoader <* , * >, AtomicLong > = ConcurrentHashMap (),
36
- private val loaders : ConcurrentHashMap <DataLoaderFactory <Any ?, * >, DataLoader <Any ?, * >> = ConcurrentHashMap ()
37
- ) : Mutex by Mutex() {
38
-
39
-
40
- suspend fun get (loader : DataLoader <* , * >): Long = withLock {
41
- dataCounters[loader]?.get() ? : throw IllegalArgumentException (" Something went wrong with execution" )
42
- }
43
- suspend fun add (loader : DataLoader <* , * >, check : Pair <* , String >, count : Long ) = withLock {
44
- if (dataCounters[loader] == null ) {
45
- /* * TODO: There shouldn't be a need for Atomic here as we are using [withLock] */
46
- dataCounters[loader] = AtomicLong (count)
47
- } else {
48
- val counter = dataCounters[loader]!!
49
- counter.getAndUpdate {
50
- it + count
34
+ val loaders : Map <Field .DataLoader <* , * , * >, DataLoader <Any ?, * >>
35
+ )
36
+
37
+ private suspend fun ExecutionPlan.constructLoaders (): Map <Field .DataLoader <* , * , * >, DataLoader<Any?, *>> {
38
+ val loaders = mutableMapOf<Field .DataLoader <* , * , * >, DataLoader <Any ?, * >>()
39
+
40
+ suspend fun Collection<Execution>.look () {
41
+ forEach { ex ->
42
+ ex.selectionNode
43
+ when (ex) {
44
+ is Execution .Fragment -> ex.elements.look()
45
+ is Execution .Node -> {
46
+ ex.children.look()
47
+ if (ex.field is Field .DataLoader <* , * , * >) {
48
+ loaders[ex.field] = ex.field.loader.constructNew() as DataLoader <Any ?, * >
49
+ }
50
+ }
51
51
}
52
52
}
53
53
}
54
-
55
- suspend fun load (builder : DeferredJsonMap , loaderfactory : DataLoaderFactory <Any ?, Any ?>, node : Execution .Node , preparedValue : Any? ): Deferred <Any ?> {
56
- val loader = loaders.getOrPut(loaderfactory) { loaderfactory.constructNew() }
57
- add(loader, preparedValue to node.selectionNode.fullPath, 1 ) // parentCount)
58
-
59
- val value = loader.loadAsync(preparedValue)
60
-
61
- builder.deferredLaunch {
62
- val count = get(loader)
63
- val stats = loader.createStatisticsSnapshot()
64
- if (stats.objectsRequested >= count) {
65
- loader.dispatch()
66
- } // else if (stats.objectsRequested > count) throw TODO("This should never happen!!!")
67
- }
68
-
69
-
70
- return value
71
- }
54
+ operations.look()
55
+ return loaders
72
56
}
73
57
74
-
75
58
private suspend fun <T > DeferredJsonMap.writeOperation (
76
59
ctx : ExecutionContext ,
77
60
node : Execution .Node ,
@@ -212,7 +195,11 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
212
195
}
213
196
}
214
197
}
215
- } else {
198
+ } else if (expectedType.kind == TypeKind .UNION ) return handleFragment(
199
+ ctx,
200
+ value,
201
+ container.elements.first { expectedType.name == expectedType.name } as Execution .Fragment
202
+ ) else {
216
203
throw IllegalStateException (" fragments can be specified on object types, interfaces, and unions" )
217
204
}
218
205
}
@@ -326,7 +313,7 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
326
313
) // ?: TODO("Nullable prepare functions isn't supported")
327
314
328
315
329
- val value = ctx.load( this , field.loader as DataLoaderFactory < Any ?, Any ?>, node, preparedValue)
316
+ val value = ctx.loaders[ field] !! .loadAsync( preparedValue)
330
317
331
318
332
319
applyKeyToElement(ctx, value, node, field.returnType, parentCount)
@@ -360,10 +347,11 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
360
347
}
361
348
362
349
override suspend fun suspendExecute (plan : ExecutionPlan , variables : VariablesJson , context : Context ) = coroutineScope {
363
- deferredJsonBuilder(timeout = plan.options.timeout ? : schema.configuration.timeout) {
350
+ val result = deferredJsonBuilder(timeout = plan.options.timeout ? : schema.configuration.timeout) {
364
351
val ctx = ExecutionContext (
365
352
Variables (schema, variables, plan.firstOrNull { it.variables != null }?.variables),
366
- context
353
+ context,
354
+ plan.constructLoaders(),
367
355
)
368
356
369
357
@@ -372,7 +360,10 @@ class DataLoaderPreparedRequestExecutor(val schema: DefaultSchema) : RequestExec
372
360
if (shouldInclude(ctx, node)) writeOperation(ctx, node, node.field as Field .Function <* , * >)
373
361
}
374
362
}
375
- }.toString()
363
+ ctx.loaders.values.map { it.dispatch() }
364
+ }
365
+
366
+ result.await().toString()
376
367
}
377
368
378
369
private fun createNullNode (node : Execution .Node , returnType : Type ): JsonNull = if (returnType !is Type .NonNull ) {
0 commit comments