1 /**
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <StreamScheduler.h>
18 #include <ImsMediaTrace.h>
19 #include <stdint.h>
20 #include <chrono>
21 #include <thread>
22 #include <algorithm>
23 
24 using namespace std::chrono;
25 
// Timeout, in milliseconds, passed to mConditionMain.wait_timeout() by run()
// when the last scheduling pass reported that more work is pending.
constexpr int RUN_WAIT_TIMEOUT_MS = 2;
// Timeout, in milliseconds, passed to mConditionExit.wait_timeout() by Stop()
// while waiting for run() to signal that it has finished.
constexpr int STOP_WAIT_TIMEOUT_MS = 1000;
28 
StreamScheduler()29 StreamScheduler::StreamScheduler() {}
30 
~StreamScheduler()31 StreamScheduler::~StreamScheduler()
32 {
33     Stop();
34 }
35 
RegisterNode(BaseNode * pNode)36 void StreamScheduler::RegisterNode(BaseNode* pNode)
37 {
38     if (pNode != nullptr)
39     {
40         IMLOGD2("[RegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
41         std::lock_guard<std::mutex> guard(mMutex);
42         mListRegisteredNode.push_back(pNode);
43     }
44 }
45 
DeRegisterNode(BaseNode * pNode)46 void StreamScheduler::DeRegisterNode(BaseNode* pNode)
47 {
48     if (pNode != nullptr)
49     {
50         IMLOGD2("[DeRegisterNode] [%p], node[%s]", this, pNode->GetNodeName());
51         std::lock_guard<std::mutex> guard(mMutex);
52         mListRegisteredNode.remove(pNode);
53     }
54 }
55 
Start()56 void StreamScheduler::Start()
57 {
58     IMLOGD1("[Start] [%p] enter", this);
59 
60     for (auto& node : mListRegisteredNode)
61     {
62         if (node != nullptr)
63         {
64             IMLOGD2("[Start] [%p] registered node [%s]", this, node->GetNodeName());
65         }
66     }
67 
68     if (!mListRegisteredNode.empty())
69     {
70         IMLOGD1("[Start] [%p] Start thread", this);
71         mIsRunning = true;
72         StartThread("StreamScheduler");
73     }
74 
75     IMLOGD1("[Start] [%p] exit", this);
76 }
77 
Stop()78 void StreamScheduler::Stop()
79 {
80     IMLOGD1("[Stop] [%p] enter", this);
81 
82     if (!IsThreadStopped())
83     {
84         StopThread();
85         Awake();
86         mConditionExit.wait_timeout(STOP_WAIT_TIMEOUT_MS);
87     }
88 
89     IMLOGD1("[Stop] [%p] exit", this);
90 }
91 
Awake()92 void StreamScheduler::Awake()
93 {
94     if (!mIsRunning)
95     {
96         mIsRunning = true;
97         mConditionMain.signal();
98     }
99 }
100 
RunRegisteredNode()101 bool StreamScheduler::RunRegisteredNode()
102 {
103     bool needToRun = false;
104     // the list to contain non-source type node
105     std::list<BaseNode*> listNodesToRun;
106 
107     for (auto& node : mListRegisteredNode)
108     {
109         if (node != nullptr && node->GetState() == kNodeStateRunning && !node->IsRunTime())
110         {
111             if (node->IsSourceNode())  // process the source node
112             {
113                 node->ProcessData();
114                 needToRun = true;
115             }
116             else if (node->GetDataCount() > 0)
117             {
118                 listNodesToRun.push_back(node);  // store node to run
119             }
120         }
121     }
122 
123     for (auto& node : listNodesToRun)
124     {
125         if (IsThreadStopped())
126         {
127             break;
128         }
129 
130         if (node != nullptr)
131         {
132             node->ProcessData();  // process the non runtime node
133 
134             if (node->GetDataCount() > 0)
135             {
136                 needToRun = true;
137             }
138         }
139     }
140 
141     listNodesToRun.clear();
142     return needToRun;
143 }
144 
run()145 void* StreamScheduler::run()
146 {
147     IMLOGD1("[run] [%p] enter", this);
148 
149     // start nodes
150     mMutex.lock();
151 
152     for (auto& node : mListRegisteredNode)
153     {
154         if (node != nullptr && !node->IsRunTimeStart())
155         {
156             if (node->GetState() == kNodeStateStopped)
157             {
158                 node->ProcessStart();
159             }
160         }
161     }
162 
163     mMutex.unlock();
164 
165     while (!IsThreadStopped())
166     {
167         mMutex.lock();
168         bool needToRun = RunRegisteredNode();
169         mMutex.unlock();
170 
171         if (IsThreadStopped())
172         {
173             break;
174         }
175 
176         if (needToRun)
177         {
178             mIsRunning = true;
179             mConditionMain.wait_timeout(RUN_WAIT_TIMEOUT_MS);
180         }
181         else
182         {
183             mIsRunning = false;
184             mConditionMain.wait();
185         }
186     }
187 
188     mConditionExit.signal();
189     IMLOGD1("[run] [%p] exit", this);
190     return nullptr;
191 }