1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package com.android.tradefed.result.proto;
17 
18 import com.android.tradefed.config.Option;
19 import com.android.tradefed.invoker.IInvocationContext;
20 import com.android.tradefed.log.LogUtil.CLog;
21 import com.android.tradefed.result.proto.TestRecordProto.TestRecord;
22 import com.android.tradefed.util.RunUtil;
23 import com.android.tradefed.util.StreamUtil;
24 
25 import java.io.IOException;
26 import java.net.Socket;
27 import java.util.concurrent.ConcurrentLinkedQueue;
28 import java.util.concurrent.atomic.AtomicBoolean;
29 
30 /** An implementation of {@link ProtoResultReporter} */
31 public final class StreamProtoResultReporter extends ProtoResultReporter {
32 
33     public static final String PROTO_REPORT_PORT_OPTION = "proto-report-port";
34 
35     @Option(
36         name = PROTO_REPORT_PORT_OPTION,
37         description = "the port where to connect to send the protos."
38     )
39     private Integer mReportPort = null;
40 
41     private Socket mReportSocket = null;
42     private boolean mPrintedMessage = false;
43 
44     private ResultWriterThread mResultWriterThread;
45     private ConcurrentLinkedQueue<TestRecord> mToBeSent = new ConcurrentLinkedQueue<>();
46 
StreamProtoResultReporter()47     public StreamProtoResultReporter() {
48         setInlineRecordOfChildren(false);
49     }
50 
setProtoReportPort(Integer portValue)51     public void setProtoReportPort(Integer portValue) {
52         mReportPort = portValue;
53     }
54 
getProtoReportPort()55     public Integer getProtoReportPort() {
56         return mReportPort;
57     }
58 
59     @Override
processStartInvocation( TestRecord invocationStartRecord, IInvocationContext context)60     public void processStartInvocation(
61             TestRecord invocationStartRecord, IInvocationContext context) {
62         mResultWriterThread = new ResultWriterThread();
63         mResultWriterThread.start();
64         mToBeSent.add(invocationStartRecord);
65     }
66 
67     @Override
processTestModuleStarted(TestRecord moduleStartRecord)68     public void processTestModuleStarted(TestRecord moduleStartRecord) {
69         mToBeSent.add(moduleStartRecord);
70     }
71 
72     @Override
processTestModuleEnd(TestRecord moduleRecord)73     public void processTestModuleEnd(TestRecord moduleRecord) {
74         mToBeSent.add(moduleRecord);
75     }
76 
77     @Override
processTestRunStarted(TestRecord runStartedRecord)78     public void processTestRunStarted(TestRecord runStartedRecord) {
79         mToBeSent.add(runStartedRecord);
80     }
81 
82     @Override
processTestRunEnded(TestRecord runRecord, boolean moduleInProgress)83     public void processTestRunEnded(TestRecord runRecord, boolean moduleInProgress) {
84         mToBeSent.add(runRecord);
85     }
86 
87     @Override
processTestCaseStarted(TestRecord testCaseStartedRecord)88     public void processTestCaseStarted(TestRecord testCaseStartedRecord) {
89         mToBeSent.add(testCaseStartedRecord);
90     }
91 
92     @Override
processTestCaseEnded(TestRecord testCaseRecord)93     public void processTestCaseEnded(TestRecord testCaseRecord) {
94         mToBeSent.add(testCaseRecord);
95     }
96 
97     @Override
processFinalInvocationLogs(TestRecord invocationLogs)98     public void processFinalInvocationLogs(TestRecord invocationLogs) {
99         if (mResultWriterThread.mCancelled.get()) {
100             writeRecordToSocket(invocationLogs);
101         } else {
102             mToBeSent.add(invocationLogs);
103         }
104     }
105 
106     @Override
processFinalProto(TestRecord finalRecord)107     public void processFinalProto(TestRecord finalRecord) {
108         try {
109             if (mResultWriterThread.mCancelled.get()) {
110                 writeRecordToSocket(finalRecord);
111             } else {
112                 mToBeSent.add(finalRecord);
113             }
114         } finally {
115             // Upon invocation ended, trigger the end of the socket when the process finishes
116             SocketFinisher thread = new SocketFinisher();
117             Runtime.getRuntime().addShutdownHook(thread);
118             mResultWriterThread.mCancelled.set(true);
119             try {
120                 mResultWriterThread.join();
121             } catch (InterruptedException e) {
122                 CLog.e(e);
123             }
124         }
125     }
126 
closeSocket()127     protected void closeSocket() {
128         StreamUtil.close(mReportSocket);
129     }
130 
writeRecordToSocket(TestRecord record)131     private void writeRecordToSocket(TestRecord record) {
132         if (mReportPort == null) {
133             if (!mPrintedMessage) {
134                 CLog.d("No port set. Skipping the reporter.");
135                 mPrintedMessage = true;
136             }
137             return;
138         }
139         try {
140             if (mReportSocket == null) {
141                 mReportSocket = new Socket("localhost", mReportPort);
142             }
143             record.writeDelimitedTo(mReportSocket.getOutputStream());
144         } catch (IOException e) {
145             CLog.e(e);
146         }
147     }
148 
149     /** Threads that help terminating the socket. */
150     private class SocketFinisher extends Thread {
151 
SocketFinisher()152         public SocketFinisher() {
153             super();
154             setName("StreamProtoResultReporter-socket-finisher");
155         }
156 
157         @Override
run()158         public void run() {
159             closeSocket();
160         }
161     }
162 
163     /** Send events from the event queue */
164     private class ResultWriterThread extends Thread {
165 
166         private AtomicBoolean mCancelled = new AtomicBoolean(false);
167 
ResultWriterThread()168         public ResultWriterThread() {
169             super();
170             setName("ResultWriterThread");
171         }
172 
173         @Override
run()174         public void run() {
175             while (!mCancelled.get()) {
176                 flushEvents();
177                 if (!mCancelled.get()) {
178                     RunUtil.getDefault().sleep(1000);
179                 }
180             }
181             // Flush remaining events if any
182             flushEvents();
183         }
184 
flushEvents()185         public void flushEvents() {
186             TestRecord record = mToBeSent.poll();
187             while (record != null) {
188                 writeRecordToSocket(record);
189                 record = mToBeSent.poll();
190             }
191         }
192     }
193 }
194