aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/DabMultiplexer.cpp118
-rw-r--r--src/DabMultiplexer.h9
-rw-r--r--src/zmq2edi/EDISender.cpp390
-rw-r--r--src/zmq2edi/EDISender.h91
-rw-r--r--src/zmq2edi/zmq2edi.cpp419
5 files changed, 957 insertions, 70 deletions
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index e6e6782..31b12bb 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -47,22 +47,21 @@ static vector<string> split_pipe_separated_string(const std::string& s)
return components;
}
-uint64_t MuxTime::init(uint32_t tist_at_fct0_us)
+uint64_t MuxTime::init(uint32_t tist_at_fct0_us, double tist_offset)
{
- m_tist_at_fct0_us = tist_at_fct0_us;
-
- /* At startup, derive edi_time, TIST and CIF count such that there is
- * a consistency across mux restarts. Ensure edi_time and TIST represent
- * current time.
- *
- * FCT and DLFC are directly derived from m_currentFrame.
- * Every 6s, FCT overflows. DLFC overflows at 5000 every 120s.
- *
- * Keep a granularity of 24ms, which corresponds to the duration of an ETI
- * frame, to get nicer timestamps.
- */
+ // Things we must guarantee, up to granularity of 24ms:
+ // Difference between current time and EDI time = tist_offset
+ // TIST of frame 0 = tist_at_fct0_us
+ // In order to achieve the second, we calculate the initial
+ // counter value so that FCT0 corresponds to the desired TIST.
+ //
+ // Changing the tist_offset at runtime will throw off the TIST@FCT0 value
+ m_tist_offset_ms = std::lround(tist_offset * 1000);
+
using Sec = chrono::seconds;
- const auto now = chrono::system_clock::now();
+ const auto now = chrono::system_clock::now() +
+ chrono::milliseconds(std::lround(tist_offset * 1000.0));
+
const auto offset = now - chrono::time_point_cast<Sec>(now);
if (offset >= chrono::seconds(1)) {
throw std::logic_error("Invalid startup offset calculation for TIST! " +
@@ -70,43 +69,26 @@ uint64_t MuxTime::init(uint32_t tist_at_fct0_us)
" ms");
}
const time_t t_now = chrono::system_clock::to_time_t(chrono::time_point_cast<Sec>(now));
+ const auto offset_ms = chrono::duration_cast<chrono::milliseconds>(offset).count();
- m_edi_time = t_now - (t_now % 6);
- uint64_t currentFrame = 0;
- time_t edi_time_at_cif0 = t_now - (t_now % 120);
- while (edi_time_at_cif0 < m_edi_time) {
- edi_time_at_cif0 += 6;
- currentFrame += 250;
- }
+ m_edi_time = t_now;
+ m_pps_offset_ms = std::lround(offset_ms / 24.0) * 24;
- if (edi_time_at_cif0 != m_edi_time) {
- throw std::logic_error("Invalid startup offset calculation for CIF!");
- }
+ const auto counter_offset = tist_at_fct0_us / 24;
+ const auto offset_as_count = m_pps_offset_ms / 24;
- int64_t offset_ms = chrono::duration_cast<chrono::milliseconds>(offset).count();
- offset_ms += 1000 * (t_now - m_edi_time);
+ etiLog.level(debug) << "Init " << counter_offset << " " << offset_as_count;
- if (tist_at_fct0_us >= 1000000) {
- etiLog.level(error) << "tist_at_fct0 may not be larger than 1s";
- throw MuxInitException();
- }
-
- m_timestamp = (uint64_t)tist_at_fct0_us * 16384 / 1000;
- while (offset_ms >= 24) {
- increment_timestamp();
- currentFrame++;
- offset_ms -= 24;
- }
- return currentFrame;
+ return (250 - counter_offset + offset_as_count) % 250;
}
constexpr int TIMESTAMP_LEVEL_2_SHIFT = 14;
void MuxTime::increment_timestamp()
{
- m_timestamp += 24 << TIMESTAMP_LEVEL_2_SHIFT; // Shift 24ms by 14 to Timestamp level 2
- if (m_timestamp > 0xf9FFff) {
- m_timestamp -= 0xfa0000; // Subtract 16384000, corresponding to one second
+ m_pps_offset_ms += 24;
+ if (m_pps_offset_ms >= 1000) {
+ m_pps_offset_ms -= 1000;
m_edi_time += 1;
// Also update MNSC time for next time FP==0
@@ -114,27 +96,32 @@ void MuxTime::increment_timestamp()
}
}
-std::pair<uint32_t, std::time_t> MuxTime::get_tist_seconds()
+void MuxTime::set_tist_offset(double new_tist_offset)
{
- // The user-visible configuration tist_offset is the effective
- // offset, but since we implicitly add the tist_at_fct0 to it,
- // we must compensate.
- double corrected_tist_offset = tist_offset - (m_tist_at_fct0_us / 1e6);
-
- // negative tist_offset not supported, because the calculation is annoying
- if (corrected_tist_offset < 0) return {m_timestamp, m_edi_time};
-
- double fractional_part = corrected_tist_offset - std::floor(corrected_tist_offset);
- const size_t steps = std::lround(std::floor(fractional_part / 24e-3));
- uint32_t timestamp = m_timestamp + (24 << TIMESTAMP_LEVEL_2_SHIFT) * steps;
-
- std::time_t edi_time = m_edi_time + std::lround(std::floor(corrected_tist_offset));
-
- if (timestamp > 0xf9FFff) {
- edi_time += 1;
+ int32_t new_tist_offset_ms = std::lround(new_tist_offset * 1000.0);
+ if (new_tist_offset_ms > 0) {
+ while (new_tist_offset_ms > 0) {
+ increment_timestamp();
+ new_tist_offset_ms -= 24;
+ }
+ }
+ else if (new_tist_offset_ms < 0) {
+ while (new_tist_offset_ms < 0) {
+ m_edi_time -= 1;
+ new_tist_offset_ms += 1000;
+ }
+ // compensate the we subtracted too much
+ while (new_tist_offset_ms > 0) {
+ increment_timestamp();
+ new_tist_offset_ms -= 24;
+ }
}
+}
- return {timestamp % 0xfa0000, edi_time};
+std::pair<uint32_t, std::time_t> MuxTime::get_tist_seconds()
+{
+ auto timestamp = m_pps_offset_ms * 16384;
+ return {timestamp % 0xfa0000, m_edi_time};
}
std::pair<uint32_t, std::time_t> MuxTime::get_milliseconds_seconds()
@@ -153,7 +140,6 @@ DabMultiplexer::DabMultiplexer(boost::property_tree::ptree pt) :
fig_carousel(ensemble, [&]() { return m_time.get_milliseconds_seconds(); })
{
RC_ADD_PARAMETER(frames, "Show number of frames generated [read-only]");
- RC_ADD_PARAMETER(tist_offset, "Timestamp offset in fractional number of seconds");
rcs.enrol(&m_clock_tai);
}
@@ -200,11 +186,10 @@ void DabMultiplexer::prepare(bool require_tai_clock)
}
const uint32_t tist_at_fct0_us = m_pt.get<double>("general.tist_at_fct0", 0);
- currentFrame = m_time.init(tist_at_fct0_us);
+ currentFrame = m_time.init(tist_at_fct0_us, m_pt.get<double>("general.tist_offset", 0.0));
m_time.mnsc_increment_time = false;
bool tist_enabled = m_pt.get("general.tist", false);
- m_time.tist_offset = m_pt.get<double>("general.tist_offset", 0.0);
auto tist_edi_time = m_time.get_tist_seconds();
const auto timestamp = tist_edi_time.first;
@@ -487,6 +472,8 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
auto tist_edi_time = m_time.get_tist_seconds();
const auto timestamp = tist_edi_time.first;
const auto edi_time = tist_edi_time.second;
+ etiLog.level(debug) << "Frame " << currentFrame << " " << edi_time <<
+ " + " << (timestamp >> TIMESTAMP_LEVEL_2_SHIFT);
// Initialise the ETI frame
memset(etiFrame, 0, 6144);
@@ -520,7 +507,6 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
eti_FC *fc = (eti_FC *) &etiFrame[4];
//****** FCT ******//
- // Incremente for each frame, overflows at 249
fc->FCT = currentFrame % 250;
edi_tagDETI.dlfc = currentFrame % 5000;
@@ -857,7 +843,7 @@ void DabMultiplexer::set_parameter(const std::string& parameter,
throw ParameterError(ss.str());
}
else if (parameter == "tist_offset") {
- m_time.tist_offset = std::stod(value);
+ m_time.set_tist_offset(std::stod(value));
}
else {
stringstream ss;
@@ -875,7 +861,7 @@ const std::string DabMultiplexer::get_parameter(const std::string& parameter) co
ss << currentFrame;
}
else if (parameter == "tist_offset") {
- ss << m_time.tist_offset;
+ ss << m_time.tist_offset();
}
else {
ss << "Parameter '" << parameter <<
@@ -890,7 +876,7 @@ const json::map_t DabMultiplexer::get_all_values() const
{
json::map_t map;
map["frames"].v = currentFrame;
- map["tist_offset"].v = m_time.tist_offset;
+ map["tist_offset"].v = m_time.tist_offset();
return map;
}
diff --git a/src/DabMultiplexer.h b/src/DabMultiplexer.h
index 5a0d906..07b42a1 100644
--- a/src/DabMultiplexer.h
+++ b/src/DabMultiplexer.h
@@ -45,15 +45,14 @@ constexpr uint32_t ETI_FSYNC1 = 0x49C5F8;
class MuxTime {
private:
- uint32_t m_timestamp = 0;
std::time_t m_edi_time = 0;
- uint32_t m_tist_at_fct0_us = 0;
+ uint32_t m_pps_offset_ms = 0;
+ int64_t m_tist_offset_ms = 0;
public:
std::pair<uint32_t, std::time_t> get_tist_seconds();
std::pair<uint32_t, std::time_t> get_milliseconds_seconds();
- double tist_offset = 0;
/* Pre v3 odr-dabmux did the MNSC calculation differently,
* which works with the easydabv2. The rework in odr-dabmux,
@@ -69,8 +68,10 @@ class MuxTime {
std::time_t mnsc_time = 0;
/* Setup the time and return the initial currentFrame counter value */
- uint64_t init(uint32_t tist_at_fct0_us);
+ uint64_t init(uint32_t tist_at_fct0_us, double tist_offset);
void increment_timestamp();
+ double tist_offset() const { return m_tist_offset_ms * 1000.0; }
+ void set_tist_offset(double new_tist_offset);
};
class DabMultiplexer : public RemoteControllable {
diff --git a/src/zmq2edi/EDISender.cpp b/src/zmq2edi/EDISender.cpp
new file mode 100644
index 0000000..06b7420
--- /dev/null
+++ b/src/zmq2edi/EDISender.cpp
@@ -0,0 +1,390 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "EDISender.h"
+#include "Log.h"
+#include <cmath>
+#include <numeric>
+#include <map>
+#include <algorithm>
+
+using namespace std;
+
+EDISender::~EDISender()
+{
+ if (running.load()) {
+ running.store(false);
+
+ // Unblock thread
+ frame_t emptyframe;
+ frames.push(emptyframe);
+
+ process_thread.join();
+ }
+}
+
+void EDISender::start(const edi_configuration_t& conf,
+ int delay_ms, int max_delay_ms)
+{
+ edi_conf = conf;
+ tist_delay_ms = delay_ms;
+ tist_max_delay_ms = max_delay_ms;
+
+ if (edi_conf.verbose) {
+ etiLog.log(info, "Setup EDI");
+ }
+
+ if (edi_conf.dump) {
+ edi_debug_file.open("./edi.debug");
+ }
+
+ if (edi_conf.enabled()) {
+ for (auto& edi_destination : edi_conf.destinations) {
+ auto edi_output = make_shared<UdpSocket>(edi_destination.source_port);
+
+ if (not edi_destination.source_addr.empty()) {
+ int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
+ if (err) {
+ throw runtime_error("EDI socket set source failed!");
+ }
+ err = edi_output->setMulticastTTL(edi_destination.ttl);
+ if (err) {
+ throw runtime_error("EDI socket set TTL failed!");
+ }
+ }
+
+ edi_destination.socket = edi_output;
+ }
+ }
+
+ if (edi_conf.verbose) {
+ etiLog.log(info, "EDI set up");
+ }
+
+ // The AF Packet will be protected with reed-solomon and split in fragments
+ edi::PFT pft(edi_conf);
+ edi_pft = pft;
+
+ if (edi_conf.interleaver_enabled()) {
+ edi_interleaver.SetLatency(edi_conf.latency_frames);
+ }
+
+ startTime = std::chrono::steady_clock::now();
+ running.store(true);
+ process_thread = thread(&EDISender::process, this);
+}
+
+void EDISender::push_frame(const frame_t& frame)
+{
+ frames.push(frame);
+}
+
+void EDISender::print_configuration()
+{
+ if (edi_conf.enabled()) {
+ etiLog.level(info) << "EDI";
+ etiLog.level(info) << " verbose " << edi_conf.verbose;
+ for (auto& edi_dest : edi_conf.destinations) {
+ etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
+ if (not edi_dest.source_addr.empty()) {
+ etiLog.level(info) << " source " << edi_dest.source_addr;
+ etiLog.level(info) << " ttl " << edi_dest.ttl;
+ }
+ etiLog.level(info) << " source port " << edi_dest.source_port;
+ }
+ if (edi_conf.interleaver_enabled()) {
+ etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
+ }
+ }
+ else {
+ etiLog.level(info) << "EDI disabled";
+ }
+}
+
+void EDISender::send_eti_frame(uint8_t* p, metadata_t metadata)
+{
+ edi::TagDETI edi_tagDETI;
+ edi::TagStarPTR edi_tagStarPtr;
+ map<int, edi::TagESTn> edi_subchannelToTag;
+ // The above Tag Items will be assembled into a TAG Packet
+ edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
+
+ // SYNC
+ edi_tagDETI.stat = p[0];
+
+ // LIDATA FCT
+ edi_tagDETI.dlfc = metadata.dlfc;
+
+ const int fct = p[4];
+ if (metadata.dlfc % 250 != fct) {
+ etiLog.level(warn) << "Frame FCT=" << fct <<
+ " does not correspond to DLFC=" << metadata.dlfc;
+ }
+
+ bool ficf = (p[5] & 0x80) >> 7;
+ edi_tagDETI.ficf = ficf;
+
+ const int nst = p[5] & 0x7F;
+
+ edi_tagDETI.fp = (p[6] & 0xE0) >> 5;
+ const int mid = (p[6] & 0x18) >> 3;
+ edi_tagDETI.mid = mid;
+ //const int fl = (p[6] & 0x07) * 256 + p[7];
+
+ int ficl = 0;
+ if (ficf == 0) {
+ etiLog.level(warn) << "Not FIC in data stream!";
+ return;
+ }
+ else if (mid == 3) {
+ ficl = 32;
+ }
+ else {
+ ficl = 24;
+ }
+
+ vector<uint32_t> sad(nst);
+ vector<uint32_t> stl(nst);
+ // Loop over STC subchannels:
+ for (int i=0; i < nst; i++) {
+ // EDI stream index is 1-indexed
+ const int edi_stream_id = i + 1;
+
+ uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
+ sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
+ uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
+ stl[i] = (p[10+4*i] & 0x03) * 256 + \
+ p[11+4*i];
+
+ edi::TagESTn tag_ESTn;
+ tag_ESTn.id = edi_stream_id;
+ tag_ESTn.scid = scid;
+ tag_ESTn.sad = sad[i];
+ tag_ESTn.tpl = tpl;
+ tag_ESTn.rfa = 0; // two bits
+ tag_ESTn.mst_length = stl[i];
+ tag_ESTn.mst_data = nullptr;
+
+ edi_subchannelToTag[i] = tag_ESTn;
+ }
+
+ const uint16_t mnsc = p[8 + 4*nst] * 256 + \
+ p[8 + 4*nst + 1];
+ edi_tagDETI.mnsc = mnsc;
+
+ /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
+ p[8 + 4*nst + 3]; */
+
+ edi_tagDETI.fic_data = p + 12 + 4*nst;
+ edi_tagDETI.fic_length = ficl * 4;
+
+ // loop over MSC subchannels
+ int offset = 0;
+ for (int i=0; i < nst; i++) {
+ edi::TagESTn& tag = edi_subchannelToTag[i];
+ tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
+
+ offset += stl[i] * 8;
+ }
+
+ /*
+ const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
+ p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
+
+ // TIST
+ const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
+ uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
+ (uint32_t)(p[tist_ix+1]) << 16 |
+ (uint32_t)(p[tist_ix+2]) << 8 |
+ (uint32_t)(p[tist_ix+3]);
+
+ std::time_t posix_timestamp_1_jan_2000 = 946684800;
+
+ // Wait until our time is tist_delay after the TIST before
+ // we release that frame
+
+ using namespace std::chrono;
+
+ const auto seconds = metadata.edi_time;
+ const auto pps_offset = milliseconds(std::lrint((tist & 0xFFFFFF) / 16384.0));
+ const auto t_frame = system_clock::from_time_t(
+ seconds + posix_timestamp_1_jan_2000) + pps_offset;
+
+ const auto t_release = t_frame + milliseconds(tist_delay_ms);
+ const auto t_now = system_clock::now();
+
+ /*
+ etiLog.level(debug) << "seconds " << seconds + posix_timestamp_1_jan_2000;
+ etiLog.level(debug) << "now " << system_clock::to_time_t(t_now);
+ etiLog.level(debug) << "wait " << wait_time.count();
+ */
+
+ const auto wait_time = t_release - t_now;
+ wait_times.push_back(duration_cast<microseconds>(wait_time).count());
+
+ if (tist_max_delay_ms > 0) {
+ const auto t_latest_release = t_frame + milliseconds(tist_max_delay_ms);
+
+ if (t_now > t_latest_release) {
+ // drop frame
+ num_dropped.fetch_add(1);
+ return;
+ }
+ }
+
+ if (t_release > t_now) {
+ std::this_thread::sleep_for(wait_time);
+ }
+
+ edi_tagDETI.tsta = tist;
+ edi_tagDETI.atstf = 1;
+ edi_tagDETI.utco = metadata.utc_offset;
+ edi_tagDETI.seconds = metadata.edi_time;
+
+ if (edi_conf.enabled()) {
+ // put tags *ptr, DETI and all subchannels into one TagPacket
+ edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
+ edi_tagpacket.tag_items.push_back(&edi_tagDETI);
+
+ for (auto& tag : edi_subchannelToTag) {
+ edi_tagpacket.tag_items.push_back(&tag.second);
+ }
+
+ // Assemble into one AF Packet
+ edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
+
+ if (edi_conf.enable_pft) {
+ // Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
+ vector<edi::PFTFragment> edi_fragments = edi_pft.Assemble(edi_afpacket);
+
+ if (edi_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
+ edi_fragments.size());
+ }
+
+ if (edi_conf.interleaver_enabled()) {
+ edi_fragments = edi_interleaver.Interleave(edi_fragments);
+ }
+
+ // Send over ethernet
+ for (const auto& edi_frag : edi_fragments) {
+ for (auto& dest : edi_conf.destinations) {
+ InetAddress addr;
+ addr.setAddress(dest.dest_addr.c_str());
+ addr.setPort(edi_conf.dest_port);
+
+ dest.socket->send(edi_frag, addr);
+ }
+
+ if (edi_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ }
+ }
+
+ if (edi_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
+ edi_fragments.size());
+ }
+ }
+ else {
+ // Send over ethernet
+ for (auto& dest : edi_conf.destinations) {
+ InetAddress addr;
+ addr.setAddress(dest.dest_addr.c_str());
+ addr.setPort(edi_conf.dest_port);
+
+ dest.socket->send(edi_afpacket, addr);
+ }
+
+ if (edi_conf.dump) {
+ std::ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
+ }
+ }
+ }
+}
+
+void EDISender::process()
+{
+ while (running.load()) {
+ frame_t frame;
+ frames.wait_and_pop(frame);
+
+ if (not running.load() or frame.first.empty()) {
+ break;
+ }
+
+ if (frame.first.size() == 6144) {
+ send_eti_frame(frame.first.data(), frame.second);
+ }
+ else {
+ etiLog.level(warn) << "Ignoring short ETI frame, "
+ "DFLC=" << frame.second.dlfc << ", len=" <<
+ frame.first.size();
+ }
+
+ if (wait_times.size() == 250) { // every six seconds
+ const double n = wait_times.size();
+
+ double sum = accumulate(wait_times.begin(), wait_times.end(), 0);
+ size_t num_late = std::count_if(wait_times.begin(), wait_times.end(),
+ [](double v){ return v < 0; });
+ double mean = sum / n;
+
+ double sq_sum = 0;
+ for (const auto t : wait_times) {
+ sq_sum += (t-mean) * (t-mean);
+ }
+ double stdev = sqrt(sq_sum / n);
+ auto min_max = minmax_element(wait_times.begin(), wait_times.end());
+
+ /* Debug code
+ stringstream ss;
+ ss << "times:";
+ for (const auto t : wait_times) {
+ ss << " " << t;
+ }
+ etiLog.level(debug) << ss.str();
+ */
+
+ const size_t dropped = num_dropped.exchange(0);
+
+ etiLog.level(info) << "Wait time statistics [microseconds]:"
+ " min: " << *min_max.first <<
+ " max: " << *min_max.second <<
+ " mean: " << mean <<
+ " stdev: " << stdev <<
+ " late: " <<
+ num_late << " of " << wait_times.size() << " (" <<
+ num_late * 100.0 / n << "%)" <<
+ " dropped: " << dropped;
+
+ wait_times.clear();
+ }
+ }
+}
diff --git a/src/zmq2edi/EDISender.h b/src/zmq2edi/EDISender.h
new file mode 100644
index 0000000..44502c1
--- /dev/null
+++ b/src/zmq2edi/EDISender.h
@@ -0,0 +1,91 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#pragma once
+#include <iostream>
+#include <iterator>
+#include <thread>
+#include <vector>
+#include <chrono>
+#include <atomic>
+#include "ThreadsafeQueue.h"
+#include "dabOutput/dabOutput.h"
+#include "dabOutput/edi/TagItems.h"
+#include "dabOutput/edi/TagPacket.h"
+#include "dabOutput/edi/AFPacket.h"
+#include "dabOutput/edi/PFT.h"
+#include "dabOutput/edi/Interleaver.h"
+
+// This metadata gets transmitted in the zmq stream
+struct metadata_t {
+ uint32_t edi_time = 0;
+ int16_t utc_offset = 0;
+ uint16_t dlfc = 0;
+};
+
+using frame_t = std::pair<std::vector<uint8_t>, metadata_t>;
+
+class EDISender {
+ public:
+ EDISender() = default;
+ EDISender(const EDISender& other) = delete;
+ EDISender& operator=(const EDISender& other) = delete;
+ ~EDISender();
+ void start(const edi_configuration_t& conf, int delay_ms, int max_delay_ms);
+ void push_frame(const frame_t& frame);
+ void print_configuration(void);
+
+ private:
+ void send_eti_frame(uint8_t* p, metadata_t metadata);
+ void process(void);
+
+ int tist_delay_ms = 0;
+ int tist_max_delay_ms = 0;
+ std::atomic<bool> running = ATOMIC_VAR_INIT(false);
+ std::thread process_thread;
+ edi_configuration_t edi_conf;
+ std::chrono::steady_clock::time_point startTime;
+ ThreadsafeQueue<frame_t> frames;
+ std::ofstream edi_debug_file;
+
+ // The TagPacket will then be placed into an AFPacket
+ edi::AFPacketiser edi_afPacketiser;
+
+ // The AF Packet will be protected with reed-solomon and split in fragments
+ edi::PFT edi_pft;
+
+ // To mitigate for burst packet loss, PFT fragments can be sent out-of-order
+ edi::Interleaver edi_interleaver;
+
+ // For statistics about wait time before we transmit packets,
+ // in microseconds
+ std::vector<double> wait_times;
+
+ // Number of frames dropped because their TIST was larger than max_delay
+ std::atomic<size_t> num_dropped = ATOMIC_VAR_INIT(0);
+
+};
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
new file mode 100644
index 0000000..63c3228
--- /dev/null
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -0,0 +1,419 @@
+/*
+ Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
+ 2011, 2012 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2018
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "Log.h"
+#include "zmq.hpp"
+#include <math.h>
+#include <getopt.h>
+#include <string.h>
+#include <iostream>
+#include <iterator>
+#include <vector>
+
+#include "EDISender.h"
+#include "dabOutput/dabOutput.h"
+
+constexpr size_t MAX_ERROR_COUNT = 10;
+constexpr long ZMQ_TIMEOUT_MS = 1000;
+
+static edi_configuration_t edi_conf;
+
+static EDISender edisender;
+
+void usage(void)
+{
+ using namespace std;
+
+ cerr << "Usage:" << endl;
+ cerr << "odr-zmq2edi [options] <source>" << endl << endl;
+
+ cerr << "Options:" << endl;
+ cerr << "The following options can be given only once:" << endl;
+ cerr << " <source> is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
+ cerr << " -w <delay> Keep every ETI frame until TIST is <delay> milliseconds after current system time." << endl;
+ cerr << " -W <max_delay> Drop ETI frames if TIST is <max_delay> later than current system time." << endl;
+ cerr << " -p <destination port> sets the destination port." << endl;
+ cerr << " -P Disable PFT and send AFPackets." << endl;
+ cerr << " -f <fec> sets the FEC." << endl;
+ cerr << " -i <interleave> enables the interleaved with this latency." << endl;
+ cerr << " -D dumps the EDI to edi.debug file." << endl;
+ cerr << " -v Enables verbose mode." << endl;
+ cerr << " -a <tagpacket alignement> sets the alignment of the TAG Packet (default 8)." << endl << endl;
+
+ cerr << "The following options can be given several times, when more than once destination is addressed:" << endl;
+ cerr << " -d <destination ip> sets the destination ip." << endl;
+ cerr << " -s <source port> sets the source port." << endl;
+ cerr << " -S <source ip> select the source IP in case we want to use multicast." << endl;
+ cerr << " -t <ttl> set the packet's TTL." << endl << endl;
+
+ cerr << "odr-zmq2edi will quit if it does not receive data for " <<
+ (int)(MAX_ERROR_COUNT * ZMQ_TIMEOUT_MS / 1000.0) << " seconds." << endl;
+ cerr << "It is best practice to run this tool under a process supervisor that will restart it automatically." << endl;
+}
+
+static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes)
+{
+ size_t remaining = size;
+ if (remaining < 3) {
+ etiLog.level(warn) << "Insufficient data to parse metadata";
+ throw std::runtime_error("Insufficient data");
+ }
+
+ metadata_t md;
+ bool utc_offset_received = false;
+ bool edi_time_received = false;
+ bool dlfc_received = false;
+
+ while (remaining) {
+ uint8_t id = buf[0];
+ uint16_t len = (((uint16_t)buf[1]) << 8) + buf[2];
+
+ if (id == static_cast<uint8_t>(output_metadata_id_e::separation_marker)) {
+ if (len != 0) {
+ etiLog.level(warn) << "Invalid length " << len << " for metadata: separation_marker";
+ }
+
+ if (not utc_offset_received or not edi_time_received or not dlfc_received) {
+ throw std::runtime_error("Incomplete metadata received");
+ }
+
+ remaining -= 3;
+ *consumed_bytes = size - remaining;
+ return md;
+ }
+ else if (id == static_cast<uint8_t>(output_metadata_id_e::utc_offset)) {
+ if (len != 2) {
+ etiLog.level(warn) << "Invalid length " << len << " for metadata: utc_offset";
+ }
+ if (remaining < 2) {
+ throw std::runtime_error("Insufficient data for utc_offset");
+ }
+ uint16_t utco;
+ std::memcpy(&utco, buf + 3, sizeof(utco));
+ md.utc_offset = ntohs(utco);
+ utc_offset_received = true;
+ remaining -= 5;
+ buf += 5;
+ }
+ else if (id == static_cast<uint8_t>(output_metadata_id_e::edi_time)) {
+ if (len != 4) {
+ etiLog.level(warn) << "Invalid length " << len << " for metadata: edi_time";
+ }
+ if (remaining < 4) {
+ throw std::runtime_error("Insufficient data for edi_time");
+ }
+ uint32_t edi_time;
+ std::memcpy(&edi_time, buf + 3, sizeof(edi_time));
+ md.edi_time = ntohl(edi_time);
+ edi_time_received = true;
+ remaining -= 7;
+ buf += 7;
+ }
+ else if (id == static_cast<uint8_t>(output_metadata_id_e::dlfc)) {
+ if (len != 2) {
+ etiLog.level(warn) << "Invalid length " << len << " for metadata: dlfc";
+ }
+ if (remaining < 2) {
+ throw std::runtime_error("Insufficient data for dlfc");
+ }
+ uint16_t dlfc;
+ std::memcpy(&dlfc, buf + 3, sizeof(dlfc));
+ md.dlfc = ntohs(dlfc);
+ dlfc_received = true;
+ remaining -= 5;
+ buf += 5;
+ }
+ }
+
+ throw std::runtime_error("Insufficient data");
+}
+
+/* There is some state inside the parsing of destination arguments,
+ * because several destinations can be given. */
+
+static edi_destination_t edi_destination;
+static bool source_port_set = false;
+static bool source_addr_set = false;
+static bool ttl_set = false;
+static bool dest_addr_set = false;
+
+static void add_edi_destination(void)
+{
+ if (not dest_addr_set) {
+ throw std::runtime_error("Destination address not specified for destination number " +
+ std::to_string(edi_conf.destinations.size() + 1));
+ }
+
+ edi_conf.destinations.push_back(edi_destination);
+ edi_destination_t newdest;
+ edi_destination = newdest;
+
+ source_port_set = false;
+ source_addr_set = false;
+ ttl_set = false;
+ dest_addr_set = false;
+}
+
+static void parse_destination_args(char option)
+{
+ switch (option) {
+ case 's':
+ if (source_port_set) {
+ add_edi_destination();
+ }
+ edi_destination.source_port = std::stoi(optarg);
+ source_port_set = true;
+ break;
+ case 'S':
+ if (source_addr_set) {
+ add_edi_destination();
+ }
+ edi_destination.source_addr = optarg;
+ source_addr_set = true;
+ break;
+ case 't':
+ if (ttl_set) {
+ add_edi_destination();
+ }
+ edi_destination.ttl = std::stoi(optarg);
+ ttl_set = true;
+ break;
+ case 'd':
+ if (dest_addr_set) {
+ add_edi_destination();
+ }
+ edi_destination.dest_addr = optarg;
+ dest_addr_set = true;
+ break;
+ default:
+ throw std::logic_error("parse_destination_args invalid");
+ }
+}
+
+int start(int argc, char **argv)
+{
+ edi_conf.enable_pft = true;
+
+ if (argc == 0) {
+ usage();
+ return 1;
+ }
+
+ int delay_ms = 500;
+ int max_delay_ms = 0; // no max delay
+
+ int ch = 0;
+ while (ch != -1) {
+ ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:w:W:");
+ switch (ch) {
+ case -1:
+ break;
+ case 'd':
+ case 's':
+ case 'S':
+ case 't':
+ parse_destination_args(ch);
+ break;
+ case 'p':
+ edi_conf.dest_port = std::stoi(optarg);
+ break;
+ case 'P':
+ edi_conf.enable_pft = false;
+ break;
+ case 'f':
+ edi_conf.fec = std::stoi(optarg);
+ break;
+ case 'i':
+ {
+ double interleave_ms = std::stod(optarg);
+ if (interleave_ms != 0.0) {
+ if (interleave_ms < 0) {
+ throw std::runtime_error("EDI output: negative interleave value is invalid.");
+ }
+
+ auto latency_rounded = lround(interleave_ms / 24.0);
+ if (latency_rounded * 24 > 30000) {
+ throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ }
+
+ edi_conf.latency_frames = latency_rounded;
+ }
+ }
+ break;
+ case 'D':
+ edi_conf.dump = true;
+ break;
+ case 'v':
+ edi_conf.verbose = true;
+ break;
+ case 'a':
+ edi_conf.tagpacket_alignment = std::stoi(optarg);
+ break;
+ case 'w':
+ delay_ms = std::stoi(optarg);
+ break;
+ case 'W':
+ max_delay_ms = std::stoi(optarg);
+ break;
+ case 'h':
+ default:
+ usage();
+ return 1;
+ }
+ }
+
+ add_edi_destination();
+
+ if (optind >= argc) {
+ etiLog.level(error) << "source option is missing";
+ return 1;
+ }
+
+ if (edi_conf.dest_port == 0) {
+ etiLog.level(error) << "No EDI destination port defined";
+ return 1;
+ }
+
+ if (edi_conf.destinations.empty()) {
+ etiLog.level(error) << "No EDI destinations set";
+ return 1;
+ }
+
+ if (max_delay_ms > 0) {
+ etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms and max delay " << max_delay_ms << " ms";
+ }
+ else {
+ etiLog.level(info) << "Setting up EDI Sender with delay " << delay_ms << " ms";
+ }
+ edisender.start(edi_conf, delay_ms, max_delay_ms);
+ edisender.print_configuration();
+
+ const char* source_url = argv[optind];
+
+
+ size_t frame_count = 0;
+ size_t error_count = 0;
+
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
+
+ zmq::context_t zmq_ctx(1);
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ while (error_count < MAX_ERROR_COUNT) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const int num_events = zmq::poll(items, 1, ZMQ_TIMEOUT_MS);
+ if (num_events == 0) { // timeout
+ error_count++;
+ }
+ else {
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
+
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ error_count++;
+ }
+
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
+
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
+ error_count++;
+ }
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
+
+ const int framesize = dab_msg->buflen[i];
+
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
+
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
+
+ offset += framesize;
+ }
+ }
+
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
+
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
+
+ offset += consumed_bytes;
+ }
+
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ frame_count++;
+ }
+ }
+ }
+
+ etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+
+ return 0;
+}
+
+int main(int argc, char **argv)
+{
+ etiLog.level(info) << "ZMQ2EDI converter from " <<
+ PACKAGE_NAME << " " <<
+#if defined(GITVERSION)
+ GITVERSION <<
+#else
+ PACKAGE_VERSION <<
+#endif
+ " starting up";
+
+ try {
+ return start(argc, argv);
+ }
+ catch (std::runtime_error &e) {
+ etiLog.level(error) << "Error: " << e.what();
+ }
+
+ return 1;
+}