From 7fd9d945f0e0abaf2f381adb9e763a3e9ee6b78b Mon Sep 17 00:00:00 2001
From: Lawrence Rust <lvr@softsystem.co.uk>
Date: Sun, 31 Jul 2011 17:14:52 +0200
Subject: [PATCH 1/9] ringbuffer: Adapt readahead for low bit rate (64kbps radio) streams

- The existing readahead block size is too large for 64/128 kBps audio
streams and can cause audio underruns.

- Make WaitFailAvail return the bytes available if less than that requested
for low bit rate streams (where fill_min is < 32kB).

- Make fileringbuffer::safe_read retry if reading from a remote file. Low
bit rate sources such as radio can have considerable latency.

Signed-off-by: Lawrence Rust <lvr@softsystem.co.uk>
---
 mythtv/libs/libmythtv/fileringbuffer.cpp |   48 +++++---
 mythtv/libs/libmythtv/ringbuffer.cpp     |  193 ++++++++++++++++--------------
 mythtv/libs/libmythtv/ringbuffer.h       |    2 +
 3 files changed, 138 insertions(+), 105 deletions(-)

diff --git a/mythtv/libs/libmythtv/fileringbuffer.cpp b/mythtv/libs/libmythtv/fileringbuffer.cpp
index a3b73f3..f4b90d7 100644
--- a/mythtv/libs/libmythtv/fileringbuffer.cpp
+++ b/mythtv/libs/libmythtv/fileringbuffer.cpp
@@ -356,7 +356,15 @@ bool FileRingBuffer::OpenFile(const QString &lfilename, uint retry_ms)
     commserror = false;
     numfailures = 0;
 
-    rawbitrate = 8000;
+    // The initial bitrate needs to be set with consideration for low bit rate
+    // streams (e.g. radio @ 64Kbps) such that fill_min bytes are received
+    // in a reasonable time period to enable decoders to peek the first few KB
+    // to determine type & settings.
+    if (is_local)
+        rawbitrate = 256; // Allow for radio
+    else
+        rawbitrate = 128; // remotefile
+
     CalcReadAheadThresh();
 
     bool ok = fd2 >= 0 || remotefile;
@@ -458,24 +466,32 @@ int FileRingBuffer::safe_read(int fd, void *data, uint sz)
  */
 int FileRingBuffer::safe_read(RemoteFile *rf, void *data, uint sz)
 {
-    int ret = rf->Read(data, sz);
-    if (ret < 0)
-    {
-        LOG(VB_GENERAL, LOG_ERR, LOC +
-            "safe_read(RemoteFile* ...): read failed");
-            
-        poslock.lockForRead();
-        rf->Seek(internalreadpos - readAdjust, SEEK_SET);
-        poslock.unlock();
-        numfailures++;
-    }
-    else if (ret == 0)
+    for (int retries = 0; ; ++retries)
     {
-        LOG(VB_FILE, LOG_INFO, LOC +
-            "safe_read(RemoteFile* ...): at EOF");
+        int ret = rf->Read(data, sz);
+        if (ret > 0)
+            return ret;
+        else if (ret < 0)
+        {
+            LOG(VB_GENERAL, LOG_ERR, LOC +
+                "safe_read(RemoteFile* ...): read failed");
+
+            poslock.lockForRead();
+            rf->Seek(internalreadpos - readAdjust, SEEK_SET);
+            poslock.unlock();
+            numfailures++;
+            return ret;
+        }
+        // Retry for 300mS if liveTV for low bit rate (radio) streams
+        else if (!livetvchain || retries >= 5)
+            break;
+
+        usleep(60000);
     }
 
-    return ret;
+    LOG(VB_FILE, LOG_INFO, LOC +
+        "safe_read(RemoteFile* ...): at EOF");
+    return 0;
 }
 
 long long FileRingBuffer::GetReadPosition(void) const
diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
index 1caacef..cd3b6ce 100644
--- a/mythtv/libs/libmythtv/ringbuffer.cpp
+++ b/mythtv/libs/libmythtv/ringbuffer.cpp
@@ -289,7 +289,8 @@ void RingBuffer::UpdateRawBitrate(uint raw_bitrate)
 {
     LOG(VB_FILE, LOG_INFO, LOC +
         QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate));
