1 /*
2  * Copyright (C) 2020 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "host/frontend/webrtc/libdevice/streamer.h"
18 
19 #include <android-base/logging.h>
20 #include <json/json.h>
21 
22 #include <api/audio_codecs/audio_decoder_factory.h>
23 #include <api/audio_codecs/audio_encoder_factory.h>
24 #include <api/audio_codecs/builtin_audio_decoder_factory.h>
25 #include <api/audio_codecs/builtin_audio_encoder_factory.h>
26 #include <api/create_peerconnection_factory.h>
27 #include <api/peer_connection_interface.h>
28 #include <api/video_codecs/builtin_video_decoder_factory.h>
29 #include <api/video_codecs/builtin_video_encoder_factory.h>
30 #include <api/video_codecs/video_decoder_factory.h>
31 #include <api/video_codecs/video_encoder_factory.h>
32 #include <media/base/video_broadcaster.h>
33 #include <pc/video_track_source.h>
34 
35 #include "common/libs/fs/shared_fd.h"
36 #include "host/frontend/webrtc/libcommon/audio_device.h"
37 #include "host/frontend/webrtc/libcommon/peer_connection_utils.h"
38 #include "host/frontend/webrtc/libcommon/port_range_socket_factory.h"
39 #include "host/frontend/webrtc/libcommon/utils.h"
40 #include "host/frontend/webrtc/libcommon/vp8only_encoder_factory.h"
41 #include "host/frontend/webrtc/libdevice/audio_track_source_impl.h"
42 #include "host/frontend/webrtc/libdevice/camera_streamer.h"
43 #include "host/frontend/webrtc/libdevice/client_handler.h"
44 #include "host/frontend/webrtc/libdevice/video_track_source_impl.h"
45 #include "host/frontend/webrtc_operator/constants/signaling_constants.h"
46 
47 namespace cuttlefish {
48 namespace webrtc_streaming {
49 namespace {
50 
// Keys used for the entries of the "displays", "touchpads" and
// "audio_streams" arrays of the device info object built in OnOpen().
constexpr auto kStreamIdField = "stream_id";
constexpr auto kLabelField = "label";
constexpr auto kXResField = "x_res";
constexpr auto kYResField = "y_res";
constexpr auto kDpiField = "dpi";
constexpr auto kIsTouchField = "is_touch";
// Top level keys of the device info object sent with the registration message.
constexpr auto kDisplaysField = "displays";
constexpr auto kTouchpadsField = "touchpads";
constexpr auto kAudioStreamsField = "audio_streams";
constexpr auto kHardwareField = "hardware";
constexpr auto kOpenwrtDeviceIdField = "openwrt_device_id";
constexpr auto kOpenwrtAddrField = "openwrt_addr";
constexpr auto kControlEnvProxyServerPathField =
    "control_env_proxy_server_path";
// Keys of each entry in the "custom_control_panel_buttons" array.
constexpr auto kControlPanelButtonCommand = "command";
constexpr auto kControlPanelButtonTitle = "title";
constexpr auto kControlPanelButtonIconName = "icon_name";
constexpr auto kControlPanelButtonShellCommand = "shell_command";
constexpr auto kControlPanelButtonDeviceStates = "device_states";
// Keys of each entry in a button's "device_states" array.
constexpr auto kControlPanelButtonLidSwitchOpen = "lid_switch_open";
constexpr auto kControlPanelButtonHingeAngleValue = "hinge_angle_value";
constexpr auto kCustomControlPanelButtonsField = "custom_control_panel_buttons";
constexpr auto kGroupIdField = "group_id";

// Initial values of the retry counter and retry interval of Streamer::Impl.
constexpr int kRegistrationRetries = 3;
constexpr int kRetryFirstIntervalMs = 1000;
// Values the retry counter and retry interval are reset to by OnClose(), when
// a connection with the operator is closed.
constexpr int kReconnectRetries = 100;
constexpr int kReconnectIntervalMs = 1000;
79 
ParseMessage(const uint8_t * data,size_t length,Json::Value * msg_out)80 bool ParseMessage(const uint8_t* data, size_t length, Json::Value* msg_out) {
81   auto str = reinterpret_cast<const char*>(data);
82   Json::CharReaderBuilder builder;
83   std::unique_ptr<Json::CharReader> json_reader(builder.newCharReader());
84   std::string errorMessage;
85   return json_reader->parse(str, str + length, msg_out, &errorMessage);
86 }
87 
// Everything the streamer keeps about a display added through
// Streamer::AddDisplay().
struct DisplayDescriptor {
  // Reported to the operator as "x_res" and "y_res".
  int width;
  int height;
  // Reported to the operator as "dpi".
  int dpi;
  // Value of the touch_enabled argument given to Streamer::AddDisplay().
  bool touch_enabled;
  // Source from which a video track is created for each client.
  rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source;
};
95 
// Dimensions of a touchpad added through Streamer::AddTouchpad(); reported to
// the operator as "x_res" and "y_res".
struct TouchpadDescriptor {
  int width;
  int height;
};
100 
// Description of a custom control panel button, reported to the operator as
// part of the device info.
struct ControlPanelButtonDescriptor {
  std::string command;
  std::string title;
  std::string icon_name;
  // Only set by AddCustomControlPanelButtonWithShellCommand().
  std::optional<std::string> shell_command;
  // Only filled by AddCustomControlPanelButtonWithDeviceStates(). Not reported
  // when shell_command is set.
  std::vector<DeviceState> device_states;
};
108 
// Configuration received from the operator: the ICE servers parsed from the
// operator's config message, used for every peer connection built afterwards.
// TODO (jemoreira): move to a place in common with the signaling server
struct OperatorServerConfig {
  std::vector<webrtc::PeerConnectionInterface::IceServer> servers;
};
113 
114 // Wraps a scoped_refptr pointer to an audio device module
115 class AudioDeviceModuleWrapper : public AudioSource {
116  public:
AudioDeviceModuleWrapper(rtc::scoped_refptr<CfAudioDeviceModule> device_module)117   AudioDeviceModuleWrapper(
118       rtc::scoped_refptr<CfAudioDeviceModule> device_module)
119       : device_module_(device_module) {}
GetMoreAudioData(void * data,int bytes_per_sample,int samples_per_channel,int num_channels,int sample_rate,bool & muted)120   int GetMoreAudioData(void* data, int bytes_per_sample,
121                        int samples_per_channel, int num_channels,
122                        int sample_rate, bool& muted) override {
123     return device_module_->GetMoreAudioData(data, bytes_per_sample,
124                                             samples_per_channel, num_channels,
125                                             sample_rate, muted);
126   }
127 
device_module()128   rtc::scoped_refptr<CfAudioDeviceModule> device_module() {
129     return device_module_;
130   }
131 
132  private:
133   rtc::scoped_refptr<CfAudioDeviceModule> device_module_;
134 };
135 
136 }  // namespace
137 
138 
// Private implementation of Streamer. It observes the connection with the
// operator (ServerConnectionObserver) and builds the peer connections
// requested by the client handlers (PeerConnectionBuilder).
// NOTE(review): Register() passes weak_from_this() to the server connection,
// which only yields a usable pointer when this object is owned by a
// std::shared_ptr; confirm against the declaration of Streamer::impl_.
class Streamer::Impl : public ServerConnectionObserver,
                       public PeerConnectionBuilder,
                       public std::enable_shared_from_this<ServerConnectionObserver> {
 public:
  // Creates the handler for a new client and gives it a track for every
  // display and audio stream currently known. Must run on the signal thread.
  std::shared_ptr<ClientHandler> CreateClientHandler(int client_id);

  // Stores the observer and connects (or reconnects) to the operator.
  void Register(std::weak_ptr<OperatorObserver> observer);

  // Forwards `msg` to the given client through the operator.
  void SendMessageToClient(int client_id, const Json::Value& msg);
  // Schedules the removal of the client's handler on the signal thread.
  void DestroyClientHandler(int client_id);
  // Attaches the camera streamer (if any) to the client's camera stream.
  void SetupCameraForClient(int client_id);

  // ServerConnectionObserver
  void OnOpen() override;
  void OnClose() override;
  void OnError(const std::string& error) override;
  void OnReceive(const uint8_t* msg, size_t length, bool is_binary) override;

  // Handlers for the messages received from the operator; both must run on
  // the signal thread.
  void HandleConfigMessage(const Json::Value& msg);
  void HandleClientMessage(const Json::Value& server_message);

  // PeerConnectionBuilder
  Result<rtc::scoped_refptr<webrtc::PeerConnectionInterface>> Build(
      webrtc::PeerConnectionObserver& observer,
      const std::vector<webrtc::PeerConnectionInterface::IceServer>&
          per_connection_servers) override;

  // All accesses to these variables are expected to happen from the
  // signal_thread, which is why there are no extra synchronization mechanisms
  // (mutex)
  StreamerConfig config_;
  OperatorServerConfig operator_config_;
  std::unique_ptr<ServerConnection> server_connection_;
  std::shared_ptr<ConnectionObserverFactory> connection_observer_factory_;
  rtc::scoped_refptr<webrtc::PeerConnectionFactoryInterface>
      peer_connection_factory_;
  std::unique_ptr<rtc::Thread> network_thread_;
  std::unique_ptr<rtc::Thread> worker_thread_;
  std::unique_ptr<rtc::Thread> signal_thread_;
  // Displays, touchpads and audio streams, indexed by label.
  std::map<std::string, DisplayDescriptor> displays_;
  std::map<std::string, TouchpadDescriptor> touchpads_;
  std::map<std::string, rtc::scoped_refptr<AudioTrackSourceImpl>>
      audio_sources_;
  // Handlers of the connected clients, indexed by client id.
  std::map<int, std::shared_ptr<ClientHandler>> clients_;
  std::weak_ptr<OperatorObserver> operator_observer_;
  std::map<std::string, std::string> hardware_;
  std::vector<ControlPanelButtonDescriptor> custom_control_panel_buttons_;
  std::shared_ptr<AudioDeviceModuleWrapper> audio_device_module_;
  // Only set once Streamer::AddCamera() has been called.
  std::unique_ptr<CameraStreamer> camera_streamer_;
  // Number of connection attempts left and the delay before the next one.
  int registration_retries_left_ = kRegistrationRetries;
  int retry_interval_ms_ = kRetryFirstIntervalMs;
  // Not owned; set by Streamer::Create().
  RecordingManager* recording_manager_ = nullptr;
};
191 
// Takes ownership of the already configured implementation object; instances
// are created through Streamer::Create().
Streamer::Streamer(std::unique_ptr<Streamer::Impl> impl)
    : impl_(std::move(impl)) {}
194 
195 /* static */
Create(const StreamerConfig & cfg,RecordingManager & recording_manager,std::shared_ptr<ConnectionObserverFactory> connection_observer_factory)196 std::unique_ptr<Streamer> Streamer::Create(
197     const StreamerConfig& cfg, RecordingManager& recording_manager,
198     std::shared_ptr<ConnectionObserverFactory> connection_observer_factory) {
199   rtc::LogMessage::LogToDebug(rtc::LS_ERROR);
200 
201   std::unique_ptr<Streamer::Impl> impl(new Streamer::Impl());
202   impl->config_ = cfg;
203   impl->recording_manager_ = &recording_manager;
204   impl->connection_observer_factory_ = connection_observer_factory;
205 
206   auto network_thread_result = CreateAndStartThread("network-thread");
207   if (!network_thread_result.ok()) {
208     LOG(ERROR) << network_thread_result.error().FormatForEnv();
209     return nullptr;
210   }
211   impl->network_thread_ = std::move(*network_thread_result);
212 
213   auto worker_thread_result = CreateAndStartThread("worker-thread");
214   if (!worker_thread_result.ok()) {
215     LOG(ERROR) << worker_thread_result.error().FormatForEnv();
216     return nullptr;
217   }
218   impl->worker_thread_ = std::move(*worker_thread_result);
219 
220   auto signal_thread_result = CreateAndStartThread("signal-thread");
221   if (!signal_thread_result.ok()) {
222     LOG(ERROR) << signal_thread_result.error().FormatForEnv();
223     return nullptr;
224   }
225   impl->signal_thread_ = std::move(*signal_thread_result);
226 
227   impl->audio_device_module_ = std::make_shared<AudioDeviceModuleWrapper>(
228       rtc::scoped_refptr<CfAudioDeviceModule>(
229           new rtc::RefCountedObject<CfAudioDeviceModule>()));
230 
231   auto result = CreatePeerConnectionFactory(
232       impl->network_thread_.get(), impl->worker_thread_.get(),
233       impl->signal_thread_.get(), impl->audio_device_module_->device_module());
234 
235   if (!result.ok()) {
236     LOG(ERROR) << result.error().FormatForEnv();
237     return nullptr;
238   }
239   impl->peer_connection_factory_ = *result;
240 
241   return std::unique_ptr<Streamer>(new Streamer(std::move(impl)));
242 }
243 
AddDisplay(const std::string & label,int width,int height,int dpi,bool touch_enabled)244 std::shared_ptr<VideoSink> Streamer::AddDisplay(const std::string& label,
245                                                 int width, int height, int dpi,
246                                                 bool touch_enabled) {
247   // Usually called from an application thread
248   return impl_->signal_thread_->BlockingCall(
249       [this, &label, width, height, dpi,
250        touch_enabled]() -> std::shared_ptr<VideoSink> {
251         if (impl_->displays_.count(label)) {
252           LOG(ERROR) << "Display with same label already exists: " << label;
253           return nullptr;
254         }
255         rtc::scoped_refptr<VideoTrackSourceImpl> source(
256             new rtc::RefCountedObject<VideoTrackSourceImpl>(width, height));
257         impl_->displays_[label] = {width, height, dpi, touch_enabled, source};
258 
259         auto video_track = impl_->peer_connection_factory_->CreateVideoTrack(
260             label, source.get());
261 
262         for (auto& [_, client] : impl_->clients_) {
263           client->AddDisplay(video_track, label);
264         }
265 
266         if (impl_->recording_manager_) {
267           rtc::scoped_refptr<webrtc::VideoTrackSourceInterface> source2 =
268               source;
269           auto deleter = [](webrtc::VideoTrackSourceInterface* source) {
270             source->Release();
271           };
272           std::shared_ptr<webrtc::VideoTrackSourceInterface> source_shared(
273               source2.release(), deleter);
274           impl_->recording_manager_->AddSource(width, height, source_shared, label);
275         }
276 
277         return std::shared_ptr<VideoSink>(
278             new VideoTrackSourceImplSinkWrapper(source));
279       });
280 }
281 
RemoveDisplay(const std::string & label)282 bool Streamer::RemoveDisplay(const std::string& label) {
283   // Usually called from an application thread
284   return impl_->signal_thread_->BlockingCall(
285       [this, &label]() -> bool {
286         if (impl_->recording_manager_) {
287           impl_->recording_manager_->RemoveSource(label);
288         }
289 
290         for (auto& [_, client] : impl_->clients_) {
291           client->RemoveDisplay(label);
292         }
293 
294         impl_->displays_.erase(label);
295         return true;
296       });
297 }
298 
AddTouchpad(const std::string & label,int width,int height)299 bool Streamer::AddTouchpad(const std::string& label, int width, int height) {
300   // Usually called from an application thread
301   return impl_->signal_thread_->BlockingCall(
302       [this, &label, width, height]() -> bool {
303         if (impl_->touchpads_.count(label)) {
304           LOG(ERROR) << "Touchpad with same label already exists: " << label;
305           return false;
306         }
307         impl_->touchpads_[label] = {width, height};
308 
309         return true;
310       });
311 }
312 
AddAudioStream(const std::string & label)313 std::shared_ptr<AudioSink> Streamer::AddAudioStream(const std::string& label) {
314   // Usually called from an application thread
315   return impl_->signal_thread_->BlockingCall(
316       [this, &label]() -> std::shared_ptr<AudioSink> {
317         if (impl_->audio_sources_.count(label)) {
318           LOG(ERROR) << "Audio stream with same label already exists: "
319                      << label;
320           return nullptr;
321         }
322         rtc::scoped_refptr<AudioTrackSourceImpl> source(
323             new rtc::RefCountedObject<AudioTrackSourceImpl>());
324         impl_->audio_sources_[label] = source;
325         return std::shared_ptr<AudioSink>(
326             new AudioTrackSourceImplSinkWrapper(source));
327       });
328 }
329 
// Returns the wrapper around the audio device module created in Create(),
// exposed through the AudioSource interface.
std::shared_ptr<AudioSource> Streamer::GetAudioSource() {
  return impl_->audio_device_module_;
}
333 
// Creates the camera streamer from the given arguments, replacing any camera
// streamer created by a previous call. The returned pointer is not owning:
// the object stays owned by the streamer implementation.
CameraController* Streamer::AddCamera(unsigned int port, unsigned int cid,
                                      bool vhost_user) {
  impl_->camera_streamer_ =
      std::make_unique<CameraStreamer>(port, cid, vhost_user);
  return impl_->camera_streamer_.get();
}
340 
SetHardwareSpec(std::string key,std::string value)341 void Streamer::SetHardwareSpec(std::string key, std::string value) {
342   impl_->hardware_.emplace(key, value);
343 }
344 
AddCustomControlPanelButton(const std::string & command,const std::string & title,const std::string & icon_name)345 void Streamer::AddCustomControlPanelButton(const std::string& command,
346                                            const std::string& title,
347                                            const std::string& icon_name) {
348   ControlPanelButtonDescriptor button = {
349       .command = command, .title = title, .icon_name = icon_name};
350   impl_->custom_control_panel_buttons_.push_back(button);
351 }
352 
AddCustomControlPanelButtonWithShellCommand(const std::string & command,const std::string & title,const std::string & icon_name,const std::string & shell_command)353 void Streamer::AddCustomControlPanelButtonWithShellCommand(
354     const std::string& command, const std::string& title,
355     const std::string& icon_name, const std::string& shell_command) {
356   ControlPanelButtonDescriptor button = {
357       .command = command, .title = title, .icon_name = icon_name};
358   button.shell_command = shell_command;
359   impl_->custom_control_panel_buttons_.push_back(button);
360 }
361 
AddCustomControlPanelButtonWithDeviceStates(const std::string & command,const std::string & title,const std::string & icon_name,const std::vector<DeviceState> & device_states)362 void Streamer::AddCustomControlPanelButtonWithDeviceStates(
363     const std::string& command, const std::string& title,
364     const std::string& icon_name,
365     const std::vector<DeviceState>& device_states) {
366   ControlPanelButtonDescriptor button = {
367       .command = command, .title = title, .icon_name = icon_name};
368   button.device_states = device_states;
369   impl_->custom_control_panel_buttons_.push_back(button);
370 }
371 
Register(std::weak_ptr<OperatorObserver> observer)372 void Streamer::Register(std::weak_ptr<OperatorObserver> observer) {
373   // Usually called from an application thread
374   // No need to block the calling thread on this, the observer will be notified
375   // when the connection is established.
376   impl_->signal_thread_->PostTask([this, observer]() {
377     impl_->Register(observer);
378   });
379 }
380 
Unregister()381 void Streamer::Unregister() {
382   // Usually called from an application thread.
383   impl_->signal_thread_->PostTask(
384       [this]() { impl_->server_connection_.reset(); });
385 }
386 
Register(std::weak_ptr<OperatorObserver> observer)387 void Streamer::Impl::Register(std::weak_ptr<OperatorObserver> observer) {
388   operator_observer_ = observer;
389   // When the connection is established the OnOpen function will be called where
390   // the registration will take place
391   if (!server_connection_) {
392     server_connection_ =
393         ServerConnection::Connect(config_.operator_server, weak_from_this());
394   } else {
395     // in case connection attempt is retried, just call Reconnect().
396     // Recreating server_connection_ object will destroy existing WSConnection
397     // object and task re-scheduling will fail
398     server_connection_->Reconnect();
399   }
400 }
401 
OnOpen()402 void Streamer::Impl::OnOpen() {
403   // Called from the websocket thread.
404   // Connected to operator.
405   signal_thread_->PostTask([this]() {
406     Json::Value register_obj;
407     register_obj[cuttlefish::webrtc_signaling::kTypeField] =
408         cuttlefish::webrtc_signaling::kRegisterType;
409     register_obj[cuttlefish::webrtc_signaling::kDeviceIdField] =
410         config_.device_id;
411     CHECK(config_.client_files_port >= 0) << "Invalid device port provided";
412     register_obj[cuttlefish::webrtc_signaling::kDevicePortField] =
413         config_.client_files_port;
414 
415     Json::Value device_info;
416     Json::Value displays(Json::ValueType::arrayValue);
417     // No need to synchronize with other accesses to display_ because all
418     // happens on signal_thread.
419     for (auto& entry : displays_) {
420       Json::Value display;
421       display[kStreamIdField] = entry.first;
422       display[kXResField] = entry.second.width;
423       display[kYResField] = entry.second.height;
424       display[kDpiField] = entry.second.dpi;
425       display[kIsTouchField] = true;
426       displays.append(display);
427     }
428 
429     device_info[kGroupIdField] = config_.group_id;
430     device_info[kDisplaysField] = displays;
431 
432     Json::Value touchpads(Json::ValueType::arrayValue);
433     for (const auto& [label, touchpad_desc] : touchpads_) {
434       Json::Value touchpad;
435       touchpad[kXResField] = touchpad_desc.width;
436       touchpad[kYResField] = touchpad_desc.height;
437       touchpad[kLabelField] = label;
438       touchpads.append(touchpad);
439     }
440     device_info[kTouchpadsField] = touchpads;
441     Json::Value audio_streams(Json::ValueType::arrayValue);
442     for (auto& entry : audio_sources_) {
443       Json::Value audio;
444       audio[kStreamIdField] = entry.first;
445       audio_streams.append(audio);
446     }
447     device_info[kAudioStreamsField] = audio_streams;
448     Json::Value hardware;
449     for (const auto& [k, v] : hardware_) {
450       hardware[k] = v;
451     }
452     device_info[kHardwareField] = hardware;
453     device_info[kOpenwrtDeviceIdField] = config_.openwrt_device_id;
454     device_info[kOpenwrtAddrField] = config_.openwrt_addr;
455     device_info[kControlEnvProxyServerPathField] =
456         config_.control_env_proxy_server_path;
457     Json::Value custom_control_panel_buttons(Json::arrayValue);
458     for (const auto& button : custom_control_panel_buttons_) {
459       Json::Value button_entry;
460       button_entry[kControlPanelButtonCommand] = button.command;
461       button_entry[kControlPanelButtonTitle] = button.title;
462       button_entry[kControlPanelButtonIconName] = button.icon_name;
463       if (button.shell_command) {
464         button_entry[kControlPanelButtonShellCommand] = *(button.shell_command);
465       } else if (!button.device_states.empty()) {
466         Json::Value device_states(Json::arrayValue);
467         for (const DeviceState& device_state : button.device_states) {
468           Json::Value device_state_entry;
469           if (device_state.lid_switch_open) {
470             device_state_entry[kControlPanelButtonLidSwitchOpen] =
471                 *device_state.lid_switch_open;
472           }
473           if (device_state.hinge_angle_value) {
474             device_state_entry[kControlPanelButtonHingeAngleValue] =
475                 *device_state.hinge_angle_value;
476           }
477           device_states.append(device_state_entry);
478         }
479         button_entry[kControlPanelButtonDeviceStates] = device_states;
480       }
481       custom_control_panel_buttons.append(button_entry);
482     }
483     device_info[kCustomControlPanelButtonsField] = custom_control_panel_buttons;
484     register_obj[cuttlefish::webrtc_signaling::kDeviceInfoField] = device_info;
485     server_connection_->Send(register_obj);
486     // Do this last as OnRegistered() is user code and may take some time to
487     // complete (although it shouldn't...)
488     auto observer = operator_observer_.lock();
489     if (observer) {
490       observer->OnRegistered();
491     }
492   });
493 }
494 
OnClose()495 void Streamer::Impl::OnClose() {
496   // Called from websocket thread
497   // The operator shouldn't close the connection with the client, it's up to the
498   // device to decide when to disconnect.
499   LOG(WARNING) << "Connection with server closed unexpectedly";
500   signal_thread_->PostTask([this]() {
501     auto observer = operator_observer_.lock();
502     if (observer) {
503       observer->OnClose();
504     }
505   });
506   LOG(INFO) << "Trying to re-connect to operator..";
507   registration_retries_left_ = kReconnectRetries;
508   retry_interval_ms_ = kReconnectIntervalMs;
509   signal_thread_->PostDelayedTask(
510       [this]() { Register(operator_observer_); },
511       webrtc::TimeDelta::Millis(retry_interval_ms_));
512 }
513 
OnError(const std::string & error)514 void Streamer::Impl::OnError(const std::string& error) {
515   // Called from websocket thread.
516   if (registration_retries_left_) {
517     LOG(WARNING) << "Connection to operator failed (" << error << "), "
518                  << registration_retries_left_ << " retries left"
519                  << " (will retry in " << retry_interval_ms_ / 1000 << "s)";
520     --registration_retries_left_;
521     signal_thread_->PostDelayedTask(
522         [this]() {
523           // Need to reconnect and register again with operator
524           Register(operator_observer_);
525         },
526         webrtc::TimeDelta::Millis(retry_interval_ms_));
527     retry_interval_ms_ *= 2;
528   } else {
529     LOG(ERROR) << "Error on connection with the operator: " << error;
530     signal_thread_->PostTask([this]() {
531       auto observer = operator_observer_.lock();
532       if (observer) {
533         observer->OnError();
534       }
535     });
536   }
537 }
538 
HandleConfigMessage(const Json::Value & server_message)539 void Streamer::Impl::HandleConfigMessage(const Json::Value& server_message) {
540   CHECK(signal_thread_->IsCurrent())
541       << __FUNCTION__ << " called from the wrong thread";
542   auto result = ParseIceServersMessage(server_message);
543   if (!result.ok()) {
544     LOG(WARNING) << "Failed to parse ice servers message from server: "
545                  << result.error().FormatForEnv();
546   }
547   operator_config_.servers = *result;
548 }
549 
HandleClientMessage(const Json::Value & server_message)550 void Streamer::Impl::HandleClientMessage(const Json::Value& server_message) {
551   CHECK(signal_thread_->IsCurrent())
552       << __FUNCTION__ << " called from the wrong thread";
553   if (!server_message.isMember(cuttlefish::webrtc_signaling::kClientIdField) ||
554       !server_message[cuttlefish::webrtc_signaling::kClientIdField].isInt()) {
555     LOG(ERROR) << "Client message received without valid client id";
556     return;
557   }
558   auto client_id =
559       server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
560   if (!server_message.isMember(cuttlefish::webrtc_signaling::kPayloadField)) {
561     LOG(WARNING) << "Received empty client message";
562     return;
563   }
564   auto client_message =
565       server_message[cuttlefish::webrtc_signaling::kPayloadField];
566   if (clients_.count(client_id) == 0) {
567     auto client_handler = CreateClientHandler(client_id);
568     if (!client_handler) {
569       LOG(ERROR) << "Failed to create a new client handler";
570       return;
571     }
572     clients_.emplace(client_id, client_handler);
573   }
574   auto client_handler = clients_[client_id];
575 
576   client_handler->HandleMessage(client_message);
577 }
578 
OnReceive(const uint8_t * msg,size_t length,bool is_binary)579 void Streamer::Impl::OnReceive(const uint8_t* msg, size_t length,
580                                bool is_binary) {
581   // Usually called from websocket thread.
582   Json::Value server_message;
583   // Once OnReceive returns the buffer can be destroyed/recycled at any time, so
584   // parse the data into a JSON object while still on the websocket thread.
585   if (is_binary || !ParseMessage(msg, length, &server_message)) {
586     LOG(ERROR) << "Received invalid JSON from server: '"
587                << (is_binary ? std::string("(binary_data)")
588                              : std::string(msg, msg + length))
589                << "'";
590     return;
591   }
592   // Transition to the signal thread before member variables are accessed.
593   signal_thread_->PostTask([this, server_message]() {
594     if (!server_message.isMember(cuttlefish::webrtc_signaling::kTypeField) ||
595         !server_message[cuttlefish::webrtc_signaling::kTypeField].isString()) {
596       LOG(ERROR) << "No message_type field from server";
597       // Notify the caller
598       OnError(
599           "Invalid message received from operator: no message type field "
600           "present");
601       return;
602     }
603     auto type =
604         server_message[cuttlefish::webrtc_signaling::kTypeField].asString();
605     if (type == cuttlefish::webrtc_signaling::kConfigType) {
606       HandleConfigMessage(server_message);
607     } else if (type == cuttlefish::webrtc_signaling::kClientDisconnectType) {
608       if (!server_message.isMember(
609               cuttlefish::webrtc_signaling::kClientIdField) ||
610           !server_message.isMember(
611               cuttlefish::webrtc_signaling::kClientIdField)) {
612         LOG(ERROR) << "Invalid disconnect message received from server";
613         // Notify the caller
614         OnError("Invalid disconnect message: client_id is required");
615         return;
616       }
617       auto client_id =
618           server_message[cuttlefish::webrtc_signaling::kClientIdField].asInt();
619       LOG(INFO) << "Client " << client_id << " has disconnected.";
620       DestroyClientHandler(client_id);
621     } else if (type == cuttlefish::webrtc_signaling::kClientMessageType) {
622       HandleClientMessage(server_message);
623     } else {
624       LOG(ERROR) << "Unknown message type: " << type;
625       // Notify the caller
626       OnError("Invalid message received from operator: unknown message type");
627       return;
628     }
629   });
630 }
631 
CreateClientHandler(int client_id)632 std::shared_ptr<ClientHandler> Streamer::Impl::CreateClientHandler(
633     int client_id) {
634   CHECK(signal_thread_->IsCurrent())
635       << __FUNCTION__ << " called from the wrong thread";
636   auto observer = connection_observer_factory_->CreateObserver();
637 
638   auto client_handler = ClientHandler::Create(
639       client_id, observer, *this,
640       [this, client_id](const Json::Value& msg) {
641         SendMessageToClient(client_id, msg);
642       },
643       [this, client_id](bool isOpen) {
644         if (isOpen) {
645           SetupCameraForClient(client_id);
646         } else {
647           DestroyClientHandler(client_id);
648         }
649       });
650 
651   for (auto& entry : displays_) {
652     auto& label = entry.first;
653     auto& video_source = entry.second.source;
654 
655     auto video_track =
656         peer_connection_factory_->CreateVideoTrack(label, video_source.get());
657     client_handler->AddDisplay(video_track, label);
658   }
659 
660   for (auto& entry : audio_sources_) {
661     auto& label = entry.first;
662     auto& audio_stream = entry.second;
663     auto audio_track =
664         peer_connection_factory_->CreateAudioTrack(label, audio_stream.get());
665     client_handler->AddAudio(audio_track, label);
666   }
667 
668   return client_handler;
669 }
670 
671 Result<rtc::scoped_refptr<webrtc::PeerConnectionInterface>>
Build(webrtc::PeerConnectionObserver & observer,const std::vector<webrtc::PeerConnectionInterface::IceServer> & per_connection_servers)672 Streamer::Impl::Build(
673     webrtc::PeerConnectionObserver& observer,
674     const std::vector<webrtc::PeerConnectionInterface::IceServer>&
675         per_connection_servers) {
676   webrtc::PeerConnectionDependencies dependencies(&observer);
677   auto servers = operator_config_.servers;
678   servers.insert(servers.end(), per_connection_servers.begin(),
679                  per_connection_servers.end());
680   if (config_.udp_port_range != config_.tcp_port_range) {
681     // libwebrtc removed the ability to provide a packet socket factory when
682     // creating a peer connection. They plan to provide that functionality with
683     // the peer connection factory, but that's currently incomplete (the packet
684     // socket factory is ignored by the peer connection factory). The only other
685     // choice to customize port ranges is through the port allocator config, but
686     // this is suboptimal as it only allows to specify a single port range that
687     // will be use for both tcp and udp ports.
688     LOG(WARNING) << "TCP and UDP port ranges differ, TCP connections may not "
689                     "work properly";
690   }
691   return CF_EXPECT(
692       CreatePeerConnection(peer_connection_factory_, std::move(dependencies),
693                            config_.udp_port_range.first,
694                            config_.udp_port_range.second, servers),
695       "Failed to build peer connection");
696 }
697 
SendMessageToClient(int client_id,const Json::Value & msg)698 void Streamer::Impl::SendMessageToClient(int client_id,
699                                          const Json::Value& msg) {
700   LOG(VERBOSE) << "Sending to client: " << msg.toStyledString();
701   CHECK(signal_thread_->IsCurrent())
702       << __FUNCTION__ << " called from the wrong thread";
703   Json::Value wrapper;
704   wrapper[cuttlefish::webrtc_signaling::kPayloadField] = msg;
705   wrapper[cuttlefish::webrtc_signaling::kTypeField] =
706       cuttlefish::webrtc_signaling::kForwardType;
707   wrapper[cuttlefish::webrtc_signaling::kClientIdField] = client_id;
708   // This is safe to call from the webrtc threads because
709   // ServerConnection(s) are thread safe
710   server_connection_->Send(wrapper);
711 }
712 
DestroyClientHandler(int client_id)713 void Streamer::Impl::DestroyClientHandler(int client_id) {
714   // Usually called from signal thread, could be called from websocket thread or
715   // an application thread.
716   signal_thread_->PostTask([this, client_id]() {
717     // This needs to be 'posted' to the thread instead of 'invoked'
718     // immediately for two reasons:
719     // * The client handler is destroyed by this code, it's generally a
720     // bad idea (though not necessarily wrong) to return to a member
721     // function of a destroyed object.
722     // * The client handler may call this from within a peer connection
723     // observer callback, destroying the client handler there leads to a
724     // deadlock.
725     clients_.erase(client_id);
726   });
727 }
728 
SetupCameraForClient(int client_id)729 void Streamer::Impl::SetupCameraForClient(int client_id) {
730   if (!camera_streamer_) {
731     return;
732   }
733   auto client_handler = clients_[client_id];
734   if (client_handler) {
735     auto camera_track = client_handler->GetCameraStream();
736     if (camera_track) {
737       camera_track->AddOrUpdateSink(camera_streamer_.get(),
738                                     rtc::VideoSinkWants());
739     }
740   }
741 }
742 
743 }  // namespace webrtc_streaming
744 }  // namespace cuttlefish
745