1 /*
2 * Copyright 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 //#define LOG_NDEBUG 0
18 #define LOG_TAG "android.hardware.tv.tuner-service.example-Dvr"
19
20 #include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
21 #include <aidl/android/hardware/tv/tuner/Result.h>
22
23 #include <utils/Log.h>
24 #include "Dvr.h"
25
26 namespace aidl {
27 namespace android {
28 namespace hardware {
29 namespace tv {
30 namespace tuner {
31
32 #define WAIT_TIMEOUT 3000000000
33
Dvr(DvrType type,uint32_t bufferSize,const std::shared_ptr<IDvrCallback> & cb,std::shared_ptr<Demux> demux)34 Dvr::Dvr(DvrType type, uint32_t bufferSize, const std::shared_ptr<IDvrCallback>& cb,
35 std::shared_ptr<Demux> demux) {
36 mType = type;
37 mBufferSize = bufferSize;
38 mCallback = cb;
39 mDemux = demux;
40 }
41
~Dvr()42 Dvr::~Dvr() {
43 // make sure thread has joined
44 close();
45 }
46
getQueueDesc(MQDescriptor<int8_t,SynchronizedReadWrite> * out_queue)47 ::ndk::ScopedAStatus Dvr::getQueueDesc(MQDescriptor<int8_t, SynchronizedReadWrite>* out_queue) {
48 ALOGV("%s", __FUNCTION__);
49
50 *out_queue = mDvrMQ->dupeDesc();
51
52 return ::ndk::ScopedAStatus::ok();
53 }
54
configure(const DvrSettings & in_settings)55 ::ndk::ScopedAStatus Dvr::configure(const DvrSettings& in_settings) {
56 ALOGV("%s", __FUNCTION__);
57
58 mDvrSettings = in_settings;
59 mDvrConfigured = true;
60
61 return ::ndk::ScopedAStatus::ok();
62 }
63
attachFilter(const std::shared_ptr<IFilter> & in_filter)64 ::ndk::ScopedAStatus Dvr::attachFilter(const std::shared_ptr<IFilter>& in_filter) {
65 ALOGV("%s", __FUNCTION__);
66
67 int64_t filterId;
68 ::ndk::ScopedAStatus status = in_filter->getId64Bit(&filterId);
69 if (!status.isOk()) {
70 return status;
71 }
72
73 if (!mDemux->attachRecordFilter(filterId)) {
74 return ::ndk::ScopedAStatus::fromServiceSpecificError(
75 static_cast<int32_t>(Result::INVALID_ARGUMENT));
76 }
77
78 return ::ndk::ScopedAStatus::ok();
79 }
80
detachFilter(const std::shared_ptr<IFilter> & in_filter)81 ::ndk::ScopedAStatus Dvr::detachFilter(const std::shared_ptr<IFilter>& in_filter) {
82 ALOGV("%s", __FUNCTION__);
83
84 int64_t filterId;
85 ::ndk::ScopedAStatus status = in_filter->getId64Bit(&filterId);
86 if (!status.isOk()) {
87 return status;
88 }
89
90 if (!mDemux->detachRecordFilter(filterId)) {
91 return ::ndk::ScopedAStatus::fromServiceSpecificError(
92 static_cast<int32_t>(Result::INVALID_ARGUMENT));
93 }
94
95 return ::ndk::ScopedAStatus::ok();
96 }
97
start()98 ::ndk::ScopedAStatus Dvr::start() {
99 ALOGV("%s", __FUNCTION__);
100 if (mDvrThreadRunning) {
101 return ::ndk::ScopedAStatus::ok();
102 }
103
104 if (!mCallback) {
105 return ::ndk::ScopedAStatus::fromServiceSpecificError(
106 static_cast<int32_t>(Result::NOT_INITIALIZED));
107 }
108
109 if (!mDvrConfigured) {
110 return ::ndk::ScopedAStatus::fromServiceSpecificError(
111 static_cast<int32_t>(Result::INVALID_STATE));
112 }
113
114 if (mType == DvrType::PLAYBACK) {
115 mDvrThreadRunning = true;
116 mDvrThread = std::thread(&Dvr::playbackThreadLoop, this);
117 } else if (mType == DvrType::RECORD) {
118 mRecordStatus = RecordStatus::DATA_READY;
119 mDemux->setIsRecording(mType == DvrType::RECORD);
120 }
121
122 // TODO start another thread to send filter status callback to the framework
123
124 return ::ndk::ScopedAStatus::ok();
125 }
126
stop()127 ::ndk::ScopedAStatus Dvr::stop() {
128 ALOGV("%s", __FUNCTION__);
129
130 mDvrThreadRunning = false;
131 if (mDvrThread.joinable()) {
132 mDvrThread.join();
133 }
134 // thread should always be joinable if it is running,
135 // so it should be safe to assume recording stopped.
136 mDemux->setIsRecording(false);
137
138 return ::ndk::ScopedAStatus::ok();
139 }
140
flush()141 ::ndk::ScopedAStatus Dvr::flush() {
142 ALOGV("%s", __FUNCTION__);
143
144 mRecordStatus = RecordStatus::DATA_READY;
145
146 return ::ndk::ScopedAStatus::ok();
147 }
148
close()149 ::ndk::ScopedAStatus Dvr::close() {
150 ALOGV("%s", __FUNCTION__);
151
152 stop();
153
154 return ::ndk::ScopedAStatus::ok();
155 }
156
setStatusCheckIntervalHint(int64_t)157 ::ndk::ScopedAStatus Dvr::setStatusCheckIntervalHint(int64_t /* in_milliseconds */) {
158 ALOGV("%s", __FUNCTION__);
159
160 // There is no active polling in this default implementation,
161 // so directly return ok here.
162 return ::ndk::ScopedAStatus::ok();
163 }
164
createDvrMQ()165 bool Dvr::createDvrMQ() {
166 ALOGV("%s", __FUNCTION__);
167
168 // Create a synchronized FMQ that supports blocking read/write
169 unique_ptr<DvrMQ> tmpDvrMQ = unique_ptr<DvrMQ>(new (nothrow) DvrMQ(mBufferSize, true));
170 if (!tmpDvrMQ->isValid()) {
171 ALOGW("[Dvr] Failed to create FMQ of DVR");
172 return false;
173 }
174
175 mDvrMQ = std::move(tmpDvrMQ);
176
177 if (EventFlag::createEventFlag(mDvrMQ->getEventFlagWord(), &mDvrEventFlag) != ::android::OK) {
178 return false;
179 }
180
181 return true;
182 }
183
getDvrEventFlag()184 EventFlag* Dvr::getDvrEventFlag() {
185 return mDvrEventFlag;
186 }
187
dump(int fd,const char **,uint32_t)188 binder_status_t Dvr::dump(int fd, const char** /* args */, uint32_t /* numArgs */) {
189 dprintf(fd, " Dvr:\n");
190 dprintf(fd, " mType: %hhd\n", mType);
191 dprintf(fd, " mDvrThreadRunning: %d\n", (bool)mDvrThreadRunning);
192 return STATUS_OK;
193 }
194
playbackThreadLoop()195 void Dvr::playbackThreadLoop() {
196 ALOGD("[Dvr] playback threadLoop start.");
197
198 while (mDvrThreadRunning) {
199 uint32_t efState = 0;
200 ::android::status_t status =
201 mDvrEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
202 &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
203 if (status != ::android::OK) {
204 ALOGD("[Dvr] wait for data ready on the playback FMQ");
205 continue;
206 }
207
208 // If the both dvr playback and dvr record are created, the playback will be treated as
209 // the source of the record. isVirtualFrontend set to true would direct the dvr playback
210 // input to the demux record filters or live broadcast filters.
211 bool isRecording = mDemux->isRecording();
212 bool isVirtualFrontend = isRecording;
213
214 if (mDvrSettings.get<DvrSettings::Tag::playback>().dataFormat == DataFormat::ES) {
215 if (!processEsDataOnPlayback(isVirtualFrontend, isRecording)) {
216 ALOGE("[Dvr] playback es data failed to be filtered. Ending thread");
217 break;
218 }
219 maySendPlaybackStatusCallback();
220 continue;
221 }
222
223 // Our current implementation filter the data and write it into the filter FMQ immediately
224 // after the DATA_READY from the VTS/framework
225 // This is for the non-ES data source, real playback use case handling.
226 if (!readPlaybackFMQ(isVirtualFrontend, isRecording) ||
227 !startFilterDispatcher(isVirtualFrontend, isRecording)) {
228 ALOGE("[Dvr] playback data failed to be filtered. Ending thread");
229 break;
230 }
231
232 maySendPlaybackStatusCallback();
233 }
234
235 mDvrThreadRunning = false;
236 ALOGD("[Dvr] playback thread ended.");
237 }
238
maySendIptvPlaybackStatusCallback()239 void Dvr::maySendIptvPlaybackStatusCallback() {
240 lock_guard<mutex> lock(mPlaybackStatusLock);
241 int availableToRead = mDvrMQ->availableToRead();
242 int availableToWrite = mDvrMQ->availableToWrite();
243
244 PlaybackStatus newStatus = checkPlaybackStatusChange(availableToWrite, availableToRead,
245 IPTV_PLAYBACK_STATUS_THRESHOLD_HIGH,
246 IPTV_PLAYBACK_STATUS_THRESHOLD_LOW);
247 if (mPlaybackStatus != newStatus) {
248 map<int64_t, std::shared_ptr<Filter>>::iterator it;
249 for (it = mFilters.begin(); it != mFilters.end(); it++) {
250 std::shared_ptr<Filter> currentFilter = it->second;
251 currentFilter->setIptvDvrPlaybackStatus(newStatus);
252 }
253 mCallback->onPlaybackStatus(newStatus);
254 mPlaybackStatus = newStatus;
255 }
256 }
257
maySendPlaybackStatusCallback()258 void Dvr::maySendPlaybackStatusCallback() {
259 lock_guard<mutex> lock(mPlaybackStatusLock);
260 int availableToRead = mDvrMQ->availableToRead();
261 int availableToWrite = mDvrMQ->availableToWrite();
262
263 PlaybackStatus newStatus =
264 checkPlaybackStatusChange(availableToWrite, availableToRead,
265 mDvrSettings.get<DvrSettings::Tag::playback>().highThreshold,
266 mDvrSettings.get<DvrSettings::Tag::playback>().lowThreshold);
267 if (mPlaybackStatus != newStatus) {
268 mCallback->onPlaybackStatus(newStatus);
269 mPlaybackStatus = newStatus;
270 }
271 }
272
checkPlaybackStatusChange(uint32_t availableToWrite,uint32_t availableToRead,int64_t highThreshold,int64_t lowThreshold)273 PlaybackStatus Dvr::checkPlaybackStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
274 int64_t highThreshold, int64_t lowThreshold) {
275 if (availableToWrite == 0) {
276 return PlaybackStatus::SPACE_FULL;
277 } else if (availableToRead > highThreshold) {
278 return PlaybackStatus::SPACE_ALMOST_FULL;
279 } else if (availableToRead < lowThreshold) {
280 return PlaybackStatus::SPACE_ALMOST_EMPTY;
281 } else if (availableToRead == 0) {
282 return PlaybackStatus::SPACE_EMPTY;
283 }
284 return mPlaybackStatus;
285 }
286
readPlaybackFMQ(bool isVirtualFrontend,bool isRecording)287 bool Dvr::readPlaybackFMQ(bool isVirtualFrontend, bool isRecording) {
288 // Read playback data from the input FMQ
289 size_t size = mDvrMQ->availableToRead();
290 int64_t playbackPacketSize = mDvrSettings.get<DvrSettings::Tag::playback>().packetSize;
291 vector<int8_t> dataOutputBuffer;
292 dataOutputBuffer.resize(playbackPacketSize);
293 // Dispatch the packet to the PID matching filter output buffer
294 for (int i = 0; i < size / playbackPacketSize; i++) {
295 if (!mDvrMQ->read(dataOutputBuffer.data(), playbackPacketSize)) {
296 return false;
297 }
298 if (isVirtualFrontend) {
299 if (isRecording) {
300 mDemux->sendFrontendInputToRecord(dataOutputBuffer);
301 } else {
302 mDemux->startBroadcastTsFilter(dataOutputBuffer);
303 }
304 } else {
305 startTpidFilter(dataOutputBuffer);
306 }
307 }
308
309 return true;
310 }
311
processEsDataOnPlayback(bool isVirtualFrontend,bool isRecording)312 bool Dvr::processEsDataOnPlayback(bool isVirtualFrontend, bool isRecording) {
313 // Read ES from the DVR FMQ
314 // Note that currently we only provides ES with metaData in a specific format to be parsed.
315 // The ES size should be smaller than the Playback FMQ size to avoid reading truncated data.
316 int size = mDvrMQ->availableToRead();
317 vector<int8_t> dataOutputBuffer;
318 dataOutputBuffer.resize(size);
319 if (!mDvrMQ->read(dataOutputBuffer.data(), size)) {
320 return false;
321 }
322
323 int metaDataSize = size;
324 int totalFrames = 0;
325 int videoEsDataSize = 0;
326 int audioEsDataSize = 0;
327 int audioPid = 0;
328 int videoPid = 0;
329
330 vector<MediaEsMetaData> esMeta;
331 int videoReadPointer = 0;
332 int audioReadPointer = 0;
333 int frameCount = 0;
334 // Get meta data from the es
335 for (int i = 0; i < metaDataSize; i++) {
336 switch (dataOutputBuffer[i]) {
337 case 'm':
338 metaDataSize = 0;
339 getMetaDataValue(i, dataOutputBuffer.data(), metaDataSize);
340 videoReadPointer = metaDataSize;
341 continue;
342 case 'l':
343 getMetaDataValue(i, dataOutputBuffer.data(), totalFrames);
344 esMeta.resize(totalFrames);
345 continue;
346 case 'V':
347 getMetaDataValue(i, dataOutputBuffer.data(), videoEsDataSize);
348 audioReadPointer = metaDataSize + videoEsDataSize;
349 continue;
350 case 'A':
351 getMetaDataValue(i, dataOutputBuffer.data(), audioEsDataSize);
352 continue;
353 case 'p':
354 if (dataOutputBuffer[++i] == 'a') {
355 getMetaDataValue(i, dataOutputBuffer.data(), audioPid);
356 } else if (dataOutputBuffer[i] == 'v') {
357 getMetaDataValue(i, dataOutputBuffer.data(), videoPid);
358 }
359 continue;
360 case 'v':
361 case 'a':
362 if (dataOutputBuffer[i + 1] != ',') {
363 ALOGE("[Dvr] Invalid format meta data.");
364 return false;
365 }
366 esMeta[frameCount] = {
367 .isAudio = dataOutputBuffer[i] == 'a' ? true : false,
368 };
369 i += 5; // Move to Len
370 getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].len);
371 if (esMeta[frameCount].isAudio) {
372 esMeta[frameCount].startIndex = audioReadPointer;
373 audioReadPointer += esMeta[frameCount].len;
374 } else {
375 esMeta[frameCount].startIndex = videoReadPointer;
376 videoReadPointer += esMeta[frameCount].len;
377 }
378 i += 4; // move to PTS
379 getMetaDataValue(i, dataOutputBuffer.data(), esMeta[frameCount].pts);
380 frameCount++;
381 continue;
382 default:
383 continue;
384 }
385 }
386
387 if (frameCount != totalFrames) {
388 ALOGE("[Dvr] Invalid meta data, frameCount=%d, totalFrames reported=%d", frameCount,
389 totalFrames);
390 return false;
391 }
392
393 if (metaDataSize + audioEsDataSize + videoEsDataSize != size) {
394 ALOGE("[Dvr] Invalid meta data, metaSize=%d, videoSize=%d, audioSize=%d, totolSize=%d",
395 metaDataSize, videoEsDataSize, audioEsDataSize, size);
396 return false;
397 }
398
399 // Read es raw data from the FMQ per meta data built previously
400 vector<int8_t> frameData;
401 map<int64_t, std::shared_ptr<Filter>>::iterator it;
402 int pid = 0;
403 for (int i = 0; i < totalFrames; i++) {
404 frameData.resize(esMeta[i].len);
405 pid = esMeta[i].isAudio ? audioPid : videoPid;
406 memcpy(frameData.data(), dataOutputBuffer.data() + esMeta[i].startIndex, esMeta[i].len);
407 // Send to the media filters or record filters
408 if (!isRecording) {
409 for (it = mFilters.begin(); it != mFilters.end(); it++) {
410 if (pid == mDemux->getFilterTpid(it->first)) {
411 mDemux->updateMediaFilterOutput(it->first, frameData,
412 static_cast<uint64_t>(esMeta[i].pts));
413 }
414 }
415 } else {
416 mDemux->sendFrontendInputToRecord(frameData, pid, static_cast<uint64_t>(esMeta[i].pts));
417 }
418 startFilterDispatcher(isVirtualFrontend, isRecording);
419 frameData.clear();
420 }
421
422 return true;
423 }
424
getMetaDataValue(int & index,int8_t * dataOutputBuffer,int & value)425 void Dvr::getMetaDataValue(int& index, int8_t* dataOutputBuffer, int& value) {
426 index += 2; // Move the pointer across the ":" to the value
427 while (dataOutputBuffer[index] != ',' && dataOutputBuffer[index] != '\n') {
428 value = ((dataOutputBuffer[index++] - 48) + value * 10);
429 }
430 }
431
startTpidFilter(vector<int8_t> data)432 void Dvr::startTpidFilter(vector<int8_t> data) {
433 map<int64_t, std::shared_ptr<Filter>>::iterator it;
434 for (it = mFilters.begin(); it != mFilters.end(); it++) {
435 uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff));
436 if (DEBUG_DVR) {
437 ALOGW("[Dvr] start ts filter pid: %d", pid);
438 }
439 if (pid == mDemux->getFilterTpid(it->first)) {
440 mDemux->updateFilterOutput(it->first, data);
441 }
442 }
443 }
444
startFilterDispatcher(bool isVirtualFrontend,bool isRecording)445 bool Dvr::startFilterDispatcher(bool isVirtualFrontend, bool isRecording) {
446 if (isVirtualFrontend) {
447 if (isRecording) {
448 return mDemux->startRecordFilterDispatcher();
449 } else {
450 return mDemux->startBroadcastFilterDispatcher();
451 }
452 }
453
454 map<int64_t, std::shared_ptr<Filter>>::iterator it;
455 // Handle the output data per filter type
456 for (it = mFilters.begin(); it != mFilters.end(); it++) {
457 if (!mDemux->startFilterHandler(it->first).isOk()) {
458 return false;
459 }
460 }
461
462 return true;
463 }
464
writePlaybackFMQ(void * buf,size_t size)465 int Dvr::writePlaybackFMQ(void* buf, size_t size) {
466 lock_guard<mutex> lock(mWriteLock);
467 ALOGI("Playback status: %d", mPlaybackStatus);
468 if (mPlaybackStatus == PlaybackStatus::SPACE_FULL) {
469 ALOGW("[Dvr] stops writing and wait for the client side flushing.");
470 return DVR_WRITE_FAILURE_REASON_FMQ_FULL;
471 }
472 ALOGI("availableToWrite before: %zu", mDvrMQ->availableToWrite());
473 if (mDvrMQ->write((int8_t*)buf, size)) {
474 mDvrEventFlag->wake(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY));
475 ALOGI("availableToWrite: %zu", mDvrMQ->availableToWrite());
476 maySendIptvPlaybackStatusCallback();
477 return DVR_WRITE_SUCCESS;
478 }
479 maySendIptvPlaybackStatusCallback();
480 return DVR_WRITE_FAILURE_REASON_UNKNOWN;
481 }
482
writeRecordFMQ(const vector<int8_t> & data)483 bool Dvr::writeRecordFMQ(const vector<int8_t>& data) {
484 lock_guard<mutex> lock(mWriteLock);
485 if (mRecordStatus == RecordStatus::OVERFLOW) {
486 ALOGW("[Dvr] stops writing and wait for the client side flushing.");
487 return true;
488 }
489 if (mDvrMQ->write(data.data(), data.size())) {
490 mDvrEventFlag->wake(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY));
491 maySendRecordStatusCallback();
492 return true;
493 }
494
495 maySendRecordStatusCallback();
496 return false;
497 }
498
maySendRecordStatusCallback()499 void Dvr::maySendRecordStatusCallback() {
500 lock_guard<mutex> lock(mRecordStatusLock);
501 int availableToRead = mDvrMQ->availableToRead();
502 int availableToWrite = mDvrMQ->availableToWrite();
503
504 RecordStatus newStatus =
505 checkRecordStatusChange(availableToWrite, availableToRead,
506 mDvrSettings.get<DvrSettings::Tag::record>().highThreshold,
507 mDvrSettings.get<DvrSettings::Tag::record>().lowThreshold);
508 if (mRecordStatus != newStatus) {
509 mCallback->onRecordStatus(newStatus);
510 mRecordStatus = newStatus;
511 }
512 }
513
checkRecordStatusChange(uint32_t availableToWrite,uint32_t availableToRead,int64_t highThreshold,int64_t lowThreshold)514 RecordStatus Dvr::checkRecordStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
515 int64_t highThreshold, int64_t lowThreshold) {
516 if (availableToWrite == 0) {
517 return RecordStatus::OVERFLOW;
518 } else if (availableToRead > highThreshold) {
519 return RecordStatus::HIGH_WATER;
520 } else if (availableToRead < lowThreshold) {
521 return RecordStatus::LOW_WATER;
522 }
523 return mRecordStatus;
524 }
525
addPlaybackFilter(int64_t filterId,std::shared_ptr<Filter> filter)526 bool Dvr::addPlaybackFilter(int64_t filterId, std::shared_ptr<Filter> filter) {
527 mFilters[filterId] = filter;
528 return true;
529 }
530
removePlaybackFilter(int64_t filterId)531 bool Dvr::removePlaybackFilter(int64_t filterId) {
532 mFilters.erase(filterId);
533 return true;
534 }
535
536 } // namespace tuner
537 } // namespace tv
538 } // namespace hardware
539 } // namespace android
540 } // namespace aidl
541