From 128768f7fd719eb455a946a0f716d7128b4ded63 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 15:29:31 +0100 Subject: Start reworking inputs, break all but Prbs and ZMQ --- src/input/Prbs.cpp | 101 +++++++++ src/input/Prbs.h | 56 +++++ src/input/Zmq.cpp | 625 +++++++++++++++++++++++++++++++++++++++++++++++++++++ src/input/Zmq.h | 271 +++++++++++++++++++++++ 4 files changed, 1053 insertions(+) create mode 100644 src/input/Prbs.cpp create mode 100644 src/input/Prbs.h create mode 100644 src/input/Zmq.cpp create mode 100644 src/input/Zmq.h (limited to 'src/input') diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp new file mode 100644 index 0000000..b9e244b --- /dev/null +++ b/src/input/Prbs.cpp @@ -0,0 +1,101 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + Pseudo-Random Bit Sequence generator for test purposes. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Prbs.h" + +#include +#include +#include +#include +#include +#include +#include "utils.h" + +using namespace std; + +namespace Inputs { + +// ETS 300 799 Clause G.2.1 +// Preferred polynomial is G(x) = x^20 + x^17 + 1 +const uint32_t PRBS_DEFAULT_POLY = (1 << 19) | (1 << 16) | 1; + +int Prbs::open(const string& name) +{ + if (name.empty()) { + m_prbs.setup(PRBS_DEFAULT_POLY); + } + else { + if (name[0] != ':') { + throw invalid_argument( + "Invalid PRBS address format. " + "Must be prbs://:polynomial."); + } + + const string poly_str = name.substr(1); + + long polynomial = hexparse(poly_str); + + if (polynomial == 0) { + throw invalid_argument("No polynomial given for PRBS input"); + } + + m_prbs.setup(polynomial); + } + rewind(); + + return 0; +} + +int Prbs::readFrame(void* buffer, int size) +{ + unsigned char* cbuffer = reinterpret_cast(buffer); + + for (int i = 0; i < size; ++i) { + cbuffer[i] = m_prbs.step(); + } + + return size; +} + +int Prbs::setBitrate(int bitrate) +{ + return bitrate; +} + +int Prbs::close() +{ + return 0; +} + +int Prbs::rewind() +{ + m_prbs.rewind(); + return 0; +} + +}; diff --git a/src/input/Prbs.h b/src/input/Prbs.h new file mode 100644 index 0000000..47b52ad --- /dev/null +++ b/src/input/Prbs.h @@ -0,0 +1,56 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + + Pseudo-Random Bit Sequence generator for test purposes. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include + +#include "input/inputs.h" +#include "prbs.h" + +namespace Inputs { + +class Prbs : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + private: + virtual int rewind(); + + PrbsGenerator m_prbs; +}; + +}; + diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp new file mode 100644 index 0000000..6ef5fce --- /dev/null +++ b/src/input/Zmq.cpp @@ -0,0 +1,625 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + ZeroMQ input. see www.zeromq.org for more info + + For the AAC+ input, each zeromq message must contain one superframe + or one zmq_frame_header_t followed by a superframe. + + For the MPEG input, each zeromq message must contain one frame. + + Encryption is provided by zmq_curve, see the corresponding manpage. + + From the ZeroMQ manpage 'zmq': + + The 0MQ lightweight messaging kernel is a library which extends the standard + socket interfaces with features traditionally provided by specialised + messaging middleware products. 0MQ sockets provide an abstraction of + asynchronous message queues, multiple messaging patterns, message filtering + (subscriptions), seamless access to multiple transport protocols and more. + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Zmq.h" + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef HAVE_INPUT_ZEROMQ + +#include "zmq.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include "PcDebug.h" +#include "Log.h" + +#ifdef __MINGW32__ +# define bzero(s, n) memset(s, 0, n) +#endif + +namespace Inputs { + +using namespace std; + +int readkey(string& keyfile, char* key) +{ + FILE* fd = fopen(keyfile.c_str(), "r"); + if (fd == nullptr) { + return -1; + } + + int ret = fread(key, CURVE_KEYLEN, 1, fd); + fclose(fd); + if (ret == 0) { + return -1; + } + + /* It needs to be zero-terminated */ + key[CURVE_KEYLEN] = '\0'; + + return 0; +} + +/***** Common functions (MPEG and AAC) ******/ + +/* If necessary, unbind the socket, then check the keys, + * if they are ok and encryption is required, set the + * keys to the socket, and finally bind the socket + * to the new address + */ +void ZmqBase::rebind() +{ + if (! m_zmq_sock_bound_to.empty()) { + try { + m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str()); + } + catch (zmq::error_t& err) { + etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed"; + } + } + + m_zmq_sock_bound_to = ""; + + /* Load each key independently */ + if (! m_config.curve_public_keyfile.empty()) { + int rc = readkey(m_config.curve_public_keyfile, m_curve_public_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid public key for input " << + m_name; + + INVALIDATE_KEY(m_curve_public_key); + } + } + + if (! m_config.curve_secret_keyfile.empty()) { + int rc = readkey(m_config.curve_secret_keyfile, m_curve_secret_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid secret key for input " << + m_name; + + INVALIDATE_KEY(m_curve_secret_key); + } + } + + if (! m_config.curve_encoder_keyfile.empty()) { + int rc = readkey(m_config.curve_encoder_keyfile, m_curve_encoder_key); + + if (rc < 0) { + etiLog.level(warn) << "Invalid encoder key for input " << + m_name; + + INVALIDATE_KEY(m_curve_encoder_key); + } + } + + /* If you want encryption, you need to have defined all + * key files + */ + if ( m_config.enable_encryption && + ( ! (KEY_VALID(m_curve_public_key) && + KEY_VALID(m_curve_secret_key) && + KEY_VALID(m_curve_encoder_key) ) ) ) { + throw std::runtime_error("When enabling encryption, all three " + "keyfiles must be valid!"); + } + + if (m_config.enable_encryption) { + try { + /* We want to check that the encoder is the right one, + * so the encoder is the CURVE server. + */ + m_zmq_sock.setsockopt(ZMQ_CURVE_SERVERKEY, + m_curve_encoder_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set encoder key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + try { + m_zmq_sock.setsockopt(ZMQ_CURVE_PUBLICKEY, + m_curve_public_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set public key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + try { + m_zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, + m_curve_secret_key, CURVE_KEYLEN); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set secret key for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + } + else { + try { + /* This forces the socket to go to the ZMQ_NULL auth + * mechanism + */ + const int no = 0; + m_zmq_sock.setsockopt(ZMQ_CURVE_SERVER, &no, sizeof(no)); + } + catch (zmq::error_t& err) { + etiLog.level(warn) << "ZMQ disable encryption keys for input " << + m_name << " failed: " << err.what(); + } + + } + + // Prepare the ZMQ socket to accept connections + try { + m_zmq_sock.bind(m_inputUri.c_str()); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ bind for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } + + m_zmq_sock_bound_to = m_inputUri; + + try { + m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0); + } + catch (zmq::error_t& err) { + std::ostringstream os; + os << "ZMQ set socket options for input " << m_name << " failed" << + err.what(); + throw std::runtime_error(os.str()); + } +} + +int ZmqBase::open(const std::string& inputUri) +{ + m_inputUri = inputUri; + + /* Let caller handle exceptions when we open() */ + rebind(); + + // We want to appear in the statistics ! + m_stats.registerAtServer(); + + return 0; +} + +int ZmqBase::close() +{ + m_zmq_sock.close(); + return 0; +} + +int ZmqBase::setBitrate(int bitrate) +{ + m_bitrate = bitrate; + return bitrate; // TODO do a nice check here +} + +// size corresponds to a frame size. It is constant for a given bitrate +int ZmqBase::readFrame(void* buffer, int size) +{ + int rc; + + /* We must *always* read data from the ZMQ socket, + * to make sure that ZMQ internal buffers are emptied + * quickly. It's the only way to control the buffers + * of the whole path from encoder to our frame_buffer. + */ + rc = readFromSocket(size); + + /* Notify of a buffer overrun, and drop some frames */ + if (m_frame_buffer.size() >= m_config.buffer_size) { + m_stats.notifyOverrun(); + + /* If the buffer is really too full, we drop as many frames as needed + * to get down to the prebuffering size. We would like to have our buffer + * filled to the prebuffering length. + */ + if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) { + size_t over_max = m_frame_buffer.size() - m_config.prebuffering; + + while (over_max--) { + delete[] m_frame_buffer.front(); + m_frame_buffer.pop_front(); + } + } + else { + /* Our frame_buffer contains DAB logical frames. Five of these make one + * AAC superframe. + * + * Dropping this superframe amounts to dropping 120ms of audio. + * + * We're actually not sure to drop five DAB logical frames + * belonging to the same AAC superframe. It is assumed that no + * receiver will crash because of this. At least, the DAB logical frame + * vs. AAC superframe alignment is preserved. + * + * TODO: of course this assumption probably doesn't hold. Fix this ! + * TODO: also, with MPEG, the above doesn't hold, so we drop five + * frames even though we could drop less. + * */ + for (int frame_del_count = 0; frame_del_count < 5; frame_del_count++) { + delete[] m_frame_buffer.front(); + m_frame_buffer.pop_front(); + } + } + } + + if (m_prebuf_current > 0) { + if (rc > 0) + m_prebuf_current--; + if (m_prebuf_current == 0) + etiLog.log(info, "inputZMQ %s input pre-buffering complete\n", + m_name.c_str()); + + /* During prebuffering, give a zeroed frame to the mux */ + m_stats.notifyUnderrun(); + memset(buffer, 0, size); + return size; + } + + // Save stats data in bytes, not in frames + m_stats.notifyBuffer(m_frame_buffer.size() * size); + + if (m_frame_buffer.empty()) { + etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n", + m_name.c_str()); + // reset prebuffering + m_prebuf_current = m_config.prebuffering; + + /* We have no data to give, we give a zeroed frame */ + m_stats.notifyUnderrun(); + memset(buffer, 0, size); + return size; + } + else + { + /* Normal situation, give a frame from the frame_buffer */ + uint8_t* newframe = m_frame_buffer.front(); + memcpy(buffer, newframe, size); + delete[] newframe; + m_frame_buffer.pop_front(); + return size; + } +} + + +/******** MPEG input *******/ + +// Read a MPEG frame from the socket, and push to list +int ZmqMPEG::readFromSocket(size_t framesize) +{ + bool messageReceived = false; + zmq::message_t msg; + + try { + messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (!messageReceived) { + return 0; + } + + } + catch (zmq::error_t& err) + { + etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " << + m_name << ": " << err.what(); + } + + /* This is the old 'one superframe per ZMQ message' format */ + uint8_t* data = (uint8_t*)msg.data(); + size_t datalen = msg.size(); + + /* Look for the new zmq_frame_header_t format */ + zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); + + if (msg.size() == ZMQ_FRAME_SIZE(frame) && + frame->version == 1 && + frame->encoder == ZMQ_ENCODER_TOOLAME) { + datalen = frame->datasize; + data = ZMQ_FRAME_DATA(frame); + + m_stats.notifyPeakLevels(frame->audiolevel_left, + frame->audiolevel_right); + } + + + if (datalen == framesize) + { + if (m_frame_buffer.size() > m_config.buffer_size) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming frame !"; + messageReceived = false; + } + else if (m_enable_input) { + // copy the input frame blockwise into the frame_buffer + auto framedata = new uint8_t[framesize]; + memcpy(framedata, data, framesize); + m_frame_buffer.push_back(framedata); + } + else { + return 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " verify bitrate: recv'd " << msg.size() << " B" << + ", need " << framesize << "."; + messageReceived = false; + } + + return messageReceived ? msg.size() : 0; +} + +/******** AAC+ input *******/ + +// Read a AAC+ superframe from the socket, cut it into five frames, +// and push to list +int ZmqAAC::readFromSocket(size_t framesize) +{ + bool messageReceived; + zmq::message_t msg; + + try { + messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (!messageReceived) { + return 0; + } + + } + catch (zmq::error_t& err) + { + etiLog.level(error) << + "Failed to receive AAC superframe from zmq socket " << + m_name << ": " << err.what(); + } + + /* This is the old 'one superframe per ZMQ message' format */ + uint8_t* data = (uint8_t*)msg.data(); + size_t datalen = msg.size(); + + /* Look for the new zmq_frame_header_t format */ + zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data(); + + if (msg.size() == ZMQ_FRAME_SIZE(frame) && + frame->version == 1 && + frame->encoder == ZMQ_ENCODER_FDK) { + datalen = frame->datasize; + data = ZMQ_FRAME_DATA(frame); + + m_stats.notifyPeakLevels(frame->audiolevel_left, + frame->audiolevel_right); + } + + + /* TS 102 563, Section 6: + * Audio super frames are transported in five successive DAB logical frames + * with additional error protection. + */ + if (datalen) + { + if (datalen == 5*framesize) + { + if (m_frame_buffer.size() > m_config.buffer_size) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming superframe !"; + datalen = 0; + } + else if (m_enable_input) { + // copy the input frame blockwise into the frame_buffer + for (uint8_t* framestart = data; + framestart < &data[5*framesize]; + framestart += framesize) { + auto audioframe = new uint8_t[framesize]; + memcpy(audioframe, framestart, framesize); + m_frame_buffer.push_back(audioframe); + } + } + else { + datalen = 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " verify bitrate: recv'd " << msg.size() << " B" << + ", need " << 5*framesize << "."; + + datalen = 0; + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " invalid frame received"; + } + + return datalen; +} + +/********* REMOTE CONTROL ***********/ + +void ZmqBase::set_parameter(const string& parameter, + const string& value) +{ + if (parameter == "buffer") { + size_t new_limit = atol(value.c_str()); + + if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired buffer size too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_config.buffer_size = new_limit; + } + else if (parameter == "prebuffering") { + size_t new_prebuf = atol(value.c_str()); + + if (new_prebuf < INPUT_ZMQ_MIN_BUFFER_SIZE) { + throw ParameterError("Desired prebuffering too small." + " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) ); + } + else if (new_prebuf > INPUT_ZMQ_MAX_BUFFER_SIZE) { + throw ParameterError("Desired prebuffering too large." + " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) ); + } + + m_config.prebuffering = new_prebuf; + } + else if (parameter == "enable") { + if (value == "1") { + m_enable_input = true; + } + else if (value == "0") { + m_enable_input = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } + } + else if (parameter == "encryption") { + if (value == "1") { + m_config.enable_encryption = true; + } + else if (value == "0") { + m_config.enable_encryption = false; + } + else { + throw ParameterError("Value not understood, specify 0 or 1."); + } + + try { + rebind(); + } + catch (std::runtime_error &e) { + stringstream ss; + ss << "Could not bind socket again with new keys." << + e.what(); + throw ParameterError(ss.str()); + } + } + else if (parameter == "secretkey") { + m_config.curve_secret_keyfile = value; + } + else if (parameter == "publickey") { + m_config.curve_public_keyfile = value; + } + else if (parameter == "encoderkey") { + m_config.curve_encoder_keyfile = value; + } + else { + stringstream ss; + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } +} + +const string ZmqBase::get_parameter(const string& parameter) const +{ + stringstream ss; + if (parameter == "buffer") { + ss << m_config.buffer_size; + } + else if (parameter == "prebuffering") { + ss << m_config.prebuffering; + } + else if (parameter == "enable") { + if (m_enable_input) + ss << "true"; + else + ss << "false"; + } + else if (parameter == "encryption") { + if (m_config.enable_encryption) + ss << "true"; + else + ss << "false"; + } + else if (parameter == "secretkey") { + ss << m_config.curve_secret_keyfile; + } + else if (parameter == "publickey") { + ss << m_config.curve_public_keyfile; + } + else if (parameter == "encoderkey") { + ss << m_config.curve_encoder_keyfile; + } + else { + ss << "Parameter '" << parameter << + "' is not exported by controllable " << get_rc_name(); + throw ParameterError(ss.str()); + } + return ss.str(); + +} + +}; + +#endif + diff --git a/src/input/Zmq.h b/src/input/Zmq.h new file mode 100644 index 0000000..d1dd2d5 --- /dev/null +++ b/src/input/Zmq.h @@ -0,0 +1,271 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + ZeroMQ input. see www.zeromq.org for more info + + For the AAC+ input, each zeromq message must contain one superframe, + or one zmq_frame_header_t followed by a superframe. + + For the MPEG input, each zeromq message must contain one frame. + + Encryption is provided by zmq_curve, see the corresponding manpage. + + From the ZeroMQ manpage 'zmq': + + The 0MQ lightweight messaging kernel is a library which extends the standard + socket interfaces with features traditionally provided by specialised + messaging middleware products. 0MQ sockets provide an abstraction of + asynchronous message queues, multiple messaging patterns, message filtering + (subscriptions), seamless access to multiple transport protocols and more. + */ +/* + This file is part of ODR-DabMux. + + It defines a ZeroMQ input for dabplus data. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#ifdef HAVE_INPUT_ZEROMQ + +#include +#include +#include +#include "zmq.hpp" +#include "input/inputs.h" +#include "ManagementServer.h" + +namespace Inputs { + +/* The frame_buffer contains DAB logical frames as defined in + * TS 102 563, section 6. + * Five elements of this buffer make one AAC superframe (120ms audio) + */ + +// Minimum frame_buffer size in number of elements +// This is one AAC superframe, but you probably don't want to +// go that low anyway. +const size_t INPUT_ZMQ_MIN_BUFFER_SIZE = 5*1; // 120ms + +// Maximum frame_buffer size in number of elements +// One minute is clearly way over what everybody would +// want. +const size_t INPUT_ZMQ_MAX_BUFFER_SIZE = 5*500; // 60s + +/* The ZeroMQ Curve key is 40 bytes long in Z85 representation + * + * But we need to store it as zero-terminated string. + */ +const size_t CURVE_KEYLEN = 40; + +/* helper to invalidate a key */ +#define INVALIDATE_KEY(k) memset(k, 0, CURVE_KEYLEN+1) + +/* Verification for key validity */ +#define KEY_VALID(k) (k[0] != '\0') + +/* Read a key from file into key + * + * Returns 0 on success, negative value on failure + */ +int readkey(std::string& keyfile, char* key); + +struct dab_input_zmq_config_t +{ + /* The size of the internal buffer, measured in number + * of elements. + * + * Each element corresponds to five frames, + * or one AAC superframe. + */ + size_t buffer_size; + + /* The amount of prebuffering to do before we start streaming + * + * Same units as buffer_size + */ + size_t prebuffering; + + /* Whether to enforce encryption or not + */ + bool enable_encryption; + + /* Full path to file containing public key. + */ + std::string curve_public_keyfile; + + /* Full path to file containing secret key. + */ + std::string curve_secret_keyfile; + + /* Full path to file containing encoder public key. + */ + std::string curve_encoder_keyfile; +}; + +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +/* This defines the on-wire representation of a ZMQ message header. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ + uint16_t version; // we support version=1 now + uint16_t encoder; // see ZMQ_ENCODER_XYZ + + /* length of the 'data' field */ + uint32_t datasize; + + /* Audio level, peak, linear PCM */ + int16_t audiolevel_left; + int16_t audiolevel_right; + + /* Data follows this header */ +} __attribute__ ((packed)); + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) ) + + +class ZmqBase : public InputBase, public RemoteControllable { + public: + ZmqBase(const std::string name, + dab_input_zmq_config_t config) + : RemoteControllable(name), + m_zmq_context(1), + m_zmq_sock(m_zmq_context, ZMQ_SUB), + m_zmq_sock_bound_to(""), + m_bitrate(0), + m_enable_input(true), + m_config(config), + m_stats(m_name), + m_prebuf_current(config.prebuffering) { + RC_ADD_PARAMETER(enable, + "If the input is enabled. Set to zero to empty the buffer."); + + RC_ADD_PARAMETER(encryption, + "If encryption is enabled or disabled [1 or 0]." + " If 1 is written, the keys are reloaded."); + + RC_ADD_PARAMETER(publickey, + "The multiplexer's public key file."); + + RC_ADD_PARAMETER(secretkey, + "The multiplexer's secret key file."); + + RC_ADD_PARAMETER(encoderkey, + "The encoder's public key file."); + + /* Set all keys to zero */ + INVALIDATE_KEY(m_curve_public_key); + INVALIDATE_KEY(m_curve_secret_key); + INVALIDATE_KEY(m_curve_encoder_key); + } + + virtual int open(const std::string& inputUri); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + /* Remote control */ + virtual void set_parameter(const std::string& parameter, + const std::string& value); + + /* Getting a parameter always returns a string. */ + virtual const std::string get_parameter(const std::string& parameter) const; + + protected: + virtual int readFromSocket(size_t framesize) = 0; + + virtual void rebind(); + + zmq::context_t m_zmq_context; + zmq::socket_t m_zmq_sock; // handle for the zmq socket + + /* If the socket is bound, this saves the endpoint, + * otherwise, it's an empty string + */ + std::string m_zmq_sock_bound_to; + int m_bitrate; + + /* set this to zero to empty the input buffer */ + bool m_enable_input; + + /* stores elements of type char[] */ + std::list m_frame_buffer; + + dab_input_zmq_config_t m_config; + + /* Key management, keys need to be zero-terminated */ + char m_curve_public_key[CURVE_KEYLEN+1]; + char m_curve_secret_key[CURVE_KEYLEN+1]; + char m_curve_encoder_key[CURVE_KEYLEN+1]; + + std::string m_inputUri; + + InputStat m_stats; + + private: + size_t m_prebuf_current; +}; + +class ZmqMPEG : public ZmqBase { + public: + ZmqMPEG(const std::string name, + dab_input_zmq_config_t config) + : ZmqBase(name, config) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [mpeg frames]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [mpeg frames]"); + } + + private: + virtual int readFromSocket(size_t framesize); +}; + +class ZmqAAC : public ZmqBase { + public: + ZmqAAC(const std::string name, + dab_input_zmq_config_t config) + : ZmqBase(name, config) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [aac superframes]"); + + RC_ADD_PARAMETER(prebuffering, + "Min buffer level before streaming starts [aac superframes]"); + } + + private: + virtual int readFromSocket(size_t framesize); +}; + +}; +#endif // HAVE_INPUT_ZMQ + + -- cgit v1.2.3 From 804fe1979f9ed7bef7badaf0aa08e35e09874772 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 16:28:27 +0100 Subject: Add rudimentary file input No nonblock support yet --- src/ConfigParser.cpp | 11 +++ src/Makefile.am | 1 + src/dabInputMpegFile.cpp | 31 ------- src/input/File.cpp | 227 +++++++++++++++++++++++++++++++++++++++++++++++ src/input/File.h | 69 ++++++++++++++ src/input/Prbs.h | 4 - src/input/Zmq.cpp | 9 -- src/input/Zmq.h | 7 -- src/input/inputs.h | 52 +++++++++++ src/mpeg.c | 26 ++++++ src/mpeg.h | 7 ++ 11 files changed, 393 insertions(+), 51 deletions(-) create mode 100644 src/input/File.cpp create mode 100644 src/input/File.h create mode 100644 src/input/inputs.h (limited to 'src/input') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index bdc2099..2a8d3da 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -55,6 +55,7 @@ #include "input/Prbs.h" #include "input/Zmq.h" +#include "input/File.h" #ifdef _WIN32 @@ -930,6 +931,16 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, if (nonblock) { // TODO } + + if (type == "audio") { + subchan->input = make_shared(); + } + else if (type == "dabplus") { + subchan->input = make_shared(); + } + else { + throw logic_error("Incomplete handling of file input"); + } } else if (proto == "tcp" || proto == "epgm" || diff --git a/src/Makefile.am b/src/Makefile.am index b8de4e8..b356566 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -50,6 +50,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ input/inputs.h \ input/Prbs.cpp input/Prbs.h \ input/Zmq.cpp input/Zmq.h \ + input/File.cpp input/File.h \ dabOutput/dabOutput.h \ dabOutput/dabOutputFile.cpp \ dabOutput/dabOutputFifo.cpp \ diff --git a/src/dabInputMpegFile.cpp b/src/dabInputMpegFile.cpp index 804ea29..6f24f32 100644 --- a/src/dabInputMpegFile.cpp +++ b/src/dabInputMpegFile.cpp @@ -47,37 +47,6 @@ struct dabInputOperations dabInputMpegFileOperations = { }; -#define MPEG_FREQUENCY -2 -#define MPEG_PADDING -3 -#define MPEG_COPYRIGHT -4 -#define MPEG_ORIGINAL -5 -#define MPEG_EMPHASIS -6 -int checkDabMpegFrame(void* data) { - mpegHeader* header = (mpegHeader*)data; - unsigned long* headerData = (unsigned long*)data; - if ((*headerData & 0x0f0ffcff) == 0x0004fcff) return 0; - if ((*headerData & 0x0f0ffcff) == 0x0004f4ff) return 0; - if (getMpegFrequency(header) != 48000) { - if (getMpegFrequency(header) != 24000) { - return MPEG_FREQUENCY; - } - } - if (header->padding != 0) { - return MPEG_PADDING; - } - if (header->copyright != 0) { - return MPEG_COPYRIGHT; - } - if (header->original != 0) { - return MPEG_ORIGINAL; - } - if (header->emphasis != 0) { - return MPEG_EMPHASIS; - } - return -1; -} - - int dabInputMpegFileRead(dabInputOperations* ops, void* args, void* buffer, int size) { dabInputFileData* data = (dabInputFileData*)args; diff --git a/src/input/File.cpp b/src/input/File.cpp new file mode 100644 index 0000000..9721b97 --- /dev/null +++ b/src/input/File.cpp @@ -0,0 +1,227 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include +#include +#include +#include +#include +#include +#ifndef _WIN32 +# define O_BINARY 0 +#endif + +#include "input/File.h" +#include "mpeg.h" + +namespace Inputs { + +int FileBase::open(const std::string& name) +{ + m_fd = ::open(name.c_str(), O_RDONLY | O_BINARY); + if (m_fd == -1) { + std::stringstream ss; + ss << "Could not open input file " << name << ": " << + strerror(errno); + throw std::runtime_error(ss.str()); + } + return 0; +} + +int FileBase::close() +{ + if (m_fd != -1) { + ::close(m_fd); + m_fd = -1; + } + return 0; +} + +int FileBase::rewind() +{ + return ::lseek(m_fd, 0, SEEK_SET); +} + +int MPEGFile::readFrame(void* buffer, int size) +{ + int result; + bool do_rewind = false; +READ_SUBCHANNEL: + if (m_parity) { + result = readData(m_fd, buffer, size, 2); + m_parity = false; + return 0; + } else { + result = readMpegHeader(m_fd, buffer, size); + if (result > 0) { + result = readMpegFrame(m_fd, buffer, size); + if (result < 0 && getMpegFrequency(buffer) == 24000) { + m_parity = true; + result = size; + } + } + } + switch (result) { + case MPEG_BUFFER_UNDERFLOW: + etiLog.log(warn, "data underflow -> frame muted\n"); + goto MUTE_SUBCHANNEL; + case MPEG_BUFFER_OVERFLOW: + etiLog.log(warn, "bitrate too high -> frame muted\n"); + goto MUTE_SUBCHANNEL; + case MPEG_FILE_EMPTY: + if (do_rewind) { + etiLog.log(error, "file rewinded and still empty " + "-> frame muted\n"); + goto MUTE_SUBCHANNEL; + } + else { + etiLog.log(info, "reach end of file -> rewinding\n"); + rewind(); + goto READ_SUBCHANNEL; + } + case MPEG_FILE_ERROR: + etiLog.log(alert, "can't read file (%i) -> frame muted\n", errno); + perror(""); + goto MUTE_SUBCHANNEL; + case MPEG_SYNC_NOT_FOUND: + etiLog.log(alert, "mpeg sync not found, maybe is not a valid file " + "-> frame muted\n"); + goto MUTE_SUBCHANNEL; + case MPEG_INVALID_FRAME: + etiLog.log(alert, "file is not a valid mpeg file " + "-> frame muted\n"); + goto MUTE_SUBCHANNEL; + default: + if (result < 0) { + etiLog.log(alert, + "unknown error (code = %i) -> frame muted\n", + result); +MUTE_SUBCHANNEL: + memset(buffer, 0, size); + } + else { + if (result < size) { + etiLog.log(warn, "bitrate too low from file " + "-> frame padded\n"); + memset((char*)buffer + result, 0, size - result); + } + + result = checkDabMpegFrame(buffer); + switch (result) { + case MPEG_FREQUENCY: + etiLog.log(error, "file has a frame with an invalid " + "frequency: %i, should be 48000 or 24000\n", + getMpegFrequency(buffer)); + break; + case MPEG_PADDING: + etiLog.log(warn, + "file has a frame with padding bit set\n"); + break; + case MPEG_COPYRIGHT: + result = 0; + break; + case MPEG_ORIGINAL: + result = 0; + break; + case MPEG_EMPHASIS: + etiLog.log(warn, + "file has a frame with emphasis bits set\n"); + break; + default: + if (result < 0) { + etiLog.log(alert, "mpeg file has an invalid DAB " + "mpeg frame (unknown reason: %i)\n", result); + } + break; + } + } + } + return result; +} + +int MPEGFile::setBitrate(int bitrate) +{ + if (bitrate == 0) { + char buffer[4]; + + if (readFrame(buffer, 4) == 0) { + bitrate = getMpegBitrate(buffer); + } + else { + bitrate = -1; + } + rewind(); + } + return bitrate; +} + + +int DABPlusFile::readFrame(void* buffer, int size) +{ + uint8_t* dataOut = reinterpret_cast(buffer); + + ssize_t ret = read(m_fd, dataOut, size); + + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < size) { + ssize_t sizeOut = ret; + etiLog.log(info, "reach end of file -> rewinding\n"); + if (rewind() == -1) { + etiLog.log(alert, "ERROR: Can't rewind file\n"); + return -1; + } + + ret = read(m_fd, dataOut + sizeOut, size - sizeOut); + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < size) { + etiLog.log(alert, "ERROR: Not enough data in file\n"); + return -1; + } + } + + return size; +} + +int DABPlusFile::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + return -1; + } + + return bitrate; +} + +}; diff --git a/src/input/File.h b/src/input/File.h new file mode 100644 index 0000000..61be8b1 --- /dev/null +++ b/src/input/File.h @@ -0,0 +1,69 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 Matthias P. Braendli + http://www.opendigitalradio.org + + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include +#include +#include +#include "input/inputs.h" +#include "ManagementServer.h" + +namespace Inputs { + +class FileBase : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size) = 0; + virtual int setBitrate(int bitrate) = 0; + virtual int close(); + + /* Rewind the file + * Returns -1 on failure, 0 on success + */ + virtual int rewind(); + protected: + // We use unix open() instead of fopen() because + // we want to do non-blocking I/O + int m_fd = -1; +}; + +class MPEGFile : public FileBase { + public: + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + + private: + bool m_parity = false; +}; + +class DABPlusFile : public FileBase { + public: + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); +}; + + +}; diff --git a/src/input/Prbs.h b/src/input/Prbs.h index 47b52ad..1ad5047 100644 --- a/src/input/Prbs.h +++ b/src/input/Prbs.h @@ -28,10 +28,6 @@ #pragma once -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - #include #include "input/inputs.h" diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 6ef5fce..985fad3 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -41,13 +41,6 @@ #include "input/Zmq.h" -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#ifdef HAVE_INPUT_ZEROMQ - -#include "zmq.hpp" #include #include #include @@ -621,5 +614,3 @@ const string ZmqBase::get_parameter(const string& parameter) const }; -#endif - diff --git a/src/input/Zmq.h b/src/input/Zmq.h index d1dd2d5..02fce3a 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -43,12 +43,6 @@ #pragma once -#ifdef HAVE_CONFIG_H -# include "config.h" -#endif - -#ifdef HAVE_INPUT_ZEROMQ - #include #include #include @@ -266,6 +260,5 @@ class ZmqAAC : public ZmqBase { }; }; -#endif // HAVE_INPUT_ZMQ diff --git a/src/input/inputs.h b/src/input/inputs.h new file mode 100644 index 0000000..3bc1fa4 --- /dev/null +++ b/src/input/inputs.h @@ -0,0 +1,52 @@ +/* + Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif +#include "Log.h" +#include "RemoteControl.h" +#include + +namespace Inputs { + +/* New input object base */ +class InputBase { + public: + virtual int open(const std::string& name) = 0; + virtual int readFrame(void* buffer, int size) = 0; + virtual int setBitrate(int bitrate) = 0; + virtual int close() = 0; + + virtual ~InputBase() {} + protected: + InputBase() {} +}; + +}; + diff --git a/src/mpeg.c b/src/mpeg.c index f7aed34..fb6591a 100644 --- a/src/mpeg.c +++ b/src/mpeg.c @@ -219,3 +219,29 @@ int readMpegFrame(int file, void* data, int size) } return framelength; } + +int checkDabMpegFrame(void* data) { + mpegHeader* header = (mpegHeader*)data; + unsigned long* headerData = (unsigned long*)data; + if ((*headerData & 0x0f0ffcff) == 0x0004fcff) return 0; + if ((*headerData & 0x0f0ffcff) == 0x0004f4ff) return 0; + if (getMpegFrequency(header) != 48000) { + if (getMpegFrequency(header) != 24000) { + return MPEG_FREQUENCY; + } + } + if (header->padding != 0) { + return MPEG_PADDING; + } + if (header->copyright != 0) { + return MPEG_COPYRIGHT; + } + if (header->original != 0) { + return MPEG_ORIGINAL; + } + if (header->emphasis != 0) { + return MPEG_EMPHASIS; + } + return -1; +} + diff --git a/src/mpeg.h b/src/mpeg.h index aa7cfb6..15b9b80 100644 --- a/src/mpeg.h +++ b/src/mpeg.h @@ -75,6 +75,13 @@ ssize_t readData(int file, void* data, size_t size, unsigned int tries); int readMpegHeader(int file, void* data, int size); int readMpegFrame(int file, void* data, int size); +#define MPEG_FREQUENCY -2 +#define MPEG_PADDING -3 +#define MPEG_COPYRIGHT -4 +#define MPEG_ORIGINAL -5 +#define MPEG_EMPHASIS -6 +int checkDabMpegFrame(void* data); + #ifdef __cplusplus } #endif -- cgit v1.2.3 From 7068a697b235f1ae05bc1a5cf93e7eeefbe7a1df Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sun, 30 Oct 2016 20:15:35 +0100 Subject: Fix PRBS url parsing --- src/input/Prbs.cpp | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) (limited to 'src/input') diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp index b9e244b..8e5a9ae 100644 --- a/src/input/Prbs.cpp +++ b/src/input/Prbs.cpp @@ -46,17 +46,23 @@ const uint32_t PRBS_DEFAULT_POLY = (1 << 19) | (1 << 16) | 1; int Prbs::open(const string& name) { - if (name.empty()) { + if (name.substr(0, 7) != "prbs://") { + throw logic_error("Invalid PRBS name"); + } + + const string& url_polynomial = name.substr(7); + + if (url_polynomial.empty()) { m_prbs.setup(PRBS_DEFAULT_POLY); } else { - if (name[0] != ':') { + if (url_polynomial[0] != ':') { throw invalid_argument( "Invalid PRBS address format. " "Must be prbs://:polynomial."); } - const string poly_str = name.substr(1); + const string poly_str = url_polynomial.substr(1); long polynomial = hexparse(poly_str); -- cgit v1.2.3 From 51491533a312884862849082b3507e49c1829d22 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 14:27:50 +0100 Subject: Add new UDP input --- src/ConfigParser.cpp | 30 +++++++++--- src/Makefile.am | 1 + src/UdpSocket.cpp | 18 +++++-- src/UdpSocket.h | 10 +++- src/input/Udp.cpp | 134 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/input/Udp.h | 52 ++++++++++++++++++++ 6 files changed, 232 insertions(+), 13 deletions(-) create mode 100644 src/input/Udp.cpp create mode 100644 src/input/Udp.h (limited to 'src/input') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 2a8d3da..e48200a 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -56,6 +56,7 @@ #include "input/Prbs.h" #include "input/Zmq.h" #include "input/File.h" +#include "input/Udp.h" #ifdef _WIN32 @@ -922,16 +923,15 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, dabProtection* protection = &subchan->protection; const bool nonblock = pt.get("nonblock", false); + if (nonblock) { + etiLog.level(warn) << "The nonblock option is not supported"; + } if (type == "dabplus" or type == "audio") { subchan->type = subchannel_type_t::Audio; subchan->bitrate = 0; if (proto == "file") { - if (nonblock) { - // TODO - } - if (type == "audio") { subchan->input = make_shared(); } @@ -946,10 +946,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, proto == "epgm" || proto == "ipc") { - if (nonblock) { - etiLog.level(warn) << "The nonblock option is meaningless for the zmq input"; - } - auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -983,6 +979,24 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->type = subchannel_type_t::DataDmb; subchan->bitrate = DEFAULT_DATA_BITRATE; } + else if (type == "data") { + if (proto == "udp") { + subchan->input = make_shared(); + } else if (proto == "file") { + // TODO + } else if (proto == "fifo") { + // TODO + } else { + stringstream ss; + ss << "Subchannel with uid " << subchanuid << + ": Invalid protocol for data input (" << + proto << ")" << endl; + throw runtime_error(ss.str()); + } + + subchan->type = subchannel_type_t::DataDmb; + subchan->bitrate = DEFAULT_DATA_BITRATE; + } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << " has unknown type!"; diff --git a/src/Makefile.am b/src/Makefile.am index b356566..084cf7b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -51,6 +51,7 @@ odr_dabmux_SOURCES =DabMux.cpp DabMux.h \ input/Prbs.cpp input/Prbs.h \ input/Zmq.cpp input/Zmq.h \ input/File.cpp input/File.h \ + input/Udp.cpp input/Udp.h \ dabOutput/dabOutput.h \ dabOutput/dabOutputFile.cpp \ dabOutput/dabOutputFifo.cpp \ diff --git a/src/UdpSocket.cpp b/src/UdpSocket.cpp index 020e3f5..ccdd7ed 100644 --- a/src/UdpSocket.cpp +++ b/src/UdpSocket.cpp @@ -37,19 +37,19 @@ using namespace std; UdpSocket::UdpSocket() : listenSocket(INVALID_SOCKET) { - init_sock(0, ""); + reinit(0, ""); } UdpSocket::UdpSocket(int port) : listenSocket(INVALID_SOCKET) { - init_sock(port, ""); + reinit(port, ""); } UdpSocket::UdpSocket(int port, const std::string& name) : listenSocket(INVALID_SOCKET) { - init_sock(port, name); + reinit(port, name); } @@ -67,7 +67,7 @@ int UdpSocket::setBlocking(bool block) return 0; } -int UdpSocket::init_sock(int port, const std::string& name) +int UdpSocket::reinit(int port, const std::string& name) { if (listenSocket != INVALID_SOCKET) { ::close(listenSocket); @@ -98,6 +98,16 @@ int UdpSocket::init_sock(int port, const std::string& name) return 0; } +int UdpSocket::close() +{ + if (listenSocket != INVALID_SOCKET) { + ::close(listenSocket); + } + + listenSocket = INVALID_SOCKET; + + return 0; +} UdpSocket::~UdpSocket() { diff --git a/src/UdpSocket.h b/src/UdpSocket.h index 535499e..dfeaac1 100644 --- a/src/UdpSocket.h +++ b/src/UdpSocket.h @@ -80,6 +80,15 @@ class UdpSocket UdpSocket(const UdpSocket& other) = delete; const UdpSocket& operator=(const UdpSocket& other) = delete; + /** reinitialise socket. Close the already open socket, and + * create a new one + */ + int reinit(int port, const std::string& name); + + /** Close the socket + */ + int close(void); + /** Send an UDP packet. * @param packet The UDP packet to be sent. It includes the data and the * destination address @@ -111,7 +120,6 @@ class UdpSocket int setBlocking(bool block); protected: - int init_sock(int port, const std::string& name); /// The address on which the socket is bound. InetAddress address; diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp new file mode 100644 index 0000000..e534a06 --- /dev/null +++ b/src/input/Udp.cpp @@ -0,0 +1,134 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#include "input/Udp.h" + +#include +#include +#include +#include +#include +#include +#include "utils.h" + +using namespace std; + +namespace Inputs { + +int Udp::open(const std::string& name) +{ + // Skip the udp:// part if it is present + const string endpoint = (name.substr(0, 6) == "udp://") ? + name.substr(6) : name; + + // The endpoint should be address:port + const auto colon_pos = endpoint.find_first_of(":"); + if (colon_pos == string::npos) { + stringstream ss; + ss << "'" << name << + " is an invalid format for udp address: " + "expected [udp://]address:port"; + throw invalid_argument(ss.str()); + } + + const auto address = endpoint.substr(0, colon_pos); + const auto port_str = endpoint.substr(colon_pos + 1); + + const long port = strtol(port_str.c_str(), nullptr, 10); + + if ((port == LONG_MIN or port == LONG_MAX) and errno == ERANGE) { + throw out_of_range("udp input: port out of range"); + } + else if (port == 0 and errno != 0) { + stringstream ss; + ss << "udp input port parse error: " << strerror(errno); + throw invalid_argument(ss.str()); + } + + if (port == 0) { + throw out_of_range("can't use port number 0 in udp address"); + } + + if (m_sock.reinit(port, address) == -1) { + stringstream ss; + ss << "Could not init UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + if (m_sock.setBlocking(false) == -1) { + stringstream ss; + ss << "Could not set non-blocking UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + return 0; +} + +int Udp::readFrame(void* buffer, int size) +{ + uint8_t* data = reinterpret_cast(buffer); + + // Regardless of buffer contents, try receiving data. + UdpPacket packet; + int ret = m_sock.receive(packet); + + if (ret == -1) { + stringstream ss; + ss << "Could not read from UDP socket: " << inetErrMsg; + throw runtime_error(ss.str()); + } + + std::copy(packet.getData(), packet.getData() + packet.getSize(), + back_inserter(m_buffer)); + + // Take data from the buffer if it contains enough data, + // in any case write the buffer + if (m_buffer.size() >= (size_t)size) { + std::copy(m_buffer.begin(), m_buffer.begin() + size, data); + } + else { + memset(data, 0x0, size); + } + + return size; +} + +int Udp::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + return -1; + } + + return bitrate; +} + +int Udp::close() +{ + return m_sock.close(); +} + +}; diff --git a/src/input/Udp.h b/src/input/Udp.h new file mode 100644 index 0000000..b6705e9 --- /dev/null +++ b/src/input/Udp.h @@ -0,0 +1,52 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMux. + + ODR-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMux. If not, see . + */ + +#pragma once + +#include +#include +#include "input/inputs.h" +#include "UdpSocket.h" + +namespace Inputs { + +class Udp : public InputBase { + public: + virtual int open(const std::string& name); + virtual int readFrame(void* buffer, int size); + virtual int setBitrate(int bitrate); + virtual int close(); + + private: + UdpSocket m_sock; + + // The content of the UDP packets gets written into the + // buffer, and the UDP packet boundaries disappear there. + std::vector m_buffer; +}; + +}; + -- cgit v1.2.3 From 8db328c61832a92bf3f7641061b68767141104f3 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 14:49:11 +0100 Subject: Change readFrame argument types --- src/dabInput.h | 2 +- src/input/File.cpp | 18 ++++++++---------- src/input/File.h | 6 +++--- src/input/Prbs.cpp | 8 +++----- src/input/Prbs.h | 2 +- src/input/Udp.cpp | 2 +- src/input/Udp.h | 2 +- src/input/Zmq.cpp | 2 +- src/input/Zmq.h | 2 +- src/input/inputs.h | 2 +- 10 files changed, 21 insertions(+), 25 deletions(-) (limited to 'src/input') diff --git a/src/dabInput.h b/src/dabInput.h index 0accddb..d2c5f49 100644 --- a/src/dabInput.h +++ b/src/dabInput.h @@ -62,7 +62,7 @@ class DabInputCompatible : public DabInputBase { virtual int setbuf(int size) { return m_ops.setbuf(args, size); } - virtual int readFrame(void* buffer, int size) + virtual int readFrame(uint8_t* buffer, size_t size) { if (m_ops.lock) { m_ops.lock(args); diff --git a/src/input/File.cpp b/src/input/File.cpp index 9721b97..eb26136 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -64,7 +64,7 @@ int FileBase::rewind() return ::lseek(m_fd, 0, SEEK_SET); } -int MPEGFile::readFrame(void* buffer, int size) +int MPEGFile::readFrame(uint8_t* buffer, size_t size) { int result; bool do_rewind = false; @@ -122,7 +122,7 @@ MUTE_SUBCHANNEL: memset(buffer, 0, size); } else { - if (result < size) { + if (result < (ssize_t)size) { etiLog.log(warn, "bitrate too low from file " "-> frame padded\n"); memset((char*)buffer + result, 0, size - result); @@ -164,7 +164,7 @@ MUTE_SUBCHANNEL: int MPEGFile::setBitrate(int bitrate) { if (bitrate == 0) { - char buffer[4]; + uint8_t buffer[4]; if (readFrame(buffer, 4) == 0) { bitrate = getMpegBitrate(buffer); @@ -178,11 +178,9 @@ int MPEGFile::setBitrate(int bitrate) } -int DABPlusFile::readFrame(void* buffer, int size) +int DABPlusFile::readFrame(uint8_t* buffer, size_t size) { - uint8_t* dataOut = reinterpret_cast(buffer); - - ssize_t ret = read(m_fd, dataOut, size); + ssize_t ret = read(m_fd, buffer, size); if (ret == -1) { etiLog.log(alert, "ERROR: Can't read file\n"); @@ -190,7 +188,7 @@ int DABPlusFile::readFrame(void* buffer, int size) return -1; } - if (ret < size) { + if (ret < (ssize_t)size) { ssize_t sizeOut = ret; etiLog.log(info, "reach end of file -> rewinding\n"); if (rewind() == -1) { @@ -198,14 +196,14 @@ int DABPlusFile::readFrame(void* buffer, int size) return -1; } - ret = read(m_fd, dataOut + sizeOut, size - sizeOut); + ret = read(m_fd, buffer + sizeOut, size - sizeOut); if (ret == -1) { etiLog.log(alert, "ERROR: Can't read file\n"); perror(""); return -1; } - if (ret < size) { + if (ret < (ssize_t)size) { etiLog.log(alert, "ERROR: Not enough data in file\n"); return -1; } diff --git a/src/input/File.h b/src/input/File.h index 61be8b1..01f4f21 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -36,7 +36,7 @@ namespace Inputs { class FileBase : public InputBase { public: virtual int open(const std::string& name); - virtual int readFrame(void* buffer, int size) = 0; + virtual int readFrame(uint8_t* buffer, size_t size) = 0; virtual int setBitrate(int bitrate) = 0; virtual int close(); @@ -52,7 +52,7 @@ class FileBase : public InputBase { class MPEGFile : public FileBase { public: - virtual int readFrame(void* buffer, int size); + virtual int readFrame(uint8_t* buffer, size_t size); virtual int setBitrate(int bitrate); private: @@ -61,7 +61,7 @@ class MPEGFile : public FileBase { class DABPlusFile : public FileBase { public: - virtual int readFrame(void* buffer, int size); + virtual int readFrame(uint8_t* buffer, size_t size); virtual int setBitrate(int bitrate); }; diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp index 8e5a9ae..607ce9f 100644 --- a/src/input/Prbs.cpp +++ b/src/input/Prbs.cpp @@ -77,12 +77,10 @@ int Prbs::open(const string& name) return 0; } -int Prbs::readFrame(void* buffer, int size) +int Prbs::readFrame(uint8_t* buffer, size_t size) { - unsigned char* cbuffer = reinterpret_cast(buffer); - - for (int i = 0; i < size; ++i) { - cbuffer[i] = m_prbs.step(); + for (size_t i = 0; i < size; ++i) { + buffer[i] = m_prbs.step(); } return size; diff --git a/src/input/Prbs.h b/src/input/Prbs.h index 1ad5047..3b2b7d4 100644 --- a/src/input/Prbs.h +++ b/src/input/Prbs.h @@ -38,7 +38,7 @@ namespace Inputs { class Prbs : public InputBase { public: virtual int open(const std::string& name); - virtual int readFrame(void* buffer, int size); + virtual int readFrame(uint8_t* buffer, size_t size); virtual int setBitrate(int bitrate); virtual int close(); diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp index e534a06..a238d9b 100644 --- a/src/input/Udp.cpp +++ b/src/input/Udp.cpp @@ -87,7 +87,7 @@ int Udp::open(const std::string& name) return 0; } -int Udp::readFrame(void* buffer, int size) +int Udp::readFrame(uint8_t* buffer, size_t size) { uint8_t* data = reinterpret_cast(buffer); diff --git a/src/input/Udp.h b/src/input/Udp.h index b6705e9..379dbf3 100644 --- a/src/input/Udp.h +++ b/src/input/Udp.h @@ -36,7 +36,7 @@ namespace Inputs { class Udp : public InputBase { public: virtual int open(const std::string& name); - virtual int readFrame(void* buffer, int size); + virtual int readFrame(uint8_t* buffer, size_t size); virtual int setBitrate(int bitrate); virtual int close(); diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp index 985fad3..a5601fa 100644 --- a/src/input/Zmq.cpp +++ b/src/input/Zmq.cpp @@ -246,7 +246,7 @@ int ZmqBase::setBitrate(int bitrate) } // size corresponds to a frame size. It is constant for a given bitrate -int ZmqBase::readFrame(void* buffer, int size) +int ZmqBase::readFrame(uint8_t* buffer, size_t size) { int rc; diff --git a/src/input/Zmq.h b/src/input/Zmq.h index 02fce3a..8d729e0 100644 --- a/src/input/Zmq.h +++ b/src/input/Zmq.h @@ -181,7 +181,7 @@ class ZmqBase : public InputBase, public RemoteControllable { } virtual int open(const std::string& inputUri); - virtual int readFrame(void* buffer, int size); + virtual int readFrame(uint8_t* buffer, size_t size); virtual int setBitrate(int bitrate); virtual int close(); diff --git a/src/input/inputs.h b/src/input/inputs.h index 3bc1fa4..bfb1fb6 100644 --- a/src/input/inputs.h +++ b/src/input/inputs.h @@ -39,7 +39,7 @@ namespace Inputs { class InputBase { public: virtual int open(const std::string& name) = 0; - virtual int readFrame(void* buffer, int size) = 0; + virtual int readFrame(uint8_t* buffer, size_t size) = 0; virtual int setBitrate(int bitrate) = 0; virtual int close() = 0; -- cgit v1.2.3 From 7405d574963abb37732de8a90dd9e42174e0410f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 4 Nov 2016 14:57:16 +0100 Subject: Use RawFile for DAB+ and for data --- src/ConfigParser.cpp | 8 +++----- src/input/File.cpp | 4 ++-- src/input/File.h | 3 +-- 3 files changed, 6 insertions(+), 9 deletions(-) (limited to 'src/input') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index ddcb9ed..a311d63 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -936,7 +936,7 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, subchan->input = make_shared(); } else if (type == "dabplus") { - subchan->input = make_shared(); + subchan->input = make_shared(); } else { throw logic_error("Incomplete handling of file input"); @@ -982,10 +982,8 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, else if (type == "data") { if (proto == "udp") { subchan->input = make_shared(); - } else if (proto == "file") { - // TODO - } else if (proto == "fifo") { - // TODO + } else if (proto == "file" or proto == "fifo") { + subchan->input = make_shared(); } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << diff --git a/src/input/File.cpp b/src/input/File.cpp index eb26136..732f2a2 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -178,7 +178,7 @@ int MPEGFile::setBitrate(int bitrate) } -int DABPlusFile::readFrame(uint8_t* buffer, size_t size) +int RawFile::readFrame(uint8_t* buffer, size_t size) { ssize_t ret = read(m_fd, buffer, size); @@ -212,7 +212,7 @@ int DABPlusFile::readFrame(uint8_t* buffer, size_t size) return size; } -int DABPlusFile::setBitrate(int bitrate) +int RawFile::setBitrate(int bitrate) { if (bitrate <= 0) { etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); diff --git a/src/input/File.h b/src/input/File.h index 01f4f21..99e0a87 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -59,11 +59,10 @@ class MPEGFile : public FileBase { bool m_parity = false; }; -class DABPlusFile : public FileBase { +class RawFile : public FileBase { public: virtual int readFrame(uint8_t* buffer, size_t size); virtual int setBitrate(int bitrate); }; - }; -- cgit v1.2.3 From 5fd4d99aded3677497c6cf5ab31517a5383333cb Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 5 Nov 2016 14:35:25 +0100 Subject: Add Packet File input --- src/ConfigParser.cpp | 7 ++ src/input/File.cpp | 282 +++++++++++++++++++++++++++++++++++++++++++++------ src/input/File.h | 26 ++++- 3 files changed, 280 insertions(+), 35 deletions(-) (limited to 'src/input') diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 3167d49..1ed1bac 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -1003,6 +1003,13 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, etiLog.level(warn) << "uid " << subchanuid << " of type Dmb uses RAW input"; } } + else if (type == "packet" or type == "enhancedpacket") { + subchan->type = subchannel_type_t::Packet; + subchan->bitrate = DEFAULT_PACKET_BITRATE; + + bool enhanced = (type == "enhancedpacket"); + subchan->input = make_shared(enhanced); + } else { stringstream ss; ss << "Subchannel with uid " << subchanuid << " has unknown type!"; diff --git a/src/input/File.cpp b/src/input/File.cpp index 732f2a2..5c61fd4 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -35,9 +35,31 @@ #include "input/File.h" #include "mpeg.h" +#include "ReedSolomon.h" namespace Inputs { +#ifdef _WIN32 +# pragma pack(push, 1) +#endif +struct packetHeader { + unsigned char addressHigh:2; + unsigned char last:1; + unsigned char first:1; + unsigned char continuityIndex:2; + unsigned char packetLength:2; + unsigned char addressLow; + unsigned char dataLength:7; + unsigned char command; +} +#ifdef _WIN32 +# pragma pack(pop) +#else +__attribute((packed)) +#endif +; + + int FileBase::open(const std::string& name) { m_fd = ::open(name.c_str(), O_RDONLY | O_BINARY); @@ -50,6 +72,17 @@ int FileBase::open(const std::string& name) return 0; } +int FileBase::setBitrate(int bitrate) +{ + if (bitrate <= 0) { + etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + return -1; + } + + return bitrate; +} + + int FileBase::close() { if (m_fd != -1) { @@ -64,6 +97,40 @@ int FileBase::rewind() return ::lseek(m_fd, 0, SEEK_SET); } +ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size) +{ + ssize_t ret = read(m_fd, buffer, size); + + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < (ssize_t)size) { + ssize_t sizeOut = ret; + etiLog.log(info, "reach end of file -> rewinding\n"); + if (rewind() == -1) { + etiLog.log(alert, "ERROR: Can't rewind file\n"); + return -1; + } + + ret = read(m_fd, buffer + sizeOut, size - sizeOut); + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file\n"); + perror(""); + return -1; + } + + if (ret < (ssize_t)size) { + etiLog.log(alert, "ERROR: Not enough data in file\n"); + return -1; + } + } + + return size; +} + int MPEGFile::readFrame(uint8_t* buffer, size_t size) { int result; @@ -177,49 +244,200 @@ int MPEGFile::setBitrate(int bitrate) return bitrate; } - int RawFile::readFrame(uint8_t* buffer, size_t size) { - ssize_t ret = read(m_fd, buffer, size); + return readFromFile(buffer, size); +} - if (ret == -1) { - etiLog.log(alert, "ERROR: Can't read file\n"); - perror(""); - return -1; - } +PacketFile::PacketFile(bool enhancedPacketMode) +{ + m_enhancedPacketEnabled = enhancedPacketMode; +} - if (ret < (ssize_t)size) { - ssize_t sizeOut = ret; - etiLog.log(info, "reach end of file -> rewinding\n"); - if (rewind() == -1) { - etiLog.log(alert, "ERROR: Can't rewind file\n"); - return -1; - } +int PacketFile::readFrame(uint8_t* buffer, size_t size) +{ + size_t written = 0; + int length; + packetHeader* header; + int indexRow; + int indexCol; - ret = read(m_fd, buffer + sizeOut, size - sizeOut); - if (ret == -1) { - etiLog.log(alert, "ERROR: Can't read file\n"); - perror(""); - return -1; + while (written < size) { + if (m_enhancedPacketWaiting > 0) { + *buffer = 192 - m_enhancedPacketWaiting; + *buffer /= 22; + *buffer <<= 2; + *(buffer++) |= 0x03; + *(buffer++) = 0xfe; + indexCol = 188; + indexCol += (192 - m_enhancedPacketWaiting) / 12; + indexRow = 0; + indexRow += (192 - m_enhancedPacketWaiting) % 12; + for (int j = 0; j < 22; ++j) { + if (m_enhancedPacketWaiting == 0) { + *(buffer++) = 0; + } + else { + *(buffer++) = m_enhancedPacketData[indexRow][indexCol]; + if (++indexRow == 12) { + indexRow = 0; + ++indexCol; + } + --m_enhancedPacketWaiting; + } + } + written += 24; + if (m_enhancedPacketWaiting == 0) { + m_enhancedPacketLength = 0; + } } + else if (m_packetLength != 0) { + header = (packetHeader*)(&m_packetData[0]); + if (written + m_packetLength > (unsigned)size) { + memset(buffer, 0, 22); + buffer[22] = 0x60; + buffer[23] = 0x4b; + length = 24; + } + else if (m_enhancedPacketEnabled) { + if (m_enhancedPacketLength + m_packetLength > (12 * 188)) { + memset(buffer, 0, 22); + buffer[22] = 0x60; + buffer[23] = 0x4b; + length = 24; + } + else { + std::copy(m_packetData.begin(), + m_packetData.begin() + m_packetLength, + buffer); + length = m_packetLength; + m_packetLength = 0; + } + } + else { + std::copy(m_packetData.begin(), + m_packetData.begin() + m_packetLength, + buffer); + length = m_packetLength; + m_packetLength = 0; + } - if (ret < (ssize_t)size) { - etiLog.log(alert, "ERROR: Not enough data in file\n"); - return -1; + if (m_enhancedPacketEnabled) { + indexCol = m_enhancedPacketLength / 12; + indexRow = m_enhancedPacketLength % 12; // TODO Check if always 0 + for (int j = 0; j < length; ++j) { + m_enhancedPacketData[indexRow][indexCol] = buffer[j]; + if (++indexRow == 12) { + indexRow = 0; + ++indexCol; + } + } + m_enhancedPacketLength += length; + if (m_enhancedPacketLength >= (12 * 188)) { + m_enhancedPacketLength = (12 * 188); + ReedSolomon encoder(204, 188); + for (int j = 0; j < 12; ++j) { + encoder.encode(&m_enhancedPacketData[j][0], 188); + } + m_enhancedPacketWaiting = 192; + } + } + written += length; + buffer += length; } - } + else { + int nbBytes = readFromFile(buffer, 3); + header = (packetHeader*)buffer; + if (nbBytes == -1) { + if (errno == EAGAIN) goto END_PACKET; + perror("Packet file"); + return -1; + } + else if (nbBytes == 0) { + if (rewind() == -1) { + goto END_PACKET; + } + continue; + } + else if (nbBytes < 3) { + etiLog.log(error, + "Error while reading file for packet header; " + "read %i out of 3 bytes\n", nbBytes); + break; + } - return size; -} + length = header->packetLength * 24 + 24; + if (written + length > size) { + memcpy(&m_packetData[0], header, 3); + readFromFile(&m_packetData[3], length - 3); + m_packetLength = length; + continue; + } -int RawFile::setBitrate(int bitrate) -{ - if (bitrate <= 0) { - etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); - return -1; - } + if (m_enhancedPacketEnabled) { + if (m_enhancedPacketLength + length > (12 * 188)) { + memcpy(&m_packetData[0], header, 3); + readFromFile(&m_packetData[3], length - 3); + m_packetLength = length; + continue; + } + } - return bitrate; + nbBytes = readFromFile(buffer + 3, length - 3); + if (nbBytes == -1) { + perror("Packet file"); + return -1; + } + else if (nbBytes == 0) { + etiLog.log(info, + "Packet header read, but no data!\n"); + if (rewind() == -1) { + goto END_PACKET; + } + continue; + } + else if (nbBytes < length - 3) { + etiLog.log(error, "Error while reading packet file; " + "read %i out of %i bytes\n", nbBytes, length - 3); + break; + } + + if (m_enhancedPacketEnabled) { + indexCol = m_enhancedPacketLength / 12; + indexRow = m_enhancedPacketLength % 12; // TODO Check if always 0 + for (int j = 0; j < length; ++j) { + m_enhancedPacketData[indexRow][indexCol] = buffer[j]; + if (++indexRow == 12) { + indexRow = 0; + ++indexCol; + } + } + m_enhancedPacketLength += length; + if (m_enhancedPacketLength >= (12 * 188)) { + if (m_enhancedPacketLength > (12 * 188)) { + etiLog.log(error, + "Error, too much enhanced packet data!\n"); + } + ReedSolomon encoder(204, 188); + for (int j = 0; j < 12; ++j) { + encoder.encode(&m_enhancedPacketData[j][0], 188); + } + m_enhancedPacketWaiting = 192; + } + } + written += length; + buffer += length; + } + } +END_PACKET: + while (written < size) { + memset(buffer, 0, 22); + buffer[22] = 0x60; + buffer[23] = 0x4b; + buffer += 24; + written += 24; + } + return written; } }; diff --git a/src/input/File.h b/src/input/File.h index 99e0a87..bf99748 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -26,6 +26,7 @@ #pragma once #include +#include #include #include #include "input/inputs.h" @@ -37,7 +38,7 @@ class FileBase : public InputBase { public: virtual int open(const std::string& name); virtual int readFrame(uint8_t* buffer, size_t size) = 0; - virtual int setBitrate(int bitrate) = 0; + virtual int setBitrate(int bitrate); virtual int close(); /* Rewind the file @@ -45,8 +46,13 @@ class FileBase : public InputBase { */ virtual int rewind(); protected: + /* Read len bytes from the file into buf, and return + * the number of bytes read, or -1 in case of error. + */ + virtual ssize_t readFromFile(uint8_t* buf, size_t len); + // We use unix open() instead of fopen() because - // we want to do non-blocking I/O + // we might want to do non-blocking I/O in the future int m_fd = -1; }; @@ -62,7 +68,21 @@ class MPEGFile : public FileBase { class RawFile : public FileBase { public: virtual int readFrame(uint8_t* buffer, size_t size); - virtual int setBitrate(int bitrate); +}; + +class PacketFile : public FileBase { + public: + PacketFile(bool enhancedPacketMode); + virtual int readFrame(uint8_t* buffer, size_t size); + + protected: + std::array m_packetData; + size_t m_packetLength; + + bool m_enhancedPacketEnabled = false; + std::array,12> m_enhancedPacketData; + size_t m_enhancedPacketWaiting; + size_t m_enhancedPacketLength; }; }; -- cgit v1.2.3 From b55b73e8b0b1bfb87e3dd68dd867967ae1ef2b3f Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 5 Nov 2016 16:34:04 +0100 Subject: Fix default PRBS polynomial --- src/input/Prbs.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/input') diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp index 607ce9f..7856a46 100644 --- a/src/input/Prbs.cpp +++ b/src/input/Prbs.cpp @@ -42,7 +42,7 @@ namespace Inputs { // ETS 300 799 Clause G.2.1 // Preferred polynomial is G(x) = x^20 + x^17 + 1 -const uint32_t PRBS_DEFAULT_POLY = (1 << 19) | (1 << 16) | 1; +const uint32_t PRBS_DEFAULT_POLY = (1 << 20) | (1 << 17) | (1 << 0); int Prbs::open(const string& name) { -- cgit v1.2.3 From 1e1e43d9bfa006049bde3d210030e5f0b2d38ca3 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 5 Nov 2016 16:34:40 +0100 Subject: Describe inputs a bit better in advanced.mux --- doc/advanced.mux | 32 ++++++++++++++++++++++++++++++-- src/input/File.h | 3 +++ 2 files changed, 33 insertions(+), 2 deletions(-) (limited to 'src/input') diff --git a/doc/advanced.mux b/doc/advanced.mux index 8d460d2..41a3446 100644 --- a/doc/advanced.mux +++ b/doc/advanced.mux @@ -130,8 +130,17 @@ services { } ; The subchannels are defined in the corresponding section. -; supported types are : audio, bridge, data, enhancedpacket, -; dabplus, dmb, packet, test +; supported types are : audio, data, enhancedpacket, +; dabplus, packet +; +; Type 'packet' expects to receive data in the format described +; in EN 300 401 Clause 3.5.2. +; +; 'enhancedpacket' mode will calculate FEC for MSC packet mode +; as described in EN 300 401 Clause 5.3.5. +; +; 'data' will read from the source and write it unmodified into +; the MSC. subchannels { sub-fu { type audio @@ -235,6 +244,25 @@ subchannels { zmq-buffer 40 zmq-prebuffering 20 } + + ; 'prbs' will generate a pseudorandom bit sequence according to + ; ETS 300 799 Clause G.2.1. This is useful for testing purposes and + ; measurement of bit error rate. + sub-prbs { + type data + + ; Use the default PRBS polynomial. + inputfile "prbs://" + + ; To use another polynomial, set it in the url as hexadecimal + ; The default polynomial is G(x) = x^20 + x^17 + 1, represented as + ; (1 << 20) + (1 << 17) + (1 << 0) = 0x120001 + ;inputuri "prbs://:0x120001 + + bitrate 16 + id 5 + protection 3 + } } ; For now, each component links one service to one subchannel diff --git a/src/input/File.h b/src/input/File.h index bf99748..080d6b5 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -79,6 +79,9 @@ class PacketFile : public FileBase { std::array m_packetData; size_t m_packetLength; + /* Enhanced packet mode enables FEC for MSC packet mode + * as described in EN 300 401 Clause 5.3.5 + */ bool m_enhancedPacketEnabled = false; std::array,12> m_enhancedPacketData; size_t m_enhancedPacketWaiting; -- cgit v1.2.3