aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli (think) <matthias@mpb.li>2013-11-10 21:50:12 +0100
committerMatthias P. Braendli (think) <matthias@mpb.li>2013-11-10 21:50:12 +0100
commit5d965e80be2e6ab62bc82fb2e0d4d472153ad241 (patch)
tree5add36f337b0de524b3d098f0b1fcc8d68aba0d7
parent4f9a01a80570437b86e69eb0542b13df9a20743d (diff)
downloaddabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.gz
dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.bz2
dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.zip
crc-dabmod: add ZeroMQ input module
-rw-r--r--config.h.in6
-rwxr-xr-xconfigure82
-rw-r--r--configure.ac11
-rw-r--r--doc/example.ini14
-rw-r--r--src/DabMod.cpp79
-rw-r--r--src/FIRFilter.h51
-rw-r--r--src/InputFileReader.cpp47
-rw-r--r--src/InputReader.h76
-rw-r--r--src/InputZeroMQReader.cpp135
-rw-r--r--src/Makefile.am9
-rw-r--r--src/Makefile.in24
-rw-r--r--src/OutputFile.cpp9
-rw-r--r--src/OutputFile.h5
-rw-r--r--src/ThreadsafeQueue.h100
14 files changed, 545 insertions, 103 deletions
diff --git a/config.h.in b/config.h.in
index 7971a35..89c993d 100644
--- a/config.h.in
+++ b/config.h.in
@@ -22,6 +22,9 @@
/* Define to 1 if you have the `gettimeofday' function. */
#undef HAVE_GETTIMEOFDAY
+/* Define if ZeroMQ input is enabled */
+#undef HAVE_INPUT_ZEROMQ
+
/* Define to 1 if you have the <inttypes.h> header file. */
#undef HAVE_INTTYPES_H
@@ -43,6 +46,9 @@
/* Define to 1 if you have the `uhd' library (-luhd). */
#undef HAVE_LIBUHD
+/* Define to 1 if you have the `zmq' library (-lzmq). */
+#undef HAVE_LIBZMQ
+
/* Define to 1 if you have the <limits.h> header file. */
#undef HAVE_LIMITS_H
diff --git a/configure b/configure
index e2a834b..0f10f2c 100755
--- a/configure
+++ b/configure
@@ -632,6 +632,8 @@ EGREP
GREP
BOOST_LDFLAGS
BOOST_CPPFLAGS
+HAVE_INPUT_ZEROMQ_TEST_FALSE
+HAVE_INPUT_ZEROMQ_TEST_TRUE
CPP
am__fastdepCC_FALSE
am__fastdepCC_TRUE
@@ -743,6 +745,7 @@ enable_prof
with_debug_malloc
enable_trace
enable_fft_simd
+enable_input_zeromq
with_boost
with_boost_libdir
'
@@ -1387,6 +1390,7 @@ Optional Features:
--enable-prof Enable profiling
--enable-trace Enable trace output
--enable-fft-simd Enable SIMD instructions for kiss-fft (unstable)
+ --disable-input-zeromq Disable ZeroMQ input
Optional Packages:
--with-PACKAGE[=ARG] use PACKAGE [ARG=yes]
@@ -4769,6 +4773,69 @@ else
enable_fft_simd=no
fi
+# ZeroMQ message queue input
+# Check whether --enable-input_zeromq was given.
+if test "${enable_input_zeromq+set}" = set; then :
+ enableval=$enable_input_zeromq;
+else
+ enable_input_zeromq=yes
+fi
+
+
+
+if test "x$enable_input_zeromq" = "xyes"; then :
+
+$as_echo "#define HAVE_INPUT_ZEROMQ 1" >>confdefs.h
+
+else
+ { $as_echo "$as_me:${as_lineno-$LINENO}: checking for zmq_init in -lzmq" >&5
+$as_echo_n "checking for zmq_init in -lzmq... " >&6; }
+if ${ac_cv_lib_zmq_zmq_init+:} false; then :
+ $as_echo_n "(cached) " >&6
+else
+ ac_check_lib_save_LIBS=$LIBS
+LIBS="-lzmq $LIBS"
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h. */
+
+/* Override any GCC internal prototype to avoid an error.
+ Use char because int might match the return type of a GCC
+ builtin and then its argument prototype would still apply. */
+#ifdef __cplusplus
+extern "C"
+#endif
+char zmq_init ();
+int
+main ()
+{
+return zmq_init ();
+ ;
+ return 0;
+}
+_ACEOF
+if ac_fn_c_try_link "$LINENO"; then :
+ ac_cv_lib_zmq_zmq_init=yes
+else
+ ac_cv_lib_zmq_zmq_init=no
+fi
+rm -f core conftest.err conftest.$ac_objext \
+ conftest$ac_exeext conftest.$ac_ext
+LIBS=$ac_check_lib_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zmq_zmq_init" >&5
+$as_echo "$ac_cv_lib_zmq_zmq_init" >&6; }
+if test "x$ac_cv_lib_zmq_zmq_init" = xyes; then :
+ cat >>confdefs.h <<_ACEOF
+#define HAVE_LIBZMQ 1
+_ACEOF
+
+ LIBS="-lzmq $LIBS"
+
+else
+ as_fn_error $? "ZeroMQ libzmq is required" "$LINENO" 5
+fi
+
+fi
if test "x$enable_debug" = "xno"; then :
OPTIM="-O2" DEBUG="" EXTRA="$EXTRA -DNDEBUG"
else
@@ -4784,12 +4851,21 @@ if test "x$enable_fft_simd" != "xno"; then :
EXTRA="$EXTRA -DUSE_SIMD"
fi
+ if test "x$enable_input_zeromq" = "xyes"; then
+ HAVE_INPUT_ZEROMQ_TEST_TRUE=
+ HAVE_INPUT_ZEROMQ_TEST_FALSE='#'
+else
+ HAVE_INPUT_ZEROMQ_TEST_TRUE='#'
+ HAVE_INPUT_ZEROMQ_TEST_FALSE=
+fi
+
+
+
CFLAGS="$OPTIM $DEBUG $EXTRA"
CXXFLAGS="$OPTIM $DEBUG $EXTRA"
-
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for main in -luhd" >&5
$as_echo_n "checking for main in -luhd... " >&6; }
if ${ac_cv_lib_uhd_main+:} false; then :
@@ -6475,6 +6551,10 @@ if test -z "${am__fastdepCC_TRUE}" && test -z "${am__fastdepCC_FALSE}"; then
as_fn_error $? "conditional \"am__fastdepCC\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
fi
+if test -z "${HAVE_INPUT_ZEROMQ_TEST_TRUE}" && test -z "${HAVE_INPUT_ZEROMQ_TEST_FALSE}"; then
+ as_fn_error $? "conditional \"HAVE_INPUT_ZEROMQ_TEST\" was never defined.
+Usually this means the macro was only invoked conditionally." "$LINENO" 5
+fi
if test -z "${IS_HG_REPO_TRUE}" && test -z "${IS_HG_REPO_FALSE}"; then
as_fn_error $? "conditional \"IS_HG_REPO\" was never defined.
Usually this means the macro was only invoked conditionally." "$LINENO" 5
diff --git a/configure.ac b/configure.ac
index 4b41729..76845d0 100644
--- a/configure.ac
+++ b/configure.ac
@@ -61,6 +61,14 @@ AC_ARG_ENABLE([trace],
AC_ARG_ENABLE([fft_simd],
[AS_HELP_STRING([--enable-fft-simd], [Enable SIMD instructions for kiss-fft (unstable)])],
[], [enable_fft_simd=no])
+# ZeroMQ message queue input
+AC_ARG_ENABLE([input_zeromq],
+ [AS_HELP_STRING([--disable-input-zeromq], [Disable ZeroMQ input])],
+ [], [enable_input_zeromq=yes])
+
+AS_IF([test "x$enable_input_zeromq" = "xyes"],
+ [AC_DEFINE(HAVE_INPUT_ZEROMQ, [1], [Define if ZeroMQ input is enabled])],
+ [AC_CHECK_LIB(zmq, zmq_init, ,[AC_MSG_ERROR([ZeroMQ libzmq is required])])])
AS_IF([test "x$enable_debug" = "xno"],
[OPTIM="-O2" DEBUG="" EXTRA="$EXTRA -DNDEBUG"],
[OPTIM="-O0" DEBUG="-ggdb" EXTRA="$EXTRA -Wall"])
@@ -71,6 +79,9 @@ AS_IF([test "x$enable_trace" != "xno"],
AS_IF([test "x$enable_fft_simd" != "xno"],
[EXTRA="$EXTRA -DUSE_SIMD"])
+AM_CONDITIONAL([HAVE_INPUT_ZEROMQ_TEST], [test "x$enable_input_zeromq" = "xyes"])
+
+
AC_SUBST([CFLAGS], ["$OPTIM $DEBUG $EXTRA"])
AC_SUBST([CXXFLAGS], ["$OPTIM $DEBUG $EXTRA"])
diff --git a/doc/example.ini b/doc/example.ini
index f76b7ad..8f61b24 100644
--- a/doc/example.ini
+++ b/doc/example.ini
@@ -1,16 +1,28 @@
; Sample configuration file for CRC-DABMOD
[remotecontrol]
+; enable the telnet remote control on localhost:2121
+; Since this is totally unsecure telnet, the software
+; will only listen on the local loopback interface.
+; To get secure remote access, use SSH port forwarding
telnet=1
telnetport=2121
[log]
+; Write to a logfile or to syslog.
+; Setting filename to stderr is very useful during tests and development
syslog=0
filelog=1
filename=/dev/stderr
[input]
-filename=/dev/stdin
+; A file or fifo input is using transport=file
+transport=file
+source=/dev/stdin
+
+; When recieving data using ZeroMQ, the source is the URI to be used
+;transport=zeromq
+;source=tcp://localhost:8080
loop=1
[modulator]
diff --git a/src/DabMod.cpp b/src/DabMod.cpp
index 64e557c..1e8937d 100644
--- a/src/DabMod.cpp
+++ b/src/DabMod.cpp
@@ -153,8 +153,9 @@ int main(int argc, char* argv[])
int ret = 0;
bool loop = false;
std::string inputName = "";
+ std::string inputTransport = "file";
- const char* outputName;
+ std::string outputName;
int useFileOutput = 0;
int useUHDOutput = 0;
@@ -190,6 +191,10 @@ int main(int argc, char* argv[])
Logger logger;
InputFileReader inputFileReader(logger);
+#if defined(HAVE_INPUT_ZEROMQ)
+ InputZeroMQReader inputZeroMQReader(logger);
+#endif
+ InputReader* inputReader;
signal(SIGINT, signalHandler);
@@ -206,7 +211,7 @@ int main(int argc, char* argv[])
if (c != 'C') {
use_configuration_cmdline = true;
}
-
+
switch (c) {
case 'C':
use_configuration_file = true;
@@ -335,7 +340,8 @@ int main(int argc, char* argv[])
loop = true;
}
- inputName = pt.get("input.filename", "/dev/stdin");
+ inputTransport = pt.get("input.transport", "file");
+ inputName = pt.get("input.source", "/dev/stdin");
// log parameters:
if (pt.get("log.syslog", 0) == 1) {
@@ -391,7 +397,7 @@ int main(int argc, char* argv[])
if (output_selected == "file") {
try {
- outputName = pt.get<std::string>("fileoutput.filename").c_str();
+ outputName = pt.get<std::string>("fileoutput.filename");
}
catch (std::exception &e) {
std::cerr << "Error: " << e.what() << "\n";
@@ -477,15 +483,22 @@ int main(int argc, char* argv[])
modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY;
}
-
// Setting ETI input filename
if (inputName == "") {
if (optind < argc) {
inputName = argv[optind++];
- } else {
+
+ if (inputName.substr(0, 4) == "zmq+" &&
+ inputName.find("://") != std::string::npos) {
+ // if the name starts with zmq+XYZ://somewhere:port
+ inputTransport = "zeromq";
+ }
+ }
+ else {
inputName = "/dev/stdin";
}
}
+
// Checking unused arguments
if (optind != argc) {
fprintf(stderr, "Invalid arguments:");
@@ -507,13 +520,14 @@ int main(int argc, char* argv[])
// Print settings
fprintf(stderr, "Input\n");
- fprintf(stderr, " Name: %s\n", inputName.c_str());
+ fprintf(stderr, " Type: %s\n", inputTransport.c_str());
+ fprintf(stderr, " Source: %s\n", inputName.c_str());
fprintf(stderr, "Output\n");
if (useUHDOutput) {
fprintf(stderr, " UHD, Device: %s\n", outputuhd_conf.device);
}
else if (useFileOutput) {
- fprintf(stderr, " Name: %s\n", outputName);
+ fprintf(stderr, " Name: %s\n", outputName.c_str());
}
fprintf(stderr, " Sampling rate: ");
if (outputRate > 1000) {
@@ -526,20 +540,39 @@ int main(int argc, char* argv[])
fprintf(stderr, "%zu Hz\n", outputRate);
}
- // Opening ETI input file
- if (inputFileReader.Open(inputName) == -1) {
- fprintf(stderr, "Unable to open input file!\n");
- logger.level(error) << "Unable to open input file!";
+ if (inputTransport == "file") {
+ // Opening ETI input file
+ if (inputFileReader.Open(inputName, loop) == -1) {
+ fprintf(stderr, "Unable to open input file!\n");
+ logger.level(error) << "Unable to open input file!";
+ ret = -1;
+ goto END_MAIN;
+ }
+
+ inputReader = &inputFileReader;
+ }
+ else if (inputTransport == "zeromq") {
+#if !defined(HAVE_INPUT_ZEROMQ)
+ fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n");
ret = -1;
goto END_MAIN;
+#else
+ inputZeroMQReader.Open(inputName);
+ inputReader = &inputZeroMQReader;
+#endif
}
+ else
+ {
+ fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str());
+ ret = -1;
+ goto END_MAIN;
+ }
+
if (useFileOutput) {
// Opening COFDM output file
- if (outputName != NULL) {
- fprintf(stderr, "Using file output\n");
- output = new OutputFile(outputName);
- }
+ fprintf(stderr, "Using file output '%s'\n", outputName.c_str());
+ output = new OutputFile(outputName);
}
else if (useUHDOutput) {
fprintf(stderr, "Using UHD output\n");
@@ -553,7 +586,6 @@ int main(int argc, char* argv[])
logger.level(error) << "UHD initialisation failed:" << e.what();
goto END_MAIN;
}
-
}
flowgraph = new Flowgraph();
@@ -568,7 +600,7 @@ int main(int argc, char* argv[])
((OutputUHD*)output)->setETIReader(modulator->getEtiReader());
}
- inputFileReader.PrintInfo();
+ inputReader->PrintInfo();
try {
while (running) {
@@ -578,7 +610,7 @@ int main(int argc, char* argv[])
PDEBUG("*****************************************\n");
PDEBUG("* Starting main loop\n");
PDEBUG("*****************************************\n");
- while ((framesize = inputFileReader.GetNextFrame(data.getData())) > 0) {
+ while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) {
if (!running) {
break;
}
@@ -588,7 +620,6 @@ int main(int argc, char* argv[])
PDEBUG("*****************************************\n");
PDEBUG("* Read frame %lu\n", frame);
PDEBUG("*****************************************\n");
- fprintf(stderr, "Reading frame %lu\n", frame);
////////////////////////////////////////////////////////////////
// Proccessing data
@@ -597,17 +628,11 @@ int main(int argc, char* argv[])
}
if (framesize == 0) {
fprintf(stderr, "End of file reached.\n");
- if (!loop) {
- running = false;
- } else {
- fprintf(stderr, "Rewinding file.\n");
- inputFileReader.Rewind();
- }
}
else {
fprintf(stderr, "Input read error.\n");
- running = false;
}
+ running = false;
}
} catch (std::exception& e) {
fprintf(stderr, "EXCEPTION: %s\n", e.what());
diff --git a/src/FIRFilter.h b/src/FIRFilter.h
index b9abb3e..8acd444 100644
--- a/src/FIRFilter.h
+++ b/src/FIRFilter.h
@@ -30,11 +30,12 @@
#endif
#include <boost/thread.hpp>
-#include <queue>
+#include "ThreadsafeQueue.h"
#include "RemoteControl.h"
#include "ModCodec.h"
#include "PcDebug.h"
+#include "ThreadsafeQueue.h"
#include <sys/types.h>
#include <complex>
@@ -47,54 +48,6 @@
typedef std::complex<float> complexf;
-template<typename T>
-class ThreadsafeQueue
-{
-private:
- std::queue<T> the_queue;
- mutable boost::mutex the_mutex;
- boost::condition_variable the_condition_variable;
-public:
- void push(T const& val)
- {
- boost::mutex::scoped_lock lock(the_mutex);
- the_queue.push(val);
- lock.unlock();
- the_condition_variable.notify_one();
- }
-
- bool empty() const
- {
- boost::mutex::scoped_lock lock(the_mutex);
- return the_queue.empty();
- }
-
- bool try_pop(T& popped_value)
- {
- boost::mutex::scoped_lock lock(the_mutex);
- if(the_queue.empty())
- {
- return false;
- }
-
- popped_value = the_queue.front();
- the_queue.pop();
- return true;
- }
-
- void wait_and_pop(T& popped_value)
- {
- boost::mutex::scoped_lock lock(the_mutex);
- while(the_queue.empty())
- {
- the_condition_variable.wait(lock);
- }
-
- popped_value = the_queue.front();
- the_queue.pop();
- }
-};
-
struct FIRFilterWorkerData {
/* Thread-safe queues to give data to and get data from
* the worker
diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp
index 1be1ad7..2514af8 100644
--- a/src/InputFileReader.cpp
+++ b/src/InputFileReader.cpp
@@ -43,9 +43,10 @@
#include "InputReader.h"
#include "PcDebug.h"
-int InputFileReader::Open(std::string filename)
+int InputFileReader::Open(std::string filename, bool loop)
{
filename_ = filename;
+ loop_ = loop;
inputfile_ = fopen(filename_.c_str(), "r");
if (inputfile_ == NULL) {
fprintf(stderr, "Unable to open input file!\n");
@@ -59,7 +60,7 @@ int InputFileReader::Open(std::string filename)
int InputFileReader::Rewind()
{
- rewind(inputfile_);
+ rewind(inputfile_); // Also clears the EOF flag
return IdentifyType();
}
@@ -213,7 +214,7 @@ void InputFileReader::PrintInfo()
fprintf(stderr, "framed");
break;
default:
- fprintf(stderr, "unkown!");
+ fprintf(stderr, "unknown!");
break;
}
fprintf(stderr, "\n");
@@ -235,9 +236,24 @@ int InputFileReader::GetNextFrame(void* buffer)
}
else {
if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
- PDEBUG("End of file!\n");
- logger_.level(error) << "Reached end of file!";
- return 0;
+ logger_.level(error) << "Reached end of file.";
+ if (loop_) {
+ if (Rewind() == 0) {
+ if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
+ PDEBUG("Error after rewinding file!\n");
+ logger_.level(error) << "Error after rewinding file!";
+ return -1;
+ }
+ }
+ else {
+ PDEBUG("Impossible to rewind file!\n");
+ logger_.level(error) << "Impossible to rewind file!";
+ return -1;
+ }
+ }
+ else {
+ return 0;
+ }
}
}
if (frameSize > 6144) { // there might be a better limit
@@ -247,8 +263,25 @@ int InputFileReader::GetNextFrame(void* buffer)
}
PDEBUG("Frame size: %u\n", frameSize);
+ size_t read_bytes = fread(buffer, 1, frameSize, inputfile_);
+ if ( loop_ &&
+ streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144
+ read_bytes == 0 && feof(inputfile_)) {
+ // in case of an EOF from a RAW that we loop, rewind
+ // otherwise, we won't tolerate it
+
+ if (Rewind() == 0) {
+ read_bytes = fread(buffer, 1, frameSize, inputfile_);
+ }
+ else {
+ PDEBUG("Impossible to rewind file!\n");
+ logger_.level(error) << "Impossible to rewind file!";
+ return -1;
+ }
+ }
+
- if (fread(buffer, frameSize, 1, inputfile_) != 1) {
+ if (read_bytes != frameSize) {
// A short read of a frame (i.e. reading an incomplete frame)
// is not tolerated. Input files must not contain incomplete frames
fprintf(stderr,
diff --git a/src/InputReader.h b/src/InputReader.h
index dbc7c11..8917922 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -31,6 +31,10 @@
#endif
#include <cstdio>
+#if defined(HAVE_INPUT_ZEROMQ)
+# include <zmq.hpp>
+# include "ThreadsafeQueue.h"
+#endif
#include "porting.h"
#include "Log.h"
@@ -69,8 +73,13 @@ enum EtiStreamType {
class InputReader
{
public:
- // Save the next frame into the buffer, and return the number of bytes read.
+ // Put next frame into buffer. This function will never write more than
+ // 6144 bytes into buffer.
+ // returns number of bytes written to buffer, 0 on eof, -1 on error
virtual int GetNextFrame(void* buffer) = 0;
+
+ // Print some information
+ virtual void PrintInfo() = 0;
};
class InputFileReader : public InputReader
@@ -90,18 +99,12 @@ class InputFileReader : public InputReader
}
// open file and determine stream type
- int Open(std::string filename);
+ // When loop=1, GetNextFrame will never return 0
+ int Open(std::string filename, bool loop);
// Print information about the file opened
void PrintInfo();
- // Rewind the file, and replay anew
- // returns 0 on success, -1 on failure
- int Rewind();
-
- // Put next frame into buffer. This function will never write more than
- // 6144 bytes into buffer.
- // returns number of bytes written to buffer, 0 on eof, -1 on error
int GetNextFrame(void* buffer);
EtiStreamType GetStreamType()
@@ -112,6 +115,11 @@ class InputFileReader : public InputReader
private:
int IdentifyType();
+ // Rewind the file, and replay anew
+ // returns 0 on success, -1 on failure
+ int Rewind();
+
+ bool loop_; // if shall we loop the file over and over
std::string filename_;
EtiStreamType streamtype_;
FILE* inputfile_;
@@ -122,4 +130,54 @@ class InputFileReader : public InputReader
// after 2**32 * 24ms ~= 3.3 years
};
+#if defined(HAVE_INPUT_ZEROMQ)
+/* A ZeroMQ input. See www.zeromq.org for more info */
+
+struct InputZeroMQThreadData
+{
+ ThreadsafeQueue<zmq::message_t*> *in_messages;
+ std::string uri;
+};
+
+class InputZeroMQWorker
+{
+ public:
+ InputZeroMQWorker() :
+ zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {}
+
+ void Start(struct InputZeroMQThreadData* workerdata);
+ void Stop();
+ private:
+ void RecvProcess(struct InputZeroMQThreadData* workerdata);
+ bool running;
+ zmq::context_t zmqcontext;
+ zmq::socket_t subscriber;
+ boost::thread recv_thread;
+};
+
+class InputZeroMQReader : public InputReader
+{
+ public:
+ InputZeroMQReader(Logger logger) :
+ logger_(logger), in_messages_(10)
+ {
+ workerdata_.in_messages = &in_messages_;
+ }
+
+ int Open(std::string uri);
+
+ int GetNextFrame(void* buffer);
+
+ void PrintInfo();
+
+ private:
+ Logger logger_;
+ std::string uri_;
+
+ InputZeroMQWorker worker_;
+ ThreadsafeQueue<zmq::message_t*> in_messages_;
+ struct InputZeroMQThreadData workerdata_;
+};
+
+#endif
#endif
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
new file mode 100644
index 0000000..e689e4c
--- /dev/null
+++ b/src/InputZeroMQReader.cpp
@@ -0,0 +1,135 @@
+/*
+ Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Copyrigth (C) 2013
+ Matthias P. Braendli, matthias.braendli@mpb.li
+ */
+/*
+ This file is part of CRC-DADMOD.
+
+ CRC-DADMOD is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ CRC-DADMOD is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with CRC-DADMOD. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#if defined(HAVE_INPUT_ZEROMQ)
+
+#include <string>
+#include <cstring>
+#include <cstdio>
+#include <stdint.h>
+#include <zmq.hpp>
+#include <boost/thread/thread.hpp>
+#include "porting.h"
+#include "InputReader.h"
+#include "PcDebug.h"
+
+#define MAX_QUEUE_SIZE 50
+
+int InputZeroMQReader::Open(std::string uri)
+{
+ uri_ = uri;
+ workerdata_.uri = uri;
+ // launch receiver thread
+ worker_.Start(&workerdata_);
+
+ return 0;
+}
+
+int InputZeroMQReader::GetNextFrame(void* buffer)
+{
+ zmq::message_t* incoming;
+ in_messages_.wait_and_pop(incoming);
+
+ size_t framesize = incoming->size();
+
+ // guarantee that we never will write more than 6144 bytes
+ if (framesize > 6144) {
+ fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize);
+ logger_.level(error) << "ZeroMQ message too large" << framesize;
+ return -1;
+ }
+
+ memcpy(buffer, incoming->data(), framesize);
+
+ delete incoming;
+
+ // pad to 6144 bytes
+ memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize);
+
+
+ return 6144;
+}
+
+void InputZeroMQReader::PrintInfo()
+{
+ fprintf(stderr, "Input ZeroMQ:\n");
+ fprintf(stderr, " Receiving from %s\n\n", uri_.c_str());
+}
+
+// ------------- Worker functions
+
+void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata)
+{
+ size_t queue_size = 0;
+
+ try {
+ subscriber.connect(workerdata->uri.c_str());
+
+ subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ while (running)
+ {
+ zmq::message_t incoming;
+ subscriber.recv(&incoming);
+
+ if (queue_size < MAX_QUEUE_SIZE) {
+ zmq::message_t* holder = new zmq::message_t();
+ holder->move(&incoming); // move the message into the holder
+ queue_size = workerdata->in_messages->push(holder);
+ }
+ else
+ {
+ workerdata->in_messages->notify();
+ fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size);
+ }
+
+ if (queue_size < 5) {
+ fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size);
+ }
+ }
+ }
+ catch ( zmq::error_t err ) {
+ printf("ZeroMQ error in RecvProcess: '%s'\n", err.what());
+ }
+}
+
+void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata)
+{
+ running = true;
+ recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata);
+}
+
+void InputZeroMQWorker::Stop()
+{
+ subscriber.close();
+ running = false;
+}
+
+#endif
+
diff --git a/src/Makefile.am b/src/Makefile.am
index 0bb9dc9..fa4b5bd 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -22,6 +22,12 @@ else
HGVERSION_FLAGS = -DHGVERSION="\"-modified\""
endif
+if HAVE_INPUT_ZEROMQ_TEST
+ZMQ_LIBS =-lzmq
+else
+ZMQ_LIBS =
+endif
+
FFT_DIR=$(top_builddir)/lib/kiss_fft129
FFT_INC=-I$(FFT_DIR) -I$(FFT_DIR)/tools
FFT_SRC=$(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h
@@ -41,6 +47,7 @@ $(FFT_DIR):
fi
crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS)
+crc_dabmod_LDADD = $(ZMQ_LIBS)
crc_dabmod_SOURCES = DabMod.cpp \
PcDebug.h \
porting.c porting.h \
@@ -64,7 +71,7 @@ crc_dabmod_SOURCES = DabMod.cpp \
OutputUHD.cpp OutputUHD.h \
ModOutput.cpp ModOutput.h \
InputMemory.cpp InputMemory.h \
- InputFileReader.cpp InputReader.h \
+ InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \
OutputFile.cpp OutputFile.h \
FrameMultiplexer.cpp FrameMultiplexer.h \
ModMux.cpp ModMux.h \
diff --git a/src/Makefile.in b/src/Makefile.in
index 0a1e7ad..0817c90 100644
--- a/src/Makefile.in
+++ b/src/Makefile.in
@@ -130,6 +130,7 @@ am_crc_dabmod_OBJECTS = crc_dabmod-DabMod.$(OBJEXT) \
crc_dabmod-OutputUHD.$(OBJEXT) crc_dabmod-ModOutput.$(OBJEXT) \
crc_dabmod-InputMemory.$(OBJEXT) \
crc_dabmod-InputFileReader.$(OBJEXT) \
+ crc_dabmod-InputZeroMQReader.$(OBJEXT) \
crc_dabmod-OutputFile.$(OBJEXT) \
crc_dabmod-FrameMultiplexer.$(OBJEXT) \
crc_dabmod-ModMux.$(OBJEXT) crc_dabmod-PrbsGenerator.$(OBJEXT) \
@@ -153,7 +154,8 @@ am__objects_1 = crc_dabmod-kiss_fft.$(OBJEXT) \
nodist_crc_dabmod_OBJECTS = $(am__objects_1)
crc_dabmod_OBJECTS = $(am_crc_dabmod_OBJECTS) \
$(nodist_crc_dabmod_OBJECTS)
-crc_dabmod_LDADD = $(LDADD)
+am__DEPENDENCIES_1 =
+crc_dabmod_DEPENDENCIES = $(am__DEPENDENCIES_1)
am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;
am__vpath_adj = case $$p in \
$(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \
@@ -361,11 +363,14 @@ top_builddir = @top_builddir@
top_srcdir = @top_srcdir@
@IS_HG_REPO_FALSE@HGVERSION_FLAGS = -DHGVERSION="\"-modified\""
@IS_HG_REPO_TRUE@HGVERSION_FLAGS = -DHGVERSION="\"`hg parents --template '-{node|short}'`\""
+@HAVE_INPUT_ZEROMQ_TEST_FALSE@ZMQ_LIBS =
+@HAVE_INPUT_ZEROMQ_TEST_TRUE@ZMQ_LIBS = -lzmq
FFT_DIR = $(top_builddir)/lib/kiss_fft129
FFT_INC = -I$(FFT_DIR) -I$(FFT_DIR)/tools
FFT_SRC = $(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h
FFT_FLG = -ffast-math
crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS)
+crc_dabmod_LDADD = $(ZMQ_LIBS)
crc_dabmod_SOURCES = DabMod.cpp \
PcDebug.h \
porting.c porting.h \
@@ -389,7 +394,7 @@ crc_dabmod_SOURCES = DabMod.cpp \
OutputUHD.cpp OutputUHD.h \
ModOutput.cpp ModOutput.h \
InputMemory.cpp InputMemory.h \
- InputFileReader.cpp InputReader.h \
+ InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \
OutputFile.cpp OutputFile.h \
FrameMultiplexer.cpp FrameMultiplexer.h \
ModMux.cpp ModMux.h \
@@ -553,6 +558,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-GuardIntervalInserter.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputFileReader.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputMemory.Po@am__quote@
+@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputZeroMQReader.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-Log.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModCodec.Po@am__quote@
@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModFormat.Po@am__quote@
@@ -973,6 +979,20 @@ crc_dabmod-InputFileReader.obj: InputFileReader.cpp
@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
@am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputFileReader.obj `if test -f 'InputFileReader.cpp'; then $(CYGPATH_W) 'InputFileReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputFileReader.cpp'; fi`
+crc_dabmod-InputZeroMQReader.o: InputZeroMQReader.cpp
+@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.o -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp
+@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ $(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.o' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp
+
+crc_dabmod-InputZeroMQReader.obj: InputZeroMQReader.cpp
+@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.obj -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi`
+@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ $(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.obj' libtool=no @AMDEPBACKSLASH@
+@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@
+@am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi`
+
crc_dabmod-OutputFile.o: OutputFile.cpp
@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-OutputFile.o -MD -MP -MF $(DEPDIR)/crc_dabmod-OutputFile.Tpo -c -o crc_dabmod-OutputFile.o `test -f 'OutputFile.cpp' || echo '$(srcdir)/'`OutputFile.cpp
@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-OutputFile.Tpo $(DEPDIR)/crc_dabmod-OutputFile.Po
diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp
index c411c85..d0d87c8 100644
--- a/src/OutputFile.cpp
+++ b/src/OutputFile.cpp
@@ -22,20 +22,21 @@
#include "OutputFile.h"
#include "PcDebug.h"
+#include <string>
#include <assert.h>
#include <stdexcept>
-OutputFile::OutputFile(const char* filename) :
+OutputFile::OutputFile(std::string filename) :
ModOutput(ModFormat(1), ModFormat(0)),
myFilename(filename)
{
PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n",
- filename, this);
+ filename.c_str(), this);
- myFile = fopen(filename, "w");
+ myFile = fopen(filename.c_str(), "w");
if (myFile == NULL) {
- perror(filename);
+ perror(filename.c_str());
throw std::runtime_error(
"OutputFile::OutputFile() unable to open file!");
}
diff --git a/src/OutputFile.h b/src/OutputFile.h
index 1223aef..e4cd91f 100644
--- a/src/OutputFile.h
+++ b/src/OutputFile.h
@@ -29,6 +29,7 @@
#include "ModOutput.h"
+#include <string>
#include <stdio.h>
#include <sys/types.h>
@@ -36,14 +37,14 @@
class OutputFile : public ModOutput
{
public:
- OutputFile(const char* filename);
+ OutputFile(std::string filename);
virtual ~OutputFile();
virtual int process(Buffer* dataIn, Buffer* dataOut);
const char* name() { return "OutputFile"; }
protected:
- const char* myFilename;
+ std::string myFilename;
FILE* myFile;
};
diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h
new file mode 100644
index 0000000..d89bf74
--- /dev/null
+++ b/src/ThreadsafeQueue.h
@@ -0,0 +1,100 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
+ Right of Canada (Communications Research Center Canada)
+
+ Copyright (C) 2013
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ An implementation for a threadsafe queue using boost thread library
+
+ When creating a ThreadsafeQueue, one can specify the minimal number
+ of elements it must contain before it is possible to take one
+ element out.
+ */
+/*
+ This file is part of CRC-DADMOD.
+
+ CRC-DADMOD is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ CRC-DADMOD is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with CRC-DADMOD. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef THREADSAFE_QUEUE_H
+#define THREADSAFE_QUEUE_H
+
+#include <boost/thread.hpp>
+#include <queue>
+
+template<typename T>
+class ThreadsafeQueue
+{
+private:
+ std::queue<T> the_queue;
+ mutable boost::mutex the_mutex;
+ boost::condition_variable the_condition_variable;
+ size_t the_required_size;
+public:
+
+ ThreadsafeQueue() : the_required_size(1) {}
+
+ ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {}
+
+ size_t push(T const& val)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ the_queue.push(val);
+ size_t queue_size = the_queue.size();
+ lock.unlock();
+ the_condition_variable.notify_one();
+
+ return queue_size;
+ }
+
+ void notify()
+ {
+ the_condition_variable.notify_one();
+ }
+
+ bool empty() const
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ return the_queue.empty();
+ }
+
+ bool try_pop(T& popped_value)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ if(the_queue.size() < the_required_size)
+ {
+ return false;
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+ return true;
+ }
+
+ void wait_and_pop(T& popped_value)
+ {
+ boost::mutex::scoped_lock lock(the_mutex);
+ while(the_queue.size() < the_required_size)
+ {
+ the_condition_variable.wait(lock);
+ }
+
+ popped_value = the_queue.front();
+ the_queue.pop();
+ }
+};
+
+#endif
+