aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2018-02-13 17:21:00 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2018-02-13 17:21:00 +0100
commitab64249989657e9b9e14735d3a1752f0f921056b (patch)
tree16021499f08a9c7fd03eae4fc57c4e6bb2e902d7 /src
parente526c97dcdaf12de4d4c6324ca007eec60884165 (diff)
downloaddabmux-ab64249989657e9b9e14735d3a1752f0f921056b.tar.gz
dabmux-ab64249989657e9b9e14735d3a1752f0f921056b.tar.bz2
dabmux-ab64249989657e9b9e14735d3a1752f0f921056b.zip
Make nonblock available again for file inputs
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp21
-rw-r--r--src/input/File.cpp251
-rw-r--r--src/input/File.h11
3 files changed, 171 insertions, 112 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 17b34ca..120ca09 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -869,11 +869,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan,
dabProtection* protection = &subchan->protection;
- const bool nonblock = pt.get("nonblock", false);
- if (nonblock) {
- etiLog.level(warn) << "The nonblock option is not supported";
- }
-
if (type == "dabplus" or type == "audio") {
subchan->type = subchannel_type_t::Audio;
subchan->bitrate = 0;
@@ -889,10 +884,7 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan,
throw logic_error("Incomplete handling of file input");
}
}
- else if (proto == "tcp" ||
- proto == "epgm" ||
- proto == "ipc") {
-
+ else if (proto == "tcp" or proto == "epgm" or proto == "ipc") {
auto zmqconfig = setup_zmq_input(pt, subchanuid);
if (type == "audio") {
@@ -965,6 +957,17 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan,
ss << "Subchannel with uid " << subchanuid << " has unknown type!";
throw runtime_error(ss.str());
}
+
+ const bool nonblock = pt.get("nonblock", false);
+ if (nonblock) {
+ if (auto filein = dynamic_pointer_cast<Inputs::FileBase>(subchan->input)) {
+ filein->setNonblocking(nonblock);
+ }
+ else {
+ etiLog.level(warn) << "The nonblock option is not supported";
+ }
+ }
+
subchan->startAddress = 0;
if (type == "audio") {
diff --git a/src/input/File.cpp b/src/input/File.cpp
index 5c61fd4..54dac1e 100644
--- a/src/input/File.cpp
+++ b/src/input/File.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2016 Matthias P. Braendli
+ Copyright (C) 2018 Matthias P. Braendli
http://www.opendigitalradio.org
*/
@@ -23,16 +23,14 @@
along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <sstream>
#include <errno.h>
-#include <stdlib.h>
-#include <stdio.h>
+#include <cstdlib>
+#include <cstdio>
#include <fcntl.h>
#include <unistd.h>
#ifndef _WIN32
# define O_BINARY 0
#endif
-
#include "input/File.h"
#include "mpeg.h"
#include "ReedSolomon.h"
@@ -62,12 +60,15 @@ __attribute((packed))
int FileBase::open(const std::string& name)
{
- m_fd = ::open(name.c_str(), O_RDONLY | O_BINARY);
+ int flags = O_RDONLY | O_BINARY;
+ if (m_nonblock) {
+ flags |= O_NONBLOCK;
+ }
+
+ m_fd = ::open(name.c_str(), flags);
if (m_fd == -1) {
- std::stringstream ss;
- ss << "Could not open input file " << name << ": " <<
- strerror(errno);
- throw std::runtime_error(ss.str());
+ throw std::runtime_error("Could not open input file " + name + ": " +
+ strerror(errno));
}
return 0;
}
@@ -75,7 +76,7 @@ int FileBase::open(const std::string& name)
int FileBase::setBitrate(int bitrate)
{
if (bitrate <= 0) {
- etiLog.log(error, "Invalid bitrate (%i)\n", bitrate);
+ etiLog.log(error, "Invalid bitrate (%i)", bitrate);
return -1;
}
@@ -92,6 +93,11 @@ int FileBase::close()
return 0;
}
+void FileBase::setNonblocking(bool nonblock)
+{
+ m_nonblock = nonblock;
+}
+
int FileBase::rewind()
{
return ::lseek(m_fd, 0, SEEK_SET);
@@ -99,32 +105,78 @@ int FileBase::rewind()
ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size)
{
- ssize_t ret = read(m_fd, buffer, size);
+ ssize_t ret = 0;
+ if (m_nonblock) {
+ if (size > m_nonblock_buffer.size()) {
+ size_t required_len = size - m_nonblock_buffer.size();
+ std::vector<uint8_t> buf(required_len);
+ ret = read(m_fd, buf.data(), required_len);
+
+ /* If no process has the pipe open for writing, read() shall return 0
+ * to indicate end-of-file. */
+ if (ret == 0) {
+ return 0;
+ }
- if (ret == -1) {
- etiLog.log(alert, "ERROR: Can't read file\n");
- perror("");
- return -1;
- }
+ /* If some process has the pipe open for writing and O_NONBLOCK is
+ * set, read() shall return −1 and set errno to [EAGAIN]. */
+ if (ret == -1 and errno == EAGAIN) {
+ return 0;
+ }
+ else if (ret == -1) {
+ etiLog.level(alert) << "ERROR: Can't read file " << strerror(errno);
+ return -1;
+ }
- if (ret < (ssize_t)size) {
- ssize_t sizeOut = ret;
- etiLog.log(info, "reach end of file -> rewinding\n");
- if (rewind() == -1) {
- etiLog.log(alert, "ERROR: Can't rewind file\n");
- return -1;
+ if (buf.size() + ret == size) {
+ std::copy(m_nonblock_buffer.begin(), m_nonblock_buffer.end(),
+ buffer);
+ buffer += m_nonblock_buffer.size();
+ m_nonblock_buffer.clear();
+ std::copy(buf.begin(), buf.end(), buffer);
+ return size;
+ }
}
+ else {
+ std::copy(m_nonblock_buffer.begin(), m_nonblock_buffer.begin() + size,
+ buffer);
+
+ std::vector<uint8_t> remaining_buf;
+ std::copy(m_nonblock_buffer.begin() + size, m_nonblock_buffer.end(),
+ std::back_inserter(remaining_buf));
+
+ m_nonblock_buffer = std::move(remaining_buf);
+ return size;
+ }
+ return 0;
+ }
+ else {
+ ret = read(m_fd, buffer, size);
- ret = read(m_fd, buffer + sizeOut, size - sizeOut);
if (ret == -1) {
- etiLog.log(alert, "ERROR: Can't read file\n");
- perror("");
+ etiLog.level(alert) << "ERROR: Can't read file " << strerror(errno);
return -1;
}
if (ret < (ssize_t)size) {
- etiLog.log(alert, "ERROR: Not enough data in file\n");
- return -1;
+ ssize_t sizeOut = ret;
+ etiLog.log(info, "reach end of file -> rewinding");
+ if (rewind() == -1) {
+ etiLog.log(alert, "ERROR: Can't rewind file");
+ return -1;
+ }
+
+ ret = read(m_fd, buffer + sizeOut, size - sizeOut);
+ if (ret == -1) {
+ etiLog.log(alert, "ERROR: Can't read file");
+ perror("");
+ return -1;
+ }
+
+ if (ret < (ssize_t)size) {
+ etiLog.log(alert, "ERROR: Not enough data in file");
+ return -1;
+ }
}
}
@@ -140,7 +192,8 @@ READ_SUBCHANNEL:
result = readData(m_fd, buffer, size, 2);
m_parity = false;
return 0;
- } else {
+ }
+ else {
result = readMpegHeader(m_fd, buffer, size);
if (result > 0) {
result = readMpegFrame(m_fd, buffer, size);
@@ -151,79 +204,77 @@ READ_SUBCHANNEL:
}
}
switch (result) {
- case MPEG_BUFFER_UNDERFLOW:
- etiLog.log(warn, "data underflow -> frame muted\n");
- goto MUTE_SUBCHANNEL;
- case MPEG_BUFFER_OVERFLOW:
- etiLog.log(warn, "bitrate too high -> frame muted\n");
- goto MUTE_SUBCHANNEL;
- case MPEG_FILE_EMPTY:
- if (do_rewind) {
- etiLog.log(error, "file rewinded and still empty "
- "-> frame muted\n");
+ case MPEG_BUFFER_UNDERFLOW:
+ etiLog.log(warn, "data underflow -> frame muted");
goto MUTE_SUBCHANNEL;
- }
- else {
- etiLog.log(info, "reach end of file -> rewinding\n");
- rewind();
- goto READ_SUBCHANNEL;
- }
- case MPEG_FILE_ERROR:
- etiLog.log(alert, "can't read file (%i) -> frame muted\n", errno);
- perror("");
- goto MUTE_SUBCHANNEL;
- case MPEG_SYNC_NOT_FOUND:
- etiLog.log(alert, "mpeg sync not found, maybe is not a valid file "
- "-> frame muted\n");
- goto MUTE_SUBCHANNEL;
- case MPEG_INVALID_FRAME:
- etiLog.log(alert, "file is not a valid mpeg file "
- "-> frame muted\n");
- goto MUTE_SUBCHANNEL;
- default:
- if (result < 0) {
- etiLog.log(alert,
- "unknown error (code = %i) -> frame muted\n",
- result);
+ case MPEG_BUFFER_OVERFLOW:
+ etiLog.log(warn, "bitrate too high -> frame muted");
+ goto MUTE_SUBCHANNEL;
+ case MPEG_FILE_EMPTY:
+ if (do_rewind) {
+ etiLog.log(error, "file rewinded and still empty "
+ "-> frame muted");
+ goto MUTE_SUBCHANNEL;
+ }
+ else {
+ etiLog.log(info, "reach end of file -> rewinding");
+ rewind();
+ goto READ_SUBCHANNEL;
+ }
+ case MPEG_FILE_ERROR:
+ etiLog.log(alert, "can't read file (%i) -> frame muted", errno);
+ perror("");
+ goto MUTE_SUBCHANNEL;
+ case MPEG_SYNC_NOT_FOUND:
+ etiLog.log(alert, "mpeg sync not found, maybe is not a valid file "
+ "-> frame muted");
+ goto MUTE_SUBCHANNEL;
+ case MPEG_INVALID_FRAME:
+ etiLog.log(alert, "file is not a valid mpeg file "
+ "-> frame muted");
+ goto MUTE_SUBCHANNEL;
+ default:
+ if (result < 0) {
+ etiLog.log(alert,
+ "unknown error (code = %i) -> frame muted",
+ result);
MUTE_SUBCHANNEL:
- memset(buffer, 0, size);
- }
- else {
- if (result < (ssize_t)size) {
- etiLog.log(warn, "bitrate too low from file "
- "-> frame padded\n");
- memset((char*)buffer + result, 0, size - result);
+ memset(buffer, 0, size);
}
+ else {
+ if (result < (ssize_t)size) {
+ etiLog.log(warn, "bitrate too low from file "
+ "-> frame padded");
+ memset((char*)buffer + result, 0, size - result);
+ }
- result = checkDabMpegFrame(buffer);
- switch (result) {
- case MPEG_FREQUENCY:
- etiLog.log(error, "file has a frame with an invalid "
- "frequency: %i, should be 48000 or 24000\n",
- getMpegFrequency(buffer));
- break;
- case MPEG_PADDING:
- etiLog.log(warn,
- "file has a frame with padding bit set\n");
- break;
- case MPEG_COPYRIGHT:
- result = 0;
- break;
- case MPEG_ORIGINAL:
- result = 0;
- break;
- case MPEG_EMPHASIS:
- etiLog.log(warn,
- "file has a frame with emphasis bits set\n");
- break;
- default:
- if (result < 0) {
- etiLog.log(alert, "mpeg file has an invalid DAB "
- "mpeg frame (unknown reason: %i)\n", result);
+ result = checkDabMpegFrame(buffer);
+ switch (result) {
+ case MPEG_FREQUENCY:
+ etiLog.log(error, "file has a frame with an invalid "
+ "frequency: %i, should be 48000 or 24000",
+ getMpegFrequency(buffer));
+ break;
+ case MPEG_PADDING:
+ etiLog.log(warn, "file has a frame with padding bit setn");
+ break;
+ case MPEG_COPYRIGHT:
+ result = 0;
+ break;
+ case MPEG_ORIGINAL:
+ result = 0;
+ break;
+ case MPEG_EMPHASIS:
+ etiLog.log(warn, "file has a frame with emphasis bits set");
+ break;
+ default:
+ if (result < 0) {
+ etiLog.log(alert, "mpeg file has an invalid DAB "
+ "mpeg frame (unknown reason: %i)", result);
+ }
+ break;
}
- break;
}
- }
}
return result;
}
@@ -362,7 +413,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
else if (nbBytes < 3) {
etiLog.log(error,
"Error while reading file for packet header; "
- "read %i out of 3 bytes\n", nbBytes);
+ "read %i out of 3 bytes", nbBytes);
break;
}
@@ -390,7 +441,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
}
else if (nbBytes == 0) {
etiLog.log(info,
- "Packet header read, but no data!\n");
+ "Packet header read, but no data!");
if (rewind() == -1) {
goto END_PACKET;
}
@@ -398,7 +449,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
}
else if (nbBytes < length - 3) {
etiLog.log(error, "Error while reading packet file; "
- "read %i out of %i bytes\n", nbBytes, length - 3);
+ "read %i out of %i bytes", nbBytes, length - 3);
break;
}
@@ -416,7 +467,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size)
if (m_enhancedPacketLength >= (12 * 188)) {
if (m_enhancedPacketLength > (12 * 188)) {
etiLog.log(error,
- "Error, too much enhanced packet data!\n");
+ "Error, too much enhanced packet data!");
}
ReedSolomon encoder(204, 188);
for (int j = 0; j < 12; ++j) {
diff --git a/src/input/File.h b/src/input/File.h
index 080d6b5..62a6707 100644
--- a/src/input/File.h
+++ b/src/input/File.h
@@ -2,7 +2,7 @@
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
- Copyright (C) 2016 Matthias P. Braendli
+ Copyright (C) 2018 Matthias P. Braendli
http://www.opendigitalradio.org
*/
@@ -28,7 +28,7 @@
#include <vector>
#include <array>
#include <string>
-#include <stdint.h>
+#include <cstdint>
#include "input/inputs.h"
#include "ManagementServer.h"
@@ -41,6 +41,8 @@ class FileBase : public InputBase {
virtual int setBitrate(int bitrate);
virtual int close();
+ virtual void setNonblocking(bool nonblock);
+
/* Rewind the file
* Returns -1 on failure, 0 on success
*/
@@ -52,8 +54,11 @@ class FileBase : public InputBase {
virtual ssize_t readFromFile(uint8_t* buf, size_t len);
// We use unix open() instead of fopen() because
- // we might want to do non-blocking I/O in the future
+ // of non-blocking I/O
int m_fd = -1;
+
+ bool m_nonblock = false;
+ std::vector<uint8_t> m_nonblock_buffer;
};
class MPEGFile : public FileBase {