1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.android.tradefed.cluster; 17 18 import com.android.annotations.VisibleForTesting; 19 import com.android.tradefed.cluster.ClusterHostEvent.HostEventType; 20 import com.android.tradefed.command.CommandScheduler; 21 import com.android.tradefed.command.ICommandScheduler; 22 import com.android.tradefed.command.remote.DeviceDescriptor; 23 import com.android.tradefed.config.ConfigurationException; 24 import com.android.tradefed.config.IConfiguration; 25 import com.android.tradefed.device.DeviceAllocationState; 26 import com.android.tradefed.device.DeviceNotAvailableException; 27 import com.android.tradefed.device.FreeDeviceState; 28 import com.android.tradefed.device.IDeviceManager; 29 import com.android.tradefed.device.ITestDevice; 30 import com.android.tradefed.device.NoDeviceException; 31 import com.android.tradefed.device.TestDeviceState; 32 import com.android.tradefed.device.battery.BatteryController; 33 import com.android.tradefed.device.battery.IBatteryInfo; 34 import com.android.tradefed.device.battery.IBatteryInfo.BatteryState; 35 import com.android.tradefed.error.HarnessRuntimeException; 36 import com.android.tradefed.error.IHarnessException; 37 import com.android.tradefed.host.IHostOptions.PermitLimitType; 38 import com.android.tradefed.invoker.IInvocationContext; 39 import com.android.tradefed.invoker.logger.InvocationMetricLogger.InvocationMetricKey; 40 import com.android.tradefed.log.LogUtil.CLog; 41 import com.android.tradefed.result.CollectingTestListener; 42 import com.android.tradefed.result.FailureDescription; 43 import com.android.tradefed.result.ITestSummaryListener; 44 import com.android.tradefed.result.TestRunResult; 45 import com.android.tradefed.result.TestStatus; 46 import com.android.tradefed.result.TestSummary; 47 import com.android.tradefed.result.error.ErrorIdentifier; 48 import com.android.tradefed.result.error.ErrorStorageUtil; 49 import com.android.tradefed.result.error.InfraErrorIdentifier; 50 import com.android.tradefed.util.FileUtil; 51 import com.android.tradefed.util.MultiMap; 52 import com.android.tradefed.util.QuotationAwareTokenizer; 53 import com.android.tradefed.util.StreamUtil; 54 55 import com.google.common.primitives.Ints; 56 57 import org.json.JSONException; 58 59 import java.io.File; 60 import java.io.IOException; 61 import java.util.Arrays; 62 import java.util.Collection; 63 import java.util.Collections; 64 import java.util.HashMap; 65 import java.util.HashSet; 66 import java.util.LinkedList; 67 import java.util.List; 68 import java.util.Map; 69 import java.util.Optional; 70 import java.util.Set; 71 import java.util.concurrent.RejectedExecutionHandler; 72 import java.util.concurrent.ScheduledFuture; 73 import java.util.concurrent.ScheduledThreadPoolExecutor; 74 import java.util.concurrent.ThreadFactory; 75 import java.util.concurrent.ThreadPoolExecutor; 76 import java.util.concurrent.TimeUnit; 77 78 /** 79 * A {@link ICommandScheduler} to support TFC (Tradefed Cluster). This scheduler runs commands from 80 * TFC command-queue and uploads invocation events to TFC command-event-queue. 81 */ 82 public class ClusterCommandScheduler extends CommandScheduler { 83 84 // Errors that should not be retried. 85 private static final Set<InfraErrorIdentifier> NONE_RETRIABLE_CONFIG_ERRORS = 86 new HashSet<>(Arrays.asList(InfraErrorIdentifier.OPTION_CONFIGURATION_ERROR)); 87 88 /** The {@link ScheduledThreadPoolExecutor} used to manage heartbeats. */ 89 private ScheduledThreadPoolExecutor mHeartbeatThreadPool = null; 90 91 /** The {@link IClusterOptions} instance used to store cluster-related settings. */ 92 private IClusterOptions mClusterOptions; 93 94 /** The {@link IClusterClient} instance used to interact with the TFC backend. */ 95 private IClusterClient mClusterClient; 96 97 /** 98 * A {@link ThreadFactory} which returns threads in a dedicated heartbeat group. 99 * 100 * <p>This class is used as a factory by {@code mHeartbeatThreadPool} in order to segregate 101 * heartbeat threads from other "stray" threads to avoid tripping loose thread detection in 102 * {@link CommandScheduler}. 103 */ 104 private static class HeartbeatThreadFactory implements ThreadFactory { 105 private static final ThreadGroup HB_GROUP; 106 107 static { 108 // fetch root thread group as this class may be initialized by an invocation thread 109 ThreadGroup tg = Thread.currentThread().getThreadGroup(); 110 while (tg.getParent() != null) { 111 tg = tg.getParent(); 112 } 113 HB_GROUP = new ThreadGroup(tg, "ClusterCommandScheduler.heartbeat"); 114 } 115 116 @Override newThread(Runnable r)117 public Thread newThread(Runnable r) { 118 Thread thread = new Thread(HB_GROUP, r); 119 // heartbeat should always get cancelled, but ensure it doesn't prevent JVM exit 120 thread.setDaemon(true); 121 return thread; 122 } 123 } 124 125 /** {@inheritDoc} */ 126 @Override start()127 public void start() { 128 UploadHostEventWithState(HostState.RUNNING); 129 super.start(); 130 } 131 132 /** {@inheritDoc} */ 133 @Override shutdown()134 public void shutdown() { 135 UploadHostEventWithState(HostState.QUITTING); 136 getHeartbeatThreadPool().shutdown(); 137 super.shutdown(); 138 } 139 140 @Override shutdownHard()141 public synchronized void shutdownHard() { 142 UploadHostEventWithState(HostState.KILLING); 143 getHeartbeatThreadPool().shutdown(); 144 super.shutdownHard(); 145 } 146 147 /** 148 * A {@link com.android.tradefed.command.ICommandScheduler.IScheduledInvocationListener} to 149 * upload events to TFC. 150 */ 151 class InvocationEventHandler extends CollectingTestListener 152 implements IScheduledInvocationListener, ITestSummaryListener { 153 154 private ScheduledFuture<?> mHeartbeat; 155 private final ClusterCommand mCommandTask; 156 private Set<String> mDeviceSerials = new HashSet<>(); 157 private String mSummary; 158 private Set<String> processedSummaries = new HashSet<>(); 159 private FailureDescription mFailureDescription; 160 private String mError; 161 private String mSubprocessCommandError; 162 private File mWorkDir; 163 private InvocationStatus mInvocationStatus; 164 private boolean mCanceled = false; 165 166 /** 167 * Creates a {@link InvocationEventHandler} to track the given {@link ClusterCommand}. 168 * 169 * @param commandTask the {@link ClusterCommand} to track. 170 */ InvocationEventHandler(ClusterCommand commandTask)171 public InvocationEventHandler(ClusterCommand commandTask) { 172 mCommandTask = commandTask; 173 } 174 175 /** 176 * Sets a work directory for an invocation. 177 * 178 * @param dir a work directory. 179 */ setWorkDir(File dir)180 public void setWorkDir(File dir) { 181 mWorkDir = dir; 182 } 183 184 @VisibleForTesting setCanceled(boolean value)185 void setCanceled(boolean value) { 186 mCanceled = value; 187 } 188 createEventBuilder()189 private ClusterCommandEvent.Builder createEventBuilder() { 190 final ClusterCommandEvent.Builder builder = 191 ClusterCommandEvent.createEventBuilder(mCommandTask) 192 .setHostName(ClusterHostUtil.getHostName()); 193 if (!mDeviceSerials.isEmpty()) { 194 builder.setDeviceSerials(mDeviceSerials); 195 } 196 return builder; 197 } 198 updateInvocationStatus()199 private void updateInvocationStatus() { 200 if (!getClusterOptions().shouldUploadInvocationStatus()) { 201 return; 202 } 203 final InvocationStatus obj = new InvocationStatus(); 204 final Collection<TestRunResult> testRunResults = this.getMergedTestRunResults(); 205 for (final TestRunResult result : testRunResults) { 206 final TestGroupStatus testGroupStatus = 207 new TestGroupStatus( 208 result.getName(), 209 result.getNumTests(), 210 result.getNumCompleteTests(), 211 result.getNumAllFailedTests(), 212 result.getNumTestsInState(TestStatus.PASSED), 213 result.isRunComplete(), 214 result.getElapsedTime()); 215 obj.addTestGroupStatus(testGroupStatus); 216 } 217 mInvocationStatus = obj; 218 } 219 220 /** {@inheritDoc} */ 221 @Override invocationInitiated(IInvocationContext context)222 public void invocationInitiated(IInvocationContext context) { 223 for (ITestDevice device : context.getDevices()) { 224 mDeviceSerials.add(device.getSerialNumber()); 225 } 226 final ClusterCommandEvent event = 227 createEventBuilder() 228 .setType(ClusterCommandEvent.Type.InvocationInitiated) 229 .build(); 230 getClusterClient().getCommandEventUploader().postEvent(event); 231 getClusterClient().getCommandEventUploader().flush(); 232 mHeartbeat = startHeartbeat(); 233 // Check that devices are in charging state before starting the invocation. 234 for (ITestDevice device : context.getDevices()) { 235 try { 236 BatteryState state = BatteryController.getDeviceChargingState(device); 237 if (BatteryState.NOT_CHARGING.equals(state)) { 238 IBatteryInfo info = BatteryController.getBatteryInfoForDevice(device); 239 if (info != null) { 240 info.enableCharging(device); 241 } 242 } 243 } catch (DeviceNotAvailableException e) { 244 CLog.e(e); 245 } 246 } 247 } 248 249 /** {@inheritDoc} */ 250 @Override invocationStarted(IInvocationContext context)251 public void invocationStarted(IInvocationContext context) { 252 super.invocationStarted(context); 253 final ClusterCommandEvent event = 254 createEventBuilder() 255 .setType(ClusterCommandEvent.Type.InvocationStarted) 256 .build(); 257 getClusterClient().getCommandEventUploader().postEvent(event); 258 getClusterClient().getCommandEventUploader().flush(); 259 } 260 261 @Override testRunStarted(String name, int numTests)262 public void testRunStarted(String name, int numTests) { 263 testRunStarted(name, numTests, 0); 264 } 265 266 @Override testRunStarted(String name, int numTests, int attemptNumber)267 public void testRunStarted(String name, int numTests, int attemptNumber) { 268 testRunStarted(name, numTests, attemptNumber, System.currentTimeMillis()); 269 } 270 271 /** {@inheritDoc} */ 272 @Override testRunStarted(String name, int numTests, int attemptNumber, long startTime)273 public void testRunStarted(String name, int numTests, int attemptNumber, long startTime) { 274 super.testRunStarted(name, numTests, attemptNumber, startTime); 275 updateInvocationStatus(); 276 } 277 278 /** {@inheritDoc} */ 279 @Override invocationFailed(Throwable cause)280 public void invocationFailed(Throwable cause) { 281 super.invocationFailed(cause); 282 283 mError = StreamUtil.getStackTrace(cause); 284 if (cause instanceof SubprocessCommandException && cause.getCause() != null) { 285 // The inner exception holds an exception stack trace from a subprocess. 286 mSubprocessCommandError = cause.getCause().getMessage(); 287 } 288 } 289 290 /** {@inheritDoc} */ 291 @Override invocationFailed(FailureDescription failure)292 public void invocationFailed(FailureDescription failure) { 293 super.invocationFailed(failure); 294 295 mFailureDescription = failure; 296 mError = failure.getErrorMessage(); 297 if (failure.getCause() != null) { 298 Throwable cause = failure.getCause(); 299 mError = StreamUtil.getStackTrace(cause); 300 if (cause instanceof HarnessRuntimeException 301 && InfraErrorIdentifier.TRADEFED_SKIPPED_TESTS_DURING_SHUTDOWN.equals( 302 ((HarnessRuntimeException) cause).getErrorId())) { 303 // Tests were not run, so un-lease the command so that it can be rescheduled. 304 unleaseCommands(Arrays.asList(mCommandTask)); 305 } 306 } 307 } 308 309 /** {@inheritDoc} */ 310 @Override invocationEnded(long elapsedTime)311 public void invocationEnded(long elapsedTime) { 312 super.invocationEnded(elapsedTime); 313 314 ClusterCommandEvent event = 315 createEventBuilder() 316 .setType(ClusterCommandEvent.Type.InvocationEnded) 317 .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError) 318 .setData( 319 ClusterCommandEvent.DATA_KEY_SUBPROCESS_COMMAND_ERROR, 320 mSubprocessCommandError) 321 .build(); 322 getClusterClient().getCommandEventUploader().postEvent(event); 323 getClusterClient().getCommandEventUploader().flush(); 324 } 325 326 /** {@inheritDoc} */ 327 @Override invocationComplete( IInvocationContext metadata, Map<ITestDevice, FreeDeviceState> devicesStates)328 public void invocationComplete( 329 IInvocationContext metadata, Map<ITestDevice, FreeDeviceState> devicesStates) { 330 CLog.d("ClusterCommand invocationComplete start."); 331 if (mWorkDir != null) { 332 FileUtil.recursiveDelete(mWorkDir); 333 } 334 335 // TODO: handle multi-device where only one of the build could be missing. 336 ErrorIdentifier errorId = null; 337 if (getPrimaryBuildInfo() == null && mError == null) { 338 mError = "build not found"; 339 // Test that the filesystem is working as it's the main reason for this error 340 // situation to occur 341 try { 342 File f = FileUtil.createTempFile("test-filesystem", ".txt"); 343 FileUtil.deleteFile(f); 344 } catch (IOException e) { 345 errorId = InfraErrorIdentifier.LAB_HOST_FILESYSTEM_ERROR; 346 mError = 347 String.format( 348 "[%s] Filesystem error on %s. Please notify lab admin.", 349 errorId.name(), ClusterHostUtil.getHostName()); 350 } 351 } 352 if (errorId == null && mFailureDescription != null) { 353 errorId = mFailureDescription.getErrorIdentifier(); 354 } 355 356 String fetchBuildTimeMillis = "-1"; 357 String setupTimeMillis = "-1"; 358 String lostDevice = null; 359 if (metadata != null) { 360 fetchBuildTimeMillis = 361 metadata.getAttributes() 362 .getUniqueMap() 363 .get(InvocationMetricKey.FETCH_BUILD.toString()); 364 setupTimeMillis = 365 metadata.getAttributes() 366 .getUniqueMap() 367 .get(InvocationMetricKey.SETUP.toString()); 368 lostDevice = 369 metadata.getAttributes() 370 .getUniqueMap() 371 .get(InvocationMetricKey.DEVICE_LOST_DETECTED.toString()); 372 } 373 374 // Stop heartbeat thread before sending InvocationCompleted event. 375 if (mHeartbeat != null) { 376 mHeartbeat.cancel(true); 377 } 378 updateInvocationStatus(); 379 final ClusterCommandEvent.Builder eventBuilder = 380 createEventBuilder() 381 .setType(ClusterCommandEvent.Type.InvocationCompleted) 382 .setInvocationStatus(mInvocationStatus) 383 .setData(ClusterCommandEvent.DATA_KEY_ERROR, mError) 384 .setData( 385 ClusterCommandEvent.DATA_KEY_SUBPROCESS_COMMAND_ERROR, 386 mSubprocessCommandError) 387 .setData(ClusterCommandEvent.DATA_KEY_SUMMARY, mSummary) 388 .setData( 389 ClusterCommandEvent.DATA_KEY_FETCH_BUILD_TIME_MILLIS, 390 fetchBuildTimeMillis) 391 .setData( 392 ClusterCommandEvent.DATA_KEY_SETUP_TIME_MILLIS, setupTimeMillis) 393 .setData( 394 ClusterCommandEvent.DATA_KEY_TOTAL_TEST_COUNT, 395 Integer.toString(getNumTotalTests())) 396 .setData( 397 ClusterCommandEvent.DATA_KEY_FAILED_TEST_COUNT, 398 Integer.toString(getNumAllFailedTests())) 399 .setData( 400 ClusterCommandEvent.DATA_KEY_PASSED_TEST_COUNT, 401 Integer.toString(getNumTestsInState(TestStatus.PASSED))) 402 .setData( 403 ClusterCommandEvent.DATA_KEY_FAILED_TEST_RUN_COUNT, 404 Integer.toString(getNumAllFailedTestRuns())); 405 if (errorId != null) { 406 // Report ConfigurationError for known errors to prevent test retry. 407 if (NONE_RETRIABLE_CONFIG_ERRORS.contains(errorId)) { 408 eventBuilder.setType(ClusterCommandEvent.Type.ConfigurationError); 409 } 410 eventBuilder.setData(ClusterCommandEvent.DATA_KEY_ERROR_ID_NAME, errorId.name()); 411 eventBuilder.setData(ClusterCommandEvent.DATA_KEY_ERROR_ID_CODE, errorId.code()); 412 eventBuilder.setData( 413 ClusterCommandEvent.DATA_KEY_ERROR_STATUS, 414 ErrorStorageUtil.mapStatus(errorId.status())); 415 } 416 if (lostDevice != null) { 417 eventBuilder.setData(ClusterCommandEvent.DATA_KEY_LOST_DEVICE_DETECTED, lostDevice); 418 } 419 final ClusterCommandEvent event = eventBuilder.build(); 420 getClusterClient().getCommandEventUploader().postEvent(event); 421 getClusterClient().getCommandEventUploader().flush(); 422 CLog.d("ClusterCommand invocationComplete done."); 423 } 424 425 /** {@inheritDoc} */ 426 @Override putEarlySummary(List<TestSummary> summaries)427 public void putEarlySummary(List<TestSummary> summaries) { 428 if (getClusterOptions().shouldCollectEarlyTestSummary()) { 429 putSummary(summaries); 430 } 431 } 432 433 /** {@inheritDoc} */ 434 @Override putSummary(List<TestSummary> summaries)435 public void putSummary(List<TestSummary> summaries) { 436 StringBuilder sb = new StringBuilder(); 437 for (final TestSummary summary : summaries) { 438 String summaryString = summary.getSummary().toString(); 439 if (!processedSummaries.contains(summaryString)) { 440 processedSummaries.add(summaryString); 441 sb.append(summaryString); 442 sb.append("\n"); 443 } 444 } 445 mSummary = mSummary == null ? sb.toString() : mSummary + sb.toString(); 446 } 447 startHeartbeat()448 private ScheduledFuture<?> startHeartbeat() { 449 return getHeartbeatThreadPool() 450 .scheduleAtFixedRate( 451 new HeartbeatSender(), 452 0, 453 getClusterOptions().getInvocationHeartbeatInterval(), 454 TimeUnit.MILLISECONDS); 455 } 456 457 class HeartbeatSender implements Runnable { 458 @Override run()459 public void run() { 460 try { 461 // Check cluster command's status. 462 if (getClusterOptions().checkCommandState() && !mCanceled) { 463 ClusterCommandStatus commandStatus = 464 getClusterClient() 465 .getCommandStatus( 466 mCommandTask.getRequestId(), 467 mCommandTask.getCommandId()); 468 if (ClusterCommand.State.CANCELED.equals(commandStatus.getState())) { 469 mCanceled = true; 470 String cause = 471 String.format( 472 "The cluster client %s has marked command" 473 + " (requestId=%s, commandId=%s) canceled with" 474 + " reason: %s", 475 getClusterClient().getClass().getSimpleName(), 476 mCommandTask.getRequestId(), 477 mCommandTask.getCommandId(), 478 commandStatus.getCancelReason()); 479 CLog.w("Stop invocation due to: %s", cause); 480 Optional.ofNullable(getInvocationContext()) 481 .map(IInvocationContext::getInvocationId) 482 .map(Ints::tryParse) 483 .ifPresent(invocationId -> stopInvocation(invocationId, cause)); 484 } else if (ClusterCommand.State.COMPLETED.equals( 485 commandStatus.getState())) { 486 CLog.d("Invocation completed, skip reporting heartbeat."); 487 return; 488 } 489 } 490 491 final ClusterCommandEvent event = 492 createEventBuilder() 493 .setType(ClusterCommandEvent.Type.TestRunInProgress) 494 .setInvocationStatus(mInvocationStatus) 495 .build(); 496 getClusterClient().getCommandEventUploader().postEvent(event); 497 } catch (Exception e) { 498 CLog.e("Error sending heartbeat to TFC:"); 499 CLog.e(e); 500 } 501 } 502 } 503 } 504 getHeartbeatThreadPool()505 synchronized ScheduledThreadPoolExecutor getHeartbeatThreadPool() { 506 if (mHeartbeatThreadPool == null) { 507 mHeartbeatThreadPool = new ScheduledThreadPoolExecutor(1, new HeartbeatThreadFactory()); 508 // instead of throwing some exception on shutdown we simply log it. 509 mHeartbeatThreadPool.setRejectedExecutionHandler( 510 new RejectedExecutionHandler() { 511 @Override 512 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 513 CLog.w( 514 "Rejecting Task %s rejected from executor %s", 515 r.toString(), e.toString()); 516 } 517 }); 518 // continue existing heartbeats after shutdown (until invocation is complete) 519 mHeartbeatThreadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(true); 520 } 521 return mHeartbeatThreadPool; 522 } 523 524 /** {@inheritDoc} */ 525 @Override processReadyCommands(IDeviceManager manager)526 protected void processReadyCommands(IDeviceManager manager) { 527 super.processReadyCommands(manager); 528 529 if (isShuttingDown()) { 530 return; 531 } 532 533 List<ClusterCommand> commands = null; 534 MultiMap<String, DeviceDescriptor> devices = getAvailableDevices(manager); 535 if (devices.isEmpty()) { 536 CLog.d("No devices are available for testing."); 537 return; 538 } 539 // Lease command tasks through the leasehosttasks API. 540 // Here we get all devices (available or not), so TFC will analyze the device tree to 541 // decide which group is allocated and which group is available. 542 devices = getDevices(manager, false); 543 commands = fetchHostCommands(devices); 544 if (commands.isEmpty()) { 545 CLog.d("No commands available for testing."); 546 return; 547 } 548 if (isShuttingDown()) { 549 CLog.d("Tradefed shutting down, unleasing commands."); 550 unleaseCommands(commands); 551 return; 552 } 553 execCommands(commands); 554 } 555 556 /** 557 * Returns a map containing available devices grouped by their types. 558 * 559 * @param manager a {@link IDeviceManager}. 560 * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices. 561 */ getAvailableDevices(IDeviceManager manager)562 MultiMap<String, DeviceDescriptor> getAvailableDevices(IDeviceManager manager) { 563 return getDevices(manager, true); 564 } 565 566 /** 567 * Returns a map containing devices grouped by their types. 568 * 569 * @param manager a {@link IDeviceManager}. 570 * @param availableOnly only return available devices or all devices. 571 * @return a {@link MultiMap} of String to DeviceDescriptor containing available devices. 572 */ getDevices(IDeviceManager manager, boolean availableOnly)573 MultiMap<String, DeviceDescriptor> getDevices(IDeviceManager manager, boolean availableOnly) { 574 // Getting available device types 575 final MultiMap<String, DeviceDescriptor> devices = new MultiMap<>(); 576 for (final DeviceDescriptor device : manager.listAllDevices()) { 577 if (availableOnly && device.getState() != DeviceAllocationState.Available) { 578 continue; 579 } 580 TestDeviceState deviceState = device.getTestDeviceState(); 581 if (TestDeviceState.FASTBOOT.equals(deviceState) 582 || TestDeviceState.FASTBOOTD.equals(deviceState)) { 583 continue; 584 } 585 if (ClusterHostUtil.isLocalhostIpPort(device.getSerial())) { 586 // Skipping localhost IP:PORT serials from cluster scheduling to avoid scheduling 587 // tests on TCP devices created by Local/RemoteAndroidVirtualDevice. 588 continue; 589 } 590 String runTargetFormat = getClusterOptions().getRunTargetFormat(); 591 String runTarget = 592 ClusterHostUtil.getRunTarget( 593 device, runTargetFormat, getClusterOptions().getDeviceTag()); 594 devices.put(runTarget, device); 595 } 596 return devices; 597 } 598 permitsAvailableToSchedule()599 private int permitsAvailableToSchedule() { 600 if (!getClusterOptions().checkPermitsOnLease()) { 601 return Integer.MAX_VALUE; 602 } 603 for (PermitLimitType permit : PermitLimitType.values()) { 604 if (getHostOptions().getAvailablePermits(permit) <= 0) { 605 CLog.i("There is no available '%s' permits. Not leasing any additional commands.", 606 permit); 607 return 0; 608 } 609 } 610 // Assumption is that download permits eventually become flashing permits 611 // TODO: Improve to track after download until flashing 612 int heuriticPermitCalculation = 613 getHostOptions().getAvailablePermits(PermitLimitType.CONCURRENT_FLASHER) 614 - getHostOptions().getInUsePermits(PermitLimitType.CONCURRENT_DOWNLOAD); 615 if (heuriticPermitCalculation < 0) { 616 CLog.i( 617 "Download permits will exceed the flashing limit and might create permit" 618 + " delays. Not Leasing."); 619 return 0; 620 } 621 return heuriticPermitCalculation; 622 } 623 checkDiskSpace()624 private boolean checkDiskSpace() { 625 if (getClusterOptions().maxDiskUsagePercentage() == 100L) { 626 return true; 627 } 628 File rootPartition = new File("/"); 629 long freeSpace = 630 (long) (rootPartition.getUsableSpace() * 100.0) / rootPartition.getTotalSpace(); 631 long usage = 100L - freeSpace; 632 if (usage > getClusterOptions().maxDiskUsagePercentage()) { 633 CLog.i("Disk space utilization is '%s%%'. Stop leasing.", usage); 634 return false; 635 } 636 return true; 637 } 638 639 /** 640 * Fetches commands for devices from the Tradefed Cluster's leasehosttasks API. 641 * 642 * @param devices a {@link MultiMap} of String to DeviceDescriptor containing devices. 643 * @return a list of {@link ClusterCommand}s. 644 */ fetchHostCommands(final MultiMap<String, DeviceDescriptor> devices)645 List<ClusterCommand> fetchHostCommands(final MultiMap<String, DeviceDescriptor> devices) { 646 CLog.d("fetching cluster host commands from leasehosttasks..."); 647 int permitsAvailable = permitsAvailableToSchedule(); 648 if (permitsAvailable <= 0) { 649 return Collections.<ClusterCommand>emptyList(); 650 } 651 // Check disk space before scheduling 652 if (!checkDiskSpace()) { 653 return Collections.<ClusterCommand>emptyList(); 654 } 655 656 final IClusterOptions options = getClusterOptions(); 657 final MultiMap<String, String> deviceGroups = options.getDeviceGroup(); 658 final Map<String, String> deviceToGroup = new HashMap<>(); 659 for (String group : deviceGroups.keySet()) { 660 for (String deviceSerial : deviceGroups.get(group)) { 661 deviceToGroup.put(deviceSerial, group); 662 } 663 } 664 List<ClusterDeviceInfo> deviceInfos = new LinkedList<>(); 665 for (String runTarget : devices.keySet()) { 666 for (DeviceDescriptor d : devices.get(runTarget)) { 667 String groupName = deviceToGroup.getOrDefault(d.getSerial(), null); 668 ClusterDeviceInfo deviceInfo = 669 new ClusterDeviceInfo.Builder() 670 .setDeviceDescriptor(d) 671 .setRunTarget(runTarget) 672 .setGroupName(groupName) 673 .build(); 674 deviceInfos.add(deviceInfo); 675 } 676 } 677 try { 678 int count = Math.min(deviceInfos.size(), permitsAvailable); 679 List<ClusterCommand> commands = 680 getClusterClient() 681 .leaseHostCommands( 682 options.getClusterId(), 683 ClusterHostUtil.getHostName(), 684 deviceInfos, 685 options.getNextClusterIds(), 686 count); 687 return commands; 688 } catch (JSONException e) { 689 CLog.e(e); 690 return Collections.<ClusterCommand>emptyList(); 691 } 692 } 693 694 /** 695 * Executes commands fetched from the cluster command queue. 696 * 697 * @param commands a list of {@link ClusterCommand}s fetched from the cluster command queue. 698 */ execCommands(final List<ClusterCommand> commands)699 void execCommands(final List<ClusterCommand> commands) { 700 int commandIdx = 0; 701 for (final ClusterCommand commandTask : commands) { 702 if (isShuttingDown()) { 703 CLog.d("Tradefed shutting down, unleasing remaining commands."); 704 unleaseCommands(commands.subList(commandIdx, commands.size())); 705 return; 706 } 707 try { 708 final InvocationEventHandler handler = new InvocationEventHandler(commandTask); 709 switch (commandTask.getRequestType()) { 710 case UNMANAGED: 711 execClusterCommand(commandTask, handler); 712 break; 713 case MANAGED: 714 execManagedClusterCommand(commandTask, handler); 715 break; 716 default: 717 throw new UnsupportedOperationException(); 718 } 719 } catch (NoDeviceException e) { 720 CLog.w( 721 "no device meets requirements for cluster command [%s]; returning...", 722 commandTask.getTaskId()); 723 CLog.w(e); 724 IClusterEventUploader<ClusterCommandEvent> eventUploader = 725 getClusterClient().getCommandEventUploader(); 726 ClusterCommandEvent.Builder eventBuilder = 727 ClusterCommandEvent.createEventBuilder(commandTask) 728 .setHostName(ClusterHostUtil.getHostName()) 729 .setType(ClusterCommandEvent.Type.AllocationFailed) 730 .setData( 731 ClusterCommandEvent.DATA_KEY_ERROR, 732 StreamUtil.getStackTrace(e)); 733 if (e.getErrorId() != null) { 734 eventBuilder.setData( 735 ClusterCommandEvent.DATA_KEY_ERROR_ID_NAME, e.getErrorId().name()); 736 eventBuilder.setData( 737 ClusterCommandEvent.DATA_KEY_ERROR_ID_CODE, e.getErrorId().code()); 738 eventBuilder.setData( 739 ClusterCommandEvent.DATA_KEY_ERROR_STATUS, 740 ErrorStorageUtil.mapStatus(e.getErrorId().status())); 741 } 742 eventUploader.postEvent(eventBuilder.build()); 743 eventUploader.flush(); 744 } catch (ConfigurationException | IOException | RuntimeException e) { 745 CLog.w("failed to execute cluster command [%s]: %s", commandTask.getTaskId(), e); 746 CLog.w(e); 747 IClusterEventUploader<ClusterCommandEvent> eventUploader = 748 getClusterClient().getCommandEventUploader(); 749 ClusterCommandEvent.Builder eventBuilder = 750 ClusterCommandEvent.createEventBuilder(commandTask) 751 .setHostName(ClusterHostUtil.getHostName()) 752 .setType(ClusterCommandEvent.Type.ConfigurationError) 753 .setData( 754 ClusterCommandEvent.DATA_KEY_ERROR, 755 StreamUtil.getStackTrace(e)); 756 if ((e instanceof IHarnessException) 757 && ((IHarnessException) e).getErrorId() != null) { 758 ErrorIdentifier errorId = ((IHarnessException) e).getErrorId(); 759 eventBuilder.setData( 760 ClusterCommandEvent.DATA_KEY_ERROR_ID_NAME, errorId.name()); 761 eventBuilder.setData( 762 ClusterCommandEvent.DATA_KEY_ERROR_ID_CODE, errorId.code()); 763 eventBuilder.setData( 764 ClusterCommandEvent.DATA_KEY_ERROR_STATUS, 765 ErrorStorageUtil.mapStatus(errorId.status())); 766 } 767 eventUploader.postEvent(eventBuilder.build()); 768 eventUploader.flush(); 769 } 770 commandIdx++; 771 } 772 } 773 execClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)774 void execClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler) 775 throws ConfigurationException, IllegalArgumentException, NoDeviceException { 776 String cmdLine = commandTask.getCommandLine(); 777 String[] args = QuotationAwareTokenizer.tokenizeLine(cmdLine); 778 // If it is a dry run command skip execution. 779 if (dryRunCommand(handler, args)) { 780 return; 781 } 782 // Append device serials to command. 783 // By assigning all applicable serials, TF will try one by one until allocation 784 // succeeds (or fails for all). This mitigates the issue where a single bad 785 // device can starve tests. 786 if (commandTask.getTargetDeviceSerials() != null) { 787 for (String serial : commandTask.getTargetDeviceSerials()) { 788 cmdLine += " --serial "; 789 cmdLine += ClusterHostUtil.getLocalDeviceSerial(serial); 790 } 791 } 792 CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), cmdLine); 793 execCommand(handler, QuotationAwareTokenizer.tokenizeLine(cmdLine)); 794 } 795 796 @VisibleForTesting getClusterCommandConfigBuilder()797 ClusterCommandConfigBuilder getClusterCommandConfigBuilder() { 798 return new ClusterCommandConfigBuilder(); 799 } 800 execManagedClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler)801 void execManagedClusterCommand(ClusterCommand commandTask, InvocationEventHandler handler) 802 throws IOException, ConfigurationException, NoDeviceException { 803 File workDir = null; 804 try { 805 workDir = new File(System.getProperty("java.io.tmpdir"), commandTask.getAttemptId()); 806 workDir.mkdirs(); 807 final String requestId = commandTask.getRequestId(); 808 final String commandId = commandTask.getCommandId(); 809 final IClusterClient client = getClusterClient(); 810 final TestEnvironment testEnvironment = client.getTestEnvironment(requestId); 811 final List<TestResource> testResources = client.getTestResources(requestId); 812 final TestContext testContext = client.getTestContext(requestId, commandId); 813 testResources.addAll(testContext.getTestResources()); 814 final File configFile = 815 getClusterCommandConfigBuilder() 816 .setWorkDir(workDir) 817 .setClusterCommand(commandTask) 818 .setTestEnvironment(testEnvironment) 819 .setTestResources(testResources) 820 .setTestContext(testContext) 821 .build(); 822 CLog.i("executing cluster command: [%s] %s", commandTask.getTaskId(), configFile); 823 CLog.d("configFile: %s", FileUtil.readStringFromFile(configFile)); 824 // FIXME: Find a way to upload a config file after an invocation is completed for 825 // debugging. 826 handler.setWorkDir(workDir); 827 execCommand(handler, new String[] {configFile.getAbsolutePath()}); 828 // Unset workDir to avoid being cleaned up 829 workDir = null; 830 } catch (JSONException e) { 831 throw new RuntimeException(e); 832 } finally { 833 if (workDir != null) { 834 FileUtil.recursiveDelete(workDir); 835 } 836 } 837 } 838 839 /** 840 * Determines if a given command is a dry-run. If the command is a dry-run, validate it. If 841 * there are any configs issue, it will throw a ConfigurationException. 842 * 843 * @param handler {@link InvocationEventHandler} to report events for dry-run validation. 844 * @param args the command to validate. 845 * @return true if the command are a dry run, false otherwise. 846 * @throws ConfigurationException 847 */ dryRunCommand(final InvocationEventHandler handler, String[] args)848 protected boolean dryRunCommand(final InvocationEventHandler handler, String[] args) 849 throws ConfigurationException { 850 IConfiguration config = null; 851 try { 852 config = createConfiguration(args); 853 } catch (Throwable e) { 854 throw new ConfigurationException("Failed to create dry-run config", e); 855 } 856 if (config.getCommandOptions().isDryRunMode()) { 857 dryRunCommandReporting(handler, config); 858 return true; 859 } 860 return false; 861 } 862 863 /** Get the {@link IClusterOptions} instance used to store cluster-related settings. */ getClusterOptions()864 IClusterOptions getClusterOptions() { 865 if (mClusterOptions == null) { 866 mClusterOptions = ClusterHostUtil.getClusterOptions(); 867 } 868 return mClusterOptions; 869 } 870 871 /** Get the {@link IClusterClient} instance used to interact with the TFC backend. */ getClusterClient()872 IClusterClient getClusterClient() { 873 if (mClusterClient == null) { 874 mClusterClient = ClusterHostUtil.getClusterClient(); 875 } 876 return mClusterClient; 877 } 878 879 /** Event triggered, to upload host states */ UploadHostEventWithState(HostState state)880 private void UploadHostEventWithState(HostState state) { 881 try { 882 IClusterEventUploader<ClusterHostEvent> Uploader = 883 getClusterClient().getHostEventUploader(); 884 ClusterHostEvent.Builder builder = 885 new ClusterHostEvent.Builder() 886 .setHostEventType(HostEventType.HostStateChanged) 887 .setHostState(state); 888 CLog.d("event uploading with state %s", state.toString()); 889 ClusterHostEvent event = builder.build(); 890 Uploader.postEvent(event); 891 CLog.d("event %s uploaded with state %s", event.toString(), state.toString()); 892 Uploader.flush(); 893 } catch (RuntimeException e) { 894 CLog.e("failed to upload host state %s to TFC: %s", state.toString(), e); 895 } 896 } 897 898 /** 899 * Notifies TFC of commands that were not executed and need to be rescheduled. 900 * 901 * @param commands a list of {@link ClusterCommand} that need to be unleased to get rescheduled. 902 */ unleaseCommands(final List<ClusterCommand> commands)903 private synchronized void unleaseCommands(final List<ClusterCommand> commands) { 904 IClusterEventUploader<ClusterCommandEvent> eventUploader = 905 getClusterClient().getCommandEventUploader(); 906 for (ClusterCommand command : commands) { 907 ClusterCommandEvent.Builder eventBuilder = 908 ClusterCommandEvent.createEventBuilder(command) 909 .setHostName(ClusterHostUtil.getHostName()) 910 .setType(ClusterCommandEvent.Type.Unleased); 911 eventUploader.postEvent(eventBuilder.build()); 912 } 913 eventUploader.flush(); 914 } 915 } 916