• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolAccessor1.0"
18 //#define LOG_NDEBUG 0
19 
20 #include <sys/types.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include "AccessorImpl.h"
26 #include "Connection.h"
27 
28 namespace android {
29 namespace hardware {
30 namespace media {
31 namespace bufferpool {
32 namespace V1_0 {
33 namespace implementation {
34 
namespace {
    // Minimum time between two clean-up passes of the pool: 0.5 sec.
    // TODO tune
    constexpr int64_t kCleanUpDurationUs = 500000;
    // Minimum time between two statistics log lines: 5 secs.
    constexpr int64_t kLogDurationUs = 5000000;

    // Periodic clean-up stops evicting free buffers while the cached size is
    // below this many bytes AND the buffer count is below the count threshold.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 40;
}
42 
43 // Buffer structure in bufferpool process
44 struct InternalBuffer {
45     BufferId mId;
46     size_t mOwnerCount;
47     size_t mTransactionCount;
48     const std::shared_ptr<BufferPoolAllocation> mAllocation;
49     const size_t mAllocSize;
50     const std::vector<uint8_t> mConfig;
51 
InternalBufferandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer52     InternalBuffer(
53             BufferId id,
54             const std::shared_ptr<BufferPoolAllocation> &alloc,
55             const size_t allocSize,
56             const std::vector<uint8_t> &allocConfig)
57             : mId(id), mOwnerCount(0), mTransactionCount(0),
58             mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
59 
handleandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer60     const native_handle_t *handle() {
61         return mAllocation->handle();
62     }
63 };
64 
65 struct TransactionStatus {
66     TransactionId mId;
67     BufferId mBufferId;
68     ConnectionId mSender;
69     ConnectionId mReceiver;
70     BufferStatus mStatus;
71     int64_t mTimestampUs;
72     bool mSenderValidated;
73 
TransactionStatusandroid::hardware::media::bufferpool::V1_0::implementation::TransactionStatus74     TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
75         mId = message.transactionId;
76         mBufferId = message.bufferId;
77         mStatus = message.newStatus;
78         mTimestampUs = timestampUs;
79         if (mStatus == BufferStatus::TRANSFER_TO) {
80             mSender = message.connectionId;
81             mReceiver = message.targetConnectionId;
82             mSenderValidated = true;
83         } else {
84             mSender = -1LL;
85             mReceiver = message.connectionId;
86             mSenderValidated = false;
87         }
88     }
89 };
90 
91 // Helper template methods for handling map of set.
// Adds |value| to the set stored under |key|, creating the set on first use.
//
// Returns true when the value was newly inserted and false when the set
// already contained it. A single map lookup is performed: operator[] creates
// the empty set if the key is missing, and set::insert reports whether the
// value was new.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    return (*mapOfSet)[key].insert(value).second;
}
105 
// Removes |value| from the set stored under |key|.
//
// Returns true when the value was present and has been removed. Whenever the
// key exists and its set is empty afterwards, the key itself is dropped from
// the map.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) != 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
120 
// Returns true when the set stored under |key| exists and holds |value|.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
130 
131 uint32_t Accessor::Impl::sSeqId = time(nullptr);
132 
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)133 Accessor::Impl::Impl(
134         const std::shared_ptr<BufferPoolAllocator> &allocator)
135         : mAllocator(allocator) {}
136 
~Impl()137 Accessor::Impl::~Impl() {
138 }
139 
connect(const sp<Accessor> & accessor,sp<Connection> * connection,ConnectionId * pConnectionId,const QueueDescriptor ** fmqDescPtr)140 ResultStatus Accessor::Impl::connect(
141         const sp<Accessor> &accessor, sp<Connection> *connection,
142         ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) {
143     sp<Connection> newConnection = new Connection();
144     ResultStatus status = ResultStatus::CRITICAL_ERROR;
145     {
146         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
147         if (newConnection) {
148             int32_t pid = getpid();
149             ConnectionId id = (int64_t)pid << 32 | sSeqId;
150             status = mBufferPool.mObserver.open(id, fmqDescPtr);
151             if (status == ResultStatus::OK) {
152                 newConnection->initialize(accessor, id);
153                 *connection = newConnection;
154                 *pConnectionId = id;
155                 mBufferPool.mConnectionIds.insert(id);
156                 if (sSeqId == UINT32_MAX) {
157                    sSeqId = 0;
158                 } else {
159                     ++sSeqId;
160                 }
161             }
162         }
163         mBufferPool.processStatusMessages();
164         mBufferPool.cleanUp();
165     }
166     return status;
167 }
168 
close(ConnectionId connectionId)169 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
170     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
171     mBufferPool.processStatusMessages();
172     mBufferPool.handleClose(connectionId);
173     mBufferPool.mObserver.close(connectionId);
174     // Since close# will be called after all works are finished, it is OK to
175     // evict unused buffers.
176     mBufferPool.cleanUp(true);
177     return ResultStatus::OK;
178 }
179 
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)180 ResultStatus Accessor::Impl::allocate(
181         ConnectionId connectionId, const std::vector<uint8_t>& params,
182         BufferId *bufferId, const native_handle_t** handle) {
183     std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
184     mBufferPool.processStatusMessages();
185     ResultStatus status = ResultStatus::OK;
186     if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
187         lock.unlock();
188         std::shared_ptr<BufferPoolAllocation> alloc;
189         size_t allocSize;
190         status = mAllocator->allocate(params, &alloc, &allocSize);
191         lock.lock();
192         if (status == ResultStatus::OK) {
193             status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
194         }
195         ALOGV("create a buffer %d : %u %p",
196               status == ResultStatus::OK, *bufferId, *handle);
197     }
198     if (status == ResultStatus::OK) {
199         // TODO: handle ownBuffer failure
200         mBufferPool.handleOwnBuffer(connectionId, *bufferId);
201     }
202     mBufferPool.cleanUp();
203     return status;
204 }
205 
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)206 ResultStatus Accessor::Impl::fetch(
207         ConnectionId connectionId, TransactionId transactionId,
208         BufferId bufferId, const native_handle_t** handle) {
209     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
210     mBufferPool.processStatusMessages();
211     auto found = mBufferPool.mTransactions.find(transactionId);
212     if (found != mBufferPool.mTransactions.end() &&
213             contains(&mBufferPool.mPendingTransactions,
214                      connectionId, transactionId)) {
215         if (found->second->mSenderValidated &&
216                 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
217                 found->second->mBufferId == bufferId) {
218             found->second->mStatus = BufferStatus::TRANSFER_FETCH;
219             auto bufferIt = mBufferPool.mBuffers.find(bufferId);
220             if (bufferIt != mBufferPool.mBuffers.end()) {
221                 mBufferPool.mStats.onBufferFetched();
222                 *handle = bufferIt->second->handle();
223                 return ResultStatus::OK;
224             }
225         }
226     }
227     mBufferPool.cleanUp();
228     return ResultStatus::CRITICAL_ERROR;
229 }
230 
cleanUp(bool clearCache)231 void Accessor::Impl::cleanUp(bool clearCache) {
232     // transaction timeout, buffer cacheing TTL handling
233     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
234     mBufferPool.processStatusMessages();
235     mBufferPool.cleanUp(clearCache);
236 }
237 
BufferPool()238 Accessor::Impl::Impl::BufferPool::BufferPool()
239     : mTimestampUs(getTimestampNow()),
240       mLastCleanUpUs(mTimestampUs),
241       mLastLogUs(mTimestampUs),
242       mSeq(0) {}
243 
244 
245 // Statistics helper
// Returns |base| as a percentage of |total|, rounded by adding 0.5 and
// truncating. |base| is first converted to the type of |total|. A zero
// |total| yields 0.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const auto ratio = 100. * static_cast<S>(base) / total;
    return int(0.5 + ratio);
}
250 
~BufferPool()251 Accessor::Impl::Impl::BufferPool::~BufferPool() {
252     std::lock_guard<std::mutex> lock(mMutex);
253     ALOGD("Destruction - bufferpool %p "
254           "cached: %zu/%zuM, %zu/%d%% in use; "
255           "allocs: %zu, %d%% recycled; "
256           "transfers: %zu, %d%% unfetched",
257           this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
258           mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
259           mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
260           mStats.mTotalTransfers,
261           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
262 }
263 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)264 bool Accessor::Impl::BufferPool::handleOwnBuffer(
265         ConnectionId connectionId, BufferId bufferId) {
266 
267     bool added = insert(&mUsingBuffers, connectionId, bufferId);
268     if (added) {
269         auto iter = mBuffers.find(bufferId);
270         iter->second->mOwnerCount++;
271     }
272     insert(&mUsingConnections, bufferId, connectionId);
273     return added;
274 }
275 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)276 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
277         ConnectionId connectionId, BufferId bufferId) {
278     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
279     if (deleted) {
280         auto iter = mBuffers.find(bufferId);
281         iter->second->mOwnerCount--;
282         if (iter->second->mOwnerCount == 0 &&
283                 iter->second->mTransactionCount == 0) {
284             mStats.onBufferUnused(iter->second->mAllocSize);
285             mFreeBuffers.insert(bufferId);
286         }
287     }
288     erase(&mUsingConnections, bufferId, connectionId);
289     ALOGV("release buffer %u : %d", bufferId, deleted);
290     return deleted;
291 }
292 
handleTransferTo(const BufferStatusMessage & message)293 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
294     auto completed = mCompletedTransactions.find(
295             message.transactionId);
296     if (completed != mCompletedTransactions.end()) {
297         // already completed
298         mCompletedTransactions.erase(completed);
299         return true;
300     }
301     // the buffer should exist and be owned.
302     auto bufferIter = mBuffers.find(message.bufferId);
303     if (bufferIter == mBuffers.end() ||
304             !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
305         return false;
306     }
307     auto found = mTransactions.find(message.transactionId);
308     if (found != mTransactions.end()) {
309         // transfer_from was received earlier.
310         found->second->mSender = message.connectionId;
311         found->second->mSenderValidated = true;
312         return true;
313     }
314     if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
315         // N.B: it could be fake or receive connection already closed.
316         ALOGD("bufferpool %p receiver connection %lld is no longer valid",
317               this, (long long)message.targetConnectionId);
318         return false;
319     }
320     mStats.onBufferSent();
321     mTransactions.insert(std::make_pair(
322             message.transactionId,
323             std::make_unique<TransactionStatus>(message, mTimestampUs)));
324     insert(&mPendingTransactions, message.targetConnectionId,
325            message.transactionId);
326     bufferIter->second->mTransactionCount++;
327     return true;
328 }
329 
handleTransferFrom(const BufferStatusMessage & message)330 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
331     auto found = mTransactions.find(message.transactionId);
332     if (found == mTransactions.end()) {
333         // TODO: is it feasible to check ownership here?
334         mStats.onBufferSent();
335         mTransactions.insert(std::make_pair(
336                 message.transactionId,
337                 std::make_unique<TransactionStatus>(message, mTimestampUs)));
338         insert(&mPendingTransactions, message.connectionId,
339                message.transactionId);
340         auto bufferIter = mBuffers.find(message.bufferId);
341         bufferIter->second->mTransactionCount++;
342     } else {
343         if (message.connectionId == found->second->mReceiver) {
344             found->second->mStatus = BufferStatus::TRANSFER_FROM;
345         }
346     }
347     return true;
348 }
349 
handleTransferResult(const BufferStatusMessage & message)350 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
351     auto found = mTransactions.find(message.transactionId);
352     if (found != mTransactions.end()) {
353         bool deleted = erase(&mPendingTransactions, message.connectionId,
354                              message.transactionId);
355         if (deleted) {
356             if (!found->second->mSenderValidated) {
357                 mCompletedTransactions.insert(message.transactionId);
358             }
359             auto bufferIter = mBuffers.find(message.bufferId);
360             if (message.newStatus == BufferStatus::TRANSFER_OK) {
361                 handleOwnBuffer(message.connectionId, message.bufferId);
362             }
363             bufferIter->second->mTransactionCount--;
364             if (bufferIter->second->mOwnerCount == 0
365                 && bufferIter->second->mTransactionCount == 0) {
366                 mStats.onBufferUnused(bufferIter->second->mAllocSize);
367                 mFreeBuffers.insert(message.bufferId);
368             }
369             mTransactions.erase(found);
370         }
371         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
372               message.bufferId, deleted);
373         return deleted;
374     }
375     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
376           message.bufferId);
377     return false;
378 }
379 
processStatusMessages()380 void Accessor::Impl::BufferPool::processStatusMessages() {
381     std::vector<BufferStatusMessage> messages;
382     mObserver.getBufferStatusChanges(messages);
383     mTimestampUs = getTimestampNow();
384     for (BufferStatusMessage& message: messages) {
385         bool ret = false;
386         switch (message.newStatus) {
387             case BufferStatus::NOT_USED:
388                 ret = handleReleaseBuffer(
389                         message.connectionId, message.bufferId);
390                 break;
391             case BufferStatus::USED:
392                 // not happening
393                 break;
394             case BufferStatus::TRANSFER_TO:
395                 ret = handleTransferTo(message);
396                 break;
397             case BufferStatus::TRANSFER_FROM:
398                 ret = handleTransferFrom(message);
399                 break;
400             case BufferStatus::TRANSFER_TIMEOUT:
401                 // TODO
402                 break;
403             case BufferStatus::TRANSFER_LOST:
404                 // TODO
405                 break;
406             case BufferStatus::TRANSFER_FETCH:
407                 // not happening
408                 break;
409             case BufferStatus::TRANSFER_OK:
410             case BufferStatus::TRANSFER_ERROR:
411                 ret = handleTransferResult(message);
412                 break;
413         }
414         if (ret == false) {
415             ALOGW("buffer status message processing failure - message : %d connection : %lld",
416                   message.newStatus, (long long)message.connectionId);
417         }
418     }
419     messages.clear();
420 }
421 
handleClose(ConnectionId connectionId)422 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
423     // Cleaning buffers
424     auto buffers = mUsingBuffers.find(connectionId);
425     if (buffers != mUsingBuffers.end()) {
426         for (const BufferId& bufferId : buffers->second) {
427             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
428             if (deleted) {
429                 auto bufferIter = mBuffers.find(bufferId);
430                 bufferIter->second->mOwnerCount--;
431                 if (bufferIter->second->mOwnerCount == 0 &&
432                         bufferIter->second->mTransactionCount == 0) {
433                     // TODO: handle freebuffer insert fail
434                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
435                     mFreeBuffers.insert(bufferId);
436                 }
437             }
438         }
439         mUsingBuffers.erase(buffers);
440     }
441 
442     // Cleaning transactions
443     auto pending = mPendingTransactions.find(connectionId);
444     if (pending != mPendingTransactions.end()) {
445         for (const TransactionId& transactionId : pending->second) {
446             auto iter = mTransactions.find(transactionId);
447             if (iter != mTransactions.end()) {
448                 if (!iter->second->mSenderValidated) {
449                     mCompletedTransactions.insert(transactionId);
450                 }
451                 BufferId bufferId = iter->second->mBufferId;
452                 auto bufferIter = mBuffers.find(bufferId);
453                 bufferIter->second->mTransactionCount--;
454                 if (bufferIter->second->mOwnerCount == 0 &&
455                     bufferIter->second->mTransactionCount == 0) {
456                     // TODO: handle freebuffer insert fail
457                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
458                     mFreeBuffers.insert(bufferId);
459                 }
460                 mTransactions.erase(iter);
461             }
462         }
463     }
464     mConnectionIds.erase(connectionId);
465     return true;
466 }
467 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)468 bool Accessor::Impl::BufferPool::getFreeBuffer(
469         const std::shared_ptr<BufferPoolAllocator> &allocator,
470         const std::vector<uint8_t> &params, BufferId *pId,
471         const native_handle_t** handle) {
472     auto bufferIt = mFreeBuffers.begin();
473     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
474         BufferId bufferId = *bufferIt;
475         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
476             break;
477         }
478     }
479     if (bufferIt != mFreeBuffers.end()) {
480         BufferId id = *bufferIt;
481         mFreeBuffers.erase(bufferIt);
482         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
483         *handle = mBuffers[id]->handle();
484         *pId = id;
485         ALOGV("recycle a buffer %u %p", id, *handle);
486         return true;
487     }
488     return false;
489 }
490 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)491 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
492         const std::shared_ptr<BufferPoolAllocation> &alloc,
493         const size_t allocSize,
494         const std::vector<uint8_t> &params,
495         BufferId *pId,
496         const native_handle_t** handle) {
497 
498     BufferId bufferId = mSeq++;
499     if (mSeq == Connection::SYNC_BUFFERID) {
500         mSeq = 0;
501     }
502     std::unique_ptr<InternalBuffer> buffer =
503             std::make_unique<InternalBuffer>(
504                     bufferId, alloc, allocSize, params);
505     if (buffer) {
506         auto res = mBuffers.insert(std::make_pair(
507                 bufferId, std::move(buffer)));
508         if (res.second) {
509             mStats.onBufferAllocated(allocSize);
510             *handle = alloc->handle();
511             *pId = bufferId;
512             return ResultStatus::OK;
513         }
514     }
515     return ResultStatus::NO_MEMORY;
516 }
517 
cleanUp(bool clearCache)518 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
519     if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
520         mLastCleanUpUs = mTimestampUs;
521         if (mTimestampUs > mLastLogUs + kLogDurationUs) {
522             mLastLogUs = mTimestampUs;
523             ALOGD("bufferpool %p : %zu(%zu size) total buffers - "
524                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
525                   "%zu/%zu (fetch/transfer)",
526                   this, mStats.mBuffersCached, mStats.mSizeCached,
527                   mStats.mBuffersInUse, mStats.mSizeInUse,
528                   mStats.mTotalRecycles, mStats.mTotalAllocations,
529                   mStats.mTotalFetches, mStats.mTotalTransfers);
530         }
531         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
532             if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
533                     && mBuffers.size() < kMinBufferCountForEviction) {
534                 break;
535             }
536             auto it = mBuffers.find(*freeIt);
537             if (it != mBuffers.end() &&
538                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
539                 mStats.onBufferEvicted(it->second->mAllocSize);
540                 mBuffers.erase(it);
541                 freeIt = mFreeBuffers.erase(freeIt);
542             } else {
543                 ++freeIt;
544                 ALOGW("bufferpool inconsistent!");
545             }
546         }
547     }
548 }
549 
550 }  // namespace implementation
551 }  // namespace V1_0
552 }  // namespace bufferpool
553 }  // namespace media
554 }  // namespace hardware
555 }  // namespace android
556