diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-01-20 01:20:41 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2018-01-20 01:20:41 +0100 |
commit | 28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d (patch) | |
tree | f22941e989bb775aacda52876c97ada7b899a7dd /src/InputReader.h | |
parent | 95f556cf0797ab4c23f431e5c8c5accfa7f4c30b (diff) | |
parent | f52b0e13f61a947c26236504ffb4b072352abc04 (diff) | |
download | dabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.tar.gz dabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.tar.bz2 dabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.zip |
Merge branch 'outputRefactoring' into next
Diffstat (limited to 'src/InputReader.h')
-rw-r--r-- | src/InputReader.h | 204 |
1 files changed, 77 insertions, 127 deletions
diff --git a/src/InputReader.h b/src/InputReader.h index 7d6b373..07326cf 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -33,6 +33,7 @@ #include <cstdio> #include <vector> +#include <atomic> #include <memory> #if defined(HAVE_ZEROMQ) # include "zmq.hpp" @@ -40,47 +41,8 @@ #endif #include "porting.h" #include "Log.h" -#include "lib/UdpSocket.h" -#include <sys/socket.h> -#include <netinet/in.h> #include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#define SOCKET int #define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 - -/* Known types of input streams. Description taken from the CRC mmbTools forum. - - All numbers are little-endian. - - Framed format is used for file recording. It is the default format. The - padding can be removed from data. Format: - uint32_t nbFrames - for each frame - uint16_t frameSize - uint8_t data[frameSize] - - Streamed format is used for streamed applications. As the total number of - frames is unknown before end of transmission, the corresponding field is - removed. The padding can be removed from data. Format: - for each frame - uint16_t frameSize - uint8_t data[frameSize] - - Raw format is a bit-by-bit (but byte aligned on sync) recording of a G.703 - data stream. The padding is always present. Format: - for each frame - uint8_t data[6144] - - Please note that our raw format can also be referred to as ETI(NI, G.703) or ETI(NI). -*/ -enum EtiStreamType { - ETI_STREAM_TYPE_NONE = 0, - ETI_STREAM_TYPE_RAW, - ETI_STREAM_TYPE_STREAMED, - ETI_STREAM_TYPE_FRAMED, -}; class InputReader { @@ -91,43 +53,25 @@ class InputReader virtual int GetNextFrame(void* buffer) = 0; // Print some information - virtual void PrintInfo() = 0; + virtual void PrintInfo() const = 0; }; class InputFileReader : public InputReader { public: - InputFileReader() : - streamtype_(ETI_STREAM_TYPE_NONE), - inputfile_(NULL) { } - - ~InputFileReader() - { - if (inputfile_ != NULL) { - fprintf(stderr, "\nClosing input file...\n"); - - fclose(inputfile_); - } - } + InputFileReader() = default; + InputFileReader(const InputFileReader& other) = delete; + InputFileReader& operator=(const InputFileReader& other) = delete; // open file and determine stream type // When loop=1, GetNextFrame will never return 0 int Open(std::string filename, bool loop); // Print information about the file opened - void PrintInfo(); - + void PrintInfo() const; int GetNextFrame(void* buffer); - EtiStreamType GetStreamType() - { - return streamtype_; - } - private: - InputFileReader(const InputFileReader& other) = delete; - InputFileReader& operator=(const InputFileReader& other) = delete; - int IdentifyType(); // Rewind the file, and replay anew @@ -136,19 +80,60 @@ class InputFileReader : public InputReader bool loop_; // if shall we loop the file over and over std::string filename_; - EtiStreamType streamtype_; - FILE* inputfile_; - size_t inputfilelength_; - uint64_t nbframes_; // 64-bit because 32-bit overflow is - // after 2**32 * 24ms ~= 3.3 years + /* Known types of input streams. Description taken from the CRC + * mmbTools forum. All values are are little-endian. */ + enum class EtiStreamType { + /* Not yet identified */ + None, + + /* Raw format is a bit-by-bit (but byte aligned on sync) recording + * of a G.703 data stream. The padding is always present. + * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI). + * Format: + for each frame: + uint8_t data[6144] + */ + Raw, + + /* Streamed format is used for streamed applications. As the total + * number of frames is unknown before end of transmission, the + * corresponding field is removed. The padding can be removed from + * data. + * Format: + for each frame: + uint16_t frameSize + uint8_t data[frameSize] + */ + Streamed, + + /* Framed format is used for file recording. It is the default format. + * The padding can be removed from data. + * Format: + uint32_t nbFrames + for each frame: + uint16_t frameSize + uint8_t data[frameSize] + */ + Framed, + }; + + EtiStreamType streamtype_ = EtiStreamType::None; + struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; + std::unique_ptr<FILE, FILEDeleter> inputfile_; + + size_t inputfilelength_ = 0; + uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is + // after 2**32 * 24ms ~= 3.3 years }; class InputTcpReader : public InputReader { public: InputTcpReader(); - ~InputTcpReader(); + InputTcpReader(const InputTcpReader& other) = delete; + InputTcpReader& operator=(const InputTcpReader& other) = delete; + virtual ~InputTcpReader(); // Endpoint is either host:port or tcp://host:port void Open(const std::string& endpoint); @@ -159,89 +144,54 @@ class InputTcpReader : public InputReader virtual int GetNextFrame(void* buffer); // Print some information - virtual void PrintInfo(); + virtual void PrintInfo() const; private: - InputTcpReader(const InputTcpReader& other) = delete; - InputTcpReader& operator=(const InputTcpReader& other) = delete; - SOCKET m_sock; + int m_sock = INVALID_SOCKET; std::string m_uri; }; struct zmq_input_overflow : public std::exception { - const char* what () const throw () - { - return "InputZMQ buffer overflow"; - } + const char* what () const throw () + { + return "InputZMQ buffer overflow"; + } }; #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ -struct InputZeroMQThreadData -{ - ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages; - std::string uri; - size_t max_queued_frames; -}; - -class InputZeroMQWorker +class InputZeroMQReader : public InputReader { public: - InputZeroMQWorker() : - running(false), - zmqcontext(1), - m_to_drop(0) { } + InputZeroMQReader() = default; + InputZeroMQReader(const InputZeroMQReader& other) = delete; + InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; + ~InputZeroMQReader(); - void Start(struct InputZeroMQThreadData* workerdata); - void Stop(); + int Open(const std::string& uri, size_t max_queued_frames); + int GetNextFrame(void* buffer); + void PrintInfo() const; - bool is_running(void) { return running; } private: - bool running; + std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); + std::string m_uri; + size_t m_max_queued_frames = 0; + ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages; - void RecvProcess(struct InputZeroMQThreadData* workerdata); + void RecvProcess(void); - zmq::context_t zmqcontext; // is thread-safe - boost::thread recv_thread; + zmq::context_t m_zmqcontext; // is thread-safe + boost::thread m_recv_thread; /* We must be careful to keep frame phase consistent. If we * drop a single ETI frame, we will break the transmission * frame vs. ETI frame phase. * - * Here we keep track of how many ETI frames we must drop + * Here we keep track of how many ETI frames we must drop. */ - int m_to_drop; -}; - -class InputZeroMQReader : public InputReader -{ - public: - InputZeroMQReader() - { - workerdata_.in_messages = &in_messages_; - } - - ~InputZeroMQReader() - { - worker_.Stop(); - } - - int Open(const std::string& uri, size_t max_queued_frames); - - int GetNextFrame(void* buffer); - - void PrintInfo(); - - private: - InputZeroMQReader(const InputZeroMQReader& other) = delete; - InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; - std::string uri_; - - InputZeroMQWorker worker_; - ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > in_messages_; - struct InputZeroMQThreadData workerdata_; + int m_to_drop = 0; }; #endif |