1# Copyright (C) 2024 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from threading import Thread
16from mmi2grpc._helpers import assert_description, match_description
17from mmi2grpc._rootcanal import Dongle
18from mmi2grpc._proxy import ProfileProxy
19from time import sleep
20import sys
21
22from pandora_experimental.gatt_grpc import GATT
23from pandora_experimental.gatt_pb2 import GattServiceParams, GattCharacteristicParams
24from pandora.host_grpc import Host
25from pandora.host_pb2 import PUBLIC, RANDOM, DISCOVERABLE_GENERAL, NOT_DISCOVERABLE, DISCOVERABLE_LIMITED, NOT_CONNECTABLE, DataTypes
26from pandora.security_grpc import Security, SecurityStorage
27from pandora.security_pb2 import LEVEL1, LEVEL2, LE_LEVEL3, PairingEventAnswer
28
29
30class GAPProxy(ProfileProxy):
31
32    def __init__(self, channel, rootcanal):
33        super().__init__(channel)
34        self.gatt = GATT(channel)
35        self.host = Host(channel)
36        self.security = Security(channel)
37        self.security_storage = SecurityStorage(channel)
38        self.rootcanal = rootcanal
39
40        self.connection = None
41        self.pairing_events = None
42        self.inquiry_responses = None
43        self.scan_responses = None
44
45        self.counter = 0
46        self.cached_passkey = None
47
48        self._auto_confirm_requests()
49
50    def test_started(self, test: str, description: str, pts_addr: bytes):
51        if test in [
52                "GAP/CONN/CPUP/BV-06-C",
53        ]:
54            self.rootcanal.select_pts_dongle(Dongle.LAIRD_BL654)
55        else:
56            self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE)
57
58        if test in [
59                "GAP/DM/LEP/BV-07-C",
60                "GAP/DM/LEP/BV-08-C",
61                "GAP/DM/LEP/BV-11-C",
62                "GAP/MOD/CON/BV-01-C",
63        ]:
64            self.host.SetDiscoverabilityMode(mode=DISCOVERABLE_GENERAL)
65
66        return "OK"
67
68    @match_description
69    def TSC_MMI_iut_send_hci_connect_request(self, test: str, pts_addr: bytes, **kwargs):
70        """
71        Please send an HCI connect request to establish a basic rate
72        connection( after the IUT discovers the Lower Tester over BR and LE)?.
73        """
74
75        if test in [
76                "GAP/IDLE/BON/BV-03-C",
77                "GAP/IDLE/BON/BV-04-C",
78                "GAP/IDLE/BON/BV-05-C",
79                "GAP/IDLE/BON/BV-06-C",
80                "GAP/SEC/AUT/BV-02-C",
81                "GAP/SEC/SEM/BV-05-C",
82                "GAP/SEC/SEM/BV-08-C",
83                "GAP/SEC/SEM/BV-50-C",
84                "GAP/SEC/SEM/BI-27-C",
85                "GAP/SEC/SEM/BI-32-C",
86                "GAP/EST/LIE/BV-02-C",
87        ]:
88            # we connect then pair, so we have to pair directly in this MMI
89            self.pairing_events = self.security.OnPairing()
90            self.connection = self.host.Connect(address=pts_addr).connection
91        else:
92            self.connection = self.host.Connect(address=pts_addr).connection
93
94        return "OK"
95
    # Handler for the PTS prompt quoted in the docstring. The docstring is
    # presumably compared with the PTS text by @assert_description, so it must
    # stay verbatim.
    @assert_description
    def _mmi_222(self, **kwargs):
        """
        Please initiate a BR/EDR security authentication and pairing with
        interaction of HCI commands.

        Press OK to continue.
        """

        # pairing already initiated with Connect() on Android
        # Only the pairing event stream is opened here; it is stored on the
        # instance so that later handlers can iterate and answer the events.
        self.pairing_events = self.security.OnPairing()

        return "OK"
