1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "snapuserd_core.h"
18 
19 #include <android-base/chrono_utils.h>
20 #include <android-base/properties.h>
21 #include <android-base/scopeguard.h>
22 #include <android-base/strings.h>
23 #include <snapuserd/dm_user_block_server.h>
24 
25 #include "merge_worker.h"
26 #include "read_worker.h"
27 #include "utility.h"
28 
29 namespace android {
30 namespace snapshot {
31 
32 using namespace android;
33 using namespace android::dm;
34 using android::base::unique_fd;
35 
SnapshotHandler(std::string misc_name,std::string cow_device,std::string backing_device,std::string base_path_merge,std::shared_ptr<IBlockServerOpener> opener,int num_worker_threads,bool use_iouring,bool perform_verification,bool o_direct)36 SnapshotHandler::SnapshotHandler(std::string misc_name, std::string cow_device,
37                                  std::string backing_device, std::string base_path_merge,
38                                  std::shared_ptr<IBlockServerOpener> opener, int num_worker_threads,
39                                  bool use_iouring, bool perform_verification, bool o_direct) {
40     misc_name_ = std::move(misc_name);
41     cow_device_ = std::move(cow_device);
42     backing_store_device_ = std::move(backing_device);
43     block_server_opener_ = std::move(opener);
44     base_path_merge_ = std::move(base_path_merge);
45     num_worker_threads_ = num_worker_threads;
46     is_io_uring_enabled_ = use_iouring;
47     perform_verification_ = perform_verification;
48     o_direct_ = o_direct;
49 }
50 
InitializeWorkers()51 bool SnapshotHandler::InitializeWorkers() {
52     for (int i = 0; i < num_worker_threads_; i++) {
53         auto wt = std::make_unique<ReadWorker>(cow_device_, backing_store_device_, misc_name_,
54                                                base_path_merge_, GetSharedPtr(),
55                                                block_server_opener_, o_direct_);
56         if (!wt->Init()) {
57             SNAP_LOG(ERROR) << "Thread initialization failed";
58             return false;
59         }
60 
61         worker_threads_.push_back(std::move(wt));
62     }
63 
64     merge_thread_ = std::make_unique<MergeWorker>(cow_device_, misc_name_, base_path_merge_,
65                                                   GetSharedPtr());
66 
67     read_ahead_thread_ = std::make_unique<ReadAhead>(cow_device_, backing_store_device_, misc_name_,
68                                                      GetSharedPtr());
69 
70     update_verify_ = std::make_unique<UpdateVerify>(misc_name_);
71 
72     return true;
73 }
74 
CloneReaderForWorker()75 std::unique_ptr<CowReader> SnapshotHandler::CloneReaderForWorker() {
76     return reader_->CloneCowReader();
77 }
78 
UpdateMergeCompletionPercentage()79 void SnapshotHandler::UpdateMergeCompletionPercentage() {
80     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
81     merge_completion_percentage_ = (ch->num_merge_ops * 100.0) / reader_->get_num_total_data_ops();
82 
83     SNAP_LOG(DEBUG) << "Merge-complete %: " << merge_completion_percentage_
84                     << " num_merge_ops: " << ch->num_merge_ops
85                     << " total-ops: " << reader_->get_num_total_data_ops();
86 
87     if (ch->num_merge_ops == reader_->get_num_total_data_ops()) {
88         MarkMergeComplete();
89     }
90 }
91 
CommitMerge(int num_merge_ops)92 bool SnapshotHandler::CommitMerge(int num_merge_ops) {
93     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
94     ch->num_merge_ops += num_merge_ops;
95 
96     if (scratch_space_) {
97         if (ra_thread_) {
98             struct BufferState* ra_state = GetBufferState();
99             ra_state->read_ahead_state = kCowReadAheadInProgress;
100         }
101 
102         int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
103         if (ret < 0) {
104             SNAP_PLOG(ERROR) << "msync header failed: " << ret;
105             return false;
106         }
107     } else {
108         reader_->UpdateMergeOpsCompleted(num_merge_ops);
109         const auto& header = reader_->GetHeader();
110 
111         if (lseek(cow_fd_.get(), 0, SEEK_SET) < 0) {
112             SNAP_PLOG(ERROR) << "lseek failed";
113             return false;
114         }
115 
116         if (!android::base::WriteFully(cow_fd_, &header, header.prefix.header_size)) {
117             SNAP_PLOG(ERROR) << "Write to header failed";
118             return false;
119         }
120 
121         if (fsync(cow_fd_.get()) < 0) {
122             SNAP_PLOG(ERROR) << "fsync failed";
123             return false;
124         }
125     }
126 
127     // Update the merge completion - this is used by update engine
128     // to track the completion. No need to take a lock. It is ok
129     // even if there is a miss on reading a latest updated value.
130     // Subsequent polling will eventually converge to completion.
131     UpdateMergeCompletionPercentage();
132 
133     return true;
134 }
135 
PrepareReadAhead()136 void SnapshotHandler::PrepareReadAhead() {
137     struct BufferState* ra_state = GetBufferState();
138     // Check if the data has to be re-constructed from COW device
139     if (ra_state->read_ahead_state == kCowReadAheadDone) {
140         populate_data_from_cow_ = true;
141     } else {
142         populate_data_from_cow_ = false;
143     }
144 
145     NotifyRAForMergeReady();
146 }
147 
CheckMergeCompletionStatus()148 bool SnapshotHandler::CheckMergeCompletionStatus() {
149     if (!merge_initiated_) {
150         SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
151                        << reader_->get_num_total_data_ops();
152         return false;
153     }
154 
155     struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
156 
157     SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
158                    << " Total-data-ops: " << reader_->get_num_total_data_ops();
159     return true;
160 }
161 
ReadMetadata()162 bool SnapshotHandler::ReadMetadata() {
163     reader_ = std::make_unique<CowReader>(CowReader::ReaderFlags::USERSPACE_MERGE, true);
164     CowOptions options;
165 
166     SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
167 
168     if (!reader_->Parse(cow_fd_)) {
169         SNAP_LOG(ERROR) << "Failed to parse";
170         return false;
171     }
172 
173     const auto& header = reader_->GetHeader();
174     if (!(header.block_size == BLOCK_SZ)) {
175         SNAP_LOG(ERROR) << "Invalid header block size found: " << header.block_size;
176         return false;
177     }
178 
179     SNAP_LOG(INFO) << "Merge-ops: " << header.num_merge_ops;
180     if (header.num_merge_ops) {
181         resume_merge_ = true;
182         SNAP_LOG(INFO) << "Resume Snapshot-merge";
183     }
184 
185     if (!MmapMetadata()) {
186         SNAP_LOG(ERROR) << "mmap failed";
187         return false;
188     }
189 
190     UpdateMergeCompletionPercentage();
191 
192     // Initialize the iterator for reading metadata
193     std::unique_ptr<ICowOpIter> cowop_iter = reader_->GetOpIter(true);
194 
195     int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
196     int ra_index = 0;
197 
198     size_t copy_ops = 0, replace_ops = 0, zero_ops = 0, xor_ops = 0;
199 
200     while (!cowop_iter->AtEnd()) {
201         const CowOperation* cow_op = cowop_iter->Get();
202 
203         if (cow_op->type() == kCowCopyOp) {
204             copy_ops += 1;
205         } else if (cow_op->type() == kCowReplaceOp) {
206             replace_ops += 1;
207         } else if (cow_op->type() == kCowZeroOp) {
208             zero_ops += 1;
209         } else if (cow_op->type() == kCowXorOp) {
210             xor_ops += 1;
211         }
212 
213         chunk_vec_.push_back(std::make_pair(ChunkToSector(cow_op->new_block), cow_op));
214 
215         if (IsOrderedOp(*cow_op)) {
216             ra_thread_ = true;
217             block_to_ra_index_[cow_op->new_block] = ra_index;
218             num_ra_ops_per_iter -= 1;
219 
220             if ((ra_index + 1) - merge_blk_state_.size() == 1) {
221                 std::unique_ptr<MergeGroupState> blk_state = std::make_unique<MergeGroupState>(
222                         MERGE_GROUP_STATE::GROUP_MERGE_PENDING, 0);
223 
224                 merge_blk_state_.push_back(std::move(blk_state));
225             }
226 
227             // Move to next RA block
228             if (num_ra_ops_per_iter == 0) {
229                 num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
230                 ra_index += 1;
231             }
232         }
233         cowop_iter->Next();
234     }
235 
236     chunk_vec_.shrink_to_fit();
237 
238     // Sort the vector based on sectors as we need this during un-aligned access
239     std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
240 
241     PrepareReadAhead();
242 
243     SNAP_LOG(INFO) << "Merged-ops: " << header.num_merge_ops
244                    << " Total-data-ops: " << reader_->get_num_total_data_ops()
245                    << " Unmerged-ops: " << chunk_vec_.size() << " Copy-ops: " << copy_ops
246                    << " Zero-ops: " << zero_ops << " Replace-ops: " << replace_ops
247                    << " Xor-ops: " << xor_ops;
248 
249     return true;
250 }
251 
MmapMetadata()252 bool SnapshotHandler::MmapMetadata() {
253     const auto& header = reader_->GetHeader();
254 
255     total_mapped_addr_length_ = header.prefix.header_size + BUFFER_REGION_DEFAULT_SIZE;
256 
257     if (header.prefix.major_version >= 2 && header.buffer_size > 0) {
258         scratch_space_ = true;
259     }
260 
261     if (scratch_space_) {
262         mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
263                             cow_fd_.get(), 0);
264     } else {
265         mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE,
266                             MAP_SHARED | MAP_ANONYMOUS, -1, 0);
267         struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
268         ch->num_merge_ops = header.num_merge_ops;
269     }
270 
271     if (mapped_addr_ == MAP_FAILED) {
272         SNAP_LOG(ERROR) << "mmap metadata failed";
273         return false;
274     }
275 
276     return true;
277 }
278 
UnmapBufferRegion()279 void SnapshotHandler::UnmapBufferRegion() {
280     int ret = munmap(mapped_addr_, total_mapped_addr_length_);
281     if (ret < 0) {
282         SNAP_PLOG(ERROR) << "munmap failed";
283     }
284 }
285 
InitCowDevice()286 bool SnapshotHandler::InitCowDevice() {
287     cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
288     if (cow_fd_ < 0) {
289         SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
290         return false;
291     }
292 
293     return ReadMetadata();
294 }
295 
296 /*
297  * Entry point to launch threads
298  */
Start()299 bool SnapshotHandler::Start() {
300     std::vector<std::future<bool>> threads;
301     std::future<bool> ra_thread_status;
302 
303     if (ra_thread_) {
304         ra_thread_status =
305                 std::async(std::launch::async, &ReadAhead::RunThread, read_ahead_thread_.get());
306         // If this is a merge-resume path, wait until RA thread is fully up as
307         // the data has to be re-constructed from the scratch space.
308         if (resume_merge_ && ShouldReconstructDataFromCow()) {
309             WaitForRaThreadToStart();
310         }
311     }
312 
313     // Launch worker threads
314     for (int i = 0; i < worker_threads_.size(); i++) {
315         threads.emplace_back(
316                 std::async(std::launch::async, &ReadWorker::Run, worker_threads_[i].get()));
317     }
318 
319     std::future<bool> merge_thread =
320             std::async(std::launch::async, &MergeWorker::Run, merge_thread_.get());
321 
322     // Now that the worker threads are up, scan the partitions.
323     // If the snapshot-merge is being resumed, there is no need to scan as the
324     // current slot is already marked as boot complete.
325     if (perform_verification_ && !resume_merge_) {
326         update_verify_->VerifyUpdatePartition();
327     }
328 
329     bool ret = true;
330     for (auto& t : threads) {
331         ret = t.get() && ret;
332     }
333 
334     // Worker threads are terminated by this point - this can only happen:
335     //
336     // 1: If dm-user device is destroyed
337     // 2: We had an I/O failure when reading root partitions
338     //
339     // In case (1), this would be a graceful shutdown. In this case, merge
340     // thread and RA thread should have already terminated by this point. We will be
341     // destroying the dm-user device only _after_ merge is completed.
342     //
343     // In case (2), if merge thread had started, then it will be
344     // continuing to merge; however, since we had an I/O failure and the
345     // I/O on root partitions are no longer served, we will terminate the
346     // merge
347 
348     NotifyIOTerminated();
349 
350     bool read_ahead_retval = false;
351 
352     SNAP_LOG(INFO) << "Snapshot I/O terminated. Waiting for merge thread....";
353     bool merge_thread_status = merge_thread.get();
354 
355     if (ra_thread_) {
356         read_ahead_retval = ra_thread_status.get();
357     }
358 
359     SNAP_LOG(INFO) << "Worker threads terminated with ret: " << ret
360                    << " Merge-thread with ret: " << merge_thread_status
361                    << " RA-thread with ret: " << read_ahead_retval;
362     return ret;
363 }
364 
GetBufferMetadataOffset()365 uint64_t SnapshotHandler::GetBufferMetadataOffset() {
366     const auto& header = reader_->GetHeader();
367 
368     return (header.prefix.header_size + sizeof(BufferState));
369 }
370 
371 /*
372  * Metadata for read-ahead is 16 bytes. For a 2 MB region, we will
373  * end up with 8k (2 PAGE) worth of metadata. Thus, a 2MB buffer
374  * region is split into:
375  *
376  * 1: 8k metadata
377  * 2: Scratch space
378  *
379  */
GetBufferMetadataSize()380 size_t SnapshotHandler::GetBufferMetadataSize() {
381     const auto& header = reader_->GetHeader();
382     size_t buffer_size = header.buffer_size;
383 
384     // If there is no scratch space, then just use the
385     // anonymous memory
386     if (buffer_size == 0) {
387         buffer_size = BUFFER_REGION_DEFAULT_SIZE;
388     }
389 
390     return ((buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ);
391 }
392 
GetBufferDataOffset()393 size_t SnapshotHandler::GetBufferDataOffset() {
394     const auto& header = reader_->GetHeader();
395 
396     return (header.prefix.header_size + GetBufferMetadataSize());
397 }
398 
399 /*
400  * (2MB - 8K = 2088960 bytes) will be the buffer region to hold the data.
401  */
GetBufferDataSize()402 size_t SnapshotHandler::GetBufferDataSize() {
403     const auto& header = reader_->GetHeader();
404     size_t buffer_size = header.buffer_size;
405 
406     // If there is no scratch space, then just use the
407     // anonymous memory
408     if (buffer_size == 0) {
409         buffer_size = BUFFER_REGION_DEFAULT_SIZE;
410     }
411 
412     return (buffer_size - GetBufferMetadataSize());
413 }
414 
GetBufferState()415 struct BufferState* SnapshotHandler::GetBufferState() {
416     const auto& header = reader_->GetHeader();
417 
418     struct BufferState* ra_state =
419             reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.prefix.header_size);
420     return ra_state;
421 }
422 
IsIouringSupported()423 bool SnapshotHandler::IsIouringSupported() {
424     if (!KernelSupportsIoUring()) {
425         return false;
426     }
427 
428     // During selinux init transition, libsnapshot will propagate the
429     // status of io_uring enablement. As properties are not initialized,
430     // we cannot query system property.
431     if (is_io_uring_enabled_) {
432         return true;
433     }
434 
435     // Finally check the system property
436     return android::base::GetBoolProperty("ro.virtual_ab.io_uring.enabled", false);
437 }
438 
CheckPartitionVerification()439 bool SnapshotHandler::CheckPartitionVerification() {
440     return update_verify_->CheckPartitionVerification();
441 }
442 
FreeResources()443 void SnapshotHandler::FreeResources() {
444     worker_threads_.clear();
445     read_ahead_thread_ = nullptr;
446     merge_thread_ = nullptr;
447 }
448 
GetNumSectors() const449 uint64_t SnapshotHandler::GetNumSectors() const {
450     unique_fd fd(TEMP_FAILURE_RETRY(open(base_path_merge_.c_str(), O_RDONLY | O_CLOEXEC)));
451     if (fd < 0) {
452         SNAP_LOG(ERROR) << "Cannot open base path: " << base_path_merge_;
453         return false;
454     }
455 
456     uint64_t dev_sz = get_block_device_size(fd.get());
457     if (!dev_sz) {
458         SNAP_LOG(ERROR) << "Failed to find block device size: " << base_path_merge_;
459         return false;
460     }
461 
462     return dev_sz / SECTOR_SIZE;
463 }
464 
465 }  // namespace snapshot
466 }  // namespace android
467