1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.car.telemetry.databroker; 18 19 import android.annotation.NonNull; 20 import android.car.telemetry.TelemetryProto; 21 import android.os.PersistableBundle; 22 import android.os.SystemClock; 23 24 import java.util.List; 25 import java.util.Objects; 26 27 /** 28 * Subscriber class that receives published data and schedules tasks for execution. 29 * All methods of this class must be accessed on telemetry thread. 30 */ 31 public class DataSubscriber { 32 /** 33 * Binder transaction size limit is 1MB for all binders per process, so for large script input 34 * file pipe will be used to transfer the data to script executor instead of binder call. This 35 * is the input size threshold above which piping is used. 36 */ 37 public static final int SCRIPT_INPUT_SIZE_THRESHOLD_BYTES = 20 * 1024; // 20 kb 38 39 private final DataBroker mDataBroker; 40 private final TelemetryProto.MetricsConfig mMetricsConfig; 41 private final TelemetryProto.Subscriber mSubscriber; 42 DataSubscriber( @onNull DataBroker dataBroker, @NonNull TelemetryProto.MetricsConfig metricsConfig, @NonNull TelemetryProto.Subscriber subscriber)43 public DataSubscriber( 44 @NonNull DataBroker dataBroker, 45 @NonNull TelemetryProto.MetricsConfig metricsConfig, 46 @NonNull TelemetryProto.Subscriber subscriber) { 47 mDataBroker = dataBroker; 48 mMetricsConfig = metricsConfig; 49 mSubscriber = subscriber; 50 } 51 52 /** Returns the handler function name for this subscriber. */ 53 @NonNull getHandlerName()54 public String getHandlerName() { 55 return mSubscriber.getHandler(); 56 } 57 58 /** 59 * Returns the publisher param {@link TelemetryProto.Publisher} that 60 * contains the data source and the config. 61 */ 62 @NonNull getPublisherParam()63 public TelemetryProto.Publisher getPublisherParam() { 64 return mSubscriber.getPublisher(); 65 } 66 67 /** 68 * Returns the publisher type (as a number) indicates which type of 69 * {@link TelemetryProto.Publisher} will publish the data. 70 */ getPublisherType()71 private int getPublisherType() { 72 return getPublisherParam().getPublisherCase().getNumber(); 73 } 74 75 /** 76 * Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task 77 * will be pending execution. Flag isLargeData indicates whether data is large. 78 * 79 * @param data The published data. 80 * @param isLargeData Whether the data is large. 81 * @return The number of tasks that are pending execution that are produced by the calling 82 * publisher. 83 */ push(@onNull PersistableBundle data, boolean isLargeData)84 public int push(@NonNull PersistableBundle data, boolean isLargeData) { 85 ScriptExecutionTask task = new ScriptExecutionTask( 86 this, data, SystemClock.elapsedRealtime(), isLargeData, getPublisherType()); 87 return mDataBroker.addTaskToQueue(task); 88 } 89 90 /** 91 * Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task 92 * will be pending execution. 93 * 94 * @param bundleList The published bundle list data. 95 * @return The number of tasks that are pending execution that are produced by the calling 96 * publisher. 97 */ push(@onNull List<PersistableBundle> bundleList)98 public int push(@NonNull List<PersistableBundle> bundleList) { 99 ScriptExecutionTask task = new ScriptExecutionTask( 100 this, bundleList, SystemClock.elapsedRealtime(), getPublisherType()); 101 return mDataBroker.addTaskToQueue(task); 102 } 103 104 /** 105 * Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task 106 * will be pending execution. Defaults isLargeData flag to false. 107 * 108 * @param data The published data. 109 * @return The number of tasks that are pending execution that are produced by the calling 110 * publisher. 111 */ push(@onNull PersistableBundle data)112 public int push(@NonNull PersistableBundle data) { 113 return push(data, false); 114 } 115 116 /** Returns the {@link TelemetryProto.MetricsConfig}. */ 117 @NonNull getMetricsConfig()118 public TelemetryProto.MetricsConfig getMetricsConfig() { 119 return mMetricsConfig; 120 } 121 122 /** Returns the {@link TelemetryProto.Subscriber}. */ 123 @NonNull getSubscriber()124 public TelemetryProto.Subscriber getSubscriber() { 125 return mSubscriber; 126 } 127 128 /** Returns the priority of subscriber. */ getPriority()129 public int getPriority() { 130 return mSubscriber.getPriority(); 131 } 132 133 @Override equals(Object o)134 public boolean equals(Object o) { 135 if (!(o instanceof DataSubscriber)) { 136 return false; 137 } 138 DataSubscriber other = (DataSubscriber) o; 139 return mMetricsConfig.getName().equals(other.getMetricsConfig().getName()) 140 && mMetricsConfig.getVersion() == other.getMetricsConfig().getVersion() 141 && mSubscriber.getHandler().equals(other.getSubscriber().getHandler()); 142 } 143 144 @Override hashCode()145 public int hashCode() { 146 return Objects.hash(mMetricsConfig.getName(), mMetricsConfig.getVersion(), 147 mSubscriber.getHandler()); 148 } 149 } 150