1 //
2 // Copyright (C) 2020 The Android Open Source Project
3 //
4 // Licensed under the Apache License, Version 2.0 (the "License");
5 // you may not use this file except in compliance with the License.
6 // You may obtain a copy of the License at
7 //
8 //      http://www.apache.org/licenses/LICENSE-2.0
9 //
10 // Unless required by applicable law or agreed to in writing, software
11 // distributed under the License is distributed on an "AS IS" BASIS,
12 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 // See the License for the specific language governing permissions and
14 // limitations under the License.
15 //
16 
17 #include "writer_v2.h"
18 
19 #include <sys/types.h>
20 #include <sys/uio.h>
21 #include <unistd.h>
22 
23 #include <future>
24 #include <limits>
25 
26 #include <android-base/file.h>
27 #include <android-base/logging.h>
28 #include <android-base/properties.h>
29 #include <android-base/unique_fd.h>
30 #include <brotli/encode.h>
31 #include <libsnapshot/cow_format.h>
32 #include <libsnapshot/cow_reader.h>
33 #include <libsnapshot/cow_writer.h>
34 #include <lz4.h>
35 #include <zlib.h>
36 
37 #include <fcntl.h>
38 #include <linux/fs.h>
39 #include <sys/ioctl.h>
40 #include <unistd.h>
41 
42 #include "android-base/parseint.h"
43 #include "android-base/strings.h"
44 #include "parser_v2.h"
45 
// The info messages here are spammy, but are useful for update_engine. Disable
// them when running on the host by demoting them to VERBOSE.
#ifdef __ANDROID__
#define LOG_INFO LOG(INFO)
#else
#define LOG_INFO LOG(VERBOSE)
#endif
53 
54 namespace android {
55 namespace snapshot {
56 
// File offsets in this writer are passed to lseek()/ftruncate() as off_t;
// presumably this guards against a 32-bit off_t truncating them.
static_assert(sizeof(off_t) == sizeof(uint64_t));

using android::base::unique_fd;
60 
CowWriterV2(const CowOptions & options,unique_fd && fd)61 CowWriterV2::CowWriterV2(const CowOptions& options, unique_fd&& fd)
62     : CowWriterBase(options, std::move(fd)) {
63     SetupHeaders();
64     SetupWriteOptions();
65 }
66 
~CowWriterV2()67 CowWriterV2::~CowWriterV2() {
68     for (size_t i = 0; i < compress_threads_.size(); i++) {
69         CompressWorker* worker = compress_threads_[i].get();
70         if (worker) {
71             worker->Finalize();
72         }
73     }
74 
75     bool ret = true;
76     for (auto& t : threads_) {
77         ret = t.get() && ret;
78     }
79 
80     if (!ret) {
81         LOG(ERROR) << "Compression failed";
82     }
83     compress_threads_.clear();
84 }
85 
SetupWriteOptions()86 void CowWriterV2::SetupWriteOptions() {
87     num_compress_threads_ = options_.num_compress_threads;
88 
89     if (!num_compress_threads_) {
90         num_compress_threads_ = 1;
91         // We prefer not to have more than two threads as the overhead of additional
92         // threads is far greater than cutting down compression time.
93         if (header_.cluster_ops &&
94             android::base::GetBoolProperty("ro.virtual_ab.compression.threads", false)) {
95             num_compress_threads_ = 2;
96         }
97     }
98 
99     if (header_.cluster_ops &&
100         (android::base::GetBoolProperty("ro.virtual_ab.batch_writes", false) ||
101          options_.batch_write)) {
102         batch_write_ = true;
103     }
104 }
105 
SetupHeaders()106 void CowWriterV2::SetupHeaders() {
107     header_ = {};
108     header_.prefix.magic = kCowMagicNumber;
109     header_.prefix.major_version = kCowVersionMajor;
110     header_.prefix.minor_version = kCowVersionMinor;
111     header_.prefix.header_size = sizeof(CowHeader);
112     header_.footer_size = sizeof(CowFooter);
113     header_.op_size = sizeof(CowOperationV2);
114     header_.block_size = options_.block_size;
115     header_.num_merge_ops = options_.num_merge_ops;
116     header_.cluster_ops = options_.cluster_ops;
117     header_.buffer_size = 0;
118     footer_ = {};
119     footer_.op.data_length = 64;
120     footer_.op.type = kCowFooterOp;
121 }
122 
ParseOptions()123 bool CowWriterV2::ParseOptions() {
124     auto parts = android::base::Split(options_.compression, ",");
125 
126     if (parts.size() > 2) {
127         LOG(ERROR) << "failed to parse compression parameters: invalid argument count: "
128                    << parts.size() << " " << options_.compression;
129         return false;
130     }
131     auto algorithm = CompressionAlgorithmFromString(parts[0]);
132     if (!algorithm) {
133         LOG(ERROR) << "unrecognized compression: " << options_.compression;
134         return false;
135     }
136     if (parts.size() > 1) {
137         if (!android::base::ParseInt(parts[1], &compression_.compression_level)) {
138             LOG(ERROR) << "failed to parse compression level invalid type: " << parts[1];
139             return false;
140         }
141     } else {
142         compression_.compression_level =
143                 CompressWorker::GetDefaultCompressionLevel(algorithm.value());
144     }
145 
146     compression_.algorithm = *algorithm;
147 
148     if (options_.cluster_ops == 1) {
149         LOG(ERROR) << "Clusters must contain at least two operations to function.";
150         return false;
151     }
152     return true;
153 }
154 
InitBatchWrites()155 void CowWriterV2::InitBatchWrites() {
156     if (batch_write_) {
157         cowop_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
158         data_vec_ = std::make_unique<struct iovec[]>(header_.cluster_ops);
159         struct iovec* cowop_ptr = cowop_vec_.get();
160         struct iovec* data_ptr = data_vec_.get();
161         for (size_t i = 0; i < header_.cluster_ops; i++) {
162             std::unique_ptr<CowOperationV2> op = std::make_unique<CowOperationV2>();
163             cowop_ptr[i].iov_base = op.get();
164             cowop_ptr[i].iov_len = sizeof(CowOperationV2);
165             opbuffer_vec_.push_back(std::move(op));
166 
167             std::unique_ptr<uint8_t[]> buffer = std::make_unique<uint8_t[]>(header_.block_size * 2);
168             data_ptr[i].iov_base = buffer.get();
169             data_ptr[i].iov_len = header_.block_size * 2;
170             databuffer_vec_.push_back(std::move(buffer));
171         }
172 
173         current_op_pos_ = next_op_pos_;
174         current_data_pos_ = next_data_pos_;
175     }
176 
177     LOG_INFO << "Batch writes: " << (batch_write_ ? "enabled" : "disabled");
178 }
179 
InitWorkers()180 void CowWriterV2::InitWorkers() {
181     if (num_compress_threads_ <= 1) {
182         LOG_INFO << "Not creating new threads for compression.";
183         return;
184     }
185     for (int i = 0; i < num_compress_threads_; i++) {
186         std::unique_ptr<ICompressor> compressor =
187                 ICompressor::Create(compression_, header_.block_size);
188         auto wt = std::make_unique<CompressWorker>(std::move(compressor));
189         threads_.emplace_back(std::async(std::launch::async, &CompressWorker::RunThread, wt.get()));
190         compress_threads_.push_back(std::move(wt));
191     }
192 
193     LOG_INFO << num_compress_threads_ << " thread used for compression";
194 }
195 
Initialize(std::optional<uint64_t> label)196 bool CowWriterV2::Initialize(std::optional<uint64_t> label) {
197     if (!InitFd() || !ParseOptions()) {
198         return false;
199     }
200     if (!label) {
201         if (!OpenForWrite()) {
202             return false;
203         }
204     } else {
205         if (!OpenForAppend(*label)) {
206             return false;
207         }
208     }
209 
210     if (!compress_threads_.size()) {
211         InitWorkers();
212     }
213     return true;
214 }
215 
InitPos()216 void CowWriterV2::InitPos() {
217     next_op_pos_ = sizeof(CowHeader) + header_.buffer_size;
218     cluster_size_ = header_.cluster_ops * sizeof(CowOperationV2);
219     if (header_.cluster_ops) {
220         next_data_pos_ = next_op_pos_ + cluster_size_;
221     } else {
222         next_data_pos_ = next_op_pos_ + sizeof(CowOperationV2);
223     }
224     current_cluster_size_ = 0;
225     current_data_size_ = 0;
226 }
227 
OpenForWrite()228 bool CowWriterV2::OpenForWrite() {
229     // This limitation is tied to the data field size in CowOperationV2.
230     if (header_.block_size > std::numeric_limits<uint16_t>::max()) {
231         LOG(ERROR) << "Block size is too large";
232         return false;
233     }
234 
235     if (lseek(fd_.get(), 0, SEEK_SET) < 0) {
236         PLOG(ERROR) << "lseek failed";
237         return false;
238     }
239 
240     if (options_.scratch_space) {
241         header_.buffer_size = BUFFER_REGION_DEFAULT_SIZE;
242     }
243 
244     // Headers are not complete, but this ensures the file is at the right
245     // position.
246     if (!android::base::WriteFully(fd_, &header_, sizeof(CowHeader))) {
247         PLOG(ERROR) << "write failed";
248         return false;
249     }
250 
251     if (options_.scratch_space) {
252         // Initialize the scratch space
253         std::string data(header_.buffer_size, 0);
254         if (!android::base::WriteFully(fd_, data.data(), header_.buffer_size)) {
255             PLOG(ERROR) << "writing scratch space failed";
256             return false;
257         }
258     }
259 
260     if (!Sync()) {
261         LOG(ERROR) << "Header sync failed";
262         return false;
263     }
264 
265     InitPos();
266     InitBatchWrites();
267 
268     return true;
269 }
270 
OpenForAppend(uint64_t label)271 bool CowWriterV2::OpenForAppend(uint64_t label) {
272     CowHeaderV3 header_v3;
273     if (!ReadCowHeader(fd_, &header_v3)) {
274         return false;
275     }
276 
277     header_ = header_v3;
278 
279     CowParserV2 parser;
280     if (!parser.Parse(fd_, header_v3, {label})) {
281         return false;
282     }
283     if (header_.prefix.major_version > 2) {
284         LOG(ERROR) << "CowWriterV2 tried to open incompatible version "
285                    << header_.prefix.major_version;
286         return false;
287     }
288 
289     options_.block_size = header_.block_size;
290     options_.cluster_ops = header_.cluster_ops;
291 
292     // Reset this, since we're going to reimport all operations.
293     footer_.op.num_ops = 0;
294     InitPos();
295 
296     for (const auto& op : *parser.get_v2ops()) {
297         AddOperation(op);
298     }
299 
300     if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
301         PLOG(ERROR) << "lseek failed";
302         return false;
303     }
304 
305     InitBatchWrites();
306 
307     return EmitClusterIfNeeded();
308 }
309 
EmitCopy(uint64_t new_block,uint64_t old_block,uint64_t num_blocks)310 bool CowWriterV2::EmitCopy(uint64_t new_block, uint64_t old_block, uint64_t num_blocks) {
311     CHECK(!merge_in_progress_);
312 
313     for (size_t i = 0; i < num_blocks; i++) {
314         CowOperationV2 op = {};
315         op.type = kCowCopyOp;
316         op.new_block = new_block + i;
317         op.source = old_block + i;
318         if (!WriteOperation(op)) {
319             return false;
320         }
321     }
322 
323     return true;
324 }
325 
EmitRawBlocks(uint64_t new_block_start,const void * data,size_t size)326 bool CowWriterV2::EmitRawBlocks(uint64_t new_block_start, const void* data, size_t size) {
327     return EmitBlocks(new_block_start, data, size, 0, 0, kCowReplaceOp);
328 }
329 
EmitXorBlocks(uint32_t new_block_start,const void * data,size_t size,uint32_t old_block,uint16_t offset)330 bool CowWriterV2::EmitXorBlocks(uint32_t new_block_start, const void* data, size_t size,
331                                 uint32_t old_block, uint16_t offset) {
332     return EmitBlocks(new_block_start, data, size, old_block, offset, kCowXorOp);
333 }
334 
CompressBlocks(size_t num_blocks,const void * data)335 bool CowWriterV2::CompressBlocks(size_t num_blocks, const void* data) {
336     size_t num_threads = (num_blocks == 1) ? 1 : num_compress_threads_;
337     size_t num_blocks_per_thread = num_blocks / num_threads;
338     const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
339     compressed_buf_.clear();
340     if (num_threads <= 1) {
341         if (!compressor_) {
342             compressor_ = ICompressor::Create(compression_, header_.block_size);
343         }
344         return CompressWorker::CompressBlocks(compressor_.get(), options_.block_size, data,
345                                               num_blocks, &compressed_buf_);
346     }
347     // Submit the blocks per thread. The retrieval of
348     // compressed buffers has to be done in the same order.
349     // We should not poll for completed buffers in a different order as the
350     // buffers are tightly coupled with block ordering.
351     for (size_t i = 0; i < num_threads; i++) {
352         CompressWorker* worker = compress_threads_[i].get();
353         if (i == num_threads - 1) {
354             num_blocks_per_thread = num_blocks;
355         }
356         worker->EnqueueCompressBlocks(iter, header_.block_size, num_blocks_per_thread);
357         iter += (num_blocks_per_thread * header_.block_size);
358         num_blocks -= num_blocks_per_thread;
359     }
360 
361     for (size_t i = 0; i < num_threads; i++) {
362         CompressWorker* worker = compress_threads_[i].get();
363         if (!worker->GetCompressedBuffers(&compressed_buf_)) {
364             return false;
365         }
366     }
367 
368     return true;
369 }
370 
EmitBlocks(uint64_t new_block_start,const void * data,size_t size,uint64_t old_block,uint16_t offset,CowOperationType type)371 bool CowWriterV2::EmitBlocks(uint64_t new_block_start, const void* data, size_t size,
372                              uint64_t old_block, uint16_t offset, CowOperationType type) {
373     CHECK(!merge_in_progress_);
374     const uint8_t* iter = reinterpret_cast<const uint8_t*>(data);
375 
376     // Update engine can potentially send 100MB of blocks at a time. We
377     // don't want to process all those blocks in one shot as it can
378     // stress the memory. Hence, process the blocks in chunks.
379     //
380     // 1024 blocks is reasonable given we will end up using max
381     // memory of ~4MB.
382     const size_t kProcessingBlocks = 1024;
383     size_t num_blocks = (size / header_.block_size);
384     size_t i = 0;
385 
386     while (num_blocks) {
387         size_t pending_blocks = (std::min(kProcessingBlocks, num_blocks));
388 
389         if (compression_.algorithm && num_compress_threads_ > 1) {
390             if (!CompressBlocks(pending_blocks, iter)) {
391                 return false;
392             }
393             buf_iter_ = compressed_buf_.begin();
394             CHECK(pending_blocks == compressed_buf_.size());
395         }
396 
397         num_blocks -= pending_blocks;
398 
399         while (i < size / header_.block_size && pending_blocks) {
400             CowOperationV2 op = {};
401             op.new_block = new_block_start + i;
402             op.type = type;
403             if (type == kCowXorOp) {
404                 op.source = (old_block + i) * header_.block_size + offset;
405             } else {
406                 op.source = next_data_pos_;
407             }
408 
409             if (compression_.algorithm) {
410                 auto data = [&, this]() {
411                     if (num_compress_threads_ > 1) {
412                         auto data = std::move(*buf_iter_);
413                         buf_iter_++;
414                         return data;
415                     } else {
416                         if (!compressor_) {
417                             compressor_ = ICompressor::Create(compression_, header_.block_size);
418                         }
419 
420                         auto data = compressor_->Compress(iter, header_.block_size);
421                         return data;
422                     }
423                 }();
424                 op.compression = compression_.algorithm;
425                 op.data_length = static_cast<uint16_t>(data.size());
426 
427                 if (!WriteOperation(op, data.data(), data.size())) {
428                     PLOG(ERROR) << "AddRawBlocks: write failed";
429                     return false;
430                 }
431             } else {
432                 op.data_length = static_cast<uint16_t>(header_.block_size);
433                 if (!WriteOperation(op, iter, header_.block_size)) {
434                     PLOG(ERROR) << "AddRawBlocks: write failed";
435                     return false;
436                 }
437             }
438             iter += header_.block_size;
439 
440             i += 1;
441             pending_blocks -= 1;
442         }
443 
444         CHECK(pending_blocks == 0);
445     }
446     return true;
447 }
448 
EmitZeroBlocks(uint64_t new_block_start,uint64_t num_blocks)449 bool CowWriterV2::EmitZeroBlocks(uint64_t new_block_start, uint64_t num_blocks) {
450     CHECK(!merge_in_progress_);
451     for (uint64_t i = 0; i < num_blocks; i++) {
452         CowOperationV2 op = {};
453         op.type = kCowZeroOp;
454         op.new_block = new_block_start + i;
455         op.source = 0;
456         WriteOperation(op);
457     }
458     return true;
459 }
460 
EmitLabel(uint64_t label)461 bool CowWriterV2::EmitLabel(uint64_t label) {
462     CHECK(!merge_in_progress_);
463     CowOperationV2 op = {};
464     op.type = kCowLabelOp;
465     op.source = label;
466     return WriteOperation(op) && Sync();
467 }
468 
EmitSequenceData(size_t num_ops,const uint32_t * data)469 bool CowWriterV2::EmitSequenceData(size_t num_ops, const uint32_t* data) {
470     CHECK(!merge_in_progress_);
471     size_t to_add = 0;
472     size_t max_ops = (header_.block_size * 2) / sizeof(uint32_t);
473     while (num_ops > 0) {
474         CowOperationV2 op = {};
475         op.type = kCowSequenceOp;
476         op.source = next_data_pos_;
477         to_add = std::min(num_ops, max_ops);
478         op.data_length = static_cast<uint16_t>(to_add * sizeof(uint32_t));
479         if (!WriteOperation(op, data, op.data_length)) {
480             PLOG(ERROR) << "AddSequenceData: write failed";
481             return false;
482         }
483         num_ops -= to_add;
484         data += to_add;
485     }
486     return true;
487 }
488 
EmitCluster()489 bool CowWriterV2::EmitCluster() {
490     CowOperationV2 op = {};
491     op.type = kCowClusterOp;
492     // Next cluster starts after remainder of current cluster and the next data block.
493     op.source = current_data_size_ + cluster_size_ - current_cluster_size_ - sizeof(CowOperationV2);
494     return WriteOperation(op);
495 }
496 
EmitClusterIfNeeded()497 bool CowWriterV2::EmitClusterIfNeeded() {
498     // If there isn't room for another op and the cluster end op, end the current cluster
499     if (cluster_size_ && cluster_size_ < current_cluster_size_ + 2 * sizeof(CowOperationV2)) {
500         if (!EmitCluster()) return false;
501     }
502     return true;
503 }
504 
Finalize()505 bool CowWriterV2::Finalize() {
506     if (!FlushCluster()) {
507         LOG(ERROR) << "Finalize: FlushCluster() failed";
508         return false;
509     }
510 
511     auto continue_cluster_size = current_cluster_size_;
512     auto continue_data_size = current_data_size_;
513     auto continue_data_pos = next_data_pos_;
514     auto continue_op_pos = next_op_pos_;
515     auto continue_num_ops = footer_.op.num_ops;
516     bool extra_cluster = false;
517 
518     // Blank out extra ops, in case we're in append mode and dropped ops.
519     if (cluster_size_) {
520         auto unused_cluster_space = cluster_size_ - current_cluster_size_;
521         std::string clr;
522         clr.resize(unused_cluster_space, '\0');
523         if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
524             PLOG(ERROR) << "Failed to seek to footer position.";
525             return false;
526         }
527         if (!android::base::WriteFully(fd_, clr.data(), clr.size())) {
528             PLOG(ERROR) << "clearing unused cluster area failed";
529             return false;
530         }
531     }
532 
533     // Footer should be at the end of a file, so if there is data after the current block, end
534     // it and start a new cluster.
535     if (cluster_size_ && current_data_size_ > 0) {
536         EmitCluster();
537         extra_cluster = true;
538     }
539 
540     footer_.op.ops_size = footer_.op.num_ops * sizeof(CowOperationV2);
541     if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
542         PLOG(ERROR) << "Failed to seek to footer position.";
543         return false;
544     }
545     memset(&footer_.unused, 0, sizeof(footer_.unused));
546 
547     // Write out footer at end of file
548     if (!android::base::WriteFully(fd_, reinterpret_cast<const uint8_t*>(&footer_),
549                                    sizeof(footer_))) {
550         PLOG(ERROR) << "write footer failed";
551         return false;
552     }
553 
554     // Remove excess data, if we're in append mode and threw away more data
555     // than we wrote before.
556     off_t offs = lseek(fd_.get(), 0, SEEK_CUR);
557     if (offs < 0) {
558         PLOG(ERROR) << "Failed to lseek to find current position";
559         return false;
560     }
561     if (!Truncate(offs)) {
562         return false;
563     }
564 
565     // Reposition for additional Writing
566     if (extra_cluster) {
567         current_cluster_size_ = continue_cluster_size;
568         current_data_size_ = continue_data_size;
569         next_data_pos_ = continue_data_pos;
570         next_op_pos_ = continue_op_pos;
571         footer_.op.num_ops = continue_num_ops;
572     }
573 
574     FlushCluster();
575 
576     return Sync();
577 }
578 
GetCowSizeInfo() const579 CowSizeInfo CowWriterV2::GetCowSizeInfo() const {
580     CowSizeInfo info;
581     if (current_data_size_ > 0) {
582         info.cow_size = next_data_pos_ + sizeof(footer_);
583     } else {
584         info.cow_size = next_op_pos_ + sizeof(footer_);
585     }
586     return info;
587 }
588 
GetDataPos(uint64_t * pos)589 bool CowWriterV2::GetDataPos(uint64_t* pos) {
590     off_t offs = lseek(fd_.get(), 0, SEEK_CUR);
591     if (offs < 0) {
592         PLOG(ERROR) << "lseek failed";
593         return false;
594     }
595     *pos = offs;
596     return true;
597 }
598 
EnsureSpaceAvailable(const uint64_t bytes_needed) const599 bool CowWriterV2::EnsureSpaceAvailable(const uint64_t bytes_needed) const {
600     if (bytes_needed > cow_image_size_) {
601         LOG(ERROR) << "No space left on COW device. Required: " << bytes_needed
602                    << ", available: " << cow_image_size_;
603         errno = ENOSPC;
604         return false;
605     }
606     return true;
607 }
608 
FlushCluster()609 bool CowWriterV2::FlushCluster() {
610     ssize_t ret;
611 
612     if (op_vec_index_) {
613         ret = pwritev(fd_.get(), cowop_vec_.get(), op_vec_index_, current_op_pos_);
614         if (ret != (op_vec_index_ * sizeof(CowOperationV2))) {
615             PLOG(ERROR) << "pwritev failed for CowOperationV2. Expected: "
616                         << (op_vec_index_ * sizeof(CowOperationV2));
617             return false;
618         }
619     }
620 
621     if (data_vec_index_) {
622         ret = pwritev(fd_.get(), data_vec_.get(), data_vec_index_, current_data_pos_);
623         if (ret != total_data_written_) {
624             PLOG(ERROR) << "pwritev failed for data. Expected: " << total_data_written_;
625             return false;
626         }
627     }
628 
629     total_data_written_ = 0;
630     op_vec_index_ = 0;
631     data_vec_index_ = 0;
632     current_op_pos_ = next_op_pos_;
633     current_data_pos_ = next_data_pos_;
634 
635     return true;
636 }
637 
WriteOperation(const CowOperationV2 & op,const void * data,size_t size)638 bool CowWriterV2::WriteOperation(const CowOperationV2& op, const void* data, size_t size) {
639     if (!EnsureSpaceAvailable(next_op_pos_ + sizeof(op)) ||
640         !EnsureSpaceAvailable(next_data_pos_ + size)) {
641         return false;
642     }
643 
644     if (batch_write_) {
645         CowOperationV2* cow_op =
646                 reinterpret_cast<CowOperationV2*>(cowop_vec_[op_vec_index_].iov_base);
647         std::memcpy(cow_op, &op, sizeof(CowOperationV2));
648         op_vec_index_ += 1;
649 
650         if (data != nullptr && size > 0) {
651             struct iovec* data_ptr = data_vec_.get();
652             std::memcpy(data_ptr[data_vec_index_].iov_base, data, size);
653             data_ptr[data_vec_index_].iov_len = size;
654             data_vec_index_ += 1;
655             total_data_written_ += size;
656         }
657     } else {
658         if (lseek(fd_.get(), next_op_pos_, SEEK_SET) < 0) {
659             PLOG(ERROR) << "lseek failed for writing operation.";
660             return false;
661         }
662         if (!android::base::WriteFully(fd_, reinterpret_cast<const uint8_t*>(&op), sizeof(op))) {
663             return false;
664         }
665         if (data != nullptr && size > 0) {
666             if (!WriteRawData(data, size)) return false;
667         }
668     }
669 
670     AddOperation(op);
671 
672     if (batch_write_) {
673         if (op_vec_index_ == header_.cluster_ops || data_vec_index_ == header_.cluster_ops ||
674             op.type == kCowLabelOp || op.type == kCowClusterOp) {
675             if (!FlushCluster()) {
676                 LOG(ERROR) << "Failed to flush cluster data";
677                 return false;
678             }
679         }
680     }
681 
682     return EmitClusterIfNeeded();
683 }
684 
AddOperation(const CowOperationV2 & op)685 void CowWriterV2::AddOperation(const CowOperationV2& op) {
686     footer_.op.num_ops++;
687 
688     if (op.type == kCowClusterOp) {
689         current_cluster_size_ = 0;
690         current_data_size_ = 0;
691     } else if (header_.cluster_ops) {
692         current_cluster_size_ += sizeof(op);
693         current_data_size_ += op.data_length;
694     }
695 
696     next_data_pos_ += op.data_length + GetNextDataOffset(op, header_.cluster_ops);
697     next_op_pos_ += sizeof(CowOperationV2) + GetNextOpOffset(op, header_.cluster_ops);
698 }
699 
WriteRawData(const void * data,const size_t size)700 bool CowWriterV2::WriteRawData(const void* data, const size_t size) {
701     if (!android::base::WriteFullyAtOffset(fd_, data, size, next_data_pos_)) {
702         return false;
703     }
704     return true;
705 }
706 
Truncate(off_t length)707 bool CowWriterV2::Truncate(off_t length) {
708     if (is_dev_null_ || is_block_device_) {
709         return true;
710     }
711     if (ftruncate(fd_.get(), length) < 0) {
712         PLOG(ERROR) << "Failed to truncate.";
713         return false;
714     }
715     return true;
716 }
717 
718 }  // namespace snapshot
719 }  // namespace android
720