1 /**
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <SocketReaderNode.h>
18 #include <ImsMediaTrace.h>
19 #include <ImsMediaTimer.h>
20 #include <thread>
21 
22 #define MAX_BUFFER_QUEUE 250  // 5 sec in audio case.
23 
SocketReaderNode(BaseSessionCallback * callback)24 SocketReaderNode::SocketReaderNode(BaseSessionCallback* callback) :
25         BaseNode(callback),
26         mLocalFd(0)
27 {
28     mSocket = nullptr;
29     mReceiveTtl = false;
30     mSocketOpened = false;
31 }
32 
~SocketReaderNode()33 SocketReaderNode::~SocketReaderNode()
34 {
35     IMLOGD1("[~SocketReaderNode] queue size[%d]", GetDataCount());
36     CloseSocket();
37 }
38 
GetNodeId()39 kBaseNodeId SocketReaderNode::GetNodeId()
40 {
41     return kNodeIdSocketReader;
42 }
43 
Prepare()44 bool SocketReaderNode::Prepare()
45 {
46     if (!mSocketOpened)
47     {
48         return OpenSocket();
49     }
50 
51     return true;
52 }
53 
Start()54 ImsMediaResult SocketReaderNode::Start()
55 {
56     if (mSocketOpened)
57     {
58         IMLOGD0("[Start] opened already");
59     }
60     else
61     {
62         if (!OpenSocket())
63         {
64             return RESULT_PORT_UNAVAILABLE;
65         }
66     }
67 
68     mNodeState = kNodeStateRunning;
69     return RESULT_SUCCESS;
70 }
71 
Stop()72 void SocketReaderNode::Stop()
73 {
74     IMLOGD2("[Stop] media[%d], protocolType[%d]", mMediaType, mProtocolType);
75     std::lock_guard<std::mutex> guard(mMutex);
76     mNodeState = kNodeStateStopped;
77 }
78 
IsRunTime()79 bool SocketReaderNode::IsRunTime()
80 {
81     return true;
82 }
83 
IsSourceNode()84 bool SocketReaderNode::IsSourceNode()
85 {
86     return true;
87 }
88 
SetConfig(void * config)89 void SocketReaderNode::SetConfig(void* config)
90 {
91     if (config == nullptr)
92     {
93         return;
94     }
95 
96     RtpConfig* pConfig = reinterpret_cast<RtpConfig*>(config);
97 
98     if (mProtocolType == kProtocolRtp)
99     {
100         mPeerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort());
101     }
102     else if (mProtocolType == kProtocolRtcp)
103     {
104         mPeerAddress =
105                 RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort() + 1);
106     }
107 }
108 
IsSameConfig(void * config)109 bool SocketReaderNode::IsSameConfig(void* config)
110 {
111     if (config == nullptr)
112     {
113         return true;
114     }
115 
116     RtpConfig* pConfig = reinterpret_cast<RtpConfig*>(config);
117     RtpAddress peerAddress;
118 
119     if (mProtocolType == kProtocolRtp)
120     {
121         peerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort());
122     }
123     else if (mProtocolType == kProtocolRtcp)
124     {
125         peerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort() + 1);
126     }
127 
128     return (mPeerAddress == peerAddress);
129 }
130 
UpdateConfig(void * config)131 ImsMediaResult SocketReaderNode::UpdateConfig(void* config)
132 {
133     // check config items updates
134     bool isUpdateNode = false;
135 
136     if (IsSameConfig(config))
137     {
138         IMLOGD0("[UpdateConfig] no update");
139         return RESULT_SUCCESS;
140     }
141     else
142     {
143         isUpdateNode = true;
144     }
145 
146     kBaseNodeState prevState = mNodeState;
147 
148     if (isUpdateNode && mNodeState == kNodeStateRunning)
149     {
150         Stop();
151 
152         if (mSocketOpened)
153         {
154             CloseSocket();
155         }
156     }
157 
158     // reset the parameters
159     SetConfig(config);
160 
161     if (isUpdateNode && prevState == kNodeStateRunning)
162     {
163         if (Prepare())
164         {
165             return Start();
166         }
167         else
168         {
169             return RESULT_INVALID_PARAM;
170         }
171     }
172 
173     return RESULT_SUCCESS;
174 }
175 
OnReadDataFromSocket()176 void SocketReaderNode::OnReadDataFromSocket()
177 {
178     std::lock_guard<std::mutex> guard(mMutex);
179 
180     if (mSocketOpened && mSocket != nullptr)
181     {
182         int len = mSocket->ReceiveFrom(mBuffer, DEFAULT_MTU);
183 
184         if (len > 0)
185         {
186             IMLOGD_PACKET2(IM_PACKET_LOG_SOCKET, "[OnReadDataFromSocket] media[%d], data size[%d]",
187                     mMediaType, len);
188 
189             if (mNodeState == kNodeStateRunning)
190             {
191                 SendDataToRearNode(MEDIASUBTYPE_UNDEFINED, mBuffer, len, 0, 0, 0,
192                         MEDIASUBTYPE_UNDEFINED, ImsMediaTimer::GetTimeInMilliSeconds());
193             }
194         }
195     }
196 }
197 
SetLocalFd(int fd)198 void SocketReaderNode::SetLocalFd(int fd)
199 {
200     mLocalFd = fd;
201 }
202 
SetLocalAddress(const RtpAddress & address)203 void SocketReaderNode::SetLocalAddress(const RtpAddress& address)
204 {
205     mLocalAddress = address;
206 }
207 
SetPeerAddress(const RtpAddress & address)208 void SocketReaderNode::SetPeerAddress(const RtpAddress& address)
209 {
210     mPeerAddress = address;
211 }
212 
OpenSocket()213 bool SocketReaderNode::OpenSocket()
214 {
215     IMLOGD2("[OpenSocket] media[%d], protocolType[%d]", mMediaType, mProtocolType);
216     mSocket = ISocket::GetInstance(mLocalAddress.port, mPeerAddress.ipAddress, mPeerAddress.port);
217 
218     if (mSocket == nullptr)
219     {
220         IMLOGE0("[OpenSocket] can't create socket instance");
221         return false;
222     }
223 
224     // set socket local/peer address here
225     mSocket->SetLocalEndpoint(mLocalAddress.ipAddress, mLocalAddress.port);
226     mSocket->SetPeerEndpoint(mPeerAddress.ipAddress, mPeerAddress.port);
227 
228     if (!mSocketOpened && !mSocket->Open(mLocalFd))
229     {
230         IMLOGE0("[OpenSocket] can't open socket");
231         mSocketOpened = false;
232         return false;
233     }
234 
235     mReceiveTtl = false;
236 
237     if (mSocket->SetSocketOpt(kSocketOptionIpTtl, 1))
238     {
239         mReceiveTtl = true;
240     }
241 
242     mSocket->Listen(this);
243     mSocketOpened = true;
244     return true;
245 }
246 
CloseSocket()247 void SocketReaderNode::CloseSocket()
248 {
249     if (mSocket != nullptr)
250     {
251         IMLOGD2("[CloseSocket] media[%d], protocolType[%d]", mMediaType, mProtocolType);
252 
253         if (mSocketOpened)
254         {
255             mSocket->Listen(nullptr);
256             mSocket->Close();
257             mSocketOpened = false;
258         }
259 
260         mMutex.lock();
261         ISocket::ReleaseInstance(mSocket);
262         mSocket = nullptr;
263         mMutex.unlock();
264     }
265 }