aboutsummaryrefslogtreecommitdiffstats
path: root/src/input
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-11-07 21:37:47 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-11-07 21:37:47 +0100
commit95b197a53c8314a68fd8cf73e495018844e7d708 (patch)
treed7a1f48c0c7f04bea34c9e19dc219195469df563 /src/input
parent33e51f5996c02c6a6aee27b57d91d90e3f1db5a2 (diff)
parent21d8cacffc1201514b0e1143f8de60e754262291 (diff)
downloaddabmux-95b197a53c8314a68fd8cf73e495018844e7d708.tar.gz
dabmux-95b197a53c8314a68fd8cf73e495018844e7d708.tar.bz2
dabmux-95b197a53c8314a68fd8cf73e495018844e7d708.zip
Merge rework of inputs into 'next'
This breaks the non-blocking fifo inputs, as it seems nobody is using them. It restores some UDP functionality, not necessarily in the same way as before.
Diffstat (limited to 'src/input')
-rw-r--r--src/input/File.cpp443
-rw-r--r--src/input/File.h91
-rw-r--r--src/input/Prbs.cpp105
-rw-r--r--src/input/Prbs.h52
-rw-r--r--src/input/Udp.cpp134
-rw-r--r--src/input/Udp.h52
-rw-r--r--src/input/Zmq.cpp616
-rw-r--r--src/input/Zmq.h264
-rw-r--r--src/input/inputs.h52
9 files changed, 1809 insertions, 0 deletions
diff --git a/src/input/File.cpp b/src/input/File.cpp
new file mode 100644
index 0000000..5c61fd4
--- /dev/null
+++ b/src/input/File.cpp
@@ -0,0 +1,443 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016 Matthias P. Braendli
+ http://www.opendigitalradio.org
+
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <sstream>
+#include <errno.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <fcntl.h>
+#include <unistd.h>
+#ifndef _WIN32
+# define O_BINARY 0
+#endif
+
+#include "input/File.h"
+#include "mpeg.h"
+#include "ReedSolomon.h"
+
+namespace Inputs {
+
+#ifdef _WIN32
+# pragma pack(push, 1)
+#endif
+struct packetHeader {
+ unsigned char addressHigh:2;
+ unsigned char last:1;
+ unsigned char first:1;
+ unsigned char continuityIndex:2;
+ unsigned char packetLength:2;
+ unsigned char addressLow;
+ unsigned char dataLength:7;
+ unsigned char command;
+}
+#ifdef _WIN32
+# pragma pack(pop)
+#else
+__attribute((packed))
+#endif
+;
+
+
+int FileBase::open(const std::string& name)
+{
+ m_fd = ::open(name.c_str(), O_RDONLY | O_BINARY);
+ if (m_fd == -1) {
+ std::stringstream ss;
+ ss << "Could not open input file " << name << ": " <<
+ strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+ return 0;
+}
+
+int FileBase::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ etiLog.log(error, "Invalid bitrate (%i)\n", bitrate);
+ return -1;
+ }
+
+ return bitrate;
+}
+
+
+int FileBase::close()
+{
+ if (m_fd != -1) {
+ ::close(m_fd);
+ m_fd = -1;
+ }
+ return 0;
+}
+
+int FileBase::rewind()
+{
+ return ::lseek(m_fd, 0, SEEK_SET);
+}
+
+ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size)
+{
+ ssize_t ret = read(m_fd, buffer, size);
+
+ if (ret == -1) {
+ etiLog.log(alert, "ERROR: Can't read file\n");
+ perror("");
+ return -1;
+ }
+
+ if (ret < (ssize_t)size) {
+ ssize_t sizeOut = ret;
+ etiLog.log(info, "reach end of file -> rewinding\n");
+ if (rewind() == -1) {
+ etiLog.log(alert, "ERROR: Can't rewind file\n");
+ return -1;
+ }
+
+ ret = read(m_fd, buffer + sizeOut, size - sizeOut);
+ if (ret == -1) {
+ etiLog.log(alert, "ERROR: Can't read file\n");
+ perror("");
+ return -1;
+ }
+
+ if (ret < (ssize_t)size) {
+ etiLog.log(alert, "ERROR: Not enough data in file\n");
+ return -1;
+ }
+ }
+
+ return size;
+}
+
+int MPEGFile::readFrame(uint8_t* buffer, size_t size)
+{
+ int result;
+ bool do_rewind = false;
+READ_SUBCHANNEL:
+ if (m_parity) {
+ result = readData(m_fd, buffer, size, 2);
+ m_parity = false;
+ return 0;
+ } else {
+ result = readMpegHeader(m_fd, buffer, size);
+ if (result > 0) {
+ result = readMpegFrame(m_fd, buffer, size);
+ if (result < 0 && getMpegFrequency(buffer) == 24000) {
+ m_parity = true;
+ result = size;
+ }
+ }
+ }
+ switch (result) {
+ case MPEG_BUFFER_UNDERFLOW:
+ etiLog.log(warn, "data underflow -> frame muted\n");
+ goto MUTE_SUBCHANNEL;
+ case MPEG_BUFFER_OVERFLOW:
+ etiLog.log(warn, "bitrate too high -> frame muted\n");
+ goto MUTE_SUBCHANNEL;
+ case MPEG_FILE_EMPTY:
+ if (do_rewind) {
+ etiLog.log(error, "file rewinded and still empty "
+ "-> frame muted\n");
+ goto MUTE_SUBCHANNEL;
+ }
+ else {
+ etiLog.log(info, "reach end of file -> rewinding\n");
+ rewind();
+ goto READ_SUBCHANNEL;
+ }
+ case MPEG_FILE_ERROR:
+ etiLog.log(alert, "can't read file (%i) -> frame muted\n", errno);
+ perror("");
+ goto MUTE_SUBCHANNEL;
+ case MPEG_SYNC_NOT_FOUND:
+ etiLog.log(alert, "mpeg sync not found, maybe is not a valid file "
+ "-> frame muted\n");
+ goto MUTE_SUBCHANNEL;
+ case MPEG_INVALID_FRAME:
+ etiLog.log(alert, "file is not a valid mpeg file "
+ "-> frame muted\n");
+ goto MUTE_SUBCHANNEL;
+ default:
+ if (result < 0) {
+ etiLog.log(alert,
+ "unknown error (code = %i) -> frame muted\n",
+ result);
+MUTE_SUBCHANNEL:
+ memset(buffer, 0, size);
+ }
+ else {
+ if (result < (ssize_t)size) {
+ etiLog.log(warn, "bitrate too low from file "
+ "-> frame padded\n");
+ memset((char*)buffer + result, 0, size - result);
+ }
+
+ result = checkDabMpegFrame(buffer);
+ switch (result) {
+ case MPEG_FREQUENCY:
+ etiLog.log(error, "file has a frame with an invalid "
+ "frequency: %i, should be 48000 or 24000\n",
+ getMpegFrequency(buffer));
+ break;
+ case MPEG_PADDING:
+ etiLog.log(warn,
+ "file has a frame with padding bit set\n");
+ break;
+ case MPEG_COPYRIGHT:
+ result = 0;
+ break;
+ case MPEG_ORIGINAL:
+ result = 0;
+ break;
+ case MPEG_EMPHASIS:
+ etiLog.log(warn,
+ "file has a frame with emphasis bits set\n");
+ break;
+ default:
+ if (result < 0) {
+ etiLog.log(alert, "mpeg file has an invalid DAB "
+ "mpeg frame (unknown reason: %i)\n", result);
+ }
+ break;
+ }
+ }
+ }
+ return result;
+}
+
+int MPEGFile::setBitrate(int bitrate)
+{
+ if (bitrate == 0) {
+ uint8_t buffer[4];
+
+ if (readFrame(buffer, 4) == 0) {
+ bitrate = getMpegBitrate(buffer);
+ }
+ else {
+ bitrate = -1;
+ }
+ rewind();
+ }
+ return bitrate;
+}
+
+int RawFile::readFrame(uint8_t* buffer, size_t size)
+{
+ return readFromFile(buffer, size);
+}
+
+PacketFile::PacketFile(bool enhancedPacketMode)
+{
+ m_enhancedPacketEnabled = enhancedPacketMode;
+}
+
+int PacketFile::readFrame(uint8_t* buffer, size_t size)
+{
+ size_t written = 0;
+ int length;
+ packetHeader* header;
+ int indexRow;
+ int indexCol;
+
+ while (written < size) {
+ if (m_enhancedPacketWaiting > 0) {
+ *buffer = 192 - m_enhancedPacketWaiting;
+ *buffer /= 22;
+ *buffer <<= 2;
+ *(buffer++) |= 0x03;
+ *(buffer++) = 0xfe;
+ indexCol = 188;
+ indexCol += (192 - m_enhancedPacketWaiting) / 12;
+ indexRow = 0;
+ indexRow += (192 - m_enhancedPacketWaiting) % 12;
+ for (int j = 0; j < 22; ++j) {
+ if (m_enhancedPacketWaiting == 0) {
+ *(buffer++) = 0;
+ }
+ else {
+ *(buffer++) = m_enhancedPacketData[indexRow][indexCol];
+ if (++indexRow == 12) {
+ indexRow = 0;
+ ++indexCol;
+ }
+ --m_enhancedPacketWaiting;
+ }
+ }
+ written += 24;
+ if (m_enhancedPacketWaiting == 0) {
+ m_enhancedPacketLength = 0;
+ }
+ }
+ else if (m_packetLength != 0) {
+ header = (packetHeader*)(&m_packetData[0]);
+ if (written + m_packetLength > (unsigned)size) {
+ memset(buffer, 0, 22);
+ buffer[22] = 0x60;
+ buffer[23] = 0x4b;
+ length = 24;
+ }
+ else if (m_enhancedPacketEnabled) {
+ if (m_enhancedPacketLength + m_packetLength > (12 * 188)) {
+ memset(buffer, 0, 22);
+ buffer[22] = 0x60;
+ buffer[23] = 0x4b;
+ length = 24;
+ }
+ else {
+ std::copy(m_packetData.begin(),
+ m_packetData.begin() + m_packetLength,
+ buffer);
+ length = m_packetLength;
+ m_packetLength = 0;
+ }
+ }
+ else {
+ std::copy(m_packetData.begin(),
+ m_packetData.begin() + m_packetLength,
+ buffer);
+ length = m_packetLength;
+ m_packetLength = 0;
+ }
+
+ if (m_enhancedPacketEnabled) {
+ indexCol = m_enhancedPacketLength / 12;
+ indexRow = m_enhancedPacketLength % 12; // TODO Check if always 0
+ for (int j = 0; j < length; ++j) {
+ m_enhancedPacketData[indexRow][indexCol] = buffer[j];
+ if (++indexRow == 12) {
+ indexRow = 0;
+ ++indexCol;
+ }
+ }
+ m_enhancedPacketLength += length;
+ if (m_enhancedPacketLength >= (12 * 188)) {
+ m_enhancedPacketLength = (12 * 188);
+ ReedSolomon encoder(204, 188);
+ for (int j = 0; j < 12; ++j) {
+ encoder.encode(&m_enhancedPacketData[j][0], 188);
+ }
+ m_enhancedPacketWaiting = 192;
+ }
+ }
+ written += length;
+ buffer += length;
+ }
+ else {
+ int nbBytes = readFromFile(buffer, 3);
+ header = (packetHeader*)buffer;
+ if (nbBytes == -1) {
+ if (errno == EAGAIN) goto END_PACKET;
+ perror("Packet file");
+ return -1;
+ }
+ else if (nbBytes == 0) {
+ if (rewind() == -1) {
+ goto END_PACKET;
+ }
+ continue;
+ }
+ else if (nbBytes < 3) {
+ etiLog.log(error,
+ "Error while reading file for packet header; "
+ "read %i out of 3 bytes\n", nbBytes);
+ break;
+ }
+
+ length = header->packetLength * 24 + 24;
+ if (written + length > size) {
+ memcpy(&m_packetData[0], header, 3);
+ readFromFile(&m_packetData[3], length - 3);
+ m_packetLength = length;
+ continue;
+ }
+
+ if (m_enhancedPacketEnabled) {
+ if (m_enhancedPacketLength + length > (12 * 188)) {
+ memcpy(&m_packetData[0], header, 3);
+ readFromFile(&m_packetData[3], length - 3);
+ m_packetLength = length;
+ continue;
+ }
+ }
+
+ nbBytes = readFromFile(buffer + 3, length - 3);
+ if (nbBytes == -1) {
+ perror("Packet file");
+ return -1;
+ }
+ else if (nbBytes == 0) {
+ etiLog.log(info,
+ "Packet header read, but no data!\n");
+ if (rewind() == -1) {
+ goto END_PACKET;
+ }
+ continue;
+ }
+ else if (nbBytes < length - 3) {
+ etiLog.log(error, "Error while reading packet file; "
+ "read %i out of %i bytes\n", nbBytes, length - 3);
+ break;
+ }
+
+ if (m_enhancedPacketEnabled) {
+ indexCol = m_enhancedPacketLength / 12;
+ indexRow = m_enhancedPacketLength % 12; // TODO Check if always 0
+ for (int j = 0; j < length; ++j) {
+ m_enhancedPacketData[indexRow][indexCol] = buffer[j];
+ if (++indexRow == 12) {
+ indexRow = 0;
+ ++indexCol;
+ }
+ }
+ m_enhancedPacketLength += length;
+ if (m_enhancedPacketLength >= (12 * 188)) {
+ if (m_enhancedPacketLength > (12 * 188)) {
+ etiLog.log(error,
+ "Error, too much enhanced packet data!\n");
+ }
+ ReedSolomon encoder(204, 188);
+ for (int j = 0; j < 12; ++j) {
+ encoder.encode(&m_enhancedPacketData[j][0], 188);
+ }
+ m_enhancedPacketWaiting = 192;
+ }
+ }
+ written += length;
+ buffer += length;
+ }
+ }
+END_PACKET:
+ while (written < size) {
+ memset(buffer, 0, 22);
+ buffer[22] = 0x60;
+ buffer[23] = 0x4b;
+ buffer += 24;
+ written += 24;
+ }
+ return written;
+}
+
+};
diff --git a/src/input/File.h b/src/input/File.h
new file mode 100644
index 0000000..080d6b5
--- /dev/null
+++ b/src/input/File.h
@@ -0,0 +1,91 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016 Matthias P. Braendli
+ http://www.opendigitalradio.org
+
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <vector>
+#include <array>
+#include <string>
+#include <stdint.h>
+#include "input/inputs.h"
+#include "ManagementServer.h"
+
+namespace Inputs {
+
+class FileBase : public InputBase {
+ public:
+ virtual int open(const std::string& name);
+ virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ /* Rewind the file
+ * Returns -1 on failure, 0 on success
+ */
+ virtual int rewind();
+ protected:
+ /* Read len bytes from the file into buf, and return
+ * the number of bytes read, or -1 in case of error.
+ */
+ virtual ssize_t readFromFile(uint8_t* buf, size_t len);
+
+ // We use unix open() instead of fopen() because
+ // we might want to do non-blocking I/O in the future
+ int m_fd = -1;
+};
+
+class MPEGFile : public FileBase {
+ public:
+ virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual int setBitrate(int bitrate);
+
+ private:
+ bool m_parity = false;
+};
+
+class RawFile : public FileBase {
+ public:
+ virtual int readFrame(uint8_t* buffer, size_t size);
+};
+
+class PacketFile : public FileBase {
+ public:
+ PacketFile(bool enhancedPacketMode);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+
+ protected:
+ std::array<uint8_t,96> m_packetData;
+ size_t m_packetLength;
+
+ /* Enhanced packet mode enables FEC for MSC packet mode
+ * as described in EN 300 401 Clause 5.3.5
+ */
+ bool m_enhancedPacketEnabled = false;
+ std::array<std::array<uint8_t, 204>,12> m_enhancedPacketData;
+ size_t m_enhancedPacketWaiting;
+ size_t m_enhancedPacketLength;
+};
+
+};
diff --git a/src/input/Prbs.cpp b/src/input/Prbs.cpp
new file mode 100644
index 0000000..7856a46
--- /dev/null
+++ b/src/input/Prbs.cpp
@@ -0,0 +1,105 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ Pseudo-Random Bit Sequence generator for test purposes.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Prbs.h"
+
+#include <stdexcept>
+#include <sstream>
+#include <string.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <errno.h>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+// ETS 300 799 Clause G.2.1
+// Preferred polynomial is G(x) = x^20 + x^17 + 1
+const uint32_t PRBS_DEFAULT_POLY = (1 << 20) | (1 << 17) | (1 << 0);
+
+int Prbs::open(const string& name)
+{
+ if (name.substr(0, 7) != "prbs://") {
+ throw logic_error("Invalid PRBS name");
+ }
+
+ const string& url_polynomial = name.substr(7);
+
+ if (url_polynomial.empty()) {
+ m_prbs.setup(PRBS_DEFAULT_POLY);
+ }
+ else {
+ if (url_polynomial[0] != ':') {
+ throw invalid_argument(
+ "Invalid PRBS address format. "
+ "Must be prbs://:polynomial.");
+ }
+
+ const string poly_str = url_polynomial.substr(1);
+
+ long polynomial = hexparse(poly_str);
+
+ if (polynomial == 0) {
+ throw invalid_argument("No polynomial given for PRBS input");
+ }
+
+ m_prbs.setup(polynomial);
+ }
+ rewind();
+
+ return 0;
+}
+
+int Prbs::readFrame(uint8_t* buffer, size_t size)
+{
+ for (size_t i = 0; i < size; ++i) {
+ buffer[i] = m_prbs.step();
+ }
+
+ return size;
+}
+
+int Prbs::setBitrate(int bitrate)
+{
+ return bitrate;
+}
+
+int Prbs::close()
+{
+ return 0;
+}
+
+int Prbs::rewind()
+{
+ m_prbs.rewind();
+ return 0;
+}
+
+};
diff --git a/src/input/Prbs.h b/src/input/Prbs.h
new file mode 100644
index 0000000..3b2b7d4
--- /dev/null
+++ b/src/input/Prbs.h
@@ -0,0 +1,52 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ Pseudo-Random Bit Sequence generator for test purposes.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+
+#include "input/inputs.h"
+#include "prbs.h"
+
+namespace Inputs {
+
+class Prbs : public InputBase {
+ public:
+ virtual int open(const std::string& name);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ private:
+ virtual int rewind();
+
+ PrbsGenerator m_prbs;
+};
+
+};
+
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
new file mode 100644
index 0000000..a238d9b
--- /dev/null
+++ b/src/input/Udp.cpp
@@ -0,0 +1,134 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Udp.h"
+
+#include <stdexcept>
+#include <sstream>
+#include <string.h>
+#include <limits.h>
+#include <stdlib.h>
+#include <errno.h>
+#include "utils.h"
+
+using namespace std;
+
+namespace Inputs {
+
+int Udp::open(const std::string& name)
+{
+ // Skip the udp:// part if it is present
+ const string endpoint = (name.substr(0, 6) == "udp://") ?
+ name.substr(6) : name;
+
+ // The endpoint should be address:port
+ const auto colon_pos = endpoint.find_first_of(":");
+ if (colon_pos == string::npos) {
+ stringstream ss;
+ ss << "'" << name <<
+ " is an invalid format for udp address: "
+ "expected [udp://]address:port";
+ throw invalid_argument(ss.str());
+ }
+
+ const auto address = endpoint.substr(0, colon_pos);
+ const auto port_str = endpoint.substr(colon_pos + 1);
+
+ const long port = strtol(port_str.c_str(), nullptr, 10);
+
+ if ((port == LONG_MIN or port == LONG_MAX) and errno == ERANGE) {
+ throw out_of_range("udp input: port out of range");
+ }
+ else if (port == 0 and errno != 0) {
+ stringstream ss;
+ ss << "udp input port parse error: " << strerror(errno);
+ throw invalid_argument(ss.str());
+ }
+
+ if (port == 0) {
+ throw out_of_range("can't use port number 0 in udp address");
+ }
+
+ if (m_sock.reinit(port, address) == -1) {
+ stringstream ss;
+ ss << "Could not init UDP socket: " << inetErrMsg;
+ throw runtime_error(ss.str());
+ }
+
+ if (m_sock.setBlocking(false) == -1) {
+ stringstream ss;
+ ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
+ throw runtime_error(ss.str());
+ }
+
+ return 0;
+}
+
+int Udp::readFrame(uint8_t* buffer, size_t size)
+{
+ uint8_t* data = reinterpret_cast<uint8_t*>(buffer);
+
+ // Regardless of buffer contents, try receiving data.
+ UdpPacket packet;
+ int ret = m_sock.receive(packet);
+
+ if (ret == -1) {
+ stringstream ss;
+ ss << "Could not read from UDP socket: " << inetErrMsg;
+ throw runtime_error(ss.str());
+ }
+
+ std::copy(packet.getData(), packet.getData() + packet.getSize(),
+ back_inserter(m_buffer));
+
+ // Take data from the buffer if it contains enough data,
+ // in any case write the buffer
+ if (m_buffer.size() >= (size_t)size) {
+ std::copy(m_buffer.begin(), m_buffer.begin() + size, data);
+ }
+ else {
+ memset(data, 0x0, size);
+ }
+
+ return size;
+}
+
+int Udp::setBitrate(int bitrate)
+{
+ if (bitrate <= 0) {
+ etiLog.log(error, "Invalid bitrate (%i)\n", bitrate);
+ return -1;
+ }
+
+ return bitrate;
+}
+
+int Udp::close()
+{
+ return m_sock.close();
+}
+
+};
diff --git a/src/input/Udp.h b/src/input/Udp.h
new file mode 100644
index 0000000..379dbf3
--- /dev/null
+++ b/src/input/Udp.h
@@ -0,0 +1,52 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <string>
+#include <vector>
+#include "input/inputs.h"
+#include "UdpSocket.h"
+
+namespace Inputs {
+
+class Udp : public InputBase {
+ public:
+ virtual int open(const std::string& name);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ private:
+ UdpSocket m_sock;
+
+ // The content of the UDP packets gets written into the
+ // buffer, and the UDP packet boundaries disappear there.
+ std::vector<uint8_t> m_buffer;
+};
+
+};
+
diff --git a/src/input/Zmq.cpp b/src/input/Zmq.cpp
new file mode 100644
index 0000000..a5601fa
--- /dev/null
+++ b/src/input/Zmq.cpp
@@ -0,0 +1,616 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016 Matthias P. Braendli
+ http://www.opendigitalradio.org
+
+ ZeroMQ input. see www.zeromq.org for more info
+
+ For the AAC+ input, each zeromq message must contain one superframe
+ or one zmq_frame_header_t followed by a superframe.
+
+ For the MPEG input, each zeromq message must contain one frame.
+
+ Encryption is provided by zmq_curve, see the corresponding manpage.
+
+ From the ZeroMQ manpage 'zmq':
+
+ The 0MQ lightweight messaging kernel is a library which extends the standard
+ socket interfaces with features traditionally provided by specialised
+ messaging middleware products. 0MQ sockets provide an abstraction of
+ asynchronous message queues, multiple messaging patterns, message filtering
+ (subscriptions), seamless access to multiple transport protocols and more.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "input/Zmq.h"
+
+#include <cstdio>
+#include <cstdlib>
+#include <list>
+#include <exception>
+#include <cstring>
+#include <string>
+#include <sstream>
+#include <limits.h>
+#include "PcDebug.h"
+#include "Log.h"
+
+#ifdef __MINGW32__
+# define bzero(s, n) memset(s, 0, n)
+#endif
+
+namespace Inputs {
+
+using namespace std;
+
+int readkey(string& keyfile, char* key)
+{
+ FILE* fd = fopen(keyfile.c_str(), "r");
+ if (fd == nullptr) {
+ return -1;
+ }
+
+ int ret = fread(key, CURVE_KEYLEN, 1, fd);
+ fclose(fd);
+ if (ret == 0) {
+ return -1;
+ }
+
+ /* It needs to be zero-terminated */
+ key[CURVE_KEYLEN] = '\0';
+
+ return 0;
+}
+
+/***** Common functions (MPEG and AAC) ******/
+
+/* If necessary, unbind the socket, then check the keys,
+ * if they are ok and encryption is required, set the
+ * keys to the socket, and finally bind the socket
+ * to the new address
+ */
+void ZmqBase::rebind()
+{
+ if (! m_zmq_sock_bound_to.empty()) {
+ try {
+ m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str());
+ }
+ catch (zmq::error_t& err) {
+ etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed";
+ }
+ }
+
+ m_zmq_sock_bound_to = "";
+
+ /* Load each key independently */
+ if (! m_config.curve_public_keyfile.empty()) {
+ int rc = readkey(m_config.curve_public_keyfile, m_curve_public_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid public key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_public_key);
+ }
+ }
+
+ if (! m_config.curve_secret_keyfile.empty()) {
+ int rc = readkey(m_config.curve_secret_keyfile, m_curve_secret_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid secret key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_secret_key);
+ }
+ }
+
+ if (! m_config.curve_encoder_keyfile.empty()) {
+ int rc = readkey(m_config.curve_encoder_keyfile, m_curve_encoder_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid encoder key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_encoder_key);
+ }
+ }
+
+ /* If you want encryption, you need to have defined all
+ * key files
+ */
+ if ( m_config.enable_encryption &&
+ ( ! (KEY_VALID(m_curve_public_key) &&
+ KEY_VALID(m_curve_secret_key) &&
+ KEY_VALID(m_curve_encoder_key) ) ) ) {
+ throw std::runtime_error("When enabling encryption, all three "
+ "keyfiles must be valid!");
+ }
+
+ if (m_config.enable_encryption) {
+ try {
+ /* We want to check that the encoder is the right one,
+ * so the encoder is the CURVE server.
+ */
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SERVERKEY,
+ m_curve_encoder_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set encoder key for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_CURVE_PUBLICKEY,
+ m_curve_public_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set public key for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY,
+ m_curve_secret_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set secret key for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+ }
+ else {
+ try {
+ /* This forces the socket to go to the ZMQ_NULL auth
+ * mechanism
+ */
+ const int no = 0;
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SERVER, &no, sizeof(no));
+ }
+ catch (zmq::error_t& err) {
+ etiLog.level(warn) << "ZMQ disable encryption keys for input " <<
+ m_name << " failed: " << err.what();
+ }
+
+ }
+
+ // Prepare the ZMQ socket to accept connections
+ try {
+ m_zmq_sock.bind(m_inputUri.c_str());
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ bind for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+
+ m_zmq_sock_bound_to = m_inputUri;
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set socket options for input " << m_name << " failed" <<
+ err.what();
+ throw std::runtime_error(os.str());
+ }
+}
+
+int ZmqBase::open(const std::string& inputUri)
+{
+ m_inputUri = inputUri;
+
+ /* Let caller handle exceptions when we open() */
+ rebind();
+
+ // We want to appear in the statistics !
+ m_stats.registerAtServer();
+
+ return 0;
+}
+
+int ZmqBase::close()
+{
+ m_zmq_sock.close();
+ return 0;
+}
+
+int ZmqBase::setBitrate(int bitrate)
+{
+ m_bitrate = bitrate;
+ return bitrate; // TODO do a nice check here
+}
+
+// size corresponds to a frame size. It is constant for a given bitrate
+int ZmqBase::readFrame(uint8_t* buffer, size_t size)
+{
+ int rc;
+
+ /* We must *always* read data from the ZMQ socket,
+ * to make sure that ZMQ internal buffers are emptied
+ * quickly. It's the only way to control the buffers
+ * of the whole path from encoder to our frame_buffer.
+ */
+ rc = readFromSocket(size);
+
+ /* Notify of a buffer overrun, and drop some frames */
+ if (m_frame_buffer.size() >= m_config.buffer_size) {
+ m_stats.notifyOverrun();
+
+ /* If the buffer is really too full, we drop as many frames as needed
+ * to get down to the prebuffering size. We would like to have our buffer
+ * filled to the prebuffering length.
+ */
+ if (m_frame_buffer.size() >= 1.5*m_config.buffer_size) {
+ size_t over_max = m_frame_buffer.size() - m_config.prebuffering;
+
+ while (over_max--) {
+ delete[] m_frame_buffer.front();
+ m_frame_buffer.pop_front();
+ }
+ }
+ else {
+ /* Our frame_buffer contains DAB logical frames. Five of these make one
+ * AAC superframe.
+ *
+ * Dropping this superframe amounts to dropping 120ms of audio.
+ *
+ * We're actually not sure to drop five DAB logical frames
+ * belonging to the same AAC superframe. It is assumed that no
+ * receiver will crash because of this. At least, the DAB logical frame
+ * vs. AAC superframe alignment is preserved.
+ *
+ * TODO: of course this assumption probably doesn't hold. Fix this !
+ * TODO: also, with MPEG, the above doesn't hold, so we drop five
+ * frames even though we could drop less.
+ * */
+ for (int frame_del_count = 0; frame_del_count < 5; frame_del_count++) {
+ delete[] m_frame_buffer.front();
+ m_frame_buffer.pop_front();
+ }
+ }
+ }
+
+ if (m_prebuf_current > 0) {
+ if (rc > 0)
+ m_prebuf_current--;
+ if (m_prebuf_current == 0)
+ etiLog.log(info, "inputZMQ %s input pre-buffering complete\n",
+ m_name.c_str());
+
+ /* During prebuffering, give a zeroed frame to the mux */
+ m_stats.notifyUnderrun();
+ memset(buffer, 0, size);
+ return size;
+ }
+
+ // Save stats data in bytes, not in frames
+ m_stats.notifyBuffer(m_frame_buffer.size() * size);
+
+ if (m_frame_buffer.empty()) {
+ etiLog.log(warn, "inputZMQ %s input empty, re-enabling pre-buffering\n",
+ m_name.c_str());
+ // reset prebuffering
+ m_prebuf_current = m_config.prebuffering;
+
+ /* We have no data to give, we give a zeroed frame */
+ m_stats.notifyUnderrun();
+ memset(buffer, 0, size);
+ return size;
+ }
+ else
+ {
+ /* Normal situation, give a frame from the frame_buffer */
+ uint8_t* newframe = m_frame_buffer.front();
+ memcpy(buffer, newframe, size);
+ delete[] newframe;
+ m_frame_buffer.pop_front();
+ return size;
+ }
+}
+
+
+/******** MPEG input *******/
+
+// Read a MPEG frame from the socket, and push to list
+int ZmqMPEG::readFromSocket(size_t framesize)
+{
+ bool messageReceived = false;
+ zmq::message_t msg;
+
+ try {
+ messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ if (!messageReceived) {
+ return 0;
+ }
+
+ }
+ catch (zmq::error_t& err)
+ {
+ etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " <<
+ m_name << ": " << err.what();
+ }
+
+ /* This is the old 'one superframe per ZMQ message' format */
+ uint8_t* data = (uint8_t*)msg.data();
+ size_t datalen = msg.size();
+
+ /* Look for the new zmq_frame_header_t format */
+ zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data();
+
+ if (msg.size() == ZMQ_FRAME_SIZE(frame) &&
+ frame->version == 1 &&
+ frame->encoder == ZMQ_ENCODER_TOOLAME) {
+ datalen = frame->datasize;
+ data = ZMQ_FRAME_DATA(frame);
+
+ m_stats.notifyPeakLevels(frame->audiolevel_left,
+ frame->audiolevel_right);
+ }
+
+
+ if (datalen == framesize)
+ {
+ if (m_frame_buffer.size() > m_config.buffer_size) {
+ etiLog.level(warn) <<
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
+ " dropping incoming frame !";
+ messageReceived = false;
+ }
+ else if (m_enable_input) {
+ // copy the input frame blockwise into the frame_buffer
+ auto framedata = new uint8_t[framesize];
+ memcpy(framedata, data, framesize);
+ m_frame_buffer.push_back(framedata);
+ }
+ else {
+ return 0;
+ }
+ }
+ else {
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " verify bitrate: recv'd " << msg.size() << " B" <<
+ ", need " << framesize << ".";
+ messageReceived = false;
+ }
+
+ return messageReceived ? msg.size() : 0;
+}
+
+/******** AAC+ input *******/
+
+// Read a AAC+ superframe from the socket, cut it into five frames,
+// and push to list
+int ZmqAAC::readFromSocket(size_t framesize)
+{
+ bool messageReceived;
+ zmq::message_t msg;
+
+ try {
+ messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ if (!messageReceived) {
+ return 0;
+ }
+
+ }
+ catch (zmq::error_t& err)
+ {
+ etiLog.level(error) <<
+ "Failed to receive AAC superframe from zmq socket " <<
+ m_name << ": " << err.what();
+ }
+
+ /* This is the old 'one superframe per ZMQ message' format */
+ uint8_t* data = (uint8_t*)msg.data();
+ size_t datalen = msg.size();
+
+ /* Look for the new zmq_frame_header_t format */
+ zmq_frame_header_t* frame = (zmq_frame_header_t*)msg.data();
+
+ if (msg.size() == ZMQ_FRAME_SIZE(frame) &&
+ frame->version == 1 &&
+ frame->encoder == ZMQ_ENCODER_FDK) {
+ datalen = frame->datasize;
+ data = ZMQ_FRAME_DATA(frame);
+
+ m_stats.notifyPeakLevels(frame->audiolevel_left,
+ frame->audiolevel_right);
+ }
+
+
+ /* TS 102 563, Section 6:
+ * Audio super frames are transported in five successive DAB logical frames
+ * with additional error protection.
+ */
+ if (datalen)
+ {
+ if (datalen == 5*framesize)
+ {
+ if (m_frame_buffer.size() > m_config.buffer_size) {
+ etiLog.level(warn) <<
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
+ " dropping incoming superframe !";
+ datalen = 0;
+ }
+ else if (m_enable_input) {
+ // copy the input frame blockwise into the frame_buffer
+ for (uint8_t* framestart = data;
+ framestart < &data[5*framesize];
+ framestart += framesize) {
+ auto audioframe = new uint8_t[framesize];
+ memcpy(audioframe, framestart, framesize);
+ m_frame_buffer.push_back(audioframe);
+ }
+ }
+ else {
+ datalen = 0;
+ }
+ }
+ else {
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " verify bitrate: recv'd " << msg.size() << " B" <<
+ ", need " << 5*framesize << ".";
+
+ datalen = 0;
+ }
+ }
+ else {
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " invalid frame received";
+ }
+
+ return datalen;
+}
+
+/********* REMOTE CONTROL ***********/
+
+void ZmqBase::set_parameter(const string& parameter,
+ const string& value)
+{
+ if (parameter == "buffer") {
+ size_t new_limit = atol(value.c_str());
+
+ if (new_limit < INPUT_ZMQ_MIN_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too small."
+ " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) );
+ }
+ else if (new_limit > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ throw ParameterError("Desired buffer size too large."
+ " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
+ }
+
+ m_config.buffer_size = new_limit;
+ }
+ else if (parameter == "prebuffering") {
+ size_t new_prebuf = atol(value.c_str());
+
+ if (new_prebuf < INPUT_ZMQ_MIN_BUFFER_SIZE) {
+ throw ParameterError("Desired prebuffering too small."
+ " Minimum " STRINGIFY(INPUT_ZMQ_MIN_BUFFER_SIZE) );
+ }
+ else if (new_prebuf > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ throw ParameterError("Desired prebuffering too large."
+ " Maximum " STRINGIFY(INPUT_ZMQ_MAX_BUFFER_SIZE) );
+ }
+
+ m_config.prebuffering = new_prebuf;
+ }
+ else if (parameter == "enable") {
+ if (value == "1") {
+ m_enable_input = true;
+ }
+ else if (value == "0") {
+ m_enable_input = false;
+ }
+ else {
+ throw ParameterError("Value not understood, specify 0 or 1.");
+ }
+ }
+ else if (parameter == "encryption") {
+ if (value == "1") {
+ m_config.enable_encryption = true;
+ }
+ else if (value == "0") {
+ m_config.enable_encryption = false;
+ }
+ else {
+ throw ParameterError("Value not understood, specify 0 or 1.");
+ }
+
+ try {
+ rebind();
+ }
+ catch (std::runtime_error &e) {
+ stringstream ss;
+ ss << "Could not bind socket again with new keys." <<
+ e.what();
+ throw ParameterError(ss.str());
+ }
+ }
+ else if (parameter == "secretkey") {
+ m_config.curve_secret_keyfile = value;
+ }
+ else if (parameter == "publickey") {
+ m_config.curve_public_keyfile = value;
+ }
+ else if (parameter == "encoderkey") {
+ m_config.curve_encoder_keyfile = value;
+ }
+ else {
+ stringstream ss;
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+}
+
+const string ZmqBase::get_parameter(const string& parameter) const
+{
+ stringstream ss;
+ if (parameter == "buffer") {
+ ss << m_config.buffer_size;
+ }
+ else if (parameter == "prebuffering") {
+ ss << m_config.prebuffering;
+ }
+ else if (parameter == "enable") {
+ if (m_enable_input)
+ ss << "true";
+ else
+ ss << "false";
+ }
+ else if (parameter == "encryption") {
+ if (m_config.enable_encryption)
+ ss << "true";
+ else
+ ss << "false";
+ }
+ else if (parameter == "secretkey") {
+ ss << m_config.curve_secret_keyfile;
+ }
+ else if (parameter == "publickey") {
+ ss << m_config.curve_public_keyfile;
+ }
+ else if (parameter == "encoderkey") {
+ ss << m_config.curve_encoder_keyfile;
+ }
+ else {
+ ss << "Parameter '" << parameter <<
+ "' is not exported by controllable " << get_rc_name();
+ throw ParameterError(ss.str());
+ }
+ return ss.str();
+
+}
+
+};
+
diff --git a/src/input/Zmq.h b/src/input/Zmq.h
new file mode 100644
index 0000000..8d729e0
--- /dev/null
+++ b/src/input/Zmq.h
@@ -0,0 +1,264 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+
+ Copyright (C) 2016 Matthias P. Braendli
+ http://www.opendigitalradio.org
+
+ ZeroMQ input. see www.zeromq.org for more info
+
+ For the AAC+ input, each zeromq message must contain one superframe,
+ or one zmq_frame_header_t followed by a superframe.
+
+ For the MPEG input, each zeromq message must contain one frame.
+
+ Encryption is provided by zmq_curve, see the corresponding manpage.
+
+ From the ZeroMQ manpage 'zmq':
+
+ The 0MQ lightweight messaging kernel is a library which extends the standard
+ socket interfaces with features traditionally provided by specialised
+ messaging middleware products. 0MQ sockets provide an abstraction of
+ asynchronous message queues, multiple messaging patterns, message filtering
+ (subscriptions), seamless access to multiple transport protocols and more.
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ It defines a ZeroMQ input for dabplus data.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <list>
+#include <string>
+#include <stdint.h>
+#include "zmq.hpp"
+#include "input/inputs.h"
+#include "ManagementServer.h"
+
+namespace Inputs {
+
+/* The frame_buffer contains DAB logical frames as defined in
+ * TS 102 563, section 6.
+ * Five elements of this buffer make one AAC superframe (120ms audio)
+ */
+
+// Minimum frame_buffer size in number of elements
+// This is one AAC superframe, but you probably don't want to
+// go that low anyway.
+const size_t INPUT_ZMQ_MIN_BUFFER_SIZE = 5*1; // 120ms
+
+// Maximum frame_buffer size in number of elements
+// One minute is clearly way over what everybody would
+// want.
+const size_t INPUT_ZMQ_MAX_BUFFER_SIZE = 5*500; // 60s
+
+/* The ZeroMQ Curve key is 40 bytes long in Z85 representation
+ *
+ * But we need to store it as zero-terminated string.
+ */
+const size_t CURVE_KEYLEN = 40;
+
+/* helper to invalidate a key */
+#define INVALIDATE_KEY(k) memset(k, 0, CURVE_KEYLEN+1)
+
+/* Verification for key validity */
+#define KEY_VALID(k) (k[0] != '\0')
+
+/* Read a key from file into key
+ *
+ * Returns 0 on success, negative value on failure
+ */
+int readkey(std::string& keyfile, char* key);
+
+struct dab_input_zmq_config_t
+{
+ /* The size of the internal buffer, measured in number
+ * of elements.
+ *
+ * Each element corresponds to five frames,
+ * or one AAC superframe.
+ */
+ size_t buffer_size;
+
+ /* The amount of prebuffering to do before we start streaming
+ *
+ * Same units as buffer_size
+ */
+ size_t prebuffering;
+
+ /* Whether to enforce encryption or not
+ */
+ bool enable_encryption;
+
+ /* Full path to file containing public key.
+ */
+ std::string curve_public_keyfile;
+
+ /* Full path to file containing secret key.
+ */
+ std::string curve_secret_keyfile;
+
+ /* Full path to file containing encoder public key.
+ */
+ std::string curve_encoder_keyfile;
+};
+
+#define ZMQ_ENCODER_FDK 1
+#define ZMQ_ENCODER_TOOLAME 2
+
+/* This defines the on-wire representation of a ZMQ message header.
+ *
+ * The data follows right after this header */
+struct zmq_frame_header_t
+{
+ uint16_t version; // we support version=1 now
+ uint16_t encoder; // see ZMQ_ENCODER_XYZ
+
+ /* length of the 'data' field */
+ uint32_t datasize;
+
+ /* Audio level, peak, linear PCM */
+ int16_t audiolevel_left;
+ int16_t audiolevel_right;
+
+ /* Data follows this header */
+} __attribute__ ((packed));
+
+/* The expected frame size incl data of the given frame */
+#define ZMQ_FRAME_SIZE(f) (sizeof(zmq_frame_header_t) + f->datasize)
+
+#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(zmq_frame_header_t) )
+
+
+class ZmqBase : public InputBase, public RemoteControllable {
+ public:
+ ZmqBase(const std::string name,
+ dab_input_zmq_config_t config)
+ : RemoteControllable(name),
+ m_zmq_context(1),
+ m_zmq_sock(m_zmq_context, ZMQ_SUB),
+ m_zmq_sock_bound_to(""),
+ m_bitrate(0),
+ m_enable_input(true),
+ m_config(config),
+ m_stats(m_name),
+ m_prebuf_current(config.prebuffering) {
+ RC_ADD_PARAMETER(enable,
+ "If the input is enabled. Set to zero to empty the buffer.");
+
+ RC_ADD_PARAMETER(encryption,
+ "If encryption is enabled or disabled [1 or 0]."
+ " If 1 is written, the keys are reloaded.");
+
+ RC_ADD_PARAMETER(publickey,
+ "The multiplexer's public key file.");
+
+ RC_ADD_PARAMETER(secretkey,
+ "The multiplexer's secret key file.");
+
+ RC_ADD_PARAMETER(encoderkey,
+ "The encoder's public key file.");
+
+ /* Set all keys to zero */
+ INVALIDATE_KEY(m_curve_public_key);
+ INVALIDATE_KEY(m_curve_secret_key);
+ INVALIDATE_KEY(m_curve_encoder_key);
+ }
+
+ virtual int open(const std::string& inputUri);
+ virtual int readFrame(uint8_t* buffer, size_t size);
+ virtual int setBitrate(int bitrate);
+ virtual int close();
+
+ /* Remote control */
+ virtual void set_parameter(const std::string& parameter,
+ const std::string& value);
+
+ /* Getting a parameter always returns a string. */
+ virtual const std::string get_parameter(const std::string& parameter) const;
+
+ protected:
+ virtual int readFromSocket(size_t framesize) = 0;
+
+ virtual void rebind();
+
+ zmq::context_t m_zmq_context;
+ zmq::socket_t m_zmq_sock; // handle for the zmq socket
+
+ /* If the socket is bound, this saves the endpoint,
+ * otherwise, it's an empty string
+ */
+ std::string m_zmq_sock_bound_to;
+ int m_bitrate;
+
+ /* set this to zero to empty the input buffer */
+ bool m_enable_input;
+
+ /* stores elements of type char[<superframesize>] */
+ std::list<uint8_t*> m_frame_buffer;
+
+ dab_input_zmq_config_t m_config;
+
+ /* Key management, keys need to be zero-terminated */
+ char m_curve_public_key[CURVE_KEYLEN+1];
+ char m_curve_secret_key[CURVE_KEYLEN+1];
+ char m_curve_encoder_key[CURVE_KEYLEN+1];
+
+ std::string m_inputUri;
+
+ InputStat m_stats;
+
+ private:
+ size_t m_prebuf_current;
+};
+
+class ZmqMPEG : public ZmqBase {
+ public:
+ ZmqMPEG(const std::string name,
+ dab_input_zmq_config_t config)
+ : ZmqBase(name, config) {
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [mpeg frames]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [mpeg frames]");
+ }
+
+ private:
+ virtual int readFromSocket(size_t framesize);
+};
+
+class ZmqAAC : public ZmqBase {
+ public:
+ ZmqAAC(const std::string name,
+ dab_input_zmq_config_t config)
+ : ZmqBase(name, config) {
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [aac superframes]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [aac superframes]");
+ }
+
+ private:
+ virtual int readFromSocket(size_t framesize);
+};
+
+};
+
+
diff --git a/src/input/inputs.h b/src/input/inputs.h
new file mode 100644
index 0000000..bfb1fb6
--- /dev/null
+++ b/src/input/inputs.h
@@ -0,0 +1,52 @@
+/*
+ Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMux.
+
+ ODR-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+#include "Log.h"
+#include "RemoteControl.h"
+#include <string>
+
+namespace Inputs {
+
+/* New input object base */
+class InputBase {
+ public:
+ virtual int open(const std::string& name) = 0;
+ virtual int readFrame(uint8_t* buffer, size_t size) = 0;
+ virtual int setBitrate(int bitrate) = 0;
+ virtual int close() = 0;
+
+ virtual ~InputBase() {}
+ protected:
+ InputBase() {}
+};
+
+};
+