1 /*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #define LOG_TAG "BufferPoolAccessor1.0"
18 //#define LOG_NDEBUG 0
19
20 #include <sys/types.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include "AccessorImpl.h"
26 #include "Connection.h"
27
28 namespace android {
29 namespace hardware {
30 namespace media {
31 namespace bufferpool {
32 namespace V1_0 {
33 namespace implementation {
34
namespace {
// Minimum time between two clean-up passes, in microseconds (0.5 sec).
// TODO: tune.
constexpr int64_t kCleanUpDurationUs = 500 * 1000;
// Minimum time between two statistics log lines, in microseconds (5 secs).
constexpr int64_t kLogDurationUs = 5 * 1000 * 1000;

// Thresholds consulted by the eviction loop in BufferPool::cleanUp(): unless a
// full cache clear is requested, eviction stops once the cached size is below
// the byte threshold (15 MiB) and the buffer count is below the count threshold.
constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
constexpr size_t kMinBufferCountForEviction = 40;
}  // namespace
42
43 // Buffer structure in bufferpool process
44 struct InternalBuffer {
45 BufferId mId;
46 size_t mOwnerCount;
47 size_t mTransactionCount;
48 const std::shared_ptr<BufferPoolAllocation> mAllocation;
49 const size_t mAllocSize;
50 const std::vector<uint8_t> mConfig;
51
InternalBufferandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer52 InternalBuffer(
53 BufferId id,
54 const std::shared_ptr<BufferPoolAllocation> &alloc,
55 const size_t allocSize,
56 const std::vector<uint8_t> &allocConfig)
57 : mId(id), mOwnerCount(0), mTransactionCount(0),
58 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
59
handleandroid::hardware::media::bufferpool::V1_0::implementation::InternalBuffer60 const native_handle_t *handle() {
61 return mAllocation->handle();
62 }
63 };
64
65 struct TransactionStatus {
66 TransactionId mId;
67 BufferId mBufferId;
68 ConnectionId mSender;
69 ConnectionId mReceiver;
70 BufferStatus mStatus;
71 int64_t mTimestampUs;
72 bool mSenderValidated;
73
TransactionStatusandroid::hardware::media::bufferpool::V1_0::implementation::TransactionStatus74 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
75 mId = message.transactionId;
76 mBufferId = message.bufferId;
77 mStatus = message.newStatus;
78 mTimestampUs = timestampUs;
79 if (mStatus == BufferStatus::TRANSFER_TO) {
80 mSender = message.connectionId;
81 mReceiver = message.targetConnectionId;
82 mSenderValidated = true;
83 } else {
84 mSender = -1LL;
85 mReceiver = message.connectionId;
86 mSenderValidated = false;
87 }
88 }
89 };
90
91 // Helper template methods for handling map of set.
// Adds |value| to the set stored under |key|, creating the set on first use.
// Returns true if the value was newly added, false if it was already present.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] creates an empty set for an unseen key; set::insert reports
    // through .second whether the element was actually added.
    return (*mapOfSet)[key].insert(value).second;
}
105
// Removes |value| from the set stored under |key| and drops the key once its
// set is empty. Returns true only if the value was present and removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) != 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
120
// Returns true if |key| exists and its set holds |value|. Never modifies the
// map.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
130
131 uint32_t Accessor::Impl::sSeqId = time(nullptr);
132
Impl(const std::shared_ptr<BufferPoolAllocator> & allocator)133 Accessor::Impl::Impl(
134 const std::shared_ptr<BufferPoolAllocator> &allocator)
135 : mAllocator(allocator) {}
136
~Impl()137 Accessor::Impl::~Impl() {
138 }
139
connect(const sp<Accessor> & accessor,sp<Connection> * connection,ConnectionId * pConnectionId,const QueueDescriptor ** fmqDescPtr)140 ResultStatus Accessor::Impl::connect(
141 const sp<Accessor> &accessor, sp<Connection> *connection,
142 ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) {
143 sp<Connection> newConnection = new Connection();
144 ResultStatus status = ResultStatus::CRITICAL_ERROR;
145 {
146 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
147 if (newConnection) {
148 int32_t pid = getpid();
149 ConnectionId id = (int64_t)pid << 32 | sSeqId;
150 status = mBufferPool.mObserver.open(id, fmqDescPtr);
151 if (status == ResultStatus::OK) {
152 newConnection->initialize(accessor, id);
153 *connection = newConnection;
154 *pConnectionId = id;
155 mBufferPool.mConnectionIds.insert(id);
156 if (sSeqId == UINT32_MAX) {
157 sSeqId = 0;
158 } else {
159 ++sSeqId;
160 }
161 }
162 }
163 mBufferPool.processStatusMessages();
164 mBufferPool.cleanUp();
165 }
166 return status;
167 }
168
close(ConnectionId connectionId)169 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
170 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
171 mBufferPool.processStatusMessages();
172 mBufferPool.handleClose(connectionId);
173 mBufferPool.mObserver.close(connectionId);
174 // Since close# will be called after all works are finished, it is OK to
175 // evict unused buffers.
176 mBufferPool.cleanUp(true);
177 return ResultStatus::OK;
178 }
179
allocate(ConnectionId connectionId,const std::vector<uint8_t> & params,BufferId * bufferId,const native_handle_t ** handle)180 ResultStatus Accessor::Impl::allocate(
181 ConnectionId connectionId, const std::vector<uint8_t>& params,
182 BufferId *bufferId, const native_handle_t** handle) {
183 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
184 mBufferPool.processStatusMessages();
185 ResultStatus status = ResultStatus::OK;
186 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
187 lock.unlock();
188 std::shared_ptr<BufferPoolAllocation> alloc;
189 size_t allocSize;
190 status = mAllocator->allocate(params, &alloc, &allocSize);
191 lock.lock();
192 if (status == ResultStatus::OK) {
193 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
194 }
195 ALOGV("create a buffer %d : %u %p",
196 status == ResultStatus::OK, *bufferId, *handle);
197 }
198 if (status == ResultStatus::OK) {
199 // TODO: handle ownBuffer failure
200 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
201 }
202 mBufferPool.cleanUp();
203 return status;
204 }
205
fetch(ConnectionId connectionId,TransactionId transactionId,BufferId bufferId,const native_handle_t ** handle)206 ResultStatus Accessor::Impl::fetch(
207 ConnectionId connectionId, TransactionId transactionId,
208 BufferId bufferId, const native_handle_t** handle) {
209 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
210 mBufferPool.processStatusMessages();
211 auto found = mBufferPool.mTransactions.find(transactionId);
212 if (found != mBufferPool.mTransactions.end() &&
213 contains(&mBufferPool.mPendingTransactions,
214 connectionId, transactionId)) {
215 if (found->second->mSenderValidated &&
216 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
217 found->second->mBufferId == bufferId) {
218 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
219 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
220 if (bufferIt != mBufferPool.mBuffers.end()) {
221 mBufferPool.mStats.onBufferFetched();
222 *handle = bufferIt->second->handle();
223 return ResultStatus::OK;
224 }
225 }
226 }
227 mBufferPool.cleanUp();
228 return ResultStatus::CRITICAL_ERROR;
229 }
230
cleanUp(bool clearCache)231 void Accessor::Impl::cleanUp(bool clearCache) {
232 // transaction timeout, buffer cacheing TTL handling
233 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
234 mBufferPool.processStatusMessages();
235 mBufferPool.cleanUp(clearCache);
236 }
237
BufferPool()238 Accessor::Impl::Impl::BufferPool::BufferPool()
239 : mTimestampUs(getTimestampNow()),
240 mLastCleanUpUs(mTimestampUs),
241 mLastLogUs(mTimestampUs),
242 mSeq(0) {}
243
244
245 // Statistics helper
// Returns |base| as a percentage of |total|, rounded to the nearest integer
// for non-negative inputs. Yields 0 when |total| is zero.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const double ratio = 100. * static_cast<S>(base) / total;
    // Adding 0.5 before truncating rounds to nearest.
    return int(0.5 + ratio);
}
250
~BufferPool()251 Accessor::Impl::Impl::BufferPool::~BufferPool() {
252 std::lock_guard<std::mutex> lock(mMutex);
253 ALOGD("Destruction - bufferpool %p "
254 "cached: %zu/%zuM, %zu/%d%% in use; "
255 "allocs: %zu, %d%% recycled; "
256 "transfers: %zu, %d%% unfetched",
257 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
258 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
259 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
260 mStats.mTotalTransfers,
261 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
262 }
263
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)264 bool Accessor::Impl::BufferPool::handleOwnBuffer(
265 ConnectionId connectionId, BufferId bufferId) {
266
267 bool added = insert(&mUsingBuffers, connectionId, bufferId);
268 if (added) {
269 auto iter = mBuffers.find(bufferId);
270 iter->second->mOwnerCount++;
271 }
272 insert(&mUsingConnections, bufferId, connectionId);
273 return added;
274 }
275
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)276 bool Accessor::Impl::BufferPool::handleReleaseBuffer(
277 ConnectionId connectionId, BufferId bufferId) {
278 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
279 if (deleted) {
280 auto iter = mBuffers.find(bufferId);
281 iter->second->mOwnerCount--;
282 if (iter->second->mOwnerCount == 0 &&
283 iter->second->mTransactionCount == 0) {
284 mStats.onBufferUnused(iter->second->mAllocSize);
285 mFreeBuffers.insert(bufferId);
286 }
287 }
288 erase(&mUsingConnections, bufferId, connectionId);
289 ALOGV("release buffer %u : %d", bufferId, deleted);
290 return deleted;
291 }
292
handleTransferTo(const BufferStatusMessage & message)293 bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
294 auto completed = mCompletedTransactions.find(
295 message.transactionId);
296 if (completed != mCompletedTransactions.end()) {
297 // already completed
298 mCompletedTransactions.erase(completed);
299 return true;
300 }
301 // the buffer should exist and be owned.
302 auto bufferIter = mBuffers.find(message.bufferId);
303 if (bufferIter == mBuffers.end() ||
304 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
305 return false;
306 }
307 auto found = mTransactions.find(message.transactionId);
308 if (found != mTransactions.end()) {
309 // transfer_from was received earlier.
310 found->second->mSender = message.connectionId;
311 found->second->mSenderValidated = true;
312 return true;
313 }
314 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
315 // N.B: it could be fake or receive connection already closed.
316 ALOGD("bufferpool %p receiver connection %lld is no longer valid",
317 this, (long long)message.targetConnectionId);
318 return false;
319 }
320 mStats.onBufferSent();
321 mTransactions.insert(std::make_pair(
322 message.transactionId,
323 std::make_unique<TransactionStatus>(message, mTimestampUs)));
324 insert(&mPendingTransactions, message.targetConnectionId,
325 message.transactionId);
326 bufferIter->second->mTransactionCount++;
327 return true;
328 }
329
handleTransferFrom(const BufferStatusMessage & message)330 bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
331 auto found = mTransactions.find(message.transactionId);
332 if (found == mTransactions.end()) {
333 // TODO: is it feasible to check ownership here?
334 mStats.onBufferSent();
335 mTransactions.insert(std::make_pair(
336 message.transactionId,
337 std::make_unique<TransactionStatus>(message, mTimestampUs)));
338 insert(&mPendingTransactions, message.connectionId,
339 message.transactionId);
340 auto bufferIter = mBuffers.find(message.bufferId);
341 bufferIter->second->mTransactionCount++;
342 } else {
343 if (message.connectionId == found->second->mReceiver) {
344 found->second->mStatus = BufferStatus::TRANSFER_FROM;
345 }
346 }
347 return true;
348 }
349
handleTransferResult(const BufferStatusMessage & message)350 bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
351 auto found = mTransactions.find(message.transactionId);
352 if (found != mTransactions.end()) {
353 bool deleted = erase(&mPendingTransactions, message.connectionId,
354 message.transactionId);
355 if (deleted) {
356 if (!found->second->mSenderValidated) {
357 mCompletedTransactions.insert(message.transactionId);
358 }
359 auto bufferIter = mBuffers.find(message.bufferId);
360 if (message.newStatus == BufferStatus::TRANSFER_OK) {
361 handleOwnBuffer(message.connectionId, message.bufferId);
362 }
363 bufferIter->second->mTransactionCount--;
364 if (bufferIter->second->mOwnerCount == 0
365 && bufferIter->second->mTransactionCount == 0) {
366 mStats.onBufferUnused(bufferIter->second->mAllocSize);
367 mFreeBuffers.insert(message.bufferId);
368 }
369 mTransactions.erase(found);
370 }
371 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
372 message.bufferId, deleted);
373 return deleted;
374 }
375 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
376 message.bufferId);
377 return false;
378 }
379
processStatusMessages()380 void Accessor::Impl::BufferPool::processStatusMessages() {
381 std::vector<BufferStatusMessage> messages;
382 mObserver.getBufferStatusChanges(messages);
383 mTimestampUs = getTimestampNow();
384 for (BufferStatusMessage& message: messages) {
385 bool ret = false;
386 switch (message.newStatus) {
387 case BufferStatus::NOT_USED:
388 ret = handleReleaseBuffer(
389 message.connectionId, message.bufferId);
390 break;
391 case BufferStatus::USED:
392 // not happening
393 break;
394 case BufferStatus::TRANSFER_TO:
395 ret = handleTransferTo(message);
396 break;
397 case BufferStatus::TRANSFER_FROM:
398 ret = handleTransferFrom(message);
399 break;
400 case BufferStatus::TRANSFER_TIMEOUT:
401 // TODO
402 break;
403 case BufferStatus::TRANSFER_LOST:
404 // TODO
405 break;
406 case BufferStatus::TRANSFER_FETCH:
407 // not happening
408 break;
409 case BufferStatus::TRANSFER_OK:
410 case BufferStatus::TRANSFER_ERROR:
411 ret = handleTransferResult(message);
412 break;
413 }
414 if (ret == false) {
415 ALOGW("buffer status message processing failure - message : %d connection : %lld",
416 message.newStatus, (long long)message.connectionId);
417 }
418 }
419 messages.clear();
420 }
421
handleClose(ConnectionId connectionId)422 bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
423 // Cleaning buffers
424 auto buffers = mUsingBuffers.find(connectionId);
425 if (buffers != mUsingBuffers.end()) {
426 for (const BufferId& bufferId : buffers->second) {
427 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
428 if (deleted) {
429 auto bufferIter = mBuffers.find(bufferId);
430 bufferIter->second->mOwnerCount--;
431 if (bufferIter->second->mOwnerCount == 0 &&
432 bufferIter->second->mTransactionCount == 0) {
433 // TODO: handle freebuffer insert fail
434 mStats.onBufferUnused(bufferIter->second->mAllocSize);
435 mFreeBuffers.insert(bufferId);
436 }
437 }
438 }
439 mUsingBuffers.erase(buffers);
440 }
441
442 // Cleaning transactions
443 auto pending = mPendingTransactions.find(connectionId);
444 if (pending != mPendingTransactions.end()) {
445 for (const TransactionId& transactionId : pending->second) {
446 auto iter = mTransactions.find(transactionId);
447 if (iter != mTransactions.end()) {
448 if (!iter->second->mSenderValidated) {
449 mCompletedTransactions.insert(transactionId);
450 }
451 BufferId bufferId = iter->second->mBufferId;
452 auto bufferIter = mBuffers.find(bufferId);
453 bufferIter->second->mTransactionCount--;
454 if (bufferIter->second->mOwnerCount == 0 &&
455 bufferIter->second->mTransactionCount == 0) {
456 // TODO: handle freebuffer insert fail
457 mStats.onBufferUnused(bufferIter->second->mAllocSize);
458 mFreeBuffers.insert(bufferId);
459 }
460 mTransactions.erase(iter);
461 }
462 }
463 }
464 mConnectionIds.erase(connectionId);
465 return true;
466 }
467
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)468 bool Accessor::Impl::BufferPool::getFreeBuffer(
469 const std::shared_ptr<BufferPoolAllocator> &allocator,
470 const std::vector<uint8_t> ¶ms, BufferId *pId,
471 const native_handle_t** handle) {
472 auto bufferIt = mFreeBuffers.begin();
473 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
474 BufferId bufferId = *bufferIt;
475 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
476 break;
477 }
478 }
479 if (bufferIt != mFreeBuffers.end()) {
480 BufferId id = *bufferIt;
481 mFreeBuffers.erase(bufferIt);
482 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
483 *handle = mBuffers[id]->handle();
484 *pId = id;
485 ALOGV("recycle a buffer %u %p", id, *handle);
486 return true;
487 }
488 return false;
489 }
490
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)491 ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
492 const std::shared_ptr<BufferPoolAllocation> &alloc,
493 const size_t allocSize,
494 const std::vector<uint8_t> ¶ms,
495 BufferId *pId,
496 const native_handle_t** handle) {
497
498 BufferId bufferId = mSeq++;
499 if (mSeq == Connection::SYNC_BUFFERID) {
500 mSeq = 0;
501 }
502 std::unique_ptr<InternalBuffer> buffer =
503 std::make_unique<InternalBuffer>(
504 bufferId, alloc, allocSize, params);
505 if (buffer) {
506 auto res = mBuffers.insert(std::make_pair(
507 bufferId, std::move(buffer)));
508 if (res.second) {
509 mStats.onBufferAllocated(allocSize);
510 *handle = alloc->handle();
511 *pId = bufferId;
512 return ResultStatus::OK;
513 }
514 }
515 return ResultStatus::NO_MEMORY;
516 }
517
cleanUp(bool clearCache)518 void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
519 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
520 mLastCleanUpUs = mTimestampUs;
521 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
522 mLastLogUs = mTimestampUs;
523 ALOGD("bufferpool %p : %zu(%zu size) total buffers - "
524 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
525 "%zu/%zu (fetch/transfer)",
526 this, mStats.mBuffersCached, mStats.mSizeCached,
527 mStats.mBuffersInUse, mStats.mSizeInUse,
528 mStats.mTotalRecycles, mStats.mTotalAllocations,
529 mStats.mTotalFetches, mStats.mTotalTransfers);
530 }
531 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
532 if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
533 && mBuffers.size() < kMinBufferCountForEviction) {
534 break;
535 }
536 auto it = mBuffers.find(*freeIt);
537 if (it != mBuffers.end() &&
538 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
539 mStats.onBufferEvicted(it->second->mAllocSize);
540 mBuffers.erase(it);
541 freeIt = mFreeBuffers.erase(freeIt);
542 } else {
543 ++freeIt;
544 ALOGW("bufferpool inconsistent!");
545 }
546 }
547 }
548 }
549
550 } // namespace implementation
551 } // namespace V1_0
552 } // namespace bufferpool
553 } // namespace media
554 } // namespace hardware
555 } // namespace android
556