1#!/usr/bin/python3.4
2#
3#   Copyright 2017 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import time
19
20from acts import asserts
21from acts.test_decorators import test_tracker_info
22from acts_contrib.test_utils.wifi.aware import aware_const as aconsts
23from acts_contrib.test_utils.wifi.aware import aware_test_utils as autils
24from acts_contrib.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
25from acts_contrib.test_utils.wifi.rtt import rtt_const as rconsts
26from acts_contrib.test_utils.wifi.rtt import rtt_test_utils as rutils
27from acts_contrib.test_utils.wifi.rtt.RttBaseTest import RttBaseTest
28
29
class RangeAwareTest(AwareBaseTest, RttBaseTest):
    """Test class for RTT ranging to Wi-Fi Aware peers.

    Inherits from both AwareBaseTest and RttBaseTest. The class-level constants
    below parameterize the discovery sessions and the measurement loops used by
    the tests in this class.
    """
    # Service name placed in the publish and subscribe discovery configurations.
    SERVICE_NAME = "GoogleTestServiceXY"

    # Number of RTT iterations
    NUM_ITER = 10

    # Time gap (in seconds) between iterations
    TIME_BETWEEN_ITERATIONS = 0

    # Time gap (in seconds) when switching between Initiator and Responder
    TIME_BETWEEN_ROLES = 4

    # Min and max of the test subscribe range; passed to
    # autils.add_ranging_to_sub when ranging is enabled on a subscribe config.
    # NOTE(review): units are presumably millimeters - confirm against autils.
    DISTANCE_MIN = 0
    DISTANCE_MAX = 1000000