109
110    @match_description
111    def _mmi_2001(self, passkey: str, **kwargs):
112        """
113        Please verify the passKey is correct: (?P<passkey>[0-9]+)
114        """
115
116        for event in self.pairing_events:
117            assert event.numeric_comparison == int(passkey), (event, passkey)
118            self.pairing_events.send(PairingEventAnswer(event=event, confirm=True))
119            return "OK"
120
121        assert False, "did not receive expected pairing event"
122
123    @assert_description
124    def TSC_MMI_iut_send_advertising_report_event_connectable_undirected(self, **kwargs):
125        """
126        Please send a connectable undirected advertising report.
127        """
128
129        self.advertise = self.host.Advertise(
130            legacy=True,
131            connectable=True,
132            own_address_type=PUBLIC,
133        )
134
135        self.pairing_events = self.security.OnPairing()
136
137        return "OK"
138
139    @assert_description
140    def TSC_MMI_iut_enter_handle_for_insufficient_authentication(self, pts_addr: bytes, **kwargs):
141        """
142        Please enter the handle(2 octet) to the characteristic in the IUT
143        database where Insufficient Authentication error will be returned :
144        """
145
146        response = self.gatt.RegisterService(service=GattServiceParams(
147            uuid="955798ce-3022-455c-b759-ee8edcd73d1a",
148            characteristics=[
149                GattCharacteristicParams(
150                    uuid="cf99ed9b-3c43-4343-b8a7-8afa513752ce",
151                    properties=0x02,  # PROPERTY_READ,
152                    permissions=0x04,  # PERMISSION_READ_ENCRYPTED_MITM
153                ),
154            ],
155        ))
156
157        self.pairing_events = self.security.OnPairing()
158
159        return handle_format(response.service.characteristics[0].handle)
160
161    @match_description
162    def TSC_MMI_the_security_id_is(self, pts_addr: bytes, passkey: str, **kwargs):
163        """
164        The Secure ID is (?P<passkey>[0-9]*)
165        """
166
167        for event in self.pairing_events:
168            if event.address == pts_addr and event.passkey_entry_request:
169                self.pairing_events.send(PairingEventAnswer(event=event, passkey=int(passkey)))
170                return "OK"
171
172        assert False
173
174    @assert_description
175    def TSC_MMI_iut_send_le_connect_request(self, test: str, pts_addr: bytes, **kwargs):
176        """
177        Please send an LE connect request to establish a connection.
178        """
179
180        if test == "GAP/DM/BON/BV-01-C":
181            # we also begin pairing here if we are not already paired on LE
182            if self.counter == 0:
183                self.counter += 1
184                self.security_storage.DeleteBond(public=pts_addr)
185                self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection
186                self.security.Secure(connection=self.connection, le=LE_LEVEL3)
187                return "OK"
188
189        if test == "GAP/SEC/AUT/BV-21-C" and self.connection is not None:
190            # no-op since the peer just disconnected from us,
191            # so we have immediately auto-connected back to it
192            return "OK"
193
194        if test in [
195                "GAP/CONN/DCEP/BV-03-C",
196                "GAP/CONN/GCEP/BV-02-C",
197                "GAP/DM/LEP/BV-06-C",
198                "GAP/CONN/GCEP/BV-01-C",
199        ]:
200            # PTS is not advertising with the local name, use identity address
201            address = pts_addr
202        else:
203            # the PTS sometimes decides to advertise with an RPA, so we do a scan to find its real address
204            scans = self.host.Scan()
205            for scan in scans:
206                adv_address = scan.public if scan.HasField("public") else scan.random
207                device_name = scan.data.complete_local_name
208                if "pts" in device_name.lower():
209                    address = adv_address
210                    scans.cancel()
211                    break
212
213        self.pairing_events = self.security.OnPairing()
214        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=address).connection
215
216        if test in {"GAP/BOND/BON/BV-04-C"}:
217            self.security.Secure(connection=self.connection, le=LE_LEVEL3)
218
219        return "OK"
220
221    @assert_description
222    def TSC_MMI_enter_security_id(self, pts_addr: bytes, **kwargs):
223        """
224        Please enter Secure Id.
225        """
226
227        if self.cached_passkey is not None:
228            self.log(f"Returning cached passkey entry {self.cached_passkey}")
229            return str(self.cached_passkey)
230
231        for event in self.pairing_events:
232            if event.address == pts_addr and event.passkey_entry_notification:
233                self.log(f"Got passkey entry {event.passkey_entry_notification}")
234                self.cached_passkey = event.passkey_entry_notification
235                return str(event.passkey_entry_notification)
236
237        assert False
238
239    @match_description
240    def TSC_MMI_iut_send_att_service_request(self, pts_addr: bytes, handle: str, **kwargs):
241        r"""
242        Please send an ATT service request - read or write request with handle
243        (?P<handle>[0-9a-e]+) \(octet\).Discover services if needed.
244        """
245
246        self.gatt.ReadCharacteristicFromHandle(
247            connection=self.connection,
248            # They want us to read the characteristic value handle using ATT, but the interface only lets us
249            # read the characteristic by its handle. So we offset by one, since in this test the characteristic
250            # value handle is one above the characteristic handle itself.
251            handle=int(handle, base=16) - 1,
252        )
253
254        return "OK"
255
256    @assert_description
257    def TSC_MMI_iut_send_advertising_report_event_service_uuid(self, **kwargs):
258        """
259        Please prepare IUT to send an advertising report with Service UUID.
260        """
261
262        self.advertise = self.host.Advertise(
263            legacy=True,
264            own_address_type=PUBLIC,
265            data=DataTypes(complete_service_class_uuids128=["955798ce-3022-455c-b759-ee8edcd73d1a"],))
266        return "OK"
267
268    @assert_description
269    def TSC_MMI_iut_send_advertising_report_event_local_name(self, **kwargs):
270        """
271        Please prepare IUT to send an advertising report with Local Name.
272        """
273
274        self.advertise = self.host.Advertise(own_address_type=PUBLIC,
275                                             legacy=True,
276                                             data=DataTypes(
277                                                 include_complete_local_name=True,
278                                                 include_shortened_local_name=True,
279                                             ))
280
281        return "OK"
282
283    @assert_description
284    def TSC_MMI_iut_send_advertising_report_event_flags(self, **kwargs):
285        """
286        Please prepare IUT to send an advertising report with Flags.
287        """
288
289        self.advertise = self.host.Advertise(
290            legacy=True,
291            connectable=True,
292            own_address_type=PUBLIC,
293        )
294
295        self.pairing_events = self.security.OnPairing()
296
297        return "OK"
298
299    @assert_description
300    def TSC_MMI_iut_send_advertising_report_event_manufacturer_specific_data(self, **kwargs):
301        """
302        Please prepare IUT to send an advertising report with Manufacture
303        Specific Data.
304        """
305
306        self.advertise = self.host.Advertise(own_address_type=PUBLIC,
307                                             legacy=True,
308                                             data=DataTypes(manufacturer_specific_data=b"d0n't b3 3v1l!",))
309
310        return "OK"
311
312    @assert_description
313    def TSC_MMI_iut_send_advertising_report_event_tx_power_level(self, **kwargs):
314        """
315        Please prepare IUT to send an advertising report with TX Power Level.
316        """
317
318        self.advertise = self.host.Advertise(legacy=True,
319                                             own_address_type=PUBLIC,
320                                             data=DataTypes(include_tx_power_level=True,))
321
322        return "OK"
323
324    @assert_description
325    def TSC_MMI_iut_send_advertising_report_event_connectable(self, **kwargs):
326        """
327        Please send a connectable advertising report.
328        """
329
330        self.advertise = self.host.Advertise(
331            legacy=True,
332            own_address_type=PUBLIC,
333            connectable=True,
334        )
335
336        self.pairing_events = self.security.OnPairing()
337
338        return "OK"
339
340    @assert_description
341    def TSC_MMI_iut_send_advertising_report_event_ADV_IND(self, **kwargs):
342        """
343        Please send connectable undirected advertising report.
344        """
345
346        self.advertise = self.host.Advertise(
347            legacy=True,
348            own_address_type=PUBLIC,
349            connectable=True,
350        )
351
352        self.pairing_events = self.security.OnPairing()
353
354        return "OK"
355
    # Confirmation-only prompt: no RPC is issued, the handler just answers "OK".
    # The docstring is the PTS prompt text and must stay verbatim.
    @assert_description
    def TSC_MMI_iut_confirm_idle_mode_security_4(self, **kwargs):
        """
        Please confirm that IUT is in Idle mode with security mode 4. Press OK
        when IUT is ready to start device discovery.
        """

        return "OK"
