1 /*
2  * Copyright (C) 2019 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.android.tradefed.cluster;
17 
18 import com.android.annotations.VisibleForTesting;
19 import com.android.tradefed.cluster.ClusterHostEvent.HostEventType;
20 import com.android.tradefed.command.CommandScheduler;
21 import com.android.tradefed.command.ICommandScheduler;
22 import com.android.tradefed.command.remote.DeviceDescriptor;
23 import com.android.tradefed.config.ConfigurationException;
24 import com.android.tradefed.config.IConfiguration;
25 import com.android.tradefed.device.DeviceAllocationState;
26 import com.android.tradefed.device.DeviceNotAvailableException;
27 import com.android.tradefed.device.FreeDeviceState;
28 import com.android.tradefed.device.IDeviceManager;
29 import com.android.tradefed.device.ITestDevice;
30 import com.android.tradefed.device.NoDeviceException;
31 import com.android.tradefed.device.TestDeviceState;
32 import com.android.tradefed.device.battery.BatteryController;
33 import com.android.tradefed.device.battery.IBatteryInfo;
34 import com.android.tradefed.device.battery.IBatteryInfo.BatteryState;
35 import com.android.tradefed.error.HarnessRuntimeException;
36 import com.android.tradefed.error.IHarnessException;
37 import com.android.tradefed.host.IHostOptions.PermitLimitType;
38 import com.android.tradefed.invoker.IInvocationContext;
39 import com.android.tradefed.invoker.logger.InvocationMetricLogger.InvocationMetricKey;
40 import com.android.tradefed.log.LogUtil.CLog;
41 import com.android.tradefed.result.CollectingTestListener;
42 import com.android.tradefed.result.FailureDescription;
43 import com.android.tradefed.result.ITestSummaryListener;
44 import com.android.tradefed.result.TestRunResult;
45 import com.android.tradefed.result.TestStatus;
46 import com.android.tradefed.result.TestSummary;
47 import com.android.tradefed.result.error.ErrorIdentifier;
48 import com.android.tradefed.result.error.ErrorStorageUtil;
49 import com.android.tradefed.result.error.InfraErrorIdentifier;
50 import com.android.tradefed.util.FileUtil;
51 import com.android.tradefed.util.MultiMap;
52 import com.android.tradefed.util.QuotationAwareTokenizer;
53 import com.android.tradefed.util.StreamUtil;
54 
55 import com.google.common.primitives.Ints;
56 
57 import org.json.JSONException;
58 
59 import java.io.File;
60 import java.io.IOException;
61 import java.util.Arrays;
62 import java.util.Collection;
63 import java.util.Collections;
64 import java.util.HashMap;
65 import java.util.HashSet;
66 import java.util.LinkedList;
67 import java.util.List;
68 import java.util.Map;
69 import java.util.Optional;
70 import java.util.Set;
71 import java.util.concurrent.RejectedExecutionHandler;
72 import java.util.concurrent.ScheduledFuture;
73 import java.util.concurrent.ScheduledThreadPoolExecutor;
74 import java.util.concurrent.ThreadFactory;
75 import java.util.concurrent.ThreadPoolExecutor;
76 import java.util.concurrent.TimeUnit;
77 
78 /**
79  * A {@link ICommandScheduler} to support TFC (Tradefed Cluster). This scheduler runs commands from
80  * TFC command-queue and uploads invocation events to TFC command-event-queue.
81  */
82 public class ClusterCommandScheduler extends CommandScheduler {
83 
    /**
     * Error identifiers that should not be retried. When a completed invocation carries one of
     * these identifiers, it is reported as a {@code ConfigurationError} event instead of a
     * regular {@code InvocationCompleted} event.
     */
    private static final Set<InfraErrorIdentifier> NONE_RETRIABLE_CONFIG_ERRORS =
            new HashSet<>(Arrays.asList(InfraErrorIdentifier.OPTION_CONFIGURATION_ERROR));

    /**
     * The {@link ScheduledThreadPoolExecutor} used to manage heartbeats. Stays {@code null} until
     * {@link #getHeartbeatThreadPool()} creates it on first use.
     */
    private ScheduledThreadPoolExecutor mHeartbeatThreadPool = null;

    /** The {@link IClusterOptions} instance used to store cluster-related settings. */
    private IClusterOptions mClusterOptions;

    /** The {@link IClusterClient} instance used to interact with the TFC backend. */
    private IClusterClient mClusterClient;
96 
97     /**
98      * A {@link ThreadFactory} which returns threads in a dedicated heartbeat group.
99      *
100      * <p>This class is used as a factory by {@code mHeartbeatThreadPool} in order to segregate
101      * heartbeat threads from other "stray" threads to avoid tripping loose thread detection in
102      * {@link CommandScheduler}.
103      */
104     private static class HeartbeatThreadFactory implements ThreadFactory {
105         private static final ThreadGroup HB_GROUP;
106 
107         static {
108             // fetch root thread group as this class may be initialized by an invocation thread
109             ThreadGroup tg = Thread.currentThread().getThreadGroup();
110             while (tg.getParent() != null) {
111                 tg = tg.getParent();
112             }
113             HB_GROUP = new ThreadGroup(tg, "ClusterCommandScheduler.heartbeat");
114         }
115 
116         @Override
newThread(Runnable r)117         public Thread newThread(Runnable r) {
118             Thread thread = new Thread(HB_GROUP, r);
119             // heartbeat should always get cancelled, but ensure it doesn't prevent JVM exit
120             thread.setDaemon(true);
121             return thread;
122         }
123     }
124 
    /**
     * {@inheritDoc}
     *
     * <p>Uploads a host event with state {@code HostState.RUNNING} before delegating to the
     * parent scheduler.
     */
    @Override
    public void start() {
        UploadHostEventWithState(HostState.RUNNING);
        super.start();
    }
131 
    /**
     * {@inheritDoc}
     *
     * <p>Uploads a host event with state {@code HostState.QUITTING} and shuts down the heartbeat
     * thread pool before delegating to the parent scheduler.
     */
    @Override
    public void shutdown() {
        UploadHostEventWithState(HostState.QUITTING);
        // The pool is configured to keep already-scheduled periodic heartbeats running after
        // shutdown; only new heartbeats are rejected from here on.
        getHeartbeatThreadPool().shutdown();
        super.shutdown();
    }
139 
    /**
     * Uploads a host event with state {@code HostState.KILLING} and shuts down the heartbeat
     * thread pool before delegating to the parent scheduler's hard shutdown.
     */
    @Override
    public synchronized void shutdownHard() {
        UploadHostEventWithState(HostState.KILLING);
        // Existing periodic heartbeats keep running per the pool's after-shutdown policy.
        getHeartbeatThreadPool().shutdown();
        super.shutdownHard();
    }
146 
147     /**
148      * A {@link com.android.tradefed.command.ICommandScheduler.IScheduledInvocationListener} to
149      * upload events to TFC.
150      */
151     class InvocationEventHandler extends CollectingTestListener
152             implements IScheduledInvocationListener, ITestSummaryListener {
153 
        // Handle of the periodic heartbeat; set in invocationInitiated, cancelled in
        // invocationComplete.
        private ScheduledFuture<?> mHeartbeat;
        // The cluster command whose invocation this handler tracks.
        private final ClusterCommand mCommandTask;
        // Serials of the devices used by the invocation; attached to every uploaded event.
        private Set<String> mDeviceSerials = new HashSet<>();
        // Accumulated summary text, reported with the InvocationCompleted event.
        private String mSummary;
        // Summary strings already appended to mSummary, used to skip duplicates.
        private Set<String> processedSummaries = new HashSet<>();
        // The failure passed to invocationFailed(FailureDescription), if any.
        private FailureDescription mFailureDescription;
        // Error text reported with the InvocationEnded / InvocationCompleted events.
        private String mError;
        // Message of the cause wrapped by a SubprocessCommandException, if any.
        private String mSubprocessCommandError;
        // Work directory that is recursively deleted when the invocation completes.
        private File mWorkDir;
        // Latest per-test-run status snapshot built by updateInvocationStatus().
        private InvocationStatus mInvocationStatus;
        // Once true, the heartbeat no longer polls the command state.
        private boolean mCanceled = false;
165 
        /**
         * Creates a {@link InvocationEventHandler} to track the given {@link ClusterCommand}.
         *
         * <p>The command is used to build every event this handler uploads and to look up the
         * command's status from the heartbeat.
         *
         * @param commandTask the {@link ClusterCommand} to track.
         */
        public InvocationEventHandler(ClusterCommand commandTask) {
            mCommandTask = commandTask;
        }
174 
        /**
         * Sets a work directory for an invocation.
         *
         * <p>When set, the directory is recursively deleted in {@code invocationComplete}.
         *
         * @param dir a work directory.
         */
        public void setWorkDir(File dir) {
            mWorkDir = dir;
        }
183 
        /**
         * Overrides the canceled flag. While the flag is {@code true} the heartbeat does not
         * query the command state.
         *
         * @param value the new value of the canceled flag.
         */
        @VisibleForTesting
        void setCanceled(boolean value) {
            mCanceled = value;
        }
188 
createEventBuilder()189         private ClusterCommandEvent.Builder createEventBuilder() {
190             final ClusterCommandEvent.Builder builder =
191                     ClusterCommandEvent.createEventBuilder(mCommandTask)
192                             .setHostName(ClusterHostUtil.getHostName());
193             if (!mDeviceSerials.isEmpty()) {
194                 builder.setDeviceSerials(mDeviceSerials);
195             }
196             return builder;
197         }
198 
updateInvocationStatus()199         private void updateInvocationStatus() {
200             if (!getClusterOptions().shouldUploadInvocationStatus()) {
201                 return;
202             }
203             final InvocationStatus obj = new InvocationStatus();
204             final Collection<TestRunResult> testRunResults = this.getMergedTestRunResults();
205             for (final TestRunResult result : testRunResults) {
206                 final TestGroupStatus testGroupStatus =
207                         new TestGroupStatus(
208                                 result.getName(),
209                                 result.getNumTests(),
210                                 result.getNumCompleteTests(),
211                                 result.getNumAllFailedTests(),
212                                 result.getNumTestsInState(TestStatus.PASSED),
213                                 result.isRunComplete(),
214                                 result.getElapsedTime());
215                 obj.addTestGroupStatus(testGroupStatus);
216             }
217             mInvocationStatus = obj;
218         }
219 
220         /** {@inheritDoc} */
221         @Override
invocationInitiated(IInvocationContext context)222         public void invocationInitiated(IInvocationContext context) {
223             for (ITestDevice device : context.getDevices()) {
224                 mDeviceSerials.add(device.getSerialNumber());
225             }
226             final ClusterCommandEvent event =
227                     createEventBuilder()
228                             .setType(ClusterCommandEvent.Type.InvocationInitiated)
229                             .build();
230             getClusterClient().getCommandEventUploader().postEvent(event);
231             getClusterClient().getCommandEventUploader().flush();
232             mHeartbeat = startHeartbeat();
233             // Check that devices are in charging state before starting the invocation.
234             for (ITestDevice device : context.getDevices()) {
235                 try {
236                     BatteryState state = BatteryController.getDeviceChargingState(device);
237                     if (BatteryState.NOT_CHARGING.equals(state)) {
238                         IBatteryInfo info = BatteryController.getBatteryInfoForDevice(device);
239                         if (info != null) {
240                             info.enableCharging(device);
241                         }
242                     }
243                 } catch (DeviceNotAvailableException e) {
244                     CLog.e(e);
245                 }
246             }
247         }
248 
249         /** {@inheritDoc} */
250         @Override
invocationStarted(IInvocationContext context)251         public void invocationStarted(IInvocationContext context) {
252             super.invocationStarted(context);
253             final ClusterCommandEvent event =
254                     createEventBuilder()
255                             .setType(ClusterCommandEvent.Type.InvocationStarted)
256                             .build();
257             getClusterClient().getCommandEventUploader().postEvent(event);
258             getClusterClient().getCommandEventUploader().flush();
259         }
260 
        /** Delegates to the three-argument overload with attempt number {@code 0}. */
        @Override
        public void testRunStarted(String name, int numTests) {
            testRunStarted(name, numTests, 0);
        }

        /** Delegates to the four-argument overload, using the current time as the start time. */
        @Override
        public void testRunStarted(String name, int numTests, int attemptNumber) {
            testRunStarted(name, numTests, attemptNumber, System.currentTimeMillis());
        }

        /**
         * {@inheritDoc}
         *
         * <p>After the parent listener has recorded the run start, refreshes the invocation
         * status snapshot.
         */
        @Override
        public void testRunStarted(String name, int numTests, int attemptNumber, long startTime) {
            super.testRunStarted(name, numTests, attemptNumber, startTime);
            updateInvocationStatus();
        }
277 
278         /** {@inheritDoc} */
279         @Override
invocationFailed(Throwable cause)280         public void invocationFailed(Throwable cause) {
281             super.invocationFailed(cause);
282 
283             mError = StreamUtil.getStackTrace(cause);
284             if (cause instanceof SubprocessCommandException && cause.getCause() != null) {
285                 // The inner exception holds an exception stack trace from a subprocess.
286                 mSubprocessCommandError = cause.getCause().getMessage();
287             }
288         }
289 
290         /** {@inheritDoc} */
291         @Override
invocationFailed(FailureDescription failure)292         public void invocationFailed(FailureDescription failure) {
293             super.invocationFailed(failure);
294 
295             mFailureDescription = failure;
296             mError = failure.getErrorMessage();
297             if (failure.getCause() != null) {
298                 Throwable cause = failure.getCause();
299                 mError = StreamUtil.getStackTrace(cause);
300                 if (cause instanceof HarnessRuntimeException
301                         && InfraErrorIdentifier.TRADEFED_SKIPPED_TESTS_DURING_SHUTDOWN.equals(
302                                 ((HarnessRuntimeException) cause).getErrorId())) {
303                     // Tests were not run, so un-lease the command so that it can be rescheduled.
304                     unleaseCommands(Arrays.asList(mCommandTask));
305                 }
306             }
307         }
308 
309         /** {@inheritDoc} */
310         @Override
invocationEnded(long elapsedTime)311         public void invocationEnded(long elapsedTime) {
312             super.invocationEnded(elapsedTime);
313 
314             ClusterCommandEvent event =
315                     createEventBuilder()
316                             .setType(ClusterCommandEvent.Type.InvocationEnded)
317                             .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError)
318                             .setData(
319                                     ClusterCommandEvent.DATA_KEY_SUBPROCESS_COMMAND_ERROR,
320                                     mSubprocessCommandError)
321                             .build();
322             getClusterClient().getCommandEventUploader().postEvent(event);
323             getClusterClient().getCommandEventUploader().flush();
324         }
325 
326         /** {@inheritDoc} */
327         @Override
invocationComplete( IInvocationContext metadata, Map<ITestDevice, FreeDeviceState> devicesStates)328         public void invocationComplete(
329                 IInvocationContext metadata, Map<ITestDevice, FreeDeviceState> devicesStates) {
330             CLog.d("ClusterCommand invocationComplete start.");
331             if (mWorkDir != null) {
332                 FileUtil.recursiveDelete(mWorkDir);
333             }
334 
335             // TODO: handle multi-device where only one of the build could be missing.
336             ErrorIdentifier errorId = null;
337             if (getPrimaryBuildInfo() == null && mError == null) {
338                 mError = "build not found";
339                 // Test that the filesystem is working as it's the main reason for this error
340                 // situation to occur
341                 try {
342                     File f = FileUtil.createTempFile("test-filesystem", ".txt");
343                     FileUtil.deleteFile(f);
344                 } catch (IOException e) {
345                     errorId = InfraErrorIdentifier.LAB_HOST_FILESYSTEM_ERROR;
346                     mError =
347                             String.format(
348                                     "[%s] Filesystem error on %s. Please notify lab admin.",
349                                     errorId.name(), ClusterHostUtil.getHostName());
350                 }
351             }
352             if (errorId == null && mFailureDescription != null) {
353                 errorId = mFailureDescription.getErrorIdentifier();
354             }
355 
356             String fetchBuildTimeMillis = "-1";
357             String setupTimeMillis = "-1";
358             String lostDevice = null;
359             if (metadata != null) {
360                 fetchBuildTimeMillis =
361                         metadata.getAttributes()
362                                 .getUniqueMap()
363                                 .get(InvocationMetricKey.FETCH_BUILD.toString());
364                 setupTimeMillis =
365                         metadata.getAttributes()
366                                 .getUniqueMap()
367                                 .get(InvocationMetricKey.SETUP.toString());
368                 lostDevice =
369                         metadata.getAttributes()
370                                 .getUniqueMap()
371                                 .get(InvocationMetricKey.DEVICE_LOST_DETECTED.toString());
372             }
373 
374             // Stop heartbeat thread before sending InvocationCompleted event.
375             if (mHeartbeat != null) {
376                 mHeartbeat.cancel(true);
377             }
378             updateInvocationStatus();
379             final ClusterCommandEvent.Builder eventBuilder =
380                     createEventBuilder()
381                             .setType(ClusterCommandEvent.Type.InvocationCompleted)
382                             .setInvocationStatus(mInvocationStatus)
383                             .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError)
384                             .setData(
385                                     ClusterCommandEvent.DATA_KEY_SUBPROCESS_COMMAND_ERROR,
386                                     mSubprocessCommandError)
387                             .setData(ClusterCommandEvent.DATA_KEY_SUMMARY, mSummary)
388                             .setData(
389                                     ClusterCommandEvent.DATA_KEY_FETCH_BUILD_TIME_MILLIS,
390                                     fetchBuildTimeMillis)
391                             .setData(
392                                     ClusterCommandEvent.DATA_KEY_SETUP_TIME_MILLIS, setupTimeMillis)
393                             .setData(
394                                     ClusterCommandEvent.DATA_KEY_TOTAL_TEST_COUNT,
395                                     Integer.toString(getNumTotalTests()))
396                             .setData(
397                                     ClusterCommandEvent.DATA_KEY_FAILED_TEST_COUNT,
398                                     Integer.toString(getNumAllFailedTests()))
399                             .setData(
400                                     ClusterCommandEvent.DATA_KEY_PASSED_TEST_COUNT,
401                                     Integer.toString(getNumTestsInState(TestStatus.PASSED)))
402                             .setData(
403                                     ClusterCommandEvent.DATA_KEY_FAILED_TEST_RUN_COUNT,
404                                     Integer.toString(getNumAllFailedTestRuns()));
405             if (errorId != null) {
406                 // Report ConfigurationError for known errors to prevent test retry.
407                 if (NONE_RETRIABLE_CONFIG_ERRORS.contains(errorId)) {
408                     eventBuilder.setType(ClusterCommandEvent.Type.ConfigurationError);
409                 }
410                 eventBuilder.setData(ClusterCommandEvent.DATA_KEY_ERROR_ID_NAME, errorId.name());
411                 eventBuilder.setData(ClusterCommandEvent.DATA_KEY_ERROR_ID_CODE, errorId.code());
412                 eventBuilder.setData(
413                         ClusterCommandEvent.DATA_KEY_ERROR_STATUS,
414                         ErrorStorageUtil.mapStatus(errorId.status()));
415             }
416             if (lostDevice != null) {
417                 eventBuilder.setData(ClusterCommandEvent.DATA_KEY_LOST_DEVICE_DETECTED, lostDevice);
418             }
419             final ClusterCommandEvent event = eventBuilder.build();
420             getClusterClient().getCommandEventUploader().postEvent(event);
421             getClusterClient().getCommandEventUploader().flush();
422             CLog.d("ClusterCommand invocationComplete done.");
423         }
424 
425         /** {@inheritDoc} */
426         @Override
putEarlySummary(List<TestSummary> summaries)427         public void putEarlySummary(List<TestSummary> summaries) {
428             if (getClusterOptions().shouldCollectEarlyTestSummary()) {
429                 putSummary(summaries);
430             }
431         }
432 
433         /** {@inheritDoc} */
434         @Override
putSummary(List<TestSummary> summaries)435         public void putSummary(List<TestSummary> summaries) {
436             StringBuilder sb = new StringBuilder();
437             for (final TestSummary summary : summaries) {
438                 String summaryString = summary.getSummary().toString();
439                 if (!processedSummaries.contains(summaryString)) {
440                     processedSummaries.add(summaryString);
441                     sb.append(summaryString);
442                     sb.append("\n");
443                 }
444             }
445             mSummary = mSummary == null ? sb.toString() : mSummary + sb.toString();
446         }
447 
startHeartbeat()448         private ScheduledFuture<?> startHeartbeat() {
449             return getHeartbeatThreadPool()
450                     .scheduleAtFixedRate(
451                             new HeartbeatSender(),
452                             0,
453                             getClusterOptions().getInvocationHeartbeatInterval(),
454                             TimeUnit.MILLISECONDS);
455         }
456 
457         class HeartbeatSender implements Runnable {
458             @Override
run()459             public void run() {
460                 try {
461                     // Check cluster command's status.
462                     if (getClusterOptions().checkCommandState() && !mCanceled) {
463                         ClusterCommandStatus commandStatus =
464                                 getClusterClient()
465                                         .getCommandStatus(
466                                                 mCommandTask.getRequestId(),
467                                                 mCommandTask.getCommandId());
468                         if (ClusterCommand.State.CANCELED.equals(commandStatus.getState())) {
469                             mCanceled = true;
470                             String cause =
471                                     String.format(
472                                             "The cluster client %s has marked command"
473                                                     + " (requestId=%s, commandId=%s) canceled with"
474                                                     + " reason: %s",
475                                             getClusterClient().getClass().getSimpleName(),
476                                             mCommandTask.getRequestId(),
477                                             mCommandTask.getCommandId(),
478                                             commandStatus.getCancelReason());
479                             CLog.w("Stop invocation due to: %s", cause);
480                             Optional.ofNullable(getInvocationContext())
481                                     .map(IInvocationContext::getInvocationId)
482                                     .map(Ints::tryParse)
483                                     .ifPresent(invocationId -> stopInvocation(invocationId, cause));
484                         } else if (ClusterCommand.State.COMPLETED.equals(
485                                 commandStatus.getState())) {
486                             CLog.d("Invocation completed, skip reporting heartbeat.");
487                             return;
488                         }
489                     }
490 
491                     final ClusterCommandEvent event =
492                             createEventBuilder()
493                                     .setType(ClusterCommandEvent.Type.TestRunInProgress)
494                                     .setInvocationStatus(mInvocationStatus)
495                                     .build();
496                     getClusterClient().getCommandEventUploader().postEvent(event);
497                 } catch (Exception e) {
498                     CLog.e("Error sending heartbeat to TFC:");
499                     CLog.e(e);
500                 }
501             }
502         }
503     }
504 
getHeartbeatThreadPool()505     synchronized ScheduledThreadPoolExecutor getHeartbeatThreadPool() {
506         if (mHeartbeatThreadPool == null) {
507             mHeartbeatThreadPool = new ScheduledThreadPoolExecutor(1, new HeartbeatThreadFactory());
508             // instead of throwing some exception on shutdown we simply log it.
509             mHeartbeatThreadPool.setRejectedExecutionHandler(
510                     new RejectedExecutionHandler() {
511                         @Override
512                         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
513                             CLog.w(
514                                     "Rejecting Task %s rejected from executor %s",
515                                     r.toString(), e.toString());
516                         }
517                     });
518             // continue existing heartbeats after shutdown (until invocation is complete)
519             mHeartbeatThreadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
520         }
521         return mHeartbeatThreadPool;
522     }
523 
524     /** {@inheritDoc} */
525     @Override
processReadyCommands(IDeviceManager manager)526     protected void processReadyCommands(IDeviceManager manager) {
527         super.processReadyCommands(manager);
528 
529         if (isShuttingDown()) {
530             return;
531         }
532 
533         List<ClusterCommand> commands = null;
534         MultiMap<String, DeviceDescriptor> devices = getAvailableDevices(manager);
535         if (devices.isEmpty()) {
536             CLog.d("No devices are available for testing.");
537             return;
538         }
539         // Lease command tasks through the leasehosttasks API.
540         // Here we get all devices (available or not), so TFC will analyze the device tree to
541         // decide which group is allocated and which group is available.
542         devices = getDevices(manager, false);
543         commands = fetchHostCommands(devices);
544         if (commands.isEmpty()) {
545             CLog.d("No commands available for testing.");
546             return;
547         }
548         if (isShuttingDown()) {
549             CLog.d("Tradefed shutting down, unleasing commands.");
550             unleaseCommands(commands);
551             return;
552         }
553         execCommands(commands);
554     }
555 
    /**
     * Returns a map containing available devices grouped by their types.
     *
     * <p>Equivalent to {@code getDevices(manager, true)}.
     *
     * @param manager a {@link IDeviceManager}.
     * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices.
     */
    MultiMap<String, DeviceDescriptor> getAvailableDevices(IDeviceManager manager) {
        return getDevices(manager, true);
    }
