1#!/usr/bin/env python3
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17"""Decodes the protobufs described in go/apollo-qa-tracing-design."""
18
19import base64
20import binascii
21import struct
22
23from acts.controllers.buds_lib.dev_utils.proto.gen import apollo_qa_pb2
24from acts.controllers.buds_lib.dev_utils.proto.gen import audiowear_pb2
25
26
27def to_dictionary(proto):
28    proto_dic = {}
29    msg = [element.split(':') for element in str(proto).split('\n') if element]
30    for element in msg:
31        key = element[0].strip()
32        value = element[1].strip()
33        proto_dic[key] = value
34    return proto_dic
35
36
37def is_automation_protobuf(logline):
38    return logline.startswith('QA_MSG|')
39
40
41def decode(logline):
42    """Decode the logline.
43
44    Args:
45      logline: String line with the encoded message.
46
47    Returns:
48      String value with the decoded message.
49    """
50    decoded = None
51    decoders = {'HEX': binascii.unhexlify, 'B64': base64.decodebytes}
52    msgs = {
53        apollo_qa_pb2.TRACE:
54            apollo_qa_pb2.ApolloQATrace,
55        apollo_qa_pb2.GET_VER_RESPONSE:
56            apollo_qa_pb2.ApolloQAGetVerResponse,
57        apollo_qa_pb2.GET_CODEC_RESPONSE:
58            apollo_qa_pb2.ApolloQAGetCodecResponse,
59        apollo_qa_pb2.GET_DSP_STATUS_RESPONSE:
60            apollo_qa_pb2.ApolloQAGetDspStatusResponse,
61    }
62
63    if is_automation_protobuf(logline):
64        _, encoding, message = logline.split("|", 2)
65        message = message.rstrip()
66        if encoding in decoders.keys():
67            message = decoders[encoding](message)
68            header = message[0:4]
69            serialized = message[4:]
70            if len(header) == 4 and len(serialized) == len(message) - 4:
71                msg_group, msg_type, msg_len = struct.unpack('>BBH', header)
72                if (len(serialized) == msg_len and
73                        msg_group == audiowear_pb2.APOLLO_QA):
74                    proto = msgs[msg_type]()
75                    proto.ParseFromString(serialized)
76                    decoded = to_dictionary(proto)
77    return decoded
78