364
365    @match_description
366    def TSC_MMI_iut_start_general_inquiry_found(self, pts_addr: bytes, **kwargs):
367        """
368        Please start general inquiry. Click 'Yes' If IUT does discovers PTS and
369        ready for PTS to initiate (a|LE) create connection otherwise click 'No'.
370        """
371
372        inquiry_responses = self.host.Inquiry()
373        for response in inquiry_responses:
374            assert response.address == pts_addr, (response.address, pts_addr)
375            inquiry_responses.cancel()
376            return "Yes"
377
378        assert False
379
    # Name discovery prompt: handled by simply connecting to the PTS (see the
    # inline comment for why that is enough). The connection is stored on the
    # instance for later handlers.
    @assert_description
    def TSC_MMI_iut_send_att_read_by_type_request_name_request(self, pts_addr: bytes, **kwargs):
        """
        Please start the Name Discovery Procedure to retrieve Device Name from
        the PTS.
        """

        # Android does RNR when connecting for the first time
        self.connection = self.host.Connect(address=pts_addr).connection

        return "OK"
391
392    @match_description
393    def TSC_MMI_iut_confirm_device_discovery(self, test: str, pts_addr: bytes, **kwargs):
394        """
395        Please confirm that IUT has discovered PTS and retrieved its name '?(?P<name>[a-zA-Z\-0-9]*)'?\.?
396        """
397        #Verifying if the BD Address matches in Inquiry
398        inquiry_responses = self.host.Inquiry()
399        for response in inquiry_responses:
400            assert response.address == pts_addr, (response.address, pts_addr)
401            inquiry_responses.cancel()
402            return "Yes"
403
404        assert False
405
    # Capability question from PTS: always answered "Yes", no RPC involved.
    @assert_description
    def TSC_MMI_check_if_iut_support_non_connectable_advertising(self, **kwargs):
        """
        Does the IUT have an ability to send non-connectable advertising report?
        """

        return "Yes"
