1 /* 2 * Copyright (C) 2022 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.adservices.service.customaudience; 18 19 import static android.adservices.common.AdServicesStatusUtils.STATUS_SUCCESS; 20 21 import static com.android.adservices.service.stats.AdServicesLoggerUtil.FIELD_UNSET; 22 23 import android.annotation.NonNull; 24 import android.content.Context; 25 26 import com.android.adservices.LoggerFactory; 27 import com.android.adservices.concurrency.AdServicesExecutors; 28 import com.android.adservices.data.adselection.AppInstallDao; 29 import com.android.adservices.data.adselection.SharedStorageDatabase; 30 import com.android.adservices.data.customaudience.CustomAudienceDao; 31 import com.android.adservices.data.customaudience.CustomAudienceDatabase; 32 import com.android.adservices.data.customaudience.DBCustomAudienceBackgroundFetchData; 33 import com.android.adservices.data.enrollment.EnrollmentDao; 34 import com.android.adservices.service.Flags; 35 import com.android.adservices.service.FlagsFactory; 36 import com.android.adservices.service.common.SingletonRunner; 37 import com.android.adservices.service.stats.AdServicesLoggerUtil; 38 import com.android.adservices.service.stats.BackgroundFetchExecutionLogger; 39 import com.android.adservices.service.stats.CustomAudienceLoggerFactory; 40 import com.android.internal.annotations.VisibleForTesting; 41 42 import com.google.common.collect.ImmutableList; 43 import com.google.common.collect.Lists; 44 import com.google.common.util.concurrent.ExecutionSequencer; 45 import com.google.common.util.concurrent.FluentFuture; 46 import com.google.common.util.concurrent.FutureCallback; 47 import com.google.common.util.concurrent.Futures; 48 import com.google.common.util.concurrent.ListenableFuture; 49 50 import java.time.Clock; 51 import java.time.Instant; 52 import java.util.ArrayList; 53 import java.util.List; 54 import java.util.Objects; 55 import java.util.concurrent.TimeUnit; 56 import java.util.function.Supplier; 57 58 /** Worker instance for updating custom audiences in the background. */ 59 public class BackgroundFetchWorker { 60 private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger(); 61 public static final String JOB_DESCRIPTION = "FLEDGE background fetch"; 62 private static final Object SINGLETON_LOCK = new Object(); 63 private static volatile BackgroundFetchWorker sBackgroundFetchWorker; 64 65 private final CustomAudienceDao mCustomAudienceDao; 66 private final Flags mFlags; 67 private final BackgroundFetchRunner mBackgroundFetchRunner; 68 private final Clock mClock; 69 private final CustomAudienceLoggerFactory mCustomAudienceLoggerFactory; 70 private final SingletonRunner<Void> mSingletonRunner = 71 new SingletonRunner<>(JOB_DESCRIPTION, this::doRun); 72 73 @VisibleForTesting BackgroundFetchWorker( @onNull CustomAudienceDao customAudienceDao, @NonNull Flags flags, @NonNull BackgroundFetchRunner backgroundFetchRunner, @NonNull Clock clock, @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory)74 protected BackgroundFetchWorker( 75 @NonNull CustomAudienceDao customAudienceDao, 76 @NonNull Flags flags, 77 @NonNull BackgroundFetchRunner backgroundFetchRunner, 78 @NonNull Clock clock, 79 @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory) { 80 Objects.requireNonNull(customAudienceDao); 81 Objects.requireNonNull(flags); 82 Objects.requireNonNull(backgroundFetchRunner); 83 Objects.requireNonNull(clock); 84 Objects.requireNonNull(customAudienceLoggerFactory); 85 mCustomAudienceDao = customAudienceDao; 86 mFlags = flags; 87 mBackgroundFetchRunner = backgroundFetchRunner; 88 mClock = clock; 89 mCustomAudienceLoggerFactory = customAudienceLoggerFactory; 90 } 91 92 /** 93 * Gets an instance of a {@link BackgroundFetchWorker}. 94 * 95 * <p>If an instance hasn't been initialized, a new singleton will be created and returned. 96 */ 97 @NonNull getInstance(@onNull Context context)98 public static BackgroundFetchWorker getInstance(@NonNull Context context) { 99 Objects.requireNonNull(context); 100 101 if (sBackgroundFetchWorker == null) { 102 synchronized (SINGLETON_LOCK) { 103 if (sBackgroundFetchWorker == null) { 104 CustomAudienceDao customAudienceDao = 105 CustomAudienceDatabase.getInstance(context).customAudienceDao(); 106 AppInstallDao appInstallDao = 107 SharedStorageDatabase.getInstance(context).appInstallDao(); 108 CustomAudienceLoggerFactory customAudienceLoggerFactory = 109 CustomAudienceLoggerFactory.getInstance(); 110 Flags flags = FlagsFactory.getFlags(); 111 sBackgroundFetchWorker = 112 new BackgroundFetchWorker( 113 customAudienceDao, 114 flags, 115 new BackgroundFetchRunner( 116 customAudienceDao, 117 appInstallDao, 118 context.getPackageManager(), 119 EnrollmentDao.getInstance(), 120 flags, 121 customAudienceLoggerFactory), 122 Clock.systemUTC(), 123 customAudienceLoggerFactory); 124 } 125 } 126 } 127 128 return sBackgroundFetchWorker; 129 } 130 131 /** 132 * Runs the background fetch job for FLEDGE, including garbage collection and updating custom 133 * audiences. 134 * 135 * @return A future to be used to check when the task has completed. 136 */ runBackgroundFetch()137 public FluentFuture<Void> runBackgroundFetch() { 138 sLogger.d("Starting %s", JOB_DESCRIPTION); 139 return mSingletonRunner.runSingleInstance(); 140 } 141 142 /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */ stopWork()143 public void stopWork() { 144 mSingletonRunner.stopWork(); 145 } 146 doRun(@onNull Supplier<Boolean> shouldStop)147 private FluentFuture<Void> doRun(@NonNull Supplier<Boolean> shouldStop) { 148 Instant jobStartTime = mClock.instant(); 149 BackgroundFetchExecutionLogger backgroundFetchExecutionLogger = 150 mCustomAudienceLoggerFactory.getBackgroundFetchExecutionLogger(); 151 FluentFuture<Void> run = 152 cleanupFledgeData(jobStartTime, backgroundFetchExecutionLogger) 153 .transform( 154 ignored -> getFetchDataList(shouldStop, jobStartTime), 155 AdServicesExecutors.getBackgroundExecutor()) 156 .transformAsync( 157 fetchDataList -> 158 updateData( 159 fetchDataList, 160 shouldStop, 161 jobStartTime, 162 backgroundFetchExecutionLogger), 163 AdServicesExecutors.getBackgroundExecutor()) 164 .withTimeout( 165 mFlags.getFledgeBackgroundFetchJobMaxRuntimeMs(), 166 TimeUnit.MILLISECONDS, 167 AdServicesExecutors.getScheduler()); 168 run.addCallback( 169 getCloseBackgroundFetchExecutionLoggerCallback(backgroundFetchExecutionLogger), 170 AdServicesExecutors.getBackgroundExecutor()); 171 172 return run; 173 } 174 updateData( @onNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime, @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)175 private ListenableFuture<Void> updateData( 176 @NonNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, 177 @NonNull Supplier<Boolean> shouldStop, 178 @NonNull Instant jobStartTime, 179 @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) { 180 if (fetchDataList.isEmpty()) { 181 sLogger.d("No custom audiences found to update"); 182 backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(0); 183 return FluentFuture.from(Futures.immediateVoidFuture()); 184 } 185 186 sLogger.d("Updating %d custom audiences", fetchDataList.size()); 187 backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(fetchDataList.size()); 188 // Divide the gathered CAs among worker threads 189 int numWorkers = 190 Math.min( 191 Math.max(1, Runtime.getRuntime().availableProcessors() - 2), 192 mFlags.getFledgeBackgroundFetchThreadPoolSize()); 193 int numCustomAudiencesPerWorker = 194 (fetchDataList.size() / numWorkers) 195 + (((fetchDataList.size() % numWorkers) == 0) ? 0 : 1); 196 197 List<ListenableFuture<?>> subListFutureUpdates = new ArrayList<>(); 198 for (final List<DBCustomAudienceBackgroundFetchData> fetchDataSubList : 199 Lists.partition(fetchDataList, numCustomAudiencesPerWorker)) { 200 if (shouldStop.get()) { 201 break; 202 } 203 // Updates in each batch are sequenced 204 ExecutionSequencer sequencer = ExecutionSequencer.create(); 205 for (DBCustomAudienceBackgroundFetchData fetchData : fetchDataSubList) { 206 subListFutureUpdates.add( 207 sequencer.submitAsync( 208 () -> 209 mBackgroundFetchRunner.updateCustomAudience( 210 jobStartTime, fetchData), 211 AdServicesExecutors.getBackgroundExecutor())); 212 } 213 } 214 215 return FluentFuture.from(Futures.allAsList(subListFutureUpdates)) 216 .transform(ignored -> null, AdServicesExecutors.getLightWeightExecutor()); 217 } 218 getFetchDataList( @onNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)219 private List<DBCustomAudienceBackgroundFetchData> getFetchDataList( 220 @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime) { 221 if (shouldStop.get()) { 222 sLogger.d("Stopping " + JOB_DESCRIPTION); 223 return ImmutableList.of(); 224 } 225 226 // Fetch stale/eligible/delinquent custom audiences 227 return mCustomAudienceDao.getActiveEligibleCustomAudienceBackgroundFetchData( 228 jobStartTime, mFlags.getFledgeBackgroundFetchMaxNumUpdated()); 229 } 230 cleanupFledgeData( Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)231 private FluentFuture<?> cleanupFledgeData( 232 Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) { 233 return FluentFuture.from( 234 AdServicesExecutors.getBackgroundExecutor() 235 .submit( 236 () -> { 237 // Start background fetch execution logger. 238 backgroundFetchExecutionLogger.start(); 239 backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs( 240 FIELD_UNSET); 241 // Clean up custom audiences first so the actual fetch won't do 242 // unnecessary work 243 mBackgroundFetchRunner.deleteExpiredCustomAudiences( 244 jobStartTime); 245 mBackgroundFetchRunner.deleteDisallowedOwnerCustomAudiences(); 246 mBackgroundFetchRunner.deleteDisallowedBuyerCustomAudiences(); 247 if (mFlags.getFledgeAppInstallFilteringEnabled()) { 248 mBackgroundFetchRunner 249 .deleteDisallowedPackageAppInstallEntries(); 250 } 251 })); 252 } 253 getCloseBackgroundFetchExecutionLoggerCallback( BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)254 private FutureCallback<Void> getCloseBackgroundFetchExecutionLoggerCallback( 255 BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) { 256 return new FutureCallback<>() { 257 @Override 258 public void onSuccess(Void result) { 259 closeBackgroundFetchExecutionLogger( 260 backgroundFetchExecutionLogger, 261 backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(), 262 STATUS_SUCCESS); 263 } 264 265 @Override 266 public void onFailure(Throwable t) { 267 sLogger.d(t, "Error in Custom Audience Background Fetch"); 268 int resultCode = AdServicesLoggerUtil.getResultCodeFromException(t); 269 closeBackgroundFetchExecutionLogger( 270 backgroundFetchExecutionLogger, 271 backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(), 272 resultCode); 273 } 274 }; 275 } 276 277 private void closeBackgroundFetchExecutionLogger( 278 BackgroundFetchExecutionLogger backgroundFetchExecutionLogger, 279 int numOfEligibleToUpdateCAs, 280 int resultCode) { 281 try { 282 backgroundFetchExecutionLogger.close(numOfEligibleToUpdateCAs, resultCode); 283 } catch (Exception e) { 284 sLogger.d( 285 "Error when closing backgroundFetchExecutionLogger, " 286 + "skipping metrics logging: %s", 287 e.getMessage()); 288 } 289 } 290 } 291