1 // Copyright (C) 2021 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //      http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #pragma once
16 
17 #include <linux/types.h>
18 #include <stdint.h>
19 #include <stdlib.h>
20 #include <sys/mman.h>
21 #include <sys/resource.h>
22 #include <sys/time.h>
23 #include <unistd.h>
24 
25 #include <condition_variable>
26 #include <cstring>
27 #include <future>
28 #include <iostream>
29 #include <limits>
30 #include <mutex>
31 #include <ostream>
32 #include <string>
33 #include <thread>
34 #include <unordered_map>
35 #include <unordered_set>
36 #include <vector>
37 
38 #include <android-base/file.h>
39 #include <android-base/logging.h>
40 #include <android-base/stringprintf.h>
41 #include <android-base/unique_fd.h>
42 #include <ext4_utils/ext4_utils.h>
43 #include <libdm/dm.h>
44 #include <libsnapshot/cow_reader.h>
45 #include <libsnapshot/cow_writer.h>
46 #include <snapuserd/block_server.h>
47 #include <snapuserd/snapuserd_buffer.h>
48 #include <snapuserd/snapuserd_kernel.h>
49 #include <storage_literals/storage_literals.h>
50 #include <system/thread_defs.h>
51 #include "snapuserd_readahead.h"
52 #include "snapuserd_verify.h"
53 
54 namespace android {
55 namespace snapshot {
56 
57 using android::base::unique_fd;
58 using namespace std::chrono_literals;
59 using namespace android::storage_literals;
60 
61 static constexpr size_t PAYLOAD_BUFFER_SZ = (1UL << 20);
62 static_assert(PAYLOAD_BUFFER_SZ >= BLOCK_SZ);
63 
64 static constexpr int kNumWorkerThreads = 4;
65 
66 #define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
67 #define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
68 
// States of the merge / I/O state machine (stored in SnapshotHandler::io_state_).
// The explicit values match the implicit numbering the enumerators have
// always had, so any persisted or logged integer values remain unchanged.
enum class MERGE_IO_TRANSITION {
    INVALID = 0,
    MERGE_READY = 1,
    MERGE_BEGIN = 2,
    MERGE_FAILED = 3,
    MERGE_COMPLETE = 4,
    IO_TERMINATED = 5,
    READ_AHEAD_FAILURE = 6,
};
78 
// Worker types defined elsewhere. This header only refers to them through
// std::unique_ptr members of SnapshotHandler, so incomplete types suffice here.
class ReadWorker;
class MergeWorker;
81 
// Merge state of a group of blocks (held in MergeGroupState::merge_state_).
// The explicit values match the implicit numbering the enumerators have
// always had.
enum class MERGE_GROUP_STATE {
    GROUP_MERGE_PENDING = 0,
    GROUP_MERGE_RA_READY = 1,
    GROUP_MERGE_IN_PROGRESS = 2,
    GROUP_MERGE_COMPLETED = 3,
    GROUP_MERGE_FAILED = 4,
    GROUP_INVALID = 5,
};
90 
91 struct MergeGroupState {
92     MERGE_GROUP_STATE merge_state_;
93     // Ref count I/O when group state
94     // is in "GROUP_MERGE_PENDING"
95     size_t num_ios_in_progress;
96     std::mutex m_lock;
97     std::condition_variable m_cv;
98 
MergeGroupStateMergeGroupState99     MergeGroupState(MERGE_GROUP_STATE state, size_t n_ios)
100         : merge_state_(state), num_ios_in_progress(n_ios) {}
101 };
102 
103 class SnapshotHandler : public std::enable_shared_from_this<SnapshotHandler> {
104   public:
105     SnapshotHandler(std::string misc_name, std::string cow_device, std::string backing_device,
106                     std::string base_path_merge, std::shared_ptr<IBlockServerOpener> opener,
107                     int num_workers, bool use_iouring, bool perform_verification, bool o_direct);
108     bool InitCowDevice();
109     bool Start();
110 
GetControlDevicePath()111     const std::string& GetControlDevicePath() { return control_device_; }
GetMiscName()112     const std::string& GetMiscName() { return misc_name_; }
113     uint64_t GetNumSectors() const;
IsAttached()114     const bool& IsAttached() const { return attached_; }
AttachControlDevice()115     void AttachControlDevice() { attached_ = true; }
116 
117     bool CheckMergeCompletionStatus();
118     bool CommitMerge(int num_merge_ops);
119 
CloseFds()120     void CloseFds() { cow_fd_ = {}; }
121     void FreeResources();
122 
123     bool InitializeWorkers();
124     std::unique_ptr<CowReader> CloneReaderForWorker();
GetSharedPtr()125     std::shared_ptr<SnapshotHandler> GetSharedPtr() { return shared_from_this(); }
126 
GetChunkVec()127     std::vector<std::pair<sector_t, const CowOperation*>>& GetChunkVec() { return chunk_vec_; }
128 
compare(std::pair<sector_t,const CowOperation * > p1,std::pair<sector_t,const CowOperation * > p2)129     static bool compare(std::pair<sector_t, const CowOperation*> p1,
130                         std::pair<sector_t, const CowOperation*> p2) {
131         return p1.first < p2.first;
132     }
133 
134     void UnmapBufferRegion();
135     bool MmapMetadata();
136 
137     // Read-ahead related functions
GetMappedAddr()138     void* GetMappedAddr() { return mapped_addr_; }
139     void PrepareReadAhead();
GetReadAheadMap()140     std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
141 
142     // State transitions for merge
143     void InitiateMerge();
144     void MonitorMerge();
145     void WakeupMonitorMergeThread();
146     void WaitForMergeComplete();
147     bool WaitForMergeBegin();
148     void RaThreadStarted();
149     void WaitForRaThreadToStart();
150     void NotifyRAForMergeReady();
151     bool WaitForMergeReady();
152     void MergeFailed();
153     bool IsIOTerminated();
154     void MergeCompleted();
155     void NotifyIOTerminated();
156     bool ReadAheadIOCompleted(bool sync);
157     void ReadAheadIOFailed();
158 
ShouldReconstructDataFromCow()159     bool ShouldReconstructDataFromCow() { return populate_data_from_cow_; }
FinishReconstructDataFromCow()160     void FinishReconstructDataFromCow() { populate_data_from_cow_ = false; }
161     void MarkMergeComplete();
162     // Return the snapshot status
163     std::string GetMergeStatus();
164 
165     // RA related functions
166     uint64_t GetBufferMetadataOffset();
167     size_t GetBufferMetadataSize();
168     size_t GetBufferDataOffset();
169     size_t GetBufferDataSize();
170 
171     // Total number of blocks to be merged in a given read-ahead buffer region
SetMergedBlockCountForNextCommit(int x)172     void SetMergedBlockCountForNextCommit(int x) { total_ra_blocks_merged_ = x; }
GetTotalBlocksToMerge()173     int GetTotalBlocksToMerge() { return total_ra_blocks_merged_; }
MergeInitiated()174     bool MergeInitiated() { return merge_initiated_; }
MergeMonitored()175     bool MergeMonitored() { return merge_monitored_; }
GetMergePercentage()176     double GetMergePercentage() { return merge_completion_percentage_; }
177 
178     // Merge Block State Transitions
179     void SetMergeCompleted(size_t block_index);
180     void SetMergeInProgress(size_t block_index);
181     void SetMergeFailed(size_t block_index);
182     void NotifyIOCompletion(uint64_t new_block);
183     bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
184     MERGE_GROUP_STATE ProcessMergingBlock(uint64_t new_block, void* buffer);
185 
186     bool IsIouringSupported();
187     bool CheckPartitionVerification();
188 
189   private:
190     bool ReadMetadata();
ChunkToSector(chunk_t chunk)191     sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
SectorToChunk(sector_t sector)192     chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
IsBlockAligned(uint64_t read_size)193     bool IsBlockAligned(uint64_t read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
194     struct BufferState* GetBufferState();
195     void UpdateMergeCompletionPercentage();
196 
197     // COW device
198     std::string cow_device_;
199     // Source device
200     std::string backing_store_device_;
201     // dm-user control device
202     std::string control_device_;
203     std::string misc_name_;
204     // Base device for merging
205     std::string base_path_merge_;
206 
207     unique_fd cow_fd_;
208 
209     std::unique_ptr<CowReader> reader_;
210 
211     // chunk_vec stores the pseudo mapping of sector
212     // to COW operations.
213     std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;
214 
215     std::mutex lock_;
216     std::condition_variable cv;
217 
218     void* mapped_addr_;
219     size_t total_mapped_addr_length_;
220 
221     std::vector<std::unique_ptr<ReadWorker>> worker_threads_;
222     // Read-ahead related
223     bool populate_data_from_cow_ = false;
224     bool ra_thread_ = false;
225     bool ra_thread_started_ = false;
226     int total_ra_blocks_merged_ = 0;
227     MERGE_IO_TRANSITION io_state_ = MERGE_IO_TRANSITION::INVALID;
228     std::unique_ptr<ReadAhead> read_ahead_thread_;
229     std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
230 
231     // user-space-merging
232     std::unordered_map<uint64_t, int> block_to_ra_index_;
233 
234     // Merge Block state
235     std::vector<std::unique_ptr<MergeGroupState>> merge_blk_state_;
236 
237     std::unique_ptr<MergeWorker> merge_thread_;
238     double merge_completion_percentage_;
239 
240     bool merge_initiated_ = false;
241     bool merge_monitored_ = false;
242     bool attached_ = false;
243     bool is_io_uring_enabled_ = false;
244     bool scratch_space_ = false;
245     int num_worker_threads_ = kNumWorkerThreads;
246     bool perform_verification_ = true;
247     bool resume_merge_ = false;
248     bool merge_complete_ = false;
249     bool o_direct_ = false;
250 
251     std::unique_ptr<UpdateVerify> update_verify_;
252     std::shared_ptr<IBlockServerOpener> block_server_opener_;
253 };
254 
255 std::ostream& operator<<(std::ostream& os, MERGE_IO_TRANSITION value);
256 static_assert(sizeof(off_t) == sizeof(uint64_t));
257 
258 }  // namespace snapshot
259 }  // namespace android
260