/* * Copyright (C) 2017 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License */ package com.android.voicemail.impl.scheduling; import android.annotation.TargetApi; import android.content.Context; import android.content.Intent; import android.os.Build.VERSION_CODES; import android.os.Bundle; import android.os.Handler; import android.os.HandlerThread; import android.os.Looper; import android.os.Message; import android.support.annotation.MainThread; import android.support.annotation.Nullable; import android.support.annotation.VisibleForTesting; import android.support.annotation.WorkerThread; import com.android.voicemail.impl.Assert; import com.android.voicemail.impl.NeededForTesting; import com.android.voicemail.impl.VvmLog; import com.android.voicemail.impl.scheduling.TaskQueue.NextTask; import java.util.List; /** * A singleton to queue and run {@link Task} with the {@link android.app.job.JobScheduler}. A task * is queued by sending a broadcast to {@link TaskReceiver}. The intent should contain enough * information in {@link Intent#getExtras()} to construct the task (see {@link * Tasks#createIntent(Context, Class)}). * *
The executor will only exist when {@link TaskSchedulerJobService} is running. * *
All tasks are ran in the background with a wakelock being held by the {@link * android.app.job.JobScheduler}, which is between {@link #onStartJob(Job, List)} and {@link * #finishJobAsync()}. The {@link TaskSchedulerJobService} also has a {@link TaskQueue}, but the * data is stored in the {@link android.app.job.JobScheduler} instead of the process memory, so if * the process is killed the queued tasks will be restored. If a new task is added, a new {@link * TaskSchedulerJobService} will be scheduled to run the task. If the job is already scheduled, the * new task will be pushed into the queue of the scheduled job. If the job is already running, the * job will be queued in process memory. * *
Only one task will be ran at a time, and same task cannot exist in the queue at the same time. * Refer to {@link TaskQueue} for queuing and execution order. * *
If there are still tasks in the queue but none are executable immediately, the service will * enter a "sleep", pushing all remaining task into a new job and end the current job. * *
The executor will be started when {@link TaskSchedulerJobService} is running, and stopped when * there are no more tasks in the queue or when the executor is put to sleep. * *
{@link android.app.job.JobScheduler} is not used directly due to: * *
A short sleep will continue the job and use {@link Handler#postDelayed(Runnable, long)} to * wait for the next task. * *
A long sleep will finish the job and schedule a new one. The exact execution time is
* subjected to {@link android.app.job.JobScheduler} battery optimization, and is not exact.
*/
private static final int SHORT_SLEEP_THRESHOLD_MILLISECONDS = 10_000;
/**
* When there are no more tasks to be run the service should be stopped. But when all tasks has
* finished there might still be more tasks in the message queue waiting to be processed,
* especially the ones submitted in {@link Task#onCompleted()}. Wait for a while before stopping
* the service to make sure there are no pending messages.
*/
private static final int STOP_DELAY_MILLISECONDS = 5_000;
/** Interval between polling of whether the job is finished. */
private static final int TERMINATE_POLLING_INTERVAL_MILLISECONDS = 1_000;
// The thread to run tasks on
private final WorkerThreadHandler workerThreadHandler;
private static TaskExecutor instance;
/**
* Used by tests to turn task handling into a single threaded process by calling {@link
* Handler#handleMessage(Message)} directly
*/
private MessageSender messageSender = new MessageSender();
private final MainThreadHandler mainThreadHandler;
private final Context appContext;
/** Main thread only, access through {@link #getTasks()} */
private final TaskQueue tasks = new TaskQueue();
private boolean isWorkerThreadBusy = false;
private boolean isTerminating = false;
private Job job;
private final Runnable stopServiceWithDelay =
new Runnable() {
@MainThread
@Override
public void run() {
VvmLog.i(TAG, "Stopping service");
if (!isJobRunning() || isTerminating()) {
VvmLog.e(TAG, "Service already stopped");
return;
}
scheduleJobAndTerminate(0, true);
}
};
/**
* Reschedule the {@link TaskSchedulerJobService} and terminate the executor when the {@link Job}
* is truly finished. If the job is still not finished, this runnable will requeue itself on the
* main thread. The requeue is only expected to happen a few times.
*/
private class JobFinishedPoller implements Runnable {
private final long delayMillis;
private final boolean isNewJob;
private int invocationCounter = 0;
JobFinishedPoller(long delayMillis, boolean isNewJob) {
this.delayMillis = delayMillis;
this.isNewJob = isNewJob;
}
@Override
public void run() {
// The job should be finished relatively quickly. Assert to make sure this assumption is true.
Assert.isTrue(invocationCounter < 10);
invocationCounter++;
if (job.isFinished()) {
VvmLog.i("JobFinishedPoller.run", "Job finished");
if (!getTasks().isEmpty()) {
TaskSchedulerJobService.scheduleJob(
appContext, serializePendingTasks(), delayMillis, isNewJob);
tasks.clear();
}
terminate();
return;
}
VvmLog.w("JobFinishedPoller.run", "Job still running");
mainThreadHandler.postDelayed(this, TERMINATE_POLLING_INTERVAL_MILLISECONDS);
}
};
/** Should attempt to run the next task when a task has finished or been added. */
private boolean taskAutoRunDisabledForTesting = false;
/** Handles execution of the background task in teh worker thread. */
@VisibleForTesting
final class WorkerThreadHandler extends Handler {
public WorkerThreadHandler(Looper looper) {
super(looper);
}
@Override
@WorkerThread
public void handleMessage(Message msg) {
Assert.isNotMainThread();
Task task = (Task) msg.obj;
try {
VvmLog.i(TAG, "executing task " + task);
task.onExecuteInBackgroundThread();
} catch (Throwable throwable) {
VvmLog.e(TAG, "Exception while executing task " + task + ":", throwable);
}
Message schedulerMessage = mainThreadHandler.obtainMessage();
schedulerMessage.obj = task;
messageSender.send(schedulerMessage);
}
}
/** Handles completion of the background task in the main thread. */
@VisibleForTesting
final class MainThreadHandler extends Handler {
public MainThreadHandler(Looper looper) {
super(looper);
}
@Override
@MainThread
public void handleMessage(Message msg) {
Assert.isMainThread();
Task task = (Task) msg.obj;
getTasks().remove(task);
task.onCompleted();
isWorkerThreadBusy = false;
if (!isJobRunning() || isTerminating()) {
// TaskExecutor was terminated when the task is running in background, don't need to run the
// next task or terminate again
return;
}
maybeRunNextTask();
}
}
/** Starts a new TaskExecutor. May only be called by {@link TaskSchedulerJobService}. */
@MainThread
static void createRunningInstance(Context context) {
Assert.isMainThread();
Assert.isTrue(instance == null);
instance = new TaskExecutor(context);
}
/** @return the currently running instance, or {@code null} if the executor is not running. */
@MainThread
@Nullable
static TaskExecutor getRunningInstance() {
return instance;
}
private TaskExecutor(Context context) {
this.appContext = context.getApplicationContext();
HandlerThread thread = new HandlerThread("VvmTaskExecutor");
thread.start();
workerThreadHandler = new WorkerThreadHandler(thread.getLooper());
mainThreadHandler = new MainThreadHandler(Looper.getMainLooper());
}
@VisibleForTesting
void terminate() {
VvmLog.i(TAG, "terminated");
Assert.isMainThread();
job = null;
workerThreadHandler.getLooper().quit();
instance = null;
TaskReceiver.resendDeferredBroadcasts(appContext);
}
@MainThread
void addTask(Task task) {
Assert.isMainThread();
getTasks().add(task);
VvmLog.i(TAG, task + " added");
mainThreadHandler.removeCallbacks(stopServiceWithDelay);
maybeRunNextTask();
}
@MainThread
@VisibleForTesting
TaskQueue getTasks() {
Assert.isMainThread();
return tasks;
}
@MainThread
private void maybeRunNextTask() {
Assert.isMainThread();
if (isWorkerThreadBusy) {
return;
}
if (taskAutoRunDisabledForTesting) {
// If taskAutoRunDisabledForTesting is true, runNextTask() must be explicitly called
// to run the next task.
return;
}
runNextTask();
}
@VisibleForTesting
@MainThread
void runNextTask() {
Assert.isMainThread();
if (getTasks().isEmpty()) {
prepareStop();
return;
}
NextTask nextTask = getTasks().getNextTask(READY_TOLERANCE_MILLISECONDS);
if (nextTask.task != null) {
nextTask.task.onBeforeExecute();
Message message = workerThreadHandler.obtainMessage();
message.obj = nextTask.task;
isWorkerThreadBusy = true;
messageSender.send(message);
return;
}
VvmLog.i(TAG, "minimal wait time:" + nextTask.minimalWaitTimeMillis);
if (!taskAutoRunDisabledForTesting && nextTask.minimalWaitTimeMillis != null) {
// No tasks are currently ready. Sleep until the next one should be.
// If a new task is added during the sleep the service will wake immediately.
sleep(nextTask.minimalWaitTimeMillis);
}
}
@MainThread
private void sleep(long timeMillis) {
VvmLog.i(TAG, "sleep for " + timeMillis + " millis");
if (timeMillis < SHORT_SLEEP_THRESHOLD_MILLISECONDS) {
mainThreadHandler.postDelayed(
new Runnable() {
@Override
public void run() {
maybeRunNextTask();
}
},
timeMillis);
return;
}
scheduleJobAndTerminate(timeMillis, false);
}
private List