1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 package com.android.tradefed.result.proto; 17 18 import com.android.tradefed.config.Option; 19 import com.android.tradefed.invoker.IInvocationContext; 20 import com.android.tradefed.log.LogUtil.CLog; 21 import com.android.tradefed.result.proto.TestRecordProto.TestRecord; 22 import com.android.tradefed.util.RunUtil; 23 import com.android.tradefed.util.StreamUtil; 24 25 import java.io.IOException; 26 import java.net.Socket; 27 import java.util.concurrent.ConcurrentLinkedQueue; 28 import java.util.concurrent.atomic.AtomicBoolean; 29 30 /** An implementation of {@link ProtoResultReporter} */ 31 public final class StreamProtoResultReporter extends ProtoResultReporter { 32 33 public static final String PROTO_REPORT_PORT_OPTION = "proto-report-port"; 34 35 @Option( 36 name = PROTO_REPORT_PORT_OPTION, 37 description = "the port where to connect to send the protos." 38 ) 39 private Integer mReportPort = null; 40 41 private Socket mReportSocket = null; 42 private boolean mPrintedMessage = false; 43 44 private ResultWriterThread mResultWriterThread; 45 private ConcurrentLinkedQueue<TestRecord> mToBeSent = new ConcurrentLinkedQueue<>(); 46 StreamProtoResultReporter()47 public StreamProtoResultReporter() { 48 setInlineRecordOfChildren(false); 49 } 50 setProtoReportPort(Integer portValue)51 public void setProtoReportPort(Integer portValue) { 52 mReportPort = portValue; 53 } 54 getProtoReportPort()55 public Integer getProtoReportPort() { 56 return mReportPort; 57 } 58 59 @Override processStartInvocation( TestRecord invocationStartRecord, IInvocationContext context)60 public void processStartInvocation( 61 TestRecord invocationStartRecord, IInvocationContext context) { 62 mResultWriterThread = new ResultWriterThread(); 63 mResultWriterThread.start(); 64 mToBeSent.add(invocationStartRecord); 65 } 66 67 @Override processTestModuleStarted(TestRecord moduleStartRecord)68 public void processTestModuleStarted(TestRecord moduleStartRecord) { 69 mToBeSent.add(moduleStartRecord); 70 } 71 72 @Override processTestModuleEnd(TestRecord moduleRecord)73 public void processTestModuleEnd(TestRecord moduleRecord) { 74 mToBeSent.add(moduleRecord); 75 } 76 77 @Override processTestRunStarted(TestRecord runStartedRecord)78 public void processTestRunStarted(TestRecord runStartedRecord) { 79 mToBeSent.add(runStartedRecord); 80 } 81 82 @Override processTestRunEnded(TestRecord runRecord, boolean moduleInProgress)83 public void processTestRunEnded(TestRecord runRecord, boolean moduleInProgress) { 84 mToBeSent.add(runRecord); 85 } 86 87 @Override processTestCaseStarted(TestRecord testCaseStartedRecord)88 public void processTestCaseStarted(TestRecord testCaseStartedRecord) { 89 mToBeSent.add(testCaseStartedRecord); 90 } 91 92 @Override processTestCaseEnded(TestRecord testCaseRecord)93 public void processTestCaseEnded(TestRecord testCaseRecord) { 94 mToBeSent.add(testCaseRecord); 95 } 96 97 @Override processFinalInvocationLogs(TestRecord invocationLogs)98 public void processFinalInvocationLogs(TestRecord invocationLogs) { 99 if (mResultWriterThread.mCancelled.get()) { 100 writeRecordToSocket(invocationLogs); 101 } else { 102 mToBeSent.add(invocationLogs); 103 } 104 } 105 106 @Override processFinalProto(TestRecord finalRecord)107 public void processFinalProto(TestRecord finalRecord) { 108 try { 109 if (mResultWriterThread.mCancelled.get()) { 110 writeRecordToSocket(finalRecord); 111 } else { 112 mToBeSent.add(finalRecord); 113 } 114 } finally { 115 // Upon invocation ended, trigger the end of the socket when the process finishes 116 SocketFinisher thread = new SocketFinisher(); 117 Runtime.getRuntime().addShutdownHook(thread); 118 mResultWriterThread.mCancelled.set(true); 119 try { 120 mResultWriterThread.join(); 121 } catch (InterruptedException e) { 122 CLog.e(e); 123 } 124 } 125 } 126 closeSocket()127 protected void closeSocket() { 128 StreamUtil.close(mReportSocket); 129 } 130 writeRecordToSocket(TestRecord record)131 private void writeRecordToSocket(TestRecord record) { 132 if (mReportPort == null) { 133 if (!mPrintedMessage) { 134 CLog.d("No port set. Skipping the reporter."); 135 mPrintedMessage = true; 136 } 137 return; 138 } 139 try { 140 if (mReportSocket == null) { 141 mReportSocket = new Socket("localhost", mReportPort); 142 } 143 record.writeDelimitedTo(mReportSocket.getOutputStream()); 144 } catch (IOException e) { 145 CLog.e(e); 146 } 147 } 148 149 /** Threads that help terminating the socket. */ 150 private class SocketFinisher extends Thread { 151 SocketFinisher()152 public SocketFinisher() { 153 super(); 154 setName("StreamProtoResultReporter-socket-finisher"); 155 } 156 157 @Override run()158 public void run() { 159 closeSocket(); 160 } 161 } 162 163 /** Send events from the event queue */ 164 private class ResultWriterThread extends Thread { 165 166 private AtomicBoolean mCancelled = new AtomicBoolean(false); 167 ResultWriterThread()168 public ResultWriterThread() { 169 super(); 170 setName("ResultWriterThread"); 171 } 172 173 @Override run()174 public void run() { 175 while (!mCancelled.get()) { 176 flushEvents(); 177 if (!mCancelled.get()) { 178 RunUtil.getDefault().sleep(1000); 179 } 180 } 181 // Flush remaining events if any 182 flushEvents(); 183 } 184 flushEvents()185 public void flushEvents() { 186 TestRecord record = mToBeSent.poll(); 187 while (record != null) { 188 writeRecordToSocket(record); 189 record = mToBeSent.poll(); 190 } 191 } 192 } 193 } 194