/*
 * Copyright 2024 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.photopicker.data

import android.content.ContentResolver
import android.database.ContentObserver
import android.net.Uri
import android.util.Log
import androidx.paging.PagingSource
import com.android.photopicker.core.configuration.PhotopickerConfiguration
import com.android.photopicker.core.features.FeatureManager
import com.android.photopicker.core.user.UserStatus
import com.android.photopicker.data.model.CloudMediaProviderDetails
import com.android.photopicker.data.model.Group.Album
import com.android.photopicker.data.model.Media
import com.android.photopicker.data.model.MediaPageKey
import com.android.photopicker.data.model.MediaSource
import com.android.photopicker.data.model.Provider
import com.android.photopicker.data.paging.AlbumMediaPagingSource
import com.android.photopicker.data.paging.AlbumPagingSource
import com.android.photopicker.data.paging.MediaPagingSource
import com.android.photopicker.features.cloudmedia.CloudMediaFeature
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Provides data to the Photo Picker UI. The data comes from a [ContentProvider] called
 * [MediaProvider].
 *
 * Underlying data changes in [MediaProvider] are observed using [ContentObserver]s. When a change
 * in data is observed, the data is re-fetched from the [MediaProvider] process and the new data is
 * emitted to the [StateFlow]s.
 *
 * @param userStatus A [StateFlow] with the current active user's details.
 * @param scope The [CoroutineScope] the data flows will be shared in.
 * @param dispatcher A [CoroutineDispatcher] to run the coroutines in.
 * @param notificationService An instance of [NotificationService] responsible to listen to data
 *   change notifications.
 * @param mediaProviderClient An instance of [MediaProviderClient] responsible to get data from
 *   MediaProvider.
 * @param config [StateFlow] that emits [PhotopickerConfiguration] changes.
 * @param featureManager A [FeatureManager] used to check whether [CloudMediaFeature] is enabled;
 *   when it is not, providers whose media source is [MediaSource.REMOTE] are filtered out.
 */
