From 7fd9d945f0e0abaf2f381adb9e763a3e9ee6b78b Mon Sep 17 00:00:00 2001
From: Lawrence Rust <lvr@softsystem.co.uk>
Date: Sun, 31 Jul 2011 17:14:52 +0200
Subject: [PATCH 1/9] ringbuffer: Adapt readahead for low bit rate (64kbps radio) streams
- The existing readahead block size is too large for 64/128 kBps audio
streams and can cause audio underruns.
- Make WaitFailAvail return the bytes available if less than that requested
for low bit rate streams (where fill_min is < 32kB).
- Make fileringbuffer::safe_read retry if reading from a remote file. Low
bit rate sources such as radio can have considerable latency.
Signed-off-by: Lawrence Rust <lvr@softsystem.co.uk>
---
mythtv/libs/libmythtv/fileringbuffer.cpp | 48 +++++---
mythtv/libs/libmythtv/ringbuffer.cpp | 193 ++++++++++++++++--------------
mythtv/libs/libmythtv/ringbuffer.h | 2 +
3 files changed, 138 insertions(+), 105 deletions(-)
diff --git a/mythtv/libs/libmythtv/fileringbuffer.cpp b/mythtv/libs/libmythtv/fileringbuffer.cpp
index a3b73f3..f4b90d7 100644
|
a
|
b
|
bool FileRingBuffer::OpenFile(const QString &lfilename, uint retry_ms)
|
| 356 | 356 | commserror = false; |
| 357 | 357 | numfailures = 0; |
| 358 | 358 | |
| 359 | | rawbitrate = 8000; |
| | 359 | // The initial bitrate needs to be set with consideration for low bit rate |
| | 360 | // streams (e.g. radio @ 64Kbps) such that fill_min bytes are received |
| | 361 | // in a reasonable time period to enable decoders to peek the first few KB |
| | 362 | // to determine type & settings. |
| | 363 | if (is_local) |
| | 364 | rawbitrate = 256; // Allow for radio |
| | 365 | else |
| | 366 | rawbitrate = 128; // remotefile |
| | 367 | |
| 360 | 368 | CalcReadAheadThresh(); |
| 361 | 369 | |
| 362 | 370 | bool ok = fd2 >= 0 || remotefile; |
| … |
… |
int FileRingBuffer::safe_read(int fd, void *data, uint sz)
|
| 458 | 466 | */ |
| 459 | 467 | int FileRingBuffer::safe_read(RemoteFile *rf, void *data, uint sz) |
| 460 | 468 | { |
| 461 | | int ret = rf->Read(data, sz); |
| 462 | | if (ret < 0) |
| 463 | | { |
| 464 | | LOG(VB_GENERAL, LOG_ERR, LOC + |
| 465 | | "safe_read(RemoteFile* ...): read failed"); |
| 466 | | |
| 467 | | poslock.lockForRead(); |
| 468 | | rf->Seek(internalreadpos - readAdjust, SEEK_SET); |
| 469 | | poslock.unlock(); |
| 470 | | numfailures++; |
| 471 | | } |
| 472 | | else if (ret == 0) |
| | 469 | for (int retries = 0; ; ++retries) |
| 473 | 470 | { |
| 474 | | LOG(VB_FILE, LOG_INFO, LOC + |
| 475 | | "safe_read(RemoteFile* ...): at EOF"); |
| | 471 | int ret = rf->Read(data, sz); |
| | 472 | if (ret > 0) |
| | 473 | return ret; |
| | 474 | else if (ret < 0) |
| | 475 | { |
| | 476 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| | 477 | "safe_read(RemoteFile* ...): read failed"); |
| | 478 | |
| | 479 | poslock.lockForRead(); |
| | 480 | rf->Seek(internalreadpos - readAdjust, SEEK_SET); |
| | 481 | poslock.unlock(); |
| | 482 | numfailures++; |
| | 483 | return ret; |
| | 484 | } |
| | 485 | // Retry for 300mS if liveTV for low bit rate (radio) streams |
| | 486 | else if (!livetvchain || retries >= 5) |
| | 487 | break; |
| | 488 | |
| | 489 | usleep(60000); |
| 476 | 490 | } |
| 477 | 491 | |
| 478 | | return ret; |
| | 492 | LOG(VB_FILE, LOG_INFO, LOC + |
| | 493 | "safe_read(RemoteFile* ...): at EOF"); |
| | 494 | return 0; |
| 479 | 495 | } |
| 480 | 496 | |
| 481 | 497 | long long FileRingBuffer::GetReadPosition(void) const |
diff --git a/mythtv/libs/libmythtv/ringbuffer.cpp b/mythtv/libs/libmythtv/ringbuffer.cpp
index 1caacef..cd3b6ce 100644
|
a
|
b
|
void RingBuffer::UpdateRawBitrate(uint raw_bitrate)
|
| 289 | 289 | { |
| 290 | 290 | LOG(VB_FILE, LOG_INFO, LOC + |
| 291 | 291 | QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate)); |
| 292 | | if (raw_bitrate < 2500) |
| | 292 | // NB DVB-S radio can be 64kbps |
| | 293 | if (raw_bitrate < 64) |
| 293 | 294 | { |
| 294 | 295 | LOG(VB_FILE, LOG_INFO, LOC + |
| 295 | 296 | QString("UpdateRawBitrate(%1Kb) - ignoring bitrate,") |
| … |
… |
void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska)
|
| 339 | 340 | */ |
| 340 | 341 | void RingBuffer::CalcReadAheadThresh(void) |
| 341 | 342 | { |
| 342 | | uint estbitrate = 0; |
| 343 | | |
| 344 | 343 | readsallowed = false; |
| 345 | 344 | readblocksize = max(readblocksize, CHUNK); |
| 346 | 345 | |
| 347 | | // loop without sleeping if the buffered data is less than this |
| 348 | | fill_threshold = 7 * bufferSize / 8; |
| 349 | | |
| 350 | | const uint KB32 = 32*1024; |
| 351 | | const uint KB64 = 64*1024; |
| 352 | | const uint KB128 = 128*1024; |
| 353 | | const uint KB256 = 256*1024; |
| 354 | | const uint KB512 = 512*1024; |
| 355 | | |
| 356 | | estbitrate = (uint) max(abs(rawbitrate * playspeed), |
| | 346 | uint estbitrate = (uint) max(abs(rawbitrate * playspeed), |
| 357 | 347 | 0.5f * rawbitrate); |
| 358 | | estbitrate = min(rawbitrate * 3, estbitrate); |
| 359 | | int rbs = (estbitrate > 2500) ? KB64 : KB32; |
| 360 | | rbs = (estbitrate > 5000) ? KB128 : rbs; |
| 361 | | rbs = (estbitrate > 9000) ? KB256 : rbs; |
| 362 | | rbs = (estbitrate > 18000) ? KB512 : rbs; |
| 363 | | readblocksize = max(rbs,readblocksize); |
| | 348 | estbitrate = min(rawbitrate * 3, estbitrate); |
| | 349 | |
| | 350 | int const KB1 = 1024; |
| | 351 | int const rbs = estbitrate > 18000 ? 512*KB1 : |
| | 352 | estbitrate > 9000 ? 256*KB1 : |
| | 353 | estbitrate > 5000 ? 128*KB1 : |
| | 354 | estbitrate > 2500 ? 64*KB1 : |
| | 355 | estbitrate > 250 ? 32*KB1 : // 32KB~=0.25s @ 1Mbps |
| | 356 | 16*KB1 ; |
| | 357 | if (rbs < CHUNK) |
| | 358 | readblocksize = rbs; |
| | 359 | else |
| | 360 | readblocksize = max(rbs,readblocksize); |
| 364 | 361 | |
| 365 | 362 | // minumum seconds of buffering before allowing read |
| 366 | | float secs_min = 0.25; |
| | 363 | float const secs_min = 0.25f; |
| 367 | 364 | // set the minimum buffering before allowing ffmpeg read |
| 368 | | fill_min = (uint) ((estbitrate * secs_min) * 0.125f); |
| 369 | | // make this a multiple of ffmpeg block size.. |
| 370 | | fill_min = ((fill_min / KB32) + 1) * KB32; |
| | 365 | fill_min = (uint) (estbitrate * ((KB1 / 8) * secs_min)); |
| | 366 | if (fill_min < readblocksize) |
| | 367 | fill_min = readblocksize; |
| | 368 | if (fill_min > CHUNK) |
| | 369 | fill_min = ((fill_min + CHUNK - 1) / CHUNK) * CHUNK; |
| | 370 | if ((uint)fill_min >= bufferSize) |
| | 371 | fill_min = bufferSize - 1; |
| | 372 | |
| | 373 | // loop without sleeping if the buffered data is less than this |
| | 374 | fill_threshold = max((uint)fill_min, bufferSize / 8); |
| 371 | 375 | |
| 372 | 376 | LOG(VB_FILE, LOG_INFO, LOC + |
| 373 | 377 | QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> " |
| … |
… |
bool RingBuffer::IsNearEnd(double fps, uint vvf) const
|
| 389 | 393 | // WARNING: readahead_frames can greatly overestimate or underestimate |
| 390 | 394 | // the number of frames available in the read ahead buffer |
| 391 | 395 | // when rh_frames is less than the keyframe distance. |
| | 396 | if (fps == 0.) |
| | 397 | return false; |
| 392 | 398 | double bytes_per_frame = kbits_per_sec * (1000.0/8.0) / fps; |
| | 399 | if (bytes_per_frame == 0.) |
| | 400 | return false; |
| 393 | 401 | double readahead_frames = sz / bytes_per_frame; |
| 394 | 402 | |
| 395 | 403 | bool near_end = ((vvf + readahead_frames) < 10.0) || (sz < rbs*1.5); |
| … |
… |
int RingBuffer::ReadBufAvail(void) const
|
| 428 | 436 | return ret; |
| 429 | 437 | } |
| 430 | 438 | |
| | 439 | inline int RingBuffer::ReadBufUsed() const |
| | 440 | { |
| | 441 | return (bufferSize - 1) - ReadBufFree(); |
| | 442 | } |
| | 443 | |
| | 444 | inline bool RingBuffer::ReadsAllowed() const |
| | 445 | { |
| | 446 | return ateof || setswitchtonext || commserror || |
| | 447 | // Ensure some hysteresis around fill_min |
| | 448 | ReadBufUsed() >= (readsallowed ? 1 : fill_min); |
| | 449 | } |
| | 450 | |
| 431 | 451 | /** \fn RingBuffer::ResetReadAhead(long long) |
| 432 | 452 | * \brief Restart the read-ahead thread at the 'newinternal' position. |
| 433 | 453 | * |
| … |
… |
void RingBuffer::run(void)
|
| 780 | 800 | readtimeavg = (readtimeavg * 9 + readinterval) / 10; |
| 781 | 801 | |
| 782 | 802 | if (readtimeavg < 150 && |
| 783 | | (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2)) |
| | 803 | (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2) && |
| | 804 | readblocksize >= CHUNK) |
| 784 | 805 | { |
| 785 | 806 | int old_block_size = readblocksize; |
| 786 | 807 | readblocksize = 3 * readblocksize / 2; |
| … |
… |
void RingBuffer::run(void)
|
| 869 | 890 | } |
| 870 | 891 | } |
| 871 | 892 | |
| 872 | | int used = bufferSize - ReadBufFree(); |
| 873 | | |
| 874 | 893 | bool reads_were_allowed = readsallowed; |
| 875 | 894 | |
| 876 | | if ((0 == read_return) || (numfailures > 5) || |
| 877 | | (readsallowed != (used >= fill_min || ateof || |
| 878 | | setswitchtonext || commserror))) |
| | 895 | if ((0 == read_return) || (numfailures > 5) || ReadsAllowed() != readsallowed) |
| 879 | 896 | { |
| 880 | 897 | // If readpos changes while the lock is released |
| 881 | 898 | // we should not handle the 0 read_return now. |
| … |
… |
void RingBuffer::run(void)
|
| 886 | 903 | |
| 887 | 904 | commserror |= (numfailures > 5); |
| 888 | 905 | |
| 889 | | readsallowed = used >= fill_min || ateof || |
| 890 | | setswitchtonext || commserror; |
| | 906 | bool bReadsAllowed = ReadsAllowed(); |
| | 907 | if (readsallowed != bReadsAllowed) |
| | 908 | { |
| | 909 | readsallowed = bReadsAllowed; |
| | 910 | LOG(VB_FILE, LOG_INFO, LOC + (bReadsAllowed ? |
| | 911 | QString("Reads allowed: %1 bytes available").arg(ReadBufUsed()) : |
| | 912 | QString("Rebuffering %1..%2").arg(ReadBufUsed()).arg(fill_min)) ); |
| | 913 | } |
| 891 | 914 | |
| 892 | 915 | if (0 == read_return && old_readpos == readpos) |
| 893 | 916 | { |
| … |
… |
void RingBuffer::run(void)
|
| 910 | 933 | |
| 911 | 934 | rwlock.unlock(); |
| 912 | 935 | rwlock.lockForRead(); |
| 913 | | used = bufferSize - ReadBufFree(); |
| 914 | 936 | } |
| 915 | 937 | |
| 916 | 938 | LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop"); |
| 917 | 939 | |
| 918 | 940 | if (!readsallowed || commserror || ateof || setswitchtonext || |
| 919 | | (wanttoread <= used && wanttoread > 0)) |
| | 941 | (wanttoread <= ReadBufUsed() && wanttoread > 0)) |
| 920 | 942 | { |
| 921 | 943 | // To give other threads a good chance to handle these |
| 922 | 944 | // conditions, even if they are only requesting a read lock |
| … |
… |
void RingBuffer::run(void)
|
| 930 | 952 | { |
| 931 | 953 | // yield if we have nothing to do... |
| 932 | 954 | if (!request_pause && reads_were_allowed && |
| 933 | | (used >= fill_threshold || ateof || setswitchtonext)) |
| | 955 | (ReadBufUsed() >= fill_threshold || ateof || setswitchtonext || ignoreliveeof)) |
| 934 | 956 | { |
| 935 | 957 | generalWait.wait(&rwlock, 50); |
| 936 | 958 | } |
| … |
… |
void RingBuffer::run(void)
|
| 940 | 962 | generalWait.wakeAll(); |
| 941 | 963 | rwlock.unlock(); |
| 942 | 964 | usleep(5 * 1000); |
| 943 | | rwlock.lockForRead(); |
| | 965 | rwlock.lockForRead(); |
| 944 | 966 | } |
| 945 | 967 | } |
| 946 | 968 | } |
| … |
… |
bool RingBuffer::WaitForReadsAllowed(void)
|
| 996 | 1018 | while (!readsallowed && !stopreads && |
| 997 | 1019 | !request_pause && !commserror && readaheadrunning) |
| 998 | 1020 | { |
| 999 | | generalWait.wait(&rwlock, 1000); |
| 1000 | | if (!readsallowed && t.elapsed() > 1000) |
| | 1021 | // The timeout should allow for congestion of internet streamed media |
| | 1022 | if (t.elapsed() >= 30000) |
| 1001 | 1023 | { |
| 1002 | | LOG(VB_GENERAL, LOG_WARNING, LOC + |
| 1003 | | "Taking too long to be allowed to read.."); |
| 1004 | | |
| 1005 | | if (t.elapsed() > 10000) |
| 1006 | | { |
| 1007 | | LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to " |
| 1008 | | "be allowed to read, aborting."); |
| 1009 | | return false; |
| 1010 | | } |
| | 1024 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| | 1025 | QString("Waited %1 seconds to be allowed to read, aborting.") |
| | 1026 | .arg(t.elapsed()/1000) ); |
| | 1027 | return false; |
| 1011 | 1028 | } |
| | 1029 | |
| | 1030 | generalWait.wait(&rwlock, 250); |
| 1012 | 1031 | } |
| 1013 | 1032 | |
| 1014 | | return readsallowed; |
| | 1033 | if (t.elapsed() >= 500) |
| | 1034 | { |
| | 1035 | LOG(VB_GENERAL, LOG_WARNING, LOC + |
| | 1036 | QString("Waited %1 mS to be allowed to read (avail=%2 fill_min=%3)..") |
| | 1037 | .arg(t.elapsed()).arg(ReadBufAvail()).arg(fill_min) ); |
| | 1038 | } |
| | 1039 | return true; |
| 1015 | 1040 | } |
| 1016 | 1041 | |
| 1017 | 1042 | bool RingBuffer::WaitForAvail(int count) |
| … |
… |
bool RingBuffer::WaitForAvail(int count)
|
| 1035 | 1060 | generalWait.wakeAll(); |
| 1036 | 1061 | } |
| 1037 | 1062 | |
| 1038 | | MythTimer t; |
| 1039 | | t.start(); |
| 1040 | | while ((avail < count) && !stopreads && |
| 1041 | | !request_pause && !commserror && readaheadrunning) |
| | 1063 | MythTimer t; t.start(); |
| | 1064 | wanttoread = count; |
| | 1065 | while (avail < count && !stopreads && !request_pause && |
| | 1066 | !commserror && readaheadrunning) |
| 1042 | 1067 | { |
| 1043 | | wanttoread = count; |
| 1044 | | generalWait.wait(&rwlock, 250); |
| 1045 | | avail = ReadBufAvail(); |
| 1046 | | |
| 1047 | | if (ateof && avail < count) |
| 1048 | | count = avail; |
| 1049 | | |
| 1050 | | if (avail < count) |
| | 1068 | uint elapsed = t.elapsed(); |
| | 1069 | if (elapsed >= 10000) |
| 1051 | 1070 | { |
| 1052 | | int elapsed = t.elapsed(); |
| 1053 | | if (((elapsed > 250) && (elapsed < 500)) || |
| 1054 | | ((elapsed > 500) && (elapsed < 750)) || |
| 1055 | | ((elapsed > 1000) && (elapsed < 1250)) || |
| 1056 | | ((elapsed > 2000) && (elapsed < 2250)) || |
| 1057 | | ((elapsed > 4000) && (elapsed < 4250)) || |
| 1058 | | ((elapsed > 8000) && (elapsed < 8250)) || |
| 1059 | | ((elapsed > 9000))) |
| 1060 | | { |
| 1061 | | LOG(VB_GENERAL, LOG_INFO, LOC + "Waited " + |
| 1062 | | QString("%1").arg((elapsed / 250) * 0.25f, 3, 'f', 1) + |
| 1063 | | " seconds for data \n\t\t\tto become available..." + |
| 1064 | | QString(" %2 < %3") .arg(avail).arg(count)); |
| 1065 | | } |
| 1066 | | |
| 1067 | | if (elapsed > 16000) |
| 1068 | | { |
| 1069 | | LOG(VB_GENERAL, LOG_ERR, LOC + "Waited " + |
| 1070 | | QString("%1").arg(elapsed/1000) + |
| 1071 | | " seconds for data, aborting."); |
| 1072 | | return false; |
| 1073 | | } |
| | 1071 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| | 1072 | QString("Timed out waiting for data available (wanted=%1, avail=%2)") |
| | 1073 | .arg(count).arg(avail) ); |
| | 1074 | break; |
| 1074 | 1075 | } |
| | 1076 | else if (elapsed >= 100 && avail) |
| | 1077 | { |
| | 1078 | LOG(VB_GENERAL, LOG_INFO, LOC + |
| | 1079 | QString("Waited %1 mS for %2 bytes (wanted %3)") |
| | 1080 | .arg(elapsed).arg(avail).arg(count) ); |
| | 1081 | count = avail; |
| | 1082 | generalWait.wakeAll(); |
| | 1083 | break; |
| | 1084 | } |
| | 1085 | |
| | 1086 | generalWait.wait(&rwlock, 100); |
| | 1087 | avail = ReadBufAvail(); |
| 1075 | 1088 | } |
| 1076 | 1089 | |
| 1077 | 1090 | wanttoread = 0; |
| … |
… |
int RingBuffer::ReadDirect(void *buf, int count, bool peek)
|
| 1133 | 1146 | if (new_pos != old_pos) |
| 1134 | 1147 | { |
| 1135 | 1148 | LOG(VB_GENERAL, LOG_ERR, LOC + |
| 1136 | | QString("Peek() Failed to return from new " |
| | 1149 | QString("Seek() Failed to return from new " |
| 1137 | 1150 | "position %1 to old position %2, now " |
| 1138 | 1151 | "at position %3") |
| 1139 | 1152 | .arg(old_pos - ret).arg(old_pos).arg(new_pos)); |
| … |
… |
int RingBuffer::ReadDirect(void *buf, int count, bool peek)
|
| 1153 | 1166 | */ |
| 1154 | 1167 | int RingBuffer::ReadPriv(void *buf, int count, bool peek) |
| 1155 | 1168 | { |
| 1156 | | QString loc_desc = QString("ReadPriv(..%1, %2)") |
| | 1169 | const QString loc_desc = QString("ReadPriv(..%1, %2)") |
| 1157 | 1170 | .arg(count).arg(peek?"peek":"normal"); |
| 1158 | | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
| 1159 | | QString(" @%1 -- begin").arg(rbrpos)); |
| 1160 | 1171 | |
| 1161 | 1172 | rwlock.lockForRead(); |
| | 1173 | |
| | 1174 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
| | 1175 | QString(" @%1 avail=%2 -- begin").arg(rbrpos).arg(ReadBufAvail())); |
| | 1176 | |
| 1162 | 1177 | if (writemode) |
| 1163 | 1178 | { |
| 1164 | 1179 | LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc + |
| … |
… |
int RingBuffer::ReadPriv(void *buf, int count, bool peek)
|
| 1189 | 1204 | if (request_pause || stopreads || |
| 1190 | 1205 | !readaheadrunning || (ignorereadpos >= 0)) |
| 1191 | 1206 | { |
| | 1207 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- direct read"); |
| 1192 | 1208 | int ret = ReadDirect(buf, count, peek); |
| 1193 | 1209 | LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + |
| 1194 | 1210 | QString(": ReadDirect checksum %1") |
| … |
… |
int RingBuffer::ReadPriv(void *buf, int count, bool peek)
|
| 1203 | 1219 | if (!WaitForReadsAllowed()) |
| 1204 | 1220 | { |
| 1205 | 1221 | LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()"); |
| 1206 | | rwlock.unlock(); |
| 1207 | | stopreads = true; // this needs to be outside the lock |
| 1208 | | rwlock.lockForWrite(); |
| 1209 | | wanttoread = 0; |
| | 1222 | // NB don't set stopreads or else the next ReadPriv will call ReadDirect |
| | 1223 | // which, if there's any readahead, will cause data to be returned out |
| | 1224 | // of sequence |
| 1210 | 1225 | rwlock.unlock(); |
| 1211 | 1226 | return 0; |
| 1212 | 1227 | } |
diff --git a/mythtv/libs/libmythtv/ringbuffer.h b/mythtv/libs/libmythtv/ringbuffer.h
index f551ecd..8dc633d 100644
|
a
|
b
|
class MTV_PUBLIC RingBuffer : protected MThread
|
| 166 | 166 | |
| 167 | 167 | int ReadBufFree(void) const; |
| 168 | 168 | int ReadBufAvail(void) const; |
| | 169 | int ReadBufUsed() const; |
| | 170 | bool ReadsAllowed() const; |
| 169 | 171 | |
| 170 | 172 | void ResetReadAhead(long long newinternal); |
| 171 | 173 | void KillReadAheadThread(void); |