1 /*
2 * Copyright 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "android.hardware.tv.tuner-service.example-Filter"
19
20 #include <BufferAllocator/BufferAllocator.h>
21 #include <aidl/android/hardware/tv/tuner/DemuxFilterMonitorEventType.h>
22 #include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
23 #include <aidl/android/hardware/tv/tuner/Result.h>
24 #include <aidlcommonsupport/NativeHandle.h>
25 #include <inttypes.h>
26 #include <utils/Log.h>
27
28 #include "Filter.h"
29
30 namespace aidl {
31 namespace android {
32 namespace hardware {
33 namespace tv {
34 namespace tuner {
35
36 #define WAIT_TIMEOUT 3000000000
37
FilterCallbackScheduler(const std::shared_ptr<IFilterCallback> & cb)38 FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb)
39 : mCallback(cb),
40 mIsConditionMet(false),
41 mDataLength(0),
42 mTimeDelayInMs(0),
43 mDataSizeDelayInBytes(0) {
44 start();
45 }
46
~FilterCallbackScheduler()47 FilterCallbackScheduler::~FilterCallbackScheduler() {
48 stop();
49 }
50
onFilterEvent(DemuxFilterEvent && event)51 void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) {
52 std::unique_lock<std::mutex> lock(mLock);
53 mCallbackBuffer.push_back(std::move(event));
54 mDataLength += getDemuxFilterEventDataLength(event);
55
56 if (isDataSizeDelayConditionMetLocked()) {
57 mIsConditionMet = true;
58 // unlock, so thread is not immediately blocked when it is notified.
59 lock.unlock();
60 mCv.notify_all();
61 }
62 }
63
onFilterStatus(const DemuxFilterStatus & status)64 void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) {
65 if (mCallback) {
66 mCallback->onFilterStatus(status);
67 }
68 }
69
flushEvents()70 void FilterCallbackScheduler::flushEvents() {
71 std::unique_lock<std::mutex> lock(mLock);
72 mCallbackBuffer.clear();
73 mDataLength = 0;
74 }
75
setTimeDelayHint(int timeDelay)76 void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) {
77 std::unique_lock<std::mutex> lock(mLock);
78 mTimeDelayInMs = timeDelay;
79 // always notify condition variable to update timeout
80 mIsConditionMet = true;
81 lock.unlock();
82 mCv.notify_all();
83 }
84
setDataSizeDelayHint(int dataSizeDelay)85 void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) {
86 std::unique_lock<std::mutex> lock(mLock);
87 mDataSizeDelayInBytes = dataSizeDelay;
88 if (isDataSizeDelayConditionMetLocked()) {
89 mIsConditionMet = true;
90 lock.unlock();
91 mCv.notify_all();
92 }
93 }
94
hasCallbackRegistered() const95 bool FilterCallbackScheduler::hasCallbackRegistered() const {
96 return mCallback != nullptr;
97 }
98
start()99 void FilterCallbackScheduler::start() {
100 mIsRunning = true;
101 mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this);
102 }
103
stop()104 void FilterCallbackScheduler::stop() {
105 mIsRunning = false;
106 if (mCallbackThread.joinable()) {
107 {
108 std::lock_guard<std::mutex> lock(mLock);
109 mIsConditionMet = true;
110 }
111 mCv.notify_all();
112 mCallbackThread.join();
113 }
114 }
115
threadLoop()116 void FilterCallbackScheduler::threadLoop() {
117 while (mIsRunning) {
118 threadLoopOnce();
119 }
120 }
121
threadLoopOnce()122 void FilterCallbackScheduler::threadLoopOnce() {
123 std::unique_lock<std::mutex> lock(mLock);
124 if (mTimeDelayInMs > 0) {
125 // Note: predicate protects from lost and spurious wakeups
126 mCv.wait_for(lock, std::chrono::milliseconds(mTimeDelayInMs),
127 [this] { return mIsConditionMet; });
128 } else {
129 // Note: predicate protects from lost and spurious wakeups
130 mCv.wait(lock, [this] { return mIsConditionMet; });
131 }
132 mIsConditionMet = false;
133
134 // condition_variable wait locks mutex on timeout / notify
135 // Note: if stop() has been called in the meantime, do not send more filter
136 // events.
137 if (mIsRunning && !mCallbackBuffer.empty()) {
138 if (mCallback) {
139 mCallback->onFilterEvent(mCallbackBuffer);
140 }
141 mCallbackBuffer.clear();
142 mDataLength = 0;
143 }
144 }
145
146 // mLock needs to be held to call this function
isDataSizeDelayConditionMetLocked()147 bool FilterCallbackScheduler::isDataSizeDelayConditionMetLocked() {
148 if (mDataSizeDelayInBytes == 0) {
149 // Data size delay is disabled.
150 if (mTimeDelayInMs == 0) {
151 // Events should only be sent immediately if time delay is disabled
152 // as well.
153 return true;
154 }
155 return false;
156 }
157
158 // Data size delay is enabled.
159 return mDataLength >= mDataSizeDelayInBytes;
160 }
161
getDemuxFilterEventDataLength(const DemuxFilterEvent & event)162 int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) {
163 // there is a risk that dataLength could be a negative value, but it
164 // *should* be safe to assume that it is always positive.
165 switch (event.getTag()) {
166 case DemuxFilterEvent::Tag::section:
167 return event.get<DemuxFilterEvent::Tag::section>().dataLength;
168 case DemuxFilterEvent::Tag::media:
169 return event.get<DemuxFilterEvent::Tag::media>().dataLength;
170 case DemuxFilterEvent::Tag::pes:
171 return event.get<DemuxFilterEvent::Tag::pes>().dataLength;
172 case DemuxFilterEvent::Tag::download:
173 return event.get<DemuxFilterEvent::Tag::download>().dataLength;
174 case DemuxFilterEvent::Tag::ipPayload:
175 return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength;
176
177 case DemuxFilterEvent::Tag::tsRecord:
178 case DemuxFilterEvent::Tag::mmtpRecord:
179 case DemuxFilterEvent::Tag::temi:
180 case DemuxFilterEvent::Tag::monitorEvent:
181 case DemuxFilterEvent::Tag::startId:
182 // these events do not include a payload and should therefore return
183 // 0.
184 // do not add a default option, so this will not compile when new types
185 // are added.
186 return 0;
187 }
188 }
189
Filter(DemuxFilterType type,int64_t filterId,uint32_t bufferSize,const std::shared_ptr<IFilterCallback> & cb,std::shared_ptr<Demux> demux)190 Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
191 const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux)
192 : mDemux(demux),
193 mCallbackScheduler(cb),
194 mFilterId(filterId),
195 mBufferSize(bufferSize),
196 mType(type) {
197 switch (mType.mainType) {
198 case DemuxFilterMainType::TS:
199 if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
200 DemuxTsFilterType::AUDIO ||
201 mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
202 DemuxTsFilterType::VIDEO) {
203 mIsMediaFilter = true;
204 }
205 if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
206 DemuxTsFilterType::PCR) {
207 mIsPcrFilter = true;
208 }
209 if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
210 DemuxTsFilterType::RECORD) {
211 mIsRecordFilter = true;
212 }
213 break;
214 case DemuxFilterMainType::MMTP:
215 if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
216 DemuxMmtpFilterType::AUDIO ||
217 mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
218 DemuxMmtpFilterType::VIDEO) {
219 mIsMediaFilter = true;
220 }
221 if (mType.subType.get<DemuxFilterSubType::Tag::mmtpFilterType>() ==
222 DemuxMmtpFilterType::RECORD) {
223 mIsRecordFilter = true;
224 }
225 break;
226 case DemuxFilterMainType::IP:
227 break;
228 case DemuxFilterMainType::TLV:
229 break;
230 case DemuxFilterMainType::ALP:
231 break;
232 default:
233 break;
234 }
235 }
236
~Filter()237 Filter::~Filter() {
238 close();
239 }
240
getId64Bit(int64_t * _aidl_return)241 ::ndk::ScopedAStatus Filter::getId64Bit(int64_t* _aidl_return) {
242 ALOGV("%s", __FUNCTION__);
243
244 *_aidl_return = mFilterId;
245 return ::ndk::ScopedAStatus::ok();
246 }
247
getId(int32_t * _aidl_return)248 ::ndk::ScopedAStatus Filter::getId(int32_t* _aidl_return) {
249 ALOGV("%s", __FUNCTION__);
250
251 *_aidl_return = static_cast<int32_t>(mFilterId);
252 return ::ndk::ScopedAStatus::ok();
253 }
254
setDataSource(const std::shared_ptr<IFilter> & in_filter)255 ::ndk::ScopedAStatus Filter::setDataSource(const std::shared_ptr<IFilter>& in_filter) {
256 ALOGV("%s", __FUNCTION__);
257
258 mDataSource = in_filter;
259 mIsDataSourceDemux = false;
260
261 return ::ndk::ScopedAStatus::ok();
262 }
263
setDelayHint(const FilterDelayHint & in_hint)264 ::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) {
265 if (mIsMediaFilter) {
266 // delay hint is not supported for media filters
267 return ::ndk::ScopedAStatus::fromServiceSpecificError(
268 static_cast<int32_t>(Result::UNAVAILABLE));
269 }
270
271 ALOGV("%s", __FUNCTION__);
272 if (in_hint.hintValue < 0) {
273 return ::ndk::ScopedAStatus::fromServiceSpecificError(
274 static_cast<int32_t>(Result::INVALID_ARGUMENT));
275 }
276
277 switch (in_hint.hintType) {
278 case FilterDelayHintType::TIME_DELAY_IN_MS:
279 mCallbackScheduler.setTimeDelayHint(in_hint.hintValue);
280 break;
281 case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES:
282 mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue);
283 break;
284 default:
285 return ::ndk::ScopedAStatus::fromServiceSpecificError(
286 static_cast<int32_t>(Result::INVALID_ARGUMENT));
287 }
288
289 return ::ndk::ScopedAStatus::ok();
290 }
291
getQueueDesc(MQDescriptor<int8_t,SynchronizedReadWrite> * out_queue)292 ::ndk::ScopedAStatus Filter::getQueueDesc(MQDescriptor<int8_t, SynchronizedReadWrite>* out_queue) {
293 ALOGV("%s", __FUNCTION__);
294
295 mIsUsingFMQ = mIsRecordFilter ? false : true;
296
297 *out_queue = mFilterMQ->dupeDesc();
298 return ::ndk::ScopedAStatus::ok();
299 }
300
configure(const DemuxFilterSettings & in_settings)301 ::ndk::ScopedAStatus Filter::configure(const DemuxFilterSettings& in_settings) {
302 ALOGV("%s", __FUNCTION__);
303
304 mFilterSettings = in_settings;
305 switch (mType.mainType) {
306 case DemuxFilterMainType::TS:
307 mTpid = in_settings.get<DemuxFilterSettings::Tag::ts>().tpid;
308 break;
309 case DemuxFilterMainType::MMTP:
310 break;
311 case DemuxFilterMainType::IP:
312 break;
313 case DemuxFilterMainType::TLV:
314 break;
315 case DemuxFilterMainType::ALP:
316 break;
317 default:
318 break;
319 }
320
321 mConfigured = true;
322 return ::ndk::ScopedAStatus::ok();
323 }
324
start()325 ::ndk::ScopedAStatus Filter::start() {
326 ALOGV("%s", __FUNCTION__);
327 mFilterThreadRunning = true;
328 std::vector<DemuxFilterEvent> events;
329
330 mFilterCount += 1;
331 mDemux->setIptvThreadRunning(true);
332
333 // All the filter event callbacks in start are for testing purpose.
334 switch (mType.mainType) {
335 case DemuxFilterMainType::TS:
336 createMediaEvent(events, false);
337 createMediaEvent(events, true);
338 createTsRecordEvent(events);
339 createTemiEvent(events);
340 break;
341 case DemuxFilterMainType::MMTP:
342 createDownloadEvent(events);
343 createMmtpRecordEvent(events);
344 break;
345 case DemuxFilterMainType::IP:
346 createSectionEvent(events);
347 createIpPayloadEvent(events);
348 break;
349 case DemuxFilterMainType::TLV:
350 createMonitorEvent(events);
351 break;
352 case DemuxFilterMainType::ALP:
353 createMonitorEvent(events);
354 break;
355 default:
356 break;
357 }
358
359 for (auto&& event : events) {
360 mCallbackScheduler.onFilterEvent(std::move(event));
361 }
362
363 return startFilterLoop();
364 }
365
stop()366 ::ndk::ScopedAStatus Filter::stop() {
367 ALOGV("%s", __FUNCTION__);
368
369 if (mFilterCount > 0) {
370 mFilterCount -= 1;
371 if (mFilterCount.load() == 0) {
372 mDemux->setIptvThreadRunning(false);
373 }
374 }
375
376 mFilterThreadRunning = false;
377 if (mFilterThread.joinable()) {
378 mFilterThread.join();
379 }
380
381 mCallbackScheduler.flushEvents();
382
383 return ::ndk::ScopedAStatus::ok();
384 }
385
flush()386 ::ndk::ScopedAStatus Filter::flush() {
387 ALOGV("%s", __FUNCTION__);
388
389 // temp implementation to flush the FMQ
390 int size = mFilterMQ->availableToRead();
391 int8_t* buffer = new int8_t[size];
392 mFilterMQ->read(buffer, size);
393 delete[] buffer;
394 mFilterStatus = DemuxFilterStatus::DATA_READY;
395
396 return ::ndk::ScopedAStatus::ok();
397 }
398
releaseAvHandle(const NativeHandle & in_avMemory,int64_t in_avDataId)399 ::ndk::ScopedAStatus Filter::releaseAvHandle(const NativeHandle& in_avMemory, int64_t in_avDataId) {
400 ALOGV("%s", __FUNCTION__);
401
402 if ((mSharedAvMemHandle != nullptr) && (in_avMemory.fds.size() > 0) &&
403 (sameFile(in_avMemory.fds[0].get(), mSharedAvMemHandle->data[0]))) {
404 freeSharedAvHandle();
405 return ::ndk::ScopedAStatus::ok();
406 }
407
408 if (mDataId2Avfd.find(in_avDataId) == mDataId2Avfd.end()) {
409 return ::ndk::ScopedAStatus::fromServiceSpecificError(
410 static_cast<int32_t>(Result::INVALID_ARGUMENT));
411 }
412
413 ::close(mDataId2Avfd[in_avDataId]);
414 return ::ndk::ScopedAStatus::ok();
415 }
416
close()417 ::ndk::ScopedAStatus Filter::close() {
418 ALOGV("%s", __FUNCTION__);
419
420 stop();
421
422 return mDemux->removeFilter(mFilterId);
423 }
424
configureIpCid(int32_t in_ipCid)425 ::ndk::ScopedAStatus Filter::configureIpCid(int32_t in_ipCid) {
426 ALOGV("%s", __FUNCTION__);
427
428 if (mType.mainType != DemuxFilterMainType::IP) {
429 return ::ndk::ScopedAStatus::fromServiceSpecificError(
430 static_cast<int32_t>(Result::INVALID_STATE));
431 }
432
433 mCid = in_ipCid;
434 return ::ndk::ScopedAStatus::ok();
435 }
436
getAvSharedHandle(NativeHandle * out_avMemory,int64_t * _aidl_return)437 ::ndk::ScopedAStatus Filter::getAvSharedHandle(NativeHandle* out_avMemory, int64_t* _aidl_return) {
438 ALOGV("%s", __FUNCTION__);
439
440 if (!mIsMediaFilter) {
441 return ::ndk::ScopedAStatus::fromServiceSpecificError(
442 static_cast<int32_t>(Result::INVALID_STATE));
443 }
444
445 if (mSharedAvMemHandle != nullptr) {
446 *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
447 *_aidl_return = BUFFER_SIZE;
448 mUsingSharedAvMem = true;
449 return ::ndk::ScopedAStatus::ok();
450 }
451
452 int av_fd = createAvIonFd(BUFFER_SIZE);
453 if (av_fd < 0) {
454 return ::ndk::ScopedAStatus::fromServiceSpecificError(
455 static_cast<int32_t>(Result::OUT_OF_MEMORY));
456 }
457
458 mSharedAvMemHandle = createNativeHandle(av_fd);
459 if (mSharedAvMemHandle == nullptr) {
460 ::close(av_fd);
461 *_aidl_return = 0;
462 return ::ndk::ScopedAStatus::fromServiceSpecificError(
463 static_cast<int32_t>(Result::UNKNOWN_ERROR));
464 }
465 ::close(av_fd);
466 mUsingSharedAvMem = true;
467
468 *out_avMemory = ::android::dupToAidl(mSharedAvMemHandle);
469 *_aidl_return = BUFFER_SIZE;
470 return ::ndk::ScopedAStatus::ok();
471 }
472
configureAvStreamType(const AvStreamType & in_avStreamType)473 ::ndk::ScopedAStatus Filter::configureAvStreamType(const AvStreamType& in_avStreamType) {
474 ALOGV("%s", __FUNCTION__);
475
476 if (!mIsMediaFilter) {
477 return ::ndk::ScopedAStatus::fromServiceSpecificError(
478 static_cast<int32_t>(Result::UNAVAILABLE));
479 }
480
481 switch (in_avStreamType.getTag()) {
482 case AvStreamType::Tag::audio:
483 mAudioStreamType =
484 static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::audio>());
485 break;
486 case AvStreamType::Tag::video:
487 mVideoStreamType =
488 static_cast<uint32_t>(in_avStreamType.get<AvStreamType::Tag::video>());
489 break;
490 default:
491 break;
492 }
493
494 return ::ndk::ScopedAStatus::ok();
495 }
496
configureMonitorEvent(int in_monitorEventTypes)497 ::ndk::ScopedAStatus Filter::configureMonitorEvent(int in_monitorEventTypes) {
498 ALOGV("%s", __FUNCTION__);
499
500 int32_t newScramblingStatus =
501 in_monitorEventTypes &
502 static_cast<int32_t>(DemuxFilterMonitorEventType::SCRAMBLING_STATUS);
503 int32_t newIpCid =
504 in_monitorEventTypes & static_cast<int32_t>(DemuxFilterMonitorEventType::IP_CID_CHANGE);
505
506 // if scrambling status monitoring flipped, record the new state and send msg on enabling
507 if (newScramblingStatus ^ mScramblingStatusMonitored) {
508 mScramblingStatusMonitored = newScramblingStatus;
509 if (mScramblingStatusMonitored) {
510 if (mCallbackScheduler.hasCallbackRegistered()) {
511 // Assuming current status is always NOT_SCRAMBLED
512 auto monitorEvent = DemuxFilterMonitorEvent::make<
513 DemuxFilterMonitorEvent::Tag::scramblingStatus>(
514 ScramblingStatus::NOT_SCRAMBLED);
515 auto event =
516 DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
517 mCallbackScheduler.onFilterEvent(std::move(event));
518 } else {
519 return ::ndk::ScopedAStatus::fromServiceSpecificError(
520 static_cast<int32_t>(Result::INVALID_STATE));
521 }
522 }
523 }
524
525 // if ip cid monitoring flipped, record the new state and send msg on enabling
526 if (newIpCid ^ mIpCidMonitored) {
527 mIpCidMonitored = newIpCid;
528 if (mIpCidMonitored) {
529 if (mCallbackScheduler.hasCallbackRegistered()) {
530 // Return random cid
531 auto monitorEvent =
532 DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1);
533 auto event =
534 DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
535 mCallbackScheduler.onFilterEvent(std::move(event));
536 } else {
537 return ::ndk::ScopedAStatus::fromServiceSpecificError(
538 static_cast<int32_t>(Result::INVALID_STATE));
539 }
540 }
541 }
542
543 return ::ndk::ScopedAStatus::ok();
544 }
545
createFilterMQ()546 bool Filter::createFilterMQ() {
547 ALOGV("%s", __FUNCTION__);
548
549 // Create a synchronized FMQ that supports blocking read/write
550 std::unique_ptr<FilterMQ> tmpFilterMQ =
551 std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(mBufferSize, true));
552 if (!tmpFilterMQ->isValid()) {
553 ALOGW("[Filter] Failed to create FMQ of filter with id: %" PRIu64, mFilterId);
554 return false;
555 }
556
557 mFilterMQ = std::move(tmpFilterMQ);
558
559 if (EventFlag::createEventFlag(mFilterMQ->getEventFlagWord(), &mFilterEventsFlag) !=
560 ::android::OK) {
561 return false;
562 }
563
564 return true;
565 }
566
startFilterLoop()567 ::ndk::ScopedAStatus Filter::startFilterLoop() {
568 mFilterThread = std::thread(&Filter::filterThreadLoop, this);
569 return ::ndk::ScopedAStatus::ok();
570 }
571
filterThreadLoop()572 void Filter::filterThreadLoop() {
573 if (!mFilterThreadRunning) {
574 return;
575 }
576
577 ALOGD("[Filter] filter %" PRIu64 " threadLoop start.", mFilterId);
578
579 ALOGI("IPTV DVR Playback status on Filter: %d", mIptvDvrPlaybackStatus);
580
581 // For the first time of filter output, implementation needs to send the filter
582 // Event Callback without waiting for the DATA_CONSUMED to init the process.
583 while (mFilterThreadRunning) {
584 std::unique_lock<std::mutex> lock(mFilterEventsLock);
585 if (mFilterEvents.size() == 0) {
586 lock.unlock();
587 if (DEBUG_FILTER) {
588 ALOGD("[Filter] wait for filter data output.");
589 }
590 usleep(1000 * 1000);
591 continue;
592 }
593
594 // After successfully write, send a callback and wait for the read to be done
595 if (mCallbackScheduler.hasCallbackRegistered()) {
596 if (mConfigured) {
597 auto startEvent =
598 DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++);
599 mCallbackScheduler.onFilterEvent(std::move(startEvent));
600 mConfigured = false;
601 }
602
603 // lock is still being held
604 for (auto&& event : mFilterEvents) {
605 mCallbackScheduler.onFilterEvent(std::move(event));
606 }
607 } else {
608 ALOGD("[Filter] filter callback is not configured yet.");
609 mFilterThreadRunning = false;
610 return;
611 }
612
613 mFilterEvents.clear();
614 mFilterStatus = DemuxFilterStatus::DATA_READY;
615 mCallbackScheduler.onFilterStatus(mFilterStatus);
616 break;
617 }
618
619 while (mFilterThreadRunning) {
620 uint32_t efState = 0;
621 // We do not wait for the last round of written data to be read to finish the thread
622 // because the VTS can verify the reading itself.
623 for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
624 if (!mFilterThreadRunning) {
625 break;
626 }
627 while (mFilterThreadRunning && mIsUsingFMQ) {
628 ::android::status_t status = mFilterEventsFlag->wait(
629 static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
630 WAIT_TIMEOUT, true /* retry on spurious wake */);
631 if (status != ::android::OK) {
632 ALOGD("[Filter] wait for data consumed");
633 continue;
634 }
635 break;
636 }
637
638 maySendFilterStatusCallback();
639
640 while (mFilterThreadRunning) {
641 std::lock_guard<std::mutex> lock(mFilterEventsLock);
642 if (mFilterEvents.size() == 0) {
643 continue;
644 }
645 // After successfully write, send a callback and wait for the read to be done
646 for (auto&& event : mFilterEvents) {
647 mCallbackScheduler.onFilterEvent(std::move(event));
648 }
649 mFilterEvents.clear();
650 break;
651 }
652 // We do not wait for the last read to be done
653 // VTS can verify the read result itself.
654 if (i == SECTION_WRITE_COUNT - 1) {
655 ALOGD("[Filter] filter %" PRIu64 " writing done. Ending thread", mFilterId);
656 break;
657 }
658 }
659 break;
660 }
661 ALOGD("[Filter] filter thread ended.");
662 }
663
freeSharedAvHandle()664 void Filter::freeSharedAvHandle() {
665 if (!mIsMediaFilter) {
666 return;
667 }
668 native_handle_close(mSharedAvMemHandle);
669 native_handle_delete(mSharedAvMemHandle);
670 mSharedAvMemHandle = nullptr;
671 }
672
dump(int fd,const char **,uint32_t)673 binder_status_t Filter::dump(int fd, const char** /* args */, uint32_t /* numArgs */) {
674 dprintf(fd, " Filter %" PRIu64 ":\n", mFilterId);
675 dprintf(fd, " Main type: %d\n", mType.mainType);
676 dprintf(fd, " mIsMediaFilter: %d\n", mIsMediaFilter);
677 dprintf(fd, " mIsPcrFilter: %d\n", mIsPcrFilter);
678 dprintf(fd, " mIsRecordFilter: %d\n", mIsRecordFilter);
679 dprintf(fd, " mIsUsingFMQ: %d\n", mIsUsingFMQ);
680 dprintf(fd, " mFilterThreadRunning: %d\n", (bool)mFilterThreadRunning);
681 return STATUS_OK;
682 }
683
maySendFilterStatusCallback()684 void Filter::maySendFilterStatusCallback() {
685 if (!mIsUsingFMQ) {
686 return;
687 }
688 std::lock_guard<std::mutex> lock(mFilterStatusLock);
689 int availableToRead = mFilterMQ->availableToRead();
690 int availableToWrite = mFilterMQ->availableToWrite();
691 int fmqSize = mFilterMQ->getQuantumCount();
692
693 DemuxFilterStatus newStatus = checkFilterStatusChange(
694 availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
695 if (mFilterStatus != newStatus) {
696 mCallbackScheduler.onFilterStatus(newStatus);
697 mFilterStatus = newStatus;
698 }
699 }
700
checkFilterStatusChange(uint32_t availableToWrite,uint32_t availableToRead,uint32_t highThreshold,uint32_t lowThreshold)701 DemuxFilterStatus Filter::checkFilterStatusChange(uint32_t availableToWrite,
702 uint32_t availableToRead, uint32_t highThreshold,
703 uint32_t lowThreshold) {
704 if (availableToWrite == 0) {
705 return DemuxFilterStatus::OVERFLOW;
706 } else if (availableToRead > highThreshold) {
707 return DemuxFilterStatus::HIGH_WATER;
708 } else if (availableToRead == 0) {
709 return DemuxFilterStatus::NO_DATA;
710 } else if (availableToRead < lowThreshold) {
711 return DemuxFilterStatus::LOW_WATER;
712 }
713 return mFilterStatus;
714 }
715
getTpid()716 uint16_t Filter::getTpid() {
717 return mTpid;
718 }
719
updateFilterOutput(vector<int8_t> & data)720 void Filter::updateFilterOutput(vector<int8_t>& data) {
721 std::lock_guard<std::mutex> lock(mFilterOutputLock);
722 mFilterOutput.insert(mFilterOutput.end(), data.begin(), data.end());
723 }
724
updatePts(uint64_t pts)725 void Filter::updatePts(uint64_t pts) {
726 std::lock_guard<std::mutex> lock(mFilterOutputLock);
727 mPts = pts;
728 }
729
updateRecordOutput(vector<int8_t> & data)730 void Filter::updateRecordOutput(vector<int8_t>& data) {
731 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
732 mRecordFilterOutput.insert(mRecordFilterOutput.end(), data.begin(), data.end());
733 }
734
startFilterHandler()735 ::ndk::ScopedAStatus Filter::startFilterHandler() {
736 std::lock_guard<std::mutex> lock(mFilterOutputLock);
737 switch (mType.mainType) {
738 case DemuxFilterMainType::TS:
739 switch (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>()) {
740 case DemuxTsFilterType::UNDEFINED:
741 break;
742 case DemuxTsFilterType::SECTION:
743 startSectionFilterHandler();
744 break;
745 case DemuxTsFilterType::PES:
746 startPesFilterHandler();
747 break;
748 case DemuxTsFilterType::TS:
749 startTsFilterHandler();
750 break;
751 case DemuxTsFilterType::AUDIO:
752 case DemuxTsFilterType::VIDEO:
753 startMediaFilterHandler();
754 break;
755 case DemuxTsFilterType::PCR:
756 startPcrFilterHandler();
757 break;
758 case DemuxTsFilterType::TEMI:
759 startTemiFilterHandler();
760 break;
761 default:
762 break;
763 }
764 break;
765 case DemuxFilterMainType::MMTP:
766 /*mmtpSettings*/
767 break;
768 case DemuxFilterMainType::IP:
769 /*ipSettings*/
770 break;
771 case DemuxFilterMainType::TLV:
772 /*tlvSettings*/
773 break;
774 case DemuxFilterMainType::ALP:
775 /*alpSettings*/
776 break;
777 default:
778 break;
779 }
780 return ::ndk::ScopedAStatus::ok();
781 }
782
startSectionFilterHandler()783 ::ndk::ScopedAStatus Filter::startSectionFilterHandler() {
784 if (mFilterOutput.empty()) {
785 return ::ndk::ScopedAStatus::ok();
786 }
787 if (!writeSectionsAndCreateEvent(mFilterOutput)) {
788 ALOGD("[Filter] filter %" PRIu64 " fails to write into FMQ. Ending thread", mFilterId);
789 return ::ndk::ScopedAStatus::fromServiceSpecificError(
790 static_cast<int32_t>(Result::UNKNOWN_ERROR));
791 }
792
793 mFilterOutput.clear();
794
795 return ::ndk::ScopedAStatus::ok();
796 }
797
startPesFilterHandler()798 ::ndk::ScopedAStatus Filter::startPesFilterHandler() {
799 if (mFilterOutput.empty()) {
800 return ::ndk::ScopedAStatus::ok();
801 }
802
803 for (int i = 0; i < mFilterOutput.size(); i += 188) {
804 if (mPesSizeLeft == 0) {
805 uint32_t prefix = (mFilterOutput[i + 4] << 16) | (mFilterOutput[i + 5] << 8) |
806 mFilterOutput[i + 6];
807 if (DEBUG_FILTER) {
808 ALOGD("[Filter] prefix %d", prefix);
809 }
810 if (prefix == 0x000001) {
811 // TODO handle mulptiple Pes filters
812 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
813 static_cast<uint8_t>(mFilterOutput[i + 9]);
814 mPesSizeLeft += 6;
815 if (DEBUG_FILTER) {
816 ALOGD("[Filter] pes data length %d", mPesSizeLeft);
817 }
818 } else {
819 continue;
820 }
821 }
822
823 uint32_t endPoint = min(184u, mPesSizeLeft);
824 // append data and check size
825 vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + 4;
826 vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + 4 + endPoint;
827 mPesOutput.insert(mPesOutput.end(), first, last);
828 // size does not match then continue
829 mPesSizeLeft -= endPoint;
830 if (DEBUG_FILTER) {
831 ALOGD("[Filter] pes data left %d", mPesSizeLeft);
832 }
833 if (mPesSizeLeft > 0) {
834 continue;
835 }
836 // size match then create event
837 if (!writeDataToFilterMQ(mPesOutput)) {
838 ALOGD("[Filter] pes data write failed");
839 mFilterOutput.clear();
840 return ::ndk::ScopedAStatus::fromServiceSpecificError(
841 static_cast<int32_t>(Result::INVALID_ARGUMENT));
842 }
843 maySendFilterStatusCallback();
844 DemuxFilterPesEvent pesEvent;
845 pesEvent = {
846 // temp dump meta data
847 .streamId = static_cast<int32_t>(mPesOutput[3]),
848 .dataLength = static_cast<int32_t>(mPesOutput.size()),
849 };
850 if (DEBUG_FILTER) {
851 ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
852 }
853
854 {
855 std::lock_guard<std::mutex> lock(mFilterEventsLock);
856 mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent));
857 }
858
859 mPesOutput.clear();
860 }
861
862 mFilterOutput.clear();
863
864 return ::ndk::ScopedAStatus::ok();
865 }
866
startTsFilterHandler()867 ::ndk::ScopedAStatus Filter::startTsFilterHandler() {
868 // TODO handle starting TS filter
869 return ::ndk::ScopedAStatus::ok();
870 }
871
872 // Read PES (Packetized Elementary Stream) Packets from TransportStreams
873 // as defined in ISO/IEC 13818-1 Section 2.4.3.6. Create MediaEvents
874 // containing only their data without TS or PES headers.
startMediaFilterHandler()875 ::ndk::ScopedAStatus Filter::startMediaFilterHandler() {
876 if (mFilterOutput.empty()) {
877 return ::ndk::ScopedAStatus::ok();
878 }
879
880 // mPts being set before our MediaFilterHandler begins indicates that all
881 // metadata has already been handled. We can therefore create an event
882 // with the existing data. This method is used when processing ES files.
883 ::ndk::ScopedAStatus result;
884 if (mPts) {
885 result = createMediaFilterEventWithIon(mFilterOutput);
886 if (result.isOk()) {
887 mFilterOutput.clear();
888 }
889 return result;
890 }
891
892 for (int i = 0; i < mFilterOutput.size(); i += 188) {
893 // Every packet has a 4 Byte TS Header preceding it
894 uint32_t headerSize = 4;
895
896 if (mPesSizeLeft == 0) {
897 // Packet Start Code Prefix is defined as the first 3 bytes of
898 // the PES Header and should always have the value 0x000001
899 uint32_t prefix = (static_cast<uint8_t>(mFilterOutput[i + 4]) << 16) |
900 (static_cast<uint8_t>(mFilterOutput[i + 5]) << 8) |
901 static_cast<uint8_t>(mFilterOutput[i + 6]);
902 if (DEBUG_FILTER) {
903 ALOGD("[Filter] prefix %d", prefix);
904 }
905 if (prefix == 0x000001) {
906 // TODO handle multiple Pes filters
907 // Location of PES fields from ISO/IEC 13818-1 Section 2.4.3.6
908 mPesSizeLeft = (static_cast<uint8_t>(mFilterOutput[i + 8]) << 8) |
909 static_cast<uint8_t>(mFilterOutput[i + 9]);
910 bool hasPts = static_cast<uint8_t>(mFilterOutput[i + 11]) & 0x80;
911 uint8_t optionalFieldsLength = static_cast<uint8_t>(mFilterOutput[i + 12]);
912 headerSize += 9 + optionalFieldsLength;
913
914 if (hasPts) {
915 // Pts is a 33-bit field which is stored across 5 bytes, with
916 // bits in between as reserved fields which must be ignored
917 mPts = 0;
918 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 13]) & 0x0e) << 29;
919 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 14]) & 0xff) << 22;
920 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 15]) & 0xfe) << 14;
921 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 16]) & 0xff) << 7;
922 mPts |= (static_cast<uint8_t>(mFilterOutput[i + 17]) & 0xfe) >> 1;
923 }
924
925 if (DEBUG_FILTER) {
926 ALOGD("[Filter] pes data length %d", mPesSizeLeft);
927 }
928 } else {
929 continue;
930 }
931 }
932
933 uint32_t endPoint = min(188u - headerSize, mPesSizeLeft);
934 // append data and check size
935 vector<int8_t>::const_iterator first = mFilterOutput.begin() + i + headerSize;
936 vector<int8_t>::const_iterator last = mFilterOutput.begin() + i + headerSize + endPoint;
937 mPesOutput.insert(mPesOutput.end(), first, last);
938 // size does not match then continue
939 mPesSizeLeft -= endPoint;
940 if (DEBUG_FILTER) {
941 ALOGD("[Filter] pes data left %d", mPesSizeLeft);
942 }
943 if (mPesSizeLeft > 0 || mAvBufferCopyCount++ < 10) {
944 continue;
945 }
946
947 result = createMediaFilterEventWithIon(mPesOutput);
948 if (!result.isOk()) {
949 mFilterOutput.clear();
950 return result;
951 }
952 }
953
954 mFilterOutput.clear();
955
956 return ::ndk::ScopedAStatus::ok();
957 }
958
createMediaFilterEventWithIon(vector<int8_t> & output)959 ::ndk::ScopedAStatus Filter::createMediaFilterEventWithIon(vector<int8_t>& output) {
960 if (mUsingSharedAvMem) {
961 if (mSharedAvMemHandle == nullptr) {
962 return ::ndk::ScopedAStatus::fromServiceSpecificError(
963 static_cast<int32_t>(Result::UNKNOWN_ERROR));
964 }
965 return createShareMemMediaEvents(output);
966 }
967
968 return createIndependentMediaEvents(output);
969 }
970
startRecordFilterHandler()971 ::ndk::ScopedAStatus Filter::startRecordFilterHandler() {
972 std::lock_guard<std::mutex> lock(mRecordFilterOutputLock);
973 if (mRecordFilterOutput.empty()) {
974 return ::ndk::ScopedAStatus::ok();
975 }
976
977 if (mDvr == nullptr || !mDvr->writeRecordFMQ(mRecordFilterOutput)) {
978 ALOGD("[Filter] dvr fails to write into record FMQ.");
979 return ::ndk::ScopedAStatus::fromServiceSpecificError(
980 static_cast<int32_t>(Result::UNKNOWN_ERROR));
981 }
982
983 DemuxFilterTsRecordEvent recordEvent;
984 recordEvent = {
985 .byteNumber = static_cast<int64_t>(mRecordFilterOutput.size()),
986 .pts = (mPts == 0) ? static_cast<int64_t>(time(NULL)) * 900000 : mPts,
987 .firstMbInSlice = 0, // random address
988 };
989
990 {
991 std::lock_guard<std::mutex> lock(mFilterEventsLock);
992 mFilterEvents.push_back(
993 DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent));
994 }
995
996 mRecordFilterOutput.clear();
997 return ::ndk::ScopedAStatus::ok();
998 }
999
startPcrFilterHandler()1000 ::ndk::ScopedAStatus Filter::startPcrFilterHandler() {
1001 // TODO handle starting PCR filter
1002 return ::ndk::ScopedAStatus::ok();
1003 }
1004
startTemiFilterHandler()1005 ::ndk::ScopedAStatus Filter::startTemiFilterHandler() {
1006 // TODO handle starting TEMI filter
1007 return ::ndk::ScopedAStatus::ok();
1008 }
1009
1010 // Read PSI (Program Specific Information) Sections from TransportStreams
1011 // as defined in ISO/IEC 13818-1 Section 2.4.4
writeSectionsAndCreateEvent(vector<int8_t> & data)1012 bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) {
1013 // TODO check how many sections has been read
1014 ALOGD("[Filter] section handler");
1015
1016 // Transport Stream Packets are 188 bytes long, as defined in the
1017 // Introduction of ISO/IEC 13818-1
1018 for (int i = 0; i < data.size(); i += 188) {
1019 if (mSectionSizeLeft == 0) {
1020 // Location for sectionSize as defined by Section 2.4.4
1021 // Note that the first 4 bytes skipped are the TsHeader
1022 mSectionSizeLeft = ((static_cast<uint8_t>(data[i + 5]) & 0x0f) << 8) |
1023 static_cast<uint8_t>(data[i + 6]);
1024 mSectionSizeLeft += 3;
1025 if (DEBUG_FILTER) {
1026 ALOGD("[Filter] section data length %d", mSectionSizeLeft);
1027 }
1028 }
1029
1030 // 184 bytes per packet is derived by subtracting the 4 byte length of
1031 // the TsHeader from its 188 byte packet size
1032 uint32_t endPoint = min(184u, mSectionSizeLeft);
1033 // append data and check size
1034 vector<int8_t>::const_iterator first = data.begin() + i + 4;
1035 vector<int8_t>::const_iterator last = data.begin() + i + 4 + endPoint;
1036 mSectionOutput.insert(mSectionOutput.end(), first, last);
1037 // size does not match then continue
1038 mSectionSizeLeft -= endPoint;
1039 if (DEBUG_FILTER) {
1040 ALOGD("[Filter] section data left %d", mSectionSizeLeft);
1041 }
1042 if (mSectionSizeLeft > 0) {
1043 continue;
1044 }
1045
1046 if (!writeDataToFilterMQ(mSectionOutput)) {
1047 mSectionOutput.clear();
1048 return false;
1049 }
1050
1051 DemuxFilterSectionEvent secEvent;
1052 secEvent = {
1053 // temp dump meta data
1054 .tableId = 0,
1055 .version = 1,
1056 .sectionNum = 1,
1057 .dataLength = static_cast<int32_t>(mSectionOutput.size()),
1058 };
1059 if (DEBUG_FILTER) {
1060 ALOGD("[Filter] assembled section data length %" PRIu64, secEvent.dataLength);
1061 }
1062
1063 {
1064 std::lock_guard<std::mutex> lock(mFilterEventsLock);
1065 mFilterEvents.push_back(
1066 DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent));
1067 }
1068 mSectionOutput.clear();
1069 }
1070
1071 return true;
1072 }
1073
writeDataToFilterMQ(const std::vector<int8_t> & data)1074 bool Filter::writeDataToFilterMQ(const std::vector<int8_t>& data) {
1075 std::lock_guard<std::mutex> lock(mWriteLock);
1076 if (mFilterMQ->write(data.data(), data.size())) {
1077 return true;
1078 }
1079 return false;
1080 }
1081
attachFilterToRecord(const std::shared_ptr<Dvr> dvr)1082 void Filter::attachFilterToRecord(const std::shared_ptr<Dvr> dvr) {
1083 mDvr = dvr;
1084 }
1085
detachFilterFromRecord()1086 void Filter::detachFilterFromRecord() {
1087 mDvr = nullptr;
1088 }
1089
createAvIonFd(int size)1090 int Filter::createAvIonFd(int size) {
1091 // Create an DMA-BUF fd and allocate an av fd mapped to a buffer to it.
1092 auto buffer_allocator = std::make_unique<BufferAllocator>();
1093 if (!buffer_allocator) {
1094 ALOGE("[Filter] Unable to create BufferAllocator object");
1095 return -1;
1096 }
1097 int av_fd = -1;
1098 av_fd = buffer_allocator->Alloc("system-uncached", size);
1099 if (av_fd < 0) {
1100 ALOGE("[Filter] Failed to create av fd %d", errno);
1101 return -1;
1102 }
1103 return av_fd;
1104 }
1105
getIonBuffer(int fd,int size)1106 uint8_t* Filter::getIonBuffer(int fd, int size) {
1107 uint8_t* avBuf = static_cast<uint8_t*>(
1108 mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0 /*offset*/));
1109 if (avBuf == MAP_FAILED) {
1110 ALOGE("[Filter] fail to allocate buffer %d", errno);
1111 return NULL;
1112 }
1113 return avBuf;
1114 }
1115
createNativeHandle(int fd)1116 native_handle_t* Filter::createNativeHandle(int fd) {
1117 native_handle_t* nativeHandle;
1118 if (fd < 0) {
1119 nativeHandle = native_handle_create(/*numFd*/ 0, 0);
1120 } else {
1121 // Create a native handle to pass the av fd via the callback event.
1122 nativeHandle = native_handle_create(/*numFd*/ 1, 0);
1123 }
1124 if (nativeHandle == NULL) {
1125 ALOGE("[Filter] Failed to create native_handle %d", errno);
1126 return NULL;
1127 }
1128 if (nativeHandle->numFds > 0) {
1129 nativeHandle->data[0] = dup(fd);
1130 }
1131 return nativeHandle;
1132 }
1133
createIndependentMediaEvents(vector<int8_t> & output)1134 ::ndk::ScopedAStatus Filter::createIndependentMediaEvents(vector<int8_t>& output) {
1135 int av_fd = createAvIonFd(output.size());
1136 if (av_fd == -1) {
1137 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1138 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1139 }
1140 // copy the filtered data to the buffer
1141 uint8_t* avBuffer = getIonBuffer(av_fd, output.size());
1142 if (avBuffer == NULL) {
1143 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1144 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1145 }
1146 memcpy(avBuffer, output.data(), output.size() * sizeof(uint8_t));
1147
1148 native_handle_t* nativeHandle = createNativeHandle(av_fd);
1149 if (nativeHandle == NULL) {
1150 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1151 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1152 }
1153
1154 // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1155 uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1156 mDataId2Avfd[dataId] = dup(av_fd);
1157
1158 // Create mediaEvent and send callback
1159 auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1160 auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1161 mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1162 mediaEvent.dataLength = static_cast<int64_t>(output.size());
1163 mediaEvent.avDataId = static_cast<int64_t>(dataId);
1164 if (mPts) {
1165 mediaEvent.pts = mPts;
1166 mPts = 0;
1167 }
1168
1169 {
1170 std::lock_guard<std::mutex> lock(mFilterEventsLock);
1171 mFilterEvents.push_back(std::move(event));
1172 }
1173
1174 // Clear and log
1175 native_handle_close(nativeHandle);
1176 native_handle_delete(nativeHandle);
1177 output.clear();
1178 mAvBufferCopyCount = 0;
1179 if (DEBUG_FILTER) {
1180 ALOGD("[Filter] av data length %d", static_cast<int32_t>(output.size()));
1181 }
1182 return ::ndk::ScopedAStatus::ok();
1183 }
1184
createShareMemMediaEvents(vector<int8_t> & output)1185 ::ndk::ScopedAStatus Filter::createShareMemMediaEvents(vector<int8_t>& output) {
1186 // copy the filtered data to the shared buffer
1187 uint8_t* sharedAvBuffer =
1188 getIonBuffer(mSharedAvMemHandle->data[0], output.size() + mSharedAvMemOffset);
1189 if (sharedAvBuffer == NULL) {
1190 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1191 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1192 }
1193 memcpy(sharedAvBuffer + mSharedAvMemOffset, output.data(), output.size() * sizeof(uint8_t));
1194
1195 // Create a memory handle with numFds == 0
1196 native_handle_t* nativeHandle = createNativeHandle(-1);
1197 if (nativeHandle == NULL) {
1198 return ::ndk::ScopedAStatus::fromServiceSpecificError(
1199 static_cast<int32_t>(Result::UNKNOWN_ERROR));
1200 }
1201
1202 // Create mediaEvent and send callback
1203 auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
1204 auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
1205 mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1206 mediaEvent.offset = mSharedAvMemOffset;
1207 mediaEvent.dataLength = static_cast<int64_t>(output.size());
1208 if (mPts) {
1209 mediaEvent.pts = mPts;
1210 mPts = 0;
1211 }
1212
1213 {
1214 std::lock_guard<std::mutex> lock(mFilterEventsLock);
1215 mFilterEvents.push_back(std::move(event));
1216 }
1217
1218 mSharedAvMemOffset += output.size();
1219
1220 // Clear and log
1221 native_handle_close(nativeHandle);
1222 native_handle_delete(nativeHandle);
1223 output.clear();
1224 if (DEBUG_FILTER) {
1225 ALOGD("[Filter] shared av data length %d", static_cast<int32_t>(output.size()));
1226 }
1227 return ::ndk::ScopedAStatus::ok();
1228 }
1229
sameFile(int fd1,int fd2)1230 bool Filter::sameFile(int fd1, int fd2) {
1231 struct stat stat1, stat2;
1232 if (fstat(fd1, &stat1) < 0 || fstat(fd2, &stat2) < 0) {
1233 return false;
1234 }
1235 return (stat1.st_dev == stat2.st_dev) && (stat1.st_ino == stat2.st_ino);
1236 }
1237
createMediaEvent(vector<DemuxFilterEvent> & events,bool isAudioPresentation)1238 void Filter::createMediaEvent(vector<DemuxFilterEvent>& events, bool isAudioPresentation) {
1239 DemuxFilterMediaEvent mediaEvent;
1240 mediaEvent.streamId = 1;
1241 mediaEvent.isPtsPresent = true;
1242 mediaEvent.isDtsPresent = false;
1243 mediaEvent.dataLength = 3;
1244 mediaEvent.offset = 4;
1245 mediaEvent.isSecureMemory = true;
1246 mediaEvent.mpuSequenceNumber = 6;
1247 mediaEvent.isPesPrivateData = true;
1248
1249 if (isAudioPresentation) {
1250 AudioPresentation audioPresentation0{
1251 .preselection.preselectionId = 0,
1252 .preselection.labels = {{"en", "Commentator"}, {"es", "Comentarista"}},
1253 .preselection.language = "en",
1254 .preselection.renderingIndication =
1255 AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1256 .preselection.hasAudioDescription = false,
1257 .preselection.hasSpokenSubtitles = false,
1258 .preselection.hasDialogueEnhancement = true,
1259 .ac4ShortProgramId = 42};
1260 AudioPresentation audioPresentation1{
1261 .preselection.preselectionId = 1,
1262 .preselection.labels = {{"en", "Crowd"}, {"es", "Multitud"}},
1263 .preselection.language = "en",
1264 .preselection.renderingIndication =
1265 AudioPreselectionRenderingIndicationType::THREE_DIMENSIONAL,
1266 .preselection.hasAudioDescription = false,
1267 .preselection.hasSpokenSubtitles = false,
1268 .preselection.hasDialogueEnhancement = false,
1269 .ac4ShortProgramId = 42};
1270 vector<AudioPresentation> audioPresentations;
1271 audioPresentations.push_back(audioPresentation0);
1272 audioPresentations.push_back(audioPresentation1);
1273 mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audioPresentations>(
1274 audioPresentations);
1275 } else {
1276 AudioExtraMetaData audio;
1277 audio.adFade = 1;
1278 audio.adPan = 2;
1279 audio.versionTextTag = 3;
1280 audio.adGainCenter = 4;
1281 audio.adGainFront = 5;
1282 audio.adGainSurround = 6;
1283 mediaEvent.extraMetaData.set<DemuxFilterMediaEventExtraMetaData::Tag::audio>(audio);
1284 }
1285
1286 int av_fd = createAvIonFd(BUFFER_SIZE);
1287 if (av_fd == -1) {
1288 return;
1289 }
1290
1291 native_handle_t* nativeHandle = createNativeHandle(av_fd);
1292 if (nativeHandle == nullptr) {
1293 ::close(av_fd);
1294 ALOGE("[Filter] Failed to create native_handle %d", errno);
1295 return;
1296 }
1297
1298 // Create a dataId and add a <dataId, av_fd> pair into the dataId2Avfd map
1299 uint64_t dataId = mLastUsedDataId++ /*createdUID*/;
1300 mDataId2Avfd[dataId] = dup(av_fd);
1301
1302 mediaEvent.avDataId = static_cast<int64_t>(dataId);
1303 mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
1304
1305 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(std::move(mediaEvent)));
1306
1307 native_handle_close(nativeHandle);
1308 native_handle_delete(nativeHandle);
1309 }
1310
createTsRecordEvent(vector<DemuxFilterEvent> & events)1311 void Filter::createTsRecordEvent(vector<DemuxFilterEvent>& events) {
1312 DemuxPid pid;
1313 DemuxFilterScIndexMask mask;
1314 DemuxFilterTsRecordEvent tsRecord1;
1315 pid.set<DemuxPid::Tag::tPid>(1);
1316 mask.set<DemuxFilterScIndexMask::Tag::scIndex>(1);
1317 tsRecord1.pid = pid;
1318 tsRecord1.tsIndexMask = 1;
1319 tsRecord1.scIndexMask = mask;
1320 tsRecord1.byteNumber = 2;
1321
1322 DemuxFilterTsRecordEvent tsRecord2;
1323 tsRecord2.pts = 1;
1324 tsRecord2.firstMbInSlice = 2; // random address
1325
1326 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord1)));
1327 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(std::move(tsRecord2)));
1328 }
1329
createMmtpRecordEvent(vector<DemuxFilterEvent> & events)1330 void Filter::createMmtpRecordEvent(vector<DemuxFilterEvent>& events) {
1331 DemuxFilterMmtpRecordEvent mmtpRecord1;
1332 mmtpRecord1.scHevcIndexMask = 1;
1333 mmtpRecord1.byteNumber = 2;
1334
1335 DemuxFilterMmtpRecordEvent mmtpRecord2;
1336 mmtpRecord2.pts = 1;
1337 mmtpRecord2.mpuSequenceNumber = 2;
1338 mmtpRecord2.firstMbInSlice = 3;
1339 mmtpRecord2.tsIndexMask = 4;
1340
1341 events.push_back(
1342 DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord1)));
1343 events.push_back(
1344 DemuxFilterEvent::make<DemuxFilterEvent::Tag::mmtpRecord>(std::move(mmtpRecord2)));
1345 }
1346
createSectionEvent(vector<DemuxFilterEvent> & events)1347 void Filter::createSectionEvent(vector<DemuxFilterEvent>& events) {
1348 DemuxFilterSectionEvent section;
1349 section.tableId = 1;
1350 section.version = 2;
1351 section.sectionNum = 3;
1352 section.dataLength = 0;
1353
1354 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(std::move(section)));
1355 }
1356
createPesEvent(vector<DemuxFilterEvent> & events)1357 void Filter::createPesEvent(vector<DemuxFilterEvent>& events) {
1358 DemuxFilterPesEvent pes;
1359 pes.streamId = 1;
1360 pes.dataLength = 1;
1361 pes.mpuSequenceNumber = 2;
1362
1363 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(std::move(pes)));
1364 }
1365
createDownloadEvent(vector<DemuxFilterEvent> & events)1366 void Filter::createDownloadEvent(vector<DemuxFilterEvent>& events) {
1367 DemuxFilterDownloadEvent download;
1368 download.itemId = 1;
1369 download.downloadId = 1;
1370 download.mpuSequenceNumber = 2;
1371 download.itemFragmentIndex = 3;
1372 download.lastItemFragmentIndex = 4;
1373 download.dataLength = 0;
1374
1375 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::download>(std::move(download)));
1376 }
1377
createIpPayloadEvent(vector<DemuxFilterEvent> & events)1378 void Filter::createIpPayloadEvent(vector<DemuxFilterEvent>& events) {
1379 DemuxFilterIpPayloadEvent ipPayload;
1380 ipPayload.dataLength = 0;
1381
1382 events.push_back(
1383 DemuxFilterEvent::make<DemuxFilterEvent::Tag::ipPayload>(std::move(ipPayload)));
1384 }
1385
createTemiEvent(vector<DemuxFilterEvent> & events)1386 void Filter::createTemiEvent(vector<DemuxFilterEvent>& events) {
1387 DemuxFilterTemiEvent temi;
1388 temi.pts = 1;
1389 temi.descrTag = 2;
1390 temi.descrData = {3};
1391
1392 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::temi>(std::move(temi)));
1393 }
1394
createMonitorEvent(vector<DemuxFilterEvent> & events)1395 void Filter::createMonitorEvent(vector<DemuxFilterEvent>& events) {
1396 DemuxFilterMonitorEvent monitor;
1397 monitor.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>(ScramblingStatus::SCRAMBLED);
1398
1399 events.push_back(
1400 DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(std::move(monitor)));
1401 }
1402
createRestartEvent(vector<DemuxFilterEvent> & events)1403 void Filter::createRestartEvent(vector<DemuxFilterEvent>& events) {
1404 events.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(1));
1405 }
1406
1407 } // namespace tuner
1408 } // namespace tv
1409 } // namespace hardware
1410 } // namespace android
1411 } // namespace aidl
1412