1 /*
2  * Copyright 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
#include "message_loop_thread.h"

#include <base/functional/callback.h>
#include <base/location.h>
#include <base/strings/stringprintf.h>
#include <base/time/time.h>
#include <bluetooth/log.h>
#include <sched.h>
#include <sys/syscall.h>
#include <unistd.h>

#include <cerrno>
#include <cstring>
#include <future>
#include <mutex>
#include <string>
#include <thread>

#include "common/postable_context.h"
33 
34 namespace bluetooth {
35 namespace common {
36 
// SCHED_FIFO priority value requested by EnableRealTimeScheduling().
static constexpr int kRealTimeFifoSchedulingPriority{1};
38 
timeDeltaFromMicroseconds(std::chrono::microseconds t)39 static base::TimeDelta timeDeltaFromMicroseconds(std::chrono::microseconds t) {
40 #if BASE_VER < 931007
41   return base::TimeDelta::FromMicroseconds(t.count());
42 #else
43   return base::Microseconds(t.count());
44 #endif
45 }
46 
MessageLoopThread(const std::string & thread_name)47 MessageLoopThread::MessageLoopThread(const std::string& thread_name)
48     : thread_name_(thread_name),
49       message_loop_(nullptr),
50       run_loop_(nullptr),
51       thread_(nullptr),
52       thread_id_(-1),
53       linux_tid_(-1),
54       weak_ptr_factory_(this),
55       shutting_down_(false) {}
56 
~MessageLoopThread()57 MessageLoopThread::~MessageLoopThread() { ShutDown(); }
58 
StartUp()59 void MessageLoopThread::StartUp() {
60   std::promise<void> start_up_promise;
61   std::future<void> start_up_future = start_up_promise.get_future();
62   {
63     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
64     if (thread_ != nullptr) {
65       log::warn("thread {} is already started", *this);
66 
67       return;
68     }
69     thread_ = new std::thread(&MessageLoopThread::RunThread, this,
70                               std::move(start_up_promise));
71   }
72   start_up_future.wait();
73 }
74 
DoInThread(const base::Location & from_here,base::OnceClosure task)75 bool MessageLoopThread::DoInThread(const base::Location& from_here,
76                                    base::OnceClosure task) {
77   return DoInThreadDelayed(from_here, std::move(task),
78                            std::chrono::microseconds(0));
79 }
80 
DoInThreadDelayed(const base::Location & from_here,base::OnceClosure task,std::chrono::microseconds delay)81 bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here,
82                                           base::OnceClosure task,
83                                           std::chrono::microseconds delay) {
84   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
85 
86   if (message_loop_ == nullptr) {
87     log::error("message loop is null for thread {}, from {}", *this,
88                from_here.ToString());
89     return false;
90   }
91   if (!message_loop_->task_runner()->PostDelayedTask(
92           from_here, std::move(task), timeDeltaFromMicroseconds(delay))) {
93     log::error("failed to post task to message loop for thread {}, from {}",
94                *this, from_here.ToString());
95     return false;
96   }
97   return true;
98 }
99 
ShutDown()100 void MessageLoopThread::ShutDown() {
101   {
102     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
103     if (thread_ == nullptr) {
104       log::info("thread {} is already stopped", *this);
105       return;
106     }
107     if (message_loop_ == nullptr) {
108       log::info("message_loop_ is null. Already stopping");
109       return;
110     }
111     if (shutting_down_) {
112       log::info("waiting for thread to join");
113       return;
114     }
115     shutting_down_ = true;
116     log::assert_that(thread_id_ != base::PlatformThread::CurrentId(),
117                      "should not be called on the thread itself. Otherwise, "
118                      "deadlock may happen.");
119     run_loop_->QuitWhenIdle();
120   }
121   thread_->join();
122   {
123     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
124     delete thread_;
125     thread_ = nullptr;
126     shutting_down_ = false;
127   }
128 }
129 
GetThreadId() const130 base::PlatformThreadId MessageLoopThread::GetThreadId() const {
131   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
132   return thread_id_;
133 }
134 
GetName() const135 std::string MessageLoopThread::GetName() const { return thread_name_; }
136 
ToString() const137 std::string MessageLoopThread::ToString() const {
138   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
139   return base::StringPrintf("%s(%d)", thread_name_.c_str(), thread_id_);
140 }
141 
IsRunning() const142 bool MessageLoopThread::IsRunning() const {
143   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
144   return thread_id_ != -1;
145 }
146 
147 // Non API method, should not be protected by API mutex
RunThread(MessageLoopThread * thread,std::promise<void> start_up_promise)148 void MessageLoopThread::RunThread(MessageLoopThread* thread,
149                                   std::promise<void> start_up_promise) {
150   thread->Run(std::move(start_up_promise));
151 }
152 
153 // This is only for use in tests.
message_loop() const154 btbase::AbstractMessageLoop* MessageLoopThread::message_loop() const {
155   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
156   return message_loop_;
157 }
158 
EnableRealTimeScheduling()159 bool MessageLoopThread::EnableRealTimeScheduling() {
160   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
161 
162   if (!IsRunning()) {
163     log::error("thread {} is not running", *this);
164     return false;
165   }
166 
167   struct sched_param rt_params = {.sched_priority =
168                                       kRealTimeFifoSchedulingPriority};
169   int rc = sched_setscheduler(linux_tid_, SCHED_FIFO, &rt_params);
170   if (rc != 0) {
171     log::error(
172         "unable to set SCHED_FIFO priority {} for linux_tid {}, thread {}, "
173         "error: {}",
174         kRealTimeFifoSchedulingPriority, linux_tid_, *this, strerror(errno));
175     return false;
176   }
177   return true;
178 }
179 
GetWeakPtr()180 base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
181   std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
182   return weak_ptr_factory_.GetWeakPtr();
183 }
184 
Run(std::promise<void> start_up_promise)185 void MessageLoopThread::Run(std::promise<void> start_up_promise) {
186   {
187     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
188 
189     log::info("message loop starting for thread {}", thread_name_);
190     base::PlatformThread::SetName(thread_name_);
191     message_loop_ = new btbase::AbstractMessageLoop();
192     run_loop_ = new base::RunLoop();
193     thread_id_ = base::PlatformThread::CurrentId();
194     linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid));
195     start_up_promise.set_value();
196   }
197 
198   // Blocking until ShutDown() is called
199   run_loop_->Run();
200 
201   {
202     std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
203     thread_id_ = -1;
204     linux_tid_ = -1;
205     delete message_loop_;
206     message_loop_ = nullptr;
207     delete run_loop_;
208     run_loop_ = nullptr;
209     log::info("message loop finished for thread {}", thread_name_);
210   }
211 }
212 
Post(base::OnceClosure closure)213 void MessageLoopThread::Post(base::OnceClosure closure) {
214   DoInThread(FROM_HERE, std::move(closure));
215 }
216 
Postable()217 PostableContext* MessageLoopThread::Postable() { return this; }
218 
219 }  // namespace common
220 }  // namespace bluetooth
221