#!/usr/bin/env python3
# Lint as: python3
"""
Base class for setting up devices for CDM functionalities.
"""

from mobly import base_test
from mobly import utils
from mobly.controllers import android_device

CDM_SNIPPET_PACKAGE = 'android.companion.multidevices'


class BaseTestClass(base_test.BaseTestClass):
    """Mobly base test that prepares two Android devices for CDM tests.

    Attributes:
        sender: First registered Android device.
        receiver: Second registered Android device.
        sender_id: Association ID returned by the receiver's snippet when it
            associates with the sender, or None if not associated yet.
        receiver_id: Association ID returned by the sender's snippet when it
            associates with the receiver, or None if not associated yet.
    """

    def setup_class(self) -> None:
        """Register two devices, load the CDM snippet and reset state."""
        # Declare that two Android devices are needed. The testbed may
        # provide more than two; only the first two are used, so slice
        # instead of unpacking the whole list (which would raise ValueError).
        devices = self.register_controller(android_device, min_number=2)
        self.sender, self.receiver = devices[:2]
        self.sender_id = None
        self.receiver_id = None

        def _setup_device(device: android_device.AndroidDevice) -> None:
            """Load the snippet on one device and clear its associations."""
            device.load_snippet('cdm', CDM_SNIPPET_PACKAGE)
            # Presumably wakes the screen, dismisses the lock screen and
            # returns to the launcher before the test starts.
            device.adb.shell('input keyevent KEYCODE_WAKEUP')
            device.adb.shell('input keyevent KEYCODE_MENU')
            device.adb.shell('input keyevent KEYCODE_HOME')

            # Clean up existing associations
            device.cdm.disassociateAll()

        # Sets up devices in parallel to save time.
        utils.concurrent_exec(
            _setup_device,
            ((self.sender,), (self.receiver,)),
            max_workers=2,
            raise_on_exception=True)

    def associate_devices(self) -> tuple[int, int]:
        """Associate devices with each other and return both association IDs.

        Each direction is only associated when its ID has not been recorded
        yet, so repeated calls (or a retry after a partial failure) do not
        create additional associations.

        Returns:
            A `(sender_id, receiver_id)` tuple.
        """
        # Compare against None rather than relying on truthiness so that an
        # ID of 0 would still count as an existing association.
        if self.receiver_id is None:
            receiver_name = self.receiver.cdm.becomeDiscoverable()
            self.receiver_id = self.sender.cdm.associate(receiver_name)

        if self.sender_id is None:
            sender_name = self.sender.cdm.becomeDiscoverable()
            self.sender_id = self.receiver.cdm.associate(sender_name)

        return (self.sender_id, self.receiver_id)

    def attach_transports(self) -> None:
        """Attach transports to both devices.

        Ensures the devices are associated first, then attaches a server
        socket on the receiver and a client socket on the sender.
        """
        self.associate_devices()

        self.receiver.cdm.attachServerSocket(self.sender_id)
        self.sender.cdm.attachClientSocket(self.receiver_id)

    def teardown_class(self) -> None:
        """Clean up the opened sockets.

        Tolerates a partially completed `setup_class` (device not registered
        or snippet not loaded) and always attempts both devices; the first
        error encountered, if any, is re-raised afterwards.
        """
        first_error = None
        for attr in ('sender', 'receiver'):
            device = getattr(self, attr, None)
            # `cdm` only exists once the snippet has been loaded.
            cdm = getattr(device, 'cdm', None)
            if cdm is None:
                continue
            try:
                cdm.closeAllSockets()
            except Exception as err:  # pylint: disable=broad-except
                # Keep going so the other device is still cleaned up.
                if first_error is None:
                    first_error = err
        if first_error is not None:
            raise first_error