/* Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org */ /* This file is part of ODR-DabMod. ODR-DabMod is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. ODR-DabMod is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with ODR-DabMod. If not, see . */ #pragma once #ifdef HAVE_CONFIG_H # include "config.h" #endif #include #include #include #include #include #include #include #if defined(HAVE_ZEROMQ) # include "zmq.hpp" # include "ThreadsafeQueue.h" # include "RemoteControl.h" #endif #include "Log.h" #include "Socket.h" #define INVALID_SOCKET -1 class InputReader { public: // Put next frame into buffer. This function will never write more than // 6144 bytes into buffer. // returns number of bytes written to buffer, 0 on eof, -1 on error virtual int GetNextFrame(void* buffer) = 0; // Get some information virtual std::string GetPrintableInfo() const = 0; }; class InputFileReader : public InputReader { public: InputFileReader() = default; InputFileReader(const InputFileReader& other) = delete; InputFileReader& operator=(const InputFileReader& other) = delete; // open file and determine stream type // When loop=1, GetNextFrame will never return 0 int Open(std::string filename, bool loop); // Print information about the file opened virtual std::string GetPrintableInfo() const override; virtual int GetNextFrame(void* buffer) override; private: int IdentifyType(); // Rewind the file, and replay anew // returns 0 on success, -1 on failure int Rewind(); bool loop_; // if shall we loop the file over and over std::string filename_; /* Known types of input streams. Description taken from the CRC * mmbTools forum. All values are are little-endian. */ enum class EtiStreamType { /* Not yet identified */ None, /* Raw format is a bit-by-bit (but byte aligned on sync) recording * of a G.703 data stream. The padding is always present. * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI). * Format: for each frame: uint8_t data[6144] */ Raw, /* Streamed format is used for streamed applications. As the total * number of frames is unknown before end of transmission, the * corresponding field is removed. The padding can be removed from * data. * Format: for each frame: uint16_t frameSize uint8_t data[frameSize] */ Streamed, /* Framed format is used for file recording. It is the default format. * The padding can be removed from data. * Format: uint32_t nbFrames for each frame: uint16_t frameSize uint8_t data[frameSize] */ Framed, }; EtiStreamType streamtype_ = EtiStreamType::None; struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; std::unique_ptr inputfile_; size_t inputfilelength_ = 0; uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is // after 2**32 * 24ms ~= 3.3 years }; class InputTcpReader : public InputReader { public: // Endpoint is either host:port or tcp://host:port void Open(const std::string& endpoint); // Put next frame into buffer. This function will never write more than // 6144 bytes into buffer. // returns number of bytes written to buffer, 0 on eof, -1 on error virtual int GetNextFrame(void* buffer) override; virtual std::string GetPrintableInfo() const override; private: Socket::TCPClient m_tcpclient; std::string m_uri; }; struct zmq_input_overflow : public std::exception { const char* what () const throw () { return "InputZMQ buffer overflow"; } }; #if defined(HAVE_ZEROMQ) /* A ZeroMQ input. See www.zeromq.org for more info */ class InputZeroMQReader : public InputReader, public RemoteControllable { public: InputZeroMQReader(); InputZeroMQReader(const InputZeroMQReader& other) = delete; InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete; ~InputZeroMQReader(); int Open(const std::string& uri, size_t max_queued_frames); virtual int GetNextFrame(void* buffer) override; virtual std::string GetPrintableInfo() const override; /* Base function to set parameters. */ virtual void set_parameter( const std::string& parameter, const std::string& value) override; /* Getting a parameter always returns a string. */ virtual const std::string get_parameter( const std::string& parameter) const override; private: std::atomic m_running = ATOMIC_VAR_INIT(false); std::string m_uri; size_t m_max_queued_frames = 0; // Either must contain a full ETI frame, or one flag must be set struct message_t { std::vector eti_frame; bool overflow = false; bool timeout = false; bool fault = false; }; ThreadsafeQueue m_in_messages; mutable std::mutex m_last_in_messages_size_mutex; size_t m_last_in_messages_size = 0; void RecvProcess(void); zmq::context_t m_zmqcontext; // is thread-safe std::thread m_recv_thread; /* We must be careful to keep frame phase consistent. If we * drop a single ETI frame, we will break the transmission * frame vs. ETI frame phase. * * Here we keep track of how many ETI frames we must drop. */ int m_to_drop = 0; }; #endif