aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-12-04 14:53:54 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-12-04 14:53:54 +0100
commitc89b5e3c0d9515f07892af464bd6c60fb3427c36 (patch)
tree18c7573859606dada82e56d5115cd5d1e71129db
parent814ec3abaede73ea38c7130333c7bc0a18e05d91 (diff)
downloadODR-SourceCompanion-c89b5e3c0d9515f07892af464bd6c60fb3427c36.tar.gz
ODR-SourceCompanion-c89b5e3c0d9515f07892af464bd6c60fb3427c36.tar.bz2
ODR-SourceCompanion-c89b5e3c0d9515f07892af464bd6c60fb3427c36.zip
Timestamp arrival of UDP packets
-rw-r--r--src/AVTInput.cpp18
-rw-r--r--src/AVTInput.h6
-rw-r--r--src/OrderedQueue.cpp19
-rw-r--r--src/OrderedQueue.h15
-rw-r--r--src/Outputs.cpp51
-rw-r--r--src/Outputs.h6
-rw-r--r--src/odr-sourcecompanion.cpp17
7 files changed, 76 insertions, 56 deletions
diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp
index 11dd4cc..d39f2ef 100644
--- a/src/AVTInput.cpp
+++ b/src/AVTInput.cpp
@@ -368,6 +368,7 @@ bool AVTInput::_readFrame()
size_t dataSize = 0;
auto packet = _input_socket.receive(MAX_AVT_FRAME_SIZE);
+ const timestamp_t ts = std::chrono::system_clock::now();
const size_t readBytes = packet.buffer.size();
if (readBytes > 0) {
@@ -380,7 +381,7 @@ bool AVTInput::_readFrame()
if (dataPtr) {
if (dataSize == _dab24msFrameSize) {
- _ordered.push(frameNumber, dataPtr, dataSize);
+ _ordered.push(frameNumber, dataPtr, dataSize, ts);
}
else ERROR("Wrong frame size from encoder %zu != %zu\n", dataSize, _dab24msFrameSize);
}
@@ -392,10 +393,8 @@ bool AVTInput::_readFrame()
return readBytes > 0;
}
-ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)
+size_t AVTInput::getNextFrame(std::vector<uint8_t> &buf, std::chrono::system_clock::time_point& ts)
{
- ssize_t nbBytes = 0;
-
//printf("A: _padFrameQueue size=%zu\n", _padFrameQueue.size());
// Read all messages from encoder (in priority)
@@ -409,16 +408,19 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)
int32_t returnedIndex = -1;
while (_nbFrames < 5) {
- auto part = _ordered.pop(&returnedIndex);
+ const auto queue_data = _ordered.pop(&returnedIndex);
+ const auto& part = queue_data.buf;
if (part.empty()) {
break;
}
while (_checkMessage()) {};
+
if (not _frameAligned) {
if (returnedIndex % 5 == 0) {
_frameAligned = true;
+ _frameZeroTimestamp = queue_data.capture_timestamp;
memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size());
_currentFrameSize += part.size();
@@ -430,6 +432,10 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)
memcpy(_currentFrame.data() + _currentFrameSize, part.data(), part.size());
_currentFrameSize += part.size();
_nbFrames++;
+
+ // UDP packets arrive with jitter, we intentionally only consider
+ // their timestamp after a discontinuity.
+ _frameZeroTimestamp += std::chrono::milliseconds(24);
}
else {
_nbFrames = 0;
@@ -441,11 +447,13 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf)
}
}
+ size_t nbBytes = 0;
if (_nbFrames == 5 && _currentFrameSize <= buf.size()) {
memcpy(&buf[0], _currentFrame.data(), _currentFrameSize);
nbBytes = _currentFrameSize;
_currentFrameSize = 0;
_nbFrames = 0;
+ ts = _frameZeroTimestamp;
}
//printf("C: _padFrameQueue size=%zu\n", _padFrameQueue.size());
diff --git a/src/AVTInput.h b/src/AVTInput.h
index e925a80..fd6cf02 100644
--- a/src/AVTInput.h
+++ b/src/AVTInput.h
@@ -37,6 +37,7 @@
#include <string>
#include <queue>
#include <vector>
+#include <chrono>
#define DEF_BR 64
@@ -79,12 +80,12 @@ class AVTInput
*/
int setDabPlusParameters(int bitrate, int channels, int sample_rate, bool sbr, bool ps);
- /*! Read incomming frames from the encoder, reorder and reassemble then into DAB+ superframes
+ /*! Read incoming frames from the encoder, reorder and reassemble then into DAB+ superframes
*! Give the next reassembled audio frame (120ms for DAB+)
*
* \return the size of the frame or 0 if none are available yet
*/
- ssize_t getNextFrame(std::vector<uint8_t> &buf);
+ size_t getNextFrame(std::vector<uint8_t> &buf, std::chrono::system_clock::time_point& ts);
/*! Store a new PAD frame.
*! Frames are sent to the encoder on request
@@ -118,6 +119,7 @@ class AVTInput
bool _frameAligned = false;
std::vector<uint8_t> _currentFrame;
int32_t _nbFrames = 0;
+ std::chrono::system_clock::time_point _frameZeroTimestamp;
size_t _currentFrameSize = 0;
bool _parseURI(const char* uri, std::string& address, long& port);
diff --git a/src/OrderedQueue.cpp b/src/OrderedQueue.cpp
index eb2cf97..707f0f9 100644
--- a/src/OrderedQueue.cpp
+++ b/src/OrderedQueue.cpp
@@ -22,6 +22,8 @@
#include <cstdio>
#include <stdint.h>
+using namespace std;
+
#define DEBUG(fmt, A...) fprintf(stderr, "OrderedQueue: " fmt, ##A)
//#define DEBUG(x...)
#define ERROR(fmt, A...) fprintf(stderr, "OrderedQueue: ERROR " fmt, ##A)
@@ -32,7 +34,7 @@ OrderedQueue::OrderedQueue(int maxIndex, size_t capacity) :
{
}
-void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size)
+void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size, const timestamp_t& ts)
{
// DEBUG("OrderedQueue::push index=%d\n", index);
index = (index + _maxIndex) % _maxIndex;
@@ -52,8 +54,11 @@ void OrderedQueue::push(int32_t index, const uint8_t* buf, size_t size)
DEBUG("Duplicated index=%d\n", index);
}
- OrderedQueueData oqd(size);
- copy(buf, buf + size, oqd.begin());
+ OrderedQueueData oqd;
+ oqd.buf.resize(size);
+ oqd.capture_timestamp = ts;
+
+ copy(buf, buf + size, oqd.buf.begin());
_stock[index] = move(oqd);
}
else {
@@ -73,9 +78,9 @@ bool OrderedQueue::availableData() const
return _stock.size() > 0;
}
-std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex)
+OrderedQueueData OrderedQueue::pop(int32_t *returnedIndex)
{
- OrderedQueueData buf;
+ OrderedQueueData oqd;
uint32_t gap = 0;
if (_stock.size() > 0) {
@@ -83,7 +88,7 @@ std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex)
bool found = false;
while (not found) {
try {
- buf = move(_stock.at(nextIndex));
+ oqd = move(_stock.at(nextIndex));
_stock.erase(nextIndex);
_lastIndexPop = nextIndex;
if (returnedIndex) *returnedIndex = _lastIndexPop;
@@ -108,6 +113,6 @@ std::vector<uint8_t> OrderedQueue::pop(int32_t *returnedIndex)
DEBUG("index jump of %d\n", gap);
}
- return buf;
+ return oqd;
}
diff --git a/src/OrderedQueue.h b/src/OrderedQueue.h
index 7cc59ee..654ae61 100644
--- a/src/OrderedQueue.h
+++ b/src/OrderedQueue.h
@@ -22,9 +22,18 @@
#include <string>
#include <map>
#include <vector>
+#include <chrono>
#include <cstdint>
#include <cstdio>
+using timestamp_t = std::chrono::system_clock::time_point;
+using vec_u8 = std::vector<uint8_t>;
+
+struct OrderedQueueData {
+ vec_u8 buf;
+ timestamp_t capture_timestamp;
+};
+
/* An queue that receives indexed frames, potentially out-of-order,
* which returns the frames in-order.
*/
@@ -36,13 +45,11 @@ class OrderedQueue
*/
OrderedQueue(int32_t maxIndex, size_t capacity);
- void push(int32_t index, const uint8_t* buf, size_t size);
+ void push(int32_t index, const uint8_t* buf, size_t size, const timestamp_t& ts);
bool availableData() const;
/* Return the next buffer, or an empty buffer if none available */
- std::vector<uint8_t> pop(int32_t *returnedIndex=nullptr);
-
- using OrderedQueueData = std::vector<uint8_t>;
+ OrderedQueueData pop(int32_t *returnedIndex=nullptr);
private:
int32_t _maxIndex;
diff --git a/src/Outputs.cpp b/src/Outputs.cpp
index d0d3ca4..3b4de65 100644
--- a/src/Outputs.cpp
+++ b/src/Outputs.cpp
@@ -136,12 +136,13 @@ bool ZMQ::write_frame(const uint8_t *buf, size_t len)
}
EDI::EDI() :
+ m_time_last_version_sent(chrono::steady_clock::now()),
m_clock_tai({})
{ }
EDI::~EDI() { }
-void EDI::add_udp_destination(const std::string& host, unsigned int port)
+void EDI::add_udp_destination(const string& host, unsigned int port)
{
auto dest = make_shared<edi::udp_destination_t>();
dest->dest_addr = host;
@@ -154,7 +155,7 @@ void EDI::add_udp_destination(const std::string& host, unsigned int port)
// TODO make FEC configurable
}
-void EDI::add_tcp_destination(const std::string& host, unsigned int port)
+void EDI::add_tcp_destination(const string& host, unsigned int port)
{
auto dest = make_shared<edi::tcp_client_t>();
dest->dest_addr = host;
@@ -172,10 +173,22 @@ bool EDI::enabled() const
return not m_edi_conf.destinations.empty();
}
-void EDI::set_tist(bool enable, uint32_t delay_ms)
+void EDI::set_tist(bool enable, uint32_t delay_ms, const chrono::system_clock::time_point& ts)
{
m_tist = enable;
m_delay_ms = delay_ms;
+
+ const auto ts_with_delay = ts + chrono::milliseconds(m_delay_ms);
+
+ const auto ts_s = chrono::time_point_cast<chrono::seconds>(ts_with_delay);
+ const auto remainder = ts_with_delay - ts_s;
+ if (remainder < chrono::milliseconds(0)) {
+ throw logic_error("EDI::set_tist remainder duration negative!");
+ }
+ const uint32_t remainder_ms = chrono::duration_cast<chrono::milliseconds>(remainder).count();
+
+ m_edi_time = chrono::system_clock::to_time_t(ts_s);
+ m_timestamp += remainder_ms << 14; // Shift ms by 14 to Timestamp level 2
}
bool EDI::write_frame(const uint8_t *buf, size_t len)
@@ -184,33 +197,11 @@ bool EDI::write_frame(const uint8_t *buf, size_t len)
m_edi_sender = make_shared<edi::Sender>(m_edi_conf);
}
- if (m_edi_time == 0) {
- using Sec = chrono::seconds;
- const auto now = chrono::time_point_cast<Sec>(chrono::system_clock::now());
- m_edi_time = chrono::system_clock::to_time_t(now) + (m_delay_ms / 1000);
- m_send_version_at_time = m_edi_time;
-
- /* TODO we still have to see if 24ms granularity is achievable, given that
- * one DAB+ super frame is carried over more than 1 ETI frame.
- */
- for (int32_t sub_ms = (m_delay_ms % 1000); sub_ms > 0; sub_ms -= 24) {
- m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2
- }
- }
-
edi::TagStarPTR edi_tagStarPtr("DSTI");
m_edi_tagDSTI.stihf = false;
m_edi_tagDSTI.atstf = m_tist;
- m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2
- if (m_timestamp > 0xf9FFff) {
- m_timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second
- m_edi_time += 1;
-
- m_num_seconds_sent++;
- }
-
m_edi_tagDSTI.set_edi_time(m_edi_time, m_clock_tai.get_offset());
m_edi_tagDSTI.tsta = m_timestamp & 0xffffff;
@@ -231,8 +222,10 @@ bool EDI::write_frame(const uint8_t *buf, size_t len)
#else
PACKAGE_VERSION;
#endif
- edi::TagODRVersion edi_tagVersion(ss.str(), m_num_seconds_sent);
+ // We always send in 24ms interval
+ const size_t num_seconds_sent = m_num_frames_sent * 1000 / 24;
+ edi::TagODRVersion edi_tagVersion(ss.str(), num_seconds_sent);
// The above Tag Items will be assembled into a TAG Packet
edi::TagPacket edi_tagpacket(m_edi_conf.tagpacket_alignment);
@@ -244,13 +237,15 @@ bool EDI::write_frame(const uint8_t *buf, size_t len)
edi_tagpacket.tag_items.push_back(&edi_tagAudioLevels);
// Send version information only every 10 seconds to save bandwidth
- if (m_send_version_at_time < m_edi_time) {
- m_send_version_at_time += 10;
+ if (m_time_last_version_sent + chrono::seconds(10) < chrono::steady_clock::now()) {
+ m_time_last_version_sent += chrono::seconds(10);
edi_tagpacket.tag_items.push_back(&edi_tagVersion);
}
m_edi_sender->write(edi_tagpacket);
+ m_num_frames_sent++;
+
// TODO Handle TCP disconnect
return true;
}
diff --git a/src/Outputs.h b/src/Outputs.h
index 0f1f34f..1f17491 100644
--- a/src/Outputs.h
+++ b/src/Outputs.h
@@ -136,7 +136,7 @@ class EDI: public Base {
void add_udp_destination(const std::string& host, unsigned int port);
void add_tcp_destination(const std::string& host, unsigned int port);
- void set_tist(bool enable, uint32_t delay_ms);
+ void set_tist(bool enable, uint32_t delay_ms, const std::chrono::system_clock::time_point& ts);
bool enabled() const;
@@ -147,9 +147,9 @@ class EDI: public Base {
std::shared_ptr<edi::Sender> m_edi_sender;
uint32_t m_timestamp = 0;
- uint32_t m_num_seconds_sent = 0;
+ uint32_t m_num_frames_sent = 0;
std::time_t m_edi_time = 0;
- std::time_t m_send_version_at_time = 0;
+ std::chrono::steady_clock::time_point m_time_last_version_sent;
edi::TagDSTI m_edi_tagDSTI;
diff --git a/src/odr-sourcecompanion.cpp b/src/odr-sourcecompanion.cpp
index 02475eb..b8c5547 100644
--- a/src/odr-sourcecompanion.cpp
+++ b/src/odr-sourcecompanion.cpp
@@ -323,10 +323,6 @@ int main(int argc, char *argv[])
}
}
- if (not edi_output_uris.empty()) {
- edi_output.set_tist(tist_enabled, tist_delay_ms);
- }
-
if (padlen != 0) {
int flags;
if (mkfifo(pad_fifo, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH) != 0) {
@@ -436,14 +432,21 @@ int main(int argc, char *argv[])
}
}
- numOutBytes = avtinput.getNextFrame(outbuf);
- if (numOutBytes == 0) {
+ chrono::system_clock::time_point ts;
+ numOutBytes = avtinput.getNextFrame(outbuf, ts);
+ if (numOutBytes > 0) {
+ if (not edi_output_uris.empty()) {
+ edi_output.set_tist(tist_enabled, tist_delay_ms, ts);
+ }
+ }
+ else {
const auto curTime = std::chrono::steady_clock::now();
const auto diff = curTime - timeout_start;
if (diff > timeout_duration) {
fprintf(stderr, "timeout reached\n");
timedout = true;
- } else {
+ }
+ else {
const int wait_ms = 1;
usleep(wait_ms * 1000);
}