1 /*
2  * Copyright (C) 2018 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "BufferPoolClient"
18 //#define LOG_NDEBUG 0
19 
20 #include <thread>
21 #include <utils/Log.h>
22 #include "BufferPoolClient.h"
23 #include "Connection.h"
24 
25 namespace android {
26 namespace hardware {
27 namespace media {
28 namespace bufferpool {
29 namespace V2_0 {
30 namespace implementation {
31 
32 static constexpr int64_t kReceiveTimeoutUs = 2000000; // 2s
33 static constexpr int kPostMaxRetry = 3;
34 static constexpr int kCacheTtlUs = 1000000; // TODO: tune
35 static constexpr size_t kMaxCachedBufferCount = 64;
36 static constexpr size_t kCachedBufferCountTarget = kMaxCachedBufferCount - 16;
37 
38 class BufferPoolClient::Impl
39         : public std::enable_shared_from_this<BufferPoolClient::Impl> {
40 public:
41     explicit Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer);
42 
43     explicit Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer);
44 
isValid()45     bool isValid() {
46         return mValid;
47     }
48 
isLocal()49     bool isLocal() {
50         return mValid && mLocal;
51     }
52 
getConnectionId()53     ConnectionId getConnectionId() {
54         return mConnectionId;
55     }
56 
getAccessor()57     sp<IAccessor> &getAccessor() {
58         return mAccessor;
59     }
60 
61     bool isActive(int64_t *lastTransactionUs, bool clearCache);
62 
63     void receiveInvalidation(uint32_t msgID);
64 
65     ResultStatus flush();
66 
67     ResultStatus allocate(const std::vector<uint8_t> &params,
68                           native_handle_t **handle,
69                           std::shared_ptr<BufferPoolData> *buffer);
70 
71     ResultStatus receive(
72             TransactionId transactionId, BufferId bufferId,
73             int64_t timestampUs,
74             native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer);
75 
76     void postBufferRelease(BufferId bufferId);
77 
78     bool postSend(
79             BufferId bufferId, ConnectionId receiver,
80             TransactionId *transactionId, int64_t *timestampUs);
81 private:
82 
83     bool postReceive(
84             BufferId bufferId, TransactionId transactionId,
85             int64_t timestampUs);
86 
87     bool postReceiveResult(
88             BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync);
89 
90     void trySyncFromRemote();
91 
92     bool syncReleased(uint32_t msgId = 0);
93 
94     void evictCaches(bool clearCache = false);
95 
96     void invalidateBuffer(BufferId id);
97 
98     void invalidateRange(BufferId from, BufferId to);
99 
100     ResultStatus allocateBufferHandle(
101             const std::vector<uint8_t>& params, BufferId *bufferId,
102             native_handle_t **handle);
103 
104     ResultStatus fetchBufferHandle(
105             TransactionId transactionId, BufferId bufferId,
106             native_handle_t **handle);
107 
108     struct BlockPoolDataDtor;
109     struct ClientBuffer;
110 
111     bool mLocal;
112     bool mValid;
113     sp<IAccessor> mAccessor;
114     sp<Connection> mLocalConnection;
115     sp<IConnection> mRemoteConnection;
116     uint32_t mSeqId;
117     ConnectionId mConnectionId;
118     int64_t mLastEvictCacheUs;
119     std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
120 
121     // CachedBuffers
122     struct BufferCache {
123         std::mutex mLock;
124         bool mCreating;
125         std::condition_variable mCreateCv;
126         std::map<BufferId, std::unique_ptr<ClientBuffer>> mBuffers;
127         int mActive;
128         int64_t mLastChangeUs;
129 
BufferCacheandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::BufferCache130         BufferCache() : mCreating(false), mActive(0), mLastChangeUs(getTimestampNow()) {}
131 
incActive_landroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::BufferCache132         void incActive_l() {
133             ++mActive;
134             mLastChangeUs = getTimestampNow();
135         }
136 
decActive_landroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::BufferCache137         void decActive_l() {
138             --mActive;
139             mLastChangeUs = getTimestampNow();
140         }
141 
cachedBufferCountandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::BufferCache142         int cachedBufferCount() const {
143             return mBuffers.size() - mActive;
144         }
145     } mCache;
146 
147     // FMQ - release notifier
148     struct ReleaseCache {
149         std::mutex mLock;
150         // TODO: use only one list?(using one list may dealy sending messages?)
151         std::list<BufferId> mReleasingIds;
152         std::list<BufferId> mReleasedIds;
153         uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
154         bool mInvalidateAck;
155         std::unique_ptr<BufferStatusChannel> mStatusChannel;
156 
ReleaseCacheandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ReleaseCache157         ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
158     } mReleasing;
159 
160     // This lock is held during synchronization from remote side.
161     // In order to minimize remote calls and locking durtaion, this lock is held
162     // by best effort approach using try_lock().
163     std::mutex mRemoteSyncLock;
164 };
165 
166 struct BufferPoolClient::Impl::BlockPoolDataDtor {
BlockPoolDataDtorandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::BlockPoolDataDtor167     BlockPoolDataDtor(const std::shared_ptr<BufferPoolClient::Impl> &impl)
168             : mImpl(impl) {}
169 
operator ()android::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::BlockPoolDataDtor170     void operator()(BufferPoolData *buffer) {
171         BufferId id = buffer->mId;
172         delete buffer;
173 
174         auto impl = mImpl.lock();
175         if (impl && impl->isValid()) {
176             impl->postBufferRelease(id);
177         }
178     }
179     const std::weak_ptr<BufferPoolClient::Impl> mImpl;
180 };
181 
182 struct BufferPoolClient::Impl::ClientBuffer {
183 private:
184     int64_t mExpireUs;
185     bool mHasCache;
186     ConnectionId mConnectionId;
187     BufferId mId;
188     native_handle_t *mHandle;
189     std::weak_ptr<BufferPoolData> mCache;
190 
updateExpireandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer191     void updateExpire() {
192         mExpireUs = getTimestampNow() + kCacheTtlUs;
193     }
194 
195 public:
ClientBufferandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer196     ClientBuffer(
197             ConnectionId connectionId, BufferId id, native_handle_t *handle)
198             : mHasCache(false), mConnectionId(connectionId),
199               mId(id), mHandle(handle) {
200         mExpireUs = getTimestampNow() + kCacheTtlUs;
201     }
202 
~ClientBufferandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer203     ~ClientBuffer() {
204         if (mHandle) {
205             native_handle_close(mHandle);
206             native_handle_delete(mHandle);
207         }
208     }
209 
idandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer210     BufferId id() const {
211         return mId;
212     }
213 
expireandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer214     bool expire() const {
215         int64_t now = getTimestampNow();
216         return now >= mExpireUs;
217     }
218 
hasCacheandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer219     bool hasCache() const {
220         return mHasCache;
221     }
222 
fetchCacheandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer223     std::shared_ptr<BufferPoolData> fetchCache(native_handle_t **pHandle) {
224         if (mHasCache) {
225             std::shared_ptr<BufferPoolData> cache = mCache.lock();
226             if (cache) {
227                 *pHandle = mHandle;
228             }
229             return cache;
230         }
231         return nullptr;
232     }
233 
createCacheandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer234     std::shared_ptr<BufferPoolData> createCache(
235             const std::shared_ptr<BufferPoolClient::Impl> &impl,
236             native_handle_t **pHandle) {
237         if (!mHasCache) {
238             // Allocates a raw ptr in order to avoid sending #postBufferRelease
239             // from deleter, in case of native_handle_clone failure.
240             BufferPoolData *ptr = new BufferPoolData(mConnectionId, mId);
241             if (ptr) {
242                 std::shared_ptr<BufferPoolData> cache(ptr, BlockPoolDataDtor(impl));
243                 if (cache) {
244                     mCache = cache;
245                     mHasCache = true;
246                     *pHandle = mHandle;
247                     return cache;
248                 }
249             }
250             if (ptr) {
251                 delete ptr;
252             }
253         }
254         return nullptr;
255     }
256 
onCacheReleaseandroid::hardware::media::bufferpool::V2_0::implementation::BufferPoolClient::Impl::ClientBuffer257     bool onCacheRelease() {
258         if (mHasCache) {
259             // TODO: verify mCache is not valid;
260             updateExpire();
261             mHasCache = false;
262             return true;
263         }
264         return false;
265     }
266 };
267 
Impl(const sp<Accessor> & accessor,const sp<IObserver> & observer)268 BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer)
269     : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
270       mLastEvictCacheUs(getTimestampNow()) {
271     const StatusDescriptor *statusDesc;
272     const InvalidationDescriptor *invDesc;
273     ResultStatus status = accessor->connect(
274             observer, true,
275             &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
276             &statusDesc, &invDesc);
277     if (status == ResultStatus::OK) {
278         mReleasing.mStatusChannel =
279                 std::make_unique<BufferStatusChannel>(*statusDesc);
280         mInvalidationListener =
281                 std::make_unique<BufferInvalidationListener>(*invDesc);
282         mValid = mReleasing.mStatusChannel &&
283                 mReleasing.mStatusChannel->isValid() &&
284                 mInvalidationListener &&
285                 mInvalidationListener->isValid();
286     }
287 }
288 
Impl(const sp<IAccessor> & accessor,const sp<IObserver> & observer)289 BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer)
290     : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
291       mLastEvictCacheUs(getTimestampNow()) {
292     bool valid = false;
293     sp<IConnection>& outConnection = mRemoteConnection;
294     ConnectionId& id = mConnectionId;
295     uint32_t& outMsgId = mReleasing.mInvalidateId;
296     std::unique_ptr<BufferStatusChannel>& outChannel =
297             mReleasing.mStatusChannel;
298     std::unique_ptr<BufferInvalidationListener>& outObserver =
299             mInvalidationListener;
300     Return<void> transResult = accessor->connect(
301             observer,
302             [&valid, &outConnection, &id, &outMsgId, &outChannel, &outObserver]
303             (ResultStatus status, sp<IConnection> connection,
304              ConnectionId connectionId, uint32_t msgId,
305              const StatusDescriptor& statusDesc,
306              const InvalidationDescriptor& invDesc) {
307                 if (status == ResultStatus::OK) {
308                     outConnection = connection;
309                     id = connectionId;
310                     outMsgId = msgId;
311                     outChannel = std::make_unique<BufferStatusChannel>(statusDesc);
312                     outObserver = std::make_unique<BufferInvalidationListener>(invDesc);
313                     if (outChannel && outChannel->isValid() &&
314                         outObserver && outObserver->isValid()) {
315                         valid = true;
316                     }
317                 }
318             });
319     mValid = transResult.isOk() && valid;
320 }
321 
isActive(int64_t * lastTransactionUs,bool clearCache)322 bool BufferPoolClient::Impl::isActive(int64_t *lastTransactionUs, bool clearCache) {
323     bool active = false;
324     {
325         std::lock_guard<std::mutex> lock(mCache.mLock);
326         syncReleased();
327         evictCaches(clearCache);
328         *lastTransactionUs = mCache.mLastChangeUs;
329         active = mCache.mActive > 0;
330     }
331     if (mValid && mLocal && mLocalConnection) {
332         mLocalConnection->cleanUp(clearCache);
333         return true;
334     }
335     return active;
336 }
337 
receiveInvalidation(uint32_t messageId)338 void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) {
339     std::lock_guard<std::mutex> lock(mCache.mLock);
340     syncReleased(messageId);
341     // TODO: evict cache required?
342 }
343 
flush()344 ResultStatus BufferPoolClient::Impl::flush() {
345     if (!mLocal || !mLocalConnection || !mValid) {
346         return ResultStatus::CRITICAL_ERROR;
347     }
348     {
349         std::unique_lock<std::mutex> lock(mCache.mLock);
350         syncReleased();
351         evictCaches();
352         return mLocalConnection->flush();
353     }
354 }
355 
allocate(const std::vector<uint8_t> & params,native_handle_t ** pHandle,std::shared_ptr<BufferPoolData> * buffer)356 ResultStatus BufferPoolClient::Impl::allocate(
357         const std::vector<uint8_t> &params,
358         native_handle_t **pHandle,
359         std::shared_ptr<BufferPoolData> *buffer) {
360     if (!mLocal || !mLocalConnection || !mValid) {
361         return ResultStatus::CRITICAL_ERROR;
362     }
363     BufferId bufferId;
364     native_handle_t *handle = nullptr;
365     buffer->reset();
366     ResultStatus status = allocateBufferHandle(params, &bufferId, &handle);
367     if (status == ResultStatus::OK) {
368         if (handle) {
369             std::unique_lock<std::mutex> lock(mCache.mLock);
370             syncReleased();
371             evictCaches();
372             auto cacheIt = mCache.mBuffers.find(bufferId);
373             if (cacheIt != mCache.mBuffers.end()) {
374                 // TODO: verify it is recycled. (not having active ref)
375                 mCache.mBuffers.erase(cacheIt);
376             }
377             auto clientBuffer = std::make_unique<ClientBuffer>(
378                     mConnectionId, bufferId, handle);
379             if (clientBuffer) {
380                 auto result = mCache.mBuffers.insert(std::make_pair(
381                         bufferId, std::move(clientBuffer)));
382                 if (result.second) {
383                     *buffer = result.first->second->createCache(
384                             shared_from_this(), pHandle);
385                     if (*buffer) {
386                         mCache.incActive_l();
387                     }
388                 }
389             }
390         }
391         if (!*buffer) {
392             ALOGV("client cache creation failure %d: %lld",
393                   handle != nullptr, (long long)mConnectionId);
394             status = ResultStatus::NO_MEMORY;
395             postBufferRelease(bufferId);
396         }
397     }
398     return status;
399 }
400 
receive(TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** pHandle,std::shared_ptr<BufferPoolData> * buffer)401 ResultStatus BufferPoolClient::Impl::receive(
402         TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
403         native_handle_t **pHandle,
404         std::shared_ptr<BufferPoolData> *buffer) {
405     if (!mValid) {
406         return ResultStatus::CRITICAL_ERROR;
407     }
408     if (timestampUs != 0) {
409         timestampUs += kReceiveTimeoutUs;
410     }
411     if (!postReceive(bufferId, transactionId, timestampUs)) {
412         return ResultStatus::CRITICAL_ERROR;
413     }
414     ResultStatus status = ResultStatus::CRITICAL_ERROR;
415     buffer->reset();
416     while(1) {
417         std::unique_lock<std::mutex> lock(mCache.mLock);
418         syncReleased();
419         evictCaches();
420         auto cacheIt = mCache.mBuffers.find(bufferId);
421         if (cacheIt != mCache.mBuffers.end()) {
422             if (cacheIt->second->hasCache()) {
423                 *buffer = cacheIt->second->fetchCache(pHandle);
424                 if (!*buffer) {
425                     // check transfer time_out
426                     lock.unlock();
427                     std::this_thread::yield();
428                     continue;
429                 }
430                 ALOGV("client receive from reference %lld", (long long)mConnectionId);
431                 break;
432             } else {
433                 *buffer = cacheIt->second->createCache(shared_from_this(), pHandle);
434                 if (*buffer) {
435                     mCache.incActive_l();
436                 }
437                 ALOGV("client receive from cache %lld", (long long)mConnectionId);
438                 break;
439             }
440         } else {
441             if (!mCache.mCreating) {
442                 mCache.mCreating = true;
443                 lock.unlock();
444                 native_handle_t* handle = nullptr;
445                 status = fetchBufferHandle(transactionId, bufferId, &handle);
446                 lock.lock();
447                 if (status == ResultStatus::OK) {
448                     if (handle) {
449                         auto clientBuffer = std::make_unique<ClientBuffer>(
450                                 mConnectionId, bufferId, handle);
451                         if (clientBuffer) {
452                             auto result = mCache.mBuffers.insert(
453                                     std::make_pair(bufferId, std::move(
454                                             clientBuffer)));
455                             if (result.second) {
456                                 *buffer = result.first->second->createCache(
457                                         shared_from_this(), pHandle);
458                                 if (*buffer) {
459                                     mCache.incActive_l();
460                                 }
461                             }
462                         }
463                     }
464                     if (!*buffer) {
465                         status = ResultStatus::NO_MEMORY;
466                     }
467                 }
468                 mCache.mCreating = false;
469                 lock.unlock();
470                 mCache.mCreateCv.notify_all();
471                 break;
472             }
473             mCache.mCreateCv.wait(lock);
474         }
475     }
476     bool needsSync = false;
477     bool posted = postReceiveResult(bufferId, transactionId,
478                                       *buffer ? true : false, &needsSync);
479     ALOGV("client receive %lld - %u : %s (%d)", (long long)mConnectionId, bufferId,
480           *buffer ? "ok" : "fail", posted);
481     if (mValid && mLocal && mLocalConnection) {
482         mLocalConnection->cleanUp(false);
483     }
484     if (needsSync && mRemoteConnection) {
485         trySyncFromRemote();
486     }
487     if (*buffer) {
488         if (!posted) {
489             buffer->reset();
490             return ResultStatus::CRITICAL_ERROR;
491         }
492         return ResultStatus::OK;
493     }
494     return status;
495 }
496 
497 
postBufferRelease(BufferId bufferId)498 void BufferPoolClient::Impl::postBufferRelease(BufferId bufferId) {
499     std::lock_guard<std::mutex> lock(mReleasing.mLock);
500     mReleasing.mReleasingIds.push_back(bufferId);
501     mReleasing.mStatusChannel->postBufferRelease(
502             mConnectionId, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
503 }
504 
505 // TODO: revise ad-hoc posting data structure
postSend(BufferId bufferId,ConnectionId receiver,TransactionId * transactionId,int64_t * timestampUs)506 bool BufferPoolClient::Impl::postSend(
507         BufferId bufferId, ConnectionId receiver,
508         TransactionId *transactionId, int64_t *timestampUs) {
509     {
510         // TODO: don't need to call syncReleased every time
511         std::lock_guard<std::mutex> lock(mCache.mLock);
512         syncReleased();
513     }
514     bool ret = false;
515     bool needsSync = false;
516     {
517         std::lock_guard<std::mutex> lock(mReleasing.mLock);
518         *timestampUs = getTimestampNow();
519         *transactionId = (mConnectionId << 32) | mSeqId++;
520         // TODO: retry, add timeout, target?
521         ret =  mReleasing.mStatusChannel->postBufferStatusMessage(
522                 *transactionId, bufferId, BufferStatus::TRANSFER_TO, mConnectionId,
523                 receiver, mReleasing.mReleasingIds, mReleasing.mReleasedIds);
524         needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
525     }
526     if (mValid && mLocal && mLocalConnection) {
527         mLocalConnection->cleanUp(false);
528     }
529     if (needsSync && mRemoteConnection) {
530         trySyncFromRemote();
531     }
532     return ret;
533 }
534 
postReceive(BufferId bufferId,TransactionId transactionId,int64_t timestampUs)535 bool BufferPoolClient::Impl::postReceive(
536         BufferId bufferId, TransactionId transactionId, int64_t timestampUs) {
537     for (int i = 0; i < kPostMaxRetry; ++i) {
538         std::unique_lock<std::mutex> lock(mReleasing.mLock);
539         int64_t now = getTimestampNow();
540         if (timestampUs == 0 || now < timestampUs) {
541             bool result = mReleasing.mStatusChannel->postBufferStatusMessage(
542                     transactionId, bufferId, BufferStatus::TRANSFER_FROM,
543                     mConnectionId, -1, mReleasing.mReleasingIds,
544                     mReleasing.mReleasedIds);
545             if (result) {
546                 return true;
547             }
548             lock.unlock();
549             std::this_thread::yield();
550         } else {
551             mReleasing.mStatusChannel->postBufferStatusMessage(
552                     transactionId, bufferId, BufferStatus::TRANSFER_TIMEOUT,
553                     mConnectionId, -1, mReleasing.mReleasingIds,
554                     mReleasing.mReleasedIds);
555             return false;
556         }
557     }
558     return false;
559 }
560 
postReceiveResult(BufferId bufferId,TransactionId transactionId,bool result,bool * needsSync)561 bool BufferPoolClient::Impl::postReceiveResult(
562         BufferId bufferId, TransactionId transactionId, bool result, bool *needsSync) {
563     std::lock_guard<std::mutex> lock(mReleasing.mLock);
564     // TODO: retry, add timeout
565     bool ret = mReleasing.mStatusChannel->postBufferStatusMessage(
566             transactionId, bufferId,
567             result ? BufferStatus::TRANSFER_OK : BufferStatus::TRANSFER_ERROR,
568             mConnectionId, -1, mReleasing.mReleasingIds,
569             mReleasing.mReleasedIds);
570     *needsSync = !mLocal && mReleasing.mStatusChannel->needsSync();
571     return ret;
572 }
573 
trySyncFromRemote()574 void BufferPoolClient::Impl::trySyncFromRemote() {
575     if (mRemoteSyncLock.try_lock()) {
576         bool needsSync = false;
577         {
578             std::lock_guard<std::mutex> lock(mReleasing.mLock);
579             needsSync = mReleasing.mStatusChannel->needsSync();
580         }
581         if (needsSync) {
582             TransactionId transactionId = (mConnectionId << 32);
583             BufferId bufferId = Connection::SYNC_BUFFERID;
584             Return<void> transResult = mRemoteConnection->fetch(
585                     transactionId, bufferId,
586                     []
587                     (ResultStatus outStatus, Buffer outBuffer) {
588                         (void) outStatus;
589                         (void) outBuffer;
590                     });
591             if (!transResult.isOk()) {
592                 ALOGD("sync from client %lld failed: bufferpool process died.",
593                       (long long)mConnectionId);
594             }
595         }
596         mRemoteSyncLock.unlock();
597     }
598 }
599 
600 // should have mCache.mLock
syncReleased(uint32_t messageId)601 bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) {
602     bool cleared = false;
603     {
604         std::lock_guard<std::mutex> lock(mReleasing.mLock);
605         if (mReleasing.mReleasingIds.size() > 0) {
606             mReleasing.mStatusChannel->postBufferRelease(
607                     mConnectionId, mReleasing.mReleasingIds,
608                     mReleasing.mReleasedIds);
609         }
610         if (mReleasing.mReleasedIds.size() > 0) {
611             for (BufferId& id: mReleasing.mReleasedIds) {
612                 ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
613                 auto found = mCache.mBuffers.find(id);
614                 if (found != mCache.mBuffers.end()) {
615                     if (found->second->onCacheRelease()) {
616                         mCache.decActive_l();
617                     } else {
618                         // should not happen!
619                         ALOGW("client %lld cache release status inconsitent!",
620                             (long long)mConnectionId);
621                     }
622                 } else {
623                     // should not happen!
624                     ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
625                 }
626             }
627             mReleasing.mReleasedIds.clear();
628             cleared = true;
629         }
630     }
631     std::vector<BufferInvalidationMessage> invalidations;
632     mInvalidationListener->getInvalidations(invalidations);
633     uint32_t lastMsgId = 0;
634     if (invalidations.size() > 0) {
635         for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
636             if (it->messageId != 0) {
637                 lastMsgId = it->messageId;
638             }
639             if (it->fromBufferId == it->toBufferId) {
640                 // TODO: handle fromBufferId = UINT32_MAX
641                 invalidateBuffer(it->fromBufferId);
642             } else {
643                 invalidateRange(it->fromBufferId, it->toBufferId);
644             }
645         }
646     }
647     {
648         std::lock_guard<std::mutex> lock(mReleasing.mLock);
649         if (lastMsgId != 0) {
650             if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
651                 mReleasing.mInvalidateId = lastMsgId;
652                 mReleasing.mInvalidateAck = false;
653             }
654         } else if (messageId != 0) {
655             // messages are drained.
656             if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
657                 mReleasing.mInvalidateId = messageId;
658                 mReleasing.mInvalidateAck = true;
659             }
660         }
661         if (!mReleasing.mInvalidateAck) {
662             // post ACK
663             mReleasing.mStatusChannel->postBufferInvalidateAck(
664                     mConnectionId,
665                     mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
666             ALOGV("client %lld invalidateion ack (%d) %u",
667                 (long long)mConnectionId,
668                 mReleasing.mInvalidateAck, mReleasing.mInvalidateId);
669         }
670     }
671     return cleared;
672 }
673 
674 // should have mCache.mLock
evictCaches(bool clearCache)675 void BufferPoolClient::Impl::evictCaches(bool clearCache) {
676     int64_t now = getTimestampNow();
677     if (now >= mLastEvictCacheUs + kCacheTtlUs ||
678             clearCache || mCache.cachedBufferCount() > kMaxCachedBufferCount) {
679         size_t evicted = 0;
680         for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
681             if (!it->second->hasCache() && (it->second->expire() ||
682                         clearCache || mCache.cachedBufferCount() > kCachedBufferCountTarget)) {
683                 it = mCache.mBuffers.erase(it);
684                 ++evicted;
685             } else {
686                 ++it;
687             }
688         }
689         ALOGV("cache count %lld : total %zu, active %d, evicted %zu",
690               (long long)mConnectionId, mCache.mBuffers.size(), mCache.mActive, evicted);
691         mLastEvictCacheUs = now;
692     }
693 }
694 
695 // should have mCache.mLock
invalidateBuffer(BufferId id)696 void BufferPoolClient::Impl::invalidateBuffer(BufferId id) {
697     for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) {
698         if (id == it->second->id()) {
699             if (!it->second->hasCache()) {
700                 mCache.mBuffers.erase(it);
701                 ALOGV("cache invalidated %lld : buffer %u",
702                       (long long)mConnectionId, id);
703             } else {
704                 ALOGW("Inconsitent invalidation %lld : activer buffer!! %u",
705                       (long long)mConnectionId, (unsigned int)id);
706             }
707             break;
708         }
709     }
710 }
711 
712 // should have mCache.mLock
invalidateRange(BufferId from,BufferId to)713 void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) {
714     size_t invalidated = 0;
715     for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
716         if (!it->second->hasCache()) {
717             BufferId bid = it->second->id();
718             if (from < to) {
719                 if (from <= bid && bid < to) {
720                     ++invalidated;
721                     it = mCache.mBuffers.erase(it);
722                     continue;
723                 }
724             } else {
725                 if (from <= bid || bid < to) {
726                     ++invalidated;
727                     it = mCache.mBuffers.erase(it);
728                     continue;
729                 }
730             }
731         }
732         ++it;
733     }
734     ALOGV("cache invalidated %lld : # of invalidated %zu",
735           (long long)mConnectionId, invalidated);
736 }
737 
allocateBufferHandle(const std::vector<uint8_t> & params,BufferId * bufferId,native_handle_t ** handle)738 ResultStatus BufferPoolClient::Impl::allocateBufferHandle(
739         const std::vector<uint8_t>& params, BufferId *bufferId,
740         native_handle_t** handle) {
741     if (mLocalConnection) {
742         const native_handle_t* allocHandle = nullptr;
743         ResultStatus status = mLocalConnection->allocate(
744                 params, bufferId, &allocHandle);
745         if (status == ResultStatus::OK) {
746             *handle = native_handle_clone(allocHandle);
747         }
748         ALOGV("client allocate result %lld %d : %u clone %p",
749               (long long)mConnectionId, status == ResultStatus::OK,
750               *handle ? *bufferId : 0 , *handle);
751         return status;
752     }
753     return ResultStatus::CRITICAL_ERROR;
754 }
755 
fetchBufferHandle(TransactionId transactionId,BufferId bufferId,native_handle_t ** handle)756 ResultStatus BufferPoolClient::Impl::fetchBufferHandle(
757         TransactionId transactionId, BufferId bufferId,
758         native_handle_t **handle) {
759     sp<IConnection> connection;
760     if (mLocal) {
761         connection = mLocalConnection;
762     } else {
763         connection = mRemoteConnection;
764     }
765     ResultStatus status;
766     Return<void> transResult = connection->fetch(
767             transactionId, bufferId,
768             [&status, &handle]
769             (ResultStatus outStatus, Buffer outBuffer) {
770                 status = outStatus;
771                 if (status == ResultStatus::OK) {
772                     *handle = native_handle_clone(
773                             outBuffer.buffer.getNativeHandle());
774                 }
775             });
776     return transResult.isOk() ? status : ResultStatus::CRITICAL_ERROR;
777 }
778 
779 
BufferPoolClient(const sp<Accessor> & accessor,const sp<IObserver> & observer)780 BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor,
781                                    const sp<IObserver> &observer) {
782     mImpl = std::make_shared<Impl>(accessor, observer);
783 }
784 
BufferPoolClient(const sp<IAccessor> & accessor,const sp<IObserver> & observer)785 BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor,
786                                    const sp<IObserver> &observer) {
787     mImpl = std::make_shared<Impl>(accessor, observer);
788 }
789 
~BufferPoolClient()790 BufferPoolClient::~BufferPoolClient() {
791     // TODO: how to handle orphaned buffers?
792 }
793 
isValid()794 bool BufferPoolClient::isValid() {
795     return mImpl && mImpl->isValid();
796 }
797 
isLocal()798 bool BufferPoolClient::isLocal() {
799     return mImpl && mImpl->isLocal();
800 }
801 
isActive(int64_t * lastTransactionUs,bool clearCache)802 bool BufferPoolClient::isActive(int64_t *lastTransactionUs, bool clearCache) {
803     if (!isValid()) {
804         *lastTransactionUs = 0;
805         return false;
806     }
807     return mImpl->isActive(lastTransactionUs, clearCache);
808 }
809 
getConnectionId()810 ConnectionId BufferPoolClient::getConnectionId() {
811     if (isValid()) {
812         return mImpl->getConnectionId();
813     }
814     return -1;
815 }
816 
getAccessor(sp<IAccessor> * accessor)817 ResultStatus BufferPoolClient::getAccessor(sp<IAccessor> *accessor) {
818     if (isValid()) {
819         *accessor = mImpl->getAccessor();
820         return ResultStatus::OK;
821     }
822     return ResultStatus::CRITICAL_ERROR;
823 }
824 
receiveInvalidation(uint32_t msgId)825 void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
826     ALOGV("bufferpool2 client recv inv %u", msgId);
827     if (isValid()) {
828         mImpl->receiveInvalidation(msgId);
829     }
830 }
831 
flush()832 ResultStatus BufferPoolClient::flush() {
833     if (isValid()) {
834         return mImpl->flush();
835     }
836     return ResultStatus::CRITICAL_ERROR;
837 }
838 
allocate(const std::vector<uint8_t> & params,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)839 ResultStatus BufferPoolClient::allocate(
840         const std::vector<uint8_t> &params,
841         native_handle_t **handle,
842         std::shared_ptr<BufferPoolData> *buffer) {
843     if (isValid()) {
844         return mImpl->allocate(params, handle, buffer);
845     }
846     return ResultStatus::CRITICAL_ERROR;
847 }
848 
receive(TransactionId transactionId,BufferId bufferId,int64_t timestampUs,native_handle_t ** handle,std::shared_ptr<BufferPoolData> * buffer)849 ResultStatus BufferPoolClient::receive(
850         TransactionId transactionId, BufferId bufferId, int64_t timestampUs,
851         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
852     if (isValid()) {
853         return mImpl->receive(transactionId, bufferId, timestampUs, handle, buffer);
854     }
855     return ResultStatus::CRITICAL_ERROR;
856 }
857 
postSend(ConnectionId receiverId,const std::shared_ptr<BufferPoolData> & buffer,TransactionId * transactionId,int64_t * timestampUs)858 ResultStatus BufferPoolClient::postSend(
859         ConnectionId receiverId,
860         const std::shared_ptr<BufferPoolData> &buffer,
861         TransactionId *transactionId,
862         int64_t *timestampUs) {
863     if (isValid()) {
864         bool result = mImpl->postSend(
865                 buffer->mId, receiverId, transactionId, timestampUs);
866         return result ? ResultStatus::OK : ResultStatus::CRITICAL_ERROR;
867     }
868     return ResultStatus::CRITICAL_ERROR;
869 }
870 
871 }  // namespace implementation
872 }  // namespace V2_0
873 }  // namespace bufferpool
874 }  // namespace media
875 }  // namespace hardware
876 }  // namespace android
877