/* * Copyright (C) 2022 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "AidlBufferPoolAcc" //#define LOG_NDEBUG 0 #include #include #include #include #include #include #include #include "Accessor.h" #include "Connection.h" #include "DataHelper.h" namespace aidl::android::hardware::media::bufferpool2::implementation { namespace { static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs } #ifdef __ANDROID_VNDK__ static constexpr uint32_t kSeqIdVndkBit = 1U << 31; #else static constexpr uint32_t kSeqIdVndkBit = 0; #endif static constexpr uint32_t kSeqIdMax = 0x7fffffff; Accessor::ConnectionIdGenerator::ConnectionIdGenerator() { mSeqId = static_cast(time(nullptr) & kSeqIdMax); mPid = static_cast(getpid()); } ConnectionId Accessor::ConnectionIdGenerator::getConnectionId() { uint32_t seq; { std::lock_guard l(mLock); seq = mSeqId; if (mSeqId == kSeqIdMax) { mSeqId = 0; } else { ++mSeqId; } } return (int64_t)mPid << 32 | seq | kSeqIdVndkBit; } namespace { // anonymous namespace static std::shared_ptr sConnectionDeathRecipient = std::make_shared(); void serviceDied(void *cookie) { if (sConnectionDeathRecipient) { sConnectionDeathRecipient->onDead(cookie); } } } std::shared_ptr Accessor::getConnectionDeathRecipient() { return sConnectionDeathRecipient; } ConnectionDeathRecipient::ConnectionDeathRecipient() { mDeathRecipient = ndk::ScopedAIBinder_DeathRecipient( AIBinder_DeathRecipient_new(serviceDied)); } void ConnectionDeathRecipient::add( int64_t connectionId, const std::shared_ptr &accessor) { std::lock_guard lock(mLock); if (mAccessors.find(connectionId) == mAccessors.end()) { mAccessors.insert(std::make_pair(connectionId, accessor)); } } void ConnectionDeathRecipient::remove(int64_t connectionId) { std::lock_guard lock(mLock); mAccessors.erase(connectionId); auto it = mConnectionToCookie.find(connectionId); if (it != mConnectionToCookie.end()) { void * cookie = it->second; mConnectionToCookie.erase(it); auto cit = mCookieToConnections.find(cookie); if (cit != mCookieToConnections.end()) { cit->second.erase(connectionId); if (cit->second.size() == 0) { mCookieToConnections.erase(cit); } } } } void ConnectionDeathRecipient::addCookieToConnection( void *cookie, int64_t connectionId) { std::lock_guard lock(mLock); if (mAccessors.find(connectionId) == mAccessors.end()) { return; } mConnectionToCookie.insert(std::make_pair(connectionId, cookie)); auto it = mCookieToConnections.find(cookie); if (it != mCookieToConnections.end()) { it->second.insert(connectionId); } else { mCookieToConnections.insert(std::make_pair( cookie, std::set{connectionId})); } } void ConnectionDeathRecipient::onDead(void *cookie) { std::map> connectionsToClose; { std::lock_guard lock(mLock); auto it = mCookieToConnections.find(cookie); if (it != mCookieToConnections.end()) { for (auto conIt = it->second.begin(); conIt != it->second.end(); ++conIt) { auto accessorIt = mAccessors.find(*conIt); if (accessorIt != mAccessors.end()) { connectionsToClose.insert(std::make_pair(*conIt, accessorIt->second)); mAccessors.erase(accessorIt); } mConnectionToCookie.erase(*conIt); } mCookieToConnections.erase(it); } } if (connectionsToClose.size() > 0) { std::shared_ptr accessor; for (auto it = connectionsToClose.begin(); it != connectionsToClose.end(); ++it) { accessor = it->second.lock(); if (accessor) { accessor->close(it->first); ALOGD("connection %lld closed on death", (long long)it->first); } } } } AIBinder_DeathRecipient *ConnectionDeathRecipient::getRecipient() { return mDeathRecipient.get(); } ::ndk::ScopedAStatus Accessor::connect(const std::shared_ptr<::aidl::android::hardware::media::bufferpool2::IObserver>& in_observer, ::aidl::android::hardware::media::bufferpool2::IAccessor::ConnectionInfo* _aidl_return) { std::shared_ptr connection; ConnectionId connectionId; uint32_t msgId; StatusDescriptor statusDesc; InvalidationDescriptor invDesc; BufferPoolStatus status = connect( in_observer, false, &connection, &connectionId, &msgId, &statusDesc, &invDesc); if (status == ResultStatus::OK) { _aidl_return->connection = connection; _aidl_return->connectionId = connectionId; _aidl_return->msgId = msgId; _aidl_return->toFmqDesc = std::move(statusDesc); _aidl_return->fromFmqDesc = std::move(invDesc); return ::ndk::ScopedAStatus::ok(); } return ::ndk::ScopedAStatus::fromServiceSpecificError(status); } Accessor::Accessor(const std::shared_ptr &allocator) : mAllocator(allocator), mScheduleEvictTs(0) {} Accessor::~Accessor() { } bool Accessor::isValid() { return mBufferPool.isValid(); } BufferPoolStatus Accessor::flush() { std::lock_guard lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.flush(ref()); return ResultStatus::OK; } BufferPoolStatus Accessor::allocate( ConnectionId connectionId, const std::vector ¶ms, BufferId *bufferId, const native_handle_t** handle) { std::unique_lock lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); BufferPoolStatus status = ResultStatus::OK; if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) { lock.unlock(); std::shared_ptr alloc; size_t allocSize; status = mAllocator->allocate(params, &alloc, &allocSize); lock.lock(); if (status == ResultStatus::OK) { status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle); } ALOGV("create a buffer %d : %u %p", status == ResultStatus::OK, *bufferId, *handle); } if (status == ResultStatus::OK) { // TODO: handle ownBuffer failure mBufferPool.handleOwnBuffer(connectionId, *bufferId); } mBufferPool.cleanUp(); scheduleEvictIfNeeded(); return status; } BufferPoolStatus Accessor::fetch( ConnectionId connectionId, TransactionId transactionId, BufferId bufferId, const native_handle_t** handle) { std::lock_guard lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); auto found = mBufferPool.mTransactions.find(transactionId); if (found != mBufferPool.mTransactions.end() && contains(&mBufferPool.mPendingTransactions, connectionId, transactionId)) { if (found->second->mSenderValidated && found->second->mStatus == BufferStatus::TRANSFER_FROM && found->second->mBufferId == bufferId) { found->second->mStatus = BufferStatus::TRANSFER_FETCH; auto bufferIt = mBufferPool.mBuffers.find(bufferId); if (bufferIt != mBufferPool.mBuffers.end()) { mBufferPool.mStats.onBufferFetched(); *handle = bufferIt->second->handle(); return ResultStatus::OK; } } } mBufferPool.cleanUp(); scheduleEvictIfNeeded(); return ResultStatus::CRITICAL_ERROR; } BufferPoolStatus Accessor::connect( const std::shared_ptr &observer, bool local, std::shared_ptr *connection, ConnectionId *pConnectionId, uint32_t *pMsgId, StatusDescriptor* statusDescPtr, InvalidationDescriptor* invDescPtr) { static ::android::base::NoDestructor sConIdGenerator; std::shared_ptr newConnection = ::ndk::SharedRefBase::make(); BufferPoolStatus status = ResultStatus::CRITICAL_ERROR; { std::lock_guard lock(mBufferPool.mMutex); if (newConnection) { int32_t pid = getpid(); ConnectionId id = sConIdGenerator->getConnectionId(); status = mBufferPool.mObserver.open(id, statusDescPtr); if (status == ResultStatus::OK) { newConnection->initialize(ref(), id); *connection = newConnection; *pConnectionId = id; *pMsgId = mBufferPool.mInvalidation.mInvalidationId; mBufferPool.mConnectionIds.insert(id); mBufferPool.mInvalidationChannel.getDesc(invDescPtr); mBufferPool.mInvalidation.onConnect(id, observer); } } mBufferPool.processStatusMessages(); mBufferPool.cleanUp(); scheduleEvictIfNeeded(); } if (!local && status == ResultStatus::OK) { std::shared_ptr accessor(ref()); sConnectionDeathRecipient->add(*pConnectionId, accessor); } return status; } BufferPoolStatus Accessor::close(ConnectionId connectionId) { { std::lock_guard lock(mBufferPool.mMutex); ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId); mBufferPool.processStatusMessages(); mBufferPool.handleClose(connectionId); mBufferPool.mObserver.close(connectionId); mBufferPool.mInvalidation.onClose(connectionId); // Since close# will be called after all works are finished, it is OK to // evict unused buffers. mBufferPool.cleanUp(true); scheduleEvictIfNeeded(); } sConnectionDeathRecipient->remove(connectionId); return ResultStatus::OK; } void Accessor::cleanUp(bool clearCache) { // transaction timeout, buffer caching TTL handling std::lock_guard lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.cleanUp(clearCache); } void Accessor::handleInvalidateAck() { std::map> observers; uint32_t invalidationId; { std::lock_guard lock(mBufferPool.mMutex); mBufferPool.processStatusMessages(); mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId); } // Do not hold lock for send invalidations size_t deadClients = 0; for (auto it = observers.begin(); it != observers.end(); ++it) { const std::shared_ptr observer = it->second; if (observer) { ::ndk::ScopedAStatus status = observer->onMessage(it->first, invalidationId); if (!status.isOk()) { ++deadClients; } } } if (deadClients > 0) { ALOGD("During invalidation found %zu dead clients", deadClients); } } void Accessor::invalidatorThread( std::map> &accessors, std::mutex &mutex, std::condition_variable &cv, bool &ready) { constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024; constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8; constexpr useconds_t MAX_SLEEP_US = 10000; uint32_t numSpin = 0; useconds_t sleepUs = 1; while(true) { std::map> copied; { std::unique_lock lock(mutex); while (!ready) { numSpin = 0; sleepUs = 1; cv.wait(lock); } copied.insert(accessors.begin(), accessors.end()); } std::list erased; for (auto it = copied.begin(); it != copied.end(); ++it) { const std::shared_ptr acc = it->second.lock(); if (!acc) { erased.push_back(it->first); } else { acc->handleInvalidateAck(); } } { std::unique_lock lock(mutex); for (auto it = erased.begin(); it != erased.end(); ++it) { accessors.erase(*it); } if (accessors.size() == 0) { ready = false; } else { // N.B. Since there is not a efficient way to wait over FMQ, // polling over the FMQ is the current way to prevent draining // CPU. lock.unlock(); ++numSpin; if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 && sleepUs < MAX_SLEEP_US) { sleepUs *= 10; } if (numSpin % NUM_SPIN_TO_LOG == 0) { ALOGW("invalidator thread spinning"); } ::usleep(sleepUs); } } } } Accessor::AccessorInvalidator::AccessorInvalidator() : mReady(false) { std::thread invalidator( invalidatorThread, std::ref(mAccessors), std::ref(mMutex), std::ref(mCv), std::ref(mReady)); invalidator.detach(); } void Accessor::AccessorInvalidator::addAccessor( uint32_t accessorId, const std::weak_ptr &accessor) { bool notify = false; std::unique_lock lock(mMutex); if (mAccessors.find(accessorId) == mAccessors.end()) { if (!mReady) { mReady = true; notify = true; } mAccessors.emplace(accessorId, accessor); ALOGV("buffer invalidation added bp:%u %d", accessorId, notify); } lock.unlock(); if (notify) { mCv.notify_one(); } } void Accessor::AccessorInvalidator::delAccessor(uint32_t accessorId) { std::lock_guard lock(mMutex); mAccessors.erase(accessorId); ALOGV("buffer invalidation deleted bp:%u", accessorId); if (mAccessors.size() == 0) { mReady = false; } } std::unique_ptr Accessor::sInvalidator; void Accessor::createInvalidator() { if (!sInvalidator) { sInvalidator = std::make_unique(); } } void Accessor::evictorThread( std::map, nsecs_t, std::owner_less<>> &accessors, std::mutex &mutex, std::condition_variable &cv) { std::list> evictList; while (true) { int expired = 0; int evicted = 0; { nsecs_t now = systemTime(); std::unique_lock lock(mutex); while (accessors.size() == 0) { cv.wait(lock); } auto it = accessors.begin(); while (it != accessors.end()) { if (now > (it->second + kEvictDurationNs)) { ++expired; evictList.push_back(it->first); it = accessors.erase(it); } else { ++it; } } } // evict idle accessors; for (auto it = evictList.begin(); it != evictList.end(); ++it) { const std::shared_ptr accessor = it->lock(); if (accessor) { accessor->cleanUp(true); ++evicted; } } if (expired > 0) { ALOGD("evictor expired: %d, evicted: %d", expired, evicted); } evictList.clear(); ::usleep(kEvictGranularityNs / 1000); } } Accessor::AccessorEvictor::AccessorEvictor() { std::thread evictor( evictorThread, std::ref(mAccessors), std::ref(mMutex), std::ref(mCv)); evictor.detach(); } void Accessor::AccessorEvictor::addAccessor( const std::weak_ptr &accessor, nsecs_t ts) { std::lock_guard lock(mMutex); bool notify = mAccessors.empty(); auto it = mAccessors.find(accessor); if (it == mAccessors.end()) { mAccessors.emplace(accessor, ts); } else { it->second = ts; } if (notify) { mCv.notify_one(); } } std::unique_ptr Accessor::sEvictor; void Accessor::createEvictor() { if (!sEvictor) { sEvictor = std::make_unique(); } } void Accessor::scheduleEvictIfNeeded() { nsecs_t now = systemTime(); if (now > (mScheduleEvictTs + kEvictGranularityNs)) { mScheduleEvictTs = now; sEvictor->addAccessor(ref(), now); } } } // namespace aidl::android::hardware::media::bufferpool2::implemntation {