summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/edi/STIWriter.cpp1
-rw-r--r--lib/edi/common.cpp25
-rw-r--r--lib/edi/common.hpp4
-rw-r--r--src/input/Edi.cpp48
-rw-r--r--src/input/Edi.h3
-rw-r--r--src/utils.cpp2
6 files changed, 72 insertions, 11 deletions
diff --git a/lib/edi/STIWriter.cpp b/lib/edi/STIWriter.cpp
index 6964eb1..399922a 100644
--- a/lib/edi/STIWriter.cpp
+++ b/lib/edi/STIWriter.cpp
@@ -120,6 +120,7 @@ void STIWriter::assemble()
m_stiFrame.frame = m_payload.istd;
m_stiFrame.timestamp.seconds = m_seconds;
m_stiFrame.timestamp.utco = m_utco;
+ m_stiFrame.timestamp.tsta = m_management_data.tsta;
}
sti_frame_t STIWriter::getFrame()
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index bc0fa1b..b4b0c79 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -25,18 +25,31 @@
#include <iomanip>
#include <sstream>
#include <cassert>
+#include <cmath>
#include <cstdio>
namespace EdiDecoder {
using namespace std;
+bool frame_timestamp_t::valid() const
+{
+ return tsta != 0xFFFFFF;
+}
+
string frame_timestamp_t::to_string() const
{
const time_t seconds_in_unix_epoch = to_unix_epoch();
stringstream ss;
- ss << "Timestamp: " << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z");
+ if (valid()) {
+ ss << "Timestamp: ";
+ }
+ else {
+ ss << "Timestamp not valid: ";
+ }
+ ss << std::put_time(std::gmtime(&seconds_in_unix_epoch), "%c %Z") <<
+ " + " << ((double)tsta / 16384000.0);
return ss.str();
}
@@ -48,6 +61,16 @@ time_t frame_timestamp_t::to_unix_epoch() const
return 946684800 + seconds - utco;
}
+std::chrono::system_clock::time_point frame_timestamp_t::to_system_clock() const
+{
+ auto ts = chrono::system_clock::from_time_t(to_unix_epoch());
+
+ // PPS offset in seconds = tsta / 16384000
+ ts += chrono::nanoseconds(std::lrint(tsta / 0.016384));
+
+ return ts;
+}
+
TagDispatcher::TagDispatcher(
std::function<void()>&& af_packet_completed, bool verbose) :
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index 1433004..887bc3d 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -23,6 +23,7 @@
#include "PFT.hpp"
#include <functional>
#include <map>
+#include <chrono>
#include <string>
#include <vector>
#include <cstddef>
@@ -33,9 +34,12 @@ namespace EdiDecoder {
struct frame_timestamp_t {
uint32_t seconds = 0;
uint32_t utco = 0;
+ uint32_t tsta = 0; // According to EN 300 797 Annex B
+ bool valid() const;
std::string to_string() const;
time_t to_unix_epoch() const;
+ std::chrono::system_clock::time_point to_system_clock() const;
};
struct decode_state_t {
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index 8aee296..765a355 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -27,6 +27,7 @@
#include "input/Edi.h"
#include <regex>
+#include <chrono>
#include <stdexcept>
#include <sstream>
#include <cstring>
@@ -41,7 +42,7 @@ namespace Inputs {
constexpr bool VERBOSE = false;
constexpr size_t TCP_BLOCKSIZE = 2048;
-constexpr size_t MAX_FRAMES_QUEUED = 10;
+constexpr size_t MAX_FRAMES_QUEUED = 1000;
Edi::Edi() :
m_tcp_receive_server(TCP_BLOCKSIZE),
@@ -96,23 +97,53 @@ int Edi::open(const std::string& name)
int Edi::readFrame(uint8_t* buffer, size_t size)
{
- vector<uint8_t> frame;
+ if (m_pending_sti_frame.frame.empty()) {
+ m_frames.try_pop(m_pending_sti_frame);
+ }
+
+ if (not m_pending_sti_frame.frame.empty()) {
+ if (m_pending_sti_frame.frame.size() != size) {
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " <<
+ m_pending_sti_frame.frame.size() << " received, " << size << " requested";
+ memset(buffer, 0, size * sizeof(*buffer));
+ }
+ else {
+ const auto now = chrono::system_clock::now();
+
+ if (m_pending_sti_frame.timestamp.to_system_clock() <= now) {
+ etiLog.level(debug) << "EDI input take frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+
+ std::copy(m_pending_sti_frame.frame.cbegin(), m_pending_sti_frame.frame.cend(), buffer);
+ m_pending_sti_frame.frame.clear();
+ }
+ else {
+ etiLog.level(debug) << "EDI input skip frame with TS " <<
+ m_pending_sti_frame.timestamp.to_string() << " queue size " << m_frames.size();
+ }
+ }
+ }
+
+ return size;
+
+#if 0
+ EdiDecoder::sti_frame_t sti;
if (m_is_prebuffering) {
m_is_prebuffering = m_frames.size() < 10;
if (not m_is_prebuffering) {
etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
}
}
- else if (m_frames.try_pop(frame)) {
- if (frame.size() == 0) {
+ else if (m_frames.try_pop(sti)) {
+ if (sti.frame.size() == 0) {
etiLog.level(debug) << "EDI input " << m_name << " empty frame";
return 0;
}
- else if (frame.size() == size) {
- std::copy(frame.cbegin(), frame.cend(), buffer);
+ else if (sti.frame.size() == size) {
+ std::copy(sti.frame.cbegin(), sti.frame.cend(), buffer);
}
else {
- etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << frame.size() <<
+ etiLog.level(debug) << "EDI input " << m_name << " size mismatch: " << sti.frame.size() <<
" received, " << size << " requested";
memset(buffer, 0, size * sizeof(*buffer));
}
@@ -123,6 +154,7 @@ int Edi::readFrame(uint8_t* buffer, size_t size)
etiLog.level(info) << "EDI input " << m_name << " re-enabling pre-buffering";
}
return size;
+#endif
}
void Edi::m_run()
@@ -159,7 +191,7 @@ void Edi::m_run()
const auto sti = m_sti_writer.getFrame();
if (not sti.frame.empty()) {
- m_frames.push_wait_if_full(move(sti.frame), MAX_FRAMES_QUEUED);
+ m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED);
work_done = true;
}
diff --git a/src/input/Edi.h b/src/input/Edi.h
index 7b3dc04..66ff682 100644
--- a/src/input/Edi.h
+++ b/src/input/Edi.h
@@ -71,7 +71,8 @@ class Edi : public InputBase {
EdiDecoder::STIDecoder m_sti_decoder;
std::thread m_thread;
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
- ThreadsafeQueue<std::vector<uint8_t> > m_frames;
+ ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+ EdiDecoder::sti_frame_t m_pending_sti_frame;
bool m_is_prebuffering = true;
diff --git a/src/utils.cpp b/src/utils.cpp
index 721c145..3e3e86e 100644
--- a/src/utils.cpp
+++ b/src/utils.cpp
@@ -328,7 +328,7 @@ void printSubchannels(const vec_sp_subchannel& subchannels)
etiLog.level(info) << " URI: " << subchannel->inputUri;
switch (subchannel->type) {
case subchannel_type_t::DABAudio:
- etiLog.log(info, " type: DAbAudio");
+ etiLog.log(info, " type: DABAudio");
break;
case subchannel_type_t::DABPlusAudio:
etiLog.log(info, " type: DABPlusAudio");