1 // Copyright 2014 The Android Open Source Project
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "aemu/base/synchronization/AndroidMessageChannel.h"
16 
17 namespace gfxstream {
18 namespace guest {
19 
MessageChannelBase(size_t capacity)20 MessageChannelBase::MessageChannelBase(size_t capacity) : mCapacity(capacity) {}
21 
size() const22 size_t MessageChannelBase::size() const {
23     AutoLock<Lock> lock(mLock);
24     return mCount;
25 }
26 
stop()27 void MessageChannelBase::stop() {
28     gfxstream::guest::AutoLock<Lock> lock(mLock);
29     mStopped = true;
30     mCount = 0;
31     mCanRead.broadcast();
32     mCanWrite.broadcastAndUnlock(&lock);
33 }
34 
isStopped() const35 bool MessageChannelBase::isStopped() const {
36     AutoLock<Lock> lock(mLock);
37     return isStoppedLocked();
38 }
39 
waitForEmpty()40 void MessageChannelBase::waitForEmpty() {
41     AutoLock<Lock> lock(mLock);
42     while (mCount > 0) {
43         mCanWrite.wait(&lock);
44     }
45 }
46 
beforeWrite()47 size_t MessageChannelBase::beforeWrite() {
48     mLock.lock();
49     while (mCount >= mCapacity && !mStopped) {
50         mCanWrite.wait(&mLock);
51     }
52     // Return value is undefined if stopped, so let's save a branch and skip the
53     // check for it.
54     size_t result = mPos + mCount;
55     if (result >= mCapacity) {
56         result -= mCapacity;
57     }
58     return result;
59 }
60 
beforeTryWrite()61 Optional<size_t> MessageChannelBase::beforeTryWrite() {
62     mLock.lock();
63 
64     if (mCount >= mCapacity || mStopped) {
65         return {};
66     }
67     size_t result = mPos + mCount;
68     if (result >= mCapacity) {
69         result -= mCapacity;
70     }
71     return result;
72 }
73 
afterWrite(bool success)74 void MessageChannelBase::afterWrite(bool success) {
75     if (success) {
76         ++mCount;
77     }
78     mCanRead.signalAndUnlock(&mLock);
79 }
80 
beforeRead()81 size_t MessageChannelBase::beforeRead() {
82     mLock.lock();
83     while (mCount == 0 && !mStopped) {
84         mCanRead.wait(&mLock);
85     }
86     return mPos; // return value is undefined if stopped, so let's save a branch
87 }
88 
beforeTryRead()89 Optional<size_t> MessageChannelBase::beforeTryRead() {
90     mLock.lock();
91 
92     if (mCount == 0 || mStopped) {
93         return {};
94     }
95     return mPos;
96 }
97 
beforeTimedRead(uint64_t wallTimeUs)98 Optional<size_t> MessageChannelBase::beforeTimedRead(uint64_t wallTimeUs) {
99     mLock.lock();
100 
101     while (mCount == 0 && !mStopped) {
102         if (!mCanRead.timedWait(&mLock, wallTimeUs)) {
103             return {};
104         }
105     }
106     return mPos;
107 }
108 
afterRead(bool success)109 void MessageChannelBase::afterRead(bool success) {
110     if (success) {
111         if (++mPos == mCapacity) {
112             mPos = 0U;
113         }
114         --mCount;
115     }
116     mCanWrite.signalAndUnlock(&mLock);
117 }
118 
119 } // namespace guest
120 } // namespace gfxstream
121