class DataServiceImpl(
    private val userStatus: StateFlow<UserStatus>,
    private val scope: CoroutineScope,
    private val dispatcher: CoroutineDispatcher,
    private val notificationService: NotificationService,
    private val mediaProviderClient: MediaProviderClient,
    private val config: StateFlow<PhotopickerConfiguration>,
    private val featureManager: FeatureManager
) : DataService {
    private val _activeContentResolver =
        MutableStateFlow<ContentResolver>(userStatus.value.activeContentResolver)

    // Keep track of the photo grid media and album grid paging sources so that we can invalidate
    // them in case the underlying data changes.
    private val mediaPagingSources: MutableList<MediaPagingSource> = mutableListOf()
    private val albumPagingSources: MutableList<AlbumPagingSource> = mutableListOf()

    // Keep track of the album grid media paging sources so that we can invalidate
    // them in case the underlying data changes or re-use them if the user re-opens the same album
    // again. The outer key is the album authority and the inner key is the album id. If something
    // drastically changes that would require a refresh of the data source cache, remove the
    // paging source from the below map. If a paging source is found in the map, it is assumed
    // that a refresh request was already sent to the data source once in the session and there is
    // no need to send it again, even if the paging source is invalid.
    private val albumMediaPagingSources:
        MutableMap<String, MutableMap<String, AlbumMediaPagingSource>> =
        mutableMapOf()

    // An internal lock to allow thread-safe updates to the [MediaPagingSource] and
    // [AlbumPagingSource].
    private val mediaPagingSourceMutex = Mutex()

    // An internal lock to allow thread-safe updates to the [AlbumMediaPagingSource].
    private val albumMediaPagingSourceMutex = Mutex()

    /**
     * Callback flow that listens to changes in the available providers and emits updated list of
     * available providers.
     */
    private var availableProviderCallbackFlow: Flow<List<Provider>>? = null

    /**
     * Callback flow that listens to changes in media and emits a [Unit] when change is observed.
     */
    private var mediaUpdateCallbackFlow: Flow<Unit>? = null

    /**
     * Callback flow that listens to changes in album media and emits a [Pair] of album authority
     * and album id when change is observed.
     */
    private var albumMediaUpdateCallbackFlow: Flow<Pair<String, String>>? = null

    /**
     * Saves the current job that collects the [availableProviderCallbackFlow]. Cancel this job when
     * there is a change in the [_activeContentResolver]
     */
    private var availableProviderCollectJob: Job? = null

    /**
     * Saves the current job that collects the [mediaUpdateCallbackFlow]. Cancel this job when there
     * is a change in the [_activeContentResolver]
     */
    private var mediaUpdateCollectJob: Job? = null

    /**
     * Saves the current job that collects the [albumMediaUpdateCallbackFlow]. Cancel this job when
     * there is a change in the [_activeContentResolver]
     */
    private var albumMediaUpdateCollectJob: Job? = null

    /**
     * Internal [StateFlow] that emits when the [availableProviderCallbackFlow] emits a new list of
     * providers. The [availableProviderCallbackFlow] can change if the active user in a session has
     * changed.
     *
     * The initial value of this flow is an empty list to avoid an IPC to fetch the actual value
     * from Media Provider from the main thread.
     */
    private val _availableProviders: MutableStateFlow<List<Provider>> =
        MutableStateFlow(emptyList())

    /**
     * Create an immutable state flow from the internal flow [_availableProviders]. The state flow
     * helps retain and provide immediate access to the last emitted value.
     *
     * The producer block remains active for some time after the last observer stops collecting.
     * This helps retain the flow through transient changes like activity recreation due to config
     * changes.
     *
     * Note that [StateFlow] automatically filters out subsequent repetitions of the same value.
     */
    override val availableProviders: StateFlow<List<Provider>> =
        _availableProviders.stateIn(
            scope,
            SharingStarted.WhileSubscribed(FLOW_TIMEOUT_MILLI_SECONDS),
            _availableProviders.value
        )

    companion object {
        /** Stop timeout, in milliseconds, passed to [SharingStarted.WhileSubscribed]. */
        const val FLOW_TIMEOUT_MILLI_SECONDS: Long = 5000
    }

    init {
        // A change in the available providers makes every tracked paging source stale:
        // invalidate all of them and forget them.
        scope.launch(dispatcher) {
            availableProviders.collect { providers: List<Provider> ->
                Log.d(DataService.TAG, "Available providers have changed to $providers.")

                mediaPagingSourceMutex.withLock {
                    mediaPagingSources.forEach { mediaPagingSource ->
                        mediaPagingSource.invalidate()
                    }
                    albumPagingSources.forEach { albumPagingSource ->
                        albumPagingSource.invalidate()
                    }

                    mediaPagingSources.clear()
                    albumPagingSources.clear()
                }

                albumMediaPagingSourceMutex.withLock {
                    albumMediaPagingSources.values.forEach { albumMediaPagingSourceMap ->
                        albumMediaPagingSourceMap.values.forEach { albumMediaPagingSource ->
                            albumMediaPagingSource.invalidate()
                        }
                    }
                    albumMediaPagingSources.clear()
                }
            }
        }

        scope.launch(dispatcher) {
            // Only observe the changes in the active content resolver
            _activeContentResolver.collect { activeContentResolver: ContentResolver ->
                Log.d(DataService.TAG, "Active content resolver has changed.")

                // Stop collecting available providers from previously initialized callback flow.
                availableProviderCollectJob?.cancel()
                availableProviderCallbackFlow = initAvailableProvidersFlow(activeContentResolver)

                availableProviderCollectJob =
                    scope.launch(dispatcher) {
                        availableProviderCallbackFlow?.collect { providers: List<Provider> ->
                            Log.d(
                                DataService.TAG,
                                "Available providers update notification received $providers"
                            )

                            // Drop remote providers when the cloud media feature is disabled.
                            val updatedProviders: List<Provider> =
                                if (featureManager.isFeatureEnabled(CloudMediaFeature::class.java)) {
                                    providers
                                } else {
                                    providers
                                        .filter { it.mediaSource != MediaSource.REMOTE }
                                        .also { filteredProviders ->
                                            Log.i(
                                                DataService.TAG,
                                                "Cloud media feature is not enabled, available " +
                                                    "providers are updated to $filteredProviders"
                                            )
                                        }
                                }

                            // Send refresh media request to Photo Picker.
                            // TODO(b/340246010): This is required even when there is no change in
                            // the [availableProviders] state flow because PhotoPicker relies on the
                            // UI to trigger a sync when the cloud provider changes. Further, a
                            // successful sync enables cloud queries, which then updates the UI.
                            refreshMedia(updatedProviders)

                            _availableProviders.update { updatedProviders }
                        }
                    }

                // Stop collecting media updates from previously initialized callback flow.
                mediaUpdateCollectJob?.cancel()
                mediaUpdateCallbackFlow = initMediaUpdateFlow(activeContentResolver)

                mediaUpdateCollectJob =
                    scope.launch(dispatcher) {
                        mediaUpdateCallbackFlow?.collect {
                            Log.d(DataService.TAG, "Media update notification received")
                            mediaPagingSourceMutex.withLock {
                                mediaPagingSources.forEach { mediaPagingSource ->
                                    mediaPagingSource.invalidate()
                                }
                                // The list is only used to invalidate the tracked sources.
                                // Drop the ones just invalidated so that the list does not grow
                                // with every media update notification.
                                mediaPagingSources.clear()
                            }
                        }
                    }

                // Stop collecting album media updates from previously initialized callback flow.
                albumMediaUpdateCollectJob?.cancel()
                albumMediaUpdateCallbackFlow = initAlbumMediaUpdateFlow(activeContentResolver)

                albumMediaUpdateCollectJob =
                    scope.launch(dispatcher) {
                        albumMediaUpdateCallbackFlow?.collect {
                            (albumAuthority, albumId): Pair<String, String> ->
                            Log.d(
                                DataService.TAG,
                                "Album media update notification " +
                                    "received for album authority $albumAuthority " +
                                    "and album id $albumId"
                            )
                            // The invalidated source is intentionally kept in the map: its
                            // presence records that a refresh request was already sent for
                            // this album in the session.
                            albumMediaPagingSourceMutex.withLock {
                                albumMediaPagingSources
                                    .get(albumAuthority)
                                    ?.get(albumId)
                                    ?.invalidate()
                            }
                        }
                    }
            }
        }

        // Mirror the active user's content resolver into [_activeContentResolver].
        scope.launch(dispatcher) {
            userStatus.collect { userStatusValue: UserStatus ->
                _activeContentResolver.update { userStatusValue.activeContentResolver }
            }
        }
    }

    /**
     * Creates a callback flow that listens to changes in the available providers using
     * [ContentObserver] and emits updated list of available providers.
     */
    private fun initAvailableProvidersFlow(resolver: ContentResolver): Flow<List<Provider>> =
        callbackFlow<Unit> {
                // Define a callback that tries sending a [Unit] in the [Channel].
                val observer =
                    object : ContentObserver(/* handler */ null) {
                        override fun onChange(selfChange: Boolean, uri: Uri?) {
                            trySend(Unit)
                        }
                    }

                // Register the content observer callback.
                notificationService.registerContentObserverCallback(
                    resolver,
                    AVAILABLE_PROVIDERS_CHANGE_NOTIFICATION_URI,
                    /* notifyForDescendants */ true,
                    observer
                )

                // Trigger the first fetch of available providers.
                trySend(Unit)

                // Unregister when the flow is closed.
                awaitClose {
                    notificationService.unregisterContentObserverCallback(resolver, observer)
                }
            }
            .map {
                // Fetch the available providers again when a change is detected.
                mediaProviderClient.fetchAvailableProviders(resolver)
            }

    /**
     * Creates a callback flow that emits a [Unit] when an update in media is observed using
     * [ContentObserver] notifications.
     */
    private fun initMediaUpdateFlow(resolver: ContentResolver): Flow<Unit> =
        callbackFlow<Unit> {
            val observer =
                object : ContentObserver(/* handler */ null) {
                    override fun onChange(selfChange: Boolean, uri: Uri?) {
                        trySend(Unit)
                    }
                }

            // Register the content observer callback.
            notificationService.registerContentObserverCallback(
                resolver,
                MEDIA_CHANGE_NOTIFICATION_URI,
                /* notifyForDescendants */ true,
                observer
            )

            // Unregister when the flow is closed.
            awaitClose { notificationService.unregisterContentObserverCallback(resolver, observer) }
        }

    /**
     * Creates a callback flow that emits a [Pair] of album authority and album ID when an update
     * in the album's media is observed using [ContentObserver] notifications.
     */
    private fun initAlbumMediaUpdateFlow(resolver: ContentResolver): Flow<Pair<String, String>> =
        callbackFlow {
            val observer =
                object : ContentObserver(/* handler */ null) {
                    override fun onChange(selfChange: Boolean, uri: Uri?) {
                        // Verify that album authority and album ID is present in the URI. They
                        // are the last two path segments; notifications with any other number
                        // of segments are ignored.
                        if (
                            uri?.pathSegments?.size ==
                                (2 + ALBUM_CHANGE_NOTIFICATION_URI.pathSegments.size)
                        ) {
                            val albumAuthority = uri.pathSegments[uri.pathSegments.size - 2] ?: ""
                            val albumID = uri.pathSegments[uri.pathSegments.size - 1] ?: ""
                            trySend(Pair(albumAuthority, albumID))
                        }
                    }
                }

            // Register the content observer callback.
            notificationService.registerContentObserverCallback(
                resolver,
                ALBUM_CHANGE_NOTIFICATION_URI,
                /* notifyForDescendants */ true,
                observer
            )

            // Unregister when the flow is closed.
            awaitClose { notificationService.unregisterContentObserverCallback(resolver, observer) }
        }

    /**
     * Returns a paging source for the media of the given [album].
     *
     * This call blocks the calling thread ([runBlocking]) while it runs [refreshAlbumMedia] and
     * waits for the album media lock. A cached paging source for the same album authority and id
     * is returned when it is still valid; otherwise a new [AlbumMediaPagingSource] is created
     * from the current providers and content resolver, cached and returned.
     */
    override fun albumMediaPagingSource(album: Album): PagingSource<MediaPageKey, Media> =
        runBlocking {
            refreshAlbumMedia(album)

            albumMediaPagingSourceMutex.withLock {
                val albumMap =
                    albumMediaPagingSources.getOrPut(album.authority) { mutableMapOf() }
                val cachedPagingSource = albumMap[album.id]

                if (cachedPagingSource != null && !cachedPagingSource.invalid) {
                    cachedPagingSource
                } else {
                    val providers: List<Provider> = availableProviders.value
                    val contentResolver: ContentResolver = _activeContentResolver.value
                    val albumMediaPagingSource =
                        AlbumMediaPagingSource(
                            album.id,
                            album.authority,
                            contentResolver,
                            providers,
                            mediaProviderClient,
                            dispatcher,
                            config.value.intent,
                        )

                    Log.v(
                        DataService.TAG,
                        "Created an album media paging source that queries $providers"
                    )

                    albumMap[album.id] = albumMediaPagingSource
                    albumMediaPagingSource
                }
            }
        }

    /**
     * Creates a new [AlbumPagingSource] from the current providers and content resolver and
     * tracks it so that it can be invalidated when the available providers change. Blocks the
     * calling thread while waiting for the paging source lock.
     */
    override fun albumPagingSource(): PagingSource<MediaPageKey, Album> = runBlocking {
        mediaPagingSourceMutex.withLock {
            val providers: List<Provider> = availableProviders.value
            val contentResolver: ContentResolver = _activeContentResolver.value
            val albumPagingSource =
                AlbumPagingSource(
                    contentResolver,
                    providers,
                    mediaProviderClient,
                    dispatcher,
                    config.value.intent,
                )

            Log.v(DataService.TAG, "Created an album paging source that queries $providers")

            albumPagingSources.add(albumPagingSource)
            albumPagingSource
        }
    }

    /**
     * Not implemented yet.
     *
     * @throws NotImplementedError always.
     */
    override fun cloudMediaProviderDetails(
        authority: String
    ): StateFlow<CloudMediaProviderDetails?> =
        throw NotImplementedError("This method is not implemented yet.")

    /**
     * Creates a new [MediaPagingSource] from the current providers and content resolver and
     * tracks it so that it can be invalidated when the media or the available providers change.
     * Blocks the calling thread while waiting for the paging source lock.
     */
    override fun mediaPagingSource(): PagingSource<MediaPageKey, Media> = runBlocking {
        mediaPagingSourceMutex.withLock {
            val providers: List<Provider> = availableProviders.value
            val contentResolver: ContentResolver = _activeContentResolver.value
            val mediaPagingSource =
                MediaPagingSource(
                    contentResolver,
                    providers,
                    mediaProviderClient,
                    dispatcher,
                    config.value.intent,
                )

            Log.v(DataService.TAG, "Created a media paging source that queries $providers")

            mediaPagingSources.add(mediaPagingSource)
            mediaPagingSource
        }
    }

    /** Sends a refresh media request for the currently available providers. */
    override suspend fun refreshMedia() {
        val availableProviders: List<Provider> = availableProviders.value
        refreshMedia(availableProviders)
    }

    /**
     * Sends a refresh request for the media of the given [album].
     *
     * No request is sent when a paging source for the album is already cached, or when the
     * album's authority is not among the currently available providers.
     */
    override suspend fun refreshAlbumMedia(album: Album) {
        albumMediaPagingSourceMutex.withLock {
            // Send album media refresh request only when the album media paging source is not
            // already cached.
            if (albumMediaPagingSources[album.authority]?.containsKey(album.id) == true) {
                Log.i(
                    DataService.TAG,
                    "A media paging source is available for " +
                        "album ${album.id}. Not sending a refresh album media request."
                )
                return
            }
        }

        val providers = availableProviders.value
        val isAlbumProviderAvailable =
            providers.any { provider -> provider.authority == album.authority }

        if (isAlbumProviderAvailable) {
            mediaProviderClient.refreshAlbumMedia(
                album.id,
                album.authority,
                providers,
                _activeContentResolver.value,
                config.value.intent
            )
        } else {
            Log.e(
                DataService.TAG,
                "Available providers $providers " +
                    "does not contain album authority ${album.authority}. " +
                    "Skip sending refresh album media request."
            )
        }
    }

    /**
     * Forwards a refresh media request for [availableProviders] to [mediaProviderClient] using
     * the active content resolver. Only logs a warning when the list is empty.
     */
    private fun refreshMedia(availableProviders: List<Provider>) {
        if (availableProviders.isNotEmpty()) {
            mediaProviderClient.refreshMedia(
                availableProviders,
                _activeContentResolver.value,
                config.value.intent,
            )
        } else {
            Log.w(DataService.TAG, "Cannot refresh media when there are no providers available")
        }
    }
}