413
414    @assert_description
415    def TSC_MMI_iut_send_advertising_report_event_general_discoverable_ok_to_continue(self, **kwargs):
416        """
417        Please prepare IUT into general discoverable mode and send an
418        advertising report. Press OK to continue.
419        """
420
421        self.advertise = self.host.Advertise(
422            legacy=True,
423            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
424            own_address_type=PUBLIC,
425            connectable=True,
426        )
427
428        self.pairing_events = self.security.OnPairing()
429
430        return "OK"
431
432    @assert_description
433    def TSC_MMI_iut_send_advertising_report_event_general_discoverable_0203(self, **kwargs):
434        """
435        Please prepare IUT into general discoverable mode and send an
436        advertising report using either non - directed advertising or
437        discoverable undirected advertising.
438        """
439
440        self.advertise = self.host.Advertise(
441            legacy=True,
442            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
443            own_address_type=PUBLIC,
444            connectable=True,
445        )
446
447        self.pairing_events = self.security.OnPairing()
448
449        return "OK"
450
451    @assert_description
452    def TSC_MMI_iut_enter_undirected_connectable_mode_non_discoverable_mode(self, **kwargs):
453        """
454        Please prepare IUT into non-discoverable mode and send an advertising
455        report using connectable undirected advertising.
456        """
457
458        self.advertise = self.host.Advertise(
459            legacy=True,
460            data=DataTypes(le_discoverability_mode=NOT_DISCOVERABLE),
461            own_address_type=PUBLIC,
462            connectable=True,
463        )
464
465        return "OK"
466
467    def TSC_MMI_iut_send_advertising_report_event_general_discoverable_00(self, **kwargs):
468        """
469        Please prepare IUT into general discoverable mode and send an
470        advertising report using connectable undirected advertising.
471        """
472
473        self.advertise = self.host.Advertise(
474            legacy=True,
475            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
476            own_address_type=PUBLIC,
477            connectable=True,
478        )
479
480        return "OK"
481
    # Starts an LE scan and keeps the result stream on the instance; the
    # "confirm ... discovered" handlers iterate self.scan_responses afterwards.
    @assert_description
    def TSC_MMI_iut_start_general_discovery(self, **kwargs):
        """
        Please start General Discovery. Press OK to continue.
        """

        self.scan_responses = self.host.Scan()

        return "OK"
491
    # Same scan as for general discovery: no limited-only filter is applied
    # here, the limited/general distinction is made by the confirm handlers
    # when they inspect each response.
    @assert_description
    def TSC_MMI_iut_start_limited_discovery(self, **kwargs):
        """
        Please start Limited Discovery. Press OK to continue.
        """

        self.scan_responses = self.host.Scan()

        return "OK"
501
502    @assert_description
503    def TSC_MMI_iut_confirm_general_discovered_device(self, pts_addr: bytes, **kwargs):
504        """
505        Please confirm that PTS is discovered.
506        """
507
508        for response in self.scan_responses:
509            assert response.HasField("public")
510            # General Discoverability shall be able to check both limited and general advertising
511            if response.public == pts_addr:
512                self.scan_responses.cancel()
513                return "OK"
514
515        assert False
516
517    @assert_description
518    def TSC_MMI_iut_confirm_limited_discovered_device(self, pts_addr: bytes, **kwargs):
519        """
520        Please confirm that PTS is discovered.
521        """
522
523        for response in self.scan_responses:
524            assert response.HasField("public")
525            if (response.public == pts_addr and response.data.le_discoverability_mode == DISCOVERABLE_LIMITED):
526                self.scan_responses.cancel()
527                return "OK"
528
529        assert False
530
531    @assert_description
532    def TSC_MMI_iut_confirm_general_discovered_device_not_found(self, pts_addr: bytes, **kwargs):
533        """
534        Please confirm that PTS is NOT discovered.
535        """
536
537        discovered = False
538
539        def search():
540            nonlocal discovered
541            for response in self.scan_responses:
542                assert response.HasField("public")
543                if (response.public == pts_addr and response.data.le_discoverability_mode == DISCOVERABLE_GENERAL):
544                    self.scan_responses.cancel()
545                    discovered = True
546                    return
547
548        # search for five seconds, if we don't find anything, give up
549        worker = Thread(target=search)
550        worker.start()
551        worker.join(timeout=5)
552
553        assert not discovered
554
555        return "OK"
556
557    @assert_description
558    def TSC_MMI_iut_confirm_limited_discovered_device_not_found(self, pts_addr: bytes, **kwargs):
559        """
560        Please confirm that PTS is NOT discovered.
561        """
562
563        discovered = False
564
565        def search():
566            nonlocal discovered
567            for response in self.scan_responses:
568                assert response.HasField("public")
569                if (response.public == pts_addr and response.data.le_discoverability_mode == DISCOVERABLE_LIMITED):
570                    self.inquiry_responses.cancel()
571                    discovered = True
572                    return
573
574        # search for five seconds, if we don't find anything, give up
575        worker = Thread(target=search)
576        worker.start()
577        worker.join(timeout=5)
578
579        assert not discovered
580
581        return "OK"
582
583    @assert_description
584    def TSC_MMI_iut_send_advertising_report_event_non_discoverable(self, **kwargs):
585        """
586        Please prepare IUT into non-discoverable and non-connectable mode and
587        send an advertising report.
588        """
589
590        self.advertise = self.host.Advertise(
591            legacy=True,
592            own_address_type=PUBLIC,
593        )
594
595        return "OK"
596
    # Acknowledgement-only prompt: no RPC is issued for bondable mode here.
    @assert_description
    def TSC_MMI_set_iut_in_bondable_mode(self, **kwargs):
        """
        Please set IUT into bondable mode. Press OK to continue.
        """

        return "OK"
