aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-26 10:57:58 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-06-26 10:57:58 +0200
commit8e7a15754a3fef09cc5de372f207936740459c56 (patch)
treef1c3cc289eebc9ed9b620696b24467680288cc39
parent899dcb83ec873cb35d38583d6f48922e1312e9be (diff)
downloadODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.tar.gz
ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.tar.bz2
ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.zip
Refactor OrderedQueue a bit
-rw-r--r--src/AVTInput.cpp11
-rw-r--r--src/AVTInput.h12
-rw-r--r--src/OrderedQueue.cpp59
-rw-r--r--src/OrderedQueue.h18
4 files changed, 52 insertions, 48 deletions
diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp
index 973ed7b..0e5b669 100644
--- a/src/AVTInput.cpp
+++ b/src/AVTInput.cpp
@@ -69,9 +69,7 @@ AVTInput::AVTInput(const std::string& input_uri,
_input_pad_packet(2048),
_ordered(5000, _jitterBufferSize),
_lastInfoFrameType(_typeCantExtract)
-{
-
-}
+{ }
int AVTInput::prepare(void)
{
@@ -430,14 +428,13 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)
//printf("B: _padFrameQueue size=%zu\n", _padFrameQueue.size());
// Assemble next frame
- int32_t nb = 0;
std::vector<uint8_t> part;
- while (_nbFrames < 5 && (nb = _ordered.pop(part)) != 0)
+ while (_nbFrames < 5 and not (part = _ordered.pop()).empty())
{
while (_checkMessage());
- memcpy(_currentFrame.data() + _currentFrameSize, part.data(), nb);
- _currentFrameSize += nb;
+ memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size());
+ _currentFrameSize += part.size();
_nbFrames ++;
}
diff --git a/src/AVTInput.h b/src/AVTInput.h
index 62b2248..0f58418 100644
--- a/src/AVTInput.h
+++ b/src/AVTInput.h
@@ -101,12 +101,12 @@ class AVTInput
uint32_t _pad_port;
size_t _jitterBufferSize;
- Socket::UDPSocket _input_socket;
- Socket::UDPSocket _output_socket;
- Socket::UDPPacket _output_packet;
- Socket::UDPSocket _input_pad_socket;
- Socket::UDPPacket _input_pad_packet;
- OrderedQueue _ordered;
+ Socket::UDPSocket _input_socket;
+ Socket::UDPSocket _output_socket;
+ Socket::UDPPacket _output_packet;
+ Socket::UDPSocket _input_pad_socket;
+ Socket::UDPPacket _input_pad_packet;
+ OrderedQueue _ordered;
std::queue<std::vector<uint8_t> > _padFrameQueue;
int32_t _subChannelIndex = DEF_BR/8;
diff --git a/src/OrderedQueue.cpp b/src/OrderedQueue.cpp
index 8d545df..8a768e7 100644
--- a/src/OrderedQueue.cpp
+++ b/src/OrderedQueue.cpp
@@ -26,39 +26,40 @@
//#define DEBUG(x...)
#define ERROR(fmt, A...) fprintf(stderr, "OrderedQueue: ERROR " fmt, ##A)
-OrderedQueue::OrderedQueue(int countModulo, size_t capacity) :
- _countModulo(countModulo),
+OrderedQueue::OrderedQueue(int maxIndex, size_t capacity) :
+ _maxIndex(maxIndex),
_capacity(capacity)
{
}
-void OrderedQueue::push(int32_t count, const uint8_t* buf, size_t size)
+void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size)
{
-// DEBUG("OrderedQueue::push count=%d\n", count);
- count = (count+_countModulo) % _countModulo;
+ // DEBUG("OrderedQueue::push index=%d\n", index);
+ index = (index + _maxIndex) % _maxIndex;
- // First frame makes the count initialisation.
- if (_lastCount == -1) {
- _lastCount = (count+_countModulo-1) % _countModulo;
+ // First frame makes the index initialisation.
+ if (_lastIndexPop == -1) {
+ // Equivalent to index - 1 in modulo arithmetic:
+ _lastIndexPop = (index + _maxIndex-1) % _maxIndex;
}
if (_stock.size() < _capacity) {
- if (_stock.find(count) == _stock.end()) {
- // count already exists, duplicated frame
+ if (_stock.find(index) == _stock.end()) {
+ // index already exists, duplicated frame
// Replace the old one by the new one.
- // the old one could a an old frame from the previous count loop
+ // the old one could a an old frame from the previous index loop
_duplicated++;
- DEBUG("Duplicated count=%d\n", count);
+ DEBUG("Duplicated index=%d\n", index);
}
OrderedQueueData oqd(size);
copy(buf, buf + size, oqd.begin());
- _stock[count] = move(oqd);
+ _stock[index] = move(oqd);
}
else {
_overruns++;
if (_overruns < 100) {
- DEBUG("Overruns (size=%zu) count=%d not inserted\n", _stock.size(), count);
+ DEBUG("Overruns (size=%zu) index=%d not inserted\n", _stock.size(), index);
}
else if (_overruns == 100) {
DEBUG("stop displaying Overruns\n");
@@ -72,43 +73,41 @@ bool OrderedQueue::availableData() const
return _stock.size() > 0;
}
-size_t OrderedQueue::pop(std::vector<uint8_t>& buf, int32_t *retCount)
+std::vector<uint8_t> OrderedQueue::pop(int32_t *retCount)
{
- size_t nbBytes = 0;
+ OrderedQueueData buf;
uint32_t gap = 0;
if (_stock.size() > 0) {
- int32_t nextCount = (_lastCount+1) % _countModulo;
+ int32_t nextIndex = (_lastIndexPop+1) % _maxIndex;
bool found = false;
while (not found) {
try {
- auto& oqd = _stock.at(nextCount);
- buf = move(oqd);
- _stock.erase(nextCount);
- _lastCount = nextCount;
- if (retCount) *retCount = _lastCount;
+ buf = move(_stock.at(nextIndex));
+ _stock.erase(nextIndex);
+ _lastIndexPop = nextIndex;
+ if (retCount) *retCount = _lastIndexPop;
found = true;
}
- catch (const std::out_of_range&)
- {
+ catch (const std::out_of_range&) {
if (_stock.size() < _capacity) {
- found = true;
+ break;
}
else {
- // Search for the new reference count, starting from the current one
+ // Search for the new index, starting from the current one
// This could be optimised, but the modulo makes things
// not easy.
gap++;
- nextCount = (nextCount+1) % _countModulo;
+ nextIndex = (nextIndex+1) % _maxIndex;
}
}
}
}
if (gap > 0) {
- DEBUG("Count jump of %d\n", gap);
+ DEBUG("index jump of %d\n", gap);
}
-// if (nbBytes > 0 && retCount) DEBUG("OrderedQueue::pop count=%d\n", *retCount);
- return nbBytes;
+
+ return buf;
}
diff --git a/src/OrderedQueue.h b/src/OrderedQueue.h
index c8958cb..4652762 100644
--- a/src/OrderedQueue.h
+++ b/src/OrderedQueue.h
@@ -25,23 +25,31 @@
#include <cstdint>
#include <cstdio>
+/* An queue that receives indexed frames, potentially out-of-order,
+ * which returns the frames in-order.
+ */
class OrderedQueue
{
public:
- OrderedQueue(int32_t countModulo, size_t capacity);
+ /* Indexes of frames must be between 0 and maxIndex.
+ * The queue will fill to capacity if there is a gap.
+ */
+ OrderedQueue(int32_t maxIndex, size_t capacity);
- void push(int32_t count, const uint8_t* buf, size_t size);
+ void push(int32_t index, const uint8_t* buf, size_t size);
bool availableData() const;
- size_t pop(std::vector<uint8_t>& buf, int32_t *retCount=nullptr);
+
+ /* Return the next buffer, or an empty buffer if none available */
+ std::vector<uint8_t> pop(int32_t *retCount=nullptr);
using OrderedQueueData = std::vector<uint8_t>;
private:
- int32_t _countModulo;
+ int32_t _maxIndex;
size_t _capacity;
uint64_t _duplicated = 0;
uint64_t _overruns = 0;
- int32_t _lastCount = -1;
+ int32_t _lastIndexPop = -1;
std::map<int, OrderedQueueData> _stock;
};