565 
566     /**
567      * Returns a map containing devices grouped by their types.
568      *
569      * @param manager a {@link IDeviceManager}.
570      * @param availableOnly only return available devices or all devices.
571      * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices.
572      */
getDevices(IDeviceManager manager, boolean availableOnly)573     MultiMap<String, DeviceDescriptor> getDevices(IDeviceManager manager, boolean availableOnly) {
574         // Getting available device types
575         final MultiMap<String, DeviceDescriptor> devices = new MultiMap<>();
576         for (final DeviceDescriptor device : manager.listAllDevices()) {
577             if (availableOnly && device.getState() != DeviceAllocationState.Available) {
578                 continue;
579             }
580             TestDeviceState deviceState = device.getTestDeviceState();
581             if (TestDeviceState.FASTBOOT.equals(deviceState)
582                     || TestDeviceState.FASTBOOTD.equals(deviceState)) {
583                 continue;
584             }
585             if (ClusterHostUtil.isLocalhostIpPort(device.getSerial())) {
586                 // Skipping localhost IP:PORT serials from cluster scheduling to avoid scheduling
587                 // tests on TCP devices created by Local/RemoteAndroidVirtualDevice.
588                 continue;
589             }
590             String runTargetFormat = getClusterOptions().getRunTargetFormat();
591             String runTarget =
592                     ClusterHostUtil.getRunTarget(
593                             device, runTargetFormat, getClusterOptions().getDeviceTag());
594             devices.put(runTarget, device);
595         }
596         return devices;
597     }
598 
permitsAvailableToSchedule()599     private int permitsAvailableToSchedule() {
600         if (!getClusterOptions().checkPermitsOnLease()) {
601             return Integer.MAX_VALUE;
602         }
603         for (PermitLimitType permit : PermitLimitType.values()) {
604             if (getHostOptions().getAvailablePermits(permit) <= 0) {
605                 CLog.i("There is no available '%s' permits. Not leasing any additional commands.",
606                         permit);
607                 return 0;
608             }
609         }
610         // Assumption is that download permits eventually become flashing permits
611         // TODO: Improve to track after download until flashing
612         int heuriticPermitCalculation =
613                 getHostOptions().getAvailablePermits(PermitLimitType.CONCURRENT_FLASHER)
614                         - getHostOptions().getInUsePermits(PermitLimitType.CONCURRENT_DOWNLOAD);
615         if (heuriticPermitCalculation < 0) {
616             CLog.i(
617                     "Download permits will exceed the flashing limit and might create permit"
618                             + " delays. Not Leasing.");
619             return 0;
620         }
621         return heuriticPermitCalculation;
622     }
623 
checkDiskSpace()624     private boolean checkDiskSpace() {
625         if (getClusterOptions().maxDiskUsagePercentage() == 100L) {
626             return true;
627         }
628         File rootPartition = new File("/");
629         long freeSpace =
630             (long) (rootPartition.getUsableSpace() * 100.0) / rootPartition.getTotalSpace();
631         long usage = 100L - freeSpace;
632         if (usage > getClusterOptions().maxDiskUsagePercentage()) {
633             CLog.i("Disk space utilization is '%s%%'. Stop leasing.", usage);
634             return false;
635         }
636         return true;
637     }
638 
639     /**
640      * Fetches commands for devices from the Tradefed Cluster's leasehosttasks API.
641      *
642      * @param devices a {@link MultiMap} of String to DeviceDescriptor containing devices.
643      * @return a list of {@link ClusterCommand}s.
644      */
fetchHostCommands(final MultiMap<String, DeviceDescriptor> devices)645     List<ClusterCommand> fetchHostCommands(final MultiMap<String, DeviceDescriptor> devices) {
646         CLog.d("fetching cluster host commands from leasehosttasks...");
647         int permitsAvailable = permitsAvailableToSchedule();
648         if (permitsAvailable <= 0) {
649             return Collections.<ClusterCommand>emptyList();
650         }
651         // Check disk space before scheduling
652         if (!checkDiskSpace()) {
653             return Collections.<ClusterCommand>emptyList();
654         }
655 
656         final IClusterOptions options = getClusterOptions();
657         final MultiMap<String, String> deviceGroups = options.getDeviceGroup();
658         final Map<String, String> deviceToGroup = new HashMap<>();
659         for (String group : deviceGroups.keySet()) {
660             for (String deviceSerial : deviceGroups.get(group)) {
661                 deviceToGroup.put(deviceSerial, group);
662             }
663         }
664         List<ClusterDeviceInfo> deviceInfos = new LinkedList<>();
665         for (String runTarget : devices.keySet()) {
666             for (DeviceDescriptor d : devices.get(runTarget)) {
667                 String groupName = deviceToGroup.getOrDefault(d.getSerial(), null);
668                 ClusterDeviceInfo deviceInfo =
669                         new ClusterDeviceInfo.Builder()
670                                 .setDeviceDescriptor(d)
671                                 .setRunTarget(runTarget)
672                                 .setGroupName(groupName)
673                                 .build();
674                 deviceInfos.add(deviceInfo);
675             }
676         }
677         try {
678             int count = Math.min(deviceInfos.size(), permitsAvailable);
679             List<ClusterCommand> commands =
680                     getClusterClient()
681                             .leaseHostCommands(
682                                     options.getClusterId(),
683                                     ClusterHostUtil.getHostName(),
684                                     deviceInfos,
685                                     options.getNextClusterIds(),
686                                     count);
687             return commands;
688         } catch (JSONException e) {
689             CLog.e(e);
690             return Collections.<ClusterCommand>emptyList();
691         }
692     }
693 
694     /**
695      * Executes commands fetched from the cluster command queue.
696      *
697      * @param commands a list of {@link ClusterCommand}s fetched from the cluster command queue.
698      */
execCommands(final List<ClusterCommand> commands)699     void execCommands(final List<ClusterCommand> commands) {
700         int commandIdx = 0;
701         for (final ClusterCommand commandTask : commands) {
702             if (isShuttingDown()) {
703                 CLog.d("Tradefed shutting down, unleasing remaining commands.");
704                 unleaseCommands(commands.subList(commandIdx, commands.size()));
705                 return;
706             }
707             try {
708                 final InvocationEventHandler handler = new InvocationEventHandler(commandTask);
709                 switch (commandTask.getRequestType()) {
710                     case UNMANAGED:
711                         execClusterCommand(commandTask, handler);
712                         break;
713                     case MANAGED:
714                         execManagedClusterCommand(commandTask, handler);
715                         break;
716                     default:
717                         throw new UnsupportedOperationException();
718                 }
719             } catch (NoDeviceException e) {
720                 CLog.w(
721                         "no device meets requirements for cluster command [%s]; returning...",
722                         commandTask.getTaskId());
723                 CLog.w(e);
724                 IClusterEventUploader<ClusterCommandEvent> eventUploader =
725                         getClusterClient().getCommandEventUploader();
726                 ClusterCommandEvent.Builder eventBuilder =
727                         ClusterCommandEvent.createEventBuilder(commandTask)
728                                 .setHostName(ClusterHostUtil.getHostName())
729                                 .setType(ClusterCommandEvent.Type.AllocationFailed)
730                                 .setData(
731                                         ClusterCommandEvent.DATA_KEY_ERROR,
732                                         StreamUtil.getStackTrace(e));
733                 if (e.getErrorId() != null) {
734                     eventBuilder.setData(
735                             ClusterCommandEvent.DATA_KEY_ERROR_ID_NAME, e.getErrorId().name());
736                     eventBuilder.setData(
737                             ClusterCommandEvent.DATA_KEY_ERROR_ID_CODE, e.getErrorId().code());
738                     eventBuilder.setData(
739                             ClusterCommandEvent.DATA_KEY_ERROR_STATUS,
740                             ErrorStorageUtil.mapStatus(e.getErrorId().status()));
741                 }
742                 eventUploader.postEvent(eventBuilder.build());
743                 eventUploader.flush();
744             } catch (ConfigurationException | IOException | RuntimeException e) {
745                 CLog.w("failed to execute cluster command [%s]: %s", commandTask.getTaskId(), e);
746                 CLog.w(e);
747                 IClusterEventUploader<ClusterCommandEvent> eventUploader =
748                         getClusterClient().getCommandEventUploader();
749                 ClusterCommandEvent.Builder eventBuilder =
750                         ClusterCommandEvent.createEventBuilder(commandTask)
751                                 .setHostName(ClusterHostUtil.getHostName())
752                                 .setType(ClusterCommandEvent.Type.ConfigurationError)
753                                 .setData(
754                                         ClusterCommandEvent.DATA_KEY_ERROR,
755                                         StreamUtil.getStackTrace(e));
756                 if ((e instanceof IHarnessException)
757                         && ((IHarnessException) e).getErrorId() != null) {
758                     ErrorIdentifier errorId = ((IHarnessException) e).getErrorId();
759                     eventBuilder.setData(
760                             ClusterCommandEvent.DATA_KEY_ERROR_ID_NAME, errorId.name());
761                     eventBuilder.setData(
762                             ClusterCommandEvent.DATA_KEY_ERROR_ID_CODE, errorId.code());
763                     eventBuilder.setData(
764                             ClusterCommandEvent.DATA_KEY_ERROR_STATUS,
765                             ErrorStorageUtil.mapStatus(errorId.status()));
766                 }
767                 eventUploader.postEvent(eventBuilder.build());
768                 eventUploader.flush();
769             }
770             commandIdx++;
771         }
772     }
773 
execClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)774     void execClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)
775             throws ConfigurationException, IllegalArgumentException, NoDeviceException {
776         String cmdLine = commandTask.getCommandLine();
777         String[] args = QuotationAwareTokenizer.tokenizeLine(cmdLine);
778         // If it is a dry run command skip execution.
779         if (dryRunCommand(handler, args)) {
780             return;
781         }
782         // Append device serials to command.
783         // By assigning all applicable serials, TF will try one by one until allocation
784         // succeeds (or fails for all). This mitigates the issue where a single bad
785         // device can starve tests.
786         if (commandTask.getTargetDeviceSerials() != null) {
787             for (String serial : commandTask.getTargetDeviceSerials()) {
788                 cmdLine += " --serial ";
789                 cmdLine += ClusterHostUtil.getLocalDeviceSerial(serial);
790             }
791         }
792         CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), cmdLine);
793         execCommand(handler, QuotationAwareTokenizer.tokenizeLine(cmdLine));
794     }
795 
796     @VisibleForTesting
getClusterCommandConfigBuilder()797     ClusterCommandConfigBuilder getClusterCommandConfigBuilder() {
798         return new ClusterCommandConfigBuilder();
799     }
800 
execManagedClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)801     void execManagedClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)
802             throws IOException, ConfigurationException, NoDeviceException {
803         File workDir = null;
804         try {
805             workDir = new File(System.getProperty("java.io.tmpdir"), commandTask.getAttemptId());
806             workDir.mkdirs();
807             final String requestId = commandTask.getRequestId();
808             final String commandId = commandTask.getCommandId();
809             final IClusterClient client = getClusterClient();
810             final TestEnvironment testEnvironment = client.getTestEnvironment(requestId);
811             final List<TestResource> testResources = client.getTestResources(requestId);
812             final TestContext testContext = client.getTestContext(requestId, commandId);
813             testResources.addAll(testContext.getTestResources());
814             final File configFile =
815                     getClusterCommandConfigBuilder()
816                             .setWorkDir(workDir)
817                             .setClusterCommand(commandTask)
818                             .setTestEnvironment(testEnvironment)
819                             .setTestResources(testResources)
820                             .setTestContext(testContext)
821                             .build();
822             CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), configFile);
823             CLog.d("configFile: %s", FileUtil.readStringFromFile(configFile));
824             // FIXME: Find a way to upload a config file after an invocation is completed for
825             // debugging.
826             handler.setWorkDir(workDir);
827             execCommand(handler, new String[] {configFile.getAbsolutePath()});
828             // Unset workDir to avoid being cleaned up
829             workDir = null;
830         } catch (JSONException e) {
831             throw new RuntimeException(e);
832         } finally {
833             if (workDir != null) {
834                 FileUtil.recursiveDelete(workDir);
835             }
836         }
837     }
838 
839     /**
840      * Determines if a given command is a dry-run. If the command is a dry-run, validate it. If
841      * there are any configs issue, it will throw a ConfigurationException.
842      *
843      * @param handler {@link InvocationEventHandler} to report events for dry-run validation.
844      * @param args the command to validate.
845      * @return true if the command are a dry run, false otherwise.
846      * @throws ConfigurationException
847      */
dryRunCommand(final InvocationEventHandler handler, String[] args)848     protected boolean dryRunCommand(final InvocationEventHandler handler, String[] args)
849             throws ConfigurationException {
850         IConfiguration config = null;
851         try {
852             config = createConfiguration(args);
853         } catch (Throwable e) {
854             throw new ConfigurationException("Failed to create dry-run config", e);
855         }
856         if (config.getCommandOptions().isDryRunMode()) {
857             dryRunCommandReporting(handler, config);
858             return true;
859         }
860         return false;
861     }
862 
863     /** Get the {@link IClusterOptions} instance used to store cluster-related settings. */
getClusterOptions()864     IClusterOptions getClusterOptions() {
865         if (mClusterOptions == null) {
866             mClusterOptions = ClusterHostUtil.getClusterOptions();
867         }
868         return mClusterOptions;
869     }
870 
871     /** Get the {@link IClusterClient} instance used to interact with the TFC backend. */
getClusterClient()872     IClusterClient getClusterClient() {
873         if (mClusterClient == null) {
874             mClusterClient = ClusterHostUtil.getClusterClient();
875         }
876         return mClusterClient;
877     }
878 
879     /** Event triggered, to upload host states */
UploadHostEventWithState(HostState state)880     private void UploadHostEventWithState(HostState state) {
881         try {
882             IClusterEventUploader<ClusterHostEvent> Uploader =
883                     getClusterClient().getHostEventUploader();
884             ClusterHostEvent.Builder builder =
885                     new ClusterHostEvent.Builder()
886                             .setHostEventType(HostEventType.HostStateChanged)
887                             .setHostState(state);
888             CLog.d("event uploading with state %s", state.toString());
889             ClusterHostEvent event = builder.build();
890             Uploader.postEvent(event);
891             CLog.d("event %s uploaded with state %s", event.toString(), state.toString());
892             Uploader.flush();
893         } catch (RuntimeException e) {
894             CLog.e("failed to upload host state %s to TFC: %s", state.toString(), e);
895         }
896     }
897 
898     /**
899      * Notifies TFC of commands that were not executed and need to be rescheduled.
900      *
901      * @param commands a list of {@link ClusterCommand} that need to be unleased to get rescheduled.
902      */
unleaseCommands(final List<ClusterCommand> commands)903     private synchronized void unleaseCommands(final List<ClusterCommand> commands) {
904         IClusterEventUploader<ClusterCommandEvent> eventUploader =
905                 getClusterClient().getCommandEventUploader();
906         for (ClusterCommand command : commands) {
907             ClusterCommandEvent.Builder eventBuilder =
908                     ClusterCommandEvent.createEventBuilder(command)
909                             .setHostName(ClusterHostUtil.getHostName())
910                             .setType(ClusterCommandEvent.Type.Unleased);
911             eventUploader.postEvent(eventBuilder.build());
912         }
913         eventUploader.flush();
914     }
915 }
916