• Home
  • History
  • Annotate
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1  /*
2   * Copyright (C) 2018 The Android Open Source Project
3   *
4   * Licensed under the Apache License, Version 2.0 (the "License");
5   * you may not use this file except in compliance with the License.
6   * You may obtain a copy of the License at
7   *
8   *      http://www.apache.org/licenses/LICENSE-2.0
9   *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  #define LOG_TAG "BufferPoolAccessor1.0"
18  //#define LOG_NDEBUG 0
19  
20  #include <sys/types.h>
21  #include <stdint.h>
22  #include <time.h>
23  #include <unistd.h>
24  #include <utils/Log.h>
25  #include "AccessorImpl.h"
26  #include "Connection.h"
27  
28  namespace android {
29  namespace hardware {
30  namespace media {
31  namespace bufferpool {
32  namespace V1_0 {
33  namespace implementation {
34  
35  namespace {
36      static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
37      static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
38  
39      static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
40      static constexpr size_t kMinBufferCountForEviction = 40;
41  }
42  
43  // Buffer structure in bufferpool process
44  struct InternalBuffer {
45      BufferId mId;
46      size_t mOwnerCount;
47      size_t mTransactionCount;
48      const std::shared_ptr<BufferPoolAllocation> mAllocation;
49      const size_t mAllocSize;
50      const std::vector<uint8_t> mConfig;
51  
InternalBufferandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer52      InternalBuffer(
53              BufferId id,
54              const std::shared_ptr<BufferPoolAllocation> &alloc,
55              const size_t allocSize,
56              const std::vector<uint8_t> &allocConfig)
57              : mId(id), mOwnerCount(0), mTransactionCount(0),
58              mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
59  
handleandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer60      const native_handle_t *handle() {
61          return mAllocation->handle();
62      }
63  };
64  
65  struct TransactionStatus {
66      TransactionId mId;
67      BufferId mBufferId;
68      ConnectionId mSender;
69      ConnectionId mReceiver;
70      BufferStatus mStatus;
71      int64_t mTimestampUs;
72      bool mSenderValidated;
73  
TransactionStatusandroid::hardware::media::bufferpool::V1_0::implementation::TransactionStatus74      TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
75          mId = message.transactionId;
76          mBufferId = message.bufferId;
77          mStatus = message.newStatus;
78          mTimestampUs = timestampUs;
79          if (mStatus == BufferStatus::TRANSFER_TO) {
80              mSender = message.connectionId;
81              mReceiver = message.targetConnectionId;
82              mSenderValidated = true;
83          } else {
84              mSender = -1LL;
85              mReceiver = message.connectionId;
86              mSenderValidated = false;
87          }
88      }
89  };
90  
91  // Helper template methods for handling map of set.
92  template<class T, class U>
insert(std::map<T,std::set<U>> * mapOfSet,T key,U value)93  bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
94      auto iter = mapOfSet->find(key);
95      if (iter == mapOfSet->end()) {
96          std::set<U> valueSet{value};
97          mapOfSet->insert(std::make_pair(key, valueSet));
98          return true;
99      } else if (iter->second.find(value)  == iter->second.end()) {
100          iter->second.insert(value);
101          return true;
102      }
103      return false;
104  }
105  
106  template<class T, class U>
erase(std::map<T,std::set<U>> * mapOfSet,T key,U value)107  bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
108      bool ret = false;
109      auto iter = mapOfSet->find(key);
110      if (iter != mapOfSet->end()) {
111          if (iter->second.erase(value) > 0) {
112              ret = true;
113          }
114          if (iter->second.size() == 0) {
115              mapOfSet->erase(iter);
116          }
117      }
118      return ret;
119  }
120  
121  template<class T, class U>
contains(std::map<T,std::set<U>> * mapOfSet,T key,U value)122  bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
123      auto iter = mapOfSet->find(key);
124      if (iter != mapOfSet->end()) {
125          auto setIter = iter->second.find(value);
126          return setIter != iter->second.end();
127      }
128      return false;
129  }
130  
131  uint32_t Accessor::Impl::sSeqId = time(nullptr);
132  
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)133  Accessor::Impl::Impl(
134          const std::shared_ptr<BufferPoolAllocator> &allocator)
135          : mAllocator(allocator) {}
136  
~Impl()137  Accessor::Impl::~Impl() {
138  }
139  
connect(const sp<Accessor> & accessor,sp<Connection> * connection,ConnectionId * pConnectionId,const QueueDescriptor ** fmqDescPtr)140  ResultStatus Accessor::Impl::connect(
141          const sp<Accessor> &accessor, sp<Connection> *connection,
142          ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) {
143      sp<Connection> newConnection = new Connection();
144      ResultStatus status = ResultStatus::CRITICAL_ERROR;
145      {
146          std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
147          if (newConnection) {
148              int32_t pid = getpid();
149              ConnectionId id = (int64_t)pid << 32 | sSeqId;
150              status = mBufferPool.mObserver.open(id, fmqDescPtr);
151              if (status == ResultStatus::OK) {
152                  newConnection->initialize(accessor, id);
153                  *connection = newConnection;
154                  *pConnectionId = id;
155                  mBufferPool.mConnectionIds.insert(id);
156                  if (sSeqId == UINT32_MAX) {
157                     sSeqId = 0;
158                  } else {
159                      ++sSeqId;
160                  }
161              }
162          }
163          mBufferPool.processStatusMessages();
164          mBufferPool.cleanUp();
165      }
166      return status;
167  }
168  
close(ConnectionId connectionId)169  ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
170      std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
171      mBufferPool.processStatusMessages();
172      mBufferPool.handleClose(connectionId);
173      mBufferPool.mObserver.close(connectionId);
174      // Since close# will be called after all works are finished, it is OK to
175      // evict unused buffers.
176      mBufferPool.cleanUp(true);
177      return ResultStatus::OK;
178  }
179  
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)180  ResultStatus Accessor::Impl::allocate(
181          ConnectionId connectionId, const std::vector<uint8_t>& params,
182          BufferId *bufferId, const native_handle_t** handle) {
183      std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
184      mBufferPool.processStatusMessages();
185      ResultStatus status = ResultStatus::OK;
186      if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
187          lock.unlock();
188          std::shared_ptr<BufferPoolAllocation> alloc;
189          size_t allocSize;
190          status = mAllocator->allocate(params, &alloc, &allocSize);
191          lock.lock();
192          if (status == ResultStatus::OK) {
193              status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
194          }
195          ALOGV("create a buffer %d : %u %p",
196                status == ResultStatus::OK, *bufferId, *handle);
197      }
198      if (status == ResultStatus::OK) {
199          // TODO: handle ownBuffer failure
200          mBufferPool.handleOwnBuffer(connectionId, *bufferId);
201      }
202      mBufferPool.cleanUp();
203      return status;
204  }
205  
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)206  ResultStatus Accessor::Impl::fetch(
207          ConnectionId connectionId, TransactionId transactionId,
208          BufferId bufferId, const native_handle_t** handle) {
209      std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
210      mBufferPool.processStatusMessages();
211      auto found = mBufferPool.mTransactions.find(transactionId);
212      if (found != mBufferPool.mTransactions.end() &&
213              contains(&mBufferPool.mPendingTransactions,
214                       connectionId, transactionId)) {
215          if (found->second->mSenderValidated &&
216                  found->second->mStatus == BufferStatus::TRANSFER_FROM &&
217                  found->second->mBufferId == bufferId) {
218              found->second->mStatus = BufferStatus::TRANSFER_FETCH;
219              auto bufferIt = mBufferPool.mBuffers.find(bufferId);
220              if (bufferIt != mBufferPool.mBuffers.end()) {
221                  mBufferPool.mStats.onBufferFetched();
222                  *handle = bufferIt->second->handle();
223                  return ResultStatus::OK;
224              }
225          }
226      }
227      mBufferPool.cleanUp();
228      return ResultStatus::CRITICAL_ERROR;
229  }
230  
cleanUp(bool clearCache)231  void Accessor::Impl::cleanUp(bool clearCache) {
232      // transaction timeout, buffer cacheing TTL handling
233      std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
234      mBufferPool.processStatusMessages();
235      mBufferPool.cleanUp(clearCache);
236  }
237  
BufferPool()238  Accessor::Impl::Impl::BufferPool::BufferPool()
239      : mTimestampUs(getTimestampNow()),
240        mLastCleanUpUs(mTimestampUs),
241        mLastLogUs(mTimestampUs),
242        mSeq(0) {}
243  
244  
245  // Statistics helper
246  template<typename T, typename S>
percentage(T base,S total)247  int percentage(T base, S total) {
248      return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
249  }
250  
~BufferPool()251  Accessor::Impl::Impl::BufferPool::~BufferPool() {
252      std::lock_guard<std::mutex> lock(mMutex);
253      ALOGD("Destruction - bufferpool %p "
254            "cached: %zu/%zuM, %zu/%d%% in use; "
255            "allocs: %zu, %d%% recycled; "
256            "transfers: %zu, %d%% unfetched",
257            this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
258            mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
259            mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
260            mStats.mTotalTransfers,
261            percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
262  }
263  
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)264  bool Accessor::Impl::BufferPool::handleOwnBuffer(
265          ConnectionId connectionId, BufferId bufferId) {
266  
267      bool added = insert(&mUsingBuffers, connectionId, bufferId);
268      if (added) {
269          auto iter = mBuffers.find(bufferId);
270          iter->second->mOwnerCount++;
271      }
272      insert(&mUsingConnections, bufferId, connectionId);
273      return added;
274  }
275  
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)276  bool Accessor::Impl::BufferPool::handleReleaseBuffer(
277          ConnectionId connectionId, BufferId bufferId) {
278      bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
279      if (deleted) {
280          auto iter = mBuffers.find(bufferId);
281          iter->second->mOwnerCount--;
282          if (iter->second->mOwnerCount == 0 &&
283                  iter->second->mTransactionCount == 0) {
284              mStats.onBufferUnused(iter->second->mAllocSize);
285              mFreeBuffers.insert(bufferId);
286          }
287      }
288      erase(&mUsingConnections, bufferId, connectionId);
289      ALOGV("release buffer %u : %d", bufferId, deleted);
290      return deleted;
291  }
292  
handleTransferTo(const BufferStatusMessage & message)293  bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
294      auto completed = mCompletedTransactions.find(
295              message.transactionId);
296      if (completed != mCompletedTransactions.end()) {
297          // already completed
298          mCompletedTransactions.erase(completed);
299          return true;
300      }
301      // the buffer should exist and be owned.
302      auto bufferIter = mBuffers.find(message.bufferId);
303      if (bufferIter == mBuffers.end() ||
304              !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
305          return false;
306      }
307      auto found = mTransactions.find(message.transactionId);
308      if (found != mTransactions.end()) {
309          // transfer_from was received earlier.
310          found->second->mSender = message.connectionId;
311          found->second->mSenderValidated = true;
312          return true;
313      }
314      if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
315          // N.B: it could be fake or receive connection already closed.
316          ALOGD("bufferpool %p receiver connection %lld is no longer valid",
317                this, (long long)message.targetConnectionId);
318          return false;
319      }
320      mStats.onBufferSent();
321      mTransactions.insert(std::make_pair(
322              message.transactionId,
323              std::make_unique<TransactionStatus>(message, mTimestampUs)));
324      insert(&mPendingTransactions, message.targetConnectionId,
325             message.transactionId);
326      bufferIter->second->mTransactionCount++;
327      return true;
328  }
329  
handleTransferFrom(const BufferStatusMessage & message)330  bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
331      auto found = mTransactions.find(message.transactionId);
332      if (found == mTransactions.end()) {
333          // TODO: is it feasible to check ownership here?
334          mStats.onBufferSent();
335          mTransactions.insert(std::make_pair(
336                  message.transactionId,
337                  std::make_unique<TransactionStatus>(message, mTimestampUs)));
338          insert(&mPendingTransactions, message.connectionId,
339                 message.transactionId);
340          auto bufferIter = mBuffers.find(message.bufferId);
341          bufferIter->second->mTransactionCount++;
342      } else {
343          if (message.connectionId == found->second->mReceiver) {
344              found->second->mStatus = BufferStatus::TRANSFER_FROM;
345          }
346      }
347      return true;
348  }
349  
handleTransferResult(const BufferStatusMessage & message)350  bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
351      auto found = mTransactions.find(message.transactionId);
352      if (found != mTransactions.end()) {
353          bool deleted = erase(&mPendingTransactions, message.connectionId,
354                               message.transactionId);
355          if (deleted) {
356              if (!found->second->mSenderValidated) {
357                  mCompletedTransactions.insert(message.transactionId);
358              }
359              auto bufferIter = mBuffers.find(message.bufferId);
360              if (message.newStatus == BufferStatus::TRANSFER_OK) {
361                  handleOwnBuffer(message.connectionId, message.bufferId);
362              }
363              bufferIter->second->mTransactionCount--;
364              if (bufferIter->second->mOwnerCount == 0
365                  && bufferIter->second->mTransactionCount == 0) {
366                  mStats.onBufferUnused(bufferIter->second->mAllocSize);
367                  mFreeBuffers.insert(message.bufferId);
368              }
369              mTransactions.erase(found);
370          }
371          ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
372                message.bufferId, deleted);
373          return deleted;
374      }
375      ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
376            message.bufferId);
377      return false;
378  }
379  
processStatusMessages()380  void Accessor::Impl::BufferPool::processStatusMessages() {
381      std::vector<BufferStatusMessage> messages;
382      mObserver.getBufferStatusChanges(messages);
383      mTimestampUs = getTimestampNow();
384      for (BufferStatusMessage& message: messages) {
385          bool ret = false;
386          switch (message.newStatus) {
387              case BufferStatus::NOT_USED:
388                  ret = handleReleaseBuffer(
389                          message.connectionId, message.bufferId);
390                  break;
391              case BufferStatus::USED:
392                  // not happening
393                  break;
394              case BufferStatus::TRANSFER_TO:
395                  ret = handleTransferTo(message);
396                  break;
397              case BufferStatus::TRANSFER_FROM:
398                  ret = handleTransferFrom(message);
399                  break;
400              case BufferStatus::TRANSFER_TIMEOUT:
401                  // TODO
402                  break;
403              case BufferStatus::TRANSFER_LOST:
404                  // TODO
405                  break;
406              case BufferStatus::TRANSFER_FETCH:
407                  // not happening
408                  break;
409              case BufferStatus::TRANSFER_OK:
410              case BufferStatus::TRANSFER_ERROR:
411                  ret = handleTransferResult(message);
412                  break;
413          }
414          if (ret == false) {
415              ALOGW("buffer status message processing failure - message : %d connection : %lld",
416                    message.newStatus, (long long)message.connectionId);
417          }
418      }
419      messages.clear();
420  }
421  
handleClose(ConnectionId connectionId)422  bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
423      // Cleaning buffers
424      auto buffers = mUsingBuffers.find(connectionId);
425      if (buffers != mUsingBuffers.end()) {
426          for (const BufferId& bufferId : buffers->second) {
427              bool deleted = erase(&mUsingConnections, bufferId, connectionId);
428              if (deleted) {
429                  auto bufferIter = mBuffers.find(bufferId);
430                  bufferIter->second->mOwnerCount--;
431                  if (bufferIter->second->mOwnerCount == 0 &&
432                          bufferIter->second->mTransactionCount == 0) {
433                      // TODO: handle freebuffer insert fail
434                      mStats.onBufferUnused(bufferIter->second->mAllocSize);
435                      mFreeBuffers.insert(bufferId);
436                  }
437              }
438          }
439          mUsingBuffers.erase(buffers);
440      }
441  
442      // Cleaning transactions
443      auto pending = mPendingTransactions.find(connectionId);
444      if (pending != mPendingTransactions.end()) {
445          for (const TransactionId& transactionId : pending->second) {
446              auto iter = mTransactions.find(transactionId);
447              if (iter != mTransactions.end()) {
448                  if (!iter->second->mSenderValidated) {
449                      mCompletedTransactions.insert(transactionId);
450                  }
451                  BufferId bufferId = iter->second->mBufferId;
452                  auto bufferIter = mBuffers.find(bufferId);
453                  bufferIter->second->mTransactionCount--;
454                  if (bufferIter->second->mOwnerCount == 0 &&
455                      bufferIter->second->mTransactionCount == 0) {
456                      // TODO: handle freebuffer insert fail
457                      mStats.onBufferUnused(bufferIter->second->mAllocSize);
458                      mFreeBuffers.insert(bufferId);
459                  }
460                  mTransactions.erase(iter);
461              }
462          }
463      }
464      mConnectionIds.erase(connectionId);
465      return true;
466  }
467  
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)468  bool Accessor::Impl::BufferPool::getFreeBuffer(
469          const std::shared_ptr<BufferPoolAllocator> &allocator,
470          const std::vector<uint8_t> &params, BufferId *pId,
471          const native_handle_t** handle) {
472      auto bufferIt = mFreeBuffers.begin();
473      for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
474          BufferId bufferId = *bufferIt;
475          if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
476              break;
477          }
478      }
479      if (bufferIt != mFreeBuffers.end()) {
480          BufferId id = *bufferIt;
481          mFreeBuffers.erase(bufferIt);
482          mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
483          *handle = mBuffers[id]->handle();
484          *pId = id;
485          ALOGV("recycle a buffer %u %p", id, *handle);
486          return true;
487      }
488      return false;
489  }
490  
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)491  ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
492          const std::shared_ptr<BufferPoolAllocation> &alloc,
493          const size_t allocSize,
494          const std::vector<uint8_t> &params,
495          BufferId *pId,
496          const native_handle_t** handle) {
497  
498      BufferId bufferId = mSeq++;
499      if (mSeq == Connection::SYNC_BUFFERID) {
500          mSeq = 0;
501      }
502      std::unique_ptr<InternalBuffer> buffer =
503              std::make_unique<InternalBuffer>(
504                      bufferId, alloc, allocSize, params);
505      if (buffer) {
506          auto res = mBuffers.insert(std::make_pair(
507                  bufferId, std::move(buffer)));
508          if (res.second) {
509              mStats.onBufferAllocated(allocSize);
510              *handle = alloc->handle();
511              *pId = bufferId;
512              return ResultStatus::OK;
513          }
514      }
515      return ResultStatus::NO_MEMORY;
516  }
517  
cleanUp(bool clearCache)518  void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
519      if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
520          mLastCleanUpUs = mTimestampUs;
521          if (mTimestampUs > mLastLogUs + kLogDurationUs) {
522              mLastLogUs = mTimestampUs;
523              ALOGD("bufferpool %p : %zu(%zu size) total buffers - "
524                    "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
525                    "%zu/%zu (fetch/transfer)",
526                    this, mStats.mBuffersCached, mStats.mSizeCached,
527                    mStats.mBuffersInUse, mStats.mSizeInUse,
528                    mStats.mTotalRecycles, mStats.mTotalAllocations,
529                    mStats.mTotalFetches, mStats.mTotalTransfers);
530          }
531          for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
532              if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
533                      && mBuffers.size() < kMinBufferCountForEviction) {
534                  break;
535              }
536              auto it = mBuffers.find(*freeIt);
537              if (it != mBuffers.end() &&
538                      it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
539                  mStats.onBufferEvicted(it->second->mAllocSize);
540                  mBuffers.erase(it);
541                  freeIt = mFreeBuffers.erase(freeIt);
542              } else {
543                  ++freeIt;
544                  ALOGW("bufferpool inconsistent!");
545              }
546          }
547      }
548  }
549  
550  }  // namespace implementation
551  }  // namespace V1_0
552  }  // namespace bufferpool
553  }  // namespace media
554  }  // namespace hardware
555  }  // namespace android
556