diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-10-30 15:29:31 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2016-10-30 15:29:31 +0100 |
commit | 128768f7fd719eb455a946a0f716d7128b4ded63 (patch) | |
tree | dfa03ccfed3f175182b04fd84bd59aacd623ac54 /src/input | |
parent | 555121f96e769fdeb9529e7381560d8bbb6e2713 (diff) | |
download | dabmux-128768f7fd719eb455a946a0f716d7128b4ded63.tar.gz dabmux-128768f7fd719eb455a946a0f716d7128b4ded63.tar.bz2 dabmux-128768f7fd719eb455a946a0f716d7128b4ded63.zip |
Start reworking inputs, break all but Prbs and ZMQ
Diffstat (limited to 'src/input')
-rw-r--r-- | src/input/Prbs.cpp | 101 | ||||
-rw-r--r-- | src/input/Prbs.h | 56 | ||||
-rw-r--r-- | src/input/Zmq.cpp | 625 | ||||
-rw-r--r-- | src/input/Zmq.h | 271 |
4 files changed, 1053 insertions, 0 deletions
diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp new file mode 100644 index 0000000..b9e244b --- /dev/null +++ b/src/input/Prbs.cpp @@ -0,0 +1,101 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + Pseudo-Random Bit Sequence generator for test purposes. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "input/Prbs.h" + +#include <stdexcept> +#include <sstream> +#include <string.h> +#include <limits.h> +#include <stdlib.h> +#include <errno.h> +#include "utils.h" + +using namespace std; + +namespace Inputs { + +// ETS 300 799 Clause G.2.1 +// Preferred polynomial is G(x) = x^20 + x^17 + 1 +const uint32_t PRBS_DEFAULT_POLY = (1 << 19) | (1 << 16) | 1; + +int Prbs::open(const string& name) +{ + if (name.empty()) { + m_prbs.setup(PRBS_DEFAULT_POLY); + } + else { + if (name[0] != ':') { + throw invalid_argument( + "Invalid PRBS address format. " + "Must be prbs://:polynomial."); + } + + const string poly_str = name.substr(1); + + long polynomial = hexparse(poly_str); + + if (polynomial == 0) { + throw invalid_argument("No polynomial given for PRBS input"); + } + + m_prbs.setup(polynomial); + } + rewind(); + + return 0; +} + +int Prbs::readFrame(void* buffer, int size) +{ + unsigned char* cbuffer = reinterpret_cast<unsigned char*>(buffer); + + for (int i = 0; i < size; ++i) { + cbuffer[i] = m_prbs.step(); + } + + return size; +} + +int Prbs::setBitrate(int bitrate) +{ + return bitrate; +} + +int Prbs::close() +{ + return 0; +} + +int Prbs::rewind() +{ + m_prbs.rewind(); + return 0; +} + +}; diff --git a/src/input/Prbs.h b/src/input/Prbs.h new file mode 100644 index 0000000..47b52ad --- /dev/null +++ b/src/input/Prbs.h @@ -0,0 +1,56 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + Pseudo-Random Bit Sequence generator for test purposes. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include <string> + +#include "input/inputs.h" +#include "prbs.h" + +namespace Inputs { + +class Prbs : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + private: + virtual int rewind(); + + PrbsGenerator m_prbs; +}; + +}; + diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp new file mode 100644 index 0000000..6ef5fce --- /dev/null +++ b/src/input/Zmq.cpp @@ -0,0 +1,625 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + ZeroMQ input. see www.zeromq.org for more info + + For the AAC+ input, each zeromq message must contain one superframe + or one zmq_frame_header_t followed by a superframe. + + For the MPEG input, each zeromq message must contain one frame. + + Encryption is provided by zmq_curve, see the corresponding manpage. + + From the ZeroMQ manpage 'zmq': + + The 0MQ lightweight messaging kernel is a library which extends the standard + socket interfaces with features traditionally provided by specialised + messaging middleware products. 0MQ sockets provide an abstraction of + asynchronous message queues, multiple messaging patterns, message filtering + (subscriptions), seamless access to multiple transport protocols and more. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "input/Zmq.h" + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef HAVE_INPUT_ZEROMQ + +#include "zmq.hpp" +#include <cstdio> +#include <cstdlib> +#include <list> +#include <exception> +#include <cstring> +#include <string> +#include <sstream> +#include <limits.h> +#include "PcDebug.h" +#include "Log.h" + +#ifdef __MINGW32__ +# define bzero(s, n) memset(s, 0, n) +#endif + +namespace Inputs { + +using namespace std; + +int readkey(string& keyfile, char* key) +{ + FILE* fd = fopen(keyfile.c_str(), "r"); + if (fd == nullptr) { + return -1; + } + + int ret = fread(key, CURVE_KEYLEN, 1, fd); + fclose(fd); + if (ret == 0) { + return -1; + } + + /* It needs to be zero-terminated */ + key[CURVE_KEYLEN] = '\0'; + + return 0; +} + +/***** Common functions (MPEG and AAC) ******/ + +/* If necessary, unbind the socket, then check the keys, + * if they are ok and encryption is required, set the + * keys to the socket, and finally bind the socket + * to the new address + */ +void ZmqBase::rebind() +{ + if (! m_zmq_sock_bound_to.empty()) { + try { + m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str()); + } + catch (zmq::error_t& err) { + etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed"; + } + } + + m_zmq_sock_bound_to = ""; + + /* Load each key independently */ + if (! m_config.curve_public_keyfile.empty()) { + int rc = readkey(m_config.curve_public_keyfile, m_curve_public_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid public key for input " << + m_name; + + INVALIDATE_KEY(m_curve_public_key); + } + } + + if (! m_config.curve_secret_keyfile.empty()) { + int rc = readkey(m_config.curve_secret_keyfile, m_curve_secret_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid secret key for input " << + m_name; + + INVALIDATE_KEY(m_curve_secret_key); + } + } + + if (! m_config.curve_encoder_keyfile.empty()) { + int rc = readkey(m_config.curve_encoder_keyfile, m_curve_encoder_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid encoder key for input " << + m_name; + + INVALIDATE_KEY(m_curve_encoder_key); + } + } + + /* If you want encryption, you need to have defined all + * key files + */ + if ( m_config.enable_encryption && + ( ! (KEY_VALID(m_curve_public_key) && + KEY_VALID(m_curve_secret_key) && + KEY_VALID(m_curve_encoder_key) ) ) ) { + throw std::runtime_error("When enabling encryption, all three " + "keyfiles must be valid!"); + } + + if (m_config.enable_encryption) { + try { + /* We want to check that the encoder is the right one, + * so the encoder is the CURVE server. + */ + m_zmq_sock.setsockopt(ZMQ_CURVE_SERVERKEY, + m_curve_encoder_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set encoder key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + try { + m_zmq_sock.setsockopt(ZMQ_CURVE_PUBLICKEY, + m_curve_public_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set public key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + try { + m_zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, + m_curve_secret_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set secret key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + } + else { + try { + /* This forces the socket to go to the ZMQ_NULL auth + * mechanism + */ + const int no = 0; + m_zmq_sock.setsockopt(ZMQ_CURVE_SERVER, &no, sizeof(no)); + } + catch (zmq::error_t& err) { + etiLog.level(warn) << "ZMQ disable encryption keys for input " << + m_name << " failed: " << err.what(); + } + + } + + // Prepare the ZMQ socket to accept connections + try { + m_zmq_sock.bind(m_inputUri.c_str()); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ bind for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + m_zmq_sock_bound_to = m_inputUri; + + try { + m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set socket options for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } +} + +int ZmqBase::open(const std::string& inputUri) +{ + m_inputUri = inputUri; + + /* Let caller handle exceptions when we open() */ + rebind(); + + // We want to appear in the statistics ! + m_stats.registerAtServer(); + + return 0; +} + +int ZmqBase::close() +{ + m_zmq_sock.close(); + return 0; +} + +int ZmqBase::setBitrate(int bitrate) +{ + m_bitrate = bitrate; + return bitrate; // TODO do a nice check here +} + +// size corresponds to a frame size. It is constant for a given bitrate +int ZmqBase::readFrame(void* buffer, int size) +{ + int rc; + + /* We must *always* read data from the ZMQ socket, + * to make sure that ZMQ internal buffers are emptied + * quickly. It's the only way to control the buffers + * of the whole path from encoder to our frame_buffer. + */ + rc = readFromSocket(size); + + /* Notify of a buffer overrun, and drop some frames */ + if (m_frame_buffer.size() >= m_config.buffer_size) { + m_stats.notifyOverrun(); + + /* If the buffer is really too full, we drop as many frames as needed + * to get down to the prebuffering size. We would like to have our buffer + * filled to the prebuffering length. + */ + if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) { + size_t over_max = m_frame_buffer.size() - m_config.prebuffering; + + while (over_max--) { + delete[] m_frame_buffer.front(); + m_frame_buffer.pop_front(); + } + } + else { + /* Our frame_buffer contains DAB logical frames. Five of these make one + * AAC superframe. + * + * Dropping this superframe amounts to dropping 120ms of audio. + * + * We're actually not sure to drop five DAB logical frames + * belonging to the same AAC superframe. It is assumed that no + * receiver will crash because of this. At least, the DAB logical frame + * vs. AAC superframe alignment is preserved. + * + * TODO: of course this assumption probably doesn't hold. Fix this ! + * TODO: also, with MPEG, the above doesn't hold, so we drop five + * frames even though we could drop less. + * */ + for (int frame_del_count = 0; frame_del_count < 5; frame_del_count++) { + delete[] m_frame_buffer.front(); + m_frame_buffer.pop_front(); + } + } + } + + if (m_prebuf_current > 0) { + if (rc > 0) + m_prebuf_current--; + if (m_prebuf_current == 0) + etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", + m_name.c_str()); + + /* During prebuffering, give a zeroed frame to the mux */ + m_stats.notifyUnderrun(); + memset(buffer, 0, size); + return size; + } + + // Save stats data in bytes, not in frames + m_stats.notifyBuffer(m_frame_buffer.size() * size); + + if (m_frame_buffer.empty()) { + etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", + m_name.c_str()); + // reset prebuffering + m_prebuf_current = m_config.prebuffering; + + /* We have no data to give, we give a zeroed frame */ + m_stats.notifyUnderrun(); + memset(buffer, 0, size); + return size; + } + else + { + /* Normal situation, give a frame from the frame_buffer */ + uint8_t* newframe = m_frame_buffer.front(); + memcpy(buffer, newframe, size); + delete[] newframe; + m_frame_buffer.pop_front(); + return size; + } +} + + +/******** MPEG input *******/ + +// Read a MPEG frame from the socket, and push to list +int ZmqMPEG::readFromSocket(size_t framesize) +{ + bool messageReceived = false; + zmq::message_t msg; + + try { + messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (!messageReceived) { + return 0; + } + + } + catch (zmq::error_t& err) + { + etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " << + m_name << ": " << err.what(); + } + + /* This is the old 'one superframe per ZMQ message' format */ + uint8_t* data = (uint8_t*)msg.data(); + size_t datalen = msg.size(); + + /* Look for the new zmq_frame_header_t format */ + zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); + + if (msg.size() == ZMQ_FRAME_SIZE(frame) && + frame->version == 1 && + frame->encoder == ZMQ_ENCODER_TOOLAME) { + datalen = frame->datasize; + data = ZMQ_FRAME_DATA(frame); + + m_stats.notifyPeakLevels(frame->audiolevel_left, + frame->audiolevel_right); + } + + + if (datalen == framesize) + { + if (m_frame_buffer.size() > m_config.buffer_size) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming frame !"; + messageReceived = false; + } + else if (m_enable_input) { + // copy the input frame blockwise into the frame_buffer + auto framedata = new uint8_t[framesize]; + memcpy(framedata, data, framesize); + m_frame_buffer.push_back(framedata); + } + else { + return 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " verify bitrate: recv'd " << msg.size() << " B" << + ", need " << framesize << "."; + messageReceived = false; + } + + return messageReceived ? msg.size() : 0; +} + +/******** AAC+ input *******/ + +// Read a AAC+ superframe from the socket, cut it into five frames, +// and push to list +int ZmqAAC::readFromSocket(size_t framesize) +{ + bool messageReceived; + zmq::message_t msg; + + try { + messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (!messageReceived) { + return 0; + } + + } + catch (zmq::error_t& err) + { + etiLog.level(error) << + "Failed to receive AAC superframe from zmq socket " << + m_name << ": " << err.what(); + } + + /* This is the old 'one superframe per ZMQ message' format */ + uint8_t* data = (uint8_t*)msg.data(); + size_t datalen = msg.size(); + + /* Look for the new zmq_frame_header_t format */ + zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); + + if (msg.size() == ZMQ_FRAME_SIZE(frame) && + frame->version == 1 && + frame->encoder == ZMQ_ENCODER_FDK) { + datalen = frame->datasize; + data = ZMQ_FRAME_DATA(frame); + + m_stats.notifyPeakLevels(frame->audiolevel_left, + frame->audiolevel_right); + } + + + /* TS 102 563, Section 6: + * Audio super frames are transported in five successive DAB logical frames + * with additional error protection. + */ + if (datalen) + { + if (datalen == 5*framesize) + { + if (m_frame_buffer.size() > m_config.buffer_size) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming superframe !"; + datalen = 0; + } + else if (m_enable_input) { + // copy the input frame blockwise into the frame_buffer + for (uint8_t* framestart = data; + framestart < &data[5*framesize]; + framestart += framesize) { + auto audioframe = new uint8_t[framesize]; + memcpy(audioframe, framestart, framesize); + m_frame_buffer.push_back(audioframe); + } + } + else { + datalen = 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " verify bitrate: recv'd " << msg.size() << " B" << + ", need " << 5*framesize << "."; + + datalen = 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " invalid frame received"; + } + + return datalen; +} + +/********* REMOTE CONTROL ***********/ + +void ZmqBase::set_parameter(const string& parameter, + const string& value) +{ + if (parameter == "buffer") { + size_t new_limit = atol(value.c_str()); + + if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_config.buffer_size = new_limit; + } + else if (parameter == "prebuffering") { + size_t new_prebuf = atol(value.c_str()); + + if (new_prebuf < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired prebuffering too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_prebuf > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired prebuffering too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_config.prebuffering = new_prebuf; + } + else if (parameter == "enable") { + if (value == "1") { + m_enable_input = true; + } + else if (value == "0") { + m_enable_input = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } + } + else if (parameter == "encryption") { + if (value == "1") { + m_config.enable_encryption = true; + } + else if (value == "0") { + m_config.enable_encryption = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } + + try { + rebind(); + } + catch (std::runtime_error &e) { + stringstream ss; + ss << "Could not bind socket again with new keys." << + e.what(); + throw ParameterError(ss.str()); + } + } + else if (parameter == "secretkey") { + m_config.curve_secret_keyfile = value; + } + else if (parameter == "publickey") { + m_config.curve_public_keyfile = value; + } + else if (parameter == "encoderkey") { + m_config.curve_encoder_keyfile = value; + } + else { + stringstream ss; + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } +} + +const string ZmqBase::get_parameter(const string& parameter) const +{ + stringstream ss; + if (parameter == "buffer") { + ss << m_config.buffer_size; + } + else if (parameter == "prebuffering") { + ss << m_config.prebuffering; + } + else if (parameter == "enable") { + if (m_enable_input) + ss << "true"; + else + ss << "false"; + } + else if (parameter == "encryption") { + if (m_config.enable_encryption) + ss << "true"; + else + ss << "false"; + } + else if (parameter == "secretkey") { + ss << m_config.curve_secret_keyfile; + } + else if (parameter == "publickey") { + ss << m_config.curve_public_keyfile; + } + else if (parameter == "encoderkey") { + ss << m_config.curve_encoder_keyfile; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); + +} + +}; + +#endif + diff --git a/src/input/Zmq.h b/src/input/Zmq.h new file mode 100644 index 0000000..d1dd2d5 --- /dev/null +++ b/src/input/Zmq.h @@ -0,0 +1,271 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + ZeroMQ input. see www.zeromq.org for more info + + For the AAC+ input, each zeromq message must contain one superframe, + or one zmq_frame_header_t followed by a superframe. + + For the MPEG input, each zeromq message must contain one frame. + + Encryption is provided by zmq_curve, see the corresponding manpage. + + From the ZeroMQ manpage 'zmq': + + The 0MQ lightweight messaging kernel is a library which extends the standard + socket interfaces with features traditionally provided by specialised + messaging middleware products. 0MQ sockets provide an abstraction of + asynchronous message queues, multiple messaging patterns, message filtering + (subscriptions), seamless access to multiple transport protocols and more. + */ +/* + This file is part of ODR-DabMux. + + It defines a ZeroMQ input for dabplus data. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef HAVE_INPUT_ZEROMQ + +#include <list> +#include <string> +#include <stdint.h> +#include "zmq.hpp" +#include "input/inputs.h" +#include "ManagementServer.h" + +namespace Inputs { + +/* The frame_buffer contains DAB logical frames as defined in + * TS 102 563, section 6. + * Five elements of this buffer make one AAC superframe (120ms audio) + */ + +// Minimum frame_buffer size in number of elements +// This is one AAC superframe, but you probably don't want to +// go that low anyway. +const size_t INPUT_ZMQ_MIN_BUFFER_SIZE = 5*1; // 120ms + +// Maximum frame_buffer size in number of elements +// One minute is clearly way over what everybody would +// want. +const size_t INPUT_ZMQ_MAX_BUFFER_SIZE = 5*500; // 60s + +/* The ZeroMQ Curve key is 40 bytes long in Z85 representation + * + * But we need to store it as zero-terminated string. + */ +const size_t CURVE_KEYLEN = 40; + +/* helper to invalidate a key */ +#define INVALIDATE_KEY(k) memset(k, 0, CURVE_KEYLEN+1) + +/* Verification for key validity */ +#define KEY_VALID(k) (k[0] != '\0') + +/* Read a key from file into key + * + * Returns 0 on success, negative value on failure + */ +int readkey(std::string& keyfile, char* key); + +struct dab_input_zmq_config_t +{ + /* The size of the internal buffer, measured in number + * of elements. + * + * Each element corresponds to five frames, + * or one AAC superframe. + */ + size_t buffer_size; + + /* The amount of prebuffering to do before we start streaming + * + * Same units as buffer_size + */ + size_t prebuffering; + + /* Whether to enforce encryption or not + */ + bool enable_encryption; + + /* Full path to file containing public key. + */ + std::string curve_public_keyfile; + + /* Full path to file containing secret key. + */ + std::string curve_secret_keyfile; + + /* Full path to file containing encoder public key. + */ + std::string curve_encoder_keyfile; +}; + +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +/* This defines the on-wire representation of a ZMQ message header. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ + uint16_t version; // we support version=1 now + uint16_t encoder; // see ZMQ_ENCODER_XYZ + + /* length of the 'data' field */ + uint32_t datasize; + + /* Audio level, peak, linear PCM */ + int16_t audiolevel_left; + int16_t audiolevel_right; + + /* Data follows this header */ +} __attribute__ ((packed)); + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) ) + + +class ZmqBase : public InputBase, public RemoteControllable { + public: + ZmqBase(const std::string name, + dab_input_zmq_config_t config) + : RemoteControllable(name), + m_zmq_context(1), + m_zmq_sock(m_zmq_context, ZMQ_SUB), + m_zmq_sock_bound_to(""), + m_bitrate(0), + m_enable_input(true), + m_config(config), + m_stats(m_name), + m_prebuf_current(config.prebuffering) { + RC_ADD_PARAMETER(enable, + "If the input is enabled. Set to zero to empty the buffer."); + + RC_ADD_PARAMETER(encryption, + "If encryption is enabled or disabled [1 or 0]." + " If 1 is written, the keys are reloaded."); + + RC_ADD_PARAMETER(publickey, + "The multiplexer's public key file."); + + RC_ADD_PARAMETER(secretkey, + "The multiplexer's secret key file."); + + RC_ADD_PARAMETER(encoderkey, + "The encoder's public key file."); + + /* Set all keys to zero */ + INVALIDATE_KEY(m_curve_public_key); + INVALIDATE_KEY(m_curve_secret_key); + INVALIDATE_KEY(m_curve_encoder_key); + } + + virtual int open(const std::string& inputUri); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + /* Remote control */ + virtual void set_parameter(const std::string& parameter, + const std::string& value); + + /* Getting a parameter always returns a string. */ + virtual const std::string get_parameter(const std::string& parameter) const; + + protected: + virtual int readFromSocket(size_t framesize) = 0; + + virtual void rebind(); + + zmq::context_t m_zmq_context; + zmq::socket_t m_zmq_sock; // handle for the zmq socket + + /* If the socket is bound, this saves the endpoint, + * otherwise, it's an empty string + */ + std::string m_zmq_sock_bound_to; + int m_bitrate; + + /* set this to zero to empty the input buffer */ + bool m_enable_input; + + /* stores elements of type char[<superframesize>] */ + std::list<uint8_t*> m_frame_buffer; + + dab_input_zmq_config_t m_config; + + /* Key management, keys need to be zero-terminated */ + char m_curve_public_key[CURVE_KEYLEN+1]; + char m_curve_secret_key[CURVE_KEYLEN+1]; + char m_curve_encoder_key[CURVE_KEYLEN+1]; + + std::string m_inputUri; + + InputStat m_stats; + + private: + size_t m_prebuf_current; +}; + +class ZmqMPEG : public ZmqBase { + public: + ZmqMPEG(const std::string name, + dab_input_zmq_config_t config) + : ZmqBase(name, config) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [mpeg frames]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [mpeg frames]"); + } + + private: + virtual int readFromSocket(size_t framesize); +}; + +class ZmqAAC : public ZmqBase { + public: + ZmqAAC(const std::string name, + dab_input_zmq_config_t config) + : ZmqBase(name, config) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [aac superframes]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [aac superframes]"); + } + + private: + virtual int readFromSocket(size_t framesize); +}; + +}; +#endif // HAVE_INPUT_ZMQ + + |