1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15from threading import Thread 16from mmi2grpc._helpers import assert_description, match_description 17from mmi2grpc._rootcanal import Dongle 18from mmi2grpc._proxy import ProfileProxy 19from time import sleep 20import sys 21 22from pandora_experimental.gatt_grpc import GATT 23from pandora_experimental.gatt_pb2 import GattServiceParams, GattCharacteristicParams 24from pandora.host_grpc import Host 25from pandora.host_pb2 import PUBLIC, RANDOM, DISCOVERABLE_GENERAL, NOT_DISCOVERABLE, DISCOVERABLE_LIMITED, NOT_CONNECTABLE, DataTypes 26from pandora.security_grpc import Security, SecurityStorage 27from pandora.security_pb2 import LEVEL1, LEVEL2, LE_LEVEL3, PairingEventAnswer 28 29 30class GAPProxy(ProfileProxy): 31 32 def __init__(self, channel, rootcanal): 33 super().__init__(channel) 34 self.gatt = GATT(channel) 35 self.host = Host(channel) 36 self.security = Security(channel) 37 self.security_storage = SecurityStorage(channel) 38 self.rootcanal = rootcanal 39 40 self.connection = None 41 self.pairing_events = None 42 self.inquiry_responses = None 43 self.scan_responses = None 44 45 self.counter = 0 46 self.cached_passkey = None 47 48 self._auto_confirm_requests() 49 50 def test_started(self, test: str, description: str, pts_addr: bytes): 51 if test in [ 52 "GAP/CONN/CPUP/BV-06-C", 53 ]: 54 self.rootcanal.select_pts_dongle(Dongle.LAIRD_BL654) 55 else: 56 self.rootcanal.select_pts_dongle(Dongle.CSR_RCK_PTS_DONGLE) 57 58 if test in [ 59 "GAP/DM/LEP/BV-07-C", 60 "GAP/DM/LEP/BV-08-C", 61 "GAP/DM/LEP/BV-11-C", 62 "GAP/MOD/CON/BV-01-C", 63 ]: 64 self.host.SetDiscoverabilityMode(mode=DISCOVERABLE_GENERAL) 65 66 return "OK" 67 68 @match_description 69 def TSC_MMI_iut_send_hci_connect_request(self, test: str, pts_addr: bytes, **kwargs): 70 """ 71 Please send an HCI connect request to establish a basic rate 72 connection( after the IUT discovers the Lower Tester over BR and LE)?. 73 """ 74 75 if test in [ 76 "GAP/IDLE/BON/BV-03-C", 77 "GAP/IDLE/BON/BV-04-C", 78 "GAP/IDLE/BON/BV-05-C", 79 "GAP/IDLE/BON/BV-06-C", 80 "GAP/SEC/AUT/BV-02-C", 81 "GAP/SEC/SEM/BV-05-C", 82 "GAP/SEC/SEM/BV-08-C", 83 "GAP/SEC/SEM/BV-50-C", 84 "GAP/SEC/SEM/BI-27-C", 85 "GAP/SEC/SEM/BI-32-C", 86 "GAP/EST/LIE/BV-02-C", 87 ]: 88 # we connect then pair, so we have to pair directly in this MMI 89 self.pairing_events = self.security.OnPairing() 90 self.connection = self.host.Connect(address=pts_addr).connection 91 else: 92 self.connection = self.host.Connect(address=pts_addr).connection 93 94 return "OK" 95 96 @assert_description 97 def _mmi_222(self, **kwargs): 98 """ 99 Please initiate a BR/EDR security authentication and pairing with 100 interaction of HCI commands. 101 102 Press OK to continue. 103 """ 104 105 # pairing already initiated with Connect() on Android 106 self.pairing_events = self.security.OnPairing() 107 108 return "OK" 109 110 @match_description 111 def _mmi_2001(self, passkey: str, **kwargs): 112 """ 113 Please verify the passKey is correct: (?P<passkey>[0-9]+) 114 """ 115 116 for event in self.pairing_events: 117 assert event.numeric_comparison == int(passkey), (event, passkey) 118 self.pairing_events.send(PairingEventAnswer(event=event, confirm=True)) 119 return "OK" 120 121 assert False, "did not receive expected pairing event" 122 123 @assert_description 124 def TSC_MMI_iut_send_advertising_report_event_connectable_undirected(self, **kwargs): 125 """ 126 Please send a connectable undirected advertising report. 127 """ 128 129 self.advertise = self.host.Advertise( 130 legacy=True, 131 connectable=True, 132 own_address_type=PUBLIC, 133 ) 134 135 self.pairing_events = self.security.OnPairing() 136 137 return "OK" 138 139 @assert_description 140 def TSC_MMI_iut_enter_handle_for_insufficient_authentication(self, pts_addr: bytes, **kwargs): 141 """ 142 Please enter the handle(2 octet) to the characteristic in the IUT 143 database where Insufficient Authentication error will be returned : 144 """ 145 146 response = self.gatt.RegisterService(service=GattServiceParams( 147 uuid="955798ce-3022-455c-b759-ee8edcd73d1a", 148 characteristics=[ 149 GattCharacteristicParams( 150 uuid="cf99ed9b-3c43-4343-b8a7-8afa513752ce", 151 properties=0x02, # PROPERTY_READ, 152 permissions=0x04, # PERMISSION_READ_ENCRYPTED_MITM 153 ), 154 ], 155 )) 156 157 self.pairing_events = self.security.OnPairing() 158 159 return handle_format(response.service.characteristics[0].handle) 160 161 @match_description 162 def TSC_MMI_the_security_id_is(self, pts_addr: bytes, passkey: str, **kwargs): 163 """ 164 The Secure ID is (?P<passkey>[0-9]*) 165 """ 166 167 for event in self.pairing_events: 168 if event.address == pts_addr and event.passkey_entry_request: 169 self.pairing_events.send(PairingEventAnswer(event=event, passkey=int(passkey))) 170 return "OK" 171 172 assert False 173 174 @assert_description 175 def TSC_MMI_iut_send_le_connect_request(self, test: str, pts_addr: bytes, **kwargs): 176 """ 177 Please send an LE connect request to establish a connection. 178 """ 179 180 if test == "GAP/DM/BON/BV-01-C": 181 # we also begin pairing here if we are not already paired on LE 182 if self.counter == 0: 183 self.counter += 1 184 self.security_storage.DeleteBond(public=pts_addr) 185 self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection 186 self.security.Secure(connection=self.connection, le=LE_LEVEL3) 187 return "OK" 188 189 if test == "GAP/SEC/AUT/BV-21-C" and self.connection is not None: 190 # no-op since the peer just disconnected from us, 191 # so we have immediately auto-connected back to it 192 return "OK" 193 194 if test in [ 195 "GAP/CONN/DCEP/BV-03-C", 196 "GAP/CONN/GCEP/BV-02-C", 197 "GAP/DM/LEP/BV-06-C", 198 "GAP/CONN/GCEP/BV-01-C", 199 ]: 200 # PTS is not advertising with the local name, use identity address 201 address = pts_addr 202 else: 203 # the PTS sometimes decides to advertise with an RPA, so we do a scan to find its real address 204 scans = self.host.Scan() 205 for scan in scans: 206 adv_address = scan.public if scan.HasField("public") else scan.random 207 device_name = scan.data.complete_local_name 208 if "pts" in device_name.lower(): 209 address = adv_address 210 scans.cancel() 211 break 212 213 self.pairing_events = self.security.OnPairing() 214 self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=address).connection 215 216 if test in {"GAP/BOND/BON/BV-04-C"}: 217 self.security.Secure(connection=self.connection, le=LE_LEVEL3) 218 219 return "OK" 220 221 @assert_description 222 def TSC_MMI_enter_security_id(self, pts_addr: bytes, **kwargs): 223 """ 224 Please enter Secure Id. 225 """ 226 227 if self.cached_passkey is not None: 228 self.log(f"Returning cached passkey entry {self.cached_passkey}") 229 return str(self.cached_passkey) 230 231 for event in self.pairing_events: 232 if event.address == pts_addr and event.passkey_entry_notification: 233 self.log(f"Got passkey entry {event.passkey_entry_notification}") 234 self.cached_passkey = event.passkey_entry_notification 235 return str(event.passkey_entry_notification) 236 237 assert False 238 239 @match_description 240 def TSC_MMI_iut_send_att_service_request(self, pts_addr: bytes, handle: str, **kwargs): 241 r""" 242 Please send an ATT service request - read or write request with handle 243 (?P<handle>[0-9a-e]+) \(octet\).Discover services if needed. 244 """ 245 246 self.gatt.ReadCharacteristicFromHandle( 247 connection=self.connection, 248 # They want us to read the characteristic value handle using ATT, but the interface only lets us 249 # read the characteristic by its handle. So we offset by one, since in this test the characteristic 250 # value handle is one above the characteristic handle itself. 251 handle=int(handle, base=16) - 1, 252 ) 253 254 return "OK" 255 256 @assert_description 257 def TSC_MMI_iut_send_advertising_report_event_service_uuid(self, **kwargs): 258 """ 259 Please prepare IUT to send an advertising report with Service UUID. 260 """ 261 262 self.advertise = self.host.Advertise( 263 legacy=True, 264 own_address_type=PUBLIC, 265 data=DataTypes(complete_service_class_uuids128=["955798ce-3022-455c-b759-ee8edcd73d1a"],)) 266 return "OK" 267 268 @assert_description 269 def TSC_MMI_iut_send_advertising_report_event_local_name(self, **kwargs): 270 """ 271 Please prepare IUT to send an advertising report with Local Name. 272 """ 273 274 self.advertise = self.host.Advertise(own_address_type=PUBLIC, 275 legacy=True, 276 data=DataTypes( 277 include_complete_local_name=True, 278 include_shortened_local_name=True, 279 )) 280 281 return "OK" 282 283 @assert_description 284 def TSC_MMI_iut_send_advertising_report_event_flags(self, **kwargs): 285 """ 286 Please prepare IUT to send an advertising report with Flags. 287 """ 288 289 self.advertise = self.host.Advertise( 290 legacy=True, 291 connectable=True, 292 own_address_type=PUBLIC, 293 ) 294 295 self.pairing_events = self.security.OnPairing() 296 297 return "OK" 298 299 @assert_description 300 def TSC_MMI_iut_send_advertising_report_event_manufacturer_specific_data(self, **kwargs): 301 """ 302 Please prepare IUT to send an advertising report with Manufacture 303 Specific Data. 304 """ 305 306 self.advertise = self.host.Advertise(own_address_type=PUBLIC, 307 legacy=True, 308 data=DataTypes(manufacturer_specific_data=b"d0n't b3 3v1l!",)) 309 310 return "OK" 311 312 @assert_description 313 def TSC_MMI_iut_send_advertising_report_event_tx_power_level(self, **kwargs): 314 """ 315 Please prepare IUT to send an advertising report with TX Power Level. 316 """ 317 318 self.advertise = self.host.Advertise(legacy=True, 319 own_address_type=PUBLIC, 320 data=DataTypes(include_tx_power_level=True,)) 321 322 return "OK" 323 324 @assert_description 325 def TSC_MMI_iut_send_advertising_report_event_connectable(self, **kwargs): 326 """ 327 Please send a connectable advertising report. 328 """ 329 330 self.advertise = self.host.Advertise( 331 legacy=True, 332 own_address_type=PUBLIC, 333 connectable=True, 334 ) 335 336 self.pairing_events = self.security.OnPairing() 337 338 return "OK" 339 340 @assert_description 341 def TSC_MMI_iut_send_advertising_report_event_ADV_IND(self, **kwargs): 342 """ 343 Please send connectable undirected advertising report. 344 """ 345 346 self.advertise = self.host.Advertise( 347 legacy=True, 348 own_address_type=PUBLIC, 349 connectable=True, 350 ) 351 352 self.pairing_events = self.security.OnPairing() 353 354 return "OK" 355 356 @assert_description 357 def TSC_MMI_iut_confirm_idle_mode_security_4(self, **kwargs): 358 """ 359 Please confirm that IUT is in Idle mode with security mode 4. Press OK 360 when IUT is ready to start device discovery. 361 """ 362 363 return "OK" 364 365 @match_description 366 def TSC_MMI_iut_start_general_inquiry_found(self, pts_addr: bytes, **kwargs): 367 """ 368 Please start general inquiry. Click 'Yes' If IUT does discovers PTS and 369 ready for PTS to initiate (a|LE) create connection otherwise click 'No'. 370 """ 371 372 inquiry_responses = self.host.Inquiry() 373 for response in inquiry_responses: 374 assert response.address == pts_addr, (response.address, pts_addr) 375 inquiry_responses.cancel() 376 return "Yes" 377 378 assert False 379 380 @assert_description 381 def TSC_MMI_iut_send_att_read_by_type_request_name_request(self, pts_addr: bytes, **kwargs): 382 """ 383 Please start the Name Discovery Procedure to retrieve Device Name from 384 the PTS. 385 """ 386 387 # Android does RNR when connecting for the first time 388 self.connection = self.host.Connect(address=pts_addr).connection 389 390 return "OK" 391 392 @match_description 393 def TSC_MMI_iut_confirm_device_discovery(self, test: str, pts_addr: bytes, **kwargs): 394 """ 395 Please confirm that IUT has discovered PTS and retrieved its name '?(?P<name>[a-zA-Z\-0-9]*)'?\.? 396 """ 397 #Verifying if the BD Address matches in Inquiry 398 inquiry_responses = self.host.Inquiry() 399 for response in inquiry_responses: 400 assert response.address == pts_addr, (response.address, pts_addr) 401 inquiry_responses.cancel() 402 return "Yes" 403 404 assert False 405 406 @assert_description 407 def TSC_MMI_check_if_iut_support_non_connectable_advertising(self, **kwargs): 408 """ 409 Does the IUT have an ability to send non-connectable advertising report? 410 """ 411 412 return "Yes" 413 414 @assert_description 415 def TSC_MMI_iut_send_advertising_report_event_general_discoverable_ok_to_continue(self, **kwargs): 416 """ 417 Please prepare IUT into general discoverable mode and send an 418 advertising report. Press OK to continue. 419 """ 420 421 self.advertise = self.host.Advertise( 422 legacy=True, 423 data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL), 424 own_address_type=PUBLIC, 425 connectable=True, 426 ) 427 428 self.pairing_events = self.security.OnPairing() 429 430 return "OK" 431 432 @assert_description 433 def TSC_MMI_iut_send_advertising_report_event_general_discoverable_0203(self, **kwargs): 434 """ 435 Please prepare IUT into general discoverable mode and send an 436 advertising report using either non - directed advertising or 437 discoverable undirected advertising. 438 """ 439 440 self.advertise = self.host.Advertise( 441 legacy=True, 442 data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL), 443 own_address_type=PUBLIC, 444 connectable=True, 445 ) 446 447 self.pairing_events = self.security.OnPairing() 448 449 return "OK" 450 451 @assert_description 452 def TSC_MMI_iut_enter_undirected_connectable_mode_non_discoverable_mode(self, **kwargs): 453 """ 454 Please prepare IUT into non-discoverable mode and send an advertising 455 report using connectable undirected advertising. 456 """ 457 458 self.advertise = self.host.Advertise( 459 legacy=True, 460 data=DataTypes(le_discoverability_mode=NOT_DISCOVERABLE), 461 own_address_type=PUBLIC, 462 connectable=True, 463 ) 464 465 return "OK" 466 467 def TSC_MMI_iut_send_advertising_report_event_general_discoverable_00(self, **kwargs): 468 """ 469 Please prepare IUT into general discoverable mode and send an 470 advertising report using connectable undirected advertising. 471 """ 472 473 self.advertise = self.host.Advertise( 474 legacy=True, 475 data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL), 476 own_address_type=PUBLIC, 477 connectable=True, 478 ) 479 480 return "OK" 481 482 @assert_description 483 def TSC_MMI_iut_start_general_discovery(self, **kwargs): 484 """ 485 Please start General Discovery. Press OK to continue. 486 """ 487 488 self.scan_responses = self.host.Scan() 489 490 return "OK" 491 492 @assert_description 493 def TSC_MMI_iut_start_limited_discovery(self, **kwargs): 494 """ 495 Please start Limited Discovery. Press OK to continue. 496 """ 497 498 self.scan_responses = self.host.Scan() 499 500 return "OK" 501 502 @assert_description 503 def TSC_MMI_iut_confirm_general_discovered_device(self, pts_addr: bytes, **kwargs): 504 """ 505 Please confirm that PTS is discovered. 506 """ 507 508 for response in self.scan_responses: 509 assert response.HasField("public") 510 # General Discoverability shall be able to check both limited and general advertising 511 if response.public == pts_addr: 512 self.scan_responses.cancel() 513 return "OK" 514 515 assert False 516 517 @assert_description 518 def TSC_MMI_iut_confirm_limited_discovered_device(self, pts_addr: bytes, **kwargs): 519 """ 520 Please confirm that PTS is discovered. 521 """ 522 523 for response in self.scan_responses: 524 assert response.HasField("public") 525 if (response.public == pts_addr and response.data.le_discoverability_mode == DISCOVERABLE_LIMITED): 526 self.scan_responses.cancel() 527 return "OK" 528 529 assert False 530 531 @assert_description 532 def TSC_MMI_iut_confirm_general_discovered_device_not_found(self, pts_addr: bytes, **kwargs): 533 """ 534 Please confirm that PTS is NOT discovered. 535 """ 536 537 discovered = False 538 539 def search(): 540 nonlocal discovered 541 for response in self.scan_responses: 542 assert response.HasField("public") 543 if (response.public == pts_addr and response.data.le_discoverability_mode == DISCOVERABLE_GENERAL): 544 self.scan_responses.cancel() 545 discovered = True 546 return 547 548 # search for five seconds, if we don't find anything, give up 549 worker = Thread(target=search) 550 worker.start() 551 worker.join(timeout=5) 552 553 assert not discovered 554 555 return "OK" 556 557 @assert_description 558 def TSC_MMI_iut_confirm_limited_discovered_device_not_found(self, pts_addr: bytes, **kwargs): 559 """ 560 Please confirm that PTS is NOT discovered. 561 """ 562 563 discovered = False 564 565 def search(): 566 nonlocal discovered 567 for response in self.scan_responses: 568 assert response.HasField("public") 569 if (response.public == pts_addr and response.data.le_discoverability_mode == DISCOVERABLE_LIMITED): 570 self.inquiry_responses.cancel() 571 discovered = True 572 return 573 574 # search for five seconds, if we don't find anything, give up 575 worker = Thread(target=search) 576 worker.start() 577 worker.join(timeout=5) 578 579 assert not discovered 580 581 return "OK" 582 583 @assert_description 584 def TSC_MMI_iut_send_advertising_report_event_non_discoverable(self, **kwargs): 585 """ 586 Please prepare IUT into non-discoverable and non-connectable mode and 587 send an advertising report. 588 """ 589 590 self.advertise = self.host.Advertise( 591 legacy=True, 592 own_address_type=PUBLIC, 593 ) 594 595 return "OK" 596 597 @assert_description 598 def TSC_MMI_set_iut_in_bondable_mode(self, **kwargs): 599 """ 600 Please set IUT into bondable mode. Press OK to continue. 601 """ 602 603 return "OK" 604 605 @assert_description 606 def TSC_MMI_iut_send_le_disconnect_request(self, test: str, pts_addr: bytes, **kwargs): 607 """ 608 Please send a disconnect request to terminate connection. 609 """ 610 if test == "GAP/CONN/TERM/BV-01-C": 611 self.connection = next(self.advertise).connection 612 try: 613 self.host.Disconnect(connection=self.connection) 614 except Exception: 615 pass 616 617 return "OK" 618 619 @match_description 620 def TSC_MMI_iut_start_bonding_procedure_bondable(self, test: str, pts_addr: bytes, **kwargs): 621 """ 622 (Please start the Bonding Procedure in bondable mode.|Please configure the IUT into LE Security and start pairing process.) 623 """ 624 625 if not self.pairing_events: 626 self.pairing_events = self.security.OnPairing() 627 628 if not self.connection: 629 self.connection = next(self.advertise).connection 630 631 if test == "GAP/DM/BON/BV-01-C": 632 # we already started in the previous test 633 return "OK" 634 635 if test not in {"GAP/SEC/AUT/BV-21-C"}: 636 self.security_storage.DeleteBond(public=pts_addr) 637 638 if test in ["GAP/SEC/SEM/BV-53-C"]: 639 self.security.Secure(connection=self.connection, classic=LEVEL1) 640 else: 641 642 def secure(): 643 self.security.Secure(connection=self.connection, le=LE_LEVEL3) 644 645 Thread(target=secure).start() 646 647 return "OK" 648 649 @assert_description 650 def TSC_MMI_make_iut_connectable(self, **kwargs): 651 """ 652 Please make IUT connectable. Press OK to continue. 653 """ 654 655 return "OK" 656 657 @assert_description 658 def TSC_MMI_iut_start_general_discovery_DM(self, pts_addr: bytes, **kwargs): 659 """ 660 Please start general discovery over BR/EDR and over LE. If IUT discovers 661 PTS with both BR/EDR and LE method, press OK. 662 """ 663 664 discovered_bredr = False 665 666 def search_bredr(): 667 nonlocal discovered_bredr 668 inquiry_responses = self.host.Inquiry() 669 for response in inquiry_responses: 670 if response.address == pts_addr: 671 inquiry_responses.cancel() 672 discovered_bredr = True 673 return 674 675 bredr_worker = Thread(target=search_bredr) 676 bredr_worker.start() 677 678 discovered_le = False 679 680 def search_le(): 681 nonlocal discovered_le 682 scan_responses = self.host.Scan() 683 for event in scan_responses: 684 address = event.public if event.HasField("public") else event.random 685 if (address == pts_addr and event.data.le_discoverability_mode): 686 scan_responses.cancel() 687 discovered_le = True 688 return 689 690 le_worker = Thread(target=search_le) 691 le_worker.start() 692 693 # search for five seconds, if we don't find anything, give up 694 bredr_worker.join(timeout=5) 695 le_worker.join(timeout=5) 696 697 assert discovered_bredr and discovered_le, (discovered_bredr, discovered_le) 698 699 return "OK" 700 701 def TSC_MMI_make_iut_general_discoverable(self, test: str, **kwargs): 702 """ 703 Please make IUT general discoverable. Press OK to continue. 704 """ 705 706 self.host.SetDiscoverabilityMode(mode=DISCOVERABLE_GENERAL) 707 708 self.advertise = self.host.Advertise( 709 legacy=True, 710 data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL), 711 own_address_type=PUBLIC, 712 connectable=True, 713 ) 714 715 if test in [ 716 "GAP/SEC/SEM/BI-31-C", 717 ]: 718 self.pairing_events = self.security.OnPairing() 719 720 return "OK" 721 722 @assert_description 723 def TSC_MMI_iut_start_basic_rate_name_discovery_DM(self, pts_addr: bytes, **kwargs): 724 """ 725 Please start device name discovery over BR/EDR . If IUT discovers PTS, 726 press OK to continue. 727 """ 728 729 inquiry_responses = self.host.Inquiry() 730 for response in inquiry_responses: 731 if response.address == pts_addr: 732 inquiry_responses.cancel() 733 return "OK" 734 735 assert False 736 737 @assert_description 738 def TSC_MMI_make_iut_not_connectable(self, **kwargs): 739 """ 740 Please make IUT not connectable. Press OK to continue. 741 """ 742 743 self.host.SetDiscoverabilityMode(mode=NOT_DISCOVERABLE) 744 self.host.SetConnectabilityMode(mode=NOT_CONNECTABLE) 745 746 return "OK" 747 748 @assert_description 749 def TSC_MMI_make_iut_not_discoverable(self, **kwargs): 750 """ 751 Please make IUT not discoverable. Press OK to continue. 752 """ 753 754 self.host.SetDiscoverabilityMode(mode=NOT_DISCOVERABLE) 755 self.host.SetConnectabilityMode(mode=NOT_CONNECTABLE) 756 757 return "OK" 758 759 @assert_description 760 def TSC_MMI_press_ok_to_disconnect(self, test: str, pts_addr: bytes, **kwargs): 761 """ 762 Please press ok to disconnect the link. 763 """ 764 765 return "OK" 766 767 @assert_description 768 def TSC_MMI_iut_send_att_disconnect_request(self, **kwargs): 769 """ 770 Please send an ATT disconnect request to terminate an L2CAP channel. 771 """ 772 773 try: 774 self.host.Disconnect(connection=self.connection) 775 except Exception: 776 # we already disconnected, no-op 777 pass 778 779 return "OK" 780 781 @assert_description 782 def TSC_MMI_iut_start_bonding_procedure_non_bondable(self, pts_addr: bytes, **kwargs): 783 """ 784 Please start the Bonding Procedure in non-bondable mode. 785 """ 786 787 # No idea how we can bond in non-bondable mode, but this passes the tests... 788 self.security.Secure(connection=self.connection, le=LE_LEVEL3) 789 790 return "OK" 791 792 @assert_description 793 def TSC_MMI_iut_send_le_connection_update_request_timeout(self, **kwargs): 794 """ 795 Please send an L2CAP Connection Parameter Update request using valid 796 parameters and wait for TSPX_iut_connection_parameter_timeout 30000ms 797 timeout... 798 """ 799 800 return "OK" 801 802 @assert_description 803 def TSC_MMI_iut_perform_direct_connection_establishment_procedure(self, **kwargs): 804 """ 805 Please prepare IUT into the Direct Connection Establishment Procedure. 806 """ 807 808 return "OK" 809 810 @assert_description 811 def TSC_MMI_iut_perform_general_connection_establishment_procedure(self, **kwargs): 812 """ 813 Please prepare IUT into the General Connection Establishment Procedure. 814 Press ok to continue. 815 """ 816 817 return "OK" 818 819 @match_description 820 def TSC_MMI_iut_enter_non_connectable_mode(self, **kwargs): 821 """ 822 Please enter (Non-Connectable|non connectable) mode( and genrate advertising report event)?. 823 """ 824 825 self.advertise = self.host.Advertise( 826 legacy=True, 827 data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL), 828 own_address_type=PUBLIC, 829 connectable=False, 830 ) 831 832 return "OK" 833 834 @assert_description 835 def TSC_MMI_iut_enter_non_connectable_mode_general_discoverable_mode(self, **kwargs): 836 """ 837 Please enter General Discoverable and Non-Connectable mode. 838 """ 839 840 self.host.SetDiscoverabilityMode(mode=DISCOVERABLE_GENERAL) 841 self.host.SetConnectabilityMode(mode=NOT_CONNECTABLE) 842 843 return "OK" 844 845 @assert_description 846 def TSC_MMI_iut_send_advertising_report_event_0203_general_discoverable(self, **kwargs): 847 """ 848 Please send non-connectable undirected advertising report or 849 discoverable undirected advertising report with general discoverable 850 flags turned on. 851 """ 852 853 self.advertise = self.host.Advertise( 854 legacy=True, 855 data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL), 856 own_address_type=PUBLIC, 857 connectable=False, 858 ) 859 860 return "OK" 861 862 @assert_description 863 def TSC_MMI_iut_send_advertising_report_event_non_discoverable_and_undirected_connectable(self, **kwargs): 864 """ 865 Please prepare IUT into non-discoverable and connectable mode and send 866 an advertising report. 867 """ 868 869 self.advertise = self.host.Advertise( 870 legacy=True, 871 data=DataTypes(le_discoverability_mode=NOT_DISCOVERABLE), 872 own_address_type=PUBLIC, 873 connectable=True, 874 ) 875 876 return "OK" 877 878 @assert_description 879 def TSC_MMI_iut_enter_undirected_connectable_mode_general_discoverable_mode(self, **kwargs): 880 """ 881 Please prepare IUT into general discoverable mode and send an 882 advertising report using connectable undirected advertising. 883 """ 884 885 self.advertise = self.host.Advertise( 886 legacy=True, 887 data=DataTypes(le_discoverability_mode=DISCOVERABLE_GENERAL), 888 own_address_type=PUBLIC, 889 connectable=True, 890 ) 891 892 return "OK" 893 894 @assert_description 895 def TSC_MMI_wait_for_encryption_change_event(self, **kwargs): 896 """ 897 Waiting for HCI_ENCRYPTION_CHANGE_EVENT... 898 """ 899 900 return "OK" 901 902 @assert_description 903 def TSC_MMI_iut_enter_security_mode_4(self, **kwargs): 904 """ 905 Please order the IUT to go in connectable mode and in security mode 4. 906 Press OK to continue. 907 """ 908 909 self.pairing_events = self.security.OnPairing() 910 911 return "OK" 912 913 @assert_description 914 def _mmi_251(self, **kwargs): 915 """ 916 Please send L2CAP Connection Response to PTS. 917 """ 918 919 return "OK" 920 921 @assert_description 922 def _mmi_230(self, **kwargs): 923 """ 924 Please order the IUT to be in security mode 4. Press OK to make 925 connection to Lower Tester. 926 """ 927 928 return "OK" 929 930 @assert_description 931 def TSC_MMI_iut_start_simple_pairing(self, pts_addr: bytes, **kwargs): 932 """ 933 Please start simple pairing procedure. 934 """ 935 936 # we have always started this already in the connection, so no-op 937 938 return "OK" 939 940 def TSC_MMI_iut_send_l2cap_connect_request(self, pts_addr: bytes, **kwargs): 941 """ 942 Please initiate BR/EDR security authentication and pairing to establish 943 a service level enforced security! 944 After that, please create the service 945 channel using L2CAP Connection Request. 946 947 Press OK to continue. 948 """ 949 950 def after_that(): 951 sleep(5) 952 self.host.Connect(address=pts_addr) 953 954 Thread(target=after_that).start() 955 956 return "OK" 957 958 @assert_description 959 def TSC_MMI_iut_confirm_lost_bond(self, **kwargs): 960 """ 961 Please confirm that IUT has informed of a lost bond. 962 """ 963 964 return "OK" 965 966 @assert_description 967 def TSC_MMI_iut_send_att_connect_request(self, test: str, pts_addr: bytes, **kwargs): 968 """ 969 Please send an ATT connect request to establish an L2CAP channel. 970 """ 971 self.connection = self.host.ConnectLE(own_address_type=RANDOM, public=pts_addr).connection 972 973 return "OK" 974 975 @match_description 976 def TSC_MMI_iut_send_ll_connection_update_request(self, **kwargs): 977 """ 978 Please send a LL Connection Parameter Update request using valid 979 parameters. 980 (With 0x0032 value set in TSPX_conn_update_int_min 981 0x0046 982 value set in TSPX_conn_update_int_max 983 0x0001 value set in 984 TSPX_conn_update_peripheral_latency and 985 0x01F4 value set in 986 TSPX_conn_update_supervision_timeout)? 987 """ 988 989 return "OK" 990 991 @assert_description 992 def TSC_MMI_iut_send_le_connection_update_request(self, **kwargs): 993 """ 994 Please start a Connection Update procedure using valid parameters. 995 With 0x0032 value set in TSPX_conn_update_int_min 996 0x0046 value set in 997 TSPX_conn_update_int_max 998 0x0001 value set in 999 TSPX_conn_update_peripheral_latency and 1000 0x01F4 value set in 1001 TSPX_conn_update_supervision_timeout 1002 """ 1003 1004 return "OK" 1005 1006 @assert_description 1007 def TSC_MMI_iut_enter_handle_for_insufficient_authentication_or_insufficient_encryption(self, **kwargs): 1008 """ 1009 Please enter the handle to the characteristic in the IUT database where 1010 Insufficient Authentication or Insufficient Encryption error will be 1011 returned: 1012 """ 1013 1014 response = self.gatt.RegisterService(service=GattServiceParams( 1015 uuid="955798ce-3022-455c-b759-ee8edcd73d1a", 1016 characteristics=[ 1017 GattCharacteristicParams( 1018 uuid="cf99ed9b-3c43-4343-b8a7-8afa513752ce", 1019 properties=0x02, # PROPERTY_READ, 1020 permissions=0x04, # PERMISSION_READ_ENCRYPTED_MITM 1021 ), 1022 ], 1023 )) 1024 1025 self.pairing_events = self.security.OnPairing() 1026 1027 return handle_format(response.service.characteristics[0].handle) 1028 1029 @assert_description 1030 def TSC_MMI_iut_remove_bonding(self, pts_addr: bytes, **kwargs): 1031 """ 1032 Please have Upper Tester remove the bonding information of the PTS. 1033 Press OK to continue. 1034 """ 1035 1036 self.security_storage.DeleteBond(public=pts_addr) 1037 1038 return "OK" 1039 1040 @assert_description 1041 def _mmi_231(self, test: str, pts_addr: bytes, **kwargs): 1042 """ 1043 Please start the Bonding Procedure in bondable mode. 1044 After Bonding 1045 Procedure is completed, please send a disconnect request to terminate 1046 connection. 1047 """ 1048 1049 if test != "GAP/SEC/SEM/BV-08-C": 1050 # we already started in the Connect MMI 1051 self.pairing_events = self.security.OnPairing() 1052 self.security.Secure(connection=self.connection, le=LE_LEVEL3) 1053 1054 def after_that(): 1055 self.host.WaitConnection() # this really waits for bonding 1056 sleep(1) 1057 self.host.Disconnect(connection=self.connection) 1058 1059 Thread(target=after_that).start() 1060 1061 return "OK" 1062 1063 @match_description 1064 def TSC_MMI_helper_do_not_find_confirm(self, pts_addr: bytes, passkey: str, **kwargs): 1065 """ 1066 Please confirm the following number matches IUT: (?P<passkey>[0-9]+). 1067 """ 1068 1069 for event in self.pairing_events: 1070 if event.address == pts_addr and event.numeric_comparison == int(passkey): 1071 self.pairing_events.send(PairingEventAnswer(event=event, confirm=True)) 1072 return "OK" 1073 1074 assert False 1075 1076 @assert_description 1077 def _mmi_208(self, **kwargs): 1078 """ 1079 Please configure the IUT into LE Security Mode 1 Level 4 and start 1080 pairing process. 1081 """ 1082 1083 def secure(): 1084 self.pairing_events = self.security.OnPairing() 1085 self.security.Secure(connection=self.connection, le=LE_LEVEL3) 1086 1087 Thread(target=secure).start() 1088 1089 return "OK" 1090 1091 @assert_description 1092 def _mmi_252(self, **kwargs): 1093 """ 1094 Please send L2CAP Connection Response with Security Blocked to PTS. 1095 """ 1096 1097 # TODO 1098 1099 return "OK" 1100 1101 @assert_description 1102 def _mmi_261(self, **kwargs): 1103 """ 1104 Please bring IUT to Security Mode 4 level 2. Press OK to continue. 1105 """ 1106 1107 # TODO 1108 1109 return "OK" 1110 1111 @assert_description 1112 def _mmi_263(self, **kwargs): 1113 """ 1114 Please bring IUT to Security Mode 4 level 4. Press OK to continue. 1115 """ 1116 1117 # TODO 1118 1119 return "OK" 1120 1121 @assert_description 1122 def _mmi_264(self, **kwargs): 1123 """ 1124 Please send L2CAP Connection Request to PTS. 1125 """ 1126 1127 # TODO 1128 1129 return "OK" 1130 1131 @assert_description 1132 def _mmi_265(self, **kwargs): 1133 """ 1134 Please initiate a link encryption with the Lower Tester. 1135 """ 1136 1137 # TODO 1138 1139 return "OK" 1140 1141 @assert_description 1142 def _mmi_273(self, **kwargs): 1143 """ 1144 Please trigger channel creation. Expect to perform link encryption 1145 before channel creation. 1146 """ 1147 1148 self.security.Secure(connection=self.connection, classic=LEVEL2) 1149 1150 return "OK" 1151 1152 @assert_description 1153 def _mmi_20001(self, **kwargs): 1154 """ 1155 Please prepare IUT into a connectable mode. 1156 1157 Description: Verify that 1158 the Implementation Under Test (IUT) can accept GATT connect request from 1159 PTS. 1160 """ 1161 1162 return "OK" 1163 1164 @assert_description 1165 def _mmi_20115(self, **kwargs): 1166 """ 1167 Please initiate a GATT disconnection to the PTS. 1168 1169 Description: Verify 1170 that the Implementation Under Test (IUT) can initiate GATT disconnect 1171 request to PTS. 1172 """ 1173 1174 # TODO 1175 1176 return "OK" 1177 1178 def _auto_confirm_requests(self, times=None): 1179 1180 def task(): 1181 cnt = 0 1182 pairing_events = self.security.OnPairing() 1183 for event in pairing_events: 1184 if event.WhichOneof('method') in {"just_works", "numeric_comparison"}: 1185 if times is None or cnt < times: 1186 cnt += 1 1187 pairing_events.send(PairingEventAnswer(event=event, confirm=True)) 1188 1189 Thread(target=task).start() 1190 1191 1192def handle_format(handle): 1193 return hex(handle)[2:].zfill(4) 1194