aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Makefile.am1
-rw-r--r--doc/example.ini4
-rw-r--r--src/DabMod.cpp14
-rw-r--r--src/InputReader.h44
-rw-r--r--src/InputTcpReader.cpp122
5 files changed, 180 insertions, 5 deletions
diff --git a/Makefile.am b/Makefile.am
index 90fc577..31ceb1d 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -85,6 +85,7 @@ odr_dabmod_SOURCES = src/DabMod.cpp \
src/InputMemory.cpp \
src/InputMemory.h \
src/InputFileReader.cpp \
+ src/InputTcpReader.cpp \
src/InputZeroMQReader.cpp \
src/InputReader.h \
src/OutputFile.cpp \
diff --git a/doc/example.ini b/doc/example.ini
index a2de463..b5fce01 100644
--- a/doc/example.ini
+++ b/doc/example.ini
@@ -53,6 +53,10 @@ loop=0
; that can be in the input queue
;max_frames_queued=100
+; ETI-over-TCP example:
+;transport=tcp
+;source=localhost:9200
+
[modulator]
; Gain mode: 0=FIX, 1=MAX, 2=VAR
;
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 6f35e22..904c3c8 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -178,6 +178,8 @@ int launch_modulator(int argc, char* argv[])
auto inputZeroMQReader = make_shared<InputZeroMQReader>();
#endif
+ auto inputTcpReader = make_shared<InputTcpReader>();
+
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = &signalHandler;
@@ -616,6 +618,9 @@ int launch_modulator(int argc, char* argv[])
// if the name starts with zmq+XYZ://somewhere:port
inputTransport = "zeromq";
}
+ else if (inputName.substr(0, 6) == "tcp://") {
+ inputTransport = "tcp";
+ }
}
else {
inputName = "/dev/stdin";
@@ -705,6 +710,10 @@ int launch_modulator(int argc, char* argv[])
m.inputReader = inputZeroMQReader.get();
#endif
}
+ else if (inputTransport == "tcp") {
+ inputTcpReader->Open(inputName);
+ m.inputReader = inputTcpReader.get();
+ }
else
{
fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str());
@@ -816,6 +825,11 @@ int launch_modulator(int argc, char* argv[])
m.inputReader = inputZeroMQReader.get();
#endif
}
+ else if (inputTransport == "tcp") {
+ inputTcpReader = make_shared<InputTcpReader>();
+ inputTcpReader->Open(inputName);
+ m.inputReader = inputTcpReader.get();
+ }
break;
case run_modulator_state_t::reconfigure:
etiLog.level(warn) << "Detected change in ensemble configuration.";
diff --git a/src/InputReader.h b/src/InputReader.h
index daacc9e..4d0792b 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -3,8 +3,10 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyrigth (C) 2013, 2015
+ Copyright (C) 2016
Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
*/
/*
This file is part of ODR-DabMod.
@@ -39,6 +41,14 @@
#endif
#include "porting.h"
#include "Log.h"
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <arpa/inet.h>
+#define SOCKET int
+#define INVALID_SOCKET -1
+#define SOCKET_ERROR -1
/* Known types of input streams. Description taken from the CRC mmbTools forum.
@@ -115,8 +125,8 @@ class InputFileReader : public InputReader
}
private:
- InputFileReader(const InputFileReader& other);
- InputFileReader& operator=(const InputFileReader& other);
+ InputFileReader(const InputFileReader& other) = delete;
+ InputFileReader& operator=(const InputFileReader& other) = delete;
int IdentifyType();
@@ -134,6 +144,30 @@ class InputFileReader : public InputReader
// after 2**32 * 24ms ~= 3.3 years
};
+class InputTcpReader : public InputReader
+{
+ public:
+ InputTcpReader();
+ ~InputTcpReader();
+
+ // Endpoint is either host:port or tcp://host:port
+ void Open(const std::string& endpoint);
+
+ // Put next frame into buffer. This function will never write more than
+ // 6144 bytes into buffer.
+ // returns number of bytes written to buffer, 0 on eof, -1 on error
+ virtual int GetNextFrame(void* buffer);
+
+ // Print some information
+ virtual void PrintInfo();
+
+ private:
+ InputTcpReader(const InputTcpReader& other) = delete;
+ InputTcpReader& operator=(const InputTcpReader& other) = delete;
+ SOCKET m_sock;
+ std::string m_uri;
+};
+
struct zmq_input_overflow : public std::exception
{
const char* what () const throw ()
@@ -201,8 +235,8 @@ class InputZeroMQReader : public InputReader
void PrintInfo();
private:
- InputZeroMQReader(const InputZeroMQReader& other);
- InputZeroMQReader& operator=(const InputZeroMQReader& other);
+ InputZeroMQReader(const InputZeroMQReader& other) = delete;
+ InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
std::string uri_;
InputZeroMQWorker worker_;
diff --git a/src/InputTcpReader.cpp b/src/InputTcpReader.cpp
new file mode 100644
index 0000000..34ccb2e
--- /dev/null
+++ b/src/InputTcpReader.cpp
@@ -0,0 +1,122 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Copyright (C) 2016
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This file is part of ODR-DabMod.
+
+ ODR-DabMod is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ ODR-DabMod is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with ODR-DabMod. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "InputReader.h"
+#include "PcDebug.h"
+#include "Utils.h"
+
+InputTcpReader::InputTcpReader()
+{
+ if ((m_sock = socket(PF_INET, SOCK_STREAM, 0)) == INVALID_SOCKET) {
+ throw std::runtime_error("Can't create TCP socket");
+ }
+}
+
+InputTcpReader::~InputTcpReader()
+{
+ if (m_sock != INVALID_SOCKET) {
+ close(m_sock);
+ }
+}
+
+void InputTcpReader::Open(const std::string& endpoint)
+{
+ std::string hostname;
+ if (endpoint.compare(0, 6, "tcp://") == 0) {
+ hostname = endpoint.substr(6, std::string::npos);
+ }
+ else {
+ hostname = endpoint;
+ }
+
+ size_t colon_pos = hostname.find(":");
+ if (colon_pos == std::string::npos) {
+ std::stringstream ss;
+ ss << "Could not parse TCP endpoint " << endpoint;
+ throw std::runtime_error(ss.str());
+ }
+
+ long port = strtol(hostname.c_str() + colon_pos + 1, NULL, 10);
+ if (errno == ERANGE) {
+ std::stringstream ss;
+ ss << "Could not parse port in TCP endpoint " << endpoint;
+ throw std::runtime_error(ss.str());
+ }
+
+ hostname = hostname.substr(0, colon_pos);
+
+ struct sockaddr_in addr;
+ addr.sin_family = PF_INET;
+ addr.sin_addr.s_addr = htons(INADDR_ANY);
+ addr.sin_port = htons(port);
+
+ hostent *host = gethostbyname(hostname.c_str());
+ if (host) {
+ addr.sin_addr = *(in_addr *)(host->h_addr);
+ }
+ else {
+ std::stringstream ss;
+ ss << "Could not resolve hostname " << hostname << ": " << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+
+ if (connect(m_sock, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
+ std::stringstream ss;
+ ss << "Could not connect to " << hostname << ":" << port << " :" << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+
+ m_uri = endpoint;
+}
+
+int InputTcpReader::GetNextFrame(void* buffer)
+{
+ uint8_t* buf = (uint8_t*)buffer;
+
+ const size_t framesize = 6144;
+
+ ssize_t r = recv(m_sock, buf, framesize, MSG_WAITALL);
+
+ if (r == -1) {
+ std::stringstream ss;
+ ss << "Could not receive from socket :" << strerror(errno);
+ throw std::runtime_error(ss.str());
+ }
+
+ return r;
+}
+
+void InputTcpReader::PrintInfo()
+{
+ fprintf(stderr, "Input TCP:\n");
+ fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str());
+}
+