1 /*
2  * Copyright (C) 2023 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.federatedcompute.services.training;
18 
19 import static com.android.adservices.service.stats.AdServicesStatsLog.AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON;
20 import static com.android.federatedcompute.services.common.FederatedComputeExecutors.getBackgroundExecutor;
21 
22 import android.app.job.JobParameters;
23 import android.app.job.JobService;
24 
25 import com.android.federatedcompute.internal.util.LogUtil;
26 import com.android.federatedcompute.services.common.FederatedComputeJobUtil;
27 import com.android.federatedcompute.services.common.FlagsFactory;
28 import com.android.federatedcompute.services.statsd.joblogging.FederatedComputeJobServiceLogger;
29 
30 import com.google.common.util.concurrent.FutureCallback;
31 import com.google.common.util.concurrent.Futures;
32 import com.google.common.util.concurrent.ListenableFuture;
33 import com.google.intelligence.fcp.client.FLRunnerResult;
34 import com.google.intelligence.fcp.client.FLRunnerResult.ContributionResult;
35 
36 /** Main service for the scheduled federated computation jobs. */
37 public class FederatedJobService extends JobService {
38     private static final String TAG = FederatedJobService.class.getSimpleName();
39 
40     @Override
onStartJob(JobParameters params)41     public boolean onStartJob(JobParameters params) {
42         int jobId = params.getJobId();
43         LogUtil.d(TAG, "FederatedJobService.onStartJob");
44         FederatedComputeJobServiceLogger.getInstance(this)
45                 .recordOnStartJob(jobId);
46         if (FlagsFactory.getFlags().getGlobalKillSwitch()) {
47             LogUtil.d(TAG, "GlobalKillSwitch enabled, finishing job.");
48             return FederatedComputeJobUtil.cancelAndFinishJob(this, params, jobId,
49                     AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON);
50         }
51         FederatedComputeWorker worker = FederatedComputeWorker.getInstance(this);
52         ListenableFuture<FLRunnerResult> runCompleteFuture =
53                 worker.startTrainingRun(params.getJobId(), new OnJobFinishedCallback(params, this));
54 
55         Futures.addCallback(
56                 runCompleteFuture,
57                 new FutureCallback<FLRunnerResult>() {
58                     @Override
59                     public void onSuccess(FLRunnerResult flRunnerResult) {
60                         LogUtil.d(TAG, "Federated computation job %d is done!", params.getJobId());
61                         if (flRunnerResult != null) {
62                             worker.finish(flRunnerResult);
63                         } else {
64                             worker.cleanUpActiveRun();
65                         }
66                     }
67 
68                     @Override
69                     public void onFailure(Throwable t) {
70                         LogUtil.e(
71                                 TAG, t, "Failed to handle computation job: %d", params.getJobId());
72                         worker.finish(null, ContributionResult.FAIL, false);
73                     }
74                 },
75                 getBackgroundExecutor());
76         return true;
77     }
78 
79     public static class OnJobFinishedCallback {
80         JobParameters mParams;
81         FederatedJobService mJobService;
82 
OnJobFinishedCallback(JobParameters mParams, FederatedJobService mJobService)83         public OnJobFinishedCallback(JobParameters mParams, FederatedJobService mJobService) {
84             this.mParams = mParams;
85             this.mJobService = mJobService;
86         }
87 
88         /**
89          * To be called each time we are about to reschedule the job.
90          */
callJobFinished(boolean isSuccessful)91         public void callJobFinished(boolean isSuccessful) {
92             LogUtil.d(
93                     TAG,
94                     "Job Finished called for Federated computation job %d!",
95                     mParams.getJobId());
96             boolean wantsReschedule = false;
97             FederatedComputeJobServiceLogger.getInstance(mJobService)
98                     .recordJobFinished(
99                             mParams.getJobId(),
100                             isSuccessful,
101                             wantsReschedule);
102             mJobService.jobFinished(mParams, wantsReschedule);
103         }
104     }
105 
106     @Override
onStopJob(JobParameters params)107     public boolean onStopJob(JobParameters params) {
108         int jobId = params.getJobId();
109         LogUtil.d(
110                 TAG,
111                 "FederatedJobService.onStopJob %d with reason %d",
112                 jobId,
113                 params.getStopReason());
114         boolean wantsReschedule = false;
115         FederatedComputeJobServiceLogger.getInstance(this)
116                 .recordOnStopJob(
117                         params,
118                         jobId,
119                         wantsReschedule);
120         FederatedComputeWorker.getInstance(this).finish(null, ContributionResult.FAIL, true);
121         return wantsReschedule;
122     }
123 }
124