Ticket #712: drb-v1.patch

File drb-v1.patch, 92.1 KB (added by danielk, 20 years ago)

Implements device reading ringbuffer, not quite done but usable.

  • libs/libmythtv/dvbrecorder.cpp

     
    7272const int DVBRecorder::POLL_INTERVAL        =  50; // msec
    7373const int DVBRecorder::POLL_WARNING_TIMEOUT = 500; // msec
    7474
     75#define USE_DRB
     76#ifndef USE_DRB
     77static ssize_t safe_read(int fd, unsigned char *buf, size_t bufsize);
     78#endif // USE_DRB
     79
    7580#define LOC      QString("DVBRec(%1): ").arg(_card_number_option)
    7681#define LOC_WARN QString("DVBRec(%1) Warning: ").arg(_card_number_option)
    7782#define LOC_ERR  QString("DVBRec(%1) Error: ").arg(_card_number_option)
    7883
    7984DVBRecorder::DVBRecorder(TVRec *rec, DVBChannel* advbchannel)
    8085    : DTVRecorder(rec, "DVBRecorder"),
     86      _drb(NULL),
    8187      // Options set in SetOption()
    8288      _card_number_option(0), _record_transport_stream_option(false),
    8389      _hw_decoder_option(false),
    8490      // DVB stuff
    8591      dvbchannel(advbchannel),
     92      _reset_pid_filters(true),
     93      _pid_lock(true),
    8694      // PS recorder stuff
    8795      _ps_rec_audio_id(0xC0), _ps_rec_video_id(0xE0),
    8896      // Output stream info
    8997      _pat(NULL), _pmt(NULL), _next_pmt_version(0),
    9098      _ts_packets_until_psip_sync(0),
    91       // Input Misc
    92       _reset_pid_filters(true),
    93       // Locking
    94       _pid_lock(true),
    9599      // Statistics
    96100      _continuity_error_count(0), _stream_overflow_count(0),
    97101      _bad_packet_count(0)
    98102{
    99103    bzero(_ps_rec_buf, sizeof(unsigned char) * 3);
    100104
    101     bzero(&_polls, sizeof(struct pollfd));
    102     _polls.fd = _stream_fd;
     105#ifdef USE_DRB
     106    _drb = new DeviceReadBuffer(this);
     107    _buffer_size = (1024*1024 / TSPacket::SIZE) * TSPacket::SIZE;
     108#else
     109    _buffer_size = (4*1024*1024 / TSPacket::SIZE) * TSPacket::SIZE;
     110#endif
    103111
    104     _buffer_size = (4*1024*1024 / MPEG_TS_PKT_SIZE) * MPEG_TS_PKT_SIZE;
    105112    _buffer = new unsigned char[_buffer_size];
    106113    bzero(_buffer, _buffer_size);
    107114}
     
    113120
    114121void DVBRecorder::TeardownAll(void)
    115122{
    116     if (_stream_fd >= 0)
     123    // Make SURE that the device read thread is cleaned up -- John Poet
     124    StopRecording();
     125
     126    if (IsOpen())
    117127        Close();
    118128
    119129    if (_buffer)
     
    177187
    178188bool DVBRecorder::Open(void)
    179189{
    180     if (_stream_fd >= 0)
     190    if (IsOpen())
    181191    {
    182192        VERBOSE(VB_GENERAL, LOC_WARN + "Card already open");
    183193        return true;
     
    185195
    186196    _stream_fd = open(dvbdevice(DVB_DEV_DVR,_card_number_option),
    187197                      O_RDONLY | O_NONBLOCK);
    188     if (_stream_fd < 0)
     198    if (!IsOpen())
    189199    {
    190200        VERBOSE(VB_IMPORTANT, LOC_ERR + "Failed to open DVB device" + ENO);
    191201        return false;
    192202    }
     203#ifdef USE_DRB
     204    if (_drb)
     205        _drb->Reset(videodevice, _stream_fd);
     206#endif
    193207
    194     _polls.fd = _stream_fd;
    195     _polls.events = POLLIN;
    196     _polls.revents = 0;
    197 
    198208    connect(dvbchannel, SIGNAL(UpdatePMTObject(const PMTObject *)),
    199209            this, SLOT(SetPMTObject(const PMTObject *)));
    200210
    201     VERBOSE(VB_RECORD, LOC +
    202             QString("Card opened successfully (using %1 mode).")
     211    VERBOSE(VB_RECORD, LOC + QString("Card opened successfully fd(%1)")
     212            .arg(_stream_fd) + QString(" (using %2 mode).")
    203213            .arg(_record_transport_stream_option ? "TS" : "PS"));
    204214
    205215    dvbchannel->RecorderStarted();
     
    211221{
    212222    VERBOSE(VB_RECORD, LOC + "Close() fd("<<_stream_fd<<") -- begin");
    213223
    214     if (_stream_fd < 0)
    215         return;
    216 
    217     CloseFilters();
    218 
    219     if (_stream_fd >= 0)
     224    if (IsOpen())
     225    {
     226        CloseFilters();
    220227        close(_stream_fd);
     228        _stream_fd = -1;
     229    }
    221230
    222     _stream_fd = -1;
    223     _polls.fd = -1;
    224 
    225231    VERBOSE(VB_RECORD, LOC + "Close() fd("<<_stream_fd<<") -- end");
    226232}
    227233
    228234void DVBRecorder::CloseFilters(void)
    229235{
    230236    QMutexLocker change_lock(&_pid_lock);
    231     for(unsigned int i=0; i<_pid_filters.size(); i++)
    232         if (_pid_filters[i] >= 0)
    233             close(_pid_filters[i]);
    234     _pid_filters.clear();
    235237
    236     pid_ipack_t::iterator iter = _ps_rec_pid_ipack.begin();
    237     for (;iter != _ps_rec_pid_ipack.end(); iter++)
     238    PIDInfoMap::iterator it;
     239    for (it = _pid_infos.begin(); it != _pid_infos.end(); ++it)
    238240    {
    239         if ((*iter).second != NULL)
    240         {
    241             free_ipack((*iter).second);
    242             free((void*)(*iter).second);
    243         }
     241        (*it)->Close();
     242        delete *it;
    244243    }
    245     _ps_rec_pid_ipack.clear();
     244    _pid_infos.clear();
    246245}
    247246
    248247void DVBRecorder::OpenFilter(uint           pid,
     
    284283        usecs /= 2;
    285284    }
    286285    VERBOSE(VB_RECORD, LOC + "Set demux buffer size for " +
    287             QString("pid 0x%1 to %2,\n\t\t\t which gives us a %3 msec buffer.")
     286            QString("pid 0x%1 to %2,\n\t\t\twhich gives us a %3 msec buffer.")
    288287            .arg(pid,0,16).arg(sz).arg(usecs/1000));
    289288
    290289    // Set the filter type
     
    303302        return;
    304303    }
    305304
     305    PIDInfo *info = new PIDInfo();
     306    // Set isVideo based on stream type
     307    info->isVideo = StreamID::IsVideo(stream_type);
    306308    // Add the file descriptor to the filter list
    307     QMutexLocker change_lock(&_pid_lock);
    308     _pid_filters.push_back(fd_tmp);
    309 
    310     // Initialize continuity count
    311     _continuity_count[pid] = -1;
    312     if (_record_transport_stream_option)
    313     {
    314         //Set the TS->PES converter to NULL
    315         _ps_rec_pid_ipack[pid] = NULL;
    316         return;
    317     }
    318 
     309    info->filter_fd = fd_tmp;
    319310    // If we are in legacy PES mode, initialize TS->PES converter
    320     ipack* ip = (ipack*)malloc(sizeof(ipack));
    321     assert(ip);
    322     switch (type)
    323     {
    324         case ES_TYPE_VIDEO_MPEG1:
    325         case ES_TYPE_VIDEO_MPEG2:
    326             init_ipack(ip, 2048, ProcessDataPS);
    327             ip->replaceid = _ps_rec_video_id++;
    328             break;
     311    if (!_record_transport_stream_option)
     312        info->ip = CreateIPack(type);
    329313
    330         case ES_TYPE_AUDIO_MPEG1:
    331         case ES_TYPE_AUDIO_MPEG2:
    332             init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */
    333             ip->replaceid = _ps_rec_audio_id++;
    334             break;
    335 
    336         case ES_TYPE_AUDIO_AC3:
    337             init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */
    338             ip->priv_type = PRIV_TS_AC3;
    339             break;
    340 
    341         case ES_TYPE_SUBTITLE:
    342         case ES_TYPE_TELETEXT:
    343             init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */
    344             ip->priv_type = PRIV_DVB_SUB;
    345             break;
    346 
    347         default:
    348             init_ipack(ip, 2048, ProcessDataPS);
    349             break;
    350     }
    351     ip->data = (void*)this;
    352     _ps_rec_pid_ipack[pid] = ip;
     314    // Add the new info to the map
     315    QMutexLocker change_lock(&_pid_lock);   
     316    _pid_infos[pid] = info;
    353317}
    354318
    355 bool DVBRecorder::SetDemuxFilters(void)
     319bool DVBRecorder::OpenFilters(void)
    356320{
    357     QMutexLocker change_lock(&_pid_lock);
    358321    CloseFilters();
    359322
    360     _continuity_count.clear();
    361     _encrypted_pid.clear();
    362     _payload_start_seen.clear();
    363     data_found = false;
     323    QMutexLocker change_lock(&_pid_lock);
     324
    364325    _wait_for_keyframe = _wait_for_keyframe_option;
    365     keyframe_found = false;
    366326
    367327    _ps_rec_audio_id = 0xC0;
    368328    _ps_rec_video_id = 0xE0;
     
    427387                    (*es).Orig_Type);
    428388
    429389
    430     if (_pid_filters.size() == 0 && _ps_rec_pid_ipack.size() == 0)
     390    if (_pid_infos.empty())
    431391    {       
    432392        VERBOSE(VB_GENERAL, LOC_WARN +
    433393                "Recording will not commence until a PID is set.");
     
    436396    return true;
    437397}
    438398
    439 /*
    440  *  Process PMT and decide which components should be recorded
     399/** \fn DVBRecorder::AutoPID(void)
     400 *  \brief Process PMT and decide which components should be recorded
     401 *
     402 *  This is particularly for the hardware decoders which don't like
     403 *  to see more than one audio or video stream.
    441404 */
    442405void DVBRecorder::AutoPID(void)
    443406{
    444407    QMutexLocker change_lock(&_pid_lock);
    445     _videoPID.clear();
    446408
    447409    VERBOSE(VB_RECORD, LOC +
    448410            QString("AutoPID for MPEG Program Number(%1), PCR PID(0x%2)")
    449411            .arg(_input_pmt.ServiceID).arg(((uint)_input_pmt.PCRPID),0,16));
    450412
    451     // Wanted languages:
    452     //QStringList Languages = iso639_get_language_list();
    453 
    454413    // Wanted stream types:
    455414    QValueList<ES_Type> StreamTypes;
    456415    StreamTypes += ES_TYPE_VIDEO_MPEG1;
     
    467426        StreamTypes += ES_TYPE_SUBTITLE;
    468427    }
    469428
    470     QMap<ES_Type, bool> flagged;
     429    uint has_video = 0;
     430    uint has_audio = 0;
     431
    471432    QValueList<ElementaryPIDObject>::Iterator es;
    472433    for (es = _input_pmt.Components.begin();
    473434         es != _input_pmt.Components.end(); ++es)
    474435    {
     436        // Check if this is a stream we know about
    475437        if (!StreamTypes.contains((*es).Type))
    476         {
    477             // Type not wanted
    478438            continue;
    479         }
    480439
    481         if (((*es).Type == ES_TYPE_AUDIO_MPEG1) ||
    482             ((*es).Type == ES_TYPE_AUDIO_MPEG2) ||
    483             ((*es).Type == ES_TYPE_AUDIO_AC3))
    484         {
    485             bool ignore = false;
    486 
    487             // Check for audio descriptors
    488             DescriptorList::Iterator dit;
    489             for (dit = (*es).Descriptors.begin();
    490                  dit != (*es).Descriptors.end(); ++dit)
    491             {
    492                 // Check for "Hearing impaired" or
    493                 // "Visual impaired commentary" stream
    494                 if (((*dit).Data[0] == 0x0A) &&
    495                     ((*dit).Data[5] & 0xFE == 0x02))
    496                 {
    497                     ignore = true;
    498                     break;
    499                 }
    500             }
    501 
    502             if (ignore)
    503                 continue; // Ignore this stream
    504         }
    505 
    506440        if (_hw_decoder_option)
    507441        {
    508             // Limit hardware decoders to one A/V stream
    509             switch ((*es).Type)
    510             {
    511                 case ES_TYPE_VIDEO_MPEG1:
    512                 case ES_TYPE_VIDEO_MPEG2:
    513                     if (flagged.contains(ES_TYPE_VIDEO_MPEG1) ||
    514                         flagged.contains(ES_TYPE_VIDEO_MPEG2))
    515                     {
    516                         continue; // Ignore this stream
    517                     }
    518                     break;
     442            has_video += StreamID::IsVideo((*es).Orig_Type) ? 1 : 0;
     443            has_audio += StreamID::IsAudio((*es).Orig_Type) ? 1 : 0;
    519444
    520                 case ES_TYPE_AUDIO_MPEG1:
    521                 case ES_TYPE_AUDIO_MPEG2:
    522                     if (flagged.contains(ES_TYPE_AUDIO_MPEG1) ||
    523                         flagged.contains(ES_TYPE_AUDIO_MPEG2))
    524                     {
    525                         continue; // Ignore this stream
    526                     }
    527                     break;
    528 
    529                 default:
    530                     break;
    531             }
     445            // Limit hardware decoders streams to one video stream
     446            if (StreamID::IsVideo((*es).Orig_Type) && has_video > 1)
     447                continue;
     448            // Limit hardware decoders streams to one audio stream
     449            if (StreamID::IsAudio((*es).Orig_Type) && has_audio > 1)
     450                continue;
    532451        }
    533452
    534         //if (Languages.isEmpty() // No specific language wanted
    535         //    || (*es).Language.isEmpty() // Component has no language
    536         //    || Languages.contains((*es).Language)) // This language is wanted!
    537         {
    538             (*es).Record = true;
    539             flagged[(*es).Type] = true;
    540         }
     453        (*es).Record = true;
    541454    }
    542455
     456    if (!(print_verbose_messages|VB_RECORD))
     457        return;
     458
     459    // print out some debugging info
    543460    for (es = _input_pmt.Components.begin();
    544461         es != _input_pmt.Components.end(); ++es)
    545462    {
    546         if (StreamTypes.contains((*es).Type) && !flagged.contains((*es).Type))
    547         {
    548             // We want this stream type but no component was flagged
    549             (*es).Record = true;
    550         }
    551 
    552         if ((*es).Record)
    553         {
    554             VERBOSE(VB_RECORD, LOC +
    555                     QString("AutoPID selecting PID 0x%1, %2")
    556                     .arg((*es).PID,0,16).arg((*es).Description));
    557 
    558             switch ((*es).Type)
    559             {
    560                 case ES_TYPE_VIDEO_MPEG1:
    561                 case ES_TYPE_VIDEO_MPEG2:
    562                     _videoPID[(*es).PID] = true;
    563                     break;
    564 
    565                 default:
    566                     // Do nothing
    567                     break;
    568             }
    569         }
    570         else
    571         {
    572             VERBOSE(VB_RECORD, LOC +
    573                     QString("AutoPID skipping PID 0x%1, %2")
    574                     .arg((*es).PID,0,16).arg((*es).Description));
    575         }
     463        QString msg = ((*es).Record) ? "recording" : "skipping";
     464        VERBOSE(VB_RECORD, LOC +
     465                QString("AutoPID %1 PID 0x%2, %3")
     466                .arg(msg).arg((*es).PID,0,16).arg((*es).Description));
    576467    }
    577468
    578469    VERBOSE(VB_RECORD, LOC + "AutoPID Complete - PAT/PMT Loaded for service");
    579470
    580471    QString msg = (_input_pmt.FTA()) ? "unencrypted" : "ENCRYPTED";
    581     VERBOSE(VB_RECORD, LOC + "A/V Stream is " + msg);
     472    VERBOSE(VB_RECORD, LOC + "AutoPID A/V Stream is " + msg);
    582473}
    583474
    584475void DVBRecorder::StartRecording(void)
     
    600491    SetPMT(NULL);
    601492    _ts_packets_until_psip_sync = 0;
    602493
    603     MythTimer t;
    604     t.start();
     494#ifdef USE_DRB
     495    bool ok = _drb->Setup(videodevice, _stream_fd);
     496    if (!ok)
     497    {
     498        VERBOSE(VB_IMPORTANT, LOC_ERR + "Failed to allocate DRB buffer");
     499        Close();
     500        _error = true;
     501        return;
     502    }
     503    _drb->Start();
     504#else
     505    _poll_timer.start();
     506#endif // USE_DRB
     507
    605508    while (_request_recording && !_error)
    606509    {
    607510        if (PauseAndWait())
    608511            continue;
    609512
    610         if (_stream_fd < 0)
     513        if (!IsOpen())
    611514        {
    612             usleep(50);
     515            usleep(2000);
    613516            continue;
    614517        }
    615518
    616519        if (_reset_pid_filters)
    617520        {
     521            _reset_pid_filters = false;
    618522            VERBOSE(VB_RECORD, LOC + "Resetting Demux Filters");
    619             if (SetDemuxFilters())
     523            if (OpenFilters())
    620524            {
    621525                CreatePAT();
    622526                CreatePMT();
    623527            }
    624             _reset_pid_filters = false;
    625528        }
    626529
    627         int ret;
    628         do
    629             ret = poll(&_polls, 1, POLL_INTERVAL);
    630         while (!request_pause && (_stream_fd >= 0) &&
    631                ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno))));
     530        if (Poll())
     531        {
     532#ifndef USE_DRB
     533            ssize_t len = safe_read(_stream_fd, _buffer, _buffer_size);
     534#else
     535            ssize_t len = _drb->Read(_buffer, _buffer_size);
     536#endif // !USE_DRB
     537            if (len > 0)
     538                ProcessDataTS(_buffer, len);
     539        }
    632540
    633         if (request_pause || _stream_fd < 0)
    634             continue;
    635 
    636         if (ret == 0 && t.elapsed() > POLL_WARNING_TIMEOUT)
     541        // Check for DRB errors
     542        if (_drb->IsErrored())
    637543        {
    638             VERBOSE(VB_GENERAL, LOC_WARN +
    639                     QString("No data from card in %1 milliseconds.")
    640                     .arg(t.elapsed()));
     544            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device error detected");
     545            _error = true;
    641546        }
    642         else if (ret == 1 && _polls.revents & POLLIN)
     547
     548        if (_drb->IsEOF())
    643549        {
    644             int msec = t.restart();
    645             if (msec >= POLL_WARNING_TIMEOUT)
    646             {
    647                 VERBOSE(VB_IMPORTANT, LOC_WARN +
    648                         QString("Got data from card after %1 ms. (>%2)")
    649                         .arg(msec).arg(POLL_WARNING_TIMEOUT));
    650             }
    651 
    652             ReadFromDMX();
    653             if (t.elapsed() >= 20)
    654             {
    655                 VERBOSE(VB_RECORD, LOC_WARN +
    656                         QString("ReadFromDMX took %1 ms").arg(t.elapsed()));
    657             }
     550            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device EOF detected");
     551            _error = true;
    658552        }
    659         else if ((ret < 0) || (ret == 1 && _polls.revents & POLLERR))
    660             VERBOSE(VB_IMPORTANT, LOC_ERR +
    661                     "Poll failed while waiting for data." + ENO);
    662553    }
    663554
     555#ifdef USE_DRB
     556    if (_drb)
     557        _drb->Stop();
     558#endif       
     559
    664560    Close();
    665561
    666562    FinishRecording();
     
    668564    _recording = false;
    669565}
    670566
    671 void DVBRecorder::ReadFromDMX(void)
     567void DVBRecorder::StopRecording(void)
    672568{
    673     int cardnum = _card_number_option;
    674     int readsz = 1;
    675     unsigned char *pktbuf;
     569#ifdef USE_DRB
     570    TVRec *rec = tvrec;
     571    tvrec = NULL; // don't notify of pause..
    676572
    677     while (readsz > 0)
     573    bool ok = true;
     574    if (!IsPaused())
    678575    {
    679         readsz = read(_stream_fd, _buffer, _buffer_size);
    680         if (readsz < 0)
    681         {
    682             if (errno == EOVERFLOW)
    683             {
    684                 ++_stream_overflow_count;
    685                 VERBOSE(VB_RECORD, LOC +
    686                         "DVB Buffer overflow error detected on read");
    687                 break;
    688             }
     576        Pause();
     577        ok = WaitForPause(250);
     578    }
    689579
    690             if (errno == EAGAIN)
    691                 break;
    692             VERBOSE(VB_IMPORTANT, LOC_ERR +
    693                     "Error reading from DVB device." + ENO);
    694             break;
    695         } else if (readsz == 0)
    696             break;
     580    _request_recording = false;
    697581
    698         if (readsz % MPEG_TS_PKT_SIZE)
    699         {
    700             VERBOSE(VB_IMPORTANT, LOC_ERR + "Incomplete packet received.");
    701             readsz = readsz - (readsz % MPEG_TS_PKT_SIZE);
    702         }
     582    _drb->Stop();
    703583
    704         int pkts = readsz / MPEG_TS_PKT_SIZE;
    705         int curpkt = 0;
     584    if (ok)
     585        _drb->Teardown();
     586    else // Better to have a memory leak, than a segfault? -- John Poet
     587        VERBOSE(VB_IMPORTANT, LOC_ERR + "DeviceReadBuffer not cleaned up!");
    706588
    707         _pid_lock.lock();
    708         while (curpkt < pkts)
    709         {
    710             if (data_found == false)
    711             {
    712                 GENERAL("Data read from DMX - "
    713                         "This is for debugging with transform.c");
    714                 data_found = true;
    715             }
     589    while (_recording)
     590        usleep(2000);
    716591
    717             pktbuf = _buffer + (curpkt * MPEG_TS_PKT_SIZE);
    718             curpkt++;
     592    tvrec = rec;
     593#else
     594    DTVRecorder::StopRecording();
     595#endif
     596}
    719597
    720             int pes_offset = 0;
    721             int pid = ((pktbuf[1]&0x1f) << 8) | pktbuf[2];
    722             uint8_t scrambling = (pktbuf[3] >> 6) & 0x03;
    723             uint8_t cc = pktbuf[3] & 0xf;
    724             uint8_t content = (pktbuf[3] & 0x30) >> 4;
     598void DVBRecorder::ReaderPaused(int /*fd*/)
     599{
     600#ifdef USE_DRB
     601    pauseWait.wakeAll();
     602    if (tvrec)
     603        tvrec->RecorderPaused();
     604#endif // USE_DRB
     605}
    725606
    726             if (pktbuf[1] & 0x80)
    727             {
    728                 VERBOSE(VB_RECORD, LOC +
    729                         "Packet dropped due to uncorrectable error.");
    730                 ++_bad_packet_count;
    731                 continue;
    732             }
     607bool DVBRecorder::PauseAndWait(int timeout)
     608{
     609#ifdef USE_DRB
     610    if (request_pause)
     611    {
     612        paused = true;
     613        if (!_drb->IsPaused())
     614            _drb->SetRequestPause(true);
    733615
    734             if (scrambling)
    735             {
    736                 if (!_encrypted_pid[pid])
    737                 {
    738                     VERBOSE(VB_RECORD, LOC +
    739                             QString("PID 0x%1 is encrypted, ignoring")
    740                             .arg(pid,0,16));
    741                     _encrypted_pid[pid] = true;
    742                 }
    743                 continue; // Drop encrypted TS packet
    744             }
     616        unpauseWait.wait(timeout);
     617    }
     618    else if (_drb->IsPaused())
     619    {
     620        _drb->SetRequestPause(false);
     621        _drb->WaitForUnpause(timeout);
     622        paused = _drb->IsPaused();
     623    }
     624    else
     625    {
     626        paused = false;
     627    }
     628    return paused;
     629#else // if !USE_DRB
     630    return RecorderBase::PauseAndWait(timeout);
     631#endif // !USE_DRB
     632}
    745633
    746             if (_encrypted_pid[pid])
    747             {
    748                 VERBOSE(VB_RECORD, LOC +
    749                         QString("PID 0x%1 is no longer encrypted")
    750                         .arg(pid,0,16));
    751                 _encrypted_pid[pid] = false;
    752             }
    753             if (content & 0x1)
    754             {
    755                  if (_continuity_count[pid] < 0)
    756                      _continuity_count[pid] = cc;
    757                  else
    758                  {
    759                      _continuity_count[pid] = (_continuity_count[pid]+1) & 0xf;
    760                      if (_continuity_count[pid] != cc)
    761                      {
    762                          VERBOSE(VB_RECORD, LOC +
    763                                  QString("PID 0x%1 _continuity_count %2 cc %3")
    764                                  .arg(pid,0,16).arg(_continuity_count[pid])
    765                                  .arg(cc));
    766                          _continuity_count[pid] = cc;
    767                          ++_continuity_error_count;
    768                      }
    769                  }
    770             }
     634uint DVBRecorder::ProcessDataTS(unsigned char *buffer, uint len)
     635{
     636    if (len % TSPacket::SIZE)
     637    {
     638        VERBOSE(VB_IMPORTANT, LOC_ERR + "Incomplete packet received.");
     639        len = len - (len % TSPacket::SIZE);
     640    }
     641    if (len < TSPacket::SIZE)
     642        return len;
    771643
    772             if (_record_transport_stream_option)
    773             {   // handle TS recording
    774                 MythTimer t;
    775                 t.start();
    776                 if (_videoPID[pid])
    777                 {
    778                     // Check for keyframe
    779                     const TSPacket *pkt =
    780                         reinterpret_cast<const TSPacket*>(pktbuf);
    781                     FindKeyframes(pkt);
    782                 }
    783                 if (t.elapsed() > 10)
    784                 {
    785                     VERBOSE(VB_RECORD, LOC_WARN +
    786                             QString("Find keyframes took %1 ms!")
    787                             .arg(t.elapsed()));
    788                 }
     644    uint pos = 0;
     645    uint end = len - TSPacket::SIZE;
     646    while (pos <= end)
     647    {
     648        const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[pos]);
     649        ProcessTSPacket(*pkt);
     650        pos += TSPacket::SIZE;
     651    }
     652    return len - pos;
     653}
    789654
    790                 // Sync recording start to first keyframe
    791                 if (_wait_for_keyframe && !_keyframe_seen)
    792                     continue;
    793                 if (!keyframe_found)
    794                 {
    795                     keyframe_found = true;
    796                     VERBOSE(VB_RECORD, QString("Found first keyframe"));
    797                 }
     655bool DVBRecorder::ProcessTSPacket(const TSPacket& tspacket)
     656{
     657    if (tspacket.TransportError())
     658    {
     659        VERBOSE(VB_RECORD, LOC + "Packet dropped due to uncorrectable error.");
     660        ++_bad_packet_count;
     661        return false; // Drop bad TS packets...
     662    }
    798663
    799                 // Sync streams to the first Payload Unit Start Indicator
    800                 // _after_ first keyframe iff _wait_for_keyframe is true
    801                 if (!_payload_start_seen[pid])
    802                 {
    803                     if ((pktbuf[1] & 0x40) == 0)
    804                         continue; // not payload start - drop packet
     664    const uint pid = tspacket.PID();
    805665
    806                     VERBOSE(VB_RECORD, QString("Found Payload Start for PID %1").arg(pid));
    807                     _payload_start_seen[pid] = true;
    808                 }
     666    QMutexLocker locker(&_pid_lock);
     667    PIDInfo *info = _pid_infos[pid];
     668    if (!info)
     669       info = _pid_infos[pid] = new PIDInfo();
    809670
    810                 t.start();
    811                 LocalProcessDataTS(pktbuf, MPEG_TS_PKT_SIZE);
    812                 if (t.elapsed() > 10)
    813                 {
    814                     VERBOSE(VB_RECORD, LOC_WARN +
    815                             QString("TS packet write took %1 ms!")
    816                             .arg(t.elapsed()));
    817                 }
    818             }
    819             else
    820             {   // handle PS recording
    821                 ipack *ip = _ps_rec_pid_ipack[pid];
    822                 if (ip == NULL)
    823                     continue;
    824 
    825                 ip->ps = 1;
    826 
    827                 if ((pktbuf[1] & 0x40) && (ip->plength == MMAX_PLENGTH-6))
    828                 {
    829                     ip->plength = ip->found-6;
    830                     ip->found = 0;
    831                     send_ipack(ip);
    832                     reset_ipack(ip);
    833                 }
    834 
    835                 if (content & 0x2)
    836                     pes_offset = pktbuf[4] + 1;
    837 
    838                 if (pes_offset > 183)
    839                     continue;
    840 
    841                 instant_repack(pktbuf + 4 + pes_offset,
    842                                MPEG_TS_PKT_SIZE - 4 - pes_offset, ip);
    843             }
     671    // track scrambled pids
     672    if (tspacket.ScramplingControl())
     673    {
     674        if (!info->isEncrypted)
     675        {
     676            VERBOSE(VB_RECORD, LOC +
     677                    QString("PID 0x%1 is encrypted, ignoring").arg(pid,0,16));
     678            info->isEncrypted = true;
    844679        }
    845         _pid_lock.unlock();
     680        return true; // Drop encrypted TS packets...
    846681    }
    847 }
     682    else if (info->isEncrypted)
     683    {
     684        VERBOSE(VB_RECORD, LOC +
     685                QString("PID 0x%1 is no longer encrypted").arg(pid,0,16));
     686        info->isEncrypted = false;
     687    }
    848688
    849 #define SEQ_START     0x000001B3
    850 #define GOP_START     0x000001B8
    851 #define PICTURE_START 0x00000100
    852 #define SLICE_MIN     0x00000101
    853 #define SLICE_MAX     0x000001af
    854 
    855 void DVBRecorder::ProcessDataPS(unsigned char *buffer, int len, void *priv)
    856 {
    857     ((DVBRecorder*)priv)->LocalProcessDataPS(buffer, len);
    858 }
    859 
    860 void DVBRecorder::LocalProcessDataPS(unsigned char *buffer, int len)
    861 {
    862     if (buffer[0] != 0x00 || buffer[1] != 0x00 || buffer[2] != 0x01)
     689    // Check continuity counter
     690    if (tspacket.HasPayload())
    863691    {
    864         if (!_wait_for_keyframe)
    865             ringBuffer->Write(buffer, len);
    866         return;
     692        if (!info->CheckCC(tspacket.ContinuityCounter()))
     693        {
     694            VERBOSE(VB_RECORD, LOC +
     695                    QString("PID 0x%1 discontinuity detected").arg(pid,0,16));
     696            _continuity_error_count++;
     697        }
    867698    }
    868699
    869     if (buffer[3] >= VIDEO_STREAM_S && buffer[3] <= VIDEO_STREAM_E)
     700    if (_record_transport_stream_option)
    870701    {
    871         int pos = 8 + buffer[8];
    872         int datalen = len - pos;
     702        // handle TS recording
    873703
    874         unsigned char *bufptr = &buffer[pos];
    875         uint state = 0xFFFFFFFF;
    876         uint state_byte = 0;
    877         int prvcount = -1;
     704        // Check for keyframes and count frames
     705        if (info->isVideo)
     706            FindKeyframes(&tspacket);
    878707
    879         while (bufptr < &buffer[pos] + datalen)
     708        // Sync recording start to first keyframe
     709        if (_wait_for_keyframe_option && !_keyframe_seen)
     710            return true;
     711
     712        // Sync streams to the first Payload Unit Start Indicator
     713        // _after_ first keyframe iff _wait_for_keyframe_option is true
     714        if (!info->payloadStartSeen)
    880715        {
    881             if (++prvcount < 3)
    882                 state_byte = _ps_rec_buf[prvcount];
    883             else
    884                 state_byte = *bufptr++;
     716            if (!tspacket.PayloadStart())
     717                return true; // not payload start - drop packet
    885718
    886             if (state != 0x000001)
    887             {
    888                 state = ((state << 8) | state_byte) & 0xFFFFFF;
    889                 continue;
    890             }
     719            VERBOSE(VB_RECORD,
     720                    QString("PID 0x%1 Found Payload Start").arg(pid,0,16));
     721            info->payloadStartSeen = true;
     722        }
    891723
    892             state = ((state << 8) | state_byte) & 0xFFFFFF;
    893             if (state >= SLICE_MIN && state <= SLICE_MAX)
    894                 continue;
     724        // Write PAT & PMT tables occasionally
     725        WritePATPMT();
    895726
    896             if (state == SEQ_START)
    897                 _wait_for_keyframe = false;
     727        // Write Data
     728        ringBuffer->Write(tspacket.data(), TSPacket::SIZE);
     729    }
     730    else
     731    {
     732        // handle PS recording
    898733
    899             if (GOP_START == state)
    900             {
    901                 long long startpos = ringBuffer->GetWritePosition();
    902                        
    903                 if (!_position_map.contains(_frames_written_count))
    904                 {
    905                     _position_map_delta[_frames_written_count] = startpos;
    906                     _position_map[_frames_written_count] = startpos;
     734        ipack *ip = info->ip;
     735        if (ip == NULL)
     736            return true;
    907737
    908                     if (curRecording &&
    909                         ((_position_map_delta.size() % 30) == 0))
    910                     {
    911                         curRecording->SetPositionMapDelta(
    912                             _position_map_delta,
    913                             MARK_GOP_BYFRAME);
    914                         curRecording->SetFilesize(startpos);
    915                         _position_map_delta.clear();
    916                     }
    917                 }
    918             }
    919             else if (PICTURE_START == state)
    920                 _frames_written_count++;
     738        ip->ps = 1;
     739
     740        if (tspacket.PayloadStart() && (ip->plength == MMAX_PLENGTH-6))
     741        {
     742            ip->plength = ip->found-6;
     743            ip->found = 0;
     744            send_ipack(ip);
     745            reset_ipack(ip);
    921746        }
     747
     748        uint afc_offset = tspacket.AFCOffset();
     749        if (afc_offset > 187)
     750            return true;
     751
     752        instant_repack((uint8_t*)tspacket.data() + afc_offset,
     753                       TSPacket::SIZE - afc_offset, ip);
    922754    }
    923     memcpy(_ps_rec_buf, &buffer[len - 3], 3);
    924 
    925     if (!_wait_for_keyframe)
    926         ringBuffer->Write(buffer, len);
     755    return true;
    927756}
    928757
    929 void DVBRecorder::LocalProcessDataTS(unsigned char *buffer, int len)
     758void DVBRecorder::WritePATPMT(void)
    930759{
    931     QMutexLocker read_lock(&_pid_lock);
    932760    if (_ts_packets_until_psip_sync == 0)
    933761    {
     762        QMutexLocker read_lock(&_pid_lock);
    934763        if (_pat && _pmt)
    935764        {
    936765            ringBuffer->Write(_pat->tsheader()->data(), TSPacket::SIZE);
     
    940769    }
    941770    else
    942771        _ts_packets_until_psip_sync--;
    943 
    944     ringBuffer->Write(buffer,len);
    945772}
    946773
    947774void DVBRecorder::Reset(void)
     
    977804void DVBRecorder::CreatePAT(void)
    978805{
    979806    QMutexLocker read_lock(&_pid_lock);
    980     int next_cc = 0;
     807    uint next_cc = 0;
    981808    if (_pat)
    982809        next_cc = (_pat->tsheader()->ContinuityCounter() + 1) & 0x0F;
    983810
     
    989816    SetPAT(ProgramAssociationTable::Create(tsid, next_cc, pnum, pid));
    990817}
    991818
     819//#define USE_OLD_CREATE_PMT
     820#ifndef USE_OLD_CREATE_PMT
     821static void DescList_to_desc_list(DescriptorList &list, desc_list_t &vec)
     822{
     823    vec.clear();
     824    for (DescriptorList::iterator it = list.begin(); it != list.end(); ++it)
     825        vec.push_back((*it).Data);
     826}
     827
    992828void DVBRecorder::CreatePMT(void)
    993829{
    994830    QMutexLocker read_lock(&_pid_lock);
    995     int pmt_cc = 0;
     831
     832    // Figure out what goes into the PMT
     833    uint programNumber = 1; // MPEG Program Number
     834    desc_list_t gdesc;
     835    vector<uint> pids;
     836    vector<uint> types;
     837    vector<desc_list_t> pdesc;
     838    QValueList<ElementaryPIDObject>::iterator it;
     839
     840    DescList_to_desc_list(_input_pmt.Descriptors, gdesc);
     841
     842    it = _input_pmt.Components.begin();
     843    for (; it != _input_pmt.Components.end(); ++it)
     844    {
     845        if ((*it).Record)
     846        {
     847            pids.push_back((*it).PID);
     848            types.push_back((*it).Orig_Type);
     849            pdesc.resize(pdesc.size()+1);
     850            DescList_to_desc_list((*it).Descriptors, pdesc.back());
     851        }
     852    }
     853
     854    // Create the PMT
     855    ProgramMapTable *pmt = ProgramMapTable::Create(
     856        programNumber, PMT_PID, _input_pmt.PCRPID,
     857        _next_pmt_version, gdesc,
     858        pids, types, pdesc);
     859
     860    // Increment the continuity counter...
     861    uint pmt_cc = 0;
    996862    if (_pmt)
    997863        pmt_cc = (_pmt->tsheader()->ContinuityCounter() + 1) & 0x0F;
     864    pmt->tsheader()->SetContinuityCounter(pmt_cc);
    998865
    999     TSPacket pkt;
    1000     uint8_t *ts_packet = pkt.data();
    1001     memset(ts_packet, 0xFF, TSPacket::SIZE);
     866    SetPMT(pmt);
     867}
     868#endif   
    1002869
    1003     ts_packet[0] = 0x47;                            // sync byte
    1004     ts_packet[1] = 0x40 | ((PMT_PID >> 8) & 0x1F);  // payload start & PID
    1005     ts_packet[2] = PMT_PID & 0xFF;                  // PID
    1006     // scrambling, adaptation & continuity counter
    1007     ts_packet[3] = 0x10 | pmt_cc;
    1008     ts_packet[4] = 0x00;                            // pointer field
     870////////////////////////////////////////////////////////////
     871// Stuff below this comment will be phased out after 0.20 //
     872////////////////////////////////////////////////////////////
    1009873
    1010     ++pmt_cc &= 0x0F;   // inc. continuity counter
    1011     uint8_t *pmt_data = ts_packet + 5;
    1012     int p = 0;
     874#ifdef USE_OLD_CREATE_PMT
     875void DVBRecorder::CreatePMT(void)
     876{
     877    QMutexLocker read_lock(&_pid_lock);
     878    uint pmt_cc = 0;
     879    if (_pmt)
     880        pmt_cc = (_pmt->tsheader()->ContinuityCounter() + 1) & 0x0F;
    1013881
    1014     pmt_data[p++] = PMT_TID; // table ID
    1015     pmt_data[p++] = 0xB0;    // section syntax indicator
    1016     p++;                // section length (set later)
    1017     pmt_data[p++] = 0;       // program number (ServiceID)
    1018     pmt_data[p++] = 1;       // program number (ServiceID)
    1019     pmt_data[p++] = 0xC1 + (_next_pmt_version << 1); // Version + Current/Next
    1020     pmt_data[p++] = 0;       // Current Section
    1021     pmt_data[p++] = 0;       // Last Section
    1022     pmt_data[p++] = (_input_pmt.PCRPID >> 8) & 0x1F;
    1023     pmt_data[p++] = _input_pmt.PCRPID & 0xFF;
     882    ProgramMapTable *pmt = ProgramMapTable::CreateBlank();
     883    pmt->tsheader()->SetPID(PMT_PID);
     884    pmt->tsheader()->SetContinuityCounter(pmt_cc);
    1024885
     886    pmt->SetProgramNumber(1);
     887    pmt->SetPCRPID(_input_pmt.PCRPID);
     888    pmt->SetVersionNumber(_next_pmt_version);
     889    uint8_t *pmt_data = pmt->tsheader()->data() + 5;
     890    uint p = 10;
     891
    1025892    // Write descriptors
    1026893    int program_info_length = 0;
    1027894    DescriptorList::Iterator dit;
     
    1111978    pmt_data[p++] = (crc >> 16) & 0xFF;
    1112979    pmt_data[p++] = (crc >> 8) & 0xFF;
    1113980    pmt_data[p++] = crc & 0xFF;
    1114    
    1115     SetPMT(new ProgramMapTable(PSIPTable(pkt)));
     981
     982    SetPMT(pmt);
    1116983}
     984#endif   
    1117985
    1118 void DVBRecorder::DebugTSHeader(unsigned char* buffer, int len)
     986bool DVBRecorder::Poll(void) const
    1119987{
    1120     (void) len;
     988#ifndef USE_DRB
     989    struct pollfd polls;
     990    polls.fd      = _stream_fd;
     991    polls.events  = POLLIN;
     992    polls.revents = 0;
    1121993
    1122     uint8_t sync = buffer[0];
    1123     uint8_t transport_error = (buffer[1] & 0x80) >> 7;
    1124     uint8_t payload_start = (buffer[1] & 0x40) >> 6;
    1125     uint16_t pid = (buffer[1] & 0x1F) << 8 | buffer[2];
    1126     uint8_t transport_scrambled = (buffer[3] & 0xB0) >> 6;
    1127     uint8_t adaptation_control = (buffer[3] & 0x30) >> 4;
    1128     uint8_t counter = buffer[3] & 0x0F;
     994    int ret;
     995    do
     996        ret = poll(&polls, 1, POLL_INTERVAL);
     997    while (!request_pause && IsOpen() &&
     998           ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno))));
    1129999
    1130     int pos=4;
    1131     if (adaptation_control == 2 || adaptation_control == 3)
     1000    if (request_pause || !IsOpen())
     1001        return false;
     1002
     1003    if (ret > 0 && polls.revents & POLLIN)
    11321004    {
    1133         unsigned char adaptation_length;
    1134         adaptation_length = buffer[pos++];
    1135         pos += adaptation_length;
     1005        if (_poll_timer.elapsed() >= POLL_WARNING_TIMEOUT)
     1006        {
     1007            VERBOSE(VB_IMPORTANT, LOC_WARN +
     1008                    QString("Got data from card after %1 ms. (>%2)")
     1009                    .arg(_poll_timer.elapsed()).arg(POLL_WARNING_TIMEOUT));
     1010        }
     1011        _poll_timer.start();
     1012        return true;
    11361013    }
    11371014
    1138     QString debugmsg =
    1139         QString("sync: %1 err: %2 paystart: %3 "
    1140                 "pid: %4 enc: %5 adaptation: %6 counter: %7")
    1141         .arg(sync, 2, 16)
    1142         .arg(transport_error)
    1143         .arg(payload_start)
    1144         .arg(pid)
    1145         .arg(transport_scrambled)
    1146         .arg(adaptation_control)
    1147         .arg(counter);
     1015    if (ret == 0 && _poll_timer.elapsed() > POLL_WARNING_TIMEOUT)
     1016    {
     1017        VERBOSE(VB_GENERAL, LOC_WARN +
     1018                QString("No data from card in %1 ms.")
     1019                .arg(_poll_timer.elapsed()));
     1020    }
    11481021
    1149     const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[0]);
    1150     FindKeyframes(pkt);
     1022    if (ret < 0 && (EOVERFLOW == errno))
     1023    {
     1024        _stream_overflow_count++;
     1025        VERBOSE(VB_RECORD, LOC_ERR + "Driver buffer overflow detected.");
     1026    }
    11511027
    1152     int cardnum = _card_number_option;
    1153     GENERAL(debugmsg);
     1028    if ((ret < 0) || (ret > 0 && polls.revents & POLLERR))
     1029    {
     1030        VERBOSE(VB_IMPORTANT, LOC_ERR +
     1031                "Poll failed while waiting for data." + ENO);
     1032    }
     1033
     1034    return false;
     1035#else // if USE_DRB
     1036    return true;
     1037#endif // USE_DRB
    11541038}
     1039
     1040void DVBRecorder::ProcessDataPS(unsigned char *buffer, uint len)
     1041{
     1042    if (len < 4)
     1043        return;
     1044
     1045    if (buffer[0] != 0x00 || buffer[1] != 0x00 || buffer[2] != 0x01)
     1046    {
     1047        if (!_wait_for_keyframe_option || _keyframe_seen)
     1048            ringBuffer->Write(buffer, len);
     1049        return;
     1050    }
     1051
     1052    uint stream_id = buffer[3];
     1053    if ((stream_id >= PESStreamID::MPEGVideoStreamBegin) &&
     1054        (stream_id <= PESStreamID::MPEGVideoStreamEnd))
     1055    {
     1056        uint pos = 8 + buffer[8];
     1057        uint datalen = len - pos;
     1058
     1059        unsigned char *bufptr = &buffer[pos];
     1060        uint state = 0xFFFFFFFF;
     1061        uint state_byte = 0;
     1062        int prvcount = -1;
     1063
     1064        while (bufptr < &buffer[pos] + datalen)
     1065        {
     1066            state_byte  = (++prvcount < 3) ? _ps_rec_buf[prvcount] : *bufptr++;
     1067            uint last   = state;
     1068            state       = ((state << 8) | state_byte) & 0xFFFFFF;
     1069            stream_id   = state_byte;
     1070
     1071            // Skip non-prefixed stream id's and skip slice PES stream id's
     1072            if ((last != 0x000001) ||
     1073                ((stream_id >= PESStreamID::SliceStartCodeBegin) &&
     1074                 (stream_id <= PESStreamID::SliceStartCodeEnd)))
     1075            {
     1076                continue;
     1077            }
     1078
     1079            // Now process the stream id's we care about
     1080            if (PESStreamID::PictureStartCode == stream_id)
     1081                _frames_written_count++;
     1082            else if (PESStreamID::SequenceStartCode == stream_id)
     1083                _keyframe_seen = true;
     1084            else if (PESStreamID::GOPStartCode == stream_id)
     1085            {
     1086                _position_map_lock.lock();
     1087                bool save_map = false;
     1088                if (!_position_map.contains(_frames_written_count))
     1089                {
     1090                    long long startpos = ringBuffer->GetWritePosition();
     1091                    _position_map_delta[_frames_written_count] = startpos;
     1092                    _position_map[_frames_written_count] = startpos;
     1093                    save_map = true;
     1094                }
     1095                _position_map_lock.unlock();
     1096                if (save_map)
     1097                    SavePositionMap(false);
     1098            }
     1099        }
     1100    }
     1101    memcpy(_ps_rec_buf, &buffer[len - 3], 3);
     1102
     1103    if (!_wait_for_keyframe_option || _keyframe_seen)
     1104        ringBuffer->Write(buffer, len);
     1105}
     1106
     1107void DVBRecorder::process_data_ps_cb(unsigned char *buffer,
     1108                                     int len, void *priv)
     1109{
     1110    ((DVBRecorder*)priv)->ProcessDataPS(buffer, (uint)len);
     1111}
     1112
     1113ipack *DVBRecorder::CreateIPack(ES_Type type)
     1114{
     1115    ipack* ip = (ipack*)malloc(sizeof(ipack));
     1116    assert(ip);
     1117    switch (type)
     1118    {
     1119        case ES_TYPE_VIDEO_MPEG1:
     1120        case ES_TYPE_VIDEO_MPEG2:
     1121            init_ipack(ip, 2048, process_data_ps_cb);
     1122            ip->replaceid = _ps_rec_video_id++;
     1123            break;
     1124
     1125        case ES_TYPE_AUDIO_MPEG1:
     1126        case ES_TYPE_AUDIO_MPEG2:
     1127            init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */
     1128            ip->replaceid = _ps_rec_audio_id++;
     1129            break;
     1130
     1131        case ES_TYPE_AUDIO_AC3:
     1132            init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */
     1133            ip->priv_type = PRIV_TS_AC3;
     1134            break;
     1135
     1136        case ES_TYPE_SUBTITLE:
     1137        case ES_TYPE_TELETEXT:
     1138            init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */
     1139            ip->priv_type = PRIV_DVB_SUB;
     1140            break;
     1141
     1142        default:
     1143            init_ipack(ip, 2048, process_data_ps_cb);
     1144            break;
     1145    }
     1146    ip->data = (void*)this;
     1147    return ip;
     1148}
     1149
     1150#ifndef USE_DRB
     1151static ssize_t safe_read(int fd, unsigned char *buf, size_t bufsize)
     1152{
     1153    ssize_t size = read(fd, buf, bufsize);
     1154
     1155    if ((size < 0) &&
     1156        (errno != EAGAIN) && (errno != EOVERFLOW) && (EINTR != errno))
     1157    {
     1158        VERBOSE(VB_IMPORTANT, "DVB:safe_read(): "
     1159                "Error reading from DVB device." + ENO);
     1160    }
     1161
     1162    return size;
     1163}
     1164#endif // USE_DRB
  • libs/libmythtv/DeviceReadBuffer.h

     
     1// -*- Mode: c++ -*-
     2/* Device Buffer written by John Poet */
     3
     4#ifndef _DEVICEREADBUFFER_H_
     5#define _DEVICEREADBUFFER_H_
     6
     7#include <unistd.h>
     8#include <pthread.h>
     9#include <sys/poll.h>
     10
     11#include <qmutex.h>
     12#include <qwaitcondition.h>
     13#include <qstring.h>
     14
     15#include "util.h"
     16
     17class ReaderPausedCB
     18{
     19  public:
     20    virtual void ReaderPaused(int fd) = 0;
     21};
     22
     23/** \class DeviceReadBuffer
     24 *  \brief Buffers reads from device files.
     25 *
     26 *  This allows us to read the device regularly even in the presence
     27 *  of long blocking conditions on writing to disk or accessing the
     28 *  database.
     29 */
     30class DeviceReadBuffer
     31{
     32  public:
     33    DeviceReadBuffer(ReaderPausedCB *callback, bool use_poll = true);
     34   ~DeviceReadBuffer();
     35
     36    bool Setup(const QString &streamName, int streamfd);
     37    void Teardown(void);
     38
     39    void Start(void);
     40    void Reset(const QString &streamName, int streamfd);
     41    void Stop(void);
     42
     43    void SetRequestPause(bool request);
     44    bool IsPaused(void) const;
     45    bool WaitForUnpause(int timeout);
     46   
     47    bool IsErrored(void) const { return error; }
     48    bool IsEOF(void)     const { return eof;   }
     49
     50    uint Read(unsigned char *buf, uint count);
     51
     52  private:
     53    static void *boot_ringbuffer(void *);
     54    void fill_ringbuffer(void);
     55
     56    void SetPaused(bool);
     57    void IncrWritePointer(uint len);
     58    void IncrReadPointer(uint len);
     59
     60    bool HandlePausing(void);
     61    bool Poll(void) const;
     62    uint WaitForUnused(uint bytes_needed) const;
     63    uint WaitForUsed  (uint bytes_needed) const;
     64
     65    bool IsPauseRequested(void) const;
     66    bool IsOpen(void) const { return _stream_fd >= 0; }
     67    uint GetUnused(void) const;
     68    uint GetUsed(void) const;
     69    uint GetContiguousUnused(void) const;
     70
     71    bool CheckForErrors(ssize_t read_len, uint &err_cnt);
     72    void ReportStats(void);
     73
     74    QString          videodevice;
     75    int              _stream_fd;
     76
     77    ReaderPausedCB  *readerPausedCB;
     78    pthread_t        thread;
     79
     80    // Data for managing the device ringbuffer
     81    mutable QMutex   lock;
     82    bool             run;
     83    bool             running;
     84    bool             eof;
     85    bool             error;
     86    bool             request_pause;
     87    bool             paused;
     88    bool             using_poll;
     89
     90    size_t           size;
     91    size_t           used;
     92    size_t           dev_read_size;
     93    size_t           min_read;
     94    unsigned char   *buffer;
     95    unsigned char   *readPtr;
     96    unsigned char   *writePtr;
     97    unsigned char   *endPtr;
     98
     99    QWaitCondition   pauseWait;
     100    QWaitCondition   unpauseWait;
     101
     102    // statistics
     103    size_t           max_used;
     104    size_t           avg_used;
     105    size_t           avg_cnt;
     106    MythTimer        lastReport;
     107};
     108
     109#endif // _DEVICEREADBUFFER_H_
  • libs/libmythtv/hdtvrecorder.h

     
    33 *  HDTVRecorder
    44 *  Copyright (c) 2003-2004 by Brandon Beattie, Doug Larrick,
    55 *    Jason Hoos, and Daniel Thor Kristjansson
    6  *  Device ringbuffer added by John Poet
    76 *  Distributed as part of MythTV under GPL v2 and later.
    87 */
    98
     
    1211
    1312#include "dtvrecorder.h"
    1413#include "tsstats.h"
     14#include "DeviceReadBuffer.h"
    1515
    1616struct AVFormatContext;
    1717struct AVPacket;
     
    2828 *
    2929 *  \sa DTVRecorder, DVBRecorder
    3030 */
    31 class HDTVRecorder : public DTVRecorder
     31class HDTVRecorder : public DTVRecorder, private ReaderPausedCB
    3232{
    3333    Q_OBJECT
    3434    friend class ATSCStreamData;
    3535    friend class TSPacketProcessor;
    3636  public:
    37     enum {report_loops = 20000};
    38 
    3937    HDTVRecorder(TVRec *rec);
    4038   ~HDTVRecorder();
    4139
     
    4745    void StartRecording(void);
    4846    void StopRecording(void);
    4947
    50     void Pause(bool clear = false);
    51     bool IsPaused(void) const;
    52 
    5348    void Reset(void);
    5449
    5550    bool Open(void);
     
    6156    void deleteLater(void);
    6257
    6358  private:
     59    bool IsOpen(void) const { return _stream_fd >= 0; }
     60    bool Close(void);
     61
    6462    void TeardownAll(void);
    65     int ProcessData(unsigned char *buffer, int len);
     63    uint ProcessDataTS(unsigned char *buffer, uint len);
    6664    bool ProcessTSPacket(const TSPacket& tspacket);
    6765    void HandleVideo(const TSPacket* tspacket);
    6866    void HandleAudio(const TSPacket* tspacket);
    6967
    7068    int ResyncStream(unsigned char *buffer, int curr_pos, int len);
    7169
    72     static void *boot_ringbuffer(void *);
    73     void fill_ringbuffer(void);
    74     int ringbuf_read(unsigned char *buffer, size_t count);
     70    void ReaderPaused(int fd);
     71    bool PauseAndWait(int timeout = 100);
    7572
    76  private slots:
     73    bool readchan(int chanfd, unsigned char* buffer, int dlen);
     74    bool syncchan(int chanfd, int dlen, int keepsync);
     75
     76  private slots:
    7777    void WritePAT(ProgramAssociationTable*);
    7878    void WritePMT(ProgramMapTable*);
    7979    void ProcessMGT(const MasterGuideTable*);
    8080    void ProcessVCT(uint, const VirtualChannelTable*);
    81  private:
    82     ATSCStreamData* _atsc_stream_data;
    8381
     82  private:
     83    ATSCStreamData   *_atsc_stream_data;
     84    DeviceReadBuffer *_drb;
     85
    8486    // statistics
    85     TSStats _ts_stats;
    86     long long _resync_count;
    87     size_t loop;
     87    TSStats           _ts_stats;
     88    long long         _resync_count;
    8889
    89     // Data for managing the device ringbuffer
    90     struct {
    91         pthread_t        thread;
    92         mutable pthread_mutex_t lock;
    93         mutable pthread_mutex_t lock_stats;
    94 
    95         bool             run;
    96         bool             eof;
    97         bool             error;
    98         bool             request_pause;
    99         bool             paused;
    100         size_t           size;
    101         size_t           used;
    102         size_t           max_used;
    103         size_t           avg_used;
    104         size_t           avg_cnt;
    105         size_t           dev_read_size;
    106         size_t           min_read;
    107         unsigned char  * buffer;
    108         unsigned char  * readPtr;
    109         unsigned char  * writePtr;
    110         unsigned char  * endPtr;
    111     } ringbuf;
     90    /// unsynced packets to look at before giving up initially
     91    static const uint INIT_SYNC_WINDOW_SIZE;
     92    /// synced packets to require before starting recording
     93    static const uint INIT_MIN_NUM_SYNC_PACKETS;
    11294};
    11395
    11496#endif
  • libs/libmythtv/hdtvrecorder.cpp

     
    8484#include "atsctables.h"
    8585#include "atscstreamdata.h"
    8686#include "tv_rec.h"
     87#include "DeviceReadBuffer.h"
    8788
    8889// AVLib/FFMPEG includes
    8990#include "../libavcodec/avcodec.h"
    9091#include "../libavformat/avformat.h"
    9192#include "../libavformat/mpegts.h"
    9293
    93 #define REPORT_RING_STATS 1
     94#define LOC QString("HDTVRec(%1):").arg(videodevice)
     95#define LOC_ERR QString("HDTVRec(%1) Error:").arg(videodevice)
    9496
    9597#define DEFAULT_SUBCHANNEL 1
    9698
     
    109111        };
    110112#endif
    111113
     114const uint HDTVRecorder::INIT_SYNC_WINDOW_SIZE     = 50;
     115const uint HDTVRecorder::INIT_MIN_NUM_SYNC_PACKETS = 10;
     116
    112117HDTVRecorder::HDTVRecorder(TVRec *rec)
    113118    : DTVRecorder(rec, "HDTVRecorder"), _atsc_stream_data(0), _resync_count(0)
    114119{
     
    116121    connect(_atsc_stream_data, SIGNAL(UpdatePATSingleProgram(
    117122                                          ProgramAssociationTable*)),
    118123            this, SLOT(WritePAT(ProgramAssociationTable*)));
    119     connect(_atsc_stream_data, SIGNAL(UpdatePMTSingleProgram(ProgramMapTable*)),
    120             this, SLOT(WritePMT(ProgramMapTable*)));
     124    connect(_atsc_stream_data,
     125            SIGNAL(UpdatePMTSingleProgram(ProgramMapTable*)),
     126            this,
     127            SLOT(WritePMT(ProgramMapTable*)));
    121128    connect(_atsc_stream_data, SIGNAL(UpdateMGT(const MasterGuideTable*)),
    122129            this, SLOT(ProcessMGT(const MasterGuideTable*)));
    123130    connect(_atsc_stream_data,
     
    125132            this, SLOT(ProcessVCT(uint, const VirtualChannelTable*)));
    126133
    127134    _buffer_size = TSPacket::SIZE * 1500;
    128     if ((_buffer = new unsigned char[_buffer_size])) {
    129         // make valgrind happy, initialize buffer memory
     135    _buffer = new unsigned char[_buffer_size];
     136
     137    // make valgrind happy, initialize buffer memory
     138    if (_buffer)
    130139        memset(_buffer, 0xFF, _buffer_size);
    131     }
    132140
    133     VERBOSE(VB_RECORD, QString("HD buffer size %1 KB").arg(_buffer_size/1024));
     141    VERBOSE(VB_RECORD, LOC +
     142            QString("buffer size %1 KB").arg(_buffer_size/1024));
    134143
    135     ringbuf.run = false;
    136     ringbuf.buffer = 0;
    137     pthread_mutex_init(&ringbuf.lock, NULL);
    138     pthread_mutex_init(&ringbuf.lock_stats, NULL);
    139     loop = random() % (report_loops / 2);
     144    _drb = new DeviceReadBuffer(this);
    140145}
    141146
    142147void HDTVRecorder::TeardownAll(void)
    143148{
    144     // Make SURE that the ringbuffer thread is cleaned up
     149    // Make SURE that the device read thread is cleaned up -- John Poet
    145150    StopRecording();
    146151
    147     if (_stream_fd >= 0)
    148     {
    149         close(_stream_fd);
    150         _stream_fd = -1;
    151     }
     152    Close();
     153
    152154    if (_atsc_stream_data)
    153155    {
    154156        delete _atsc_stream_data;
     
    164166HDTVRecorder::~HDTVRecorder()
    165167{
    166168    TeardownAll();
    167     pthread_mutex_destroy(&ringbuf.lock);
    168     pthread_mutex_destroy(&ringbuf.lock_stats);
     169    delete _drb;
    169170}
    170171
    171172void HDTVRecorder::deleteLater(void)
     
    189190    SetOption("vbiformat", gContext->GetSetting("VbiFormat"));
    190191}
    191192
    192 bool HDTVRecorder::Open()
     193bool HDTVRecorder::Open(void)
    193194{
    194195    if (!_atsc_stream_data || !_buffer)
    195196        return false;
    196197
    197198#if FAKE_VIDEO
    198199    // open file instead of device
    199     if (_stream_fd >=0 && close(_stream_fd))
    200     {
    201         VERBOSE(VB_IMPORTANT,
    202                 QString("HDTVRecorder::Open(): Error, failed to close "
    203                         "existing fd (%1)").arg(strerror(errno)));
    204         return false;
    205     }
    206200
     201    Close(); // close old video file
    207202    _stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
    208     VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
    209     fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
     203
     204    VERBOSE(VB_IMPORTANT, LOC_ERR + QString("Opened fake video source '%1'")
     205            .arg(FAKE_VIDEO_FILES[fake_video_index]) + ENO);
     206
     207    fake_video_index = (fake_video_index + 1) % FAKE_VIDEO_NUM;
     208
    210209#else
    211     if (_stream_fd <= 0)
     210    if (!IsOpen())
    212211        _stream_fd = open(videodevice.ascii(), O_RDWR);
    213212#endif
    214     if (_stream_fd <= 0)
     213
     214    if (!IsOpen())
    215215    {
    216         VERBOSE(VB_IMPORTANT, QString("Can't open video device: %1 chanfd = %2")
    217                 .arg(videodevice).arg(_stream_fd));
    218         perror("open video:");
     216        VERBOSE(VB_IMPORTANT, LOC_ERR +
     217                QString("Couldn't open video device: '%1'")
     218                .arg(videodevice) + ENO);
    219219    }
    220     return (_stream_fd>0);
     220
     221    return IsOpen();
    221222}
    222223
     224bool HDTVRecorder::Close(void)
     225{
     226    if (IsOpen() && (0 != close(_stream_fd)))
     227    {
     228        VERBOSE(VB_IMPORTANT, LOC_ERR +
     229                "Failed to close file descriptor." + ENO);
     230        return false;
     231    }
     232    _stream_fd = -1;
     233    return true;
     234}
     235
    223236void HDTVRecorder::SetStreamData(ATSCStreamData *stream_data)
    224237{
    225238    if (stream_data == _atsc_stream_data)
     
    231244        delete old_data;
    232245}
    233246
    234 bool readchan(int chanfd, unsigned char* buffer, int dlen) {
     247bool HDTVRecorder::readchan(int chanfd, unsigned char* buffer, int dlen)
     248{
    235249    int len = read(chanfd, buffer, dlen); // read next byte
    236250    if (dlen != len)
    237251    {
    238252        if (len < 0)
    239253        {
    240             VERBOSE(VB_IMPORTANT, QString("HD1 error reading from device"));
    241             perror("read");
     254            VERBOSE(VB_IMPORTANT, LOC_ERR +
     255                    "Reading from device failed" + ENO);
    242256        }
    243257        else if (len == 0)
    244             VERBOSE(VB_IMPORTANT, QString("HD2 end of file found in packet"));
     258            VERBOSE(VB_IMPORTANT, LOC_ERR + "EOF found in TS packet");
    245259        else
    246             VERBOSE(VB_IMPORTANT, QString("HD3 partial read. This shouldn't happen!"));
     260            VERBOSE(VB_IMPORTANT, LOC_ERR +
     261                    "Partial read during initial TS sync phase");
    247262    }
    248263    return (dlen == len);
    249264}
    250265
    251 bool syncchan(int chanfd, int dlen, int keepsync) {
     266bool HDTVRecorder::syncchan(int chanfd, int dlen, int keepsync)
     267{
    252268    unsigned char b[188];
    253269    int i, j;
    254     for (i=0; i<dlen; i++) {
     270    for (i=0; i<dlen; i++)
     271    {
    255272        if (!readchan(chanfd, b, 1))
    256273            break;
    257274        if (SYNC_BYTE == b[0])
    258275        {
    259             if (readchan(chanfd, &b[1], TSPacket::SIZE-1)) {
     276            if (readchan(chanfd, &b[1], TSPacket::SIZE-1))
     277            {
    260278                i += (TSPacket::SIZE - 1);
    261279                for (j=0; j<keepsync; j++)
    262280                {
     
    268286                }
    269287                if (j==keepsync)
    270288                {
    271                     VERBOSE(VB_RECORD,
    272                             QString("HD4 obtained device stream sync after reading %1 bytes").
    273                             arg(dlen));
     289                    VERBOSE(VB_RECORD, LOC + "Obtained TS sync, "+
     290                            QString("after reading %1 bytes").arg(dlen));
    274291                    return true;
    275292                }
    276293                continue;
     
    278295            break;
    279296        }
    280297    }
    281     VERBOSE(VB_IMPORTANT, QString("HD5 Error: could not obtain sync"));
     298    VERBOSE(VB_IMPORTANT, LOC_ERR + "Could not obtain TS sync");
    282299    return false;
    283300}
    284301
    285 void * HDTVRecorder::boot_ringbuffer(void * arg)
    286 {
    287     HDTVRecorder *dtv = (HDTVRecorder *)arg;
    288     dtv->fill_ringbuffer();
    289     return NULL;
    290 }
     302#define SR_CHK(MSG) \
     303    if (!ok) { VERBOSE(VB_IMPORTANT, MSG); _error = true; return; }
    291304
    292 void HDTVRecorder::fill_ringbuffer(void)
     305void HDTVRecorder::StartRecording(void)
    293306{
    294     int       errcnt = 0;
    295     int       len;
    296     size_t    unused, used;
    297     size_t    contiguous;
    298     size_t    read_size;
    299     bool      run, request_pause, paused;
     307    bool ok        = true;
     308    uint len       = 0;
     309    uint remainder = 0;
    300310
    301     pthread_mutex_lock(&ringbuf.lock);
    302     ringbuf.run = true;
    303     pthread_mutex_unlock(&ringbuf.lock);
     311    VERBOSE(VB_RECORD, LOC + "StartRecording()");
    304312
    305     for (;;)
    306     {
    307         pthread_mutex_lock(&ringbuf.lock);
    308         run = ringbuf.run;
    309         unused = ringbuf.size - ringbuf.used;
    310         request_pause = ringbuf.request_pause;
    311         paused = ringbuf.paused;
    312         pthread_mutex_unlock(&ringbuf.lock);
     313    ok = Open();
     314    SR_CHK("Failed to open device.");
    313315
    314         if (!run)
    315             break;
     316    ok = _drb->Setup(videodevice, _stream_fd);
     317    SR_CHK("Failed to allocate device read buffer.");
    316318
    317         if (request_pause)
    318         {
    319             pthread_mutex_lock(&ringbuf.lock);
    320             ringbuf.paused = true;
    321             pthread_mutex_unlock(&ringbuf.lock);
     319    ok = syncchan(_stream_fd,
     320                  INIT_SYNC_WINDOW_SIZE * TSPacket::SIZE,
     321                  INIT_MIN_NUM_SYNC_PACKETS);
     322    SR_CHK("Failed to sync to transport stream to valid packet.");
    322323
    323             pauseWait.wakeAll();
    324             if (tvrec)
    325                 tvrec->RecorderPaused();
     324    _drb->Start();
    326325
    327             usleep(1000);
    328             continue;
    329         }
    330         else if (paused)
    331         {
    332             pthread_mutex_lock(&ringbuf.lock);
    333             ringbuf.writePtr = ringbuf.readPtr = ringbuf.buffer;
    334             ringbuf.used = 0;
    335             ringbuf.paused = false;
    336             pthread_mutex_unlock(&ringbuf.lock);
    337         }
    338 
    339         contiguous = ringbuf.endPtr - ringbuf.writePtr;
    340 
    341         while (unused < TSPacket::SIZE && contiguous > TSPacket::SIZE)
    342         {
    343             usleep(500);
    344 
    345             pthread_mutex_lock(&ringbuf.lock);
    346             unused = ringbuf.size - ringbuf.used;
    347             request_pause = ringbuf.request_pause;
    348             pthread_mutex_unlock(&ringbuf.lock);
    349 
    350             if (request_pause)
    351                 break;
    352         }
    353         if (request_pause)
    354             continue;
    355 
    356         read_size = unused > contiguous ? contiguous : unused;
    357         if (read_size > ringbuf.dev_read_size)
    358             read_size = ringbuf.dev_read_size;
    359 
    360         len = read(_stream_fd, ringbuf.writePtr, read_size);
    361 
    362         if (len < 0)
    363         {
    364             if (errno == EINTR)
    365                 continue;
    366 
    367             VERBOSE(VB_IMPORTANT, QString("HD7 error reading from %1")
    368                     .arg(videodevice));
    369             perror("read");
    370             if (++errcnt > 5)
    371             {
    372                 pthread_mutex_lock(&ringbuf.lock);
    373                 ringbuf.error = true;
    374                 pthread_mutex_unlock(&ringbuf.lock);
    375 
    376                 break;
    377             }
    378 
    379             usleep(500);
    380             continue;
    381         }
    382         else if (len == 0)
    383         {
    384             if (++errcnt > 5)
    385             {
    386                 VERBOSE(VB_IMPORTANT, QString("HD8 %1 end of file found.")
    387                         .arg(videodevice));
    388 
    389                 pthread_mutex_lock(&ringbuf.lock);
    390                 ringbuf.eof = true;
    391                 pthread_mutex_unlock(&ringbuf.lock);
    392 
    393                 break;
    394             }
    395             usleep(500);
    396             continue;
    397         }
    398 
    399         errcnt = 0;
    400 
    401         pthread_mutex_lock(&ringbuf.lock);
    402         ringbuf.used += len;
    403         used = ringbuf.used;
    404         ringbuf.writePtr += len;
    405         pthread_mutex_unlock(&ringbuf.lock);
    406 
    407 #ifdef REPORT_RING_STATS
    408         pthread_mutex_lock(&ringbuf.lock_stats);
    409 
    410         if (ringbuf.max_used < used)
    411             ringbuf.max_used = used;
    412 
    413         ringbuf.avg_used = ((ringbuf.avg_used * ringbuf.avg_cnt) + used)
    414                            / ++ringbuf.avg_cnt;
    415         pthread_mutex_unlock(&ringbuf.lock_stats);
    416 #endif
    417 
    418         if (ringbuf.writePtr == ringbuf.endPtr)
    419             ringbuf.writePtr = ringbuf.buffer;
    420     }
    421 
    422     close(_stream_fd);
    423     _stream_fd = -1;
    424 }
    425 
    426 /* read count bytes from ring into buffer */
    427 int HDTVRecorder::ringbuf_read(unsigned char *buffer, size_t count)
    428 {
    429     size_t          avail;
    430     size_t          cnt = count;
    431     size_t          min_read;
    432     unsigned char  *cPtr = buffer;
    433 
    434     bool            dev_error = false;
    435     bool            dev_eof = false;
    436 
    437     pthread_mutex_lock(&ringbuf.lock);
    438     avail = ringbuf.used;
    439     pthread_mutex_unlock(&ringbuf.lock);
    440 
    441     min_read = cnt < ringbuf.min_read ? cnt : ringbuf.min_read;
    442 
    443     while (min_read > avail)
    444     {
    445         usleep(50000);
    446 
    447         if (request_pause || dev_error || dev_eof)
    448             return 0;
    449 
    450         pthread_mutex_lock(&ringbuf.lock);
    451         dev_error = ringbuf.error;
    452         dev_eof = ringbuf.eof;
    453         avail = ringbuf.used;
    454         pthread_mutex_unlock(&ringbuf.lock);
    455     }
    456     if (cnt > avail)
    457         cnt = avail;
    458 
    459     if (ringbuf.readPtr + cnt > ringbuf.endPtr)
    460     {
    461         size_t      len;
    462 
    463         // Process as two pieces
    464         len = ringbuf.endPtr - ringbuf.readPtr;
    465         memcpy(cPtr, ringbuf.readPtr, len);
    466         cPtr += len;
    467         len = cnt - len;
    468 
    469         // Wrap arround to begining of buffer
    470         ringbuf.readPtr = ringbuf.buffer;
    471         memcpy(cPtr, ringbuf.readPtr, len);
    472         ringbuf.readPtr += len;
    473     }
    474     else
    475     {
    476         memcpy(cPtr, ringbuf.readPtr, cnt);
    477         ringbuf.readPtr += cnt;
    478     }
    479 
    480     pthread_mutex_lock(&ringbuf.lock);
    481     ringbuf.used -= cnt;
    482     pthread_mutex_unlock(&ringbuf.lock);
    483 
    484     if (ringbuf.readPtr == ringbuf.endPtr)
    485         ringbuf.readPtr = ringbuf.buffer;
    486     else
    487     {
    488 #ifdef REPORT_RING_STATS
    489         size_t samples, avg, max;
    490 
    491         if (++loop == report_loops)
    492         {
    493             loop = 0;
    494             pthread_mutex_lock(&ringbuf.lock_stats);
    495             avg = ringbuf.avg_used;
    496             samples = ringbuf.avg_cnt;
    497             max = ringbuf.max_used;
    498             ringbuf.avg_used = 0;
    499             ringbuf.avg_cnt = 0;
    500             ringbuf.max_used = 0;
    501             pthread_mutex_unlock(&ringbuf.lock_stats);
    502 
    503             VERBOSE(VB_IMPORTANT, QString("%1 ringbuf avg %2% max %3%"
    504                                           " samples %4")
    505                     .arg(videodevice)
    506                     .arg((static_cast<double>(avg)
    507                           / ringbuf.size) * 100.0)
    508                     .arg((static_cast<double>(max)
    509                           / ringbuf.size) * 100.0)
    510                     .arg(samples));
    511         }
    512         else
    513 #endif
    514             usleep(25);
    515     }
    516 
    517     return cnt;
    518 }
    519 
    520 void HDTVRecorder::StartRecording(void)
    521 {
    522     bool            pause;
    523     bool            dev_error, dev_eof;
    524     int             len;
    525 
    526     const int unsyncpackets = 50; // unsynced packets to look at before giving up
    527     const int syncpackets   = 10; // synced packets to require before starting recording
    528 
    529     VERBOSE(VB_RECORD, QString("StartRecording"));
    530 
    531     if (!Open())
    532     {
    533         _error = true;       
    534         return;
    535     }
    536 
    537326    _request_recording = true;
    538     _recording = true;
     327    _recording         = true;
    539328
    540     // Setup device ringbuffer
    541     delete[] ringbuf.buffer;
    542 
    543 //    ringbuf.size = 60 * 1024 * TSPacket::SIZE;
    544     ringbuf.size = gContext->GetNumSetting("HDRingbufferSize", 50*188);
    545     ringbuf.size *= 1024;
    546 
    547     if ((ringbuf.buffer =
    548          new unsigned char[ringbuf.size + TSPacket::SIZE]) == NULL)
     329    // Process packets while recording is requested
     330    while (_request_recording && !_error)
    549331    {
    550         VERBOSE(VB_IMPORTANT, "Failed to allocate HDTVRecorder ring buffer.");
    551         _error = true;
    552         return;
    553     }
    554 
    555     memset(ringbuf.buffer, 0xFF, ringbuf.size + TSPacket::SIZE);
    556     ringbuf.endPtr = ringbuf.buffer + ringbuf.size;
    557     ringbuf.readPtr = ringbuf.writePtr = ringbuf.buffer;
    558     ringbuf.dev_read_size = TSPacket::SIZE * 48;
    559     ringbuf.min_read = TSPacket::SIZE * 4;
    560     ringbuf.used = 0;
    561     ringbuf.max_used = 0;
    562     ringbuf.avg_used = 0;
    563     ringbuf.avg_cnt = 0;
    564     ringbuf.request_pause = false;
    565     ringbuf.paused = false;
    566     ringbuf.error = false;
    567     ringbuf.eof = false;
    568 
    569     VERBOSE(VB_RECORD, QString("HD ring buffer size %1 KB")
    570             .arg(ringbuf.size/1024));
    571 
    572     // sync device stream so it starts with a valid ts packet
    573     if (!syncchan(_stream_fd, TSPacket::SIZE*unsyncpackets, syncpackets))
    574     {
    575         _error = true;
    576         return;
    577     }
    578 
    579     // create thread to fill the ringbuffer
    580     pthread_create(&ringbuf.thread, NULL, boot_ringbuffer,
    581                    reinterpret_cast<void *>(this));
    582 
    583     int remainder = 0;
    584     // TRANSFER DATA
    585     while (_request_recording)
    586     {
    587         pthread_mutex_lock(&ringbuf.lock);
    588         dev_error = ringbuf.error;
    589         dev_eof = ringbuf.eof;
    590         pause = ringbuf.paused;
    591         pthread_mutex_unlock(&ringbuf.lock);
    592 
    593         if (request_pause)
    594         {
    595             pthread_mutex_lock(&ringbuf.lock);
    596             ringbuf.request_pause = true;
    597             pthread_mutex_unlock(&ringbuf.lock);
    598 
    599             usleep(1000);
     332        if (PauseAndWait())
    600333            continue;
    601         }
    602         else if (pause)
    603         {
    604             pthread_mutex_lock(&ringbuf.lock);
    605             ringbuf.request_pause = false;
    606             pthread_mutex_unlock(&ringbuf.lock);
    607334
    608             usleep(1500);
    609             continue;
    610         }
     335        len = _drb->Read(&(_buffer[remainder]), _buffer_size - remainder);
    611336
    612         if (dev_error)
    613         {
    614             VERBOSE(VB_IMPORTANT, "HDTV: device error detected");
    615             _error = true;
    616             break;
    617         }
    618 
    619         if (dev_eof)
    620             break;
    621 
    622         len = ringbuf_read(&(_buffer[remainder]), _buffer_size - remainder);
    623 
    624337        if (len == 0)
    625338            continue;
    626339
    627340        len += remainder;
    628         remainder = ProcessData(_buffer, len);
     341        remainder = ProcessDataTS(_buffer, len);
    629342        if (remainder > 0) // leftover bytes
    630343            memmove(_buffer, &(_buffer[_buffer_size - remainder]),
    631344                    remainder);
     345
     346        // Check for DRB errors
     347        if (_drb->IsErrored())
     348        {
     349            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device error detected");
     350            _error = true;
     351        }
     352
     353        if (_drb->IsEOF())
     354        {
     355            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device EOF detected");
     356            _error = true;
     357        }
    632358    }
    633359
    634360    FinishRecording();
    635361    _recording = false;
    636362}
     363#undef SR_CHK
    637364
    638365void HDTVRecorder::StopRecording(void)
    639366{
     
    649376
    650377    _request_recording = false;
    651378
    652     pthread_mutex_lock(&ringbuf.lock);
    653     bool run = ringbuf.run;
    654     ringbuf.run = false;
    655     pthread_mutex_unlock(&ringbuf.lock);
     379    _drb->Stop();
    656380
    657     if (run)
    658         pthread_join(ringbuf.thread, NULL);
     381    if (ok)
     382        _drb->Teardown();
     383    else // Better to have a memory leak, than a segfault? -- John Poet
     384        VERBOSE(VB_IMPORTANT, LOC_ERR + "DeviceReadBuffer not cleaned up!");
    659385
    660     if (!ok)
    661     {
    662         // Better to have a memory leak, then a segfault?
    663         VERBOSE(VB_IMPORTANT, "DTV ringbuffer not cleaned up!\n");
    664     }
    665     else
    666     {
    667         delete[] ringbuf.buffer;
    668         ringbuf.buffer = 0;
    669     }
     386    while (_recording)
     387        usleep(2000);
     388
    670389    tvrec = rec;
    671390}
    672391
    673 void HDTVRecorder::Pause(bool /*clear*/)
     392void HDTVRecorder::ReaderPaused(int /*fd*/)
    674393{
    675     pthread_mutex_lock(&ringbuf.lock);
    676     ringbuf.paused = false;
    677     pthread_mutex_unlock(&ringbuf.lock);
    678     request_pause = true;
     394    pauseWait.wakeAll();
     395    if (tvrec)
     396        tvrec->RecorderPaused();
    679397}
    680398
    681 bool HDTVRecorder::IsPaused(void) const
     399bool HDTVRecorder::PauseAndWait(int timeout)
    682400{
    683     pthread_mutex_lock(&ringbuf.lock);
    684     bool paused = ringbuf.paused;
    685     pthread_mutex_unlock(&ringbuf.lock);
     401#ifdef USE_DRB
     402    if (request_pause)
     403    {
     404        paused = true;
     405        if (!_drb->IsPaused())
     406            _drb->SetRequestPause(true);
    686407
     408        unpauseWait.wait(timeout);
     409    }
     410    else if (_drb->IsPaused())
     411    {
     412        _drb->SetRequestPause(false);
     413        _drb->WaitForUnpause(timeout);
     414        paused = _drb->IsPaused();
     415    }
     416    else
     417    {
     418        paused = false;
     419    }
    687420    return paused;
     421#else // if !USE_DRB
     422    return RecorderBase::PauseAndWait(timeout);
     423#endif // !USE_DRB
    688424}
    689425
    690426int HDTVRecorder::ResyncStream(unsigned char *buffer, int curr_pos, int len)
     
    695431    if (nextpos >= len)
    696432        return -1; // not enough bytes; caller should try again
    697433   
    698     while (buffer[pos] != SYNC_BYTE || buffer[nextpos] != SYNC_BYTE) {
     434    while (buffer[pos] != SYNC_BYTE || buffer[nextpos] != SYNC_BYTE)
     435    {
    699436        pos++;
    700437        nextpos++;
    701438        if (nextpos == len)
     
    707444
    708445void HDTVRecorder::WritePAT(ProgramAssociationTable *pat)
    709446{
     447    if (!pat)
     448        return;
     449
    710450    int next = (pat->tsheader()->ContinuityCounter()+1)&0xf;
    711451    pat->tsheader()->SetContinuityCounter(next);
    712452    ringBuffer->Write(pat->tsheader()->data(), TSPacket::SIZE);
    713453}
    714454
    715 #if WHACK_A_BUG_VIDEO
    716 static int WABV_base_pid     = 0x100;
    717 #define WABV_WAIT 60
    718 static int WABV_wait_a_while = WABV_WAIT;
    719 bool WABV_started = false;
    720 #endif
    721 
    722 #if WHACK_A_BUG_AUDIO
    723 static int WABA_base_pid     = 0x200;
    724 #define WABA_WAIT 60
    725 static int WABA_wait_a_while = WABA_WAIT;
    726 bool WABA_started = false;
    727 #endif
    728 
    729455void HDTVRecorder::WritePMT(ProgramMapTable* pmt)
    730456{
    731     if (pmt) {
    732         int next = (pmt->tsheader()->ContinuityCounter()+1)&0xf;
    733         pmt->tsheader()->SetContinuityCounter(next);
     457    if (!pmt)
     458        return;
    734459
    735 #if WHACK_A_BUG_VIDEO
    736         WABV_wait_a_while--;
    737         if (WABV_wait_a_while<=0) {
    738             WABV_started = true;
    739             WABV_wait_a_while = WABV_WAIT;
    740             WABV_base_pid = (((WABV_base_pid-0x100)+1)%32)+0x100;
    741             if (StreamID::MPEG2Video != StreamData()->PMT()->StreamType(0))
    742             {
    743                 VERBOSE(VB_IMPORTANT, "HDTVRecorder::WritePMT(): Error,"
    744                         "Whack a Bug can not rewrite PMT, wrong stream type");
    745             }
    746             else
    747             {
    748                 VERBOSE(VB_IMPORTANT, QString("Whack a Bug: new video pid %1").
    749                         arg(WABV_base_pid));
    750                 // rewrite video pid
    751                 const uint old_video_pid=StreamData()->PMT()->StreamPID(0);
    752                 StreamData()->PMT()->SetStreamPID(0, WABV_base_pid);
    753                 if (StreamData()->PMT()->PCRPID() == old_video_pid)
    754                     StreamData()->PMT()->SetPCRPID(WABV_base_pid);
    755                 StreamData()->PMT()->SetCRC(StreamData()->PMT()->CalcCRC());
    756                 VERBOSE(VB_IMPORTANT, StreamData()->PMT()->toString());
    757             }
    758         }
    759 #endif
    760 #if WHACK_A_BUG_AUDIO
    761         WABA_wait_a_while--;
    762         if (WABA_wait_a_while<=0) {
    763             WABA_started = true;
    764             WABA_wait_a_while = WABA_WAIT;
    765             WABA_base_pid = (((WABA_base_pid-0x200)+1)%32)+0x200;
    766             VERBOSE(VB_IMPORTANT, QString("Whack a Bug: new audio BASE pid %1").arg(WABA_base_pid));
    767             // rewrite audio pids
    768             for (uint i=0; i<StreamData()->PMT()->StreamCount(); i++) {
    769                 if (StreamID::MPEG2Audio == StreamData()->PMT()->StreamType(i) ||
    770                     StreamID::MPEG2Audio == StreamData()->PMT()->StreamType(i)) {
    771                     const uint old_audio_pid = StreamData()->PMT()->StreamPID(i);
    772                     const uint new_audio_pid = WABA_base_pid + old_audio_pid;
    773                     StreamData()->PMT()->SetStreamPID(i, new_audio_pid);
    774                     if (StreamData()->PMT()->PCRPID() == old_audio_pid)
    775                         StreamData()->PMT()->SetPCRPID(new_audio_pid);
    776                     StreamData()->PMT()->SetCRC(StreamData()->PMT()->CalcCRC());
    777                     VERBOSE(VB_IMPORTANT, StreamData()->PMT()->toString());
    778                 }
    779             }
    780         }
    781 #endif
    782 
    783         ringBuffer->Write(pmt->tsheader()->data(), TSPacket::SIZE);
    784     }
     460    int next = (pmt->tsheader()->ContinuityCounter()+1)&0xf;
     461    pmt->tsheader()->SetContinuityCounter(next);
     462    ringBuffer->Write(pmt->tsheader()->data(), TSPacket::SIZE);
    785463}
    786464
    787465/** \fn HDTVRecorder::ProcessMGT(const MasterGuideTable*)
     
    820498        {
    821499            if (vct->ProgramNumber(i) != (uint)StreamData()->DesiredProgram())
    822500            {
    823                 VERBOSE(VB_RECORD,
    824                         QString("Resetting desired program from %1"
    825                                 " to %2")
     501                VERBOSE(VB_RECORD, LOC_ERR +
     502                        QString("Resetting desired program from %1 to %2")
    826503                        .arg(StreamData()->DesiredProgram())
    827504                        .arg(vct->ProgramNumber(i)));
    828505                // Do a (partial?) reset here if old desired
     
    834511    }
    835512    if (!found)
    836513    {
    837         VERBOSE(VB_IMPORTANT,
     514        VERBOSE(VB_IMPORTANT, LOC_ERR +
    838515                QString("Desired channel %1_%2 not found;"
    839516                        " using %3_%4 instead.")
    840517                .arg(StreamData()->DesiredMajorChannel())
    841518                .arg(StreamData()->DesiredMinorChannel())
    842519                .arg(vct->MajorChannel(0))
    843                 .arg(vct->MinorChannel(0)));
    844         VERBOSE(VB_IMPORTANT, vct->toString());
     520                .arg(vct->MinorChannel(0)) + "\n" + vct->toString());
     521
    845522        StreamData()->SetDesiredProgram(vct->ProgramNumber(0));
    846523    }
    847524}
     
    854531    if (_wait_for_keyframe && !_keyframe_seen)
    855532        return;
    856533
    857 #if WHACK_A_BUG_VIDEO
    858     if (WABV_started)
    859         ((TSPacket*)(tspacket))->SetPID(WABV_base_pid);
    860 #endif
    861 
    862534    ringBuffer->Write(tspacket->data(), TSPacket::SIZE);
    863535}
    864536
     
    868540    if (_wait_for_keyframe && !_keyframe_seen)
    869541        return;
    870542
    871 #if WHACK_A_BUG_AUDIO
    872     if (WABA_started)
    873         ((TSPacket*)(tspacket))->SetPID(WABA_base_pid+tspacket->PID());
    874 #endif
    875 
    876543    ringBuffer->Write(tspacket->data(), TSPacket::SIZE);
    877544}
    878545
     
    902569    return ok;
    903570}
    904571
    905 int HDTVRecorder::ProcessData(unsigned char *buffer, int len)
     572uint HDTVRecorder::ProcessDataTS(unsigned char *buffer, uint len)
    906573{
    907     int pos = 0;
     574    if (len < TSPacket::SIZE)
     575        return len;
    908576
    909     while (pos + 187 < len) // while we have a whole packet left
     577    uint pos = 0;
     578    uint end = len - TSPacket::SIZE;
     579    while (pos <= end) // while we have a whole packet left
    910580    {
    911581        if (buffer[pos] != SYNC_BYTE)
    912582        {
    913583            _resync_count++;
    914             if (25 == _resync_count)
    915                 VERBOSE(VB_RECORD, QString("Resyncing many of times, suppressing error messages"));
     584
     585            if (25 == _resync_count)
     586            {
     587                VERBOSE(VB_RECORD, LOC + "Resyncing many of times, "
     588                        "suppressing error messages");
     589            }
    916590            else if (25 > _resync_count)
    917                 VERBOSE(VB_RECORD, QString("Resyncing"));
     591            {
     592                VERBOSE(VB_RECORD, LOC + "Resyncing");
     593            }
     594
    918595            int newpos = ResyncStream(buffer, pos, len);
    919596            if (newpos == -1)
    920597                return len - pos;
     
    925602        }
    926603
    927604        const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[pos]);
    928         if (ProcessTSPacket(*pkt)) {
    929             pos += TSPacket::SIZE; // Advance to next TS packet
     605        if (ProcessTSPacket(*pkt))
     606        {
     607            // Advance to next TS packet
     608            pos += TSPacket::SIZE;
     609
     610            // Take care of statistics
    930611            _ts_stats.IncrTSPacketCount();
    931             if (0 == _ts_stats.TSPacketCount()%1000000)
    932                 VERBOSE(VB_RECORD, _ts_stats.toString());
    933         } else // Let it resync in case of dropped bytes
    934             buffer[pos] = SYNC_BYTE+1;
     612            if (0 == _ts_stats.TSPacketCount() % 1000000)
     613                VERBOSE(VB_RECORD, LOC + "\n" + _ts_stats.toString());
     614
     615        }
     616        else
     617        {
     618            pos++; // Resync on invalid packet, in case of dropped bytes...
     619        }
    935620    }
    936621
    937622    return len - pos;
     
    939624
    940625void HDTVRecorder::Reset(void)
    941626{
    942     VERBOSE(VB_RECORD, "HDTVRecorder::Reset(void)");
     627    VERBOSE(VB_RECORD, LOC + "Reset(void)");
    943628    DTVRecorder::Reset();
    944629
    945630    _error = false;
     
    947632    _ts_stats.Reset();
    948633
    949634    if (curRecording)
    950     {
    951635        curRecording->ClearPositionMap(MARK_GOP_BYFRAME);
     636
     637    if (!IsOpen())
     638        return /* true */;
     639
     640    if (!IsPaused())
     641    {
     642        Pause();
     643        WaitForPause();
    952644    }
    953645
    954     if (_stream_fd >= 0)
     646    if (!Close())
     647        return /* false */;
     648
     649    if (Open())
    955650    {
    956         if (!IsPaused())
    957         {
    958             Pause();
    959             WaitForPause();
    960         }
    961         int ret = close(_stream_fd);
    962         if (ret < 0)
    963         {
    964             perror("close");
    965             return;
    966         }
    967 #if FAKE_VIDEO
    968         // open file instead of device
    969         _stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
    970         VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
    971         fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
    972 #else
    973         _stream_fd = open(videodevice.ascii(), O_RDWR);
    974 #endif
    975         if (_stream_fd < 0)
    976         {
    977             VERBOSE(VB_IMPORTANT, QString("HD1 Can't open video device: %1 chanfd = %2").
    978                     arg(videodevice).arg(_stream_fd));
    979             perror("open video");
    980             return;
    981         }
    982         else
    983         {
    984             pthread_mutex_lock(&ringbuf.lock);
    985             ringbuf.used = 0;
    986             ringbuf.max_used = 0;
    987             ringbuf.readPtr = ringbuf.writePtr = ringbuf.buffer;
    988             pthread_mutex_unlock(&ringbuf.lock);
    989         }
     651        _drb->Reset(videodevice, _stream_fd);
    990652        Unpause();
     653        return /* true */;
    991654    }
     655
     656    VERBOSE(VB_IMPORTANT, LOC_ERR + "Couldn't open video device: " +
     657            QString("'%1'").arg(videodevice) + ENO);
     658    return /* false */;
    992659}
  • libs/libmythtv/libmythtv.pro

     
    206206
    207207    # TVRec & Recorder base classes
    208208    HEADERS += tv_rec.h
    209     HEADERS += recorderbase.h
     209    HEADERS += recorderbase.h              DeviceReadBuffer.h
    210210    HEADERS += dtvrecorder.h               dummydtvrecorder.h
    211211    SOURCES += tv_rec.cpp
    212     SOURCES += recorderbase.cpp
     212    SOURCES += recorderbase.cpp            DeviceReadBuffer.cpp
    213213    SOURCES += dtvrecorder.cpp             dummydtvrecorder.cpp
    214214
    215215    # MPEG parsing stuff
  • libs/libmythtv/dvbrecorder.h

     
     1// -*- Mode: c++ -*-
    12/*
    23 *  Copyright (C) Kenneth Aafloy 2003
    34 * 
     
    1718#include "dtvrecorder.h"
    1819#include "tspacket.h"
    1920#include "transform.h"
     21#include "DeviceReadBuffer.h"
    2022
    2123#include "dvbtypes.h"
    2224#include "dvbchannel.h"
     
    2527class ProgramAssociationTable;
    2628class ProgramMapTable;
    2729
     30class PIDInfo
     31{
     32  public:
     33    PIDInfo() :
     34        filter_fd(-1),  continuityCount(0xFF), ip(NULL),
     35        isVideo(false), isEncrypted(false),    payloadStartSeen(false) {;}
     36
     37    int    filter_fd;         ///< Input filter file descriptor
     38    uint   continuityCount;   ///< last Continuity Count (sentinel 0xFF)
     39    ipack *ip;                ///< TS->PES converter
     40    bool   isVideo;
     41    bool   isEncrypted;       ///< true if PID is marked as encrypted
     42    bool   payloadStartSeen;  ///< true if payload start packet seen on PID
     43
     44    inline void Close(void);
     45    inline bool CheckCC(uint cc);
     46};
     47typedef QMap<uint,PIDInfo*> PIDInfoMap;
     48
    2849/** \class DVBRecorder
    2950 *  \brief This is a specialization of DTVRecorder used to
    3051 *         handle streams from DVB drivers.
    3152 *
    3253 *  \sa DTVRecorder, HDTVRecorder
    3354 */
    34 class DVBRecorder: public DTVRecorder
     55class DVBRecorder: public DTVRecorder, private ReaderPausedCB
    3556{
    3657    Q_OBJECT
    3758  public:
     
    4768
    4869    void StartRecording(void);
    4970    void Reset(void);
     71    void StopRecording(void);
    5072
    5173    bool Open(void);
     74    bool IsOpen(void) const { return _stream_fd >= 0; }
    5275    void Close(void);
    5376
    5477    bool RecordsTransportStream(void) const
     
    6386
    6487  private:
    6588    void TeardownAll(void);
    66     void ReadFromDMX(void);
    67     static void ProcessDataPS(unsigned char *buffer, int len, void *priv);
    68     void LocalProcessDataPS(unsigned char *buffer, int len);
    69     void LocalProcessDataTS(unsigned char *buffer, int len);
    7089
     90    bool Poll(void) const;
     91
     92    uint ProcessDataTS(unsigned char *buffer, uint len);
     93    bool ProcessTSPacket(const TSPacket& tspacket);
     94
     95    void AutoPID(void);
     96    bool OpenFilters(void);
    7197    void CloseFilters(void);
    7298    void OpenFilter(uint pid, ES_Type type, dmx_pes_type_t pes_type,
    7399                    uint mpeg_stream_type);
    74     bool SetDemuxFilters(void);
    75     void AutoPID(void);
    76100
    77101    void SetPAT(ProgramAssociationTable*);
    78102    void SetPMT(ProgramMapTable*);
    79103
    80104    void CreatePAT(void);
    81105    void CreatePMT(void);
     106    void WritePATPMT(void);
    82107
    83108    void DebugTSHeader(unsigned char* buffer, int len);
    84109
     110    void ReaderPaused(int fd);
     111    bool PauseAndWait(int timeout = 100);
     112
     113    ipack *CreateIPack(ES_Type type);
     114    void ProcessDataPS(unsigned char *buffer, uint len);
     115    static void process_data_ps_cb(unsigned char*,int,void*);
     116
     117    DeviceReadBuffer *_drb;
     118
    85119    // Options set in SetOption()
    86120    int             _card_number_option;
    87121    bool            _record_transport_stream_option;
     
    90124    // DVB stuff
    91125    DVBChannel*     dvbchannel;
    92126
     127    // general recorder stuff
     128    /// Set when we want to generate a new filter set
     129    bool            _reset_pid_filters;
     130    QMutex          _pid_lock;
     131    PIDInfoMap      _pid_infos;
     132
    93133    // PS recorder stuff
    94134    int             _ps_rec_audio_id;
    95135    int             _ps_rec_video_id;
    96136    unsigned char   _ps_rec_buf[3];
    97     pid_ipack_t     _ps_rec_pid_ipack;
    98137
    99138    // TS recorder stuff
    100139    ProgramAssociationTable *_pat;
    101140    ProgramMapTable         *_pmt;
    102141    uint            _next_pmt_version;
    103142    uint            _ts_packets_until_psip_sync;
    104     QMap<uint,bool> _payload_start_seen;
    105     QMap<uint,bool> _videoPID;
    106143
    107144    // Input Misc
    108145    /// PMT on input side
    109146    PMTObject       _input_pmt;
    110     /// Input filter file descriptors
    111     vector<int>     _pid_filters;
    112     /// Input polling structure for _stream_fd
    113     struct pollfd   _polls;
    114     /// Set when we want to generate a new filter set
    115     bool            _reset_pid_filters;
    116     /// Encrypted PID, so we can drop these
    117     QMap<uint,bool> _encrypted_pid;
    118147
    119     // locking
    120     QMutex          _pid_lock;
    121 
    122148    // Statistics
    123     uint            _continuity_error_count;
    124     uint            _stream_overflow_count;
    125     uint            _bad_packet_count;
    126     QMap<uint,int>  _continuity_count;
     149    mutable uint        _continuity_error_count;
     150    mutable uint        _stream_overflow_count;
     151    mutable uint        _bad_packet_count;
     152    mutable MythTimer   _poll_timer;
    127153
    128     // For debugging
    129     bool data_found; ///< debugging variable used by transform.c
    130     bool keyframe_found;
    131 
    132154    // Constants
    133155    static const int PMT_PID;
    134156    static const int TSPACKETS_BETWEEN_PSIP_SYNC;
     
    136158    static const int POLL_WARNING_TIMEOUT;
    137159};
    138160
     161inline void PIDInfo::Close(void)
     162{
     163    if (filter_fd >= 0)
     164        close(filter_fd);
     165
     166    if (ip)
     167    {
     168        free_ipack(ip);
     169        free(ip);
     170    }
     171}
     172
     173inline bool PIDInfo::CheckCC(uint new_cnt)
     174{
     175    if (continuityCount == 0xFF)
     176    {
     177        continuityCount = new_cnt;
     178        return true;
     179    }
     180
     181    continuityCount = (continuityCount+1) & 0xf;
     182    if (continuityCount == new_cnt)
     183        return true;
     184   
     185    continuityCount = new_cnt;
     186    return false;
     187}
     188
    139189#endif
  • libs/libmythtv/DeviceReadBuffer.cpp

     
     1#include <algorithm>
     2#include <cassert>
     3
     4#include "DeviceReadBuffer.h"
     5#include "mythcontext.h"
     6#include "tspacket.h"
     7
     8#define REPORT_RING_STATS 1
     9
     10#define LOC QString("DevRdB(%1): ").arg(videodevice)
     11#define LOC_ERR QString("DevRdB(%1) Error: ").arg(videodevice)
     12
     13DeviceReadBuffer::DeviceReadBuffer(ReaderPausedCB *cb, bool use_poll)
     14    : videodevice(QString::null),   _stream_fd(-1),
     15      readerPausedCB(cb),
     16
     17      // Data for managing the device ringbuffer
     18      run(false),                   running(false),
     19      eof(false),                   error(false),
     20      request_pause(false),         paused(false),
     21      using_poll(use_poll),
     22
     23      size(0),                      used(0),
     24      dev_read_size(0),             min_read(0),
     25
     26      buffer(NULL),                 readPtr(NULL),
     27      writePtr(NULL),               endPtr(NULL),
     28
     29      // statistics
     30      max_used(0),                  avg_used(0),
     31      avg_cnt(0)
     32{
     33}
     34
     35DeviceReadBuffer::~DeviceReadBuffer()
     36{
     37}
     38
     39bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd)
     40{
     41    QMutexLocker locker(&lock);
     42
     43    if (buffer)
     44        delete[] buffer;
     45
     46    videodevice   = streamName;
     47    _stream_fd    = streamfd;
     48
     49    // Setup device ringbuffer
     50    eof           = false;
     51    error         = false;
     52    request_pause = false;
     53    paused        = false;
     54
     55    size          = gContext->GetNumSetting("HDRingbufferSize",
     56                                            50 * TSPacket::SIZE) * 1024;
     57    used          = 0;
     58    dev_read_size = TSPacket::SIZE * (using_poll ? 256 : 48);
     59    min_read      = TSPacket::SIZE * 4;
     60
     61    buffer        = new unsigned char[size + TSPacket::SIZE];
     62    readPtr       = buffer;
     63    writePtr      = buffer;
     64    endPtr        = buffer + size;
     65
     66    // Initialize buffer, if it exists
     67    if (!buffer)
     68        return false;
     69    memset(buffer, 0xFF, size + TSPacket::SIZE);
     70
     71    // Initialize statistics
     72    max_used      = 0;
     73    avg_used      = 0;
     74    avg_cnt       = 0;
     75    lastReport.start();
     76
     77    VERBOSE(VB_RECORD, LOC + QString("buffer size %1 KB").arg(size/1024));
     78
     79    return true;
     80}
     81
     82void DeviceReadBuffer::Teardown(void)
     83{
     84    if (buffer)
     85        delete[] buffer;
     86    buffer = NULL;
     87}
     88
     89void DeviceReadBuffer::Start(void)
     90{
     91    lock.lock();
     92    bool was_running = running;
     93    lock.unlock();
     94    if (was_running)
     95    {
     96        VERBOSE(VB_IMPORTANT, LOC_ERR + "Start(): Already running.");
     97        SetRequestPause(false);
     98        return;
     99    }
     100
     101    pthread_create(&thread, NULL, boot_ringbuffer, this);
     102}
     103
     104void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
     105{
     106    QMutexLocker locker(&lock);
     107
     108    videodevice   = streamName;
     109    _stream_fd    = streamfd;
     110
     111    used          = 0;
     112    readPtr       = buffer;
     113    writePtr      = buffer;
     114}
     115
     116void DeviceReadBuffer::Stop(void)
     117{
     118    lock.lock();
     119    bool was_running = running;
     120    run = false;
     121    lock.unlock();
     122
     123    if (!was_running)
     124    {
     125        VERBOSE(VB_IMPORTANT, LOC_ERR + "Stop(): Not running.");
     126        return;
     127    }
     128
     129    pthread_join(thread, NULL);
     130}
     131
     132void DeviceReadBuffer::SetRequestPause(bool req)
     133{
     134    QMutexLocker locker(&lock);
     135    request_pause = req;
     136}
     137
     138void DeviceReadBuffer::SetPaused(bool val)
     139{
     140    lock.lock();
     141    paused = val;
     142    lock.unlock();
     143    if (val)
     144        pauseWait.wakeAll();
     145    else
     146        unpauseWait.wakeAll();
     147}
     148
     149bool DeviceReadBuffer::IsPaused(void) const
     150{
     151    QMutexLocker locker(&lock);
     152    return paused;
     153}
     154
     155bool DeviceReadBuffer::WaitForUnpause(int timeout)
     156{
     157    if (IsPaused())
     158        unpauseWait.wait(timeout);
     159    return IsPaused();
     160}
     161
     162bool DeviceReadBuffer::IsPauseRequested(void) const
     163{
     164    QMutexLocker locker(&lock);
     165    return request_pause;
     166}
     167
     168uint DeviceReadBuffer::GetUnused(void) const
     169{
     170    QMutexLocker locker(&lock);
     171    return size - used;
     172}
     173
     174uint DeviceReadBuffer::GetUsed(void) const
     175{
     176    QMutexLocker locker(&lock);
     177    return used;
     178}
     179
     180uint DeviceReadBuffer::GetContiguousUnused(void) const
     181{
     182    QMutexLocker locker(&lock);
     183    return endPtr - writePtr;
     184}
     185
     186void DeviceReadBuffer::IncrWritePointer(uint len)
     187{
     188    QMutexLocker locker(&lock);
     189    used     += len;
     190    writePtr += len;
     191    writePtr  = (writePtr == endPtr) ? buffer : writePtr;
     192#ifdef REPORT_RING_STATS
     193    max_used = max(used, max_used);
     194    avg_used = ((avg_used * avg_cnt) + used) / ++avg_cnt;
     195#endif
     196}
     197
     198void DeviceReadBuffer::IncrReadPointer(uint len)
     199{
     200    QMutexLocker locker(&lock);
     201    used    -= len;
     202    readPtr += len;
     203    readPtr  = (readPtr == endPtr) ? buffer : readPtr;
     204    assert(readPtr <= endPtr);
     205}
     206
     207void *DeviceReadBuffer::boot_ringbuffer(void *arg)
     208{
     209    ((DeviceReadBuffer*) arg)->fill_ringbuffer();
     210    return NULL;
     211}
     212
     213void DeviceReadBuffer::fill_ringbuffer(void)
     214{
     215    uint      errcnt = 0;
     216
     217    lock.lock();
     218    run     = true;
     219    running = true;
     220    lock.unlock();
     221
     222    while (run)
     223    {
     224        if (!HandlePausing())
     225            continue;
     226
     227        if (!IsOpen())
     228        {
     229            usleep(10000);
     230            continue;
     231        }
     232
     233        if (using_poll && !Poll())
     234            continue;
     235
     236        // Limit read size for faster return from read
     237        size_t read_size = min(dev_read_size, WaitForUnused(TSPacket::SIZE));
     238
     239        // if read_size > 0 do the read...
     240        if (read_size)
     241        {
     242            ssize_t len = read(_stream_fd, writePtr, read_size);
     243            if (!CheckForErrors(len, errcnt))
     244            {
     245                if (errcnt > 5)
     246                    break;
     247                else
     248                    continue;
     249            }
     250            errcnt = 0;
     251            IncrWritePointer(len);
     252        }
     253    }
     254
     255    lock.lock();
     256    running = false;
     257    lock.unlock();
     258}
     259
     260bool DeviceReadBuffer::HandlePausing(void)
     261{
     262    if (IsPauseRequested())
     263    {
     264        SetPaused(true);
     265
     266        if (readerPausedCB)
     267            readerPausedCB->ReaderPaused(_stream_fd);
     268
     269        usleep(5000);
     270        return false;
     271    }
     272    else if (IsPaused())
     273    {
     274        Reset(videodevice, _stream_fd);
     275        SetPaused(false);
     276    }
     277    return true;
     278}
     279
     280bool DeviceReadBuffer::Poll(void) const
     281{
     282    bool retval = true;
     283    while (true)
     284    {
     285        struct pollfd polls;
     286        polls.fd      = _stream_fd;
     287        polls.events  = POLLIN;
     288        polls.revents = 0;
     289
     290        int ret = poll(&polls, 1 /*number of polls*/, 10 /*msec*/);
     291        if (IsPauseRequested() || !IsOpen() || !run)
     292        {
     293            retval = false;
     294            break; // are we supposed to pause, stop, etc.
     295        }
     296
     297        if (ret > 0)
     298            break; // we have data to read :)
     299        if ((-1 == ret) && (EOVERFLOW == errno))
     300            break; // we have an error to handle
     301
     302        if ((-1 == ret) && ((EAGAIN == errno) || (EINTR  == errno)))
     303            continue; // errors that tell you to try again
     304        if (ret == 0)
     305            continue; // timed out, try again
     306
     307        usleep(2500);
     308    }
     309    return retval;
     310}
     311
     312bool DeviceReadBuffer::CheckForErrors(ssize_t len, uint &errcnt)
     313{
     314    if (len < 0)
     315    {
     316        if (EINTR == errno)
     317            return false;
     318        if (EAGAIN == errno)
     319        {
     320            usleep(2500);
     321            return false;
     322        }
     323        if (EOVERFLOW == errno)
     324        {
     325            VERBOSE(VB_IMPORTANT, LOC_ERR + "Driver buffers overflowed");
     326            return false;
     327        }
     328
     329        VERBOSE(VB_IMPORTANT, LOC_ERR +
     330                QString("Problem reading fd(%1)").arg(_stream_fd) + ENO);
     331
     332        if (++errcnt > 5)
     333        {
     334            lock.lock();
     335            error = true;
     336            lock.unlock();
     337            return false;
     338        }
     339
     340        usleep(500);
     341        return false;
     342    }
     343    else if (len == 0)
     344    {
     345        if (++errcnt > 5)
     346        {
     347            VERBOSE(VB_IMPORTANT, LOC +
     348                    QString("End-Of-File? fd(%1)").arg(_stream_fd));
     349
     350            lock.lock();
     351            eof = true;
     352            lock.unlock();
     353
     354            return false;
     355        }
     356        usleep(500);
     357        return false;
     358    }
     359    return true;
     360}
     361
     362/** \fn DeviceReadBuffer::Read(unsigned char*, uint)
     363 *  \brief Try to Read count bytes from into buffer
     364 *  \param buffer Buffer to put data in
     365 *  \param count  Number of bytes to attempt to read
     366 *  \return number of bytes actually read
     367 */
     368uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
     369{
     370    uint avail = WaitForUsed(min(count, min_read));
     371    size_t cnt = min(count, avail);
     372
     373    if (!cnt)
     374        return 0;
     375
     376    if (readPtr + cnt > endPtr)
     377    {
     378        // Process as two pieces
     379        size_t len = endPtr - readPtr;
     380        if (len)
     381        {
     382            memcpy(buf, readPtr, len);
     383            buf += len;
     384            IncrReadPointer(len);
     385        }
     386        if (cnt > len)
     387        {
     388            len = cnt - len;
     389            memcpy(buf, readPtr, len);
     390            IncrReadPointer(len);
     391        }
     392    }
     393    else
     394    {
     395        memcpy(buf, readPtr, cnt);
     396        IncrReadPointer(cnt);
     397    }
     398
     399    ReportStats();
     400
     401    return cnt;
     402}
     403
     404/// \return bytes available for writing
     405uint DeviceReadBuffer::WaitForUnused(uint needed) const
     406{
     407    size_t unused = GetUnused();
     408    size_t contig = GetContiguousUnused();
     409
     410    if (contig > TSPacket::SIZE)
     411    {
     412        while (unused < needed)
     413        {
     414            unused = GetUnused();
     415            if (IsPauseRequested() || !IsOpen() || !run)
     416                return 0;
     417            usleep(5000);
     418        }
     419        if (IsPauseRequested() || !IsOpen() || !run)
     420            return 0;
     421        contig = GetContiguousUnused();
     422    }
     423
     424    return min(contig, unused);
     425}
     426
     427/// \return bytes available for reading
     428uint DeviceReadBuffer::WaitForUsed(uint needed) const
     429{
     430    size_t avail = GetUsed();
     431    while (needed > avail)
     432    {
     433        {
     434            QMutexLocker locker(&lock);
     435            avail = used;
     436            if (request_pause || error || eof)
     437                return 0;
     438        }
     439        usleep(5000);
     440    }
     441    return avail;
     442}
     443
     444void DeviceReadBuffer::ReportStats(void)
     445{
     446#ifdef REPORT_RING_STATS
     447    if (lastReport.elapsed() > 20*1000 /* msg every 20 seconds */)
     448    {
     449        QMutexLocker locker(&lock);
     450        double rsize = 100.0 / size;
     451        QString msg  = QString("fill avg(%1%) ").arg(avg_used*rsize,3,'f',0);
     452        msg         += QString("fill max(%2%) ").arg(max_used*rsize,3,'f',0);
     453        msg         += QString("samples(%3)").arg(avg_cnt);
     454
     455        avg_used    = 0;
     456        avg_cnt     = 0;
     457        max_used    = 0;
     458        lastReport.start();
     459
     460        VERBOSE(VB_IMPORTANT, LOC + msg);
     461    }
     462#endif
     463}