diff options
author | Matthias P. Braendli (think) <matthias@mpb.li> | 2013-11-10 21:50:12 +0100 |
---|---|---|
committer | Matthias P. Braendli (think) <matthias@mpb.li> | 2013-11-10 21:50:12 +0100 |
commit | 5d965e80be2e6ab62bc82fb2e0d4d472153ad241 (patch) | |
tree | 5add36f337b0de524b3d098f0b1fcc8d68aba0d7 | |
parent | 4f9a01a80570437b86e69eb0542b13df9a20743d (diff) | |
download | dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.gz dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.bz2 dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.zip |
crc-dabmod: add ZeroMQ input module
-rw-r--r-- | config.h.in | 6 | ||||
-rwxr-xr-x | configure | 82 | ||||
-rw-r--r-- | configure.ac | 11 | ||||
-rw-r--r-- | doc/example.ini | 14 | ||||
-rw-r--r-- | src/DabMod.cpp | 79 | ||||
-rw-r--r-- | src/FIRFilter.h | 51 | ||||
-rw-r--r-- | src/InputFileReader.cpp | 47 | ||||
-rw-r--r-- | src/InputReader.h | 76 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 135 | ||||
-rw-r--r-- | src/Makefile.am | 9 | ||||
-rw-r--r-- | src/Makefile.in | 24 | ||||
-rw-r--r-- | src/OutputFile.cpp | 9 | ||||
-rw-r--r-- | src/OutputFile.h | 5 | ||||
-rw-r--r-- | src/ThreadsafeQueue.h | 100 |
14 files changed, 545 insertions, 103 deletions
diff --git a/config.h.in b/config.h.in index 7971a35..89c993d 100644 --- a/config.h.in +++ b/config.h.in @@ -22,6 +22,9 @@ /* Define to 1 if you have the `gettimeofday' function. */ #undef HAVE_GETTIMEOFDAY +/* Define if ZeroMQ input is enabled */ +#undef HAVE_INPUT_ZEROMQ + /* Define to 1 if you have the <inttypes.h> header file. */ #undef HAVE_INTTYPES_H @@ -43,6 +46,9 @@ /* Define to 1 if you have the `uhd' library (-luhd). */ #undef HAVE_LIBUHD +/* Define to 1 if you have the `zmq' library (-lzmq). */ +#undef HAVE_LIBZMQ + /* Define to 1 if you have the <limits.h> header file. */ #undef HAVE_LIMITS_H @@ -632,6 +632,8 @@ EGREP GREP BOOST_LDFLAGS BOOST_CPPFLAGS +HAVE_INPUT_ZEROMQ_TEST_FALSE +HAVE_INPUT_ZEROMQ_TEST_TRUE CPP am__fastdepCC_FALSE am__fastdepCC_TRUE @@ -743,6 +745,7 @@ enable_prof with_debug_malloc enable_trace enable_fft_simd +enable_input_zeromq with_boost with_boost_libdir ' @@ -1387,6 +1390,7 @@ Optional Features: --enable-prof Enable profiling --enable-trace Enable trace output --enable-fft-simd Enable SIMD instructions for kiss-fft (unstable) + --disable-input-zeromq Disable ZeroMQ input Optional Packages: --with-PACKAGE[=ARG] use PACKAGE [ARG=yes] @@ -4769,6 +4773,69 @@ else enable_fft_simd=no fi +# ZeroMQ message queue input +# Check whether --enable-input_zeromq was given. +if test "${enable_input_zeromq+set}" = set; then : + enableval=$enable_input_zeromq; +else + enable_input_zeromq=yes +fi + + + +if test "x$enable_input_zeromq" = "xyes"; then : + +$as_echo "#define HAVE_INPUT_ZEROMQ 1" >>confdefs.h + +else + { $as_echo "$as_me:${as_lineno-$LINENO}: checking for zmq_init in -lzmq" >&5 +$as_echo_n "checking for zmq_init in -lzmq... " >&6; } +if ${ac_cv_lib_zmq_zmq_init+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lzmq $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char zmq_init (); +int +main () +{ +return zmq_init (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_zmq_zmq_init=yes +else + ac_cv_lib_zmq_zmq_init=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_zmq_zmq_init" >&5 +$as_echo "$ac_cv_lib_zmq_zmq_init" >&6; } +if test "x$ac_cv_lib_zmq_zmq_init" = xyes; then : + cat >>confdefs.h <<_ACEOF +#define HAVE_LIBZMQ 1 +_ACEOF + + LIBS="-lzmq $LIBS" + +else + as_fn_error $? "ZeroMQ libzmq is required" "$LINENO" 5 +fi + +fi if test "x$enable_debug" = "xno"; then : OPTIM="-O2" DEBUG="" EXTRA="$EXTRA -DNDEBUG" else @@ -4784,12 +4851,21 @@ if test "x$enable_fft_simd" != "xno"; then : EXTRA="$EXTRA -DUSE_SIMD" fi + if test "x$enable_input_zeromq" = "xyes"; then + HAVE_INPUT_ZEROMQ_TEST_TRUE= + HAVE_INPUT_ZEROMQ_TEST_FALSE='#' +else + HAVE_INPUT_ZEROMQ_TEST_TRUE='#' + HAVE_INPUT_ZEROMQ_TEST_FALSE= +fi + + + CFLAGS="$OPTIM $DEBUG $EXTRA" CXXFLAGS="$OPTIM $DEBUG $EXTRA" - { $as_echo "$as_me:${as_lineno-$LINENO}: checking for main in -luhd" >&5 $as_echo_n "checking for main in -luhd... " >&6; } if ${ac_cv_lib_uhd_main+:} false; then : @@ -6475,6 +6551,10 @@ if test -z "${am__fastdepCC_TRUE}" && test -z "${am__fastdepCC_FALSE}"; then as_fn_error $? "conditional \"am__fastdepCC\" was never defined. Usually this means the macro was only invoked conditionally." "$LINENO" 5 fi +if test -z "${HAVE_INPUT_ZEROMQ_TEST_TRUE}" && test -z "${HAVE_INPUT_ZEROMQ_TEST_FALSE}"; then + as_fn_error $? "conditional \"HAVE_INPUT_ZEROMQ_TEST\" was never defined. +Usually this means the macro was only invoked conditionally." "$LINENO" 5 +fi if test -z "${IS_HG_REPO_TRUE}" && test -z "${IS_HG_REPO_FALSE}"; then as_fn_error $? "conditional \"IS_HG_REPO\" was never defined. Usually this means the macro was only invoked conditionally." "$LINENO" 5 diff --git a/configure.ac b/configure.ac index 4b41729..76845d0 100644 --- a/configure.ac +++ b/configure.ac @@ -61,6 +61,14 @@ AC_ARG_ENABLE([trace], AC_ARG_ENABLE([fft_simd], [AS_HELP_STRING([--enable-fft-simd], [Enable SIMD instructions for kiss-fft (unstable)])], [], [enable_fft_simd=no]) +# ZeroMQ message queue input +AC_ARG_ENABLE([input_zeromq], + [AS_HELP_STRING([--disable-input-zeromq], [Disable ZeroMQ input])], + [], [enable_input_zeromq=yes]) + +AS_IF([test "x$enable_input_zeromq" = "xyes"], + [AC_DEFINE(HAVE_INPUT_ZEROMQ, [1], [Define if ZeroMQ input is enabled])], + [AC_CHECK_LIB(zmq, zmq_init, ,[AC_MSG_ERROR([ZeroMQ libzmq is required])])]) AS_IF([test "x$enable_debug" = "xno"], [OPTIM="-O2" DEBUG="" EXTRA="$EXTRA -DNDEBUG"], [OPTIM="-O0" DEBUG="-ggdb" EXTRA="$EXTRA -Wall"]) @@ -71,6 +79,9 @@ AS_IF([test "x$enable_trace" != "xno"], AS_IF([test "x$enable_fft_simd" != "xno"], [EXTRA="$EXTRA -DUSE_SIMD"]) +AM_CONDITIONAL([HAVE_INPUT_ZEROMQ_TEST], [test "x$enable_input_zeromq" = "xyes"]) + + AC_SUBST([CFLAGS], ["$OPTIM $DEBUG $EXTRA"]) AC_SUBST([CXXFLAGS], ["$OPTIM $DEBUG $EXTRA"]) diff --git a/doc/example.ini b/doc/example.ini index f76b7ad..8f61b24 100644 --- a/doc/example.ini +++ b/doc/example.ini @@ -1,16 +1,28 @@ ; Sample configuration file for CRC-DABMOD [remotecontrol] +; enable the telnet remote control on localhost:2121 +; Since this is totally unsecure telnet, the software +; will only listen on the local loopback interface. +; To get secure remote access, use SSH port forwarding telnet=1 telnetport=2121 [log] +; Write to a logfile or to syslog. +; Setting filename to stderr is very useful during tests and development syslog=0 filelog=1 filename=/dev/stderr [input] -filename=/dev/stdin +; A file or fifo input is using transport=file +transport=file +source=/dev/stdin + +; When recieving data using ZeroMQ, the source is the URI to be used +;transport=zeromq +;source=tcp://localhost:8080 loop=1 [modulator] diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 64e557c..1e8937d 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -153,8 +153,9 @@ int main(int argc, char* argv[]) int ret = 0; bool loop = false; std::string inputName = ""; + std::string inputTransport = "file"; - const char* outputName; + std::string outputName; int useFileOutput = 0; int useUHDOutput = 0; @@ -190,6 +191,10 @@ int main(int argc, char* argv[]) Logger logger; InputFileReader inputFileReader(logger); +#if defined(HAVE_INPUT_ZEROMQ) + InputZeroMQReader inputZeroMQReader(logger); +#endif + InputReader* inputReader; signal(SIGINT, signalHandler); @@ -206,7 +211,7 @@ int main(int argc, char* argv[]) if (c != 'C') { use_configuration_cmdline = true; } - + switch (c) { case 'C': use_configuration_file = true; @@ -335,7 +340,8 @@ int main(int argc, char* argv[]) loop = true; } - inputName = pt.get("input.filename", "/dev/stdin"); + inputTransport = pt.get("input.transport", "file"); + inputName = pt.get("input.source", "/dev/stdin"); // log parameters: if (pt.get("log.syslog", 0) == 1) { @@ -391,7 +397,7 @@ int main(int argc, char* argv[]) if (output_selected == "file") { try { - outputName = pt.get<std::string>("fileoutput.filename").c_str(); + outputName = pt.get<std::string>("fileoutput.filename"); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; @@ -477,15 +483,22 @@ int main(int argc, char* argv[]) modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY; } - // Setting ETI input filename if (inputName == "") { if (optind < argc) { inputName = argv[optind++]; - } else { + + if (inputName.substr(0, 4) == "zmq+" && + inputName.find("://") != std::string::npos) { + // if the name starts with zmq+XYZ://somewhere:port + inputTransport = "zeromq"; + } + } + else { inputName = "/dev/stdin"; } } + // Checking unused arguments if (optind != argc) { fprintf(stderr, "Invalid arguments:"); @@ -507,13 +520,14 @@ int main(int argc, char* argv[]) // Print settings fprintf(stderr, "Input\n"); - fprintf(stderr, " Name: %s\n", inputName.c_str()); + fprintf(stderr, " Type: %s\n", inputTransport.c_str()); + fprintf(stderr, " Source: %s\n", inputName.c_str()); fprintf(stderr, "Output\n"); if (useUHDOutput) { fprintf(stderr, " UHD, Device: %s\n", outputuhd_conf.device); } else if (useFileOutput) { - fprintf(stderr, " Name: %s\n", outputName); + fprintf(stderr, " Name: %s\n", outputName.c_str()); } fprintf(stderr, " Sampling rate: "); if (outputRate > 1000) { @@ -526,20 +540,39 @@ int main(int argc, char* argv[]) fprintf(stderr, "%zu Hz\n", outputRate); } - // Opening ETI input file - if (inputFileReader.Open(inputName) == -1) { - fprintf(stderr, "Unable to open input file!\n"); - logger.level(error) << "Unable to open input file!"; + if (inputTransport == "file") { + // Opening ETI input file + if (inputFileReader.Open(inputName, loop) == -1) { + fprintf(stderr, "Unable to open input file!\n"); + logger.level(error) << "Unable to open input file!"; + ret = -1; + goto END_MAIN; + } + + inputReader = &inputFileReader; + } + else if (inputTransport == "zeromq") { +#if !defined(HAVE_INPUT_ZEROMQ) + fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n"); ret = -1; goto END_MAIN; +#else + inputZeroMQReader.Open(inputName); + inputReader = &inputZeroMQReader; +#endif } + else + { + fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str()); + ret = -1; + goto END_MAIN; + } + if (useFileOutput) { // Opening COFDM output file - if (outputName != NULL) { - fprintf(stderr, "Using file output\n"); - output = new OutputFile(outputName); - } + fprintf(stderr, "Using file output '%s'\n", outputName.c_str()); + output = new OutputFile(outputName); } else if (useUHDOutput) { fprintf(stderr, "Using UHD output\n"); @@ -553,7 +586,6 @@ int main(int argc, char* argv[]) logger.level(error) << "UHD initialisation failed:" << e.what(); goto END_MAIN; } - } flowgraph = new Flowgraph(); @@ -568,7 +600,7 @@ int main(int argc, char* argv[]) ((OutputUHD*)output)->setETIReader(modulator->getEtiReader()); } - inputFileReader.PrintInfo(); + inputReader->PrintInfo(); try { while (running) { @@ -578,7 +610,7 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Starting main loop\n"); PDEBUG("*****************************************\n"); - while ((framesize = inputFileReader.GetNextFrame(data.getData())) > 0) { + while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { if (!running) { break; } @@ -588,7 +620,6 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Read frame %lu\n", frame); PDEBUG("*****************************************\n"); - fprintf(stderr, "Reading frame %lu\n", frame); //////////////////////////////////////////////////////////////// // Proccessing data @@ -597,17 +628,11 @@ int main(int argc, char* argv[]) } if (framesize == 0) { fprintf(stderr, "End of file reached.\n"); - if (!loop) { - running = false; - } else { - fprintf(stderr, "Rewinding file.\n"); - inputFileReader.Rewind(); - } } else { fprintf(stderr, "Input read error.\n"); - running = false; } + running = false; } } catch (std::exception& e) { fprintf(stderr, "EXCEPTION: %s\n", e.what()); diff --git a/src/FIRFilter.h b/src/FIRFilter.h index b9abb3e..8acd444 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -30,11 +30,12 @@ #endif #include <boost/thread.hpp> -#include <queue> +#include "ThreadsafeQueue.h" #include "RemoteControl.h" #include "ModCodec.h" #include "PcDebug.h" +#include "ThreadsafeQueue.h" #include <sys/types.h> #include <complex> @@ -47,54 +48,6 @@ typedef std::complex<float> complexf; -template<typename T> -class ThreadsafeQueue -{ -private: - std::queue<T> the_queue; - mutable boost::mutex the_mutex; - boost::condition_variable the_condition_variable; -public: - void push(T const& val) - { - boost::mutex::scoped_lock lock(the_mutex); - the_queue.push(val); - lock.unlock(); - the_condition_variable.notify_one(); - } - - bool empty() const - { - boost::mutex::scoped_lock lock(the_mutex); - return the_queue.empty(); - } - - bool try_pop(T& popped_value) - { - boost::mutex::scoped_lock lock(the_mutex); - if(the_queue.empty()) - { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - return true; - } - - void wait_and_pop(T& popped_value) - { - boost::mutex::scoped_lock lock(the_mutex); - while(the_queue.empty()) - { - the_condition_variable.wait(lock); - } - - popped_value = the_queue.front(); - the_queue.pop(); - } -}; - struct FIRFilterWorkerData { /* Thread-safe queues to give data to and get data from * the worker diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 1be1ad7..2514af8 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -43,9 +43,10 @@ #include "InputReader.h" #include "PcDebug.h" -int InputFileReader::Open(std::string filename) +int InputFileReader::Open(std::string filename, bool loop) { filename_ = filename; + loop_ = loop; inputfile_ = fopen(filename_.c_str(), "r"); if (inputfile_ == NULL) { fprintf(stderr, "Unable to open input file!\n"); @@ -59,7 +60,7 @@ int InputFileReader::Open(std::string filename) int InputFileReader::Rewind() { - rewind(inputfile_); + rewind(inputfile_); // Also clears the EOF flag return IdentifyType(); } @@ -213,7 +214,7 @@ void InputFileReader::PrintInfo() fprintf(stderr, "framed"); break; default: - fprintf(stderr, "unkown!"); + fprintf(stderr, "unknown!"); break; } fprintf(stderr, "\n"); @@ -235,9 +236,24 @@ int InputFileReader::GetNextFrame(void* buffer) } else { if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { - PDEBUG("End of file!\n"); - logger_.level(error) << "Reached end of file!"; - return 0; + logger_.level(error) << "Reached end of file."; + if (loop_) { + if (Rewind() == 0) { + if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { + PDEBUG("Error after rewinding file!\n"); + logger_.level(error) << "Error after rewinding file!"; + return -1; + } + } + else { + PDEBUG("Impossible to rewind file!\n"); + logger_.level(error) << "Impossible to rewind file!"; + return -1; + } + } + else { + return 0; + } } } if (frameSize > 6144) { // there might be a better limit @@ -247,8 +263,25 @@ int InputFileReader::GetNextFrame(void* buffer) } PDEBUG("Frame size: %u\n", frameSize); + size_t read_bytes = fread(buffer, 1, frameSize, inputfile_); + if ( loop_ && + streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144 + read_bytes == 0 && feof(inputfile_)) { + // in case of an EOF from a RAW that we loop, rewind + // otherwise, we won't tolerate it + + if (Rewind() == 0) { + read_bytes = fread(buffer, 1, frameSize, inputfile_); + } + else { + PDEBUG("Impossible to rewind file!\n"); + logger_.level(error) << "Impossible to rewind file!"; + return -1; + } + } + - if (fread(buffer, frameSize, 1, inputfile_) != 1) { + if (read_bytes != frameSize) { // A short read of a frame (i.e. reading an incomplete frame) // is not tolerated. Input files must not contain incomplete frames fprintf(stderr, diff --git a/src/InputReader.h b/src/InputReader.h index dbc7c11..8917922 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -31,6 +31,10 @@ #endif #include <cstdio> +#if defined(HAVE_INPUT_ZEROMQ) +# include <zmq.hpp> +# include "ThreadsafeQueue.h" +#endif #include "porting.h" #include "Log.h" @@ -69,8 +73,13 @@ enum EtiStreamType { class InputReader { public: - // Save the next frame into the buffer, and return the number of bytes read. + // Put next frame into buffer. This function will never write more than + // 6144 bytes into buffer. + // returns number of bytes written to buffer, 0 on eof, -1 on error virtual int GetNextFrame(void* buffer) = 0; + + // Print some information + virtual void PrintInfo() = 0; }; class InputFileReader : public InputReader @@ -90,18 +99,12 @@ class InputFileReader : public InputReader } // open file and determine stream type - int Open(std::string filename); + // When loop=1, GetNextFrame will never return 0 + int Open(std::string filename, bool loop); // Print information about the file opened void PrintInfo(); - // Rewind the file, and replay anew - // returns 0 on success, -1 on failure - int Rewind(); - - // Put next frame into buffer. This function will never write more than - // 6144 bytes into buffer. - // returns number of bytes written to buffer, 0 on eof, -1 on error int GetNextFrame(void* buffer); EtiStreamType GetStreamType() @@ -112,6 +115,11 @@ class InputFileReader : public InputReader private: int IdentifyType(); + // Rewind the file, and replay anew + // returns 0 on success, -1 on failure + int Rewind(); + + bool loop_; // if shall we loop the file over and over std::string filename_; EtiStreamType streamtype_; FILE* inputfile_; @@ -122,4 +130,54 @@ class InputFileReader : public InputReader // after 2**32 * 24ms ~= 3.3 years }; +#if defined(HAVE_INPUT_ZEROMQ) +/* A ZeroMQ input. See www.zeromq.org for more info */ + +struct InputZeroMQThreadData +{ + ThreadsafeQueue<zmq::message_t*> *in_messages; + std::string uri; +}; + +class InputZeroMQWorker +{ + public: + InputZeroMQWorker() : + zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {} + + void Start(struct InputZeroMQThreadData* workerdata); + void Stop(); + private: + void RecvProcess(struct InputZeroMQThreadData* workerdata); + bool running; + zmq::context_t zmqcontext; + zmq::socket_t subscriber; + boost::thread recv_thread; +}; + +class InputZeroMQReader : public InputReader +{ + public: + InputZeroMQReader(Logger logger) : + logger_(logger), in_messages_(10) + { + workerdata_.in_messages = &in_messages_; + } + + int Open(std::string uri); + + int GetNextFrame(void* buffer); + + void PrintInfo(); + + private: + Logger logger_; + std::string uri_; + + InputZeroMQWorker worker_; + ThreadsafeQueue<zmq::message_t*> in_messages_; + struct InputZeroMQThreadData workerdata_; +}; + +#endif #endif diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp new file mode 100644 index 0000000..e689e4c --- /dev/null +++ b/src/InputZeroMQReader.cpp @@ -0,0 +1,135 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 + Her Majesty the Queen in Right of Canada (Communications Research + Center Canada) + + Copyrigth (C) 2013 + Matthias P. Braendli, matthias.braendli@mpb.li + */ +/* + This file is part of CRC-DADMOD. + + CRC-DADMOD is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DADMOD is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DADMOD. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(HAVE_INPUT_ZEROMQ) + +#include <string> +#include <cstring> +#include <cstdio> +#include <stdint.h> +#include <zmq.hpp> +#include <boost/thread/thread.hpp> +#include "porting.h" +#include "InputReader.h" +#include "PcDebug.h" + +#define MAX_QUEUE_SIZE 50 + +int InputZeroMQReader::Open(std::string uri) +{ + uri_ = uri; + workerdata_.uri = uri; + // launch receiver thread + worker_.Start(&workerdata_); + + return 0; +} + +int InputZeroMQReader::GetNextFrame(void* buffer) +{ + zmq::message_t* incoming; + in_messages_.wait_and_pop(incoming); + + size_t framesize = incoming->size(); + + // guarantee that we never will write more than 6144 bytes + if (framesize > 6144) { + fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize); + logger_.level(error) << "ZeroMQ message too large" << framesize; + return -1; + } + + memcpy(buffer, incoming->data(), framesize); + + delete incoming; + + // pad to 6144 bytes + memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize); + + + return 6144; +} + +void InputZeroMQReader::PrintInfo() +{ + fprintf(stderr, "Input ZeroMQ:\n"); + fprintf(stderr, " Receiving from %s\n\n", uri_.c_str()); +} + +// ------------- Worker functions + +void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) +{ + size_t queue_size = 0; + + try { + subscriber.connect(workerdata->uri.c_str()); + + subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + + while (running) + { + zmq::message_t incoming; + subscriber.recv(&incoming); + + if (queue_size < MAX_QUEUE_SIZE) { + zmq::message_t* holder = new zmq::message_t(); + holder->move(&incoming); // move the message into the holder + queue_size = workerdata->in_messages->push(holder); + } + else + { + workerdata->in_messages->notify(); + fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size); + } + + if (queue_size < 5) { + fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size); + } + } + } + catch ( zmq::error_t err ) { + printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); + } +} + +void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) +{ + running = true; + recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); +} + +void InputZeroMQWorker::Stop() +{ + subscriber.close(); + running = false; +} + +#endif + diff --git a/src/Makefile.am b/src/Makefile.am index 0bb9dc9..fa4b5bd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -22,6 +22,12 @@ else HGVERSION_FLAGS = -DHGVERSION="\"-modified\"" endif +if HAVE_INPUT_ZEROMQ_TEST +ZMQ_LIBS =-lzmq +else +ZMQ_LIBS = +endif + FFT_DIR=$(top_builddir)/lib/kiss_fft129 FFT_INC=-I$(FFT_DIR) -I$(FFT_DIR)/tools FFT_SRC=$(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h @@ -41,6 +47,7 @@ $(FFT_DIR): fi crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS) +crc_dabmod_LDADD = $(ZMQ_LIBS) crc_dabmod_SOURCES = DabMod.cpp \ PcDebug.h \ porting.c porting.h \ @@ -64,7 +71,7 @@ crc_dabmod_SOURCES = DabMod.cpp \ OutputUHD.cpp OutputUHD.h \ ModOutput.cpp ModOutput.h \ InputMemory.cpp InputMemory.h \ - InputFileReader.cpp InputReader.h \ + InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \ OutputFile.cpp OutputFile.h \ FrameMultiplexer.cpp FrameMultiplexer.h \ ModMux.cpp ModMux.h \ diff --git a/src/Makefile.in b/src/Makefile.in index 0a1e7ad..0817c90 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -130,6 +130,7 @@ am_crc_dabmod_OBJECTS = crc_dabmod-DabMod.$(OBJEXT) \ crc_dabmod-OutputUHD.$(OBJEXT) crc_dabmod-ModOutput.$(OBJEXT) \ crc_dabmod-InputMemory.$(OBJEXT) \ crc_dabmod-InputFileReader.$(OBJEXT) \ + crc_dabmod-InputZeroMQReader.$(OBJEXT) \ crc_dabmod-OutputFile.$(OBJEXT) \ crc_dabmod-FrameMultiplexer.$(OBJEXT) \ crc_dabmod-ModMux.$(OBJEXT) crc_dabmod-PrbsGenerator.$(OBJEXT) \ @@ -153,7 +154,8 @@ am__objects_1 = crc_dabmod-kiss_fft.$(OBJEXT) \ nodist_crc_dabmod_OBJECTS = $(am__objects_1) crc_dabmod_OBJECTS = $(am_crc_dabmod_OBJECTS) \ $(nodist_crc_dabmod_OBJECTS) -crc_dabmod_LDADD = $(LDADD) +am__DEPENDENCIES_1 = +crc_dabmod_DEPENDENCIES = $(am__DEPENDENCIES_1) am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; am__vpath_adj = case $$p in \ $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \ @@ -361,11 +363,14 @@ top_builddir = @top_builddir@ top_srcdir = @top_srcdir@ @IS_HG_REPO_FALSE@HGVERSION_FLAGS = -DHGVERSION="\"-modified\"" @IS_HG_REPO_TRUE@HGVERSION_FLAGS = -DHGVERSION="\"`hg parents --template '-{node|short}'`\"" +@HAVE_INPUT_ZEROMQ_TEST_FALSE@ZMQ_LIBS = +@HAVE_INPUT_ZEROMQ_TEST_TRUE@ZMQ_LIBS = -lzmq FFT_DIR = $(top_builddir)/lib/kiss_fft129 FFT_INC = -I$(FFT_DIR) -I$(FFT_DIR)/tools FFT_SRC = $(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h FFT_FLG = -ffast-math crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS) +crc_dabmod_LDADD = $(ZMQ_LIBS) crc_dabmod_SOURCES = DabMod.cpp \ PcDebug.h \ porting.c porting.h \ @@ -389,7 +394,7 @@ crc_dabmod_SOURCES = DabMod.cpp \ OutputUHD.cpp OutputUHD.h \ ModOutput.cpp ModOutput.h \ InputMemory.cpp InputMemory.h \ - InputFileReader.cpp InputReader.h \ + InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \ OutputFile.cpp OutputFile.h \ FrameMultiplexer.cpp FrameMultiplexer.h \ ModMux.cpp ModMux.h \ @@ -553,6 +558,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-GuardIntervalInserter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputFileReader.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputMemory.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputZeroMQReader.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-Log.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModCodec.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModFormat.Po@am__quote@ @@ -973,6 +979,20 @@ crc_dabmod-InputFileReader.obj: InputFileReader.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputFileReader.obj `if test -f 'InputFileReader.cpp'; then $(CYGPATH_W) 'InputFileReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputFileReader.cpp'; fi` +crc_dabmod-InputZeroMQReader.o: InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.o -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ $(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.o' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp + +crc_dabmod-InputZeroMQReader.obj: InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.obj -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi` +@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ $(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.obj' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi` + crc_dabmod-OutputFile.o: OutputFile.cpp @am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-OutputFile.o -MD -MP -MF $(DEPDIR)/crc_dabmod-OutputFile.Tpo -c -o crc_dabmod-OutputFile.o `test -f 'OutputFile.cpp' || echo '$(srcdir)/'`OutputFile.cpp @am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-OutputFile.Tpo $(DEPDIR)/crc_dabmod-OutputFile.Po diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index c411c85..d0d87c8 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -22,20 +22,21 @@ #include "OutputFile.h" #include "PcDebug.h" +#include <string> #include <assert.h> #include <stdexcept> -OutputFile::OutputFile(const char* filename) : +OutputFile::OutputFile(std::string filename) : ModOutput(ModFormat(1), ModFormat(0)), myFilename(filename) { PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n", - filename, this); + filename.c_str(), this); - myFile = fopen(filename, "w"); + myFile = fopen(filename.c_str(), "w"); if (myFile == NULL) { - perror(filename); + perror(filename.c_str()); throw std::runtime_error( "OutputFile::OutputFile() unable to open file!"); } diff --git a/src/OutputFile.h b/src/OutputFile.h index 1223aef..e4cd91f 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -29,6 +29,7 @@ #include "ModOutput.h" +#include <string> #include <stdio.h> #include <sys/types.h> @@ -36,14 +37,14 @@ class OutputFile : public ModOutput { public: - OutputFile(const char* filename); + OutputFile(std::string filename); virtual ~OutputFile(); virtual int process(Buffer* dataIn, Buffer* dataOut); const char* name() { return "OutputFile"; } protected: - const char* myFilename; + std::string myFilename; FILE* myFile; }; diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h new file mode 100644 index 0000000..d89bf74 --- /dev/null +++ b/src/ThreadsafeQueue.h @@ -0,0 +1,100 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2013 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue using boost thread library + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This file is part of CRC-DADMOD. + + CRC-DADMOD is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DADMOD is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DADMOD. If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef THREADSAFE_QUEUE_H +#define THREADSAFE_QUEUE_H + +#include <boost/thread.hpp> +#include <queue> + +template<typename T> +class ThreadsafeQueue +{ +private: + std::queue<T> the_queue; + mutable boost::mutex the_mutex; + boost::condition_variable the_condition_variable; + size_t the_required_size; +public: + + ThreadsafeQueue() : the_required_size(1) {} + + ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {} + + size_t push(T const& val) + { + boost::mutex::scoped_lock lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + the_condition_variable.notify_one(); + + return queue_size; + } + + void notify() + { + the_condition_variable.notify_one(); + } + + bool empty() const + { + boost::mutex::scoped_lock lock(the_mutex); + return the_queue.empty(); + } + + bool try_pop(T& popped_value) + { + boost::mutex::scoped_lock lock(the_mutex); + if(the_queue.size() < the_required_size) + { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + return true; + } + + void wait_and_pop(T& popped_value) + { + boost::mutex::scoped_lock lock(the_mutex); + while(the_queue.size() < the_required_size) + { + the_condition_variable.wait(lock); + } + + popped_value = the_queue.front(); + the_queue.pop(); + } +}; + +#endif + |