1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.car.telemetry.databroker;
18 
19 import android.annotation.NonNull;
20 import android.car.telemetry.TelemetryProto;
21 import android.os.PersistableBundle;
22 import android.os.SystemClock;
23 
24 import java.util.List;
25 import java.util.Objects;
26 
27 /**
28  * Subscriber class that receives published data and schedules tasks for execution.
29  * All methods of this class must be accessed on telemetry thread.
30  */
31 public class DataSubscriber {
32     /**
33      * Binder transaction size limit is 1MB for all binders per process, so for large script input
34      * file pipe will be used to transfer the data to script executor instead of binder call. This
35      * is the input size threshold above which piping is used.
36      */
37     public static final int SCRIPT_INPUT_SIZE_THRESHOLD_BYTES = 20 * 1024; // 20 kb
38 
39     private final DataBroker mDataBroker;
40     private final TelemetryProto.MetricsConfig mMetricsConfig;
41     private final TelemetryProto.Subscriber mSubscriber;
42 
DataSubscriber( @onNull DataBroker dataBroker, @NonNull TelemetryProto.MetricsConfig metricsConfig, @NonNull TelemetryProto.Subscriber subscriber)43     public DataSubscriber(
44             @NonNull DataBroker dataBroker,
45             @NonNull TelemetryProto.MetricsConfig metricsConfig,
46             @NonNull TelemetryProto.Subscriber subscriber) {
47         mDataBroker = dataBroker;
48         mMetricsConfig = metricsConfig;
49         mSubscriber = subscriber;
50     }
51 
52     /** Returns the handler function name for this subscriber. */
53     @NonNull
getHandlerName()54     public String getHandlerName() {
55         return mSubscriber.getHandler();
56     }
57 
58     /**
59      * Returns the publisher param {@link TelemetryProto.Publisher} that
60      * contains the data source and the config.
61      */
62     @NonNull
getPublisherParam()63     public TelemetryProto.Publisher getPublisherParam() {
64         return mSubscriber.getPublisher();
65     }
66 
67     /**
68      * Returns the publisher type (as a number) indicates which type of
69      * {@link TelemetryProto.Publisher} will publish the data.
70      */
getPublisherType()71     private int getPublisherType() {
72         return getPublisherParam().getPublisherCase().getNumber();
73     }
74 
75     /**
76      * Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task
77      * will be pending execution. Flag isLargeData indicates whether data is large.
78      *
79      * @param data The published data.
80      * @param isLargeData Whether the data is large.
81      * @return The number of tasks that are pending execution that are produced by the calling
82      * publisher.
83      */
push(@onNull PersistableBundle data, boolean isLargeData)84     public int push(@NonNull PersistableBundle data, boolean isLargeData) {
85         ScriptExecutionTask task = new ScriptExecutionTask(
86                 this, data, SystemClock.elapsedRealtime(), isLargeData, getPublisherType());
87         return mDataBroker.addTaskToQueue(task);
88     }
89 
90     /**
91      * Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task
92      * will be pending execution.
93      *
94      * @param bundleList The published bundle list data.
95      * @return The number of tasks that are pending execution that are produced by the calling
96      * publisher.
97      */
push(@onNull List<PersistableBundle> bundleList)98     public int push(@NonNull List<PersistableBundle> bundleList) {
99         ScriptExecutionTask task = new ScriptExecutionTask(
100                 this, bundleList, SystemClock.elapsedRealtime(), getPublisherType());
101         return mDataBroker.addTaskToQueue(task);
102     }
103 
104     /**
105      * Creates a {@link ScriptExecutionTask} and pushes it to the priority queue where the task
106      * will be pending execution. Defaults isLargeData flag to false.
107      *
108      * @param data The published data.
109      * @return The number of tasks that are pending execution that are produced by the calling
110      * publisher.
111      */
push(@onNull PersistableBundle data)112     public int push(@NonNull PersistableBundle data) {
113         return push(data, false);
114     }
115 
116     /** Returns the {@link TelemetryProto.MetricsConfig}. */
117     @NonNull
getMetricsConfig()118     public TelemetryProto.MetricsConfig getMetricsConfig() {
119         return mMetricsConfig;
120     }
121 
122     /** Returns the {@link TelemetryProto.Subscriber}. */
123     @NonNull
getSubscriber()124     public TelemetryProto.Subscriber getSubscriber() {
125         return mSubscriber;
126     }
127 
128     /** Returns the priority of subscriber. */
getPriority()129     public int getPriority() {
130         return mSubscriber.getPriority();
131     }
132 
133     @Override
equals(Object o)134     public boolean equals(Object o) {
135         if (!(o instanceof DataSubscriber)) {
136             return false;
137         }
138         DataSubscriber other = (DataSubscriber) o;
139         return mMetricsConfig.getName().equals(other.getMetricsConfig().getName())
140                 && mMetricsConfig.getVersion() == other.getMetricsConfig().getVersion()
141                 && mSubscriber.getHandler().equals(other.getSubscriber().getHandler());
142     }
143 
144     @Override
hashCode()145     public int hashCode() {
146         return Objects.hash(mMetricsConfig.getName(), mMetricsConfig.getVersion(),
147                 mSubscriber.getHandler());
148     }
149 }
150