//
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "writer_v3.h"

#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/parseint.h>
#include <android-base/properties.h>
#include <android-base/strings.h>
#include <android-base/unique_fd.h>
#include <brotli/encode.h>
#include <libsnapshot/cow_format.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <lz4.h>
#include <zlib.h>

#include <fcntl.h>
#include <libsnapshot/cow_compress.h>
#include <libsnapshot_cow/parser_v3.h>
#include <linux/fs.h>
#include <storage_literals/storage_literals.h>
#include <sys/ioctl.h>
#include <numeric>

// The info messages here are spammy, but are useful for update_engine. Disable
// them when running on the host.
#ifdef __ANDROID__
#define LOG_INFO LOG(INFO)
#else
#define LOG_INFO LOG(VERBOSE)
#endif

namespace android {
namespace snapshot {

static_assert(sizeof(off_t) == sizeof(uint64_t));

using namespace android::storage_literals;
using android::base::unique_fd;

// Divide |x| by |y| and round up to the nearest integer.
constexpr uint64_t DivRoundUp(uint64_t x, uint64_t y) {
    return (x + y - 1) / y;
}

CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
    : CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
    SetupHeaders();
}

void CowWriterV3::InitWorkers() {
    if (num_compress_threads_ <= 1) {
        LOG_INFO << "Not creating new threads for compression.";
        return;
    }
    compress_threads_.reserve(num_compress_threads_);
    compress_threads_.clear();
    threads_.reserve(num_compress_threads_);
    threads_.clear();
    for (size_t i = 0; i < num_compress_threads_; i++) {
        std::unique_ptr<ICompressor> compressor =
                ICompressor::Create(compression_, header_.max_compression_size);
        auto&& wt = compress_threads_.emplace_back(
                std::make_unique<CompressWorker>(std::move(compressor)));
        threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
    }
    LOG(INFO) << num_compress_threads_ << " threads used for compression";
}

void CowWriterV3::SetupHeaders() {
    header_ = {};
    header_.prefix.magic = kCowMagicNumber;
    header_.prefix.major_version = 3;
    header_.prefix.minor_version = 0;
    header_.prefix.header_size = sizeof(CowHeaderV3);
    header_.footer_size = 0;
    header_.op_size = sizeof(CowOperationV3);
    header_.block_size = options_.block_size;
    header_.num_merge_ops = options_.num_merge_ops;
    header_.cluster_ops = 0;
    if (options_.scratch_space) {
        header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
    }

    // v3 specific fields
    // WIP: not quite sure how some of these are calculated yet, assuming buffer_size is determined
    // during COW size estimation
    header_.sequence_data_count = 0;

    header_.resume_point_count = 0;
    header_.resume_point_max = kNumResumePoints;
    header_.op_count = 0;
    header_.op_count_max = 0;
    header_.compression_algorithm = kCowCompressNone;
    header_.max_compression_size = options_.compression_factor;
}

bool CowWriterV3::ParseOptions() {
    if (!header_.max_compression_size || !IsBlockAligned(header_.max_compression_size)) {
        LOG(ERROR) << "Invalid compression factor: " << header_.max_compression_size;
        return false;
    }

    num_compress_threads_ = std::max(int(options_.num_compress_threads), 1);
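    // options_.compression has the form "<algorithm>[,<level>]"; the level component is optional.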
    auto parts = android::base::Split(options_.compression, ",");
    if (parts.size() > 2) {
        LOG(ERROR) << "failed to parse compression parameters: invalid argument count: "
                   << parts.size() << " " << options_.compression;
        return false;
    }
    auto algorithm = CompressionAlgorithmFromString(parts[0]);
    if (!algorithm) {
        LOG(ERROR) << "unrecognized compression: " << options_.compression;
        return false;
    }
    header_.compression_algorithm = *algorithm;
    header_.op_count_max = options_.op_count_max;

    if (!IsEstimating() && header_.op_count_max == 0) {
        if (!options_.max_blocks.has_value()) {
            LOG(ERROR) << "can't size the op buffer since op_count_max is 0 and max_blocks is not "
                          "set.";
            return false;
        }
        LOG(INFO) << "op_count_max is 0; setting it to the number of blocks in "
                     "the partition: "
                  << options_.max_blocks.value();
        header_.op_count_max = options_.max_blocks.value();
    }

    if (parts.size() > 1) {
        if (!android::base::ParseInt(parts[1], &compression_.compression_level)) {
            LOG(ERROR) << "failed to parse compression level, invalid value: " << parts[1];
            return false;
        }
    } else {
        compression_.compression_level =
                CompressWorker::GetDefaultCompressionLevel(algorithm.value());
    }

    compression_.algorithm = *algorithm;
    if (compression_.algorithm != kCowCompressNone) {
        compressor_ = ICompressor::Create(compression_, header_.max_compression_size);
        if (compressor_ == nullptr) {
            LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
            return false;
        }
    }

    if (options_.cluster_ops &&
        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
         options_.batch_write)) {
        batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
        data_vec_.reserve(batch_size_);
        cached_data_.reserve(batch_size_);
        cached_ops_.reserve(batch_size_ * kNonDataOpBufferSize);
    }

    if (batch_size_ > 1) {
        LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_;
    } else {
        LOG(INFO) << "Batch writes: disabled";
    }
    if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false) &&
        options_.num_compress_threads) {
        num_compress_threads_ = options_.num_compress_threads;
    }
    InitWorkers();

    return true;
}

CowWriterV3::~CowWriterV3() {
    for (const auto& t : compress_threads_) {
        t->Finalize();
    }
    for (auto& t : threads_) {
        if (t.joinable()) {
            t.join();
        }
    }
}

bool CowWriterV3::Initialize(std::optional<uint64_t> label) {
    if (!InitFd() || !ParseOptions()) {
        return false;
    }
    if (!label) {
        if (!OpenForWrite()) {
            return false;
        }
    } else {
        if (!OpenForAppend(*label)) {
            return false;
        }
    }
    return true;
}

bool CowWriterV3::OpenForWrite() {
    // This limitation is tied to the data field size in CowOperationV2.
    // Keeping the check for the V3 writer as well.
    if (header_.block_size > std::numeric_limits<uint16_t>::max()) {
        LOG(ERROR) << "Block size is too large";
        return false;
    }

    if (lseek(fd_.get(), 0, SEEK_SET) < 0) {
        PLOG(ERROR) << "lseek failed";
        return false;
    }

    // Headers are not complete, but this ensures the file is at the right
    // position.
    if (!android::base::WriteFully(fd_, &header_, sizeof(header_))) {
        PLOG(ERROR) << "write failed";
        return false;
    }

    if (options_.scratch_space) {
        // Initialize the scratch space
        std::string data(header_.buffer_size, 0);
        if (!android::base::WriteFully(fd_, data.data(), header_.buffer_size)) {
            PLOG(ERROR) << "writing scratch space failed";
            return false;
        }
    }

    resume_points_ = std::make_shared<std::vector<ResumePoint>>();

    if (!Sync()) {
        LOG(ERROR) << "Header sync failed";
        return false;
    }
    next_data_pos_ = GetDataOffset(header_);
    return true;
}

bool CowWriterV3::OpenForAppend(uint64_t label) {
    CowHeaderV3 header_v3{};
    if (!ReadCowHeader(fd_, &header_v3)) {
        LOG(ERROR) << "Couldn't read Cow Header";
        return false;
    }

    header_ = header_v3;

    CHECK(label >= 0);
    CowParserV3 parser;
    if (!parser.Parse(fd_, header_, label)) {
        PLOG(ERROR) << "unable to parse with given label: " << label;
        return false;
    }

    resume_points_ = parser.resume_points();
    options_.block_size = header_.block_size;
    next_data_pos_ = GetDataOffset(header_);

    TranslatedCowOps ops;
    parser.Translate(&ops);
    header_.op_count = ops.ops->size();

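    // Advance the data cursor past the data belonging to the ops parsed above, so
    // newly appended data lands after it.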
    for (const auto& op : *ops.ops) {
        next_data_pos_ += op.data_length;
    }

    return true;
}

bool CowWriterV3::CheckOpCount(size_t op_count) {
    if (IsEstimating()) {
        return true;
    }
    if (header_.op_count + cached_ops_.size() + op_count > header_.op_count_max) {
        LOG(ERROR) << "Current number of ops on disk: " << header_.op_count
                   << ", number of ops cached in memory: " << cached_ops_.size()
                   << ", number of ops attempting to write: " << op_count
                   << ", this will exceed max op count " << header_.op_count_max;
        return false;
    }
    return true;
}

size_t CowWriterV3::CachedDataSize() const {
    size_t size = 0;
    for (const auto& i : cached_data_) {
        size += i.size();
    }
    return size;
}

bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) {
    if (!CheckOpCount(num_blocks)) {
        return false;
    }
    for (size_t i = 0; i < num_blocks; i++) {
        CowOperationV3& op = cached_ops_.emplace_back();
        op.set_type(kCowCopyOp);
        op.new_block = new_block + i;
        op.set_source(old_block + i);
    }

    if (NeedsFlush()) {
        if (!FlushCacheOps()) {
            return false;
        }
    }
    return true;
}

bool CowWriterV3::EmitRawBlocks(uint64_t new_block_start, const void* data, size_t size) {
    return EmitBlocks(new_block_start, data, size, 0, 0, kCowReplaceOp);
}

bool CowWriterV3::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t size,
                                uint32_t old_block, uint16_t offset) {
    return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
}

bool CowWriterV3::NeedsFlush() const {
    // Allow bigger batch sizes for ops without data. A single CowOperationV3
    // struct uses 14 bytes of memory; even if we cache 200 * 16 ops in memory,
    // it's only ~44K.
    return CachedDataSize() >= batch_size_ * header_.block_size ||
           cached_ops_.size() >= batch_size_ * kNonDataOpBufferSize;
}

bool CowWriterV3::ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
                                                  uint64_t old_block, uint16_t offset,
                                                  CowOperationType type, size_t blocks_to_write) {
    size_t compressed_bytes = 0;
    auto&& blocks = CompressBlocks(blocks_to_write, data, type);
    if (blocks.empty()) {
        LOG(ERROR) << "Failed to compress blocks " << new_block_start << ", " << blocks_to_write
                   << ", actual number of blocks received from compressor " << blocks.size();
        return false;
    }
    if (!CheckOpCount(blocks.size())) {
        return false;
    }
    size_t blocks_written = 0;
    for (size_t blk_index = 0; blk_index < blocks.size(); blk_index++) {
        CowOperation& op = cached_ops_.emplace_back();
        auto& vec = data_vec_.emplace_back();
        CompressedBuffer buffer = std::move(blocks[blk_index]);
        auto& compressed_data = cached_data_.emplace_back(std::move(buffer.compressed_data));
        op.new_block = new_block_start + blocks_written;

        op.set_type(type);
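        // compression_bits stores log2 of the number of blocks covered by this
        // compressed chunk (0 for a single block, 1 for two blocks, and so on).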
        op.set_compression_bits(std::log2(buffer.compression_factor / header_.block_size));

        if (type == kCowXorOp) {
            op.set_source((old_block + blocks_written) * header_.block_size + offset);
        } else {
            op.set_source(next_data_pos_ + compressed_bytes);
        }

        vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
        op.data_length = vec.iov_len;
        compressed_bytes += op.data_length;
        blocks_written += (buffer.compression_factor / header_.block_size);
    }
    if (blocks_written != blocks_to_write) {
        LOG(ERROR) << "Total compressed blocks: " << blocks_written
                   << " Expected: " << blocks_to_write;
        return false;
    }
    return true;
}

bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                             uint64_t old_block, uint16_t offset, CowOperationType type) {
    if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
        LOG(ERROR) << "Compression algorithm is " << compression_.algorithm
                   << " but compressor is uninitialized.";
        return false;
    }
    const auto bytes = reinterpret_cast<const uint8_t*>(data);
    size_t num_blocks = (size / header_.block_size);
    size_t total_written = 0;
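    // Process at most batch_size_ blocks per iteration so the in-memory cache of
    // ops and data stays close to one batch before it is flushed.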
    while (total_written < num_blocks) {
        size_t chunk = std::min(num_blocks - total_written, batch_size_);
        if (!ConstructCowOpCompressedBuffers(new_block_start + total_written,
                                             bytes + header_.block_size * total_written,
                                             old_block + total_written, offset, type, chunk)) {
            return false;
        }

        if (NeedsFlush() && !FlushCacheOps()) {
            LOG(ERROR) << "EmitBlocks with compression: write failed. new block: "
                       << new_block_start << " compression: " << compression_.algorithm
                       << ", op type: " << type;
            return false;
        }
        total_written += chunk;
    }

    return true;
}

bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, const uint64_t num_blocks) {
    if (!CheckOpCount(num_blocks)) {
        return false;
    }
    for (uint64_t i = 0; i < num_blocks; i++) {
        auto& op = cached_ops_.emplace_back();
        op.set_type(kCowZeroOp);
        op.new_block = new_block_start + i;
    }
    if (NeedsFlush()) {
        if (!FlushCacheOps()) {
            return false;
        }
    }
    return true;
}

bool CowWriterV3::EmitLabel(uint64_t label) {
    // Remove all resume points with a label greater than or equal to the current
    // one, to avoid adding duplicate labels with differing op values.
    if (!FlushCacheOps()) {
        LOG(ERROR) << "Failed to flush cached ops before emitting label " << label;
        return false;
    }
    auto remove_if_callback = [&](const auto& resume_point) -> bool {
        if (resume_point.label >= label) return true;
        return false;
    };
    resume_points_->erase(
            std::remove_if(resume_points_->begin(), resume_points_->end(), remove_if_callback),
            resume_points_->end());

    resume_points_->push_back({label, header_.op_count});
    header_.resume_point_count++;
    // remove the oldest resume point if resume_buffer is full
    while (resume_points_->size() > header_.resume_point_max) {
        resume_points_->erase(resume_points_->begin());
    }

    CHECK_LE(resume_points_->size(), header_.resume_point_max);

    if (!android::base::WriteFullyAtOffset(fd_, resume_points_->data(),
                                           resume_points_->size() * sizeof(ResumePoint),
                                           GetResumeOffset(header_))) {
        PLOG(ERROR) << "writing resume buffer failed";
        return false;
    }
    return Finalize();
}

bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
    if (header_.op_count > 0 || !cached_ops_.empty()) {
        LOG(ERROR) << "There are " << header_.op_count << " operations written to disk and "
                   << cached_ops_.size()
                   << " ops cached in memory. Writing sequence data is only allowed before all "
                      "operation writes.";
        return false;
    }

    header_.sequence_data_count = num_ops;

    // Ensure next_data_pos_ is updated as previously initialized + the newly added sequence
    // buffer.
    CHECK_EQ(next_data_pos_ + header_.sequence_data_count * sizeof(uint32_t),
             GetDataOffset(header_));
    next_data_pos_ = GetDataOffset(header_);

    if (!android::base::WriteFullyAtOffset(fd_, data, sizeof(data[0]) * num_ops,
                                           GetSequenceOffset(header_))) {
        PLOG(ERROR) << "writing sequence buffer failed";
        return false;
    }
    return true;
}

bool CowWriterV3::FlushCacheOps() {
    if (cached_ops_.empty()) {
        if (!data_vec_.empty()) {
            LOG(ERROR) << "Cached ops is empty, but data iovec has size: " << data_vec_.size()
                       << " this is definitely a bug.";
            return false;
        }
        return true;
    }
    size_t bytes_written = 0;

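    // Replace ops store the absolute file offset of their data in the source
    // field. That offset is only final at flush time, so recompute it from
    // next_data_pos_ plus the data emitted by the ops cached ahead of each one.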
    for (auto& op : cached_ops_) {
        if (op.type() == kCowReplaceOp) {
            op.set_source(next_data_pos_ + bytes_written);
        }
        bytes_written += op.data_length;
    }
    if (!WriteOperation(cached_ops_, data_vec_)) {
        LOG(ERROR) << "Failed to flush " << cached_ops_.size() << " ops to disk";
        return false;
    }
    cached_ops_.clear();
    cached_data_.clear();
    data_vec_.clear();
    return true;
}

size_t CowWriterV3::GetCompressionFactor(const size_t blocks_to_compress,
                                         CowOperationType type) const {
    // For XOR ops, we don't support bigger block size compression yet.
    // For bigger block size support, snapshot-merge also has to be changed. We
    // aren't there yet; hence, just stick to 4k for now until
    // snapshot-merge is ready for XOR operations.
    if (type == kCowXorOp) {
        return header_.block_size;
    }

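    // Find the largest compression window, starting at max_compression_size and
    // halving, that still fits within the remaining blocks; otherwise fall back
    // to a single block.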
    size_t compression_factor = header_.max_compression_size;
    while (compression_factor > header_.block_size) {
        size_t num_blocks = compression_factor / header_.block_size;
        if (blocks_to_compress >= num_blocks) {
            return compression_factor;
        }
        compression_factor >>= 1;
    }
    return header_.block_size;
}

std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithNoCompression(
        const size_t num_blocks, const void* data, CowOperationType type) {
    size_t blocks_to_compress = num_blocks;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    std::vector<CompressedBuffer> compressed_vec;

    while (blocks_to_compress) {
        CompressedBuffer buffer;

        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
        size_t num_blocks = compression_factor / header_.block_size;

        buffer.compression_factor = compression_factor;
        buffer.compressed_data.resize(compression_factor);

        // No compression. Just copy the data as-is.
        std::memcpy(buffer.compressed_data.data(), iter, compression_factor);

        compressed_vec.push_back(std::move(buffer));
        blocks_to_compress -= num_blocks;
        iter += compression_factor;
    }
    return compressed_vec;
}

std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompression(
        const size_t num_blocks, const void* data, CowOperationType type) {
    size_t blocks_to_compress = num_blocks;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    std::vector<CompressedBuffer> compressed_vec;

    while (blocks_to_compress) {
        CompressedBuffer buffer;

        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
        size_t num_blocks = compression_factor / header_.block_size;

        buffer.compression_factor = compression_factor;
        // Compress the blocks
        buffer.compressed_data = compressor_->Compress(iter, compression_factor);
        if (buffer.compressed_data.empty()) {
            PLOG(ERROR) << "Compression failed";
            return {};
        }

        // Check if the buffer was indeed compressed
        if (buffer.compressed_data.size() >= compression_factor) {
            buffer.compressed_data.resize(compression_factor);
            std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
        }

        compressed_vec.push_back(std::move(buffer));
        blocks_to_compress -= num_blocks;
        iter += compression_factor;
    }
    return compressed_vec;
}

std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression(
        const size_t num_blocks, const void* data, CowOperationType type) {
    const size_t num_threads = num_compress_threads_;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);

    // We alternate which thread compression work is sent to, e.g. between T1 and T2,
    // until all blocks are processed.
    std::vector<CompressedBuffer> compressed_vec;
    int iteration = 0;
    int blocks_to_compress = static_cast<int>(num_blocks);
    while (blocks_to_compress) {
        CompressedBuffer buffer;
        CompressWorker* worker = compress_threads_[iteration % num_threads].get();

        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
        size_t num_blocks = compression_factor / header_.block_size;

        worker->EnqueueCompressBlocks(iter, compression_factor, 1);
        buffer.compression_factor = compression_factor;
        compressed_vec.push_back(std::move(buffer));

        iteration++;
        iter += compression_factor;
        blocks_to_compress -= num_blocks;
    }

    std::vector<std::vector<uint8_t>> compressed_buf;
    std::vector<std::vector<std::vector<uint8_t>>> worker_buffers(num_threads);
    compressed_buf.clear();
    for (size_t i = 0; i < num_threads; i++) {
        CompressWorker* worker = compress_threads_[i].get();
        if (!worker->GetCompressedBuffers(&worker_buffers[i])) {
            return {};
        }
    }
    // compressed_vec | CB 1 | CB 2 | CB 3 | CB 4 | <- compressed buffers
    //                   t1     t2     t1     t2    <- processed by these threads
    // Ordering is important here. We need to retrieve the compressed data in the
    // same order we submitted it: work is handed out starting with the first thread
    // and then round-robined, so the compressed buffers must be fetched from the
    // threads in the same order.
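    // With two threads, for example, worker_buffers[0] holds chunks 0, 2, 4, ... and
    // worker_buffers[1] holds chunks 1, 3, 5, ..., so chunk i is found at
    // worker_buffers[i % num_threads][i / num_threads].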
    for (size_t i = 0; i < compressed_vec.size(); i++) {
        compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]);
    }

    if (compressed_vec.size() != compressed_buf.size()) {
        LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size()
                   << " - Expected: " << compressed_vec.size();
        return {};
    }

    iter = reinterpret_cast<const uint8_t*>(data);
    // Walk through all the compressed buffers
    for (size_t i = 0; i < compressed_buf.size(); i++) {
        auto& buffer = compressed_vec[i];
        auto& block = compressed_buf[i];
        size_t block_size = buffer.compression_factor;
        // Check if the block was indeed compressed
        if (block.size() >= block_size) {
            buffer.compressed_data.resize(block_size);
            std::memcpy(buffer.compressed_data.data(), iter, block_size);
        } else {
            // Compressed block
            buffer.compressed_data.resize(block.size());
            std::memcpy(buffer.compressed_data.data(), block.data(), block.size());
        }
        iter += block_size;
    }
    return compressed_vec;
}

std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::CompressBlocks(const size_t num_blocks,
                                                                       const void* data,
                                                                       CowOperationType type) {
    if (compression_.algorithm == kCowCompressNone) {
        return ProcessBlocksWithNoCompression(num_blocks, data, type);
    }

    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;

    // If no threads are required, just compress the blocks inline.
    if (num_threads <= 1) {
        return ProcessBlocksWithCompression(num_blocks, data, type);
    }

    return ProcessBlocksWithThreadedCompression(num_blocks, data, type);
}

bool CowWriterV3::WriteOperation(std::span<const CowOperationV3> ops,
                                 std::span<const struct iovec> data) {
    const auto total_data_size =
            std::transform_reduce(data.begin(), data.end(), 0, std::plus<size_t>{},
                                  [](const struct iovec& a) { return a.iov_len; });
    if (IsEstimating()) {
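        // In estimation mode nothing is written to disk; only the op count and the
        // projected data offset are tracked so GetCowSizeInfo() can report the
        // expected COW size.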
        header_.op_count += ops.size();
        if (header_.op_count > header_.op_count_max) {
            // If we increment op_count_max, the offset of the data section would
            // change, so |next_data_pos_| needs to be updated as well.
            next_data_pos_ += (header_.op_count - header_.op_count_max) * sizeof(CowOperationV3);
            header_.op_count_max = header_.op_count;
        }
        next_data_pos_ += total_data_size;
        return true;
    }

    if (header_.op_count + ops.size() > header_.op_count_max) {
        LOG(ERROR) << "Current op count " << header_.op_count << ", attempting to write "
                   << ops.size() << " ops will exceed the max of " << header_.op_count_max;
        return false;
    }
    const off_t offset = GetOpOffset(header_.op_count, header_);
    if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) {
        PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset;
        return false;
    }
    if (!data.empty()) {
        int total_written = 0;
        int i = 0;
        while (i < data.size()) {
            int chunk = std::min(static_cast<int>(data.size() - i), IOV_MAX);

            const auto ret = pwritev(fd_, data.data() + i, chunk, next_data_pos_ + total_written);
            if (ret < 0) {
                PLOG(ERROR) << "write failed chunk size of: " << chunk
                            << " at offset: " << next_data_pos_ + total_written << " " << errno;
                return false;
            }
            total_written += ret;
            i += chunk;
        }
        if (total_written != total_data_size) {
            PLOG(ERROR) << "write failed for data vector of size: " << data.size()
                        << " and total data length: " << total_data_size
                        << " at offset: " << next_data_pos_ << " " << errno
                        << ", only wrote: " << total_written;
            return false;
        }
    }

    header_.op_count += ops.size();
    next_data_pos_ += total_data_size;

    return true;
}

bool CowWriterV3::Finalize() {
    CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
    CHECK_LE(header_.prefix.header_size, sizeof(header_));
    if (!FlushCacheOps()) {
        return false;
    }
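    // Re-write the header at offset 0 so op_count and the other header fields
    // reflect everything flushed so far.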
    if (!android::base::WriteFullyAtOffset(fd_, &header_, header_.prefix.header_size, 0)) {
        return false;
    }
    return Sync();
}

CowSizeInfo CowWriterV3::GetCowSizeInfo() const {
    CowSizeInfo info;
    info.cow_size = next_data_pos_;
    info.op_count_max = header_.op_count_max;
    return info;
}

}  // namespace snapshot
}  // namespace android