1# Copyright 2022 The Android Open Source Project
2#
3# Licensed under the Apache License, Version 2.0 (the 'License');
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an 'AS IS' BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""Utility functions to enable capture read noise analysis."""
15
16import csv
17import logging
18import math
19import os
20import pickle
21import camera_properties_utils
22import capture_request_utils
23import error_util
24import image_processing_utils
25import its_session_utils
26from matplotlib import pylab
27import matplotlib.pyplot as plt
28from matplotlib.ticker import NullLocator
29from matplotlib.ticker import ScalarFormatter
30import noise_model_constants
31import noise_model_utils
32import numpy as np
33
34_LINEAR_FIT_NUM_SAMPLES = 100  # Number of samples to plot for the linear fit
35_PLOT_AXIS_TICKS = 5  # Number of ticks to display on the plot axis
36_FIG_DPI = 100  # Read noise plots dpi.
37# Valid raw format for capturing read noise images.
38_VALID_RAW_FORMATS = ('raw', 'raw10', 'rawQuadBayer', 'raw10QuadBayer')
39
40
41def save_read_noise_data_as_csv(read_noise_data, iso_low, iso_high, file,
42                                color_channels_names):
43  """Creates and saves a CSV file containing read noise data.
44
45  Args:
46    read_noise_data: A list of lists of dictionaries, where each dictionary
47      contains read noise data for a single color channel.
48    iso_low: The minimum ISO sensitivity to include in the CSV file.
49    iso_high: The maximum ISO sensitivity to include in the CSV file.
50    file: The path to the CSV file to create.
51    color_channels_names: A list of color channels to include in the CSV file.
52  """
53  with open(file, 'w+') as f:
54    writer = csv.writer(f)
55
56    results = list(
57        filter(
58            lambda x: x[0]['iso'] >= iso_low and x[0]['iso'] <= iso_high,
59            read_noise_data,
60        )
61    )
62
63    # Create headers for csv file
64    headers = ['iso', 'iso^2']
65    headers.extend([f'mean_{color}' for color in color_channels_names])
66    headers.extend([f'var_{color}' for color in color_channels_names])
67    headers.extend([f'norm_var_{color}' for color in color_channels_names])
68
69    writer.writerow(headers)
70
71    # Create data rows
72    for data_row in results:
73      row = [data_row[0]['iso']]
74      row.append(data_row[0]['iso']**2)
75      row.extend([stats['mean'] for stats in data_row])
76      row.extend([stats['var'] for stats in data_row])
77      row.extend([stats['norm_var'] for stats in data_row])
78
79      writer.writerow(row)
80
81    writer.writerow([])  # divider line
82
83    # Create row containing the offset coefficients calculated by np.polyfit
84    coeff_headers = ['', 'offset_coefficient_a', 'offset_coefficient_b']
85    writer.writerow(coeff_headers)
86
87    offset_a, offset_b = get_read_noise_coefficients(results, iso_low, iso_high)
88    for i in range(len(color_channels_names)):
89      writer.writerow([color_channels_names[i], offset_a[i], offset_b[i]])
90
91
92def plot_read_noise_data(read_noise_data, iso_low, iso_high, file_path,
93                         color_channel_names, plot_colors):
94  """Plots the read noise data for the given ISO range.
95
96  Args:
97      read_noise_data: Quad Bayer read noise data object.
98      iso_low: The minimum iso value to include.
99      iso_high: The maximum iso value to include.
100      file_path: File path for the plot image.
101      color_channel_names: The name list of each color channel.
102      plot_colors: The color list for plotting.
103  """
104  num_channels = len(color_channel_names)
105  is_quad_bayer = num_channels == noise_model_constants.NUM_QUAD_BAYER_CHANNELS
106  # Create the figure for plotting the read noise to ISO^2 curve.
107  fig, ((red, green_r), (green_b, blue)) = plt.subplots(2, 2, figsize=(22, 22))
108  subplots = [red, green_r, green_b, blue]
109  fig.gca()
110  fig.suptitle('Read Noise to ISO^2', x=0.54, y=0.99)
111
112  # Get the ISO values for the current range.
113  filtered_data = list(
114      filter(
115          lambda x: x[0]['iso'] >= iso_low and x[0]['iso'] <= iso_high,
116          read_noise_data,
117      )
118  )
119
120  # Get X-axis values (ISO^2) for current_range.
121  iso_sq = [data[0]['iso']**2 for data in filtered_data]
122
123  # Get X-axis values for the calculated linear fit for the read noise
124  iso_sq_values = np.linspace(iso_low**2, iso_high**2, _LINEAR_FIT_NUM_SAMPLES)
125
126  # Get the line fit coeff for plotting the linear fit of read noise to iso^2
127  coeff_a, coeff_b = get_read_noise_coefficients(
128      filtered_data, iso_low, iso_high
129  )
130
131  # Plot the read noise to iso^2 data
132  for pidx, color_channel in enumerate(color_channel_names):
133    norm_vars = [data[pidx]['norm_var'] for data in filtered_data]
134
135    # Plot the measured read noise to ISO^2 values
136    if is_quad_bayer:
137      subplot = subplots[pidx // 4]
138    else:
139      subplot = subplots[pidx]
140
141    subplot.plot(
142        iso_sq,
143        norm_vars,
144        color=plot_colors[pidx],
145        marker='o',
146        markeredgecolor=plot_colors[pidx],
147        linestyle='None',
148        label=color_channel,
149        alpha=0.3,
150    )
151
152    # Plot the line fit calculated from the read noise values
153    subplot.plot(
154        iso_sq_values,
155        coeff_a[pidx] * iso_sq_values + coeff_b[pidx],
156        color=plot_colors[pidx],
157        )
158
159  # Create a numpy array containing all normalized variance values for the
160  # current range, this will be used for labelling the X-axis
161  y_values = np.array(
162      [[color['norm_var'] for color in x] for x in filtered_data]
163  )
164
165  x_ticks = np.linspace(iso_low**2, iso_high**2, _PLOT_AXIS_TICKS)
166  y_ticks = np.linspace(np.min(y_values), np.max(y_values), _PLOT_AXIS_TICKS)
167
168  for i, subplot in enumerate(subplots):
169    subplot.set_title(noise_model_constants.BAYER_COLORS[i])
170    subplot.set_xlabel('ISO^2')
171    subplot.set_ylabel('Read Noise')
172
173    subplot.set_xticks(x_ticks)
174    subplot.xaxis.set_minor_locator(NullLocator())
175    subplot.xaxis.set_major_formatter(ScalarFormatter())
176
177    subplot.set_yticks(y_ticks)
178    subplot.yaxis.set_minor_locator(NullLocator())
179    subplot.yaxis.set_major_formatter(ScalarFormatter())
180
181    subplot.legend()
182    pylab.tight_layout()
183
184  fig.savefig(file_path, dpi=_FIG_DPI)
185
186
187def _generate_read_noise_stats(img, iso, white_level, cfa_order):
188  """Generates read noise data for a given image.
189
190    The read noise data of each channel is added in the order of cfa_order.
191    As a result, the read noise data channels are reordered as the following.
192    (1) For standard Bayer: R, Gr, Gb, B.
193    (2) For quad Bayer: R0, R1, R2, R3,
194                        Gr0, Gr1, Gr2, Gr3,
195                        Gb0, Gb1, Gb2, Gb3,
196                        B0, B1, B2, B3.
197
198  Args:
199    img: The input image.
200    iso: The ISO sensitivity used to capture the image.
201    white_level: The white level of the image.
202    cfa_order: The color filter arrangement (CFA) order of the image.
203
204  Returns:
205    A list of dictionaries, where each dictionary contains information for a
206    single color channel in the image.
207  """
208  result = []
209
210  num_channels = len(cfa_order)
211  channel_img = image_processing_utils.subsample(img, num_channels)
212
213  # Create a list of dictionaries of read noise stats for each color channel
214  # in the image.
215  # The stats is reordered according to the color filter arrangement order.
216  for ch in cfa_order:
217    mean = np.mean(channel_img[:, :, ch])
218    var = np.var(channel_img[:, :, ch])
219    norm_var = var / ((white_level - mean)**2)
220    result.append({
221        'iso': iso,
222        'mean': mean,
223        'var': var,
224        'norm_var': norm_var
225    })
226
227  return result
228
229
230def get_read_noise_coefficients(read_noise_data, iso_low=0, iso_high=1000000):
231  """Calculates read noise coefficients that best fit the read noise data.
232
233  Args:
234    read_noise_data: Read noise data object.
235    iso_low: The lower bound of the ISO range to consider.
236    iso_high: The upper bound of the ISO range to consider.
237
238  Returns:
239    A tuple of two numpy arrays, where the first array contains read noise
240    coefficient a and the second array contains read noise coefficient b.
241  """
242  # Filter the values by the given ISO range.
243  read_noise_data_filtered = list(
244      filter(
245          lambda x: x[0]['iso'] >= iso_low and x[0]['iso'] <= iso_high,
246          read_noise_data,
247      )
248  )
249
250  read_noise_coefficients_a = []
251  read_noise_coefficients_b = []
252
253  # Get ISO^2 values used for X-axis in polyfit
254  iso_sq = [data[0]['iso'] ** 2 for data in read_noise_data_filtered]
255
256  # Find the linear equation coefficients for each color channel
257  num_channels = len(read_noise_data_filtered[0])
258  for i in range(num_channels):
259    norm_var = [data[i]['norm_var'] for data in read_noise_data_filtered]
260    coeffs = np.polyfit(iso_sq, norm_var, 1)
261
262    read_noise_coefficients_a.append(coeffs[0])
263    read_noise_coefficients_b.append(coeffs[1])
264
265  read_noise_coefficients_a = np.asarray(read_noise_coefficients_a)
266  read_noise_coefficients_b = np.asarray(read_noise_coefficients_b)
267  return read_noise_coefficients_a, read_noise_coefficients_b
268
269
270def _capture_read_noise_for_iso_range(cam, raw_format, low_iso, high_iso,
271                                      steps_per_stop, dest_file):
272  """Captures read noise data at the lowest advertised exposure value.
273
274  This function captures a series of images at different ISO sensitivities,
275  starting at `low_iso` and ending at `high_iso`. The number of steps between
276  each ISO sensitivity is equal to `steps`. Then read noise stats data is
277  computed. Finally, stats data of color channels are reordered into the
278  canonical order before saving it to `dest_file`.
279
280  Args:
281    cam:             Camera for the current ItsSession.
282    raw_format:      The format of read noise image.
283    low_iso:         The lowest iso value in range.
284    high_iso:        The highest iso value in range.
285    steps_per_stop:  Steps to take per stop.
286    dest_file:       The path where read noise stats should be saved.
287
288  Returns:
289    Read noise stats list for each sensitivity.
290  """
291  if raw_format not in _VALID_RAW_FORMATS:
292    supported_formats_str = ', '.join(_VALID_RAW_FORMATS)
293    raise error_util.CameraItsError(
294        f'Invalid raw format {raw_format}. '
295        f'Current supported raw formats: {supported_formats_str}.'
296    )
297
298  props = cam.get_camera_properties()
299  props = cam.override_with_hidden_physical_camera_props(props)
300
301  format_check_result = False
302  if raw_format in ('raw', 'rawQuadBayer'):
303    format_check_result = camera_properties_utils.raw16(props)
304  elif raw_format in ('raw10', 'raw10QuadBayer'):
305    format_check_result = camera_properties_utils.raw10(props)
306
307  camera_properties_utils.skip_unless(
308      format_check_result and
309      camera_properties_utils.manual_sensor(props) and
310      camera_properties_utils.read_3a(props) and
311      camera_properties_utils.per_frame_control(props))
312  min_exposure_ns, _ = props['android.sensor.info.exposureTimeRange']
313  min_fd = props['android.lens.info.minimumFocusDistance']
314  white_level = props['android.sensor.info.whiteLevel']
315  is_quad_bayer = 'QuadBayer' in raw_format
316  cfa_order = image_processing_utils.get_canonical_cfa_order(
317      props, is_quad_bayer
318  )
319  pre_iso_cap = None
320  iso = low_iso
321  iso_multiplier = math.pow(2, 1.0 / steps_per_stop)
322  stats_list = []
323  # This operation can last a very long time, if it happens to fail halfway
324  # through, this section of code will allow us to pick up where we left off
325  if os.path.exists(dest_file):
326    # If there already exists a read noise stats file, retrieve them.
327    with open(dest_file, 'rb') as f:
328      stats_list = pickle.load(f)
329    # Set the starting iso to the last iso of read noise stats.
330    pre_iso_cap = stats_list[-1][0]['iso']
331    iso = noise_model_utils.get_next_iso(pre_iso_cap, high_iso, iso_multiplier)
332
333  if round(iso) <= high_iso:
334    # Wait until camera is repositioned for read noise data collection.
335    input(
336        f'\nPress <ENTER> after concealing camera {cam.get_camera_name()} '
337        'in complete darkness.\n'
338    )
339
340  fmt = {'format': raw_format}
341  logging.info('Capturing read noise images with format %s.', raw_format)
342  while round(iso) <= high_iso:
343    req = capture_request_utils.manual_capture_request(
344        round(iso), min_exposure_ns
345    )
346    req['android.lens.focusDistance'] = min_fd
347    cap = cam.do_capture(req, fmt)
348    iso_cap = cap['metadata']['android.sensor.sensitivity']
349
350    # Different iso values may result in captures with the same iso_cap value,
351    # so skip this capture if it's redundant.
352    if iso_cap == pre_iso_cap:
353      logging.info(
354          'Skip current capture because of the same iso %d with the previous'
355          ' capture.',
356          iso_cap,
357      )
358      iso = noise_model_utils.get_next_iso(iso, high_iso, iso_multiplier)
359      continue
360
361    pre_iso_cap = iso_cap
362    w = cap['width']
363    h = cap['height']
364
365    if raw_format in ('raw10', 'raw10QuadBayer'):
366      img = image_processing_utils.unpack_raw10_image(
367          cap['data'].reshape(h, w * 5 // 4)
368      )
369    elif raw_format in ('raw', 'rawQuadBayer'):
370      img = np.ndarray(
371          shape=(h * w,), dtype='<u2', buffer=cap['data'][0: w * h * 2]
372      )
373      img = img.astype(dtype=np.uint16).reshape(h, w)
374
375    # Add reordered read noise stats to read noise stats list.
376    stats = _generate_read_noise_stats(img, iso_cap, white_level, cfa_order)
377    stats_list.append(stats)
378
379    logging.info('iso: %.2f, mean: %.2f, var: %.2f, min: %d, max: %d', iso_cap,
380                 np.mean(img), np.var(img), np.min(img), np.max(img))
381
382    with open(dest_file, 'wb+') as f:
383      pickle.dump(stats_list, f)
384
385    iso = noise_model_utils.get_next_iso(iso, high_iso, iso_multiplier)
386
387  logging.info('Read noise stats pickled into file %s.', dest_file)
388
389  return stats_list
390
391
392def calibrate_read_noise(
393    device_id: str,
394    camera_id: str,
395    hidden_physical_id: str,
396    read_noise_folder_prefix: str,
397    read_noise_file_name: str,
398    steps_per_stop: int,
399    raw_format: str = 'raw',
400    is_two_stage_model: bool = False,
401) -> str:
402  """Calibrates the read noise of the camera.
403
404  Read noise is a type of noise that occurs in digital cameras when the image
405  sensor converts light to an electronic signal. Calibrating read noise is the
406  first step in the 2-stage noise model calibration.
407
408  Args:
409    device_id: The device ID of the camera.
410    camera_id: The camera ID of the camera.
411    hidden_physical_id: The hidden physical ID of the camera.
412    read_noise_folder_prefix: The prefix of the read noise folder.
413    read_noise_file_name: The name of the read noise file.
414    steps_per_stop: The number of steps per stop.
415    raw_format: The format of raw capture, which can be one of raw, raw10,
416      rawQuadBayer and raw10QuadBayer.
417    is_two_stage_model: A boolean flag indicating if the noise model is
418      calibrated in the two-stage mode.
419
420  Returns:
421    The path to the read noise file.
422  """
423  if not is_two_stage_model:
424    return ''
425  # If two-stage model is enabled, check/collect read noise data.
426  with its_session_utils.ItsSession(
427      device_id=device_id,
428      camera_id=camera_id,
429      hidden_physical_id=hidden_physical_id,
430  ) as cam:
431    props = cam.get_camera_properties()
432    props = cam.override_with_hidden_physical_camera_props(props)
433
434    # Get sensor analog ISO range.
435    sens_min, _ = props['android.sensor.info.sensitivityRange']
436    sens_max_analog = props['android.sensor.maxAnalogSensitivity']
437    # Maximum sensitivity for measuring noise model.
438    sens_max_meas = sens_max_analog
439
440    # Prepare read noise folder.
441    camera_name = cam.get_camera_name()
442    read_noise_folder = os.path.join(
443        read_noise_folder_prefix, device_id.replace(':', '_'), camera_name
444    )
445    read_noise_file_path = os.path.join(read_noise_folder, read_noise_file_name)
446    if not os.path.exists(read_noise_folder):
447      os.makedirs(read_noise_folder)
448    logging.info('Read noise data folder: %s', read_noise_folder)
449
450    # Collect or retrieve read noise data.
451    if not os.path.isfile(read_noise_file_path):
452      logging.info('Collecting read noise data for %s', camera_name)
453      # Read noise data file does not exist, collect read noise data.
454      _capture_read_noise_for_iso_range(
455          cam,
456          raw_format,
457          sens_min,
458          sens_max_meas,
459          steps_per_stop,
460          read_noise_file_path,
461      )
462    else:
463      # If data exists, check if it covers the full range.
464      with open(read_noise_file_path, 'rb') as f:
465        read_noise_data = pickle.load(f)
466        # The +5 offset takes write to read error into account.
467        if read_noise_data[-1][0]['iso'] + 5 < sens_max_meas:
468          logging.error(
469              (
470                  '\nNot enough ISO data points exist. '
471                  '\nMax ISO measured: %.2f'
472                  '\nMax ISO possible: %.2f'
473              ),
474              read_noise_data[-1][0]['iso'],
475              sens_max_meas,
476          )
477          # Not all data points were captured, continue capture.
478          _capture_read_noise_for_iso_range(
479              cam,
480              raw_format,
481              sens_min,
482              sens_max_meas,
483              steps_per_stop,
484              read_noise_file_path,
485          )
486
487    return read_noise_file_path
488