1 /* 2 * Copyright (C) 2023 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.federatedcompute.services.training; 18 19 import static com.android.adservices.service.stats.AdServicesStatsLog.AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON; 20 import static com.android.federatedcompute.services.common.FederatedComputeExecutors.getBackgroundExecutor; 21 22 import android.app.job.JobParameters; 23 import android.app.job.JobService; 24 25 import com.android.federatedcompute.internal.util.LogUtil; 26 import com.android.federatedcompute.services.common.FederatedComputeJobUtil; 27 import com.android.federatedcompute.services.common.FlagsFactory; 28 import com.android.federatedcompute.services.statsd.joblogging.FederatedComputeJobServiceLogger; 29 30 import com.google.common.util.concurrent.FutureCallback; 31 import com.google.common.util.concurrent.Futures; 32 import com.google.common.util.concurrent.ListenableFuture; 33 import com.google.intelligence.fcp.client.FLRunnerResult; 34 import com.google.intelligence.fcp.client.FLRunnerResult.ContributionResult; 35 36 /** Main service for the scheduled federated computation jobs. */ 37 public class FederatedJobService extends JobService { 38 private static final String TAG = FederatedJobService.class.getSimpleName(); 39 40 @Override onStartJob(JobParameters params)41 public boolean onStartJob(JobParameters params) { 42 int jobId = params.getJobId(); 43 LogUtil.d(TAG, "FederatedJobService.onStartJob"); 44 FederatedComputeJobServiceLogger.getInstance(this) 45 .recordOnStartJob(jobId); 46 if (FlagsFactory.getFlags().getGlobalKillSwitch()) { 47 LogUtil.d(TAG, "GlobalKillSwitch enabled, finishing job."); 48 return FederatedComputeJobUtil.cancelAndFinishJob(this, params, jobId, 49 AD_SERVICES_BACKGROUND_JOBS_EXECUTION_REPORTED__EXECUTION_RESULT_CODE__SKIP_FOR_KILL_SWITCH_ON); 50 } 51 FederatedComputeWorker worker = FederatedComputeWorker.getInstance(this); 52 ListenableFuture<FLRunnerResult> runCompleteFuture = 53 worker.startTrainingRun(params.getJobId(), new OnJobFinishedCallback(params, this)); 54 55 Futures.addCallback( 56 runCompleteFuture, 57 new FutureCallback<FLRunnerResult>() { 58 @Override 59 public void onSuccess(FLRunnerResult flRunnerResult) { 60 LogUtil.d(TAG, "Federated computation job %d is done!", params.getJobId()); 61 if (flRunnerResult != null) { 62 worker.finish(flRunnerResult); 63 } else { 64 worker.cleanUpActiveRun(); 65 } 66 } 67 68 @Override 69 public void onFailure(Throwable t) { 70 LogUtil.e( 71 TAG, t, "Failed to handle computation job: %d", params.getJobId()); 72 worker.finish(null, ContributionResult.FAIL, false); 73 } 74 }, 75 getBackgroundExecutor()); 76 return true; 77 } 78 79 public static class OnJobFinishedCallback { 80 JobParameters mParams; 81 FederatedJobService mJobService; 82 OnJobFinishedCallback(JobParameters mParams, FederatedJobService mJobService)83 public OnJobFinishedCallback(JobParameters mParams, FederatedJobService mJobService) { 84 this.mParams = mParams; 85 this.mJobService = mJobService; 86 } 87 88 /** 89 * To be called each time we are about to reschedule the job. 90 */ callJobFinished(boolean isSuccessful)91 public void callJobFinished(boolean isSuccessful) { 92 LogUtil.d( 93 TAG, 94 "Job Finished called for Federated computation job %d!", 95 mParams.getJobId()); 96 boolean wantsReschedule = false; 97 FederatedComputeJobServiceLogger.getInstance(mJobService) 98 .recordJobFinished( 99 mParams.getJobId(), 100 isSuccessful, 101 wantsReschedule); 102 mJobService.jobFinished(mParams, wantsReschedule); 103 } 104 } 105 106 @Override onStopJob(JobParameters params)107 public boolean onStopJob(JobParameters params) { 108 int jobId = params.getJobId(); 109 LogUtil.d( 110 TAG, 111 "FederatedJobService.onStopJob %d with reason %d", 112 jobId, 113 params.getStopReason()); 114 boolean wantsReschedule = false; 115 FederatedComputeJobServiceLogger.getInstance(this) 116 .recordOnStopJob( 117 params, 118 jobId, 119 wantsReschedule); 120 FederatedComputeWorker.getInstance(this).finish(null, ContributionResult.FAIL, true); 121 return wantsReschedule; 122 } 123 } 124