46
47    def setup_test(self):
48        """Manual setup here due to multiple inheritance: explicitly execute the
49        setup method from both parents.
50        """
51        AwareBaseTest.setup_test(self)
52        RttBaseTest.setup_test(self)
53
54    def teardown_test(self):
55        """Manual teardown here due to multiple inheritance: explicitly execute the
56        teardown method from both parents.
57        """
58        AwareBaseTest.teardown_test(self)
59        RttBaseTest.teardown_test(self)
60
61    #############################################################################
62
63    def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
64        """Perform single RTT measurement, using Aware, from the Initiator DUT to
65        a Responder. The RTT Responder can be specified using its MAC address
66        (obtained using out- of-band discovery) or its Peer ID (using Aware
67        discovery).
68
69        Args:
70              init_dut: RTT Initiator device
71              resp_mac: MAC address of the RTT Responder device
72              resp_peer_id: Peer ID of the RTT Responder device
73        """
74        asserts.assert_true(
75            resp_mac is not None or resp_peer_id is not None,
76            "One of the Responder specifications (MAC or Peer ID)"
77            " must be provided!")
78        if resp_mac is not None:
79            id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(resp_mac)
80        else:
81            id = init_dut.droid.wifiRttStartRangingToAwarePeerId(resp_peer_id)
82        event_name = rutils.decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
83                                           id)
84        try:
85            event = init_dut.ed.pop_event(event_name, rutils.EVENT_TIMEOUT)
86            result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
87            if resp_mac is not None:
88                rutils.validate_aware_mac_result(result, resp_mac, "DUT")
89            else:
90                rutils.validate_aware_peer_id_result(result, resp_peer_id,
91                                                     "DUT")
92            return result
93        except queue.Empty:
94            self.log.warning("Timed-out waiting for %s", event_name)
95            return None
96
97    def run_rtt_ib_discovery_set(self, do_both_directions, iter_count,
98                                 time_between_iterations, time_between_roles):
99        """Perform a set of RTT measurements, using in-band (Aware) discovery.
100
101        Args:
102              do_both_directions: False - perform all measurements in one direction,
103                                  True - perform 2 measurements one in both directions.
104              iter_count: Number of measurements to perform.
105              time_between_iterations: Number of seconds to wait between iterations.
106              time_between_roles: Number of seconds to wait when switching between
107                                  Initiator and Responder roles (only matters if
108                                  do_both_directions=True).
109
110        Returns: a list of the events containing the RTT results (or None for a
111        failed measurement). If both directions are tested then returns a list of
112        2 elements: one set for each direction.
113        """
114        p_dut = self.android_devices[0]
115        s_dut = self.android_devices[1]
116
117        (p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub,
118         peer_id_on_pub) = autils.create_discovery_pair(
119             p_dut,
120             s_dut,
121             p_config=autils.add_ranging_to_pub(
122                 autils.create_discovery_config(
123                     self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
124                 True),
125             s_config=autils.add_ranging_to_sub(
126                 autils.create_discovery_config(
127                     self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
128                 self.DISTANCE_MIN, self.DISTANCE_MAX),
129             device_startup_offset=self.device_startup_offset,
130             msg_id=self.get_next_msg_id())
131
132        resultsPS = []
133        resultsSP = []
134        for i in range(iter_count):
135            if i != 0 and time_between_iterations != 0:
136                time.sleep(time_between_iterations)
137
138            # perform RTT from pub -> sub
139            resultsPS.append(
140                self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))
141
142            if do_both_directions:
143                if time_between_roles != 0:
144                    time.sleep(time_between_roles)
145
146                # perform RTT from sub -> pub
147                resultsSP.append(
148                    self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))
149
150        return resultsPS if not do_both_directions else [resultsPS, resultsSP]
151
152    def run_rtt_oob_discovery_set(self, do_both_directions, iter_count,
153                                  time_between_iterations, time_between_roles):
154        """Perform a set of RTT measurements, using out-of-band discovery.
155
156        Args:
157              do_both_directions: False - perform all measurements in one direction,
158                                  True - perform 2 measurements one in both directions.
159              iter_count: Number of measurements to perform.
160              time_between_iterations: Number of seconds to wait between iterations.
161              time_between_roles: Number of seconds to wait when switching between
162                                  Initiator and Responder roles (only matters if
163                                  do_both_directions=True).
164              enable_ranging: True to enable Ranging, False to disable.
165
166        Returns: a list of the events containing the RTT results (or None for a
167        failed measurement). If both directions are tested then returns a list of
168        2 elements: one set for each direction.
169        """
170        dut0 = self.android_devices[0]
171        dut1 = self.android_devices[1]
172
173        id0, mac0 = autils.attach_with_identity(dut0)
174        id1, mac1 = autils.attach_with_identity(dut1)
175
176        # wait for for devices to synchronize with each other - there are no other
177        # mechanisms to make sure this happens for OOB discovery (except retrying
178        # to execute the data-path request)
179        time.sleep(autils.WAIT_FOR_CLUSTER)
180
181        # start publisher(s) on the Responder(s) with ranging enabled
182        p_config = autils.add_ranging_to_pub(
183            autils.create_discovery_config(self.SERVICE_NAME,
184                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
185            enable_ranging=True)
186        dut1.droid.wifiAwarePublish(id1, p_config)
187        autils.wait_for_event(dut1, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
188        if do_both_directions:
189            dut0.droid.wifiAwarePublish(id0, p_config)
190            autils.wait_for_event(dut0, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
191
192        results01 = []
193        results10 = []
194        for i in range(iter_count):
195            if i != 0 and time_between_iterations != 0:
196                time.sleep(time_between_iterations)
197
198            # perform RTT from dut0 -> dut1
199            results01.append(self.run_rtt_discovery(dut0, resp_mac=mac1))
200
201            if do_both_directions:
202                if time_between_roles != 0:
203                    time.sleep(time_between_roles)
204
205                # perform RTT from dut1 -> dut0
206                results10.append(self.run_rtt_discovery(dut1, resp_mac=mac0))
207
208        return results01 if not do_both_directions else [results01, results10]
209
210    def verify_results(self, results, results_reverse_direction=None, accuracy_evaluation=False):
211        """Verifies the results of the RTT experiment.
212
213        Args:
214              results: List of RTT results.
215              results_reverse_direction: List of RTT results executed in the
216                                        reverse direction. Optional.
217              accuracy_evaluation: False - only evaluate success rate.
218                                   True - evaluate both success rate and accuracy
219                                   default is False.
220        """
221        stats = rutils.extract_stats(results, self.rtt_reference_distance_mm,
222                                     self.rtt_reference_distance_margin_mm,
223                                     self.rtt_min_expected_rssi_dbm)
224        stats_reverse_direction = None
225        if results_reverse_direction is not None:
226            stats_reverse_direction = rutils.extract_stats(
227                results_reverse_direction, self.rtt_reference_distance_mm,
228                self.rtt_reference_distance_margin_mm,
229                self.rtt_min_expected_rssi_dbm)
230        self.log.debug("Stats: %s", stats)
231        if stats_reverse_direction is not None:
232            self.log.debug("Stats in reverse direction: %s",
233                           stats_reverse_direction)
234
235        extras = stats if stats_reverse_direction is None else {
236            "forward": stats,
237            "reverse": stats_reverse_direction
238        }
239
240        asserts.assert_true(
241            stats['num_no_results'] == 0,
242            "Missing (timed-out) results",
243            extras=extras)
244        asserts.assert_false(
245            stats['any_lci_mismatch'], "LCI mismatch", extras=extras)
246        asserts.assert_false(
247            stats['any_lcr_mismatch'], "LCR mismatch", extras=extras)
248        asserts.assert_false(
249            stats['invalid_num_attempted'],
250            "Invalid (0) number of attempts",
251            extras=stats)
252        asserts.assert_false(
253            stats['invalid_num_successful'],
254            "Invalid (0) number of successes",
255            extras=stats)
256        asserts.assert_equal(
257            stats['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
258        asserts.assert_true(
259            stats['num_failures'] <=
260            self.rtt_max_failure_rate_two_sided_rtt_percentage *
261            stats['num_results'] / 100,
262            "Failure rate is too high",
263            extras=extras)
264        if accuracy_evaluation:
265            asserts.assert_true(
266                stats['num_range_out_of_margin'] <=
267                self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
268                stats['num_success_results'] / 100,
269                "Results exceeding error margin rate is too high",
270                extras=extras)
271
272        if stats_reverse_direction is not None:
273            asserts.assert_true(
274                stats_reverse_direction['num_no_results'] == 0,
275                "Missing (timed-out) results",
276                extras=extras)
277            asserts.assert_false(
278                stats_reverse_direction['any_lci_mismatch'], "LCI mismatch", extras=extras)
279            asserts.assert_false(
280                stats_reverse_direction['any_lcr_mismatch'], "LCR mismatch", extras=extras)
281            asserts.assert_equal(
282                stats_reverse_direction['num_invalid_rssi'], 0, "Invalid RSSI", extras=extras)
283            asserts.assert_true(
284                stats_reverse_direction['num_failures'] <=
285                self.rtt_max_failure_rate_two_sided_rtt_percentage *
286                stats_reverse_direction['num_results'] / 100,
287                "Failure rate is too high",
288                extras=extras)
289            if accuracy_evaluation:
290                asserts.assert_true(
291                    stats_reverse_direction['num_range_out_of_margin'] <=
292                    self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
293                    stats_reverse_direction['num_success_results'] / 100,
294                    "Results exceeding error margin rate is too high",
295                    extras=extras)
296
297        asserts.explicit_pass("RTT Aware test done", extras=extras)
298
299    def run_rtt_with_aware_session_disabled_ranging(self, disable_publish):
300        """Try to perform RTT operation when there publish or subscribe disabled ranging.
301
302        Args:
303            disable_publish: if true disable ranging on publish config, otherwise disable ranging on
304                            subscribe config
305        """
306        p_dut = self.android_devices[0]
307        s_dut = self.android_devices[1]
308        pub_config = autils.create_discovery_config(
309            self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED)
310        sub_config = autils.create_discovery_config(
311            self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE)
312        if disable_publish:
313            sub_config = autils.add_ranging_to_sub(sub_config, self.DISTANCE_MIN, self.DISTANCE_MAX)
314        else:
315            pub_config = autils.add_ranging_to_pub(pub_config, True)
316        (p_id, s_id, p_disc_id, s_disc_id, peer_id_on_sub,
317         peer_id_on_pub) = autils.create_discovery_pair(
318            p_dut,
319            s_dut,
320            p_config=pub_config,
321            s_config=sub_config,
322            device_startup_offset=self.device_startup_offset,
323            msg_id=self.get_next_msg_id())
324        for i in range(self.NUM_ITER):
325            result_sub = self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub)
326            asserts.assert_equal(rconsts.EVENT_CB_RANGING_STATUS_FAIL,
327                                 result_sub[rconsts.EVENT_CB_RANGING_KEY_STATUS],
328                                 "Ranging to publisher should fail.",
329                                 extras={"data": result_sub})
330
331        time.sleep(self.TIME_BETWEEN_ROLES)
332
333        for i in range(self.NUM_ITER):
334            result_pub = self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub)
335            asserts.assert_equal(rconsts.EVENT_CB_RANGING_STATUS_FAIL,
336                                 result_pub[rconsts.EVENT_CB_RANGING_KEY_STATUS],
337                                 "Ranging to subscriber should fail.",
338                                 extras={"data": result_pub})
339
340    #############################################################################
341
342    @test_tracker_info(uuid="22edba77-eeb2-43ee-875a-84437550ad84")
343    def test_rtt_oob_discovery_both_ways(self):
344        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
345        to communicate the MAC addresses to the peer. Test RTT both-ways:
346        switching rapidly between Initiator and Responder.
347        Functionality test: Only evaluate success rate.
348        """
349        rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
350            do_both_directions=True,
351            iter_count=self.NUM_ITER,
352            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
353            time_between_roles=self.TIME_BETWEEN_ROLES)
354        self.verify_results(rtt_results1, rtt_results2)
355
356    @test_tracker_info(uuid="18cef4be-95b4-4f7d-a140-5165874e7d1c")
357    def test_rtt_ib_discovery_one_way(self):
358        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
359        to communicate the MAC addresses to the peer. Test one-direction RTT only.
360        Functionality test: Only evaluate success rate.
361        """
362        rtt_results = self.run_rtt_ib_discovery_set(
363            do_both_directions=False,
364            iter_count=self.NUM_ITER,
365            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
366            time_between_roles=self.TIME_BETWEEN_ROLES)
367        self.verify_results(rtt_results)
368
369    @test_tracker_info(uuid="c67c8e70-c417-42d9-9bca-af3a89f1ddd9")
370    def test_rtt_ib_discovery_both_ways(self):
371        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
372        to communicate the MAC addresses to the peer. Test RTT both-ways:
373        switching rapidly between Initiator and Responder.
374        Functionality test: Only evaluate success rate.
375        """
376        rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
377            do_both_directions=True,
378            iter_count=self.NUM_ITER,
379            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
380            time_between_roles=self.TIME_BETWEEN_ROLES)
381        self.verify_results(rtt_results1, rtt_results2)
382
383    @test_tracker_info(uuid="82f954a5-c0ec-4bc6-8940-3b72199328ac")
384    def test_rtt_oob_discovery_both_ways_with_accuracy_evaluation(self):
385        """Perform RTT between 2 Wi-Fi Aware devices. Use out-of-band discovery
386        to communicate the MAC addresses to the peer. Test RTT both-ways:
387        switching rapidly between Initiator and Responder.
388        Performance test: evaluate success rate and accuracy.
389        """
390        rtt_results1, rtt_results2 = self.run_rtt_oob_discovery_set(
391            do_both_directions=True,
392            iter_count=self.NUM_ITER,
393            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
394            time_between_roles=self.TIME_BETWEEN_ROLES)
395        self.verify_results(rtt_results1, rtt_results2, accuracy_evaluation=True)
396
397    @test_tracker_info(uuid="4194e00c-ea49-488e-b67f-ad9360daa5fa")
398    def test_rtt_ib_discovery_one_way_with_accuracy_evaluation(self):
399        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
400        to communicate the MAC addresses to the peer. Test one-direction RTT only.
401        Performance test: evaluate success rate and accuracy.
402        """
403        rtt_results = self.run_rtt_ib_discovery_set(
404            do_both_directions=False,
405            iter_count=self.NUM_ITER,
406            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
407            time_between_roles=self.TIME_BETWEEN_ROLES)
408        self.verify_results(rtt_results, accuracy_evaluation=True)
409
410    @test_tracker_info(uuid="bea3ac8b-756d-4cd8-8936-b8bfe64676c9")
411    def test_rtt_ib_discovery_both_ways_with_accuracy_evaluation(self):
412        """Perform RTT between 2 Wi-Fi Aware devices. Use in-band (Aware) discovery
413        to communicate the MAC addresses to the peer. Test RTT both-ways:
414        switching rapidly between Initiator and Responder.
415        Performance test: evaluate success rate and accuracy.
416        """
417        rtt_results1, rtt_results2 = self.run_rtt_ib_discovery_set(
418            do_both_directions=True,
419            iter_count=self.NUM_ITER,
420            time_between_iterations=self.TIME_BETWEEN_ITERATIONS,
421            time_between_roles=self.TIME_BETWEEN_ROLES)
422        self.verify_results(rtt_results1, rtt_results2, accuracy_evaluation=True)
423
424    @test_tracker_info(uuid="54f9693d-45e5-4979-adbb-1b875d217c0c")
425    def test_rtt_without_initiator_aware(self):
426        """Try to perform RTT operation when there is no local Aware session (on the
427        Initiator). The Responder is configured normally: Aware on and a Publisher
428        with Ranging enable. Should FAIL.
429        """
430        init_dut = self.android_devices[0]
431        resp_dut = self.android_devices[1]
432
433        # Enable a Responder and start a Publisher
434        resp_id = resp_dut.droid.wifiAwareAttach(True)
435        autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
436        resp_ident_event = autils.wait_for_event(
437            resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
438        resp_mac = resp_ident_event['data']['mac']
439
440        resp_config = autils.add_ranging_to_pub(
441            autils.create_discovery_config(self.SERVICE_NAME,
442                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
443            enable_ranging=True)
444        resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
445        autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
446
447        # Initiate an RTT to Responder (no Aware started on Initiator!)
448        results = []
449        num_no_responses = 0
450        num_successes = 0
451        for i in range(self.NUM_ITER):
452            result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
453            self.log.debug("result: %s", result)
454            results.append(result)
455            if result is None:
456                num_no_responses = num_no_responses + 1
457            elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
458                  rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
459                num_successes = num_successes + 1
460
461        asserts.assert_equal(
462            num_no_responses, 0, "No RTT response?", extras={"data": results})
463        asserts.assert_equal(
464            num_successes,
465            0,
466            "Aware RTT w/o Aware should FAIL!",
467            extras={"data": results})
468        asserts.explicit_pass("RTT Aware test done", extras={"data": results})
469
470    @test_tracker_info(uuid="87a69053-8261-4928-8ec1-c93aac7f3a8d")
471    def test_rtt_without_responder_aware(self):
472        """Try to perform RTT operation when there is no peer Aware session (on the
473        Responder). Should FAIL."""
474        init_dut = self.android_devices[0]
475        resp_dut = self.android_devices[1]
476
477        # Enable a Responder and start a Publisher
478        resp_id = resp_dut.droid.wifiAwareAttach(True)
479        autils.wait_for_event(resp_dut, aconsts.EVENT_CB_ON_ATTACHED)
480        resp_ident_event = autils.wait_for_event(
481            resp_dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED)
482        resp_mac = resp_ident_event['data']['mac']
483
484        resp_config = autils.add_ranging_to_pub(
485            autils.create_discovery_config(self.SERVICE_NAME,
486                                           aconsts.PUBLISH_TYPE_UNSOLICITED),
487            enable_ranging=True)
488        resp_dut.droid.wifiAwarePublish(resp_id, resp_config)
489        autils.wait_for_event(resp_dut, aconsts.SESSION_CB_ON_PUBLISH_STARTED)
490
491        # Disable Responder
492        resp_dut.droid.wifiAwareDestroy(resp_id)
493
494        # Enable the Initiator
495        init_id = init_dut.droid.wifiAwareAttach()
496        autils.wait_for_event(init_dut, aconsts.EVENT_CB_ON_ATTACHED)
497
498        # Initiate an RTT to Responder (no Aware started on Initiator!)
499        results = []
500        num_no_responses = 0
501        num_successes = 0
502        for i in range(self.NUM_ITER):
503            result = self.run_rtt_discovery(init_dut, resp_mac=resp_mac)
504            self.log.debug("result: %s", result)
505            results.append(result)
506            if result is None:
507                num_no_responses = num_no_responses + 1
508            elif (result[rconsts.EVENT_CB_RANGING_KEY_STATUS] ==
509                  rconsts.EVENT_CB_RANGING_STATUS_SUCCESS):
510                num_successes = num_successes + 1
511
512        asserts.assert_equal(
513            num_no_responses, 0, "No RTT response?", extras={"data": results})
514        asserts.assert_equal(
515            num_successes,
516            0,
517            "Aware RTT w/o Aware should FAIL!",
518            extras={"data": results})
519        asserts.explicit_pass("RTT Aware test done", extras={"data": results})
520
521    @test_tracker_info(uuid="80b0f96e-f87d-4dc9-a2b9-fae48558c8d8")
522    def test_rtt_with_publish_ranging_disabled(self):
523        """
524        Try to perform RTT operation when publish ranging disabled, should fail.
525        """
526        self.run_rtt_with_aware_session_disabled_ranging(True)
527
528    @test_tracker_info(uuid="cb93a902-9b7a-4734-ba81-864878f9fa55")
529    def test_rtt_with_subscribe_ranging_disabled(self):
530        """
531        Try to perform RTT operation when subscribe ranging disabled, should fail.
532        """
533        self.run_rtt_with_aware_session_disabled_ranging(False)
534