1 /*
2  * Copyright 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "android.hardware.tv.tuner-service.example-Filter"
19 
20 #include <BufferAllocator/BufferAllocator.h>
21 #include <aidl/android/hardware/tv/tuner/DemuxFilterMonitorEventType.h>
22 #include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
23 #include <aidl/android/hardware/tv/tuner/Result.h>
24 #include <aidlcommonsupport/NativeHandle.h>
25 #include <inttypes.h>
26 #include <utils/Log.h>
27 
28 #include "Filter.h"
29 
30 namespace aidl {
31 namespace android {
32 namespace hardware {
33 namespace tv {
34 namespace tuner {
35 
36 #define WAIT_TIMEOUT 3000000000
37 
FilterCallbackScheduler(const std::shared_ptr<IFilterCallback> & cb)38 FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb)
39     : mCallback(cb),
40       mIsConditionMet(false),
41       mDataLength(0),
42       mTimeDelayInMs(0),
43       mDataSizeDelayInBytes(0) {
44     start();
45 }
46 
~FilterCallbackScheduler()47 FilterCallbackScheduler::~FilterCallbackScheduler() {
48     stop();
49 }
50 
onFilterEvent(DemuxFilterEvent && event)51 void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) {
52     std::unique_lock<std::mutex> lock(mLock);
53     mCallbackBuffer.push_back(std::move(event));
54     mDataLength += getDemuxFilterEventDataLength(event);
55 
56     if (isDataSizeDelayConditionMetLocked()) {
57         mIsConditionMet = true;
58         // unlock, so thread is not immediately blocked when it is notified.
59         lock.unlock();
60         mCv.notify_all();
61     }
62 }
63 
onFilterStatus(const DemuxFilterStatus & status)64 void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) {
65     if (mCallback) {
66         mCallback->onFilterStatus(status);
67     }
68 }
69 
flushEvents()70 void FilterCallbackScheduler::flushEvents() {
71     std::unique_lock<std::mutex> lock(mLock);
72     mCallbackBuffer.clear();
73     mDataLength = 0;
74 }
75 
setTimeDelayHint(int timeDelay)76 void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) {
77     std::unique_lock<std::mutex> lock(mLock);
78     mTimeDelayInMs = timeDelay;
79     // always notify condition variable to update timeout
80     mIsConditionMet = true;
81     lock.unlock();
82     mCv.notify_all();
83 }
84 
setDataSizeDelayHint(int dataSizeDelay)85 void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) {
86     std::unique_lock<std::mutex> lock(mLock);
87     mDataSizeDelayInBytes = dataSizeDelay;
88     if (isDataSizeDelayConditionMetLocked()) {
89         mIsConditionMet = true;
90         lock.unlock();
91         mCv.notify_all();
92     }
93 }
94 
hasCallbackRegistered() const95 bool FilterCallbackScheduler::hasCallbackRegistered() const {
96     return mCallback != nullptr;
97 }
98 
start()99 void FilterCallbackScheduler::start() {
100     mIsRunning = true;
101     mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this);
102 }
103 
stop()104 void FilterCallbackScheduler::stop() {
105     mIsRunning = false;
106     if (mCallbackThread.joinable()) {
107         {
108             std::lock_guard<std::mutex> lock(mLock);
109             mIsConditionMet = true;
110         }
111         mCv.notify_all();
112         mCallbackThread.join();
113     }
114 }
115 
threadLoop()116 void FilterCallbackScheduler::threadLoop() {
117     while (mIsRunning) {
118         threadLoopOnce();
119     }
120 }
121 
threadLoopOnce()122 void FilterCallbackScheduler::threadLoopOnce() {
123     std::unique_lock<std::mutex> lock(mLock);
124     if (mTimeDelayInMs > 0) {
125         // Note: predicate protects from lost and spurious wakeups
126         mCv.wait_for(lock, std::chrono::milliseconds(mTimeDelayInMs),
127                      [this] { return mIsConditionMet; });
128     } else {
129         // Note: predicate protects from lost and spurious wakeups
130         mCv.wait(lock, [this] { return mIsConditionMet; });
131     }
132     mIsConditionMet = false;
133 
134     // condition_variable wait locks mutex on timeout / notify
135     // Note: if stop() has been called in the meantime, do not send more filter
136     // events.
137     if (mIsRunning && !mCallbackBuffer.empty()) {
138         if (mCallback) {
139             mCallback->onFilterEvent(mCallbackBuffer);
140         }
141         mCallbackBuffer.clear();
142         mDataLength = 0;
143     }
144 }
145 
146 // mLock needs to be held to call this function
isDataSizeDelayConditionMetLocked()147 bool FilterCallbackScheduler::isDataSizeDelayConditionMetLocked() {
148     if (mDataSizeDelayInBytes == 0) {
149         // Data size delay is disabled.
150         if (mTimeDelayInMs == 0) {
151             // Events should only be sent immediately if time delay is disabled
152             // as well.
153             return true;
154         }
155         return false;
156     }
157 
158     // Data size delay is enabled.
159     return mDataLength >= mDataSizeDelayInBytes;
160 }
161 
getDemuxFilterEventDataLength(const DemuxFilterEvent & event)162 int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) {
163     // there is a risk that dataLength could be a negative value, but it
164     // *should* be safe to assume that it is always positive.
165     switch (event.getTag()) {
166         case DemuxFilterEvent::Tag::section:
167             return event.get<DemuxFilterEvent::Tag::section>().dataLength;
168         case DemuxFilterEvent::Tag::media:
169             return event.get<DemuxFilterEvent::Tag::media>().dataLength;
170         case DemuxFilterEvent::Tag::pes:
171             return event.get<DemuxFilterEvent::Tag::pes>().dataLength;
172         case DemuxFilterEvent::Tag::download:
173             return event.get<DemuxFilterEvent::Tag::download>().dataLength;
174         case DemuxFilterEvent::Tag::ipPayload:
175             return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength;
176 
177         case DemuxFilterEvent::Tag::tsRecord:
178         case DemuxFilterEvent::Tag::mmtpRecord:
179         case DemuxFilterEvent::Tag::temi:
180         case DemuxFilterEvent::Tag::monitorEvent:
181         case DemuxFilterEvent::Tag::startId:
182             // these events do not include a payload and should therefore return
183             // 0.
184             // do not add a default option, so this will not compile when new types
185             // are added.
186             return 0;
187     }
188 }
189 
Filter(DemuxFilterType type,int64_t filterId,uint32_t bufferSize,const std::shared_ptr<IFilterCallback> & cb,std::shared_ptr<Demux> demux)190 Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
191                const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux)
192     : mDemux(demux),
193       mCallbackScheduler(cb),
194       mFilterId(filterId),
195       mBufferSize(bufferSize),
196       mType(type) {
197     switch (mType.mainType) {
198         case DemuxFilterMainType::TS:
199             if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
200                         DemuxTsFilterType::AUDIO ||
201                 mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
202                         DemuxTsFilterType::VIDEO) {
203                 mIsMediaFilter = true;
204             }
205             if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
206                 DemuxTsFilterType::PCR) {
207                 mIsPcrFilter = true;
208             }
209             if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
210                 DemuxTsFilterType::RECORD) {
211                 mIsRecordFilter = true;
212             }
213             break;
214         case DemuxFilterMainType::MMTP:
215             if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
216                         DemuxMmtpFilterType::AUDIO ||
217                 mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
218                         DemuxMmtpFilterType::VIDEO) {
219                 mIsMediaFilter = true;
220             }
221             if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
222                 DemuxMmtpFilterType::RECORD) {
223                 mIsRecordFilter = true;
224             }
225             break;
226         case DemuxFilterMainType::IP:
227             break;
228         case DemuxFilterMainType::TLV:
229             break;
230         case DemuxFilterMainType::ALP:
231             break;
232         default:
233             break;
234     }
235 }
236 
~Filter()237 Filter::~Filter() {
238     close();
239 }
240 
getId64Bit(int64_t * _aidl_return)241 ::ndk::ScopedAStatus Filter::getId64Bit(int64_t* _aidl_return) {
242     ALOGV("%s", __FUNCTION__);
243 
244     *_aidl_return = mFilterId;
245     return ::ndk::ScopedAStatus::ok();
246 }
247 
getId(int32_t * _aidl_return)248 ::ndk::ScopedAStatus Filter::getId(int32_t* _aidl_return) {
249     ALOGV("%s", __FUNCTION__);
250 
251     *_aidl_return = static_cast<int32_t>(mFilterId);
252     return ::ndk::ScopedAStatus::ok();
253 }
254 
setDataSource(const std::shared_ptr<IFilter> & in_filter)255 ::ndk::ScopedAStatus Filter::setDataSource(const std::shared_ptr<IFilter>& in_filter) {
256     ALOGV("%s", __FUNCTION__);
257 
258     mDataSource = in_filter;
259     mIsDataSourceDemux = false;
260 
261     return ::ndk::ScopedAStatus::ok();
262 }
263 
setDelayHint(const FilterDelayHint & in_hint)264 ::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) {
265     if (mIsMediaFilter) {
266         // delay hint is not supported for media filters
267         return ::ndk::ScopedAStatus::fromServiceSpecificError(
268                 static_cast<int32_t>(Result::UNAVAILABLE));
269     }
270 
271     ALOGV("%s", __FUNCTION__);
272     if (in_hint.hintValue < 0) {
273         return ::ndk::ScopedAStatus::fromServiceSpecificError(
274                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
275     }
276 
277     switch (in_hint.hintType) {
278         case FilterDelayHintType::TIME_DELAY_IN_MS:
279             mCallbackScheduler.setTimeDelayHint(in_hint.hintValue);
280             break;
281         case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES:
282             mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue);
283             break;
284         default:
285             return ::ndk::ScopedAStatus::fromServiceSpecificError(
286                     static_cast<int32_t>(Result::INVALID_ARGUMENT));
287     }
288 
289     return ::ndk::ScopedAStatus::ok();
290 }
291 
getQueueDesc(MQDescriptor<int8_t,SynchronizedReadWrite> * out_queue)292 ::ndk::ScopedAStatus Filter::getQueueDesc(MQDescriptor<int8_t, SynchronizedReadWrite>* out_queue) {
293     ALOGV("%s", __FUNCTION__);
294 
295     mIsUsingFMQ = mIsRecordFilter ? false : true;
296 
297     *out_queue = mFilterMQ->dupeDesc();
298     return ::ndk::ScopedAStatus::ok();
299 }
300 
configure(const DemuxFilterSettings & in_settings)301 ::ndk::ScopedAStatus Filter::configure(const DemuxFilterSettings& in_settings) {
302     ALOGV("%s", __FUNCTION__);
303 
304     mFilterSettings = in_settings;
305     switch (mType.mainType) {
306         case DemuxFilterMainType::TS:
307             mTpid = in_settings.get<DemuxFilterSettings::Tag::ts>().tpid;
308             break;
309         case DemuxFilterMainType::MMTP:
310             break;
311         case DemuxFilterMainType::IP:
312             break;
313         case DemuxFilterMainType::TLV:
314             break;
315         case DemuxFilterMainType::ALP:
316             break;
317         default:
318             break;
319     }
320 
321     mConfigured = true;
322     return ::ndk::ScopedAStatus::ok();
323 }
324 
start()325 ::ndk::ScopedAStatus Filter::start() {
326     ALOGV("%s", __FUNCTION__);
327     mFilterThreadRunning = true;
328     std::vector<DemuxFilterEvent> events;
329 
330     mFilterCount += 1;
331     mDemux->setIptvThreadRunning(true);
332 
333     // All the filter event callbacks in start are for testing purpose.
334     switch (mType.mainType) {
335         case DemuxFilterMainType::TS:
336             createMediaEvent(events, false);
337             createMediaEvent(events, true);
338             createTsRecordEvent(events);
339             createTemiEvent(events);
340             break;
341         case DemuxFilterMainType::MMTP:
342             createDownloadEvent(events);
343             createMmtpRecordEvent(events);
344             break;
345         case DemuxFilterMainType::IP:
346             createSectionEvent(events);
347             createIpPayloadEvent(events);
348             break;
349         case DemuxFilterMainType::TLV:
350             createMonitorEvent(events);
351             break;
352         case DemuxFilterMainType::ALP:
353             createMonitorEvent(events);
354             break;
355         default:
356             break;
357     }
358 
359     for (auto&& event : events) {
360         mCallbackScheduler.onFilterEvent(std::move(event));
361     }
362 
363     return startFilterLoop();
364 }
365 
stop()366 ::ndk::ScopedAStatus Filter::stop() {
367     ALOGV("%s", __FUNCTION__);
368 
369     if (mFilterCount > 0) {
370         mFilterCount -= 1;
371         if (mFilterCount.load() == 0) {
372             mDemux->setIptvThreadRunning(false);
373         }
374     }
375 
376     mFilterThreadRunning = false;
377     if (mFilterThread.joinable()) {
378         mFilterThread.join();
379     }
380 
381     mCallbackScheduler.flushEvents();
382 
383     return ::ndk::ScopedAStatus::ok();
384 }
385 
flush()386 ::ndk::ScopedAStatus Filter::flush() {
387     ALOGV("%s", __FUNCTION__);
388 
389     // temp implementation to flush the FMQ
390     int size = mFilterMQ->availableToRead();
391     int8_t* buffer = new int8_t[size];
392     mFilterMQ->read(buffer, size);
393     delete[] buffer;
394     mFilterStatus = DemuxFilterStatus::DATA_READY;
395 
396     return ::ndk::ScopedAStatus::ok();
397 }
398 
releaseAvHandle(const NativeHandle & in_avMemory,int64_t in_avDataId)399 ::ndk::ScopedAStatus Filter::releaseAvHandle(const NativeHandle& in_avMemory, int64_t in_avDataId) {
400     ALOGV("%s", __FUNCTION__);
401 
402     if ((mSharedAvMemHandle != nullptr) && (in_avMemory.fds.size() > 0) &&
403         (sameFile(in_avMemory.fds[0].get(), mSharedAvMemHandle->data[0]))) {
404         freeSharedAvHandle();
405         return ::ndk::ScopedAStatus::ok();
406     }
407 
408     if (mDataId2Avfd.find(in_avDataId) == mDataId2Avfd.end()) {
409         return ::ndk::ScopedAStatus::fromServiceSpecificError(
410                 static_cast<int32_t>(Result::INVALID_ARGUMENT));
411     }
412 
413     ::close(mDataId2Avfd[in_avDataId]);
414     return ::ndk::ScopedAStatus::ok();
415 }
416 
close()417 ::ndk::ScopedAStatus Filter::close() {
418     ALOGV("%s", __FUNCTION__);
419 
420     stop();
421 
422     return mDemux->removeFilter(mFilterId);
423 }
424 
configureIpCid(int32_t in_ipCid)425 ::ndk::ScopedAStatus Filter::configureIpCid(int32_t in_ipCid) {
426     ALOGV("%s", __FUNCTION__);
427 
428     if (mType.mainType != DemuxFilterMainType::IP) {
429         return ::ndk::ScopedAStatus::fromServiceSpecificError(
430                 static_cast<int32_t>(Result::INVALID_STATE));
431     }
432 
433     mCid = in_ipCid;
434     return ::ndk::ScopedAStatus::ok();
435 }
436 
getAvSharedHandle(NativeHandle * out_avMemory,int64_t * _aidl_return)437 ::ndk::ScopedAStatus Filter::getAvSharedHandle(NativeHandle* out_avMemory, int64_t* _aidl_return) {
438     ALOGV("%s", __FUNCTION__);
439 
440     if (!mIsMediaFilter) {
441         return ::ndk::ScopedAStatus::fromServiceSpecificError(
442                 static_cast<int32_t>(Result::INVALID_STATE));
443     }
444 
445     if (mSharedAvMemHandle != nullptr) {
446         *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
447         *_aidl_return = BUFFER_SIZE;
448         mUsingSharedAvMem = true;
449         return ::ndk::ScopedAStatus::ok();
450     }
451 
452     int av_fd = createAvIonFd(BUFFER_SIZE);
453     if (av_fd < 0) {
454         return ::ndk::ScopedAStatus::fromServiceSpecificError(
455                 static_cast<int32_t>(Result::OUT_OF_MEMORY));
456     }
457 
458     mSharedAvMemHandle = createNativeHandle(av_fd);
459     if (mSharedAvMemHandle == nullptr) {
460         ::close(av_fd);
461         *_aidl_return = 0;
462         return ::ndk::ScopedAStatus::fromServiceSpecificError(
463                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
464     }
465     ::close(av_fd);
466     mUsingSharedAvMem = true;
467 
468     *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
469     *_aidl_return = BUFFER_SIZE;
470     return ::ndk::ScopedAStatus::ok();
471 }
472 
configureAvStreamType(const AvStreamType & in_avStreamType)473 ::ndk::ScopedAStatus Filter::configureAvStreamType(const AvStreamType& in_avStreamType) {
474     ALOGV("%s", __FUNCTION__);
475 
476     if (!mIsMediaFilter) {
477         return ::ndk::ScopedAStatus::fromServiceSpecificError(
478                 static_cast<int32_t>(Result::UNAVAILABLE));
479     }
480 
481     switch (in_avStreamType.getTag()) {
482         case AvStreamType::Tag::audio:
483             mAudioStreamType =
484                     static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::audio>());
485             break;
486         case AvStreamType::Tag::video:
487             mVideoStreamType =
488                     static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::video>());
489             break;
490         default:
491             break;
492     }
493 
494     return ::ndk::ScopedAStatus::ok();
495 }
496 
configureMonitorEvent(int in_monitorEventTypes)497 ::ndk::ScopedAStatus Filter::configureMonitorEvent(int in_monitorEventTypes) {
498     ALOGV("%s", __FUNCTION__);
499 
500     int32_t newScramblingStatus =
501             in_monitorEventTypes &
502             static_cast<int32_t>(DemuxFilterMonitorEventType::SCRAMBLING_STATUS);
503     int32_t newIpCid =
504             in_monitorEventTypes & static_cast<int32_t>(DemuxFilterMonitorEventType::IP_CID_CHANGE);
505 
506     // if scrambling status monitoring flipped, record the new state and send msg on enabling
507     if (newScramblingStatus ^ mScramblingStatusMonitored) {
508         mScramblingStatusMonitored = newScramblingStatus;
509         if (mScramblingStatusMonitored) {
510             if (mCallbackScheduler.hasCallbackRegistered()) {
511                 // Assuming current status is always NOT_SCRAMBLED
512                 auto monitorEvent = DemuxFilterMonitorEvent::make<
513                         DemuxFilterMonitorEvent::Tag::scramblingStatus>(
514                         ScramblingStatus::NOT_SCRAMBLED);
515                 auto event =
516                         DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
517                 mCallbackScheduler.onFilterEvent(std::move(event));
518             } else {
519                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
520                         static_cast<int32_t>(Result::INVALID_STATE));
521             }
522         }
523     }
524 
525     // if ip cid monitoring flipped, record the new state and send msg on enabling
526     if (newIpCid ^ mIpCidMonitored) {
527         mIpCidMonitored = newIpCid;
528         if (mIpCidMonitored) {
529             if (mCallbackScheduler.hasCallbackRegistered()) {
530                 // Return random cid
531                 auto monitorEvent =
532                         DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1);
533                 auto event =
534                         DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
535                 mCallbackScheduler.onFilterEvent(std::move(event));
536             } else {
537                 return ::ndk::ScopedAStatus::fromServiceSpecificError(
538                         static_cast<int32_t>(Result::INVALID_STATE));
539             }
540         }
541     }
542 
543     return ::ndk::ScopedAStatus::ok();
544 }
545 
createFilterMQ()546 bool Filter::createFilterMQ() {
547     ALOGV("%s", __FUNCTION__);
548 
549     // Create a synchronized FMQ that supports blocking read/write
550     std::unique_ptr<FilterMQ> tmpFilterMQ =
551             std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(mBufferSize, true));
552     if (!tmpFilterMQ->isValid()) {
553         ALOGW("[Filter] Failed to create FMQ of filter with id: %" PRIu64, mFilterId);
554         return false;
555     }
556 
557     mFilterMQ = std::move(tmpFilterMQ);
558 
559     if (EventFlag::createEventFlag(mFilterMQ->getEventFlagWord(), &mFilterEventsFlag) !=
560         ::android::OK) {
561         return false;
562     }
563 
564     return true;
565 }
566 
startFilterLoop()567 ::ndk::ScopedAStatus Filter::startFilterLoop() {
568     mFilterThread = std::thread(&Filter::filterThreadLoop, this);
569     return ::ndk::ScopedAStatus::ok();
570 }
571 
filterThreadLoop()572 void Filter::filterThreadLoop() {
573     if (!mFilterThreadRunning) {
574         return;
575     }
576 
577     ALOGD("[Filter] filter %" PRIu64 " threadLoop start.", mFilterId);
578 
579     ALOGI("IPTV DVR Playback status on Filter: %d", mIptvDvrPlaybackStatus);
580 
581     // For the first time of filter output, implementation needs to send the filter
582     // Event Callback without waiting for the DATA_CONSUMED to init the process.
583     while (mFilterThreadRunning) {
584         std::unique_lock<std::mutex> lock(mFilterEventsLock);
585         if (mFilterEvents.size() == 0) {
586             lock.unlock();
587             if (DEBUG_FILTER) {
588                 ALOGD("[Filter] wait for filter data output.");
589             }
590             usleep(1000 * 1000);
591             continue;
592         }
593 
594         // After successfully write, send a callback and wait for the read to be done
595         if (mCallbackScheduler.hasCallbackRegistered()) {
596             if (mConfigured) {
597                 auto startEvent =
598                         DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++);
599                 mCallbackScheduler.onFilterEvent(std::move(startEvent));
600                 mConfigured = false;
601             }
602 
603             // lock is still being held
604             for (auto&& event : mFilterEvents) {
605                 mCallbackScheduler.onFilterEvent(std::move(event));
606             }
607         } else {
608             ALOGD("[Filter] filter callback is not configured yet.");
609             mFilterThreadRunning = false;
610             return;
611         }
612 
613         mFilterEvents.clear();
614         mFilterStatus = DemuxFilterStatus::DATA_READY;
615         mCallbackScheduler.onFilterStatus(mFilterStatus);
616         break;
617     }
618 
619     while (mFilterThreadRunning) {
620         uint32_t efState = 0;
621         // We do not wait for the last round of written data to be read to finish the thread
622         // because the VTS can verify the reading itself.
623         for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
624             if (!mFilterThreadRunning) {
625                 break;
626             }
627             while (mFilterThreadRunning && mIsUsingFMQ) {
628                 ::android::status_t status = mFilterEventsFlag->wait(
629                         static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
630                         WAIT_TIMEOUT, true /* retry on spurious wake */);
631                 if (status != ::android::OK) {
632                     ALOGD("[Filter] wait for data consumed");
633                     continue;
634                 }
635                 break;
636             }
637 
638             maySendFilterStatusCallback();
639 
640             while (mFilterThreadRunning) {
641                 std::lock_guard<std::mutex> lock(mFilterEventsLock);
642                 if (mFilterEvents.size() == 0) {
643                     continue;
644                 }
645                 // After successfully write, send a callback and wait for the read to be done
646                 for (auto&& event : mFilterEvents) {
647                     mCallbackScheduler.onFilterEvent(std::move(event));
648                 }
649                 mFilterEvents.clear();
650                 break;
651             }
652             // We do not wait for the last read to be done
653             // VTS can verify the read result itself.
654             if (i == SECTION_WRITE_COUNT - 1) {
655                 ALOGD("[Filter] filter %" PRIu64 " writing done. Ending thread", mFilterId);
656                 break;
657             }
658         }
659         break;
660     }
661     ALOGD("[Filter] filter thread ended.");
662 }
663 
freeSharedAvHandle()664 void Filter::freeSharedAvHandle() {
665     if (!mIsMediaFilter) {
666         return;
667     }
668     native_handle_close(mSharedAvMemHandle);
669     native_handle_delete(mSharedAvMemHandle);
670     mSharedAvMemHandle = nullptr;
671 }
672 
dump(int fd,const char **,uint32_t)673 binder_status_t Filter::dump(int fd, const char** /* args */, uint32_t /* numArgs */) {
674     dprintf(fd, "    Filter %" PRIu64 ":\n", mFilterId);
675     dprintf(fd, "      Main type: %d\n", mType.mainType);
676     dprintf(fd, "      mIsMediaFilter: %d\n", mIsMediaFilter);
677     dprintf(fd, "      mIsPcrFilter: %d\n", mIsPcrFilter);
678     dprintf(fd, "      mIsRecordFilter: %d\n", mIsRecordFilter);
679     dprintf(fd, "      mIsUsingFMQ: %d\n", mIsUsingFMQ);
680     dprintf(fd, "      mFilterThreadRunning: %d\n", (bool)mFilterThreadRunning);
681     return STATUS_OK;
682 }
683 
maySendFilterStatusCallback()684 void Filter::maySendFilterStatusCallback() {
685     if (!mIsUsingFMQ) {
686         return;
687     }
688     std::lock_guard<std::mutex> lock(mFilterStatusLock);
689     int availableToRead = mFilterMQ->availableToRead();
690     int availableToWrite = mFilterMQ->availableToWrite();
691     int fmqSize = mFilterMQ->getQuantumCount();
692 
693     DemuxFilterStatus newStatus = checkFilterStatusChange(
694             availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
695     if (mFilterStatus != newStatus) {
696         mCallbackScheduler.onFilterStatus(newStatus);
697         mFilterStatus = newStatus;
698     }
699 }
700 
checkFilterStatusChange(uint32_t availableToWrite,uint32_t availableToRead,uint32_t highThreshold,uint32_t lowThreshold)701 DemuxFilterStatus Filter::checkFilterStatusChange(uint32_t availableToWrite,
702                                                   uint32_t availableToRead, uint32_t highThreshold,
703                                                   uint32_t lowThreshold) {
704     if (availableToWrite == 0) {
705         return DemuxFilterStatus::OVERFLOW;
706     } else if (availableToRead > highThreshold) {
707         return DemuxFilterStatus::HIGH_WATER;
708     } else if (availableToRead == 0) {
709         return DemuxFilterStatus::NO_DATA;
710     } else if (availableToRead < lowThreshold) {
711         return DemuxFilterStatus::LOW_WATER;
712     }
713     return mFilterStatus;
714 }
715 
getTpid()716 uint16_t Filter::getTpid() {
717     return mTpid;
718 }
719 
updateFilterOutput(vector<int8_t> & data)720 void Filter::updateFilterOutput(vector<int8_t>& data) {
721     std::lock_guard<std::mutex> lock(mFilterOutputLock);
722     mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
723 }
724 
updatePts(uint64_t pts)725 void Filter::updatePts(uint64_t pts) {
726     std::lock_guard<std::mutex> lock(mFilterOutputLock);
727     mPts = pts;
728 }
729 
updateRecordOutput(vector<int8_t> & data)730 void Filter::updateRecordOutput(vector<int8_t>& data) {
731     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
732     mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
733 }
734 
startFilterHandler()735 ::ndk::ScopedAStatus Filter::startFilterHandler() {
736     std::lock_guard<std::mutex> lock(mFilterOutputLock);
737     switch (mType.mainType) {
738         case DemuxFilterMainType::TS:
739             switch (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>()) {
740                 case DemuxTsFilterType::UNDEFINED:
741                     break;
742                 case DemuxTsFilterType::SECTION:
743                     startSectionFilterHandler();
744                     break;
745                 case DemuxTsFilterType::PES:
746                     startPesFilterHandler();
747                     break;
748                 case DemuxTsFilterType::TS:
749                     startTsFilterHandler();
750                     break;
751                 case DemuxTsFilterType::AUDIO:
752                 case DemuxTsFilterType::VIDEO:
753                     startMediaFilterHandler();
754                     break;
755                 case DemuxTsFilterType::PCR:
756                     startPcrFilterHandler();
757                     break;
758                 case DemuxTsFilterType::TEMI:
759                     startTemiFilterHandler();
760                     break;
761                 default:
762                     break;
763             }
764             break;
765         case DemuxFilterMainType::MMTP:
766             /*mmtpSettings*/
767             break;
768         case DemuxFilterMainType::IP:
769             /*ipSettings*/
770             break;
771         case DemuxFilterMainType::TLV:
772             /*tlvSettings*/
773             break;
774         case DemuxFilterMainType::ALP:
775             /*alpSettings*/
776             break;
777         default:
778             break;
779     }
780     return ::ndk::ScopedAStatus::ok();
781 }
782 
startSectionFilterHandler()783 ::ndk::ScopedAStatus Filter::startSectionFilterHandler() {
784     if (mFilterOutput.empty()) {
785         return ::ndk::ScopedAStatus::ok();
786     }
787     if (!writeSectionsAndCreateEvent(mFilterOutput)) {
788         ALOGD("[Filter] filter %" PRIu64 " fails to write into FMQ. Ending thread", mFilterId);
789         return ::ndk::ScopedAStatus::fromServiceSpecificError(
790                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
791     }
792 
793     mFilterOutput.clear();
794 
795     return ::ndk::ScopedAStatus::ok();
796 }
797 
startPesFilterHandler()798 ::ndk::ScopedAStatus Filter::startPesFilterHandler() {
799     if (mFilterOutput.empty()) {
800         return ::ndk::ScopedAStatus::ok();
801     }
802 
803     for (int i = 0; i < mFilterOutput.size(); i += 188) {
804         if (mPesSizeLeft == 0) {
805             uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
806                               mFilterOutput[i + 6];
807             if (DEBUG_FILTER) {
808                 ALOGD("[Filter] prefix %d", prefix);
809             }
810             if (prefix == 0x000001) {
811                 // TODO handle mulptiple Pes filters
812                 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
813                                static_cast<uint8_t>(mFilterOutput[i + 9]);
814                 mPesSizeLeft += 6;
815                 if (DEBUG_FILTER) {
816                     ALOGD("[Filter] pes data length %d", mPesSizeLeft);
817                 }
818             } else {
819                 continue;
820             }
821         }
822 
823         uint32_t endPoint = min(184u, mPesSizeLeft);
824         // append data and check size
825         vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
826         vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
827         mPesOutput.insert(mPesOutput.end(), first, last);
828         // size does not match then continue
829         mPesSizeLeft -= endPoint;
830         if (DEBUG_FILTER) {
831             ALOGD("[Filter] pes data left %d", mPesSizeLeft);
832         }
833         if (mPesSizeLeft > 0) {
834             continue;
835         }
836         // size match then create event
837         if (!writeDataToFilterMQ(mPesOutput)) {
838             ALOGD("[Filter] pes data write failed");
839             mFilterOutput.clear();
840             return ::ndk::ScopedAStatus::fromServiceSpecificError(
841                     static_cast<int32_t>(Result::INVALID_ARGUMENT));
842         }
843         maySendFilterStatusCallback();
844         DemuxFilterPesEvent pesEvent;
845         pesEvent = {
846                 // temp dump meta data
847                 .streamId = static_cast<int32_t>(mPesOutput[3]),
848                 .dataLength = static_cast<int32_t>(mPesOutput.size()),
849         };
850         if (DEBUG_FILTER) {
851             ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
852         }
853 
854         {
855             std::lock_guard<std::mutex> lock(mFilterEventsLock);
856             mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent));
857         }
858 
859         mPesOutput.clear();
860     }
861 
862     mFilterOutput.clear();
863 
864     return ::ndk::ScopedAStatus::ok();
865 }
866 
startTsFilterHandler()867 ::ndk::ScopedAStatus Filter::startTsFilterHandler() {
868     // TODO handle starting TS filter
869     return ::ndk::ScopedAStatus::ok();
870 }
871 
872 // Read PES (Packetized Elementary Stream) Packets from TransportStreams
873 // as defined in ISO/IEC 13818-1 Section 2.4.3.6. Create MediaEvents
874 // containing only their data without TS or PES headers.
startMediaFilterHandler()875 ::ndk::ScopedAStatus Filter::startMediaFilterHandler() {
876     if (mFilterOutput.empty()) {
877         return ::ndk::ScopedAStatus::ok();
878     }
879 
880     // mPts being set before our MediaFilterHandler begins indicates that all
881     // metadata has already been handled. We can therefore create an event
882     // with the existing data. This method is used when processing ES files.
883     ::ndk::ScopedAStatus result;
884     if (mPts) {
885         result = createMediaFilterEventWithIon(mFilterOutput);
886         if (result.isOk()) {
887             mFilterOutput.clear();
888         }
889         return result;
890     }
891 
892     for (int i = 0; i < mFilterOutput.size(); i += 188) {
893         // Every packet has a 4 Byte TS Header preceding it
894         uint32_t headerSize = 4;
895 
896         if (mPesSizeLeft == 0) {
897             // Packet Start Code Prefix is defined as the first 3 bytes of
898             // the PES Header and should always have the value 0x000001
899             uint32_t prefix = (static_cast<uint8_t>(mFilterOutput[i + 4]) << 16) |
900                               (static_cast<uint8_t>(mFilterOutput[i + 5]) << 8) |
901                               static_cast<uint8_t>(mFilterOutput[i + 6]);
902             if (DEBUG_FILTER) {
903                 ALOGD("[Filter] prefix %d", prefix);
904             }
905             if (prefix == 0x000001) {
906                 // TODO handle multiple Pes filters
907                 // Location of PES fields from ISO/IEC 13818-1 Section 2.4.3.6
908                 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
909                                static_cast<uint8_t>(mFilterOutput[i + 9]);
910                 bool hasPts = static_cast<uint8_t>(mFilterOutput[i + 11]) & 0x80;
911                 uint8_t optionalFieldsLength = static_cast<uint8_t>(mFilterOutput[i + 12]);
912                 headerSize += 9 + optionalFieldsLength;
913 
914                 if (hasPts) {
915                     // Pts is a 33-bit field which is stored across 5 bytes, with
916                     // bits in between as reserved fields which must be ignored
917                     mPts = 0;
918                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 13]) & 0x0e) << 29;
919                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 14]) & 0xff) << 22;
920                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 15]) & 0xfe) << 14;
921                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 16]) & 0xff) << 7;
922                     mPts |= (static_cast<uint8_t>(mFilterOutput[i + 17]) & 0xfe) >> 1;
923                 }
924 
925                 if (DEBUG_FILTER) {
926                     ALOGD("[Filter] pes data length %d", mPesSizeLeft);
927                 }
928             } else {
929                 continue;
930             }
931         }
932 
933         uint32_t endPoint = min(188u - headerSize, mPesSizeLeft);
934         // append data and check size
935         vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + headerSize;
936         vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + headerSize + endPoint;
937         mPesOutput.insert(mPesOutput.end(), first, last);
938         // size does not match then continue
939         mPesSizeLeft -= endPoint;
940         if (DEBUG_FILTER) {
941             ALOGD("[Filter] pes data left %d", mPesSizeLeft);
942         }
943         if (mPesSizeLeft > 0 || mAvBufferCopyCount++ < 10) {
944             continue;
945         }
946 
947         result = createMediaFilterEventWithIon(mPesOutput);
948         if (!result.isOk()) {
949             mFilterOutput.clear();
950             return result;
951         }
952     }
953 
954     mFilterOutput.clear();
955 
956     return ::ndk::ScopedAStatus::ok();
957 }
958 
createMediaFilterEventWithIon(vector<int8_t> & output)959 ::ndk::ScopedAStatus Filter::createMediaFilterEventWithIon(vector<int8_t>& output) {
960     if (mUsingSharedAvMem) {
961         if (mSharedAvMemHandle == nullptr) {
962             return ::ndk::ScopedAStatus::fromServiceSpecificError(
963                     static_cast<int32_t>(Result::UNKNOWN_ERROR));
964         }
965         return createShareMemMediaEvents(output);
966     }
967 
968     return createIndependentMediaEvents(output);
969 }
970 
startRecordFilterHandler()971 ::ndk::ScopedAStatus Filter::startRecordFilterHandler() {
972     std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
973     if (mRecordFilterOutput.empty()) {
974         return ::ndk::ScopedAStatus::ok();
975     }
976 
977     if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
978         ALOGD("[Filter] dvr fails to write into record FMQ.");
979         return ::ndk::ScopedAStatus::fromServiceSpecificError(
980                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
981     }
982 
983     DemuxFilterTsRecordEvent recordEvent;
984     recordEvent = {
985             .byteNumber = static_cast<int64_t>(mRecordFilterOutput.size()),
986             .pts = (mPts == 0) ? static_cast<int64_t>(time(NULL)) * 900000 : mPts,
987             .firstMbInSlice = 0,  // random address
988     };
989 
990     {
991         std::lock_guard<std::mutex> lock(mFilterEventsLock);
992         mFilterEvents.push_back(
993                 DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent));
994     }
995 
996     mRecordFilterOutput.clear();
997     return ::ndk::ScopedAStatus::ok();
998 }
999 
startPcrFilterHandler()1000 ::ndk::ScopedAStatus Filter::startPcrFilterHandler() {
1001     // TODO handle starting PCR filter
1002     return ::ndk::ScopedAStatus::ok();
1003 }
1004 
startTemiFilterHandler()1005 ::ndk::ScopedAStatus Filter::startTemiFilterHandler() {
1006     // TODO handle starting TEMI filter
1007     return ::ndk::ScopedAStatus::ok();
1008 }
1009 
1010 // Read PSI (Program Specific Information) Sections from TransportStreams
1011 // as defined in ISO/IEC 13818-1 Section 2.4.4
writeSectionsAndCreateEvent(vector<int8_t> & data)1012 bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) {
1013     // TODO check how many sections has been read
1014     ALOGD("[Filter] section handler");
1015 
1016     // Transport Stream Packets are 188 bytes long, as defined in the
1017     // Introduction of ISO/IEC 13818-1
1018     for (int i = 0; i < data.size(); i += 188) {
1019         if (mSectionSizeLeft == 0) {
1020             // Location for sectionSize as defined by Section 2.4.4
1021             // Note that the first 4 bytes skipped are the TsHeader
1022             mSectionSizeLeft = ((static_cast<uint8_t>(data[i + 5]) & 0x0f) << 8) |
1023                                static_cast<uint8_t>(data[i + 6]);
1024             mSectionSizeLeft += 3;
1025             if (DEBUG_FILTER) {
1026                 ALOGD("[Filter] section data length %d", mSectionSizeLeft);
1027             }
1028         }
1029 
1030         // 184 bytes per packet is derived by subtracting the 4 byte length of
1031         // the TsHeader from its 188 byte packet size
1032         uint32_t endPoint = min(184u, mSectionSizeLeft);
1033         // append data and check size
1034         vector<int8_t>::const_iterator first = data.begin() + i + 4;
1035         vector<int8_t>::const_iterator last = data.begin() + i + 4 + endPoint;
1036         mSectionOutput.insert(mSectionOutput.end(), first, last);
1037         // size does not match then continue
1038         mSectionSizeLeft -= endPoint;
1039         if (DEBUG_FILTER) {
1040             ALOGD("[Filter] section data left %d", mSectionSizeLeft);
1041         }
1042         if (mSectionSizeLeft > 0) {
1043             continue;
1044         }
1045 
1046         if (!writeDataToFilterMQ(mSectionOutput)) {
1047             mSectionOutput.clear();
1048             return false;
1049         }
1050 
1051         DemuxFilterSectionEvent secEvent;
1052         secEvent = {
1053                 // temp dump meta data
1054                 .tableId = 0,
1055                 .version = 1,
1056                 .sectionNum = 1,
1057                 .dataLength = static_cast<int32_t>(mSectionOutput.size()),
1058         };
1059         if (DEBUG_FILTER) {
1060             ALOGD("[Filter] assembled section data length %" PRIu64, secEvent.dataLength);
1061         }
1062 
1063         {
1064             std::lock_guard<std::mutex> lock(mFilterEventsLock);
1065             mFilterEvents.push_back(
1066                     DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent));
1067         }
1068         mSectionOutput.clear();
1069     }
1070 
1071     return true;
1072 }
1073 
writeDataToFilterMQ(const std::vector<int8_t> & data)1074 bool Filter::writeDataToFilterMQ(const std::vector<int8_t>& data) {
1075     std::lock_guard<std::mutex> lock(mWriteLock);
1076     if (mFilterMQ->write(data.data(), data.size())) {
1077         return true;
1078     }
1079     return false;
1080 }
1081 
attachFilterToRecord(const std::shared_ptr<Dvr> dvr)1082 void Filter::attachFilterToRecord(const std::shared_ptr<Dvr> dvr) {
1083     mDvr = dvr;
1084 }
1085 
detachFilterFromRecord()1086 void Filter::detachFilterFromRecord() {
1087     mDvr = nullptr;
1088 }
1089 
createAvIonFd(int size)1090 int Filter::createAvIonFd(int size) {
1091     // Create an DMA-BUF fd and allocate an av fd mapped to a buffer to it.
1092     auto buffer_allocator = std::make_unique<BufferAllocator>();
1093     if (!buffer_allocator) {
1094         ALOGE("[Filter] Unable to create BufferAllocator object");
1095         return -1;
1096     }
1097     int av_fd = -1;
1098     av_fd = buffer_allocator->Alloc("system-uncached", size);
1099     if (av_fd < 0) {
1100         ALOGE("[Filter] Failed to create av fd %d", errno);
1101         return -1;
1102     }
1103     return av_fd;
1104 }
1105 
getIonBuffer(int fd,int size)1106 uint8_t* Filter::getIonBuffer(int fd, int size) {
1107     uint8_t* avBuf = static_cast<uint8_t*>(
1108             mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 /*offset*/));
1109     if (avBuf == MAP_FAILED) {
1110         ALOGE("[Filter] fail to allocate buffer %d", errno);
1111         return NULL;
1112     }
1113     return avBuf;
1114 }
1115 
createNativeHandle(int fd)1116 native_handle_t* Filter::createNativeHandle(int fd) {
1117     native_handle_t* nativeHandle;
1118     if (fd < 0) {
1119         nativeHandle = native_handle_create(/*numFd*/ 0, 0);
1120     } else {
1121         // Create a native handle to pass the av fd via the callback event.
1122         nativeHandle = native_handle_create(/*numFd*/ 1, 0);
1123     }
1124     if (nativeHandle == NULL) {
1125         ALOGE("[Filter] Failed to create native_handle %d", errno);
1126         return NULL;
1127     }
1128     if (nativeHandle->numFds > 0) {
1129         nativeHandle->data[0] = dup(fd);
1130     }
1131     return nativeHandle;
1132 }
1133 
createIndependentMediaEvents(vector<int8_t> & output)1134 ::ndk::ScopedAStatus Filter::createIndependentMediaEvents(vector<int8_t>& output) {
1135     int av_fd = createAvIonFd(output.size());
1136     if (av_fd == -1) {
1137         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1138                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1139     }
1140     // copy the filtered data to the buffer
1141     uint8_t* avBuffer = getIonBuffer(av_fd, output.size());
1142     if (avBuffer == NULL) {
1143         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1144                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1145     }
1146     memcpy(avBuffer, output.data(), output.size() * sizeof(uint8_t));
1147 
1148     native_handle_t* nativeHandle = createNativeHandle(av_fd);
1149     if (nativeHandle == NULL) {
1150         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1151                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1152     }
1153 
1154     // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1155     uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1156     mDataId2Avfd[dataId] = dup(av_fd);
1157 
1158     // Create mediaEvent and send callback
1159     auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1160     auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1161     mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1162     mediaEvent.dataLength = static_cast<int64_t>(output.size());
1163     mediaEvent.avDataId = static_cast<int64_t>(dataId);
1164     if (mPts) {
1165         mediaEvent.pts = mPts;
1166         mPts = 0;
1167     }
1168 
1169     {
1170         std::lock_guard<std::mutex> lock(mFilterEventsLock);
1171         mFilterEvents.push_back(std::move(event));
1172     }
1173 
1174     // Clear and log
1175     native_handle_close(nativeHandle);
1176     native_handle_delete(nativeHandle);
1177     output.clear();
1178     mAvBufferCopyCount = 0;
1179     if (DEBUG_FILTER) {
1180         ALOGD("[Filter] av data length %d", static_cast<int32_t>(output.size()));
1181     }
1182     return ::ndk::ScopedAStatus::ok();
1183 }
1184 
createShareMemMediaEvents(vector<int8_t> & output)1185 ::ndk::ScopedAStatus Filter::createShareMemMediaEvents(vector<int8_t>& output) {
1186     // copy the filtered data to the shared buffer
1187     uint8_t* sharedAvBuffer =
1188             getIonBuffer(mSharedAvMemHandle->data[0], output.size() + mSharedAvMemOffset);
1189     if (sharedAvBuffer == NULL) {
1190         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1191                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1192     }
1193     memcpy(sharedAvBuffer + mSharedAvMemOffset, output.data(), output.size() * sizeof(uint8_t));
1194 
1195     // Create a memory handle with numFds == 0
1196     native_handle_t* nativeHandle = createNativeHandle(-1);
1197     if (nativeHandle == NULL) {
1198         return ::ndk::ScopedAStatus::fromServiceSpecificError(
1199                 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1200     }
1201 
1202     // Create mediaEvent and send callback
1203     auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1204     auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1205     mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1206     mediaEvent.offset = mSharedAvMemOffset;
1207     mediaEvent.dataLength = static_cast<int64_t>(output.size());
1208     if (mPts) {
1209         mediaEvent.pts = mPts;
1210         mPts = 0;
1211     }
1212 
1213     {
1214         std::lock_guard<std::mutex> lock(mFilterEventsLock);
1215         mFilterEvents.push_back(std::move(event));
1216     }
1217 
1218     mSharedAvMemOffset += output.size();
1219 
1220     // Clear and log
1221     native_handle_close(nativeHandle);
1222     native_handle_delete(nativeHandle);
1223     output.clear();
1224     if (DEBUG_FILTER) {
1225         ALOGD("[Filter] shared av data length %d", static_cast<int32_t>(output.size()));
1226     }
1227     return ::ndk::ScopedAStatus::ok();
1228 }
1229 
sameFile(int fd1,int fd2)1230 bool Filter::sameFile(int fd1, int fd2) {
1231     struct stat stat1, stat2;
1232     if (fstat(fd1, &stat1) < 0 || fstat(fd2, &stat2) < 0) {
1233         return false;
1234     }
1235     return (stat1.st_dev == stat2.st_dev) && (stat1.st_ino == stat2.st_ino);
1236 }
1237 
createMediaEvent(vector<DemuxFilterEvent> & events,bool isAudioPresentation)1238 void Filter::createMediaEvent(vector<DemuxFilterEvent>& events, bool isAudioPresentation) {
1239     DemuxFilterMediaEvent mediaEvent;
1240     mediaEvent.streamId = 1;
1241     mediaEvent.isPtsPresent = true;
1242     mediaEvent.isDtsPresent = false;
1243     mediaEvent.dataLength = 3;
1244     mediaEvent.offset = 4;
1245     mediaEvent.isSecureMemory = true;
1246     mediaEvent.mpuSequenceNumber = 6;
1247     mediaEvent.isPesPrivateData = true;
1248 
1249     if (isAudioPresentation) {
1250         AudioPresentation audioPresentation0{
1251                 .preselection.preselectionId = 0,
1252                 .preselection.labels = {{"en", "Commentator"}, {"es", "Comentarista"}},
1253                 .preselection.language = "en",
1254                 .preselection.renderingIndication =
1255                         AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1256                 .preselection.hasAudioDescription = false,
1257                 .preselection.hasSpokenSubtitles = false,
1258                 .preselection.hasDialogueEnhancement = true,
1259                 .ac4ShortProgramId = 42};
1260         AudioPresentation audioPresentation1{
1261                 .preselection.preselectionId = 1,
1262                 .preselection.labels = {{"en", "Crowd"}, {"es", "Multitud"}},
1263                 .preselection.language = "en",
1264                 .preselection.renderingIndication =
1265                         AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1266                 .preselection.hasAudioDescription = false,
1267                 .preselection.hasSpokenSubtitles = false,
1268                 .preselection.hasDialogueEnhancement = false,
1269                 .ac4ShortProgramId = 42};
1270         vector<AudioPresentation> audioPresentations;
1271         audioPresentations.push_back(audioPresentation0);
1272         audioPresentations.push_back(audioPresentation1);
1273         mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audioPresentations>(
1274                 audioPresentations);
1275     } else {
1276         AudioExtraMetaData audio;
1277         audio.adFade = 1;
1278         audio.adPan = 2;
1279         audio.versionTextTag = 3;
1280         audio.adGainCenter = 4;
1281         audio.adGainFront = 5;
1282         audio.adGainSurround = 6;
1283         mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audio>(audio);
1284     }
1285 
1286     int av_fd = createAvIonFd(BUFFER_SIZE);
1287     if (av_fd == -1) {
1288         return;
1289     }
1290 
1291     native_handle_t* nativeHandle = createNativeHandle(av_fd);
1292     if (nativeHandle == nullptr) {
1293         ::close(av_fd);
1294         ALOGE("[Filter] Failed to create native_handle %d", errno);
1295         return;
1296     }
1297 
1298     // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1299     uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1300     mDataId2Avfd[dataId] = dup(av_fd);
1301 
1302     mediaEvent.avDataId = static_cast<int64_t>(dataId);
1303     mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1304 
1305     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(std::move(mediaEvent)));
1306 
1307     native_handle_close(nativeHandle);
1308     native_handle_delete(nativeHandle);
1309 }
1310 
createTsRecordEvent(vector<DemuxFilterEvent> & events)1311 void Filter::createTsRecordEvent(vector<DemuxFilterEvent>& events) {
1312     DemuxPid pid;
1313     DemuxFilterScIndexMask mask;
1314     DemuxFilterTsRecordEvent tsRecord1;
1315     pid.set<DemuxPid::Tag::tPid>(1);
1316     mask.set<DemuxFilterScIndexMask::Tag::scIndex>(1);
1317     tsRecord1.pid = pid;
1318     tsRecord1.tsIndexMask = 1;
1319     tsRecord1.scIndexMask = mask;
1320     tsRecord1.byteNumber = 2;
1321 
1322     DemuxFilterTsRecordEvent tsRecord2;
1323     tsRecord2.pts = 1;
1324     tsRecord2.firstMbInSlice = 2;  // random address
1325 
1326     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord1)));
1327     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord2)));
1328 }
1329 
createMmtpRecordEvent(vector<DemuxFilterEvent> & events)1330 void Filter::createMmtpRecordEvent(vector<DemuxFilterEvent>& events) {
1331     DemuxFilterMmtpRecordEvent mmtpRecord1;
1332     mmtpRecord1.scHevcIndexMask = 1;
1333     mmtpRecord1.byteNumber = 2;
1334 
1335     DemuxFilterMmtpRecordEvent mmtpRecord2;
1336     mmtpRecord2.pts = 1;
1337     mmtpRecord2.mpuSequenceNumber = 2;
1338     mmtpRecord2.firstMbInSlice = 3;
1339     mmtpRecord2.tsIndexMask = 4;
1340 
1341     events.push_back(
1342             DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord1)));
1343     events.push_back(
1344             DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord2)));
1345 }
1346 
createSectionEvent(vector<DemuxFilterEvent> & events)1347 void Filter::createSectionEvent(vector<DemuxFilterEvent>& events) {
1348     DemuxFilterSectionEvent section;
1349     section.tableId = 1;
1350     section.version = 2;
1351     section.sectionNum = 3;
1352     section.dataLength = 0;
1353 
1354     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(std::move(section)));
1355 }
1356 
createPesEvent(vector<DemuxFilterEvent> & events)1357 void Filter::createPesEvent(vector<DemuxFilterEvent>& events) {
1358     DemuxFilterPesEvent pes;
1359     pes.streamId = 1;
1360     pes.dataLength = 1;
1361     pes.mpuSequenceNumber = 2;
1362 
1363     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(std::move(pes)));
1364 }
1365 
createDownloadEvent(vector<DemuxFilterEvent> & events)1366 void Filter::createDownloadEvent(vector<DemuxFilterEvent>& events) {
1367     DemuxFilterDownloadEvent download;
1368     download.itemId = 1;
1369     download.downloadId = 1;
1370     download.mpuSequenceNumber = 2;
1371     download.itemFragmentIndex = 3;
1372     download.lastItemFragmentIndex = 4;
1373     download.dataLength = 0;
1374 
1375     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::download>(std::move(download)));
1376 }
1377 
createIpPayloadEvent(vector<DemuxFilterEvent> & events)1378 void Filter::createIpPayloadEvent(vector<DemuxFilterEvent>& events) {
1379     DemuxFilterIpPayloadEvent ipPayload;
1380     ipPayload.dataLength = 0;
1381 
1382     events.push_back(
1383             DemuxFilterEvent::make<DemuxFilterEvent::Tag::ipPayload>(std::move(ipPayload)));
1384 }
1385 
createTemiEvent(vector<DemuxFilterEvent> & events)1386 void Filter::createTemiEvent(vector<DemuxFilterEvent>& events) {
1387     DemuxFilterTemiEvent temi;
1388     temi.pts = 1;
1389     temi.descrTag = 2;
1390     temi.descrData = {3};
1391 
1392     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::temi>(std::move(temi)));
1393 }
1394 
createMonitorEvent(vector<DemuxFilterEvent> & events)1395 void Filter::createMonitorEvent(vector<DemuxFilterEvent>& events) {
1396     DemuxFilterMonitorEvent monitor;
1397     monitor.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>(ScramblingStatus::SCRAMBLED);
1398 
1399     events.push_back(
1400             DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(std::move(monitor)));
1401 }
1402 
createRestartEvent(vector<DemuxFilterEvent> & events)1403 void Filter::createRestartEvent(vector<DemuxFilterEvent>& events) {
1404     events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(1));
1405 }
1406 
1407 }  // namespace tuner
1408 }  // namespace tv
1409 }  // namespace hardware
1410 }  // namespace android
1411 }  // namespace aidl
1412