From ab64249989657e9b9e14735d3a1752f0f921056b Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 13 Feb 2018 17:21:00 +0100 Subject: Make nonblock available again for file inputs --- src/ConfigParser.cpp | 21 +++-- src/input/File.cpp | 251 +++++++++++++++++++++++++++++++-------------------- src/input/File.h | 11 ++- 3 files changed, 171 insertions(+), 112 deletions(-) diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp index 17b34ca..120ca09 100644 --- a/src/ConfigParser.cpp +++ b/src/ConfigParser.cpp @@ -869,11 +869,6 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, dabProtection* protection = &subchan->protection; - const bool nonblock = pt.get("nonblock", false); - if (nonblock) { - etiLog.level(warn) << "The nonblock option is not supported"; - } - if (type == "dabplus" or type == "audio") { subchan->type = subchannel_type_t::Audio; subchan->bitrate = 0; @@ -889,10 +884,7 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, throw logic_error("Incomplete handling of file input"); } } - else if (proto == "tcp" || - proto == "epgm" || - proto == "ipc") { - + else if (proto == "tcp" or proto == "epgm" or proto == "ipc") { auto zmqconfig = setup_zmq_input(pt, subchanuid); if (type == "audio") { @@ -965,6 +957,17 @@ static void setup_subchannel_from_ptree(DabSubchannel* subchan, ss << "Subchannel with uid " << subchanuid << " has unknown type!"; throw runtime_error(ss.str()); } + + const bool nonblock = pt.get("nonblock", false); + if (nonblock) { + if (auto filein = dynamic_pointer_cast(subchan->input)) { + filein->setNonblocking(nonblock); + } + else { + etiLog.level(warn) << "The nonblock option is not supported"; + } + } + subchan->startAddress = 0; if (type == "audio") { diff --git a/src/input/File.cpp b/src/input/File.cpp index 5c61fd4..54dac1e 100644 --- a/src/input/File.cpp +++ b/src/input/File.cpp @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 Matthias P. Braendli + Copyright (C) 2018 Matthias P. Braendli http://www.opendigitalradio.org */ @@ -23,16 +23,14 @@ along with ODR-DabMux. If not, see . */ -#include #include -#include -#include +#include +#include #include #include #ifndef _WIN32 # define O_BINARY 0 #endif - #include "input/File.h" #include "mpeg.h" #include "ReedSolomon.h" @@ -62,12 +60,15 @@ __attribute((packed)) int FileBase::open(const std::string& name) { - m_fd = ::open(name.c_str(), O_RDONLY | O_BINARY); + int flags = O_RDONLY | O_BINARY; + if (m_nonblock) { + flags |= O_NONBLOCK; + } + + m_fd = ::open(name.c_str(), flags); if (m_fd == -1) { - std::stringstream ss; - ss << "Could not open input file " << name << ": " << - strerror(errno); - throw std::runtime_error(ss.str()); + throw std::runtime_error("Could not open input file " + name + ": " + + strerror(errno)); } return 0; } @@ -75,7 +76,7 @@ int FileBase::open(const std::string& name) int FileBase::setBitrate(int bitrate) { if (bitrate <= 0) { - etiLog.log(error, "Invalid bitrate (%i)\n", bitrate); + etiLog.log(error, "Invalid bitrate (%i)", bitrate); return -1; } @@ -92,6 +93,11 @@ int FileBase::close() return 0; } +void FileBase::setNonblocking(bool nonblock) +{ + m_nonblock = nonblock; +} + int FileBase::rewind() { return ::lseek(m_fd, 0, SEEK_SET); @@ -99,32 +105,78 @@ int FileBase::rewind() ssize_t FileBase::readFromFile(uint8_t* buffer, size_t size) { - ssize_t ret = read(m_fd, buffer, size); + ssize_t ret = 0; + if (m_nonblock) { + if (size > m_nonblock_buffer.size()) { + size_t required_len = size - m_nonblock_buffer.size(); + std::vector buf(required_len); + ret = read(m_fd, buf.data(), required_len); + + /* If no process has the pipe open for writing, read() shall return 0 + * to indicate end-of-file. */ + if (ret == 0) { + return 0; + } - if (ret == -1) { - etiLog.log(alert, "ERROR: Can't read file\n"); - perror(""); - return -1; - } + /* If some process has the pipe open for writing and O_NONBLOCK is + * set, read() shall return −1 and set errno to [EAGAIN]. */ + if (ret == -1 and errno == EAGAIN) { + return 0; + } + else if (ret == -1) { + etiLog.level(alert) << "ERROR: Can't read file " << strerror(errno); + return -1; + } - if (ret < (ssize_t)size) { - ssize_t sizeOut = ret; - etiLog.log(info, "reach end of file -> rewinding\n"); - if (rewind() == -1) { - etiLog.log(alert, "ERROR: Can't rewind file\n"); - return -1; + if (buf.size() + ret == size) { + std::copy(m_nonblock_buffer.begin(), m_nonblock_buffer.end(), + buffer); + buffer += m_nonblock_buffer.size(); + m_nonblock_buffer.clear(); + std::copy(buf.begin(), buf.end(), buffer); + return size; + } } + else { + std::copy(m_nonblock_buffer.begin(), m_nonblock_buffer.begin() + size, + buffer); + + std::vector remaining_buf; + std::copy(m_nonblock_buffer.begin() + size, m_nonblock_buffer.end(), + std::back_inserter(remaining_buf)); + + m_nonblock_buffer = std::move(remaining_buf); + return size; + } + return 0; + } + else { + ret = read(m_fd, buffer, size); - ret = read(m_fd, buffer + sizeOut, size - sizeOut); if (ret == -1) { - etiLog.log(alert, "ERROR: Can't read file\n"); - perror(""); + etiLog.level(alert) << "ERROR: Can't read file " << strerror(errno); return -1; } if (ret < (ssize_t)size) { - etiLog.log(alert, "ERROR: Not enough data in file\n"); - return -1; + ssize_t sizeOut = ret; + etiLog.log(info, "reach end of file -> rewinding"); + if (rewind() == -1) { + etiLog.log(alert, "ERROR: Can't rewind file"); + return -1; + } + + ret = read(m_fd, buffer + sizeOut, size - sizeOut); + if (ret == -1) { + etiLog.log(alert, "ERROR: Can't read file"); + perror(""); + return -1; + } + + if (ret < (ssize_t)size) { + etiLog.log(alert, "ERROR: Not enough data in file"); + return -1; + } } } @@ -140,7 +192,8 @@ READ_SUBCHANNEL: result = readData(m_fd, buffer, size, 2); m_parity = false; return 0; - } else { + } + else { result = readMpegHeader(m_fd, buffer, size); if (result > 0) { result = readMpegFrame(m_fd, buffer, size); @@ -151,79 +204,77 @@ READ_SUBCHANNEL: } } switch (result) { - case MPEG_BUFFER_UNDERFLOW: - etiLog.log(warn, "data underflow -> frame muted\n"); - goto MUTE_SUBCHANNEL; - case MPEG_BUFFER_OVERFLOW: - etiLog.log(warn, "bitrate too high -> frame muted\n"); - goto MUTE_SUBCHANNEL; - case MPEG_FILE_EMPTY: - if (do_rewind) { - etiLog.log(error, "file rewinded and still empty " - "-> frame muted\n"); + case MPEG_BUFFER_UNDERFLOW: + etiLog.log(warn, "data underflow -> frame muted"); goto MUTE_SUBCHANNEL; - } - else { - etiLog.log(info, "reach end of file -> rewinding\n"); - rewind(); - goto READ_SUBCHANNEL; - } - case MPEG_FILE_ERROR: - etiLog.log(alert, "can't read file (%i) -> frame muted\n", errno); - perror(""); - goto MUTE_SUBCHANNEL; - case MPEG_SYNC_NOT_FOUND: - etiLog.log(alert, "mpeg sync not found, maybe is not a valid file " - "-> frame muted\n"); - goto MUTE_SUBCHANNEL; - case MPEG_INVALID_FRAME: - etiLog.log(alert, "file is not a valid mpeg file " - "-> frame muted\n"); - goto MUTE_SUBCHANNEL; - default: - if (result < 0) { - etiLog.log(alert, - "unknown error (code = %i) -> frame muted\n", - result); + case MPEG_BUFFER_OVERFLOW: + etiLog.log(warn, "bitrate too high -> frame muted"); + goto MUTE_SUBCHANNEL; + case MPEG_FILE_EMPTY: + if (do_rewind) { + etiLog.log(error, "file rewinded and still empty " + "-> frame muted"); + goto MUTE_SUBCHANNEL; + } + else { + etiLog.log(info, "reach end of file -> rewinding"); + rewind(); + goto READ_SUBCHANNEL; + } + case MPEG_FILE_ERROR: + etiLog.log(alert, "can't read file (%i) -> frame muted", errno); + perror(""); + goto MUTE_SUBCHANNEL; + case MPEG_SYNC_NOT_FOUND: + etiLog.log(alert, "mpeg sync not found, maybe is not a valid file " + "-> frame muted"); + goto MUTE_SUBCHANNEL; + case MPEG_INVALID_FRAME: + etiLog.log(alert, "file is not a valid mpeg file " + "-> frame muted"); + goto MUTE_SUBCHANNEL; + default: + if (result < 0) { + etiLog.log(alert, + "unknown error (code = %i) -> frame muted", + result); MUTE_SUBCHANNEL: - memset(buffer, 0, size); - } - else { - if (result < (ssize_t)size) { - etiLog.log(warn, "bitrate too low from file " - "-> frame padded\n"); - memset((char*)buffer + result, 0, size - result); + memset(buffer, 0, size); } + else { + if (result < (ssize_t)size) { + etiLog.log(warn, "bitrate too low from file " + "-> frame padded"); + memset((char*)buffer + result, 0, size - result); + } - result = checkDabMpegFrame(buffer); - switch (result) { - case MPEG_FREQUENCY: - etiLog.log(error, "file has a frame with an invalid " - "frequency: %i, should be 48000 or 24000\n", - getMpegFrequency(buffer)); - break; - case MPEG_PADDING: - etiLog.log(warn, - "file has a frame with padding bit set\n"); - break; - case MPEG_COPYRIGHT: - result = 0; - break; - case MPEG_ORIGINAL: - result = 0; - break; - case MPEG_EMPHASIS: - etiLog.log(warn, - "file has a frame with emphasis bits set\n"); - break; - default: - if (result < 0) { - etiLog.log(alert, "mpeg file has an invalid DAB " - "mpeg frame (unknown reason: %i)\n", result); + result = checkDabMpegFrame(buffer); + switch (result) { + case MPEG_FREQUENCY: + etiLog.log(error, "file has a frame with an invalid " + "frequency: %i, should be 48000 or 24000", + getMpegFrequency(buffer)); + break; + case MPEG_PADDING: + etiLog.log(warn, "file has a frame with padding bit setn"); + break; + case MPEG_COPYRIGHT: + result = 0; + break; + case MPEG_ORIGINAL: + result = 0; + break; + case MPEG_EMPHASIS: + etiLog.log(warn, "file has a frame with emphasis bits set"); + break; + default: + if (result < 0) { + etiLog.log(alert, "mpeg file has an invalid DAB " + "mpeg frame (unknown reason: %i)", result); + } + break; } - break; } - } } return result; } @@ -362,7 +413,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size) else if (nbBytes < 3) { etiLog.log(error, "Error while reading file for packet header; " - "read %i out of 3 bytes\n", nbBytes); + "read %i out of 3 bytes", nbBytes); break; } @@ -390,7 +441,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size) } else if (nbBytes == 0) { etiLog.log(info, - "Packet header read, but no data!\n"); + "Packet header read, but no data!"); if (rewind() == -1) { goto END_PACKET; } @@ -398,7 +449,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size) } else if (nbBytes < length - 3) { etiLog.log(error, "Error while reading packet file; " - "read %i out of %i bytes\n", nbBytes, length - 3); + "read %i out of %i bytes", nbBytes, length - 3); break; } @@ -416,7 +467,7 @@ int PacketFile::readFrame(uint8_t* buffer, size_t size) if (m_enhancedPacketLength >= (12 * 188)) { if (m_enhancedPacketLength > (12 * 188)) { etiLog.log(error, - "Error, too much enhanced packet data!\n"); + "Error, too much enhanced packet data!"); } ReedSolomon encoder(204, 188); for (int j = 0; j < 12; ++j) { diff --git a/src/input/File.h b/src/input/File.h index 080d6b5..62a6707 100644 --- a/src/input/File.h +++ b/src/input/File.h @@ -2,7 +2,7 @@ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications Research Center Canada) - Copyright (C) 2016 Matthias P. Braendli + Copyright (C) 2018 Matthias P. Braendli http://www.opendigitalradio.org */ @@ -28,7 +28,7 @@ #include #include #include -#include +#include #include "input/inputs.h" #include "ManagementServer.h" @@ -41,6 +41,8 @@ class FileBase : public InputBase { virtual int setBitrate(int bitrate); virtual int close(); + virtual void setNonblocking(bool nonblock); + /* Rewind the file * Returns -1 on failure, 0 on success */ @@ -52,8 +54,11 @@ class FileBase : public InputBase { virtual ssize_t readFromFile(uint8_t* buf, size_t len); // We use unix open() instead of fopen() because - // we might want to do non-blocking I/O in the future + // of non-blocking I/O int m_fd = -1; + + bool m_nonblock = false; + std::vector m_nonblock_buffer; }; class MPEGFile : public FileBase { -- cgit v1.2.3