1 /**
2 * Copyright (C) 2022 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include <SocketReaderNode.h>
18 #include <ImsMediaTrace.h>
19 #include <ImsMediaTimer.h>
20 #include <thread>
21
22 #define MAX_BUFFER_QUEUE 250 // 5 sec in audio case.
23
SocketReaderNode(BaseSessionCallback * callback)24 SocketReaderNode::SocketReaderNode(BaseSessionCallback* callback) :
25 BaseNode(callback),
26 mLocalFd(0)
27 {
28 mSocket = nullptr;
29 mReceiveTtl = false;
30 mSocketOpened = false;
31 }
32
~SocketReaderNode()33 SocketReaderNode::~SocketReaderNode()
34 {
35 IMLOGD1("[~SocketReaderNode] queue size[%d]", GetDataCount());
36 CloseSocket();
37 }
38
GetNodeId()39 kBaseNodeId SocketReaderNode::GetNodeId()
40 {
41 return kNodeIdSocketReader;
42 }
43
Prepare()44 bool SocketReaderNode::Prepare()
45 {
46 if (!mSocketOpened)
47 {
48 return OpenSocket();
49 }
50
51 return true;
52 }
53
Start()54 ImsMediaResult SocketReaderNode::Start()
55 {
56 if (mSocketOpened)
57 {
58 IMLOGD0("[Start] opened already");
59 }
60 else
61 {
62 if (!OpenSocket())
63 {
64 return RESULT_PORT_UNAVAILABLE;
65 }
66 }
67
68 mNodeState = kNodeStateRunning;
69 return RESULT_SUCCESS;
70 }
71
Stop()72 void SocketReaderNode::Stop()
73 {
74 IMLOGD2("[Stop] media[%d], protocolType[%d]", mMediaType, mProtocolType);
75 std::lock_guard<std::mutex> guard(mMutex);
76 mNodeState = kNodeStateStopped;
77 }
78
IsRunTime()79 bool SocketReaderNode::IsRunTime()
80 {
81 return true;
82 }
83
IsSourceNode()84 bool SocketReaderNode::IsSourceNode()
85 {
86 return true;
87 }
88
SetConfig(void * config)89 void SocketReaderNode::SetConfig(void* config)
90 {
91 if (config == nullptr)
92 {
93 return;
94 }
95
96 RtpConfig* pConfig = reinterpret_cast<RtpConfig*>(config);
97
98 if (mProtocolType == kProtocolRtp)
99 {
100 mPeerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort());
101 }
102 else if (mProtocolType == kProtocolRtcp)
103 {
104 mPeerAddress =
105 RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort() + 1);
106 }
107 }
108
IsSameConfig(void * config)109 bool SocketReaderNode::IsSameConfig(void* config)
110 {
111 if (config == nullptr)
112 {
113 return true;
114 }
115
116 RtpConfig* pConfig = reinterpret_cast<RtpConfig*>(config);
117 RtpAddress peerAddress;
118
119 if (mProtocolType == kProtocolRtp)
120 {
121 peerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort());
122 }
123 else if (mProtocolType == kProtocolRtcp)
124 {
125 peerAddress = RtpAddress(pConfig->getRemoteAddress().c_str(), pConfig->getRemotePort() + 1);
126 }
127
128 return (mPeerAddress == peerAddress);
129 }
130
UpdateConfig(void * config)131 ImsMediaResult SocketReaderNode::UpdateConfig(void* config)
132 {
133 // check config items updates
134 bool isUpdateNode = false;
135
136 if (IsSameConfig(config))
137 {
138 IMLOGD0("[UpdateConfig] no update");
139 return RESULT_SUCCESS;
140 }
141 else
142 {
143 isUpdateNode = true;
144 }
145
146 kBaseNodeState prevState = mNodeState;
147
148 if (isUpdateNode && mNodeState == kNodeStateRunning)
149 {
150 Stop();
151
152 if (mSocketOpened)
153 {
154 CloseSocket();
155 }
156 }
157
158 // reset the parameters
159 SetConfig(config);
160
161 if (isUpdateNode && prevState == kNodeStateRunning)
162 {
163 if (Prepare())
164 {
165 return Start();
166 }
167 else
168 {
169 return RESULT_INVALID_PARAM;
170 }
171 }
172
173 return RESULT_SUCCESS;
174 }
175
OnReadDataFromSocket()176 void SocketReaderNode::OnReadDataFromSocket()
177 {
178 std::lock_guard<std::mutex> guard(mMutex);
179
180 if (mSocketOpened && mSocket != nullptr)
181 {
182 int len = mSocket->ReceiveFrom(mBuffer, DEFAULT_MTU);
183
184 if (len > 0)
185 {
186 IMLOGD_PACKET2(IM_PACKET_LOG_SOCKET, "[OnReadDataFromSocket] media[%d], data size[%d]",
187 mMediaType, len);
188
189 if (mNodeState == kNodeStateRunning)
190 {
191 SendDataToRearNode(MEDIASUBTYPE_UNDEFINED, mBuffer, len, 0, 0, 0,
192 MEDIASUBTYPE_UNDEFINED, ImsMediaTimer::GetTimeInMilliSeconds());
193 }
194 }
195 }
196 }
197
SetLocalFd(int fd)198 void SocketReaderNode::SetLocalFd(int fd)
199 {
200 mLocalFd = fd;
201 }
202
SetLocalAddress(const RtpAddress & address)203 void SocketReaderNode::SetLocalAddress(const RtpAddress& address)
204 {
205 mLocalAddress = address;
206 }
207
SetPeerAddress(const RtpAddress & address)208 void SocketReaderNode::SetPeerAddress(const RtpAddress& address)
209 {
210 mPeerAddress = address;
211 }
212
OpenSocket()213 bool SocketReaderNode::OpenSocket()
214 {
215 IMLOGD2("[OpenSocket] media[%d], protocolType[%d]", mMediaType, mProtocolType);
216 mSocket = ISocket::GetInstance(mLocalAddress.port, mPeerAddress.ipAddress, mPeerAddress.port);
217
218 if (mSocket == nullptr)
219 {
220 IMLOGE0("[OpenSocket] can't create socket instance");
221 return false;
222 }
223
224 // set socket local/peer address here
225 mSocket->SetLocalEndpoint(mLocalAddress.ipAddress, mLocalAddress.port);
226 mSocket->SetPeerEndpoint(mPeerAddress.ipAddress, mPeerAddress.port);
227
228 if (!mSocketOpened && !mSocket->Open(mLocalFd))
229 {
230 IMLOGE0("[OpenSocket] can't open socket");
231 mSocketOpened = false;
232 return false;
233 }
234
235 mReceiveTtl = false;
236
237 if (mSocket->SetSocketOpt(kSocketOptionIpTtl, 1))
238 {
239 mReceiveTtl = true;
240 }
241
242 mSocket->Listen(this);
243 mSocketOpened = true;
244 return true;
245 }
246
CloseSocket()247 void SocketReaderNode::CloseSocket()
248 {
249 if (mSocket != nullptr)
250 {
251 IMLOGD2("[CloseSocket] media[%d], protocolType[%d]", mMediaType, mProtocolType);
252
253 if (mSocketOpened)
254 {
255 mSocket->Listen(nullptr);
256 mSocket->Close();
257 mSocketOpened = false;
258 }
259
260 mMutex.lock();
261 ISocket::ReleaseInstance(mSocket);
262 mSocket = nullptr;
263 mMutex.unlock();
264 }
265 }