diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-12-04 14:53:54 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2019-12-04 14:53:54 +0100 |
commit | c89b5e3c0d9515f07892af464bd6c60fb3427c36 (patch) | |
tree | 18c7573859606dada82e56d5115cd5d1e71129db /src | |
parent | 814ec3abaede73ea38c7130333c7bc0a18e05d91 (diff) | |
download | ODR-SourceCompanion-c89b5e3c0d9515f07892af464bd6c60fb3427c36.tar.gz ODR-SourceCompanion-c89b5e3c0d9515f07892af464bd6c60fb3427c36.tar.bz2 ODR-SourceCompanion-c89b5e3c0d9515f07892af464bd6c60fb3427c36.zip |
Timestamp arrival of UDP packets
Diffstat (limited to 'src')
-rw-r--r-- | src/AVTInput.cpp | 18 | ||||
-rw-r--r-- | src/AVTInput.h | 6 | ||||
-rw-r--r-- | src/OrderedQueue.cpp | 19 | ||||
-rw-r--r-- | src/OrderedQueue.h | 15 | ||||
-rw-r--r-- | src/Outputs.cpp | 51 | ||||
-rw-r--r-- | src/Outputs.h | 6 | ||||
-rw-r--r-- | src/odr-sourcecompanion.cpp | 17 |
7 files changed, 76 insertions, 56 deletions
diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp index 11dd4cc..d39f2ef 100644 --- a/src/AVTInput.cpp +++ b/src/AVTInput.cpp @@ -368,6 +368,7 @@ bool AVTInput::_readFrame() size_t dataSize = 0; auto packet = _input_socket.receive(MAX_AVT_FRAME_SIZE); + const timestamp_t ts = std::chrono::system_clock::now(); const size_t readBytes = packet.buffer.size(); if (readBytes > 0) { @@ -380,7 +381,7 @@ bool AVTInput::_readFrame() if (dataPtr) { if (dataSize == _dab24msFrameSize) { - _ordered.push(frameNumber, dataPtr, dataSize); + _ordered.push(frameNumber, dataPtr, dataSize, ts); } else ERROR("Wrong frame size from encoder %zu != %zu\n", dataSize, _dab24msFrameSize); } @@ -392,10 +393,8 @@ bool AVTInput::_readFrame() return readBytes > 0; } -ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf) +size_t AVTInput::getNextFrame(std::vector<uint8_t> &buf, std::chrono::system_clock::time_point& ts) { - ssize_t nbBytes = 0; - //printf("A: _padFrameQueue size=%zu\n", _padFrameQueue.size()); // Read all messages from encoder (in priority) @@ -409,16 +408,19 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf) int32_t returnedIndex = -1; while (_nbFrames < 5) { - auto part = _ordered.pop(&returnedIndex); + const auto queue_data = _ordered.pop(&returnedIndex); + const auto& part = queue_data.buf; if (part.empty()) { break; } while (_checkMessage()) {}; + if (not _frameAligned) { if (returnedIndex % 5 == 0) { _frameAligned = true; + _frameZeroTimestamp = queue_data.capture_timestamp; memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size()); _currentFrameSize += part.size(); @@ -430,6 +432,10 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf) memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size()); _currentFrameSize += part.size(); _nbFrames++; + + // UDP packets arrive with jitter, we intentionally only consider + // their timestamp after a discontinuity. + _frameZeroTimestamp += std::chrono::milliseconds(24); } else { _nbFrames = 0; @@ -441,11 +447,13 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf) } } + size_t nbBytes = 0; if (_nbFrames == 5 && _currentFrameSize <= buf.size()) { memcpy(&buf[0], _currentFrame.data(), _currentFrameSize); nbBytes = _currentFrameSize; _currentFrameSize = 0; _nbFrames = 0; + ts = _frameZeroTimestamp; } //printf("C: _padFrameQueue size=%zu\n", _padFrameQueue.size()); diff --git a/src/AVTInput.h b/src/AVTInput.h index e925a80..fd6cf02 100644 --- a/src/AVTInput.h +++ b/src/AVTInput.h @@ -37,6 +37,7 @@ #include <string> #include <queue> #include <vector> +#include <chrono> #define DEF_BR 64 @@ -79,12 +80,12 @@ class AVTInput */ int setDabPlusParameters(int bitrate, int channels, int sample_rate, bool sbr, bool ps); - /*! Read incomming frames from the encoder, reorder and reassemble then into DAB+ superframes + /*! Read incoming frames from the encoder, reorder and reassemble then into DAB+ superframes *! Give the next reassembled audio frame (120ms for DAB+) * * \return the size of the frame or 0 if none are available yet */ - ssize_t getNextFrame(std::vector<uint8_t> &buf); + size_t getNextFrame(std::vector<uint8_t> &buf, std::chrono::system_clock::time_point& ts); /*! Store a new PAD frame. *! Frames are sent to the encoder on request @@ -118,6 +119,7 @@ class AVTInput bool _frameAligned = false; std::vector<uint8_t> _currentFrame; int32_t _nbFrames = 0; + std::chrono::system_clock::time_point _frameZeroTimestamp; size_t _currentFrameSize = 0; bool _parseURI(const char* uri, std::string& address, long& port); diff --git a/src/OrderedQueue.cpp b/src/OrderedQueue.cpp index eb2cf97..707f0f9 100644 --- a/src/OrderedQueue.cpp +++ b/src/OrderedQueue.cpp @@ -22,6 +22,8 @@ #include <cstdio> #include <stdint.h> +using namespace std; + #define DEBUG(fmt, A...) fprintf(stderr, "OrderedQueue: " fmt, ##A) //#define DEBUG(x...) #define ERROR(fmt, A...) fprintf(stderr, "OrderedQueue: ERROR " fmt, ##A) @@ -32,7 +34,7 @@ OrderedQueue::OrderedQueue(int maxIndex, size_t capacity) : { } -void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size) +void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size, const timestamp_t& ts) { // DEBUG("OrderedQueue::push index=%d\n", index); index = (index + _maxIndex) % _maxIndex; @@ -52,8 +54,11 @@ void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size) DEBUG("Duplicated index=%d\n", index); } - OrderedQueueData oqd(size); - copy(buf, buf + size, oqd.begin()); + OrderedQueueData oqd; + oqd.buf.resize(size); + oqd.capture_timestamp = ts; + + copy(buf, buf + size, oqd.buf.begin()); _stock[index] = move(oqd); } else { @@ -73,9 +78,9 @@ bool OrderedQueue::availableData() const return _stock.size() > 0; } -std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex) +OrderedQueueData OrderedQueue::pop(int32_t *returnedIndex) { - OrderedQueueData buf; + OrderedQueueData oqd; uint32_t gap = 0; if (_stock.size() > 0) { @@ -83,7 +88,7 @@ std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex) bool found = false; while (not found) { try { - buf = move(_stock.at(nextIndex)); + oqd = move(_stock.at(nextIndex)); _stock.erase(nextIndex); _lastIndexPop = nextIndex; if (returnedIndex) *returnedIndex = _lastIndexPop; @@ -108,6 +113,6 @@ std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex) DEBUG("index jump of %d\n", gap); } - return buf; + return oqd; } diff --git a/src/OrderedQueue.h b/src/OrderedQueue.h index 7cc59ee..654ae61 100644 --- a/src/OrderedQueue.h +++ b/src/OrderedQueue.h @@ -22,9 +22,18 @@ #include <string> #include <map> #include <vector> +#include <chrono> #include <cstdint> #include <cstdio> +using timestamp_t = std::chrono::system_clock::time_point; +using vec_u8 = std::vector<uint8_t>; + +struct OrderedQueueData { + vec_u8 buf; + timestamp_t capture_timestamp; +}; + /* An queue that receives indexed frames, potentially out-of-order, * which returns the frames in-order. */ @@ -36,13 +45,11 @@ class OrderedQueue */ OrderedQueue(int32_t maxIndex, size_t capacity); - void push(int32_t index, const uint8_t* buf, size_t size); + void push(int32_t index, const uint8_t* buf, size_t size, const timestamp_t& ts); bool availableData() const; /* Return the next buffer, or an empty buffer if none available */ - std::vector<uint8_t> pop(int32_t *returnedIndex=nullptr); - - using OrderedQueueData = std::vector<uint8_t>; + OrderedQueueData pop(int32_t *returnedIndex=nullptr); private: int32_t _maxIndex; diff --git a/src/Outputs.cpp b/src/Outputs.cpp index d0d3ca4..3b4de65 100644 --- a/src/Outputs.cpp +++ b/src/Outputs.cpp @@ -136,12 +136,13 @@ bool ZMQ::write_frame(const uint8_t *buf, size_t len) } EDI::EDI() : + m_time_last_version_sent(chrono::steady_clock::now()), m_clock_tai({}) { } EDI::~EDI() { } -void EDI::add_udp_destination(const std::string& host, unsigned int port) +void EDI::add_udp_destination(const string& host, unsigned int port) { auto dest = make_shared<edi::udp_destination_t>(); dest->dest_addr = host; @@ -154,7 +155,7 @@ void EDI::add_udp_destination(const std::string& host, unsigned int port) // TODO make FEC configurable } -void EDI::add_tcp_destination(const std::string& host, unsigned int port) +void EDI::add_tcp_destination(const string& host, unsigned int port) { auto dest = make_shared<edi::tcp_client_t>(); dest->dest_addr = host; @@ -172,10 +173,22 @@ bool EDI::enabled() const return not m_edi_conf.destinations.empty(); } -void EDI::set_tist(bool enable, uint32_t delay_ms) +void EDI::set_tist(bool enable, uint32_t delay_ms, const chrono::system_clock::time_point& ts) { m_tist = enable; m_delay_ms = delay_ms; + + const auto ts_with_delay = ts + chrono::milliseconds(m_delay_ms); + + const auto ts_s = chrono::time_point_cast<chrono::seconds>(ts_with_delay); + const auto remainder = ts_with_delay - ts_s; + if (remainder < chrono::milliseconds(0)) { + throw logic_error("EDI::set_tist remainder duration negative!"); + } + const uint32_t remainder_ms = chrono::duration_cast<chrono::milliseconds>(remainder).count(); + + m_edi_time = chrono::system_clock::to_time_t(ts_s); + m_timestamp += remainder_ms << 14; // Shift ms by 14 to Timestamp level 2 } bool EDI::write_frame(const uint8_t *buf, size_t len) @@ -184,33 +197,11 @@ bool EDI::write_frame(const uint8_t *buf, size_t len) m_edi_sender = make_shared<edi::Sender>(m_edi_conf); } - if (m_edi_time == 0) { - using Sec = chrono::seconds; - const auto now = chrono::time_point_cast<Sec>(chrono::system_clock::now()); - m_edi_time = chrono::system_clock::to_time_t(now) + (m_delay_ms / 1000); - m_send_version_at_time = m_edi_time; - - /* TODO we still have to see if 24ms granularity is achievable, given that - * one DAB+ super frame is carried over more than 1 ETI frame. - */ - for (int32_t sub_ms = (m_delay_ms % 1000); sub_ms > 0; sub_ms -= 24) { - m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 - } - } - edi::TagStarPTR edi_tagStarPtr("DSTI"); m_edi_tagDSTI.stihf = false; m_edi_tagDSTI.atstf = m_tist; - m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 - if (m_timestamp > 0xf9FFff) { - m_timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second - m_edi_time += 1; - - m_num_seconds_sent++; - } - m_edi_tagDSTI.set_edi_time(m_edi_time, m_clock_tai.get_offset()); m_edi_tagDSTI.tsta = m_timestamp & 0xffffff; @@ -231,8 +222,10 @@ bool EDI::write_frame(const uint8_t *buf, size_t len) #else PACKAGE_VERSION; #endif - edi::TagODRVersion edi_tagVersion(ss.str(), m_num_seconds_sent); + // We always send in 24ms interval + const size_t num_seconds_sent = m_num_frames_sent * 1000 / 24; + edi::TagODRVersion edi_tagVersion(ss.str(), num_seconds_sent); // The above Tag Items will be assembled into a TAG Packet edi::TagPacket edi_tagpacket(m_edi_conf.tagpacket_alignment); @@ -244,13 +237,15 @@ bool EDI::write_frame(const uint8_t *buf, size_t len) edi_tagpacket.tag_items.push_back(&edi_tagAudioLevels); // Send version information only every 10 seconds to save bandwidth - if (m_send_version_at_time < m_edi_time) { - m_send_version_at_time += 10; + if (m_time_last_version_sent + chrono::seconds(10) < chrono::steady_clock::now()) { + m_time_last_version_sent += chrono::seconds(10); edi_tagpacket.tag_items.push_back(&edi_tagVersion); } m_edi_sender->write(edi_tagpacket); + m_num_frames_sent++; + // TODO Handle TCP disconnect return true; } diff --git a/src/Outputs.h b/src/Outputs.h index 0f1f34f..1f17491 100644 --- a/src/Outputs.h +++ b/src/Outputs.h @@ -136,7 +136,7 @@ class EDI: public Base { void add_udp_destination(const std::string& host, unsigned int port); void add_tcp_destination(const std::string& host, unsigned int port); - void set_tist(bool enable, uint32_t delay_ms); + void set_tist(bool enable, uint32_t delay_ms, const std::chrono::system_clock::time_point& ts); bool enabled() const; @@ -147,9 +147,9 @@ class EDI: public Base { std::shared_ptr<edi::Sender> m_edi_sender; uint32_t m_timestamp = 0; - uint32_t m_num_seconds_sent = 0; + uint32_t m_num_frames_sent = 0; std::time_t m_edi_time = 0; - std::time_t m_send_version_at_time = 0; + std::chrono::steady_clock::time_point m_time_last_version_sent; edi::TagDSTI m_edi_tagDSTI; diff --git a/src/odr-sourcecompanion.cpp b/src/odr-sourcecompanion.cpp index 02475eb..b8c5547 100644 --- a/src/odr-sourcecompanion.cpp +++ b/src/odr-sourcecompanion.cpp @@ -323,10 +323,6 @@ int main(int argc, char *argv[]) } } - if (not edi_output_uris.empty()) { - edi_output.set_tist(tist_enabled, tist_delay_ms); - } - if (padlen != 0) { int flags; if (mkfifo(pad_fifo, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH) != 0) { @@ -436,14 +432,21 @@ int main(int argc, char *argv[]) } } - numOutBytes = avtinput.getNextFrame(outbuf); - if (numOutBytes == 0) { + chrono::system_clock::time_point ts; + numOutBytes = avtinput.getNextFrame(outbuf, ts); + if (numOutBytes > 0) { + if (not edi_output_uris.empty()) { + edi_output.set_tist(tist_enabled, tist_delay_ms, ts); + } + } + else { const auto curTime = std::chrono::steady_clock::now(); const auto diff = curTime - timeout_start; if (diff > timeout_duration) { fprintf(stderr, "timeout reached\n"); timedout = true; - } else { + } + else { const int wait_ms = 1; usleep(wait_ms * 1000); } |