From 5d965e80be2e6ab62bc82fb2e0d4d472153ad241 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli (think)" Date: Sun, 10 Nov 2013 21:50:12 +0100 Subject: crc-dabmod: add ZeroMQ input module --- src/DabMod.cpp | 79 +++++++++++++++++---------- src/FIRFilter.h | 51 +----------------- src/InputFileReader.cpp | 47 +++++++++++++--- src/InputReader.h | 76 ++++++++++++++++++++++---- src/InputZeroMQReader.cpp | 135 ++++++++++++++++++++++++++++++++++++++++++++++ src/Makefile.am | 9 +++- src/Makefile.in | 24 ++++++++- src/OutputFile.cpp | 9 ++-- src/OutputFile.h | 5 +- src/ThreadsafeQueue.h | 100 ++++++++++++++++++++++++++++++++++ 10 files changed, 434 insertions(+), 101 deletions(-) create mode 100644 src/InputZeroMQReader.cpp create mode 100644 src/ThreadsafeQueue.h (limited to 'src') diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 64e557c..1e8937d 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -153,8 +153,9 @@ int main(int argc, char* argv[]) int ret = 0; bool loop = false; std::string inputName = ""; + std::string inputTransport = "file"; - const char* outputName; + std::string outputName; int useFileOutput = 0; int useUHDOutput = 0; @@ -190,6 +191,10 @@ int main(int argc, char* argv[]) Logger logger; InputFileReader inputFileReader(logger); +#if defined(HAVE_INPUT_ZEROMQ) + InputZeroMQReader inputZeroMQReader(logger); +#endif + InputReader* inputReader; signal(SIGINT, signalHandler); @@ -206,7 +211,7 @@ int main(int argc, char* argv[]) if (c != 'C') { use_configuration_cmdline = true; } - + switch (c) { case 'C': use_configuration_file = true; @@ -335,7 +340,8 @@ int main(int argc, char* argv[]) loop = true; } - inputName = pt.get("input.filename", "/dev/stdin"); + inputTransport = pt.get("input.transport", "file"); + inputName = pt.get("input.source", "/dev/stdin"); // log parameters: if (pt.get("log.syslog", 0) == 1) { @@ -391,7 +397,7 @@ int main(int argc, char* argv[]) if (output_selected == "file") { try { - outputName = pt.get("fileoutput.filename").c_str(); + outputName = pt.get("fileoutput.filename"); } catch (std::exception &e) { std::cerr << "Error: " << e.what() << "\n"; @@ -477,15 +483,22 @@ int main(int argc, char* argv[]) modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY; } - // Setting ETI input filename if (inputName == "") { if (optind < argc) { inputName = argv[optind++]; - } else { + + if (inputName.substr(0, 4) == "zmq+" && + inputName.find("://") != std::string::npos) { + // if the name starts with zmq+XYZ://somewhere:port + inputTransport = "zeromq"; + } + } + else { inputName = "/dev/stdin"; } } + // Checking unused arguments if (optind != argc) { fprintf(stderr, "Invalid arguments:"); @@ -507,13 +520,14 @@ int main(int argc, char* argv[]) // Print settings fprintf(stderr, "Input\n"); - fprintf(stderr, " Name: %s\n", inputName.c_str()); + fprintf(stderr, " Type: %s\n", inputTransport.c_str()); + fprintf(stderr, " Source: %s\n", inputName.c_str()); fprintf(stderr, "Output\n"); if (useUHDOutput) { fprintf(stderr, " UHD, Device: %s\n", outputuhd_conf.device); } else if (useFileOutput) { - fprintf(stderr, " Name: %s\n", outputName); + fprintf(stderr, " Name: %s\n", outputName.c_str()); } fprintf(stderr, " Sampling rate: "); if (outputRate > 1000) { @@ -526,20 +540,39 @@ int main(int argc, char* argv[]) fprintf(stderr, "%zu Hz\n", outputRate); } - // Opening ETI input file - if (inputFileReader.Open(inputName) == -1) { - fprintf(stderr, "Unable to open input file!\n"); - logger.level(error) << "Unable to open input file!"; + if (inputTransport == "file") { + // Opening ETI input file + if (inputFileReader.Open(inputName, loop) == -1) { + fprintf(stderr, "Unable to open input file!\n"); + logger.level(error) << "Unable to open input file!"; + ret = -1; + goto END_MAIN; + } + + inputReader = &inputFileReader; + } + else if (inputTransport == "zeromq") { +#if !defined(HAVE_INPUT_ZEROMQ) + fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n"); ret = -1; goto END_MAIN; +#else + inputZeroMQReader.Open(inputName); + inputReader = &inputZeroMQReader; +#endif } + else + { + fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str()); + ret = -1; + goto END_MAIN; + } + if (useFileOutput) { // Opening COFDM output file - if (outputName != NULL) { - fprintf(stderr, "Using file output\n"); - output = new OutputFile(outputName); - } + fprintf(stderr, "Using file output '%s'\n", outputName.c_str()); + output = new OutputFile(outputName); } else if (useUHDOutput) { fprintf(stderr, "Using UHD output\n"); @@ -553,7 +586,6 @@ int main(int argc, char* argv[]) logger.level(error) << "UHD initialisation failed:" << e.what(); goto END_MAIN; } - } flowgraph = new Flowgraph(); @@ -568,7 +600,7 @@ int main(int argc, char* argv[]) ((OutputUHD*)output)->setETIReader(modulator->getEtiReader()); } - inputFileReader.PrintInfo(); + inputReader->PrintInfo(); try { while (running) { @@ -578,7 +610,7 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Starting main loop\n"); PDEBUG("*****************************************\n"); - while ((framesize = inputFileReader.GetNextFrame(data.getData())) > 0) { + while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) { if (!running) { break; } @@ -588,7 +620,6 @@ int main(int argc, char* argv[]) PDEBUG("*****************************************\n"); PDEBUG("* Read frame %lu\n", frame); PDEBUG("*****************************************\n"); - fprintf(stderr, "Reading frame %lu\n", frame); //////////////////////////////////////////////////////////////// // Proccessing data @@ -597,17 +628,11 @@ int main(int argc, char* argv[]) } if (framesize == 0) { fprintf(stderr, "End of file reached.\n"); - if (!loop) { - running = false; - } else { - fprintf(stderr, "Rewinding file.\n"); - inputFileReader.Rewind(); - } } else { fprintf(stderr, "Input read error.\n"); - running = false; } + running = false; } } catch (std::exception& e) { fprintf(stderr, "EXCEPTION: %s\n", e.what()); diff --git a/src/FIRFilter.h b/src/FIRFilter.h index b9abb3e..8acd444 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -30,11 +30,12 @@ #endif #include -#include +#include "ThreadsafeQueue.h" #include "RemoteControl.h" #include "ModCodec.h" #include "PcDebug.h" +#include "ThreadsafeQueue.h" #include #include @@ -47,54 +48,6 @@ typedef std::complex complexf; -template -class ThreadsafeQueue -{ -private: - std::queue the_queue; - mutable boost::mutex the_mutex; - boost::condition_variable the_condition_variable; -public: - void push(T const& val) - { - boost::mutex::scoped_lock lock(the_mutex); - the_queue.push(val); - lock.unlock(); - the_condition_variable.notify_one(); - } - - bool empty() const - { - boost::mutex::scoped_lock lock(the_mutex); - return the_queue.empty(); - } - - bool try_pop(T& popped_value) - { - boost::mutex::scoped_lock lock(the_mutex); - if(the_queue.empty()) - { - return false; - } - - popped_value = the_queue.front(); - the_queue.pop(); - return true; - } - - void wait_and_pop(T& popped_value) - { - boost::mutex::scoped_lock lock(the_mutex); - while(the_queue.empty()) - { - the_condition_variable.wait(lock); - } - - popped_value = the_queue.front(); - the_queue.pop(); - } -}; - struct FIRFilterWorkerData { /* Thread-safe queues to give data to and get data from * the worker diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 1be1ad7..2514af8 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -43,9 +43,10 @@ #include "InputReader.h" #include "PcDebug.h" -int InputFileReader::Open(std::string filename) +int InputFileReader::Open(std::string filename, bool loop) { filename_ = filename; + loop_ = loop; inputfile_ = fopen(filename_.c_str(), "r"); if (inputfile_ == NULL) { fprintf(stderr, "Unable to open input file!\n"); @@ -59,7 +60,7 @@ int InputFileReader::Open(std::string filename) int InputFileReader::Rewind() { - rewind(inputfile_); + rewind(inputfile_); // Also clears the EOF flag return IdentifyType(); } @@ -213,7 +214,7 @@ void InputFileReader::PrintInfo() fprintf(stderr, "framed"); break; default: - fprintf(stderr, "unkown!"); + fprintf(stderr, "unknown!"); break; } fprintf(stderr, "\n"); @@ -235,9 +236,24 @@ int InputFileReader::GetNextFrame(void* buffer) } else { if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { - PDEBUG("End of file!\n"); - logger_.level(error) << "Reached end of file!"; - return 0; + logger_.level(error) << "Reached end of file."; + if (loop_) { + if (Rewind() == 0) { + if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { + PDEBUG("Error after rewinding file!\n"); + logger_.level(error) << "Error after rewinding file!"; + return -1; + } + } + else { + PDEBUG("Impossible to rewind file!\n"); + logger_.level(error) << "Impossible to rewind file!"; + return -1; + } + } + else { + return 0; + } } } if (frameSize > 6144) { // there might be a better limit @@ -247,8 +263,25 @@ int InputFileReader::GetNextFrame(void* buffer) } PDEBUG("Frame size: %u\n", frameSize); + size_t read_bytes = fread(buffer, 1, frameSize, inputfile_); + if ( loop_ && + streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144 + read_bytes == 0 && feof(inputfile_)) { + // in case of an EOF from a RAW that we loop, rewind + // otherwise, we won't tolerate it + + if (Rewind() == 0) { + read_bytes = fread(buffer, 1, frameSize, inputfile_); + } + else { + PDEBUG("Impossible to rewind file!\n"); + logger_.level(error) << "Impossible to rewind file!"; + return -1; + } + } + - if (fread(buffer, frameSize, 1, inputfile_) != 1) { + if (read_bytes != frameSize) { // A short read of a frame (i.e. reading an incomplete frame) // is not tolerated. Input files must not contain incomplete frames fprintf(stderr, diff --git a/src/InputReader.h b/src/InputReader.h index dbc7c11..8917922 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -31,6 +31,10 @@ #endif #include +#if defined(HAVE_INPUT_ZEROMQ) +# include +# include "ThreadsafeQueue.h" +#endif #include "porting.h" #include "Log.h" @@ -69,8 +73,13 @@ enum EtiStreamType { class InputReader { public: - // Save the next frame into the buffer, and return the number of bytes read. + // Put next frame into buffer. This function will never write more than + // 6144 bytes into buffer. + // returns number of bytes written to buffer, 0 on eof, -1 on error virtual int GetNextFrame(void* buffer) = 0; + + // Print some information + virtual void PrintInfo() = 0; }; class InputFileReader : public InputReader @@ -90,18 +99,12 @@ class InputFileReader : public InputReader } // open file and determine stream type - int Open(std::string filename); + // When loop=1, GetNextFrame will never return 0 + int Open(std::string filename, bool loop); // Print information about the file opened void PrintInfo(); - // Rewind the file, and replay anew - // returns 0 on success, -1 on failure - int Rewind(); - - // Put next frame into buffer. This function will never write more than - // 6144 bytes into buffer. - // returns number of bytes written to buffer, 0 on eof, -1 on error int GetNextFrame(void* buffer); EtiStreamType GetStreamType() @@ -112,6 +115,11 @@ class InputFileReader : public InputReader private: int IdentifyType(); + // Rewind the file, and replay anew + // returns 0 on success, -1 on failure + int Rewind(); + + bool loop_; // if shall we loop the file over and over std::string filename_; EtiStreamType streamtype_; FILE* inputfile_; @@ -122,4 +130,54 @@ class InputFileReader : public InputReader // after 2**32 * 24ms ~= 3.3 years }; +#if defined(HAVE_INPUT_ZEROMQ) +/* A ZeroMQ input. See www.zeromq.org for more info */ + +struct InputZeroMQThreadData +{ + ThreadsafeQueue *in_messages; + std::string uri; +}; + +class InputZeroMQWorker +{ + public: + InputZeroMQWorker() : + zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {} + + void Start(struct InputZeroMQThreadData* workerdata); + void Stop(); + private: + void RecvProcess(struct InputZeroMQThreadData* workerdata); + bool running; + zmq::context_t zmqcontext; + zmq::socket_t subscriber; + boost::thread recv_thread; +}; + +class InputZeroMQReader : public InputReader +{ + public: + InputZeroMQReader(Logger logger) : + logger_(logger), in_messages_(10) + { + workerdata_.in_messages = &in_messages_; + } + + int Open(std::string uri); + + int GetNextFrame(void* buffer); + + void PrintInfo(); + + private: + Logger logger_; + std::string uri_; + + InputZeroMQWorker worker_; + ThreadsafeQueue in_messages_; + struct InputZeroMQThreadData workerdata_; +}; + +#endif #endif diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp new file mode 100644 index 0000000..e689e4c --- /dev/null +++ b/src/InputZeroMQReader.cpp @@ -0,0 +1,135 @@ +/* + Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 + Her Majesty the Queen in Right of Canada (Communications Research + Center Canada) + + Copyrigth (C) 2013 + Matthias P. Braendli, matthias.braendli@mpb.li + */ +/* + This file is part of CRC-DADMOD. + + CRC-DADMOD is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DADMOD is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DADMOD. If not, see . + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#if defined(HAVE_INPUT_ZEROMQ) + +#include +#include +#include +#include +#include +#include +#include "porting.h" +#include "InputReader.h" +#include "PcDebug.h" + +#define MAX_QUEUE_SIZE 50 + +int InputZeroMQReader::Open(std::string uri) +{ + uri_ = uri; + workerdata_.uri = uri; + // launch receiver thread + worker_.Start(&workerdata_); + + return 0; +} + +int InputZeroMQReader::GetNextFrame(void* buffer) +{ + zmq::message_t* incoming; + in_messages_.wait_and_pop(incoming); + + size_t framesize = incoming->size(); + + // guarantee that we never will write more than 6144 bytes + if (framesize > 6144) { + fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize); + logger_.level(error) << "ZeroMQ message too large" << framesize; + return -1; + } + + memcpy(buffer, incoming->data(), framesize); + + delete incoming; + + // pad to 6144 bytes + memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize); + + + return 6144; +} + +void InputZeroMQReader::PrintInfo() +{ + fprintf(stderr, "Input ZeroMQ:\n"); + fprintf(stderr, " Receiving from %s\n\n", uri_.c_str()); +} + +// ------------- Worker functions + +void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) +{ + size_t queue_size = 0; + + try { + subscriber.connect(workerdata->uri.c_str()); + + subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + + while (running) + { + zmq::message_t incoming; + subscriber.recv(&incoming); + + if (queue_size < MAX_QUEUE_SIZE) { + zmq::message_t* holder = new zmq::message_t(); + holder->move(&incoming); // move the message into the holder + queue_size = workerdata->in_messages->push(holder); + } + else + { + workerdata->in_messages->notify(); + fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size); + } + + if (queue_size < 5) { + fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size); + } + } + } + catch ( zmq::error_t err ) { + printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); + } +} + +void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) +{ + running = true; + recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); +} + +void InputZeroMQWorker::Stop() +{ + subscriber.close(); + running = false; +} + +#endif + diff --git a/src/Makefile.am b/src/Makefile.am index 0bb9dc9..fa4b5bd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -22,6 +22,12 @@ else HGVERSION_FLAGS = -DHGVERSION="\"-modified\"" endif +if HAVE_INPUT_ZEROMQ_TEST +ZMQ_LIBS =-lzmq +else +ZMQ_LIBS = +endif + FFT_DIR=$(top_builddir)/lib/kiss_fft129 FFT_INC=-I$(FFT_DIR) -I$(FFT_DIR)/tools FFT_SRC=$(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h @@ -41,6 +47,7 @@ $(FFT_DIR): fi crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS) +crc_dabmod_LDADD = $(ZMQ_LIBS) crc_dabmod_SOURCES = DabMod.cpp \ PcDebug.h \ porting.c porting.h \ @@ -64,7 +71,7 @@ crc_dabmod_SOURCES = DabMod.cpp \ OutputUHD.cpp OutputUHD.h \ ModOutput.cpp ModOutput.h \ InputMemory.cpp InputMemory.h \ - InputFileReader.cpp InputReader.h \ + InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \ OutputFile.cpp OutputFile.h \ FrameMultiplexer.cpp FrameMultiplexer.h \ ModMux.cpp ModMux.h \ diff --git a/src/Makefile.in b/src/Makefile.in index 0a1e7ad..0817c90 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -130,6 +130,7 @@ am_crc_dabmod_OBJECTS = crc_dabmod-DabMod.$(OBJEXT) \ crc_dabmod-OutputUHD.$(OBJEXT) crc_dabmod-ModOutput.$(OBJEXT) \ crc_dabmod-InputMemory.$(OBJEXT) \ crc_dabmod-InputFileReader.$(OBJEXT) \ + crc_dabmod-InputZeroMQReader.$(OBJEXT) \ crc_dabmod-OutputFile.$(OBJEXT) \ crc_dabmod-FrameMultiplexer.$(OBJEXT) \ crc_dabmod-ModMux.$(OBJEXT) crc_dabmod-PrbsGenerator.$(OBJEXT) \ @@ -153,7 +154,8 @@ am__objects_1 = crc_dabmod-kiss_fft.$(OBJEXT) \ nodist_crc_dabmod_OBJECTS = $(am__objects_1) crc_dabmod_OBJECTS = $(am_crc_dabmod_OBJECTS) \ $(nodist_crc_dabmod_OBJECTS) -crc_dabmod_LDADD = $(LDADD) +am__DEPENDENCIES_1 = +crc_dabmod_DEPENDENCIES = $(am__DEPENDENCIES_1) am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`; am__vpath_adj = case $$p in \ $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \ @@ -361,11 +363,14 @@ top_builddir = @top_builddir@ top_srcdir = @top_srcdir@ @IS_HG_REPO_FALSE@HGVERSION_FLAGS = -DHGVERSION="\"-modified\"" @IS_HG_REPO_TRUE@HGVERSION_FLAGS = -DHGVERSION="\"`hg parents --template '-{node|short}'`\"" +@HAVE_INPUT_ZEROMQ_TEST_FALSE@ZMQ_LIBS = +@HAVE_INPUT_ZEROMQ_TEST_TRUE@ZMQ_LIBS = -lzmq FFT_DIR = $(top_builddir)/lib/kiss_fft129 FFT_INC = -I$(FFT_DIR) -I$(FFT_DIR)/tools FFT_SRC = $(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h FFT_FLG = -ffast-math crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS) +crc_dabmod_LDADD = $(ZMQ_LIBS) crc_dabmod_SOURCES = DabMod.cpp \ PcDebug.h \ porting.c porting.h \ @@ -389,7 +394,7 @@ crc_dabmod_SOURCES = DabMod.cpp \ OutputUHD.cpp OutputUHD.h \ ModOutput.cpp ModOutput.h \ InputMemory.cpp InputMemory.h \ - InputFileReader.cpp InputReader.h \ + InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \ OutputFile.cpp OutputFile.h \ FrameMultiplexer.cpp FrameMultiplexer.h \ ModMux.cpp ModMux.h \ @@ -553,6 +558,7 @@ distclean-compile: @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-GuardIntervalInserter.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputFileReader.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputMemory.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputZeroMQReader.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-Log.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModCodec.Po@am__quote@ @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModFormat.Po@am__quote@ @@ -973,6 +979,20 @@ crc_dabmod-InputFileReader.obj: InputFileReader.cpp @AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ @am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputFileReader.obj `if test -f 'InputFileReader.cpp'; then $(CYGPATH_W) 'InputFileReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputFileReader.cpp'; fi` +crc_dabmod-InputZeroMQReader.o: InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.o -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ $(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.o' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp + +crc_dabmod-InputZeroMQReader.obj: InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.obj -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi` +@am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ $(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.obj' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@ DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@ $(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi` + crc_dabmod-OutputFile.o: OutputFile.cpp @am__fastdepCXX_TRUE@ $(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-OutputFile.o -MD -MP -MF $(DEPDIR)/crc_dabmod-OutputFile.Tpo -c -o crc_dabmod-OutputFile.o `test -f 'OutputFile.cpp' || echo '$(srcdir)/'`OutputFile.cpp @am__fastdepCXX_TRUE@ $(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-OutputFile.Tpo $(DEPDIR)/crc_dabmod-OutputFile.Po diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index c411c85..d0d87c8 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -22,20 +22,21 @@ #include "OutputFile.h" #include "PcDebug.h" +#include #include #include -OutputFile::OutputFile(const char* filename) : +OutputFile::OutputFile(std::string filename) : ModOutput(ModFormat(1), ModFormat(0)), myFilename(filename) { PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n", - filename, this); + filename.c_str(), this); - myFile = fopen(filename, "w"); + myFile = fopen(filename.c_str(), "w"); if (myFile == NULL) { - perror(filename); + perror(filename.c_str()); throw std::runtime_error( "OutputFile::OutputFile() unable to open file!"); } diff --git a/src/OutputFile.h b/src/OutputFile.h index 1223aef..e4cd91f 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -29,6 +29,7 @@ #include "ModOutput.h" +#include #include #include @@ -36,14 +37,14 @@ class OutputFile : public ModOutput { public: - OutputFile(const char* filename); + OutputFile(std::string filename); virtual ~OutputFile(); virtual int process(Buffer* dataIn, Buffer* dataOut); const char* name() { return "OutputFile"; } protected: - const char* myFilename; + std::string myFilename; FILE* myFile; }; diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h new file mode 100644 index 0000000..d89bf74 --- /dev/null +++ b/src/ThreadsafeQueue.h @@ -0,0 +1,100 @@ +/* + Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in + Right of Canada (Communications Research Center Canada) + + Copyright (C) 2013 + Matthias P. Braendli, matthias.braendli@mpb.li + + An implementation for a threadsafe queue using boost thread library + + When creating a ThreadsafeQueue, one can specify the minimal number + of elements it must contain before it is possible to take one + element out. + */ +/* + This file is part of CRC-DADMOD. + + CRC-DADMOD is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DADMOD is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DADMOD. If not, see . + */ + +#ifndef THREADSAFE_QUEUE_H +#define THREADSAFE_QUEUE_H + +#include +#include + +template +class ThreadsafeQueue +{ +private: + std::queue the_queue; + mutable boost::mutex the_mutex; + boost::condition_variable the_condition_variable; + size_t the_required_size; +public: + + ThreadsafeQueue() : the_required_size(1) {} + + ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {} + + size_t push(T const& val) + { + boost::mutex::scoped_lock lock(the_mutex); + the_queue.push(val); + size_t queue_size = the_queue.size(); + lock.unlock(); + the_condition_variable.notify_one(); + + return queue_size; + } + + void notify() + { + the_condition_variable.notify_one(); + } + + bool empty() const + { + boost::mutex::scoped_lock lock(the_mutex); + return the_queue.empty(); + } + + bool try_pop(T& popped_value) + { + boost::mutex::scoped_lock lock(the_mutex); + if(the_queue.size() < the_required_size) + { + return false; + } + + popped_value = the_queue.front(); + the_queue.pop(); + return true; + } + + void wait_and_pop(T& popped_value) + { + boost::mutex::scoped_lock lock(the_mutex); + while(the_queue.size() < the_required_size) + { + the_condition_variable.wait(lock); + } + + popped_value = the_queue.front(); + the_queue.pop(); + } +}; + +#endif + -- cgit v1.2.3