-    if (raw_bitrate < 2500)
+    // NB DVB-S radio can be 64kbps
+    if (raw_bitrate < 64)
     {
         LOG(VB_FILE, LOG_INFO, LOC +
             QString("UpdateRawBitrate(%1Kb) - ignoring bitrate,")
@@ -339,35 +340,38 @@ void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska)
  */
 void RingBuffer::CalcReadAheadThresh(void)
 {
-    uint estbitrate = 0;
-
     readsallowed   = false;
     readblocksize  = max(readblocksize, CHUNK);
 
-    // loop without sleeping if the buffered data is less than this
-    fill_threshold = 7 * bufferSize / 8;
-
-    const uint KB32  =  32*1024;
-    const uint KB64  =  64*1024;
-    const uint KB128 = 128*1024;
-    const uint KB256 = 256*1024;
-    const uint KB512 = 512*1024;
-
-    estbitrate     = (uint) max(abs(rawbitrate * playspeed),
+    uint estbitrate = (uint) max(abs(rawbitrate * playspeed),
                                 0.5f * rawbitrate);
-    estbitrate     = min(rawbitrate * 3, estbitrate);
-    int rbs        = (estbitrate > 2500)  ? KB64  : KB32;
-    rbs            = (estbitrate > 5000)  ? KB128 : rbs;
-    rbs            = (estbitrate > 9000)  ? KB256 : rbs;
-    rbs            = (estbitrate > 18000) ? KB512 : rbs;
-    readblocksize  = max(rbs,readblocksize);
+    estbitrate      = min(rawbitrate * 3, estbitrate);
+
+    int const KB1 = 1024;
+    int const rbs = estbitrate > 18000 ? 512*KB1 :
+                    estbitrate > 9000  ? 256*KB1 :
+                    estbitrate > 5000  ? 128*KB1 :
+                    estbitrate > 2500  ? 64*KB1 :
+                    estbitrate >  250  ? 32*KB1 : // 32KB~=0.25s @ 1Mbps
+                                         16*KB1 ;
+     if (rbs < CHUNK)
+         readblocksize = rbs;
+     else
+         readblocksize = max(rbs,readblocksize);
 
     // minumum seconds of buffering before allowing read
-    float secs_min = 0.25;
+    float const secs_min = 0.25f;
     // set the minimum buffering before allowing ffmpeg read
-    fill_min        = (uint) ((estbitrate * secs_min) * 0.125f);
-    // make this a multiple of ffmpeg block size..
-    fill_min        = ((fill_min / KB32) + 1) * KB32;
+    fill_min = (uint) (estbitrate * ((KB1 / 8) * secs_min));
+    if (fill_min < readblocksize)
+        fill_min = readblocksize;
+    if (fill_min > CHUNK)
+        fill_min = ((fill_min + CHUNK - 1) / CHUNK) * CHUNK;
+    if ((uint)fill_min >= bufferSize)
+        fill_min = bufferSize - 1;
+
+    // loop without sleeping if the buffered data is less than this
+    fill_threshold = max((uint)fill_min, bufferSize / 8);
 
     LOG(VB_FILE, LOG_INFO, LOC +
         QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> "
@@ -389,7 +393,11 @@ bool RingBuffer::IsNearEnd(double fps, uint vvf) const
     // WARNING: readahead_frames can greatly overestimate or underestimate
     //          the number of frames available in the read ahead buffer
     //          when rh_frames is less than the keyframe distance.
+    if (fps == 0.)
+        return false;
     double bytes_per_frame = kbits_per_sec * (1000.0/8.0) / fps;
+    if (bytes_per_frame == 0.)
+        return false;
     double readahead_frames = sz / bytes_per_frame;
 
     bool near_end = ((vvf + readahead_frames) < 10.0) || (sz < rbs*1.5);
@@ -428,6 +436,18 @@ int RingBuffer::ReadBufAvail(void) const
     return ret;
 }
 
+inline int RingBuffer::ReadBufUsed() const
+{
+    return (bufferSize - 1) - ReadBufFree();
+}
+
+inline bool RingBuffer::ReadsAllowed() const
+{
+    return ateof || setswitchtonext || commserror ||
+        // Ensure some hysteresis around fill_min
+        ReadBufUsed() >= (readsallowed ? 1 : fill_min);
+}
+
 /** \fn RingBuffer::ResetReadAhead(long long)
  *  \brief Restart the read-ahead thread at the 'newinternal' position.
  *
@@ -780,7 +800,8 @@ void RingBuffer::run(void)
                 readtimeavg = (readtimeavg * 9 + readinterval) / 10;
 
                 if (readtimeavg < 150 && 
-                    (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2))
+                    (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2) &&
+                    readblocksize >= CHUNK)
                 {
                     int old_block_size = readblocksize;
                     readblocksize = 3 * readblocksize / 2;
@@ -869,13 +890,9 @@ void RingBuffer::run(void)
             }
         }
 
-        int used = bufferSize - ReadBufFree();
-
         bool reads_were_allowed = readsallowed;
 
-        if ((0 == read_return) || (numfailures > 5) ||
-            (readsallowed != (used >= fill_min || ateof ||
-                              setswitchtonext || commserror)))
+        if ((0 == read_return) || (numfailures > 5) || ReadsAllowed() != readsallowed)
         {
             // If readpos changes while the lock is released
             // we should not handle the 0 read_return now.
@@ -886,8 +903,14 @@ void RingBuffer::run(void)
 
             commserror |= (numfailures > 5);
 
-            readsallowed = used >= fill_min || ateof ||
-                setswitchtonext || commserror;
+            bool bReadsAllowed = ReadsAllowed();
+            if (readsallowed != bReadsAllowed)
+            {
+                readsallowed = bReadsAllowed;
+                LOG(VB_FILE, LOG_INFO, LOC + (bReadsAllowed ?
+                    QString("Reads allowed: %1 bytes available").arg(ReadBufUsed()) :
+                    QString("Rebuffering %1..%2").arg(ReadBufUsed()).arg(fill_min)) );
+            }
 
             if (0 == read_return && old_readpos == readpos)
             {
@@ -910,13 +933,12 @@ void RingBuffer::run(void)
 
             rwlock.unlock();
             rwlock.lockForRead();
-            used = bufferSize - ReadBufFree();
         }
 
         LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop");
 
         if (!readsallowed || commserror || ateof || setswitchtonext ||
-            (wanttoread <= used && wanttoread > 0))
+            (wanttoread <= ReadBufUsed() && wanttoread > 0))
         {
             // To give other threads a good chance to handle these
             // conditions, even if they are only requesting a read lock
@@ -930,7 +952,7 @@ void RingBuffer::run(void)
         {
             // yield if we have nothing to do...
             if (!request_pause && reads_were_allowed &&
-                (used >= fill_threshold || ateof || setswitchtonext))
+                (ReadBufUsed() >= fill_threshold || ateof || setswitchtonext || ignoreliveeof))
             {
                 generalWait.wait(&rwlock, 50);
             }
@@ -940,7 +962,7 @@ void RingBuffer::run(void)
                 generalWait.wakeAll();
                 rwlock.unlock();
                 usleep(5 * 1000);
-                rwlock.lockForRead();            
+                rwlock.lockForRead();
             }
         }
     }
@@ -996,22 +1018,25 @@ bool RingBuffer::WaitForReadsAllowed(void)
     while (!readsallowed && !stopreads &&
            !request_pause && !commserror && readaheadrunning)
     {
-        generalWait.wait(&rwlock, 1000);
-        if (!readsallowed && t.elapsed() > 1000)
+        // The timeout should allow for congestion of internet streamed media
+        if (t.elapsed() >= 30000)
         {
-            LOG(VB_GENERAL, LOG_WARNING, LOC +
-                "Taking too long to be allowed to read..");
-
-            if (t.elapsed() > 10000)
-            {
-                LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to "
-                                               "be allowed to read, aborting.");
-                return false;
-            }
+            LOG(VB_GENERAL, LOG_ERR, LOC +
+                QString("Waited %1 seconds to be allowed to read, aborting.")
+                .arg(t.elapsed()/1000) );
+            return false;
         }
+
+        generalWait.wait(&rwlock, 250);
     }
 
-    return readsallowed;
+    if (t.elapsed() >= 500)
+    {
+        LOG(VB_GENERAL, LOG_WARNING, LOC +
+            QString("Waited %1 mS to be allowed to read (avail=%2 fill_min=%3)..")
+            .arg(t.elapsed()).arg(ReadBufAvail()).arg(fill_min) );
+    }
+    return true;
 }
 
 bool RingBuffer::WaitForAvail(int count)
@@ -1035,43 +1060,31 @@ bool RingBuffer::WaitForAvail(int count)
         generalWait.wakeAll();
     }
 
-    MythTimer t;
-    t.start();
-    while ((avail < count) && !stopreads &&
-           !request_pause && !commserror && readaheadrunning)
+    MythTimer t; t.start();
+    wanttoread = count;
+    while (avail < count && !stopreads && !request_pause &&
+            !commserror && readaheadrunning)
     {
-        wanttoread = count;
-        generalWait.wait(&rwlock, 250);
-        avail = ReadBufAvail();
-
-        if (ateof && avail < count)
-            count = avail;
-
-        if (avail < count)
+        uint elapsed = t.elapsed();
+        if (elapsed >= 10000)
         {
-            int elapsed = t.elapsed();
-            if  (((elapsed > 250)  && (elapsed < 500))  ||
-                 ((elapsed > 500)  && (elapsed < 750))  ||
-                 ((elapsed > 1000) && (elapsed < 1250)) ||
-                 ((elapsed > 2000) && (elapsed < 2250)) ||
-                 ((elapsed > 4000) && (elapsed < 4250)) ||
-                 ((elapsed > 8000) && (elapsed < 8250)) ||
-                 ((elapsed > 9000)))
-            {
-                LOG(VB_GENERAL, LOG_INFO, LOC + "Waited " +
-                    QString("%1").arg((elapsed / 250) * 0.25f, 3, 'f', 1) +
-                    " seconds for data \n\t\t\tto become available..." +
-                    QString(" %2 < %3") .arg(avail).arg(count));
-            }
-
-            if (elapsed > 16000)
-            {
-                LOG(VB_GENERAL, LOG_ERR, LOC + "Waited " +
-                    QString("%1").arg(elapsed/1000) +
-                    " seconds for data, aborting.");
-                return false;
-            }
+            LOG(VB_GENERAL, LOG_ERR, LOC +
+                QString("Timed out waiting for data available (wanted=%1, avail=%2)")
+                .arg(count).arg(avail) );
+            break;
         }
+        else if (elapsed >= 100 && avail)
+        {
+            LOG(VB_GENERAL, LOG_INFO, LOC +
+                QString("Waited %1 mS for %2 bytes (wanted %3)")
+                .arg(elapsed).arg(avail).arg(count) );
+            count = avail;
+            generalWait.wakeAll();
+            break;
+        }
+
+        generalWait.wait(&rwlock, 100);
+        avail = ReadBufAvail();
     }
 
     wanttoread = 0;
@@ -1133,7 +1146,7 @@ int RingBuffer::ReadDirect(void *buf, int count, bool peek)
         if (new_pos != old_pos)
         {
             LOG(VB_GENERAL, LOG_ERR, LOC +
-                QString("Peek() Failed to return from new "
+                QString("Seek() Failed to return from new "
                         "position %1 to old position %2, now "
                         "at position %3")
                     .arg(old_pos - ret).arg(old_pos).arg(new_pos));
@@ -1153,12 +1166,14 @@ int RingBuffer::ReadDirect(void *buf, int count, bool peek)
  */
 int RingBuffer::ReadPriv(void *buf, int count, bool peek)
 {
-    QString loc_desc = QString("ReadPriv(..%1, %2)")
+    const QString loc_desc = QString("ReadPriv(..%1, %2)")
         .arg(count).arg(peek?"peek":"normal");
-    LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
-        QString(" @%1 -- begin").arg(rbrpos));
 
     rwlock.lockForRead();
+
+    LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
+        QString(" @%1 avail=%2 -- begin").arg(rbrpos).arg(ReadBufAvail()));
+
     if (writemode)
     {
         LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
@@ -1189,6 +1204,7 @@ int RingBuffer::ReadPriv(void *buf, int count, bool peek)
         if (request_pause || stopreads ||
             !readaheadrunning || (ignorereadpos >= 0))
         {
+            LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- direct read");
             int ret = ReadDirect(buf, count, peek);
             LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
                 QString(": ReadDirect checksum %1")
@@ -1203,10 +1219,9 @@ int RingBuffer::ReadPriv(void *buf, int count, bool peek)
     if (!WaitForReadsAllowed())
     {
         LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
-        rwlock.unlock();
-        stopreads = true; // this needs to be outside the lock
-        rwlock.lockForWrite();
-        wanttoread = 0;
+        // NB don't set stopreads or else the next ReadPriv will call ReadDirect
+        // which, if there's any readahead, will cause data to be returned out
+        // of sequence
         rwlock.unlock();
         return 0;
     }
diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h
index f551ecd..8dc633d 100644
--- a/mythtv/libs/libmythtv/ringbuffer.h
+++ b/mythtv/libs/libmythtv/ringbuffer.h
@@ -166,6 +166,8 @@ class MTV_PUBLIC RingBuffer : protected MThread
 
     int ReadBufFree(void) const;
     int ReadBufAvail(void) const;
+    int ReadBufUsed() const;
+    bool ReadsAllowed() const;
 
     void ResetReadAhead(long long newinternal);
     void KillReadAheadThread(void);
-- 
1.7.4.1

