1# Copyright 2023 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Tests for sensor_fusion_utils."""
15
16import math
17import unittest
18
19import numpy as np
20from scipy.optimize import fmin
21
22import sensor_fusion_utils
23
24
25class SensorFusionUtilsTests(unittest.TestCase):
26  """Run a suite of unit tests on sensor_fusion_utils."""
27
28  _CAM_FRAME_TIME = 30 * sensor_fusion_utils._MSEC_TO_NSEC  # Similar to 30FPS
29  _CAM_ROT_AMPLITUDE = 0.04  # Empirical number for rotation per frame (rads/s).
30
31  def _generate_pwl_waveform(self, pts, step, amplitude):
32    """Helper function to generate piece wise linear waveform."""
33    pwl_waveform = []
34    for t in range(pts[0], pts[1], step):
35      pwl_waveform.append(0)
36    for t in range(pts[1], pts[2], step):
37      pwl_waveform.append((t-pts[1])/(pts[2]-pts[1])*amplitude)
38    for t in range(pts[2], pts[3], step):
39      pwl_waveform.append(amplitude)
40    for t in range(pts[3], pts[4], step):
41      pwl_waveform.append((pts[4]-t)/(pts[4]-pts[3])*amplitude)
42    for t in range(pts[4], pts[5], step):
43      pwl_waveform.append(0)
44    for t in range(pts[5], pts[6], step):
45      pwl_waveform.append((-1*(t-pts[5])/(pts[6]-pts[5]))*amplitude)
46    for t in range(pts[6], pts[7], step):
47      pwl_waveform.append(-1*amplitude)
48    for t in range(pts[7], pts[8], step):
49      pwl_waveform.append((t-pts[8])/(pts[8]-pts[7])*amplitude)
50    for t in range(pts[8], pts[9], step):
51      pwl_waveform.append(0)
52    return pwl_waveform
53
54  def _generate_test_waveforms(self, gyro_sampling_rate, t_offset=0):
55    """Define ideal camera/gryo behavior.
56
57    Args:
58      gyro_sampling_rate: Value in samples/sec.
59      t_offset: Value in ns for gyro/camera timing offset.
60
61    Returns:
62      cam_times: numpy array of camera times N values long.
63      cam_rots: numpy array of camera rotations N-1 values long.
64      gyro_events: list of dicts of gyro events N*gyro_sampling_rate/30 long.
65
66    Round trip for motor is ~2 seconds (~60 frames)
67            1111111111111111
68           i                i
69          i                  i
70         i                    i
71     0000                      0000                      0000
72                                   i                    i
73                                    i                  i
74                                     i                i
75                                      -1-1-1-1-1-1-1-1
76    t_0 t_1 t_2           t_3 t_4 t_5 t_6           t_7 t_8 t_9
77
78    Note gyro waveform must extend +/- _CORR_TIME_OFFSET_MAX to enable shifting
79    of camera waveform to find best correlation.
80
81    """
82
83    t_ramp = 4 * self._CAM_FRAME_TIME
84    pts = {}
85    pts[0] = 3 * self._CAM_FRAME_TIME
86    pts[1] = pts[0] + 3 * self._CAM_FRAME_TIME
87    pts[2] = pts[1] + t_ramp
88    pts[3] = pts[2] + 32 * self._CAM_FRAME_TIME
89    pts[4] = pts[3] + t_ramp
90    pts[5] = pts[4] + 4 * self._CAM_FRAME_TIME
91    pts[6] = pts[5] + t_ramp
92    pts[7] = pts[6] + 32 * self._CAM_FRAME_TIME
93    pts[8] = pts[7] + t_ramp
94    pts[9] = pts[8] + 4 * self._CAM_FRAME_TIME
95    cam_times = np.array(range(pts[0], pts[9], self._CAM_FRAME_TIME))
96    cam_rots = self._generate_pwl_waveform(
97        pts, self._CAM_FRAME_TIME, self._CAM_ROT_AMPLITUDE)
98    cam_rots.pop()  # rots is N-1 for N length times.
99    cam_rots = np.array(cam_rots)
100
101    # Generate gyro waveform.
102    gyro_step = int(round(
103        sensor_fusion_utils._SEC_TO_NSEC/gyro_sampling_rate, 0))
104    gyro_pts = {k: v+t_offset+self._CAM_FRAME_TIME//2 for k, v in pts.items()}
105    gyro_pts[0] = 0  # adjust end pts to bound camera
106    gyro_pts[9] += self._CAM_FRAME_TIME*2  # adjust end pt to bound camera
107    gyro_rot_amplitude = (self._CAM_ROT_AMPLITUDE / self._CAM_FRAME_TIME
108                          * sensor_fusion_utils._SEC_TO_NSEC)
109    gyro_rots = self._generate_pwl_waveform(
110        gyro_pts, gyro_step, gyro_rot_amplitude)
111
112    # Create gyro events list of dicts.
113    gyro_events = []
114    for i, t in enumerate(range(gyro_pts[0], gyro_pts[9], gyro_step)):
115      gyro_events.append({'time': t, 'z': gyro_rots[i]})
116
117    return cam_times, cam_rots, gyro_events
118
119  def test_get_gyro_rotations(self):
120    """Tests that gyro rotations are masked properly by camera rotations.
121
122    Note that waveform ideal waveform generation only works properly with
123    integer multiples of frame rate.
124    """
125    # Run with different sampling rates to validate.
126    for gyro_sampling_rate in [200, 1000]:  # 6x, 30x frame rate
127      cam_times, cam_rots, gyro_events = self._generate_test_waveforms(
128          gyro_sampling_rate)
129      gyro_rots = sensor_fusion_utils.get_gyro_rotations(
130          gyro_events, cam_times)
131      e_msg = f'gyro sampling rate = {gyro_sampling_rate}\n'
132      e_msg += f'cam_times = {list(cam_times)}\n'
133      e_msg += f'cam_rots = {list(cam_rots)}\n'
134      e_msg += f'gyro_rots = {list(gyro_rots)}'
135
136      self.assertTrue(np.allclose(
137          gyro_rots, cam_rots, atol=self._CAM_ROT_AMPLITUDE*0.10), e_msg)
138
139  def test_get_best_alignment_offset(self):
140    """Unit test for alignment offset check."""
141
142    gyro_sampling_rate = 5000
143    for t_offset_ms in [0, 1]:  # Run with different offsets to validate.
144      t_offset = int(t_offset_ms * sensor_fusion_utils._MSEC_TO_NSEC)
145      cam_times, cam_rots, gyro_events = self._generate_test_waveforms(
146          gyro_sampling_rate, t_offset)
147
148      (
149          best_fit_offset, coeffs, x, y
150      ) = sensor_fusion_utils.get_best_alignment_offset(
151          cam_times, cam_rots, gyro_events)
152      e_msg = f'best: {best_fit_offset} ms\n'
153      e_msg += f'coeffs: {coeffs}\n'
154      e_msg += f'x: {x}\n'
155      e_msg += f'y: {y}'
156      self.assertTrue(
157          math.isclose(t_offset_ms, best_fit_offset, abs_tol=0.1), e_msg)
158
159  def test_polynomial_from_coefficients(self):
160    """Unit test to check polynomial function generated from coefficients."""
161    # -2x^4 + 3x^3 + 4x^2 + 5x - 6
162    function_1 = sensor_fusion_utils.polynomial_from_coefficients(
163        [-2, 3, 4, 5, -6])
164    # 0.3x^2 - 0.6x + 0.9
165    function_2 = sensor_fusion_utils.polynomial_from_coefficients(
166        [0.3, -0.6, 0.9])
167    # -7x + 8
168    function_3 = sensor_fusion_utils.polynomial_from_coefficients([-7, -8])
169    for x in np.arange(-1.1, 1.1, 0.05):
170      self.assertEqual(function_1(x),
171                       -2 * x ** 4 + 3 * x ** 3 + 4 * x ** 2 + 5 * x + -6)
172      self.assertEqual(function_2(x), 0.3 * x ** 2 + -0.6 * x + 0.9)
173      self.assertEqual(function_3(x), -7 * x + -8)
174
175  def test_smallest_absolute_minimum_of_polynomial(self):
176    """Unit test for the derivative method to find minima."""
177    # List of polynomials where an x near zero locally minimizes the function
178    polynomials = ((-1e-9, 2e-9, 3e-10, 4e-11),
179                   (2e-8, -3e-4, 4e-6),
180                   (5, 6, -7, 8),
181                   (1e-4, 2e-8, 3e-12),
182                   # ideal data from historical results
183                   (1.44638577e-05, -6.75789581e-06, 1.02377194e-05))
184    for coefficients in polynomials:
185      derivative_minimum = (
186          sensor_fusion_utils.smallest_absolute_minimum_of_polynomial(
187              coefficients))
188      # x0=0 because the desired result is the smallest absolute minimum
189      scipy_minimum = fmin(
190          sensor_fusion_utils.polynomial_from_coefficients(coefficients),
191          x0=0, xtol=1e-10)[0]
192      self.assertAlmostEqual(
193          derivative_minimum,
194          scipy_minimum,
195          msg='Minimum value for polynomial function described by'
196              f' {coefficients} was expected to be'
197              f' {derivative_minimum}, received'
198              f' {scipy_minimum} via scipy.optimize.fmin.')
199
200
201if __name__ == '__main__':
202  unittest.main()
203