diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/InputFileReader.cpp | 67 | ||||
-rw-r--r-- | src/InputReader.h | 150 | ||||
-rw-r--r-- | src/InputTcpReader.cpp | 7 | ||||
-rw-r--r-- | src/InputZeroMQReader.cpp | 2 |
4 files changed, 104 insertions, 122 deletions
diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp index 84f0be4..5e93477 100644 --- a/src/InputFileReader.cpp +++ b/src/InputFileReader.cpp @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyrigth (C) 2013 + Copyrigth (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li @@ -47,28 +47,29 @@ int InputFileReader::Open(std::string filename, bool loop) { filename_ = filename; loop_ = loop; - inputfile_ = fopen(filename_.c_str(), "r"); - if (inputfile_ == NULL) { + FILE* fd = fopen(filename_.c_str(), "r"); + if (fd == nullptr) { etiLog.level(error) << "Unable to open input file!"; perror(filename_.c_str()); return -1; } + inputfile_.reset(fd); return IdentifyType(); } int InputFileReader::Rewind() { - rewind(inputfile_); // Also clears the EOF flag + rewind(inputfile_.get()); // Also clears the EOF flag return IdentifyType(); } int InputFileReader::IdentifyType() { - EtiStreamType streamType = ETI_STREAM_TYPE_NONE; + EtiStreamType streamType = EtiStreamType::None; struct stat inputFileStat; - fstat(fileno(inputfile_), &inputFileStat); + fstat(fileno(inputfile_.get()), &inputFileStat); inputfilelength_ = inputFileStat.st_size; uint32_t sync; @@ -77,22 +78,22 @@ int InputFileReader::IdentifyType() char discard_buffer[6144]; - if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) { + if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read sync in input file!"; perror(filename_.c_str()); return -1; } if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { - streamType = ETI_STREAM_TYPE_RAW; + streamType = EtiStreamType::Raw; if (inputfilelength_ > 0) { nbframes_ = inputfilelength_ / 6144; } else { nbframes_ = ~0; } - if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) { + if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) { // if the seek fails, consume the rest of the frame - if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_) + if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); @@ -104,7 +105,7 @@ int InputFileReader::IdentifyType() } nbFrames = sync; - if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { + if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read frame size in input file!"; perror(filename_.c_str()); return -1; @@ -114,7 +115,7 @@ int InputFileReader::IdentifyType() sync |= ((uint32_t)frameSize) << 16; if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { - streamType = ETI_STREAM_TYPE_STREAMED; + streamType = EtiStreamType::Streamed; frameSize = nbFrames & 0xffff; if (inputfilelength_ > 0) { nbframes_ = inputfilelength_ / (frameSize + 2); @@ -122,9 +123,9 @@ int InputFileReader::IdentifyType() else { nbframes_ = ~0; } - if (fseek(inputfile_, -6, SEEK_CUR) != 0) { + if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) { // if the seek fails, consume the rest of the frame - if (fread(discard_buffer, frameSize - 4, 1, inputfile_) + if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); @@ -135,16 +136,16 @@ int InputFileReader::IdentifyType() return 0; } - if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) { + if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read nb frame in input file!"; perror(filename_.c_str()); return -1; } if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { - streamType = ETI_STREAM_TYPE_FRAMED; - if (fseek(inputfile_, -6, SEEK_CUR) != 0) { + streamType = EtiStreamType::Framed; + if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) { // if the seek fails, consume the rest of the frame - if (fread(discard_buffer, frameSize - 4, 1, inputfile_) + if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); @@ -160,21 +161,21 @@ int InputFileReader::IdentifyType() for (size_t i = 10; i < 6144 + 10; ++i) { sync >>= 8; sync &= 0xffffff; - if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_) != 1) { + if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); return -1; } if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) { - streamType = ETI_STREAM_TYPE_RAW; + streamType = EtiStreamType::Raw; if (inputfilelength_ > 0) { nbframes_ = (inputfilelength_ - i) / 6144; } else { nbframes_ = ~0; } - if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) { - if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_) + if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) { + if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get()) != 1) { etiLog.level(error) << "Unable to read from input file!"; perror(filename_.c_str()); @@ -190,17 +191,17 @@ int InputFileReader::IdentifyType() return -1; } -void InputFileReader::PrintInfo() +void InputFileReader::PrintInfo() const { fprintf(stderr, "Input file format: "); switch (streamtype_) { - case ETI_STREAM_TYPE_RAW: + case EtiStreamType::Raw: fprintf(stderr, "raw"); break; - case ETI_STREAM_TYPE_STREAMED: + case EtiStreamType::Streamed: fprintf(stderr, "streamed"); break; - case ETI_STREAM_TYPE_FRAMED: + case EtiStreamType::Framed: fprintf(stderr, "framed"); break; default: @@ -221,15 +222,15 @@ int InputFileReader::GetNextFrame(void* buffer) { uint16_t frameSize; - if (streamtype_ == ETI_STREAM_TYPE_RAW) { + if (streamtype_ == EtiStreamType::Raw) { frameSize = 6144; } else { - if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { + if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) { etiLog.level(error) << "Reached end of file."; if (loop_) { if (Rewind() == 0) { - if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) { + if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) { PDEBUG("Error after rewinding file!\n"); etiLog.level(error) << "Error after rewinding file!"; return -1; @@ -252,15 +253,15 @@ int InputFileReader::GetNextFrame(void* buffer) } PDEBUG("Frame size: %u\n", frameSize); - size_t read_bytes = fread(buffer, 1, frameSize, inputfile_); + size_t read_bytes = fread(buffer, 1, frameSize, inputfile_.get()); if ( loop_ && - streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144 - read_bytes == 0 && feof(inputfile_)) { + streamtype_ == EtiStreamType::Raw && //implies frameSize == 6144 + read_bytes == 0 && feof(inputfile_.get())) { // in case of an EOF from a RAW that we loop, rewind // otherwise, we won't tolerate it if (Rewind() == 0) { - read_bytes = fread(buffer, 1, frameSize, inputfile_); + read_bytes = fread(buffer, 1, frameSize, inputfile_.get()); } else { PDEBUG("Impossible to rewind file!\n"); diff --git a/src/InputReader.h b/src/InputReader.h index 7d6b373..c897c2d 100644 --- a/src/InputReader.h +++ b/src/InputReader.h @@ -3,7 +3,7 @@ Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 + Copyright (C) 2018 Matthias P. Braendli, matthias.braendli@mpb.li http://www.opendigitalradio.org @@ -33,6 +33,7 @@ #include <cstdio> #include <vector> +#include <atomic> #include <memory> #if defined(HAVE_ZEROMQ) # include "zmq.hpp" @@ -40,47 +41,8 @@ #endif #include "porting.h" #include "Log.h" -#include "lib/UdpSocket.h" -#include <sys/socket.h> -#include <netinet/in.h> #include <unistd.h> -#include <netdb.h> -#include <arpa/inet.h> -#define SOCKET int #define INVALID_SOCKET -1 -#define SOCKET_ERROR -1 - -/* Known types of input streams. Description taken from the CRC mmbTools forum. - - All numbers are little-endian. - - Framed format is used for file recording. It is the default format. The - padding can be removed from data. Format: - uint32_t nbFrames - for each frame - uint16_t frameSize - uint8_t data[frameSize] - - Streamed format is used for streamed applications. As the total number of - frames is unknown before end of transmission, the corresponding field is - removed. The padding can be removed from data. Format: - for each frame - uint16_t frameSize - uint8_t data[frameSize] - - Raw format is a bit-by-bit (but byte aligned on sync) recording of a G.703 - data stream. The padding is always present. Format: - for each frame - uint8_t data[6144] - - Please note that our raw format can also be referred to as ETI(NI, G.703) or ETI(NI). -*/ -enum EtiStreamType { - ETI_STREAM_TYPE_NONE = 0, - ETI_STREAM_TYPE_RAW, - ETI_STREAM_TYPE_STREAMED, - ETI_STREAM_TYPE_FRAMED, -}; class InputReader { @@ -91,43 +53,25 @@ class InputReader virtual int GetNextFrame(void* buffer) = 0; // Print some information - virtual void PrintInfo() = 0; + virtual void PrintInfo() const = 0; }; class InputFileReader : public InputReader { public: - InputFileReader() : - streamtype_(ETI_STREAM_TYPE_NONE), - inputfile_(NULL) { } - - ~InputFileReader() - { - if (inputfile_ != NULL) { - fprintf(stderr, "\nClosing input file...\n"); - - fclose(inputfile_); - } - } + InputFileReader() = default; + InputFileReader(const InputFileReader& other) = delete; + InputFileReader& operator=(const InputFileReader& other) = delete; // open file and determine stream type // When loop=1, GetNextFrame will never return 0 int Open(std::string filename, bool loop); // Print information about the file opened - void PrintInfo(); - + void PrintInfo() const; int GetNextFrame(void* buffer); - EtiStreamType GetStreamType() - { - return streamtype_; - } - private: - InputFileReader(const InputFileReader& other) = delete; - InputFileReader& operator=(const InputFileReader& other) = delete; - int IdentifyType(); // Rewind the file, and replay anew @@ -136,19 +80,60 @@ class InputFileReader : public InputReader bool loop_; // if shall we loop the file over and over std::string filename_; - EtiStreamType streamtype_; - FILE* inputfile_; - size_t inputfilelength_; - uint64_t nbframes_; // 64-bit because 32-bit overflow is - // after 2**32 * 24ms ~= 3.3 years + /* Known types of input streams. Description taken from the CRC + * mmbTools forum. All values are are little-endian. */ + enum class EtiStreamType { + /* Not yet identified */ + None, + + /* Raw format is a bit-by-bit (but byte aligned on sync) recording + * of a G.703 data stream. The padding is always present. + * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI). + * Format: + for each frame: + uint8_t data[6144] + */ + Raw, + + /* Streamed format is used for streamed applications. As the total + * number of frames is unknown before end of transmission, the + * corresponding field is removed. The padding can be removed from + * data. + * Format: + for each frame: + uint16_t frameSize + uint8_t data[frameSize] + */ + Streamed, + + /* Framed format is used for file recording. It is the default format. + * The padding can be removed from data. + * Format: + uint32_t nbFrames + for each frame: + uint16_t frameSize + uint8_t data[frameSize] + */ + Framed, + }; + + EtiStreamType streamtype_ = EtiStreamType::None; + struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}}; + std::unique_ptr<FILE, FILEDeleter> inputfile_; + + size_t inputfilelength_ = 0; + uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is + // after 2**32 * 24ms ~= 3.3 years }; class InputTcpReader : public InputReader { public: InputTcpReader(); - ~InputTcpReader(); + InputTcpReader(const InputTcpReader& other) = delete; + InputTcpReader& operator=(const InputTcpReader& other) = delete; + virtual ~InputTcpReader(); // Endpoint is either host:port or tcp://host:port void Open(const std::string& endpoint); @@ -159,21 +144,19 @@ class InputTcpReader : public InputReader virtual int GetNextFrame(void* buffer); // Print some information - virtual void PrintInfo(); + virtual void PrintInfo() const; private: - InputTcpReader(const InputTcpReader& other) = delete; - InputTcpReader& operator=(const InputTcpReader& other) = delete; - SOCKET m_sock; + int m_sock = INVALID_SOCKET; std::string m_uri; }; struct zmq_input_overflow : public std::exception { - const char* what () const throw () - { - return "InputZMQ buffer overflow"; - } + const char* what () const throw () + { + return "InputZMQ buffer overflow"; + } }; #if defined(HAVE_ZEROMQ) @@ -189,17 +172,12 @@ struct InputZeroMQThreadData class InputZeroMQWorker { public: - InputZeroMQWorker() : - running(false), - zmqcontext(1), - m_to_drop(0) { } - void Start(struct InputZeroMQThreadData* workerdata); void Stop(); + bool is_running(void) const { return running; } - bool is_running(void) { return running; } private: - bool running; + std::atomic<bool> running = ATOMIC_VAR_INIT(false); void RecvProcess(struct InputZeroMQThreadData* workerdata); @@ -212,7 +190,7 @@ class InputZeroMQWorker * * Here we keep track of how many ETI frames we must drop */ - int m_to_drop; + int m_to_drop = 0; }; class InputZeroMQReader : public InputReader @@ -232,7 +210,7 @@ class InputZeroMQReader : public InputReader int GetNextFrame(void* buffer); - void PrintInfo(); + void PrintInfo() const; private: InputZeroMQReader(const InputZeroMQReader& other) = delete; diff --git a/src/InputTcpReader.cpp b/src/InputTcpReader.cpp index 94ec0ad..9a93ad1 100644 --- a/src/InputTcpReader.cpp +++ b/src/InputTcpReader.cpp @@ -32,9 +32,12 @@ #include "InputReader.h" #include "PcDebug.h" #include "Utils.h" -#include <sys/socket.h> #include <unistd.h> #include <errno.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <netdb.h> +#include <arpa/inet.h> InputTcpReader::InputTcpReader() { @@ -126,7 +129,7 @@ int InputTcpReader::GetNextFrame(void* buffer) return r; } -void InputTcpReader::PrintInfo() +void InputTcpReader::PrintInfo() const { fprintf(stderr, "Input TCP:\n"); fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str()); diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp index 783f0f5..5d0e513 100644 --- a/src/InputZeroMQReader.cpp +++ b/src/InputZeroMQReader.cpp @@ -120,7 +120,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer) return framesize; } -void InputZeroMQReader::PrintInfo() +void InputZeroMQReader::PrintInfo() const { fprintf(stderr, "Input ZeroMQ:\n"); fprintf(stderr, " Receiving from %s\n\n", uri_.c_str()); |