/* * Copyright 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "android.hardware.tv.tuner-service.example-Demux" #include #include #include #include #include #include "Demux.h" namespace aidl { namespace android { namespace hardware { namespace tv { namespace tuner { using ::aidl::android::hardware::common::fmq::MQDescriptor; using ::aidl::android::hardware::common::fmq::SynchronizedReadWrite; using ::android::AidlMessageQueue; using ::android::hardware::EventFlag; using FilterMQ = AidlMessageQueue; using AidlMQ = AidlMessageQueue; using AidlMQDesc = MQDescriptor; #define WAIT_TIMEOUT 3000000000 Demux::Demux(int32_t demuxId, uint32_t filterTypes) { mDemuxId = demuxId; mFilterTypes = filterTypes; } void Demux::setTunerService(std::shared_ptr tuner) { mTuner = tuner; } Demux::~Demux() { ALOGV("%s", __FUNCTION__); close(); } ::ndk::ScopedAStatus Demux::openDvr(DvrType in_type, int32_t in_bufferSize, const std::shared_ptr& in_cb, std::shared_ptr* _aidl_return) { ALOGV("%s", __FUNCTION__); if (in_cb == nullptr) { ALOGW("[Demux] DVR callback can't be null"); *_aidl_return = nullptr; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_ARGUMENT)); } set::iterator it; switch (in_type) { case DvrType::PLAYBACK: mDvrPlayback = ndk::SharedRefBase::make(in_type, in_bufferSize, in_cb, this->ref()); if (!mDvrPlayback->createDvrMQ()) { ALOGE("[Demux] cannot create dvr message queue"); mDvrPlayback = nullptr; *_aidl_return = mDvrPlayback; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::UNKNOWN_ERROR)); } for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) { if (!mDvrPlayback->addPlaybackFilter(*it, mFilters[*it])) { ALOGE("[Demux] Can't get filter info for DVR playback"); mDvrPlayback = nullptr; *_aidl_return = mDvrPlayback; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::UNKNOWN_ERROR)); } } ALOGI("Playback normal case"); *_aidl_return = mDvrPlayback; return ::ndk::ScopedAStatus::ok(); case DvrType::RECORD: mDvrRecord = ndk::SharedRefBase::make(in_type, in_bufferSize, in_cb, this->ref()); if (!mDvrRecord->createDvrMQ()) { mDvrRecord = nullptr; *_aidl_return = mDvrRecord; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::UNKNOWN_ERROR)); } *_aidl_return = mDvrRecord; return ::ndk::ScopedAStatus::ok(); default: *_aidl_return = nullptr; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_ARGUMENT)); } } void Demux::setIptvThreadRunning(bool isIptvThreadRunning) { std::unique_lock lock(mIsIptvThreadRunningMutex); mIsIptvReadThreadRunning = isIptvThreadRunning; mIsIptvThreadRunningCv.notify_all(); } void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf) { Timer *timer, *fullBufferTimer; bool isTuneBytePushedToDvr = false; while (true) { std::unique_lock lock(mIsIptvThreadRunningMutex); mIsIptvThreadRunningCv.wait( lock, [this] { return mIsIptvReadThreadRunning || mIsIptvReadThreadTerminated; }); if (mIsIptvReadThreadTerminated) { ALOGI("[Demux] IPTV reading thread for playback terminated"); break; } if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > IPTV_PLAYBACK_BUFFER_TIMEOUT) { ALOGE("DVR FMQ has not been flushed within timeout of %d ms", IPTV_PLAYBACK_BUFFER_TIMEOUT); delete fullBufferTimer; break; } timer = new Timer(); ssize_t bytes_read; void* tuneByteBuffer = mFrontend->getTuneByteBuffer(); if (!isTuneBytePushedToDvr && tuneByteBuffer != nullptr) { memcpy(buf, tuneByteBuffer, 1); char* offsetBuf = (char*)buf + 1; bytes_read = interface->read_stream(streamer, (void*)offsetBuf, IPTV_BUFFER_SIZE - 1, IPTV_PLAYBACK_TIMEOUT); isTuneBytePushedToDvr = true; } else { bytes_read = interface->read_stream(streamer, buf, IPTV_BUFFER_SIZE, IPTV_PLAYBACK_TIMEOUT); } if (bytes_read <= 0) { double elapsed_time = timer->get_elapsed_time_ms(); if (elapsed_time > IPTV_PLAYBACK_TIMEOUT) { ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time, IPTV_PLAYBACK_TIMEOUT); } ALOGE("[Demux] Cannot read data from the socket"); delete timer; break; } delete timer; ALOGI("Number of bytes read: %zd", bytes_read); int result = mDvrPlayback->writePlaybackFMQ(buf, bytes_read); switch (result) { case DVR_WRITE_FAILURE_REASON_FMQ_FULL: if (!mIsIptvDvrFMQFull) { mIsIptvDvrFMQFull = true; fullBufferTimer = new Timer(); } ALOGI("Waiting for client to flush DVR FMQ."); break; case DVR_WRITE_FAILURE_REASON_UNKNOWN: ALOGE("Failed to write data into DVR FMQ for unknown reason"); break; case DVR_WRITE_SUCCESS: ALOGI("Wrote %zd bytes to DVR FMQ", bytes_read); break; default: ALOGI("Invalid DVR Status"); } } } ::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) { ALOGV("%s", __FUNCTION__); if (mTuner == nullptr) { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::NOT_INITIALIZED)); } mFrontend = mTuner->getFrontendById(in_frontendId); if (mFrontend == nullptr) { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } mTuner->setFrontendAsDemuxSource(in_frontendId, mDemuxId); // if mFrontend is an IPTV frontend, create streamer to read TS data from socket if (mFrontend->getFrontendType() == FrontendType::IPTV) { // create a DVR instance on the demux shared_ptr iptvDvr; std::shared_ptr dvrPlaybackCallback = ::ndk::SharedRefBase::make(); ::ndk::ScopedAStatus status = openDvr(DvrType::PLAYBACK, IPTV_BUFFER_SIZE, dvrPlaybackCallback, &iptvDvr); if (status.isOk()) { ALOGI("DVR instance created"); } // get plugin interface from frontend dtv_plugin* interface = mFrontend->getIptvPluginInterface(); // if plugin interface is not on frontend, create a new plugin interface if (interface == nullptr) { interface = mFrontend->createIptvPluginInterface(); if (interface == nullptr) { ALOGE("[ INFO ] Failed to load plugin."); return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } } // get transport description from frontend string transport_desc = mFrontend->getIptvTransportDescription(); if (transport_desc.empty()) { string content_url = "rtp://127.0.0.1:12345"; transport_desc = "{ \"uri\": \"" + content_url + "\"}"; } ALOGI("[Demux] transport_desc: %s", transport_desc.c_str()); // get streamer object from Frontend instance dtv_streamer* streamer = mFrontend->getIptvPluginStreamer(); if (streamer == nullptr) { streamer = mFrontend->createIptvPluginStreamer(interface, transport_desc.c_str()); if (streamer == nullptr) { ALOGE("[ INFO ] Failed to open stream"); return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } } stopIptvFrontendInput(); mIsIptvReadThreadTerminated = false; void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); if (buf == nullptr) { ALOGE("[Demux] Buffer allocation failed"); return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } mDemuxIptvReadThread = std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer, buf); } return ::ndk::ScopedAStatus::ok(); } ::ndk::ScopedAStatus Demux::openFilter(const DemuxFilterType& in_type, int32_t in_bufferSize, const std::shared_ptr& in_cb, std::shared_ptr* _aidl_return) { ALOGV("%s", __FUNCTION__); int64_t filterId; filterId = ++mLastUsedFilterId; if (in_cb == nullptr) { ALOGW("[Demux] callback can't be null"); *_aidl_return = nullptr; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_ARGUMENT)); } std::shared_ptr filter = ndk::SharedRefBase::make( in_type, filterId, in_bufferSize, in_cb, this->ref()); if (!filter->createFilterMQ()) { *_aidl_return = nullptr; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::UNKNOWN_ERROR)); } mFilters[filterId] = filter; if (filter->isPcrFilter()) { mPcrFilterIds.insert(filterId); } bool result = true; if (!filter->isRecordFilter()) { // Only save non-record filters for now. Record filters are saved when the // IDvr.attacheFilter is called. mPlaybackFilterIds.insert(filterId); if (mDvrPlayback != nullptr) { result = mDvrPlayback->addPlaybackFilter(filterId, filter); } } if (!result) { *_aidl_return = nullptr; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_ARGUMENT)); } *_aidl_return = filter; return ::ndk::ScopedAStatus::ok(); } ::ndk::ScopedAStatus Demux::openTimeFilter(std::shared_ptr* _aidl_return) { ALOGV("%s", __FUNCTION__); mTimeFilter = ndk::SharedRefBase::make(this->ref()); *_aidl_return = mTimeFilter; return ::ndk::ScopedAStatus::ok(); } ::ndk::ScopedAStatus Demux::getAvSyncHwId(const std::shared_ptr& in_filter, int32_t* _aidl_return) { ALOGV("%s", __FUNCTION__); int64_t id; ::ndk::ScopedAStatus status; status = in_filter->getId64Bit(&id); if (!status.isOk()) { ALOGE("[Demux] Can't get filter Id."); *_aidl_return = -1; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } if (!mFilters[id]->isMediaFilter()) { ALOGE("[Demux] Given filter is not a media filter."); *_aidl_return = -1; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } if (!mPcrFilterIds.empty()) { // Return the lowest pcr filter id in the default implementation as the av sync id *_aidl_return = *mPcrFilterIds.begin(); return ::ndk::ScopedAStatus::ok(); } ALOGE("[Demux] No PCR filter opened."); *_aidl_return = -1; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } ::ndk::ScopedAStatus Demux::getAvSyncTime(int32_t in_avSyncHwId, int64_t* _aidl_return) { ALOGV("%s", __FUNCTION__); if (mPcrFilterIds.empty()) { *_aidl_return = -1; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_STATE)); } if (in_avSyncHwId != *mPcrFilterIds.begin()) { *_aidl_return = -1; return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast(Result::INVALID_ARGUMENT)); } *_aidl_return = -1; return ::ndk::ScopedAStatus::ok(); } ::ndk::ScopedAStatus Demux::close() { ALOGV("%s", __FUNCTION__); stopFrontendInput(); stopIptvFrontendInput(); set::iterator it; for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) { mDvrPlayback->removePlaybackFilter(*it); } mPlaybackFilterIds.clear(); mRecordFilterIds.clear(); mFilters.clear(); mLastUsedFilterId = -1; if (mTuner != nullptr) { mTuner->removeDemux(mDemuxId); mTuner = nullptr; } return ::ndk::ScopedAStatus::ok(); } ::ndk::ScopedAStatus Demux::connectCiCam(int32_t in_ciCamId) { ALOGV("%s", __FUNCTION__); mCiCamId = in_ciCamId; return ::ndk::ScopedAStatus::ok(); } ::ndk::ScopedAStatus Demux::disconnectCiCam() { ALOGV("%s", __FUNCTION__); return ::ndk::ScopedAStatus::ok(); } ::ndk::ScopedAStatus Demux::removeFilter(int64_t filterId) { ALOGV("%s", __FUNCTION__); if (mDvrPlayback != nullptr) { mDvrPlayback->removePlaybackFilter(filterId); } mPlaybackFilterIds.erase(filterId); mRecordFilterIds.erase(filterId); mFilters.erase(filterId); return ::ndk::ScopedAStatus::ok(); } void Demux::startBroadcastTsFilter(vector data) { set::iterator it; uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff)); if (DEBUG_DEMUX) { ALOGW("[Demux] start ts filter pid: %d", pid); } for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) { if (pid == mFilters[*it]->getTpid()) { mFilters[*it]->updateFilterOutput(data); } } } void Demux::sendFrontendInputToRecord(vector data) { set::iterator it; if (DEBUG_DEMUX) { ALOGW("[Demux] update record filter output"); } for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) { mFilters[*it]->updateRecordOutput(data); } } void Demux::sendFrontendInputToRecord(vector data, uint16_t pid, uint64_t pts) { sendFrontendInputToRecord(data); set::iterator it; for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) { if (pid == mFilters[*it]->getTpid()) { mFilters[*it]->updatePts(pts); } } } bool Demux::startBroadcastFilterDispatcher() { set::iterator it; // Handle the output data per filter type for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) { if (!mFilters[*it]->startFilterHandler().isOk()) { return false; } } return true; } bool Demux::startRecordFilterDispatcher() { set::iterator it; for (it = mRecordFilterIds.begin(); it != mRecordFilterIds.end(); it++) { if (!mFilters[*it]->startRecordFilterHandler().isOk()) { return false; } } return true; } ::ndk::ScopedAStatus Demux::startFilterHandler(int64_t filterId) { return mFilters[filterId]->startFilterHandler(); } void Demux::updateFilterOutput(int64_t filterId, vector data) { mFilters[filterId]->updateFilterOutput(data); } void Demux::updateMediaFilterOutput(int64_t filterId, vector data, uint64_t pts) { updateFilterOutput(filterId, data); mFilters[filterId]->updatePts(pts); } uint16_t Demux::getFilterTpid(int64_t filterId) { return mFilters[filterId]->getTpid(); } int32_t Demux::getDemuxId() { return mDemuxId; } bool Demux::isInUse() { return mInUse; } void Demux::setInUse(bool inUse) { mInUse = inUse; } void Demux::getDemuxInfo(DemuxInfo* demuxInfo) { *demuxInfo = {.filterTypes = mFilterTypes}; } void Demux::startFrontendInputLoop() { ALOGD("[Demux] start frontend on demux"); // Stop current Frontend thread loop first, in case the user starts a new // tuning before stopping current tuning. stopFrontendInput(); mFrontendInputThreadRunning = true; mFrontendInputThread = std::thread(&Demux::frontendInputThreadLoop, this); } void Demux::frontendInputThreadLoop() { if (!mFrontendInputThreadRunning) { return; } if (!mDvrPlayback) { ALOGW("[Demux] No software Frontend input configured. Ending Frontend thread loop."); mFrontendInputThreadRunning = false; return; } while (mFrontendInputThreadRunning) { uint32_t efState = 0; ::android::status_t status = mDvrPlayback->getDvrEventFlag()->wait( static_cast(DemuxQueueNotifyBits::DATA_READY), &efState, WAIT_TIMEOUT, true /* retry on spurious wake */); if (status != ::android::OK) { ALOGD("[Demux] wait for data ready on the playback FMQ"); continue; } if (mDvrPlayback->getSettings().get().dataFormat == DataFormat::ES) { if (!mDvrPlayback->processEsDataOnPlayback(true /*isVirtualFrontend*/, mIsRecording)) { ALOGE("[Demux] playback es data failed to be filtered. Ending thread"); break; } continue; } // Our current implementation filter the data and write it into the filter FMQ immediately // after the DATA_READY from the VTS/framework // This is for the non-ES data source, real playback use case handling. if (!mDvrPlayback->readPlaybackFMQ(true /*isVirtualFrontend*/, mIsRecording) || !mDvrPlayback->startFilterDispatcher(true /*isVirtualFrontend*/, mIsRecording)) { ALOGE("[Demux] playback data failed to be filtered. Ending thread"); break; } } mFrontendInputThreadRunning = false; ALOGW("[Demux] Frontend Input thread end."); } void Demux::stopFrontendInput() { ALOGD("[Demux] stop frontend on demux"); mKeepFetchingDataFromFrontend = false; mFrontendInputThreadRunning = false; if (mFrontendInputThread.joinable()) { mFrontendInputThread.join(); } } void Demux::stopIptvFrontendInput() { ALOGD("[Demux] stop iptv frontend on demux"); if (mDemuxIptvReadThread.joinable()) { mIsIptvReadThreadTerminated = true; mIsIptvThreadRunningCv.notify_all(); mDemuxIptvReadThread.join(); } } void Demux::setIsRecording(bool isRecording) { mIsRecording = isRecording; } bool Demux::isRecording() { return mIsRecording; } binder_status_t Demux::dump(int fd, const char** args, uint32_t numArgs) { dprintf(fd, " Demux %d:\n", mDemuxId); dprintf(fd, " mIsRecording %d\n", mIsRecording); { dprintf(fd, " Filters:\n"); map>::iterator it; for (it = mFilters.begin(); it != mFilters.end(); it++) { it->second->dump(fd, args, numArgs); } } { dprintf(fd, " TimeFilter:\n"); if (mTimeFilter != nullptr) { mTimeFilter->dump(fd, args, numArgs); } } { dprintf(fd, " DvrPlayback:\n"); if (mDvrPlayback != nullptr) { mDvrPlayback->dump(fd, args, numArgs); } } { dprintf(fd, " DvrRecord:\n"); if (mDvrRecord != nullptr) { mDvrRecord->dump(fd, args, numArgs); } } return STATUS_OK; } bool Demux::attachRecordFilter(int64_t filterId) { if (mFilters[filterId] == nullptr || mDvrRecord == nullptr || !mFilters[filterId]->isRecordFilter()) { return false; } mRecordFilterIds.insert(filterId); mFilters[filterId]->attachFilterToRecord(mDvrRecord); return true; } bool Demux::detachRecordFilter(int64_t filterId) { if (mFilters[filterId] == nullptr || mDvrRecord == nullptr) { return false; } mRecordFilterIds.erase(filterId); mFilters[filterId]->detachFilterFromRecord(); return true; } } // namespace tuner } // namespace tv } // namespace hardware } // namespace android } // namespace aidl