1 // Copyright (C) 2020 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef COMPUTEPIPE_RUNNER_GRAPH_STREAM_SET_OBSERVER_H
16 #define COMPUTEPIPE_RUNNER_GRAPH_STREAM_SET_OBSERVER_H
17 
18 #include <condition_variable>
19 #include <map>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <thread>
24 
25 #include "GrpcPrebuiltGraphService.grpc.pb.h"
26 #include "GrpcPrebuiltGraphService.pb.h"
27 #include "InputFrame.h"
28 #include "RunnerComponent.h"
29 #include "types/Status.h"
30 
31 namespace android {
32 namespace automotive {
33 namespace computepipe {
34 namespace graph {
35 
36 class GrpcGraph;
37 
// Callback interface used to announce that a stream has been closed.
//
// StreamSetObserver, declared further down in this header, derives from this
// interface, and SingleStreamObserver takes a pointer to one in its
// constructor.
class EndOfStreamReporter {
  public:
    // Virtual so that an implementation can be destroyed through a pointer to
    // this interface.
    virtual ~EndOfStreamReporter() = default;

    // Reports that the stream identified by |streamId| has been closed.
    // Implementations must override this; the interface provides no default.
    virtual void reportStreamClosed(int streamId) = 0;
};
44 
45 class StreamGraphInterface {
46   public:
47     virtual ~StreamGraphInterface() = default;
48 
49     virtual void dispatchPixelData(int streamId, int64_t timestamp_us,
50                                    const runner::InputFrame& frame) = 0;
51 
52     virtual void dispatchSerializedData(int streamId, int64_t timestamp_us,
53                                         std::string&& serialized_data) = 0;
54 
55     virtual void dispatchGraphTerminationMessage(Status, std::string&&) = 0;
56 
57     virtual proto::GrpcGraphService::Stub* getServiceStub() = 0;
58 };
59 
60 class SingleStreamObserver {
61   public:
62     SingleStreamObserver(int streamId, EndOfStreamReporter* endOfStreamReporter,
63                          StreamGraphInterface* streamGraphInterface);
64 
65     virtual ~SingleStreamObserver();
66 
67     Status startObservingStream();
68 
69     void stopObservingStream();
70   private:
71     int mStreamId;
72     EndOfStreamReporter* mEndOfStreamReporter;
73     StreamGraphInterface* mStreamGraphInterface;
74     std::thread mThread;
75     bool mStopped = true;
76     std::mutex mStopObservationLock;
77 };
78 
79 class StreamSetObserver : public EndOfStreamReporter {
80   public:
81     virtual ~StreamSetObserver();
82 
83     StreamSetObserver(const runner::ClientConfig& clientConfig,
84                       StreamGraphInterface* streamGraphInterface);
85 
86     Status startObservingStreams();
87 
88     void stopObservingStreams(bool stopImmediately);
89 
90     void reportStreamClosed(int streamId) override;
91   private:
92     const runner::ClientConfig& mClientConfig;
93     StreamGraphInterface* mStreamGraphInterface;
94     std::map<int, std::unique_ptr<SingleStreamObserver>> mStreamObservers;
95     std::mutex mLock;
96     std::condition_variable mStoppedCv;
97     std::thread mGraphTerminationThread;
98     bool mStopped = true;
99 };
100 
101 }  // namespace graph
102 }  // namespace computepipe
103 }  // namespace automotive
104 }  // namespace android
105 
106 #endif  // #define COMPUTEPIPE_RUNNER_GRAPH_STREAM_SET_OBSERVER_H
107