1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolAccessor2.0"
18 //#define LOG_NDEBUG 0
19 
20 #include <android-base/no_destructor.h>
21 
22 #include <sys/types.h>
23 #include <stdint.h>
24 #include <time.h>
25 #include <unistd.h>
26 #include <utils/Log.h>
27 #include <thread>
28 #include "AccessorImpl.h"
29 #include "Connection.h"
30 
31 namespace android {
32 namespace hardware {
33 namespace media {
34 namespace bufferpool {
35 namespace V2_0 {
36 namespace implementation {
37 
// File-local tuning constants for clean-up, logging and cache eviction.
namespace {
    // Minimum time between two clean-up passes, in microseconds.
    static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
    // Minimum time between two periodic statistics log lines, in microseconds.
    static constexpr int64_t kLogDurationUs = 5000000; // 5 secs

    // Eviction of free buffers stops once the unused-buffer count is at or
    // below kUnusedBufferCountTarget and the cache is below either of these
    // two thresholds (cached bytes / number of buffers).
    static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
    static constexpr size_t kMinBufferCountForEviction = 25;
    // A clean-up pass is forced when more than this many buffers are unused.
    static constexpr size_t kMaxUnusedBufferCount = 64;
    // Unused-buffer count that a clean-up pass evicts down to.
    static constexpr size_t kUnusedBufferCountTarget = kMaxUnusedBufferCount - 16;

    // NOTE(review): the two constants below are not referenced in the visible
    // part of this file; presumably used by the eviction scheduler - confirm.
    static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
    static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
}
50 
51 // Buffer structure in bufferpool process
52 struct InternalBuffer {
53     BufferId mId;
54     size_t mOwnerCount;
55     size_t mTransactionCount;
56     const std::shared_ptr<BufferPoolAllocation> mAllocation;
57     const size_t mAllocSize;
58     const std::vector<uint8_t> mConfig;
59     bool mInvalidated;
60 
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer61     InternalBuffer(
62             BufferId id,
63             const std::shared_ptr<BufferPoolAllocation> &alloc,
64             const size_t allocSize,
65             const std::vector<uint8_t> &allocConfig)
66             : mId(id), mOwnerCount(0), mTransactionCount(0),
67             mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
68             mInvalidated(false) {}
69 
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer70     const native_handle_t *handle() {
71         return mAllocation->handle();
72     }
73 
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer74     void invalidate() {
75         mInvalidated = true;
76     }
77 };
78 
79 struct TransactionStatus {
80     TransactionId mId;
81     BufferId mBufferId;
82     ConnectionId mSender;
83     ConnectionId mReceiver;
84     BufferStatus mStatus;
85     int64_t mTimestampUs;
86     bool mSenderValidated;
87 
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus88     TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
89         mId = message.transactionId;
90         mBufferId = message.bufferId;
91         mStatus = message.newStatus;
92         mTimestampUs = timestampUs;
93         if (mStatus == BufferStatus::TRANSFER_TO) {
94             mSender = message.connectionId;
95             mReceiver = message.targetConnectionId;
96             mSenderValidated = true;
97         } else {
98             mSender = -1LL;
99             mReceiver = message.connectionId;
100             mSenderValidated = false;
101         }
102     }
103 };
104 
// Helper template methods for handling map of set.

// Adds |value| to the set stored under |key|, creating the set when the key
// is not present yet.
// Returns true when |value| was newly added, false when it was already there.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] creates an empty set on first use; set::insert reports
    // through .second whether the element was actually added.
    return (*mapOfSet)[key].insert(value).second;
}
119 
// Removes |value| from the set stored under |key|. When the set is empty
// afterwards, the key itself is dropped from the map as well.
// Returns true when |value| was present and has been removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) != 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
134 
// Reports whether |value| is in the set stored under |key|.
// The map is only read, so it is taken through a pointer-to-const; callers
// holding a non-const map pointer keep working unchanged.
// Returns false when the key is absent or its set does not hold |value|.
template<class T, class U>
bool contains(const std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto iter = mapOfSet->find(key);
    return iter != mapOfSet->end() && iter->second.count(value) != 0;
}
144 
// Bit ORed into the low 32 bits of every generated connection id. It is the
// top bit (bit 31) when this file is built with __ANDROID_VNDK__ defined and
// zero otherwise.
#ifdef __ANDROID_VNDK__
static constexpr uint32_t kSeqIdVndkBit = 1U << 31;
#else
static constexpr uint32_t kSeqIdVndkBit = 0;
#endif

// Largest sequence number handed out before wrapping back to 0; it keeps
// bit 31 free for kSeqIdVndkBit.
static constexpr uint32_t kSeqIdMax = 0x7fffffff;
152 
ConnectionIdGenerator()153 Accessor::Impl::ConnectionIdGenerator::ConnectionIdGenerator() {
154     mSeqId = static_cast<uint32_t>(time(nullptr) & kSeqIdMax);
155     mPid = static_cast<int32_t>(getpid());
156 }
157 
getConnectionId()158 ConnectionId Accessor::Impl::ConnectionIdGenerator::getConnectionId() {
159     uint32_t seq;
160     {
161         std::lock_guard<std::mutex> l(mLock);
162         seq = mSeqId;
163         if (mSeqId == kSeqIdMax) {
164             mSeqId = 0;
165         } else {
166             ++mSeqId;
167         }
168     }
169     return (int64_t)mPid << 32 | seq | kSeqIdVndkBit;
170 }
171 
// Stores the allocator used for new buffers and starts with the scheduled
// eviction timestamp set to 0.
Accessor::Impl::Impl(
        const std::shared_ptr<BufferPoolAllocator> &allocator)
        : mAllocator(allocator), mScheduleEvictTs(0) {}
175 
~Impl()176 Accessor::Impl::~Impl() {
177 }
178 
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)179 ResultStatus Accessor::Impl::connect(
180         const sp<Accessor> &accessor, const sp<IObserver> &observer,
181         sp<Connection> *connection,
182         ConnectionId *pConnectionId,
183         uint32_t *pMsgId,
184         const StatusDescriptor** statusDescPtr,
185         const InvalidationDescriptor** invDescPtr) {
186     static ::android::base::NoDestructor<ConnectionIdGenerator> sConIdGenerator;
187     sp<Connection> newConnection = new Connection();
188     ResultStatus status = ResultStatus::CRITICAL_ERROR;
189     {
190         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
191         if (newConnection) {
192             int32_t pid = getpid();
193             ConnectionId id = sConIdGenerator->getConnectionId();
194             status = mBufferPool.mObserver.open(id, statusDescPtr);
195             if (status == ResultStatus::OK) {
196                 newConnection->initialize(accessor, id);
197                 *connection = newConnection;
198                 *pConnectionId = id;
199                 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
200                 mBufferPool.mConnectionIds.insert(id);
201                 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
202                 mBufferPool.mInvalidation.onConnect(id, observer);
203             }
204 
205         }
206         mBufferPool.processStatusMessages();
207         mBufferPool.cleanUp();
208         scheduleEvictIfNeeded();
209     }
210     return status;
211 }
212 
// Closes the given connection: drains pending status messages, releases the
// buffers and transactions held by the connection, closes its status channel
// and removes it from invalidation tracking. Always returns OK.
ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
    // Process outstanding messages first so the connection's last status
    // updates are taken into account before its state is torn down.
    mBufferPool.processStatusMessages();
    mBufferPool.handleClose(connectionId);
    mBufferPool.mObserver.close(connectionId);
    mBufferPool.mInvalidation.onClose(connectionId);
    // Since close# will be called after all works are finished, it is OK to
    // evict unused buffers.
    mBufferPool.cleanUp(true);
    scheduleEvictIfNeeded();
    return ResultStatus::OK;
}
226 
// Hands out a buffer compatible with |params| to |connectionId|.
//
// A cached free buffer is recycled when one is compatible; otherwise a new
// buffer is requested from the allocator and added to the pool. On success
// |bufferId| and |handle| describe the buffer and the connection is recorded
// as its owner.
// Returns OK on success, otherwise the status reported by the allocator or by
// addNewBuffer().
ResultStatus Accessor::Impl::allocate(
        ConnectionId connectionId, const std::vector<uint8_t>& params,
        BufferId *bufferId, const native_handle_t** handle) {
    std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    ResultStatus status = ResultStatus::OK;
    if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
        // The pool mutex is released while the allocator runs and re-acquired
        // before the pool state is touched again.
        lock.unlock();
        std::shared_ptr<BufferPoolAllocation> alloc;
        size_t allocSize;
        status = mAllocator->allocate(params, &alloc, &allocSize);
        lock.lock();
        if (status == ResultStatus::OK) {
            status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
        }
        ALOGV("create a buffer %d : %u %p",
              status == ResultStatus::OK, *bufferId, *handle);
    }
    if (status == ResultStatus::OK) {
        // TODO: handle ownBuffer failure
        mBufferPool.handleOwnBuffer(connectionId, *bufferId);
    }
    mBufferPool.cleanUp();
    scheduleEvictIfNeeded();
    return status;
}
253 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)254 ResultStatus Accessor::Impl::fetch(
255         ConnectionId connectionId, TransactionId transactionId,
256         BufferId bufferId, const native_handle_t** handle) {
257     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
258     mBufferPool.processStatusMessages();
259     auto found = mBufferPool.mTransactions.find(transactionId);
260     if (found != mBufferPool.mTransactions.end() &&
261             contains(&mBufferPool.mPendingTransactions,
262                      connectionId, transactionId)) {
263         if (found->second->mSenderValidated &&
264                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
265                 found->second->mBufferId == bufferId) {
266             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
267             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
268             if (bufferIt != mBufferPool.mBuffers.end()) {
269                 mBufferPool.mStats.onBufferFetched();
270                 *handle = bufferIt->second->handle();
271                 return ResultStatus::OK;
272             }
273         }
274     }
275     mBufferPool.cleanUp();
276     scheduleEvictIfNeeded();
277     return ResultStatus::CRITICAL_ERROR;
278 }
279 
// Processes pending status messages and runs a pool clean-up pass under the
// pool mutex. |clearCache| is forwarded unchanged to BufferPool::cleanUp().
void Accessor::Impl::cleanUp(bool clearCache) {
    // transaction timeout, buffer caching TTL handling
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.cleanUp(clearCache);
}
286 
// Processes pending status messages and then flushes the pool, all under the
// pool mutex. The pool receives a shared reference to this object.
// NOTE(review): BufferPool::flush() is not visible here; presumably it
// invalidates the buffers currently in the pool - confirm.
void Accessor::Impl::flush() {
    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
    mBufferPool.processStatusMessages();
    mBufferPool.flush(shared_from_this());
}
292 
handleInvalidateAck()293 void Accessor::Impl::handleInvalidateAck() {
294     std::map<ConnectionId, const sp<IObserver>> observers;
295     uint32_t invalidationId;
296     {
297         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
298         mBufferPool.processStatusMessages();
299         mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
300     }
301     // Do not hold lock for send invalidations
302     size_t deadClients = 0;
303     for (auto it = observers.begin(); it != observers.end(); ++it) {
304         const sp<IObserver> observer = it->second;
305         if (observer) {
306             Return<void> transResult = observer->onMessage(it->first, invalidationId);
307             if (!transResult.isOk()) {
308                 ++deadClients;
309             }
310         }
311     }
312     if (deadClients > 0) {
313         ALOGD("During invalidation found %zu dead clients", deadClients);
314     }
315 }
316 
// Returns whether the underlying buffer pool reports itself as valid.
bool Accessor::Impl::isValid() {
    return mBufferPool.isValid();
}
320 
// Starts all three timestamps (current, last clean-up, last log) at the
// current time and both sequence counters at 0. The pool is valid exactly
// when its invalidation channel is valid.
Accessor::Impl::Impl::BufferPool::BufferPool()
    : mTimestampUs(getTimestampNow()),
      // The next two copy mTimestampUs, so they rely on it being initialized
      // first.
      mLastCleanUpUs(mTimestampUs),
      mLastLogUs(mTimestampUs),
      mSeq(0),
      mStartSeq(0) {
    mValid = mInvalidationChannel.isValid();
}
329 
330 
// Statistics helper
// Returns |base| as a percentage of |total|: 0.5 is added before truncating
// to int. A zero |total| yields 0.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const double scaled = 100. * static_cast<S>(base) / total;
    return static_cast<int>(0.5 + scaled);
}
336 
// Definition of the static atomic counter shared by all Invalidation
// instances; starts at 0.
// NOTE(review): not used in the visible part of this file; presumably the
// source of per-instance invalidation ids (mId) - confirm.
std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
338 
// Logs the final pool statistics while holding the pool mutex: cached buffers
// and cached size in MiB, buffers in use, allocation and recycle rate, and
// transfers with the share that was never fetched.
Accessor::Impl::Impl::BufferPool::~BufferPool() {
    std::lock_guard<std::mutex> lock(mMutex);
    ALOGD("Destruction - bufferpool2 %p "
          "cached: %zu/%zuM, %zu/%d%% in use; "
          "allocs: %zu, %d%% recycled; "
          "transfers: %zu, %d%% unfetched",
          this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
          mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
          mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
          mStats.mTotalTransfers,
          percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
}
351 
onConnect(ConnectionId conId,const sp<IObserver> & observer)352 void Accessor::Impl::BufferPool::Invalidation::onConnect(
353         ConnectionId conId, const sp<IObserver>& observer) {
354     mAcks[conId] = mInvalidationId; // starts from current invalidationId
355     mObservers.insert(std::make_pair(conId, observer));
356 }
357 
// Stops invalidation tracking for a connection by dropping its acknowledged
// id and its observer.
void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
    mAcks.erase(conId);
    mObservers.erase(conId);
}
362 
onAck(ConnectionId conId,uint32_t msgId)363 void Accessor::Impl::BufferPool::Invalidation::onAck(
364         ConnectionId conId,
365         uint32_t msgId) {
366     auto it = mAcks.find(conId);
367     if (it == mAcks.end()) {
368         ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
369         return;
370     }
371     if (isMessageLater(msgId, it->second)) {
372         mAcks[conId] = msgId;
373     }
374 }
375 
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)376 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
377         BufferId bufferId,
378         BufferInvalidationChannel &channel) {
379     for (auto it = mPendings.begin(); it != mPendings.end();) {
380         if (it->isInvalidated(bufferId)) {
381             uint32_t msgId = 0;
382             if (it->mNeedsAck) {
383                 msgId = ++mInvalidationId;
384                 if (msgId == 0) {
385                     // wrap happens
386                     msgId = ++mInvalidationId;
387                 }
388             }
389             channel.postInvalidation(msgId, it->mFrom, it->mTo);
390             it = mPendings.erase(it);
391             continue;
392         }
393         ++it;
394     }
395 }
396 
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)397 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
398         bool needsAck,
399         uint32_t from,
400         uint32_t to,
401         size_t left,
402         BufferInvalidationChannel &channel,
403         const std::shared_ptr<Accessor::Impl> &impl) {
404         uint32_t msgId = 0;
405     if (needsAck) {
406         msgId = ++mInvalidationId;
407         if (msgId == 0) {
408             // wrap happens
409             msgId = ++mInvalidationId;
410         }
411     }
412     ALOGV("bufferpool2 invalidation requested and queued");
413     if (left == 0) {
414         channel.postInvalidation(msgId, from, to);
415     } else {
416         // TODO: sending hint message?
417         ALOGV("bufferpoo2 invalidation requested and pending");
418         Pending pending(needsAck, from, to, left, impl);
419         mPendings.push_back(pending);
420     }
421     sInvalidator->addAccessor(mId, impl);
422 }
423 
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)424 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
425         std::map<ConnectionId, const sp<IObserver>> *observers,
426         uint32_t *invalidationId) {
427     if (mInvalidationId != 0) {
428         *invalidationId = mInvalidationId;
429         std::set<int> deads;
430         for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
431             if (it->second != mInvalidationId) {
432                 const sp<IObserver> observer = mObservers[it->first];
433                 if (observer) {
434                     observers->emplace(it->first, observer);
435                     ALOGV("connection %lld will call observer (%u: %u)",
436                           (long long)it->first, it->second, mInvalidationId);
437                     // N.B: onMessage will be called later. ignore possibility of
438                     // onMessage# oneway call being lost.
439                     it->second = mInvalidationId;
440                 } else {
441                     ALOGV("bufferpool2 observer died %lld", (long long)it->first);
442                     deads.insert(it->first);
443                 }
444             }
445         }
446         if (deads.size() > 0) {
447             for (auto it = deads.begin(); it != deads.end(); ++it) {
448                 onClose(*it);
449             }
450         }
451     }
452     if (mPendings.size() == 0) {
453         // All invalidation Ids are synced and no more pending invalidations.
454         sInvalidator->delAccessor(mId);
455     }
456 }
457 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)458 bool Accessor::Impl::BufferPool::handleOwnBuffer(
459         ConnectionId connectionId, BufferId bufferId) {
460 
461     bool added = insert(&mUsingBuffers, connectionId, bufferId);
462     if (added) {
463         auto iter = mBuffers.find(bufferId);
464         iter->second->mOwnerCount++;
465     }
466     insert(&mUsingConnections, bufferId, connectionId);
467     return added;
468 }
469 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)470 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
471         ConnectionId connectionId, BufferId bufferId) {
472     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
473     if (deleted) {
474         auto iter = mBuffers.find(bufferId);
475         iter->second->mOwnerCount--;
476         if (iter->second->mOwnerCount == 0 &&
477                 iter->second->mTransactionCount == 0) {
478             if (!iter->second->mInvalidated) {
479                 mStats.onBufferUnused(iter->second->mAllocSize);
480                 mFreeBuffers.insert(bufferId);
481             } else {
482                 mStats.onBufferUnused(iter->second->mAllocSize);
483                 mStats.onBufferEvicted(iter->second->mAllocSize);
484                 mBuffers.erase(iter);
485                 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
486             }
487         }
488     }
489     erase(&mUsingConnections, bufferId, connectionId);
490     ALOGV("release buffer %u : %d", bufferId, deleted);
491     return deleted;
492 }
493 
// Handles a TRANSFER_TO message sent by the sending side of a transfer.
// Returns true when the message was accepted. It returns false when the
// buffer is unknown, is not owned by the sender, or the target connection is
// not registered.
bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
    // The receiver may have finished the transaction before this message was
    // processed; in that case only the completion marker has to be dropped.
    auto completed = mCompletedTransactions.find(
            message.transactionId);
    if (completed != mCompletedTransactions.end()) {
        // already completed
        mCompletedTransactions.erase(completed);
        return true;
    }
    // the buffer should exist and be owned.
    auto bufferIter = mBuffers.find(message.bufferId);
    if (bufferIter == mBuffers.end() ||
            !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
        return false;
    }
    auto found = mTransactions.find(message.transactionId);
    if (found != mTransactions.end()) {
        // transfer_from was received earlier.
        found->second->mSender = message.connectionId;
        found->second->mSenderValidated = true;
        return true;
    }
    if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
        // N.B: it could be fake or receive connection already closed.
        ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
              this, (long long)message.targetConnectionId);
        return false;
    }
    // First message for this transaction: register it as pending for the
    // receiver and keep the buffer alive through its transaction count.
    mStats.onBufferSent();
    mTransactions.insert(std::make_pair(
            message.transactionId,
            std::make_unique<TransactionStatus>(message, mTimestampUs)));
    insert(&mPendingTransactions, message.targetConnectionId,
           message.transactionId);
    bufferIter->second->mTransactionCount++;
    return true;
}
530 
handleTransferFrom(const BufferStatusMessage & message)531 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
532     auto found = mTransactions.find(message.transactionId);
533     if (found == mTransactions.end()) {
534         // TODO: is it feasible to check ownership here?
535         mStats.onBufferSent();
536         mTransactions.insert(std::make_pair(
537                 message.transactionId,
538                 std::make_unique<TransactionStatus>(message, mTimestampUs)));
539         insert(&mPendingTransactions, message.connectionId,
540                message.transactionId);
541         auto bufferIter = mBuffers.find(message.bufferId);
542         bufferIter->second->mTransactionCount++;
543     } else {
544         if (message.connectionId == found->second->mReceiver) {
545             found->second->mStatus = BufferStatus::TRANSFER_FROM;
546         }
547     }
548     return true;
549 }
550 
handleTransferResult(const BufferStatusMessage & message)551 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
552     auto found = mTransactions.find(message.transactionId);
553     if (found != mTransactions.end()) {
554         bool deleted = erase(&mPendingTransactions, message.connectionId,
555                              message.transactionId);
556         if (deleted) {
557             if (!found->second->mSenderValidated) {
558                 mCompletedTransactions.insert(message.transactionId);
559             }
560             auto bufferIter = mBuffers.find(message.bufferId);
561             if (message.newStatus == BufferStatus::TRANSFER_OK) {
562                 handleOwnBuffer(message.connectionId, message.bufferId);
563             }
564             bufferIter->second->mTransactionCount--;
565             if (bufferIter->second->mOwnerCount == 0
566                 && bufferIter->second->mTransactionCount == 0) {
567                 if (!bufferIter->second->mInvalidated) {
568                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
569                     mFreeBuffers.insert(message.bufferId);
570                 } else {
571                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
572                     mStats.onBufferEvicted(bufferIter->second->mAllocSize);
573                     mBuffers.erase(bufferIter);
574                     mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
575                 }
576             }
577             mTransactions.erase(found);
578         }
579         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
580               message.bufferId, deleted);
581         return deleted;
582     }
583     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
584           message.bufferId);
585     return false;
586 }
587 
processStatusMessages()588 void Accessor::Impl::BufferPool::processStatusMessages() {
589     std::vector<BufferStatusMessage> messages;
590     mObserver.getBufferStatusChanges(messages);
591     mTimestampUs = getTimestampNow();
592     for (BufferStatusMessage& message: messages) {
593         bool ret = false;
594         switch (message.newStatus) {
595             case BufferStatus::NOT_USED:
596                 ret = handleReleaseBuffer(
597                         message.connectionId, message.bufferId);
598                 break;
599             case BufferStatus::USED:
600                 // not happening
601                 break;
602             case BufferStatus::TRANSFER_TO:
603                 ret = handleTransferTo(message);
604                 break;
605             case BufferStatus::TRANSFER_FROM:
606                 ret = handleTransferFrom(message);
607                 break;
608             case BufferStatus::TRANSFER_TIMEOUT:
609                 // TODO
610                 break;
611             case BufferStatus::TRANSFER_LOST:
612                 // TODO
613                 break;
614             case BufferStatus::TRANSFER_FETCH:
615                 // not happening
616                 break;
617             case BufferStatus::TRANSFER_OK:
618             case BufferStatus::TRANSFER_ERROR:
619                 ret = handleTransferResult(message);
620                 break;
621             case BufferStatus::INVALIDATION_ACK:
622                 mInvalidation.onAck(message.connectionId, message.bufferId);
623                 ret = true;
624                 break;
625         }
626         if (ret == false) {
627             ALOGW("buffer status message processing failure - message : %d connection : %lld",
628                   (int)message.newStatus, (long long)message.connectionId);
629         }
630     }
631     messages.clear();
632 }
633 
handleClose(ConnectionId connectionId)634 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
635     // Cleaning buffers
636     auto buffers = mUsingBuffers.find(connectionId);
637     if (buffers != mUsingBuffers.end()) {
638         for (const BufferId& bufferId : buffers->second) {
639             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
640             if (deleted) {
641                 auto bufferIter = mBuffers.find(bufferId);
642                 bufferIter->second->mOwnerCount--;
643                 if (bufferIter->second->mOwnerCount == 0 &&
644                         bufferIter->second->mTransactionCount == 0) {
645                     // TODO: handle freebuffer insert fail
646                     if (!bufferIter->second->mInvalidated) {
647                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
648                         mFreeBuffers.insert(bufferId);
649                     } else {
650                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
651                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
652                         mBuffers.erase(bufferIter);
653                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
654                     }
655                 }
656             }
657         }
658         mUsingBuffers.erase(buffers);
659     }
660 
661     // Cleaning transactions
662     auto pending = mPendingTransactions.find(connectionId);
663     if (pending != mPendingTransactions.end()) {
664         for (const TransactionId& transactionId : pending->second) {
665             auto iter = mTransactions.find(transactionId);
666             if (iter != mTransactions.end()) {
667                 if (!iter->second->mSenderValidated) {
668                     mCompletedTransactions.insert(transactionId);
669                 }
670                 BufferId bufferId = iter->second->mBufferId;
671                 auto bufferIter = mBuffers.find(bufferId);
672                 bufferIter->second->mTransactionCount--;
673                 if (bufferIter->second->mOwnerCount == 0 &&
674                     bufferIter->second->mTransactionCount == 0) {
675                     // TODO: handle freebuffer insert fail
676                     if (!bufferIter->second->mInvalidated) {
677                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
678                         mFreeBuffers.insert(bufferId);
679                     } else {
680                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
681                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
682                         mBuffers.erase(bufferIter);
683                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
684                     }
685                 }
686                 mTransactions.erase(iter);
687             }
688         }
689     }
690     mConnectionIds.erase(connectionId);
691     return true;
692 }
693 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)694 bool Accessor::Impl::BufferPool::getFreeBuffer(
695         const std::shared_ptr<BufferPoolAllocator> &allocator,
696         const std::vector<uint8_t> &params, BufferId *pId,
697         const native_handle_t** handle) {
698     auto bufferIt = mFreeBuffers.begin();
699     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
700         BufferId bufferId = *bufferIt;
701         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
702             break;
703         }
704     }
705     if (bufferIt != mFreeBuffers.end()) {
706         BufferId id = *bufferIt;
707         mFreeBuffers.erase(bufferIt);
708         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
709         *handle = mBuffers[id]->handle();
710         *pId = id;
711         ALOGV("recycle a buffer %u %p", id, *handle);
712         return true;
713     }
714     return false;
715 }
716 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)717 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
718         const std::shared_ptr<BufferPoolAllocation> &alloc,
719         const size_t allocSize,
720         const std::vector<uint8_t> &params,
721         BufferId *pId,
722         const native_handle_t** handle) {
723 
724     BufferId bufferId = mSeq++;
725     if (mSeq == Connection::SYNC_BUFFERID) {
726         mSeq = 0;
727     }
728     std::unique_ptr<InternalBuffer> buffer =
729             std::make_unique<InternalBuffer>(
730                     bufferId, alloc, allocSize, params);
731     if (buffer) {
732         auto res = mBuffers.insert(std::make_pair(
733                 bufferId, std::move(buffer)));
734         if (res.second) {
735             mStats.onBufferAllocated(allocSize);
736             *handle = alloc->handle();
737             *pId = bufferId;
738             return ResultStatus::OK;
739         }
740     }
741     return ResultStatus::NO_MEMORY;
742 }
743 
cleanUp(bool clearCache)744 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
745     if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs ||
746             mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
747         mLastCleanUpUs = mTimestampUs;
748         if (mTimestampUs > mLastLogUs + kLogDurationUs ||
749                 mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
750             mLastLogUs = mTimestampUs;
751             ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
752                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
753                   "%zu/%zu (fetch/transfer)",
754                   this, mStats.mBuffersCached, mStats.mSizeCached,
755                   mStats.mBuffersInUse, mStats.mSizeInUse,
756                   mStats.mTotalRecycles, mStats.mTotalAllocations,
757                   mStats.mTotalFetches, mStats.mTotalTransfers);
758         }
759         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
760             if (!clearCache && mStats.buffersNotInUse() <= kUnusedBufferCountTarget &&
761                     (mStats.mSizeCached < kMinAllocBytesForEviction ||
762                      mBuffers.size() < kMinBufferCountForEviction)) {
763                 break;
764             }
765             auto it = mBuffers.find(*freeIt);
766             if (it != mBuffers.end() &&
767                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
768                 mStats.onBufferEvicted(it->second->mAllocSize);
769                 mBuffers.erase(it);
770                 freeIt = mFreeBuffers.erase(freeIt);
771             } else {
772                 ++freeIt;
773                 ALOGW("bufferpool2 inconsistent!");
774             }
775         }
776     }
777 }
778 
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)779 void Accessor::Impl::BufferPool::invalidate(
780         bool needsAck, BufferId from, BufferId to,
781         const std::shared_ptr<Accessor::Impl> &impl) {
782     for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
783         if (isBufferInRange(from, to, *freeIt)) {
784             auto it = mBuffers.find(*freeIt);
785             if (it != mBuffers.end() &&
786                 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
787                 mStats.onBufferEvicted(it->second->mAllocSize);
788                 mBuffers.erase(it);
789                 freeIt = mFreeBuffers.erase(freeIt);
790                 continue;
791             } else {
792                 ALOGW("bufferpool2 inconsistent!");
793             }
794         }
795         ++freeIt;
796     }
797 
798     size_t left = 0;
799     for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
800         if (isBufferInRange(from, to, it->first)) {
801             it->second->invalidate();
802             ++left;
803         }
804     }
805     mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
806 }
807 
flush(const std::shared_ptr<Accessor::Impl> & impl)808 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
809     BufferId from = mStartSeq;
810     BufferId to = mSeq;
811     mStartSeq = mSeq;
812     // TODO: needsAck params
813     ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
814     if (from != to) {
815         invalidate(true, from, to, impl);
816     }
817 }
818 
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)819 void Accessor::Impl::invalidatorThread(
820             std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
821             std::mutex &mutex,
822             std::condition_variable &cv,
823             bool &ready) {
824     constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
825     constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
826     constexpr useconds_t MAX_SLEEP_US = 10000;
827     uint32_t numSpin = 0;
828     useconds_t sleepUs = 1;
829 
830     while(true) {
831         std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
832         {
833             std::unique_lock<std::mutex> lock(mutex);
834             if (!ready) {
835                 numSpin = 0;
836                 sleepUs = 1;
837                 cv.wait(lock);
838             }
839             copied.insert(accessors.begin(), accessors.end());
840         }
841         std::list<ConnectionId> erased;
842         for (auto it = copied.begin(); it != copied.end(); ++it) {
843             const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
844             if (!impl) {
845                 erased.push_back(it->first);
846             } else {
847                 impl->handleInvalidateAck();
848             }
849         }
850         {
851             std::unique_lock<std::mutex> lock(mutex);
852             for (auto it = erased.begin(); it != erased.end(); ++it) {
853                 accessors.erase(*it);
854             }
855             if (accessors.size() == 0) {
856                 ready = false;
857             } else {
858                 // TODO Use an efficient way to wait over FMQ.
859                 // N.B. Since there is not a efficient way to wait over FMQ,
860                 // polling over the FMQ is the current way to prevent draining
861                 // CPU.
862                 lock.unlock();
863                 ++numSpin;
864                 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
865                     sleepUs < MAX_SLEEP_US) {
866                     sleepUs *= 10;
867                 }
868                 if (numSpin % NUM_SPIN_TO_LOG == 0) {
869                     ALOGW("invalidator thread spinning");
870                 }
871                 ::usleep(sleepUs);
872             }
873         }
874     }
875 }
876 
AccessorInvalidator()877 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
878     std::thread invalidator(
879             invalidatorThread,
880             std::ref(mAccessors),
881             std::ref(mMutex),
882             std::ref(mCv),
883             std::ref(mReady));
884     invalidator.detach();
885 }
886 
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)887 void Accessor::Impl::AccessorInvalidator::addAccessor(
888         uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
889     bool notify = false;
890     std::unique_lock<std::mutex> lock(mMutex);
891     if (mAccessors.find(accessorId) == mAccessors.end()) {
892         if (!mReady) {
893             mReady = true;
894             notify = true;
895         }
896         mAccessors.insert(std::make_pair(accessorId, impl));
897         ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
898     }
899     lock.unlock();
900     if (notify) {
901         mCv.notify_one();
902     }
903 }
904 
delAccessor(uint32_t accessorId)905 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
906     std::lock_guard<std::mutex> lock(mMutex);
907     mAccessors.erase(accessorId);
908     ALOGV("buffer invalidation deleted bp:%u", accessorId);
909     if (mAccessors.size() == 0) {
910         mReady = false;
911     }
912 }
913 
914 std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
915 
createInvalidator()916 void Accessor::Impl::createInvalidator() {
917     if (!sInvalidator) {
918         sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
919     }
920 }
921 
evictorThread(std::map<const std::weak_ptr<Accessor::Impl>,nsecs_t,std::owner_less<>> & accessors,std::mutex & mutex,std::condition_variable & cv)922 void Accessor::Impl::evictorThread(
923         std::map<const std::weak_ptr<Accessor::Impl>, nsecs_t, std::owner_less<>> &accessors,
924         std::mutex &mutex,
925         std::condition_variable &cv) {
926     std::list<const std::weak_ptr<Accessor::Impl>> evictList;
927     while (true) {
928         int expired = 0;
929         int evicted = 0;
930         {
931             nsecs_t now = systemTime();
932             std::unique_lock<std::mutex> lock(mutex);
933             if (accessors.size() == 0) {
934                 cv.wait(lock);
935             }
936             auto it = accessors.begin();
937             while (it != accessors.end()) {
938                 if (now > (it->second + kEvictDurationNs)) {
939                     ++expired;
940                     evictList.push_back(it->first);
941                     it = accessors.erase(it);
942                 } else {
943                     ++it;
944                 }
945             }
946         }
947         // evict idle accessors;
948         for (auto it = evictList.begin(); it != evictList.end(); ++it) {
949             const std::shared_ptr<Accessor::Impl> accessor = it->lock();
950             if (accessor) {
951                 accessor->cleanUp(true);
952                 ++evicted;
953             }
954         }
955         if (expired > 0) {
956             ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
957         }
958         evictList.clear();
959         ::usleep(kEvictGranularityNs / 1000);
960     }
961 }
962 
AccessorEvictor()963 Accessor::Impl::AccessorEvictor::AccessorEvictor() {
964     std::thread evictor(
965             evictorThread,
966             std::ref(mAccessors),
967             std::ref(mMutex),
968             std::ref(mCv));
969     evictor.detach();
970 }
971 
addAccessor(const std::weak_ptr<Accessor::Impl> & impl,nsecs_t ts)972 void Accessor::Impl::AccessorEvictor::addAccessor(
973         const std::weak_ptr<Accessor::Impl> &impl, nsecs_t ts) {
974     std::lock_guard<std::mutex> lock(mMutex);
975     bool notify = mAccessors.empty();
976     auto it = mAccessors.find(impl);
977     if (it == mAccessors.end()) {
978         mAccessors.emplace(impl, ts);
979     } else {
980         it->second = ts;
981     }
982     if (notify) {
983         mCv.notify_one();
984     }
985 }
986 
987 std::unique_ptr<Accessor::Impl::AccessorEvictor> Accessor::Impl::sEvictor;
988 
createEvictor()989 void Accessor::Impl::createEvictor() {
990     if (!sEvictor) {
991         sEvictor = std::make_unique<Accessor::Impl::AccessorEvictor>();
992     }
993 }
994 
scheduleEvictIfNeeded()995 void Accessor::Impl::scheduleEvictIfNeeded() {
996     nsecs_t now = systemTime();
997 
998     if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
999         mScheduleEvictTs = now;
1000         sEvictor->addAccessor(shared_from_this(), now);
1001     }
1002 }
1003 
1004 }  // namespace implementation
1005 }  // namespace V2_0
1006 }  // namespace bufferpool
1007 }  // namespace media
1008 }  // namespace hardware
1009 }  // namespace android
1010