1 // Copyright (C) 2023 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "handler_manager.h"

#include <pthread.h>
#include <sys/eventfd.h>

#include <utility>

#include <android-base/logging.h>

#include "android-base/properties.h"
#include "merge_worker.h"
#include "read_worker.h"
#include "snapuserd_core.h"
26 
27 namespace android {
28 namespace snapshot {
29 
30 static constexpr uint8_t kMaxMergeThreads = 2;
31 
HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)32 HandlerThread::HandlerThread(std::shared_ptr<SnapshotHandler> snapuserd)
33     : snapuserd_(snapuserd), misc_name_(snapuserd_->GetMiscName()) {}
34 
FreeResources()35 void HandlerThread::FreeResources() {
36     // Each worker thread holds a reference to snapuserd.
37     // Clear them so that all the resources
38     // held by snapuserd is released
39     if (snapuserd_) {
40         snapuserd_->FreeResources();
41         snapuserd_ = nullptr;
42     }
43 }
44 
SnapshotHandlerManager()45 SnapshotHandlerManager::SnapshotHandlerManager() {
46     monitor_merge_event_fd_.reset(eventfd(0, EFD_CLOEXEC));
47     if (monitor_merge_event_fd_ == -1) {
48         PLOG(FATAL) << "monitor_merge_event_fd_: failed to create eventfd";
49     }
50 }
51 
AddHandler(const std::string & misc_name,const std::string & cow_device_path,const std::string & backing_device,const std::string & base_path_merge,std::shared_ptr<IBlockServerOpener> opener,int num_worker_threads,bool use_iouring,bool o_direct)52 std::shared_ptr<HandlerThread> SnapshotHandlerManager::AddHandler(
53         const std::string& misc_name, const std::string& cow_device_path,
54         const std::string& backing_device, const std::string& base_path_merge,
55         std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads, bool use_iouring,
56         bool o_direct) {
57     auto snapuserd = std::make_shared<SnapshotHandler>(
58             misc_name, cow_device_path, backing_device, base_path_merge, opener, num_worker_threads,
59             use_iouring, perform_verification_, o_direct);
60     if (!snapuserd->InitCowDevice()) {
61         LOG(ERROR) << "Failed to initialize Snapuserd";
62         return nullptr;
63     }
64 
65     if (!snapuserd->InitializeWorkers()) {
66         LOG(ERROR) << "Failed to initialize workers";
67         return nullptr;
68     }
69 
70     auto handler = std::make_shared<HandlerThread>(snapuserd);
71     {
72         std::lock_guard<std::mutex> lock(lock_);
73         if (FindHandler(&lock, misc_name) != dm_users_.end()) {
74             LOG(ERROR) << "Handler already exists: " << misc_name;
75             return nullptr;
76         }
77         dm_users_.push_back(handler);
78     }
79     return handler;
80 }
81 
StartHandler(const std::string & misc_name)82 bool SnapshotHandlerManager::StartHandler(const std::string& misc_name) {
83     std::lock_guard<std::mutex> lock(lock_);
84     auto iter = FindHandler(&lock, misc_name);
85     if (iter == dm_users_.end()) {
86         LOG(ERROR) << "Could not find handler: " << misc_name;
87         return false;
88     }
89     if (!(*iter)->snapuserd() || (*iter)->snapuserd()->IsAttached()) {
90         LOG(ERROR) << "Tried to re-attach control device: " << misc_name;
91         return false;
92     }
93     if (!StartHandler(*iter)) {
94         return false;
95     }
96     return true;
97 }
98 
StartHandler(const std::shared_ptr<HandlerThread> & handler)99 bool SnapshotHandlerManager::StartHandler(const std::shared_ptr<HandlerThread>& handler) {
100     if (handler->snapuserd()->IsAttached()) {
101         LOG(ERROR) << "Handler already attached";
102         return false;
103     }
104 
105     handler->snapuserd()->AttachControlDevice();
106 
107     handler->thread() = std::thread(std::bind(&SnapshotHandlerManager::RunThread, this, handler));
108     return true;
109 }
110 
DeleteHandler(const std::string & misc_name)111 bool SnapshotHandlerManager::DeleteHandler(const std::string& misc_name) {
112     {
113         std::lock_guard<std::mutex> lock(lock_);
114         auto iter = FindHandler(&lock, misc_name);
115         if (iter == dm_users_.end()) {
116             // After merge is completed, we swap dm-user table with
117             // the underlying dm-linear base device. Hence, worker
118             // threads would have terminted and was removed from
119             // the list.
120             LOG(DEBUG) << "Could not find handler: " << misc_name;
121             return true;
122         }
123 
124         if (!(*iter)->ThreadTerminated()) {
125             (*iter)->snapuserd()->NotifyIOTerminated();
126         }
127     }
128     if (!RemoveAndJoinHandler(misc_name)) {
129         return false;
130     }
131     return true;
132 }
133 
// Per-handler thread entry point. Services I/O via SnapshotHandler::Start()
// until termination, then updates merge accounting and removes the handler
// from dm_users_ -- unless RemoveAndJoinHandler() already removed it, in
// which case that caller is waiting to join() us.
void SnapshotHandlerManager::RunThread(std::shared_ptr<HandlerThread> handler) {
    LOG(INFO) << "Entering thread for handler: " << handler->misc_name();

    pthread_setname_np(pthread_self(), "Handler");

    // Blocks here servicing requests until the handler is told to stop.
    if (!handler->snapuserd()->Start()) {
        LOG(ERROR) << " Failed to launch all worker threads";
    }

    handler->snapuserd()->CloseFds();
    // Capture merge status before any teardown below.
    bool merge_completed = handler->snapuserd()->CheckMergeCompletionStatus();
    handler->snapuserd()->UnmapBufferRegion();

    auto misc_name = handler->misc_name();
    LOG(INFO) << "Handler thread about to exit: " << misc_name;

    {
        std::lock_guard<std::mutex> lock(lock_);
        if (merge_completed) {
            num_partitions_merge_complete_ += 1;
            // Free a merge slot and let MonitorMerge() schedule the next
            // queued partition.
            active_merge_threads_ -= 1;
            WakeupMonitorMergeThread();
        }
        handler->SetThreadTerminated();
        auto iter = FindHandler(&lock, handler->misc_name());
        if (iter == dm_users_.end()) {
            // RemoveAndJoinHandler() already removed us from the list, and is
            // now waiting on a join(), so just return. Additionally, release
            // all the resources held by snapuserd object which are shared
            // by worker threads. This should be done when the last reference
            // of "handler" is released; but we will explicitly release here
            // to make sure snapuserd object is freed as it is the biggest
            // consumer of memory in the daemon.
            handler->FreeResources();
            LOG(INFO) << "Exiting handler thread to allow for join: " << misc_name;
            return;
        }

        LOG(INFO) << "Exiting handler thread and freeing resources: " << misc_name;

        if (handler->snapuserd()->IsAttached()) {
            // No one will join us in this path; detach so the std::thread can
            // be destroyed safely when the handler is erased below.
            handler->thread().detach();
        }

        // Important: free resources within the lock. This ensures that if
        // WaitForDelete() is called, the handler is either in the list, or
        // it's not and its resources are guaranteed to be freed.
        handler->FreeResources();
        dm_users_.erase(iter);
    }
}
185 
InitiateMerge(const std::string & misc_name)186 bool SnapshotHandlerManager::InitiateMerge(const std::string& misc_name) {
187     std::lock_guard<std::mutex> lock(lock_);
188     auto iter = FindHandler(&lock, misc_name);
189     if (iter == dm_users_.end()) {
190         LOG(ERROR) << "Could not find handler: " << misc_name;
191         return false;
192     }
193 
194     return StartMerge(&lock, *iter);
195 }
196 
StartMerge(std::lock_guard<std::mutex> * proof_of_lock,const std::shared_ptr<HandlerThread> & handler)197 bool SnapshotHandlerManager::StartMerge(std::lock_guard<std::mutex>* proof_of_lock,
198                                         const std::shared_ptr<HandlerThread>& handler) {
199     CHECK(proof_of_lock);
200 
201     if (!handler->snapuserd()->IsAttached()) {
202         LOG(ERROR) << "Handler not attached to dm-user - Merge thread cannot be started";
203         return false;
204     }
205 
206     handler->snapuserd()->MonitorMerge();
207 
208     if (!merge_monitor_.joinable()) {
209         merge_monitor_ = std::thread(&SnapshotHandlerManager::MonitorMerge, this);
210     }
211 
212     merge_handlers_.push(handler);
213     WakeupMonitorMergeThread();
214     return true;
215 }
216 
WakeupMonitorMergeThread()217 void SnapshotHandlerManager::WakeupMonitorMergeThread() {
218     uint64_t notify = 1;
219     ssize_t rc = TEMP_FAILURE_RETRY(write(monitor_merge_event_fd_.get(), &notify, sizeof(notify)));
220     if (rc < 0) {
221         PLOG(FATAL) << "failed to notify monitor merge thread";
222     }
223 }
224 
// Dedicated thread that throttles concurrent merges. Sleeps on the eventfd
// until woken (by StartMerge, RunThread, or JoinAllThreads), then drains the
// pending-handler queue up to the configured thread limit.
void SnapshotHandlerManager::MonitorMerge() {
    pthread_setname_np(pthread_self(), "Merge Monitor");
    while (!stop_monitor_merge_thread_) {
        uint64_t testVal;
        // eventfd read blocks until the counter is non-zero, then resets it.
        ssize_t ret =
                TEMP_FAILURE_RETRY(read(monitor_merge_event_fd_.get(), &testVal, sizeof(testVal)));
        if (ret == -1) {
            PLOG(FATAL) << "Failed to read from eventfd";
        } else if (ret == 0) {
            LOG(FATAL) << "Hit EOF on eventfd";
        }

        LOG(INFO) << "MonitorMerge: active-merge-threads: " << active_merge_threads_;
        {
            // Re-read the property on each wakeup; falls back to
            // kMaxMergeThreads when unset.
            auto num_merge_threads = android::base::GetUintProperty<uint>(
                    "ro.virtual_ab.num_merge_threads", kMaxMergeThreads);
            std::lock_guard<std::mutex> lock(lock_);
            while (active_merge_threads_ < num_merge_threads && merge_handlers_.size() > 0) {
                auto handler = merge_handlers_.front();
                merge_handlers_.pop();

                // A queued handler may have been deleted (snapuserd cleared)
                // before its turn came up; skip it.
                if (!handler->snapuserd()) {
                    LOG(INFO) << "MonitorMerge: skipping deleted handler: " << handler->misc_name();
                    continue;
                }

                LOG(INFO) << "Starting merge for partition: "
                          << handler->snapuserd()->GetMiscName();
                handler->snapuserd()->InitiateMerge();
                active_merge_threads_ += 1;
            }
        }
    }

    // NOTE(review): merge_handlers_.size() is read here without holding
    // lock_; benign for a shutdown log line, but strictly unsynchronized --
    // confirm no other thread can still push at this point.
    LOG(INFO) << "Exiting MonitorMerge: size: " << merge_handlers_.size();
}
261 
GetMergeStatus(const std::string & misc_name)262 std::string SnapshotHandlerManager::GetMergeStatus(const std::string& misc_name) {
263     std::lock_guard<std::mutex> lock(lock_);
264     auto iter = FindHandler(&lock, misc_name);
265     if (iter == dm_users_.end()) {
266         LOG(ERROR) << "Could not find handler: " << misc_name;
267         return {};
268     }
269 
270     return (*iter)->snapuserd()->GetMergeStatus();
271 }
272 
GetMergePercentage()273 double SnapshotHandlerManager::GetMergePercentage() {
274     std::lock_guard<std::mutex> lock(lock_);
275 
276     double percentage = 0.0;
277     int n = 0;
278 
279     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
280         auto& th = (*iter)->thread();
281         if (th.joinable()) {
282             // Merge percentage by individual partitions wherein merge is still
283             // in-progress
284             percentage += (*iter)->snapuserd()->GetMergePercentage();
285             n += 1;
286         }
287     }
288 
289     // Calculate final merge including those partitions where merge was already
290     // completed - num_partitions_merge_complete_ will track them when each
291     // thread exists in RunThread.
292     int total_partitions = n + num_partitions_merge_complete_;
293 
294     if (total_partitions) {
295         percentage = ((num_partitions_merge_complete_ * 100.0) + percentage) / total_partitions;
296     }
297 
298     LOG(DEBUG) << "Merge %: " << percentage
299                << " num_partitions_merge_complete_: " << num_partitions_merge_complete_
300                << " total_partitions: " << total_partitions << " n: " << n;
301     return percentage;
302 }
303 
GetVerificationStatus()304 bool SnapshotHandlerManager::GetVerificationStatus() {
305     std::lock_guard<std::mutex> lock(lock_);
306 
307     bool status = true;
308     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
309         auto& th = (*iter)->thread();
310         if (th.joinable() && status) {
311             status = (*iter)->snapuserd()->CheckPartitionVerification() && status;
312         } else {
313             // return immediately if there is a failure
314             return false;
315         }
316     }
317 
318     return status;
319 }
320 
RemoveAndJoinHandler(const std::string & misc_name)321 bool SnapshotHandlerManager::RemoveAndJoinHandler(const std::string& misc_name) {
322     std::shared_ptr<HandlerThread> handler;
323     {
324         std::lock_guard<std::mutex> lock(lock_);
325 
326         auto iter = FindHandler(&lock, misc_name);
327         if (iter == dm_users_.end()) {
328             // Client already deleted.
329             return true;
330         }
331         handler = std::move(*iter);
332         dm_users_.erase(iter);
333     }
334 
335     auto& th = handler->thread();
336     if (th.joinable()) {
337         th.join();
338     }
339     return true;
340 }
341 
TerminateMergeThreads()342 void SnapshotHandlerManager::TerminateMergeThreads() {
343     std::lock_guard<std::mutex> guard(lock_);
344 
345     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
346         if (!(*iter)->ThreadTerminated()) {
347             (*iter)->snapuserd()->NotifyIOTerminated();
348         }
349     }
350 }
351 
JoinAllThreads()352 void SnapshotHandlerManager::JoinAllThreads() {
353     // Acquire the thread list within the lock.
354     std::vector<std::shared_ptr<HandlerThread>> dm_users;
355     {
356         std::lock_guard<std::mutex> guard(lock_);
357         dm_users = std::move(dm_users_);
358     }
359 
360     for (auto& client : dm_users) {
361         auto& th = client->thread();
362 
363         if (th.joinable()) th.join();
364     }
365 
366     if (merge_monitor_.joinable()) {
367         stop_monitor_merge_thread_ = true;
368         WakeupMonitorMergeThread();
369 
370         merge_monitor_.join();
371     }
372 }
373 
FindHandler(std::lock_guard<std::mutex> * proof_of_lock,const std::string & misc_name)374 auto SnapshotHandlerManager::FindHandler(std::lock_guard<std::mutex>* proof_of_lock,
375                                          const std::string& misc_name) -> HandlerList::iterator {
376     CHECK(proof_of_lock);
377 
378     for (auto iter = dm_users_.begin(); iter != dm_users_.end(); iter++) {
379         if ((*iter)->misc_name() == misc_name) {
380             return iter;
381         }
382     }
383     return dm_users_.end();
384 }
385 
386 }  // namespace snapshot
387 }  // namespace android
388