604
605    @assert_description
606    def TSC_MMI_iut_send_le_disconnect_request(self, test: str, pts_addr: bytes, **kwargs):
607        """
608        Please send a disconnect request to terminate connection.
609        """
610        if test == "GAP/CONN/TERM/BV-01-C":
611            self.connection = next(self.advertise).connection
612        try:
613            self.host.Disconnect(connection=self.connection)
614        except Exception:
615            pass
616
617        return "OK"
618
619    @match_description
620    def TSC_MMI_iut_start_bonding_procedure_bondable(self, test: str, pts_addr: bytes, **kwargs):
621        """
622        (Please start the Bonding Procedure in bondable mode.|Please configure the IUT into LE Security and start pairing process.)
623        """
624
625        if not self.pairing_events:
626            self.pairing_events = self.security.OnPairing()
627
628        if not self.connection:
629            self.connection = next(self.advertise).connection
630
631        if test == "GAP/DM/BON/BV-01-C":
632            # we already started in the previous test
633            return "OK"
634
635        if test not in {"GAP/SEC/AUT/BV-21-C"}:
636            self.security_storage.DeleteBond(public=pts_addr)
637
638        if test in ["GAP/SEC/SEM/BV-53-C"]:
639            self.security.Secure(connection=self.connection, classic=LEVEL1)
640        else:
641
642            def secure():
643                self.security.Secure(connection=self.connection, le=LE_LEVEL3)
644
645            Thread(target=secure).start()
646
647        return "OK"
648
    # Acknowledgement-only prompt: nothing is changed on the IUT here.
    @assert_description
    def TSC_MMI_make_iut_connectable(self, **kwargs):
        """
        Please make IUT connectable. Press OK to continue.
        """

        return "OK"
656
657    @assert_description
658    def TSC_MMI_iut_start_general_discovery_DM(self, pts_addr: bytes, **kwargs):
659        """
660        Please start general discovery over BR/EDR and over LE. If IUT discovers
661        PTS with both BR/EDR and LE method, press OK.
662        """
663
664        discovered_bredr = False
665
666        def search_bredr():
667            nonlocal discovered_bredr
668            inquiry_responses = self.host.Inquiry()
669            for response in inquiry_responses:
670                if response.address == pts_addr:
671                    inquiry_responses.cancel()
672                    discovered_bredr = True
673                    return
674
675        bredr_worker = Thread(target=search_bredr)
676        bredr_worker.start()
677
678        discovered_le = False
679
680        def search_le():
681            nonlocal discovered_le
682            scan_responses = self.host.Scan()
683            for event in scan_responses:
684                address = event.public if event.HasField("public") else event.random
685                if (address == pts_addr and event.data.le_discoverability_mode):
686                    scan_responses.cancel()
687                    discovered_le = True
688                    return
689
690        le_worker = Thread(target=search_le)
691        le_worker.start()
692
693        # search for five seconds, if we don't find anything, give up
694        bredr_worker.join(timeout=5)
695        le_worker.join(timeout=5)
696
697        assert discovered_bredr and discovered_le, (discovered_bredr, discovered_le)
698
699        return "OK"
700
701    def TSC_MMI_make_iut_general_discoverable(self, test: str, **kwargs):
702        """
703        Please make IUT general discoverable. Press OK to continue.
704        """
705
706        self.host.SetDiscoverabilityMode(mode=DISCOVERABLE_GENERAL)
707
708        self.advertise = self.host.Advertise(
709            legacy=True,
710            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
711            own_address_type=PUBLIC,
712            connectable=True,
713        )
714
715        if test in [
716                "GAP/SEC/SEM/BI-31-C",
717        ]:
718            self.pairing_events = self.security.OnPairing()
719
720        return "OK"
721
722    @assert_description
723    def TSC_MMI_iut_start_basic_rate_name_discovery_DM(self, pts_addr: bytes, **kwargs):
724        """
725        Please start device name discovery over BR/EDR . If IUT discovers PTS,
726        press OK to continue.
727        """
728
729        inquiry_responses = self.host.Inquiry()
730        for response in inquiry_responses:
731            if response.address == pts_addr:
732                inquiry_responses.cancel()
733                return "OK"
734
735        assert False
736
    # Turns off both discoverability and connectability on the host, in that order.
    @assert_description
    def TSC_MMI_make_iut_not_connectable(self, **kwargs):
        """
        Please make IUT not connectable. Press OK to continue.
        """

        self.host.SetDiscoverabilityMode(mode=NOT_DISCOVERABLE)
        self.host.SetConnectabilityMode(mode=NOT_CONNECTABLE)

        return "OK"
747
    # Same two calls as the "not connectable" handler.
    # NOTE(review): the prompt only asks for "not discoverable", yet
    # connectability is switched off as well - confirm this is intended.
    @assert_description
    def TSC_MMI_make_iut_not_discoverable(self, **kwargs):
        """
        Please make IUT not discoverable. Press OK to continue.
        """

        self.host.SetDiscoverabilityMode(mode=NOT_DISCOVERABLE)
        self.host.SetConnectabilityMode(mode=NOT_CONNECTABLE)

        return "OK"
