1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16"""
17Prerequisites:
18    Windows 10
19    Bluetooth PTS installed
20    Recommended: Running cmder as Admin: https://cmder.net/
21
22### BEGIN SETUP STEPS###
231. Install latest version of Python for windows:
24    https://www.python.org/downloads/windows/
25
26Tested successfully on Python 3.7.3.:
27    https://www.python.org/ftp/python/3.7.3/python-3.7.3.exe
28
292. Launch Powershell and setup PATH:
30Setx PATH “%PATH%;C:/Users/<username>/AppData/Local/Programs/Python/Python37-32/Scripts31
323. Launch Cmder as Admin before running any PTS related ACTS tests.
33
34
35### END SETUP STEPS###
36
37
38Bluetooth PTS controller.
39Mandatory parameters are log_directory and sig_root_directory.
40
41ACTS Config setup:
42"BluetoothPtsDevice": {
43    "log_directory": "C:\\Users\\fsbtt\\Documents\\Profile Tuning Suite\\Test_Dir",
44    "sig_root_directory": "C:\\Program Files (x86)\\Bluetooth SIG"
45}
46
47"""
48from acts import signals
49from datetime import datetime
50
51import ctypes
52import logging
53import os
54import subprocess
55import time
56import xml.etree.ElementTree as ET
57
58from xml.dom import minidom
59from xml.etree.ElementTree import Element
60
61
62class BluetoothPtsDeviceConfigError(signals.ControllerError):
63    pass
64
65
66class BluetoothPtsSnifferError(signals.ControllerError):
67    pass
68
69
70MOBLY_CONTROLLER_CONFIG_NAME = "BluetoothPtsDevice"
71ACTS_CONTROLLER_REFERENCE_NAME = "bluetooth_pts_device"
72
73# Prefix to identify final verdict string. This is a PTS specific log String.
74VERDICT = 'VERDICT/'
75
76# Verdict strings that are specific to PTS.
77VERDICT_STRINGS = {
78    'RESULT_PASS': 'PASS',
79    'RESULT_FAIL': 'FAIL',
80    'RESULT_INCONC': 'INCONC',
81    'RESULT_INCOMP':
82    'INCOMP',  # Initial final verdict meaning that test has not completed yet.
83    'RESULT_NONE':
84    'NONE',  # Error verdict usually indicating internal PTS error.
85}
86
87# Sniffer ready log message.
88SNIFFER_READY = 'SNIFFER/Save and clear complete'
89
90# PTS Log Types as defined by PTS:
91LOG_TYPE_GENERAL_TEXT = 0
92LOG_TYPE_FIRST = 1
93LOG_TYPE_START_TEST_CASE = 1
94LOG_TYPE_TEST_CASE_ENDED = 2
95LOG_TYPE_START_DEFAULT = 3
96LOG_TYPE_DEFAULT_ENDED = 4
97LOG_TYPE_FINAL_VERDICT = 5
98LOG_TYPE_PRELIMINARY_VERDICT = 6
99LOG_TYPE_TIMEOUT = 7
100LOG_TYPE_ASSIGNMENT = 8
101LOG_TYPE_START_TIMER = 9
102LOG_TYPE_STOP_TIMER = 10
103LOG_TYPE_CANCEL_TIMER = 11
104LOG_TYPE_READ_TIMER = 12
105LOG_TYPE_ATTACH = 13
106LOG_TYPE_IMPLICIT_SEND = 14
107LOG_TYPE_GOTO = 15
108LOG_TYPE_TIMED_OUT_TIMER = 16
109LOG_TYPE_ERROR = 17
110LOG_TYPE_CREATE = 18
111LOG_TYPE_DONE = 19
112LOG_TYPE_ACTIVATE = 20
113LOG_TYPE_MESSAGE = 21
114LOG_TYPE_LINE_MATCHED = 22
115LOG_TYPE_LINE_NOT_MATCHED = 23
116LOG_TYPE_SEND_EVENT = 24
117LOG_TYPE_RECEIVE_EVENT = 25
118LOG_TYPE_OTHERWISE_EVENT = 26
119LOG_TYPE_RECEIVED_ON_PCO = 27
120LOG_TYPE_MATCH_FAILED = 28
121LOG_TYPE_COORDINATION_MESSAGE = 29
122
123PTS_DEVICE_EMPTY_CONFIG_MSG = "Configuration is empty, abort!"
124
125
126def create(config):
127    if not config:
128        raise errors.PTS_DEVICE_EMPTY_CONFIG_MSG
129    return get_instance(config)
130
131
132def destroy(pts):
133    try:
134        pts[0].clean_up()
135    except:
136        pts[0].log.error("Failed to clean up properly.")
137
138
139def get_info(pts_devices):
140    """Get information from the BluetoothPtsDevice object.
141
142    Args:
143        pts_devices: A list of BluetoothPtsDevice objects although only one
144        will ever be specified.
145
146    Returns:
147        A dict, representing info for BluetoothPtsDevice object.
148    """
149    return {
150        "address": pts_devices[0].address,
151        "sniffer_ready": pts_devices[0].sniffer_ready,
152        "ets_manager_library": pts_devices[0].ets_manager_library,
153        "log_directory": pts_devices[0].log_directory,
154        "pts_installation_directory":
155        pts_devices[0].pts_installation_directory,
156    }
157
158
159def get_instance(config):
160    """Create BluetoothPtsDevice instance from a dictionary containing
161    information related to PTS. Namely the SIG root directory as
162    sig_root_directory and the log directory represented by the log_directory.
163
164    Args:
165        config: A dict that contains BluetoothPtsDevice device info.
166
167    Returns:
168        A list of BluetoothPtsDevice objects.
169    """
170    result = []
171    try:
172        log_directory = config.pop("log_directory")
173    except KeyError:
174        raise BluetoothPtsDeviceConfigError(
175            "Missing mandatory log_directory in config.")
176    try:
177        sig_root_directory = config.pop("sig_root_directory")
178    except KeyError:
179        example_path = \
180            "C:\\\\Program Files (x86)\\\\Bluetooth SIG"
181        raise BluetoothPtsDeviceConfigError(
182            "Missing mandatory sig_root_directory in config. Example path: {}".
183            format(example_path))
184
185    # "C:\\Program Files (x86)\\Bluetooth SIG\\Bluetooth PTS\\bin\\ETSManager.dll"
186    ets_manager_library = "{}\\Bluetooth PTS\\bin\\ETSManager.dll".format(
187        sig_root_directory)
188    # "C:\\Program Files (x86)\\Bluetooth SIG\\Bluetooth PTS\\bin"
189    pts_installation_directory = "{}\\Bluetooth PTS\\bin".format(
190        sig_root_directory)
191    # "C:\\Program Files (x86)\\Bluetooth SIG\\Bluetooth Protocol Viewer"
192    pts_sniffer_directory = "{}\\Bluetooth Protocol Viewer".format(
193        sig_root_directory)
194    result.append(
195        BluetoothPtsDevice(ets_manager_library, log_directory,
196                           pts_installation_directory, pts_sniffer_directory))
197    return result
198
199
200class BluetoothPtsDevice:
201    """Class representing an Bluetooth PTS device and associated functions.
202
203    Each object of this class represents one BluetoothPtsDevice in ACTS.
204    """
205
206    _next_action = -1
207    _observers = []
208    address = ""
209    current_implicit_send_description = ""
210    devices = []
211    extra_answers = []
212    log_directory = ""
213    log = None
214    ics = None
215    ixit = None
216    profile_under_test = None
217    pts_library = None
218    pts_profile_mmi_request = ""
219    pts_test_result = VERDICT_STRINGS['RESULT_INCOMP']
220    sniffer_ready = False
221    test_log_directory = ""
222    test_log_prefix = ""
223
224    def __init__(self, ets_manager_library, log_directory,
225                 pts_installation_directory, pts_sniffer_directory):
226        self.log = logging.getLogger()
227        if ets_manager_library is not None:
228            self.ets_manager_library = ets_manager_library
229        self.log_directory = log_directory
230        if pts_installation_directory is not None:
231            self.pts_installation_directory = pts_installation_directory
232        if pts_sniffer_directory is not None:
233            self.pts_sniffer_directory = pts_sniffer_directory
234        # Define callback functions
235        self.USEAUTOIMPLSENDFUNC = ctypes.CFUNCTYPE(ctypes.c_bool)
236        self.use_auto_impl_send_func = self.USEAUTOIMPLSENDFUNC(
237            self.UseAutoImplicitSend)
238
239        self.DONGLE_MSG_FUNC = ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_char_p)
240        self.dongle_msg_func = self.DONGLE_MSG_FUNC(self.DongleMsg)
241
242        self.DEVICE_SEARCH_MSG_FUNC = ctypes.CFUNCTYPE(ctypes.c_bool,
243                                                       ctypes.c_char_p,
244                                                       ctypes.c_char_p,
245                                                       ctypes.c_char_p)
246        self.dev_search_msg_func = self.DEVICE_SEARCH_MSG_FUNC(
247            self.DeviceSearchMsg)
248
249        self.LOGFUNC = ctypes.CFUNCTYPE(ctypes.c_bool, ctypes.c_char_p,
250                                        ctypes.c_char_p, ctypes.c_char_p,
251                                        ctypes.c_int, ctypes.c_void_p)
252        self.log_func = self.LOGFUNC(self.Log)
253
254        self.ONIMPLSENDFUNC = ctypes.CFUNCTYPE(ctypes.c_char_p,
255                                               ctypes.c_char_p, ctypes.c_int)
256        self.onimplsend_func = self.ONIMPLSENDFUNC(self.ImplicitSend)
257
258        # Helps with PTS reliability.
259        os.chdir(self.pts_installation_directory)
260        # Load EtsManager
261        self.pts_library = ctypes.cdll.LoadLibrary(self.ets_manager_library)
262        self.log.info("ETS Manager library {0:s} has been loaded".format(
263            self.ets_manager_library))
264        # If post-logging is turned on all callbacks to LPLOG-type function
265        # will be executed after test execution is complete. It is recommended
266        # that post-logging is turned on to avoid simultaneous invocations of
267        # LPLOG and LPAUTOIMPLICITSEND callbacks.
268        self.pts_library.SetPostLoggingEx(True)
269
270        self.xml_root = Element("ARCHIVE")
271        version = Element("VERSION")
272        version.text = "2.0"
273        self.xml_root.append(version)
274        self.xml_pts_pixit = Element("PicsPixit")
275        self.xml_pts_pixit.text = ""
276        self.xml_pts_running_log = Element("LOG")
277        self.xml_pts_running_log.text = ""
278        self.xml_pts_running_summary = Element("SUMMARY")
279        self.xml_pts_running_summary.text = ""
280
281    def clean_up(self):
282        # Since we have no insight to the actual PTS library,
283        # catch all Exceptions and log them.
284        try:
285            self.log.info("Cleaning up Stack...")
286            self.pts_library.ExitStackEx(self.profile_under_test)
287        except Exception as err:
288            self.log.error(
289                "Failed to clean up BluetoothPtsDevice: {}".format(err))
290        try:
291            self.log.info("Unregistering Profile...")
292            self.pts_library.UnregisterProfileEx.argtypes = [ctypes.c_char_p]
293            self.pts_library.UnregisterProfileEx(
294                self.profile_under_test.encode())
295            self.pts_library.UnRegisterGetDevInfoEx()
296        except Exception as err:
297            self.log.error(
298                "Failed to clean up BluetoothPtsDevice: {}".format(err))
299        try:
300            self.log.info("Cleaning up Sniffer")
301            self.pts_library.SnifferTerminateEx()
302        except Exception as err:
303            self.log.error(
304                "Failed to clean up BluetoothPtsDevice: {}".format(err))
305        self.log.info("Cleanup Done.")
306
307    def write_xml_pts_pixit_values_for_current_test(self):
308        """ Writes the current PICS and IXIT values to the XML result.
309        """
310        self.xml_pts_pixit.text = "ICS VALUES:\n\n"
311        for key, value in self.ics.items():
312            self.xml_pts_pixit.text += "{} {}\n".format(
313                key.decode(), value.decode())
314        self.xml_pts_pixit.text += "\nIXIT VALUES:\n\n"
315        for key, (_, value) in self.ixit.items():
316            self.xml_pts_pixit.text += "{} {}\n".format(
317                key.decode(), value.decode())
318
319    def set_ics_and_ixit(self, ics, ixit):
320        self.ics = ics
321        self.ixit = ixit
322
323    def set_profile_under_test(self, profile):
324        self.profile_under_test = profile
325
326    def setup_pts(self):
327        """Prepares PTS to run tests. This needs to be called in test classes
328        after ICS, IXIT, and setting Profile under test.
329        Specifically BluetoothPtsDevice functions:
330            set_profile_under_test
331            set_ics_and_ixit
332        """
333
334        # Register layer to test with callbacks
335        self.pts_library.RegisterProfileWithCallbacks.argtypes = [
336            ctypes.c_char_p, self.USEAUTOIMPLSENDFUNC, self.ONIMPLSENDFUNC,
337            self.LOGFUNC, self.DEVICE_SEARCH_MSG_FUNC, self.DONGLE_MSG_FUNC
338        ]
339        res = self.pts_library.RegisterProfileWithCallbacks(
340            self.profile_under_test.encode(), self.use_auto_impl_send_func,
341            self.onimplsend_func, self.log_func, self.dev_search_msg_func,
342            self.dongle_msg_func)
343
344        self.log.info(
345            "Profile has been registered with result {0:d}".format(res))
346
347        # GetDeviceInfo module is for discovering devices and PTS Dongle address
348        # Initialize GetDeviceInfo and register it with callbacks
349        # First parameter is PTS executable directory
350        self.pts_library.InitGetDevInfoWithCallbacks.argtypes = [
351            ctypes.c_char_p, self.DEVICE_SEARCH_MSG_FUNC, self.DONGLE_MSG_FUNC
352        ]
353        res = self.pts_library.InitGetDevInfoWithCallbacks(
354            self.pts_installation_directory.encode(), self.dev_search_msg_func,
355            self.dongle_msg_func)
356        self.log.info(
357            "GetDevInfo has been initialized with result {0:d}".format(res))
358        # Initialize PTS dongle
359        res = self.pts_library.VerifyDongleEx()
360        self.log.info(
361            "PTS dongle has been initialized with result {0:d}".format(res))
362
363        # Find PTS dongle address
364        self.pts_library.GetDongleBDAddress.restype = ctypes.c_ulonglong
365        self.address = self.pts_library.GetDongleBDAddress()
366        self.address_str = "{0:012X}".format(self.address)
367        self.log.info("PTS BD Address 0x{0:s}".format(self.address_str))
368
369        # Initialize Bluetooth Protocol Viewer communication module
370        self.pts_library.SnifferInitializeEx()
371
372        # If Bluetooth Protocol Viewer is not running, start it
373        if not self.is_sniffer_running():
374            self.log.info("Starting Protocol Viewer")
375            args = [
376                "{}\Executables\Core\FTS.exe".format(
377                    self.pts_sniffer_directory),
378                '/PTS Protocol Viewer=Generic',
379                '/OEMTitle=Bluetooth Protocol Viewer', '/OEMKey=Virtual'
380            ]
381            subprocess.Popen(args)
382            sniffer_timeout = 10
383            while not self.is_sniffer_running():
384                time.sleep(sniffer_timeout)
385
386        # Register to recieve Bluetooth Protocol Viewer notofications
387        self.pts_library.SnifferRegisterNotificationEx()
388        self.pts_library.SetParameterEx.argtypes = [
389            ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p
390        ]
391
392        for ics_name in self.ics:
393            res = self.pts_library.SetParameterEx(
394                ics_name, b'BOOLEAN', self.ics[ics_name],
395                self.profile_under_test.encode())
396            if res:
397                self.log.info("ICS {0:s} set successfully".format(
398                    str(ics_name)))
399            else:
400                self.log.error("Setting ICS {0:s} value failed".format(
401                    str(ics_name)))
402
403        for ixit_name in self.ixit:
404            res = self.pts_library.SetParameterEx(
405                ixit_name, (self.ixit[ixit_name])[0],
406                (self.ixit[ixit_name])[1], self.profile_under_test.encode())
407            if res:
408                self.log.info("IXIT {0:s} set successfully".format(
409                    str(ixit_name)))
410            else:
411                self.log.error("Setting IXIT {0:s} value failed".format(
412                    str(ixit_name)))
413
414        # Prepare directory to store Bluetooth Protocol Viewer output
415        if not os.path.exists(self.log_directory):
416            os.makedirs(self.log_directory)
417
418        address_b = self.address_str.encode("utf-8")
419        self.pts_library.InitEtsEx.argtypes = [
420            ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p
421        ]
422
423        implicit_send_path = "{}\\implicit_send3.dll".format(
424            self.pts_installation_directory).encode()
425        res = self.pts_library.InitEtsEx(self.profile_under_test.encode(),
426                                         self.log_directory.encode(),
427                                         implicit_send_path, address_b)
428        self.log.info("ETS has been initialized with result {0:s}".format(
429            str(res)))
430
431        # Initialize Host Stack DLL
432        self.pts_library.InitStackEx.argtypes = [ctypes.c_char_p]
433        res = self.pts_library.InitStackEx(self.profile_under_test.encode())
434        self.log.info("Stack has been initialized with result {0:s}".format(
435            str(res)))
436
437        # Select to receive Log messages after test is done
438        self.pts_library.SetPostLoggingEx.argtypes = [
439            ctypes.c_bool, ctypes.c_char_p
440        ]
441        self.pts_library.SetPostLoggingEx(True,
442                                          self.profile_under_test.encode())
443
444        # Clear Bluetooth Protocol Viewer. Dongle message callback will update
445        # sniffer_ready automatically. No need to fail setup if the timeout
446        # is exceeded since the logs will still be available just not starting
447        # from a clean slate. Just post a warning.
448        self.sniffer_ready = False
449        self.pts_library.SnifferClearEx()
450        end_time = time.time() + 10
451        while not self.sniffer_ready and time.time() < end_time:
452            time.sleep(1)
453        if not self.sniffer_ready:
454            self.log.warning("Sniffer not cleared. Continuing.")
455
456    def is_sniffer_running(self):
457        """ Looks for running Bluetooth Protocol Viewer process
458
459        Returns:
460            Returns True if finds one, False otherwise.
461        """
462        prog = [
463            line.split()
464            for line in subprocess.check_output("tasklist").splitlines()
465        ]
466        [prog.pop(e) for e in [0, 1, 2]]
467        for task in prog:
468            task_name = task[0].decode("utf-8")
469            if task_name == "Fts.exe":
470                self.log.info("Found FTS process successfully.")
471                # Sleep recommended by PTS.
472                time.sleep(1)
473                return True
474        return False
475
476    def UseAutoImplicitSend(self):
477        """Callback method that defines Which ImplicitSend will be used.
478
479        Returns:
480            True always to inform PTS to use the local implementation.
481        """
482        return True
483
484    def DongleMsg(self, msg_str):
485        """ Receives PTS dongle messages.
486
487        Specifically this receives the Bluetooth Protocol Viewer completed
488        save/clear operations.
489
490        Returns:
491            True if sniffer is ready, False otherwise.
492        """
493        msg = (ctypes.c_char_p(msg_str).value).decode("utf-8")
494        self.log.info(msg)
495        # Sleep recommended by PTS.
496        time.sleep(1)
497        if SNIFFER_READY in msg:
498            self.sniffer_ready = True
499        return True
500
501    def DeviceSearchMsg(self, addr_str, name_str, cod_str):
502        """ Receives device search messages
503
504        Each device may return multiple messages
505        Each message will contain device address and may contain device name and
506        COD.
507
508        Returns:
509            True always and reports to the callback appropriately.
510        """
511        addr = (ctypes.c_char_p(addr_str).value).replace(b'\xed',
512                                                         b' ').decode("utf-8")
513        name = (ctypes.c_char_p(name_str).value).replace(b'\xed',
514                                                         b' ').decode("utf-8")
515        cod = (ctypes.c_char_p(cod_str).value).replace(b'\xed',
516                                                       b' ').decode("utf-8")
517        self.devices.append(
518            "Device address = {0:s} name = {1:s} cod = {2:s}".format(
519                addr, name, cod))
520        return True
521
522    def Log(self, log_time_str, log_descr_str, log_msg_str, log_type, project):
523        """ Receives PTS log messages.
524
525        Returns:
526            True always and reports to the callback appropriately.
527        """
528        log_time = (ctypes.c_char_p(log_time_str).value).decode("utf-8")
529        log_descr = (ctypes.c_char_p(log_descr_str).value).decode("utf-8")
530        log_msg = (ctypes.c_char_p(log_msg_str).value).decode("utf-8")
531        if "Verdict Description" in log_descr:
532            self.xml_pts_running_summary.text += "\t- {}".format(log_msg)
533        if "Final Verdict" in log_descr:
534            self.xml_pts_running_summary.text += "{}{}\n".format(
535                log_descr.strip(), log_msg.strip())
536        full_log_msg = "{}{}{}".format(log_time, log_descr, log_msg)
537        self.xml_pts_running_log.text += "{}\n".format(str(full_log_msg))
538
539        if ctypes.c_int(log_type).value == LOG_TYPE_FINAL_VERDICT:
540            indx = log_msg.find(VERDICT)
541            if indx == 0:
542                if self.pts_test_result == VERDICT_STRINGS['RESULT_INCOMP']:
543                    if VERDICT_STRINGS['RESULT_INCONC'] in log_msg:
544                        self.pts_test_result = VERDICT_STRINGS['RESULT_INCONC']
545                    elif VERDICT_STRINGS['RESULT_FAIL'] in log_msg:
546                        self.pts_test_result = VERDICT_STRINGS['RESULT_FAIL']
547                    elif VERDICT_STRINGS['RESULT_PASS'] in log_msg:
548                        self.pts_test_result = VERDICT_STRINGS['RESULT_PASS']
549                    elif VERDICT_STRINGS['RESULT_NONE'] in log_msg:
550                        self.pts_test_result = VERDICT_STRINGS['RESULT_NONE']
551        return True
552
553    def ImplicitSend(self, description, style):
554        """ ImplicitSend callback
555
556        Implicit Send Styles:
557            MMI_Style_Ok_Cancel1 =     0x11041, Simple prompt           | OK, Cancel buttons      | Default: OK
558            MMI_Style_Ok_Cancel2 =     0x11141, Simple prompt           | Cancel button           | Default: Cancel
559            MMI_Style_Ok1 =            0x11040, Simple prompt           | OK button               | Default: OK
560            MMI_Style_Yes_No1 =        0x11044, Simple prompt           | Yes, No buttons         | Default: Yes
561            MMI_Style_Yes_No_Cancel1 = 0x11043, Simple prompt           | Yes, No buttons         | Default: Yes
562            MMI_Style_Abort_Retry1 =   0x11042, Simple prompt           | Abort, Retry buttons    | Default: Abort
563            MMI_Style_Edit1 =          0x12040, Request for data input  | OK, Cancel buttons      | Default: OK
564            MMI_Style_Edit2 =          0x12140, Select item from a list | OK, Cancel buttons      | Default: OK
565
566        Handling
567            MMI_Style_Ok_Cancel1
568                OK = return "OK"
569                Cancel = return 0
570
571            MMI_Style_Ok_Cancel2
572                OK = return "OK"
573                Cancel = return 0
574
575            MMI_Style_Ok1
576                OK = return "OK", this version should not return 0
577
578            MMI_Style_Yes_No1
579                Yes = return "OK"
580                No = return 0
581
582            MMI_Style_Yes_No_Cancel1
583                Yes = return "OK"
584                No = return 0
585                Cancel = has been deprecated
586
587            MMI_Style_Abort_Retry1
588                Abort = return 0
589                Retry = return "OK"
590
591            MMI_Style_Edit1
592                OK = return expected string
593                Cancel = return 0
594
595            MMI_Style_Edit2
596                OK = return expected string
597                Cancel = return 0
598
599        Receives ImplicitSend messages
600        Description format is as following:
601        {MMI_ID,Test Name,Layer Name}MMI Action\n\nDescription: MMI Description
602        """
603        descr_str = (ctypes.c_char_p(description).value).decode("utf-8")
604        # Sleep recommended by PTS.
605        time.sleep(1)
606        indx = descr_str.find('}')
607        implicit_send_info = descr_str[1:(indx)]
608        self.current_implicit_send_description = descr_str[(indx + 1):]
609        items = implicit_send_info.split(',')
610        implicit_send_info_id = items[0]
611        implicit_send_info_test_case = items[1]
612        self.pts_profile_mmi_request = items[2]
613        self.log.info(
614            "OnImplicitSend() has been called with the following parameters:\n"
615        )
616        self.log.info("\t\tproject_name = {0:s}".format(
617            self.pts_profile_mmi_request))
618        self.log.info("\t\tid = {0:s}".format(implicit_send_info_id))
619        self.log.info(
620            "\t\ttest_case = {0:s}".format(implicit_send_info_test_case))
621        self.log.info("\t\tdescription = {0:s}".format(
622            self.current_implicit_send_description))
623        self.log.info("\t\tstyle = {0:#X}".format(ctypes.c_int(style).value))
624        self.log.info("")
625        try:
626            self.next_action = int(implicit_send_info_id)
627        except Exception as err:
628            self.log.error(
629                "Setting verdict to RESULT_FAIL, exception found: {}".format(
630                    err))
631            self.pts_test_result = VERDICT_STRINGS['RESULT_FAIL']
632        res = b'OK'
633        if len(self.extra_answers) > 0:
634            res = self.extra_answers.pop(0).encode()
635        self.log.info("Sending Response: {}".format(res))
636        return res
637
638    def log_results(self, test_name):
639        """Log results.
640
641        Saves the sniffer results in cfa format and clears the sniffer.
642
643        Args:
644            test_name: string, name of the test run.
645        """
646        self.pts_library.SnifferCanSaveEx.restype = ctypes.c_bool
647        canSave = ctypes.c_bool(self.pts_library.SnifferCanSaveEx()).value
648        self.pts_library.SnifferCanSaveAndClearEx.restype = ctypes.c_bool
649        canSaveClear = ctypes.c_bool(
650            self.pts_library.SnifferCanSaveAndClearEx()).value
651        file_name = "\\{}.cfa".format(self.test_log_prefix).encode()
652        path = self.test_log_directory.encode() + file_name
653
654        if canSave == True:
655            self.pts_library.SnifferSaveEx.argtypes = [ctypes.c_char_p]
656            self.pts_library.SnifferSaveEx(path)
657        else:
658            self.pts_library.SnifferSaveAndClearEx.argtypes = [ctypes.c_char_p]
659            self.pts_library.SnifferSaveAndClearEx(path)
660        end_time = time.time() + 60
661        while self.sniffer_ready == False and end_time > time.time():
662            self.log.info("Waiting for sniffer to be ready...")
663            time.sleep(1)
664        if self.sniffer_ready == False:
665            raise BluetoothPtsSnifferError(
666                "Sniffer not ready after 60 seconds.")
667
668    def execute_test(self, test_name, test_timeout=60):
669        """Execute the input test name.
670
671        Preps PTS to run the test and waits up to 2 minutes for all steps
672        in the execution to finish. Cleanup of PTS related objects follows
673        any test verdict.
674
675        Args:
676            test_name: string, name of the test to execute.
677        """
678        today = datetime.now()
679        self.write_xml_pts_pixit_values_for_current_test()
680        # TODO: Find out how to grab the PTS version. Temporarily
681        # hardcoded to v.7.4.1.2.
682        self.xml_pts_pixit.text = (
683            "Test Case Started: {} v.7.4.1.2, {} started on {}\n\n{}".format(
684                self.profile_under_test, test_name,
685                today.strftime("%A, %B %d, %Y, %H:%M:%S"),
686                self.xml_pts_pixit.text))
687
688        self.xml_pts_running_summary.text += "Test case : {} started\n".format(
689            test_name)
690        log_time_formatted = "{:%Y_%m_%d_%H_%M_%S}".format(datetime.now())
691        formatted_test_name = test_name.replace('/', '_')
692        formatted_test_name = formatted_test_name.replace('-', '_')
693        self.test_log_prefix = "{}_{}".format(formatted_test_name,
694                                              log_time_formatted)
695        self.test_log_directory = "{}\\{}\\{}".format(self.log_directory,
696                                                      self.profile_under_test,
697                                                      self.test_log_prefix)
698        os.makedirs(self.test_log_directory)
699        curr_test = test_name.encode()
700
701        self.pts_library.StartTestCaseEx.argtypes = [
702            ctypes.c_char_p, ctypes.c_char_p, ctypes.c_bool
703        ]
704        res = self.pts_library.StartTestCaseEx(
705            curr_test, self.profile_under_test.encode(), True)
706        self.log.info("Test has been started with result {0:s}".format(
707            str(res)))
708
709        # Wait till verdict is received
710        self.log.info("Begin Test Execution... waiting for verdict.")
711        end_time = time.time() + test_timeout
712        while self.pts_test_result == VERDICT_STRINGS[
713                'RESULT_INCOMP'] and time.time() < end_time:
714            time.sleep(1)
715        self.log.info("End Test Execution... Verdict {}".format(
716            self.pts_test_result))
717
718        # Clean up after test is done
719        self.pts_library.TestCaseFinishedEx.argtypes = [
720            ctypes.c_char_p, ctypes.c_char_p
721        ]
722        res = self.pts_library.TestCaseFinishedEx(
723            curr_test, self.profile_under_test.encode())
724
725        self.log_results(test_name)
726        self.xml_pts_running_summary.text += "{} finished\n".format(test_name)
727        # Add the log results to the XML output
728        self.xml_root.append(self.xml_pts_pixit)
729        self.xml_root.append(self.xml_pts_running_log)
730        self.xml_root.append(self.xml_pts_running_summary)
731        rough_string = ET.tostring(self.xml_root,
732                                   encoding='utf-8',
733                                   method='xml')
734        reparsed = minidom.parseString(rough_string)
735        with open(
736                "{}\\{}.xml".format(self.test_log_directory,
737                                    self.test_log_prefix), "w") as writter:
738            writter.write(
739                reparsed.toprettyxml(indent="  ", encoding="utf-8").decode())
740
741        if self.pts_test_result is VERDICT_STRINGS['RESULT_PASS']:
742            return True
743        return False
744
745    """Observer functions"""
746
747    def bind_to(self, callback):
748        """ Callbacks to add to the observer.
749        This is used for DUTS automatic responses (ImplicitSends local
750        implementation).
751        """
752        self._observers.append(callback)
753
754    @property
755    def next_action(self):
756        return self._next_action
757
758    @next_action.setter
759    def next_action(self, action):
760        self._next_action = action
761        for callback in self._observers:
762            callback(self._next_action)
763
764    """End Observer functions"""
765