1 /*
<lambda>null2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.util.kotlin
18 
19 import com.android.systemui.util.time.SystemClock
20 import com.android.systemui.util.time.SystemClockImpl
21 import java.util.concurrent.atomic.AtomicReference
22 import kotlin.math.max
23 import kotlinx.coroutines.CoroutineStart
24 import kotlinx.coroutines.Dispatchers
25 import kotlinx.coroutines.Job
26 import kotlinx.coroutines.coroutineScope
27 import kotlinx.coroutines.delay
28 import kotlinx.coroutines.flow.Flow
29 import kotlinx.coroutines.flow.MutableSharedFlow
30 import kotlinx.coroutines.flow.channelFlow
31 import kotlinx.coroutines.flow.distinctUntilChanged
32 import kotlinx.coroutines.flow.filter
33 import kotlinx.coroutines.flow.flow
34 import kotlinx.coroutines.flow.map
35 import kotlinx.coroutines.flow.onStart
36 import kotlinx.coroutines.launch
37 
38 /**
39  * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
40  * Note that the new Flow will not start emitting until it has received two emissions from the
41  * upstream Flow.
42  *
43  * Useful for code that needs to compare the current value to the previous value.
44  */
45 fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> = flow {
46     val noVal = Any()
47     var previousValue: Any? = noVal
48     collect { newVal ->
49         if (previousValue != noVal) {
50             @Suppress("UNCHECKED_CAST") emit(transform(previousValue as T, newVal))
51         }
52         previousValue = newVal
53     }
54 }
55 
56 /**
57  * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
58  * [initialValue] will be used as the "old" value for the first emission.
59  *
60  * Useful for code that needs to compare the current value to the previous value.
61  */
pairwiseBynull62 fun <S, T : S, R> Flow<T>.pairwiseBy(
63     initialValue: S,
64     transform: suspend (previousValue: S, newValue: T) -> R,
65 ): Flow<R> = pairwiseBy(getInitialValue = { initialValue }, transform)
66 
67 /**
68  * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
69  *
70  * The output of [getInitialValue] will be used as the "old" value for the first emission. As
71  * opposed to the initial value in the above [pairwiseBy], [getInitialValue] can do some work before
72  * returning the initial value.
73  *
74  * Useful for code that needs to compare the current value to the previous value.
75  */
pairwiseBynull76 fun <S, T : S, R> Flow<T>.pairwiseBy(
77     getInitialValue: suspend () -> S,
78     transform: suspend (previousValue: S, newValue: T) -> R,
79 ): Flow<R> = flow {
80     var previousValue: S = getInitialValue()
81     collect { newVal ->
82         emit(transform(previousValue, newVal))
83         previousValue = newVal
84     }
85 }
86 
87 /**
88  * Returns a new [Flow] that produces the two most recent emissions from [this]. Note that the new
89  * Flow will not start emitting until it has received two emissions from the upstream Flow.
90  *
91  * Useful for code that needs to compare the current value to the previous value.
92  */
pairwisenull93 fun <T> Flow<T>.pairwise(): Flow<WithPrev<T, T>> = pairwiseBy(::WithPrev)
94 
95 /**
96  * Returns a new [Flow] that produces the two most recent emissions from [this]. [initialValue] will
97  * be used as the "old" value for the first emission.
98  *
99  * Useful for code that needs to compare the current value to the previous value.
100  */
101 fun <S, T : S> Flow<T>.pairwise(initialValue: S): Flow<WithPrev<S, T>> =
102     pairwiseBy(initialValue, ::WithPrev)
103 
104 /** Holds a [newValue] emitted from a [Flow], along with the [previousValue] emitted value. */
105 data class WithPrev<out S, out T : S>(val previousValue: S, val newValue: T)
106 
107 /** Emits a [Unit] only when the number of downstream subscribers of this flow increases. */
108 fun <T> MutableSharedFlow<T>.onSubscriberAdded(): Flow<Unit> {
109     return subscriptionCount
110         .pairwise(initialValue = 0)
111         .filter { (previous, current) -> current > previous }
112         .map {}
113 }
114 
115 /**
116  * Returns a new [Flow] that combines the [Set] changes between each emission from [this] using
117  * [transform].
118  *
119  * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause a
120  * change event to be emitted that contains no removals, and all elements from that first [Set] as
121  * additions.
122  *
123  * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
124  * until a second [Set] has been emitted from the upstream [Flow].
125  */
setChangesBynull126 fun <T, R> Flow<Set<T>>.setChangesBy(
127     transform: suspend (removed: Set<T>, added: Set<T>) -> R,
128     emitFirstEvent: Boolean = true,
129 ): Flow<R> =
130     (if (emitFirstEvent) onStart { emit(emptySet()) } else this)
131         .distinctUntilChanged()
newnull132         .pairwiseBy { old: Set<T>, new: Set<T> ->
133             // If an element was present in the old set, but not the new one, then it was removed
134             val removed = old - new
135             // If an element is present in the new set, but on the old one, then it was added
136             val added = new - old
137             transform(removed, added)
138         }
139 
140 /**
141  * Returns a new [Flow] that produces the [Set] changes between each emission from [this].
142  *
143  * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause a
144  * change event to be emitted that contains no removals, and all elements from that first [Set] as
145  * additions.
146  *
147  * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
148  * until a second [Set] has been emitted from the upstream [Flow].
149  */
setChangesnull150 fun <T> Flow<Set<T>>.setChanges(emitFirstEvent: Boolean = true): Flow<SetChanges<T>> =
151     setChangesBy(::SetChanges, emitFirstEvent)
152 
153 /** Contains the difference in elements between two [Set]s. */
154 data class SetChanges<T>(
155     /** Elements that are present in the first [Set] but not in the second. */
156     val removed: Set<T>,
157     /** Elements that are present in the second [Set] but not in the first. */
158     val added: Set<T>,
159 )
160 
161 /**
162  * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with
163  * the most recent emission from [other] using [transform].
164  *
165  * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
166  */
167 fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow {
168     coroutineScope {
169         val noVal = Any()
170         val sampledRef = AtomicReference(noVal)
171         val job = launch(Dispatchers.Unconfined) { other.collect { sampledRef.set(it) } }
172         collect {
173             val sampled = sampledRef.get()
174             if (sampled != noVal) {
175                 @Suppress("UNCHECKED_CAST") emit(transform(it, sampled as B))
176             }
177         }
178         job.cancel()
179     }
180 }
181 
182 /**
183  * Returns a new [Flow] that emits at the same rate as [this], but emits the most recently emitted
184  * value from [other] instead.
185  *
186  * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
187  */
anull188 fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }
189 
190 /**
191  * Returns a flow that mirrors the original flow, but delays values following emitted values for the
192  * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
193  * during this period, only The latest value is emitted.
194  *
195  * Example:
196  * ```kotlin
197  * flow {
198  *     emit(1)     // t=0ms
199  *     delay(90)
200  *     emit(2)     // t=90ms
201  *     delay(90)
202  *     emit(3)     // t=180ms
203  *     delay(1010)
204  *     emit(4)     // t=1190ms
205  *     delay(1010)
206  *     emit(5)     // t=2200ms
207  * }.throttle(1000)
208  * ```
209  *
210  * produces the following emissions at the following times
211  *
212  * ```text
213  * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
214  * ```
215  */
throttlenull216 fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock = SystemClockImpl()): Flow<T> =
217     channelFlow {
218         coroutineScope {
219             var previousEmitTimeMs = 0L
220             var delayJob: Job? = null
221             var sendJob: Job? = null
222             val outerScope = this
223 
224             collect {
225                 delayJob?.cancel()
226                 sendJob?.join()
227                 val currentTimeMs = clock.elapsedRealtime()
228                 val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
229                 val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)
230                 if (timeUntilNextEmit > 0L) {
231                     // We create delayJob to allow cancellation during the delay period
232                     delayJob = launch {
233                         delay(timeUntilNextEmit)
234                         sendJob =
235                             outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
236                                 send(it)
237                                 previousEmitTimeMs = clock.elapsedRealtime()
238                             }
239                     }
240                 } else {
241                     send(it)
242                     previousEmitTimeMs = currentTimeMs
243                 }
244             }
245         }
246     }
247 
combinenull248 inline fun <T1, T2, T3, T4, T5, T6, R> combine(
249         flow: Flow<T1>,
250         flow2: Flow<T2>,
251         flow3: Flow<T3>,
252         flow4: Flow<T4>,
253         flow5: Flow<T5>,
254         flow6: Flow<T6>,
255         crossinline transform: suspend (T1, T2, T3, T4, T5, T6) -> R
256 ): Flow<R> {
257     return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6) {
258         args: Array<*> ->
259         @Suppress("UNCHECKED_CAST")
260         transform(
261                 args[0] as T1,
262                 args[1] as T2,
263                 args[2] as T3,
264                 args[3] as T4,
265                 args[4] as T5,
266                 args[5] as T6,
267         )
268     }
269 }
270 
combinenull271 inline fun <T1, T2, T3, T4, T5, T6, T7, R> combine(
272     flow: Flow<T1>,
273     flow2: Flow<T2>,
274     flow3: Flow<T3>,
275     flow4: Flow<T4>,
276     flow5: Flow<T5>,
277     flow6: Flow<T6>,
278     flow7: Flow<T7>,
279     crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7) -> R
280 ): Flow<R> {
281     return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6, flow7) {
282         args: Array<*> ->
283         @Suppress("UNCHECKED_CAST")
284         transform(
285             args[0] as T1,
286             args[1] as T2,
287             args[2] as T3,
288             args[3] as T4,
289             args[4] as T5,
290             args[5] as T6,
291             args[6] as T7
292         )
293     }
294 }
295 
combinenull296 inline fun <T1, T2, T3, T4, T5, T6, T7, T8, R> combine(
297     flow: Flow<T1>,
298     flow2: Flow<T2>,
299     flow3: Flow<T3>,
300     flow4: Flow<T4>,
301     flow5: Flow<T5>,
302     flow6: Flow<T6>,
303     flow7: Flow<T7>,
304     flow8: Flow<T8>,
305     crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8) -> R
306 ): Flow<R> {
307     return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6, flow7, flow8) {
308         args: Array<*> ->
309         @Suppress("UNCHECKED_CAST")
310         transform(
311             args[0] as T1,
312             args[1] as T2,
313             args[2] as T3,
314             args[3] as T4,
315             args[4] as T5,
316             args[5] as T6,
317             args[6] as T7,
318             args[7] as T8
319         )
320     }
321 }
322 
combinenull323 inline fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> combine(
324     flow: Flow<T1>,
325     flow2: Flow<T2>,
326     flow3: Flow<T3>,
327     flow4: Flow<T4>,
328     flow5: Flow<T5>,
329     flow6: Flow<T6>,
330     flow7: Flow<T7>,
331     flow8: Flow<T8>,
332     flow9: Flow<T9>,
333     crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R
334 ): Flow<R> {
335     return kotlinx.coroutines.flow.combine(
336         flow,
337         flow2,
338         flow3,
339         flow4,
340         flow5,
341         flow6,
342         flow7,
343         flow8,
344         flow9
345     ) { args: Array<*> ->
346         @Suppress("UNCHECKED_CAST")
347         transform(
348             args[0] as T1,
349             args[1] as T2,
350             args[2] as T3,
351             args[3] as T4,
352             args[4] as T5,
353             args[5] as T6,
354             args[6] as T7,
355             args[6] as T8,
356             args[6] as T9,
357         )
358     }
359 }
360 
361 /**
362  * Returns a [Flow] that immediately emits [Unit] when started, then emits from the given upstream
363  * [Flow] as normal.
364  */
365 @Suppress("NOTHING_TO_INLINE")
<lambda>null366 inline fun Flow<Unit>.emitOnStart(): Flow<Unit> = onStart { emit(Unit) }
367