1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "BufferPoolAccessor2.0"
18 //#define LOG_NDEBUG 0
19
20 #include <android-base/no_destructor.h>
21
22 #include <sys/types.h>
23 #include <stdint.h>
24 #include <time.h>
25 #include <unistd.h>
26 #include <utils/Log.h>
27 #include <thread>
28 #include "AccessorImpl.h"
29 #include "Connection.h"
30
31 namespace android {
32 namespace hardware {
33 namespace media {
34 namespace bufferpool {
35 namespace V2_0 {
36 namespace implementation {
37
38 namespace {
39 static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
40 static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
41
42 static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
43 static constexpr size_t kMinBufferCountForEviction = 25;
44 static constexpr size_t kMaxUnusedBufferCount = 64;
45 static constexpr size_t kUnusedBufferCountTarget = kMaxUnusedBufferCount - 16;
46
47 static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
48 static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
49 }
50
51 // Buffer structure in bufferpool process
52 struct InternalBuffer {
53 BufferId mId;
54 size_t mOwnerCount;
55 size_t mTransactionCount;
56 const std::shared_ptr<BufferPoolAllocation> mAllocation;
57 const size_t mAllocSize;
58 const std::vector<uint8_t> mConfig;
59 bool mInvalidated;
60
InternalBufferandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer61 InternalBuffer(
62 BufferId id,
63 const std::shared_ptr<BufferPoolAllocation> &alloc,
64 const size_t allocSize,
65 const std::vector<uint8_t> &allocConfig)
66 : mId(id), mOwnerCount(0), mTransactionCount(0),
67 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
68 mInvalidated(false) {}
69
handleandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer70 const native_handle_t *handle() {
71 return mAllocation->handle();
72 }
73
invalidateandroid::hardware::media::bufferpool::V2_0::implementation::InternalBuffer74 void invalidate() {
75 mInvalidated = true;
76 }
77 };
78
79 struct TransactionStatus {
80 TransactionId mId;
81 BufferId mBufferId;
82 ConnectionId mSender;
83 ConnectionId mReceiver;
84 BufferStatus mStatus;
85 int64_t mTimestampUs;
86 bool mSenderValidated;
87
TransactionStatusandroid::hardware::media::bufferpool::V2_0::implementation::TransactionStatus88 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
89 mId = message.transactionId;
90 mBufferId = message.bufferId;
91 mStatus = message.newStatus;
92 mTimestampUs = timestampUs;
93 if (mStatus == BufferStatus::TRANSFER_TO) {
94 mSender = message.connectionId;
95 mReceiver = message.targetConnectionId;
96 mSenderValidated = true;
97 } else {
98 mSender = -1LL;
99 mReceiver = message.connectionId;
100 mSenderValidated = false;
101 }
102 }
103 };
104
// Helper function templates for operating on a map of sets.
// Adds `value` to the set stored under `key`, creating the set when the key
// is not present yet.
//
// Returns true if the value was newly added, false if it was already there.
//
// A single operator[] lookup replaces the former find + insert-a-copied-set
// sequence; the observable result is the same.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    return (*mapOfSet)[key].insert(value).second;
}
119
// Removes `value` from the set stored under `key`.
//
// The map entry itself is dropped once its set is empty. Returns true only if
// the value was present and has been removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) != 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
134
// Returns true if the set stored under `key` exists and holds `value`.
// The map is not modified.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
144
// Bit 31 of the sequence part of a connection id is set when this file is
// built with __ANDROID_VNDK__ defined and is clear otherwise.
#if defined(__ANDROID_VNDK__)
static constexpr uint32_t kSeqIdVndkBit = 0x80000000U;
#else
static constexpr uint32_t kSeqIdVndkBit = 0U;
#endif

// Largest sequence number handed out; it leaves bit 31 free for the VNDK bit.
static constexpr uint32_t kSeqIdMax = 0x7fffffffU;
152
ConnectionIdGenerator()153 Accessor::Impl::ConnectionIdGenerator::ConnectionIdGenerator() {
154 mSeqId = static_cast<uint32_t>(time(nullptr) & kSeqIdMax);
155 mPid = static_cast<int32_t>(getpid());
156 }
157
getConnectionId()158 ConnectionId Accessor::Impl::ConnectionIdGenerator::getConnectionId() {
159 uint32_t seq;
160 {
161 std::lock_guard<std::mutex> l(mLock);
162 seq = mSeqId;
163 if (mSeqId == kSeqIdMax) {
164 mSeqId = 0;
165 } else {
166 ++mSeqId;
167 }
168 }
169 return (int64_t)mPid << 32 | seq | kSeqIdVndkBit;
170 }
171
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)172 Accessor::Impl::Impl(
173 const std::shared_ptr<BufferPoolAllocator> &allocator)
174 : mAllocator(allocator), mScheduleEvictTs(0) {}
175
~Impl()176 Accessor::Impl::~Impl() {
177 }
178
connect(const sp<Accessor> & accessor,const sp<IObserver> & observer,sp<Connection> * connection,ConnectionId * pConnectionId,uint32_t * pMsgId,const StatusDescriptor ** statusDescPtr,const InvalidationDescriptor ** invDescPtr)179 ResultStatus Accessor::Impl::connect(
180 const sp<Accessor> &accessor, const sp<IObserver> &observer,
181 sp<Connection> *connection,
182 ConnectionId *pConnectionId,
183 uint32_t *pMsgId,
184 const StatusDescriptor** statusDescPtr,
185 const InvalidationDescriptor** invDescPtr) {
186 static ::android::base::NoDestructor<ConnectionIdGenerator> sConIdGenerator;
187 sp<Connection> newConnection = new Connection();
188 ResultStatus status = ResultStatus::CRITICAL_ERROR;
189 {
190 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
191 if (newConnection) {
192 int32_t pid = getpid();
193 ConnectionId id = sConIdGenerator->getConnectionId();
194 status = mBufferPool.mObserver.open(id, statusDescPtr);
195 if (status == ResultStatus::OK) {
196 newConnection->initialize(accessor, id);
197 *connection = newConnection;
198 *pConnectionId = id;
199 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
200 mBufferPool.mConnectionIds.insert(id);
201 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
202 mBufferPool.mInvalidation.onConnect(id, observer);
203 }
204
205 }
206 mBufferPool.processStatusMessages();
207 mBufferPool.cleanUp();
208 scheduleEvictIfNeeded();
209 }
210 return status;
211 }
212
close(ConnectionId connectionId)213 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
214 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
215 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
216 mBufferPool.processStatusMessages();
217 mBufferPool.handleClose(connectionId);
218 mBufferPool.mObserver.close(connectionId);
219 mBufferPool.mInvalidation.onClose(connectionId);
220 // Since close# will be called after all works are finished, it is OK to
221 // evict unused buffers.
222 mBufferPool.cleanUp(true);
223 scheduleEvictIfNeeded();
224 return ResultStatus::OK;
225 }
226
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)227 ResultStatus Accessor::Impl::allocate(
228 ConnectionId connectionId, const std::vector<uint8_t>& params,
229 BufferId *bufferId, const native_handle_t** handle) {
230 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
231 mBufferPool.processStatusMessages();
232 ResultStatus status = ResultStatus::OK;
233 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
234 lock.unlock();
235 std::shared_ptr<BufferPoolAllocation> alloc;
236 size_t allocSize;
237 status = mAllocator->allocate(params, &alloc, &allocSize);
238 lock.lock();
239 if (status == ResultStatus::OK) {
240 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
241 }
242 ALOGV("create a buffer %d : %u %p",
243 status == ResultStatus::OK, *bufferId, *handle);
244 }
245 if (status == ResultStatus::OK) {
246 // TODO: handle ownBuffer failure
247 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
248 }
249 mBufferPool.cleanUp();
250 scheduleEvictIfNeeded();
251 return status;
252 }
253
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)254 ResultStatus Accessor::Impl::fetch(
255 ConnectionId connectionId, TransactionId transactionId,
256 BufferId bufferId, const native_handle_t** handle) {
257 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
258 mBufferPool.processStatusMessages();
259 auto found = mBufferPool.mTransactions.find(transactionId);
260 if (found != mBufferPool.mTransactions.end() &&
261 contains(&mBufferPool.mPendingTransactions,
262 connectionId, transactionId)) {
263 if (found->second->mSenderValidated &&
264 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
265 found->second->mBufferId == bufferId) {
266 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
267 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
268 if (bufferIt != mBufferPool.mBuffers.end()) {
269 mBufferPool.mStats.onBufferFetched();
270 *handle = bufferIt->second->handle();
271 return ResultStatus::OK;
272 }
273 }
274 }
275 mBufferPool.cleanUp();
276 scheduleEvictIfNeeded();
277 return ResultStatus::CRITICAL_ERROR;
278 }
279
cleanUp(bool clearCache)280 void Accessor::Impl::cleanUp(bool clearCache) {
281 // transaction timeout, buffer cacheing TTL handling
282 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
283 mBufferPool.processStatusMessages();
284 mBufferPool.cleanUp(clearCache);
285 }
286
flush()287 void Accessor::Impl::flush() {
288 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
289 mBufferPool.processStatusMessages();
290 mBufferPool.flush(shared_from_this());
291 }
292
handleInvalidateAck()293 void Accessor::Impl::handleInvalidateAck() {
294 std::map<ConnectionId, const sp<IObserver>> observers;
295 uint32_t invalidationId;
296 {
297 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
298 mBufferPool.processStatusMessages();
299 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
300 }
301 // Do not hold lock for send invalidations
302 size_t deadClients = 0;
303 for (auto it = observers.begin(); it != observers.end(); ++it) {
304 const sp<IObserver> observer = it->second;
305 if (observer) {
306 Return<void> transResult = observer->onMessage(it->first, invalidationId);
307 if (!transResult.isOk()) {
308 ++deadClients;
309 }
310 }
311 }
312 if (deadClients > 0) {
313 ALOGD("During invalidation found %zu dead clients", deadClients);
314 }
315 }
316
isValid()317 bool Accessor::Impl::isValid() {
318 return mBufferPool.isValid();
319 }
320
BufferPool()321 Accessor::Impl::Impl::BufferPool::BufferPool()
322 : mTimestampUs(getTimestampNow()),
323 mLastCleanUpUs(mTimestampUs),
324 mLastLogUs(mTimestampUs),
325 mSeq(0),
326 mStartSeq(0) {
327 mValid = mInvalidationChannel.isValid();
328 }
329
330
// Statistics helper.
// Returns `base` as a percentage of `total`, rounded to the nearest integer,
// or 0 when `total` is zero. `base` is converted to the type of `total`
// before the division.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const double rounded = 0.5 + 100. * static_cast<S>(base) / total;
    return static_cast<int>(rounded);
}
336
337 std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
338
~BufferPool()339 Accessor::Impl::Impl::BufferPool::~BufferPool() {
340 std::lock_guard<std::mutex> lock(mMutex);
341 ALOGD("Destruction - bufferpool2 %p "
342 "cached: %zu/%zuM, %zu/%d%% in use; "
343 "allocs: %zu, %d%% recycled; "
344 "transfers: %zu, %d%% unfetched",
345 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
346 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
347 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
348 mStats.mTotalTransfers,
349 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
350 }
351
onConnect(ConnectionId conId,const sp<IObserver> & observer)352 void Accessor::Impl::BufferPool::Invalidation::onConnect(
353 ConnectionId conId, const sp<IObserver>& observer) {
354 mAcks[conId] = mInvalidationId; // starts from current invalidationId
355 mObservers.insert(std::make_pair(conId, observer));
356 }
357
onClose(ConnectionId conId)358 void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
359 mAcks.erase(conId);
360 mObservers.erase(conId);
361 }
362
onAck(ConnectionId conId,uint32_t msgId)363 void Accessor::Impl::BufferPool::Invalidation::onAck(
364 ConnectionId conId,
365 uint32_t msgId) {
366 auto it = mAcks.find(conId);
367 if (it == mAcks.end()) {
368 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
369 return;
370 }
371 if (isMessageLater(msgId, it->second)) {
372 mAcks[conId] = msgId;
373 }
374 }
375
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)376 void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
377 BufferId bufferId,
378 BufferInvalidationChannel &channel) {
379 for (auto it = mPendings.begin(); it != mPendings.end();) {
380 if (it->isInvalidated(bufferId)) {
381 uint32_t msgId = 0;
382 if (it->mNeedsAck) {
383 msgId = ++mInvalidationId;
384 if (msgId == 0) {
385 // wrap happens
386 msgId = ++mInvalidationId;
387 }
388 }
389 channel.postInvalidation(msgId, it->mFrom, it->mTo);
390 it = mPendings.erase(it);
391 continue;
392 }
393 ++it;
394 }
395 }
396
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor::Impl> & impl)397 void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
398 bool needsAck,
399 uint32_t from,
400 uint32_t to,
401 size_t left,
402 BufferInvalidationChannel &channel,
403 const std::shared_ptr<Accessor::Impl> &impl) {
404 uint32_t msgId = 0;
405 if (needsAck) {
406 msgId = ++mInvalidationId;
407 if (msgId == 0) {
408 // wrap happens
409 msgId = ++mInvalidationId;
410 }
411 }
412 ALOGV("bufferpool2 invalidation requested and queued");
413 if (left == 0) {
414 channel.postInvalidation(msgId, from, to);
415 } else {
416 // TODO: sending hint message?
417 ALOGV("bufferpoo2 invalidation requested and pending");
418 Pending pending(needsAck, from, to, left, impl);
419 mPendings.push_back(pending);
420 }
421 sInvalidator->addAccessor(mId, impl);
422 }
423
onHandleAck(std::map<ConnectionId,const sp<IObserver>> * observers,uint32_t * invalidationId)424 void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
425 std::map<ConnectionId, const sp<IObserver>> *observers,
426 uint32_t *invalidationId) {
427 if (mInvalidationId != 0) {
428 *invalidationId = mInvalidationId;
429 std::set<int> deads;
430 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
431 if (it->second != mInvalidationId) {
432 const sp<IObserver> observer = mObservers[it->first];
433 if (observer) {
434 observers->emplace(it->first, observer);
435 ALOGV("connection %lld will call observer (%u: %u)",
436 (long long)it->first, it->second, mInvalidationId);
437 // N.B: onMessage will be called later. ignore possibility of
438 // onMessage# oneway call being lost.
439 it->second = mInvalidationId;
440 } else {
441 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
442 deads.insert(it->first);
443 }
444 }
445 }
446 if (deads.size() > 0) {
447 for (auto it = deads.begin(); it != deads.end(); ++it) {
448 onClose(*it);
449 }
450 }
451 }
452 if (mPendings.size() == 0) {
453 // All invalidation Ids are synced and no more pending invalidations.
454 sInvalidator->delAccessor(mId);
455 }
456 }
457
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)458 bool Accessor::Impl::BufferPool::handleOwnBuffer(
459 ConnectionId connectionId, BufferId bufferId) {
460
461 bool added = insert(&mUsingBuffers, connectionId, bufferId);
462 if (added) {
463 auto iter = mBuffers.find(bufferId);
464 iter->second->mOwnerCount++;
465 }
466 insert(&mUsingConnections, bufferId, connectionId);
467 return added;
468 }
469
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)470 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
471 ConnectionId connectionId, BufferId bufferId) {
472 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
473 if (deleted) {
474 auto iter = mBuffers.find(bufferId);
475 iter->second->mOwnerCount--;
476 if (iter->second->mOwnerCount == 0 &&
477 iter->second->mTransactionCount == 0) {
478 if (!iter->second->mInvalidated) {
479 mStats.onBufferUnused(iter->second->mAllocSize);
480 mFreeBuffers.insert(bufferId);
481 } else {
482 mStats.onBufferUnused(iter->second->mAllocSize);
483 mStats.onBufferEvicted(iter->second->mAllocSize);
484 mBuffers.erase(iter);
485 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
486 }
487 }
488 }
489 erase(&mUsingConnections, bufferId, connectionId);
490 ALOGV("release buffer %u : %d", bufferId, deleted);
491 return deleted;
492 }
493
handleTransferTo(const BufferStatusMessage & message)494 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
495 auto completed = mCompletedTransactions.find(
496 message.transactionId);
497 if (completed != mCompletedTransactions.end()) {
498 // already completed
499 mCompletedTransactions.erase(completed);
500 return true;
501 }
502 // the buffer should exist and be owned.
503 auto bufferIter = mBuffers.find(message.bufferId);
504 if (bufferIter == mBuffers.end() ||
505 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
506 return false;
507 }
508 auto found = mTransactions.find(message.transactionId);
509 if (found != mTransactions.end()) {
510 // transfer_from was received earlier.
511 found->second->mSender = message.connectionId;
512 found->second->mSenderValidated = true;
513 return true;
514 }
515 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
516 // N.B: it could be fake or receive connection already closed.
517 ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
518 this, (long long)message.targetConnectionId);
519 return false;
520 }
521 mStats.onBufferSent();
522 mTransactions.insert(std::make_pair(
523 message.transactionId,
524 std::make_unique<TransactionStatus>(message, mTimestampUs)));
525 insert(&mPendingTransactions, message.targetConnectionId,
526 message.transactionId);
527 bufferIter->second->mTransactionCount++;
528 return true;
529 }
530
handleTransferFrom(const BufferStatusMessage & message)531 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
532 auto found = mTransactions.find(message.transactionId);
533 if (found == mTransactions.end()) {
534 // TODO: is it feasible to check ownership here?
535 mStats.onBufferSent();
536 mTransactions.insert(std::make_pair(
537 message.transactionId,
538 std::make_unique<TransactionStatus>(message, mTimestampUs)));
539 insert(&mPendingTransactions, message.connectionId,
540 message.transactionId);
541 auto bufferIter = mBuffers.find(message.bufferId);
542 bufferIter->second->mTransactionCount++;
543 } else {
544 if (message.connectionId == found->second->mReceiver) {
545 found->second->mStatus = BufferStatus::TRANSFER_FROM;
546 }
547 }
548 return true;
549 }
550
handleTransferResult(const BufferStatusMessage & message)551 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
552 auto found = mTransactions.find(message.transactionId);
553 if (found != mTransactions.end()) {
554 bool deleted = erase(&mPendingTransactions, message.connectionId,
555 message.transactionId);
556 if (deleted) {
557 if (!found->second->mSenderValidated) {
558 mCompletedTransactions.insert(message.transactionId);
559 }
560 auto bufferIter = mBuffers.find(message.bufferId);
561 if (message.newStatus == BufferStatus::TRANSFER_OK) {
562 handleOwnBuffer(message.connectionId, message.bufferId);
563 }
564 bufferIter->second->mTransactionCount--;
565 if (bufferIter->second->mOwnerCount == 0
566 && bufferIter->second->mTransactionCount == 0) {
567 if (!bufferIter->second->mInvalidated) {
568 mStats.onBufferUnused(bufferIter->second->mAllocSize);
569 mFreeBuffers.insert(message.bufferId);
570 } else {
571 mStats.onBufferUnused(bufferIter->second->mAllocSize);
572 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
573 mBuffers.erase(bufferIter);
574 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
575 }
576 }
577 mTransactions.erase(found);
578 }
579 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
580 message.bufferId, deleted);
581 return deleted;
582 }
583 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
584 message.bufferId);
585 return false;
586 }
587
processStatusMessages()588 void Accessor::Impl::BufferPool::processStatusMessages() {
589 std::vector<BufferStatusMessage> messages;
590 mObserver.getBufferStatusChanges(messages);
591 mTimestampUs = getTimestampNow();
592 for (BufferStatusMessage& message: messages) {
593 bool ret = false;
594 switch (message.newStatus) {
595 case BufferStatus::NOT_USED:
596 ret = handleReleaseBuffer(
597 message.connectionId, message.bufferId);
598 break;
599 case BufferStatus::USED:
600 // not happening
601 break;
602 case BufferStatus::TRANSFER_TO:
603 ret = handleTransferTo(message);
604 break;
605 case BufferStatus::TRANSFER_FROM:
606 ret = handleTransferFrom(message);
607 break;
608 case BufferStatus::TRANSFER_TIMEOUT:
609 // TODO
610 break;
611 case BufferStatus::TRANSFER_LOST:
612 // TODO
613 break;
614 case BufferStatus::TRANSFER_FETCH:
615 // not happening
616 break;
617 case BufferStatus::TRANSFER_OK:
618 case BufferStatus::TRANSFER_ERROR:
619 ret = handleTransferResult(message);
620 break;
621 case BufferStatus::INVALIDATION_ACK:
622 mInvalidation.onAck(message.connectionId, message.bufferId);
623 ret = true;
624 break;
625 }
626 if (ret == false) {
627 ALOGW("buffer status message processing failure - message : %d connection : %lld",
628 (int)message.newStatus, (long long)message.connectionId);
629 }
630 }
631 messages.clear();
632 }
633
handleClose(ConnectionId connectionId)634 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
635 // Cleaning buffers
636 auto buffers = mUsingBuffers.find(connectionId);
637 if (buffers != mUsingBuffers.end()) {
638 for (const BufferId& bufferId : buffers->second) {
639 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
640 if (deleted) {
641 auto bufferIter = mBuffers.find(bufferId);
642 bufferIter->second->mOwnerCount--;
643 if (bufferIter->second->mOwnerCount == 0 &&
644 bufferIter->second->mTransactionCount == 0) {
645 // TODO: handle freebuffer insert fail
646 if (!bufferIter->second->mInvalidated) {
647 mStats.onBufferUnused(bufferIter->second->mAllocSize);
648 mFreeBuffers.insert(bufferId);
649 } else {
650 mStats.onBufferUnused(bufferIter->second->mAllocSize);
651 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
652 mBuffers.erase(bufferIter);
653 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
654 }
655 }
656 }
657 }
658 mUsingBuffers.erase(buffers);
659 }
660
661 // Cleaning transactions
662 auto pending = mPendingTransactions.find(connectionId);
663 if (pending != mPendingTransactions.end()) {
664 for (const TransactionId& transactionId : pending->second) {
665 auto iter = mTransactions.find(transactionId);
666 if (iter != mTransactions.end()) {
667 if (!iter->second->mSenderValidated) {
668 mCompletedTransactions.insert(transactionId);
669 }
670 BufferId bufferId = iter->second->mBufferId;
671 auto bufferIter = mBuffers.find(bufferId);
672 bufferIter->second->mTransactionCount--;
673 if (bufferIter->second->mOwnerCount == 0 &&
674 bufferIter->second->mTransactionCount == 0) {
675 // TODO: handle freebuffer insert fail
676 if (!bufferIter->second->mInvalidated) {
677 mStats.onBufferUnused(bufferIter->second->mAllocSize);
678 mFreeBuffers.insert(bufferId);
679 } else {
680 mStats.onBufferUnused(bufferIter->second->mAllocSize);
681 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
682 mBuffers.erase(bufferIter);
683 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
684 }
685 }
686 mTransactions.erase(iter);
687 }
688 }
689 }
690 mConnectionIds.erase(connectionId);
691 return true;
692 }
693
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)694 bool Accessor::Impl::BufferPool::getFreeBuffer(
695 const std::shared_ptr<BufferPoolAllocator> &allocator,
696 const std::vector<uint8_t> ¶ms, BufferId *pId,
697 const native_handle_t** handle) {
698 auto bufferIt = mFreeBuffers.begin();
699 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
700 BufferId bufferId = *bufferIt;
701 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
702 break;
703 }
704 }
705 if (bufferIt != mFreeBuffers.end()) {
706 BufferId id = *bufferIt;
707 mFreeBuffers.erase(bufferIt);
708 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
709 *handle = mBuffers[id]->handle();
710 *pId = id;
711 ALOGV("recycle a buffer %u %p", id, *handle);
712 return true;
713 }
714 return false;
715 }
716
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)717 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
718 const std::shared_ptr<BufferPoolAllocation> &alloc,
719 const size_t allocSize,
720 const std::vector<uint8_t> ¶ms,
721 BufferId *pId,
722 const native_handle_t** handle) {
723
724 BufferId bufferId = mSeq++;
725 if (mSeq == Connection::SYNC_BUFFERID) {
726 mSeq = 0;
727 }
728 std::unique_ptr<InternalBuffer> buffer =
729 std::make_unique<InternalBuffer>(
730 bufferId, alloc, allocSize, params);
731 if (buffer) {
732 auto res = mBuffers.insert(std::make_pair(
733 bufferId, std::move(buffer)));
734 if (res.second) {
735 mStats.onBufferAllocated(allocSize);
736 *handle = alloc->handle();
737 *pId = bufferId;
738 return ResultStatus::OK;
739 }
740 }
741 return ResultStatus::NO_MEMORY;
742 }
743
cleanUp(bool clearCache)744 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
745 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs ||
746 mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
747 mLastCleanUpUs = mTimestampUs;
748 if (mTimestampUs > mLastLogUs + kLogDurationUs ||
749 mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
750 mLastLogUs = mTimestampUs;
751 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
752 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
753 "%zu/%zu (fetch/transfer)",
754 this, mStats.mBuffersCached, mStats.mSizeCached,
755 mStats.mBuffersInUse, mStats.mSizeInUse,
756 mStats.mTotalRecycles, mStats.mTotalAllocations,
757 mStats.mTotalFetches, mStats.mTotalTransfers);
758 }
759 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
760 if (!clearCache && mStats.buffersNotInUse() <= kUnusedBufferCountTarget &&
761 (mStats.mSizeCached < kMinAllocBytesForEviction ||
762 mBuffers.size() < kMinBufferCountForEviction)) {
763 break;
764 }
765 auto it = mBuffers.find(*freeIt);
766 if (it != mBuffers.end() &&
767 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
768 mStats.onBufferEvicted(it->second->mAllocSize);
769 mBuffers.erase(it);
770 freeIt = mFreeBuffers.erase(freeIt);
771 } else {
772 ++freeIt;
773 ALOGW("bufferpool2 inconsistent!");
774 }
775 }
776 }
777 }
778
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor::Impl> & impl)779 void Accessor::Impl::BufferPool::invalidate(
780 bool needsAck, BufferId from, BufferId to,
781 const std::shared_ptr<Accessor::Impl> &impl) {
782 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
783 if (isBufferInRange(from, to, *freeIt)) {
784 auto it = mBuffers.find(*freeIt);
785 if (it != mBuffers.end() &&
786 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
787 mStats.onBufferEvicted(it->second->mAllocSize);
788 mBuffers.erase(it);
789 freeIt = mFreeBuffers.erase(freeIt);
790 continue;
791 } else {
792 ALOGW("bufferpool2 inconsistent!");
793 }
794 }
795 ++freeIt;
796 }
797
798 size_t left = 0;
799 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
800 if (isBufferInRange(from, to, it->first)) {
801 it->second->invalidate();
802 ++left;
803 }
804 }
805 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
806 }
807
flush(const std::shared_ptr<Accessor::Impl> & impl)808 void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
809 BufferId from = mStartSeq;
810 BufferId to = mSeq;
811 mStartSeq = mSeq;
812 // TODO: needsAck params
813 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
814 if (from != to) {
815 invalidate(true, from, to, impl);
816 }
817 }
818
invalidatorThread(std::map<uint32_t,const std::weak_ptr<Accessor::Impl>> & accessors,std::mutex & mutex,std::condition_variable & cv,bool & ready)819 void Accessor::Impl::invalidatorThread(
820 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
821 std::mutex &mutex,
822 std::condition_variable &cv,
823 bool &ready) {
824 constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
825 constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
826 constexpr useconds_t MAX_SLEEP_US = 10000;
827 uint32_t numSpin = 0;
828 useconds_t sleepUs = 1;
829
830 while(true) {
831 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
832 {
833 std::unique_lock<std::mutex> lock(mutex);
834 if (!ready) {
835 numSpin = 0;
836 sleepUs = 1;
837 cv.wait(lock);
838 }
839 copied.insert(accessors.begin(), accessors.end());
840 }
841 std::list<ConnectionId> erased;
842 for (auto it = copied.begin(); it != copied.end(); ++it) {
843 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
844 if (!impl) {
845 erased.push_back(it->first);
846 } else {
847 impl->handleInvalidateAck();
848 }
849 }
850 {
851 std::unique_lock<std::mutex> lock(mutex);
852 for (auto it = erased.begin(); it != erased.end(); ++it) {
853 accessors.erase(*it);
854 }
855 if (accessors.size() == 0) {
856 ready = false;
857 } else {
858 // TODO Use an efficient way to wait over FMQ.
859 // N.B. Since there is not a efficient way to wait over FMQ,
860 // polling over the FMQ is the current way to prevent draining
861 // CPU.
862 lock.unlock();
863 ++numSpin;
864 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
865 sleepUs < MAX_SLEEP_US) {
866 sleepUs *= 10;
867 }
868 if (numSpin % NUM_SPIN_TO_LOG == 0) {
869 ALOGW("invalidator thread spinning");
870 }
871 ::usleep(sleepUs);
872 }
873 }
874 }
875 }
876
AccessorInvalidator()877 Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
878 std::thread invalidator(
879 invalidatorThread,
880 std::ref(mAccessors),
881 std::ref(mMutex),
882 std::ref(mCv),
883 std::ref(mReady));
884 invalidator.detach();
885 }
886
addAccessor(uint32_t accessorId,const std::weak_ptr<Accessor::Impl> & impl)887 void Accessor::Impl::AccessorInvalidator::addAccessor(
888 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
889 bool notify = false;
890 std::unique_lock<std::mutex> lock(mMutex);
891 if (mAccessors.find(accessorId) == mAccessors.end()) {
892 if (!mReady) {
893 mReady = true;
894 notify = true;
895 }
896 mAccessors.insert(std::make_pair(accessorId, impl));
897 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
898 }
899 lock.unlock();
900 if (notify) {
901 mCv.notify_one();
902 }
903 }
904
delAccessor(uint32_t accessorId)905 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
906 std::lock_guard<std::mutex> lock(mMutex);
907 mAccessors.erase(accessorId);
908 ALOGV("buffer invalidation deleted bp:%u", accessorId);
909 if (mAccessors.size() == 0) {
910 mReady = false;
911 }
912 }
913
914 std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
915
createInvalidator()916 void Accessor::Impl::createInvalidator() {
917 if (!sInvalidator) {
918 sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
919 }
920 }
921
evictorThread(std::map<const std::weak_ptr<Accessor::Impl>,nsecs_t,std::owner_less<>> & accessors,std::mutex & mutex,std::condition_variable & cv)922 void Accessor::Impl::evictorThread(
923 std::map<const std::weak_ptr<Accessor::Impl>, nsecs_t, std::owner_less<>> &accessors,
924 std::mutex &mutex,
925 std::condition_variable &cv) {
926 std::list<const std::weak_ptr<Accessor::Impl>> evictList;
927 while (true) {
928 int expired = 0;
929 int evicted = 0;
930 {
931 nsecs_t now = systemTime();
932 std::unique_lock<std::mutex> lock(mutex);
933 if (accessors.size() == 0) {
934 cv.wait(lock);
935 }
936 auto it = accessors.begin();
937 while (it != accessors.end()) {
938 if (now > (it->second + kEvictDurationNs)) {
939 ++expired;
940 evictList.push_back(it->first);
941 it = accessors.erase(it);
942 } else {
943 ++it;
944 }
945 }
946 }
947 // evict idle accessors;
948 for (auto it = evictList.begin(); it != evictList.end(); ++it) {
949 const std::shared_ptr<Accessor::Impl> accessor = it->lock();
950 if (accessor) {
951 accessor->cleanUp(true);
952 ++evicted;
953 }
954 }
955 if (expired > 0) {
956 ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
957 }
958 evictList.clear();
959 ::usleep(kEvictGranularityNs / 1000);
960 }
961 }
962
AccessorEvictor()963 Accessor::Impl::AccessorEvictor::AccessorEvictor() {
964 std::thread evictor(
965 evictorThread,
966 std::ref(mAccessors),
967 std::ref(mMutex),
968 std::ref(mCv));
969 evictor.detach();
970 }
971
addAccessor(const std::weak_ptr<Accessor::Impl> & impl,nsecs_t ts)972 void Accessor::Impl::AccessorEvictor::addAccessor(
973 const std::weak_ptr<Accessor::Impl> &impl, nsecs_t ts) {
974 std::lock_guard<std::mutex> lock(mMutex);
975 bool notify = mAccessors.empty();
976 auto it = mAccessors.find(impl);
977 if (it == mAccessors.end()) {
978 mAccessors.emplace(impl, ts);
979 } else {
980 it->second = ts;
981 }
982 if (notify) {
983 mCv.notify_one();
984 }
985 }
986
987 std::unique_ptr<Accessor::Impl::AccessorEvictor> Accessor::Impl::sEvictor;
988
createEvictor()989 void Accessor::Impl::createEvictor() {
990 if (!sEvictor) {
991 sEvictor = std::make_unique<Accessor::Impl::AccessorEvictor>();
992 }
993 }
994
scheduleEvictIfNeeded()995 void Accessor::Impl::scheduleEvictIfNeeded() {
996 nsecs_t now = systemTime();
997
998 if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
999 mScheduleEvictTs = now;
1000 sEvictor->addAccessor(shared_from_this(), now);
1001 }
1002 }
1003
1004 } // namespace implementation
1005 } // namespace V2_0
1006 } // namespace bufferpool
1007 } // namespace media
1008 } // namespace hardware
1009 } // namespace android
1010