diff options
| author | Matthias P. Braendli (think) <matthias@mpb.li> | 2013-11-10 21:50:12 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli (think) <matthias@mpb.li> | 2013-11-10 21:50:12 +0100 | 
| commit | 5d965e80be2e6ab62bc82fb2e0d4d472153ad241 (patch) | |
| tree | 5add36f337b0de524b3d098f0b1fcc8d68aba0d7 /src | |
| parent | 4f9a01a80570437b86e69eb0542b13df9a20743d (diff) | |
| download | dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.gz dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.tar.bz2 dabmod-5d965e80be2e6ab62bc82fb2e0d4d472153ad241.zip | |
crc-dabmod: add ZeroMQ input module
Diffstat (limited to 'src')
| -rw-r--r-- | src/DabMod.cpp | 79 | ||||
| -rw-r--r-- | src/FIRFilter.h | 51 | ||||
| -rw-r--r-- | src/InputFileReader.cpp | 47 | ||||
| -rw-r--r-- | src/InputReader.h | 76 | ||||
| -rw-r--r-- | src/InputZeroMQReader.cpp | 135 | ||||
| -rw-r--r-- | src/Makefile.am | 9 | ||||
| -rw-r--r-- | src/Makefile.in | 24 | ||||
| -rw-r--r-- | src/OutputFile.cpp | 9 | ||||
| -rw-r--r-- | src/OutputFile.h | 5 | ||||
| -rw-r--r-- | src/ThreadsafeQueue.h | 100 | 
10 files changed, 434 insertions, 101 deletions
| diff --git a/src/DabMod.cpp b/src/DabMod.cpp index 64e557c..1e8937d 100644 --- a/src/DabMod.cpp +++ b/src/DabMod.cpp @@ -153,8 +153,9 @@ int main(int argc, char* argv[])      int ret = 0;      bool loop = false;      std::string inputName = ""; +    std::string inputTransport = "file"; -    const char* outputName; +    std::string outputName;      int useFileOutput = 0;      int useUHDOutput = 0; @@ -190,6 +191,10 @@ int main(int argc, char* argv[])      Logger logger;      InputFileReader inputFileReader(logger); +#if defined(HAVE_INPUT_ZEROMQ) +    InputZeroMQReader inputZeroMQReader(logger); +#endif +    InputReader* inputReader;      signal(SIGINT, signalHandler); @@ -206,7 +211,7 @@ int main(int argc, char* argv[])          if (c != 'C') {              use_configuration_cmdline = true;          } -             +          switch (c) {          case 'C':              use_configuration_file = true; @@ -335,7 +340,8 @@ int main(int argc, char* argv[])              loop = true;          } -        inputName = pt.get("input.filename", "/dev/stdin"); +        inputTransport = pt.get("input.transport", "file"); +        inputName = pt.get("input.source", "/dev/stdin");          // log parameters:          if (pt.get("log.syslog", 0) == 1) { @@ -391,7 +397,7 @@ int main(int argc, char* argv[])          if (output_selected == "file") {              try { -                outputName = pt.get<std::string>("fileoutput.filename").c_str(); +                outputName = pt.get<std::string>("fileoutput.filename");              }              catch (std::exception &e) {                  std::cerr << "Error: " << e.what() << "\n"; @@ -477,15 +483,22 @@ int main(int argc, char* argv[])          modconf.delay_calculation_pipeline_stages += FIRFILTER_PIPELINE_DELAY;      } -      // Setting ETI input filename      if (inputName == "") {          if (optind < argc) {              inputName = argv[optind++]; -        } else { + +            if (inputName.substr(0, 4) == "zmq+" && +                inputName.find("://") != std::string::npos) { +                // if the name starts with zmq+XYZ://somewhere:port +                inputTransport = "zeromq"; +            } +        } +        else {              inputName = "/dev/stdin";          }      } +      // Checking unused arguments      if (optind != argc) {          fprintf(stderr, "Invalid arguments:"); @@ -507,13 +520,14 @@ int main(int argc, char* argv[])      // Print settings      fprintf(stderr, "Input\n"); -    fprintf(stderr, "  Name: %s\n", inputName.c_str()); +    fprintf(stderr, "  Type: %s\n", inputTransport.c_str()); +    fprintf(stderr, "  Source: %s\n", inputName.c_str());      fprintf(stderr, "Output\n");      if (useUHDOutput) {          fprintf(stderr, " UHD, Device: %s\n", outputuhd_conf.device);      }      else if (useFileOutput) { -        fprintf(stderr, "  Name: %s\n", outputName); +        fprintf(stderr, "  Name: %s\n", outputName.c_str());      }      fprintf(stderr, "  Sampling rate: ");      if (outputRate > 1000) { @@ -526,20 +540,39 @@ int main(int argc, char* argv[])          fprintf(stderr, "%zu Hz\n", outputRate);      } -    // Opening ETI input file -    if (inputFileReader.Open(inputName) == -1) { -        fprintf(stderr, "Unable to open input file!\n"); -        logger.level(error) << "Unable to open input file!"; +    if (inputTransport == "file") { +        // Opening ETI input file +        if (inputFileReader.Open(inputName, loop) == -1) { +            fprintf(stderr, "Unable to open input file!\n"); +            logger.level(error) << "Unable to open input file!"; +            ret = -1; +            goto END_MAIN; +        } + +        inputReader = &inputFileReader; +    } +    else if (inputTransport == "zeromq") { +#if !defined(HAVE_INPUT_ZEROMQ) +        fprintf(stderr, "Error, ZeroMQ input transport selected, but not compiled in!\n");          ret = -1;          goto END_MAIN; +#else +        inputZeroMQReader.Open(inputName); +        inputReader = &inputZeroMQReader; +#endif      } +    else +    { +        fprintf(stderr, "Error, invalid input transport %s selected!\n", inputTransport.c_str()); +        ret = -1; +        goto END_MAIN; +    } +      if (useFileOutput) {          // Opening COFDM output file -        if (outputName != NULL) { -            fprintf(stderr, "Using file output\n"); -            output = new OutputFile(outputName); -        } +        fprintf(stderr, "Using file output '%s'\n", outputName.c_str()); +        output = new OutputFile(outputName);      }      else if (useUHDOutput) {          fprintf(stderr, "Using UHD output\n"); @@ -553,7 +586,6 @@ int main(int argc, char* argv[])              logger.level(error) << "UHD initialisation failed:" << e.what();              goto END_MAIN;          } -              }      flowgraph = new Flowgraph(); @@ -568,7 +600,7 @@ int main(int argc, char* argv[])          ((OutputUHD*)output)->setETIReader(modulator->getEtiReader());      } -    inputFileReader.PrintInfo(); +    inputReader->PrintInfo();      try {          while (running) { @@ -578,7 +610,7 @@ int main(int argc, char* argv[])              PDEBUG("*****************************************\n");              PDEBUG("* Starting main loop\n");              PDEBUG("*****************************************\n"); -            while ((framesize = inputFileReader.GetNextFrame(data.getData())) > 0) { +            while ((framesize = inputReader->GetNextFrame(data.getData())) > 0) {                  if (!running) {                      break;                  } @@ -588,7 +620,6 @@ int main(int argc, char* argv[])                  PDEBUG("*****************************************\n");                  PDEBUG("* Read frame %lu\n", frame);                  PDEBUG("*****************************************\n"); -                fprintf(stderr, "Reading frame %lu\n", frame);                  ////////////////////////////////////////////////////////////////                  // Proccessing data @@ -597,17 +628,11 @@ int main(int argc, char* argv[])              }              if (framesize == 0) {                  fprintf(stderr, "End of file reached.\n"); -                if (!loop) { -                    running = false; -                } else { -                    fprintf(stderr, "Rewinding file.\n"); -                    inputFileReader.Rewind(); -                }              }              else {                  fprintf(stderr, "Input read error.\n"); -                running = false;              } +            running = false;          }      } catch (std::exception& e) {          fprintf(stderr, "EXCEPTION: %s\n", e.what()); diff --git a/src/FIRFilter.h b/src/FIRFilter.h index b9abb3e..8acd444 100644 --- a/src/FIRFilter.h +++ b/src/FIRFilter.h @@ -30,11 +30,12 @@  #endif  #include <boost/thread.hpp> -#include <queue> +#include "ThreadsafeQueue.h"  #include "RemoteControl.h"  #include "ModCodec.h"  #include "PcDebug.h" +#include "ThreadsafeQueue.h"  #include <sys/types.h>  #include <complex> @@ -47,54 +48,6 @@  typedef std::complex<float> complexf; -template<typename T> -class ThreadsafeQueue -{ -private: -    std::queue<T> the_queue; -    mutable boost::mutex the_mutex; -    boost::condition_variable the_condition_variable; -public: -    void push(T const& val) -    { -        boost::mutex::scoped_lock lock(the_mutex); -        the_queue.push(val); -        lock.unlock(); -        the_condition_variable.notify_one(); -    } - -    bool empty() const -    { -        boost::mutex::scoped_lock lock(the_mutex); -        return the_queue.empty(); -    } - -    bool try_pop(T& popped_value) -    { -        boost::mutex::scoped_lock lock(the_mutex); -        if(the_queue.empty()) -        { -            return false; -        } - -        popped_value = the_queue.front(); -        the_queue.pop(); -        return true; -    } - -    void wait_and_pop(T& popped_value) -    { -        boost::mutex::scoped_lock lock(the_mutex); -        while(the_queue.empty()) -        { -            the_condition_variable.wait(lock); -        } -         -        popped_value = the_queue.front(); -        the_queue.pop(); -    } -}; -  struct FIRFilterWorkerData {      /* Thread-safe queues to give data to and get data from       * the worker diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 1be1ad7..2514af8 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -43,9 +43,10 @@  #include "InputReader.h"  #include "PcDebug.h" -int InputFileReader::Open(std::string filename) +int InputFileReader::Open(std::string filename, bool loop)  {      filename_ = filename; +    loop_ = loop;      inputfile_ = fopen(filename_.c_str(), "r");      if (inputfile_ == NULL) {          fprintf(stderr, "Unable to open input file!\n"); @@ -59,7 +60,7 @@ int InputFileReader::Open(std::string filename)  int InputFileReader::Rewind()  { -    rewind(inputfile_); +    rewind(inputfile_); // Also clears the EOF flag      return IdentifyType();  } @@ -213,7 +214,7 @@ void InputFileReader::PrintInfo()              fprintf(stderr, "framed");              break;          default: -            fprintf(stderr, "unkown!"); +            fprintf(stderr, "unknown!");              break;      }      fprintf(stderr, "\n"); @@ -235,9 +236,24 @@ int InputFileReader::GetNextFrame(void* buffer)      }      else {          if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { -            PDEBUG("End of file!\n"); -            logger_.level(error) << "Reached end of file!"; -            return 0; +            logger_.level(error) << "Reached end of file."; +            if (loop_) { +                if (Rewind() == 0) { +                    if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { +                        PDEBUG("Error after rewinding file!\n"); +                        logger_.level(error) << "Error after rewinding file!"; +                        return -1; +                    } +                } +                else { +                    PDEBUG("Impossible to rewind file!\n"); +                    logger_.level(error) << "Impossible to rewind file!"; +                    return -1; +                } +            } +            else { +                return 0; +            }          }      }      if (frameSize > 6144) { // there might be a better limit @@ -247,8 +263,25 @@ int InputFileReader::GetNextFrame(void* buffer)      }      PDEBUG("Frame size: %u\n", frameSize); +    size_t read_bytes = fread(buffer, 1, frameSize, inputfile_); +    if (    loop_ && +            streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144 +            read_bytes == 0 && feof(inputfile_)) { +        // in case of an EOF from a RAW that we loop, rewind +        // otherwise, we won't tolerate it + +        if (Rewind() == 0) { +            read_bytes = fread(buffer, 1, frameSize, inputfile_); +        } +        else { +            PDEBUG("Impossible to rewind file!\n"); +            logger_.level(error) << "Impossible to rewind file!"; +            return -1; +        } +    } + -    if (fread(buffer, frameSize, 1, inputfile_) != 1) { +    if (read_bytes != frameSize) {          // A short read of a frame (i.e. reading an incomplete frame)          // is not tolerated. Input files must not contain incomplete frames          fprintf(stderr, diff --git a/src/InputReader.h b/src/InputReader.h index dbc7c11..8917922 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -31,6 +31,10 @@  #endif  #include <cstdio> +#if defined(HAVE_INPUT_ZEROMQ) +#  include <zmq.hpp> +#  include "ThreadsafeQueue.h" +#endif  #include "porting.h"  #include "Log.h" @@ -69,8 +73,13 @@ enum EtiStreamType {  class InputReader  {      public: -        // Save the next frame into the buffer, and return the number of bytes read. +        // Put next frame into buffer. This function will never write more than +        // 6144 bytes into buffer. +        // returns number of bytes written to buffer, 0 on eof, -1 on error          virtual int GetNextFrame(void* buffer) = 0; + +        // Print some information +        virtual void PrintInfo() = 0;  };  class InputFileReader : public InputReader @@ -90,18 +99,12 @@ class InputFileReader : public InputReader          }          // open file and determine stream type -        int Open(std::string filename); +        // When loop=1, GetNextFrame will never return 0 +        int Open(std::string filename, bool loop);          // Print information about the file opened          void PrintInfo(); -        // Rewind the file, and replay anew -        // returns 0 on success, -1 on failure -        int Rewind(); - -        // Put next frame into buffer. This function will never write more than -        // 6144 bytes into buffer. -        // returns number of bytes written to buffer, 0 on eof, -1 on error          int GetNextFrame(void* buffer);          EtiStreamType GetStreamType() @@ -112,6 +115,11 @@ class InputFileReader : public InputReader      private:          int IdentifyType(); +        // Rewind the file, and replay anew +        // returns 0 on success, -1 on failure +        int Rewind(); + +        bool loop_; // if shall we loop the file over and over          std::string filename_;          EtiStreamType streamtype_;          FILE* inputfile_; @@ -122,4 +130,54 @@ class InputFileReader : public InputReader                              // after 2**32 * 24ms ~= 3.3 years  }; +#if defined(HAVE_INPUT_ZEROMQ) +/* A ZeroMQ input. See www.zeromq.org for more info */ + +struct InputZeroMQThreadData +{ +    ThreadsafeQueue<zmq::message_t*> *in_messages; +    std::string uri; +}; + +class InputZeroMQWorker +{ +    public: +        InputZeroMQWorker() : +            zmqcontext(1), subscriber(zmqcontext, ZMQ_SUB) {} + +        void Start(struct InputZeroMQThreadData* workerdata); +        void Stop(); +    private: +        void RecvProcess(struct InputZeroMQThreadData* workerdata); +        bool running; +        zmq::context_t zmqcontext; +        zmq::socket_t subscriber; +        boost::thread recv_thread; +}; + +class InputZeroMQReader : public InputReader +{ +    public: +        InputZeroMQReader(Logger logger) : +            logger_(logger), in_messages_(10) +        { +            workerdata_.in_messages = &in_messages_; +        } + +        int Open(std::string uri); + +        int GetNextFrame(void* buffer); + +        void PrintInfo(); + +    private: +        Logger logger_; +        std::string uri_; + +        InputZeroMQWorker worker_; +        ThreadsafeQueue<zmq::message_t*> in_messages_; +        struct InputZeroMQThreadData workerdata_; +}; + +#endif  #endif diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp new file mode 100644 index 0000000..e689e4c --- /dev/null +++ b/src/InputZeroMQReader.cpp @@ -0,0 +1,135 @@ +/* +   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 +   Her Majesty the Queen in Right of Canada (Communications Research +   Center Canada) + +   Copyrigth (C) 2013 +   Matthias P. Braendli, matthias.braendli@mpb.li + */ +/* +   This file is part of CRC-DADMOD. + +   CRC-DADMOD is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   CRC-DADMOD is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with CRC-DADMOD.  If not, see <http://www.gnu.org/licenses/>. + */ + +#ifdef HAVE_CONFIG_H +#   include "config.h" +#endif + +#if defined(HAVE_INPUT_ZEROMQ) + +#include <string> +#include <cstring> +#include <cstdio> +#include <stdint.h> +#include <zmq.hpp> +#include <boost/thread/thread.hpp> +#include "porting.h" +#include "InputReader.h" +#include "PcDebug.h" + +#define MAX_QUEUE_SIZE 50 + +int InputZeroMQReader::Open(std::string uri) +{ +    uri_ = uri; +    workerdata_.uri = uri; +    // launch receiver thread +    worker_.Start(&workerdata_); + +    return 0; +} + +int InputZeroMQReader::GetNextFrame(void* buffer) +{ +    zmq::message_t* incoming; +    in_messages_.wait_and_pop(incoming); + +    size_t framesize = incoming->size(); + +    // guarantee that we never will write more than 6144 bytes +    if (framesize > 6144) { +        fprintf(stderr, "ZeroMQ message too large: %zu!\n", framesize); +        logger_.level(error) << "ZeroMQ message too large" << framesize; +        return -1; +    } + +    memcpy(buffer, incoming->data(), framesize); + +    delete incoming; + +    // pad to 6144 bytes +    memset(&((uint8_t*)buffer)[framesize], 0x55, 6144 - framesize); + + +    return 6144; +} + +void InputZeroMQReader::PrintInfo() +{ +    fprintf(stderr, "Input ZeroMQ:\n"); +    fprintf(stderr, "  Receiving from %s\n\n", uri_.c_str()); +} + +// ------------- Worker functions + +void InputZeroMQWorker::RecvProcess(struct InputZeroMQThreadData* workerdata) +{ +    size_t queue_size = 0; + +    try { +        subscriber.connect(workerdata->uri.c_str()); + +        subscriber.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages + +        while (running) +        { +            zmq::message_t incoming; +            subscriber.recv(&incoming); + +            if (queue_size < MAX_QUEUE_SIZE) { +                zmq::message_t* holder = new zmq::message_t(); +                holder->move(&incoming); // move the message into the holder +                queue_size = workerdata->in_messages->push(holder); +            } +            else +            { +                workerdata->in_messages->notify(); +                fprintf(stderr, "ZeroMQ message overfull: %zu elements !\n", queue_size); +            } + +            if (queue_size < 5) { +                fprintf(stderr, "ZeroMQ message underfull: %zu elements !\n", queue_size); +            } +        } +    } +    catch ( zmq::error_t err ) { +        printf("ZeroMQ error in RecvProcess: '%s'\n", err.what()); +    } +} + +void InputZeroMQWorker::Start(struct InputZeroMQThreadData* workerdata) +{ +    running = true; +    recv_thread = boost::thread(&InputZeroMQWorker::RecvProcess, this, workerdata); +} + +void InputZeroMQWorker::Stop() +{ +    subscriber.close(); +    running = false; +} + +#endif + diff --git a/src/Makefile.am b/src/Makefile.am index 0bb9dc9..fa4b5bd 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -22,6 +22,12 @@ else  HGVERSION_FLAGS = -DHGVERSION="\"-modified\""  endif +if HAVE_INPUT_ZEROMQ_TEST +ZMQ_LIBS    =-lzmq +else +ZMQ_LIBS    = +endif +  FFT_DIR=$(top_builddir)/lib/kiss_fft129  FFT_INC=-I$(FFT_DIR) -I$(FFT_DIR)/tools  FFT_SRC=$(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h @@ -41,6 +47,7 @@ $(FFT_DIR):  	fi  crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS) +crc_dabmod_LDADD    = $(ZMQ_LIBS)  crc_dabmod_SOURCES =  DabMod.cpp \                        PcDebug.h \                        porting.c porting.h \ @@ -64,7 +71,7 @@ crc_dabmod_SOURCES =  DabMod.cpp \                        OutputUHD.cpp OutputUHD.h \                        ModOutput.cpp ModOutput.h \                        InputMemory.cpp InputMemory.h \ -					  InputFileReader.cpp InputReader.h \ +					  InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \                        OutputFile.cpp OutputFile.h \                        FrameMultiplexer.cpp FrameMultiplexer.h \                        ModMux.cpp ModMux.h \ diff --git a/src/Makefile.in b/src/Makefile.in index 0a1e7ad..0817c90 100644 --- a/src/Makefile.in +++ b/src/Makefile.in @@ -130,6 +130,7 @@ am_crc_dabmod_OBJECTS = crc_dabmod-DabMod.$(OBJEXT) \  	crc_dabmod-OutputUHD.$(OBJEXT) crc_dabmod-ModOutput.$(OBJEXT) \  	crc_dabmod-InputMemory.$(OBJEXT) \  	crc_dabmod-InputFileReader.$(OBJEXT) \ +	crc_dabmod-InputZeroMQReader.$(OBJEXT) \  	crc_dabmod-OutputFile.$(OBJEXT) \  	crc_dabmod-FrameMultiplexer.$(OBJEXT) \  	crc_dabmod-ModMux.$(OBJEXT) crc_dabmod-PrbsGenerator.$(OBJEXT) \ @@ -153,7 +154,8 @@ am__objects_1 = crc_dabmod-kiss_fft.$(OBJEXT) \  nodist_crc_dabmod_OBJECTS = $(am__objects_1)  crc_dabmod_OBJECTS = $(am_crc_dabmod_OBJECTS) \  	$(nodist_crc_dabmod_OBJECTS) -crc_dabmod_LDADD = $(LDADD) +am__DEPENDENCIES_1 = +crc_dabmod_DEPENDENCIES = $(am__DEPENDENCIES_1)  am__vpath_adj_setup = srcdirstrip=`echo "$(srcdir)" | sed 's|.|.|g'`;  am__vpath_adj = case $$p in \      $(srcdir)/*) f=`echo "$$p" | sed "s|^$$srcdirstrip/||"`;; \ @@ -361,11 +363,14 @@ top_builddir = @top_builddir@  top_srcdir = @top_srcdir@  @IS_HG_REPO_FALSE@HGVERSION_FLAGS = -DHGVERSION="\"-modified\""  @IS_HG_REPO_TRUE@HGVERSION_FLAGS = -DHGVERSION="\"`hg parents --template '-{node|short}'`\"" +@HAVE_INPUT_ZEROMQ_TEST_FALSE@ZMQ_LIBS =  +@HAVE_INPUT_ZEROMQ_TEST_TRUE@ZMQ_LIBS = -lzmq  FFT_DIR = $(top_builddir)/lib/kiss_fft129  FFT_INC = -I$(FFT_DIR) -I$(FFT_DIR)/tools  FFT_SRC = $(FFT_DIR)/kiss_fft.c $(FFT_DIR)/kiss_fft.h $(FFT_DIR)/tools/kiss_fftr.c $(FFT_DIR)/tools/kiss_fftr.h kiss_fftsimd.c kiss_fftsimd.h  FFT_FLG = -ffast-math  crc_dabmod_CPPFLAGS = $(FFT_INC) $(FFT_FLG) -msse -msse2 $(HGVERSION_FLAGS) +crc_dabmod_LDADD = $(ZMQ_LIBS)  crc_dabmod_SOURCES = DabMod.cpp \                        PcDebug.h \                        porting.c porting.h \ @@ -389,7 +394,7 @@ crc_dabmod_SOURCES = DabMod.cpp \                        OutputUHD.cpp OutputUHD.h \                        ModOutput.cpp ModOutput.h \                        InputMemory.cpp InputMemory.h \ -					  InputFileReader.cpp InputReader.h \ +					  InputFileReader.cpp InputZeroMQReader.cpp InputReader.h \                        OutputFile.cpp OutputFile.h \                        FrameMultiplexer.cpp FrameMultiplexer.h \                        ModMux.cpp ModMux.h \ @@ -553,6 +558,7 @@ distclean-compile:  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-GuardIntervalInserter.Po@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputFileReader.Po@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputMemory.Po@am__quote@ +@AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-InputZeroMQReader.Po@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-Log.Po@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModCodec.Po@am__quote@  @AMDEP_TRUE@@am__include@ @am__quote@./$(DEPDIR)/crc_dabmod-ModFormat.Po@am__quote@ @@ -973,6 +979,20 @@ crc_dabmod-InputFileReader.obj: InputFileReader.cpp  @AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@  @am__fastdepCXX_FALSE@	$(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputFileReader.obj `if test -f 'InputFileReader.cpp'; then $(CYGPATH_W) 'InputFileReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputFileReader.cpp'; fi` +crc_dabmod-InputZeroMQReader.o: InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@	$(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.o -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@	$(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	$(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.o' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@	$(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.o `test -f 'InputZeroMQReader.cpp' || echo '$(srcdir)/'`InputZeroMQReader.cpp + +crc_dabmod-InputZeroMQReader.obj: InputZeroMQReader.cpp +@am__fastdepCXX_TRUE@	$(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-InputZeroMQReader.obj -MD -MP -MF $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi` +@am__fastdepCXX_TRUE@	$(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-InputZeroMQReader.Tpo $(DEPDIR)/crc_dabmod-InputZeroMQReader.Po +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	$(AM_V_CXX)source='InputZeroMQReader.cpp' object='crc_dabmod-InputZeroMQReader.obj' libtool=no @AMDEPBACKSLASH@ +@AMDEP_TRUE@@am__fastdepCXX_FALSE@	DEPDIR=$(DEPDIR) $(CXXDEPMODE) $(depcomp) @AMDEPBACKSLASH@ +@am__fastdepCXX_FALSE@	$(AM_V_CXX@am__nodep@)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -c -o crc_dabmod-InputZeroMQReader.obj `if test -f 'InputZeroMQReader.cpp'; then $(CYGPATH_W) 'InputZeroMQReader.cpp'; else $(CYGPATH_W) '$(srcdir)/InputZeroMQReader.cpp'; fi` +  crc_dabmod-OutputFile.o: OutputFile.cpp  @am__fastdepCXX_TRUE@	$(AM_V_CXX)$(CXX) $(DEFS) $(DEFAULT_INCLUDES) $(INCLUDES) $(crc_dabmod_CPPFLAGS) $(CPPFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS) -MT crc_dabmod-OutputFile.o -MD -MP -MF $(DEPDIR)/crc_dabmod-OutputFile.Tpo -c -o crc_dabmod-OutputFile.o `test -f 'OutputFile.cpp' || echo '$(srcdir)/'`OutputFile.cpp  @am__fastdepCXX_TRUE@	$(AM_V_at)$(am__mv) $(DEPDIR)/crc_dabmod-OutputFile.Tpo $(DEPDIR)/crc_dabmod-OutputFile.Po diff --git a/src/OutputFile.cpp b/src/OutputFile.cpp index c411c85..d0d87c8 100644 --- a/src/OutputFile.cpp +++ b/src/OutputFile.cpp @@ -22,20 +22,21 @@  #include "OutputFile.h"  #include "PcDebug.h" +#include <string>  #include <assert.h>  #include <stdexcept> -OutputFile::OutputFile(const char* filename) : +OutputFile::OutputFile(std::string filename) :      ModOutput(ModFormat(1), ModFormat(0)),      myFilename(filename)  {      PDEBUG("OutputFile::OutputFile(filename: %s) @ %p\n", -            filename, this); +            filename.c_str(), this); -    myFile = fopen(filename, "w"); +    myFile = fopen(filename.c_str(), "w");      if (myFile == NULL) { -        perror(filename); +        perror(filename.c_str());          throw std::runtime_error(                  "OutputFile::OutputFile() unable to open file!");      } diff --git a/src/OutputFile.h b/src/OutputFile.h index 1223aef..e4cd91f 100644 --- a/src/OutputFile.h +++ b/src/OutputFile.h @@ -29,6 +29,7 @@  #include "ModOutput.h" +#include <string>  #include <stdio.h>  #include <sys/types.h> @@ -36,14 +37,14 @@  class OutputFile : public ModOutput  {  public: -    OutputFile(const char* filename); +    OutputFile(std::string filename);      virtual ~OutputFile();      virtual int process(Buffer* dataIn, Buffer* dataOut);      const char* name() { return "OutputFile"; }  protected: -    const char* myFilename; +    std::string myFilename;      FILE* myFile;  }; diff --git a/src/ThreadsafeQueue.h b/src/ThreadsafeQueue.h new file mode 100644 index 0000000..d89bf74 --- /dev/null +++ b/src/ThreadsafeQueue.h @@ -0,0 +1,100 @@ +/* +   Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in +   Right of Canada (Communications Research Center Canada) + +   Copyright (C) 2013 +   Matthias P. Braendli, matthias.braendli@mpb.li + +   An implementation for a threadsafe queue using boost thread library + +   When creating a ThreadsafeQueue, one can specify the minimal number +   of elements it must contain before it is possible to take one +   element out. + */ +/* +   This file is part of CRC-DADMOD. + +   CRC-DADMOD is free software: you can redistribute it and/or modify +   it under the terms of the GNU General Public License as +   published by the Free Software Foundation, either version 3 of the +   License, or (at your option) any later version. + +   CRC-DADMOD is distributed in the hope that it will be useful, +   but WITHOUT ANY WARRANTY; without even the implied warranty of +   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the +   GNU General Public License for more details. + +   You should have received a copy of the GNU General Public License +   along with CRC-DADMOD.  If not, see <http://www.gnu.org/licenses/>. + */ + +#ifndef THREADSAFE_QUEUE_H +#define THREADSAFE_QUEUE_H + +#include <boost/thread.hpp> +#include <queue> + +template<typename T> +class ThreadsafeQueue +{ +private: +    std::queue<T> the_queue; +    mutable boost::mutex the_mutex; +    boost::condition_variable the_condition_variable; +    size_t the_required_size; +public: + +    ThreadsafeQueue() : the_required_size(1) {} + +    ThreadsafeQueue(size_t required_size) : the_required_size(required_size) {} + +    size_t push(T const& val) +    { +        boost::mutex::scoped_lock lock(the_mutex); +        the_queue.push(val); +        size_t queue_size = the_queue.size(); +        lock.unlock(); +        the_condition_variable.notify_one(); + +        return queue_size; +    } + +    void notify() +    { +        the_condition_variable.notify_one(); +    } + +    bool empty() const +    { +        boost::mutex::scoped_lock lock(the_mutex); +        return the_queue.empty(); +    } + +    bool try_pop(T& popped_value) +    { +        boost::mutex::scoped_lock lock(the_mutex); +        if(the_queue.size() < the_required_size) +        { +            return false; +        } + +        popped_value = the_queue.front(); +        the_queue.pop(); +        return true; +    } + +    void wait_and_pop(T& popped_value) +    { +        boost::mutex::scoped_lock lock(the_mutex); +        while(the_queue.size() < the_required_size) +        { +            the_condition_variable.wait(lock); +        } + +        popped_value = the_queue.front(); +        the_queue.pop(); +    } +}; + +#endif + | 
