1 /**
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <StreamScheduler.h>
18 #include <ImsMediaTrace.h>
19 #include <stdint.h>
20 #include <chrono>
21 #include <thread>
22 #include <algorithm>
23
24 using namespace std::chrono;
25
26 #define RUN_WAIT_TIMEOUT_MS 2
27 #define STOP_WAIT_TIMEOUT_MS 1000
28
StreamScheduler()29 StreamScheduler::StreamScheduler() {}
30
~StreamScheduler()31 StreamScheduler::~StreamScheduler()
32 {
33 Stop();
34 }
35
RegisterNode(BaseNode * pNode)36 void StreamScheduler::RegisterNode(BaseNode* pNode)
37 {
38 if (pNode != nullptr)
39 {
40 IMLOGD2("[RegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
41 std::lock_guard<std::mutex> guard(mMutex);
42 mListRegisteredNode.push_back(pNode);
43 }
44 }
45
DeRegisterNode(BaseNode * pNode)46 void StreamScheduler::DeRegisterNode(BaseNode* pNode)
47 {
48 if (pNode != nullptr)
49 {
50 IMLOGD2("[DeRegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
51 std::lock_guard<std::mutex> guard(mMutex);
52 mListRegisteredNode.remove(pNode);
53 }
54 }
55
Start()56 void StreamScheduler::Start()
57 {
58 IMLOGD1("[Start] [%p] enter", this);
59
60 for (auto& node : mListRegisteredNode)
61 {
62 if (node != nullptr)
63 {
64 IMLOGD2("[Start] [%p] registered node [%s]", this, node->GetNodeName());
65 }
66 }
67
68 if (!mListRegisteredNode.empty())
69 {
70 IMLOGD1("[Start] [%p] Start thread", this);
71 mIsRunning = true;
72 StartThread("StreamScheduler");
73 }
74
75 IMLOGD1("[Start] [%p] exit", this);
76 }
77
Stop()78 void StreamScheduler::Stop()
79 {
80 IMLOGD1("[Stop] [%p] enter", this);
81
82 if (!IsThreadStopped())
83 {
84 StopThread();
85 Awake();
86 mConditionExit.wait_timeout(STOP_WAIT_TIMEOUT_MS);
87 }
88
89 IMLOGD1("[Stop] [%p] exit", this);
90 }
91
Awake()92 void StreamScheduler::Awake()
93 {
94 if (!mIsRunning)
95 {
96 mIsRunning = true;
97 mConditionMain.signal();
98 }
99 }
100
RunRegisteredNode()101 bool StreamScheduler::RunRegisteredNode()
102 {
103 bool needToRun = false;
104 // the list to contain non-source type node
105 std::list<BaseNode*> listNodesToRun;
106
107 for (auto& node : mListRegisteredNode)
108 {
109 if (node != nullptr && node->GetState() == kNodeStateRunning && !node->IsRunTime())
110 {
111 if (node->IsSourceNode()) // process the source node
112 {
113 node->ProcessData();
114 needToRun = true;
115 }
116 else if (node->GetDataCount() > 0)
117 {
118 listNodesToRun.push_back(node); // store node to run
119 }
120 }
121 }
122
123 for (auto& node : listNodesToRun)
124 {
125 if (IsThreadStopped())
126 {
127 break;
128 }
129
130 if (node != nullptr)
131 {
132 node->ProcessData(); // process the non runtime node
133
134 if (node->GetDataCount() > 0)
135 {
136 needToRun = true;
137 }
138 }
139 }
140
141 listNodesToRun.clear();
142 return needToRun;
143 }
144
run()145 void* StreamScheduler::run()
146 {
147 IMLOGD1("[run] [%p] enter", this);
148
149 // start nodes
150 mMutex.lock();
151
152 for (auto& node : mListRegisteredNode)
153 {
154 if (node != nullptr && !node->IsRunTimeStart())
155 {
156 if (node->GetState() == kNodeStateStopped)
157 {
158 node->ProcessStart();
159 }
160 }
161 }
162
163 mMutex.unlock();
164
165 while (!IsThreadStopped())
166 {
167 mMutex.lock();
168 bool needToRun = RunRegisteredNode();
169 mMutex.unlock();
170
171 if (IsThreadStopped())
172 {
173 break;
174 }
175
176 if (needToRun)
177 {
178 mIsRunning = true;
179 mConditionMain.wait_timeout(RUN_WAIT_TIMEOUT_MS);
180 }
181 else
182 {
183 mIsRunning = false;
184 mConditionMain.wait();
185 }
186 }
187
188 mConditionExit.signal();
189 IMLOGD1("[run] [%p] exit", this);
190 return nullptr;
191 }