1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.android.tradefed.command;
17 
18 import com.android.tradefed.log.LogUtil.CLog;
19 import com.android.tradefed.result.error.ErrorIdentifier;
20 import com.android.tradefed.util.RunInterruptedException;
21 
22 import com.google.common.annotations.VisibleForTesting;
23 import com.google.common.collect.MapMaker;
24 
25 import java.util.concurrent.CompletableFuture;
26 import java.util.concurrent.ConcurrentMap;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.Future;
29 import java.util.concurrent.ScheduledExecutorService;
30 import java.util.concurrent.TimeUnit;
31 
32 import javax.annotation.Nonnull;
33 
34 /** Service allowing TradeFederation commands to be interrupted or marked as uninterruptible. */
35 public class CommandInterrupter {
36 
37     /** Singleton. */
38     public static final CommandInterrupter INSTANCE = new CommandInterrupter();
39 
40     private final ScheduledExecutorService mExecutor = Executors.newScheduledThreadPool(0);
41 
42     // tracks whether a thread is currently interruptible
43     private ConcurrentMap<Thread, Boolean> mInterruptible = new MapMaker().weakKeys().makeMap();
44     // presence of an interrupt error message indicates that the thread should be interrupted
45     private ConcurrentMap<Thread, MessageAndErrorId> mInterruptMessage =
46             new MapMaker().weakKeys().makeMap();
47 
48     private class MessageAndErrorId {
49         public String message;
50         public ErrorIdentifier errorId;
51 
MessageAndErrorId(String message, ErrorIdentifier errorId)52         MessageAndErrorId(String message, ErrorIdentifier errorId) {
53             this.message = message;
54             this.errorId = errorId;
55         }
56     }
57 
58     @VisibleForTesting
59     // FIXME: reduce visibility once RunUtil interrupt tests are removed
CommandInterrupter()60     public CommandInterrupter() {}
61 
62     /** Allow current thread to be interrupted. */
allowInterrupt()63     public void allowInterrupt() {
64         CLog.i("Interrupt allowed");
65         mInterruptible.put(Thread.currentThread(), true);
66         checkInterrupted();
67     }
68 
69     /** Prevent current thread from being interrupted. */
blockInterrupt()70     public void blockInterrupt() {
71         CLog.i("Interrupt blocked");
72         mInterruptible.put(Thread.currentThread(), false);
73         checkInterrupted();
74     }
75 
76     /** @return true if current thread is interruptible */
isInterruptible()77     public boolean isInterruptible() {
78         return isInterruptible(Thread.currentThread());
79     }
80 
81     /** @return true if specified thread is interruptible */
isInterruptible(@onnull Thread thread)82     public boolean isInterruptible(@Nonnull Thread thread) {
83         return Boolean.TRUE.equals(mInterruptible.get(thread));
84     }
85 
86     /**
87      * Allow a specified thread to be interrupted after a delay.
88      *
89      * @param thread thread to mark as interruptible
90      * @param delay time from now to delay execution
91      * @param unit time unit of the delay parameter
92      */
93     // FIXME: reduce visibility once RunUtil interrupt methods are removed
allowInterruptAsync( @onnull Thread thread, long delay, @Nonnull TimeUnit unit)94     public Future<?> allowInterruptAsync(
95             @Nonnull Thread thread, long delay, @Nonnull TimeUnit unit) {
96         if (isInterruptible(thread)) {
97             CLog.v("Thread already interruptible");
98             return CompletableFuture.completedFuture(null);
99         }
100 
101         CLog.w("Allowing interrupt in %d ms", unit.toMillis(delay));
102         return mExecutor.schedule(
103                 () -> {
104                     CLog.e("Interrupt allowed asynchronously");
105                     mInterruptible.put(thread, true);
106                 },
107                 delay,
108                 unit);
109     }
110 
111     /**
112      * Flag a thread, interrupting it if and when it becomes interruptible.
113      *
114      * @param thread thread to mark for interruption
115      * @param message interruption message
116      */
117     // FIXME: reduce visibility once RunUtil interrupt methods are removed
interrupt( @onnull Thread thread, @Nonnull String message, ErrorIdentifier errorId)118     public void interrupt(
119             @Nonnull Thread thread, @Nonnull String message, ErrorIdentifier errorId) {
120         if (message == null) {
121             throw new IllegalArgumentException("message cannot be null.");
122         }
123         mInterruptMessage.put(thread, new MessageAndErrorId(message, errorId));
124         if (isInterruptible(thread)) {
125             thread.interrupt();
126         }
127     }
128 
129     /**
130      * Interrupts the current thread if it should be interrupted. Threads are encouraged to
131      * periodically call this method in order to throw the right {@link RunInterruptedException}.
132      */
checkInterrupted()133     public void checkInterrupted() throws RunInterruptedException {
134         Thread thread = Thread.currentThread();
135         if (isInterruptible()) {
136             MessageAndErrorId error = mInterruptMessage.remove(thread);
137             if (error != null) {
138                 throw new RunInterruptedException(error.message, error.errorId);
139             }
140         }
141     }
142 }
143