1 /*
2  * Copyright 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "android.hardware.tv.tuner-service.example-Dvr"
19 
20 #include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
21 #include <aidl/android/hardware/tv/tuner/Result.h>
22 
23 #include <utils/Log.h>
24 #include "Dvr.h"
25 
26 namespace aidl {
27 namespace android {
28 namespace hardware {
29 namespace tv {
30 namespace tuner {
31 
// Timeout value handed to EventFlag::wait() by the playback thread loop.
// NOTE(review): presumably expressed in nanoseconds (i.e. 3 seconds) - confirm against the
// EventFlag::wait() signature.
#define WAIT_TIMEOUT 3000000000
33 
Dvr(DvrType type,uint32_t bufferSize,const std::shared_ptr<IDvrCallback> & cb,std::shared_ptr<Demux> demux)34 Dvr::Dvr(DvrType type, uint32_t bufferSize, const std::shared_ptr<IDvrCallback>& cb,
35          std::shared_ptr<Demux> demux) {
36     mType = type;
37     mBufferSize = bufferSize;
38     mCallback = cb;
39     mDemux = demux;
40 }
41 
~Dvr()42 Dvr::~Dvr() {
43     // make sure thread has joined
44     close();
45 }
46 
getQueueDesc(MQDescriptor<int8_t,SynchronizedReadWrite> * out_queue)47 ::ndk::ScopedAStatus Dvr::getQueueDesc(MQDescriptor<int8_t, SynchronizedReadWrite>* out_queue) {
48     ALOGV("%s", __FUNCTION__);
49 
50     *out_queue = mDvrMQ->dupeDesc();
51 
52     return ::ndk::ScopedAStatus::ok();
53 }
54 
configure(const DvrSettings & in_settings)55 ::ndk::ScopedAStatus Dvr::configure(const DvrSettings& in_settings) {
56     ALOGV("%s", __FUNCTION__);
57 
58     mDvrSettings = in_settings;
59     mDvrConfigured = true;
60 
61     return ::ndk::ScopedAStatus::ok();
62 }
63 
attachFilter(const std::shared_ptr<IFilter> & in_filter)64 ::ndk::ScopedAStatus Dvr::attachFilter(const std::shared_ptr<IFilter>& in_filter) {
65     ALOGV("%s", __FUNCTION__);
66 
67     int64_t filterId;
68     ::ndk::ScopedAStatus status = in_filter->getId64Bit(&filterId);
69     if (!status.isOk()) {
70         return status;
71     }
72 
73     if (!mDemux->attachRecordFilter(filterId)) {
74         return ::ndk::ScopedAStatus::fromServiceSpecificError(
75                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
76     }
77 
78     return ::ndk::ScopedAStatus::ok();
79 }
80 
detachFilter(const std::shared_ptr<IFilter> & in_filter)81 ::ndk::ScopedAStatus Dvr::detachFilter(const std::shared_ptr<IFilter>& in_filter) {
82     ALOGV("%s", __FUNCTION__);
83 
84     int64_t filterId;
85     ::ndk::ScopedAStatus status = in_filter->getId64Bit(&filterId);
86     if (!status.isOk()) {
87         return status;
88     }
89 
90     if (!mDemux->detachRecordFilter(filterId)) {
91         return ::ndk::ScopedAStatus::fromServiceSpecificError(
92                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
93     }
94 
95     return ::ndk::ScopedAStatus::ok();
96 }
97 
start()98 ::ndk::ScopedAStatus Dvr::start() {
99     ALOGV("%s", __FUNCTION__);
100     if (mDvrThreadRunning) {
101         return ::ndk::ScopedAStatus::ok();
102     }
103 
104     if (!mCallback) {
105         return ::ndk::ScopedAStatus::fromServiceSpecificError(
106                 static_cast<int32_t>(Result::NOT_INITIALIZED));
107     }
108 
109     if (!mDvrConfigured) {
110         return ::ndk::ScopedAStatus::fromServiceSpecificError(
111                 static_cast<int32_t>(Result::INVALID_STATE));
112     }
113 
114     if (mType == DvrType::PLAYBACK) {
115         mDvrThreadRunning = true;
116         mDvrThread = std::thread(&Dvr::playbackThreadLoop, this);
117     } else if (mType == DvrType::RECORD) {
118         mRecordStatus = RecordStatus::DATA_READY;
119         mDemux->setIsRecording(mType == DvrType::RECORD);
120     }
121 
122     // TODO start another thread to send filter status callback to the framework
123 
124     return ::ndk::ScopedAStatus::ok();
125 }
126 
stop()127 ::ndk::ScopedAStatus Dvr::stop() {
128     ALOGV("%s", __FUNCTION__);
129 
130     mDvrThreadRunning = false;
131     if (mDvrThread.joinable()) {
132         mDvrThread.join();
133     }
134     // thread should always be joinable if it is running,
135     // so it should be safe to assume recording stopped.
136     mDemux->setIsRecording(false);
137 
138     return ::ndk::ScopedAStatus::ok();
139 }
140 
flush()141 ::ndk::ScopedAStatus Dvr::flush() {
142     ALOGV("%s", __FUNCTION__);
143 
144     mRecordStatus = RecordStatus::DATA_READY;
145 
146     return ::ndk::ScopedAStatus::ok();
147 }
148 
close()149 ::ndk::ScopedAStatus Dvr::close() {
150     ALOGV("%s", __FUNCTION__);
151 
152     stop();
153 
154     return ::ndk::ScopedAStatus::ok();
155 }
156 
setStatusCheckIntervalHint(int64_t)157 ::ndk::ScopedAStatus Dvr::setStatusCheckIntervalHint(int64_t /* in_milliseconds */) {
158     ALOGV("%s", __FUNCTION__);
159 
160     // There is no active polling in this default implementation,
161     // so directly return ok here.
162     return ::ndk::ScopedAStatus::ok();
163 }
164 
createDvrMQ()165 bool Dvr::createDvrMQ() {
166     ALOGV("%s", __FUNCTION__);
167 
168     // Create a synchronized FMQ that supports blocking read/write
169     unique_ptr<DvrMQ> tmpDvrMQ = unique_ptr<DvrMQ>(new (nothrow) DvrMQ(mBufferSize, true));
170     if (!tmpDvrMQ->isValid()) {
171         ALOGW("[Dvr] Failed to create FMQ of DVR");
172         return false;
173     }
174 
175     mDvrMQ = std::move(tmpDvrMQ);
176 
177     if (EventFlag::createEventFlag(mDvrMQ->getEventFlagWord(), &mDvrEventFlag) != ::android::OK) {
178         return false;
179     }
180 
181     return true;
182 }
183 
getDvrEventFlag()184 EventFlag* Dvr::getDvrEventFlag() {
185     return mDvrEventFlag;
186 }
187 
dump(int fd,const char **,uint32_t)188 binder_status_t Dvr::dump(int fd, const char** /* args */, uint32_t /* numArgs */) {
189     dprintf(fd, "    Dvr:\n");
190     dprintf(fd, "      mType: %hhd\n", mType);
191     dprintf(fd, "      mDvrThreadRunning: %d\n", (bool)mDvrThreadRunning);
192     return STATUS_OK;
193 }
194 
playbackThreadLoop()195 void Dvr::playbackThreadLoop() {
196     ALOGD("[Dvr] playback threadLoop start.");
197 
198     while (mDvrThreadRunning) {
199         uint32_t efState = 0;
200         ::android::status_t status =
201                 mDvrEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
202                                     &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
203         if (status != ::android::OK) {
204             ALOGD("[Dvr] wait for data ready on the playback FMQ");
205             continue;
206         }
207 
208         // If the both dvr playback and dvr record are created, the playback will be treated as
209         // the source of the record. isVirtualFrontend set to true would direct the dvr playback
210         // input to the demux record filters or live broadcast filters.
211         bool isRecording = mDemux->isRecording();
212         bool isVirtualFrontend = isRecording;
213 
214         if (mDvrSettings.get<DvrSettings::Tag::playback>().dataFormat == DataFormat::ES) {
215             if (!processEsDataOnPlayback(isVirtualFrontend, isRecording)) {
216                 ALOGE("[Dvr] playback es data failed to be filtered. Ending thread");
217                 break;
218             }
219             maySendPlaybackStatusCallback();
220             continue;
221         }
222 
223         // Our current implementation filter the data and write it into the filter FMQ immediately
224         // after the DATA_READY from the VTS/framework
225         // This is for the non-ES data source, real playback use case handling.
226         if (!readPlaybackFMQ(isVirtualFrontend, isRecording) ||
227             !startFilterDispatcher(isVirtualFrontend, isRecording)) {
228             ALOGE("[Dvr] playback data failed to be filtered. Ending thread");
229             break;
230         }
231 
232         maySendPlaybackStatusCallback();
233     }
234 
235     mDvrThreadRunning = false;
236     ALOGD("[Dvr] playback thread ended.");
237 }
238 
maySendIptvPlaybackStatusCallback()239 void Dvr::maySendIptvPlaybackStatusCallback() {
240     lock_guard<mutex> lock(mPlaybackStatusLock);
241     int availableToRead = mDvrMQ->availableToRead();
242     int availableToWrite = mDvrMQ->availableToWrite();
243 
244     PlaybackStatus newStatus = checkPlaybackStatusChange(availableToWrite, availableToRead,
245                                                          IPTV_PLAYBACK_STATUS_THRESHOLD_HIGH,
246                                                          IPTV_PLAYBACK_STATUS_THRESHOLD_LOW);
247     if (mPlaybackStatus != newStatus) {
248         map<int64_t, std::shared_ptr<Filter>>::iterator it;
249         for (it = mFilters.begin(); it != mFilters.end(); it++) {
250             std::shared_ptr<Filter> currentFilter = it->second;
251             currentFilter->setIptvDvrPlaybackStatus(newStatus);
252         }
253         mCallback->onPlaybackStatus(newStatus);
254         mPlaybackStatus = newStatus;
255     }
256 }
257 
maySendPlaybackStatusCallback()258 void Dvr::maySendPlaybackStatusCallback() {
259     lock_guard<mutex> lock(mPlaybackStatusLock);
260     int availableToRead = mDvrMQ->availableToRead();
261     int availableToWrite = mDvrMQ->availableToWrite();
262 
263     PlaybackStatus newStatus =
264             checkPlaybackStatusChange(availableToWrite, availableToRead,
265                                       mDvrSettings.get<DvrSettings::Tag::playback>().highThreshold,
266                                       mDvrSettings.get<DvrSettings::Tag::playback>().lowThreshold);
267     if (mPlaybackStatus != newStatus) {
268         mCallback->onPlaybackStatus(newStatus);
269         mPlaybackStatus = newStatus;
270     }
271 }
272 
checkPlaybackStatusChange(uint32_t availableToWrite,uint32_t availableToRead,int64_t highThreshold,int64_t lowThreshold)273 PlaybackStatus Dvr::checkPlaybackStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
274                                               int64_t highThreshold, int64_t lowThreshold) {
275     if (availableToWrite == 0) {
276         return PlaybackStatus::SPACE_FULL;
277     } else if (availableToRead > highThreshold) {
278         return PlaybackStatus::SPACE_ALMOST_FULL;
279     } else if (availableToRead < lowThreshold) {
280         return PlaybackStatus::SPACE_ALMOST_EMPTY;
281     } else if (availableToRead == 0) {
282         return PlaybackStatus::SPACE_EMPTY;
283     }
284     return mPlaybackStatus;
285 }
286 
readPlaybackFMQ(bool isVirtualFrontend,bool isRecording)287 bool Dvr::readPlaybackFMQ(bool isVirtualFrontend, bool isRecording) {
288     // Read playback data from the input FMQ
289     size_t size = mDvrMQ->availableToRead();
290     int64_t playbackPacketSize = mDvrSettings.get<DvrSettings::Tag::playback>().packetSize;
291     vector<int8_t> dataOutputBuffer;
292     dataOutputBuffer.resize(playbackPacketSize);
293     // Dispatch the packet to the PID matching filter output buffer
294     for (int i = 0; i < size / playbackPacketSize; i++) {
295         if (!mDvrMQ->read(dataOutputBuffer.data(), playbackPacketSize)) {
296             return false;
297         }
298         if (isVirtualFrontend) {
299             if (isRecording) {
300                 mDemux->sendFrontendInputToRecord(dataOutputBuffer);
301             } else {
302                 mDemux->startBroadcastTsFilter(dataOutputBuffer);
303             }
304         } else {
305             startTpidFilter(dataOutputBuffer);
306         }
307     }
308 
309     return true;
310 }
311 
processEsDataOnPlayback(bool isVirtualFrontend,bool isRecording)312 bool Dvr::processEsDataOnPlayback(bool isVirtualFrontend, bool isRecording) {
313     // Read ES from the DVR FMQ
314     // Note that currently we only provides ES with metaData in a specific format to be parsed.
315     // The ES size should be smaller than the Playback FMQ size to avoid reading truncated data.
316     int size = mDvrMQ->availableToRead();
317     vector<int8_t> dataOutputBuffer;
318     dataOutputBuffer.resize(size);
319     if (!mDvrMQ->read(dataOutputBuffer.data(), size)) {
320         return false;
321     }
322 
323     int metaDataSize = size;
324     int totalFrames = 0;
325     int videoEsDataSize = 0;
326     int audioEsDataSize = 0;
327     int audioPid = 0;
328     int videoPid = 0;
329 
330     vector<MediaEsMetaData> esMeta;
331     int videoReadPointer = 0;
332     int audioReadPointer = 0;
333     int frameCount = 0;
334     // Get meta data from the es
335     for (int i = 0; i < metaDataSize; i++) {
336         switch (dataOutputBuffer[i]) {
337             case 'm':
338                 metaDataSize = 0;
339                 getMetaDataValue(i, dataOutputBuffer.data(), metaDataSize);
340                 videoReadPointer = metaDataSize;
341                 continue;
342             case 'l':
343                 getMetaDataValue(i, dataOutputBuffer.data(), totalFrames);
344                 esMeta.resize(totalFrames);
345                 continue;
346             case 'V':
347                 getMetaDataValue(i, dataOutputBuffer.data(), videoEsDataSize);
348                 audioReadPointer = metaDataSize + videoEsDataSize;
349                 continue;
350             case 'A':
351                 getMetaDataValue(i, dataOutputBuffer.data(), audioEsDataSize);
352                 continue;
353             case 'p':
354                 if (dataOutputBuffer[++i] == 'a') {
355                     getMetaDataValue(i, dataOutputBuffer.data(), audioPid);
356                 } else if (dataOutputBuffer[i] == 'v') {
357                     getMetaDataValue(i, dataOutputBuffer.data(), videoPid);
358                 }
359                 continue;
360             case 'v':
361             case 'a':
362                 if (dataOutputBuffer[i + 1] != ',') {
363                     ALOGE("[Dvr] Invalid format meta data.");
364                     return false;
365                 }
366                 esMeta[frameCount] = {
367                         .isAudio = dataOutputBuffer[i] == 'a' ? true : false,
368                 };
369                 i += 5;  // Move to Len
370                 getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].len);
371                 if (esMeta[frameCount].isAudio) {
372                     esMeta[frameCount].startIndex = audioReadPointer;
373                     audioReadPointer += esMeta[frameCount].len;
374                 } else {
375                     esMeta[frameCount].startIndex = videoReadPointer;
376                     videoReadPointer += esMeta[frameCount].len;
377                 }
378                 i += 4;  // move to PTS
379                 getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].pts);
380                 frameCount++;
381                 continue;
382             default:
383                 continue;
384         }
385     }
386 
387     if (frameCount != totalFrames) {
388         ALOGE("[Dvr] Invalid meta data, frameCount=%d, totalFrames reported=%d", frameCount,
389               totalFrames);
390         return false;
391     }
392 
393     if (metaDataSize + audioEsDataSize + videoEsDataSize != size) {
394         ALOGE("[Dvr] Invalid meta data, metaSize=%d, videoSize=%d, audioSize=%d, totolSize=%d",
395               metaDataSize, videoEsDataSize, audioEsDataSize, size);
396         return false;
397     }
398 
399     // Read es raw data from the FMQ per meta data built previously
400     vector<int8_t> frameData;
401     map<int64_t, std::shared_ptr<Filter>>::iterator it;
402     int pid = 0;
403     for (int i = 0; i < totalFrames; i++) {
404         frameData.resize(esMeta[i].len);
405         pid = esMeta[i].isAudio ? audioPid : videoPid;
406         memcpy(frameData.data(), dataOutputBuffer.data() + esMeta[i].startIndex, esMeta[i].len);
407         // Send to the media filters or record filters
408         if (!isRecording) {
409             for (it = mFilters.begin(); it != mFilters.end(); it++) {
410                 if (pid == mDemux->getFilterTpid(it->first)) {
411                     mDemux->updateMediaFilterOutput(it->first, frameData,
412                                                     static_cast<uint64_t>(esMeta[i].pts));
413                 }
414             }
415         } else {
416             mDemux->sendFrontendInputToRecord(frameData, pid, static_cast<uint64_t>(esMeta[i].pts));
417         }
418         startFilterDispatcher(isVirtualFrontend, isRecording);
419         frameData.clear();
420     }
421 
422     return true;
423 }
424 
getMetaDataValue(int & index,int8_t * dataOutputBuffer,int & value)425 void Dvr::getMetaDataValue(int& index, int8_t* dataOutputBuffer, int& value) {
426     index += 2;  // Move the pointer across the ":" to the value
427     while (dataOutputBuffer[index] != ',' && dataOutputBuffer[index] != '\n') {
428         value = ((dataOutputBuffer[index++] - 48) + value * 10);
429     }
430 }
431 
startTpidFilter(vector<int8_t> data)432 void Dvr::startTpidFilter(vector<int8_t> data) {
433     map<int64_t, std::shared_ptr<Filter>>::iterator it;
434     for (it = mFilters.begin(); it != mFilters.end(); it++) {
435         uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
436         if (DEBUG_DVR) {
437             ALOGW("[Dvr] start ts filter pid: %d", pid);
438         }
439         if (pid == mDemux->getFilterTpid(it->first)) {
440             mDemux->updateFilterOutput(it->first, data);
441         }
442     }
443 }
444 
startFilterDispatcher(bool isVirtualFrontend,bool isRecording)445 bool Dvr::startFilterDispatcher(bool isVirtualFrontend, bool isRecording) {
446     if (isVirtualFrontend) {
447         if (isRecording) {
448             return mDemux->startRecordFilterDispatcher();
449         } else {
450             return mDemux->startBroadcastFilterDispatcher();
451         }
452     }
453 
454     map<int64_t, std::shared_ptr<Filter>>::iterator it;
455     // Handle the output data per filter type
456     for (it = mFilters.begin(); it != mFilters.end(); it++) {
457         if (!mDemux->startFilterHandler(it->first).isOk()) {
458             return false;
459         }
460     }
461 
462     return true;
463 }
464 
writePlaybackFMQ(void * buf,size_t size)465 int Dvr::writePlaybackFMQ(void* buf, size_t size) {
466     lock_guard<mutex> lock(mWriteLock);
467     ALOGI("Playback status: %d", mPlaybackStatus);
468     if (mPlaybackStatus == PlaybackStatus::SPACE_FULL) {
469         ALOGW("[Dvr] stops writing and wait for the client side flushing.");
470         return DVR_WRITE_FAILURE_REASON_FMQ_FULL;
471     }
472     ALOGI("availableToWrite before: %zu", mDvrMQ->availableToWrite());
473     if (mDvrMQ->write((int8_t*)buf, size)) {
474         mDvrEventFlag->wake(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY));
475         ALOGI("availableToWrite: %zu", mDvrMQ->availableToWrite());
476         maySendIptvPlaybackStatusCallback();
477         return DVR_WRITE_SUCCESS;
478     }
479     maySendIptvPlaybackStatusCallback();
480     return DVR_WRITE_FAILURE_REASON_UNKNOWN;
481 }
482 
writeRecordFMQ(const vector<int8_t> & data)483 bool Dvr::writeRecordFMQ(const vector<int8_t>& data) {
484     lock_guard<mutex> lock(mWriteLock);
485     if (mRecordStatus == RecordStatus::OVERFLOW) {
486         ALOGW("[Dvr] stops writing and wait for the client side flushing.");
487         return true;
488     }
489     if (mDvrMQ->write(data.data(), data.size())) {
490         mDvrEventFlag->wake(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY));
491         maySendRecordStatusCallback();
492         return true;
493     }
494 
495     maySendRecordStatusCallback();
496     return false;
497 }
498 
maySendRecordStatusCallback()499 void Dvr::maySendRecordStatusCallback() {
500     lock_guard<mutex> lock(mRecordStatusLock);
501     int availableToRead = mDvrMQ->availableToRead();
502     int availableToWrite = mDvrMQ->availableToWrite();
503 
504     RecordStatus newStatus =
505             checkRecordStatusChange(availableToWrite, availableToRead,
506                                     mDvrSettings.get<DvrSettings::Tag::record>().highThreshold,
507                                     mDvrSettings.get<DvrSettings::Tag::record>().lowThreshold);
508     if (mRecordStatus != newStatus) {
509         mCallback->onRecordStatus(newStatus);
510         mRecordStatus = newStatus;
511     }
512 }
513 
checkRecordStatusChange(uint32_t availableToWrite,uint32_t availableToRead,int64_t highThreshold,int64_t lowThreshold)514 RecordStatus Dvr::checkRecordStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
515                                           int64_t highThreshold, int64_t lowThreshold) {
516     if (availableToWrite == 0) {
517         return RecordStatus::OVERFLOW;
518     } else if (availableToRead > highThreshold) {
519         return RecordStatus::HIGH_WATER;
520     } else if (availableToRead < lowThreshold) {
521         return RecordStatus::LOW_WATER;
522     }
523     return mRecordStatus;
524 }
525 
addPlaybackFilter(int64_t filterId,std::shared_ptr<Filter> filter)526 bool Dvr::addPlaybackFilter(int64_t filterId, std::shared_ptr<Filter> filter) {
527     mFilters[filterId] = filter;
528     return true;
529 }
530 
removePlaybackFilter(int64_t filterId)531 bool Dvr::removePlaybackFilter(int64_t filterId) {
532     mFilters.erase(filterId);
533     return true;
534 }
535 
536 }  // namespace tuner
537 }  // namespace tv
538 }  // namespace hardware
539 }  // namespace android
540 }  // namespace aidl
541