From 3edd67dc81cd637e06ea22221f3aebfa0111d989 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 23 Sep 2019 20:30:57 +0200 Subject: Add output code from ODR-AudioEnc with EDI output --- Makefile.am | 3 +- src/Outputs.cpp | 258 ++++++++++++++++++++++++++++++++++++++++++++ src/Outputs.h | 161 +++++++++++++++++++++++++++ src/common.h | 32 ++++++ src/odr-sourcecompanion.cpp | 132 +++++++++++++---------- 5 files changed, 529 insertions(+), 57 deletions(-) create mode 100644 src/Outputs.cpp create mode 100644 src/Outputs.h create mode 100644 src/common.h diff --git a/Makefile.am b/Makefile.am index 4884dab..6f925f6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -9,11 +9,12 @@ endif odr_sourcecompanion_LDADD = -lzmq odr_sourcecompanion_CFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc/fec -Ilib +odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc -Ilib odr_sourcecompanion_SOURCES = src/odr-sourcecompanion.cpp \ src/AACDecoder.h src/AACDecoder.cpp \ src/AVTInput.h src/AVTInput.cpp \ src/OrderedQueue.h src/OrderedQueue.cpp \ + src/Outputs.h src/Outputs.cpp \ src/StatsPublish.h src/StatsPublish.cpp \ src/encryption.h src/encryption.c \ src/utils.h src/utils.c \ diff --git a/src/Outputs.cpp b/src/Outputs.cpp new file mode 100644 index 0000000..d0d3ca4 --- /dev/null +++ b/src/Outputs.cpp @@ -0,0 +1,258 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#include "Outputs.h" +#include +#include +#include +#include +#include + +namespace Output { + +using namespace std; + +void Base::update_audio_levels(int16_t audiolevel_left, int16_t audiolevel_right) +{ + m_audio_left = audiolevel_left; + m_audio_right = audiolevel_right; +} + +File::File(const char *filename) +{ + m_fd = fopen(filename, "wb"); + if (m_fd == nullptr) { + throw runtime_error(string("Error opening output file: ") + strerror(errno)); + } +} + +File::File(FILE *fd) : m_fd(fd) { } + +File::~File() { + if (m_fd) { + fclose(m_fd); + m_fd = nullptr; + } +} + +bool File::write_frame(const uint8_t *buf, size_t len) +{ + if (m_fd == nullptr) { + throw logic_error("Invalid usage of closed File output"); + } + + return fwrite(buf, len, 1, m_fd) == 1; +} + +ZMQ::ZMQ() : + m_ctx(), + m_sock(m_ctx, ZMQ_PUB) +{ + // Do not wait at teardown to send all data out + int linger = 0; + m_sock.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +} + +ZMQ::~ZMQ() {} + +void ZMQ::connect(const char *uri, const char *keyfile) +{ + if (keyfile) { + fprintf(stderr, "Enabling encryption\n"); + + int rc = readkey(keyfile, m_secretkey); + if (rc) { + throw runtime_error("Error reading secret key"); + } + + const int yes = 1; + m_sock.setsockopt(ZMQ_CURVE_SERVER, + &yes, sizeof(yes)); + + m_sock.setsockopt(ZMQ_CURVE_SECRETKEY, + m_secretkey, CURVE_KEYLEN); + } + m_sock.connect(uri); +} + +void ZMQ::set_encoder_type(encoder_selection_t& enc, int bitrate) +{ + m_encoder = enc; + m_bitrate = bitrate; +} + +bool ZMQ::write_frame(const uint8_t *buf, size_t len) +{ + if (m_framebuf.size() != ZMQ_HEADER_SIZE + len) { + m_framebuf.resize(ZMQ_HEADER_SIZE + len); + } + + zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)m_framebuf.data(); + + try { + switch (m_encoder) { + case encoder_selection_t::fdk_dabplus: + zmq_frame_header->encoder = ZMQ_ENCODER_FDK; + break; + case encoder_selection_t::toolame_dab: + zmq_frame_header->encoder = ZMQ_ENCODER_TOOLAME; + break; + } + + zmq_frame_header->version = 1; + zmq_frame_header->datasize = len; + zmq_frame_header->audiolevel_left = m_audio_left; + zmq_frame_header->audiolevel_right = m_audio_right; + + assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= m_framebuf.size()); + + memcpy(ZMQ_FRAME_DATA(zmq_frame_header), buf, len); + + m_sock.send(m_framebuf.data(), ZMQ_FRAME_SIZE(zmq_frame_header), + ZMQ_DONTWAIT); + } + catch (zmq::error_t& e) { + fprintf(stderr, "ZeroMQ send error !\n"); + return false; + } + + return true; +} + +EDI::EDI() : + m_clock_tai({}) +{ } + +EDI::~EDI() { } + +void EDI::add_udp_destination(const std::string& host, unsigned int port) +{ + auto dest = make_shared(); + dest->dest_addr = host; + m_edi_conf.dest_port = port; + m_edi_conf.destinations.push_back(dest); + + // We cannot carry AF packets over UDP, because they would be too large. + m_edi_conf.enable_pft = true; + + // TODO make FEC configurable +} + +void EDI::add_tcp_destination(const std::string& host, unsigned int port) +{ + auto dest = make_shared(); + dest->dest_addr = host; + if (dest->dest_port != 0 and dest->dest_port != port) { + throw runtime_error("All EDI UDP outputs must be to the same destination port"); + } + dest->dest_port = port; + m_edi_conf.destinations.push_back(dest); + + m_edi_conf.dump = true; +} + +bool EDI::enabled() const +{ + return not m_edi_conf.destinations.empty(); +} + +void EDI::set_tist(bool enable, uint32_t delay_ms) +{ + m_tist = enable; + m_delay_ms = delay_ms; +} + +bool EDI::write_frame(const uint8_t *buf, size_t len) +{ + if (not m_edi_sender) { + m_edi_sender = make_shared(m_edi_conf); + } + + if (m_edi_time == 0) { + using Sec = chrono::seconds; + const auto now = chrono::time_point_cast(chrono::system_clock::now()); + m_edi_time = chrono::system_clock::to_time_t(now) + (m_delay_ms / 1000); + m_send_version_at_time = m_edi_time; + + /* TODO we still have to see if 24ms granularity is achievable, given that + * one DAB+ super frame is carried over more than 1 ETI frame. + */ + for (int32_t sub_ms = (m_delay_ms % 1000); sub_ms > 0; sub_ms -= 24) { + m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 + } + } + + edi::TagStarPTR edi_tagStarPtr("DSTI"); + + m_edi_tagDSTI.stihf = false; + m_edi_tagDSTI.atstf = m_tist; + + m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2 + if (m_timestamp > 0xf9FFff) { + m_timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second + m_edi_time += 1; + + m_num_seconds_sent++; + } + + m_edi_tagDSTI.set_edi_time(m_edi_time, m_clock_tai.get_offset()); + m_edi_tagDSTI.tsta = m_timestamp & 0xffffff; + + m_edi_tagDSTI.rfadf = false; + // DFCT is handled inside the TagDSTI + + edi::TagSSm edi_tagPayload; + // TODO make edi_tagPayload.stid configurable + edi_tagPayload.istd_data = buf; + edi_tagPayload.istd_length = len; + + edi::TagODRAudioLevels edi_tagAudioLevels(m_audio_left, m_audio_right); + + stringstream ss; + ss << PACKAGE_NAME << " " << +#if defined(GITVERSION) + GITVERSION; +#else + PACKAGE_VERSION; +#endif + edi::TagODRVersion edi_tagVersion(ss.str(), m_num_seconds_sent); + + + // The above Tag Items will be assembled into a TAG Packet + edi::TagPacket edi_tagpacket(m_edi_conf.tagpacket_alignment); + + // put tags *ptr, DETI and all subchannels into one TagPacket + edi_tagpacket.tag_items.push_back(&edi_tagStarPtr); + edi_tagpacket.tag_items.push_back(&m_edi_tagDSTI); + edi_tagpacket.tag_items.push_back(&edi_tagPayload); + edi_tagpacket.tag_items.push_back(&edi_tagAudioLevels); + + // Send version information only every 10 seconds to save bandwidth + if (m_send_version_at_time < m_edi_time) { + m_send_version_at_time += 10; + edi_tagpacket.tag_items.push_back(&edi_tagVersion); + } + + m_edi_sender->write(edi_tagpacket); + + // TODO Handle TCP disconnect + return true; +} + +} diff --git a/src/Outputs.h b/src/Outputs.h new file mode 100644 index 0000000..0f1f34f --- /dev/null +++ b/src/Outputs.h @@ -0,0 +1,161 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include "common.h" +#include "zmq.hpp" +#include "ClockTAI.h" +#include "edi/TagItems.h" +#include "edi/TagPacket.h" +#include "edi/AFPacket.h" +#include "edi/Transport.h" +extern "C" { +#include "encryption.h" +} + +namespace Output { + +/*! \file Outputs.h + * + * Declaration of all outputs + */ + +class Base { + public: + virtual ~Base() {}; + + /*! Write a buffer of encoded data to the output */ + virtual bool write_frame(const uint8_t *buf, size_t len) = 0; + + /*! Update peak audio level information */ + virtual void update_audio_levels( + int16_t audiolevel_left, int16_t audiolevel_right); + + protected: + int16_t m_audio_left = 0; + int16_t m_audio_right = 0; +}; + +class File : public Base { + public: + File(const char *filename); + File(FILE *file); + File(const File&) = delete; + File& operator=(const File&) = delete; + virtual ~File() override; + + virtual bool write_frame(const uint8_t *buf, size_t len) override; + + private: + FILE *m_fd = nullptr; +}; + +/*! This defines the on-wire representation of a ZMQ message header. + * It must be compatible with the definition in ODR-DabMux. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ + uint16_t version; // we support version=1 now + uint16_t encoder; // see ZMQ_ENCODER_XYZ + + /* length of the 'data' field */ + uint32_t datasize; + + /* Audio level, peak, linear PCM */ + int16_t audiolevel_left; + int16_t audiolevel_right; + + /* Data follows this header */ +} __attribute__ ((packed)); + +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +#define ZMQ_HEADER_SIZE sizeof(struct zmq_frame_header_t) + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(struct zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(struct zmq_frame_header_t) ) + + +class ZMQ: public Base { + public: + ZMQ(); + ZMQ(const ZMQ&) = delete; + ZMQ& operator=(const ZMQ&) = delete; + virtual ~ZMQ() override; + + void connect(const char *uri, const char *keyfile); + void set_encoder_type(encoder_selection_t& enc, int bitrate); + + virtual bool write_frame(const uint8_t *buf, size_t len) override; + + private: + zmq::context_t m_ctx; + zmq::socket_t m_sock; + + int m_bitrate = 0; + char m_secretkey[CURVE_KEYLEN+1]; + encoder_selection_t m_encoder = encoder_selection_t::fdk_dabplus; + using vec_u8 = std::vector; + vec_u8 m_framebuf; +}; + + +class EDI: public Base { + public: + EDI(); + EDI(const EDI&) = delete; + EDI& operator=(const EDI&) = delete; + virtual ~EDI() override; + + void add_udp_destination(const std::string& host, unsigned int port); + void add_tcp_destination(const std::string& host, unsigned int port); + + void set_tist(bool enable, uint32_t delay_ms); + + bool enabled() const; + + virtual bool write_frame(const uint8_t *buf, size_t len) override; + + private: + edi::configuration_t m_edi_conf; + std::shared_ptr m_edi_sender; + + uint32_t m_timestamp = 0; + uint32_t m_num_seconds_sent = 0; + std::time_t m_edi_time = 0; + std::time_t m_send_version_at_time = 0; + + edi::TagDSTI m_edi_tagDSTI; + + ClockTAI m_clock_tai; + bool m_tist = false; + uint32_t m_delay_ms = 0; +}; + +} diff --git a/src/common.h b/src/common.h new file mode 100644 index 0000000..774a4a0 --- /dev/null +++ b/src/common.h @@ -0,0 +1,32 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2016 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once + +// 16 bits per sample is fine for now +#define BYTES_PER_SAMPLE 2 + +// How many samples we insert into the queue each call +#define NUM_SAMPLES_PER_CALL 10 // 10 samples @ 32kHz = 3.125ms + +//! Enumeration of encoders we can use +enum class encoder_selection_t { + fdk_dabplus, + toolame_dab +}; + diff --git a/src/odr-sourcecompanion.cpp b/src/odr-sourcecompanion.cpp index a69f705..8b5b701 100644 --- a/src/odr-sourcecompanion.cpp +++ b/src/odr-sourcecompanion.cpp @@ -27,6 +27,7 @@ #include "zmq.hpp" #include "AVTInput.h" +#include "Outputs.h" #include "AACDecoder.h" #include "StatsPublish.h" #include @@ -89,10 +90,10 @@ void usage(const char* name) { " --ps Force the usage of PS\n" " Output and pad parameters:\n" " -o, --output=URI Output ZMQ uri. (e.g. 'tcp://localhost:9000')\n" - " -or- Output file uri. (e.g. 'file.dabp')\n" - " -or- a single dash '-' to denote stdout\n" " If more than one ZMQ output is given, the socket\n" " will be connected to all listed endpoints.\n" + " -e, --edi=URI EDI output uri, (e.g. 'tcp://localhost:7000')\n" + " -T, --timestamp-delay=DELAY_MS Enabled timestamps in EDI (requires TAI clock bulletin download) and\n" " -k, --secret-key=FILE Enable ZMQ encryption with the given secret key.\n" " -p, --pad=BYTES Set PAD size in bytes.\n" " -P, --pad-fifo=FILENAME Set PAD data input fifo name" @@ -137,12 +138,13 @@ int main(int argc, char *argv[]) string send_stats_to = ""; /* Data for ZMQ CURVE authentication */ - char* keyfile = nullptr; - char secretkey[CURVE_KEYLEN+1]; + char *keyfile = nullptr; const struct option longopts[] = { {"bitrate", required_argument, 0, 'b'}, {"channels", required_argument, 0, 'c'}, + {"edi", required_argument, 0, 'e'}, + {"timestamp-delay", required_argument, 0, 'T'}, {"output", required_argument, 0, 'o'}, {"pad", required_argument, 0, 'p'}, {"pad-fifo", required_argument, 0, 'P'}, @@ -183,13 +185,17 @@ int main(int argc, char *argv[]) bool allowSBR = false; bool allowPS = false; + vector edi_output_uris; + bool tist_enabled = false; + uint32_t tist_delay_ms = 0; + int bitrate = 0; int channels = 2; int sample_rate = 48000; char ch = 0; int index; while(ch != -1) { - ch = getopt_long(argc, argv, "hlb:c:k:o:r:p:P:I:", longopts, &index); + ch = getopt_long(argc, argv, "hlb:c:e:T:k:o:r:p:P:I:", longopts, &index); switch (ch) { case 0: // AAC-LC allowPS = false; @@ -204,10 +210,17 @@ int main(int argc, char *argv[]) allowSBR = true; break; case 'b': - bitrate = atoi(optarg); + bitrate = stoi(optarg); break; case 'c': - channels = atoi(optarg); + channels = stoi(optarg); + break; + case 'e': + edi_output_uris.push_back(optarg); + break; + case 'T': + tist_enabled = true; + tist_delay_ms = std::stoi(optarg); break; case 'k': keyfile = optarg; @@ -219,13 +232,13 @@ int main(int argc, char *argv[]) output_uris.push_back(optarg); break; case 'p': - padlen = atoi(optarg); + padlen = stoi(optarg); break; case 'P': pad_fifo = optarg; break; case 'r': - sample_rate = atoi(optarg); + sample_rate = stoi(optarg); break; case 'S': send_stats_to = optarg; @@ -238,16 +251,16 @@ int main(int argc, char *argv[]) avt_output_uri = optarg; break; case 7: - avt_timeout = atoi(optarg); + avt_timeout = stoi(optarg); if (avt_timeout < 0) { avt_timeout = 2000; } break; case 8: - avt_pad_port = atoi(optarg); + avt_pad_port = stoi(optarg); break; case 9: - avt_jitterBufferSize = atoi(optarg); + avt_jitterBufferSize = stoi(optarg); break; case '?': case 'h': @@ -266,33 +279,52 @@ int main(int argc, char *argv[]) return 1; } - zmq::context_t zmq_ctx; - zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB); + shared_ptr zmq_output; + Output::EDI edi_output; - if (not output_uris.empty()) { - for (auto uri : output_uris) { - if (keyfile) { - fprintf(stderr, "Enabling encryption\n"); + if (output_uris.empty() and edi_output_uris.empty()) { + fprintf(stderr, "No output URIs defined\n"); + return 1; + } - int rc = readkey(keyfile, secretkey); - if (rc) { - fprintf(stderr, "Error reading secret key\n"); - return 2; - } + for (const auto& uri : output_uris) { + if (not zmq_output) { + zmq_output = make_shared(); + } - const int yes = 1; - zmq_sock.setsockopt(ZMQ_CURVE_SERVER, - &yes, sizeof(yes)); + zmq_output->connect(uri.c_str(), keyfile); + } - zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, - secretkey, CURVE_KEYLEN); + for (const auto& uri : edi_output_uris) { + if (uri.compare(0, 6, "tcp://") == 0 or + uri.compare(0, 6, "udp://") == 0) { + auto host_port_sep_ix = uri.find(':', 6); + if (host_port_sep_ix != string::npos) { + auto host = uri.substr(6, host_port_sep_ix - 6); + auto port = std::stoi(uri.substr(host_port_sep_ix + 1)); + + auto proto = uri.substr(0, 3); + if (proto == "tcp") { + edi_output.add_tcp_destination(host, port); + } + else if (proto == "udp") { + edi_output.add_udp_destination(host, port); + } + else { + throw logic_error("unhandled proto"); + } } - zmq_sock.connect(uri.c_str()); + else { + fprintf(stderr, "Invalid EDI URL host!\n"); + } + } + else { + fprintf(stderr, "Invalid EDI protocol!\n"); } } - else { - fprintf(stderr, "No output URI defined\n"); - return 1; + + if (not edi_output_uris.empty()) { + edi_output.set_tist(tist_enabled, tist_delay_ms); } if (padlen != 0) { @@ -350,19 +382,15 @@ int main(int argc, char *argv[]) } int outbuf_size; - std::vector zmqframebuf; std::vector outbuf; outbuf_size = bitrate/8*120; outbuf.resize(24*120); - zmqframebuf.resize(ZMQ_HEADER_SIZE + 24*120); if (outbuf_size % 5 != 0) { fprintf(stderr, "Warning: (outbuf_size mod 5) = %d\n", outbuf_size % 5); } - zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)&zmqframebuf[0]; - unsigned char pad_buf[padlen + 1]; fprintf(stderr, "Starting encoding\n"); @@ -449,28 +477,22 @@ int main(int argc, char *argv[]) read_bytes = numOutBytes; if (numOutBytes != 0) { - // ------------ ZeroMQ transmit - try { - zmq_frame_header->encoder = ZMQ_ENCODER_FDK; - zmq_frame_header->version = 1; - zmq_frame_header->datasize = numOutBytes; - zmq_frame_header->audiolevel_left = peak_left; - zmq_frame_header->audiolevel_right = peak_right; - - assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= zmqframebuf.size()); - - memcpy(ZMQ_FRAME_DATA(zmq_frame_header), - &outbuf[0], numOutBytes); - zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header), - ZMQ_DONTWAIT); + bool success = false; + if (zmq_output) { + zmq_output->update_audio_levels(peak_left, peak_right); + success = zmq_output->write_frame(outbuf.data(), outbuf.size()); } - catch (zmq::error_t& e) { - fprintf(stderr, "ZeroMQ send error !\n"); - send_error_count ++; + else if (edi_output.enabled()) { + edi_output.update_audio_levels(peak_left, peak_right); + success = edi_output.write_frame(outbuf.data(), outbuf.size()); + } + + if (not success) { + send_error_count++; } if (send_error_count > 10) { - fprintf(stderr, "ZeroMQ send failed ten times, aborting!\n"); + fprintf(stderr, "Send failed ten times, aborting!\n"); retval = 4; break; } @@ -500,7 +522,5 @@ int main(int argc, char *argv[]) fprintf(stderr, "\n"); - zmq_sock.close(); - return retval; } -- cgit v1.2.3