1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.adservices.service.customaudience;
18 
19 import static android.adservices.common.AdServicesStatusUtils.STATUS_SUCCESS;
20 
21 import static com.android.adservices.service.stats.AdServicesLoggerUtil.FIELD_UNSET;
22 
23 import android.annotation.NonNull;
24 import android.content.Context;
25 
26 import com.android.adservices.LoggerFactory;
27 import com.android.adservices.concurrency.AdServicesExecutors;
28 import com.android.adservices.data.adselection.AppInstallDao;
29 import com.android.adservices.data.adselection.SharedStorageDatabase;
30 import com.android.adservices.data.customaudience.CustomAudienceDao;
31 import com.android.adservices.data.customaudience.CustomAudienceDatabase;
32 import com.android.adservices.data.customaudience.DBCustomAudienceBackgroundFetchData;
33 import com.android.adservices.data.enrollment.EnrollmentDao;
34 import com.android.adservices.service.Flags;
35 import com.android.adservices.service.FlagsFactory;
36 import com.android.adservices.service.common.SingletonRunner;
37 import com.android.adservices.service.stats.AdServicesLoggerUtil;
38 import com.android.adservices.service.stats.BackgroundFetchExecutionLogger;
39 import com.android.adservices.service.stats.CustomAudienceLoggerFactory;
40 import com.android.internal.annotations.VisibleForTesting;
41 
42 import com.google.common.collect.ImmutableList;
43 import com.google.common.collect.Lists;
44 import com.google.common.util.concurrent.ExecutionSequencer;
45 import com.google.common.util.concurrent.FluentFuture;
46 import com.google.common.util.concurrent.FutureCallback;
47 import com.google.common.util.concurrent.Futures;
48 import com.google.common.util.concurrent.ListenableFuture;
49 
50 import java.time.Clock;
51 import java.time.Instant;
52 import java.util.ArrayList;
53 import java.util.List;
54 import java.util.Objects;
55 import java.util.concurrent.TimeUnit;
56 import java.util.function.Supplier;
57 
58 /** Worker instance for updating custom audiences in the background. */
59 public class BackgroundFetchWorker {
    // Logger used by every log statement in this class.
    private static final LoggerFactory.Logger sLogger = LoggerFactory.getFledgeLogger();
    // Human-readable job name; appears in log messages and is handed to the SingletonRunner.
    public static final String JOB_DESCRIPTION = "FLEDGE background fetch";
    // Monitor held while the shared instance is lazily created in getInstance().
    private static final Object SINGLETON_LOCK = new Object();
    // Lazily created shared instance; read outside the lock in getInstance(), hence volatile.
    private static volatile BackgroundFetchWorker sBackgroundFetchWorker;

    // Source of the custom audiences that are eligible for a background update.
    private final CustomAudienceDao mCustomAudienceDao;
    // Supplies the runtime limits read by this class (timeout, pool size, max updates, ...).
    private final Flags mFlags;
    // Performs the actual cleanup and per-custom-audience update work.
    private final BackgroundFetchRunner mBackgroundFetchRunner;
    // Time source used to stamp the start of each job run.
    private final Clock mClock;
    // Produces the per-run BackgroundFetchExecutionLogger.
    private final CustomAudienceLoggerFactory mCustomAudienceLoggerFactory;
    // Wraps doRun(); runBackgroundFetch() and stopWork() both delegate to this runner.
    private final SingletonRunner<Void> mSingletonRunner =
            new SingletonRunner<>(JOB_DESCRIPTION, this::doRun);
72 
73     @VisibleForTesting
BackgroundFetchWorker( @onNull CustomAudienceDao customAudienceDao, @NonNull Flags flags, @NonNull BackgroundFetchRunner backgroundFetchRunner, @NonNull Clock clock, @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory)74     protected BackgroundFetchWorker(
75             @NonNull CustomAudienceDao customAudienceDao,
76             @NonNull Flags flags,
77             @NonNull BackgroundFetchRunner backgroundFetchRunner,
78             @NonNull Clock clock,
79             @NonNull CustomAudienceLoggerFactory customAudienceLoggerFactory) {
80         Objects.requireNonNull(customAudienceDao);
81         Objects.requireNonNull(flags);
82         Objects.requireNonNull(backgroundFetchRunner);
83         Objects.requireNonNull(clock);
84         Objects.requireNonNull(customAudienceLoggerFactory);
85         mCustomAudienceDao = customAudienceDao;
86         mFlags = flags;
87         mBackgroundFetchRunner = backgroundFetchRunner;
88         mClock = clock;
89         mCustomAudienceLoggerFactory = customAudienceLoggerFactory;
90     }
91 
92     /**
93      * Gets an instance of a {@link BackgroundFetchWorker}.
94      *
95      * <p>If an instance hasn't been initialized, a new singleton will be created and returned.
96      */
97     @NonNull
getInstance(@onNull Context context)98     public static BackgroundFetchWorker getInstance(@NonNull Context context) {
99         Objects.requireNonNull(context);
100 
101         if (sBackgroundFetchWorker == null) {
102             synchronized (SINGLETON_LOCK) {
103                 if (sBackgroundFetchWorker == null) {
104                     CustomAudienceDao customAudienceDao =
105                             CustomAudienceDatabase.getInstance(context).customAudienceDao();
106                     AppInstallDao appInstallDao =
107                             SharedStorageDatabase.getInstance(context).appInstallDao();
108                     CustomAudienceLoggerFactory customAudienceLoggerFactory =
109                             CustomAudienceLoggerFactory.getInstance();
110                     Flags flags = FlagsFactory.getFlags();
111                     sBackgroundFetchWorker =
112                             new BackgroundFetchWorker(
113                                     customAudienceDao,
114                                     flags,
115                                     new BackgroundFetchRunner(
116                                             customAudienceDao,
117                                             appInstallDao,
118                                             context.getPackageManager(),
119                                             EnrollmentDao.getInstance(),
120                                             flags,
121                                             customAudienceLoggerFactory),
122                                     Clock.systemUTC(),
123                                     customAudienceLoggerFactory);
124                 }
125             }
126         }
127 
128         return sBackgroundFetchWorker;
129     }
130 
131     /**
132      * Runs the background fetch job for FLEDGE, including garbage collection and updating custom
133      * audiences.
134      *
135      * @return A future to be used to check when the task has completed.
136      */
runBackgroundFetch()137     public FluentFuture<Void> runBackgroundFetch() {
138         sLogger.d("Starting %s", JOB_DESCRIPTION);
139         return mSingletonRunner.runSingleInstance();
140     }
141 
142     /** Requests that any ongoing work be stopped gracefully and waits for work to be stopped. */
stopWork()143     public void stopWork() {
144         mSingletonRunner.stopWork();
145     }
146 
doRun(@onNull Supplier<Boolean> shouldStop)147     private FluentFuture<Void> doRun(@NonNull Supplier<Boolean> shouldStop) {
148         Instant jobStartTime = mClock.instant();
149         BackgroundFetchExecutionLogger backgroundFetchExecutionLogger =
150                 mCustomAudienceLoggerFactory.getBackgroundFetchExecutionLogger();
151         FluentFuture<Void> run =
152                 cleanupFledgeData(jobStartTime, backgroundFetchExecutionLogger)
153                         .transform(
154                                 ignored -> getFetchDataList(shouldStop, jobStartTime),
155                                 AdServicesExecutors.getBackgroundExecutor())
156                         .transformAsync(
157                                 fetchDataList ->
158                                         updateData(
159                                                 fetchDataList,
160                                                 shouldStop,
161                                                 jobStartTime,
162                                                 backgroundFetchExecutionLogger),
163                                 AdServicesExecutors.getBackgroundExecutor())
164                         .withTimeout(
165                                 mFlags.getFledgeBackgroundFetchJobMaxRuntimeMs(),
166                                 TimeUnit.MILLISECONDS,
167                                 AdServicesExecutors.getScheduler());
168         run.addCallback(
169                 getCloseBackgroundFetchExecutionLoggerCallback(backgroundFetchExecutionLogger),
170                 AdServicesExecutors.getBackgroundExecutor());
171 
172         return run;
173     }
174 
updateData( @onNull List<DBCustomAudienceBackgroundFetchData> fetchDataList, @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime, @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)175     private ListenableFuture<Void> updateData(
176             @NonNull List<DBCustomAudienceBackgroundFetchData> fetchDataList,
177             @NonNull Supplier<Boolean> shouldStop,
178             @NonNull Instant jobStartTime,
179             @NonNull BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) {
180         if (fetchDataList.isEmpty()) {
181             sLogger.d("No custom audiences found to update");
182             backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(0);
183             return FluentFuture.from(Futures.immediateVoidFuture());
184         }
185 
186         sLogger.d("Updating %d custom audiences", fetchDataList.size());
187         backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(fetchDataList.size());
188         // Divide the gathered CAs among worker threads
189         int numWorkers =
190                 Math.min(
191                         Math.max(1, Runtime.getRuntime().availableProcessors() - 2),
192                         mFlags.getFledgeBackgroundFetchThreadPoolSize());
193         int numCustomAudiencesPerWorker =
194                 (fetchDataList.size() / numWorkers)
195                         + (((fetchDataList.size() % numWorkers) == 0) ? 0 : 1);
196 
197         List<ListenableFuture<?>> subListFutureUpdates = new ArrayList<>();
198         for (final List<DBCustomAudienceBackgroundFetchData> fetchDataSubList :
199                 Lists.partition(fetchDataList, numCustomAudiencesPerWorker)) {
200             if (shouldStop.get()) {
201                 break;
202             }
203             // Updates in each batch are sequenced
204             ExecutionSequencer sequencer = ExecutionSequencer.create();
205             for (DBCustomAudienceBackgroundFetchData fetchData : fetchDataSubList) {
206                 subListFutureUpdates.add(
207                         sequencer.submitAsync(
208                                 () ->
209                                         mBackgroundFetchRunner.updateCustomAudience(
210                                                 jobStartTime, fetchData),
211                                 AdServicesExecutors.getBackgroundExecutor()));
212             }
213         }
214 
215         return FluentFuture.from(Futures.allAsList(subListFutureUpdates))
216                 .transform(ignored -> null, AdServicesExecutors.getLightWeightExecutor());
217     }
218 
getFetchDataList( @onNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime)219     private List<DBCustomAudienceBackgroundFetchData> getFetchDataList(
220             @NonNull Supplier<Boolean> shouldStop, @NonNull Instant jobStartTime) {
221         if (shouldStop.get()) {
222             sLogger.d("Stopping " + JOB_DESCRIPTION);
223             return ImmutableList.of();
224         }
225 
226         // Fetch stale/eligible/delinquent custom audiences
227         return mCustomAudienceDao.getActiveEligibleCustomAudienceBackgroundFetchData(
228                 jobStartTime, mFlags.getFledgeBackgroundFetchMaxNumUpdated());
229     }
230 
cleanupFledgeData( Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)231     private FluentFuture<?> cleanupFledgeData(
232             Instant jobStartTime, BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) {
233         return FluentFuture.from(
234                 AdServicesExecutors.getBackgroundExecutor()
235                         .submit(
236                                 () -> {
237                                     // Start background fetch execution logger.
238                                     backgroundFetchExecutionLogger.start();
239                                     backgroundFetchExecutionLogger.setNumOfEligibleToUpdateCAs(
240                                             FIELD_UNSET);
241                                     // Clean up custom audiences first so the actual fetch won't do
242                                     // unnecessary work
243                                     mBackgroundFetchRunner.deleteExpiredCustomAudiences(
244                                             jobStartTime);
245                                     mBackgroundFetchRunner.deleteDisallowedOwnerCustomAudiences();
246                                     mBackgroundFetchRunner.deleteDisallowedBuyerCustomAudiences();
247                                     if (mFlags.getFledgeAppInstallFilteringEnabled()) {
248                                         mBackgroundFetchRunner
249                                                 .deleteDisallowedPackageAppInstallEntries();
250                                     }
251                                 }));
252     }
253 
getCloseBackgroundFetchExecutionLoggerCallback( BackgroundFetchExecutionLogger backgroundFetchExecutionLogger)254     private FutureCallback<Void> getCloseBackgroundFetchExecutionLoggerCallback(
255             BackgroundFetchExecutionLogger backgroundFetchExecutionLogger) {
256         return new FutureCallback<>() {
257             @Override
258             public void onSuccess(Void result) {
259                 closeBackgroundFetchExecutionLogger(
260                         backgroundFetchExecutionLogger,
261                         backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(),
262                         STATUS_SUCCESS);
263             }
264 
265             @Override
266             public void onFailure(Throwable t) {
267                 sLogger.d(t, "Error in Custom Audience Background Fetch");
268                 int resultCode = AdServicesLoggerUtil.getResultCodeFromException(t);
269                 closeBackgroundFetchExecutionLogger(
270                         backgroundFetchExecutionLogger,
271                         backgroundFetchExecutionLogger.getNumOfEligibleToUpdateCAs(),
272                         resultCode);
273             }
274         };
275     }
276 
277     private void closeBackgroundFetchExecutionLogger(
278             BackgroundFetchExecutionLogger backgroundFetchExecutionLogger,
279             int numOfEligibleToUpdateCAs,
280             int resultCode) {
281         try {
282             backgroundFetchExecutionLogger.close(numOfEligibleToUpdateCAs, resultCode);
283         } catch (Exception e) {
284             sLogger.d(
285                     "Error when closing backgroundFetchExecutionLogger, "
286                             + "skipping metrics logging: %s",
287                     e.getMessage());
288         }
289     }
290 }
291