758
    # Acknowledgement-only prompt: this handler issues no disconnect itself;
    # `test` and `pts_addr` are accepted but unused.
    @assert_description
    def TSC_MMI_press_ok_to_disconnect(self, test: str, pts_addr: bytes, **kwargs):
        """
        Please press ok to disconnect the link.
        """

        return "OK"
766
    # Disconnects the stored connection. Any exception raised by the call is
    # swallowed on purpose (see the inline comment), so this always answers "OK".
    @assert_description
    def TSC_MMI_iut_send_att_disconnect_request(self, **kwargs):
        """
        Please send an ATT disconnect request to terminate an L2CAP channel.
        """

        try:
            self.host.Disconnect(connection=self.connection)
        except Exception:
            # we already disconnected, no-op
            pass

        return "OK"
780
    # Requests LE security level 3 on the stored connection, synchronously.
    # `pts_addr` is accepted but unused.
    @assert_description
    def TSC_MMI_iut_start_bonding_procedure_non_bondable(self, pts_addr: bytes, **kwargs):
        """
        Please start the Bonding Procedure in non-bondable mode.
        """

        # No idea how we can bond in non-bondable mode, but this passes the tests...
        self.security.Secure(connection=self.connection, le=LE_LEVEL3)

        return "OK"
791
    # Acknowledgement-only prompt: no connection parameter update is sent from
    # this handler and it does not wait; it answers "OK" immediately.
    @assert_description
    def TSC_MMI_iut_send_le_connection_update_request_timeout(self, **kwargs):
        """
        Please send an L2CAP Connection Parameter Update request using valid
        parameters and wait for TSPX_iut_connection_parameter_timeout 30000ms
        timeout...
        """

        return "OK"
801
    # Acknowledgement-only prompt: nothing is prepared here, "OK" is returned
    # right away.
    @assert_description
    def TSC_MMI_iut_perform_direct_connection_establishment_procedure(self, **kwargs):
        """
        Please prepare IUT into the Direct Connection Establishment Procedure.
        """

        return "OK"
809
    # Acknowledgement-only prompt: nothing is prepared here, "OK" is returned
    # right away.
    @assert_description
    def TSC_MMI_iut_perform_general_connection_establishment_procedure(self, **kwargs):
        """
        Please prepare IUT into the General Connection Establishment Procedure.
        Press ok to continue.
        """

        return "OK"
818
819    @match_description
820    def TSC_MMI_iut_enter_non_connectable_mode(self, **kwargs):
821        """
822        Please enter (Non-Connectable|non connectable) mode( and genrate advertising report event)?.
823        """
824
825        self.advertise = self.host.Advertise(
826            legacy=True,
827            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
828            own_address_type=PUBLIC,
829            connectable=False,
830        )
831
832        return "OK"
833
    # Sets the host to general discoverable and then to not connectable; no
    # advertising is started by this handler.
    @assert_description
    def TSC_MMI_iut_enter_non_connectable_mode_general_discoverable_mode(self, **kwargs):
        """
        Please enter General Discoverable and Non-Connectable mode.
        """

        self.host.SetDiscoverabilityMode(mode=DISCOVERABLE_GENERAL)
        self.host.SetConnectabilityMode(mode=NOT_CONNECTABLE)

        return "OK"
844
845    @assert_description
846    def TSC_MMI_iut_send_advertising_report_event_0203_general_discoverable(self, **kwargs):
847        """
848        Please send non-connectable undirected advertising report or
849        discoverable undirected advertising report with general discoverable
850        flags turned on.
851        """
852
853        self.advertise = self.host.Advertise(
854            legacy=True,
855            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
856            own_address_type=PUBLIC,
857            connectable=False,
858        )
859
860        return "OK"
861
862    @assert_description
863    def TSC_MMI_iut_send_advertising_report_event_non_discoverable_and_undirected_connectable(self, **kwargs):
864        """
865        Please prepare IUT into non-discoverable and connectable mode and send
866        an advertising report.
867        """
868
869        self.advertise = self.host.Advertise(
870            legacy=True,
871            data=DataTypes(le_discoverability_mode=NOT_DISCOVERABLE),
872            own_address_type=PUBLIC,
873            connectable=True,
874        )
875
876        return "OK"
877
878    @assert_description
879    def TSC_MMI_iut_enter_undirected_connectable_mode_general_discoverable_mode(self, **kwargs):
880        """
881        Please prepare IUT into general discoverable mode and send an
882        advertising report using connectable undirected advertising.
883        """
884
885        self.advertise = self.host.Advertise(
886            legacy=True,
887            data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL),
888            own_address_type=PUBLIC,
889            connectable=True,
890        )
891
892        return "OK"
893
    @assert_description
    def TSC_MMI_wait_for_encryption_change_event(self, **kwargs):
        """
        Waiting for HCI_ENCRYPTION_CHANGE_EVENT...
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).
        # No IUT action is performed here; the handler only acknowledges the
        # prompt.
        return "OK"
901
902    @assert_description
903    def TSC_MMI_iut_enter_security_mode_4(self, **kwargs):
904        """
905        Please order the IUT to go in connectable mode and in security mode 4.
906        Press OK to continue.
907        """
908
909        self.pairing_events = self.security.OnPairing()
910
911        return "OK"
912
    @assert_description
    def _mmi_251(self, **kwargs):
        """
        Please send L2CAP Connection Response to PTS.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).
        # No explicit action is taken by this handler; it only acknowledges
        # the prompt.
        return "OK"
