summaryrefslogtreecommitdiffstats
path: root/src/input
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-10-30 15:29:31 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-10-30 15:29:31 +0100
commit128768f7fd719eb455a946a0f716d7128b4ded63 (patch)
treedfa03ccfed3f175182b04fd84bd59aacd623ac54 /src/input
parent555121f96e769fdeb9529e7381560d8bbb6e2713 (diff)
downloaddabmux-128768f7fd719eb455a946a0f716d7128b4ded63.tar.gz
dabmux-128768f7fd719eb455a946a0f716d7128b4ded63.tar.bz2
dabmux-128768f7fd719eb455a946a0f716d7128b4ded63.zip
Start reworking inputs, break all but Prbs and ZMQ
Diffstat (limited to 'src/input')
-rw-r--r--src/input/Prbs.cpp101
-rw-r--r--src/input/Prbs.h56
-rw-r--r--src/input/Zmq.cpp625
-rw-r--r--src/input/Zmq.h271
4 files changed, 1053 insertions, 0 deletions
diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp
new file mode 100644
index 0000000..b9e244b
--- /dev/null
+++ b/src/input/Prbs.cpp
@@ -0,0 +1,101 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ Pseudo-Random Bit Sequence generator for test purposes.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Prbs.h"
+
+#include <stdexcept>
+#include <sstream>
+#include <string.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <errno.h>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+// ETS 300 799 Clause G.2.1
+// Preferred polynomial is G(x) = x^20 + x^17 + 1
+const uint32_t PRBS_DEFAULT_POLY = (1 << 19) | (1 << 16) | 1;
+
+int Prbs::open(const string& name)
+{
+ if (name.empty()) {
+ m_prbs.setup(PRBS_DEFAULT_POLY);
+ }
+ else {
+ if (name[0] != ':') {
+ throw invalid_argument(
+ "Invalid PRBS address format. "
+ "Must be prbs://:polynomial.");
+ }
+
+ const string poly_str = name.substr(1);
+
+ long polynomial = hexparse(poly_str);
+
+ if (polynomial == 0) {
+ throw invalid_argument("No polynomial given for PRBS input");
+ }
+
+ m_prbs.setup(polynomial);
+ }
+ rewind();
+
+ return 0;
+}
+
+int Prbs::readFrame(void* buffer, int size)
+{
+ unsigned char* cbuffer = reinterpret_cast<unsigned char*>(buffer);
+
+ for (int i = 0; i < size; ++i) {
+ cbuffer[i] = m_prbs.step();
+ }
+
+ return size;
+}
+
+int Prbs::setBitrate(int bitrate)
+{
+ return bitrate;
+}
+
+int Prbs::close()
+{
+ return 0;
+}
+
+int Prbs::rewind()
+{
+ m_prbs.rewind();
+ return 0;
+}
+
+};
diff --git a/src/input/Prbs.h b/src/input/Prbs.h
new file mode 100644
index 0000000..47b52ad
--- /dev/null
+++ b/src/input/Prbs.h
@@ -0,0 +1,56 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ Pseudo-Random Bit Sequence generator for test purposes.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <string>
+
+#include "input/inputs.h"
+#include "prbs.h"
+
+namespace Inputs {
+
+class Prbs : public InputBase {
+ public:
+ virtual int open(const std::string& name);
+ virtual int readFrame(void* buffer, int size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ private:
+ virtual int rewind();
+
+ PrbsGenerator m_prbs;
+};
+
+};
+
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
new file mode 100644
index 0000000..6ef5fce
--- /dev/null
+++ b/src/input/Zmq.cpp
@@ -0,0 +1,625 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016 Matthias P. Braendli
+ http://www.opendigitalradio.org
+
+ ZeroMQ input. see www.zeromq.org for more info
+
+ For the AAC+ input, each zeromq message must contain one superframe
+ or one zmq_frame_header_t followed by a superframe.
+
+ For the MPEG input, each zeromq message must contain one frame.
+
+ Encryption is provided by zmq_curve, see the corresponding manpage.
+
+ From the ZeroMQ manpage 'zmq':
+
+ The 0MQ lightweight messaging kernel is a library which extends the standard
+ socket interfaces with features traditionally provided by specialised
+ messaging middleware products. 0MQ sockets provide an abstraction of
+ asynchronous message queues, multiple messaging patterns, message filtering
+ (subscriptions), seamless access to multiple transport protocols and more.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Zmq.h"
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#ifdef HAVE_INPUT_ZEROMQ
+
+#include "zmq.hpp"
+#include <cstdio>
+#include <cstdlib>
+#include <list>
+#include <exception>
+#include <cstring>
+#include <string>
+#include <sstream>
+#include <limits.h>
+#include "PcDebug.h"
+#include "Log.h"
+
+#ifdef __MINGW32__
+# define bzero(s, n) memset(s, 0, n)
+#endif
+
+namespace Inputs {
+
+using namespace std;
+
+int readkey(string& keyfile, char* key)
+{
+ FILE* fd = fopen(keyfile.c_str(), "r");
+ if (fd == nullptr) {
+ return -1;
+ }
+
+ int ret = fread(key, CURVE_KEYLEN, 1, fd);
+ fclose(fd);
+ if (ret == 0) {
+ return -1;
+ }
+
+ /* It needs to be zero-terminated */
+ key[CURVE_KEYLEN] = '\0';
+
+ return 0;
+}
+
+/***** Common functions (MPEG and AAC) ******/
+
+/* If necessary, unbind the socket, then check the keys,
+ * if they are ok and encryption is required, set the
+ * keys to the socket, and finally bind the socket
+ * to the new address
+ */
+void ZmqBase::rebind()
+{
+ if (! m_zmq_sock_bound_to.empty()) {
+ try {
+ m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str());
+ }
+ catch (zmq::error_t& err) {
+ etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed";
+ }
+ }
+
+ m_zmq_sock_bound_to = "";
+
+ /* Load each key independently */
+ if (! m_config.curve_public_keyfile.empty()) {
+ int rc = readkey(m_config.curve_public_keyfile, m_curve_public_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid public key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_public_key);
+ }
+ }
+
+ if (! m_config.curve_secret_keyfile.empty()) {
+ int rc = readkey(m_config.curve_secret_keyfile, m_curve_secret_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid secret key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_secret_key);
+ }
+ }
+
+ if (! m_config.curve_encoder_keyfile.empty()) {
+ int rc = readkey(m_config.curve_encoder_keyfile, m_curve_encoder_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid encoder key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_encoder_key);
+ }
+ }
+
+ /* If you want encryption, you need to have defined all
+ * key files
+ */
+ if ( m_config.enable_encryption &&
+ ( ! (KEY_VALID(m_curve_public_key) &&
+ KEY_VALID(m_curve_secret_key) &&
+ KEY_VALID(m_curve_encoder_key) ) ) ) {
+ throw std::runtime_error("When enabling encryption, all three "
+ "keyfiles must be valid!");
+ }
+
+ if (m_config.enable_encryption) {
+ try {
+ /* We want to check that the encoder is the right one,
+ * so the encoder is the CURVE server.
+ */
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SERVERKEY,
+ m_curve_encoder_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set encoder key for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_CURVE_PUBLICKEY,
+ m_curve_public_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set public key for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY,
+ m_curve_secret_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set secret key for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+ }
+ else {
+ try {
+ /* This forces the socket to go to the ZMQ_NULL auth
+ * mechanism
+ */
+ const int no = 0;
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SERVER, &no, sizeof(no));
+ }
+ catch (zmq::error_t& err) {
+ etiLog.level(warn) << "ZMQ disable encryption keys for input " <<
+ m_name << " failed: " << err.what();
+ }
+
+ }
+
+ // Prepare the ZMQ socket to accept connections
+ try {
+ m_zmq_sock.bind(m_inputUri.c_str());
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ bind for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+
+ m_zmq_sock_bound_to = m_inputUri;
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set socket options for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+}
+
+int ZmqBase::open(const std::string& inputUri)
+{
+ m_inputUri = inputUri;
+
+ /* Let caller handle exceptions when we open() */
+ rebind();
+
+ // We want to appear in the statistics !
+ m_stats.registerAtServer();
+
+ return 0;
+}
+
+int ZmqBase::close()
+{
+ m_zmq_sock.close();
+ return 0;
+}
+
+int ZmqBase::setBitrate(int bitrate)
+{
+ m_bitrate = bitrate;
+ return bitrate; // TODO do a nice check here
+}
+
+// size corresponds to a frame size. It is constant for a given bitrate
+int ZmqBase::readFrame(void* buffer, int size)
+{
+ int rc;
+
+ /* We must *always* read data from the ZMQ socket,
+ * to make sure that ZMQ internal buffers are emptied
+ * quickly. It's the only way to control the buffers
+ * of the whole path from encoder to our frame_buffer.
+ */
+ rc = readFromSocket(size);
+
+ /* Notify of a buffer overrun, and drop some frames */
+ if (m_frame_buffer.size() >= m_config.buffer_size) {
+ m_stats.notifyOverrun();
+
+ /* If the buffer is really too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length.
+ */
+ if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) {
+ size_t over_max = m_frame_buffer.size() - m_config.prebuffering;
+
+ while (over_max--) {
+ delete[] m_frame_buffer.front();
+ m_frame_buffer.pop_front();
+ }
+ }
+ else {
+ /* Our frame_buffer contains DAB logical frames. Five of these make one
+ * AAC superframe.
+ *
+ * Dropping this superframe amounts to dropping 120ms of audio.
+ *
+ * We're actually not sure to drop five DAB logical frames
+ * belonging to the same AAC superframe. It is assumed that no
+ * receiver will crash because of this. At least, the DAB logical frame
+ * vs. AAC superframe alignment is preserved.
+ *
+ * TODO: of course this assumption probably doesn't hold. Fix this !
+ * TODO: also, with MPEG, the above doesn't hold, so we drop five
+ * frames even though we could drop less.
+ * */
+ for (int frame_del_count = 0; frame_del_count < 5; frame_del_count++) {
+ delete[] m_frame_buffer.front();
+ m_frame_buffer.pop_front();
+ }
+ }
+ }
+
+ if (m_prebuf_current > 0) {
+ if (rc > 0)
+ m_prebuf_current--;
+ if (m_prebuf_current == 0)
+ etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
+ m_name.c_str());
+
+ /* During prebuffering, give a zeroed frame to the mux */
+ m_stats.notifyUnderrun();
+ memset(buffer, 0, size);
+ return size;
+ }
+
+ // Save stats data in bytes, not in frames
+ m_stats.notifyBuffer(m_frame_buffer.size() * size);
+
+ if (m_frame_buffer.empty()) {
+ etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
+ m_name.c_str());
+ // reset prebuffering
+ m_prebuf_current = m_config.prebuffering;
+
+ /* We have no data to give, we give a zeroed frame */
+ m_stats.notifyUnderrun();
+ memset(buffer, 0, size);
+ return size;
+ }
+ else
+ {
+ /* Normal situation, give a frame from the frame_buffer */
+ uint8_t* newframe = m_frame_buffer.front();
+ memcpy(buffer, newframe, size);
+ delete[] newframe;
+ m_frame_buffer.pop_front();
+ return size;
+ }
+}
+
+
+/******** MPEG input *******/
+
+// Read a MPEG frame from the socket, and push to list
+int ZmqMPEG::readFromSocket(size_t framesize)
+{
+ bool messageReceived = false;
+ zmq::message_t msg;
+
+ try {
+ messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ if (!messageReceived) {
+ return 0;
+ }
+
+ }
+ catch (zmq::error_t& err)
+ {
+ etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " <<
+ m_name << ": " << err.what();
+ }
+
+ /* This is the old 'one superframe per ZMQ message' format */
+ uint8_t* data = (uint8_t*)msg.data();
+ size_t datalen = msg.size();
+
+ /* Look for the new zmq_frame_header_t format */
+ zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data();
+
+ if (msg.size() == ZMQ_FRAME_SIZE(frame) &&
+ frame->version == 1 &&
+ frame->encoder == ZMQ_ENCODER_TOOLAME) {
+ datalen = frame->datasize;
+ data = ZMQ_FRAME_DATA(frame);
+
+ m_stats.notifyPeakLevels(frame->audiolevel_left,
+ frame->audiolevel_right);
+ }
+
+
+ if (datalen == framesize)
+ {
+ if (m_frame_buffer.size() > m_config.buffer_size) {
+ etiLog.level(warn) <<
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
+ " dropping incoming frame !";
+ messageReceived = false;
+ }
+ else if (m_enable_input) {
+ // copy the input frame blockwise into the frame_buffer
+ auto framedata = new uint8_t[framesize];
+ memcpy(framedata, data, framesize);
+ m_frame_buffer.push_back(framedata);
+ }
+ else {
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " verify bitrate: recv'd " << msg.size() << " B" <<
+ ", need " << framesize << ".";
+ messageReceived = false;
+ }
+
+ return messageReceived ? msg.size() : 0;
+}
+
+/******** AAC+ input *******/
+
+// Read a AAC+ superframe from the socket, cut it into five frames,
+// and push to list
+int ZmqAAC::readFromSocket(size_t framesize)
+{
+ bool messageReceived;
+ zmq::message_t msg;
+
+ try {
+ messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ if (!messageReceived) {
+ return 0;
+ }
+
+ }
+ catch (zmq::error_t& err)
+ {
+ etiLog.level(error) <<
+ "Failed to receive AAC superframe from zmq socket " <<
+ m_name << ": " << err.what();
+ }
+
+ /* This is the old 'one superframe per ZMQ message' format */
+ uint8_t* data = (uint8_t*)msg.data();
+ size_t datalen = msg.size();
+
+ /* Look for the new zmq_frame_header_t format */
+ zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data();
+
+ if (msg.size() == ZMQ_FRAME_SIZE(frame) &&
+ frame->version == 1 &&
+ frame->encoder == ZMQ_ENCODER_FDK) {
+ datalen = frame->datasize;
+ data = ZMQ_FRAME_DATA(frame);
+
+ m_stats.notifyPeakLevels(frame->audiolevel_left,
+ frame->audiolevel_right);
+ }
+
+
+ /* TS 102 563, Section 6:
+ * Audio super frames are transported in five successive DAB logical frames
+ * with additional error protection.
+ */
+ if (datalen)
+ {
+ if (datalen == 5*framesize)
+ {
+ if (m_frame_buffer.size() > m_config.buffer_size) {
+ etiLog.level(warn) <<
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
+ " dropping incoming superframe !";
+ datalen = 0;
+ }
+ else if (m_enable_input) {
+ // copy the input frame blockwise into the frame_buffer
+ for (uint8_t* framestart = data;
+ framestart < &data[5*framesize];
+ framestart += framesize) {
+ auto audioframe = new uint8_t[framesize];
+ memcpy(audioframe, framestart, framesize);
+ m_frame_buffer.push_back(audioframe);
+ }
+ }
+ else {
+ datalen = 0;
+ }
+ }
+ else {
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " verify bitrate: recv'd " << msg.size() << " B" <<
+ ", need " << 5*framesize << ".";
+
+ datalen = 0;
+ }
+ }
+ else {
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " invalid frame received";
+ }
+
+ return datalen;
+}
+
+/********* REMOTE CONTROL ***********/
+
+void ZmqBase::set_parameter(const string& parameter,
+ const string& value)
+{
+ if (parameter == "buffer") {
+ size_t new_limit = atol(value.c_str());
+
+ if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too small."
+ " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) );
+ }
+ else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too large."
+ " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
+ }
+
+ m_config.buffer_size = new_limit;
+ }
+ else if (parameter == "prebuffering") {
+ size_t new_prebuf = atol(value.c_str());
+
+ if (new_prebuf < INPUT_ZMQ_MIN_BUFFER_SIZE) {
+ throw ParameterError("Desired prebuffering too small."
+ " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) );
+ }
+ else if (new_prebuf > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ throw ParameterError("Desired prebuffering too large."
+ " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
+ }
+
+ m_config.prebuffering = new_prebuf;
+ }
+ else if (parameter == "enable") {
+ if (value == "1") {
+ m_enable_input = true;
+ }
+ else if (value == "0") {
+ m_enable_input = false;
+ }
+ else {
+ throw ParameterError("Value not understood, specify 0 or 1.");
+ }
+ }
+ else if (parameter == "encryption") {
+ if (value == "1") {
+ m_config.enable_encryption = true;
+ }
+ else if (value == "0") {
+ m_config.enable_encryption = false;
+ }
+ else {
+ throw ParameterError("Value not understood, specify 0 or 1.");
+ }
+
+ try {
+ rebind();
+ }
+ catch (std::runtime_error &e) {
+ stringstream ss;
+ ss << "Could not bind socket again with new keys." <<
+ e.what();
+ throw ParameterError(ss.str());
+ }
+ }
+ else if (parameter == "secretkey") {
+ m_config.curve_secret_keyfile = value;
+ }
+ else if (parameter == "publickey") {
+ m_config.curve_public_keyfile = value;
+ }
+ else if (parameter == "encoderkey") {
+ m_config.curve_encoder_keyfile = value;
+ }
+ else {
+ stringstream ss;
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+}
+
+const string ZmqBase::get_parameter(const string& parameter) const
+{
+ stringstream ss;
+ if (parameter == "buffer") {
+ ss << m_config.buffer_size;
+ }
+ else if (parameter == "prebuffering") {
+ ss << m_config.prebuffering;
+ }
+ else if (parameter == "enable") {
+ if (m_enable_input)
+ ss << "true";
+ else
+ ss << "false";
+ }
+ else if (parameter == "encryption") {
+ if (m_config.enable_encryption)
+ ss << "true";
+ else
+ ss << "false";
+ }
+ else if (parameter == "secretkey") {
+ ss << m_config.curve_secret_keyfile;
+ }
+ else if (parameter == "publickey") {
+ ss << m_config.curve_public_keyfile;
+ }
+ else if (parameter == "encoderkey") {
+ ss << m_config.curve_encoder_keyfile;
+ }
+ else {
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+
+}
+
+};
+
+#endif
+
diff --git a/src/input/Zmq.h b/src/input/Zmq.h
new file mode 100644
index 0000000..d1dd2d5
--- /dev/null
+++ b/src/input/Zmq.h
@@ -0,0 +1,271 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016 Matthias P. Braendli
+ http://www.opendigitalradio.org
+
+ ZeroMQ input. see www.zeromq.org for more info
+
+ For the AAC+ input, each zeromq message must contain one superframe,
+ or one zmq_frame_header_t followed by a superframe.
+
+ For the MPEG input, each zeromq message must contain one frame.
+
+ Encryption is provided by zmq_curve, see the corresponding manpage.
+
+ From the ZeroMQ manpage 'zmq':
+
+ The 0MQ lightweight messaging kernel is a library which extends the standard
+ socket interfaces with features traditionally provided by specialised
+ messaging middleware products. 0MQ sockets provide an abstraction of
+ asynchronous message queues, multiple messaging patterns, message filtering
+ (subscriptions), seamless access to multiple transport protocols and more.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ It defines a ZeroMQ input for dabplus data.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#ifdef HAVE_INPUT_ZEROMQ
+
+#include <list>
+#include <string>
+#include <stdint.h>
+#include "zmq.hpp"
+#include "input/inputs.h"
+#include "ManagementServer.h"
+
+namespace Inputs {
+
+/* The frame_buffer contains DAB logical frames as defined in
+ * TS 102 563, section 6.
+ * Five elements of this buffer make one AAC superframe (120ms audio)
+ */
+
+// Minimum frame_buffer size in number of elements
+// This is one AAC superframe, but you probably don't want to
+// go that low anyway.
+const size_t INPUT_ZMQ_MIN_BUFFER_SIZE = 5*1; // 120ms
+
+// Maximum frame_buffer size in number of elements
+// One minute is clearly way over what everybody would
+// want.
+const size_t INPUT_ZMQ_MAX_BUFFER_SIZE = 5*500; // 60s
+
+/* The ZeroMQ Curve key is 40 bytes long in Z85 representation
+ *
+ * But we need to store it as zero-terminated string.
+ */
+const size_t CURVE_KEYLEN = 40;
+
+/* helper to invalidate a key */
+#define INVALIDATE_KEY(k) memset(k, 0, CURVE_KEYLEN+1)
+
+/* Verification for key validity */
+#define KEY_VALID(k) (k[0] != '\0')
+
+/* Read a key from file into key
+ *
+ * Returns 0 on success, negative value on failure
+ */
+int readkey(std::string& keyfile, char* key);
+
+struct dab_input_zmq_config_t
+{
+ /* The size of the internal buffer, measured in number
+ * of elements.
+ *
+ * Each element corresponds to five frames,
+ * or one AAC superframe.
+ */
+ size_t buffer_size;
+
+ /* The amount of prebuffering to do before we start streaming
+ *
+ * Same units as buffer_size
+ */
+ size_t prebuffering;
+
+ /* Whether to enforce encryption or not
+ */
+ bool enable_encryption;
+
+ /* Full path to file containing public key.
+ */
+ std::string curve_public_keyfile;
+
+ /* Full path to file containing secret key.
+ */
+ std::string curve_secret_keyfile;
+
+ /* Full path to file containing encoder public key.
+ */
+ std::string curve_encoder_keyfile;
+};
+
+#define ZMQ_ENCODER_FDK 1
+#define ZMQ_ENCODER_TOOLAME 2
+
+/* This defines the on-wire representation of a ZMQ message header.
+ *
+ * The data follows right after this header */
+struct zmq_frame_header_t
+{
+ uint16_t version; // we support version=1 now
+ uint16_t encoder; // see ZMQ_ENCODER_XYZ
+
+ /* length of the 'data' field */
+ uint32_t datasize;
+
+ /* Audio level, peak, linear PCM */
+ int16_t audiolevel_left;
+ int16_t audiolevel_right;
+
+ /* Data follows this header */
+} __attribute__ ((packed));
+
+/* The expected frame size incl data of the given frame */
+#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize)
+
+#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) )
+
+
+class ZmqBase : public InputBase, public RemoteControllable {
+ public:
+ ZmqBase(const std::string name,
+ dab_input_zmq_config_t config)
+ : RemoteControllable(name),
+ m_zmq_context(1),
+ m_zmq_sock(m_zmq_context, ZMQ_SUB),
+ m_zmq_sock_bound_to(""),
+ m_bitrate(0),
+ m_enable_input(true),
+ m_config(config),
+ m_stats(m_name),
+ m_prebuf_current(config.prebuffering) {
+ RC_ADD_PARAMETER(enable,
+ "If the input is enabled. Set to zero to empty the buffer.");
+
+ RC_ADD_PARAMETER(encryption,
+ "If encryption is enabled or disabled [1 or 0]."
+ " If 1 is written, the keys are reloaded.");
+
+ RC_ADD_PARAMETER(publickey,
+ "The multiplexer's public key file.");
+
+ RC_ADD_PARAMETER(secretkey,
+ "The multiplexer's secret key file.");
+
+ RC_ADD_PARAMETER(encoderkey,
+ "The encoder's public key file.");
+
+ /* Set all keys to zero */
+ INVALIDATE_KEY(m_curve_public_key);
+ INVALIDATE_KEY(m_curve_secret_key);
+ INVALIDATE_KEY(m_curve_encoder_key);
+ }
+
+ virtual int open(const std::string& inputUri);
+ virtual int readFrame(void* buffer, int size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ /* Remote control */
+ virtual void set_parameter(const std::string& parameter,
+ const std::string& value);
+
+ /* Getting a parameter always returns a string. */
+ virtual const std::string get_parameter(const std::string& parameter) const;
+
+ protected:
+ virtual int readFromSocket(size_t framesize) = 0;
+
+ virtual void rebind();
+
+ zmq::context_t m_zmq_context;
+ zmq::socket_t m_zmq_sock; // handle for the zmq socket
+
+ /* If the socket is bound, this saves the endpoint,
+ * otherwise, it's an empty string
+ */
+ std::string m_zmq_sock_bound_to;
+ int m_bitrate;
+
+ /* set this to zero to empty the input buffer */
+ bool m_enable_input;
+
+ /* stores elements of type char[<superframesize>] */
+ std::list<uint8_t*> m_frame_buffer;
+
+ dab_input_zmq_config_t m_config;
+
+ /* Key management, keys need to be zero-terminated */
+ char m_curve_public_key[CURVE_KEYLEN+1];
+ char m_curve_secret_key[CURVE_KEYLEN+1];
+ char m_curve_encoder_key[CURVE_KEYLEN+1];
+
+ std::string m_inputUri;
+
+ InputStat m_stats;
+
+ private:
+ size_t m_prebuf_current;
+};
+
+class ZmqMPEG : public ZmqBase {
+ public:
+ ZmqMPEG(const std::string name,
+ dab_input_zmq_config_t config)
+ : ZmqBase(name, config) {
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [mpeg frames]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [mpeg frames]");
+ }
+
+ private:
+ virtual int readFromSocket(size_t framesize);
+};
+
+class ZmqAAC : public ZmqBase {
+ public:
+ ZmqAAC(const std::string name,
+ dab_input_zmq_config_t config)
+ : ZmqBase(name, config) {
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [aac superframes]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [aac superframes]");
+ }
+
+ private:
+ virtual int readFromSocket(size_t framesize);
+};
+
+};
+#endif // HAVE_INPUT_ZMQ
+
+