summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-16 10:55:15 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-01-16 10:55:15 +0100
commitf5820b347ea6920764023d6cf71f7a254bd7106d (patch)
treee305b1c79beb56f205f917f9be2399f0779a1d30 /src
parentbcf39bd3ff478deae0dcc51f1021ceb8700c22cc (diff)
downloaddabmod-f5820b347ea6920764023d6cf71f7a254bd7106d.tar.gz
dabmod-f5820b347ea6920764023d6cf71f7a254bd7106d.tar.bz2
dabmod-f5820b347ea6920764023d6cf71f7a254bd7106d.zip
Do some InputReader cleanup
Diffstat (limited to 'src')
-rw-r--r--src/InputFileReader.cpp67
-rw-r--r--src/InputReader.h150
-rw-r--r--src/InputTcpReader.cpp7
-rw-r--r--src/InputZeroMQReader.cpp2
4 files changed, 104 insertions, 122 deletions
diff --git a/src/InputFileReader.cpp b/src/InputFileReader.cpp
index 84f0be4..5e93477 100644
--- a/src/InputFileReader.cpp
+++ b/src/InputFileReader.cpp
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyrigth (C) 2013
+ Copyrigth (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
@@ -47,28 +47,29 @@ int InputFileReader::Open(std::string filename, bool loop)
{
filename_ = filename;
loop_ = loop;
- inputfile_ = fopen(filename_.c_str(), "r");
- if (inputfile_ == NULL) {
+ FILE* fd = fopen(filename_.c_str(), "r");
+ if (fd == nullptr) {
etiLog.level(error) << "Unable to open input file!";
perror(filename_.c_str());
return -1;
}
+ inputfile_.reset(fd);
return IdentifyType();
}
int InputFileReader::Rewind()
{
- rewind(inputfile_); // Also clears the EOF flag
+ rewind(inputfile_.get()); // Also clears the EOF flag
return IdentifyType();
}
int InputFileReader::IdentifyType()
{
- EtiStreamType streamType = ETI_STREAM_TYPE_NONE;
+ EtiStreamType streamType = EtiStreamType::None;
struct stat inputFileStat;
- fstat(fileno(inputfile_), &inputFileStat);
+ fstat(fileno(inputfile_.get()), &inputFileStat);
inputfilelength_ = inputFileStat.st_size;
uint32_t sync;
@@ -77,22 +78,22 @@ int InputFileReader::IdentifyType()
char discard_buffer[6144];
- if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
+ if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read sync in input file!";
perror(filename_.c_str());
return -1;
}
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_RAW;
+ streamType = EtiStreamType::Raw;
if (inputfilelength_ > 0) {
nbframes_ = inputfilelength_ / 6144;
}
else {
nbframes_ = ~0;
}
- if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) {
+ if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) {
// if the seek fails, consume the rest of the frame
- if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
+ if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -104,7 +105,7 @@ int InputFileReader::IdentifyType()
}
nbFrames = sync;
- if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
+ if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read frame size in input file!";
perror(filename_.c_str());
return -1;
@@ -114,7 +115,7 @@ int InputFileReader::IdentifyType()
sync |= ((uint32_t)frameSize) << 16;
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_STREAMED;
+ streamType = EtiStreamType::Streamed;
frameSize = nbFrames & 0xffff;
if (inputfilelength_ > 0) {
nbframes_ = inputfilelength_ / (frameSize + 2);
@@ -122,9 +123,9 @@ int InputFileReader::IdentifyType()
else {
nbframes_ = ~0;
}
- if (fseek(inputfile_, -6, SEEK_CUR) != 0) {
+ if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) {
// if the seek fails, consume the rest of the frame
- if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
+ if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -135,16 +136,16 @@ int InputFileReader::IdentifyType()
return 0;
}
- if (fread(&sync, sizeof(sync), 1, inputfile_) != 1) {
+ if (fread(&sync, sizeof(sync), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read nb frame in input file!";
perror(filename_.c_str());
return -1;
}
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_FRAMED;
- if (fseek(inputfile_, -6, SEEK_CUR) != 0) {
+ streamType = EtiStreamType::Framed;
+ if (fseek(inputfile_.get(), -6, SEEK_CUR) != 0) {
// if the seek fails, consume the rest of the frame
- if (fread(discard_buffer, frameSize - 4, 1, inputfile_)
+ if (fread(discard_buffer, frameSize - 4, 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -160,21 +161,21 @@ int InputFileReader::IdentifyType()
for (size_t i = 10; i < 6144 + 10; ++i) {
sync >>= 8;
sync &= 0xffffff;
- if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_) != 1) {
+ if (fread((uint8_t*)&sync + 3, 1, 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
return -1;
}
if ((sync == 0x49c5f8ff) || (sync == 0xb63a07ff)) {
- streamType = ETI_STREAM_TYPE_RAW;
+ streamType = EtiStreamType::Raw;
if (inputfilelength_ > 0) {
nbframes_ = (inputfilelength_ - i) / 6144;
}
else {
nbframes_ = ~0;
}
- if (fseek(inputfile_, -sizeof(sync), SEEK_CUR) != 0) {
- if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_)
+ if (fseek(inputfile_.get(), -sizeof(sync), SEEK_CUR) != 0) {
+ if (fread(discard_buffer, 6144 - sizeof(sync), 1, inputfile_.get())
!= 1) {
etiLog.level(error) << "Unable to read from input file!";
perror(filename_.c_str());
@@ -190,17 +191,17 @@ int InputFileReader::IdentifyType()
return -1;
}
-void InputFileReader::PrintInfo()
+void InputFileReader::PrintInfo() const
{
fprintf(stderr, "Input file format: ");
switch (streamtype_) {
- case ETI_STREAM_TYPE_RAW:
+ case EtiStreamType::Raw:
fprintf(stderr, "raw");
break;
- case ETI_STREAM_TYPE_STREAMED:
+ case EtiStreamType::Streamed:
fprintf(stderr, "streamed");
break;
- case ETI_STREAM_TYPE_FRAMED:
+ case EtiStreamType::Framed:
fprintf(stderr, "framed");
break;
default:
@@ -221,15 +222,15 @@ int InputFileReader::GetNextFrame(void* buffer)
{
uint16_t frameSize;
- if (streamtype_ == ETI_STREAM_TYPE_RAW) {
+ if (streamtype_ == EtiStreamType::Raw) {
frameSize = 6144;
}
else {
- if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
+ if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {
etiLog.level(error) << "Reached end of file.";
if (loop_) {
if (Rewind() == 0) {
- if (fread(&frameSize, sizeof(frameSize), 1, inputfile_) != 1) {
+ if (fread(&frameSize, sizeof(frameSize), 1, inputfile_.get()) != 1) {
PDEBUG("Error after rewinding file!\n");
etiLog.level(error) << "Error after rewinding file!";
return -1;
@@ -252,15 +253,15 @@ int InputFileReader::GetNextFrame(void* buffer)
}
PDEBUG("Frame size: %u\n", frameSize);
- size_t read_bytes = fread(buffer, 1, frameSize, inputfile_);
+ size_t read_bytes = fread(buffer, 1, frameSize, inputfile_.get());
if ( loop_ &&
- streamtype_ == ETI_STREAM_TYPE_RAW && //implies frameSize == 6144
- read_bytes == 0 && feof(inputfile_)) {
+ streamtype_ == EtiStreamType::Raw && //implies frameSize == 6144
+ read_bytes == 0 && feof(inputfile_.get())) {
// in case of an EOF from a RAW that we loop, rewind
// otherwise, we won't tolerate it
if (Rewind() == 0) {
- read_bytes = fread(buffer, 1, frameSize, inputfile_);
+ read_bytes = fread(buffer, 1, frameSize, inputfile_.get());
}
else {
PDEBUG("Impossible to rewind file!\n");
diff --git a/src/InputReader.h b/src/InputReader.h
index 7d6b373..c897c2d 100644
--- a/src/InputReader.h
+++ b/src/InputReader.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2016
+ Copyright (C) 2018
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -33,6 +33,7 @@
#include <cstdio>
#include <vector>
+#include <atomic>
#include <memory>
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
@@ -40,47 +41,8 @@
#endif
#include "porting.h"
#include "Log.h"
-#include "lib/UdpSocket.h"
-#include <sys/socket.h>
-#include <netinet/in.h>
#include <unistd.h>
-#include <netdb.h>
-#include <arpa/inet.h>
-#define SOCKET int
#define INVALID_SOCKET -1
-#define SOCKET_ERROR -1
-
-/* Known types of input streams. Description taken from the CRC mmbTools forum.
-
- All numbers are little-endian.
-
- Framed format is used for file recording. It is the default format. The
- padding can be removed from data. Format:
- uint32_t nbFrames
- for each frame
- uint16_t frameSize
- uint8_t data[frameSize]
-
- Streamed format is used for streamed applications. As the total number of
- frames is unknown before end of transmission, the corresponding field is
- removed. The padding can be removed from data. Format:
- for each frame
- uint16_t frameSize
- uint8_t data[frameSize]
-
- Raw format is a bit-by-bit (but byte aligned on sync) recording of a G.703
- data stream. The padding is always present. Format:
- for each frame
- uint8_t data[6144]
-
- Please note that our raw format can also be referred to as ETI(NI, G.703) or ETI(NI).
-*/
-enum EtiStreamType {
- ETI_STREAM_TYPE_NONE = 0,
- ETI_STREAM_TYPE_RAW,
- ETI_STREAM_TYPE_STREAMED,
- ETI_STREAM_TYPE_FRAMED,
-};
class InputReader
{
@@ -91,43 +53,25 @@ class InputReader
virtual int GetNextFrame(void* buffer) = 0;
// Print some information
- virtual void PrintInfo() = 0;
+ virtual void PrintInfo() const = 0;
};
class InputFileReader : public InputReader
{
public:
- InputFileReader() :
- streamtype_(ETI_STREAM_TYPE_NONE),
- inputfile_(NULL) { }
-
- ~InputFileReader()
- {
- if (inputfile_ != NULL) {
- fprintf(stderr, "\nClosing input file...\n");
-
- fclose(inputfile_);
- }
- }
+ InputFileReader() = default;
+ InputFileReader(const InputFileReader& other) = delete;
+ InputFileReader& operator=(const InputFileReader& other) = delete;
// open file and determine stream type
// When loop=1, GetNextFrame will never return 0
int Open(std::string filename, bool loop);
// Print information about the file opened
- void PrintInfo();
-
+ void PrintInfo() const;
int GetNextFrame(void* buffer);
- EtiStreamType GetStreamType()
- {
- return streamtype_;
- }
-
private:
- InputFileReader(const InputFileReader& other) = delete;
- InputFileReader& operator=(const InputFileReader& other) = delete;
-
int IdentifyType();
// Rewind the file, and replay anew
@@ -136,19 +80,60 @@ class InputFileReader : public InputReader
bool loop_; // if shall we loop the file over and over
std::string filename_;
- EtiStreamType streamtype_;
- FILE* inputfile_;
- size_t inputfilelength_;
- uint64_t nbframes_; // 64-bit because 32-bit overflow is
- // after 2**32 * 24ms ~= 3.3 years
+ /* Known types of input streams. Description taken from the CRC
+ * mmbTools forum. All values are are little-endian. */
+ enum class EtiStreamType {
+ /* Not yet identified */
+ None,
+
+ /* Raw format is a bit-by-bit (but byte aligned on sync) recording
+ * of a G.703 data stream. The padding is always present.
+ * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI).
+ * Format:
+ for each frame:
+ uint8_t data[6144]
+ */
+ Raw,
+
+ /* Streamed format is used for streamed applications. As the total
+ * number of frames is unknown before end of transmission, the
+ * corresponding field is removed. The padding can be removed from
+ * data.
+ * Format:
+ for each frame:
+ uint16_t frameSize
+ uint8_t data[frameSize]
+ */
+ Streamed,
+
+ /* Framed format is used for file recording. It is the default format.
+ * The padding can be removed from data.
+ * Format:
+ uint32_t nbFrames
+ for each frame:
+ uint16_t frameSize
+ uint8_t data[frameSize]
+ */
+ Framed,
+ };
+
+ EtiStreamType streamtype_ = EtiStreamType::None;
+ struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
+ std::unique_ptr<FILE, FILEDeleter> inputfile_;
+
+ size_t inputfilelength_ = 0;
+ uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is
+ // after 2**32 * 24ms ~= 3.3 years
};
class InputTcpReader : public InputReader
{
public:
InputTcpReader();
- ~InputTcpReader();
+ InputTcpReader(const InputTcpReader& other) = delete;
+ InputTcpReader& operator=(const InputTcpReader& other) = delete;
+ virtual ~InputTcpReader();
// Endpoint is either host:port or tcp://host:port
void Open(const std::string& endpoint);
@@ -159,21 +144,19 @@ class InputTcpReader : public InputReader
virtual int GetNextFrame(void* buffer);
// Print some information
- virtual void PrintInfo();
+ virtual void PrintInfo() const;
private:
- InputTcpReader(const InputTcpReader& other) = delete;
- InputTcpReader& operator=(const InputTcpReader& other) = delete;
- SOCKET m_sock;
+ int m_sock = INVALID_SOCKET;
std::string m_uri;
};
struct zmq_input_overflow : public std::exception
{
- const char* what () const throw ()
- {
- return "InputZMQ buffer overflow";
- }
+ const char* what () const throw ()
+ {
+ return "InputZMQ buffer overflow";
+ }
};
#if defined(HAVE_ZEROMQ)
@@ -189,17 +172,12 @@ struct InputZeroMQThreadData
class InputZeroMQWorker
{
public:
- InputZeroMQWorker() :
- running(false),
- zmqcontext(1),
- m_to_drop(0) { }
-
void Start(struct InputZeroMQThreadData* workerdata);
void Stop();
+ bool is_running(void) const { return running; }
- bool is_running(void) { return running; }
private:
- bool running;
+ std::atomic<bool> running = ATOMIC_VAR_INIT(false);
void RecvProcess(struct InputZeroMQThreadData* workerdata);
@@ -212,7 +190,7 @@ class InputZeroMQWorker
*
* Here we keep track of how many ETI frames we must drop
*/
- int m_to_drop;
+ int m_to_drop = 0;
};
class InputZeroMQReader : public InputReader
@@ -232,7 +210,7 @@ class InputZeroMQReader : public InputReader
int GetNextFrame(void* buffer);
- void PrintInfo();
+ void PrintInfo() const;
private:
InputZeroMQReader(const InputZeroMQReader& other) = delete;
diff --git a/src/InputTcpReader.cpp b/src/InputTcpReader.cpp
index 94ec0ad..9a93ad1 100644
--- a/src/InputTcpReader.cpp
+++ b/src/InputTcpReader.cpp
@@ -32,9 +32,12 @@
#include "InputReader.h"
#include "PcDebug.h"
#include "Utils.h"
-#include <sys/socket.h>
#include <unistd.h>
#include <errno.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netdb.h>
+#include <arpa/inet.h>
InputTcpReader::InputTcpReader()
{
@@ -126,7 +129,7 @@ int InputTcpReader::GetNextFrame(void* buffer)
return r;
}
-void InputTcpReader::PrintInfo()
+void InputTcpReader::PrintInfo() const
{
fprintf(stderr, "Input TCP:\n");
fprintf(stderr, " Receiving from %s\n\n", m_uri.c_str());
diff --git a/src/InputZeroMQReader.cpp b/src/InputZeroMQReader.cpp
index 783f0f5..5d0e513 100644
--- a/src/InputZeroMQReader.cpp
+++ b/src/InputZeroMQReader.cpp
@@ -120,7 +120,7 @@ int InputZeroMQReader::GetNextFrame(void* buffer)
return framesize;
}
-void InputZeroMQReader::PrintInfo()
+void InputZeroMQReader::PrintInfo() const
{
fprintf(stderr, "Input ZeroMQ:\n");
fprintf(stderr, " Receiving from %s\n\n", uri_.c_str());