1 /*
<lambda>null2  * Copyright 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.photopicker.data
18 
19 import android.content.ContentResolver
20 import android.database.ContentObserver
21 import android.net.Uri
22 import android.util.Log
23 import androidx.paging.PagingSource
24 import com.android.photopicker.core.configuration.PhotopickerConfiguration
25 import com.android.photopicker.core.features.FeatureManager
26 import com.android.photopicker.core.user.UserStatus
27 import com.android.photopicker.data.model.CloudMediaProviderDetails
28 import com.android.photopicker.data.model.Group.Album
29 import com.android.photopicker.data.model.Media
30 import com.android.photopicker.data.model.MediaPageKey
31 import com.android.photopicker.data.model.MediaSource
32 import com.android.photopicker.data.model.Provider
33 import com.android.photopicker.data.paging.AlbumMediaPagingSource
34 import com.android.photopicker.data.paging.AlbumPagingSource
35 import com.android.photopicker.data.paging.MediaPagingSource
36 import com.android.photopicker.features.cloudmedia.CloudMediaFeature
37 import kotlinx.coroutines.CoroutineDispatcher
38 import kotlinx.coroutines.CoroutineScope
39 import kotlinx.coroutines.Job
40 import kotlinx.coroutines.channels.awaitClose
41 import kotlinx.coroutines.flow.Flow
42 import kotlinx.coroutines.flow.MutableStateFlow
43 import kotlinx.coroutines.flow.SharingStarted
44 import kotlinx.coroutines.flow.StateFlow
45 import kotlinx.coroutines.flow.callbackFlow
46 import kotlinx.coroutines.flow.map
47 import kotlinx.coroutines.flow.stateIn
48 import kotlinx.coroutines.flow.update
49 import kotlinx.coroutines.launch
50 import kotlinx.coroutines.runBlocking
51 import kotlinx.coroutines.sync.Mutex
52 import kotlinx.coroutines.sync.withLock
53 
/**
 * Provides data to the Photo Picker UI. The data comes from a [ContentProvider] called
 * [MediaProvider].
 *
 * Underlying data changes in [MediaProvider] are observed using [ContentObserver]s. When a change
 * in data is observed, the data is re-fetched from the [MediaProvider] process and the new data is
 * emitted to the [StateFlow]s.
 *
 * @param userStatus A [StateFlow] with the current active user's details.
 * @param scope The [CoroutineScope] the data flows will be shared in.
 * @param dispatcher A [CoroutineDispatcher] to run the coroutines in.
 * @param notificationService An instance of [NotificationService] responsible to listen to data
 *   change notifications.
 * @param mediaProviderClient An instance of [MediaProviderClient] responsible to get data from
 *   MediaProvider.
 * @param config [StateFlow] that emits [PhotopickerConfiguration] changes.
 * @param featureManager The [FeatureManager] used to check whether [CloudMediaFeature] is enabled.
 *   When it is not enabled, providers whose media source is [MediaSource.REMOTE] are filtered out
 *   of the available providers.
 */