920
    @assert_description
    def _mmi_230(self, **kwargs):
        """
        Please order the IUT to be in security mode 4. Press OK to make
        connection to Lower Tester.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).
        # No explicit action is taken by this handler; it only acknowledges
        # the prompt.
        return "OK"
929
    @assert_description
    def TSC_MMI_iut_start_simple_pairing(self, pts_addr: bytes, **kwargs):
        """
        Please start simple pairing procedure.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # Intentional no-op: per the original author, pairing has always been
        # started already as part of establishing the connection. `pts_addr`
        # is accepted but not used.

        return "OK"
939
940    def TSC_MMI_iut_send_l2cap_connect_request(self, pts_addr: bytes, **kwargs):
941        """
942        Please initiate BR/EDR security authentication and pairing to establish
943        a service level enforced security!
944        After that, please create the service
945        channel using L2CAP Connection Request.
946
947        Press OK to continue.
948        """
949
950        def after_that():
951            sleep(5)
952            self.host.Connect(address=pts_addr)
953
954        Thread(target=after_that).start()
955
956        return "OK"
957
    @assert_description
    def TSC_MMI_iut_confirm_lost_bond(self, **kwargs):
        """
        Please confirm that IUT has informed of a lost bond.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).
        # The confirmation is unconditional: nothing is checked on the IUT
        # before answering "OK".
        return "OK"
965
966    @assert_description
967    def TSC_MMI_iut_send_att_connect_request(self, test: str, pts_addr: bytes, **kwargs):
968        """
969        Please send an ATT connect request to establish an L2CAP channel.
970        """
971        self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection
972
973        return "OK"
974
    @match_description
    def TSC_MMI_iut_send_ll_connection_update_request(self, **kwargs):
        """
        Please send a LL Connection Parameter Update request using valid
        parameters.
        (With 0x0032 value set in TSPX_conn_update_int_min
         0x0046
        value set in TSPX_conn_update_int_max
         0x0001 value set in
        TSPX_conn_update_peripheral_latency and
         0x01F4 value set in
        TSPX_conn_update_supervision_timeout)?
        """

        # NOTE(review): keep the docstring verbatim, including its odd
        # indentation - @match_description presumably treats it as a pattern
        # matched against the prompt text sent by PTS (confirm).
        # No connection update is issued by this handler; it only
        # acknowledges the prompt.
        return "OK"
990
    @assert_description
    def TSC_MMI_iut_send_le_connection_update_request(self, **kwargs):
        """
        Please start a Connection Update procedure using valid parameters.
        With 0x0032 value set in TSPX_conn_update_int_min
         0x0046 value set in
        TSPX_conn_update_int_max
         0x0001 value set in
        TSPX_conn_update_peripheral_latency and
         0x01F4 value set in
        TSPX_conn_update_supervision_timeout
        """

        # NOTE(review): keep the docstring verbatim, including its odd
        # indentation - @assert_description presumably compares it with the
        # prompt text sent by PTS (confirm).
        # No connection update is issued by this handler; it only
        # acknowledges the prompt.
        return "OK"
1005
1006    @assert_description
1007    def TSC_MMI_iut_enter_handle_for_insufficient_authentication_or_insufficient_encryption(self, **kwargs):
1008        """
1009        Please enter the handle to the characteristic in the IUT database where
1010        Insufficient Authentication or Insufficient Encryption error will be
1011        returned:
1012        """
1013
1014        response = self.gatt.RegisterService(service=GattServiceParams(
1015            uuid="955798ce-3022-455c-b759-ee8edcd73d1a",
1016            characteristics=[
1017                GattCharacteristicParams(
1018                    uuid="cf99ed9b-3c43-4343-b8a7-8afa513752ce",
1019                    properties=0x02,  # PROPERTY_READ,
1020                    permissions=0x04,  # PERMISSION_READ_ENCRYPTED_MITM
1021                ),
1022            ],
1023        ))
1024
1025        self.pairing_events = self.security.OnPairing()
1026
1027        return handle_format(response.service.characteristics[0].handle)
1028
    @assert_description
    def TSC_MMI_iut_remove_bonding(self, pts_addr: bytes, **kwargs):
        """
        Please have Upper Tester remove the bonding information of the PTS.
        Press OK to continue.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # Delete the stored bond for the PTS, addressed by its public address.
        self.security_storage.DeleteBond(public=pts_addr)

        return "OK"
