1 /*
2 * Copyright 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "message_loop_thread.h"
18
19 #include <base/functional/callback.h>
20 #include <base/location.h>
21 #include <base/strings/stringprintf.h>
22 #include <base/time/time.h>
23 #include <bluetooth/log.h>
24 #include <sys/syscall.h>
25 #include <unistd.h>
26
27 #include <future>
28 #include <mutex>
29 #include <string>
30 #include <thread>
31
32 #include "common/postable_context.h"
33
34 namespace bluetooth {
35 namespace common {
36
// Priority value passed as sched_priority when EnableRealTimeScheduling()
// switches the message loop thread to SCHED_FIFO.
static constexpr int kRealTimeFifoSchedulingPriority{1};
38
timeDeltaFromMicroseconds(std::chrono::microseconds t)39 static base::TimeDelta timeDeltaFromMicroseconds(std::chrono::microseconds t) {
40 #if BASE_VER < 931007
41 return base::TimeDelta::FromMicroseconds(t.count());
42 #else
43 return base::Microseconds(t.count());
44 #endif
45 }
46
MessageLoopThread(const std::string & thread_name)47 MessageLoopThread::MessageLoopThread(const std::string& thread_name)
48 : thread_name_(thread_name),
49 message_loop_(nullptr),
50 run_loop_(nullptr),
51 thread_(nullptr),
52 thread_id_(-1),
53 linux_tid_(-1),
54 weak_ptr_factory_(this),
55 shutting_down_(false) {}
56
~MessageLoopThread()57 MessageLoopThread::~MessageLoopThread() { ShutDown(); }
58
StartUp()59 void MessageLoopThread::StartUp() {
60 std::promise<void> start_up_promise;
61 std::future<void> start_up_future = start_up_promise.get_future();
62 {
63 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
64 if (thread_ != nullptr) {
65 log::warn("thread {} is already started", *this);
66
67 return;
68 }
69 thread_ = new std::thread(&MessageLoopThread::RunThread, this,
70 std::move(start_up_promise));
71 }
72 start_up_future.wait();
73 }
74
DoInThread(const base::Location & from_here,base::OnceClosure task)75 bool MessageLoopThread::DoInThread(const base::Location& from_here,
76 base::OnceClosure task) {
77 return DoInThreadDelayed(from_here, std::move(task),
78 std::chrono::microseconds(0));
79 }
80
DoInThreadDelayed(const base::Location & from_here,base::OnceClosure task,std::chrono::microseconds delay)81 bool MessageLoopThread::DoInThreadDelayed(const base::Location& from_here,
82 base::OnceClosure task,
83 std::chrono::microseconds delay) {
84 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
85
86 if (message_loop_ == nullptr) {
87 log::error("message loop is null for thread {}, from {}", *this,
88 from_here.ToString());
89 return false;
90 }
91 if (!message_loop_->task_runner()->PostDelayedTask(
92 from_here, std::move(task), timeDeltaFromMicroseconds(delay))) {
93 log::error("failed to post task to message loop for thread {}, from {}",
94 *this, from_here.ToString());
95 return false;
96 }
97 return true;
98 }
99
ShutDown()100 void MessageLoopThread::ShutDown() {
101 {
102 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
103 if (thread_ == nullptr) {
104 log::info("thread {} is already stopped", *this);
105 return;
106 }
107 if (message_loop_ == nullptr) {
108 log::info("message_loop_ is null. Already stopping");
109 return;
110 }
111 if (shutting_down_) {
112 log::info("waiting for thread to join");
113 return;
114 }
115 shutting_down_ = true;
116 log::assert_that(thread_id_ != base::PlatformThread::CurrentId(),
117 "should not be called on the thread itself. Otherwise, "
118 "deadlock may happen.");
119 run_loop_->QuitWhenIdle();
120 }
121 thread_->join();
122 {
123 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
124 delete thread_;
125 thread_ = nullptr;
126 shutting_down_ = false;
127 }
128 }
129
GetThreadId() const130 base::PlatformThreadId MessageLoopThread::GetThreadId() const {
131 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
132 return thread_id_;
133 }
134
GetName() const135 std::string MessageLoopThread::GetName() const { return thread_name_; }
136
ToString() const137 std::string MessageLoopThread::ToString() const {
138 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
139 return base::StringPrintf("%s(%d)", thread_name_.c_str(), thread_id_);
140 }
141
IsRunning() const142 bool MessageLoopThread::IsRunning() const {
143 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
144 return thread_id_ != -1;
145 }
146
147 // Non API method, should not be protected by API mutex
RunThread(MessageLoopThread * thread,std::promise<void> start_up_promise)148 void MessageLoopThread::RunThread(MessageLoopThread* thread,
149 std::promise<void> start_up_promise) {
150 thread->Run(std::move(start_up_promise));
151 }
152
153 // This is only for use in tests.
message_loop() const154 btbase::AbstractMessageLoop* MessageLoopThread::message_loop() const {
155 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
156 return message_loop_;
157 }
158
EnableRealTimeScheduling()159 bool MessageLoopThread::EnableRealTimeScheduling() {
160 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
161
162 if (!IsRunning()) {
163 log::error("thread {} is not running", *this);
164 return false;
165 }
166
167 struct sched_param rt_params = {.sched_priority =
168 kRealTimeFifoSchedulingPriority};
169 int rc = sched_setscheduler(linux_tid_, SCHED_FIFO, &rt_params);
170 if (rc != 0) {
171 log::error(
172 "unable to set SCHED_FIFO priority {} for linux_tid {}, thread {}, "
173 "error: {}",
174 kRealTimeFifoSchedulingPriority, linux_tid_, *this, strerror(errno));
175 return false;
176 }
177 return true;
178 }
179
GetWeakPtr()180 base::WeakPtr<MessageLoopThread> MessageLoopThread::GetWeakPtr() {
181 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
182 return weak_ptr_factory_.GetWeakPtr();
183 }
184
Run(std::promise<void> start_up_promise)185 void MessageLoopThread::Run(std::promise<void> start_up_promise) {
186 {
187 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
188
189 log::info("message loop starting for thread {}", thread_name_);
190 base::PlatformThread::SetName(thread_name_);
191 message_loop_ = new btbase::AbstractMessageLoop();
192 run_loop_ = new base::RunLoop();
193 thread_id_ = base::PlatformThread::CurrentId();
194 linux_tid_ = static_cast<pid_t>(syscall(SYS_gettid));
195 start_up_promise.set_value();
196 }
197
198 // Blocking until ShutDown() is called
199 run_loop_->Run();
200
201 {
202 std::lock_guard<std::recursive_mutex> api_lock(api_mutex_);
203 thread_id_ = -1;
204 linux_tid_ = -1;
205 delete message_loop_;
206 message_loop_ = nullptr;
207 delete run_loop_;
208 run_loop_ = nullptr;
209 log::info("message loop finished for thread {}", thread_name_);
210 }
211 }
212
Post(base::OnceClosure closure)213 void MessageLoopThread::Post(base::OnceClosure closure) {
214 DoInThread(FROM_HERE, std::move(closure));
215 }
216
Postable()217 PostableContext* MessageLoopThread::Postable() { return this; }
218
219 } // namespace common
220 } // namespace bluetooth
221