From 7f5faf87e6373d27ae6709a1185154ca8e98276b Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 1 Nov 2014 13:45:25 +0100 Subject: Add new ZeroMQ IQ output --- configure.ac | 14 +++++++---- src/DabMod.cpp | 33 ++++++++++++++++++++------ src/Makefile.am | 9 ++----- src/OutputZeroMQ.cpp | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/OutputZeroMQ.h | 62 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 167 insertions(+), 18 deletions(-) create mode 100644 src/OutputZeroMQ.cpp create mode 100644 src/OutputZeroMQ.h diff --git a/configure.ac b/configure.ac index 2053c9c..7f7ad0a 100644 --- a/configure.ac +++ b/configure.ac @@ -74,6 +74,10 @@ AC_ARG_ENABLE([kiss_fft], AC_ARG_ENABLE([input_zeromq], AS_HELP_STRING([--enable-input-zeromq], [Enable ZeroMQ input])) +# ZeroMQ message IQ output +AC_ARG_ENABLE([output_zeromq], + AS_HELP_STRING([--enable-output-zeromq], [Enable ZeroMQ output])) + # UHD support control AC_ARG_ENABLE([output_uhd], [AS_HELP_STRING([--enable-output-uhd], [Enable UHD output])], @@ -91,7 +95,10 @@ echo "Checking input zeromq" AS_IF([test "x$enable_input_zeromq" = "xyes"], [AC_DEFINE(HAVE_INPUT_ZEROMQ, [1], [Define if ZeroMQ input is enabled]) , - AC_CHECK_LIB(zmq, zmq_init, ,[AC_MSG_ERROR([ZeroMQ libzmq is required])])]) + AC_CHECK_LIB(zmq, zmq_init, ZMQ_LIBS="-lzmq" ,[AC_MSG_ERROR([ZeroMQ libzmq is required])])]) +AS_IF([test "x$enable_output_zeromq" = "xyes"], + [AC_DEFINE(HAVE_OUTPUT_ZEROMQ, [1], [Define if ZeroMQ output is enabled]) , + AC_CHECK_LIB(zmq, zmq_init, ZMQ_LIBS="-lzmq" ,[AC_MSG_ERROR([ZeroMQ libzmq is required])])]) AS_IF([test "x$enable_debug" = "xno"], [OPTIM="-O2" DEBUG="" EXTRA="$EXTRA -DNDEBUG"], [OPTIM="-O0" DEBUG="-ggdb" EXTRA="$EXTRA"]) @@ -101,7 +108,6 @@ AS_IF([test "x$enable_prof" != "xno"], # Define conditionals for Makefile.am AM_CONDITIONAL([USE_KISS_FFT], [test "x$enable_fftw" = "xno"]) AM_CONDITIONAL([DEBUG], [test "x$enable_trace" = "xyes"]) -AM_CONDITIONAL([HAVE_INPUT_ZEROMQ_TEST], [test "x$enable_input_zeromq" = "xyes"]) AM_CONDITIONAL([IS_GIT_REPO], [test -d '.git']) # Defines for config.h @@ -116,7 +122,7 @@ AX_PTHREAD([], AC_MSG_ERROR([requires pthread])) AC_SUBST([CFLAGS], ["$OPTIM $DEBUG $EXTRA $FFTW_CFLAGS $PTHREAD_CFLAGS"]) AC_SUBST([CXXFLAGS], ["$OPTIM $DEBUG $EXTRA $FFTW_CFLAGS $PTHREAD_CFLAGS"]) -AC_SUBST([LIBS], ["$FFTW_LIBS $PTHREAD_LIBS"]) +AC_SUBST([LIBS], ["$FFTW_LIBS $PTHREAD_LIBS $ZMQ_LIBS"]) # Checks for UHD. AS_IF([test "x$enable_output_uhd" = "xyes"], @@ -199,7 +205,7 @@ echo "***********************************************" echo enabled="" disabled="" -for feat in debug prof trace fftw fft_simd output_uhd input_zeromq +for feat in debug prof trace fftw fft_simd output_uhd input_zeromq output_zeromq do eval var=\$enable_$feat AS_IF([test "x$var" = "xyes"], diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 67ad12d..f27d720 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -38,6 +38,7 @@ #if defined(HAVE_OUTPUT_UHD) # include "OutputUHD.h" #endif +#include "OutputZeroMQ.h" #include "InputReader.h" #include "PcDebug.h" #include "TimestampDecoder.h" @@ -166,6 +167,7 @@ int main(int argc, char* argv[]) std::string inputTransport = "file"; std::string outputName; + int useZeroMQOutput = 0; int useFileOutput = 0; int useUHDOutput = 0; @@ -399,7 +401,7 @@ int main(int argc, char* argv[]) clockRate = pt.get("modulator.dac_clk_rate", (size_t)0); digitalgain = pt.get("modulator.digital_gain", digitalgain); outputRate = pt.get("modulator.rate", outputRate); - + // FIR Filter parameters: if (pt.get("firfilter.enabled", 0) == 1) { try { @@ -530,6 +532,12 @@ int main(int argc, char* argv[]) useUHDOutput = 1; } +#endif +#if defined(HAVE_OUTPUT_ZEROMQ) + else if (output_selected == "zmq") { + outputName = pt.get("zmqoutput.listen"); + useZeroMQOutput = 1; + } #endif else { std::cerr << "Error: Invalid output defined.\n"; @@ -612,7 +620,7 @@ int main(int argc, char* argv[]) goto END_MAIN; } - if (!useFileOutput && !useUHDOutput) { + if (!useFileOutput && !useUHDOutput && !useZeroMQOutput) { logger.level(error) << "Output not specified"; fprintf(stderr, "Must specify output !"); goto END_MAIN; @@ -623,8 +631,12 @@ int main(int argc, char* argv[]) fprintf(stderr, " Type: %s\n", inputTransport.c_str()); fprintf(stderr, " Source: %s\n", inputName.c_str()); fprintf(stderr, "Output\n"); + + if (useFileOutput) { + fprintf(stderr, " Name: %s\n", outputName.c_str()); + } #if defined(HAVE_OUTPUT_UHD) - if (useUHDOutput) { + else if (useUHDOutput) { fprintf(stderr, " UHD\n" " Device: %s\n" " Type: %s\n" @@ -633,12 +645,13 @@ int main(int argc, char* argv[]) outputuhd_conf.usrpType.c_str(), outputuhd_conf.masterClockRate); } - else if (useFileOutput) { -#else - if (useFileOutput) { #endif - fprintf(stderr, " Name: %s\n", outputName.c_str()); + else if (useZeroMQOutput) { + fprintf(stderr, " ZeroMQ\n" + " Listening on: %s\n", + outputName.c_str()); } + fprintf(stderr, " Sampling rate: "); if (outputRate > 1000) { if (outputRate > 1000000) { @@ -713,6 +726,12 @@ int main(int argc, char* argv[]) } } #endif + else if (useZeroMQOutput) { + /* We normalise the same way as for the UHD output */ + normalise = 1.0f/50000.0f; + + output = new OutputZeroMQ(outputName); + } flowgraph = new Flowgraph(); data.setLength(6144); diff --git a/src/Makefile.am b/src/Makefile.am index 6c83cb1..635a3d8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -22,12 +22,6 @@ else GITVERSION_FLAGS = endif -if HAVE_INPUT_ZEROMQ_TEST -ZMQ_LIBS = -lzmq -else -ZMQ_LIBS = -endif - if HAVE_OUTPUT_UHD_TEST UHD_SOURCES = OutputUHD.cpp OutputUHD.h else @@ -76,7 +70,7 @@ endif odr_dabmod_CPPFLAGS = -Wall \ $(FFT_INC) $(FFT_FLG) $(SIMD_CFLAGS) $(GITVERSION_FLAGS) -odr_dabmod_LDADD = $(ZMQ_LIBS) $(FFT_LDADD) +odr_dabmod_LDADD = $(FFT_LDADD) odr_dabmod_SOURCES = DabMod.cpp \ PcDebug.h \ porting.c porting.h \ @@ -96,6 +90,7 @@ odr_dabmod_SOURCES = DabMod.cpp \ Flowgraph.cpp Flowgraph.h \ GainControl.cpp GainControl.h \ OutputMemory.cpp OutputMemory.h \ + OutputZeroMQ.cpp OutputZeroMQ.h \ TimestampDecoder.h TimestampDecoder.cpp \ $(UHD_SOURCES) \ ModOutput.cpp ModOutput.h \ diff --git a/src/OutputZeroMQ.cpp b/src/OutputZeroMQ.cpp new file mode 100644 index 0000000..0e759dd --- /dev/null +++ b/src/OutputZeroMQ.cpp @@ -0,0 +1,67 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + */ +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see . + */ + +#include "OutputZeroMQ.h" +#include "PcDebug.h" +#include +#include +#include + +#if defined(HAVE_OUTPUT_ZEROMQ) + +OutputZeroMQ::OutputZeroMQ(std::string endpoint, Buffer* dataOut) + : ModOutput(ModFormat(1), ModFormat(0)), + m_zmq_context(1), + m_zmq_pub_sock(m_zmq_context, ZMQ_PUB), + m_endpoint(endpoint) +{ + PDEBUG("OutputZeroMQ::OutputZeroMQ(%p) @ %p\n", dataOut, this); + + std::stringstream ss; + ss << "OutputZeroMQ(" << m_endpoint << ")"; + m_name = ss.str(); + + m_zmq_pub_sock.bind(m_endpoint.c_str()); +} + +OutputZeroMQ::~OutputZeroMQ() +{ + PDEBUG("OutputZeroMQ::~OutputZeroMQ() @ %p\n", this); +} + +int OutputZeroMQ::process(Buffer* dataIn, Buffer* dataOut) +{ + PDEBUG("OutputZeroMQ::process" + "(dataIn: %p, dataOut: %p)\n", + dataIn, dataOut); + + m_zmq_pub_sock.send(dataIn->getData(), dataIn->getLength()); + + return dataIn->getLength(); +} + +#endif // HAVE_OUTPUT_ZEROMQ_H + diff --git a/src/OutputZeroMQ.h b/src/OutputZeroMQ.h new file mode 100644 index 0000000..a3ac060 --- /dev/null +++ b/src/OutputZeroMQ.h @@ -0,0 +1,62 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2014 + Matthias P. Braendli, matthias.braendli@mpb.li + + http://opendigitalradio.org + */ +/* + This file is part of ODR-DabMod. + + ODR-DabMod is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + ODR-DabMod is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with ODR-DabMod. If not, see . + */ + +#ifndef OUTPUT_ZEROMQ_H +#define OUTPUT_ZEROMQ_H + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(HAVE_OUTPUT_ZEROMQ) + +#include "ModOutput.h" +#include "zmq.hpp" + +class OutputZeroMQ : public ModOutput +{ + public: + OutputZeroMQ(std::string endpoint, Buffer* dataOut = NULL); + virtual ~OutputZeroMQ(); + virtual int process(Buffer* dataIn, Buffer* dataOut); + const char* name() { return m_name.c_str(); } + + protected: + zmq::context_t m_zmq_context; // handle for the zmq context + zmq::socket_t m_zmq_pub_sock; // handle for the zmq publisher socket + + std::string m_endpoint; // On which port to listen: e.g. + // tcp://*:58300 + + std::string m_name; + + Buffer* m_data_out; +}; + +#endif // HAVE_OUTPUT_ZEROMQ_H + +#endif // OUTPUT_ZEROMQ_H + -- cgit v1.2.3