1 /* 2 * Copyright (C) 2018 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #define LOG_TAG "BufferPoolAccessor1.0" 18 //#define LOG_NDEBUG 0 19 20 #include <sys/types.h> 21 #include <stdint.h> 22 #include <time.h> 23 #include <unistd.h> 24 #include <utils/Log.h> 25 #include "AccessorImpl.h" 26 #include "Connection.h" 27 28 namespace android { 29 namespace hardware { 30 namespace media { 31 namespace bufferpool { 32 namespace V1_0 { 33 namespace implementation { 34 35 namespace { 36 static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec 37 static constexpr int64_t kLogDurationUs = 5000000; // 5 secs 38 39 static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15; 40 static constexpr size_t kMinBufferCountForEviction = 40; 41 } 42 43 // Buffer structure in bufferpool process 44 struct InternalBuffer { 45 BufferId mId; 46 size_t mOwnerCount; 47 size_t mTransactionCount; 48 const std::shared_ptr<BufferPoolAllocation> mAllocation; 49 const size_t mAllocSize; 50 const std::vector<uint8_t> mConfig; 51 InternalBufferandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer52 InternalBuffer( 53 BufferId id, 54 const std::shared_ptr<BufferPoolAllocation> &alloc, 55 const size_t allocSize, 56 const std::vector<uint8_t> &allocConfig) 57 : mId(id), mOwnerCount(0), mTransactionCount(0), 58 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {} 59 handleandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer60 const native_handle_t *handle() { 61 return mAllocation->handle(); 62 } 63 }; 64 65 struct TransactionStatus { 66 TransactionId mId; 67 BufferId mBufferId; 68 ConnectionId mSender; 69 ConnectionId mReceiver; 70 BufferStatus mStatus; 71 int64_t mTimestampUs; 72 bool mSenderValidated; 73 TransactionStatusandroid::hardware::media::bufferpool::V1_0::implementation::TransactionStatus74 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) { 75 mId = message.transactionId; 76 mBufferId = message.bufferId; 77 mStatus = message.newStatus; 78 mTimestampUs = timestampUs; 79 if (mStatus == BufferStatus::TRANSFER_TO) { 80 mSender = message.connectionId; 81 mReceiver = message.targetConnectionId; 82 mSenderValidated = true; 83 } else { 84 mSender = -1LL; 85 mReceiver = message.connectionId; 86 mSenderValidated = false; 87 } 88 } 89 }; 90 91 // Helper template methods for handling map of set. 92 template<class T, class U> insert(std::map<T,std::set<U>> * mapOfSet,T key,U value)93 bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) { 94 auto iter = mapOfSet->find(key); 95 if (iter == mapOfSet->end()) { 96 std::set<U> valueSet{value}; 97 mapOfSet->insert(std::make_pair(key, valueSet)); 98 return true; 99 } else if (iter->second.find(value) == iter->second.end()) { 100 iter->second.insert(value); 101 return true; 102 } 103 return false; 104 } 105 106 template<class T, class U> erase(std::map<T,std::set<U>> * mapOfSet,T key,U value)107 bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) { 108 bool ret = false; 109 auto iter = mapOfSet->find(key); 110 if (iter != mapOfSet->end()) { 111 if (iter->second.erase(value) > 0) { 112 ret = true; 113 } 114 if (iter->second.size() == 0) { 115 mapOfSet->erase(iter); 116 } 117 } 118 return ret; 119 } 120 121 template<class T, class U> contains(std::map<T,std::set<U>> * mapOfSet,T key,U value)122 bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) { 123 auto iter = mapOfSet->find(key); 124 if (iter != mapOfSet->end()) { 125 auto setIter = iter->second.find(value); 126 return setIter != iter->second.end(); 127 } 128 return false; 129 } 130 131 uint32_t Accessor::Impl::sSeqId = time(nullptr); 132 Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)133 Accessor::Impl::Impl( 134 const std::shared_ptr<BufferPoolAllocator> &allocator) 135 : mAllocator(allocator) {} 136 ~Impl()137 Accessor::Impl::~Impl() { 138 } 139 connect(const sp<Accessor> & accessor,sp<Connection> * connection,ConnectionId * pConnectionId,const QueueDescriptor ** fmqDescPtr)140 ResultStatus Accessor::Impl::connect( 141 const sp<Accessor> &accessor, sp<Connection> *connection, 142 ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) { 143 sp<Connection> newConnection = new Connection(); 144 ResultStatus status = ResultStatus::CRITICAL_ERROR; 145 { 146 std::lock_guard<std::mutex> lock(mBufferPool.mMutex); 147 if (newConnection) { 148 int32_t pid = getpid(); 149 ConnectionId id = (int64_t)pid << 32 | sSeqId; 150 status = mBufferPool.mObserver.open(id, fmqDescPtr); 151 if (status == ResultStatus::OK) { 152 newConnection->initialize(accessor, id); 153 *connection = newConnection; 154 *pConnectionId = id; 155 mBufferPool.mConnectionIds.insert(id); 156 if (sSeqId == UINT32_MAX) { 157 sSeqId = 0; 158 } else { 159 ++sSeqId; 160 } 161 } 162 } 163 mBufferPool.processStatusMessages(); 164 mBufferPool.cleanUp(); 165 } 166 return status; 167 } 168 close(ConnectionId connectionId)169 ResultStatus Accessor::Impl::close(ConnectionId connectionId) { 170 std::lock_guard<std::mutex> lock(mBufferPool.mMutex); 171 mBufferPool.processStatusMessages(); 172 mBufferPool.handleClose(connectionId); 173 mBufferPool.mObserver.close(connectionId); 174 // Since close# will be called after all works are finished, it is OK to 175 // evict unused buffers. 176 mBufferPool.cleanUp(true); 177 return ResultStatus::OK; 178 } 179 allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)180 ResultStatus Accessor::Impl::allocate( 181 ConnectionId connectionId, const std::vector<uint8_t>& params, 182 BufferId *bufferId, const native_handle_t** handle) { 183 std::unique_lock<std::mutex> lock(mBufferPool.mMutex); 184 mBufferPool.processStatusMessages(); 185 ResultStatus status = ResultStatus::OK; 186 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) { 187 lock.unlock(); 188 std::shared_ptr<BufferPoolAllocation> alloc; 189 size_t allocSize; 190 status = mAllocator->allocate(params, &alloc, &allocSize); 191 lock.lock(); 192 if (status == ResultStatus::OK) { 193 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle); 194 } 195 ALOGV("create a buffer %d : %u %p", 196 status == ResultStatus::OK, *bufferId, *handle); 197 } 198 if (status == ResultStatus::OK) { 199 // TODO: handle ownBuffer failure 200 mBufferPool.handleOwnBuffer(connectionId, *bufferId); 201 } 202 mBufferPool.cleanUp(); 203 return status; 204 } 205 fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)206 ResultStatus Accessor::Impl::fetch( 207 ConnectionId connectionId, TransactionId transactionId, 208 BufferId bufferId, const native_handle_t** handle) { 209 std::lock_guard<std::mutex> lock(mBufferPool.mMutex); 210 mBufferPool.processStatusMessages(); 211 auto found = mBufferPool.mTransactions.find(transactionId); 212 if (found != mBufferPool.mTransactions.end() && 213 contains(&mBufferPool.mPendingTransactions, 214 connectionId, transactionId)) { 215 if (found->second->mSenderValidated && 216 found->second->mStatus == BufferStatus::TRANSFER_FROM && 217 found->second->mBufferId == bufferId) { 218 found->second->mStatus = BufferStatus::TRANSFER_FETCH; 219 auto bufferIt = mBufferPool.mBuffers.find(bufferId); 220 if (bufferIt != mBufferPool.mBuffers.end()) { 221 mBufferPool.mStats.onBufferFetched(); 222 *handle = bufferIt->second->handle(); 223 return ResultStatus::OK; 224 } 225 } 226 } 227 mBufferPool.cleanUp(); 228 return ResultStatus::CRITICAL_ERROR; 229 } 230 cleanUp(bool clearCache)231 void Accessor::Impl::cleanUp(bool clearCache) { 232 // transaction timeout, buffer cacheing TTL handling 233 std::lock_guard<std::mutex> lock(mBufferPool.mMutex); 234 mBufferPool.processStatusMessages(); 235 mBufferPool.cleanUp(clearCache); 236 } 237 BufferPool()238 Accessor::Impl::Impl::BufferPool::BufferPool() 239 : mTimestampUs(getTimestampNow()), 240 mLastCleanUpUs(mTimestampUs), 241 mLastLogUs(mTimestampUs), 242 mSeq(0) {} 243 244 245 // Statistics helper 246 template<typename T, typename S> percentage(T base,S total)247 int percentage(T base, S total) { 248 return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0); 249 } 250 ~BufferPool()251 Accessor::Impl::Impl::BufferPool::~BufferPool() { 252 std::lock_guard<std::mutex> lock(mMutex); 253 ALOGD("Destruction - bufferpool %p " 254 "cached: %zu/%zuM, %zu/%d%% in use; " 255 "allocs: %zu, %d%% recycled; " 256 "transfers: %zu, %d%% unfetched", 257 this, mStats.mBuffersCached, mStats.mSizeCached >> 20, 258 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached), 259 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations), 260 mStats.mTotalTransfers, 261 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers)); 262 } 263 handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)264 bool Accessor::Impl::BufferPool::handleOwnBuffer( 265 ConnectionId connectionId, BufferId bufferId) { 266 267 bool added = insert(&mUsingBuffers, connectionId, bufferId); 268 if (added) { 269 auto iter = mBuffers.find(bufferId); 270 iter->second->mOwnerCount++; 271 } 272 insert(&mUsingConnections, bufferId, connectionId); 273 return added; 274 } 275 handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)276 bool Accessor::Impl::BufferPool::handleReleaseBuffer( 277 ConnectionId connectionId, BufferId bufferId) { 278 bool deleted = erase(&mUsingBuffers, connectionId, bufferId); 279 if (deleted) { 280 auto iter = mBuffers.find(bufferId); 281 iter->second->mOwnerCount--; 282 if (iter->second->mOwnerCount == 0 && 283 iter->second->mTransactionCount == 0) { 284 mStats.onBufferUnused(iter->second->mAllocSize); 285 mFreeBuffers.insert(bufferId); 286 } 287 } 288 erase(&mUsingConnections, bufferId, connectionId); 289 ALOGV("release buffer %u : %d", bufferId, deleted); 290 return deleted; 291 } 292 handleTransferTo(const BufferStatusMessage & message)293 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) { 294 auto completed = mCompletedTransactions.find( 295 message.transactionId); 296 if (completed != mCompletedTransactions.end()) { 297 // already completed 298 mCompletedTransactions.erase(completed); 299 return true; 300 } 301 // the buffer should exist and be owned. 302 auto bufferIter = mBuffers.find(message.bufferId); 303 if (bufferIter == mBuffers.end() || 304 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) { 305 return false; 306 } 307 auto found = mTransactions.find(message.transactionId); 308 if (found != mTransactions.end()) { 309 // transfer_from was received earlier. 310 found->second->mSender = message.connectionId; 311 found->second->mSenderValidated = true; 312 return true; 313 } 314 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) { 315 // N.B: it could be fake or receive connection already closed. 316 ALOGD("bufferpool %p receiver connection %lld is no longer valid", 317 this, (long long)message.targetConnectionId); 318 return false; 319 } 320 mStats.onBufferSent(); 321 mTransactions.insert(std::make_pair( 322 message.transactionId, 323 std::make_unique<TransactionStatus>(message, mTimestampUs))); 324 insert(&mPendingTransactions, message.targetConnectionId, 325 message.transactionId); 326 bufferIter->second->mTransactionCount++; 327 return true; 328 } 329 handleTransferFrom(const BufferStatusMessage & message)330 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) { 331 auto found = mTransactions.find(message.transactionId); 332 if (found == mTransactions.end()) { 333 // TODO: is it feasible to check ownership here? 334 mStats.onBufferSent(); 335 mTransactions.insert(std::make_pair( 336 message.transactionId, 337 std::make_unique<TransactionStatus>(message, mTimestampUs))); 338 insert(&mPendingTransactions, message.connectionId, 339 message.transactionId); 340 auto bufferIter = mBuffers.find(message.bufferId); 341 bufferIter->second->mTransactionCount++; 342 } else { 343 if (message.connectionId == found->second->mReceiver) { 344 found->second->mStatus = BufferStatus::TRANSFER_FROM; 345 } 346 } 347 return true; 348 } 349 handleTransferResult(const BufferStatusMessage & message)350 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) { 351 auto found = mTransactions.find(message.transactionId); 352 if (found != mTransactions.end()) { 353 bool deleted = erase(&mPendingTransactions, message.connectionId, 354 message.transactionId); 355 if (deleted) { 356 if (!found->second->mSenderValidated) { 357 mCompletedTransactions.insert(message.transactionId); 358 } 359 auto bufferIter = mBuffers.find(message.bufferId); 360 if (message.newStatus == BufferStatus::TRANSFER_OK) { 361 handleOwnBuffer(message.connectionId, message.bufferId); 362 } 363 bufferIter->second->mTransactionCount--; 364 if (bufferIter->second->mOwnerCount == 0 365 && bufferIter->second->mTransactionCount == 0) { 366 mStats.onBufferUnused(bufferIter->second->mAllocSize); 367 mFreeBuffers.insert(message.bufferId); 368 } 369 mTransactions.erase(found); 370 } 371 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId, 372 message.bufferId, deleted); 373 return deleted; 374 } 375 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId, 376 message.bufferId); 377 return false; 378 } 379 processStatusMessages()380 void Accessor::Impl::BufferPool::processStatusMessages() { 381 std::vector<BufferStatusMessage> messages; 382 mObserver.getBufferStatusChanges(messages); 383 mTimestampUs = getTimestampNow(); 384 for (BufferStatusMessage& message: messages) { 385 bool ret = false; 386 switch (message.newStatus) { 387 case BufferStatus::NOT_USED: 388 ret = handleReleaseBuffer( 389 message.connectionId, message.bufferId); 390 break; 391 case BufferStatus::USED: 392 // not happening 393 break; 394 case BufferStatus::TRANSFER_TO: 395 ret = handleTransferTo(message); 396 break; 397 case BufferStatus::TRANSFER_FROM: 398 ret = handleTransferFrom(message); 399 break; 400 case BufferStatus::TRANSFER_TIMEOUT: 401 // TODO 402 break; 403 case BufferStatus::TRANSFER_LOST: 404 // TODO 405 break; 406 case BufferStatus::TRANSFER_FETCH: 407 // not happening 408 break; 409 case BufferStatus::TRANSFER_OK: 410 case BufferStatus::TRANSFER_ERROR: 411 ret = handleTransferResult(message); 412 break; 413 } 414 if (ret == false) { 415 ALOGW("buffer status message processing failure - message : %d connection : %lld", 416 message.newStatus, (long long)message.connectionId); 417 } 418 } 419 messages.clear(); 420 } 421 handleClose(ConnectionId connectionId)422 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) { 423 // Cleaning buffers 424 auto buffers = mUsingBuffers.find(connectionId); 425 if (buffers != mUsingBuffers.end()) { 426 for (const BufferId& bufferId : buffers->second) { 427 bool deleted = erase(&mUsingConnections, bufferId, connectionId); 428 if (deleted) { 429 auto bufferIter = mBuffers.find(bufferId); 430 bufferIter->second->mOwnerCount--; 431 if (bufferIter->second->mOwnerCount == 0 && 432 bufferIter->second->mTransactionCount == 0) { 433 // TODO: handle freebuffer insert fail 434 mStats.onBufferUnused(bufferIter->second->mAllocSize); 435 mFreeBuffers.insert(bufferId); 436 } 437 } 438 } 439 mUsingBuffers.erase(buffers); 440 } 441 442 // Cleaning transactions 443 auto pending = mPendingTransactions.find(connectionId); 444 if (pending != mPendingTransactions.end()) { 445 for (const TransactionId& transactionId : pending->second) { 446 auto iter = mTransactions.find(transactionId); 447 if (iter != mTransactions.end()) { 448 if (!iter->second->mSenderValidated) { 449 mCompletedTransactions.insert(transactionId); 450 } 451 BufferId bufferId = iter->second->mBufferId; 452 auto bufferIter = mBuffers.find(bufferId); 453 bufferIter->second->mTransactionCount--; 454 if (bufferIter->second->mOwnerCount == 0 && 455 bufferIter->second->mTransactionCount == 0) { 456 // TODO: handle freebuffer insert fail 457 mStats.onBufferUnused(bufferIter->second->mAllocSize); 458 mFreeBuffers.insert(bufferId); 459 } 460 mTransactions.erase(iter); 461 } 462 } 463 } 464 mConnectionIds.erase(connectionId); 465 return true; 466 } 467 getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)468 bool Accessor::Impl::BufferPool::getFreeBuffer( 469 const std::shared_ptr<BufferPoolAllocator> &allocator, 470 const std::vector<uint8_t> ¶ms, BufferId *pId, 471 const native_handle_t** handle) { 472 auto bufferIt = mFreeBuffers.begin(); 473 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) { 474 BufferId bufferId = *bufferIt; 475 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) { 476 break; 477 } 478 } 479 if (bufferIt != mFreeBuffers.end()) { 480 BufferId id = *bufferIt; 481 mFreeBuffers.erase(bufferIt); 482 mStats.onBufferRecycled(mBuffers[id]->mAllocSize); 483 *handle = mBuffers[id]->handle(); 484 *pId = id; 485 ALOGV("recycle a buffer %u %p", id, *handle); 486 return true; 487 } 488 return false; 489 } 490 addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)491 ResultStatus Accessor::Impl::BufferPool::addNewBuffer( 492 const std::shared_ptr<BufferPoolAllocation> &alloc, 493 const size_t allocSize, 494 const std::vector<uint8_t> ¶ms, 495 BufferId *pId, 496 const native_handle_t** handle) { 497 498 BufferId bufferId = mSeq++; 499 if (mSeq == Connection::SYNC_BUFFERID) { 500 mSeq = 0; 501 } 502 std::unique_ptr<InternalBuffer> buffer = 503 std::make_unique<InternalBuffer>( 504 bufferId, alloc, allocSize, params); 505 if (buffer) { 506 auto res = mBuffers.insert(std::make_pair( 507 bufferId, std::move(buffer))); 508 if (res.second) { 509 mStats.onBufferAllocated(allocSize); 510 *handle = alloc->handle(); 511 *pId = bufferId; 512 return ResultStatus::OK; 513 } 514 } 515 return ResultStatus::NO_MEMORY; 516 } 517 cleanUp(bool clearCache)518 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) { 519 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) { 520 mLastCleanUpUs = mTimestampUs; 521 if (mTimestampUs > mLastLogUs + kLogDurationUs) { 522 mLastLogUs = mTimestampUs; 523 ALOGD("bufferpool %p : %zu(%zu size) total buffers - " 524 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - " 525 "%zu/%zu (fetch/transfer)", 526 this, mStats.mBuffersCached, mStats.mSizeCached, 527 mStats.mBuffersInUse, mStats.mSizeInUse, 528 mStats.mTotalRecycles, mStats.mTotalAllocations, 529 mStats.mTotalFetches, mStats.mTotalTransfers); 530 } 531 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { 532 if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction 533 && mBuffers.size() < kMinBufferCountForEviction) { 534 break; 535 } 536 auto it = mBuffers.find(*freeIt); 537 if (it != mBuffers.end() && 538 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) { 539 mStats.onBufferEvicted(it->second->mAllocSize); 540 mBuffers.erase(it); 541 freeIt = mFreeBuffers.erase(freeIt); 542 } else { 543 ++freeIt; 544 ALOGW("bufferpool inconsistent!"); 545 } 546 } 547 } 548 } 549 550 } // namespace implementation 551 } // namespace V1_0 552 } // namespace bufferpool 553 } // namespace media 554 } // namespace hardware 555 } // namespace android 556