//
// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

#include "writer_v2.h"

#include <fcntl.h>
#include <linux/fs.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

#include <future>
#include <limits>

#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/parseint.h>
#include <android-base/properties.h>
#include <android-base/strings.h>
#include <android-base/unique_fd.h>
#include <brotli/encode.h>
#include <libsnapshot/cow_format.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <lz4.h>
#include <zlib.h>

#include "parser_v2.h"

// The info messages here are spammy, but are useful for update_engine. Disable
// them when running on the host.
#ifdef __ANDROID__
#define LOG_INFO LOG(INFO)
#else
#define LOG_INFO LOG(VERBOSE)
#endif

namespace android {
namespace snapshot {

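// All file offsets below are tracked in uint64_t fields and passed directly
// to lseek()/pwritev(), so this code requires a 64-bit off_t (e.g. a build
// with _FILE_OFFSET_BITS=64).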
static_assert(sizeof(off_t) == sizeof(uint64_t));

using android::base::unique_fd;

CowWriterV2::CowWriterV2(const CowOptions& options, unique_fd&& fd)
    : CowWriterBase(options, std::move(fd)) {
    SetupHeaders();
    SetupWriteOptions();
}

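// Drain all queued compression work and join the worker futures. A failed
// worker can only be logged here, since a destructor cannot report errors.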
CowWriterV2::~CowWriterV2() {
    for (size_t i = 0; i < compress_threads_.size(); i++) {
        CompressWorker* worker = compress_threads_[i].get();
        if (worker) {
            worker->Finalize();
        }
    }

    bool ret = true;
    for (auto& t : threads_) {
        ret = t.get() && ret;
    }

    if (!ret) {
        LOG(ERROR) << "Compression failed";
    }
    compress_threads_.clear();
}

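// Choose the number of compression threads and whether writes are batched.
// Both default to the conservative setting and can be enabled per-device via
// system properties or CowOptions.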
void CowWriterV2::SetupWriteOptions() {
    num_compress_threads_ = options_.num_compress_threads;

    if (!num_compress_threads_) {
        num_compress_threads_ = 1;
        // We prefer not to have more than two threads, as the overhead of
        // additional threads outweighs the reduction in compression time.
        if (header_.cluster_ops &&
            android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
            num_compress_threads_ = 2;
        }
    }

    if (header_.cluster_ops &&
        (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
         options_.batch_write)) {
        batch_write_ = true;
    }
}

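// Populate the fixed v2 header and footer fields. buffer_size (the scratch
// region) stays zero here; it is set in OpenForWrite() if scratch space was
// requested.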
void CowWriterV2::SetupHeaders() {
    header_ = {};
    header_.prefix.magic = kCowMagicNumber;
    header_.prefix.major_version = kCowVersionMajor;
    header_.prefix.minor_version = kCowVersionMinor;
    header_.prefix.header_size = sizeof(CowHeader);
    header_.footer_size = sizeof(CowFooter);
    header_.op_size = sizeof(CowOperationV2);
    header_.block_size = options_.block_size;
    header_.num_merge_ops = options_.num_merge_ops;
    header_.cluster_ops = options_.cluster_ops;
    header_.buffer_size = 0;
    footer_ = {};
    footer_.op.data_length = 64;
    footer_.op.type = kCowFooterOp;
}

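// Parse the compression option string, which is either "<algorithm>" or
// "<algorithm>,<level>"; when the level is omitted, the algorithm's default
// level is used.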
bool CowWriterV2::ParseOptions() {
    auto parts = android::base::Split(options_.compression, ",");

    if (parts.size() > 2) {
        LOG(ERROR) << "failed to parse compression parameters: invalid argument count: "
                   << parts.size() << " " << options_.compression;
        return false;
    }
    auto algorithm = CompressionAlgorithmFromString(parts[0]);
    if (!algorithm) {
        LOG(ERROR) << "unrecognized compression: " << options_.compression;
        return false;
    }
    if (parts.size() > 1) {
        if (!android::base::ParseInt(parts[1], &compression_.compression_level)) {
            LOG(ERROR) << "failed to parse compression level: " << parts[1];
            return false;
        }
    } else {
        compression_.compression_level =
                CompressWorker::GetDefaultCompressionLevel(algorithm.value());
    }

    compression_.algorithm = *algorithm;

    if (options_.cluster_ops == 1) {
        LOG(ERROR) << "Clusters must contain at least two operations to function.";
        return false;
    }
    return true;
}

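// When batch writes are enabled, preallocate one iovec per cluster slot for
// ops and for data so that a full cluster can be flushed with two pwritev()
// calls. Each data buffer is 2x the block size, matching the largest payload
// a single op can carry (see EmitSequenceData).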
void CowWriterV2::InitBatchWrites() {
    if (batch_write_) {
        cowop_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
        data_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
        struct iovec* cowop_ptr = cowop_vec_.get();
        struct iovec* data_ptr = data_vec_.get();
        for (size_t i = 0; i < header_.cluster_ops; i++) {
            std::unique_ptr<CowOperationV2> op = std::make_unique<CowOperationV2>();
            cowop_ptr[i].iov_base = op.get();
            cowop_ptr[i].iov_len = sizeof(CowOperationV2);
            opbuffer_vec_.push_back(std::move(op));

            std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(header_.block_size * 2);
            data_ptr[i].iov_base = buffer.get();
            data_ptr[i].iov_len = header_.block_size * 2;
            databuffer_vec_.push_back(std::move(buffer));
        }

        current_op_pos_ = next_op_pos_;
        current_data_pos_ = next_data_pos_;
    }

    LOG_INFO << "Batch writes: " << (batch_write_ ? "enabled" : "disabled");
}

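// Start the asynchronous compression workers. With a single thread,
// compression instead runs inline on the writer thread (see CompressBlocks).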
void CowWriterV2::InitWorkers() {
    if (num_compress_threads_ <= 1) {
        LOG_INFO << "Not creating new threads for compression.";
        return;
    }
    for (int i = 0; i < num_compress_threads_; i++) {
        std::unique_ptr<ICompressor> compressor =
                ICompressor::Create(compression_, header_.block_size);
        auto wt = std::make_unique<CompressWorker>(std::move(compressor));
        threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
        compress_threads_.push_back(std::move(wt));
    }

    LOG_INFO << num_compress_threads_ << " threads used for compression";
}

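// Open the COW for writing. Without a label the image is created from
// scratch; with a label, writing resumes immediately after that label.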
bool CowWriterV2::Initialize(std::optional<uint64_t> label) {
    if (!InitFd() || !ParseOptions()) {
        return false;
    }
    if (!label) {
        if (!OpenForWrite()) {
            return false;
        }
    } else {
        if (!OpenForAppend(*label)) {
            return false;
        }
    }

    if (!compress_threads_.size()) {
        InitWorkers();
    }
    return true;
}

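// Compute the initial op and data offsets. The v2 layout is: header, optional
// scratch buffer, then clusters of ops, each followed by its data region.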
void CowWriterV2::InitPos() {
    next_op_pos_ = sizeof(CowHeader) + header_.buffer_size;
    cluster_size_ = header_.cluster_ops * sizeof(CowOperationV2);
    if (header_.cluster_ops) {
        next_data_pos_ = next_op_pos_ + cluster_size_;
    } else {
        next_data_pos_ = next_op_pos_ + sizeof(CowOperationV2);
    }
    current_cluster_size_ = 0;
    current_data_size_ = 0;
}

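// Start a fresh COW: write the header (and zero-fill the scratch region if
// one was requested) so that subsequent ops land at the correct offsets.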
bool CowWriterV2::OpenForWrite() {
    // This limitation is tied to the data field size in CowOperationV2.
    if (header_.block_size > std::numeric_limits<uint16_t>::max()) {
        LOG(ERROR) << "Block size is too large";
        return false;
    }

    if (lseek(fd_.get(), 0, SEEK_SET) < 0) {
        PLOG(ERROR) << "lseek failed";
        return false;
    }

    if (options_.scratch_space) {
        header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
    }

    // Headers are not complete, but this ensures the file is at the right
    // position.
    if (!android::base::WriteFully(fd_, &header_, sizeof(CowHeader))) {
        PLOG(ERROR) << "write failed";
        return false;
    }

    if (options_.scratch_space) {
        // Initialize the scratch space
        std::string data(header_.buffer_size, 0);
        if (!android::base::WriteFully(fd_, data.data(), header_.buffer_size)) {
            PLOG(ERROR) << "writing scratch space failed";
            return false;
        }
    }

    if (!Sync()) {
        LOG(ERROR) << "Header sync failed";
        return false;
    }

    InitPos();
    InitBatchWrites();

    return true;
}

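// Reopen an existing COW and resume writing after the given label. Ops up to
// and including the label are re-imported; anything written after the label
// is discarded.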
bool CowWriterV2::OpenForAppend(uint64_t label) {
    CowHeaderV3 header_v3;
    if (!ReadCowHeader(fd_, &header_v3)) {
        return false;
    }

    header_ = header_v3;

    CowParserV2 parser;
    if (!parser.Parse(fd_, header_v3, {label})) {
        return false;
    }
    if (header_.prefix.major_version > 2) {
        LOG(ERROR) << "CowWriterV2 tried to open incompatible version "
                   << header_.prefix.major_version;
        return false;
    }

    options_.block_size = header_.block_size;
    options_.cluster_ops = header_.cluster_ops;

    // Reset this, since we're going to reimport all operations.
    footer_.op.num_ops = 0;
    InitPos();

    for (const auto& op : *parser.get_v2ops()) {
        AddOperation(op);
    }

    if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
        PLOG(ERROR) << "lseek failed";
        return false;
    }

    InitBatchWrites();

    return EmitClusterIfNeeded();
}

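// Emit one copy op per block. Copy ops carry no data payload; the source
// block number is stored directly in the op.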
bool CowWriterV2::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) {
    CHECK(!merge_in_progress_);

    for (size_t i = 0; i < num_blocks; i++) {
        CowOperationV2 op = {};
        op.type = kCowCopyOp;
        op.new_block = new_block + i;
        op.source = old_block + i;
        if (!WriteOperation(op)) {
            return false;
        }
    }

    return true;
}

bool CowWriterV2::EmitRawBlocks(uint64_t new_block_start, const void* data, size_t size) {
    return EmitBlocks(new_block_start, data, size, 0, 0, kCowReplaceOp);
}

bool CowWriterV2::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t size,
                                uint32_t old_block, uint16_t offset) {
    return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
}

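// Compress a batch of blocks, either inline (single thread) or by fanning the
// batch out across the worker threads. Results are collected back into
// compressed_buf_ in submission order, since the compressed buffers must
// match the block ordering.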
bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) {
    size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
    size_t num_blocks_per_thread = num_blocks / num_threads;
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
    compressed_buf_.clear();
    if (num_threads <= 1) {
        if (!compressor_) {
            compressor_ = ICompressor::Create(compression_, header_.block_size);
        }
        return CompressWorker::CompressBlocks(compressor_.get(), options_.block_size, data,
                                              num_blocks, &compressed_buf_);
    }
    // Submit the blocks per thread. The retrieval of
    // compressed buffers has to be done in the same order.
    // We should not poll for completed buffers in a different order as the
    // buffers are tightly coupled with block ordering.
    for (size_t i = 0; i < num_threads; i++) {
        CompressWorker* worker = compress_threads_[i].get();
        if (i == num_threads - 1) {
            num_blocks_per_thread = num_blocks;
        }
        worker->EnqueueCompressBlocks(iter, header_.block_size, num_blocks_per_thread);
        iter += (num_blocks_per_thread * header_.block_size);
        num_blocks -= num_blocks_per_thread;
    }

    for (size_t i = 0; i < num_threads; i++) {
        CompressWorker* worker = compress_threads_[i].get();
        if (!worker->GetCompressedBuffers(&compressed_buf_)) {
            return false;
        }
    }

    return true;
}

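// Core write path for replace and xor blocks: process the input in bounded
// chunks, compress when configured, and emit one op (plus payload) per block.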
bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
                             uint64_t old_block, uint16_t offset, CowOperationType type) {
    CHECK(!merge_in_progress_);
    const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);

    // Update engine can potentially send 100MB of blocks at a time. We
    // don't want to process all those blocks in one shot as it can
    // stress the memory. Hence, process the blocks in chunks.
    //
    // 1024 blocks is reasonable given we will end up using max
    // memory of ~4MB.
    const size_t kProcessingBlocks = 1024;
    size_t num_blocks = (size / header_.block_size);
    size_t i = 0;

    while (num_blocks) {
        size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));

        if (compression_.algorithm && num_compress_threads_ > 1) {
            if (!CompressBlocks(pending_blocks, iter)) {
                return false;
            }
            buf_iter_ = compressed_buf_.begin();
            CHECK(pending_blocks == compressed_buf_.size());
        }

        num_blocks -= pending_blocks;

        while (i < size / header_.block_size && pending_blocks) {
            CowOperationV2 op = {};
            op.new_block = new_block_start + i;
            op.type = type;
            if (type == kCowXorOp) {
                op.source = (old_block + i) * header_.block_size + offset;
            } else {
                op.source = next_data_pos_;
            }

            if (compression_.algorithm) {
                auto data = [&, this]() {
                    if (num_compress_threads_ > 1) {
                        auto data = std::move(*buf_iter_);
                        buf_iter_++;
                        return data;
                    } else {
                        if (!compressor_) {
                            compressor_ = ICompressor::Create(compression_, header_.block_size);
                        }

                        auto data = compressor_->Compress(iter, header_.block_size);
                        return data;
                    }
                }();
                op.compression = compression_.algorithm;
                op.data_length = static_cast<uint16_t>(data.size());

                if (!WriteOperation(op, data.data(), data.size())) {
                    PLOG(ERROR) << "AddRawBlocks: write failed";
                    return false;
                }
            } else {
                op.data_length = static_cast<uint16_t>(header_.block_size);
                if (!WriteOperation(op, iter, header_.block_size)) {
                    PLOG(ERROR) << "AddRawBlocks: write failed";
                    return false;
                }
            }
            iter += header_.block_size;

            i += 1;
            pending_blocks -= 1;
        }

        CHECK(pending_blocks == 0);
    }
    return true;
}

bool CowWriterV2::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
    CHECK(!merge_in_progress_);
    for (uint64_t i = 0; i < num_blocks; i++) {
        CowOperationV2 op = {};
        op.type = kCowZeroOp;
        op.new_block = new_block_start + i;
        op.source = 0;
        if (!WriteOperation(op)) {
            return false;
        }
    }
    return true;
}

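// Labels act as sync points: the op is written and the file is fsync'd so a
// future OpenForAppend() can safely resume from this label.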
bool CowWriterV2::EmitLabel(uint64_t label) {
    CHECK(!merge_in_progress_);
    CowOperationV2 op = {};
    op.type = kCowLabelOp;
    op.source = label;
    return WriteOperation(op) && Sync();
}

bool CowWriterV2::EmitSequenceData(size_t num_ops, const uint32_t* data) {
    CHECK(!merge_in_progress_);
    size_t to_add = 0;
    size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t);
    while (num_ops > 0) {
        CowOperationV2 op = {};
        op.type = kCowSequenceOp;
        op.source = next_data_pos_;
        to_add = std::min(num_ops, max_ops);
        op.data_length = static_cast<uint16_t>(to_add * sizeof(uint32_t));
        if (!WriteOperation(op, data, op.data_length)) {
            PLOG(ERROR) << "AddSequenceData: write failed";
            return false;
        }
        num_ops -= to_add;
        data += to_add;
    }
    return true;
}

bool CowWriterV2::EmitCluster() {
    CowOperationV2 op = {};
    op.type = kCowClusterOp;
    // Next cluster starts after remainder of current cluster and the next data block.
    op.source = current_data_size_ + cluster_size_ - current_cluster_size_ - sizeof(CowOperationV2);
    return WriteOperation(op);
}

bool CowWriterV2::EmitClusterIfNeeded() {
    // If there isn't room for another op and the cluster end op, end the current cluster.
    if (cluster_size_ && cluster_size_ < current_cluster_size_ + 2 * sizeof(CowOperationV2)) {
        if (!EmitCluster()) return false;
    }
    return true;
}

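// Write the footer at the end of the op stream and trim any stale data past
// it. Writer state is snapshotted and restored so that more ops can be
// appended after Finalize() returns.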
bool CowWriterV2::Finalize() {
    if (!FlushCluster()) {
        LOG(ERROR) << "Finalize: FlushCluster() failed";
        return false;
    }

    auto continue_cluster_size = current_cluster_size_;
    auto continue_data_size = current_data_size_;
    auto continue_data_pos = next_data_pos_;
    auto continue_op_pos = next_op_pos_;
    auto continue_num_ops = footer_.op.num_ops;
    bool extra_cluster = false;

    // Blank out extra ops, in case we're in append mode and dropped ops.
    if (cluster_size_) {
        auto unused_cluster_space = cluster_size_ - current_cluster_size_;
        std::string clr;
        clr.resize(unused_cluster_space, '\0');
        if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
            PLOG(ERROR) << "Failed to seek to footer position.";
            return false;
        }
        if (!android::base::WriteFully(fd_, clr.data(), clr.size())) {
            PLOG(ERROR) << "clearing unused cluster area failed";
            return false;
        }
    }

    // The footer must be at the end of the file, so if there is pending data,
    // end the current cluster and start a new one.
    if (cluster_size_ && current_data_size_ > 0) {
        if (!EmitCluster()) {
            LOG(ERROR) << "Finalize: EmitCluster() failed";
            return false;
        }
        extra_cluster = true;
    }

    footer_.op.ops_size = footer_.op.num_ops * sizeof(CowOperationV2);
    if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
        PLOG(ERROR) << "Failed to seek to footer position.";
        return false;
    }
    memset(&footer_.unused, 0, sizeof(footer_.unused));

    // Write out footer at end of file
    if (!android::base::WriteFully(fd_, reinterpret_cast<const uint8_t*>(&footer_),
                                   sizeof(footer_))) {
        PLOG(ERROR) << "write footer failed";
        return false;
    }

    // Remove excess data, if we're in append mode and threw away more data
    // than we wrote before.
    off_t offs = lseek(fd_.get(), 0, SEEK_CUR);
    if (offs < 0) {
        PLOG(ERROR) << "Failed to lseek to find current position";
        return false;
    }
    if (!Truncate(offs)) {
        return false;
    }

    // Reposition for additional writing.
    if (extra_cluster) {
        current_cluster_size_ = continue_cluster_size;
        current_data_size_ = continue_data_size;
        next_data_pos_ = continue_data_pos;
        next_op_pos_ = continue_op_pos;
        footer_.op.num_ops = continue_num_ops;
    }

    if (!FlushCluster()) {
        LOG(ERROR) << "Finalize: final FlushCluster() failed";
        return false;
    }

    return Sync();
}

CowSizeInfo CowWriterV2::GetCowSizeInfo() const {
    CowSizeInfo info;
    if (current_data_size_ > 0) {
        info.cow_size = next_data_pos_ + sizeof(footer_);
    } else {
        info.cow_size = next_op_pos_ + sizeof(footer_);
    }
    return info;
}

bool CowWriterV2::GetDataPos(uint64_t* pos) {
    off_t offs = lseek(fd_.get(), 0, SEEK_CUR);
    if (offs < 0) {
        PLOG(ERROR) << "lseek failed";
        return false;
    }
    *pos = offs;
    return true;
}

bool CowWriterV2::EnsureSpaceAvailable(const uint64_t bytes_needed) const {
    if (bytes_needed > cow_image_size_) {
        LOG(ERROR) << "No space left on COW device. Required: " << bytes_needed
                   << ", available: " << cow_image_size_;
        errno = ENOSPC;
        return false;
    }
    return true;
}

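// Flush any batched ops and data with positioned vectored writes: one
// pwritev() for the staged ops at the op region and one for the staged data
// at the data region, then reset the staging state.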
bool CowWriterV2::FlushCluster() {
    ssize_t ret;

    if (op_vec_index_) {
        ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_);
        if (ret != (op_vec_index_ * sizeof(CowOperationV2))) {
            PLOG(ERROR) << "pwritev failed for CowOperationV2. Expected: "
                        << (op_vec_index_ * sizeof(CowOperationV2));
            return false;
        }
    }

    if (data_vec_index_) {
        ret = pwritev(fd_.get(), data_vec_.get(), data_vec_index_, current_data_pos_);
        if (ret != total_data_written_) {
            PLOG(ERROR) << "pwritev failed for data. Expected: " << total_data_written_;
            return false;
        }
    }

    total_data_written_ = 0;
    op_vec_index_ = 0;
    data_vec_index_ = 0;
    current_op_pos_ = next_op_pos_;
    current_data_pos_ = next_data_pos_;

    return true;
}

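// Append a single op with an optional data payload. In batch mode the op and
// payload are staged into the preallocated iovecs and flushed once a cluster
// fills up or a label/cluster op forces it out; otherwise they are written to
// the file immediately.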
bool CowWriterV2::WriteOperation(const CowOperationV2& op, const void* data, size_t size) {
    if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op)) ||
        !EnsureSpaceAvailable(next_data_pos_ + size)) {
        return false;
    }

    if (batch_write_) {
        CowOperationV2* cow_op =
                reinterpret_cast<CowOperationV2*>(cowop_vec_[op_vec_index_].iov_base);
        std::memcpy(cow_op, &op, sizeof(CowOperationV2));
        op_vec_index_ += 1;

        if (data != nullptr && size > 0) {
            struct iovec* data_ptr = data_vec_.get();
            std::memcpy(data_ptr[data_vec_index_].iov_base, data, size);
            data_ptr[data_vec_index_].iov_len = size;
            data_vec_index_ += 1;
            total_data_written_ += size;
        }
    } else {
        if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
            PLOG(ERROR) << "lseek failed for writing operation.";
            return false;
        }
        if (!android::base::WriteFully(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op))) {
            return false;
        }
        if (data != nullptr && size > 0) {
            if (!WriteRawData(data, size)) return false;
        }
    }

    AddOperation(op);

    if (batch_write_) {
        if (op_vec_index_ == header_.cluster_ops || data_vec_index_ == header_.cluster_ops ||
            op.type == kCowLabelOp || op.type == kCowClusterOp) {
            if (!FlushCluster()) {
                LOG(ERROR) << "Failed to flush cluster data";
                return false;
            }
        }
    }

    return EmitClusterIfNeeded();
}

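// Account for the new op in the footer and advance the op/data cursors; a
// cluster op resets the per-cluster counters.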
void CowWriterV2::AddOperation(const CowOperationV2& op) {
    footer_.op.num_ops++;

    if (op.type == kCowClusterOp) {
        current_cluster_size_ = 0;
        current_data_size_ = 0;
    } else if (header_.cluster_ops) {
        current_cluster_size_ += sizeof(op);
        current_data_size_ += op.data_length;
    }

    next_data_pos_ += op.data_length + GetNextDataOffset(op, header_.cluster_ops);
    next_op_pos_ += sizeof(CowOperationV2) + GetNextOpOffset(op, header_.cluster_ops);
}

bool CowWriterV2::WriteRawData(const void* data, const size_t size) {
    return android::base::WriteFullyAtOffset(fd_, data, size, next_data_pos_);
}

bool CowWriterV2::Truncate(off_t length) {
    if (is_dev_null_ || is_block_device_) {
        return true;
    }
    if (ftruncate(fd_.get(), length) < 0) {
        PLOG(ERROR) << "Failed to truncate.";
        return false;
    }
    return true;
}

}  // namespace snapshot
}  // namespace android