Ticket #712: drb-v2.patch

File drb-v2.patch, 84.5 KB (added by danielk, 20 years ago)

updated to head, fixes some dvbrecorder issues, breaks hdtvrecorder

  • libs/libmythtv/dvbrecorder.cpp

     
    7272const int DVBRecorder::POLL_INTERVAL        =  50; // msec
    7373const int DVBRecorder::POLL_WARNING_TIMEOUT = 500; // msec
    7474
     75#define USE_DRB
     76#ifndef USE_DRB
     77static ssize_t safe_read(int fd, unsigned char *buf, size_t bufsize);
     78#endif // USE_DRB
     79
    7580#define LOC      QString("DVBRec(%1): ").arg(_card_number_option)
    7681#define LOC_WARN QString("DVBRec(%1) Warning: ").arg(_card_number_option)
    7782#define LOC_ERR  QString("DVBRec(%1) Error: ").arg(_card_number_option)
    7883
    7984DVBRecorder::DVBRecorder(TVRec *rec, DVBChannel* advbchannel)
    8085    : DTVRecorder(rec, "DVBRecorder"),
     86      _drb(NULL),
    8187      // Options set in SetOption()
    8288      _card_number_option(0), _record_transport_stream_option(false),
    8389      _hw_decoder_option(false),
    8490      // DVB stuff
    8591      dvbchannel(advbchannel),
     92      _reset_pid_filters(true),
     93      _pid_lock(true),
    8694      // PS recorder stuff
    8795      _ps_rec_audio_id(0xC0), _ps_rec_video_id(0xE0),
    8896      // Output stream info
    8997      _pat(NULL), _pmt(NULL), _next_pmt_version(0),
    9098      _ts_packets_until_psip_sync(0),
    91       // Input Misc
    92       _reset_pid_filters(true),
    93       // Locking
    94       _pid_lock(true),
    9599      // Statistics
    96100      _continuity_error_count(0), _stream_overflow_count(0),
    97101      _bad_packet_count(0)
    98102{
    99103    bzero(_ps_rec_buf, sizeof(unsigned char) * 3);
    100104
    101     bzero(&_polls, sizeof(struct pollfd));
    102     _polls.fd = _stream_fd;
     105#ifdef USE_DRB
     106    _drb = new DeviceReadBuffer(this);
     107    _buffer_size = (1024*1024 / TSPacket::SIZE) * TSPacket::SIZE;
     108#else
     109    _buffer_size = (4*1024*1024 / TSPacket::SIZE) * TSPacket::SIZE;
     110#endif
    103111
    104     _buffer_size = (4*1024*1024 / MPEG_TS_PKT_SIZE) * MPEG_TS_PKT_SIZE;
    105112    _buffer = new unsigned char[_buffer_size];
    106113    bzero(_buffer, _buffer_size);
    107114}
     
    113120
    114121void DVBRecorder::TeardownAll(void)
    115122{
    116     if (_stream_fd >= 0)
     123    // Make SURE that the device read thread is cleaned up -- John Poet
     124    StopRecording();
     125
     126    if (IsOpen())
    117127        Close();
    118128
    119129    if (_buffer)
     
    122132        _buffer = NULL;
    123133    }
    124134
     135    if (_drb)
     136    {
     137        delete _drb;
     138        _drb = NULL;
     139    }
     140
    125141    SetPAT(NULL);
    126142    SetPMT(NULL);
    127143}
     
    177193
    178194bool DVBRecorder::Open(void)
    179195{
    180     if (_stream_fd >= 0)
     196    if (IsOpen())
    181197    {
    182198        VERBOSE(VB_GENERAL, LOC_WARN + "Card already open");
    183199        return true;
     
    185201
    186202    _stream_fd = open(dvbdevice(DVB_DEV_DVR,_card_number_option),
    187203                      O_RDONLY | O_NONBLOCK);
    188     if (_stream_fd < 0)
     204    if (!IsOpen())
    189205    {
    190206        VERBOSE(VB_IMPORTANT, LOC_ERR + "Failed to open DVB device" + ENO);
    191207        return false;
    192208    }
    193209
    194     _polls.fd = _stream_fd;
    195     _polls.events = POLLIN;
    196     _polls.revents = 0;
     210#ifdef USE_DRB
     211    if (_drb)
     212        _drb->Reset(videodevice, _stream_fd);
     213#endif
    197214
    198215    connect(dvbchannel, SIGNAL(UpdatePMTObject(const PMTObject *)),
    199216            this, SLOT(SetPMTObject(const PMTObject *)));
    200217
    201     VERBOSE(VB_RECORD, LOC +
    202             QString("Card opened successfully (using %1 mode).")
     218    VERBOSE(VB_RECORD, LOC + QString("Card opened successfully fd(%1)")
     219            .arg(_stream_fd) + QString(" (using %2 mode).")
    203220            .arg(_record_transport_stream_option ? "TS" : "PS"));
    204221
    205222    dvbchannel->RecorderStarted();
     
    211228{
    212229    VERBOSE(VB_RECORD, LOC + "Close() fd("<<_stream_fd<<") -- begin");
    213230
    214     if (_stream_fd < 0)
    215         return;
     231    if (IsOpen())
     232    {
    216233
    217     CloseFilters();
     234#ifdef USE_DRB
     235    if (_drb)
     236        _drb->Reset(videodevice, -1);
     237#endif
    218238
    219     if (_stream_fd >= 0)
     239        CloseFilters();
    220240        close(_stream_fd);
     241        _stream_fd = -1;
     242    }
    221243
    222     _stream_fd = -1;
    223     _polls.fd = -1;
    224 
    225244    VERBOSE(VB_RECORD, LOC + "Close() fd("<<_stream_fd<<") -- end");
    226245}
    227246
    228247void DVBRecorder::CloseFilters(void)
    229248{
    230249    QMutexLocker change_lock(&_pid_lock);
    231     for(unsigned int i=0; i<_pid_filters.size(); i++)
    232         if (_pid_filters[i] >= 0)
    233             close(_pid_filters[i]);
    234     _pid_filters.clear();
    235250
    236     pid_ipack_t::iterator iter = _ps_rec_pid_ipack.begin();
    237     for (;iter != _ps_rec_pid_ipack.end(); iter++)
     251    PIDInfoMap::iterator it;
     252    for (it = _pid_infos.begin(); it != _pid_infos.end(); ++it)
    238253    {
    239         if ((*iter).second != NULL)
    240         {
    241             free_ipack((*iter).second);
    242             free((void*)(*iter).second);
    243         }
     254        (*it)->Close();
     255        delete *it;
    244256    }
    245     _ps_rec_pid_ipack.clear();
     257    _pid_infos.clear();
    246258}
    247259
    248260void DVBRecorder::OpenFilter(uint           pid,
     
    284296        usecs /= 2;
    285297    }
    286298    VERBOSE(VB_RECORD, LOC + "Set demux buffer size for " +
    287             QString("pid 0x%1 to %2,\n\t\t\t which gives us a %3 msec buffer.")
     299            QString("pid 0x%1 to %2,\n\t\t\twhich gives us a %3 msec buffer.")
    288300            .arg(pid,0,16).arg(sz).arg(usecs/1000));
    289301
    290302    // Set the filter type
     
    303315        return;
    304316    }
    305317
     318    PIDInfo *info = new PIDInfo();
     319    // Set isVideo based on stream type
     320    info->isVideo = StreamID::IsVideo(stream_type);
    306321    // Add the file descriptor to the filter list
    307     QMutexLocker change_lock(&_pid_lock);
    308     _pid_filters.push_back(fd_tmp);
     322    info->filter_fd = fd_tmp;
    309323
    310     // Initialize continuity count
    311     _continuity_count[pid] = -1;
    312     if (_record_transport_stream_option)
    313     {
    314         //Set the TS->PES converter to NULL
    315         _ps_rec_pid_ipack[pid] = NULL;
    316         return;
    317     }
    318 
    319324    // If we are in legacy PES mode, initialize TS->PES converter
    320     ipack* ip = (ipack*)malloc(sizeof(ipack));
    321     assert(ip);
    322     switch (type)
    323     {
    324         case ES_TYPE_VIDEO_MPEG1:
    325         case ES_TYPE_VIDEO_MPEG2:
    326             init_ipack(ip, 2048, ProcessDataPS);
    327             ip->replaceid = _ps_rec_video_id++;
    328             break;
     325    if (!_record_transport_stream_option)
     326        info->ip = CreateIPack(type);
    329327
    330         case ES_TYPE_AUDIO_MPEG1:
    331         case ES_TYPE_AUDIO_MPEG2:
    332             init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */
    333             ip->replaceid = _ps_rec_audio_id++;
    334             break;
    335 
    336         case ES_TYPE_AUDIO_AC3:
    337             init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */
    338             ip->priv_type = PRIV_TS_AC3;
    339             break;
    340 
    341         case ES_TYPE_SUBTITLE:
    342         case ES_TYPE_TELETEXT:
    343             init_ipack(ip, 65535, ProcessDataPS); /* don't repack PES */
    344             ip->priv_type = PRIV_DVB_SUB;
    345             break;
    346 
    347         default:
    348             init_ipack(ip, 2048, ProcessDataPS);
    349             break;
    350     }
    351     ip->data = (void*)this;
    352     _ps_rec_pid_ipack[pid] = ip;
     328    // Add the new info to the map
     329    QMutexLocker change_lock(&_pid_lock);   
     330    _pid_infos[pid] = info;
    353331}
    354332
    355 bool DVBRecorder::SetDemuxFilters(void)
     333bool DVBRecorder::OpenFilters(void)
    356334{
    357     QMutexLocker change_lock(&_pid_lock);
    358335    CloseFilters();
    359336
    360     _continuity_count.clear();
    361     _encrypted_pid.clear();
    362     _payload_start_seen.clear();
    363     data_found = false;
     337    QMutexLocker change_lock(&_pid_lock);
     338
    364339    _wait_for_keyframe = _wait_for_keyframe_option;
    365     keyframe_found = false;
    366340
    367341    _ps_rec_audio_id = 0xC0;
    368342    _ps_rec_video_id = 0xE0;
     
    427401                    (*es).Orig_Type);
    428402
    429403
    430     if (_pid_filters.size() == 0 && _ps_rec_pid_ipack.size() == 0)
     404    if (_pid_infos.empty())
    431405    {       
    432406        VERBOSE(VB_GENERAL, LOC_WARN +
    433407                "Recording will not commence until a PID is set.");
     
    436410    return true;
    437411}
    438412
    439 /*
    440  *  Process PMT and decide which components should be recorded
    441  */
    442 void DVBRecorder::AutoPID(void)
    443 {
    444     QMutexLocker change_lock(&_pid_lock);
    445     _videoPID.clear();
    446 
    447     VERBOSE(VB_RECORD, LOC +
    448             QString("AutoPID for MPEG Program Number(%1), PCR PID(0x%2)")
    449             .arg(_input_pmt.ServiceID).arg(((uint)_input_pmt.PCRPID),0,16));
    450 
    451     // Wanted stream types:
    452     QValueList<ES_Type> StreamTypes;
    453     StreamTypes += ES_TYPE_VIDEO_MPEG1;
    454     StreamTypes += ES_TYPE_VIDEO_MPEG2;
    455     StreamTypes += ES_TYPE_AUDIO_MPEG1;
    456     StreamTypes += ES_TYPE_AUDIO_MPEG2;
    457     StreamTypes += ES_TYPE_AUDIO_AC3;
    458     if (_record_transport_stream_option)
    459     {
    460         // The following types are only supported with TS recording
    461         StreamTypes += ES_TYPE_AUDIO_DTS;
    462         StreamTypes += ES_TYPE_AUDIO_AAC;
    463         StreamTypes += ES_TYPE_TELETEXT;
    464         StreamTypes += ES_TYPE_SUBTITLE;
    465     }
    466 
    467     QMap<ES_Type, bool> flagged;
    468     QValueList<ElementaryPIDObject>::Iterator es;
    469 
    470     if (!_hw_decoder_option)
    471     {
    472         for (es = _input_pmt.Components.begin();
    473              es != _input_pmt.Components.end(); ++es)
    474         {
    475             (*es).Record = true;
    476             flagged[(*es).Type] = true;
    477         }
    478     }
    479     else
    480     {
    481         // Wanted languages:
    482         QStringList Languages = iso639_get_language_list();
    483 
    484         for (es = _input_pmt.Components.begin();
    485              es != _input_pmt.Components.end(); ++es)
    486         {
    487             if (!StreamTypes.contains((*es).Type))
    488             {
    489                 // Type not wanted
    490                 continue;
    491             }
    492 
    493             if (((*es).Type == ES_TYPE_AUDIO_MPEG1) ||
    494                 ((*es).Type == ES_TYPE_AUDIO_MPEG2) ||
    495                 ((*es).Type == ES_TYPE_AUDIO_AC3))
    496             {
    497                 bool ignore = false;
    498                 // Check for audio descriptors
    499                 DescriptorList::Iterator dit;
    500                 for (dit = (*es).Descriptors.begin();
    501                      dit != (*es).Descriptors.end(); ++dit)
    502                 {
    503                     // Check for "Hearing impaired" or
    504                     // "Visual impaired commentary" stream
    505                     if (((*dit).Data[0] == 0x0A) &&
    506                         ((*dit).Data[5] & 0xFE == 0x02))
    507                     {
    508                         ignore = true;
    509                         break;
    510                     }
    511                 }
    512 
    513                 if (ignore)
    514                     continue; // Ignore this stream
    515             }
    516 
    517             // Limit hardware decoders to one A/V stream
    518             switch ((*es).Type)
    519             {
    520                 case ES_TYPE_VIDEO_MPEG1:
    521                 case ES_TYPE_VIDEO_MPEG2:
    522                     if (flagged.contains(ES_TYPE_VIDEO_MPEG1) ||
    523                         flagged.contains(ES_TYPE_VIDEO_MPEG2))
    524                     {
    525                         continue; // Ignore this stream
    526                     }
    527                     break;
    528 
    529                 case ES_TYPE_AUDIO_MPEG1:
    530                 case ES_TYPE_AUDIO_MPEG2:
    531                 case ES_TYPE_AUDIO_AC3:
    532                 case ES_TYPE_AUDIO_DTS:
    533                 case ES_TYPE_AUDIO_AAC:
    534                     if (flagged.contains(ES_TYPE_AUDIO_MPEG1) ||
    535                         flagged.contains(ES_TYPE_AUDIO_MPEG2) ||
    536                         flagged.contains(ES_TYPE_AUDIO_AC3)   ||
    537                         flagged.contains(ES_TYPE_AUDIO_DTS)   ||
    538                         flagged.contains(ES_TYPE_AUDIO_AAC))
    539                     {
    540                         continue; // Ignore this stream
    541                     }
    542                     break;
    543 
    544                 default:
    545                     break;
    546             }
    547             if (Languages.isEmpty() || // No specific language wanted
    548                 (*es).Language.isEmpty() || // Component has no language
    549                 Languages.contains((*es).Language)) // This language is wanted!
    550             {
    551                 (*es).Record = true;
    552                 flagged[(*es).Type] = true;
    553             }
    554         }
    555     }
    556 
    557     for (es = _input_pmt.Components.begin();
    558          es != _input_pmt.Components.end(); ++es)
    559     {
    560         if ((*es).Record)
    561         {
    562             VERBOSE(VB_RECORD, LOC +
    563                     QString("AutoPID selecting PID 0x%1, %2")
    564                     .arg((*es).PID,0,16).arg((*es).Description));
    565 
    566             switch ((*es).Type)
    567             {
    568                 case ES_TYPE_VIDEO_MPEG1:
    569                 case ES_TYPE_VIDEO_MPEG2:
    570                     _videoPID[(*es).PID] = true;
    571                     break;
    572 
    573                 default:
    574                     // Do nothing
    575                     break;
    576             }
    577         }
    578         else
    579         {
    580             VERBOSE(VB_RECORD, LOC +
    581                     QString("AutoPID skipping PID 0x%1, %2")
    582                     .arg((*es).PID,0,16).arg((*es).Description));
    583         }
    584     }
    585 
    586     VERBOSE(VB_RECORD, LOC + "AutoPID Complete - PAT/PMT Loaded for service");
    587 
    588     QString msg = (_input_pmt.FTA()) ? "unencrypted" : "ENCRYPTED";
    589     VERBOSE(VB_RECORD, LOC + "A/V Stream is " + msg);
    590 }
    591 
    592413void DVBRecorder::StartRecording(void)
    593414{
    594415    if (!Open())
     
    608429    SetPMT(NULL);
    609430    _ts_packets_until_psip_sync = 0;
    610431
    611     MythTimer t;
    612     t.start();
     432#ifdef USE_DRB
     433    bool ok = _drb->Setup(videodevice, _stream_fd);
     434    if (!ok)
     435    {
     436        VERBOSE(VB_IMPORTANT, LOC_ERR + "Failed to allocate DRB buffer");
     437        Close();
     438        _error = true;
     439        return;
     440    }
     441    _drb->Start();
     442#else
     443    _poll_timer.start();
     444#endif // USE_DRB
     445
    613446    while (_request_recording && !_error)
    614447    {
    615448        if (PauseAndWait())
    616449            continue;
    617450
    618         if (_stream_fd < 0)
     451        if (!IsOpen())
    619452        {
    620             usleep(50);
     453            usleep(5000);
    621454            continue;
    622455        }
    623456
    624457        if (_reset_pid_filters)
    625458        {
     459            _reset_pid_filters = false;
    626460            VERBOSE(VB_RECORD, LOC + "Resetting Demux Filters");
    627             if (SetDemuxFilters())
     461            if (OpenFilters())
    628462            {
    629463                CreatePAT();
    630464                CreatePMT();
    631465            }
    632             _reset_pid_filters = false;
    633466        }
    634467
    635         int ret;
    636         do
    637             ret = poll(&_polls, 1, POLL_INTERVAL);
    638         while (!request_pause && (_stream_fd >= 0) &&
    639                ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno))));
     468        if (Poll())
     469        {
     470#ifndef USE_DRB
     471            ssize_t len = safe_read(_stream_fd, _buffer, _buffer_size);
     472#else
     473            ssize_t len = _drb->Read(_buffer, _buffer_size);
     474#endif // !USE_DRB
     475            if (len > 0)
     476                ProcessDataTS(_buffer, len);
     477        }
    640478
    641         if (request_pause || _stream_fd < 0)
    642             continue;
    643 
    644         if (ret == 0 && t.elapsed() > POLL_WARNING_TIMEOUT)
     479        // Check for DRB errors
     480        if (_drb->IsErrored())
    645481        {
    646             VERBOSE(VB_GENERAL, LOC_WARN +
    647                     QString("No data from card in %1 milliseconds.")
    648                     .arg(t.elapsed()));
     482            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device error detected");
     483            _error = true;
    649484        }
    650         else if (ret == 1 && _polls.revents & POLLIN)
     485
     486        if (_drb->IsEOF())
    651487        {
    652             int msec = t.restart();
    653             if (msec >= POLL_WARNING_TIMEOUT)
    654             {
    655                 VERBOSE(VB_IMPORTANT, LOC_WARN +
    656                         QString("Got data from card after %1 ms. (>%2)")
    657                         .arg(msec).arg(POLL_WARNING_TIMEOUT));
    658             }
    659 
    660             ReadFromDMX();
    661             if (t.elapsed() >= 20)
    662             {
    663                 VERBOSE(VB_RECORD, LOC_WARN +
    664                         QString("ReadFromDMX took %1 ms").arg(t.elapsed()));
    665             }
     488            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device EOF detected");
     489            _error = true;
    666490        }
    667         else if ((ret < 0) || (ret == 1 && _polls.revents & POLLERR))
    668             VERBOSE(VB_IMPORTANT, LOC_ERR +
    669                     "Poll failed while waiting for data." + ENO);
    670491    }
    671492
     493#ifdef USE_DRB
     494    if (_drb && _drb->IsRunning())
     495        _drb->Stop();
     496#endif       
     497
    672498    Close();
    673499
    674500    FinishRecording();
     
    676502    _recording = false;
    677503}
    678504
     505#if 0
    679506void DVBRecorder::ReadFromDMX(void)
    680507{
    681508    int cardnum = _card_number_option;
     
    958785
    959786    ringBuffer->Write(buffer,len);
    960787}
     788#endif
    961789
    962790void DVBRecorder::Reset(void)
    963791{
     
    1064892    SetPMT(pmt);
    1065893}
    1066894
     895void DVBRecorder::WritePATPMT(void)
     896{
     897    if (_ts_packets_until_psip_sync <= 0)
     898    {
     899        QMutexLocker read_lock(&_pid_lock);
     900        uint next_cc;
     901        if (_pat && _pmt && ringBuffer)
     902        {
     903            next_cc = (_pat->tsheader()->ContinuityCounter()+1)&0xf;
     904            _pat->tsheader()->SetContinuityCounter(next_cc);
     905            ringBuffer->Write(_pat->tsheader()->data(), TSPacket::SIZE);
     906
     907            next_cc = (_pmt->tsheader()->ContinuityCounter()+1)&0xf;
     908            _pmt->tsheader()->SetContinuityCounter(next_cc);
     909            ringBuffer->Write(_pmt->tsheader()->data(), TSPacket::SIZE);
     910
     911            _ts_packets_until_psip_sync = TSPACKETS_BETWEEN_PSIP_SYNC;
     912        }
     913    }
     914    else
     915        _ts_packets_until_psip_sync--;
     916}
     917
     918void DVBRecorder::StopRecording(void)
     919{
     920#ifdef USE_DRB
     921    TVRec *rec = tvrec;
     922    tvrec = NULL; // don't notify of pause..
     923 
     924#if 0
     925    bool ok = true;
     926    if (!IsPaused())
     927    {
     928        VERBOSE(VB_IMPORTANT, "_recording: "<<_recording);
     929        if (_recording)
     930        {
     931            Pause();
     932            ok = WaitForPause(250);
     933        }
     934        else if (_drb && !_drb->IsPaused() && _drb->IsRunning())
     935        {
     936            VERBOSE(VB_IMPORTANT, "pausing DeviceReadBuffer");
     937            _drb->SetRequestPause(true);
     938            MythTimer t;
     939            t.start();
     940            while (!_drb->IsPaused() && t.elapsed() < 250);
     941            ok = _drb->IsPaused();           
     942        }
     943    }
     944#endif
     945
     946    _request_recording = false;
     947 
     948    if (_drb && _drb->IsRunning())
     949        _drb->Stop();
     950
     951    while (_recording)
     952        usleep(2000);
     953
     954    tvrec = rec;
     955#else
     956    DTVRecorder::StopRecording();
     957#endif
     958}
     959
     960void DVBRecorder::ReaderPaused(int /*fd*/)
     961{
     962#ifdef USE_DRB
     963    pauseWait.wakeAll();
     964    if (tvrec)
     965        tvrec->RecorderPaused();
     966#endif // USE_DRB
     967}
     968
     969bool DVBRecorder::PauseAndWait(int timeout)
     970{
     971#ifdef USE_DRB
     972    if (request_pause)
     973    {
     974        paused = true;
     975        if (!_drb->IsPaused())
     976            _drb->SetRequestPause(true);
     977
     978        unpauseWait.wait(timeout);
     979    }
     980    else if (_drb->IsPaused())
     981    {
     982        _drb->SetRequestPause(false);
     983        _drb->WaitForUnpause(timeout);
     984        paused = _drb->IsPaused();
     985    }
     986    else
     987    {
     988        paused = false;
     989    }
     990    return paused;
     991#else // if !USE_DRB
     992    return RecorderBase::PauseAndWait(timeout);
     993#endif // !USE_DRB
     994}
     995
     996uint DVBRecorder::ProcessDataTS(unsigned char *buffer, uint len)
     997{
     998    if (len % TSPacket::SIZE)
     999    {
     1000        VERBOSE(VB_IMPORTANT, LOC_ERR + "Incomplete packet received.");
     1001        len = len - (len % TSPacket::SIZE);
     1002    }
     1003    if (len < TSPacket::SIZE)
     1004        return len;
     1005
     1006    uint pos = 0;
     1007    uint end = len - TSPacket::SIZE;
     1008    while (pos <= end)
     1009    {
     1010        const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[pos]);
     1011        ProcessTSPacket(*pkt);
     1012        pos += TSPacket::SIZE;
     1013    }
     1014    return len - pos;
     1015}
     1016
     1017bool DVBRecorder::ProcessTSPacket(const TSPacket& tspacket)
     1018{
     1019    if (tspacket.TransportError())
     1020    {
     1021        VERBOSE(VB_RECORD, LOC + "Packet dropped due to uncorrectable error.");
     1022        ++_bad_packet_count;
     1023        return false; // Drop bad TS packets...
     1024    }
     1025
     1026    const uint pid = tspacket.PID();
     1027
     1028    QMutexLocker locker(&_pid_lock);
     1029    PIDInfo *info = _pid_infos[pid];
     1030    if (!info)
     1031       info = _pid_infos[pid] = new PIDInfo();
     1032
     1033    // track scrambled pids
     1034    if (tspacket.ScramplingControl())
     1035    {
     1036        if (!info->isEncrypted)
     1037        {
     1038            VERBOSE(VB_RECORD, LOC +
     1039                    QString("PID 0x%1 is encrypted, ignoring").arg(pid,0,16));
     1040            info->isEncrypted = true;
     1041        }
     1042        return true; // Drop encrypted TS packets...
     1043    }
     1044    else if (info->isEncrypted)
     1045    {
     1046        VERBOSE(VB_RECORD, LOC +
     1047                QString("PID 0x%1 is no longer encrypted").arg(pid,0,16));
     1048        info->isEncrypted = false;
     1049    }
     1050
     1051    // Check continuity counter
     1052    if (tspacket.HasPayload())
     1053    {
     1054        if (!info->CheckCC(tspacket.ContinuityCounter()))
     1055        {
     1056            VERBOSE(VB_RECORD, LOC +
     1057                    QString("PID 0x%1 discontinuity detected").arg(pid,0,16));
     1058            _continuity_error_count++;
     1059        }
     1060    }
     1061
     1062    // handle legacy PES recording mode
     1063    if (!_record_transport_stream_option)
     1064    {
     1065        // handle PS recording
     1066
     1067        ipack *ip = info->ip;
     1068        if (ip == NULL)
     1069            return true;
     1070
     1071        ip->ps = 1;
     1072
     1073        if (tspacket.PayloadStart() && (ip->plength == MMAX_PLENGTH-6))
     1074        {
     1075            ip->plength = ip->found-6;
     1076            ip->found = 0;
     1077            send_ipack(ip);
     1078            reset_ipack(ip);
     1079        }
     1080
     1081        uint afc_offset = tspacket.AFCOffset();
     1082        if (afc_offset > 187)
     1083            return true;
     1084
     1085        instant_repack((uint8_t*)tspacket.data() + afc_offset,
     1086                       TSPacket::SIZE - afc_offset, ip);
     1087        return true;
     1088    }
     1089
     1090    // handle TS recording
     1091
     1092    // Check for keyframes and count frames
     1093    if (info->isVideo)
     1094        FindKeyframes(&tspacket);
     1095
     1096    // Sync recording start to first keyframe
     1097    if (_wait_for_keyframe_option && !_keyframe_seen)
     1098        return true;
     1099
     1100    // Sync streams to the first Payload Unit Start Indicator
     1101    // _after_ first keyframe iff _wait_for_keyframe_option is true
     1102    if (!info->payloadStartSeen)
     1103    {
     1104        if (!tspacket.PayloadStart())
     1105            return true; // not payload start - drop packet
     1106
     1107        VERBOSE(VB_RECORD,
     1108                QString("PID 0x%1 Found Payload Start").arg(pid,0,16));
     1109        info->payloadStartSeen = true;
     1110    }
     1111
     1112    // Write PAT & PMT tables occasionally
     1113    WritePATPMT();
     1114
     1115    // Write Data
     1116    if (ringBuffer)
     1117        ringBuffer->Write(tspacket.data(), TSPacket::SIZE);
     1118    return true;
     1119}
     1120
     1121////////////////////////////////////////////////////////////
     1122// Stuff below this comment will be phased out after 0.20 //
     1123////////////////////////////////////////////////////////////
     1124
     1125static void print_pmt_info(
     1126    QString pre, const QValueList<ElementaryPIDObject> &pmt_list, bool fta)
     1127{
     1128    if (!(print_verbose_messages|VB_RECORD))
     1129        return;
     1130
     1131    QValueList<ElementaryPIDObject>::const_iterator it;
     1132    for (it = pmt_list.begin(); it != pmt_list.end(); ++it)
     1133    {
     1134        VERBOSE(VB_RECORD, pre +
     1135                QString("AutoPID %1 PID 0x%2, %3")
     1136                .arg(((*it).Record) ? "recording" : "skipping")
     1137                .arg((*it).PID,0,16).arg((*it).Description));
     1138    }
     1139
     1140    VERBOSE(VB_RECORD, pre + "AutoPID Complete - PAT/PMT Loaded for service\n"
     1141            "\t\t\tA/V Streams are " + ((fta ? "unencrypted" : "ENCRYPTED")));
     1142}
     1143
     1144/** \fn DVBRecorder::AutoPID(void)
     1145 *  \brief Process PMT and decide which components should be recorded
     1146 *
     1147 *   This is particularly for hardware decoders which don't
     1148 *   like to see more than one audio or one video stream.
     1149 *
     1150 *   When the hardware decoder is not specified all components
     1151 *   are recorded.
     1152 */
     1153void DVBRecorder::AutoPID(void)
     1154{
     1155    QMutexLocker change_lock(&_pid_lock);
     1156
     1157    VERBOSE(VB_RECORD, LOC +
     1158            QString("AutoPID for MPEG Program Number(%1), PCR PID(0x%2)")
     1159            .arg(_input_pmt.ServiceID).arg(((uint)_input_pmt.PCRPID),0,16));
     1160
     1161    // If this is not for a hardware decoder, record everything.
     1162    if (!_hw_decoder_option)
     1163    {
     1164        QValueList<ElementaryPIDObject> &pmt_list = _input_pmt.Components;
     1165
     1166        QValueList<ElementaryPIDObject>::iterator it;
     1167        for (it = pmt_list.begin(); it != pmt_list.end(); ++it)
     1168            (*it).Record = true;
     1169
     1170        print_pmt_info(LOC, pmt_list, _input_pmt.FTA());
     1171        return;
     1172    }
     1173
     1174    // Wanted stream types:
     1175    QValueList<ES_Type> StreamTypes;
     1176    StreamTypes += ES_TYPE_VIDEO_MPEG1;
     1177    StreamTypes += ES_TYPE_VIDEO_MPEG2;
     1178    StreamTypes += ES_TYPE_AUDIO_MPEG1;
     1179    StreamTypes += ES_TYPE_AUDIO_MPEG2;
     1180    StreamTypes += ES_TYPE_AUDIO_AC3;
     1181    if (_record_transport_stream_option)
     1182    {
     1183        // The following types are only supported with TS recording
     1184        StreamTypes += ES_TYPE_AUDIO_DTS;
     1185        StreamTypes += ES_TYPE_AUDIO_AAC;
     1186        StreamTypes += ES_TYPE_TELETEXT;
     1187        StreamTypes += ES_TYPE_SUBTITLE;
     1188    }
     1189
     1190    QMap<ES_Type, bool> flagged;
     1191    // Wanted languages:
     1192    QStringList Languages = iso639_get_language_list();
     1193
     1194    QValueList<ElementaryPIDObject>::iterator es;
     1195    for (es = _input_pmt.Components.begin();
     1196         es != _input_pmt.Components.end(); ++es)
     1197    {
     1198        if (!StreamTypes.contains((*es).Type))
     1199        {
     1200            // Type not wanted
     1201            continue;
     1202        }
     1203
     1204        if (((*es).Type == ES_TYPE_AUDIO_MPEG1) ||
     1205            ((*es).Type == ES_TYPE_AUDIO_MPEG2) ||
     1206            ((*es).Type == ES_TYPE_AUDIO_AC3))
     1207        {
     1208            bool ignore = false;
     1209            // Check for audio descriptors
     1210            DescriptorList::Iterator dit;
     1211            for (dit = (*es).Descriptors.begin();
     1212                 dit != (*es).Descriptors.end(); ++dit)
     1213            {
     1214                // Check for "Hearing impaired" or
     1215                // "Visual impaired commentary" stream
     1216                if (((*dit).Data[0] == 0x0A) &&
     1217                    ((*dit).Data[5] & 0xFE == 0x02))
     1218                {
     1219                    ignore = true;
     1220                    break;
     1221                }
     1222            }
     1223
     1224            if (ignore)
     1225                continue; // Ignore this stream
     1226        }
     1227
     1228        // Limit hardware decoders to one A/V stream
     1229        switch ((*es).Type)
     1230        {
     1231            case ES_TYPE_VIDEO_MPEG1:
     1232            case ES_TYPE_VIDEO_MPEG2:
     1233                if (flagged.contains(ES_TYPE_VIDEO_MPEG1) ||
     1234                    flagged.contains(ES_TYPE_VIDEO_MPEG2))
     1235                {
     1236                    continue; // Ignore this stream
     1237                }
     1238                break;
     1239
     1240            case ES_TYPE_AUDIO_MPEG1:
     1241            case ES_TYPE_AUDIO_MPEG2:
     1242            case ES_TYPE_AUDIO_AC3:
     1243            case ES_TYPE_AUDIO_DTS:
     1244            case ES_TYPE_AUDIO_AAC:
     1245                if (flagged.contains(ES_TYPE_AUDIO_MPEG1) ||
     1246                    flagged.contains(ES_TYPE_AUDIO_MPEG2) ||
     1247                    flagged.contains(ES_TYPE_AUDIO_AC3)   ||
     1248                    flagged.contains(ES_TYPE_AUDIO_DTS)   ||
     1249                    flagged.contains(ES_TYPE_AUDIO_AAC))
     1250                {
     1251                    continue; // Ignore this stream
     1252                }
     1253                break;
     1254
     1255            default:
     1256                break;
     1257        }
     1258        if (Languages.isEmpty() || // No specific language wanted
     1259            (*es).Language.isEmpty() || // Component has no language
     1260            Languages.contains((*es).Language)) // This language is wanted!
     1261        {
     1262            (*es).Record = true;
     1263            flagged[(*es).Type] = true;
     1264        }
     1265    }
     1266
     1267    print_pmt_info(LOC, _input_pmt.Components, _input_pmt.FTA());
     1268}
     1269
     1270bool DVBRecorder::Poll(void) const
     1271{
     1272#ifndef USE_DRB
     1273    struct pollfd polls;
     1274    polls.fd      = _stream_fd;
     1275    polls.events  = POLLIN;
     1276    polls.revents = 0;
     1277
     1278    int ret;
     1279    do
     1280        ret = poll(&polls, 1, POLL_INTERVAL);
     1281    while (!request_pause && IsOpen() &&
     1282           ((-1 == ret) && ((EAGAIN == errno) || (EINTR == errno))));
     1283
     1284    if (request_pause || !IsOpen())
     1285        return false;
     1286
     1287    if (ret > 0 && polls.revents & POLLIN)
     1288    {
     1289        if (_poll_timer.elapsed() >= POLL_WARNING_TIMEOUT)
     1290        {
     1291            VERBOSE(VB_IMPORTANT, LOC_WARN +
     1292                    QString("Got data from card after %1 ms. (>%2)")
     1293                    .arg(_poll_timer.elapsed()).arg(POLL_WARNING_TIMEOUT));
     1294        }
     1295        _poll_timer.start();
     1296        return true;
     1297    }
     1298
     1299    if (ret == 0 && _poll_timer.elapsed() > POLL_WARNING_TIMEOUT)
     1300    {
     1301        VERBOSE(VB_GENERAL, LOC_WARN +
     1302                QString("No data from card in %1 ms.")
     1303                .arg(_poll_timer.elapsed()));
     1304    }
     1305
     1306    if (ret < 0 && (EOVERFLOW == errno))
     1307    {
     1308        _stream_overflow_count++;
     1309        VERBOSE(VB_RECORD, LOC_ERR + "Driver buffer overflow detected.");
     1310    }
     1311
     1312    if ((ret < 0) || (ret > 0 && polls.revents & POLLERR))
     1313    {
     1314        VERBOSE(VB_IMPORTANT, LOC_ERR +
     1315                "Poll failed while waiting for data." + ENO);
     1316    }
     1317
     1318    return false;
     1319#else // if USE_DRB
     1320    return true;
     1321#endif // USE_DRB
     1322}
     1323
     1324void DVBRecorder::ProcessDataPS(unsigned char *buffer, uint len)
     1325{
     1326    if (len < 4)
     1327        return;
     1328
     1329    if (buffer[0] != 0x00 || buffer[1] != 0x00 || buffer[2] != 0x01)
     1330    {
     1331        if (!_wait_for_keyframe_option || _keyframe_seen)
     1332            ringBuffer->Write(buffer, len);
     1333        return;
     1334    }
     1335
     1336    uint stream_id = buffer[3];
     1337    if ((stream_id >= PESStreamID::MPEGVideoStreamBegin) &&
     1338        (stream_id <= PESStreamID::MPEGVideoStreamEnd))
     1339    {
     1340        uint pos = 8 + buffer[8];
     1341        uint datalen = len - pos;
     1342
     1343        unsigned char *bufptr = &buffer[pos];
     1344        uint state = 0xFFFFFFFF;
     1345        uint state_byte = 0;
     1346        int prvcount = -1;
     1347
     1348        while (bufptr < &buffer[pos] + datalen)
     1349        {
     1350            state_byte  = (++prvcount < 3) ? _ps_rec_buf[prvcount] : *bufptr++;
     1351            uint last   = state;
     1352            state       = ((state << 8) | state_byte) & 0xFFFFFF;
     1353            stream_id   = state_byte;
     1354
     1355            // Skip non-prefixed stream id's and skip slice PES stream id's
     1356            if ((last != 0x000001) ||
     1357                ((stream_id >= PESStreamID::SliceStartCodeBegin) &&
     1358                 (stream_id <= PESStreamID::SliceStartCodeEnd)))
     1359            {
     1360                continue;
     1361            }
     1362
     1363            // Now process the stream id's we care about
     1364            if (PESStreamID::PictureStartCode == stream_id)
     1365                _frames_written_count++;
     1366            else if (PESStreamID::SequenceStartCode == stream_id)
     1367                _keyframe_seen = true;
     1368            else if (PESStreamID::GOPStartCode == stream_id)
     1369            {
     1370                _position_map_lock.lock();
     1371                bool save_map = false;
     1372                if (!_position_map.contains(_frames_written_count))
     1373                {
     1374                    long long startpos = ringBuffer->GetWritePosition();
     1375                    _position_map_delta[_frames_written_count] = startpos;
     1376                    _position_map[_frames_written_count] = startpos;
     1377                    save_map = true;
     1378                }
     1379                _position_map_lock.unlock();
     1380                if (save_map)
     1381                    SavePositionMap(false);
     1382            }
     1383        }
     1384    }
     1385    memcpy(_ps_rec_buf, &buffer[len - 3], 3);
     1386
     1387    if (!_wait_for_keyframe_option || _keyframe_seen)
     1388        ringBuffer->Write(buffer, len);
     1389}
     1390
     1391void DVBRecorder::process_data_ps_cb(unsigned char *buffer,
     1392                                     int len, void *priv)
     1393{
     1394    ((DVBRecorder*)priv)->ProcessDataPS(buffer, (uint)len);
     1395}
     1396
     1397ipack *DVBRecorder::CreateIPack(ES_Type type)
     1398{
     1399    ipack* ip = (ipack*)malloc(sizeof(ipack));
     1400    assert(ip);
     1401    switch (type)
     1402    {
     1403        case ES_TYPE_VIDEO_MPEG1:
     1404        case ES_TYPE_VIDEO_MPEG2:
     1405            init_ipack(ip, 2048, process_data_ps_cb);
     1406            ip->replaceid = _ps_rec_video_id++;
     1407            break;
     1408
     1409        case ES_TYPE_AUDIO_MPEG1:
     1410        case ES_TYPE_AUDIO_MPEG2:
     1411            init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */
     1412            ip->replaceid = _ps_rec_audio_id++;
     1413            break;
     1414
     1415        case ES_TYPE_AUDIO_AC3:
     1416            init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */
     1417            ip->priv_type = PRIV_TS_AC3;
     1418            break;
     1419
     1420        case ES_TYPE_SUBTITLE:
     1421        case ES_TYPE_TELETEXT:
     1422            init_ipack(ip, 65535, process_data_ps_cb); /* don't repack PES */
     1423            ip->priv_type = PRIV_DVB_SUB;
     1424            break;
     1425
     1426        default:
     1427            init_ipack(ip, 2048, process_data_ps_cb);
     1428            break;
     1429    }
     1430    ip->data = (void*)this;
     1431    return ip;
     1432}
     1433
     1434#ifndef USE_DRB
     1435static ssize_t safe_read(int fd, unsigned char *buf, size_t bufsize)
     1436{
     1437    ssize_t size = read(fd, buf, bufsize);
     1438
     1439    if ((size < 0) &&
     1440        (errno != EAGAIN) && (errno != EOVERFLOW) && (EINTR != errno))
     1441    {
     1442        VERBOSE(VB_IMPORTANT, "DVB:safe_read(): "
     1443                "Error reading from DVB device." + ENO);
     1444    }
     1445
     1446    return size;
     1447}
     1448#endif // USE_DRB
     1449
    10671450void DVBRecorder::DebugTSHeader(unsigned char* buffer, int len)
    10681451{
    10691452    (void) len;
  • libs/libmythtv/DeviceReadBuffer.h

     
     1// -*- Mode: c++ -*-
     2/* Device Buffer written by John Poet */
     3
     4#ifndef _DEVICEREADBUFFER_H_
     5#define _DEVICEREADBUFFER_H_
     6
     7#include <unistd.h>
     8#include <pthread.h>
     9#include <sys/poll.h>
     10
     11#include <qmutex.h>
     12#include <qwaitcondition.h>
     13#include <qstring.h>
     14
     15#include "util.h"
     16
     17class ReaderPausedCB
     18{
     19  public:
     20    virtual void ReaderPaused(int fd) = 0;
     21};
     22
     23/** \class DeviceReadBuffer
     24 *  \brief Buffers reads from device files.
     25 *
     26 *  This allows us to read the device regularly even in the presence
     27 *  of long blocking conditions on writing to disk or accessing the
     28 *  database.
     29 */
     30class DeviceReadBuffer
     31{
     32  public:
     33    DeviceReadBuffer(ReaderPausedCB *callback, bool use_poll = true);
     34   ~DeviceReadBuffer();
     35
     36    bool Setup(const QString &streamName, int streamfd);
     37
     38    void Start(void);
     39    void Reset(const QString &streamName, int streamfd);
     40    void Stop(void);
     41
     42    void SetRequestPause(bool request);
     43    bool IsPaused(void) const;
     44    bool WaitForUnpause(int timeout);
     45   
     46    bool IsErrored(void) const { return error; }
     47    bool IsEOF(void)     const { return eof;   }
     48    bool IsRunning(void) const;
     49
     50    uint Read(unsigned char *buf, uint count);
     51
     52  private:
     53    static void *boot_ringbuffer(void *);
     54    void fill_ringbuffer(void);
     55
     56    void SetPaused(bool);
     57    void IncrWritePointer(uint len);
     58    void IncrReadPointer(uint len);
     59
     60    bool HandlePausing(void);
     61    bool Poll(void) const;
     62    uint WaitForUnused(uint bytes_needed) const;
     63    uint WaitForUsed  (uint bytes_needed) const;
     64
     65    bool IsPauseRequested(void) const;
     66    bool IsOpen(void) const { return _stream_fd >= 0; }
     67    uint GetUnused(void) const;
     68    uint GetUsed(void) const;
     69    uint GetContiguousUnused(void) const;
     70
     71    bool CheckForErrors(ssize_t read_len, uint &err_cnt);
     72    void ReportStats(void);
     73
     74    QString          videodevice;
     75    int              _stream_fd;
     76
     77    ReaderPausedCB  *readerPausedCB;
     78    pthread_t        thread;
     79
     80    // Data for managing the device ringbuffer
     81    mutable QMutex   lock;
     82    bool             run;
     83    bool             running;
     84    bool             eof;
     85    bool             error;
     86    bool             request_pause;
     87    bool             paused;
     88    bool             using_poll;
     89
     90    size_t           size;
     91    size_t           used;
     92    size_t           dev_read_size;
     93    size_t           min_read;
     94    unsigned char   *buffer;
     95    unsigned char   *readPtr;
     96    unsigned char   *writePtr;
     97    unsigned char   *endPtr;
     98
     99    QWaitCondition   pauseWait;
     100    QWaitCondition   unpauseWait;
     101
     102    // statistics
     103    size_t           max_used;
     104    size_t           avg_used;
     105    size_t           avg_cnt;
     106    MythTimer        lastReport;
     107};
     108
     109#endif // _DEVICEREADBUFFER_H_
  • libs/libmythtv/hdtvrecorder.h

     
    33 *  HDTVRecorder
    44 *  Copyright (c) 2003-2004 by Brandon Beattie, Doug Larrick,
    55 *    Jason Hoos, and Daniel Thor Kristjansson
    6  *  Device ringbuffer added by John Poet
    76 *  Distributed as part of MythTV under GPL v2 and later.
    87 */
    98
     
    1211
    1312#include "dtvrecorder.h"
    1413#include "tsstats.h"
     14#include "DeviceReadBuffer.h"
    1515
    1616struct AVFormatContext;
    1717struct AVPacket;
     
    2828 *
    2929 *  \sa DTVRecorder, DVBRecorder
    3030 */
    31 class HDTVRecorder : public DTVRecorder
     31class HDTVRecorder : public DTVRecorder, private ReaderPausedCB
    3232{
    3333    Q_OBJECT
    3434    friend class ATSCStreamData;
    3535    friend class TSPacketProcessor;
    3636  public:
    37     enum {report_loops = 20000};
    38 
    3937    HDTVRecorder(TVRec *rec);
    4038   ~HDTVRecorder();
    4139
     
    4745    void StartRecording(void);
    4846    void StopRecording(void);
    4947
    50     void Pause(bool clear = false);
    51     bool IsPaused(void) const;
    52 
    5348    void Reset(void);
    5449
    5550    bool Open(void);
     
    6156    void deleteLater(void);
    6257
    6358  private:
     59    bool IsOpen(void) const { return _stream_fd >= 0; }
     60    bool Close(void);
     61
    6462    void TeardownAll(void);
    65     int ProcessData(unsigned char *buffer, int len);
     63    uint ProcessDataTS(unsigned char *buffer, uint len);
    6664    bool ProcessTSPacket(const TSPacket& tspacket);
    6765    void HandleVideo(const TSPacket* tspacket);
    6866    void HandleAudio(const TSPacket* tspacket);
    6967
    7068    int ResyncStream(unsigned char *buffer, int curr_pos, int len);
    7169
    72     static void *boot_ringbuffer(void *);
    73     void fill_ringbuffer(void);
    74     int ringbuf_read(unsigned char *buffer, size_t count);
     70    void ReaderPaused(int fd);
     71    bool PauseAndWait(int timeout = 100);
    7572
    76  private slots:
     73    bool readchan(int chanfd, unsigned char* buffer, int dlen);
     74    bool syncchan(int chanfd, int dlen, int keepsync);
     75
     76  private slots:
    7777    void WritePAT(ProgramAssociationTable*);
    7878    void WritePMT(ProgramMapTable*);
    7979    void ProcessMGT(const MasterGuideTable*);
    8080    void ProcessVCT(uint, const VirtualChannelTable*);
    81  private:
    82     ATSCStreamData* _atsc_stream_data;
    8381
     82  private:
     83    ATSCStreamData   *_atsc_stream_data;
     84    DeviceReadBuffer *_drb;
     85
    8486    // statistics
    85     TSStats _ts_stats;
    86     long long _resync_count;
    87     size_t loop;
     87    TSStats           _ts_stats;
     88    long long         _resync_count;
    8889
    89     // Data for managing the device ringbuffer
    90     struct {
    91         pthread_t        thread;
    92         mutable pthread_mutex_t lock;
    93         mutable pthread_mutex_t lock_stats;
    94 
    95         bool             run;
    96         bool             eof;
    97         bool             error;
    98         bool             request_pause;
    99         bool             paused;
    100         size_t           size;
    101         size_t           used;
    102         size_t           max_used;
    103         size_t           avg_used;
    104         size_t           avg_cnt;
    105         size_t           dev_read_size;
    106         size_t           min_read;
    107         unsigned char  * buffer;
    108         unsigned char  * readPtr;
    109         unsigned char  * writePtr;
    110         unsigned char  * endPtr;
    111     } ringbuf;
     90    /// unsynced packets to look at before giving up initially
     91    static const uint INIT_SYNC_WINDOW_SIZE;
     92    /// synced packets to require before starting recording
     93    static const uint INIT_MIN_NUM_SYNC_PACKETS;
    11294};
    11395
    11496#endif
  • libs/libmythtv/hdtvrecorder.cpp

     
    8484#include "atsctables.h"
    8585#include "atscstreamdata.h"
    8686#include "tv_rec.h"
     87#include "DeviceReadBuffer.h"
    8788
    8889// AVLib/FFMPEG includes
    8990#include "../libavcodec/avcodec.h"
    9091#include "../libavformat/avformat.h"
    9192#include "../libavformat/mpegts.h"
    9293
    93 #define REPORT_RING_STATS 1
     94#define LOC QString("HDTVRec(%1):").arg(videodevice)
     95#define LOC_ERR QString("HDTVRec(%1) Error:").arg(videodevice)
    9496
    9597#define DEFAULT_SUBCHANNEL 1
    9698
     
    109111        };
    110112#endif
    111113
     114const uint HDTVRecorder::INIT_SYNC_WINDOW_SIZE     = 50;
     115const uint HDTVRecorder::INIT_MIN_NUM_SYNC_PACKETS = 10;
     116
    112117HDTVRecorder::HDTVRecorder(TVRec *rec)
    113118    : DTVRecorder(rec, "HDTVRecorder"), _atsc_stream_data(0), _resync_count(0)
    114119{
     
    116121    connect(_atsc_stream_data, SIGNAL(UpdatePATSingleProgram(
    117122                                          ProgramAssociationTable*)),
    118123            this, SLOT(WritePAT(ProgramAssociationTable*)));
    119     connect(_atsc_stream_data, SIGNAL(UpdatePMTSingleProgram(ProgramMapTable*)),
    120             this, SLOT(WritePMT(ProgramMapTable*)));
     124    connect(_atsc_stream_data,
     125            SIGNAL(UpdatePMTSingleProgram(ProgramMapTable*)),
     126            this,
     127            SLOT(WritePMT(ProgramMapTable*)));
    121128    connect(_atsc_stream_data, SIGNAL(UpdateMGT(const MasterGuideTable*)),
    122129            this, SLOT(ProcessMGT(const MasterGuideTable*)));
    123130    connect(_atsc_stream_data,
     
    125132            this, SLOT(ProcessVCT(uint, const VirtualChannelTable*)));
    126133
    127134    _buffer_size = TSPacket::SIZE * 1500;
    128     if ((_buffer = new unsigned char[_buffer_size])) {
    129         // make valgrind happy, initialize buffer memory
     135    _buffer = new unsigned char[_buffer_size];
     136
     137    // make valgrind happy, initialize buffer memory
     138    if (_buffer)
    130139        memset(_buffer, 0xFF, _buffer_size);
    131     }
    132140
    133     VERBOSE(VB_RECORD, QString("HD buffer size %1 KB").arg(_buffer_size/1024));
     141    VERBOSE(VB_RECORD, LOC +
     142            QString("buffer size %1 KB").arg(_buffer_size/1024));
    134143
    135     ringbuf.run = false;
    136     ringbuf.buffer = 0;
    137     pthread_mutex_init(&ringbuf.lock, NULL);
    138     pthread_mutex_init(&ringbuf.lock_stats, NULL);
    139     loop = random() % (report_loops / 2);
     144    _drb = new DeviceReadBuffer(this);
    140145}
    141146
    142147void HDTVRecorder::TeardownAll(void)
    143148{
    144     // Make SURE that the ringbuffer thread is cleaned up
     149    // Make SURE that the device read thread is cleaned up -- John Poet
    145150    StopRecording();
    146151
    147     if (_stream_fd >= 0)
    148     {
    149         close(_stream_fd);
    150         _stream_fd = -1;
    151     }
     152    Close();
     153
    152154    if (_atsc_stream_data)
    153155    {
    154156        delete _atsc_stream_data;
     
    164166HDTVRecorder::~HDTVRecorder()
    165167{
    166168    TeardownAll();
    167     pthread_mutex_destroy(&ringbuf.lock);
    168     pthread_mutex_destroy(&ringbuf.lock_stats);
     169    delete _drb;
    169170}
    170171
    171172void HDTVRecorder::deleteLater(void)
     
    189190    SetOption("vbiformat", gContext->GetSetting("VbiFormat"));
    190191}
    191192
    192 bool HDTVRecorder::Open()
     193bool HDTVRecorder::Open(void)
    193194{
    194195    if (!_atsc_stream_data || !_buffer)
    195196        return false;
    196197
    197198#if FAKE_VIDEO
    198199    // open file instead of device
    199     if (_stream_fd >=0 && close(_stream_fd))
    200     {
    201         VERBOSE(VB_IMPORTANT,
    202                 QString("HDTVRecorder::Open(): Error, failed to close "
    203                         "existing fd (%1)").arg(strerror(errno)));
    204         return false;
    205     }
    206200
     201    Close(); // close old video file
    207202    _stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
    208     VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
    209     fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
     203
     204    VERBOSE(VB_IMPORTANT, LOC_ERR + QString("Opened fake video source '%1'")
     205            .arg(FAKE_VIDEO_FILES[fake_video_index]) + ENO);
     206
     207    fake_video_index = (fake_video_index + 1) % FAKE_VIDEO_NUM;
     208
    210209#else
    211     if (_stream_fd <= 0)
     210    if (!IsOpen())
    212211        _stream_fd = open(videodevice.ascii(), O_RDWR);
    213212#endif
    214     if (_stream_fd <= 0)
     213
     214    if (!IsOpen())
    215215    {
    216         VERBOSE(VB_IMPORTANT, QString("Can't open video device: %1 chanfd = %2")
    217                 .arg(videodevice).arg(_stream_fd));
    218         perror("open video:");
     216        VERBOSE(VB_IMPORTANT, LOC_ERR +
     217                QString("Couldn't open video device: '%1'")
     218                .arg(videodevice) + ENO);
    219219    }
    220     return (_stream_fd>0);
     220
     221    return IsOpen();
    221222}
    222223
     224bool HDTVRecorder::Close(void)
     225{
     226    if (IsOpen() && (0 != close(_stream_fd)))
     227    {
     228        VERBOSE(VB_IMPORTANT, LOC_ERR +
     229                "Failed to close file descriptor." + ENO);
     230        return false;
     231    }
     232    _stream_fd = -1;
     233    return true;
     234}
     235
    223236void HDTVRecorder::SetStreamData(ATSCStreamData *stream_data)
    224237{
    225238    if (stream_data == _atsc_stream_data)
     
    231244        delete old_data;
    232245}
    233246
    234 bool readchan(int chanfd, unsigned char* buffer, int dlen) {
     247bool HDTVRecorder::readchan(int chanfd, unsigned char* buffer, int dlen)
     248{
    235249    int len = read(chanfd, buffer, dlen); // read next byte
    236250    if (dlen != len)
    237251    {
    238252        if (len < 0)
    239253        {
    240             VERBOSE(VB_IMPORTANT, QString("HD1 error reading from device"));
    241             perror("read");
     254            VERBOSE(VB_IMPORTANT, LOC_ERR +
     255                    "Reading from device failed" + ENO);
    242256        }
    243257        else if (len == 0)
    244             VERBOSE(VB_IMPORTANT, QString("HD2 end of file found in packet"));
     258            VERBOSE(VB_IMPORTANT, LOC_ERR + "EOF found in TS packet");
    245259        else
    246             VERBOSE(VB_IMPORTANT, QString("HD3 partial read. This shouldn't happen!"));
     260            VERBOSE(VB_IMPORTANT, LOC_ERR +
     261                    "Partial read during initial TS sync phase");
    247262    }
    248263    return (dlen == len);
    249264}
    250265
    251 bool syncchan(int chanfd, int dlen, int keepsync) {
     266bool HDTVRecorder::syncchan(int chanfd, int dlen, int keepsync)
     267{
    252268    unsigned char b[188];
    253269    int i, j;
    254     for (i=0; i<dlen; i++) {
     270    for (i=0; i<dlen; i++)
     271    {
    255272        if (!readchan(chanfd, b, 1))
    256273            break;
    257274        if (SYNC_BYTE == b[0])
    258275        {
    259             if (readchan(chanfd, &b[1], TSPacket::SIZE-1)) {
     276            if (readchan(chanfd, &b[1], TSPacket::SIZE-1))
     277            {
    260278                i += (TSPacket::SIZE - 1);
    261279                for (j=0; j<keepsync; j++)
    262280                {
     
    268286                }
    269287                if (j==keepsync)
    270288                {
    271                     VERBOSE(VB_RECORD,
    272                             QString("HD4 obtained device stream sync after reading %1 bytes").
    273                             arg(dlen));
     289                    VERBOSE(VB_RECORD, LOC + "Obtained TS sync, "+
     290                            QString("after reading %1 bytes").arg(dlen));
    274291                    return true;
    275292                }
    276293                continue;
     
    278295            break;
    279296        }
    280297    }
    281     VERBOSE(VB_IMPORTANT, QString("HD5 Error: could not obtain sync"));
     298    VERBOSE(VB_IMPORTANT, LOC_ERR + "Could not obtain TS sync");
    282299    return false;
    283300}
    284301
    285 void * HDTVRecorder::boot_ringbuffer(void * arg)
    286 {
    287     HDTVRecorder *dtv = (HDTVRecorder *)arg;
    288     dtv->fill_ringbuffer();
    289     return NULL;
    290 }
     302#define SR_CHK(MSG) \
     303    if (!ok) { VERBOSE(VB_IMPORTANT, MSG); _error = true; return; }
    291304
    292 void HDTVRecorder::fill_ringbuffer(void)
     305void HDTVRecorder::StartRecording(void)
    293306{
    294     int       errcnt = 0;
    295     int       len;
    296     size_t    unused, used;
    297     size_t    contiguous;
    298     size_t    read_size;
    299     bool      run, request_pause, paused;
     307    bool ok        = true;
     308    uint len       = 0;
     309    uint remainder = 0;
    300310
    301     pthread_mutex_lock(&ringbuf.lock);
    302     ringbuf.run = true;
    303     pthread_mutex_unlock(&ringbuf.lock);
     311    VERBOSE(VB_RECORD, LOC + "StartRecording()");
    304312
    305     for (;;)
    306     {
    307         pthread_mutex_lock(&ringbuf.lock);
    308         run = ringbuf.run;
    309         unused = ringbuf.size - ringbuf.used;
    310         request_pause = ringbuf.request_pause;
    311         paused = ringbuf.paused;
    312         pthread_mutex_unlock(&ringbuf.lock);
     313    ok = Open();
     314    SR_CHK("Failed to open device.");
    313315
    314         if (!run)
    315             break;
     316    ok = _drb->Setup(videodevice, _stream_fd);
     317    SR_CHK("Failed to allocate device read buffer.");
    316318
    317         if (request_pause)
    318         {
    319             pthread_mutex_lock(&ringbuf.lock);
    320             ringbuf.paused = true;
    321             pthread_mutex_unlock(&ringbuf.lock);
     319    ok = syncchan(_stream_fd,
     320                  INIT_SYNC_WINDOW_SIZE * TSPacket::SIZE,
     321                  INIT_MIN_NUM_SYNC_PACKETS);
     322    SR_CHK("Failed to sync to transport stream to valid packet.");
    322323
    323             pauseWait.wakeAll();
    324             if (tvrec)
    325                 tvrec->RecorderPaused();
     324    _drb->Start();
    326325
    327             usleep(1000);
    328             continue;
    329         }
    330         else if (paused)
    331         {
    332             pthread_mutex_lock(&ringbuf.lock);
    333             ringbuf.writePtr = ringbuf.readPtr = ringbuf.buffer;
    334             ringbuf.used = 0;
    335             ringbuf.paused = false;
    336             pthread_mutex_unlock(&ringbuf.lock);
    337         }
    338 
    339         contiguous = ringbuf.endPtr - ringbuf.writePtr;
    340 
    341         while (unused < TSPacket::SIZE && contiguous > TSPacket::SIZE)
    342         {
    343             usleep(500);
    344 
    345             pthread_mutex_lock(&ringbuf.lock);
    346             unused = ringbuf.size - ringbuf.used;
    347             request_pause = ringbuf.request_pause;
    348             pthread_mutex_unlock(&ringbuf.lock);
    349 
    350             if (request_pause)
    351                 break;
    352         }
    353         if (request_pause)
    354             continue;
    355 
    356         read_size = unused > contiguous ? contiguous : unused;
    357         if (read_size > ringbuf.dev_read_size)
    358             read_size = ringbuf.dev_read_size;
    359 
    360         len = read(_stream_fd, ringbuf.writePtr, read_size);
    361 
    362         if (len < 0)
    363         {
    364             if (errno == EINTR)
    365                 continue;
    366 
    367             VERBOSE(VB_IMPORTANT, QString("HD7 error reading from %1")
    368                     .arg(videodevice));
    369             perror("read");
    370             if (++errcnt > 5)
    371             {
    372                 pthread_mutex_lock(&ringbuf.lock);
    373                 ringbuf.error = true;
    374                 pthread_mutex_unlock(&ringbuf.lock);
    375 
    376                 break;
    377             }
    378 
    379             usleep(500);
    380             continue;
    381         }
    382         else if (len == 0)
    383         {
    384             if (++errcnt > 5)
    385             {
    386                 VERBOSE(VB_IMPORTANT, QString("HD8 %1 end of file found.")
    387                         .arg(videodevice));
    388 
    389                 pthread_mutex_lock(&ringbuf.lock);
    390                 ringbuf.eof = true;
    391                 pthread_mutex_unlock(&ringbuf.lock);
    392 
    393                 break;
    394             }
    395             usleep(500);
    396             continue;
    397         }
    398 
    399         errcnt = 0;
    400 
    401         pthread_mutex_lock(&ringbuf.lock);
    402         ringbuf.used += len;
    403         used = ringbuf.used;
    404         ringbuf.writePtr += len;
    405         pthread_mutex_unlock(&ringbuf.lock);
    406 
    407 #ifdef REPORT_RING_STATS
    408         pthread_mutex_lock(&ringbuf.lock_stats);
    409 
    410         if (ringbuf.max_used < used)
    411             ringbuf.max_used = used;
    412 
    413         ringbuf.avg_used = ((ringbuf.avg_used * ringbuf.avg_cnt) + used)
    414                            / ++ringbuf.avg_cnt;
    415         pthread_mutex_unlock(&ringbuf.lock_stats);
    416 #endif
    417 
    418         if (ringbuf.writePtr == ringbuf.endPtr)
    419             ringbuf.writePtr = ringbuf.buffer;
    420     }
    421 
    422     close(_stream_fd);
    423     _stream_fd = -1;
    424 }
    425 
    426 /* read count bytes from ring into buffer */
    427 int HDTVRecorder::ringbuf_read(unsigned char *buffer, size_t count)
    428 {
    429     size_t          avail;
    430     size_t          cnt = count;
    431     size_t          min_read;
    432     unsigned char  *cPtr = buffer;
    433 
    434     bool            dev_error = false;
    435     bool            dev_eof = false;
    436 
    437     pthread_mutex_lock(&ringbuf.lock);
    438     avail = ringbuf.used;
    439     pthread_mutex_unlock(&ringbuf.lock);
    440 
    441     min_read = cnt < ringbuf.min_read ? cnt : ringbuf.min_read;
    442 
    443     while (min_read > avail)
    444     {
    445         usleep(50000);
    446 
    447         if (request_pause || dev_error || dev_eof)
    448             return 0;
    449 
    450         pthread_mutex_lock(&ringbuf.lock);
    451         dev_error = ringbuf.error;
    452         dev_eof = ringbuf.eof;
    453         avail = ringbuf.used;
    454         pthread_mutex_unlock(&ringbuf.lock);
    455     }
    456     if (cnt > avail)
    457         cnt = avail;
    458 
    459     if (ringbuf.readPtr + cnt > ringbuf.endPtr)
    460     {
    461         size_t      len;
    462 
    463         // Process as two pieces
    464         len = ringbuf.endPtr - ringbuf.readPtr;
    465         memcpy(cPtr, ringbuf.readPtr, len);
    466         cPtr += len;
    467         len = cnt - len;
    468 
    469         // Wrap arround to begining of buffer
    470         ringbuf.readPtr = ringbuf.buffer;
    471         memcpy(cPtr, ringbuf.readPtr, len);
    472         ringbuf.readPtr += len;
    473     }
    474     else
    475     {
    476         memcpy(cPtr, ringbuf.readPtr, cnt);
    477         ringbuf.readPtr += cnt;
    478     }
    479 
    480     pthread_mutex_lock(&ringbuf.lock);
    481     ringbuf.used -= cnt;
    482     pthread_mutex_unlock(&ringbuf.lock);
    483 
    484     if (ringbuf.readPtr == ringbuf.endPtr)
    485         ringbuf.readPtr = ringbuf.buffer;
    486     else
    487     {
    488 #ifdef REPORT_RING_STATS
    489         size_t samples, avg, max;
    490 
    491         if (++loop == report_loops)
    492         {
    493             loop = 0;
    494             pthread_mutex_lock(&ringbuf.lock_stats);
    495             avg = ringbuf.avg_used;
    496             samples = ringbuf.avg_cnt;
    497             max = ringbuf.max_used;
    498             ringbuf.avg_used = 0;
    499             ringbuf.avg_cnt = 0;
    500             ringbuf.max_used = 0;
    501             pthread_mutex_unlock(&ringbuf.lock_stats);
    502 
    503             VERBOSE(VB_IMPORTANT, QString("%1 ringbuf avg %2% max %3%"
    504                                           " samples %4")
    505                     .arg(videodevice)
    506                     .arg((static_cast<double>(avg)
    507                           / ringbuf.size) * 100.0)
    508                     .arg((static_cast<double>(max)
    509                           / ringbuf.size) * 100.0)
    510                     .arg(samples));
    511         }
    512         else
    513 #endif
    514             usleep(25);
    515     }
    516 
    517     return cnt;
    518 }
    519 
    520 void HDTVRecorder::StartRecording(void)
    521 {
    522     bool            pause;
    523     bool            dev_error, dev_eof;
    524     int             len;
    525 
    526     const int unsyncpackets = 50; // unsynced packets to look at before giving up
    527     const int syncpackets   = 10; // synced packets to require before starting recording
    528 
    529     VERBOSE(VB_RECORD, QString("StartRecording"));
    530 
    531     if (!Open())
    532     {
    533         _error = true;       
    534         return;
    535     }
    536 
    537326    _request_recording = true;
    538     _recording = true;
     327    _recording         = true;
    539328
    540     // Setup device ringbuffer
    541     delete[] ringbuf.buffer;
    542 
    543 //    ringbuf.size = 60 * 1024 * TSPacket::SIZE;
    544     ringbuf.size = gContext->GetNumSetting("HDRingbufferSize", 50*188);
    545     ringbuf.size *= 1024;
    546 
    547     if ((ringbuf.buffer =
    548          new unsigned char[ringbuf.size + TSPacket::SIZE]) == NULL)
     329    // Process packets while recording is requested
     330    while (_request_recording && !_error)
    549331    {
    550         VERBOSE(VB_IMPORTANT, "Failed to allocate HDTVRecorder ring buffer.");
    551         _error = true;
    552         return;
    553     }
    554 
    555     memset(ringbuf.buffer, 0xFF, ringbuf.size + TSPacket::SIZE);
    556     ringbuf.endPtr = ringbuf.buffer + ringbuf.size;
    557     ringbuf.readPtr = ringbuf.writePtr = ringbuf.buffer;
    558     ringbuf.dev_read_size = TSPacket::SIZE * 48;
    559     ringbuf.min_read = TSPacket::SIZE * 4;
    560     ringbuf.used = 0;
    561     ringbuf.max_used = 0;
    562     ringbuf.avg_used = 0;
    563     ringbuf.avg_cnt = 0;
    564     ringbuf.request_pause = false;
    565     ringbuf.paused = false;
    566     ringbuf.error = false;
    567     ringbuf.eof = false;
    568 
    569     VERBOSE(VB_RECORD, QString("HD ring buffer size %1 KB")
    570             .arg(ringbuf.size/1024));
    571 
    572     // sync device stream so it starts with a valid ts packet
    573     if (!syncchan(_stream_fd, TSPacket::SIZE*unsyncpackets, syncpackets))
    574     {
    575         _error = true;
    576         return;
    577     }
    578 
    579     // create thread to fill the ringbuffer
    580     pthread_create(&ringbuf.thread, NULL, boot_ringbuffer,
    581                    reinterpret_cast<void *>(this));
    582 
    583     int remainder = 0;
    584     // TRANSFER DATA
    585     while (_request_recording)
    586     {
    587         pthread_mutex_lock(&ringbuf.lock);
    588         dev_error = ringbuf.error;
    589         dev_eof = ringbuf.eof;
    590         pause = ringbuf.paused;
    591         pthread_mutex_unlock(&ringbuf.lock);
    592 
    593         if (request_pause)
    594         {
    595             pthread_mutex_lock(&ringbuf.lock);
    596             ringbuf.request_pause = true;
    597             pthread_mutex_unlock(&ringbuf.lock);
    598 
    599             usleep(1000);
     332        if (PauseAndWait())
    600333            continue;
    601         }
    602         else if (pause)
    603         {
    604             pthread_mutex_lock(&ringbuf.lock);
    605             ringbuf.request_pause = false;
    606             pthread_mutex_unlock(&ringbuf.lock);
    607334
    608             usleep(1500);
    609             continue;
    610         }
     335        len = _drb->Read(&(_buffer[remainder]), _buffer_size - remainder);
    611336
    612         if (dev_error)
    613         {
    614             VERBOSE(VB_IMPORTANT, "HDTV: device error detected");
    615             _error = true;
    616             break;
    617         }
    618 
    619         if (dev_eof)
    620             break;
    621 
    622         len = ringbuf_read(&(_buffer[remainder]), _buffer_size - remainder);
    623 
    624337        if (len == 0)
    625338            continue;
    626339
    627340        len += remainder;
    628         remainder = ProcessData(_buffer, len);
     341        remainder = ProcessDataTS(_buffer, len);
    629342        if (remainder > 0) // leftover bytes
    630343            memmove(_buffer, &(_buffer[_buffer_size - remainder]),
    631344                    remainder);
     345
     346        // Check for DRB errors
     347        if (_drb->IsErrored())
     348        {
     349            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device error detected");
     350            _error = true;
     351        }
     352
     353        if (_drb->IsEOF())
     354        {
     355            VERBOSE(VB_IMPORTANT, LOC_ERR + "Device EOF detected");
     356            _error = true;
     357        }
    632358    }
    633359
    634360    FinishRecording();
    635361    _recording = false;
    636362}
     363#undef SR_CHK
    637364
    638365void HDTVRecorder::StopRecording(void)
    639366{
     
    649376
    650377    _request_recording = false;
    651378
    652     pthread_mutex_lock(&ringbuf.lock);
    653     bool run = ringbuf.run;
    654     ringbuf.run = false;
    655     pthread_mutex_unlock(&ringbuf.lock);
     379    if (_drb && _drb->IsRunning())
     380        _drb->Stop();
    656381
    657     if (run)
    658         pthread_join(ringbuf.thread, NULL);
     382    while (_recording)
     383        usleep(2000);
    659384
    660     if (!ok)
    661     {
    662         // Better to have a memory leak, then a segfault?
    663         VERBOSE(VB_IMPORTANT, "DTV ringbuffer not cleaned up!\n");
    664     }
    665     else
    666     {
    667         delete[] ringbuf.buffer;
    668         ringbuf.buffer = 0;
    669     }
    670385    tvrec = rec;
    671386}
    672387
    673 void HDTVRecorder::Pause(bool /*clear*/)
     388void HDTVRecorder::ReaderPaused(int /*fd*/)
    674389{
    675     pthread_mutex_lock(&ringbuf.lock);
    676     ringbuf.paused = false;
    677     pthread_mutex_unlock(&ringbuf.lock);
    678     request_pause = true;
     390    pauseWait.wakeAll();
     391    if (tvrec)
     392        tvrec->RecorderPaused();
    679393}
    680394
    681 bool HDTVRecorder::IsPaused(void) const
     395bool HDTVRecorder::PauseAndWait(int timeout)
    682396{
    683     pthread_mutex_lock(&ringbuf.lock);
    684     bool paused = ringbuf.paused;
    685     pthread_mutex_unlock(&ringbuf.lock);
     397#ifdef USE_DRB
     398    if (request_pause)
     399    {
     400        paused = true;
     401        if (!_drb->IsPaused())
     402            _drb->SetRequestPause(true);
    686403
     404        unpauseWait.wait(timeout);
     405    }
     406    else if (_drb->IsPaused())
     407    {
     408        _drb->SetRequestPause(false);
     409        _drb->WaitForUnpause(timeout);
     410        paused = _drb->IsPaused();
     411    }
     412    else
     413    {
     414        paused = false;
     415    }
    687416    return paused;
     417#else // if !USE_DRB
     418    return RecorderBase::PauseAndWait(timeout);
     419#endif // !USE_DRB
    688420}
    689421
    690422int HDTVRecorder::ResyncStream(unsigned char *buffer, int curr_pos, int len)
     
    695427    if (nextpos >= len)
    696428        return -1; // not enough bytes; caller should try again
    697429   
    698     while (buffer[pos] != SYNC_BYTE || buffer[nextpos] != SYNC_BYTE) {
     430    while (buffer[pos] != SYNC_BYTE || buffer[nextpos] != SYNC_BYTE)
     431    {
    699432        pos++;
    700433        nextpos++;
    701434        if (nextpos == len)
     
    707440
    708441void HDTVRecorder::WritePAT(ProgramAssociationTable *pat)
    709442{
     443    if (!pat)
     444        return;
     445
    710446    int next = (pat->tsheader()->ContinuityCounter()+1)&0xf;
    711447    pat->tsheader()->SetContinuityCounter(next);
    712448    ringBuffer->Write(pat->tsheader()->data(), TSPacket::SIZE);
    713449}
    714450
    715 #if WHACK_A_BUG_VIDEO
    716 static int WABV_base_pid     = 0x100;
    717 #define WABV_WAIT 60
    718 static int WABV_wait_a_while = WABV_WAIT;
    719 bool WABV_started = false;
    720 #endif
    721 
    722 #if WHACK_A_BUG_AUDIO
    723 static int WABA_base_pid     = 0x200;
    724 #define WABA_WAIT 60
    725 static int WABA_wait_a_while = WABA_WAIT;
    726 bool WABA_started = false;
    727 #endif
    728 
    729451void HDTVRecorder::WritePMT(ProgramMapTable* pmt)
    730452{
    731     if (pmt) {
    732         int next = (pmt->tsheader()->ContinuityCounter()+1)&0xf;
    733         pmt->tsheader()->SetContinuityCounter(next);
     453    if (!pmt)
     454        return;
    734455
    735 #if WHACK_A_BUG_VIDEO
    736         WABV_wait_a_while--;
    737         if (WABV_wait_a_while<=0) {
    738             WABV_started = true;
    739             WABV_wait_a_while = WABV_WAIT;
    740             WABV_base_pid = (((WABV_base_pid-0x100)+1)%32)+0x100;
    741             if (StreamID::MPEG2Video != StreamData()->PMT()->StreamType(0))
    742             {
    743                 VERBOSE(VB_IMPORTANT, "HDTVRecorder::WritePMT(): Error,"
    744                         "Whack a Bug can not rewrite PMT, wrong stream type");
    745             }
    746             else
    747             {
    748                 VERBOSE(VB_IMPORTANT, QString("Whack a Bug: new video pid %1").
    749                         arg(WABV_base_pid));
    750                 // rewrite video pid
    751                 const uint old_video_pid=StreamData()->PMT()->StreamPID(0);
    752                 StreamData()->PMT()->SetStreamPID(0, WABV_base_pid);
    753                 if (StreamData()->PMT()->PCRPID() == old_video_pid)
    754                     StreamData()->PMT()->SetPCRPID(WABV_base_pid);
    755                 StreamData()->PMT()->SetCRC(StreamData()->PMT()->CalcCRC());
    756                 VERBOSE(VB_IMPORTANT, StreamData()->PMT()->toString());
    757             }
    758         }
    759 #endif
    760 #if WHACK_A_BUG_AUDIO
    761         WABA_wait_a_while--;
    762         if (WABA_wait_a_while<=0) {
    763             WABA_started = true;
    764             WABA_wait_a_while = WABA_WAIT;
    765             WABA_base_pid = (((WABA_base_pid-0x200)+1)%32)+0x200;
    766             VERBOSE(VB_IMPORTANT, QString("Whack a Bug: new audio BASE pid %1").arg(WABA_base_pid));
    767             // rewrite audio pids
    768             for (uint i=0; i<StreamData()->PMT()->StreamCount(); i++) {
    769                 if (StreamID::MPEG2Audio == StreamData()->PMT()->StreamType(i) ||
    770                     StreamID::MPEG2Audio == StreamData()->PMT()->StreamType(i)) {
    771                     const uint old_audio_pid = StreamData()->PMT()->StreamPID(i);
    772                     const uint new_audio_pid = WABA_base_pid + old_audio_pid;
    773                     StreamData()->PMT()->SetStreamPID(i, new_audio_pid);
    774                     if (StreamData()->PMT()->PCRPID() == old_audio_pid)
    775                         StreamData()->PMT()->SetPCRPID(new_audio_pid);
    776                     StreamData()->PMT()->SetCRC(StreamData()->PMT()->CalcCRC());
    777                     VERBOSE(VB_IMPORTANT, StreamData()->PMT()->toString());
    778                 }
    779             }
    780         }
    781 #endif
    782 
    783         ringBuffer->Write(pmt->tsheader()->data(), TSPacket::SIZE);
    784     }
     456    int next = (pmt->tsheader()->ContinuityCounter()+1)&0xf;
     457    pmt->tsheader()->SetContinuityCounter(next);
     458    ringBuffer->Write(pmt->tsheader()->data(), TSPacket::SIZE);
    785459}
    786460
    787461/** \fn HDTVRecorder::ProcessMGT(const MasterGuideTable*)
     
    820494        {
    821495            if (vct->ProgramNumber(i) != (uint)StreamData()->DesiredProgram())
    822496            {
    823                 VERBOSE(VB_RECORD,
    824                         QString("Resetting desired program from %1"
    825                                 " to %2")
     497                VERBOSE(VB_RECORD, LOC_ERR +
     498                        QString("Resetting desired program from %1 to %2")
    826499                        .arg(StreamData()->DesiredProgram())
    827500                        .arg(vct->ProgramNumber(i)));
    828501                // Do a (partial?) reset here if old desired
     
    834507    }
    835508    if (!found)
    836509    {
    837         VERBOSE(VB_IMPORTANT,
     510        VERBOSE(VB_IMPORTANT, LOC_ERR +
    838511                QString("Desired channel %1_%2 not found;"
    839512                        " using %3_%4 instead.")
    840513                .arg(StreamData()->DesiredMajorChannel())
    841514                .arg(StreamData()->DesiredMinorChannel())
    842515                .arg(vct->MajorChannel(0))
    843                 .arg(vct->MinorChannel(0)));
    844         VERBOSE(VB_IMPORTANT, vct->toString());
     516                .arg(vct->MinorChannel(0)) + "\n" + vct->toString());
     517
    845518        StreamData()->SetDesiredProgram(vct->ProgramNumber(0));
    846519    }
    847520}
     
    854527    if (_wait_for_keyframe && !_keyframe_seen)
    855528        return;
    856529
    857 #if WHACK_A_BUG_VIDEO
    858     if (WABV_started)
    859         ((TSPacket*)(tspacket))->SetPID(WABV_base_pid);
    860 #endif
    861 
    862530    ringBuffer->Write(tspacket->data(), TSPacket::SIZE);
    863531}
    864532
     
    868536    if (_wait_for_keyframe && !_keyframe_seen)
    869537        return;
    870538
    871 #if WHACK_A_BUG_AUDIO
    872     if (WABA_started)
    873         ((TSPacket*)(tspacket))->SetPID(WABA_base_pid+tspacket->PID());
    874 #endif
    875 
    876539    ringBuffer->Write(tspacket->data(), TSPacket::SIZE);
    877540}
    878541
     
    902565    return ok;
    903566}
    904567
    905 int HDTVRecorder::ProcessData(unsigned char *buffer, int len)
     568uint HDTVRecorder::ProcessDataTS(unsigned char *buffer, uint len)
    906569{
    907     int pos = 0;
     570    if (len < TSPacket::SIZE)
     571        return len;
    908572
    909     while (pos + 187 < len) // while we have a whole packet left
     573    uint pos = 0;
     574    uint end = len - TSPacket::SIZE;
     575    while (pos <= end) // while we have a whole packet left
    910576    {
    911577        if (buffer[pos] != SYNC_BYTE)
    912578        {
    913579            _resync_count++;
    914             if (25 == _resync_count)
    915                 VERBOSE(VB_RECORD, QString("Resyncing many of times, suppressing error messages"));
     580
     581            if (25 == _resync_count)
     582            {
     583                VERBOSE(VB_RECORD, LOC + "Resyncing many of times, "
     584                        "suppressing error messages");
     585            }
    916586            else if (25 > _resync_count)
    917                 VERBOSE(VB_RECORD, QString("Resyncing"));
     587            {
     588                VERBOSE(VB_RECORD, LOC + "Resyncing");
     589            }
     590
    918591            int newpos = ResyncStream(buffer, pos, len);
    919592            if (newpos == -1)
    920593                return len - pos;
     
    925598        }
    926599
    927600        const TSPacket *pkt = reinterpret_cast<const TSPacket*>(&buffer[pos]);
    928         if (ProcessTSPacket(*pkt)) {
    929             pos += TSPacket::SIZE; // Advance to next TS packet
     601        if (ProcessTSPacket(*pkt))
     602        {
     603            // Advance to next TS packet
     604            pos += TSPacket::SIZE;
     605
     606            // Take care of statistics
    930607            _ts_stats.IncrTSPacketCount();
    931             if (0 == _ts_stats.TSPacketCount()%1000000)
    932                 VERBOSE(VB_RECORD, _ts_stats.toString());
    933         } else // Let it resync in case of dropped bytes
    934             buffer[pos] = SYNC_BYTE+1;
     608            if (0 == _ts_stats.TSPacketCount() % 1000000)
     609                VERBOSE(VB_RECORD, LOC + "\n" + _ts_stats.toString());
     610
     611        }
     612        else
     613        {
     614            pos++; // Resync on invalid packet, in case of dropped bytes...
     615        }
    935616    }
    936617
    937618    return len - pos;
     
    939620
    940621void HDTVRecorder::Reset(void)
    941622{
    942     VERBOSE(VB_RECORD, "HDTVRecorder::Reset(void)");
     623    VERBOSE(VB_RECORD, LOC + "Reset(void)");
    943624    DTVRecorder::Reset();
    944625
    945626    _error = false;
     
    947628    _ts_stats.Reset();
    948629
    949630    if (curRecording)
    950     {
    951631        curRecording->ClearPositionMap(MARK_GOP_BYFRAME);
     632
     633    if (!IsOpen())
     634        return /* true */;
     635
     636    if (!IsPaused())
     637    {
     638        Pause();
     639        WaitForPause();
    952640    }
    953641
    954     if (_stream_fd >= 0)
     642    if (!Close())
     643        return /* false */;
     644
     645    if (Open())
    955646    {
    956         if (!IsPaused())
    957         {
    958             Pause();
    959             WaitForPause();
    960         }
    961         int ret = close(_stream_fd);
    962         if (ret < 0)
    963         {
    964             perror("close");
    965             return;
    966         }
    967 #if FAKE_VIDEO
    968         // open file instead of device
    969         _stream_fd = open(FAKE_VIDEO_FILES[fake_video_index], O_RDWR);
    970         VERBOSE(VB_IMPORTANT, QString("Opened fake video source %1").arg(FAKE_VIDEO_FILES[fake_video_index]));
    971         fake_video_index = (fake_video_index+1)%FAKE_VIDEO_NUM;
    972 #else
    973         _stream_fd = open(videodevice.ascii(), O_RDWR);
    974 #endif
    975         if (_stream_fd < 0)
    976         {
    977             VERBOSE(VB_IMPORTANT, QString("HD1 Can't open video device: %1 chanfd = %2").
    978                     arg(videodevice).arg(_stream_fd));
    979             perror("open video");
    980             return;
    981         }
    982         else
    983         {
    984             pthread_mutex_lock(&ringbuf.lock);
    985             ringbuf.used = 0;
    986             ringbuf.max_used = 0;
    987             ringbuf.readPtr = ringbuf.writePtr = ringbuf.buffer;
    988             pthread_mutex_unlock(&ringbuf.lock);
    989         }
     647        _drb->Reset(videodevice, _stream_fd);
    990648        Unpause();
     649        return /* true */;
    991650    }
     651
     652    VERBOSE(VB_IMPORTANT, LOC_ERR + "Couldn't open video device: " +
     653            QString("'%1'").arg(videodevice) + ENO);
     654    return /* false */;
    992655}
  • libs/libmythtv/libmythtv.pro

     
    206206
    207207    # TVRec & Recorder base classes
    208208    HEADERS += tv_rec.h
    209     HEADERS += recorderbase.h
     209    HEADERS += recorderbase.h              DeviceReadBuffer.h
    210210    HEADERS += dtvrecorder.h               dummydtvrecorder.h
    211211    SOURCES += tv_rec.cpp
    212     SOURCES += recorderbase.cpp
     212    SOURCES += recorderbase.cpp            DeviceReadBuffer.cpp
    213213    SOURCES += dtvrecorder.cpp             dummydtvrecorder.cpp
    214214
    215215    # MPEG parsing stuff
  • libs/libmythtv/dvbrecorder.h

     
    1818#include "dtvrecorder.h"
    1919#include "tspacket.h"
    2020#include "transform.h"
     21#include "DeviceReadBuffer.h"
    2122
    2223#include "dvbtypes.h"
    2324#include "dvbchannel.h"
     
    2627class ProgramAssociationTable;
    2728class ProgramMapTable;
    2829
     30class PIDInfo
     31{
     32  public:
     33    PIDInfo() :
     34        filter_fd(-1),  continuityCount(0xFF), ip(NULL),
     35        isVideo(false), isEncrypted(false),    payloadStartSeen(false) {;}
     36
     37    int    filter_fd;         ///< Input filter file descriptor
     38    uint   continuityCount;   ///< last Continuity Count (sentinel 0xFF)
     39    ipack *ip;                ///< TS->PES converter
     40    bool   isVideo;
     41    bool   isEncrypted;       ///< true if PID is marked as encrypted
     42    bool   payloadStartSeen;  ///< true if payload start packet seen on PID
     43
     44    inline void Close(void);
     45    inline bool CheckCC(uint cc);
     46};
     47typedef QMap<uint,PIDInfo*> PIDInfoMap;
     48
    2949/** \class DVBRecorder
    3050 *  \brief This is a specialization of DTVRecorder used to
    3151 *         handle streams from DVB drivers.
    3252 *
    3353 *  \sa DTVRecorder, HDTVRecorder
    3454 */
    35 class DVBRecorder: public DTVRecorder
     55class DVBRecorder: public DTVRecorder, private ReaderPausedCB
    3656{
    3757    Q_OBJECT
    3858  public:
     
    4868
    4969    void StartRecording(void);
    5070    void Reset(void);
     71    void StopRecording(void);
    5172
    5273    bool Open(void);
     74    bool IsOpen(void) const { return _stream_fd >= 0; }
    5375    void Close(void);
    5476
    5577    bool RecordsTransportStream(void) const
     
    6486
    6587  private:
    6688    void TeardownAll(void);
    67     void ReadFromDMX(void);
    68     static void ProcessDataPS(unsigned char *buffer, int len, void *priv);
    69     void LocalProcessDataPS(unsigned char *buffer, int len);
    70     void LocalProcessDataTS(unsigned char *buffer, int len);
    7189
     90    bool Poll(void) const;
     91
     92    uint ProcessDataTS(unsigned char *buffer, uint len);
     93    bool ProcessTSPacket(const TSPacket& tspacket);
     94
     95    void AutoPID(void);
     96    bool OpenFilters(void);
    7297    void CloseFilters(void);
    7398    void OpenFilter(uint pid, ES_Type type, dmx_pes_type_t pes_type,
    7499                    uint mpeg_stream_type);
    75     bool SetDemuxFilters(void);
    76     void AutoPID(void);
    77100
    78101    void SetPAT(ProgramAssociationTable*);
    79102    void SetPMT(ProgramMapTable*);
    80103
    81104    void CreatePAT(void);
    82105    void CreatePMT(void);
     106    void WritePATPMT(void);
    83107
    84108    void DebugTSHeader(unsigned char* buffer, int len);
    85109
     110    void ReaderPaused(int fd);
     111    bool PauseAndWait(int timeout = 100);
     112
     113    ipack *CreateIPack(ES_Type type);
     114    void ProcessDataPS(unsigned char *buffer, uint len);
     115    static void process_data_ps_cb(unsigned char*,int,void*);
     116
     117    DeviceReadBuffer *_drb;
     118
    86119    // Options set in SetOption()
    87120    int             _card_number_option;
    88121    bool            _record_transport_stream_option;
     
    91124    // DVB stuff
    92125    DVBChannel*     dvbchannel;
    93126
     127    // general recorder stuff
     128    /// Set when we want to generate a new filter set
     129    bool            _reset_pid_filters;
     130    QMutex          _pid_lock;
     131    PIDInfoMap      _pid_infos;
     132
    94133    // PS recorder stuff
    95134    int             _ps_rec_audio_id;
    96135    int             _ps_rec_video_id;
    97136    unsigned char   _ps_rec_buf[3];
    98     pid_ipack_t     _ps_rec_pid_ipack;
    99137
    100138    // TS recorder stuff
    101139    ProgramAssociationTable *_pat;
    102140    ProgramMapTable         *_pmt;
    103141    uint            _next_pmt_version;
    104142    uint            _ts_packets_until_psip_sync;
    105     QMap<uint,bool> _payload_start_seen;
    106     QMap<uint,bool> _videoPID;
    107143
    108144    // Input Misc
    109145    /// PMT on input side
    110146    PMTObject       _input_pmt;
    111     /// Input filter file descriptors
    112     vector<int>     _pid_filters;
    113     /// Input polling structure for _stream_fd
    114     struct pollfd   _polls;
    115     /// Set when we want to generate a new filter set
    116     bool            _reset_pid_filters;
    117     /// Encrypted PID, so we can drop these
    118     QMap<uint,bool> _encrypted_pid;
    119147
    120     // locking
    121     QMutex          _pid_lock;
    122 
    123148    // Statistics
    124     uint            _continuity_error_count;
    125     uint            _stream_overflow_count;
    126     uint            _bad_packet_count;
    127     QMap<uint,int>  _continuity_count;
     149    mutable uint        _continuity_error_count;
     150    mutable uint        _stream_overflow_count;
     151    mutable uint        _bad_packet_count;
     152    mutable MythTimer   _poll_timer;
    128153
    129     // For debugging
    130     bool data_found; ///< debugging variable used by transform.c
    131     bool keyframe_found;
    132 
    133154    // Constants
    134155    static const int PMT_PID;
    135156    static const int TSPACKETS_BETWEEN_PSIP_SYNC;
     
    137158    static const int POLL_WARNING_TIMEOUT;
    138159};
    139160
     161inline void PIDInfo::Close(void)
     162{
     163    if (filter_fd >= 0)
     164        close(filter_fd);
     165
     166    if (ip)
     167    {
     168        free_ipack(ip);
     169        free(ip);
     170    }
     171}
     172
     173inline bool PIDInfo::CheckCC(uint new_cnt)
     174{
     175    if (continuityCount == 0xFF)
     176    {
     177        continuityCount = new_cnt;
     178        return true;
     179    }
     180
     181    continuityCount = (continuityCount+1) & 0xf;
     182    if (continuityCount == new_cnt)
     183        return true;
     184   
     185    continuityCount = new_cnt;
     186    return false;
     187}
     188
    140189#endif
  • libs/libmythtv/DeviceReadBuffer.cpp

     
     1#include <algorithm>
     2#include <cassert>
     3
     4#include "DeviceReadBuffer.h"
     5#include "mythcontext.h"
     6#include "tspacket.h"
     7
     8#define REPORT_RING_STATS 1
     9
     10#define LOC QString("DevRdB(%1): ").arg(videodevice)
     11#define LOC_ERR QString("DevRdB(%1) Error: ").arg(videodevice)
     12
     13DeviceReadBuffer::DeviceReadBuffer(ReaderPausedCB *cb, bool use_poll)
     14    : videodevice(QString::null),   _stream_fd(-1),
     15      readerPausedCB(cb),
     16
     17      // Data for managing the device ringbuffer
     18      run(false),                   running(false),
     19      eof(false),                   error(false),
     20      request_pause(false),         paused(false),
     21      using_poll(use_poll),
     22
     23      size(0),                      used(0),
     24      dev_read_size(0),             min_read(0),
     25
     26      buffer(NULL),                 readPtr(NULL),
     27      writePtr(NULL),               endPtr(NULL),
     28
     29      // statistics
     30      max_used(0),                  avg_used(0),
     31      avg_cnt(0)
     32{
     33}
     34
     35DeviceReadBuffer::~DeviceReadBuffer()
     36{
     37    if (buffer)
     38        delete[] buffer;
     39}
     40
     41bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd)
     42{
     43    QMutexLocker locker(&lock);
     44
     45    if (buffer)
     46        delete[] buffer;
     47
     48    videodevice   = streamName;
     49    _stream_fd    = streamfd;
     50
     51    // Setup device ringbuffer
     52    eof           = false;
     53    error         = false;
     54    request_pause = false;
     55    paused        = false;
     56
     57    size          = gContext->GetNumSetting("HDRingbufferSize",
     58                                            50 * TSPacket::SIZE) * 1024;
     59    used          = 0;
     60    dev_read_size = TSPacket::SIZE * (using_poll ? 256 : 48);
     61    min_read      = TSPacket::SIZE * 4;
     62
     63    buffer        = new unsigned char[size + TSPacket::SIZE];
     64    readPtr       = buffer;
     65    writePtr      = buffer;
     66    endPtr        = buffer + size;
     67
     68    // Initialize buffer, if it exists
     69    if (!buffer)
     70        return false;
     71    memset(buffer, 0xFF, size + TSPacket::SIZE);
     72
     73    // Initialize statistics
     74    max_used      = 0;
     75    avg_used      = 0;
     76    avg_cnt       = 0;
     77    lastReport.start();
     78
     79    VERBOSE(VB_RECORD, LOC + QString("buffer size %1 KB").arg(size/1024));
     80
     81    return true;
     82}
     83
     84void DeviceReadBuffer::Start(void)
     85{
     86    lock.lock();
     87    bool was_running = running;
     88    lock.unlock();
     89    if (was_running)
     90    {
     91        VERBOSE(VB_IMPORTANT, LOC_ERR + "Start(): Already running.");
     92        SetRequestPause(false);
     93        return;
     94    }
     95
     96    pthread_create(&thread, NULL, boot_ringbuffer, this);
     97}
     98
     99void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
     100{
     101    QMutexLocker locker(&lock);
     102
     103    videodevice   = streamName;
     104    _stream_fd    = streamfd;
     105
     106    used          = 0;
     107    readPtr       = buffer;
     108    writePtr      = buffer;
     109}
     110
     111void DeviceReadBuffer::Stop(void)
     112{
     113    bool was_running = IsRunning();
     114    lock.lock();
     115    run = false;
     116    lock.unlock();
     117
     118    if (!was_running)
     119    {
     120        VERBOSE(VB_IMPORTANT, LOC_ERR + "Stop(): Not running.");
     121        return;
     122    }
     123
     124    pthread_join(thread, NULL);
     125}
     126
     127void DeviceReadBuffer::SetRequestPause(bool req)
     128{
     129    QMutexLocker locker(&lock);
     130    request_pause = req;
     131}
     132
     133void DeviceReadBuffer::SetPaused(bool val)
     134{
     135    lock.lock();
     136    paused = val;
     137    lock.unlock();
     138    if (val)
     139        pauseWait.wakeAll();
     140    else
     141        unpauseWait.wakeAll();
     142}
     143
     144bool DeviceReadBuffer::IsPaused(void) const
     145{
     146    QMutexLocker locker(&lock);
     147    return paused;
     148}
     149
     150bool DeviceReadBuffer::WaitForUnpause(int timeout)
     151{
     152    if (IsPaused())
     153        unpauseWait.wait(timeout);
     154    return IsPaused();
     155}
     156
     157bool DeviceReadBuffer::IsPauseRequested(void) const
     158{
     159    QMutexLocker locker(&lock);
     160    return request_pause;
     161}
     162
     163bool DeviceReadBuffer::IsRunning(void) const
     164{
     165    QMutexLocker locker(&lock);
     166    return running;
     167}
     168
     169uint DeviceReadBuffer::GetUnused(void) const
     170{
     171    QMutexLocker locker(&lock);
     172    return size - used;
     173}
     174
     175uint DeviceReadBuffer::GetUsed(void) const
     176{
     177    QMutexLocker locker(&lock);
     178    return used;
     179}
     180
     181uint DeviceReadBuffer::GetContiguousUnused(void) const
     182{
     183    QMutexLocker locker(&lock);
     184    return endPtr - writePtr;
     185}
     186
     187void DeviceReadBuffer::IncrWritePointer(uint len)
     188{
     189    QMutexLocker locker(&lock);
     190    used     += len;
     191    writePtr += len;
     192    writePtr  = (writePtr == endPtr) ? buffer : writePtr;
     193#ifdef REPORT_RING_STATS
     194    max_used = max(used, max_used);
     195    avg_used = ((avg_used * avg_cnt) + used) / ++avg_cnt;
     196#endif
     197}
     198
     199void DeviceReadBuffer::IncrReadPointer(uint len)
     200{
     201    QMutexLocker locker(&lock);
     202    used    -= len;
     203    readPtr += len;
     204    readPtr  = (readPtr == endPtr) ? buffer : readPtr;
     205    assert(readPtr <= endPtr);
     206}
     207
     208void *DeviceReadBuffer::boot_ringbuffer(void *arg)
     209{
     210    ((DeviceReadBuffer*) arg)->fill_ringbuffer();
     211    return NULL;
     212}
     213
     214void DeviceReadBuffer::fill_ringbuffer(void)
     215{
     216    uint      errcnt = 0;
     217
     218    lock.lock();
     219    run     = true;
     220    running = true;
     221    lock.unlock();
     222
     223    while (run)
     224    {
     225        if (!HandlePausing())
     226            continue;
     227
     228        if (!IsOpen())
     229        {
     230            usleep(5000);
     231            continue;
     232        }
     233
     234        if (using_poll && !Poll())
     235            continue;
     236
     237        // Limit read size for faster return from read
     238        size_t read_size = min(dev_read_size, WaitForUnused(TSPacket::SIZE));
     239
     240        // if read_size > 0 do the read...
     241        if (read_size)
     242        {
     243            ssize_t len = read(_stream_fd, writePtr, read_size);
     244            if (!CheckForErrors(len, errcnt))
     245            {
     246                if (errcnt > 5)
     247                    break;
     248                else
     249                    continue;
     250            }
     251            errcnt = 0;
     252            IncrWritePointer(len);
     253        }
     254    }
     255
     256    lock.lock();
     257    running = false;
     258    lock.unlock();
     259}
     260
     261bool DeviceReadBuffer::HandlePausing(void)
     262{
     263    if (IsPauseRequested())
     264    {
     265        SetPaused(true);
     266
     267        if (readerPausedCB)
     268            readerPausedCB->ReaderPaused(_stream_fd);
     269
     270        usleep(5000);
     271        return false;
     272    }
     273    else if (IsPaused())
     274    {
     275        Reset(videodevice, _stream_fd);
     276        SetPaused(false);
     277    }
     278    return true;
     279}
     280
     281bool DeviceReadBuffer::Poll(void) const
     282{
     283    bool retval = true;
     284    while (true)
     285    {
     286        struct pollfd polls;
     287        polls.fd      = _stream_fd;
     288        polls.events  = POLLIN;
     289        polls.revents = 0;
     290
     291        int ret = poll(&polls, 1 /*number of polls*/, 10 /*msec*/);
     292        if (IsPauseRequested() || !IsOpen() || !run)
     293        {
     294            retval = false;
     295            break; // are we supposed to pause, stop, etc.
     296        }
     297
     298        if (ret > 0)
     299            break; // we have data to read :)
     300        if ((-1 == ret) && (EOVERFLOW == errno))
     301            break; // we have an error to handle
     302
     303        if ((-1 == ret) && ((EAGAIN == errno) || (EINTR  == errno)))
     304            continue; // errors that tell you to try again
     305        if (ret == 0)
     306            continue; // timed out, try again
     307
     308        usleep(2500);
     309    }
     310    return retval;
     311}
     312
     313bool DeviceReadBuffer::CheckForErrors(ssize_t len, uint &errcnt)
     314{
     315    if (len < 0)
     316    {
     317        if (EINTR == errno)
     318            return false;
     319        if (EAGAIN == errno)
     320        {
     321            usleep(2500);
     322            return false;
     323        }
     324        if (EOVERFLOW == errno)
     325        {
     326            VERBOSE(VB_IMPORTANT, LOC_ERR + "Driver buffers overflowed");
     327            return false;
     328        }
     329
     330        VERBOSE(VB_IMPORTANT, LOC_ERR +
     331                QString("Problem reading fd(%1)").arg(_stream_fd) + ENO);
     332
     333        if (++errcnt > 5)
     334        {
     335            lock.lock();
     336            error = true;
     337            lock.unlock();
     338            return false;
     339        }
     340
     341        usleep(500);
     342        return false;
     343    }
     344    else if (len == 0)
     345    {
     346        if (++errcnt > 5)
     347        {
     348            VERBOSE(VB_IMPORTANT, LOC +
     349                    QString("End-Of-File? fd(%1)").arg(_stream_fd));
     350
     351            lock.lock();
     352            eof = true;
     353            lock.unlock();
     354
     355            return false;
     356        }
     357        usleep(500);
     358        return false;
     359    }
     360    return true;
     361}
     362
     363/** \fn DeviceReadBuffer::Read(unsigned char*, uint)
     364 *  \brief Try to Read count bytes from into buffer
     365 *  \param buffer Buffer to put data in
     366 *  \param count  Number of bytes to attempt to read
     367 *  \return number of bytes actually read
     368 */
     369uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
     370{
     371    uint avail = WaitForUsed(min(count, min_read));
     372    size_t cnt = min(count, avail);
     373
     374    if (!cnt)
     375        return 0;
     376
     377    if (readPtr + cnt > endPtr)
     378    {
     379        // Process as two pieces
     380        size_t len = endPtr - readPtr;
     381        if (len)
     382        {
     383            memcpy(buf, readPtr, len);
     384            buf += len;
     385            IncrReadPointer(len);
     386        }
     387        if (cnt > len)
     388        {
     389            len = cnt - len;
     390            memcpy(buf, readPtr, len);
     391            IncrReadPointer(len);
     392        }
     393    }
     394    else
     395    {
     396        memcpy(buf, readPtr, cnt);
     397        IncrReadPointer(cnt);
     398    }
     399
     400    ReportStats();
     401
     402    return cnt;
     403}
     404
     405/// \return bytes available for writing
     406uint DeviceReadBuffer::WaitForUnused(uint needed) const
     407{
     408    size_t unused = GetUnused();
     409    size_t contig = GetContiguousUnused();
     410
     411    if (contig > TSPacket::SIZE)
     412    {
     413        while (unused < needed)
     414        {
     415            unused = GetUnused();
     416            if (IsPauseRequested() || !IsOpen() || !run)
     417                return 0;
     418            usleep(5000);
     419        }
     420        if (IsPauseRequested() || !IsOpen() || !run)
     421            return 0;
     422        contig = GetContiguousUnused();
     423    }
     424
     425    return min(contig, unused);
     426}
     427
     428/// \return bytes available for reading
     429uint DeviceReadBuffer::WaitForUsed(uint needed) const
     430{
     431    size_t avail = GetUsed();
     432    while ((needed > avail) && running)
     433    {
     434        {
     435            QMutexLocker locker(&lock);
     436            avail = used;
     437            if (request_pause || error || eof)
     438                return 0;
     439        }
     440        usleep(5000);
     441    }
     442    return avail;
     443}
     444
     445void DeviceReadBuffer::ReportStats(void)
     446{
     447#ifdef REPORT_RING_STATS
     448    if (lastReport.elapsed() > 20*1000 /* msg every 20 seconds */)
     449    {
     450        QMutexLocker locker(&lock);
     451        double rsize = 100.0 / size;
     452        QString msg  = QString("fill avg(%1%) ").arg(avg_used*rsize,3,'f',0);
     453        msg         += QString("fill max(%2%) ").arg(max_used*rsize,3,'f',0);
     454        msg         += QString("samples(%3)").arg(avg_cnt);
     455
     456        avg_used    = 0;
     457        avg_cnt     = 0;
     458        max_used    = 0;
     459        lastReport.start();
     460
     461        VERBOSE(VB_IMPORTANT, LOC + msg);
     462    }
     463#endif
     464}