@@ -2,14 +2,29 @@ package org.akanework.gramophone
22
33import kotlinx.coroutines.Dispatchers
44import kotlinx.coroutines.delay
5+ import kotlinx.coroutines.flow.Flow
56import kotlinx.coroutines.flow.MutableStateFlow
7+ import kotlinx.coroutines.flow.collect
68import kotlinx.coroutines.flow.flow
9+ import kotlinx.coroutines.flow.flowOf
10+ import kotlinx.coroutines.flow.map
11+ import kotlinx.coroutines.flow.onEach
12+ import kotlinx.coroutines.flow.toCollection
713import kotlinx.coroutines.launch
814import kotlinx.coroutines.runBlocking
915import org.akanework.gramophone.logic.utils.flows.IncrementalList
16+ import org.akanework.gramophone.logic.utils.flows.IncrementalMap
1017import org.akanework.gramophone.logic.utils.flows.PauseManager
1118import org.akanework.gramophone.logic.utils.flows.conflateAndBlockWhenPaused
19+ import org.akanework.gramophone.logic.utils.flows.filterIncremental
20+ import org.akanework.gramophone.logic.utils.flows.flatMapIncremental
21+ import org.akanework.gramophone.logic.utils.flows.flattenIncremental
22+ import org.akanework.gramophone.logic.utils.flows.forKey
23+ import org.akanework.gramophone.logic.utils.flows.groupByIncremental
24+ import org.akanework.gramophone.logic.utils.flows.mapIncremental
1225import org.junit.Assert.assertEquals
26+ import org.junit.Assert.assertFalse
27+ import org.junit.Assert.assertTrue
1328import org.junit.Test
1429
1530class PauseableFlowsTest {
@@ -60,9 +75,201 @@ class PauseableFlowsTest {
6075
6176 @Test
6277 fun incrementalFlows () {
63- val source = flow {
64- emit(IncrementalList .Begin (listOf (1 , 2 , 3 )))
65- emit(IncrementalList .Insert (1 , 2 , listOf (1 , 999 , 2 , 3 )))
78+ runBlocking {
79+ var countEmitted = 0
80+ var countMapped = 0
81+ var countFiltered = 0
82+ val source = flow {
83+ emit(IncrementalList .Begin (listOf (1 , 2 , 3 )))
84+ emit(IncrementalList .Insert (1 , 2 , listOf (1 , 15 , 10 , 2 , 3 )))
85+ emit(IncrementalList .Insert (1 , 1 , listOf (1 , 999 , 15 , 10 , 2 , 3 )))
86+ emit(IncrementalList .Move (1 , 1 , 2 , listOf (1 , 15 , 999 , 10 , 2 , 3 )))
87+ emit(IncrementalList .Move (1 , 1 , 5 , listOf (1 , 999 , 10 , 2 , 3 , 15 )))
88+ emit(IncrementalList .Move (2 , 3 , 0 , listOf (10 , 2 , 3 , 1 , 999 , 15 )))
89+ emit(IncrementalList .Remove (1 , 1 , listOf (10 , 3 , 1 , 999 , 15 )))
90+ emit(IncrementalList .Update (1 , 1 , listOf (10 , 5 , 1 , 999 , 15 )))
91+ }
92+ .assertContractNotViolated(" init" )
93+ .onEach { countEmitted++ }
94+ .mapIncremental { it + 1 }
95+ .assertContractNotViolated(" after map" )
96+ .onEach { countMapped++ }
97+ .filterIncremental { it < 100 }
98+ .assertContractNotViolated(" after filter" )
99+ .onEach { countFiltered++ }
100+ .flatMapIncremental { if (it % 2 == 0 ) listOf (it, it) else emptyList() }
101+ .assertContractNotViolated(" after flatMap" )
102+ val out = ArrayList <IncrementalList <Int >>()
103+ source.toCollection(out )
104+ assertEquals(8 , countEmitted)
105+ assertEquals(8 , countMapped)
106+ assertEquals(6 , countFiltered)
107+ assertEquals(5 , out .size)
108+ assertTrue(out [0 ] is IncrementalList .Begin )
109+ assertTrue(out [1 ] is IncrementalList .Insert )
110+ assertTrue(out [2 ] is IncrementalList .Move )
111+ assertTrue(out [3 ] is IncrementalList .Move )
112+ assertTrue(out [4 ] is IncrementalList .Update )
113+ assertEquals(listOf (2 , 2 , 16 , 16 , 4 , 4 ), out [1 ].after)
114+ assertEquals(listOf (4 , 4 , 2 , 2 , 16 , 16 ), out [3 ].after)
115+ assertEquals(listOf (6 , 6 , 2 , 2 , 16 , 16 ), out [4 ].after)
116+ }
117+ }
118+
119+ @Test
120+ fun incrementalFlowsGroupBy () {
121+ runBlocking {
122+ var countEmitted = 0
123+ var countMapped = 0
124+ var countFiltered = 0
125+ val source = flow {
126+ emit(IncrementalList .Begin (listOf (1 , 2 , 3 )))
127+ emit(IncrementalList .Insert (1 , 2 , listOf (1 , 15 , 10 , 2 , 3 )))
128+ emit(IncrementalList .Insert (1 , 1 , listOf (1 , 999 , 15 , 10 , 2 , 3 )))
129+ emit(IncrementalList .Move (1 , 1 , 2 , listOf (1 , 15 , 999 , 10 , 2 , 3 )))
130+ emit(IncrementalList .Move (1 , 1 , 5 , listOf (1 , 999 , 10 , 2 , 3 , 15 )))
131+ emit(IncrementalList .Move (2 , 3 , 0 , listOf (10 , 2 , 3 , 1 , 999 , 15 )))
132+ emit(IncrementalList .Remove (1 , 1 , listOf (10 , 3 , 1 , 999 , 15 )))
133+ emit(IncrementalList .Update (1 , 1 , listOf (10 , 5 , 1 , 999 , 15 )))
134+ }
135+ .assertContractNotViolated(" init" )
136+ .onEach { countEmitted++ }
137+ .mapIncremental { it + 1 }
138+ .assertContractNotViolated(" after map" )
139+ .onEach { countMapped++ }
140+ .filterIncremental { it < 100 }
141+ .assertContractNotViolated(" after filter" )
142+ .onEach { countFiltered++ }
143+ .groupByIncremental { it % 2 }
144+ .assertContractNotViolated(" after groupBy" )
145+ .mapIncremental { a, b -> flowOf(b) }
146+ .assertContractNotViolated(" after mapIncremental" )
147+ .flattenIncremental()
148+ .assertContractNotViolated(" after flattenIncremental" )
149+ .forKey(1 )
150+ .map { it!! }
151+ .assertContractNotViolated(" after forKey" )
152+ source.collect()
153+ }
154+ }
155+
156+
157+ @Test
158+ fun incrementalFlowsGroupBy2 () {
159+ runBlocking {
160+ var countEmitted = 0
161+ var countMapped = 0
162+ var countFiltered = 0
163+ val source = flow {
164+ emit(IncrementalList .Begin (listOf (1 , 2 , 3 )))
165+ emit(IncrementalList .Insert (1 , 2 , listOf (1 , 15 , 10 , 2 , 3 )))
166+ emit(IncrementalList .Insert (1 , 1 , listOf (1 , 999 , 15 , 10 , 2 , 3 )))
167+ emit(IncrementalList .Move (1 , 1 , 2 , listOf (1 , 15 , 999 , 10 , 2 , 3 )))
168+ emit(IncrementalList .Move (1 , 1 , 5 , listOf (1 , 999 , 10 , 2 , 3 , 15 )))
169+ emit(IncrementalList .Move (2 , 3 , 0 , listOf (10 , 2 , 3 , 1 , 999 , 15 )))
170+ emit(IncrementalList .Remove (1 , 1 , listOf (10 , 3 , 1 , 999 , 15 )))
171+ emit(IncrementalList .Update (1 , 1 , listOf (10 , 5 , 1 , 999 , 15 )))
172+ }
173+ .assertContractNotViolated(" init" )
174+ .onEach { countEmitted++ }
175+ .mapIncremental { it + 1 }
176+ .assertContractNotViolated(" after map" )
177+ .onEach { countMapped++ }
178+ .filterIncremental { it < 100 }
179+ .assertContractNotViolated(" after filter" )
180+ .onEach { countFiltered++ }
181+ .groupByIncremental { it % 2 }
182+ .assertContractNotViolated(" after groupBy" )
183+ .forKey(1 )
184+ .map { it!! }
185+ .assertContractNotViolated(" after forKey" )
186+ source.collect()
187+ }
188+ }
189+
190+ fun <T > Flow<IncrementalList<T>>.assertContractNotViolated (tag : String ) = flow {
191+ var cache: IncrementalList <T >? = null
192+ collect {
193+ when {
194+ it is IncrementalList .Begin || cache == null -> {
195+ // nothing to check
196+ }
197+ it is IncrementalList .Insert -> {
198+ var new = ArrayList (cache!! .after)
199+ for (i in it.pos.. < it.pos+ it.count) {
200+ new.add(i, it.after[i])
201+ }
202+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
203+ }
204+ it is IncrementalList .Move -> {
205+ var new = ArrayList (cache!! .after)
206+ var removed = ArrayList <T >(it.count)
207+ repeat(it.count) { _ ->
208+ removed.add(new.removeAt(it.pos))
209+ }
210+ for (i in it.outPos.. < it.outPos+ it.count) {
211+ new.add(i, removed[i - it.outPos])
212+ }
213+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
214+ }
215+ it is IncrementalList .Remove -> {
216+ var new = ArrayList (cache!! .after)
217+ repeat(it.count) { _ ->
218+ new.removeAt(it.pos)
219+ }
220+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
221+ }
222+ it is IncrementalList .Update -> {
223+ var new = ArrayList (cache!! .after)
224+ for (i in it.pos.. < it.pos+ it.count) {
225+ new[i] = it.after[i]
226+ }
227+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
228+ }
229+ else -> throw IllegalArgumentException (" unknown command?" )
230+ }
231+ emit(it)
232+ cache = it
233+ }
234+ }
235+
236+ @JvmName(" assertContractNotViolatedMap" )
237+ fun <T , R > Flow <IncrementalMap <T , R >>.assertContractNotViolated (tag : String ) = flow {
238+ var cache: IncrementalMap <T , R >? = null
239+ collect {
240+ when {
241+ it is IncrementalMap .Begin || cache == null -> {
242+ // nothing to check
243+ }
244+ it is IncrementalMap .Insert -> {
245+ var new = HashMap (cache!! .after)
246+ assertFalse(new.contains(it.key))
247+ new[it.key] = it.after[it.key]
248+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
249+ }
250+ it is IncrementalMap .Move -> {
251+ var new = HashMap (cache!! .after)
252+ assertTrue(new.contains(it.key))
253+ assertFalse(new.contains(it.outKey))
254+ new[it.outKey] = new.remove(it.key)
255+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
256+ }
257+ it is IncrementalMap .Remove -> {
258+ var new = HashMap (cache!! .after)
259+ assertTrue(new.contains(it.key))
260+ new.remove(it.key)
261+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
262+ }
263+ it is IncrementalMap .Update -> {
264+ var new = HashMap (cache!! .after)
265+ assertTrue(" at \" $tag \" , processing op $it : expected key to exist" , new.contains(it.key))
266+ new[it.key] = it.after[it.key]
267+ assertEquals(" at \" $tag \" , expected match while processing op $it (old=${cache!! .after} )" , it.after, new)
268+ }
269+ else -> throw IllegalArgumentException (" unknown command?" )
270+ }
271+ emit(it)
272+ cache = it
66273 }
67274 }
68275}
0 commit comments