71 class DataServiceImpl(
72     private val userStatus: StateFlow<UserStatus>,
73     private val scope: CoroutineScope,
74     private val dispatcher: CoroutineDispatcher,
75     private val notificationService: NotificationService,
76     private val mediaProviderClient: MediaProviderClient,
77     private val config: StateFlow<PhotopickerConfiguration>,
78     private val featureManager: FeatureManager
79 ) : DataService {
80     private val _activeContentResolver =
81         MutableStateFlow<ContentResolver>(userStatus.value.activeContentResolver)
82 
83     // Keep track of the photo grid media and album grid paging source so that we can invalidate
84     // them in case the underlying data changes.
85     private val mediaPagingSources: MutableList<MediaPagingSource> = mutableListOf()
86     private val albumPagingSources: MutableList<AlbumPagingSource> = mutableListOf()
87 
    // Keep track of the album grid media paging sources so that we can invalidate
    // them in case the underlying data changes or re-use them if the user re-opens the same album
    // again. If something drastically changes that would require a refresh of the data source
    // cache, remove the paging source from the below map. If a paging source is found in the map,
    // it is assumed that a refresh request was already sent to the data source once in the session
    // and there is no need to send it again, even if the paging source is invalid.
94     private val albumMediaPagingSources:
95         MutableMap<String, MutableMap<String, AlbumMediaPagingSource>> =
96         mutableMapOf()
97 
98     // An internal lock to allow thread-safe updates to the [MediaPagingSource] and
99     // [AlbumPagingSource].
100     private val mediaPagingSourceMutex = Mutex()
101 
102     // An internal lock to allow thread-safe updates to the [AlbumMediaPagingSource].
103     private val albumMediaPagingSourceMutex = Mutex()
104 
105     /**
106      * Callback flow that listens to changes in the available providers and emits updated list of
107      * available providers.
108      */
109     private var availableProviderCallbackFlow: Flow<List<Provider>>? = null
110 
111     /**
112      * Callback flow that listens to changes in media and emits a [Unit] when change is observed.
113      */
114     private var mediaUpdateCallbackFlow: Flow<Unit>? = null
115 
116     /**
117      * Callback flow that listens to changes in album media and emits a [Pair] of album authority
118      * and album id when change is observed.
119      */
120     private var albumMediaUpdateCallbackFlow: Flow<Pair<String, String>>? = null
121 
122     /**
123      * Saves the current job that collects the [availableProviderCallbackFlow]. Cancel this job when
124      * there is a change in the [_activeContentResolver]
125      */
126     private var availableProviderCollectJob: Job? = null
127 
128     /**
129      * Saves the current job that collects the [mediaUpdateCallbackFlow]. Cancel this job when there
130      * is a change in the [_activeContentResolver]
131      */
132     private var mediaUpdateCollectJob: Job? = null
133 
134     /**
135      * Saves the current job that collects the [albumMediaUpdateCallbackFlow]. Cancel this job when
136      * there is a change in the [_activeContentResolver]
137      */
138     private var albumMediaUpdateCollectJob: Job? = null
139 
140     /**
141      * Internal [StateFlow] that emits when the [availableProviderCallbackFlow] emits a new list of
142      * providers. The [availableProviderCallbackFlow] can change if the active user in a session has
143      * changed.
144      *
145      * The initial value of this flow is an empty list to avoid an IPC to fetch the actual value
146      * from Media Provider from the main thread.
147      */
148     private val _availableProviders: MutableStateFlow<List<Provider>> =
149         MutableStateFlow(emptyList())
150 
    /**
     * Create an immutable state flow from the internal mutable flow [_availableProviders]. The
     * state flow helps retain and provide immediate access to the last emitted value.
     *
     * Sharing remains active for [FLOW_TIMEOUT_MILLI_SECONDS] after the last observer stops
     * collecting. This helps retain the flow through transient changes like activity recreation
     * due to config changes.
     *
     * Note that [StateFlow] automatically filters out subsequent repetitions of the same value.
     */
161     override val availableProviders: StateFlow<List<Provider>> =
162         _availableProviders.stateIn(
163             scope,
164             SharingStarted.WhileSubscribed(FLOW_TIMEOUT_MILLI_SECONDS),
165             _availableProviders.value
166         )
167 
168     companion object {
169         const val FLOW_TIMEOUT_MILLI_SECONDS: Long = 5000
170     }
171 
172     init {
173         scope.launch(dispatcher) {
174             availableProviders.collect { providers: List<Provider> ->
175                 Log.d(DataService.TAG, "Available providers have changed to $providers.")
176 
177                 mediaPagingSourceMutex.withLock {
178                     mediaPagingSources.forEach { mediaPagingSource ->
179                         mediaPagingSource.invalidate()
180                     }
181                     albumPagingSources.forEach { albumPagingSource ->
182                         albumPagingSource.invalidate()
183                     }
184 
185                     mediaPagingSources.clear()
186                     albumPagingSources.clear()
187                 }
188 
189                 albumMediaPagingSourceMutex.withLock {
190                     albumMediaPagingSources.values.forEach { albumMediaPagingSourceMap ->
191                         albumMediaPagingSourceMap.values.forEach { albumMediaPagingSource ->
192                             albumMediaPagingSource.invalidate()
193                         }
194                     }
195                     albumMediaPagingSources.clear()
196                 }
197             }
198         }
199 
200         scope.launch(dispatcher) {
201             // Only observe the changes in the active content resolver
202             _activeContentResolver.collect { activeContentResolver: ContentResolver ->
203                 Log.d(DataService.TAG, "Active content resolver has changed.")
204 
205                 // Stop collecting available providers from previously initialized callback flow.
206                 availableProviderCollectJob?.cancel()
207                 availableProviderCallbackFlow = initAvailableProvidersFlow(activeContentResolver)
208 
209                 availableProviderCollectJob =
210                     scope.launch(dispatcher) {
211                         availableProviderCallbackFlow?.collect { providers: List<Provider> ->
212                             Log.d(
213                                 DataService.TAG,
214                                 "Available providers update notification received $providers"
215                             )
216 
217                             var updatedProviders: List<Provider> = providers
218                             if (!featureManager.isFeatureEnabled(CloudMediaFeature::class.java)) {
219                                 updatedProviders =
220                                     providers.filter { it.mediaSource != MediaSource.REMOTE }
221                                 Log.i(
222                                     DataService.TAG,
223                                     "Cloud media feature is not enabled, available providers are " +
224                                         "updated to  $updatedProviders"
225                                 )
226                             }
227 
228                             // Send refresh media request to Photo Picker.
229                             // TODO(b/340246010): This is required even when  there is no change in
230                             // the [availableProviders] state flow because PhotoPicker relies on the
231                             // UI to trigger a sync when the cloud provider changes. Further, a
232                             // successful sync enables cloud queries, which then updates the UI.
233                             refreshMedia(updatedProviders)
234 
235                             _availableProviders.update { updatedProviders }
236                         }
237                     }
238 
239                 // Stop collecting media updates from previously initialized callback flow.
240                 mediaUpdateCollectJob?.cancel()
241                 mediaUpdateCallbackFlow = initMediaUpdateFlow(activeContentResolver)
242 
243                 mediaUpdateCollectJob =
244                     scope.launch(dispatcher) {
245                         mediaUpdateCallbackFlow?.collect {
246                             Log.d(DataService.TAG, "Media update notification received")
247                             mediaPagingSourceMutex.withLock {
248                                 mediaPagingSources.forEach { mediaPagingSource ->
249                                     mediaPagingSource.invalidate()
250                                 }
251                             }
252                         }
253                     }
254 
255                 // Stop collecting album media updates from previously initialized callback flow.
256                 albumMediaUpdateCollectJob?.cancel()
257                 albumMediaUpdateCallbackFlow = initAlbumMediaUpdateFlow(activeContentResolver)
258 
259                 albumMediaUpdateCollectJob =
260                     scope.launch(dispatcher) {
261                         albumMediaUpdateCallbackFlow?.collect {
262                             (albumAuthority, albumId): Pair<String, String> ->
263                             Log.d(
264                                 DataService.TAG,
265                                 "Album media update notification " +
266                                     "received for album authority $albumAuthority " +
267                                     "and album id $albumId"
268                             )
269                             albumMediaPagingSourceMutex.withLock {
270                                 albumMediaPagingSources
271                                     .get(albumAuthority)
272                                     ?.get(albumId)
273                                     ?.invalidate()
274                             }
275                         }
276                     }
277             }
278         }
279 
280         scope.launch(dispatcher) {
281             userStatus.collect { userStatusValue: UserStatus ->
282                 _activeContentResolver.update { userStatusValue.activeContentResolver }
283             }
284         }
285     }
286 
287     /**
288      * Creates a callback flow that listens to changes in the available providers using
289      * [ContentObserver] and emits updated list of available providers.
290      */
291     private fun initAvailableProvidersFlow(resolver: ContentResolver): Flow<List<Provider>> =
292         callbackFlow<Unit> {
293                 // Define a callback that tries sending a [Unit] in the [Channel].
294                 val observer =
295                     object : ContentObserver(/* handler */ null) {
296                         override fun onChange(selfChange: Boolean, uri: Uri?) {
297                             trySend(Unit)
298                         }
299                     }
300 
301                 // Register the content observer callback.
302                 notificationService.registerContentObserverCallback(
303                     resolver,
304                     AVAILABLE_PROVIDERS_CHANGE_NOTIFICATION_URI,
305                     /* notifyForDescendants */ true,
306                     observer
307                 )
308 
309                 // Trigger the first fetch of available providers.
310                 trySend(Unit)
311 
312                 // Unregister when the flow is closed.
313                 awaitClose {
314                     notificationService.unregisterContentObserverCallback(resolver, observer)
315                 }
316             }
317             .map {
318                 // Fetch the available providers again when a change is detected.
319                 mediaProviderClient.fetchAvailableProviders(resolver)
320             }
321 
322     /**
323      * Creates a callback flow that emits a [Unit] when an update in media is observed using
324      * [ContentObserver] notifications.
325      */
326     private fun initMediaUpdateFlow(resolver: ContentResolver): Flow<Unit> =
327         callbackFlow<Unit> {
328             val observer =
329                 object : ContentObserver(/* handler */ null) {
330                     override fun onChange(selfChange: Boolean, uri: Uri?) {
331                         trySend(Unit)
332                     }
333                 }
334 
335             // Register the content observer callback.
336             notificationService.registerContentObserverCallback(
337                 resolver,
338                 MEDIA_CHANGE_NOTIFICATION_URI,
339                 /* notifyForDescendants */ true,
340                 observer
341             )
342 
343             // Unregister when the flow is closed.
344             awaitClose { notificationService.unregisterContentObserverCallback(resolver, observer) }
345         }
346 
347     /**
348      * Creates a callback flow that emits the album ID when an update in the album's media is
349      * observed using [ContentObserver] notifications.
350      */
351     private fun initAlbumMediaUpdateFlow(resolver: ContentResolver): Flow<Pair<String, String>> =
352         callbackFlow {
353             val observer =
354                 object : ContentObserver(/* handler */ null) {
355                     override fun onChange(selfChange: Boolean, uri: Uri?) {
356                         // Verify that album authority and album ID is present in the URI
357                         if (
358                             uri?.pathSegments?.size ==
359                                 (2 + ALBUM_CHANGE_NOTIFICATION_URI.pathSegments.size)
360                         ) {
361                             val albumAuthority = uri.pathSegments[uri.pathSegments.size - 2] ?: ""
362                             val albumID = uri.pathSegments[uri.pathSegments.size - 1] ?: ""
363                             trySend(Pair(albumAuthority, albumID))
364                         }
365                     }
366                 }
367 
368             // Register the content observer callback.
369             notificationService.registerContentObserverCallback(
370                 resolver,
371                 ALBUM_CHANGE_NOTIFICATION_URI,
372                 /* notifyForDescendants */ true,
373                 observer
374             )
375 
376             // Unregister when the flow is closed.
377             awaitClose { notificationService.unregisterContentObserverCallback(resolver, observer) }
378         }
379 
380     override fun albumMediaPagingSource(album: Album): PagingSource<MediaPageKey, Media> =
381         runBlocking {
382             refreshAlbumMedia(album)
383 
384             albumMediaPagingSourceMutex.withLock {
385                 val albumMap = albumMediaPagingSources.getOrDefault(album.authority, mutableMapOf())
386 
387                 if (!albumMap.containsKey(album.id) || albumMap[album.id]!!.invalid) {
388                     val availableProviders: List<Provider> = availableProviders.value
389                     val contentResolver: ContentResolver = _activeContentResolver.value
390                     val albumMediaPagingSource =
391                         AlbumMediaPagingSource(
392                             album.id,
393                             album.authority,
394                             contentResolver,
395                             availableProviders,
396                             mediaProviderClient,
397                             dispatcher,
398                             config.value.intent,
399                         )
400 
401                     Log.v(
402                         DataService.TAG,
403                         "Created an album media paging source that queries " + "$availableProviders"
404                     )
405 
406                     albumMap[album.id] = albumMediaPagingSource
407                     albumMediaPagingSources[album.authority] = albumMap
408                 }
409 
410                 albumMap[album.id]!!
411             }
412         }
413 
414     override fun albumPagingSource(): PagingSource<MediaPageKey, Album> = runBlocking {
415         mediaPagingSourceMutex.withLock {
416             val availableProviders: List<Provider> = availableProviders.value
417             val contentResolver: ContentResolver = _activeContentResolver.value
418             val albumPagingSource =
419                 AlbumPagingSource(
420                     contentResolver,
421                     availableProviders,
422                     mediaProviderClient,
423                     dispatcher,
424                     config.value.intent,
425                 )
426 
427             Log.v(
428                 DataService.TAG,
429                 "Created an album paging source that queries " + "$availableProviders"
430             )
431 
432             albumPagingSources.add(albumPagingSource)
433             albumPagingSource
434         }
435     }
436 
437     override fun cloudMediaProviderDetails(
438         authority: String
439     ): StateFlow<CloudMediaProviderDetails?> =
440         throw NotImplementedError("This method is not implemented yet.")
441 
442     override fun mediaPagingSource(): PagingSource<MediaPageKey, Media> = runBlocking {
443         mediaPagingSourceMutex.withLock {
444             val availableProviders: List<Provider> = availableProviders.value
445             val contentResolver: ContentResolver = _activeContentResolver.value
446             val mediaPagingSource =
447                 MediaPagingSource(
448                     contentResolver,
449                     availableProviders,
450                     mediaProviderClient,
451                     dispatcher,
452                     config.value.intent,
453                 )
454 
455             Log.v(DataService.TAG, "Created a media paging source that queries $availableProviders")
456 
457             mediaPagingSources.add(mediaPagingSource)
458             mediaPagingSource
459         }
460     }
461 
462     override suspend fun refreshMedia() {
463         val availableProviders: List<Provider> = availableProviders.value
464         refreshMedia(availableProviders)
465     }
466 
467     override suspend fun refreshAlbumMedia(album: Album) {
468         albumMediaPagingSourceMutex.withLock {
469             // Send album media refresh request only when the album media paging source is not
470             // already cached.
471             if (
472                 albumMediaPagingSources.containsKey(album.authority) &&
473                     albumMediaPagingSources[album.authority]!!.containsKey(album.id)
474             ) {
475                 Log.i(
476                     DataService.TAG,
477                     "A media paging source is available for " +
478                         "album ${album.id}. Not sending a refresh album media request."
479                 )
480                 return
481             }
482         }
483 
484         val providers = availableProviders.value
485         val isAlbumProviderAvailable =
486             providers.any { provider -> provider.authority == album.authority }
487 
488         if (isAlbumProviderAvailable) {
489             mediaProviderClient.refreshAlbumMedia(
490                 album.id,
491                 album.authority,
492                 providers,
493                 _activeContentResolver.value,
494                 config.value.intent
495             )
496         } else {
497             Log.e(
498                 DataService.TAG,
499                 "Available providers $providers " +
500                     "does not contain album authority ${album.authority}. " +
501                     "Skip sending refresh album media request."
502             )
503         }
504     }
505 
506     private fun refreshMedia(availableProviders: List<Provider>) {
507         if (availableProviders.isNotEmpty()) {
508             mediaProviderClient.refreshMedia(
509                 availableProviders,
510                 _activeContentResolver.value,
511                 config.value.intent,
512             )
513         } else {
514             Log.w(DataService.TAG, "Cannot refresh media when there are no providers available")
515         }
516     }
517 }
518