aboutsummaryrefslogtreecommitdiffstats
path: root/src/InputReader.h
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-20 01:20:41 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-20 01:20:41 +0100
commit28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d (patch)
treef22941e989bb775aacda52876c97ada7b899a7dd /src/InputReader.h
parent95f556cf0797ab4c23f431e5c8c5accfa7f4c30b (diff)
parentf52b0e13f61a947c26236504ffb4b072352abc04 (diff)
downloaddabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.tar.gz
dabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.tar.bz2
dabmod-28ddaa742d1a815c8c07d17b2a79fbfb964fdc1d.zip
Merge branch 'outputRefactoring' into next
Diffstat (limited to 'src/InputReader.h')
-rw-r--r--src/InputReader.h204
1 files changed, 77 insertions, 127 deletions
diff --git a/src/InputReader.h b/src/InputReader.h
index 7d6b373..07326cf 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -33,6 +33,7 @@
#include <cstdio>
#include <vector>
+#include <atomic>
#include <memory>
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
@@ -40,47 +41,8 @@
#endif
#include "porting.h"
#include "Log.h"
-#include "lib/UdpSocket.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#define SOCKET int
#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-
-/* Known types of input streams. Description taken from the CRC mmbTools forum.
-
- All numbers are little-endian.
-
- Framed format is used for file recording. It is the default format. The
- padding can be removed from data. Format:
- uint32_t nbFrames
- for each frame
- uint16_t frameSize
- uint8_t data[frameSize]
-
- Streamed format is used for streamed applications. As the total number of
- frames is unknown before end of transmission, the corresponding field is
- removed. The padding can be removed from data. Format:
- for each frame
- uint16_t frameSize
- uint8_t data[frameSize]
-
- Raw format is a bit-by-bit (but byte aligned on sync) recording of a G.703
- data stream. The padding is always present. Format:
- for each frame
- uint8_t data[6144]
-
- Please note that our raw format can also be referred to as ETI(NI, G.703) or ETI(NI).
-*/
-enum EtiStreamType {
- ETI_STREAM_TYPE_NONE = 0,
- ETI_STREAM_TYPE_RAW,
- ETI_STREAM_TYPE_STREAMED,
- ETI_STREAM_TYPE_FRAMED,
-};
class InputReader
{
@@ -91,43 +53,25 @@ class InputReader
virtual int GetNextFrame(void* buffer) = 0;
// Print some information
- virtual void PrintInfo() = 0;
+ virtual void PrintInfo() const = 0;
};
class InputFileReader : public InputReader
{
public:
- InputFileReader() :
- streamtype_(ETI_STREAM_TYPE_NONE),
- inputfile_(NULL) { }
-
- ~InputFileReader()
- {
- if (inputfile_ != NULL) {
- fprintf(stderr, "\nClosing input file...\n");
-
- fclose(inputfile_);
- }
- }
+ InputFileReader() = default;
+ InputFileReader(const InputFileReader& other) = delete;
+ InputFileReader& operator=(const InputFileReader& other) = delete;
// open file and determine stream type
// When loop=1, GetNextFrame will never return 0
int Open(std::string filename, bool loop);
// Print information about the file opened
- void PrintInfo();
-
+ void PrintInfo() const;
int GetNextFrame(void* buffer);
- EtiStreamType GetStreamType()
- {
- return streamtype_;
- }
-
private:
- InputFileReader(const InputFileReader& other) = delete;
- InputFileReader& operator=(const InputFileReader& other) = delete;
-
int IdentifyType();
// Rewind the file, and replay anew
@@ -136,19 +80,60 @@ class InputFileReader : public InputReader
bool loop_; // if shall we loop the file over and over
std::string filename_;
- EtiStreamType streamtype_;
- FILE* inputfile_;
- size_t inputfilelength_;
- uint64_t nbframes_; // 64-bit because 32-bit overflow is
- // after 2**32 * 24ms ~= 3.3 years
+ /* Known types of input streams. Description taken from the CRC
+ * mmbTools forum. All values are are little-endian. */
+ enum class EtiStreamType {
+ /* Not yet identified */
+ None,
+
+ /* Raw format is a bit-by-bit (but byte aligned on sync) recording
+ * of a G.703 data stream. The padding is always present.
+ * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI).
+ * Format:
+ for each frame:
+ uint8_t data[6144]
+ */
+ Raw,
+
+ /* Streamed format is used for streamed applications. As the total
+ * number of frames is unknown before end of transmission, the
+ * corresponding field is removed. The padding can be removed from
+ * data.
+ * Format:
+ for each frame:
+ uint16_t frameSize
+ uint8_t data[frameSize]
+ */
+ Streamed,
+
+ /* Framed format is used for file recording. It is the default format.
+ * The padding can be removed from data.
+ * Format:
+ uint32_t nbFrames
+ for each frame:
+ uint16_t frameSize
+ uint8_t data[frameSize]
+ */
+ Framed,
+ };
+
+ EtiStreamType streamtype_ = EtiStreamType::None;
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> inputfile_;
+
+ size_t inputfilelength_ = 0;
+ uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is
+ // after 2**32 * 24ms ~= 3.3 years
};
class InputTcpReader : public InputReader
{
public:
InputTcpReader();
- ~InputTcpReader();
+ InputTcpReader(const InputTcpReader& other) = delete;
+ InputTcpReader& operator=(const InputTcpReader& other) = delete;
+ virtual ~InputTcpReader();
// Endpoint is either host:port or tcp://host:port
void Open(const std::string& endpoint);
@@ -159,89 +144,54 @@ class InputTcpReader : public InputReader
virtual int GetNextFrame(void* buffer);
// Print some information
- virtual void PrintInfo();
+ virtual void PrintInfo() const;
private:
- InputTcpReader(const InputTcpReader& other) = delete;
- InputTcpReader& operator=(const InputTcpReader& other) = delete;
- SOCKET m_sock;
+ int m_sock = INVALID_SOCKET;
std::string m_uri;
};
struct zmq_input_overflow : public std::exception
{
- const char* what () const throw ()
- {
- return "InputZMQ buffer overflow";
- }
+ const char* what () const throw ()
+ {
+ return "InputZMQ buffer overflow";
+ }
};
#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */
-struct InputZeroMQThreadData
-{
- ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > *in_messages;
- std::string uri;
- size_t max_queued_frames;
-};
-
-class InputZeroMQWorker
+class InputZeroMQReader : public InputReader
{
public:
- InputZeroMQWorker() :
- running(false),
- zmqcontext(1),
- m_to_drop(0) { }
+ InputZeroMQReader() = default;
+ InputZeroMQReader(const InputZeroMQReader& other) = delete;
+ InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
+ ~InputZeroMQReader();
- void Start(struct InputZeroMQThreadData* workerdata);
- void Stop();
+ int Open(const std::string& uri, size_t max_queued_frames);
+ int GetNextFrame(void* buffer);
+ void PrintInfo() const;
- bool is_running(void) { return running; }
private:
- bool running;
+ std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
+ std::string m_uri;
+ size_t m_max_queued_frames = 0;
+ ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > m_in_messages;
- void RecvProcess(struct InputZeroMQThreadData* workerdata);
+ void RecvProcess(void);
- zmq::context_t zmqcontext; // is thread-safe
- boost::thread recv_thread;
+ zmq::context_t m_zmqcontext; // is thread-safe
+ boost::thread m_recv_thread;
/* We must be careful to keep frame phase consistent. If we
* drop a single ETI frame, we will break the transmission
* frame vs. ETI frame phase.
*
- * Here we keep track of how many ETI frames we must drop
+ * Here we keep track of how many ETI frames we must drop.
*/
- int m_to_drop;
-};
-
-class InputZeroMQReader : public InputReader
-{
- public:
- InputZeroMQReader()
- {
- workerdata_.in_messages = &in_messages_;
- }
-
- ~InputZeroMQReader()
- {
- worker_.Stop();
- }
-
- int Open(const std::string& uri, size_t max_queued_frames);
-
- int GetNextFrame(void* buffer);
-
- void PrintInfo();
-
- private:
- InputZeroMQReader(const InputZeroMQReader& other) = delete;
- InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
- std::string uri_;
-
- InputZeroMQWorker worker_;
- ThreadsafeQueue<std::shared_ptr<std::vector<uint8_t> > > in_messages_;
- struct InputZeroMQThreadData workerdata_;
+ int m_to_drop = 0;
};
#endif