1+ package space.kscience.dataforge.data
2+
3+ import kotlinx.coroutines.CoroutineScope
4+ import kotlinx.coroutines.flow.Flow
5+ import kotlinx.coroutines.flow.MutableSharedFlow
6+ import kotlinx.coroutines.flow.SharedFlow
7+ import kotlinx.coroutines.flow.mapNotNull
8+ import kotlinx.coroutines.launch
9+ import kotlinx.coroutines.sync.Mutex
10+ import kotlinx.coroutines.sync.withLock
11+ import space.kscience.dataforge.meta.Meta
12+ import space.kscience.dataforge.meta.MutableMeta
13+ import space.kscience.dataforge.misc.UnsafeKType
14+ import space.kscience.dataforge.names.*
15+ import kotlin.reflect.KType
16+ import kotlin.reflect.typeOf
17+
18+ /* *
19+ * A builder for data trees
20+ */
21+ public fun interface DataBuilder <T > : DataBuilderScope <T > {
22+ public fun data (name : Name , data : Data <T >)
23+ }
24+
25+ @Deprecated(" Use DataBuilder instead" , ReplaceWith (" DataBuilder<T>" ))
26+ public typealias StaticDataBuilder <T > = DataBuilder <T >
27+
28+ /* *
29+ * A builder for dynamic data trees
30+ */
31+ public interface DynamicDataBuilder <T > : DataBuilder <T > {
32+ /* *
33+ * Asynchronously update the data tree.
34+ *
35+ * This method could be called multiple times. In this case different updaters are applied simultaneously and concurrently.
36+ *
37+ * Since updates are concurrent, the specific order of application is undetermined.
38+ */
39+ public fun update (block : suspend DataSink <T >.() -> Unit )
40+ }
41+
42+ /* *
43+ * A builder for data tree branches and static data
44+ */
45+ private class DataMapBuilder <T > : DataBuilder <T > {
46+ val map = mutableMapOf<Name , Data <T >>()
47+
48+ override fun data (name : Name , data : Data <T >) {
49+ if (map.containsKey(name)) {
50+ error(" Duplicate key '$name '" )
51+ } else {
52+ map[name] = data
53+ }
54+ }
55+ }
56+
57+ /* *
58+ * Map-based implementation of [DataTree]
59+ */
60+ private class FlatDataTree <T >(
61+ override val dataType : KType ,
62+ private val dataSet : Map <Name , Data <T >>,
63+ private val sourceUpdates : SharedFlow <Name >,
64+ private val prefix : Name ,
65+ ) : DataTree<T> {
66+ override val data: Data <T >? get() = dataSet[prefix]
67+ override val items: Map <NameToken , FlatDataTree <T >>
68+ get() = dataSet.keys
69+ .filter { it.startsWith(prefix) && it.length > prefix.length }
70+ .map { it.tokens[prefix.length] }
71+ .associateWith { FlatDataTree (dataType, dataSet, sourceUpdates, prefix + it) }
72+
73+ override fun read (name : Name ): Data <T >? = dataSet[prefix + name]
74+
75+ override val updates: Flow <Name > = sourceUpdates.mapNotNull { update ->
76+ update.removeFirstOrNull(prefix)
77+ }
78+ }
79+
80+ /* *
81+ * A builder for [FlatDataTree].
82+ */
83+ private class DataTreeBuilder <T >(
84+ private val type : KType ,
85+ initialData : Map <Name , Data <T >> = emptyMap(),
86+ ) : DataSink<T> {
87+
88+ private val map = HashMap <Name , Data <T >>(initialData)
89+
90+ private val mutex = Mutex ()
91+
92+ private val updatesFlow = MutableSharedFlow <Name >()
93+
94+
95+ override suspend fun write (name : Name , data : Data <T >? ) {
96+ mutex.withLock {
97+ if (data == null ) {
98+ map.remove(name)
99+ } else {
100+ map[name] = data
101+ }
102+ }
103+ updatesFlow.emit(name)
104+ }
105+
106+ fun build (): DataTree <T > = FlatDataTree (type, map, updatesFlow, Name .EMPTY )
107+ }
108+
109+ private val emptySharedFlow = MutableSharedFlow <Nothing >()
110+
111+
112+ public fun <T > DataBuilder<T>.data (name : String , data : Data <T >) {
113+ data(name.parseAsName(), data)
114+ }
115+
116+ public inline fun <T , reified T1 : T > DataBuilder<T>.value (
117+ name : String ,
118+ value : T1 ,
119+ metaBuilder : MutableMeta .() -> Unit = {}
120+ ) {
121+ data(name, Data (value, Meta (metaBuilder)))
122+ }
123+
124+ public fun <T > DataBuilder<T>.node (prefix : Name , block : DataBuilder <T >.() -> Unit ) {
125+ val map = DataMapBuilder <T >().apply (block).map
126+ map.forEach { (name, data) ->
127+ data(prefix + name, data)
128+ }
129+ }
130+
131+ public fun <T > DataBuilder<T>.node (prefix : String , block : DataBuilder <T >.() -> Unit ): Unit =
132+ node(prefix.parseAsName(), block)
133+
134+ public fun <T > DataBuilder<T>.node (prefix : Name , tree : DataTree <T >) {
135+ tree.forEach { data ->
136+ data(prefix + data.name, data)
137+ }
138+ }
139+
140+ /* *
141+ * Write current state of the [tree] into this builder. Does not propagate updates from it.
142+ */
143+ public fun <T > DataBuilder<T>.node (prefix : String , tree : DataTree <T >): Unit = node(prefix.parseAsName(), tree)
144+
145+ /* *
146+ * Write current state of the [tree] into this builder and propagate updates from it.
147+ */
148+ public fun <T > DynamicDataBuilder<T>.observeNode (prefix : Name , tree : DataTree <T >) {
149+ node(prefix, tree)
150+
151+ update {
152+ tree.updates.collect {
153+ write(prefix + it, tree[it])
154+ }
155+ }
156+ }
157+
158+ public fun <T > DynamicDataBuilder<T>.observeNode (prefix : String , tree : DataTree <T >): Unit =
159+ observeNode(prefix.parseAsName(), tree)
160+
161+ /* *
162+ * Create a static [DataTree] from a flat map
163+ */
164+ @UnsafeKType
165+ public fun <T > DataTree (type : KType , data : Map <Name , Data <T >>): DataTree <T > =
166+ DataTreeBuilder (type, data).build()
167+
168+ /* *
169+ * Create a dynamic [DataTree]
170+ */
171+ public fun <T > DataTree.Companion.dynamic (
172+ type : KType ,
173+ scope : CoroutineScope ,
174+ block : DynamicDataBuilder <T >.() -> Unit
175+ ): DataTree <T > {
176+
177+ val initialData = mutableMapOf<Name , Data <T >>()
178+ val updaters = mutableListOf< suspend DataSink <T >.() -> Unit > ()
179+
180+ val dynamicDataBuilder = object : DynamicDataBuilder <T > {
181+ override fun update (block : suspend DataSink <T >.() -> Unit ) {
182+ updaters.add(block)
183+ }
184+
185+ override fun data (name : Name , data : Data <T >) {
186+ initialData[name] = data
187+ }
188+
189+ }
190+
191+ dynamicDataBuilder.block()
192+
193+ return if (updaters.isEmpty()) {
194+ FlatDataTree (type, initialData, emptySharedFlow, Name .EMPTY )
195+ } else {
196+ DataTreeBuilder <T >(type, initialData).apply {
197+ updaters.forEach { updater ->
198+ scope.launch(GoalExecutionRestriction (GoalExecutionRestrictionPolicy .ERROR )) {
199+ updater()
200+ }
201+ }
202+ }.build()
203+ }
204+ }
205+
206+
207+ @OptIn(UnsafeKType ::class )
208+ public inline fun <reified T > DataTree.Companion.dynamic (
209+ scope : CoroutineScope ,
210+ noinline block : DynamicDataBuilder <T >.() -> Unit
211+ ): DataTree <T > = dynamic (typeOf<T >(), scope, block)
212+
213+ @OptIn(UnsafeKType ::class )
214+ public inline fun <reified T > DataTree (
215+ data : Map <Name , Data <T >>
216+ ): DataTree <T > = DataTree (typeOf<T >(), data)
217+
218+ /* *
219+ * Represent this flat data map as a [DataTree] without copying it
220+ */
221+ @UnsafeKType
222+ public fun <T > Map <Name , Data <T >>.asTree (type : KType ): DataTree <T > = FlatDataTree (
223+ dataType = type,
224+ dataSet = this ,
225+ sourceUpdates = emptySharedFlow,
226+ prefix = Name .EMPTY
227+ )
228+
229+ /* *
230+ * Represent this flat data map as a [DataTree] without copying it
231+ */
232+ @OptIn(UnsafeKType ::class )
233+ public inline fun <reified T > Map <Name , Data <T >>.asTree (): DataTree <T > = asTree(typeOf<T >())
234+
235+
236+ @UnsafeKType
237+ public fun <T > Sequence<NamedData<T>>.toTree (type : KType ): DataTree <T > = FlatDataTree (
238+ dataType = type,
239+ dataSet = associateBy { it.name },
240+ sourceUpdates = emptySharedFlow,
241+ prefix = Name .EMPTY
242+ )
243+
244+
245+ /* *
246+ * Collect a sequence of [NamedData] to a [DataTree]
247+ */
248+ @OptIn(UnsafeKType ::class )
249+ public inline fun <reified T > Sequence<NamedData<T>>.toTree (): DataTree <T > = toTree(typeOf<T >())
250+
251+
252+ @UnsafeKType
253+ public fun <T > DataTree.Companion.static (
254+ type : KType , block : DataBuilder <T >.() -> Unit
255+ ): DataTree <T > = DataMapBuilder <T >().apply (block).map.asTree(type)
256+
257+
258+ @OptIn(UnsafeKType ::class )
259+ public inline fun <reified T > DataTree.Companion.static (
260+ noinline block : DataBuilder <T >.() -> Unit
261+ ): DataTree <T > = static(typeOf<T >(), block)
0 commit comments