1 /*
2  * Copyright 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "android.hardware.tv.tuner-service.example-Demux"
19 
20 #include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
21 #include <aidl/android/hardware/tv/tuner/Result.h>
22 
23 #include <fmq/AidlMessageQueue.h>
24 #include <utils/Log.h>
25 #include <thread>
26 #include "Demux.h"
27 
28 namespace aidl {
29 namespace android {
30 namespace hardware {
31 namespace tv {
32 namespace tuner {
33 
34 using ::aidl::android::hardware::common::fmq::MQDescriptor;
35 using ::aidl::android::hardware::common::fmq::SynchronizedReadWrite;
36 using ::android::AidlMessageQueue;
37 using ::android::hardware::EventFlag;
38 
39 using FilterMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
40 using AidlMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
41 using AidlMQDesc = MQDescriptor<int8_t, SynchronizedReadWrite>;
42 
43 #define WAIT_TIMEOUT 3000000000
44 
Demux(int32_t demuxId,uint32_t filterTypes)45 Demux::Demux(int32_t demuxId, uint32_t filterTypes) {
46     mDemuxId = demuxId;
47     mFilterTypes = filterTypes;
48 }
49 
setTunerService(std::shared_ptr<Tuner> tuner)50 void Demux::setTunerService(std::shared_ptr<Tuner> tuner) {
51     mTuner = tuner;
52 }
53 
~Demux()54 Demux::~Demux() {
55     ALOGV("%s", __FUNCTION__);
56     close();
57 }
58 
openDvr(DvrType in_type,int32_t in_bufferSize,const std::shared_ptr<IDvrCallback> & in_cb,std::shared_ptr<IDvr> * _aidl_return)59 ::ndk::ScopedAStatus Demux::openDvr(DvrType in_type, int32_t in_bufferSize,
60                                     const std::shared_ptr<IDvrCallback>& in_cb,
61                                     std::shared_ptr<IDvr>* _aidl_return) {
62     ALOGV("%s", __FUNCTION__);
63 
64     if (in_cb == nullptr) {
65         ALOGW("[Demux] DVR callback can't be null");
66         *_aidl_return = nullptr;
67         return ::ndk::ScopedAStatus::fromServiceSpecificError(
68                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
69     }
70 
71     set<int64_t>::iterator it;
72     switch (in_type) {
73         case DvrType::PLAYBACK:
74             mDvrPlayback = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
75                                                          this->ref<Demux>());
76             if (!mDvrPlayback->createDvrMQ()) {
77                 ALOGE("[Demux] cannot create dvr message queue");
78                 mDvrPlayback = nullptr;
79                 *_aidl_return = mDvrPlayback;
80                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
81                         static_cast<int32_t>(Result::UNKNOWN_ERROR));
82             }
83 
84             for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
85                 if (!mDvrPlayback->addPlaybackFilter(*it, mFilters[*it])) {
86                     ALOGE("[Demux] Can't get filter info for DVR playback");
87                     mDvrPlayback = nullptr;
88                     *_aidl_return = mDvrPlayback;
89                     return ::ndk::ScopedAStatus::fromServiceSpecificError(
90                             static_cast<int32_t>(Result::UNKNOWN_ERROR));
91                 }
92             }
93 
94             ALOGI("Playback normal case");
95 
96             *_aidl_return = mDvrPlayback;
97             return ::ndk::ScopedAStatus::ok();
98         case DvrType::RECORD:
99             mDvrRecord = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
100                                                        this->ref<Demux>());
101             if (!mDvrRecord->createDvrMQ()) {
102                 mDvrRecord = nullptr;
103                 *_aidl_return = mDvrRecord;
104                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
105                         static_cast<int32_t>(Result::UNKNOWN_ERROR));
106             }
107 
108             *_aidl_return = mDvrRecord;
109             return ::ndk::ScopedAStatus::ok();
110         default:
111             *_aidl_return = nullptr;
112             return ::ndk::ScopedAStatus::fromServiceSpecificError(
113                     static_cast<int32_t>(Result::INVALID_ARGUMENT));
114     }
115 }
116 
setIptvThreadRunning(bool isIptvThreadRunning)117 void Demux::setIptvThreadRunning(bool isIptvThreadRunning) {
118     std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
119     mIsIptvReadThreadRunning = isIptvThreadRunning;
120     mIsIptvThreadRunningCv.notify_all();
121 }
122 
frontendIptvInputThreadLoop(dtv_plugin * interface,dtv_streamer * streamer,void * buf)123 void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf) {
124     Timer *timer, *fullBufferTimer;
125     bool isTuneBytePushedToDvr = false;
126     while (true) {
127         std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
128         mIsIptvThreadRunningCv.wait(
129                 lock, [this] { return mIsIptvReadThreadRunning || mIsIptvReadThreadTerminated; });
130         if (mIsIptvReadThreadTerminated) {
131             ALOGI("[Demux] IPTV reading thread for playback terminated");
132             break;
133         }
134         if (mIsIptvDvrFMQFull &&
135             fullBufferTimer->get_elapsed_time_ms() > IPTV_PLAYBACK_BUFFER_TIMEOUT) {
136             ALOGE("DVR FMQ has not been flushed within timeout of %d ms",
137                   IPTV_PLAYBACK_BUFFER_TIMEOUT);
138             delete fullBufferTimer;
139             break;
140         }
141         timer = new Timer();
142         ssize_t bytes_read;
143         void* tuneByteBuffer = mFrontend->getTuneByteBuffer();
144         if (!isTuneBytePushedToDvr && tuneByteBuffer != nullptr) {
145             memcpy(buf, tuneByteBuffer, 1);
146             char* offsetBuf = (char*)buf + 1;
147             bytes_read = interface->read_stream(streamer, (void*)offsetBuf, IPTV_BUFFER_SIZE - 1,
148                                                 IPTV_PLAYBACK_TIMEOUT);
149             isTuneBytePushedToDvr = true;
150         } else {
151             bytes_read =
152                     interface->read_stream(streamer, buf, IPTV_BUFFER_SIZE, IPTV_PLAYBACK_TIMEOUT);
153         }
154 
155         if (bytes_read <= 0) {
156             double elapsed_time = timer->get_elapsed_time_ms();
157             if (elapsed_time > IPTV_PLAYBACK_TIMEOUT) {
158                 ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time,
159                       IPTV_PLAYBACK_TIMEOUT);
160             }
161             ALOGE("[Demux] Cannot read data from the socket");
162             delete timer;
163             break;
164         }
165 
166         delete timer;
167         ALOGI("Number of bytes read: %zd", bytes_read);
168         int result = mDvrPlayback->writePlaybackFMQ(buf, bytes_read);
169 
170         switch (result) {
171             case DVR_WRITE_FAILURE_REASON_FMQ_FULL:
172                 if (!mIsIptvDvrFMQFull) {
173                     mIsIptvDvrFMQFull = true;
174                     fullBufferTimer = new Timer();
175                 }
176                 ALOGI("Waiting for client to flush DVR FMQ.");
177                 break;
178             case DVR_WRITE_FAILURE_REASON_UNKNOWN:
179                 ALOGE("Failed to write data into DVR FMQ for unknown reason");
180                 break;
181             case DVR_WRITE_SUCCESS:
182                 ALOGI("Wrote %zd bytes to DVR FMQ", bytes_read);
183                 break;
184             default:
185                 ALOGI("Invalid DVR Status");
186         }
187     }
188 }
189 
setFrontendDataSource(int32_t in_frontendId)190 ::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) {
191     ALOGV("%s", __FUNCTION__);
192 
193     if (mTuner == nullptr) {
194         return ::ndk::ScopedAStatus::fromServiceSpecificError(
195                 static_cast<int32_t>(Result::NOT_INITIALIZED));
196     }
197     mFrontend = mTuner->getFrontendById(in_frontendId);
198     if (mFrontend == nullptr) {
199         return ::ndk::ScopedAStatus::fromServiceSpecificError(
200                 static_cast<int32_t>(Result::INVALID_STATE));
201     }
202 
203     mTuner->setFrontendAsDemuxSource(in_frontendId, mDemuxId);
204 
205     // if mFrontend is an IPTV frontend, create streamer to read TS data from socket
206     if (mFrontend->getFrontendType() == FrontendType::IPTV) {
207         // create a DVR instance on the demux
208         shared_ptr<IDvr> iptvDvr;
209 
210         std::shared_ptr<IDvrCallback> dvrPlaybackCallback =
211                 ::ndk::SharedRefBase::make<DvrPlaybackCallback>();
212 
213         ::ndk::ScopedAStatus status =
214                 openDvr(DvrType::PLAYBACK, IPTV_BUFFER_SIZE, dvrPlaybackCallback, &iptvDvr);
215         if (status.isOk()) {
216             ALOGI("DVR instance created");
217         }
218 
219         // get plugin interface from frontend
220         dtv_plugin* interface = mFrontend->getIptvPluginInterface();
221         // if plugin interface is not on frontend, create a new plugin interface
222         if (interface == nullptr) {
223             interface = mFrontend->createIptvPluginInterface();
224             if (interface == nullptr) {
225                 ALOGE("[   INFO   ] Failed to load plugin.");
226                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
227                         static_cast<int32_t>(Result::INVALID_STATE));
228             }
229         }
230 
231         // get transport description from frontend
232         string transport_desc = mFrontend->getIptvTransportDescription();
233         if (transport_desc.empty()) {
234             string content_url = "rtp://127.0.0.1:12345";
235             transport_desc = "{ \"uri\": \"" + content_url + "\"}";
236         }
237         ALOGI("[Demux] transport_desc: %s", transport_desc.c_str());
238 
239         // get streamer object from Frontend instance
240         dtv_streamer* streamer = mFrontend->getIptvPluginStreamer();
241         if (streamer == nullptr) {
242             streamer = mFrontend->createIptvPluginStreamer(interface, transport_desc.c_str());
243             if (streamer == nullptr) {
244                 ALOGE("[   INFO   ] Failed to open stream");
245                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
246                         static_cast<int32_t>(Result::INVALID_STATE));
247             }
248         }
249         stopIptvFrontendInput();
250         mIsIptvReadThreadTerminated = false;
251         void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
252         if (buf == nullptr) {
253             ALOGE("[Demux] Buffer allocation failed");
254             return ::ndk::ScopedAStatus::fromServiceSpecificError(
255                     static_cast<int32_t>(Result::INVALID_STATE));
256         }
257         mDemuxIptvReadThread =
258                 std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer, buf);
259     }
260     return ::ndk::ScopedAStatus::ok();
261 }
262 
openFilter(const DemuxFilterType & in_type,int32_t in_bufferSize,const std::shared_ptr<IFilterCallback> & in_cb,std::shared_ptr<IFilter> * _aidl_return)263 ::ndk::ScopedAStatus Demux::openFilter(const DemuxFilterType& in_type, int32_t in_bufferSize,
264                                        const std::shared_ptr<IFilterCallback>& in_cb,
265                                        std::shared_ptr<IFilter>* _aidl_return) {
266     ALOGV("%s", __FUNCTION__);
267 
268     int64_t filterId;
269     filterId = ++mLastUsedFilterId;
270 
271     if (in_cb == nullptr) {
272         ALOGW("[Demux] callback can't be null");
273         *_aidl_return = nullptr;
274         return ::ndk::ScopedAStatus::fromServiceSpecificError(
275                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
276     }
277 
278     std::shared_ptr<Filter> filter = ndk::SharedRefBase::make<Filter>(
279             in_type, filterId, in_bufferSize, in_cb, this->ref<Demux>());
280     if (!filter->createFilterMQ()) {
281         *_aidl_return = nullptr;
282         return ::ndk::ScopedAStatus::fromServiceSpecificError(
283                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
284     }
285 
286     mFilters[filterId] = filter;
287     if (filter->isPcrFilter()) {
288         mPcrFilterIds.insert(filterId);
289     }
290     bool result = true;
291     if (!filter->isRecordFilter()) {
292         // Only save non-record filters for now. Record filters are saved when the
293         // IDvr.attacheFilter is called.
294         mPlaybackFilterIds.insert(filterId);
295         if (mDvrPlayback != nullptr) {
296             result = mDvrPlayback->addPlaybackFilter(filterId, filter);
297         }
298     }
299 
300     if (!result) {
301         *_aidl_return = nullptr;
302         return ::ndk::ScopedAStatus::fromServiceSpecificError(
303                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
304     }
305 
306     *_aidl_return = filter;
307     return ::ndk::ScopedAStatus::ok();
308 }
309 
openTimeFilter(std::shared_ptr<ITimeFilter> * _aidl_return)310 ::ndk::ScopedAStatus Demux::openTimeFilter(std::shared_ptr<ITimeFilter>* _aidl_return) {
311     ALOGV("%s", __FUNCTION__);
312 
313     mTimeFilter = ndk::SharedRefBase::make<TimeFilter>(this->ref<Demux>());
314 
315     *_aidl_return = mTimeFilter;
316     return ::ndk::ScopedAStatus::ok();
317 }
318 
getAvSyncHwId(const std::shared_ptr<IFilter> & in_filter,int32_t * _aidl_return)319 ::ndk::ScopedAStatus Demux::getAvSyncHwId(const std::shared_ptr<IFilter>& in_filter,
320                                           int32_t* _aidl_return) {
321     ALOGV("%s", __FUNCTION__);
322 
323     int64_t id;
324     ::ndk::ScopedAStatus status;
325 
326     status = in_filter->getId64Bit(&id);
327     if (!status.isOk()) {
328         ALOGE("[Demux] Can't get filter Id.");
329         *_aidl_return = -1;
330         return ::ndk::ScopedAStatus::fromServiceSpecificError(
331                 static_cast<int32_t>(Result::INVALID_STATE));
332     }
333 
334     if (!mFilters[id]->isMediaFilter()) {
335         ALOGE("[Demux] Given filter is not a media filter.");
336         *_aidl_return = -1;
337         return ::ndk::ScopedAStatus::fromServiceSpecificError(
338                 static_cast<int32_t>(Result::INVALID_STATE));
339     }
340 
341     if (!mPcrFilterIds.empty()) {
342         // Return the lowest pcr filter id in the default implementation as the av sync id
343         *_aidl_return = *mPcrFilterIds.begin();
344         return ::ndk::ScopedAStatus::ok();
345     }
346 
347     ALOGE("[Demux] No PCR filter opened.");
348     *_aidl_return = -1;
349     return ::ndk::ScopedAStatus::fromServiceSpecificError(
350             static_cast<int32_t>(Result::INVALID_STATE));
351 }
352 
getAvSyncTime(int32_t in_avSyncHwId,int64_t * _aidl_return)353 ::ndk::ScopedAStatus Demux::getAvSyncTime(int32_t in_avSyncHwId, int64_t* _aidl_return) {
354     ALOGV("%s", __FUNCTION__);
355 
356     if (mPcrFilterIds.empty()) {
357         *_aidl_return = -1;
358         return ::ndk::ScopedAStatus::fromServiceSpecificError(
359                 static_cast<int32_t>(Result::INVALID_STATE));
360     }
361     if (in_avSyncHwId != *mPcrFilterIds.begin()) {
362         *_aidl_return = -1;
363         return ::ndk::ScopedAStatus::fromServiceSpecificError(
364                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
365     }
366 
367     *_aidl_return = -1;
368     return ::ndk::ScopedAStatus::ok();
369 }
370 
close()371 ::ndk::ScopedAStatus Demux::close() {
372     ALOGV("%s", __FUNCTION__);
373 
374     stopFrontendInput();
375     stopIptvFrontendInput();
376 
377     set<int64_t>::iterator it;
378     for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
379         mDvrPlayback->removePlaybackFilter(*it);
380     }
381     mPlaybackFilterIds.clear();
382     mRecordFilterIds.clear();
383     mFilters.clear();
384     mLastUsedFilterId = -1;
385     if (mTuner != nullptr) {
386         mTuner->removeDemux(mDemuxId);
387         mTuner = nullptr;
388     }
389 
390     return ::ndk::ScopedAStatus::ok();
391 }
392 
connectCiCam(int32_t in_ciCamId)393 ::ndk::ScopedAStatus Demux::connectCiCam(int32_t in_ciCamId) {
394     ALOGV("%s", __FUNCTION__);
395 
396     mCiCamId = in_ciCamId;
397 
398     return ::ndk::ScopedAStatus::ok();
399 }
400 
disconnectCiCam()401 ::ndk::ScopedAStatus Demux::disconnectCiCam() {
402     ALOGV("%s", __FUNCTION__);
403 
404     return ::ndk::ScopedAStatus::ok();
405 }
406 
removeFilter(int64_t filterId)407 ::ndk::ScopedAStatus Demux::removeFilter(int64_t filterId) {
408     ALOGV("%s", __FUNCTION__);
409 
410     if (mDvrPlayback != nullptr) {
411         mDvrPlayback->removePlaybackFilter(filterId);
412     }
413     mPlaybackFilterIds.erase(filterId);
414     mRecordFilterIds.erase(filterId);
415     mFilters.erase(filterId);
416 
417     return ::ndk::ScopedAStatus::ok();
418 }
419 
startBroadcastTsFilter(vector<int8_t> data)420 void Demux::startBroadcastTsFilter(vector<int8_t> data) {
421     set<int64_t>::iterator it;
422     uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
423     if (DEBUG_DEMUX) {
424         ALOGW("[Demux] start ts filter pid: %d", pid);
425     }
426     for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
427         if (pid == mFilters[*it]->getTpid()) {
428             mFilters[*it]->updateFilterOutput(data);
429         }
430     }
431 }
432 
sendFrontendInputToRecord(vector<int8_t> data)433 void Demux::sendFrontendInputToRecord(vector<int8_t> data) {
434     set<int64_t>::iterator it;
435     if (DEBUG_DEMUX) {
436         ALOGW("[Demux] update record filter output");
437     }
438     for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) {
439         mFilters[*it]->updateRecordOutput(data);
440     }
441 }
442 
sendFrontendInputToRecord(vector<int8_t> data,uint16_t pid,uint64_t pts)443 void Demux::sendFrontendInputToRecord(vector<int8_t> data, uint16_t pid, uint64_t pts) {
444     sendFrontendInputToRecord(data);
445     set<int64_t>::iterator it;
446     for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) {
447         if (pid == mFilters[*it]->getTpid()) {
448             mFilters[*it]->updatePts(pts);
449         }
450     }
451 }
452 
startBroadcastFilterDispatcher()453 bool Demux::startBroadcastFilterDispatcher() {
454     set<int64_t>::iterator it;
455 
456     // Handle the output data per filter type
457     for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
458         if (!mFilters[*it]->startFilterHandler().isOk()) {
459             return false;
460         }
461     }
462 
463     return true;
464 }
465 
startRecordFilterDispatcher()466 bool Demux::startRecordFilterDispatcher() {
467     set<int64_t>::iterator it;
468 
469     for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) {
470         if (!mFilters[*it]->startRecordFilterHandler().isOk()) {
471             return false;
472         }
473     }
474 
475     return true;
476 }
477 
startFilterHandler(int64_t filterId)478 ::ndk::ScopedAStatus Demux::startFilterHandler(int64_t filterId) {
479     return mFilters[filterId]->startFilterHandler();
480 }
481 
updateFilterOutput(int64_t filterId,vector<int8_t> data)482 void Demux::updateFilterOutput(int64_t filterId, vector<int8_t> data) {
483     mFilters[filterId]->updateFilterOutput(data);
484 }
485 
updateMediaFilterOutput(int64_t filterId,vector<int8_t> data,uint64_t pts)486 void Demux::updateMediaFilterOutput(int64_t filterId, vector<int8_t> data, uint64_t pts) {
487     updateFilterOutput(filterId, data);
488     mFilters[filterId]->updatePts(pts);
489 }
490 
getFilterTpid(int64_t filterId)491 uint16_t Demux::getFilterTpid(int64_t filterId) {
492     return mFilters[filterId]->getTpid();
493 }
494 
getDemuxId()495 int32_t Demux::getDemuxId() {
496     return mDemuxId;
497 }
498 
isInUse()499 bool Demux::isInUse() {
500     return mInUse;
501 }
502 
setInUse(bool inUse)503 void Demux::setInUse(bool inUse) {
504     mInUse = inUse;
505 }
506 
getDemuxInfo(DemuxInfo * demuxInfo)507 void Demux::getDemuxInfo(DemuxInfo* demuxInfo) {
508     *demuxInfo = {.filterTypes = mFilterTypes};
509 }
510 
startFrontendInputLoop()511 void Demux::startFrontendInputLoop() {
512     ALOGD("[Demux] start frontend on demux");
513     // Stop current Frontend thread loop first, in case the user starts a new
514     // tuning before stopping current tuning.
515     stopFrontendInput();
516     mFrontendInputThreadRunning = true;
517     mFrontendInputThread = std::thread(&Demux::frontendInputThreadLoop, this);
518 }
519 
frontendInputThreadLoop()520 void Demux::frontendInputThreadLoop() {
521     if (!mFrontendInputThreadRunning) {
522         return;
523     }
524 
525     if (!mDvrPlayback) {
526         ALOGW("[Demux] No software Frontend input configured. Ending Frontend thread loop.");
527         mFrontendInputThreadRunning = false;
528         return;
529     }
530 
531     while (mFrontendInputThreadRunning) {
532         uint32_t efState = 0;
533         ::android::status_t status = mDvrPlayback->getDvrEventFlag()->wait(
534                 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY), &efState, WAIT_TIMEOUT,
535                 true /* retry on spurious wake */);
536         if (status != ::android::OK) {
537             ALOGD("[Demux] wait for data ready on the playback FMQ");
538             continue;
539         }
540         if (mDvrPlayback->getSettings().get<DvrSettings::Tag::playback>().dataFormat ==
541             DataFormat::ES) {
542             if (!mDvrPlayback->processEsDataOnPlayback(true /*isVirtualFrontend*/, mIsRecording)) {
543                 ALOGE("[Demux] playback es data failed to be filtered. Ending thread");
544                 break;
545             }
546             continue;
547         }
548         // Our current implementation filter the data and write it into the filter FMQ immediately
549         // after the DATA_READY from the VTS/framework
550         // This is for the non-ES data source, real playback use case handling.
551         if (!mDvrPlayback->readPlaybackFMQ(true /*isVirtualFrontend*/, mIsRecording) ||
552             !mDvrPlayback->startFilterDispatcher(true /*isVirtualFrontend*/, mIsRecording)) {
553             ALOGE("[Demux] playback data failed to be filtered. Ending thread");
554             break;
555         }
556     }
557 
558     mFrontendInputThreadRunning = false;
559     ALOGW("[Demux] Frontend Input thread end.");
560 }
561 
stopFrontendInput()562 void Demux::stopFrontendInput() {
563     ALOGD("[Demux] stop frontend on demux");
564     mKeepFetchingDataFromFrontend = false;
565     mFrontendInputThreadRunning = false;
566     if (mFrontendInputThread.joinable()) {
567         mFrontendInputThread.join();
568     }
569 }
570 
stopIptvFrontendInput()571 void Demux::stopIptvFrontendInput() {
572     ALOGD("[Demux] stop iptv frontend on demux");
573     if (mDemuxIptvReadThread.joinable()) {
574         mIsIptvReadThreadTerminated = true;
575         mIsIptvThreadRunningCv.notify_all();
576         mDemuxIptvReadThread.join();
577     }
578 }
579 
setIsRecording(bool isRecording)580 void Demux::setIsRecording(bool isRecording) {
581     mIsRecording = isRecording;
582 }
583 
isRecording()584 bool Demux::isRecording() {
585     return mIsRecording;
586 }
587 
dump(int fd,const char ** args,uint32_t numArgs)588 binder_status_t Demux::dump(int fd, const char** args, uint32_t numArgs) {
589     dprintf(fd, " Demux %d:\n", mDemuxId);
590     dprintf(fd, "  mIsRecording %d\n", mIsRecording);
591     {
592         dprintf(fd, "  Filters:\n");
593         map<int64_t, std::shared_ptr<Filter>>::iterator it;
594         for (it = mFilters.begin(); it != mFilters.end(); it++) {
595             it->second->dump(fd, args, numArgs);
596         }
597     }
598     {
599         dprintf(fd, "  TimeFilter:\n");
600         if (mTimeFilter != nullptr) {
601             mTimeFilter->dump(fd, args, numArgs);
602         }
603     }
604     {
605         dprintf(fd, "  DvrPlayback:\n");
606         if (mDvrPlayback != nullptr) {
607             mDvrPlayback->dump(fd, args, numArgs);
608         }
609     }
610     {
611         dprintf(fd, "  DvrRecord:\n");
612         if (mDvrRecord != nullptr) {
613             mDvrRecord->dump(fd, args, numArgs);
614         }
615     }
616     return STATUS_OK;
617 }
618 
attachRecordFilter(int64_t filterId)619 bool Demux::attachRecordFilter(int64_t filterId) {
620     if (mFilters[filterId] == nullptr || mDvrRecord == nullptr ||
621         !mFilters[filterId]->isRecordFilter()) {
622         return false;
623     }
624 
625     mRecordFilterIds.insert(filterId);
626     mFilters[filterId]->attachFilterToRecord(mDvrRecord);
627 
628     return true;
629 }
630 
detachRecordFilter(int64_t filterId)631 bool Demux::detachRecordFilter(int64_t filterId) {
632     if (mFilters[filterId] == nullptr || mDvrRecord == nullptr) {
633         return false;
634     }
635 
636     mRecordFilterIds.erase(filterId);
637     mFilters[filterId]->detachFilterFromRecord();
638 
639     return true;
640 }
641 
642 }  // namespace tuner
643 }  // namespace tv
644 }  // namespace hardware
645 }  // namespace android
646 }  // namespace aidl
647