diff options
-rw-r--r-- | Makefile.am | 1 | ||||
-rw-r--r-- | doc/example.ini | 4 | ||||
-rw-r--r-- | src/DabMod.cpp | 14 | ||||
-rw-r--r-- | src/InputReader.h | 44 | ||||
-rw-r--r-- | src/InputTcpReader.cpp | 122 |
5 files changed, 180 insertions, 5 deletions
diff --git a/Makefile.am b/Makefile.am index 90fc577..31ceb1d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -85,6 +85,7 @@ odr_dabmod_SOURCES = src/DabMod.cpp \ src/InputMemory.cpp \ src/InputMemory.h \ src/InputFileReader.cpp \ + src/InputTcpReader.cpp \ src/InputZeroMQReader.cpp \ src/InputReader.h \ src/OutputFile.cpp \ diff --git a/doc/example.ini b/doc/example.ini index a2de463..b5fce01 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -53,6 +53,10 @@ loop=0 ; that can be in the input queue ;max_frames_queued=100 +; ETI-over-TCP example: +;transport=tcp +;source=localhost:9200 + [modulator] ; Gain mode: 0=FIX, 1=MAX, 2=VAR ; diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 6f35e22..904c3c8 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -178,6 +178,8 @@ int launch_modulator(int argc, char* argv[]) auto inputZeroMQReader = make_shared<InputZeroMQReader>(); #endif + auto inputTcpReader = make_shared<InputTcpReader>(); + struct sigaction sa; memset(&sa, 0, sizeof(struct sigaction)); sa.sa_handler = &signalHandler; @@ -616,6 +618,9 @@ int launch_modulator(int argc, char* argv[]) // if the name starts with zmq+XYZ://somewhere:port inputTransport = "zeromq"; } + else if (inputName.substr(0, 6) == "tcp://") { + inputTransport = "tcp"; + } } else { inputName = "/dev/stdin"; @@ -705,6 +710,10 @@ int launch_modulator(int argc, char* argv[]) m.inputReader = inputZeroMQReader.get(); #endif } + else if (inputTransport == "tcp") { + inputTcpReader->Open(inputName); + m.inputReader = inputTcpReader.get(); + } else { fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str()); @@ -816,6 +825,11 @@ int launch_modulator(int argc, char* argv[]) m.inputReader = inputZeroMQReader.get(); #endif } + else if (inputTransport == "tcp") { + inputTcpReader = make_shared<InputTcpReader>(); + inputTcpReader->Open(inputName); + m.inputReader = inputTcpReader.get(); + } break; case run_modulator_state_t::reconfigure: etiLog.level(warn) << "Detected change in ensemble configuration."; diff --git a/src/InputReader.h b/src/InputReader.h index daacc9e..4d0792b 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,8 +3,10 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyrigth (C) 2013, 2015 + Copyright (C) 2016 Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org */ /* This file is part of ODR-DabMod. @@ -39,6 +41,14 @@ #endif #include "porting.h" #include "Log.h" +#include <sys/socket.h> +#include <netinet/in.h> +#include <unistd.h> +#include <netdb.h> +#include <arpa/inet.h> +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 /* Known types of input streams. Description taken from the CRC mmbTools forum. @@ -115,8 +125,8 @@ class InputFileReader : public InputReader } private: - InputFileReader(const InputFileReader& other); - InputFileReader& operator=(const InputFileReader& other); + InputFileReader(const InputFileReader& other) = delete; + InputFileReader& operator=(const InputFileReader& other) = delete; int IdentifyType(); @@ -134,6 +144,30 @@ class InputFileReader : public InputReader // after 2**32 * 24ms ~= 3.3 years }; +class InputTcpReader : public InputReader +{ + public: + InputTcpReader(); + ~InputTcpReader(); + + // Endpoint is either host:port or tcp://host:port + void Open(const std::string& endpoint); + + // Put next frame into buffer. This function will never write more than + // 6144 bytes into buffer. + // returns number of bytes written to buffer, 0 on eof, -1 on error + virtual int GetNextFrame(void* buffer); + + // Print some information + virtual void PrintInfo(); + + private: + InputTcpReader(const InputTcpReader& other) = delete; + InputTcpReader& operator=(const InputTcpReader& other) = delete; + SOCKET m_sock; + std::string m_uri; +}; + struct zmq_input_overflow : public std::exception { const char* what () const throw () @@ -201,8 +235,8 @@ class InputZeroMQReader : public InputReader void PrintInfo(); private: - InputZeroMQReader(const InputZeroMQReader& other); - InputZeroMQReader& operator=(const InputZeroMQReader& other); + InputZeroMQReader(const InputZeroMQReader& other) = delete; + InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; std::string uri_; InputZeroMQWorker worker_; diff --git a/src/InputTcpReader.cpp b/src/InputTcpReader.cpp new file mode 100644 index 0000000..34ccb2e --- /dev/null +++ b/src/InputTcpReader.cpp @@ -0,0 +1,122 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 + Her Majesty the Queen in Right of Canada (Communications Research + Center Canada) + + Copyright (C) 2016 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://www.opendigitalradio.org + */ +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "InputReader.h" +#include "PcDebug.h" +#include "Utils.h" + +InputTcpReader::InputTcpReader() +{ + if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) { + throw std::runtime_error("Can't create TCP socket"); + } +} + +InputTcpReader::~InputTcpReader() +{ + if (m_sock != INVALID_SOCKET) { + close(m_sock); + } +} + +void InputTcpReader::Open(const std::string& endpoint) +{ + std::string hostname; + if (endpoint.compare(0, 6, "tcp://") == 0) { + hostname = endpoint.substr(6, std::string::npos); + } + else { + hostname = endpoint; + } + + size_t colon_pos = hostname.find(":"); + if (colon_pos == std::string::npos) { + std::stringstream ss; + ss << "Could not parse TCP endpoint " << endpoint; + throw std::runtime_error(ss.str()); + } + + long port = strtol(hostname.c_str() + colon_pos + 1, NULL, 10); + if (errno == ERANGE) { + std::stringstream ss; + ss << "Could not parse port in TCP endpoint " << endpoint; + throw std::runtime_error(ss.str()); + } + + hostname = hostname.substr(0, colon_pos); + + struct sockaddr_in addr; + addr.sin_family = PF_INET; + addr.sin_addr.s_addr = htons(INADDR_ANY); + addr.sin_port = htons(port); + + hostent *host = gethostbyname(hostname.c_str()); + if (host) { + addr.sin_addr = *(in_addr *)(host->h_addr); + } + else { + std::stringstream ss; + ss << "Could not resolve hostname " << hostname << ": " << strerror(errno); + throw std::runtime_error(ss.str()); + } + + if (connect(m_sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) { + std::stringstream ss; + ss << "Could not connect to " << hostname << ":" << port << " :" << strerror(errno); + throw std::runtime_error(ss.str()); + } + + m_uri = endpoint; +} + +int InputTcpReader::GetNextFrame(void* buffer) +{ + uint8_t* buf = (uint8_t*)buffer; + + const size_t framesize = 6144; + + ssize_t r = recv(m_sock, buf, framesize, MSG_WAITALL); + + if (r == -1) { + std::stringstream ss; + ss << "Could not receive from socket :" << strerror(errno); + throw std::runtime_error(ss.str()); + } + + return r; +} + +void InputTcpReader::PrintInfo() +{ + fprintf(stderr, "Input TCP:\n"); + fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str()); +} + |