1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.android.tradefed.config.remote; 17 18 import com.android.annotations.VisibleForTesting; 19 import com.android.tradefed.build.BuildRetrievalError; 20 import com.android.tradefed.build.gcs.GCSDownloaderHelper; 21 import com.android.tradefed.config.DynamicRemoteFileResolver; 22 import com.android.tradefed.invoker.logger.CurrentInvocation; 23 import com.android.tradefed.invoker.logger.CurrentInvocation.InvocationInfo; 24 import com.android.tradefed.invoker.tracing.CloseableTraceScope; 25 import com.android.tradefed.invoker.tracing.TracePropagatingExecutorService; 26 import com.android.tradefed.log.LogUtil.CLog; 27 import com.android.tradefed.result.error.InfraErrorIdentifier; 28 import com.android.tradefed.util.FileUtil; 29 import com.android.tradefed.util.GCSFileDownloader; 30 import com.android.tradefed.util.RunUtil; 31 import com.android.tradefed.util.ZipUtil2; 32 33 import java.io.File; 34 import java.io.IOException; 35 import java.util.AbstractMap; 36 import java.util.Map; 37 import java.util.Map.Entry; 38 import java.util.concurrent.CompletableFuture; 39 import java.util.concurrent.Executors; 40 import java.util.concurrent.Future; 41 import java.util.concurrent.ThreadFactory; 42 import java.util.concurrent.atomic.AtomicInteger; 43 44 import javax.annotation.Nonnull; 45 46 /** Implementation of {@link IRemoteFileResolver} that allows downloading from a GCS bucket. */ 47 public class GcsRemoteFileResolver implements IRemoteFileResolver { 48 49 public static final String PROTOCOL = "gs"; 50 51 private static final long SLEEP_INTERVAL_MS = 5 * 1000; 52 private static final String RETRY_TIMEOUT_MS_ARG = "retry_timeout_ms"; 53 private static final AtomicInteger poolNumber = new AtomicInteger(1); 54 55 private GCSDownloaderHelper mHelper = null; 56 57 @Override resolveRemoteFile(RemoteFileResolverArgs args)58 public ResolvedFile resolveRemoteFile(RemoteFileResolverArgs args) throws BuildRetrievalError { 59 File consideredFile = args.getConsideredFile(); 60 // Don't use absolute path as it would not start with gs: 61 String path = consideredFile.getPath(); 62 63 File destFile = GCSFileDownloader.createTempFileForRemote(path, null); 64 try { 65 if (canUseParallelDownload(args.getQueryArgs())) { 66 Entry<File, Future<BuildRetrievalError>> parallelDownload = 67 fetchResourceWithRetryParallel(path, args.getQueryArgs(), destFile); 68 ExtendedFile eFile = new ExtendedFile(parallelDownload.getKey(), "gcs", "gcs"); 69 eFile.setDownloadFuture(parallelDownload.getValue()); 70 // Return the file with metadata 71 return new ResolvedFile(eFile); 72 } else { 73 // We need to download the file from the bucket 74 fetchResourceWithRetry(path, args.getQueryArgs(), destFile); 75 // Unzip it if required 76 return new ResolvedFile( 77 DynamicRemoteFileResolver.unzipIfRequired(destFile, args.getQueryArgs())); 78 } 79 } catch (IOException e) { 80 FileUtil.deleteFile(destFile); 81 CLog.e(e); 82 throw new BuildRetrievalError( 83 String.format("Failed to download %s due to: %s", path, e.getMessage()), 84 e, 85 InfraErrorIdentifier.GCS_ERROR); 86 } 87 } 88 89 @Override getSupportedProtocol()90 public @Nonnull String getSupportedProtocol() { 91 return PROTOCOL; 92 } 93 94 @VisibleForTesting getDownloader()95 protected GCSDownloaderHelper getDownloader() { 96 if (mHelper == null) { 97 mHelper = new GCSDownloaderHelper(); 98 } 99 return mHelper; 100 } 101 102 @VisibleForTesting sleep()103 void sleep() { 104 RunUtil.getDefault().sleep(SLEEP_INTERVAL_MS); 105 } 106 107 /** If the retry arg is set, we retry downloading until timeout is reached */ fetchResourceWithRetry(String path, Map<String, String> queryArgs, File destFile)108 private void fetchResourceWithRetry(String path, Map<String, String> queryArgs, File destFile) 109 throws BuildRetrievalError { 110 try (CloseableTraceScope ignored = new CloseableTraceScope("gs_download " + path)) { 111 String timeoutStringValue = queryArgs.get(RETRY_TIMEOUT_MS_ARG); 112 if (timeoutStringValue == null) { 113 getDownloader().fetchTestResource(destFile, path); 114 return; 115 } 116 long timeout = System.currentTimeMillis() + Long.parseLong(timeoutStringValue); 117 BuildRetrievalError error = null; 118 while (System.currentTimeMillis() < timeout) { 119 try { 120 getDownloader().fetchTestResource(destFile, path); 121 return; 122 } catch (BuildRetrievalError e) { 123 error = e; 124 } 125 sleep(); 126 } 127 throw error; 128 } 129 } 130 fetchResourceWithRetryParallel( String path, Map<String, String> queryArgs, File destFile)131 private Entry<File, Future<BuildRetrievalError>> fetchResourceWithRetryParallel( 132 String path, Map<String, String> queryArgs, File destFile) throws IOException { 133 String unzipValue = queryArgs.get(DynamicRemoteFileResolver.UNZIP_KEY); 134 boolean useDirectory = unzipValue != null && "true".equals(unzipValue.toLowerCase()); 135 File possibleDir = null; 136 if (useDirectory) { 137 possibleDir = 138 FileUtil.createTempDir( 139 FileUtil.getBaseName(destFile.getName()), 140 CurrentInvocation.getInfo(InvocationInfo.WORK_FOLDER)); 141 } 142 ThreadGroup currentGroup = Thread.currentThread().getThreadGroup(); 143 ThreadFactory factory = 144 new ThreadFactory() { 145 @Override 146 public Thread newThread(Runnable r) { 147 Thread t = 148 new Thread( 149 currentGroup, 150 r, 151 "gcs-pool-task-" + poolNumber.getAndIncrement()); 152 t.setDaemon(true); 153 return t; 154 } 155 }; 156 File destDir = possibleDir; 157 CompletableFuture<BuildRetrievalError> futureClient = 158 CompletableFuture.supplyAsync( 159 () -> { 160 try { 161 fetchResourceWithRetry(path, queryArgs, destFile); 162 if (useDirectory) { 163 ZipUtil2.extractZip(destFile, destDir); 164 FileUtil.deleteFile(destFile); 165 } 166 return null; 167 } catch (IOException ioe) { 168 FileUtil.deleteFile(destFile); 169 return new BuildRetrievalError(ioe.getMessage(), ioe); 170 } catch (BuildRetrievalError e) { 171 return e; 172 } 173 }, 174 TracePropagatingExecutorService.create( 175 Executors.newFixedThreadPool(1, factory))); 176 if (useDirectory) { 177 return new AbstractMap.SimpleEntry<File, Future<BuildRetrievalError>>( 178 destDir, futureClient); 179 } else { 180 return new AbstractMap.SimpleEntry<File, Future<BuildRetrievalError>>( 181 destFile, futureClient); 182 } 183 } 184 canUseParallelDownload(Map<String, String> query)185 private boolean canUseParallelDownload(Map<String, String> query) { 186 String parallelValue = query.get("parallel"); 187 if (parallelValue != null && "false".equals(parallelValue.toLowerCase())) { 188 return false; 189 } 190 return true; 191 } 192 } 193