Ticket #712: drb-v1.patch
File drb-v1.patch, 92.1 KB (added by , 20 years ago) |
---|
-
libs/libmythtv/dvbrecorder.cpp
72 72 const int DVBRecorder::POLL_INTERVAL = 50; // msec 73 73 const int DVBRecorder::POLL_WARNING_TIMEOUT = 500; // msec 74 74 75 #define USE_DRB 76 #ifndef USE_DRB 77 static ssize_t safe_read(int fd, unsigned char *buf, size_t bufsize); 78 #endif // USE_DRB 79 75 80 #define LOC QString("DVBRec(%1): ").arg(_card_number_option) 76 81 #define LOC_WARN QString("DVBRec(%1) Warning: ").arg(_card_number_option) 77 82 #define LOC_ERR QString("DVBRec(%1) Error: ").arg(_card_number_option) 78 83 79 84 DVBRecorder::DVBRecorder(TVRec *rec, DVBChannel* advbchannel) 80 85 : DTVRecorder(rec, "DVBRecorder"), 86 _drb(NULL), 81 87 // Options set in SetOption() 82 88 _card_number_option(0), _record_transport_stream_option(false), 83 89 _hw_decoder_option(false), 84 90 // DVB stuff 85 91 dvbchannel(advbchannel), 92 _reset_pid_filters(true), 93 _pid_lock(true), 86 94 // PS recorder stuff 87 95 _ps_rec_audio_id(0xC0), _ps_rec_video_id(0xE0), 88 96 // Output stream info 89 97 _pat(NULL), _pmt(NULL), _next_pmt_version(0), 90 98 _ts_packets_until_psip_sync(0), 91 // Input Misc92 _reset_pid_filters(true),93 // Locking94 _pid_lock(true),95 99 // Statistics 96 100 _continuity_error_count(0), _stream_overflow_count(0), 97 101 _bad_packet_count(0) 98 102 { 99 103 bzero(_ps_rec_buf, sizeof(unsigned char) * 3); 100 104 101 bzero(&_polls, sizeof(struct pollfd)); 102 _polls.fd = _stream_fd; 105 #ifdef USE_DRB 106 _drb = new DeviceReadBuffer(this); 107 _buffer_size = (1024*1024 / TSPacket::SIZE) * TSPacket::SIZE; 108 #else 109 _buffer_size = (4*1024*1024 / TSPacket::SIZE) * TSPacket::SIZE; 110 #endif 103 111 104 _buffer_size = (4*1024*1024 / MPEG_TS_PKT_SIZE) * MPEG_TS_PKT_SIZE;105 112 _buffer = new unsigned char[_buffer_size]; 106 113 bzero(_buffer, _buffer_size); 107 114 } … … 113 120 114 121 void DVBRecorder::TeardownAll(void) 115 122 { 116 if (_stream_fd >= 0) 123 // Make SURE that the device read thread is cleaned up -- John Poet 124 StopRecording(); 125 126 if (IsOpen()) 117 127 Close(); 118 128 119 129 if (_buffer) … … 177 187 178 188 bool DVBRecorder::Open(void) 179 189 { 180 if ( _stream_fd >= 0)190 if (IsOpen()) 181 191 { 182 192 VERBOSE(VB_GENERAL, LOC_WARN + "Card already open"); 183 193 return true; … … 185 195 186 196 _stream_fd = open(dvbdevice(DVB_DEV_DVR,_card_number_option), 187 197 O_RDONLY | O_NONBLOCK); 188 if ( _stream_fd < 0)198 if (!IsOpen()) 189 199 { 190 200 VERBOSE(VB_IMPORTANT, LOC_ERR + "Failed to open DVB device" + ENO); 191 201 return false; 192 202 } 203 #ifdef USE_DRB 204 if (_drb) 205 _drb->Reset(videodevice, _stream_fd); 206 #endif 193 207 194 _polls.fd = _stream_fd;195 _polls.events = POLLIN;196 _polls.revents = 0;197 198 208 connect(dvbchannel, SIGNAL(UpdatePMTObject(const PMTObject *)), 199 209 this, SLOT(SetPMTObject(const PMTObject *))); 200 210 201 VERBOSE(VB_RECORD, LOC + 202 QString("Card opened successfully (using %1mode).")211 VERBOSE(VB_RECORD, LOC + QString("Card opened successfully fd(%1)") 212 .arg(_stream_fd) + QString(" (using %2 mode).") 203 213 .arg(_record_transport_stream_option ? "TS" : "PS")); 204 214 205 215 dvbchannel->RecorderStarted(); … … 211 221 { 212 222 VERBOSE(VB_RECORD, LOC + "Close() fd("<<_stream_fd<<") -- begin"); 213 223 214 if (_stream_fd < 0) 215 return; 216 217 CloseFilters(); 218 219 if (_stream_fd >= 0) 224 if (IsOpen()) 225 { 226 CloseFilters(); 220 227 close(_stream_fd); 228 _stream_fd = -1; 229 } 221 230 222 _stream_fd = -1;223 _polls.fd = -1;224 225 231 VERBOSE(VB_RECORD, LOC + "Close() fd("<<_stream_fd<<") -- end"); 226 232 } 227 233 228 234 void DVBRecorder::CloseFilters(void) 229 235 { 230 236 QMutexLocker change_lock(&_pid_lock); 231 for(unsigned int i=0; i<_pid_filters.size(); i++)232 if (_pid_filters[i] >= 0)233 close(_pid_filters[i]);234 _pid_filters.clear();235 237 236 pid_ipack_t::iterator iter = _ps_rec_pid_ipack.begin();237 for ( ;iter != _ps_rec_pid_ipack.end(); iter++)238 PIDInfoMap::iterator it; 239 for (it = _pid_infos.begin(); it != _pid_infos.end(); ++it) 238 240 { 239 if ((*iter).second != NULL) 240 { 241 free_ipack((*iter).second); 242 free((void*)(*iter).second); 243 } 241 (*it)->Close(); 242 delete *it; 244 243 } 245 _p s_rec_pid_ipack.clear();244 _pid_infos.clear(); 246 245 } 247 246 248 247 void DVBRecorder::OpenFilter(uint pid, … … 284 283 usecs /= 2; 285 284 } 286 285 VERBOSE(VB_RECORD, LOC + "Set demux buffer size for " + 287 QString("pid 0x%1 to %2,\n\t\t\t 286 QString("pid 0x%1 to %2,\n\t\t\twhich gives us a %3 msec buffer.") 288 287 .arg(pid,0,16).arg(sz).arg(usecs/1000)); 289 288 290 289 // Set the filter type … … 303 302 return; 304 303 } 305 304 305 PIDInfo *info = new PIDInfo(); 306 // Set isVideo based on stream type 307 info->isVideo = StreamID::IsVideo(stream_type); 306 308 // Add the file descriptor to the filter list 307 QMutexLocker change_lock(&_pid_lock); 308 _pid_filters.push_back(fd_tmp); 309 310 // Initialize continuity count 311 _continuity_count[pid] = -1; 312 if (_record_transport_stream_option) 313 { 314 //Set the TS->PES converter to NULL 315 _ps_rec_pid_ipack[pid] = NULL; 316 return; 317 } 318 309 info->filter_fd = fd_tmp; 319 310 // If we are in legacy PES mode, initialize TS->PES converter 320 ipack* ip = (ipack*)malloc(sizeof(ipack)); 321 assert(ip); 322 switch (type) 323 { 324 case ES_TYPE_VIDEO_MPEG1: 325 case ES_TYPE_VIDEO_MPEG2: 326 init_ipack(ip, 2048, ProcessDataPS); 327 ip->replaceid = _ps_rec_video_id++; 328 break; 311 if (!_record_transport_stream_option) 312 info->ip = CreateIPack(type); 329 313 330 case ES_TYPE_AUDIO_MPEG1: 331 case ES_TYPE_AUDIO_MPEG2: 332 init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */ 333 ip->replaceid = _ps_rec_audio_id++; 334 break; 335 336 case ES_TYPE_AUDIO_AC3: 337 init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */ 338 ip->priv_type = PRIV_TS_AC3; 339 break; 340 341 case ES_TYPE_SUBTITLE: 342 case ES_TYPE_TELETEXT: 343 init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */ 344 ip->priv_type = PRIV_DVB_SUB; 345 break; 346 347 default: 348 init_ipack(ip, 2048, ProcessDataPS); 349 break; 350 } 351 ip->data = (void*)this; 352 _ps_rec_pid_ipack[pid] = ip; 314 // Add the new info to the map 315 QMutexLocker change_lock(&_pid_lock); 316 _pid_infos[pid] = info; 353 317 } 354 318 355 bool DVBRecorder:: SetDemuxFilters(void)319 bool DVBRecorder::OpenFilters(void) 356 320 { 357 QMutexLocker change_lock(&_pid_lock);358 321 CloseFilters(); 359 322 360 _continuity_count.clear(); 361 _encrypted_pid.clear(); 362 _payload_start_seen.clear(); 363 data_found = false; 323 QMutexLocker change_lock(&_pid_lock); 324 364 325 _wait_for_keyframe = _wait_for_keyframe_option; 365 keyframe_found = false;366 326 367 327 _ps_rec_audio_id = 0xC0; 368 328 _ps_rec_video_id = 0xE0; … … 427 387 (*es).Orig_Type); 428 388 429 389 430 if (_pid_ filters.size() == 0 && _ps_rec_pid_ipack.size() == 0)390 if (_pid_infos.empty()) 431 391 { 432 392 VERBOSE(VB_GENERAL, LOC_WARN + 433 393 "Recording will not commence until a PID is set."); … … 436 396 return true; 437 397 } 438 398 439 /* 440 * Process PMT and decide which components should be recorded 399 /** \fn DVBRecorder::AutoPID(void) 400 * \brief Process PMT and decide which components should be recorded 401 * 402 * This is particularly for the hardware decoders which don't like 403 * to see more than one audio or video stream. 441 404 */ 442 405 void DVBRecorder::AutoPID(void) 443 406 { 444 407 QMutexLocker change_lock(&_pid_lock); 445 _videoPID.clear();446 408 447 409 VERBOSE(VB_RECORD, LOC + 448 410 QString("AutoPID for MPEG Program Number(%1), PCR PID(0x%2)") 449 411 .arg(_input_pmt.ServiceID).arg(((uint)_input_pmt.PCRPID),0,16)); 450 412 451 // Wanted languages:452 //QStringList Languages = iso639_get_language_list();453 454 413 // Wanted stream types: 455 414 QValueList<ES_Type> StreamTypes; 456 415 StreamTypes += ES_TYPE_VIDEO_MPEG1; … … 467 426 StreamTypes += ES_TYPE_SUBTITLE; 468 427 } 469 428 470 QMap<ES_Type, bool> flagged; 429 uint has_video = 0; 430 uint has_audio = 0; 431 471 432 QValueList<ElementaryPIDObject>::Iterator es; 472 433 for (es = _input_pmt.Components.begin(); 473 434 es != _input_pmt.Components.end(); ++es) 474 435 { 436 // Check if this is a stream we know about 475 437 if (!StreamTypes.contains((*es).Type)) 476 {477 // Type not wanted478 438 continue; 479 }480 439 481 if (((*es).Type == ES_TYPE_AUDIO_MPEG1) ||482 ((*es).Type == ES_TYPE_AUDIO_MPEG2) ||483 ((*es).Type == ES_TYPE_AUDIO_AC3))484 {485 bool ignore = false;486 487 // Check for audio descriptors488 DescriptorList::Iterator dit;489 for (dit = (*es).Descriptors.begin();490 dit != (*es).Descriptors.end(); ++dit)491 {492 // Check for "Hearing impaired" or493 // "Visual impaired commentary" stream494 if (((*dit).Data[0] == 0x0A) &&495 ((*dit).Data[5] & 0xFE == 0x02))496 {497 ignore = true;498 break;499 }500 }501 502 if (ignore)503 continue; // Ignore this stream504 }505 506 440 if (_hw_decoder_option) 507 441 { 508 // Limit hardware decoders to one A/V stream 509 switch ((*es).Type) 510 { 511 case ES_TYPE_VIDEO_MPEG1: 512 case ES_TYPE_VIDEO_MPEG2: 513 if (flagged.contains(ES_TYPE_VIDEO_MPEG1) || 514 flagged.contains(ES_TYPE_VIDEO_MPEG2)) 515 { 516 continue; // Ignore this stream 517 } 518 break; 442 has_video += StreamID::IsVideo((*es).Orig_Type) ? 1 : 0; 443 has_audio += StreamID::IsAudio((*es).Orig_Type) ? 1 : 0; 519 444 520 case ES_TYPE_AUDIO_MPEG1: 521 case ES_TYPE_AUDIO_MPEG2: 522 if (flagged.contains(ES_TYPE_AUDIO_MPEG1) || 523 flagged.contains(ES_TYPE_AUDIO_MPEG2)) 524 { 525 continue; // Ignore this stream 526 } 527 break; 528 529 default: 530 break; 531 } 445 // Limit hardware decoders streams to one video stream 446 if (StreamID::IsVideo((*es).Orig_Type) && has_video > 1) 447 continue; 448 // Limit hardware decoders streams to one audio stream 449 if (StreamID::IsAudio((*es).Orig_Type) && has_audio > 1) 450 continue; 532 451 } 533 452 534 //if (Languages.isEmpty() // No specific language wanted 535 // || (*es).Language.isEmpty() // Component has no language 536 // || Languages.contains((*es).Language)) // This language is wanted! 537 { 538 (*es).Record = true; 539 flagged[(*es).Type] = true; 540 } 453 (*es).Record = true; 541 454 } 542 455 456 if (!(print_verbose_messages|VB_RECORD)) 457 return; 458 459 // print out some debugging info 543 460 for (es = _input_pmt.Components.begin(); 544 461 es != _input_pmt.Components.end(); ++es) 545 462 { 546 if (StreamTypes.contains((*es).Type) && !flagged.contains((*es).Type)) 547 { 548 // We want this stream type but no component was flagged 549 (*es).Record = true; 550 } 551 552 if ((*es).Record) 553 { 554 VERBOSE(VB_RECORD, LOC + 555 QString("AutoPID selecting PID 0x%1, %2") 556 .arg((*es).PID,0,16).arg((*es).Description)); 557 558 switch ((*es).Type) 559 { 560 case ES_TYPE_VIDEO_MPEG1: 561 case ES_TYPE_VIDEO_MPEG2: 562 _videoPID[(*es).PID] = true; 563 break; 564 565 default: 566 // Do nothing 567 break; 568 } 569 } 570 else 571 { 572 VERBOSE(VB_RECORD, LOC + 573 QString("AutoPID skipping PID 0x%1, %2") 574 .arg((*es).PID,0,16).arg((*es).Description)); 575 } 463 QString msg = ((*es).Record) ? "recording" : "skipping"; 464 VERBOSE(VB_RECORD, LOC + 465 QString("AutoPID %1 PID 0x%2, %3") 466 .arg(msg).arg((*es).PID,0,16).arg((*es).Description)); 576 467 } 577 468 578 469 VERBOSE(VB_RECORD, LOC + "AutoPID Complete - PAT/PMT Loaded for service"); 579 470 580 471 QString msg = (_input_pmt.FTA()) ? "unencrypted" : "ENCRYPTED"; 581 VERBOSE(VB_RECORD, LOC + "A /V Stream is " + msg);472 VERBOSE(VB_RECORD, LOC + "AutoPID A/V Stream is " + msg); 582 473 } 583 474 584 475 void DVBRecorder::StartRecording(void) … … 600 491 SetPMT(NULL); 601 492 _ts_packets_until_psip_sync = 0; 602 493 603 MythTimer t; 604 t.start(); 494 #ifdef USE_DRB 495 bool ok = _drb->Setup(videodevice, _stream_fd); 496 if (!ok) 497 { 498 VERBOSE(VB_IMPORTANT, LOC_ERR + "Failed to allocate DRB buffer"); 499 Close(); 500 _error = true; 501 return; 502 } 503 _drb->Start(); 504 #else 505 _poll_timer.start(); 506 #endif // USE_DRB 507 605 508 while (_request_recording && !_error) 606 509 { 607 510 if (PauseAndWait()) 608 511 continue; 609 512 610 if ( _stream_fd < 0)513 if (!IsOpen()) 611 514 { 612 usleep( 50);515 usleep(2000); 613 516 continue; 614 517 } 615 518 616 519 if (_reset_pid_filters) 617 520 { 521 _reset_pid_filters = false; 618 522 VERBOSE(VB_RECORD, LOC + "Resetting Demux Filters"); 619 if ( SetDemuxFilters())523 if (OpenFilters()) 620 524 { 621 525 CreatePAT(); 622 526 CreatePMT(); 623 527 } 624 _reset_pid_filters = false;625 528 } 626 529 627 int ret; 628 do 629 ret = poll(&_polls, 1, POLL_INTERVAL); 630 while (!request_pause && (_stream_fd >= 0) && 631 ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno)))); 530 if (Poll()) 531 { 532 #ifndef USE_DRB 533 ssize_t len = safe_read(_stream_fd, _buffer, _buffer_size); 534 #else 535 ssize_t len = _drb->Read(_buffer, _buffer_size); 536 #endif // !USE_DRB 537 if (len > 0) 538 ProcessDataTS(_buffer, len); 539 } 632 540 633 if (request_pause || _stream_fd < 0) 634 continue; 635 636 if (ret == 0 && t.elapsed() > POLL_WARNING_TIMEOUT) 541 // Check for DRB errors 542 if (_drb->IsErrored()) 637 543 { 638 VERBOSE(VB_GENERAL, LOC_WARN + 639 QString("No data from card in %1 milliseconds.") 640 .arg(t.elapsed())); 544 VERBOSE(VB_IMPORTANT, LOC_ERR + "Device error detected"); 545 _error = true; 641 546 } 642 else if (ret == 1 && _polls.revents & POLLIN) 547 548 if (_drb->IsEOF()) 643 549 { 644 int msec = t.restart(); 645 if (msec >= POLL_WARNING_TIMEOUT) 646 { 647 VERBOSE(VB_IMPORTANT, LOC_WARN + 648 QString("Got data from card after %1 ms. (>%2)") 649 .arg(msec).arg(POLL_WARNING_TIMEOUT)); 650 } 651 652 ReadFromDMX(); 653 if (t.elapsed() >= 20) 654 { 655 VERBOSE(VB_RECORD, LOC_WARN + 656 QString("ReadFromDMX took %1 ms").arg(t.elapsed())); 657 } 550 VERBOSE(VB_IMPORTANT, LOC_ERR + "Device EOF detected"); 551 _error = true; 658 552 } 659 else if ((ret < 0) || (ret == 1 && _polls.revents & POLLERR))660 VERBOSE(VB_IMPORTANT, LOC_ERR +661 "Poll failed while waiting for data." + ENO);662 553 } 663 554 555 #ifdef USE_DRB 556 if (_drb) 557 _drb->Stop(); 558 #endif 559 664 560 Close(); 665 561 666 562 FinishRecording(); … … 668 564 _recording = false; 669 565 } 670 566 671 void DVBRecorder:: ReadFromDMX(void)567 void DVBRecorder::StopRecording(void) 672 568 { 673 int cardnum = _card_number_option; 674 int readsz = 1;675 unsigned char *pktbuf;569 #ifdef USE_DRB 570 TVRec *rec = tvrec; 571 tvrec = NULL; // don't notify of pause.. 676 572 677 while (readsz > 0) 573 bool ok = true; 574 if (!IsPaused()) 678 575 { 679 readsz = read(_stream_fd, _buffer, _buffer_size); 680 if (readsz < 0) 681 { 682 if (errno == EOVERFLOW) 683 { 684 ++_stream_overflow_count; 685 VERBOSE(VB_RECORD, LOC + 686 "DVB Buffer overflow error detected on read"); 687 break; 688 } 576 Pause(); 577 ok = WaitForPause(250); 578 } 689 579 690 if (errno == EAGAIN) 691 break; 692 VERBOSE(VB_IMPORTANT, LOC_ERR + 693 "Error reading from DVB device." + ENO); 694 break; 695 } else if (readsz == 0) 696 break; 580 _request_recording = false; 697 581 698 if (readsz % MPEG_TS_PKT_SIZE) 699 { 700 VERBOSE(VB_IMPORTANT, LOC_ERR + "Incomplete packet received."); 701 readsz = readsz - (readsz % MPEG_TS_PKT_SIZE); 702 } 582 _drb->Stop(); 703 583 704 int pkts = readsz / MPEG_TS_PKT_SIZE; 705 int curpkt = 0; 584 if (ok) 585 _drb->Teardown(); 586 else // Better to have a memory leak, than a segfault? -- John Poet 587 VERBOSE(VB_IMPORTANT, LOC_ERR + "DeviceReadBuffer not cleaned up!"); 706 588 707 _pid_lock.lock(); 708 while (curpkt < pkts) 709 { 710 if (data_found == false) 711 { 712 GENERAL("Data read from DMX - " 713 "This is for debugging with transform.c"); 714 data_found = true; 715 } 589 while (_recording) 590 usleep(2000); 716 591 717 pktbuf = _buffer + (curpkt * MPEG_TS_PKT_SIZE); 718 curpkt++; 592 tvrec = rec; 593 #else 594 DTVRecorder::StopRecording(); 595 #endif 596 } 719 597 720 int pes_offset = 0; 721 int pid = ((pktbuf[1]&0x1f) << 8) | pktbuf[2]; 722 uint8_t scrambling = (pktbuf[3] >> 6) & 0x03; 723 uint8_t cc = pktbuf[3] & 0xf; 724 uint8_t content = (pktbuf[3] & 0x30) >> 4; 598 void DVBRecorder::ReaderPaused(int /*fd*/) 599 { 600 #ifdef USE_DRB 601 pauseWait.wakeAll(); 602 if (tvrec) 603 tvrec->RecorderPaused(); 604 #endif // USE_DRB 605 } 725 606 726 if (pktbuf[1] & 0x80) 727 { 728 VERBOSE(VB_RECORD, LOC + 729 "Packet dropped due to uncorrectable error."); 730 ++_bad_packet_count; 731 continue; 732 } 607 bool DVBRecorder::PauseAndWait(int timeout) 608 { 609 #ifdef USE_DRB 610 if (request_pause) 611 { 612 paused = true; 613 if (!_drb->IsPaused()) 614 _drb->SetRequestPause(true); 733 615 734 if (scrambling) 735 { 736 if (!_encrypted_pid[pid]) 737 { 738 VERBOSE(VB_RECORD, LOC + 739 QString("PID 0x%1 is encrypted, ignoring") 740 .arg(pid,0,16)); 741 _encrypted_pid[pid] = true; 742 } 743 continue; // Drop encrypted TS packet 744 } 616 unpauseWait.wait(timeout); 617 } 618 else if (_drb->IsPaused()) 619 { 620 _drb->SetRequestPause(false); 621 _drb->WaitForUnpause(timeout); 622 paused = _drb->IsPaused(); 623 } 624 else 625 { 626 paused = false; 627 } 628 return paused; 629 #else // if !USE_DRB 630 return RecorderBase::PauseAndWait(timeout); 631 #endif // !USE_DRB 632 } 745 633 746 if (_encrypted_pid[pid]) 747 { 748 VERBOSE(VB_RECORD, LOC + 749 QString("PID 0x%1 is no longer encrypted") 750 .arg(pid,0,16)); 751 _encrypted_pid[pid] = false; 752 } 753 if (content & 0x1) 754 { 755 if (_continuity_count[pid] < 0) 756 _continuity_count[pid] = cc; 757 else 758 { 759 _continuity_count[pid] = (_continuity_count[pid]+1) & 0xf; 760 if (_continuity_count[pid] != cc) 761 { 762 VERBOSE(VB_RECORD, LOC + 763 QString("PID 0x%1 _continuity_count %2 cc %3") 764 .arg(pid,0,16).arg(_continuity_count[pid]) 765 .arg(cc)); 766 _continuity_count[pid] = cc; 767 ++_continuity_error_count; 768 } 769 } 770 } 634 uint DVBRecorder::ProcessDataTS(unsigned char *buffer, uint len) 635 { 636 if (len % TSPacket::SIZE) 637 { 638 VERBOSE(VB_IMPORTANT, LOC_ERR + "Incomplete packet received."); 639 len = len - (len % TSPacket::SIZE); 640 } 641 if (len < TSPacket::SIZE) 642 return len; 771 643 772 if (_record_transport_stream_option) 773 { // handle TS recording 774 MythTimer t; 775 t.start(); 776 if (_videoPID[pid]) 777 { 778 // Check for keyframe 779 const TSPacket *pkt = 780 reinterpret_cast<const TSPacket*>(pktbuf); 781 FindKeyframes(pkt); 782 } 783 if (t.elapsed() > 10) 784 { 785 VERBOSE(VB_RECORD, LOC_WARN + 786 QString("Find keyframes took %1 ms!") 787 .arg(t.elapsed())); 788 } 644 uint pos = 0; 645 uint end = len - TSPacket::SIZE; 646 while (pos <= end) 647 { 648 const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[pos]); 649 ProcessTSPacket(*pkt); 650 pos += TSPacket::SIZE; 651 } 652 return len - pos; 653 } 789 654 790 // Sync recording start to first keyframe 791 if (_wait_for_keyframe && !_keyframe_seen) 792 continue;793 if (!keyframe_found)794 {795 keyframe_found = true;796 VERBOSE(VB_RECORD, QString("Found first keyframe"));797 655 bool DVBRecorder::ProcessTSPacket(const TSPacket& tspacket) 656 { 657 if (tspacket.TransportError()) 658 { 659 VERBOSE(VB_RECORD, LOC + "Packet dropped due to uncorrectable error."); 660 ++_bad_packet_count; 661 return false; // Drop bad TS packets... 662 } 798 663 799 // Sync streams to the first Payload Unit Start Indicator 800 // _after_ first keyframe iff _wait_for_keyframe is true 801 if (!_payload_start_seen[pid]) 802 { 803 if ((pktbuf[1] & 0x40) == 0) 804 continue; // not payload start - drop packet 664 const uint pid = tspacket.PID(); 805 665 806 VERBOSE(VB_RECORD, QString("Found Payload Start for PID %1").arg(pid)); 807 _payload_start_seen[pid] = true; 808 } 666 QMutexLocker locker(&_pid_lock); 667 PIDInfo *info = _pid_infos[pid]; 668 if (!info) 669 info = _pid_infos[pid] = new PIDInfo(); 809 670 810 t.start(); 811 LocalProcessDataTS(pktbuf, MPEG_TS_PKT_SIZE); 812 if (t.elapsed() > 10) 813 { 814 VERBOSE(VB_RECORD, LOC_WARN + 815 QString("TS packet write took %1 ms!") 816 .arg(t.elapsed())); 817 } 818 } 819 else 820 { // handle PS recording 821 ipack *ip = _ps_rec_pid_ipack[pid]; 822 if (ip == NULL) 823 continue; 824 825 ip->ps = 1; 826 827 if ((pktbuf[1] & 0x40) && (ip->plength == MMAX_PLENGTH-6)) 828 { 829 ip->plength = ip->found-6; 830 ip->found = 0; 831 send_ipack(ip); 832 reset_ipack(ip); 833 } 834 835 if (content & 0x2) 836 pes_offset = pktbuf[4] + 1; 837 838 if (pes_offset > 183) 839 continue; 840 841 instant_repack(pktbuf + 4 + pes_offset, 842 MPEG_TS_PKT_SIZE - 4 - pes_offset, ip); 843 } 671 // track scrambled pids 672 if (tspacket.ScramplingControl()) 673 { 674 if (!info->isEncrypted) 675 { 676 VERBOSE(VB_RECORD, LOC + 677 QString("PID 0x%1 is encrypted, ignoring").arg(pid,0,16)); 678 info->isEncrypted = true; 844 679 } 845 _pid_lock.unlock();680 return true; // Drop encrypted TS packets... 846 681 } 847 } 682 else if (info->isEncrypted) 683 { 684 VERBOSE(VB_RECORD, LOC + 685 QString("PID 0x%1 is no longer encrypted").arg(pid,0,16)); 686 info->isEncrypted = false; 687 } 848 688 849 #define SEQ_START 0x000001B3 850 #define GOP_START 0x000001B8 851 #define PICTURE_START 0x00000100 852 #define SLICE_MIN 0x00000101 853 #define SLICE_MAX 0x000001af 854 855 void DVBRecorder::ProcessDataPS(unsigned char *buffer, int len, void *priv) 856 { 857 ((DVBRecorder*)priv)->LocalProcessDataPS(buffer, len); 858 } 859 860 void DVBRecorder::LocalProcessDataPS(unsigned char *buffer, int len) 861 { 862 if (buffer[0] != 0x00 || buffer[1] != 0x00 || buffer[2] != 0x01) 689 // Check continuity counter 690 if (tspacket.HasPayload()) 863 691 { 864 if (!_wait_for_keyframe) 865 ringBuffer->Write(buffer, len); 866 return; 692 if (!info->CheckCC(tspacket.ContinuityCounter())) 693 { 694 VERBOSE(VB_RECORD, LOC + 695 QString("PID 0x%1 discontinuity detected").arg(pid,0,16)); 696 _continuity_error_count++; 697 } 867 698 } 868 699 869 if ( buffer[3] >= VIDEO_STREAM_S && buffer[3] <= VIDEO_STREAM_E)700 if (_record_transport_stream_option) 870 701 { 871 int pos = 8 + buffer[8]; 872 int datalen = len - pos; 702 // handle TS recording 873 703 874 unsigned char *bufptr = &buffer[pos]; 875 uint state = 0xFFFFFFFF; 876 uint state_byte = 0; 877 int prvcount = -1; 704 // Check for keyframes and count frames 705 if (info->isVideo) 706 FindKeyframes(&tspacket); 878 707 879 while (bufptr < &buffer[pos] + datalen) 708 // Sync recording start to first keyframe 709 if (_wait_for_keyframe_option && !_keyframe_seen) 710 return true; 711 712 // Sync streams to the first Payload Unit Start Indicator 713 // _after_ first keyframe iff _wait_for_keyframe_option is true 714 if (!info->payloadStartSeen) 880 715 { 881 if (++prvcount < 3) 882 state_byte = _ps_rec_buf[prvcount]; 883 else 884 state_byte = *bufptr++; 716 if (!tspacket.PayloadStart()) 717 return true; // not payload start - drop packet 885 718 886 if (state != 0x000001) 887 { 888 state = ((state << 8) | state_byte) & 0xFFFFFF; 889 continue; 890 } 719 VERBOSE(VB_RECORD, 720 QString("PID 0x%1 Found Payload Start").arg(pid,0,16)); 721 info->payloadStartSeen = true; 722 } 891 723 892 state = ((state << 8) | state_byte) & 0xFFFFFF; 893 if (state >= SLICE_MIN && state <= SLICE_MAX) 894 continue; 724 // Write PAT & PMT tables occasionally 725 WritePATPMT(); 895 726 896 if (state == SEQ_START) 897 _wait_for_keyframe = false; 727 // Write Data 728 ringBuffer->Write(tspacket.data(), TSPacket::SIZE); 729 } 730 else 731 { 732 // handle PS recording 898 733 899 if (GOP_START == state) 900 { 901 long long startpos = ringBuffer->GetWritePosition(); 902 903 if (!_position_map.contains(_frames_written_count)) 904 { 905 _position_map_delta[_frames_written_count] = startpos; 906 _position_map[_frames_written_count] = startpos; 734 ipack *ip = info->ip; 735 if (ip == NULL) 736 return true; 907 737 908 if (curRecording && 909 ((_position_map_delta.size() % 30) == 0)) 910 { 911 curRecording->SetPositionMapDelta( 912 _position_map_delta, 913 MARK_GOP_BYFRAME); 914 curRecording->SetFilesize(startpos); 915 _position_map_delta.clear(); 916 } 917 } 918 } 919 else if (PICTURE_START == state) 920 _frames_written_count++; 738 ip->ps = 1; 739 740 if (tspacket.PayloadStart() && (ip->plength == MMAX_PLENGTH-6)) 741 { 742 ip->plength = ip->found-6; 743 ip->found = 0; 744 send_ipack(ip); 745 reset_ipack(ip); 921 746 } 747 748 uint afc_offset = tspacket.AFCOffset(); 749 if (afc_offset > 187) 750 return true; 751 752 instant_repack((uint8_t*)tspacket.data() + afc_offset, 753 TSPacket::SIZE - afc_offset, ip); 922 754 } 923 memcpy(_ps_rec_buf, &buffer[len - 3], 3); 924 925 if (!_wait_for_keyframe) 926 ringBuffer->Write(buffer, len); 755 return true; 927 756 } 928 757 929 void DVBRecorder:: LocalProcessDataTS(unsigned char *buffer, int len)758 void DVBRecorder::WritePATPMT(void) 930 759 { 931 QMutexLocker read_lock(&_pid_lock);932 760 if (_ts_packets_until_psip_sync == 0) 933 761 { 762 QMutexLocker read_lock(&_pid_lock); 934 763 if (_pat && _pmt) 935 764 { 936 765 ringBuffer->Write(_pat->tsheader()->data(), TSPacket::SIZE); … … 940 769 } 941 770 else 942 771 _ts_packets_until_psip_sync--; 943 944 ringBuffer->Write(buffer,len);945 772 } 946 773 947 774 void DVBRecorder::Reset(void) … … 977 804 void DVBRecorder::CreatePAT(void) 978 805 { 979 806 QMutexLocker read_lock(&_pid_lock); 980 int next_cc = 0;807 uint next_cc = 0; 981 808 if (_pat) 982 809 next_cc = (_pat->tsheader()->ContinuityCounter() + 1) & 0x0F; 983 810 … … 989 816 SetPAT(ProgramAssociationTable::Create(tsid, next_cc, pnum, pid)); 990 817 } 991 818 819 //#define USE_OLD_CREATE_PMT 820 #ifndef USE_OLD_CREATE_PMT 821 static void DescList_to_desc_list(DescriptorList &list, desc_list_t &vec) 822 { 823 vec.clear(); 824 for (DescriptorList::iterator it = list.begin(); it != list.end(); ++it) 825 vec.push_back((*it).Data); 826 } 827 992 828 void DVBRecorder::CreatePMT(void) 993 829 { 994 830 QMutexLocker read_lock(&_pid_lock); 995 int pmt_cc = 0; 831 832 // Figure out what goes into the PMT 833 uint programNumber = 1; // MPEG Program Number 834 desc_list_t gdesc; 835 vector<uint> pids; 836 vector<uint> types; 837 vector<desc_list_t> pdesc; 838 QValueList<ElementaryPIDObject>::iterator it; 839 840 DescList_to_desc_list(_input_pmt.Descriptors, gdesc); 841 842 it = _input_pmt.Components.begin(); 843 for (; it != _input_pmt.Components.end(); ++it) 844 { 845 if ((*it).Record) 846 { 847 pids.push_back((*it).PID); 848 types.push_back((*it).Orig_Type); 849 pdesc.resize(pdesc.size()+1); 850 DescList_to_desc_list((*it).Descriptors, pdesc.back()); 851 } 852 } 853 854 // Create the PMT 855 ProgramMapTable *pmt = ProgramMapTable::Create( 856 programNumber, PMT_PID, _input_pmt.PCRPID, 857 _next_pmt_version, gdesc, 858 pids, types, pdesc); 859 860 // Increment the continuity counter... 861 uint pmt_cc = 0; 996 862 if (_pmt) 997 863 pmt_cc = (_pmt->tsheader()->ContinuityCounter() + 1) & 0x0F; 864 pmt->tsheader()->SetContinuityCounter(pmt_cc); 998 865 999 TSPacket pkt;1000 uint8_t *ts_packet = pkt.data(); 1001 memset(ts_packet, 0xFF, TSPacket::SIZE); 866 SetPMT(pmt); 867 } 868 #endif 1002 869 1003 ts_packet[0] = 0x47; // sync byte 1004 ts_packet[1] = 0x40 | ((PMT_PID >> 8) & 0x1F); // payload start & PID 1005 ts_packet[2] = PMT_PID & 0xFF; // PID 1006 // scrambling, adaptation & continuity counter 1007 ts_packet[3] = 0x10 | pmt_cc; 1008 ts_packet[4] = 0x00; // pointer field 870 //////////////////////////////////////////////////////////// 871 // Stuff below this comment will be phased out after 0.20 // 872 //////////////////////////////////////////////////////////// 1009 873 1010 ++pmt_cc &= 0x0F; // inc. continuity counter 1011 uint8_t *pmt_data = ts_packet + 5; 1012 int p = 0; 874 #ifdef USE_OLD_CREATE_PMT 875 void DVBRecorder::CreatePMT(void) 876 { 877 QMutexLocker read_lock(&_pid_lock); 878 uint pmt_cc = 0; 879 if (_pmt) 880 pmt_cc = (_pmt->tsheader()->ContinuityCounter() + 1) & 0x0F; 1013 881 1014 pmt_data[p++] = PMT_TID; // table ID 1015 pmt_data[p++] = 0xB0; // section syntax indicator 1016 p++; // section length (set later) 1017 pmt_data[p++] = 0; // program number (ServiceID) 1018 pmt_data[p++] = 1; // program number (ServiceID) 1019 pmt_data[p++] = 0xC1 + (_next_pmt_version << 1); // Version + Current/Next 1020 pmt_data[p++] = 0; // Current Section 1021 pmt_data[p++] = 0; // Last Section 1022 pmt_data[p++] = (_input_pmt.PCRPID >> 8) & 0x1F; 1023 pmt_data[p++] = _input_pmt.PCRPID & 0xFF; 882 ProgramMapTable *pmt = ProgramMapTable::CreateBlank(); 883 pmt->tsheader()->SetPID(PMT_PID); 884 pmt->tsheader()->SetContinuityCounter(pmt_cc); 1024 885 886 pmt->SetProgramNumber(1); 887 pmt->SetPCRPID(_input_pmt.PCRPID); 888 pmt->SetVersionNumber(_next_pmt_version); 889 uint8_t *pmt_data = pmt->tsheader()->data() + 5; 890 uint p = 10; 891 1025 892 // Write descriptors 1026 893 int program_info_length = 0; 1027 894 DescriptorList::Iterator dit; … … 1111 978 pmt_data[p++] = (crc >> 16) & 0xFF; 1112 979 pmt_data[p++] = (crc >> 8) & 0xFF; 1113 980 pmt_data[p++] = crc & 0xFF; 1114 1115 SetPMT( new ProgramMapTable(PSIPTable(pkt)));981 982 SetPMT(pmt); 1116 983 } 984 #endif 1117 985 1118 void DVBRecorder::DebugTSHeader(unsigned char* buffer, int len) 986 bool DVBRecorder::Poll(void) const 1119 987 { 1120 (void) len; 988 #ifndef USE_DRB 989 struct pollfd polls; 990 polls.fd = _stream_fd; 991 polls.events = POLLIN; 992 polls.revents = 0; 1121 993 1122 uint8_t sync = buffer[0]; 1123 uint8_t transport_error = (buffer[1] & 0x80) >> 7; 1124 uint8_t payload_start = (buffer[1] & 0x40) >> 6; 1125 uint16_t pid = (buffer[1] & 0x1F) << 8 | buffer[2]; 1126 uint8_t transport_scrambled = (buffer[3] & 0xB0) >> 6; 1127 uint8_t adaptation_control = (buffer[3] & 0x30) >> 4; 1128 uint8_t counter = buffer[3] & 0x0F; 994 int ret; 995 do 996 ret = poll(&polls, 1, POLL_INTERVAL); 997 while (!request_pause && IsOpen() && 998 ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno)))); 1129 999 1130 int pos=4; 1131 if (adaptation_control == 2 || adaptation_control == 3) 1000 if (request_pause || !IsOpen()) 1001 return false; 1002 1003 if (ret > 0 && polls.revents & POLLIN) 1132 1004 { 1133 unsigned char adaptation_length; 1134 adaptation_length = buffer[pos++]; 1135 pos += adaptation_length; 1005 if (_poll_timer.elapsed() >= POLL_WARNING_TIMEOUT) 1006 { 1007 VERBOSE(VB_IMPORTANT, LOC_WARN + 1008 QString("Got data from card after %1 ms. (>%2)") 1009 .arg(_poll_timer.elapsed()).arg(POLL_WARNING_TIMEOUT)); 1010 } 1011 _poll_timer.start(); 1012 return true; 1136 1013 } 1137 1014 1138 QString debugmsg = 1139 QString("sync: %1 err: %2 paystart: %3 " 1140 "pid: %4 enc: %5 adaptation: %6 counter: %7") 1141 .arg(sync, 2, 16) 1142 .arg(transport_error) 1143 .arg(payload_start) 1144 .arg(pid) 1145 .arg(transport_scrambled) 1146 .arg(adaptation_control) 1147 .arg(counter); 1015 if (ret == 0 && _poll_timer.elapsed() > POLL_WARNING_TIMEOUT) 1016 { 1017 VERBOSE(VB_GENERAL, LOC_WARN + 1018 QString("No data from card in %1 ms.") 1019 .arg(_poll_timer.elapsed())); 1020 } 1148 1021 1149 const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[0]); 1150 FindKeyframes(pkt); 1022 if (ret < 0 && (EOVERFLOW == errno)) 1023 { 1024 _stream_overflow_count++; 1025 VERBOSE(VB_RECORD, LOC_ERR + "Driver buffer overflow detected."); 1026 } 1151 1027 1152 int cardnum = _card_number_option; 1153 GENERAL(debugmsg); 1028 if ((ret < 0) || (ret > 0 && polls.revents & POLLERR)) 1029 { 1030 VERBOSE(VB_IMPORTANT, LOC_ERR + 1031 "Poll failed while waiting for data." + ENO); 1032 } 1033 1034 return false; 1035 #else // if USE_DRB 1036 return true; 1037 #endif // USE_DRB 1154 1038 } 1039 1040 void DVBRecorder::ProcessDataPS(unsigned char *buffer, uint len) 1041 { 1042 if (len < 4) 1043 return; 1044 1045 if (buffer[0] != 0x00 || buffer[1] != 0x00 || buffer[2] != 0x01) 1046 { 1047 if (!_wait_for_keyframe_option || _keyframe_seen) 1048 ringBuffer->Write(buffer, len); 1049 return; 1050 } 1051 1052 uint stream_id = buffer[3]; 1053 if ((stream_id >= PESStreamID::MPEGVideoStreamBegin) && 1054 (stream_id <= PESStreamID::MPEGVideoStreamEnd)) 1055 { 1056 uint pos = 8 + buffer[8]; 1057 uint datalen = len - pos; 1058 1059 unsigned char *bufptr = &buffer[pos]; 1060 uint state = 0xFFFFFFFF; 1061 uint state_byte = 0; 1062 int prvcount = -1; 1063 1064 while (bufptr < &buffer[pos] + datalen) 1065 { 1066 state_byte = (++prvcount < 3) ? _ps_rec_buf[prvcount] : *bufptr++; 1067 uint last = state; 1068 state = ((state << 8) | state_byte) & 0xFFFFFF; 1069 stream_id = state_byte; 1070 1071 // Skip non-prefixed stream id's and skip slice PES stream id's 1072 if ((last != 0x000001) || 1073 ((stream_id >= PESStreamID::SliceStartCodeBegin) && 1074 (stream_id <= PESStreamID::SliceStartCodeEnd))) 1075 { 1076 continue; 1077 } 1078 1079 // Now process the stream id's we care about 1080 if (PESStreamID::PictureStartCode == stream_id) 1081 _frames_written_count++; 1082 else if (PESStreamID::SequenceStartCode == stream_id) 1083 _keyframe_seen = true; 1084 else if (PESStreamID::GOPStartCode == stream_id) 1085 { 1086 _position_map_lock.lock(); 1087 bool save_map = false; 1088 if (!_position_map.contains(_frames_written_count)) 1089 { 1090 long long startpos = ringBuffer->GetWritePosition(); 1091 _position_map_delta[_frames_written_count] = startpos; 1092 _position_map[_frames_written_count] = startpos; 1093 save_map = true; 1094 } 1095 _position_map_lock.unlock(); 1096 if (save_map) 1097 SavePositionMap(false); 1098 } 1099 } 1100 } 1101 memcpy(_ps_rec_buf, &buffer[len - 3], 3); 1102 1103 if (!_wait_for_keyframe_option || _keyframe_seen) 1104 ringBuffer->Write(buffer, len); 1105 } 1106 1107 void DVBRecorder::process_data_ps_cb(unsigned char *buffer, 1108 int len, void *priv) 1109 { 1110 ((DVBRecorder*)priv)->ProcessDataPS(buffer, (uint)len); 1111 } 1112 1113 ipack *DVBRecorder::CreateIPack(ES_Type type) 1114 { 1115 ipack* ip = (ipack*)malloc(sizeof(ipack)); 1116 assert(ip); 1117 switch (type) 1118 { 1119 case ES_TYPE_VIDEO_MPEG1: 1120 case ES_TYPE_VIDEO_MPEG2: 1121 init_ipack(ip, 2048, process_data_ps_cb); 1122 ip->replaceid = _ps_rec_video_id++; 1123 break; 1124 1125 case ES_TYPE_AUDIO_MPEG1: 1126 case ES_TYPE_AUDIO_MPEG2: 1127 init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */ 1128 ip->replaceid = _ps_rec_audio_id++; 1129 break; 1130 1131 case ES_TYPE_AUDIO_AC3: 1132 init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */ 1133 ip->priv_type = PRIV_TS_AC3; 1134 break; 1135 1136 case ES_TYPE_SUBTITLE: 1137 case ES_TYPE_TELETEXT: 1138 init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */ 1139 ip->priv_type = PRIV_DVB_SUB; 1140 break; 1141 1142 default: 1143 init_ipack(ip, 2048, process_data_ps_cb); 1144 break; 1145 } 1146 ip->data = (void*)this; 1147 return ip; 1148 } 1149 1150 #ifndef USE_DRB 1151 static ssize_t safe_read(int fd, unsigned char *buf, size_t bufsize) 1152 { 1153 ssize_t size = read(fd, buf, bufsize); 1154 1155 if ((size < 0) && 1156 (errno != EAGAIN) && (errno != EOVERFLOW) && (EINTR != errno)) 1157 { 1158 VERBOSE(VB_IMPORTANT, "DVB:safe_read(): " 1159 "Error reading from DVB device." + ENO); 1160 } 1161 1162 return size; 1163 } 1164 #endif // USE_DRB -
libs/libmythtv/DeviceReadBuffer.h
1 // -*- Mode: c++ -*- 2 /* Device Buffer written by John Poet */ 3 4 #ifndef _DEVICEREADBUFFER_H_ 5 #define _DEVICEREADBUFFER_H_ 6 7 #include <unistd.h> 8 #include <pthread.h> 9 #include <sys/poll.h> 10 11 #include <qmutex.h> 12 #include <qwaitcondition.h> 13 #include <qstring.h> 14 15 #include "util.h" 16 17 class ReaderPausedCB 18 { 19 public: 20 virtual void ReaderPaused(int fd) = 0; 21 }; 22 23 /** \class DeviceReadBuffer 24 * \brief Buffers reads from device files. 25 * 26 * This allows us to read the device regularly even in the presence 27 * of long blocking conditions on writing to disk or accessing the 28 * database. 29 */ 30 class DeviceReadBuffer 31 { 32 public: 33 DeviceReadBuffer(ReaderPausedCB *callback, bool use_poll = true); 34 ~DeviceReadBuffer(); 35 36 bool Setup(const QString &streamName, int streamfd); 37 void Teardown(void); 38 39 void Start(void); 40 void Reset(const QString &streamName, int streamfd); 41 void Stop(void); 42 43 void SetRequestPause(bool request); 44 bool IsPaused(void) const; 45 bool WaitForUnpause(int timeout); 46 47 bool IsErrored(void) const { return error; } 48 bool IsEOF(void) const { return eof; } 49 50 uint Read(unsigned char *buf, uint count); 51 52 private: 53 static void *boot_ringbuffer(void *); 54 void fill_ringbuffer(void); 55 56 void SetPaused(bool); 57 void IncrWritePointer(uint len); 58 void IncrReadPointer(uint len); 59 60 bool HandlePausing(void); 61 bool Poll(void) const; 62 uint WaitForUnused(uint bytes_needed) const; 63 uint WaitForUsed (uint bytes_needed) const; 64 65 bool IsPauseRequested(void) const; 66 bool IsOpen(void) const { return _stream_fd >= 0; } 67 uint GetUnused(void) const; 68 uint GetUsed(void) const; 69 uint GetContiguousUnused(void) const; 70 71 bool CheckForErrors(ssize_t read_len, uint &err_cnt); 72 void ReportStats(void); 73 74 QString videodevice; 75 int _stream_fd; 76 77 ReaderPausedCB *readerPausedCB; 78 pthread_t thread; 79 80 // Data for managing the device ringbuffer 81 mutable QMutex lock; 82 bool run; 83 bool running; 84 bool eof; 85 bool error; 86 bool request_pause; 87 bool paused; 88 bool using_poll; 89 90 size_t size; 91 size_t used; 92 size_t dev_read_size; 93 size_t min_read; 94 unsigned char *buffer; 95 unsigned char *readPtr; 96 unsigned char *writePtr; 97 unsigned char *endPtr; 98 99 QWaitCondition pauseWait; 100 QWaitCondition unpauseWait; 101 102 // statistics 103 size_t max_used; 104 size_t avg_used; 105 size_t avg_cnt; 106 MythTimer lastReport; 107 }; 108 109 #endif // _DEVICEREADBUFFER_H_ -
libs/libmythtv/hdtvrecorder.h
3 3 * HDTVRecorder 4 4 * Copyright (c) 2003-2004 by Brandon Beattie, Doug Larrick, 5 5 * Jason Hoos, and Daniel Thor Kristjansson 6 * Device ringbuffer added by John Poet7 6 * Distributed as part of MythTV under GPL v2 and later. 8 7 */ 9 8 … … 12 11 13 12 #include "dtvrecorder.h" 14 13 #include "tsstats.h" 14 #include "DeviceReadBuffer.h" 15 15 16 16 struct AVFormatContext; 17 17 struct AVPacket; … … 28 28 * 29 29 * \sa DTVRecorder, DVBRecorder 30 30 */ 31 class HDTVRecorder : public DTVRecorder 31 class HDTVRecorder : public DTVRecorder, private ReaderPausedCB 32 32 { 33 33 Q_OBJECT 34 34 friend class ATSCStreamData; 35 35 friend class TSPacketProcessor; 36 36 public: 37 enum {report_loops = 20000};38 39 37 HDTVRecorder(TVRec *rec); 40 38 ~HDTVRecorder(); 41 39 … … 47 45 void StartRecording(void); 48 46 void StopRecording(void); 49 47 50 void Pause(bool clear = false);51 bool IsPaused(void) const;52 53 48 void Reset(void); 54 49 55 50 bool Open(void); … … 61 56 void deleteLater(void); 62 57 63 58 private: 59 bool IsOpen(void) const { return _stream_fd >= 0; } 60 bool Close(void); 61 64 62 void TeardownAll(void); 65 int ProcessData(unsigned char *buffer,int len);63 uint ProcessDataTS(unsigned char *buffer, uint len); 66 64 bool ProcessTSPacket(const TSPacket& tspacket); 67 65 void HandleVideo(const TSPacket* tspacket); 68 66 void HandleAudio(const TSPacket* tspacket); 69 67 70 68 int ResyncStream(unsigned char *buffer, int curr_pos, int len); 71 69 72 static void *boot_ringbuffer(void *); 73 void fill_ringbuffer(void); 74 int ringbuf_read(unsigned char *buffer, size_t count); 70 void ReaderPaused(int fd); 71 bool PauseAndWait(int timeout = 100); 75 72 76 private slots: 73 bool readchan(int chanfd, unsigned char* buffer, int dlen); 74 bool syncchan(int chanfd, int dlen, int keepsync); 75 76 private slots: 77 77 void WritePAT(ProgramAssociationTable*); 78 78 void WritePMT(ProgramMapTable*); 79 79 void ProcessMGT(const MasterGuideTable*); 80 80 void ProcessVCT(uint, const VirtualChannelTable*); 81 private:82 ATSCStreamData* _atsc_stream_data;83 81 82 private: 83 ATSCStreamData *_atsc_stream_data; 84 DeviceReadBuffer *_drb; 85 84 86 // statistics 85 TSStats _ts_stats; 86 long long _resync_count; 87 size_t loop; 87 TSStats _ts_stats; 88 long long _resync_count; 88 89 89 // Data for managing the device ringbuffer 90 struct { 91 pthread_t thread; 92 mutable pthread_mutex_t lock; 93 mutable pthread_mutex_t lock_stats; 94 95 bool run; 96 bool eof; 97 bool error; 98 bool request_pause; 99 bool paused; 100 size_t size; 101 size_t used; 102 size_t max_used; 103 size_t avg_used; 104 size_t avg_cnt; 105 size_t dev_read_size; 106 size_t min_read; 107 unsigned char * buffer; 108 unsigned char * readPtr; 109 unsigned char * writePtr; 110 unsigned char * endPtr; 111 } ringbuf; 90 /// unsynced packets to look at before giving up initially 91 static const uint INIT_SYNC_WINDOW_SIZE; 92 /// synced packets to require before starting recording 93 static const uint INIT_MIN_NUM_SYNC_PACKETS; 112 94 }; 113 95 114 96 #endif -
libs/libmythtv/hdtvrecorder.cpp
84 84 #include "atsctables.h" 85 85 #include "atscstreamdata.h" 86 86 #include "tv_rec.h" 87 #include "DeviceReadBuffer.h" 87 88 88 89 // AVLib/FFMPEG includes 89 90 #include "../libavcodec/avcodec.h" 90 91 #include "../libavformat/avformat.h" 91 92 #include "../libavformat/mpegts.h" 92 93 93 #define REPORT_RING_STATS 1 94 #define LOC QString("HDTVRec(%1):").arg(videodevice) 95 #define LOC_ERR QString("HDTVRec(%1) Error:").arg(videodevice) 94 96 95 97 #define DEFAULT_SUBCHANNEL 1 96 98 … … 109 111 }; 110 112 #endif 111 113 114 const uint HDTVRecorder::INIT_SYNC_WINDOW_SIZE = 50; 115 const uint HDTVRecorder::INIT_MIN_NUM_SYNC_PACKETS = 10; 116 112 117 HDTVRecorder::HDTVRecorder(TVRec *rec) 113 118 : DTVRecorder(rec, "HDTVRecorder"), _atsc_stream_data(0), _resync_count(0) 114 119 { … … 116 121 connect(_atsc_stream_data, SIGNAL(UpdatePATSingleProgram( 117 122 ProgramAssociationTable*)), 118 123 this, SLOT(WritePAT(ProgramAssociationTable*))); 119 connect(_atsc_stream_data, SIGNAL(UpdatePMTSingleProgram(ProgramMapTable*)), 120 this, SLOT(WritePMT(ProgramMapTable*))); 124 connect(_atsc_stream_data, 125 SIGNAL(UpdatePMTSingleProgram(ProgramMapTable*)), 126 this, 127 SLOT(WritePMT(ProgramMapTable*))); 121 128 connect(_atsc_stream_data, SIGNAL(UpdateMGT(const MasterGuideTable*)), 122 129 this, SLOT(ProcessMGT(const MasterGuideTable*))); 123 130 connect(_atsc_stream_data, … … 125 132 this, SLOT(ProcessVCT(uint, const VirtualChannelTable*))); 126 133 127 134 _buffer_size = TSPacket::SIZE * 1500; 128 if ((_buffer = new unsigned char[_buffer_size])) { 129 // make valgrind happy, initialize buffer memory 135 _buffer = new unsigned char[_buffer_size]; 136 137 // make valgrind happy, initialize buffer memory 138 if (_buffer) 130 139 memset(_buffer, 0xFF, _buffer_size); 131 }132 140 133 VERBOSE(VB_RECORD, QString("HD buffer size %1 KB").arg(_buffer_size/1024)); 141 VERBOSE(VB_RECORD, LOC + 142 QString("buffer size %1 KB").arg(_buffer_size/1024)); 134 143 135 ringbuf.run = false; 136 ringbuf.buffer = 0; 137 pthread_mutex_init(&ringbuf.lock, NULL); 138 pthread_mutex_init(&ringbuf.lock_stats, NULL); 139 loop = random() % (report_loops / 2); 144 _drb = new DeviceReadBuffer(this); 140 145 } 141 146 142 147 void HDTVRecorder::TeardownAll(void) 143 148 { 144 // Make SURE that the ringbuffer thread is cleaned up149 // Make SURE that the device read thread is cleaned up -- John Poet 145 150 StopRecording(); 146 151 147 if (_stream_fd >= 0) 148 { 149 close(_stream_fd); 150 _stream_fd = -1; 151 } 152 Close(); 153 152 154 if (_atsc_stream_data) 153 155 { 154 156 delete _atsc_stream_data; … … 164 166 HDTVRecorder::~HDTVRecorder() 165 167 { 166 168 TeardownAll(); 167 pthread_mutex_destroy(&ringbuf.lock); 168 pthread_mutex_destroy(&ringbuf.lock_stats); 169 delete _drb; 169 170 } 170 171 171 172 void HDTVRecorder::deleteLater(void) … … 189 190 SetOption("vbiformat", gContext->GetSetting("VbiFormat")); 190 191 } 191 192 192 bool HDTVRecorder::Open( )193 bool HDTVRecorder::Open(void) 193 194 { 194 195 if (!_atsc_stream_data || !_buffer) 195 196 return false; 196 197 197 198 #if FAKE_VIDEO 198 199 // open file instead of device 199 if (_stream_fd >=0 && close(_stream_fd))200 {201 VERBOSE(VB_IMPORTANT,202 QString("HDTVRecorder::Open(): Error, failed to close "203 "existing fd (%1)").arg(strerror(errno)));204 return false;205 }206 200 201 Close(); // close old video file 207 202 _stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR); 208 VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index])); 209 fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM; 203 204 VERBOSE(VB_IMPORTANT, LOC_ERR + QString("Opened fake video source '%1'") 205 .arg(FAKE_VIDEO_FILES[fake_video_index]) + ENO); 206 207 fake_video_index = (fake_video_index + 1) % FAKE_VIDEO_NUM; 208 210 209 #else 211 if ( _stream_fd <= 0)210 if (!IsOpen()) 212 211 _stream_fd = open(videodevice.ascii(), O_RDWR); 213 212 #endif 214 if (_stream_fd <= 0) 213 214 if (!IsOpen()) 215 215 { 216 VERBOSE(VB_IMPORTANT, QString("Can't open video device: %1 chanfd = %2")217 .arg(videodevice).arg(_stream_fd));218 perror("open video:");216 VERBOSE(VB_IMPORTANT, LOC_ERR + 217 QString("Couldn't open video device: '%1'") 218 .arg(videodevice) + ENO); 219 219 } 220 return (_stream_fd>0); 220 221 return IsOpen(); 221 222 } 222 223 224 bool HDTVRecorder::Close(void) 225 { 226 if (IsOpen() && (0 != close(_stream_fd))) 227 { 228 VERBOSE(VB_IMPORTANT, LOC_ERR + 229 "Failed to close file descriptor." + ENO); 230 return false; 231 } 232 _stream_fd = -1; 233 return true; 234 } 235 223 236 void HDTVRecorder::SetStreamData(ATSCStreamData *stream_data) 224 237 { 225 238 if (stream_data == _atsc_stream_data) … … 231 244 delete old_data; 232 245 } 233 246 234 bool readchan(int chanfd, unsigned char* buffer, int dlen) { 247 bool HDTVRecorder::readchan(int chanfd, unsigned char* buffer, int dlen) 248 { 235 249 int len = read(chanfd, buffer, dlen); // read next byte 236 250 if (dlen != len) 237 251 { 238 252 if (len < 0) 239 253 { 240 VERBOSE(VB_IMPORTANT, QString("HD1 error reading from device"));241 perror("read");254 VERBOSE(VB_IMPORTANT, LOC_ERR + 255 "Reading from device failed" + ENO); 242 256 } 243 257 else if (len == 0) 244 VERBOSE(VB_IMPORTANT, QString("HD2 end of file found in packet"));258 VERBOSE(VB_IMPORTANT, LOC_ERR + "EOF found in TS packet"); 245 259 else 246 VERBOSE(VB_IMPORTANT, QString("HD3 partial read. This shouldn't happen!")); 260 VERBOSE(VB_IMPORTANT, LOC_ERR + 261 "Partial read during initial TS sync phase"); 247 262 } 248 263 return (dlen == len); 249 264 } 250 265 251 bool syncchan(int chanfd, int dlen, int keepsync) { 266 bool HDTVRecorder::syncchan(int chanfd, int dlen, int keepsync) 267 { 252 268 unsigned char b[188]; 253 269 int i, j; 254 for (i=0; i<dlen; i++) { 270 for (i=0; i<dlen; i++) 271 { 255 272 if (!readchan(chanfd, b, 1)) 256 273 break; 257 274 if (SYNC_BYTE == b[0]) 258 275 { 259 if (readchan(chanfd, &b[1], TSPacket::SIZE-1)) { 276 if (readchan(chanfd, &b[1], TSPacket::SIZE-1)) 277 { 260 278 i += (TSPacket::SIZE - 1); 261 279 for (j=0; j<keepsync; j++) 262 280 { … … 268 286 } 269 287 if (j==keepsync) 270 288 { 271 VERBOSE(VB_RECORD, 272 QString("HD4 obtained device stream sync after reading %1 bytes"). 273 arg(dlen)); 289 VERBOSE(VB_RECORD, LOC + "Obtained TS sync, "+ 290 QString("after reading %1 bytes").arg(dlen)); 274 291 return true; 275 292 } 276 293 continue; … … 278 295 break; 279 296 } 280 297 } 281 VERBOSE(VB_IMPORTANT, QString("HD5 Error: could not obtain sync"));298 VERBOSE(VB_IMPORTANT, LOC_ERR + "Could not obtain TS sync"); 282 299 return false; 283 300 } 284 301 285 void * HDTVRecorder::boot_ringbuffer(void * arg) 286 { 287 HDTVRecorder *dtv = (HDTVRecorder *)arg; 288 dtv->fill_ringbuffer(); 289 return NULL; 290 } 302 #define SR_CHK(MSG) \ 303 if (!ok) { VERBOSE(VB_IMPORTANT, MSG); _error = true; return; } 291 304 292 void HDTVRecorder:: fill_ringbuffer(void)305 void HDTVRecorder::StartRecording(void) 293 306 { 294 int errcnt = 0; 295 int len; 296 size_t unused, used; 297 size_t contiguous; 298 size_t read_size; 299 bool run, request_pause, paused; 307 bool ok = true; 308 uint len = 0; 309 uint remainder = 0; 300 310 301 pthread_mutex_lock(&ringbuf.lock); 302 ringbuf.run = true; 303 pthread_mutex_unlock(&ringbuf.lock); 311 VERBOSE(VB_RECORD, LOC + "StartRecording()"); 304 312 305 for (;;) 306 { 307 pthread_mutex_lock(&ringbuf.lock); 308 run = ringbuf.run; 309 unused = ringbuf.size - ringbuf.used; 310 request_pause = ringbuf.request_pause; 311 paused = ringbuf.paused; 312 pthread_mutex_unlock(&ringbuf.lock); 313 ok = Open(); 314 SR_CHK("Failed to open device."); 313 315 314 if (!run)315 break;316 ok = _drb->Setup(videodevice, _stream_fd); 317 SR_CHK("Failed to allocate device read buffer."); 316 318 317 if (request_pause) 318 { 319 pthread_mutex_lock(&ringbuf.lock); 320 ringbuf.paused = true; 321 pthread_mutex_unlock(&ringbuf.lock); 319 ok = syncchan(_stream_fd, 320 INIT_SYNC_WINDOW_SIZE * TSPacket::SIZE, 321 INIT_MIN_NUM_SYNC_PACKETS); 322 SR_CHK("Failed to sync to transport stream to valid packet."); 322 323 323 pauseWait.wakeAll(); 324 if (tvrec) 325 tvrec->RecorderPaused(); 324 _drb->Start(); 326 325 327 usleep(1000);328 continue;329 }330 else if (paused)331 {332 pthread_mutex_lock(&ringbuf.lock);333 ringbuf.writePtr = ringbuf.readPtr = ringbuf.buffer;334 ringbuf.used = 0;335 ringbuf.paused = false;336 pthread_mutex_unlock(&ringbuf.lock);337 }338 339 contiguous = ringbuf.endPtr - ringbuf.writePtr;340 341 while (unused < TSPacket::SIZE && contiguous > TSPacket::SIZE)342 {343 usleep(500);344 345 pthread_mutex_lock(&ringbuf.lock);346 unused = ringbuf.size - ringbuf.used;347 request_pause = ringbuf.request_pause;348 pthread_mutex_unlock(&ringbuf.lock);349 350 if (request_pause)351 break;352 }353 if (request_pause)354 continue;355 356 read_size = unused > contiguous ? contiguous : unused;357 if (read_size > ringbuf.dev_read_size)358 read_size = ringbuf.dev_read_size;359 360 len = read(_stream_fd, ringbuf.writePtr, read_size);361 362 if (len < 0)363 {364 if (errno == EINTR)365 continue;366 367 VERBOSE(VB_IMPORTANT, QString("HD7 error reading from %1")368 .arg(videodevice));369 perror("read");370 if (++errcnt > 5)371 {372 pthread_mutex_lock(&ringbuf.lock);373 ringbuf.error = true;374 pthread_mutex_unlock(&ringbuf.lock);375 376 break;377 }378 379 usleep(500);380 continue;381 }382 else if (len == 0)383 {384 if (++errcnt > 5)385 {386 VERBOSE(VB_IMPORTANT, QString("HD8 %1 end of file found.")387 .arg(videodevice));388 389 pthread_mutex_lock(&ringbuf.lock);390 ringbuf.eof = true;391 pthread_mutex_unlock(&ringbuf.lock);392 393 break;394 }395 usleep(500);396 continue;397 }398 399 errcnt = 0;400 401 pthread_mutex_lock(&ringbuf.lock);402 ringbuf.used += len;403 used = ringbuf.used;404 ringbuf.writePtr += len;405 pthread_mutex_unlock(&ringbuf.lock);406 407 #ifdef REPORT_RING_STATS408 pthread_mutex_lock(&ringbuf.lock_stats);409 410 if (ringbuf.max_used < used)411 ringbuf.max_used = used;412 413 ringbuf.avg_used = ((ringbuf.avg_used * ringbuf.avg_cnt) + used)414 / ++ringbuf.avg_cnt;415 pthread_mutex_unlock(&ringbuf.lock_stats);416 #endif417 418 if (ringbuf.writePtr == ringbuf.endPtr)419 ringbuf.writePtr = ringbuf.buffer;420 }421 422 close(_stream_fd);423 _stream_fd = -1;424 }425 426 /* read count bytes from ring into buffer */427 int HDTVRecorder::ringbuf_read(unsigned char *buffer, size_t count)428 {429 size_t avail;430 size_t cnt = count;431 size_t min_read;432 unsigned char *cPtr = buffer;433 434 bool dev_error = false;435 bool dev_eof = false;436 437 pthread_mutex_lock(&ringbuf.lock);438 avail = ringbuf.used;439 pthread_mutex_unlock(&ringbuf.lock);440 441 min_read = cnt < ringbuf.min_read ? cnt : ringbuf.min_read;442 443 while (min_read > avail)444 {445 usleep(50000);446 447 if (request_pause || dev_error || dev_eof)448 return 0;449 450 pthread_mutex_lock(&ringbuf.lock);451 dev_error = ringbuf.error;452 dev_eof = ringbuf.eof;453 avail = ringbuf.used;454 pthread_mutex_unlock(&ringbuf.lock);455 }456 if (cnt > avail)457 cnt = avail;458 459 if (ringbuf.readPtr + cnt > ringbuf.endPtr)460 {461 size_t len;462 463 // Process as two pieces464 len = ringbuf.endPtr - ringbuf.readPtr;465 memcpy(cPtr, ringbuf.readPtr, len);466 cPtr += len;467 len = cnt - len;468 469 // Wrap arround to begining of buffer470 ringbuf.readPtr = ringbuf.buffer;471 memcpy(cPtr, ringbuf.readPtr, len);472 ringbuf.readPtr += len;473 }474 else475 {476 memcpy(cPtr, ringbuf.readPtr, cnt);477 ringbuf.readPtr += cnt;478 }479 480 pthread_mutex_lock(&ringbuf.lock);481 ringbuf.used -= cnt;482 pthread_mutex_unlock(&ringbuf.lock);483 484 if (ringbuf.readPtr == ringbuf.endPtr)485 ringbuf.readPtr = ringbuf.buffer;486 else487 {488 #ifdef REPORT_RING_STATS489 size_t samples, avg, max;490 491 if (++loop == report_loops)492 {493 loop = 0;494 pthread_mutex_lock(&ringbuf.lock_stats);495 avg = ringbuf.avg_used;496 samples = ringbuf.avg_cnt;497 max = ringbuf.max_used;498 ringbuf.avg_used = 0;499 ringbuf.avg_cnt = 0;500 ringbuf.max_used = 0;501 pthread_mutex_unlock(&ringbuf.lock_stats);502 503 VERBOSE(VB_IMPORTANT, QString("%1 ringbuf avg %2% max %3%"504 " samples %4")505 .arg(videodevice)506 .arg((static_cast<double>(avg)507 / ringbuf.size) * 100.0)508 .arg((static_cast<double>(max)509 / ringbuf.size) * 100.0)510 .arg(samples));511 }512 else513 #endif514 usleep(25);515 }516 517 return cnt;518 }519 520 void HDTVRecorder::StartRecording(void)521 {522 bool pause;523 bool dev_error, dev_eof;524 int len;525 526 const int unsyncpackets = 50; // unsynced packets to look at before giving up527 const int syncpackets = 10; // synced packets to require before starting recording528 529 VERBOSE(VB_RECORD, QString("StartRecording"));530 531 if (!Open())532 {533 _error = true;534 return;535 }536 537 326 _request_recording = true; 538 _recording = true;327 _recording = true; 539 328 540 // Setup device ringbuffer 541 delete[] ringbuf.buffer; 542 543 // ringbuf.size = 60 * 1024 * TSPacket::SIZE; 544 ringbuf.size = gContext->GetNumSetting("HDRingbufferSize", 50*188); 545 ringbuf.size *= 1024; 546 547 if ((ringbuf.buffer = 548 new unsigned char[ringbuf.size + TSPacket::SIZE]) == NULL) 329 // Process packets while recording is requested 330 while (_request_recording && !_error) 549 331 { 550 VERBOSE(VB_IMPORTANT, "Failed to allocate HDTVRecorder ring buffer."); 551 _error = true; 552 return; 553 } 554 555 memset(ringbuf.buffer, 0xFF, ringbuf.size + TSPacket::SIZE); 556 ringbuf.endPtr = ringbuf.buffer + ringbuf.size; 557 ringbuf.readPtr = ringbuf.writePtr = ringbuf.buffer; 558 ringbuf.dev_read_size = TSPacket::SIZE * 48; 559 ringbuf.min_read = TSPacket::SIZE * 4; 560 ringbuf.used = 0; 561 ringbuf.max_used = 0; 562 ringbuf.avg_used = 0; 563 ringbuf.avg_cnt = 0; 564 ringbuf.request_pause = false; 565 ringbuf.paused = false; 566 ringbuf.error = false; 567 ringbuf.eof = false; 568 569 VERBOSE(VB_RECORD, QString("HD ring buffer size %1 KB") 570 .arg(ringbuf.size/1024)); 571 572 // sync device stream so it starts with a valid ts packet 573 if (!syncchan(_stream_fd, TSPacket::SIZE*unsyncpackets, syncpackets)) 574 { 575 _error = true; 576 return; 577 } 578 579 // create thread to fill the ringbuffer 580 pthread_create(&ringbuf.thread, NULL, boot_ringbuffer, 581 reinterpret_cast<void *>(this)); 582 583 int remainder = 0; 584 // TRANSFER DATA 585 while (_request_recording) 586 { 587 pthread_mutex_lock(&ringbuf.lock); 588 dev_error = ringbuf.error; 589 dev_eof = ringbuf.eof; 590 pause = ringbuf.paused; 591 pthread_mutex_unlock(&ringbuf.lock); 592 593 if (request_pause) 594 { 595 pthread_mutex_lock(&ringbuf.lock); 596 ringbuf.request_pause = true; 597 pthread_mutex_unlock(&ringbuf.lock); 598 599 usleep(1000); 332 if (PauseAndWait()) 600 333 continue; 601 }602 else if (pause)603 {604 pthread_mutex_lock(&ringbuf.lock);605 ringbuf.request_pause = false;606 pthread_mutex_unlock(&ringbuf.lock);607 334 608 usleep(1500); 609 continue; 610 } 335 len = _drb->Read(&(_buffer[remainder]), _buffer_size - remainder); 611 336 612 if (dev_error)613 {614 VERBOSE(VB_IMPORTANT, "HDTV: device error detected");615 _error = true;616 break;617 }618 619 if (dev_eof)620 break;621 622 len = ringbuf_read(&(_buffer[remainder]), _buffer_size - remainder);623 624 337 if (len == 0) 625 338 continue; 626 339 627 340 len += remainder; 628 remainder = ProcessData (_buffer, len);341 remainder = ProcessDataTS(_buffer, len); 629 342 if (remainder > 0) // leftover bytes 630 343 memmove(_buffer, &(_buffer[_buffer_size - remainder]), 631 344 remainder); 345 346 // Check for DRB errors 347 if (_drb->IsErrored()) 348 { 349 VERBOSE(VB_IMPORTANT, LOC_ERR + "Device error detected"); 350 _error = true; 351 } 352 353 if (_drb->IsEOF()) 354 { 355 VERBOSE(VB_IMPORTANT, LOC_ERR + "Device EOF detected"); 356 _error = true; 357 } 632 358 } 633 359 634 360 FinishRecording(); 635 361 _recording = false; 636 362 } 363 #undef SR_CHK 637 364 638 365 void HDTVRecorder::StopRecording(void) 639 366 { … … 649 376 650 377 _request_recording = false; 651 378 652 pthread_mutex_lock(&ringbuf.lock); 653 bool run = ringbuf.run; 654 ringbuf.run = false; 655 pthread_mutex_unlock(&ringbuf.lock); 379 _drb->Stop(); 656 380 657 if (run) 658 pthread_join(ringbuf.thread, NULL); 381 if (ok) 382 _drb->Teardown(); 383 else // Better to have a memory leak, than a segfault? -- John Poet 384 VERBOSE(VB_IMPORTANT, LOC_ERR + "DeviceReadBuffer not cleaned up!"); 659 385 660 if (!ok) 661 { 662 // Better to have a memory leak, then a segfault? 663 VERBOSE(VB_IMPORTANT, "DTV ringbuffer not cleaned up!\n"); 664 } 665 else 666 { 667 delete[] ringbuf.buffer; 668 ringbuf.buffer = 0; 669 } 386 while (_recording) 387 usleep(2000); 388 670 389 tvrec = rec; 671 390 } 672 391 673 void HDTVRecorder:: Pause(bool /*clear*/)392 void HDTVRecorder::ReaderPaused(int /*fd*/) 674 393 { 675 pthread_mutex_lock(&ringbuf.lock); 676 ringbuf.paused = false; 677 pthread_mutex_unlock(&ringbuf.lock); 678 request_pause = true; 394 pauseWait.wakeAll(); 395 if (tvrec) 396 tvrec->RecorderPaused(); 679 397 } 680 398 681 bool HDTVRecorder:: IsPaused(void) const399 bool HDTVRecorder::PauseAndWait(int timeout) 682 400 { 683 pthread_mutex_lock(&ringbuf.lock); 684 bool paused = ringbuf.paused; 685 pthread_mutex_unlock(&ringbuf.lock); 401 #ifdef USE_DRB 402 if (request_pause) 403 { 404 paused = true; 405 if (!_drb->IsPaused()) 406 _drb->SetRequestPause(true); 686 407 408 unpauseWait.wait(timeout); 409 } 410 else if (_drb->IsPaused()) 411 { 412 _drb->SetRequestPause(false); 413 _drb->WaitForUnpause(timeout); 414 paused = _drb->IsPaused(); 415 } 416 else 417 { 418 paused = false; 419 } 687 420 return paused; 421 #else // if !USE_DRB 422 return RecorderBase::PauseAndWait(timeout); 423 #endif // !USE_DRB 688 424 } 689 425 690 426 int HDTVRecorder::ResyncStream(unsigned char *buffer, int curr_pos, int len) … … 695 431 if (nextpos >= len) 696 432 return -1; // not enough bytes; caller should try again 697 433 698 while (buffer[pos] != SYNC_BYTE || buffer[nextpos] != SYNC_BYTE) { 434 while (buffer[pos] != SYNC_BYTE || buffer[nextpos] != SYNC_BYTE) 435 { 699 436 pos++; 700 437 nextpos++; 701 438 if (nextpos == len) … … 707 444 708 445 void HDTVRecorder::WritePAT(ProgramAssociationTable *pat) 709 446 { 447 if (!pat) 448 return; 449 710 450 int next = (pat->tsheader()->ContinuityCounter()+1)&0xf; 711 451 pat->tsheader()->SetContinuityCounter(next); 712 452 ringBuffer->Write(pat->tsheader()->data(), TSPacket::SIZE); 713 453 } 714 454 715 #if WHACK_A_BUG_VIDEO716 static int WABV_base_pid = 0x100;717 #define WABV_WAIT 60718 static int WABV_wait_a_while = WABV_WAIT;719 bool WABV_started = false;720 #endif721 722 #if WHACK_A_BUG_AUDIO723 static int WABA_base_pid = 0x200;724 #define WABA_WAIT 60725 static int WABA_wait_a_while = WABA_WAIT;726 bool WABA_started = false;727 #endif728 729 455 void HDTVRecorder::WritePMT(ProgramMapTable* pmt) 730 456 { 731 if (pmt) { 732 int next = (pmt->tsheader()->ContinuityCounter()+1)&0xf; 733 pmt->tsheader()->SetContinuityCounter(next); 457 if (!pmt) 458 return; 734 459 735 #if WHACK_A_BUG_VIDEO 736 WABV_wait_a_while--; 737 if (WABV_wait_a_while<=0) { 738 WABV_started = true; 739 WABV_wait_a_while = WABV_WAIT; 740 WABV_base_pid = (((WABV_base_pid-0x100)+1)%32)+0x100; 741 if (StreamID::MPEG2Video != StreamData()->PMT()->StreamType(0)) 742 { 743 VERBOSE(VB_IMPORTANT, "HDTVRecorder::WritePMT(): Error," 744 "Whack a Bug can not rewrite PMT, wrong stream type"); 745 } 746 else 747 { 748 VERBOSE(VB_IMPORTANT, QString("Whack a Bug: new video pid %1"). 749 arg(WABV_base_pid)); 750 // rewrite video pid 751 const uint old_video_pid=StreamData()->PMT()->StreamPID(0); 752 StreamData()->PMT()->SetStreamPID(0, WABV_base_pid); 753 if (StreamData()->PMT()->PCRPID() == old_video_pid) 754 StreamData()->PMT()->SetPCRPID(WABV_base_pid); 755 StreamData()->PMT()->SetCRC(StreamData()->PMT()->CalcCRC()); 756 VERBOSE(VB_IMPORTANT, StreamData()->PMT()->toString()); 757 } 758 } 759 #endif 760 #if WHACK_A_BUG_AUDIO 761 WABA_wait_a_while--; 762 if (WABA_wait_a_while<=0) { 763 WABA_started = true; 764 WABA_wait_a_while = WABA_WAIT; 765 WABA_base_pid = (((WABA_base_pid-0x200)+1)%32)+0x200; 766 VERBOSE(VB_IMPORTANT, QString("Whack a Bug: new audio BASE pid %1").arg(WABA_base_pid)); 767 // rewrite audio pids 768 for (uint i=0; i<StreamData()->PMT()->StreamCount(); i++) { 769 if (StreamID::MPEG2Audio == StreamData()->PMT()->StreamType(i) || 770 StreamID::MPEG2Audio == StreamData()->PMT()->StreamType(i)) { 771 const uint old_audio_pid = StreamData()->PMT()->StreamPID(i); 772 const uint new_audio_pid = WABA_base_pid + old_audio_pid; 773 StreamData()->PMT()->SetStreamPID(i, new_audio_pid); 774 if (StreamData()->PMT()->PCRPID() == old_audio_pid) 775 StreamData()->PMT()->SetPCRPID(new_audio_pid); 776 StreamData()->PMT()->SetCRC(StreamData()->PMT()->CalcCRC()); 777 VERBOSE(VB_IMPORTANT, StreamData()->PMT()->toString()); 778 } 779 } 780 } 781 #endif 782 783 ringBuffer->Write(pmt->tsheader()->data(), TSPacket::SIZE); 784 } 460 int next = (pmt->tsheader()->ContinuityCounter()+1)&0xf; 461 pmt->tsheader()->SetContinuityCounter(next); 462 ringBuffer->Write(pmt->tsheader()->data(), TSPacket::SIZE); 785 463 } 786 464 787 465 /** \fn HDTVRecorder::ProcessMGT(const MasterGuideTable*) … … 820 498 { 821 499 if (vct->ProgramNumber(i) != (uint)StreamData()->DesiredProgram()) 822 500 { 823 VERBOSE(VB_RECORD, 824 QString("Resetting desired program from %1" 825 " to %2") 501 VERBOSE(VB_RECORD, LOC_ERR + 502 QString("Resetting desired program from %1 to %2") 826 503 .arg(StreamData()->DesiredProgram()) 827 504 .arg(vct->ProgramNumber(i))); 828 505 // Do a (partial?) reset here if old desired … … 834 511 } 835 512 if (!found) 836 513 { 837 VERBOSE(VB_IMPORTANT, 514 VERBOSE(VB_IMPORTANT, LOC_ERR + 838 515 QString("Desired channel %1_%2 not found;" 839 516 " using %3_%4 instead.") 840 517 .arg(StreamData()->DesiredMajorChannel()) 841 518 .arg(StreamData()->DesiredMinorChannel()) 842 519 .arg(vct->MajorChannel(0)) 843 .arg(vct->MinorChannel(0)) );844 VERBOSE(VB_IMPORTANT, vct->toString()); 520 .arg(vct->MinorChannel(0)) + "\n" + vct->toString()); 521 845 522 StreamData()->SetDesiredProgram(vct->ProgramNumber(0)); 846 523 } 847 524 } … … 854 531 if (_wait_for_keyframe && !_keyframe_seen) 855 532 return; 856 533 857 #if WHACK_A_BUG_VIDEO858 if (WABV_started)859 ((TSPacket*)(tspacket))->SetPID(WABV_base_pid);860 #endif861 862 534 ringBuffer->Write(tspacket->data(), TSPacket::SIZE); 863 535 } 864 536 … … 868 540 if (_wait_for_keyframe && !_keyframe_seen) 869 541 return; 870 542 871 #if WHACK_A_BUG_AUDIO872 if (WABA_started)873 ((TSPacket*)(tspacket))->SetPID(WABA_base_pid+tspacket->PID());874 #endif875 876 543 ringBuffer->Write(tspacket->data(), TSPacket::SIZE); 877 544 } 878 545 … … 902 569 return ok; 903 570 } 904 571 905 int HDTVRecorder::ProcessData(unsigned char *buffer,int len)572 uint HDTVRecorder::ProcessDataTS(unsigned char *buffer, uint len) 906 573 { 907 int pos = 0; 574 if (len < TSPacket::SIZE) 575 return len; 908 576 909 while (pos + 187 < len) // while we have a whole packet left 577 uint pos = 0; 578 uint end = len - TSPacket::SIZE; 579 while (pos <= end) // while we have a whole packet left 910 580 { 911 581 if (buffer[pos] != SYNC_BYTE) 912 582 { 913 583 _resync_count++; 914 if (25 == _resync_count) 915 VERBOSE(VB_RECORD, QString("Resyncing many of times, suppressing error messages")); 584 585 if (25 == _resync_count) 586 { 587 VERBOSE(VB_RECORD, LOC + "Resyncing many of times, " 588 "suppressing error messages"); 589 } 916 590 else if (25 > _resync_count) 917 VERBOSE(VB_RECORD, QString("Resyncing")); 591 { 592 VERBOSE(VB_RECORD, LOC + "Resyncing"); 593 } 594 918 595 int newpos = ResyncStream(buffer, pos, len); 919 596 if (newpos == -1) 920 597 return len - pos; … … 925 602 } 926 603 927 604 const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[pos]); 928 if (ProcessTSPacket(*pkt)) { 929 pos += TSPacket::SIZE; // Advance to next TS packet 605 if (ProcessTSPacket(*pkt)) 606 { 607 // Advance to next TS packet 608 pos += TSPacket::SIZE; 609 610 // Take care of statistics 930 611 _ts_stats.IncrTSPacketCount(); 931 if (0 == _ts_stats.TSPacketCount()%1000000) 932 VERBOSE(VB_RECORD, _ts_stats.toString()); 933 } else // Let it resync in case of dropped bytes 934 buffer[pos] = SYNC_BYTE+1; 612 if (0 == _ts_stats.TSPacketCount() % 1000000) 613 VERBOSE(VB_RECORD, LOC + "\n" + _ts_stats.toString()); 614 615 } 616 else 617 { 618 pos++; // Resync on invalid packet, in case of dropped bytes... 619 } 935 620 } 936 621 937 622 return len - pos; … … 939 624 940 625 void HDTVRecorder::Reset(void) 941 626 { 942 VERBOSE(VB_RECORD, "HDTVRecorder::Reset(void)");627 VERBOSE(VB_RECORD, LOC + "Reset(void)"); 943 628 DTVRecorder::Reset(); 944 629 945 630 _error = false; … … 947 632 _ts_stats.Reset(); 948 633 949 634 if (curRecording) 950 {951 635 curRecording->ClearPositionMap(MARK_GOP_BYFRAME); 636 637 if (!IsOpen()) 638 return /* true */; 639 640 if (!IsPaused()) 641 { 642 Pause(); 643 WaitForPause(); 952 644 } 953 645 954 if (_stream_fd >= 0) 646 if (!Close()) 647 return /* false */; 648 649 if (Open()) 955 650 { 956 if (!IsPaused()) 957 { 958 Pause(); 959 WaitForPause(); 960 } 961 int ret = close(_stream_fd); 962 if (ret < 0) 963 { 964 perror("close"); 965 return; 966 } 967 #if FAKE_VIDEO 968 // open file instead of device 969 _stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR); 970 VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index])); 971 fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM; 972 #else 973 _stream_fd = open(videodevice.ascii(), O_RDWR); 974 #endif 975 if (_stream_fd < 0) 976 { 977 VERBOSE(VB_IMPORTANT, QString("HD1 Can't open video device: %1 chanfd = %2"). 978 arg(videodevice).arg(_stream_fd)); 979 perror("open video"); 980 return; 981 } 982 else 983 { 984 pthread_mutex_lock(&ringbuf.lock); 985 ringbuf.used = 0; 986 ringbuf.max_used = 0; 987 ringbuf.readPtr = ringbuf.writePtr = ringbuf.buffer; 988 pthread_mutex_unlock(&ringbuf.lock); 989 } 651 _drb->Reset(videodevice, _stream_fd); 990 652 Unpause(); 653 return /* true */; 991 654 } 655 656 VERBOSE(VB_IMPORTANT, LOC_ERR + "Couldn't open video device: " + 657 QString("'%1'").arg(videodevice) + ENO); 658 return /* false */; 992 659 } -
libs/libmythtv/libmythtv.pro
206 206 207 207 # TVRec & Recorder base classes 208 208 HEADERS += tv_rec.h 209 HEADERS += recorderbase.h 209 HEADERS += recorderbase.h DeviceReadBuffer.h 210 210 HEADERS += dtvrecorder.h dummydtvrecorder.h 211 211 SOURCES += tv_rec.cpp 212 SOURCES += recorderbase.cpp 212 SOURCES += recorderbase.cpp DeviceReadBuffer.cpp 213 213 SOURCES += dtvrecorder.cpp dummydtvrecorder.cpp 214 214 215 215 # MPEG parsing stuff -
libs/libmythtv/dvbrecorder.h
1 // -*- Mode: c++ -*- 1 2 /* 2 3 * Copyright (C) Kenneth Aafloy 2003 3 4 * … … 17 18 #include "dtvrecorder.h" 18 19 #include "tspacket.h" 19 20 #include "transform.h" 21 #include "DeviceReadBuffer.h" 20 22 21 23 #include "dvbtypes.h" 22 24 #include "dvbchannel.h" … … 25 27 class ProgramAssociationTable; 26 28 class ProgramMapTable; 27 29 30 class PIDInfo 31 { 32 public: 33 PIDInfo() : 34 filter_fd(-1), continuityCount(0xFF), ip(NULL), 35 isVideo(false), isEncrypted(false), payloadStartSeen(false) {;} 36 37 int filter_fd; ///< Input filter file descriptor 38 uint continuityCount; ///< last Continuity Count (sentinel 0xFF) 39 ipack *ip; ///< TS->PES converter 40 bool isVideo; 41 bool isEncrypted; ///< true if PID is marked as encrypted 42 bool payloadStartSeen; ///< true if payload start packet seen on PID 43 44 inline void Close(void); 45 inline bool CheckCC(uint cc); 46 }; 47 typedef QMap<uint,PIDInfo*> PIDInfoMap; 48 28 49 /** \class DVBRecorder 29 50 * \brief This is a specialization of DTVRecorder used to 30 51 * handle streams from DVB drivers. 31 52 * 32 53 * \sa DTVRecorder, HDTVRecorder 33 54 */ 34 class DVBRecorder: public DTVRecorder 55 class DVBRecorder: public DTVRecorder, private ReaderPausedCB 35 56 { 36 57 Q_OBJECT 37 58 public: … … 47 68 48 69 void StartRecording(void); 49 70 void Reset(void); 71 void StopRecording(void); 50 72 51 73 bool Open(void); 74 bool IsOpen(void) const { return _stream_fd >= 0; } 52 75 void Close(void); 53 76 54 77 bool RecordsTransportStream(void) const … … 63 86 64 87 private: 65 88 void TeardownAll(void); 66 void ReadFromDMX(void);67 static void ProcessDataPS(unsigned char *buffer, int len, void *priv);68 void LocalProcessDataPS(unsigned char *buffer, int len);69 void LocalProcessDataTS(unsigned char *buffer, int len);70 89 90 bool Poll(void) const; 91 92 uint ProcessDataTS(unsigned char *buffer, uint len); 93 bool ProcessTSPacket(const TSPacket& tspacket); 94 95 void AutoPID(void); 96 bool OpenFilters(void); 71 97 void CloseFilters(void); 72 98 void OpenFilter(uint pid, ES_Type type, dmx_pes_type_t pes_type, 73 99 uint mpeg_stream_type); 74 bool SetDemuxFilters(void);75 void AutoPID(void);76 100 77 101 void SetPAT(ProgramAssociationTable*); 78 102 void SetPMT(ProgramMapTable*); 79 103 80 104 void CreatePAT(void); 81 105 void CreatePMT(void); 106 void WritePATPMT(void); 82 107 83 108 void DebugTSHeader(unsigned char* buffer, int len); 84 109 110 void ReaderPaused(int fd); 111 bool PauseAndWait(int timeout = 100); 112 113 ipack *CreateIPack(ES_Type type); 114 void ProcessDataPS(unsigned char *buffer, uint len); 115 static void process_data_ps_cb(unsigned char*,int,void*); 116 117 DeviceReadBuffer *_drb; 118 85 119 // Options set in SetOption() 86 120 int _card_number_option; 87 121 bool _record_transport_stream_option; … … 90 124 // DVB stuff 91 125 DVBChannel* dvbchannel; 92 126 127 // general recorder stuff 128 /// Set when we want to generate a new filter set 129 bool _reset_pid_filters; 130 QMutex _pid_lock; 131 PIDInfoMap _pid_infos; 132 93 133 // PS recorder stuff 94 134 int _ps_rec_audio_id; 95 135 int _ps_rec_video_id; 96 136 unsigned char _ps_rec_buf[3]; 97 pid_ipack_t _ps_rec_pid_ipack;98 137 99 138 // TS recorder stuff 100 139 ProgramAssociationTable *_pat; 101 140 ProgramMapTable *_pmt; 102 141 uint _next_pmt_version; 103 142 uint _ts_packets_until_psip_sync; 104 QMap<uint,bool> _payload_start_seen;105 QMap<uint,bool> _videoPID;106 143 107 144 // Input Misc 108 145 /// PMT on input side 109 146 PMTObject _input_pmt; 110 /// Input filter file descriptors111 vector<int> _pid_filters;112 /// Input polling structure for _stream_fd113 struct pollfd _polls;114 /// Set when we want to generate a new filter set115 bool _reset_pid_filters;116 /// Encrypted PID, so we can drop these117 QMap<uint,bool> _encrypted_pid;118 147 119 // locking120 QMutex _pid_lock;121 122 148 // Statistics 123 uint_continuity_error_count;124 uint_stream_overflow_count;125 uint_bad_packet_count;126 QMap<uint,int> _continuity_count;149 mutable uint _continuity_error_count; 150 mutable uint _stream_overflow_count; 151 mutable uint _bad_packet_count; 152 mutable MythTimer _poll_timer; 127 153 128 // For debugging129 bool data_found; ///< debugging variable used by transform.c130 bool keyframe_found;131 132 154 // Constants 133 155 static const int PMT_PID; 134 156 static const int TSPACKETS_BETWEEN_PSIP_SYNC; … … 136 158 static const int POLL_WARNING_TIMEOUT; 137 159 }; 138 160 161 inline void PIDInfo::Close(void) 162 { 163 if (filter_fd >= 0) 164 close(filter_fd); 165 166 if (ip) 167 { 168 free_ipack(ip); 169 free(ip); 170 } 171 } 172 173 inline bool PIDInfo::CheckCC(uint new_cnt) 174 { 175 if (continuityCount == 0xFF) 176 { 177 continuityCount = new_cnt; 178 return true; 179 } 180 181 continuityCount = (continuityCount+1) & 0xf; 182 if (continuityCount == new_cnt) 183 return true; 184 185 continuityCount = new_cnt; 186 return false; 187 } 188 139 189 #endif -
libs/libmythtv/DeviceReadBuffer.cpp
1 #include <algorithm> 2 #include <cassert> 3 4 #include "DeviceReadBuffer.h" 5 #include "mythcontext.h" 6 #include "tspacket.h" 7 8 #define REPORT_RING_STATS 1 9 10 #define LOC QString("DevRdB(%1): ").arg(videodevice) 11 #define LOC_ERR QString("DevRdB(%1) Error: ").arg(videodevice) 12 13 DeviceReadBuffer::DeviceReadBuffer(ReaderPausedCB *cb, bool use_poll) 14 : videodevice(QString::null), _stream_fd(-1), 15 readerPausedCB(cb), 16 17 // Data for managing the device ringbuffer 18 run(false), running(false), 19 eof(false), error(false), 20 request_pause(false), paused(false), 21 using_poll(use_poll), 22 23 size(0), used(0), 24 dev_read_size(0), min_read(0), 25 26 buffer(NULL), readPtr(NULL), 27 writePtr(NULL), endPtr(NULL), 28 29 // statistics 30 max_used(0), avg_used(0), 31 avg_cnt(0) 32 { 33 } 34 35 DeviceReadBuffer::~DeviceReadBuffer() 36 { 37 } 38 39 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd) 40 { 41 QMutexLocker locker(&lock); 42 43 if (buffer) 44 delete[] buffer; 45 46 videodevice = streamName; 47 _stream_fd = streamfd; 48 49 // Setup device ringbuffer 50 eof = false; 51 error = false; 52 request_pause = false; 53 paused = false; 54 55 size = gContext->GetNumSetting("HDRingbufferSize", 56 50 * TSPacket::SIZE) * 1024; 57 used = 0; 58 dev_read_size = TSPacket::SIZE * (using_poll ? 256 : 48); 59 min_read = TSPacket::SIZE * 4; 60 61 buffer = new unsigned char[size + TSPacket::SIZE]; 62 readPtr = buffer; 63 writePtr = buffer; 64 endPtr = buffer + size; 65 66 // Initialize buffer, if it exists 67 if (!buffer) 68 return false; 69 memset(buffer, 0xFF, size + TSPacket::SIZE); 70 71 // Initialize statistics 72 max_used = 0; 73 avg_used = 0; 74 avg_cnt = 0; 75 lastReport.start(); 76 77 VERBOSE(VB_RECORD, LOC + QString("buffer size %1 KB").arg(size/1024)); 78 79 return true; 80 } 81 82 void DeviceReadBuffer::Teardown(void) 83 { 84 if (buffer) 85 delete[] buffer; 86 buffer = NULL; 87 } 88 89 void DeviceReadBuffer::Start(void) 90 { 91 lock.lock(); 92 bool was_running = running; 93 lock.unlock(); 94 if (was_running) 95 { 96 VERBOSE(VB_IMPORTANT, LOC_ERR + "Start(): Already running."); 97 SetRequestPause(false); 98 return; 99 } 100 101 pthread_create(&thread, NULL, boot_ringbuffer, this); 102 } 103 104 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd) 105 { 106 QMutexLocker locker(&lock); 107 108 videodevice = streamName; 109 _stream_fd = streamfd; 110 111 used = 0; 112 readPtr = buffer; 113 writePtr = buffer; 114 } 115 116 void DeviceReadBuffer::Stop(void) 117 { 118 lock.lock(); 119 bool was_running = running; 120 run = false; 121 lock.unlock(); 122 123 if (!was_running) 124 { 125 VERBOSE(VB_IMPORTANT, LOC_ERR + "Stop(): Not running."); 126 return; 127 } 128 129 pthread_join(thread, NULL); 130 } 131 132 void DeviceReadBuffer::SetRequestPause(bool req) 133 { 134 QMutexLocker locker(&lock); 135 request_pause = req; 136 } 137 138 void DeviceReadBuffer::SetPaused(bool val) 139 { 140 lock.lock(); 141 paused = val; 142 lock.unlock(); 143 if (val) 144 pauseWait.wakeAll(); 145 else 146 unpauseWait.wakeAll(); 147 } 148 149 bool DeviceReadBuffer::IsPaused(void) const 150 { 151 QMutexLocker locker(&lock); 152 return paused; 153 } 154 155 bool DeviceReadBuffer::WaitForUnpause(int timeout) 156 { 157 if (IsPaused()) 158 unpauseWait.wait(timeout); 159 return IsPaused(); 160 } 161 162 bool DeviceReadBuffer::IsPauseRequested(void) const 163 { 164 QMutexLocker locker(&lock); 165 return request_pause; 166 } 167 168 uint DeviceReadBuffer::GetUnused(void) const 169 { 170 QMutexLocker locker(&lock); 171 return size - used; 172 } 173 174 uint DeviceReadBuffer::GetUsed(void) const 175 { 176 QMutexLocker locker(&lock); 177 return used; 178 } 179 180 uint DeviceReadBuffer::GetContiguousUnused(void) const 181 { 182 QMutexLocker locker(&lock); 183 return endPtr - writePtr; 184 } 185 186 void DeviceReadBuffer::IncrWritePointer(uint len) 187 { 188 QMutexLocker locker(&lock); 189 used += len; 190 writePtr += len; 191 writePtr = (writePtr == endPtr) ? buffer : writePtr; 192 #ifdef REPORT_RING_STATS 193 max_used = max(used, max_used); 194 avg_used = ((avg_used * avg_cnt) + used) / ++avg_cnt; 195 #endif 196 } 197 198 void DeviceReadBuffer::IncrReadPointer(uint len) 199 { 200 QMutexLocker locker(&lock); 201 used -= len; 202 readPtr += len; 203 readPtr = (readPtr == endPtr) ? buffer : readPtr; 204 assert(readPtr <= endPtr); 205 } 206 207 void *DeviceReadBuffer::boot_ringbuffer(void *arg) 208 { 209 ((DeviceReadBuffer*) arg)->fill_ringbuffer(); 210 return NULL; 211 } 212 213 void DeviceReadBuffer::fill_ringbuffer(void) 214 { 215 uint errcnt = 0; 216 217 lock.lock(); 218 run = true; 219 running = true; 220 lock.unlock(); 221 222 while (run) 223 { 224 if (!HandlePausing()) 225 continue; 226 227 if (!IsOpen()) 228 { 229 usleep(10000); 230 continue; 231 } 232 233 if (using_poll && !Poll()) 234 continue; 235 236 // Limit read size for faster return from read 237 size_t read_size = min(dev_read_size, WaitForUnused(TSPacket::SIZE)); 238 239 // if read_size > 0 do the read... 240 if (read_size) 241 { 242 ssize_t len = read(_stream_fd, writePtr, read_size); 243 if (!CheckForErrors(len, errcnt)) 244 { 245 if (errcnt > 5) 246 break; 247 else 248 continue; 249 } 250 errcnt = 0; 251 IncrWritePointer(len); 252 } 253 } 254 255 lock.lock(); 256 running = false; 257 lock.unlock(); 258 } 259 260 bool DeviceReadBuffer::HandlePausing(void) 261 { 262 if (IsPauseRequested()) 263 { 264 SetPaused(true); 265 266 if (readerPausedCB) 267 readerPausedCB->ReaderPaused(_stream_fd); 268 269 usleep(5000); 270 return false; 271 } 272 else if (IsPaused()) 273 { 274 Reset(videodevice, _stream_fd); 275 SetPaused(false); 276 } 277 return true; 278 } 279 280 bool DeviceReadBuffer::Poll(void) const 281 { 282 bool retval = true; 283 while (true) 284 { 285 struct pollfd polls; 286 polls.fd = _stream_fd; 287 polls.events = POLLIN; 288 polls.revents = 0; 289 290 int ret = poll(&polls, 1 /*number of polls*/, 10 /*msec*/); 291 if (IsPauseRequested() || !IsOpen() || !run) 292 { 293 retval = false; 294 break; // are we supposed to pause, stop, etc. 295 } 296 297 if (ret > 0) 298 break; // we have data to read :) 299 if ((-1 == ret) && (EOVERFLOW == errno)) 300 break; // we have an error to handle 301 302 if ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno))) 303 continue; // errors that tell you to try again 304 if (ret == 0) 305 continue; // timed out, try again 306 307 usleep(2500); 308 } 309 return retval; 310 } 311 312 bool DeviceReadBuffer::CheckForErrors(ssize_t len, uint &errcnt) 313 { 314 if (len < 0) 315 { 316 if (EINTR == errno) 317 return false; 318 if (EAGAIN == errno) 319 { 320 usleep(2500); 321 return false; 322 } 323 if (EOVERFLOW == errno) 324 { 325 VERBOSE(VB_IMPORTANT, LOC_ERR + "Driver buffers overflowed"); 326 return false; 327 } 328 329 VERBOSE(VB_IMPORTANT, LOC_ERR + 330 QString("Problem reading fd(%1)").arg(_stream_fd) + ENO); 331 332 if (++errcnt > 5) 333 { 334 lock.lock(); 335 error = true; 336 lock.unlock(); 337 return false; 338 } 339 340 usleep(500); 341 return false; 342 } 343 else if (len == 0) 344 { 345 if (++errcnt > 5) 346 { 347 VERBOSE(VB_IMPORTANT, LOC + 348 QString("End-Of-File? fd(%1)").arg(_stream_fd)); 349 350 lock.lock(); 351 eof = true; 352 lock.unlock(); 353 354 return false; 355 } 356 usleep(500); 357 return false; 358 } 359 return true; 360 } 361 362 /** \fn DeviceReadBuffer::Read(unsigned char*, uint) 363 * \brief Try to Read count bytes from into buffer 364 * \param buffer Buffer to put data in 365 * \param count Number of bytes to attempt to read 366 * \return number of bytes actually read 367 */ 368 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count) 369 { 370 uint avail = WaitForUsed(min(count, min_read)); 371 size_t cnt = min(count, avail); 372 373 if (!cnt) 374 return 0; 375 376 if (readPtr + cnt > endPtr) 377 { 378 // Process as two pieces 379 size_t len = endPtr - readPtr; 380 if (len) 381 { 382 memcpy(buf, readPtr, len); 383 buf += len; 384 IncrReadPointer(len); 385 } 386 if (cnt > len) 387 { 388 len = cnt - len; 389 memcpy(buf, readPtr, len); 390 IncrReadPointer(len); 391 } 392 } 393 else 394 { 395 memcpy(buf, readPtr, cnt); 396 IncrReadPointer(cnt); 397 } 398 399 ReportStats(); 400 401 return cnt; 402 } 403 404 /// \return bytes available for writing 405 uint DeviceReadBuffer::WaitForUnused(uint needed) const 406 { 407 size_t unused = GetUnused(); 408 size_t contig = GetContiguousUnused(); 409 410 if (contig > TSPacket::SIZE) 411 { 412 while (unused < needed) 413 { 414 unused = GetUnused(); 415 if (IsPauseRequested() || !IsOpen() || !run) 416 return 0; 417 usleep(5000); 418 } 419 if (IsPauseRequested() || !IsOpen() || !run) 420 return 0; 421 contig = GetContiguousUnused(); 422 } 423 424 return min(contig, unused); 425 } 426 427 /// \return bytes available for reading 428 uint DeviceReadBuffer::WaitForUsed(uint needed) const 429 { 430 size_t avail = GetUsed(); 431 while (needed > avail) 432 { 433 { 434 QMutexLocker locker(&lock); 435 avail = used; 436 if (request_pause || error || eof) 437 return 0; 438 } 439 usleep(5000); 440 } 441 return avail; 442 } 443 444 void DeviceReadBuffer::ReportStats(void) 445 { 446 #ifdef REPORT_RING_STATS 447 if (lastReport.elapsed() > 20*1000 /* msg every 20 seconds */) 448 { 449 QMutexLocker locker(&lock); 450 double rsize = 100.0 / size; 451 QString msg = QString("fill avg(%1%) ").arg(avg_used*rsize,3,'f',0); 452 msg += QString("fill max(%2%) ").arg(max_used*rsize,3,'f',0); 453 msg += QString("samples(%3)").arg(avg_cnt); 454 455 avg_used = 0; 456 avg_cnt = 0; 457 max_used = 0; 458 lastReport.start(); 459 460 VERBOSE(VB_IMPORTANT, LOC + msg); 461 } 462 #endif 463 }