1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #define LOG_TAG "AidlBufferPool"
18 //#define LOG_NDEBUG 0
19 
20 #include <sys/types.h>
21 #include <stdint.h>
22 #include <time.h>
23 #include <unistd.h>
24 #include <utils/Log.h>
25 #include <thread>
26 #include "Accessor.h"
27 #include "BufferPool.h"
28 #include "Connection.h"
29 #include "DataHelper.h"
30 
31 namespace aidl::android::hardware::media::bufferpool2::implementation {
32 
namespace {
    // Time-based throttles consulted by BufferPool::cleanUp().
    constexpr int64_t kCleanUpDurationMs = 500;  // 0.5 sec between eviction passes
    constexpr int64_t kLogDurationMs = 5000;  // 5 secs between statistics logs

    // Eviction tuning consulted by BufferPool::cleanUp(): the cache is trimmed
    // once it is both large enough (bytes and buffer count) or holds too many
    // unused buffers.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 25;
    constexpr size_t kMaxUnusedBufferCount = 64;
    // Trimming triggered by the unused-buffer limit continues down to this count.
    constexpr size_t kUnusedBufferCountTarget = kMaxUnusedBufferCount - 16;
}
42 
BufferPool()43 BufferPool::BufferPool()
44     : mTimestampMs(::android::elapsedRealtime()),
45       mLastCleanUpMs(mTimestampMs),
46       mLastLogMs(mTimestampMs),
47       mSeq(0),
48       mStartSeq(0) {
49     mValid = mInvalidationChannel.isValid();
50 }
51 
52 
53 // Statistics helper
// Returns `base` as a percentage of `total`, rounded to the nearest integer
// (by adding 0.5 and truncating). Yields 0 when `total` is zero.
// Note: `base` is converted to the type of `total` before the division.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const double ratio = 100. * static_cast<S>(base) / total;
    return static_cast<int>(0.5 + ratio);
}
58 
59 std::atomic<std::uint32_t> BufferPool::Invalidation::sInvSeqId(0);
60 
~BufferPool()61 BufferPool::~BufferPool() {
62     std::lock_guard<std::mutex> lock(mMutex);
63     ALOGD("Destruction - bufferpool2 %p "
64           "cached: %zu/%zuM, %zu/%d%% in use; "
65           "allocs: %zu, %d%% recycled; "
66           "transfers: %zu, %d%% unfetched",
67           this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
68           mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
69           mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
70           mStats.mTotalTransfers,
71           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
72 }
73 
onConnect(ConnectionId conId,const std::shared_ptr<IObserver> & observer)74 void BufferPool::Invalidation::onConnect(
75         ConnectionId conId, const std::shared_ptr<IObserver>& observer) {
76     mAcks[conId] = mInvalidationId; // starts from current invalidationId
77     mObservers.insert(std::make_pair(conId, observer));
78 }
79 
onClose(ConnectionId conId)80 void BufferPool::Invalidation::onClose(ConnectionId conId) {
81     mAcks.erase(conId);
82     mObservers.erase(conId);
83 }
84 
onAck(ConnectionId conId,uint32_t msgId)85 void BufferPool::Invalidation::onAck(
86         ConnectionId conId,
87         uint32_t msgId) {
88     auto it = mAcks.find(conId);
89     if (it == mAcks.end()) {
90         ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
91         return;
92     }
93     if (isMessageLater(msgId, it->second)) {
94         mAcks[conId] = msgId;
95     }
96 }
97 
onBufferInvalidated(BufferId bufferId,BufferInvalidationChannel & channel)98 void BufferPool::Invalidation::onBufferInvalidated(
99         BufferId bufferId,
100         BufferInvalidationChannel &channel) {
101     for (auto it = mPendings.begin(); it != mPendings.end();) {
102         if (it->isInvalidated(bufferId)) {
103             uint32_t msgId = 0;
104             if (it->mNeedsAck) {
105                 if (mInvalidationId == UINT_MAX) {
106                     // wrap happens;
107                     mInvalidationId = 0;
108                 }
109                 msgId = ++mInvalidationId;
110             }
111             channel.postInvalidation(msgId, it->mFrom, it->mTo);
112             it = mPendings.erase(it);
113             continue;
114         }
115         ++it;
116     }
117 }
118 
onInvalidationRequest(bool needsAck,uint32_t from,uint32_t to,size_t left,BufferInvalidationChannel & channel,const std::shared_ptr<Accessor> & impl)119 void BufferPool::Invalidation::onInvalidationRequest(
120         bool needsAck,
121         uint32_t from,
122         uint32_t to,
123         size_t left,
124         BufferInvalidationChannel &channel,
125         const std::shared_ptr<Accessor> &impl) {
126         uint32_t msgId = 0;
127     if (needsAck) {
128         if (mInvalidationId == UINT_MAX) {
129             //wrap happens
130             mInvalidationId = 0;
131         }
132         msgId = ++mInvalidationId;
133     }
134     ALOGV("bufferpool2 invalidation requested and queued");
135     if (left == 0) {
136         channel.postInvalidation(msgId, from, to);
137     } else {
138         ALOGV("bufferpoo2 invalidation requested and pending");
139         Pending pending(needsAck, from, to, left, impl);
140         mPendings.push_back(pending);
141     }
142     Accessor::sInvalidator->addAccessor(mId, impl);
143 }
144 
onHandleAck(std::map<ConnectionId,const std::shared_ptr<IObserver>> * observers,uint32_t * invalidationId)145 void BufferPool::Invalidation::onHandleAck(
146         std::map<ConnectionId, const std::shared_ptr<IObserver>> *observers,
147         uint32_t *invalidationId) {
148     if (mInvalidationId != 0) {
149         *invalidationId = mInvalidationId;
150         std::set<int> deads;
151         for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
152             if (it->second != mInvalidationId) {
153                 const std::shared_ptr<IObserver> observer = mObservers[it->first];
154                 if (observer) {
155                     observers->emplace(it->first, observer);
156                     ALOGV("connection %lld will call observer (%u: %u)",
157                           (long long)it->first, it->second, mInvalidationId);
158                     // N.B: onMessage will be called later. ignore possibility of
159                     // onMessage# oneway call being lost.
160                     it->second = mInvalidationId;
161                 } else {
162                     ALOGV("bufferpool2 observer died %lld", (long long)it->first);
163                     deads.insert(it->first);
164                 }
165             }
166         }
167         if (deads.size() > 0) {
168             for (auto it = deads.begin(); it != deads.end(); ++it) {
169                 onClose(*it);
170             }
171         }
172     }
173     if (mPendings.size() == 0) {
174         // All invalidation Ids are synced and no more pending invalidations.
175         Accessor::sInvalidator->delAccessor(mId);
176     }
177 }
178 
handleOwnBuffer(ConnectionId connectionId,BufferId bufferId)179 bool BufferPool::handleOwnBuffer(
180         ConnectionId connectionId, BufferId bufferId) {
181 
182     bool added = insert(&mUsingBuffers, connectionId, bufferId);
183     if (added) {
184         auto iter = mBuffers.find(bufferId);
185         iter->second->mOwnerCount++;
186     }
187     insert(&mUsingConnections, bufferId, connectionId);
188     return added;
189 }
190 
handleReleaseBuffer(ConnectionId connectionId,BufferId bufferId)191 bool BufferPool::handleReleaseBuffer(
192         ConnectionId connectionId, BufferId bufferId) {
193     bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
194     if (deleted) {
195         auto iter = mBuffers.find(bufferId);
196         iter->second->mOwnerCount--;
197         if (iter->second->mOwnerCount == 0 &&
198                 iter->second->mTransactionCount == 0) {
199             if (!iter->second->mInvalidated) {
200                 mStats.onBufferUnused(iter->second->mAllocSize);
201                 mFreeBuffers.insert(bufferId);
202             } else {
203                 mStats.onBufferUnused(iter->second->mAllocSize);
204                 mStats.onBufferEvicted(iter->second->mAllocSize);
205                 mBuffers.erase(iter);
206                 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
207             }
208         }
209     }
210     erase(&mUsingConnections, bufferId, connectionId);
211     ALOGV("release buffer %u : %d", bufferId, deleted);
212     return deleted;
213 }
214 
handleTransferTo(const BufferStatusMessage & message)215 bool BufferPool::handleTransferTo(const BufferStatusMessage &message) {
216     auto completed = mCompletedTransactions.find(
217             message.transactionId);
218     if (completed != mCompletedTransactions.end()) {
219         // already completed
220         mCompletedTransactions.erase(completed);
221         return true;
222     }
223     // the buffer should exist and be owned.
224     auto bufferIter = mBuffers.find(message.bufferId);
225     if (bufferIter == mBuffers.end() ||
226             !contains(&mUsingBuffers, message.connectionId, FromAidl(message.bufferId))) {
227         return false;
228     }
229     auto found = mTransactions.find(message.transactionId);
230     if (found != mTransactions.end()) {
231         // transfer_from was received earlier.
232         found->second->mSender = message.connectionId;
233         found->second->mSenderValidated = true;
234         return true;
235     }
236     if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
237         // N.B: it could be fake or receive connection already closed.
238         ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
239               this, (long long)message.targetConnectionId);
240         return false;
241     }
242     mStats.onBufferSent();
243     mTransactions.insert(std::make_pair(
244             message.transactionId,
245             std::make_unique<TransactionStatus>(message, mTimestampMs)));
246     insert(&mPendingTransactions, message.targetConnectionId,
247            FromAidl(message.transactionId));
248     bufferIter->second->mTransactionCount++;
249     return true;
250 }
251 
handleTransferFrom(const BufferStatusMessage & message)252 bool BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
253     auto found = mTransactions.find(message.transactionId);
254     if (found == mTransactions.end()) {
255         // TODO: is it feasible to check ownership here?
256         mStats.onBufferSent();
257         mTransactions.insert(std::make_pair(
258                 message.transactionId,
259                 std::make_unique<TransactionStatus>(message, mTimestampMs)));
260         insert(&mPendingTransactions, message.connectionId,
261                FromAidl(message.transactionId));
262         auto bufferIter = mBuffers.find(message.bufferId);
263         bufferIter->second->mTransactionCount++;
264     } else {
265         if (message.connectionId == found->second->mReceiver) {
266             found->second->mStatus = BufferStatus::TRANSFER_FROM;
267         }
268     }
269     return true;
270 }
271 
handleTransferResult(const BufferStatusMessage & message)272 bool BufferPool::handleTransferResult(const BufferStatusMessage &message) {
273     auto found = mTransactions.find(message.transactionId);
274     if (found != mTransactions.end()) {
275         bool deleted = erase(&mPendingTransactions, message.connectionId,
276                              FromAidl(message.transactionId));
277         if (deleted) {
278             if (!found->second->mSenderValidated) {
279                 mCompletedTransactions.insert(message.transactionId);
280             }
281             auto bufferIter = mBuffers.find(message.bufferId);
282             if (message.status == BufferStatus::TRANSFER_OK) {
283                 handleOwnBuffer(message.connectionId, message.bufferId);
284             }
285             bufferIter->second->mTransactionCount--;
286             if (bufferIter->second->mOwnerCount == 0
287                 && bufferIter->second->mTransactionCount == 0) {
288                 if (!bufferIter->second->mInvalidated) {
289                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
290                     mFreeBuffers.insert(message.bufferId);
291                 } else {
292                     mStats.onBufferUnused(bufferIter->second->mAllocSize);
293                     mStats.onBufferEvicted(bufferIter->second->mAllocSize);
294                     mBuffers.erase(bufferIter);
295                     mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
296                 }
297             }
298             mTransactions.erase(found);
299         }
300         ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
301               message.bufferId, deleted);
302         return deleted;
303     }
304     ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
305           message.bufferId);
306     return false;
307 }
308 
processStatusMessages()309 void BufferPool::processStatusMessages() {
310     std::vector<BufferStatusMessage> messages;
311     mObserver.getBufferStatusChanges(messages);
312     mTimestampMs = ::android::elapsedRealtime();
313     for (BufferStatusMessage& message: messages) {
314         bool ret = false;
315         switch (message.status) {
316             case BufferStatus::NOT_USED:
317                 ret = handleReleaseBuffer(
318                         message.connectionId, message.bufferId);
319                 break;
320             case BufferStatus::USED:
321                 // not happening
322                 break;
323             case BufferStatus::TRANSFER_TO:
324                 ret = handleTransferTo(message);
325                 break;
326             case BufferStatus::TRANSFER_FROM:
327                 ret = handleTransferFrom(message);
328                 break;
329             case BufferStatus::TRANSFER_TIMEOUT:
330                 // TODO
331                 break;
332             case BufferStatus::TRANSFER_LOST:
333                 // TODO
334                 break;
335             case BufferStatus::TRANSFER_FETCH:
336                 // not happening
337                 break;
338             case BufferStatus::TRANSFER_OK:
339             case BufferStatus::TRANSFER_ERROR:
340                 ret = handleTransferResult(message);
341                 break;
342             case BufferStatus::INVALIDATION_ACK:
343                 mInvalidation.onAck(message.connectionId, message.bufferId);
344                 ret = true;
345                 break;
346         }
347         if (ret == false) {
348             ALOGW("buffer status message processing failure - message : %d connection : %lld",
349                   message.status, (long long)message.connectionId);
350         }
351     }
352     messages.clear();
353 }
354 
handleClose(ConnectionId connectionId)355 bool BufferPool::handleClose(ConnectionId connectionId) {
356     // Cleaning buffers
357     auto buffers = mUsingBuffers.find(connectionId);
358     if (buffers != mUsingBuffers.end()) {
359         for (const BufferId& bufferId : buffers->second) {
360             bool deleted = erase(&mUsingConnections, bufferId, connectionId);
361             if (deleted) {
362                 auto bufferIter = mBuffers.find(bufferId);
363                 bufferIter->second->mOwnerCount--;
364                 if (bufferIter->second->mOwnerCount == 0 &&
365                         bufferIter->second->mTransactionCount == 0) {
366                     // TODO: handle freebuffer insert fail
367                     if (!bufferIter->second->mInvalidated) {
368                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
369                         mFreeBuffers.insert(bufferId);
370                     } else {
371                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
372                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
373                         mBuffers.erase(bufferIter);
374                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
375                     }
376                 }
377             }
378         }
379         mUsingBuffers.erase(buffers);
380     }
381 
382     // Cleaning transactions
383     auto pending = mPendingTransactions.find(connectionId);
384     if (pending != mPendingTransactions.end()) {
385         for (const TransactionId& transactionId : pending->second) {
386             auto iter = mTransactions.find(transactionId);
387             if (iter != mTransactions.end()) {
388                 if (!iter->second->mSenderValidated) {
389                     mCompletedTransactions.insert(transactionId);
390                 }
391                 BufferId bufferId = iter->second->mBufferId;
392                 auto bufferIter = mBuffers.find(bufferId);
393                 bufferIter->second->mTransactionCount--;
394                 if (bufferIter->second->mOwnerCount == 0 &&
395                     bufferIter->second->mTransactionCount == 0) {
396                     // TODO: handle freebuffer insert fail
397                     if (!bufferIter->second->mInvalidated) {
398                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
399                         mFreeBuffers.insert(bufferId);
400                     } else {
401                         mStats.onBufferUnused(bufferIter->second->mAllocSize);
402                         mStats.onBufferEvicted(bufferIter->second->mAllocSize);
403                         mBuffers.erase(bufferIter);
404                         mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
405                     }
406                 }
407                 mTransactions.erase(iter);
408             }
409         }
410     }
411     mConnectionIds.erase(connectionId);
412     return true;
413 }
414 
getFreeBuffer(const std::shared_ptr<BufferPoolAllocator> & allocator,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)415 bool BufferPool::getFreeBuffer(
416         const std::shared_ptr<BufferPoolAllocator> &allocator,
417         const std::vector<uint8_t> &params, BufferId *pId,
418         const native_handle_t** handle) {
419     auto bufferIt = mFreeBuffers.begin();
420     for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
421         BufferId bufferId = *bufferIt;
422         if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
423             break;
424         }
425     }
426     if (bufferIt != mFreeBuffers.end()) {
427         BufferId id = *bufferIt;
428         mFreeBuffers.erase(bufferIt);
429         mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
430         *handle = mBuffers[id]->handle();
431         *pId = id;
432         ALOGV("recycle a buffer %u %p", id, *handle);
433         return true;
434     }
435     return false;
436 }
437 
addNewBuffer(const std::shared_ptr<BufferPoolAllocation> & alloc,const size_t allocSize,const std::vector<uint8_t> & params,BufferId * pId,const native_handle_t ** handle)438 BufferPoolStatus BufferPool::addNewBuffer(
439         const std::shared_ptr<BufferPoolAllocation> &alloc,
440         const size_t allocSize,
441         const std::vector<uint8_t> &params,
442         BufferId *pId,
443         const native_handle_t** handle) {
444 
445     BufferId bufferId = mSeq++;
446     if (mSeq == Connection::SYNC_BUFFERID) {
447         mSeq = 0;
448     }
449     std::unique_ptr<InternalBuffer> buffer =
450             std::make_unique<InternalBuffer>(
451                     bufferId, alloc, allocSize, params);
452     if (buffer) {
453         auto res = mBuffers.insert(std::make_pair(
454                 bufferId, std::move(buffer)));
455         if (res.second) {
456             mStats.onBufferAllocated(allocSize);
457             *handle = alloc->handle();
458             *pId = bufferId;
459             return ResultStatus::OK;
460         }
461     }
462     return ResultStatus::NO_MEMORY;
463 }
464 
cleanUp(bool clearCache)465 void BufferPool::cleanUp(bool clearCache) {
466     if (clearCache || mTimestampMs > mLastCleanUpMs + kCleanUpDurationMs ||
467             mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
468         mLastCleanUpMs = mTimestampMs;
469         if (mTimestampMs > mLastLogMs + kLogDurationMs ||
470                 mStats.buffersNotInUse() > kMaxUnusedBufferCount) {
471             mLastLogMs = mTimestampMs;
472             ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
473                   "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
474                   "%zu/%zu (fetch/transfer)",
475                   this, mStats.mBuffersCached, mStats.mSizeCached,
476                   mStats.mBuffersInUse, mStats.mSizeInUse,
477                   mStats.mTotalRecycles, mStats.mTotalAllocations,
478                   mStats.mTotalFetches, mStats.mTotalTransfers);
479         }
480         for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
481             if (!clearCache && mStats.buffersNotInUse() <= kUnusedBufferCountTarget &&
482                     (mStats.mSizeCached < kMinAllocBytesForEviction ||
483                      mBuffers.size() < kMinBufferCountForEviction)) {
484                 break;
485             }
486             auto it = mBuffers.find(*freeIt);
487             if (it != mBuffers.end() &&
488                     it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
489                 mStats.onBufferEvicted(it->second->mAllocSize);
490                 mBuffers.erase(it);
491                 freeIt = mFreeBuffers.erase(freeIt);
492             } else {
493                 ++freeIt;
494                 ALOGW("bufferpool2 inconsistent!");
495             }
496         }
497     }
498 }
499 
invalidate(bool needsAck,BufferId from,BufferId to,const std::shared_ptr<Accessor> & impl)500 void BufferPool::invalidate(
501         bool needsAck, BufferId from, BufferId to,
502         const std::shared_ptr<Accessor> &impl) {
503     for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
504         if (isBufferInRange(from, to, *freeIt)) {
505             auto it = mBuffers.find(*freeIt);
506             if (it != mBuffers.end() &&
507                 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
508                 mStats.onBufferEvicted(it->second->mAllocSize);
509                 mBuffers.erase(it);
510                 freeIt = mFreeBuffers.erase(freeIt);
511                 continue;
512             } else {
513                 ALOGW("bufferpool2 inconsistent!");
514             }
515         }
516         ++freeIt;
517     }
518 
519     size_t left = 0;
520     for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
521         if (isBufferInRange(from, to, it->first)) {
522             it->second->invalidate();
523             ++left;
524         }
525     }
526     mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
527 }
528 
flush(const std::shared_ptr<Accessor> & impl)529 void BufferPool::flush(const std::shared_ptr<Accessor> &impl) {
530     BufferId from = mStartSeq;
531     BufferId to = mSeq;
532     mStartSeq = mSeq;
533     // TODO: needsAck params
534     ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
535     if (from != to) {
536         invalidate(true, from, to, impl);
537     }
538 }
539 
540 }  // namespace aidl::android::hardware::media::bufferpool2::implementation
541