1039
1040    @assert_description
1041    def _mmi_231(self, test: str, pts_addr: bytes, **kwargs):
1042        """
1043        Please start the Bonding Procedure in bondable mode.
1044        After Bonding
1045        Procedure is completed, please send a disconnect request to terminate
1046        connection.
1047        """
1048
1049        if test != "GAP/SEC/SEM/BV-08-C":
1050            # we already started in the Connect MMI
1051            self.pairing_events = self.security.OnPairing()
1052            self.security.Secure(connection=self.connection, le=LE_LEVEL3)
1053
1054        def after_that():
1055            self.host.WaitConnection()  # this really waits for bonding
1056            sleep(1)
1057            self.host.Disconnect(connection=self.connection)
1058
1059        Thread(target=after_that).start()
1060
1061        return "OK"
1062
1063    @match_description
1064    def TSC_MMI_helper_do_not_find_confirm(self, pts_addr: bytes, passkey: str, **kwargs):
1065        """
1066        Please confirm the following number matches IUT: (?P<passkey>[0-9]+).
1067        """
1068
1069        for event in self.pairing_events:
1070            if event.address == pts_addr and event.numeric_comparison == int(passkey):
1071                self.pairing_events.send(PairingEventAnswer(event=event, confirm=True))
1072                return "OK"
1073
1074        assert False
1075
1076    @assert_description
1077    def _mmi_208(self, **kwargs):
1078        """
1079        Please configure the IUT into LE Security Mode 1 Level 4 and start
1080        pairing process.
1081        """
1082
1083        def secure():
1084            self.pairing_events = self.security.OnPairing()
1085            self.security.Secure(connection=self.connection, le=LE_LEVEL3)
1086
1087        Thread(target=secure).start()
1088
1089        return "OK"
1090
    @assert_description
    def _mmi_252(self, **kwargs):
        """
        Please send L2CAP Connection Response with Security Blocked to PTS.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # TODO: not implemented - no response is sent by this handler; the
        # prompt is only acknowledged.

        return "OK"
1100
    @assert_description
    def _mmi_261(self, **kwargs):
        """
        Please bring IUT to Security Mode 4 level 2. Press OK to continue.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # TODO: not implemented - no security configuration is applied by
        # this handler; the prompt is only acknowledged.

        return "OK"
1110
    @assert_description
    def _mmi_263(self, **kwargs):
        """
        Please bring IUT to Security Mode 4 level 4. Press OK to continue.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # TODO: not implemented - no security configuration is applied by
        # this handler; the prompt is only acknowledged.

        return "OK"
1120
    @assert_description
    def _mmi_264(self, **kwargs):
        """
        Please send L2CAP Connection Request to PTS.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # TODO: not implemented - no connection request is issued by this
        # handler; the prompt is only acknowledged.

        return "OK"
1130
    @assert_description
    def _mmi_265(self, **kwargs):
        """
        Please initiate a link encryption with the Lower Tester.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # TODO: not implemented - encryption is not initiated by this
        # handler; the prompt is only acknowledged.

        return "OK"
1140
    @assert_description
    def _mmi_273(self, **kwargs):
        """
        Please trigger channel creation. Expect to perform link encryption
        before channel creation.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # Request classic LEVEL2 security on the current connection. No
        # separate channel-creation call is made in this handler.
        self.security.Secure(connection=self.connection, classic=LEVEL2)

        return "OK"
1151
    @assert_description
    def _mmi_20001(self, **kwargs):
        """
        Please prepare IUT into a connectable mode.

        Description: Verify that
        the Implementation Under Test (IUT) can accept GATT connect request from
        PTS.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).
        # No mode change is made by this handler; it only acknowledges the
        # prompt.
        return "OK"
1163
    @assert_description
    def _mmi_20115(self, **kwargs):
        """
        Please initiate a GATT disconnection to the PTS.

        Description: Verify
        that the Implementation Under Test (IUT) can initiate GATT disconnect
        request to PTS.
        """

        # NOTE(review): keep the docstring verbatim - @assert_description
        # presumably compares it with the prompt text sent by PTS (confirm).

        # TODO: not implemented - no disconnection is initiated by this
        # handler; the prompt is only acknowledged.

        return "OK"
1177
1178    def _auto_confirm_requests(self, times=None):
1179
1180        def task():
1181            cnt = 0
1182            pairing_events = self.security.OnPairing()
1183            for event in pairing_events:
1184                if event.WhichOneof('method') in {"just_works", "numeric_comparison"}:
1185                    if times is None or cnt < times:
1186                        cnt += 1
1187                        pairing_events.send(PairingEventAnswer(event=event, confirm=True))
1188
1189        Thread(target=task).start()
1190
1191
def handle_format(handle: int) -> str:
    """Return *handle* as lowercase hex, zero-padded to at least four digits.

    No ``0x`` prefix is emitted. Values wider than four hex digits are
    returned in full rather than truncated.

    The format specification is used instead of slicing ``hex()`` output so
    that a negative value yields a well-formed signed string (``-001``)
    rather than leaking the ``x`` of the ``-0x`` prefix (``00x1``).
    """
    return format(handle, "04x")
1194