/*
Copyright (C) 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010,
2011, 2012 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
Copyright (C) 2017
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
*/
/*
This file is part of ODR-DabMux.
ODR-DabMux is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
ODR-DabMux is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with ODR-DabMux. If not, see .
*/
#include "dabOutput/dabOutput.h"
#include "dabOutput/edi/TagItems.h"
#include "dabOutput/edi/TagPacket.h"
#include "dabOutput/edi/AFPacket.h"
#include "dabOutput/edi/PFT.h"
#include "dabOutput/edi/Interleaver.h"
#include "Log.h"
#include "zmq.hpp"
#include "math.h"
#include
#include
#include
#include
#include
static edi_configuration_t edi_conf;
static std::ofstream edi_debug_file;
// The TagPacket will then be placed into an AFPacket
static edi::AFPacketiser edi_afPacketiser;
// The AF Packet will be protected with reed-solomon and split in fragments
static edi::PFT edi_pft;
// To mitigate for burst packet loss, PFT fragments can be sent out-of-order
static edi::Interleaver edi_interleaver;
// This metadata gets transmitted in the zmq stream
struct metadata_t {
uint32_t edi_time;
int16_t utc_offset;
uint16_t dlfc;
};
void usage(void)
{
using namespace std;
cerr << "Usage:" << endl;
cerr << "odr-zmq2edi [options] " << endl << endl;
cerr << "Where the options are:" << endl;
cerr << " is a ZMQ URL that points to a ODR-DabMux ZMQ output." << endl;
cerr << " -d sets the destination ip." << endl;
cerr << " -p sets the destination port." << endl;
cerr << " -s sets the source port." << endl;
cerr << " -S select the source IP in case we want to use multicast." << endl;
cerr << " -t set the packet's TTL." << endl;
cerr << " -P Disable PFT and send AFPackets." << endl;
cerr << " -f sets the FEC." << endl;
cerr << " -i enables the interleaved with this latency." << endl;
cerr << " -D dumps the EDI to edi.debug file." << endl;
cerr << " -v Enables verbose mode." << endl;
cerr << " -a sets the alignment of the TAG Packet (default 8)." << endl;
}
static void edi_setup(void) {
if (edi_conf.verbose) {
etiLog.log(info, "Setup EDI");
}
if (edi_conf.dump) {
edi_debug_file.open("./edi.debug");
}
if (edi_conf.enabled()) {
for (auto& edi_destination : edi_conf.destinations) {
auto edi_output = std::make_shared(edi_destination.source_port);
if (not edi_destination.source_addr.empty()) {
int err = edi_output->setMulticastSource(edi_destination.source_addr.c_str());
if (err) {
throw std::runtime_error("EDI socket set source failed!");
}
err = edi_output->setMulticastTTL(edi_destination.ttl);
if (err) {
throw std::runtime_error("EDI socket set TTL failed!");
}
}
edi_destination.socket = edi_output;
}
}
if (edi_conf.verbose) {
etiLog.log(info, "EDI set up");
}
// The AF Packet will be protected with reed-solomon and split in fragments
edi::PFT pft(edi_conf);
edi_pft = pft;
if (edi_conf.interleaver_enabled()) {
edi_interleaver.SetLatency(edi_conf.latency_frames);
}
}
static void print_edi_conf(void)
{
if (edi_conf.enabled()) {
etiLog.level(info) << "EDI";
etiLog.level(info) << " verbose " << edi_conf.verbose;
for (auto& edi_dest : edi_conf.destinations) {
etiLog.level(info) << " to " << edi_dest.dest_addr << ":" << edi_conf.dest_port;
if (not edi_dest.source_addr.empty()) {
etiLog.level(info) << " source " << edi_dest.source_addr;
etiLog.level(info) << " ttl " << edi_dest.ttl;
}
etiLog.level(info) << " source port " << edi_dest.source_port;
}
if (edi_conf.interleaver_enabled()) {
etiLog.level(info) << " interleave " << edi_conf.latency_frames * 24 << " ms";
}
}
else {
etiLog.level(info) << "EDI disabled";
}
}
static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes)
{
size_t remaining = size;
if (remaining < 3) {
etiLog.level(warn) << "Insufficient data to parse metadata";
throw std::runtime_error("Insufficient data");
}
metadata_t md;
bool utc_offset_received = false;
bool edi_time_received = false;
bool dlfc_received = false;
while (remaining) {
uint8_t id = buf[0];
uint16_t len = (((uint16_t)buf[0]) << 8) + buf[1];
if (id == static_cast(output_metadata_id_e::separation_marker)) {
if (len != 0) {
etiLog.level(warn) << "Invalid length for metadata: separation_marker";
}
if (not utc_offset_received or not edi_time_received or not dlfc_received) {
throw std::runtime_error("Incomplete metadata received");
}
remaining -= 3;
*consumed_bytes = size - remaining;
return md;
}
else if (id == static_cast(output_metadata_id_e::utc_offset)) {
if (len != 2) {
etiLog.level(warn) << "Invalid length for metadata: utc_offset";
}
if (remaining < 2) {
throw std::runtime_error("Insufficient data for utc_offset");
}
uint16_t utco;
std::memcpy(&utco, buf + 2, sizeof(utco));
md.utc_offset = ntohl(utco);
utc_offset_received = true;
remaining -= 5;
buf += 5;
}
else if (id == static_cast(output_metadata_id_e::edi_time)) {
if (len != 4) {
etiLog.level(warn) << "Invalid length for metadata: edi_time";
}
if (remaining < 4) {
throw std::runtime_error("Insufficient data for edi_time");
}
uint32_t edi_time;
std::memcpy(&edi_time, buf + 2, sizeof(edi_time));
md.edi_time = ntohl(edi_time);
edi_time_received = true;
remaining -= 7;
buf += 7;
}
else if (id == static_cast(output_metadata_id_e::dlfc)) {
if (len != 3) {
etiLog.level(warn) << "Invalid length for metadata: dlfc";
}
if (remaining < 2) {
throw std::runtime_error("Insufficient data for dlfc");
}
uint16_t dlfc;
std::memcpy(&dlfc, buf + 2, sizeof(dlfc));
md.dlfc = ntohl(dlfc);
dlfc_received = true;
remaining -= 5;
buf += 5;
}
}
throw std::runtime_error("Insufficient data");
}
static void send_eti_frame(uint8_t* p, metadata_t metadata)
{
edi::TagDETI edi_tagDETI;
edi::TagStarPTR edi_tagStarPtr;
std::map edi_subchannelToTag;
// The above Tag Items will be assembled into a TAG Packet
edi::TagPacket edi_tagpacket(edi_conf.tagpacket_alignment);
// SYNC
edi_tagDETI.stat = p[0];
// LIDATA FCT
//const int fct = p[4];
edi_tagDETI.dlfc = metadata.dlfc;
bool ficf = (p[5] & 0x80) >> 7;
const int nst = p[5] & 0x7F;
edi_tagDETI.fp = (p[6] & 0xE0) >> 5;
const int mid = (p[6] & 0x18) >> 3;
edi_tagDETI.mid = mid;
//const int fl = (p[6] & 0x07) * 256 + p[7];
int ficl = 0;
if (ficf == 0) {
etiLog.level(warn) << "Not FIC in data stream!";
return;
}
else if (mid == 3) {
ficl = 32;
}
else {
ficl = 24;
}
std::vector sad(nst);
std::vector stl(nst);
// Loop over STC subchannels:
for (int i=0; i < nst; i++) {
uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
stl[i] = (p[10+4*i] & 0x03) * 256 + \
p[11+4*i];
edi::TagESTn tag_ESTn;
tag_ESTn.id = i;
tag_ESTn.scid = scid;
tag_ESTn.sad = sad[i];
tag_ESTn.tpl = tpl;
tag_ESTn.rfa = 0; // two bits
tag_ESTn.mst_length = stl[i];
tag_ESTn.mst_data = nullptr;
edi_subchannelToTag[i] = tag_ESTn;
}
const uint16_t mnsc = p[8 + 4*nst] * 256 + \
p[8 + 4*nst + 1];
edi_tagDETI.mnsc = mnsc;
/*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
p[8 + 4*nst + 3]; */
edi_tagDETI.fic_data = p + 12 + 4*nst;
edi_tagDETI.fic_length = ficl * 4;
// loop over MSC subchannels
int offset = 0;
for (int i=0; i < nst; i++) {
edi::TagESTn& tag = edi_subchannelToTag[i];
tag.mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
offset += stl[i] * 8;
}
/*
const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
// TIST
const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
(uint32_t)(p[tist_ix+1]) << 16 |
(uint32_t)(p[tist_ix+2]) << 8 |
(uint32_t)(p[tist_ix+3]);
edi_tagDETI.tsta = tist;
edi_tagDETI.atstf = 1;
edi_tagDETI.utco = metadata.utc_offset;
edi_tagDETI.seconds = metadata.edi_time;
if (edi_conf.enabled()) {
// put tags *ptr, DETI and all subchannels into one TagPacket
edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
edi_tagpacket.tag_items.push_back(&edi_tagDETI);
for (auto& tag : edi_subchannelToTag) {
edi_tagpacket.tag_items.push_back(&tag.second);
}
// Assemble into one AF Packet
edi::AFPacket edi_afpacket = edi_afPacketiser.Assemble(edi_tagpacket);
if (edi_conf.enable_pft) {
// Apply PFT layer to AF Packet (Reed Solomon FEC and Fragmentation)
std::vector edi_fragments = edi_pft.Assemble(edi_afpacket);
if (edi_conf.verbose) {
fprintf(stderr, "EDI number of PFT fragment before interleaver %zu\n",
edi_fragments.size());
}
if (edi_conf.interleaver_enabled()) {
edi_fragments = edi_interleaver.Interleave(edi_fragments);
}
// Send over ethernet
for (const auto& edi_frag : edi_fragments) {
for (auto& dest : edi_conf.destinations) {
InetAddress addr;
addr.setAddress(dest.dest_addr.c_str());
addr.setPort(edi_conf.dest_port);
dest.socket->send(edi_frag, addr);
}
if (edi_conf.dump) {
std::ostream_iterator debug_iterator(edi_debug_file);
std::copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
}
}
if (edi_conf.verbose) {
fprintf(stderr, "EDI number of PFT fragments %zu\n",
edi_fragments.size());
}
}
else {
// Send over ethernet
for (auto& dest : edi_conf.destinations) {
InetAddress addr;
addr.setAddress(dest.dest_addr.c_str());
addr.setPort(edi_conf.dest_port);
dest.socket->send(edi_afpacket, addr);
}
if (edi_conf.dump) {
std::ostream_iterator debug_iterator(edi_debug_file);
std::copy(edi_afpacket.begin(), edi_afpacket.end(), debug_iterator);
}
}
}
}
int start(int argc, char **argv)
{
edi_destination_t edi_destination;
edi_conf.enable_pft = true;
if (argc == 0) {
usage();
return 1;
}
char ch = 0;
while (ch != -1) {
ch = getopt(argc, argv, "d:p:s:S:t:Pf:i:Dva:");
switch (ch) {
case -1:
break;
case 'd':
edi_destination.dest_addr = optarg;
break;
case 'p':
edi_conf.dest_port = std::stoi(optarg);
break;
case 's':
edi_destination.source_port = std::stoi(optarg);
break;
case 'S':
edi_destination.source_addr = optarg;
break;
case 't':
edi_destination.ttl = std::stoi(optarg);
break;
case 'P':
edi_conf.enable_pft = false;
break;
case 'f':
edi_conf.fec = std::stoi(optarg);
break;
case 'i':
{
double interleave_ms = std::stod(optarg);
if (interleave_ms != 0.0) {
if (interleave_ms < 0) {
throw std::runtime_error("EDI output: negative interleave value is invalid.");
}
auto latency_rounded = lround(interleave_ms / 24.0);
if (latency_rounded * 24 > 30000) {
throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
}
edi_conf.latency_frames = latency_rounded;
}
}
break;
case 'D':
edi_conf.dump = true;
break;
case 'v':
edi_conf.verbose = true;
break;
case 'a':
edi_conf.tagpacket_alignment = std::stoi(optarg);
break;
case 'h':
usage();
return 1;
default:
etiLog.log(error, "Option '%c' not understood", ch);
usage();
return 1;
}
}
if (optind >= argc) {
etiLog.level(error) << "source option is missing";
return 1;
}
print_edi_conf();
edi_setup();
const char* source_url = argv[optind];
etiLog.level(info) << "Opening ZMQ input: " << source_url;
zmq::context_t zmq_ctx(1);
zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
zmq_sock.connect(source_url);
zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
etiLog.level(info) << "Entering main loop";
size_t frame_count = 0;
size_t loop_counter = 0;
size_t error_count = 0;
while (error_count < 10)
{
zmq::message_t incoming;
zmq_sock.recv(&incoming);
zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
if (dab_msg->version != 1) {
etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
int offset = sizeof(dab_msg->version) +
NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
std::list, metadata_t> > all_frames;
for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
if (dab_msg->buflen[i] <= 0 ||
dab_msg->buflen[i] > 6144)
{
etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
dab_msg->buflen[i];
error_count++;
}
else {
std::vector buf(6144, 0x55);
const int framesize = dab_msg->buflen[i];
memcpy(&buf.front(),
((uint8_t*)incoming.data()) + offset,
framesize);
all_frames.emplace_back(
std::piecewise_construct,
std::make_tuple(std::move(buf)),
std::make_tuple());
offset += framesize;
}
}
for (auto &f : all_frames) {
size_t consumed_bytes = 0;
std::cerr << "MD: " << incoming.size() - offset << std::endl;
f.second = get_md_one_frame(
static_cast(incoming.data()) + offset,
incoming.size() - offset,
&consumed_bytes);
offset += consumed_bytes;
}
for (auto &f : all_frames) {
send_eti_frame(f.first.data(), f.second);
frame_count++;
}
loop_counter++;
if (loop_counter > 250) {
etiLog.level(info) << "Transmitted " << frame_count << " ETI frames";
loop_counter = 0;
}
}
return error_count > 0 ? 2 : 0;
}
int main(int argc, char **argv)
{
etiLog.level(info) << "ZMQ2EDI converter from " <<
PACKAGE_NAME << " " <<
#if defined(GITVERSION)
GITVERSION <<
#else
PACKAGE_VERSION <<
#endif
" starting up";
try {
return start(argc, argv);
}
catch (std::runtime_error &e) {
etiLog.level(error) << "Error: " << e.what();
}
return 1;
}