1 /*
<lambda>null2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package com.android.systemui.util.kotlin
18
19 import com.android.systemui.util.time.SystemClock
20 import com.android.systemui.util.time.SystemClockImpl
21 import java.util.concurrent.atomic.AtomicReference
22 import kotlin.math.max
23 import kotlinx.coroutines.CoroutineStart
24 import kotlinx.coroutines.Dispatchers
25 import kotlinx.coroutines.Job
26 import kotlinx.coroutines.coroutineScope
27 import kotlinx.coroutines.delay
28 import kotlinx.coroutines.flow.Flow
29 import kotlinx.coroutines.flow.MutableSharedFlow
30 import kotlinx.coroutines.flow.channelFlow
31 import kotlinx.coroutines.flow.distinctUntilChanged
32 import kotlinx.coroutines.flow.filter
33 import kotlinx.coroutines.flow.flow
34 import kotlinx.coroutines.flow.map
35 import kotlinx.coroutines.flow.onStart
36 import kotlinx.coroutines.launch
37
38 /**
39 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
40 * Note that the new Flow will not start emitting until it has received two emissions from the
41 * upstream Flow.
42 *
43 * Useful for code that needs to compare the current value to the previous value.
44 */
45 fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> = flow {
46 val noVal = Any()
47 var previousValue: Any? = noVal
48 collect { newVal ->
49 if (previousValue != noVal) {
50 @Suppress("UNCHECKED_CAST") emit(transform(previousValue as T, newVal))
51 }
52 previousValue = newVal
53 }
54 }
55
56 /**
57 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
58 * [initialValue] will be used as the "old" value for the first emission.
59 *
60 * Useful for code that needs to compare the current value to the previous value.
61 */
pairwiseBynull62 fun <S, T : S, R> Flow<T>.pairwiseBy(
63 initialValue: S,
64 transform: suspend (previousValue: S, newValue: T) -> R,
65 ): Flow<R> = pairwiseBy(getInitialValue = { initialValue }, transform)
66
67 /**
68 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
69 *
70 * The output of [getInitialValue] will be used as the "old" value for the first emission. As
71 * opposed to the initial value in the above [pairwiseBy], [getInitialValue] can do some work before
72 * returning the initial value.
73 *
74 * Useful for code that needs to compare the current value to the previous value.
75 */
pairwiseBynull76 fun <S, T : S, R> Flow<T>.pairwiseBy(
77 getInitialValue: suspend () -> S,
78 transform: suspend (previousValue: S, newValue: T) -> R,
79 ): Flow<R> = flow {
80 var previousValue: S = getInitialValue()
81 collect { newVal ->
82 emit(transform(previousValue, newVal))
83 previousValue = newVal
84 }
85 }
86
87 /**
88 * Returns a new [Flow] that produces the two most recent emissions from [this]. Note that the new
89 * Flow will not start emitting until it has received two emissions from the upstream Flow.
90 *
91 * Useful for code that needs to compare the current value to the previous value.
92 */
pairwisenull93 fun <T> Flow<T>.pairwise(): Flow<WithPrev<T, T>> = pairwiseBy(::WithPrev)
94
95 /**
96 * Returns a new [Flow] that produces the two most recent emissions from [this]. [initialValue] will
97 * be used as the "old" value for the first emission.
98 *
99 * Useful for code that needs to compare the current value to the previous value.
100 */
101 fun <S, T : S> Flow<T>.pairwise(initialValue: S): Flow<WithPrev<S, T>> =
102 pairwiseBy(initialValue, ::WithPrev)
103
104 /** Holds a [newValue] emitted from a [Flow], along with the [previousValue] emitted value. */
105 data class WithPrev<out S, out T : S>(val previousValue: S, val newValue: T)
106
107 /** Emits a [Unit] only when the number of downstream subscribers of this flow increases. */
108 fun <T> MutableSharedFlow<T>.onSubscriberAdded(): Flow<Unit> {
109 return subscriptionCount
110 .pairwise(initialValue = 0)
111 .filter { (previous, current) -> current > previous }
112 .map {}
113 }
114
115 /**
116 * Returns a new [Flow] that combines the [Set] changes between each emission from [this] using
117 * [transform].
118 *
119 * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause a
120 * change event to be emitted that contains no removals, and all elements from that first [Set] as
121 * additions.
122 *
123 * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
124 * until a second [Set] has been emitted from the upstream [Flow].
125 */
setChangesBynull126 fun <T, R> Flow<Set<T>>.setChangesBy(
127 transform: suspend (removed: Set<T>, added: Set<T>) -> R,
128 emitFirstEvent: Boolean = true,
129 ): Flow<R> =
130 (if (emitFirstEvent) onStart { emit(emptySet()) } else this)
131 .distinctUntilChanged()
newnull132 .pairwiseBy { old: Set<T>, new: Set<T> ->
133 // If an element was present in the old set, but not the new one, then it was removed
134 val removed = old - new
135 // If an element is present in the new set, but on the old one, then it was added
136 val added = new - old
137 transform(removed, added)
138 }
139
140 /**
141 * Returns a new [Flow] that produces the [Set] changes between each emission from [this].
142 *
143 * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause a
144 * change event to be emitted that contains no removals, and all elements from that first [Set] as
145 * additions.
146 *
147 * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
148 * until a second [Set] has been emitted from the upstream [Flow].
149 */
setChangesnull150 fun <T> Flow<Set<T>>.setChanges(emitFirstEvent: Boolean = true): Flow<SetChanges<T>> =
151 setChangesBy(::SetChanges, emitFirstEvent)
152
153 /** Contains the difference in elements between two [Set]s. */
154 data class SetChanges<T>(
155 /** Elements that are present in the first [Set] but not in the second. */
156 val removed: Set<T>,
157 /** Elements that are present in the second [Set] but not in the first. */
158 val added: Set<T>,
159 )
160
161 /**
162 * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with
163 * the most recent emission from [other] using [transform].
164 *
165 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
166 */
167 fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow {
168 coroutineScope {
169 val noVal = Any()
170 val sampledRef = AtomicReference(noVal)
171 val job = launch(Dispatchers.Unconfined) { other.collect { sampledRef.set(it) } }
172 collect {
173 val sampled = sampledRef.get()
174 if (sampled != noVal) {
175 @Suppress("UNCHECKED_CAST") emit(transform(it, sampled as B))
176 }
177 }
178 job.cancel()
179 }
180 }
181
182 /**
183 * Returns a new [Flow] that emits at the same rate as [this], but emits the most recently emitted
184 * value from [other] instead.
185 *
186 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
187 */
anull188 fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }
189
190 /**
191 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
192 * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
193 * during this period, only The latest value is emitted.
194 *
195 * Example:
196 * ```kotlin
197 * flow {
198 * emit(1) // t=0ms
199 * delay(90)
200 * emit(2) // t=90ms
201 * delay(90)
202 * emit(3) // t=180ms
203 * delay(1010)
204 * emit(4) // t=1190ms
205 * delay(1010)
206 * emit(5) // t=2200ms
207 * }.throttle(1000)
208 * ```
209 *
210 * produces the following emissions at the following times
211 *
212 * ```text
213 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
214 * ```
215 */
throttlenull216 fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock = SystemClockImpl()): Flow<T> =
217 channelFlow {
218 coroutineScope {
219 var previousEmitTimeMs = 0L
220 var delayJob: Job? = null
221 var sendJob: Job? = null
222 val outerScope = this
223
224 collect {
225 delayJob?.cancel()
226 sendJob?.join()
227 val currentTimeMs = clock.elapsedRealtime()
228 val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
229 val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)
230 if (timeUntilNextEmit > 0L) {
231 // We create delayJob to allow cancellation during the delay period
232 delayJob = launch {
233 delay(timeUntilNextEmit)
234 sendJob =
235 outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
236 send(it)
237 previousEmitTimeMs = clock.elapsedRealtime()
238 }
239 }
240 } else {
241 send(it)
242 previousEmitTimeMs = currentTimeMs
243 }
244 }
245 }
246 }
247
combinenull248 inline fun <T1, T2, T3, T4, T5, T6, R> combine(
249 flow: Flow<T1>,
250 flow2: Flow<T2>,
251 flow3: Flow<T3>,
252 flow4: Flow<T4>,
253 flow5: Flow<T5>,
254 flow6: Flow<T6>,
255 crossinline transform: suspend (T1, T2, T3, T4, T5, T6) -> R
256 ): Flow<R> {
257 return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6) {
258 args: Array<*> ->
259 @Suppress("UNCHECKED_CAST")
260 transform(
261 args[0] as T1,
262 args[1] as T2,
263 args[2] as T3,
264 args[3] as T4,
265 args[4] as T5,
266 args[5] as T6,
267 )
268 }
269 }
270
combinenull271 inline fun <T1, T2, T3, T4, T5, T6, T7, R> combine(
272 flow: Flow<T1>,
273 flow2: Flow<T2>,
274 flow3: Flow<T3>,
275 flow4: Flow<T4>,
276 flow5: Flow<T5>,
277 flow6: Flow<T6>,
278 flow7: Flow<T7>,
279 crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7) -> R
280 ): Flow<R> {
281 return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6, flow7) {
282 args: Array<*> ->
283 @Suppress("UNCHECKED_CAST")
284 transform(
285 args[0] as T1,
286 args[1] as T2,
287 args[2] as T3,
288 args[3] as T4,
289 args[4] as T5,
290 args[5] as T6,
291 args[6] as T7
292 )
293 }
294 }
295
combinenull296 inline fun <T1, T2, T3, T4, T5, T6, T7, T8, R> combine(
297 flow: Flow<T1>,
298 flow2: Flow<T2>,
299 flow3: Flow<T3>,
300 flow4: Flow<T4>,
301 flow5: Flow<T5>,
302 flow6: Flow<T6>,
303 flow7: Flow<T7>,
304 flow8: Flow<T8>,
305 crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8) -> R
306 ): Flow<R> {
307 return kotlinx.coroutines.flow.combine(flow, flow2, flow3, flow4, flow5, flow6, flow7, flow8) {
308 args: Array<*> ->
309 @Suppress("UNCHECKED_CAST")
310 transform(
311 args[0] as T1,
312 args[1] as T2,
313 args[2] as T3,
314 args[3] as T4,
315 args[4] as T5,
316 args[5] as T6,
317 args[6] as T7,
318 args[7] as T8
319 )
320 }
321 }
322
combinenull323 inline fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> combine(
324 flow: Flow<T1>,
325 flow2: Flow<T2>,
326 flow3: Flow<T3>,
327 flow4: Flow<T4>,
328 flow5: Flow<T5>,
329 flow6: Flow<T6>,
330 flow7: Flow<T7>,
331 flow8: Flow<T8>,
332 flow9: Flow<T9>,
333 crossinline transform: suspend (T1, T2, T3, T4, T5, T6, T7, T8, T9) -> R
334 ): Flow<R> {
335 return kotlinx.coroutines.flow.combine(
336 flow,
337 flow2,
338 flow3,
339 flow4,
340 flow5,
341 flow6,
342 flow7,
343 flow8,
344 flow9
345 ) { args: Array<*> ->
346 @Suppress("UNCHECKED_CAST")
347 transform(
348 args[0] as T1,
349 args[1] as T2,
350 args[2] as T3,
351 args[3] as T4,
352 args[4] as T5,
353 args[5] as T6,
354 args[6] as T7,
355 args[6] as T8,
356 args[6] as T9,
357 )
358 }
359 }
360
361 /**
362 * Returns a [Flow] that immediately emits [Unit] when started, then emits from the given upstream
363 * [Flow] as normal.
364 */
365 @Suppress("NOTHING_TO_INLINE")
<lambda>null366 inline fun Flow<Unit>.emitOnStart(): Flow<Unit> = onStart { emit(Unit) }
367