#!/usr/bin/env python3
#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

"""Decodes the protobufs described in go/apollo-qa-tracing-design."""

import base64
import binascii
import struct

from acts.controllers.buds_lib.dev_utils.proto.gen import apollo_qa_pb2
from acts.controllers.buds_lib.dev_utils.proto.gen import audiowear_pb2


def to_dictionary(proto):
    """Convert the text rendering of a protobuf into a flat dictionary.

    The message is rendered with str() and every line of the form
    "key: value" becomes one dictionary entry. Both key and value are
    stripped of surrounding whitespace and kept as strings.

    Args:
        proto: Object whose str() form consists of "key: value" lines.

    Returns:
        Dictionary mapping each key to its (string) value. If a key appears
        more than once the last occurrence wins.
    """
    proto_dic = {}
    for line in str(proto).split('\n'):
        # Split on the first colon only so values that themselves contain
        # a colon (e.g. timestamps, addresses) are preserved in full.
        key, separator, value = line.partition(':')
        if not separator:
            # Blank lines and lines without a colon (such as the "name {"
            # and "}" delimiters of a nested message) carry no key/value.
            continue
        proto_dic[key.strip()] = value.strip()
    return proto_dic


def is_automation_protobuf(logline):
    """Tell whether the logline carries an encoded QA message.

    Args:
        logline: String line to inspect.

    Returns:
        True if the line starts with the 'QA_MSG|' prefix, False otherwise.
    """
    return logline.startswith('QA_MSG|')


def decode(logline):
    """Decode the logline.

    The expected layout is 'QA_MSG|<encoding>|<payload>' where <encoding>
    is 'HEX' or 'B64'. Once decoded, the payload starts with a 4 byte
    big-endian header (group: 1 byte, type: 1 byte, length: 2 bytes)
    followed by exactly <length> bytes of serialized protobuf.

    Args:
        logline: String line with the encoded message.

    Returns:
        Dictionary with the fields of the decoded message, or None when the
        line is not a QA message, uses an unknown encoding, has a malformed
        payload or header, or carries an unknown message group or type.
    """
    # b64decode is used instead of decodebytes because the latter only
    # accepts bytes-like objects, while the payload here is text.
    decoders = {'HEX': binascii.unhexlify, 'B64': base64.b64decode}
    msgs = {
        apollo_qa_pb2.TRACE:
            apollo_qa_pb2.ApolloQATrace,
        apollo_qa_pb2.GET_VER_RESPONSE:
            apollo_qa_pb2.ApolloQAGetVerResponse,
        apollo_qa_pb2.GET_CODEC_RESPONSE:
            apollo_qa_pb2.ApolloQAGetCodecResponse,
        apollo_qa_pb2.GET_DSP_STATUS_RESPONSE:
            apollo_qa_pb2.ApolloQAGetDspStatusResponse,
    }

    if not is_automation_protobuf(logline):
        return None

    fields = logline.split('|', 2)
    if len(fields) != 3:
        # Prefix present but the encoding or the payload is missing.
        return None
    _, encoding, message = fields

    decoder = decoders.get(encoding)
    if decoder is None:
        return None

    try:
        payload = decoder(message.rstrip())
    except ValueError:
        # binascii.Error (a ValueError subclass) signals malformed hex or
        # base64; a plain ValueError signals non-ASCII text.
        return None

    if len(payload) < 4:
        return None
    msg_group, msg_type, msg_len = struct.unpack('>BBH', payload[:4])
    serialized = payload[4:]
    if len(serialized) != msg_len or msg_group != audiowear_pb2.APOLLO_QA:
        return None

    proto_class = msgs.get(msg_type)
    if proto_class is None:
        return None

    proto = proto_class()
    proto.ParseFromString(serialized)
    return to_dictionary(proto)