//
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "writer_v3.h"

#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/parseint.h>
#include <android-base/properties.h>
#include <android-base/strings.h>
#include <android-base/unique_fd.h>
#include <brotli/encode.h>
#include <libsnapshot/cow_format.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <lz4.h>
#include <zlib.h>

#include <fcntl.h>
#include <libsnapshot/cow_compress.h>
#include <libsnapshot_cow/parser_v3.h>
#include <linux/fs.h>
#include <storage_literals/storage_literals.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <numeric>

// The info messages here are spammy, but are useful for update_engine. Disable
// them when running on the host.
#ifdef __ANDROID__
#define LOG_INFO LOG(INFO)
#else
#define LOG_INFO LOG(VERBOSE)
#endif

namespace android {
namespace snapshot {

static_assert(sizeof(off_t) == sizeof(uint64_t));

using namespace android::storage_literals;
using android::base::unique_fd;

// Divide |x| by |y| and round up to the nearest integer.
constexpr uint64_t DivRoundUp(uint64_t x, uint64_t y) {
    return (x + y - 1) / y;
}

CowWriterV3::CowWriterV3(const CowOptions& options, unique_fd&& fd)
    : CowWriterBase(options, std::move(fd)), batch_size_(std::max<size_t>(options.cluster_ops, 1)) {
    SetupHeaders();
}

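// Spin up the worker threads used for parallel compression. When only one
// compression thread is requested, compression happens inline on the caller's
// thread and no workers are created.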
void CowWriterV3::InitWorkers() {
    if (num_compress_threads_ <= 1) {
        LOG_INFO << "Not creating new threads for compression.";
        return;
    }
    compress_threads_.reserve(num_compress_threads_);
    compress_threads_.clear();
    threads_.reserve(num_compress_threads_);
    threads_.clear();
    for (size_t i = 0; i < num_compress_threads_; i++) {
        std::unique_ptr<ICompressor> compressor =
                ICompressor::Create(compression_, header_.max_compression_size);
        auto&& wt = compress_threads_.emplace_back(
                std::make_unique<CompressWorker>(std::move(compressor)));
        threads_.emplace_back(std::thread([wt = wt.get()]() { wt->RunThread(); }));
    }
    LOG(INFO) << num_compress_threads_ << " threads used for compression";
}

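// Fill in the v3 header with its static defaults. Fields that depend on the
// writer options (compression algorithm, op_count_max, etc.) are finalized
// later in ParseOptions().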
void CowWriterV3::SetupHeaders() {
    header_ = {};
    header_.prefix.magic = kCowMagicNumber;
    header_.prefix.major_version = 3;
    header_.prefix.minor_version = 0;
    header_.prefix.header_size = sizeof(CowHeaderV3);
    header_.footer_size = 0;
    header_.op_size = sizeof(CowOperationV3);
    header_.block_size = options_.block_size;
    header_.num_merge_ops = options_.num_merge_ops;
    header_.cluster_ops = 0;
    if (options_.scratch_space) {
        header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
    }

    // v3 specific fields
    // WIP: not quite sure how some of these are calculated yet, assuming buffer_size is determined
    // during COW size estimation
    header_.sequence_data_count = 0;

    header_.resume_point_count = 0;
    header_.resume_point_max = kNumResumePoints;
    header_.op_count = 0;
    header_.op_count_max = 0;
    header_.compression_algorithm = kCowCompressNone;
    header_.max_compression_size = options_.compression_factor;
}

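// Validate and apply the writer options. The compression string has the form
// "<algorithm>" or "<algorithm>,<level>" (for example "lz4" or "zstd,3",
// assuming the algorithm is one that CompressionAlgorithmFromString()
// recognizes). op_count_max comes from the options, or from max_blocks when it
// is 0 and we are not estimating; batching and threading are enabled based on
// the options and the ro.virtual_ab.* properties.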
bool CowWriterV3::ParseOptions() {
    if (!header_.max_compression_size || !IsBlockAligned(header_.max_compression_size)) {
        LOG(ERROR) << "Invalid compression factor: " << header_.max_compression_size;
        return false;
    }

    num_compress_threads_ = std::max(int(options_.num_compress_threads), 1);
    auto parts = android::base::Split(options_.compression, ",");
    if (parts.size() > 2) {
        LOG(ERROR) << "failed to parse compression parameters: invalid argument count: "
                   << parts.size() << " " << options_.compression;
        return false;
    }
    auto algorithm = CompressionAlgorithmFromString(parts[0]);
    if (!algorithm) {
        LOG(ERROR) << "unrecognized compression: " << options_.compression;
        return false;
    }
    header_.compression_algorithm = *algorithm;
    header_.op_count_max = options_.op_count_max;

    if (!IsEstimating() && header_.op_count_max == 0) {
        if (!options_.max_blocks.has_value()) {
            LOG(ERROR) << "can't size the op buffer since op_count_max is 0 and max_blocks is not "
                          "set.";
            return false;
        }
        LOG(INFO) << "op count max is read in as 0. Setting to "
                     "num blocks in partition "
                  << options_.max_blocks.value();
        header_.op_count_max = options_.max_blocks.value();
    }

    if (parts.size() > 1) {
        if (!android::base::ParseInt(parts[1], &compression_.compression_level)) {
            LOG(ERROR) << "failed to parse compression level, invalid value: " << parts[1];
            return false;
        }
    } else {
        compression_.compression_level =
                CompressWorker::GetDefaultCompressionLevel(algorithm.value());
    }

    compression_.algorithm = *algorithm;
    if (compression_.algorithm != kCowCompressNone) {
        compressor_ = ICompressor::Create(compression_, header_.max_compression_size);
        if (compressor_ == nullptr) {
            LOG(ERROR) << "Failed to create compressor for " << compression_.algorithm;
            return false;
        }
    }

    if (options_.cluster_ops &&
        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
         options_.batch_write)) {
        batch_size_ = std::max<size_t>(options_.cluster_ops, 1);
        data_vec_.reserve(batch_size_);
        cached_data_.reserve(batch_size_);
        cached_ops_.reserve(batch_size_ * kNonDataOpBufferSize);
    }

    if (batch_size_ > 1) {
        LOG(INFO) << "Batch writes: enabled with batch size " << batch_size_;
    } else {
        LOG(INFO) << "Batch writes: disabled";
    }
    if (android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false) &&
        options_.num_compress_threads) {
        num_compress_threads_ = options_.num_compress_threads;
    }
    InitWorkers();

    return true;
}

CowWriterV3::~CowWriterV3() {
    for (const auto& t : compress_threads_) {
        t->Finalize();
    }
    for (auto& t : threads_) {
        if (t.joinable()) {
            t.join();
        }
    }
}

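// Set up the backing fd and options, then either start a fresh COW (no label)
// or reopen an existing one and resume appending after the given label.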
bool CowWriterV3::Initialize(std::optional<uint64_t> label) {
    if (!InitFd() || !ParseOptions()) {
        return false;
    }
    if (!label) {
        if (!OpenForWrite()) {
            return false;
        }
    } else {
        if (!OpenForAppend(*label)) {
            return false;
        }
    }
    return true;
}

bool CowWriterV3::OpenForWrite() {
    // This limitation is tied to the data field size in CowOperationV2.
    // Keep the same check for the V3 writer.
    if (header_.block_size > std::numeric_limits<uint16_t>::max()) {
        LOG(ERROR) << "Block size is too large";
        return false;
    }

    if (lseek(fd_.get(), 0, SEEK_SET) < 0) {
        PLOG(ERROR) << "lseek failed";
        return false;
    }

    // Headers are not complete, but this ensures the file is at the right
    // position.
    if (!android::base::WriteFully(fd_, &header_, sizeof(header_))) {
        PLOG(ERROR) << "write failed";
        return false;
    }

    if (options_.scratch_space) {
        // Initialize the scratch space
        std::string data(header_.buffer_size, 0);
        if (!android::base::WriteFully(fd_, data.data(), header_.buffer_size)) {
            PLOG(ERROR) << "writing scratch space failed";
            return false;
        }
    }

    resume_points_ = std::make_shared<std::vector<ResumePoint>>();

    if (!Sync()) {
        LOG(ERROR) << "Header sync failed";
        return false;
    }
    next_data_pos_ = GetDataOffset(header_);
    return true;
}

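// Reopen an existing COW for appending: re-read the on-disk header, parse the
// ops up to the given label, and recompute next_data_pos_ from the data lengths
// of the surviving ops so that newly written data lands after them.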
bool CowWriterV3::OpenForAppend(uint64_t label) {
    CowHeaderV3 header_v3{};
    if (!ReadCowHeader(fd_, &header_v3)) {
        LOG(ERROR) << "Couldn't read Cow Header";
        return false;
    }

    header_ = header_v3;

    CHECK(label >= 0);
    CowParserV3 parser;
    if (!parser.Parse(fd_, header_, label)) {
        PLOG(ERROR) << "unable to parse with given label: " << label;
        return false;
    }

    resume_points_ = parser.resume_points();
    options_.block_size = header_.block_size;
    next_data_pos_ = GetDataOffset(header_);

    TranslatedCowOps ops;
    parser.Translate(&ops);
    header_.op_count = ops.ops->size();

    for (const auto& op : *ops.ops) {
        next_data_pos_ += op.data_length;
    }

    return true;
}

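// Returns true if writing |op_count| more ops (on top of the ops already on
// disk and the ops cached in memory) stays within op_count_max. Always passes
// while estimating, where op_count_max is grown as needed instead.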
bool CowWriterV3::CheckOpCount(size_t op_count) {
    if (IsEstimating()) {
        return true;
    }
    if (header_.op_count + cached_ops_.size() + op_count > header_.op_count_max) {
        LOG(ERROR) << "Current number of ops on disk: " << header_.op_count
                   << ", number of ops cached in memory: " << cached_ops_.size()
                   << ", number of ops attempting to write: " << op_count
                   << ", this will exceed max op count " << header_.op_count_max;
        return false;
    }
    return true;
}

size_t CowWriterV3::CachedDataSize() const {
    size_t size = 0;
    for (const auto& i : cached_data_) {
        size += i.size();
    }
    return size;
}

bool CowWriterV3::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) {
    if (!CheckOpCount(num_blocks)) {
        return false;
    }
    for (size_t i = 0; i < num_blocks; i++) {
        CowOperationV3& op = cached_ops_.emplace_back();
        op.set_type(kCowCopyOp);
        op.new_block = new_block + i;
        op.set_source(old_block + i);
    }

    if (NeedsFlush()) {
        if (!FlushCacheOps()) {
            return false;
        }
    }
    return true;
}

bool CowWriterV3::EmitRawBlocks(uint64_t new_block_start, const void* data, size_t size) {
    return EmitBlocks(new_block_start, data, size, 0, 0, kCowReplaceOp);
}

bool CowWriterV3::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t size,
                                uint32_t old_block, uint16_t offset) {
    return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
}

bool CowWriterV3::NeedsFlush() const {
    // Allow bigger batch sizes for ops without data. A single CowOperationV3
    // struct uses 14 bytes of memory; even if we cache 200 * 16 ops in memory,
    // it's only ~44K.
    return CachedDataSize() >= batch_size_ * header_.block_size ||
           cached_ops_.size() >= batch_size_ * kNonDataOpBufferSize;
}

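// Compress |blocks_to_write| blocks starting at |data| and append one op per
// compressed chunk to the in-memory caches (cached_ops_, cached_data_,
// data_vec_). Replace ops get a provisional data offset here; FlushCacheOps()
// recomputes it just before the ops are written out.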
bool CowWriterV3::ConstructCowOpCompressedBuffers(uint64_t new_block_start, const void* data,
                                                  uint64_t old_block, uint16_t offset,
                                                  CowOperationType type, size_t blocks_to_write) {
    size_t compressed_bytes = 0;
    auto&& blocks = CompressBlocks(blocks_to_write, data, type);
    if (blocks.empty()) {
        LOG(ERROR) << "Failed to compress blocks " << new_block_start << ", " << blocks_to_write
                   << ", actual number of blocks received from compressor " << blocks.size();
        return false;
    }
    if (!CheckOpCount(blocks.size())) {
        return false;
    }
    size_t blocks_written = 0;
    for (size_t blk_index = 0; blk_index < blocks.size(); blk_index++) {
        CowOperation& op = cached_ops_.emplace_back();
        auto& vec = data_vec_.emplace_back();
        CompressedBuffer buffer = std::move(blocks[blk_index]);
        auto& compressed_data = cached_data_.emplace_back(std::move(buffer.compressed_data));
        op.new_block = new_block_start + blocks_written;

        op.set_type(type);
        op.set_compression_bits(std::log2(buffer.compression_factor / header_.block_size));

        if (type == kCowXorOp) {
            op.set_source((old_block + blocks_written) * header_.block_size + offset);
        } else {
            op.set_source(next_data_pos_ + compressed_bytes);
        }

        vec = {.iov_base = compressed_data.data(), .iov_len = compressed_data.size()};
        op.data_length = vec.iov_len;
        compressed_bytes += op.data_length;
        blocks_written += (buffer.compression_factor / header_.block_size);
    }
    if (blocks_written != blocks_to_write) {
        LOG(ERROR) << "Total compressed blocks: " << blocks_written
                   << " Expected: " << blocks_to_write;
        return false;
    }
    return true;
}

bool CowWriterV3::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                             uint64_t old_block, uint16_t offset, CowOperationType type) {
    if (compression_.algorithm != kCowCompressNone && compressor_ == nullptr) {
        LOG(ERROR) << "Compression algorithm is " << compression_.algorithm
                   << " but compressor is uninitialized.";
        return false;
    }
    const auto bytes = reinterpret_cast<const uint8_t*>(data);
    size_t num_blocks = (size / header_.block_size);
    size_t total_written = 0;
    while (total_written < num_blocks) {
        size_t chunk = std::min(num_blocks - total_written, batch_size_);
        if (!ConstructCowOpCompressedBuffers(new_block_start + total_written,
                                             bytes + header_.block_size * total_written,
                                             old_block + total_written, offset, type, chunk)) {
            return false;
        }

        if (NeedsFlush() && !FlushCacheOps()) {
            LOG(ERROR) << "EmitBlocks with compression: write failed. new block: "
                       << new_block_start << " compression: " << compression_.algorithm
                       << ", op type: " << type;
            return false;
        }
        total_written += chunk;
    }

    return true;
}

bool CowWriterV3::EmitZeroBlocks(uint64_t new_block_start, const uint64_t num_blocks) {
    if (!CheckOpCount(num_blocks)) {
        return false;
    }
    for (uint64_t i = 0; i < num_blocks; i++) {
        auto& op = cached_ops_.emplace_back();
        op.set_type(kCowZeroOp);
        op.new_block = new_block_start + i;
    }
    if (NeedsFlush()) {
        if (!FlushCacheOps()) {
            return false;
        }
    }
    return true;
}

bool CowWriterV3::EmitLabel(uint64_t label) {
    // Remove all resume points with a label greater than or equal to the current
    // one, to avoid ending up with duplicate labels that have differing op values.
    if (!FlushCacheOps()) {
        LOG(ERROR) << "Failed to flush cached ops before emitting label " << label;
        return false;
    }
    auto remove_if_callback = [&](const auto& resume_point) -> bool {
        if (resume_point.label >= label) return true;
        return false;
    };
    resume_points_->erase(
            std::remove_if(resume_points_->begin(), resume_points_->end(), remove_if_callback),
            resume_points_->end());

    resume_points_->push_back({label, header_.op_count});
    header_.resume_point_count++;
    // remove the oldest resume point if resume_buffer is full
    while (resume_points_->size() > header_.resume_point_max) {
        resume_points_->erase(resume_points_->begin());
    }

    CHECK_LE(resume_points_->size(), header_.resume_point_max);

    if (!android::base::WriteFullyAtOffset(fd_, resume_points_->data(),
                                           resume_points_->size() * sizeof(ResumePoint),
                                           GetResumeOffset(header_))) {
        PLOG(ERROR) << "writing resume buffer failed";
        return false;
    }
    return Finalize();
}

bool CowWriterV3::EmitSequenceData(size_t num_ops, const uint32_t* data) {
    if (header_.op_count > 0 || !cached_ops_.empty()) {
        LOG(ERROR) << "There are " << header_.op_count << " operations written to disk and "
                   << cached_ops_.size()
                   << " ops cached in memory. Writing sequence data is only allowed before all "
                      "operation writes.";
        return false;
    }

    header_.sequence_data_count = num_ops;

    // Adding the sequence buffer shifts the data section. Verify that the
    // previously initialized next_data_pos_ plus the sequence bytes matches the
    // new data offset, then update next_data_pos_ to it.
    CHECK_EQ(next_data_pos_ + header_.sequence_data_count * sizeof(uint32_t),
             GetDataOffset(header_));
    next_data_pos_ = GetDataOffset(header_);

    if (!android::base::WriteFullyAtOffset(fd_, data, sizeof(data[0]) * num_ops,
                                           GetSequenceOffset(header_))) {
        PLOG(ERROR) << "writing sequence buffer failed";
        return false;
    }
    return true;
}

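// Write all cached ops (and their pending data buffers) to disk in one batched
// WriteOperation() call, fixing up the data offsets of replace ops now that the
// final position of their data is known, then clear the caches.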
bool CowWriterV3::FlushCacheOps() {
    if (cached_ops_.empty()) {
        if (!data_vec_.empty()) {
            LOG(ERROR) << "Cached ops is empty, but data iovec has size: " << data_vec_.size()
                       << " this is definitely a bug.";
            return false;
        }
        return true;
    }
    size_t bytes_written = 0;

    for (auto& op : cached_ops_) {
        if (op.type() == kCowReplaceOp) {
            op.set_source(next_data_pos_ + bytes_written);
        }
        bytes_written += op.data_length;
    }
    if (!WriteOperation(cached_ops_, data_vec_)) {
        LOG(ERROR) << "Failed to flush " << cached_ops_.size() << " ops to disk";
        return false;
    }
    cached_ops_.clear();
    cached_data_.clear();
    data_vec_.clear();
    return true;
}

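// Pick the compression window for the next run of blocks: the largest
// power-of-two multiple of the block size, bounded by max_compression_size,
// that still fits within |blocks_to_compress|. XOR ops always use a single
// block until snapshot-merge supports larger windows for them.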
size_t CowWriterV3::GetCompressionFactor(const size_t blocks_to_compress,
                                         CowOperationType type) const {
    // For XOR ops, we don't support bigger block size compression yet.
    // For bigger block size support, snapshot-merge also has to be changed. We
    // aren't there yet; hence, just stick to 4k for now until
    // snapshot-merge is ready for XOR operations.
    if (type == kCowXorOp) {
        return header_.block_size;
    }

    size_t compression_factor = header_.max_compression_size;
    while (compression_factor > header_.block_size) {
        size_t num_blocks = compression_factor / header_.block_size;
        if (blocks_to_compress >= num_blocks) {
            return compression_factor;
        }
        compression_factor >>= 1;
    }
    return header_.block_size;
}

std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithNoCompression(
        const size_t num_blocks, const void* data, CowOperationType type) {
    size_t blocks_to_compress = num_blocks;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    std::vector<CompressedBuffer> compressed_vec;

    while (blocks_to_compress) {
        CompressedBuffer buffer;

        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
        size_t num_blocks = compression_factor / header_.block_size;

        buffer.compression_factor = compression_factor;
        buffer.compressed_data.resize(compression_factor);

        // No compression. Just copy the data as-is.
        std::memcpy(buffer.compressed_data.data(), iter, compression_factor);

        compressed_vec.push_back(std::move(buffer));
        blocks_to_compress -= num_blocks;
        iter += compression_factor;
    }
    return compressed_vec;
}

std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithCompression(
        const size_t num_blocks, const void* data, CowOperationType type) {
    size_t blocks_to_compress = num_blocks;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    std::vector<CompressedBuffer> compressed_vec;

    while (blocks_to_compress) {
        CompressedBuffer buffer;

        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
        size_t num_blocks = compression_factor / header_.block_size;

        buffer.compression_factor = compression_factor;
        // Compress the blocks
        buffer.compressed_data = compressor_->Compress(iter, compression_factor);
        if (buffer.compressed_data.empty()) {
            PLOG(ERROR) << "Compression failed";
            return {};
        }

        // Check if the buffer was indeed compressed
        if (buffer.compressed_data.size() >= compression_factor) {
            buffer.compressed_data.resize(compression_factor);
            std::memcpy(buffer.compressed_data.data(), iter, compression_factor);
        }

        compressed_vec.push_back(std::move(buffer));
        blocks_to_compress -= num_blocks;
        iter += compression_factor;
    }
    return compressed_vec;
}

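// Fan the input out to the worker threads round-robin, one compression window
// per submission, then collect the results in the same order so each compressed
// buffer lines up with the op that will reference it. Chunks that did not
// actually shrink are stored uncompressed.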
std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::ProcessBlocksWithThreadedCompression(
        const size_t num_blocks, const void* data, CowOperationType type) {
    const size_t num_threads = num_compress_threads_;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);

    // We will alternate which thread to send compress work to. E.g. alternate between T1 and T2
    // until all blocks are processed
    std::vector<CompressedBuffer> compressed_vec;
    int iteration = 0;
    int blocks_to_compress = static_cast<int>(num_blocks);
    while (blocks_to_compress) {
        CompressedBuffer buffer;
        CompressWorker* worker = compress_threads_[iteration % num_threads].get();

        const size_t compression_factor = GetCompressionFactor(blocks_to_compress, type);
        size_t num_blocks = compression_factor / header_.block_size;

        worker->EnqueueCompressBlocks(iter, compression_factor, 1);
        buffer.compression_factor = compression_factor;
        compressed_vec.push_back(std::move(buffer));

        iteration++;
        iter += compression_factor;
        blocks_to_compress -= num_blocks;
    }

    std::vector<std::vector<uint8_t>> compressed_buf;
    std::vector<std::vector<std::vector<uint8_t>>> worker_buffers(num_threads);
    compressed_buf.clear();
    for (size_t i = 0; i < num_threads; i++) {
        CompressWorker* worker = compress_threads_[i].get();
        if (!worker->GetCompressedBuffers(&worker_buffers[i])) {
            return {};
        }
    }
    // compressed_vec | CB 1 | CB 2 | CB 3 | CB 4 |  <- compressed buffers
    //                   t1     t2     t1     t2     <- processed by these threads
    // Ordering is important here. We need to retrieve the compressed data in the same order we
    // submitted it: work is handed out beginning with the first thread and then round-robined
    // across the remaining threads, so the compressed buffers have to be fetched from the
    // threads in that same order.
    for (size_t i = 0; i < compressed_vec.size(); i++) {
        compressed_buf.emplace_back(worker_buffers[i % num_threads][i / num_threads]);
    }

    if (compressed_vec.size() != compressed_buf.size()) {
        LOG(ERROR) << "Compressed buffer size: " << compressed_buf.size()
                   << " - Expected: " << compressed_vec.size();
        return {};
    }

    iter = reinterpret_cast<const uint8_t*>(data);
    // Walk through all the compressed buffers
    for (size_t i = 0; i < compressed_buf.size(); i++) {
        auto& buffer = compressed_vec[i];
        auto& block = compressed_buf[i];
        size_t block_size = buffer.compression_factor;
        // Check if the block was indeed compressed
        if (block.size() >= block_size) {
            buffer.compressed_data.resize(block_size);
            std::memcpy(buffer.compressed_data.data(), iter, block_size);
        } else {
            // Compressed block
            buffer.compressed_data.resize(block.size());
            std::memcpy(buffer.compressed_data.data(), block.data(), block.size());
        }
        iter += block_size;
    }
    return compressed_vec;
}

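// Dispatch compression for a run of blocks: copy the data verbatim when
// compression is disabled, compress inline for a single block or a single
// thread, and otherwise spread the work across the worker threads.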
std::vector<CowWriterV3::CompressedBuffer> CowWriterV3::CompressBlocks(const size_t num_blocks,
                                                                       const void* data,
                                                                       CowOperationType type) {
    if (compression_.algorithm == kCowCompressNone) {
        return ProcessBlocksWithNoCompression(num_blocks, data, type);
    }

    const size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;

    // If no threads are required, just compress the blocks inline.
    if (num_threads <= 1) {
        return ProcessBlocksWithCompression(num_blocks, data, type);
    }

    return ProcessBlocksWithThreadedCompression(num_blocks, data, type);
}

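// Append a batch of ops and their data to the COW. In estimation mode nothing
// is written; we only grow op_count/op_count_max and advance next_data_pos_.
// Otherwise the ops are written at their slot in the op section and the data
// buffers are written with pwritev() in chunks of at most IOV_MAX iovecs.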
bool CowWriterV3::WriteOperation(std::span<const CowOperationV3> ops,
                                 std::span<const struct iovec> data) {
    const auto total_data_size =
            std::transform_reduce(data.begin(), data.end(), 0, std::plus<size_t>{},
                                  [](const struct iovec& a) { return a.iov_len; });
    if (IsEstimating()) {
        header_.op_count += ops.size();
        if (header_.op_count > header_.op_count_max) {
            // If we increment op_count_max, the offset of the data section would
            // change, so we need to update |next_data_pos_|.
            next_data_pos_ += (header_.op_count - header_.op_count_max) * sizeof(CowOperationV3);
            header_.op_count_max = header_.op_count;
        }
        next_data_pos_ += total_data_size;
        return true;
    }

    if (header_.op_count + ops.size() > header_.op_count_max) {
        LOG(ERROR) << "Current op count " << header_.op_count << ", attempting to write "
                   << ops.size() << " ops will exceed the max of " << header_.op_count_max;
        return false;
    }
    const off_t offset = GetOpOffset(header_.op_count, header_);
    if (!android::base::WriteFullyAtOffset(fd_, ops.data(), ops.size() * sizeof(ops[0]), offset)) {
        PLOG(ERROR) << "Write failed for " << ops.size() << " ops at " << offset;
        return false;
    }
    if (!data.empty()) {
        int total_written = 0;
        int i = 0;
        while (i < data.size()) {
            int chunk = std::min(static_cast<int>(data.size() - i), IOV_MAX);

            const auto ret = pwritev(fd_, data.data() + i, chunk, next_data_pos_ + total_written);
            if (ret < 0) {
                PLOG(ERROR) << "write failed chunk size of: " << chunk
                            << " at offset: " << next_data_pos_ + total_written << " " << errno;
                return false;
            }
            total_written += ret;
            i += chunk;
        }
        if (total_written != total_data_size) {
            PLOG(ERROR) << "write failed for data vector of size: " << data.size()
                        << " and total data length: " << total_data_size
                        << " at offset: " << next_data_pos_ << " " << errno
                        << ", only wrote: " << total_written;
            return false;
        }
    }

    header_.op_count += ops.size();
    next_data_pos_ += total_data_size;

    return true;
}

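// Flush any remaining cached ops, rewrite the header (which carries the updated
// op_count and resume point count), and call Sync() so the COW is consistent on
// disk.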
bool CowWriterV3::Finalize() {
    CHECK_GE(header_.prefix.header_size, sizeof(CowHeaderV3));
    CHECK_LE(header_.prefix.header_size, sizeof(header_));
    if (!FlushCacheOps()) {
        return false;
    }
    if (!android::base::WriteFullyAtOffset(fd_, &header_, header_.prefix.header_size, 0)) {
        return false;
    }
    return Sync();
}

CowSizeInfo CowWriterV3::GetCowSizeInfo() const {
    CowSizeInfo info;
    info.cow_size = next_data_pos_;
    info.op_count_max = header_.op_count_max;
    return info;
}

}  // namespace snapshot
}  // namespace android