diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-26 10:57:58 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-06-26 10:57:58 +0200 |
commit | 8e7a15754a3fef09cc5de372f207936740459c56 (patch) | |
tree | f1c3cc289eebc9ed9b620696b24467680288cc39 | |
parent | 899dcb83ec873cb35d38583d6f48922e1312e9be (diff) | |
download | ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.tar.gz ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.tar.bz2 ODR-SourceCompanion-8e7a15754a3fef09cc5de372f207936740459c56.zip |
Refactor OrderedQueue a bit
-rw-r--r-- | src/AVTInput.cpp | 11 | ||||
-rw-r--r-- | src/AVTInput.h | 12 | ||||
-rw-r--r-- | src/OrderedQueue.cpp | 59 | ||||
-rw-r--r-- | src/OrderedQueue.h | 18 |
4 files changed, 52 insertions, 48 deletions
diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp index 973ed7b..0e5b669 100644 --- a/src/AVTInput.cpp +++ b/src/AVTInput.cpp @@ -69,9 +69,7 @@ AVTInput::AVTInput(const std::string& input_uri, _input_pad_packet(2048), _ordered(5000, _jitterBufferSize), _lastInfoFrameType(_typeCantExtract) -{ - -} +{ } int AVTInput::prepare(void) { @@ -430,14 +428,13 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf) //printf("B: _padFrameQueue size=%zu\n", _padFrameQueue.size()); // Assemble next frame - int32_t nb = 0; std::vector<uint8_t> part; - while (_nbFrames < 5 && (nb = _ordered.pop(part)) != 0) + while (_nbFrames < 5 and not (part = _ordered.pop()).empty()) { while (_checkMessage()); - memcpy(_currentFrame.data() + _currentFrameSize, part.data(), nb); - _currentFrameSize += nb; + memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size()); + _currentFrameSize += part.size(); _nbFrames ++; } diff --git a/src/AVTInput.h b/src/AVTInput.h index 62b2248..0f58418 100644 --- a/src/AVTInput.h +++ b/src/AVTInput.h @@ -101,12 +101,12 @@ class AVTInput uint32_t _pad_port; size_t _jitterBufferSize; - Socket::UDPSocket _input_socket; - Socket::UDPSocket _output_socket; - Socket::UDPPacket _output_packet; - Socket::UDPSocket _input_pad_socket; - Socket::UDPPacket _input_pad_packet; - OrderedQueue _ordered; + Socket::UDPSocket _input_socket; + Socket::UDPSocket _output_socket; + Socket::UDPPacket _output_packet; + Socket::UDPSocket _input_pad_socket; + Socket::UDPPacket _input_pad_packet; + OrderedQueue _ordered; std::queue<std::vector<uint8_t> > _padFrameQueue; int32_t _subChannelIndex = DEF_BR/8; diff --git a/src/OrderedQueue.cpp b/src/OrderedQueue.cpp index 8d545df..8a768e7 100644 --- a/src/OrderedQueue.cpp +++ b/src/OrderedQueue.cpp @@ -26,39 +26,40 @@ //#define DEBUG(x...) #define ERROR(fmt, A...) fprintf(stderr, "OrderedQueue: ERROR " fmt, ##A) -OrderedQueue::OrderedQueue(int countModulo, size_t capacity) : - _countModulo(countModulo), +OrderedQueue::OrderedQueue(int maxIndex, size_t capacity) : + _maxIndex(maxIndex), _capacity(capacity) { } -void OrderedQueue::push(int32_t count, const uint8_t* buf, size_t size) +void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size) { -// DEBUG("OrderedQueue::push count=%d\n", count); - count = (count+_countModulo) % _countModulo; + // DEBUG("OrderedQueue::push index=%d\n", index); + index = (index + _maxIndex) % _maxIndex; - // First frame makes the count initialisation. - if (_lastCount == -1) { - _lastCount = (count+_countModulo-1) % _countModulo; + // First frame makes the index initialisation. + if (_lastIndexPop == -1) { + // Equivalent to index - 1 in modulo arithmetic: + _lastIndexPop = (index + _maxIndex-1) % _maxIndex; } if (_stock.size() < _capacity) { - if (_stock.find(count) == _stock.end()) { - // count already exists, duplicated frame + if (_stock.find(index) == _stock.end()) { + // index already exists, duplicated frame // Replace the old one by the new one. - // the old one could a an old frame from the previous count loop + // the old one could a an old frame from the previous index loop _duplicated++; - DEBUG("Duplicated count=%d\n", count); + DEBUG("Duplicated index=%d\n", index); } OrderedQueueData oqd(size); copy(buf, buf + size, oqd.begin()); - _stock[count] = move(oqd); + _stock[index] = move(oqd); } else { _overruns++; if (_overruns < 100) { - DEBUG("Overruns (size=%zu) count=%d not inserted\n", _stock.size(), count); + DEBUG("Overruns (size=%zu) index=%d not inserted\n", _stock.size(), index); } else if (_overruns == 100) { DEBUG("stop displaying Overruns\n"); @@ -72,43 +73,41 @@ bool OrderedQueue::availableData() const return _stock.size() > 0; } -size_t OrderedQueue::pop(std::vector<uint8_t>& buf, int32_t *retCount) +std::vector<uint8_t> OrderedQueue::pop(int32_t *retCount) { - size_t nbBytes = 0; + OrderedQueueData buf; uint32_t gap = 0; if (_stock.size() > 0) { - int32_t nextCount = (_lastCount+1) % _countModulo; + int32_t nextIndex = (_lastIndexPop+1) % _maxIndex; bool found = false; while (not found) { try { - auto& oqd = _stock.at(nextCount); - buf = move(oqd); - _stock.erase(nextCount); - _lastCount = nextCount; - if (retCount) *retCount = _lastCount; + buf = move(_stock.at(nextIndex)); + _stock.erase(nextIndex); + _lastIndexPop = nextIndex; + if (retCount) *retCount = _lastIndexPop; found = true; } - catch (const std::out_of_range&) - { + catch (const std::out_of_range&) { if (_stock.size() < _capacity) { - found = true; + break; } else { - // Search for the new reference count, starting from the current one + // Search for the new index, starting from the current one // This could be optimised, but the modulo makes things // not easy. gap++; - nextCount = (nextCount+1) % _countModulo; + nextIndex = (nextIndex+1) % _maxIndex; } } } } if (gap > 0) { - DEBUG("Count jump of %d\n", gap); + DEBUG("index jump of %d\n", gap); } -// if (nbBytes > 0 && retCount) DEBUG("OrderedQueue::pop count=%d\n", *retCount); - return nbBytes; + + return buf; } diff --git a/src/OrderedQueue.h b/src/OrderedQueue.h index c8958cb..4652762 100644 --- a/src/OrderedQueue.h +++ b/src/OrderedQueue.h @@ -25,23 +25,31 @@ #include <cstdint> #include <cstdio> +/* An queue that receives indexed frames, potentially out-of-order, + * which returns the frames in-order. + */ class OrderedQueue { public: - OrderedQueue(int32_t countModulo, size_t capacity); + /* Indexes of frames must be between 0 and maxIndex. + * The queue will fill to capacity if there is a gap. + */ + OrderedQueue(int32_t maxIndex, size_t capacity); - void push(int32_t count, const uint8_t* buf, size_t size); + void push(int32_t index, const uint8_t* buf, size_t size); bool availableData() const; - size_t pop(std::vector<uint8_t>& buf, int32_t *retCount=nullptr); + + /* Return the next buffer, or an empty buffer if none available */ + std::vector<uint8_t> pop(int32_t *retCount=nullptr); using OrderedQueueData = std::vector<uint8_t>; private: - int32_t _countModulo; + int32_t _maxIndex; size_t _capacity; uint64_t _duplicated = 0; uint64_t _overruns = 0; - int32_t _lastCount = -1; + int32_t _lastIndexPop = -1; std::map<int, OrderedQueueData> _stock; }; |