From 7102f830e01c3d4d695c0d36608cb09064e4aedc Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 20 May 2019 08:49:29 +0200 Subject: Move outputs to a separate file --- src/Outputs.cpp | 171 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/Outputs.h | 126 +++++++++++++++++++++++++++++++++++++ src/common.h | 9 ++- src/odr-audioenc.cpp | 160 ++++++++++++----------------------------------- src/utils.h | 35 +---------- 5 files changed, 344 insertions(+), 157 deletions(-) create mode 100644 src/Outputs.cpp create mode 100644 src/Outputs.h (limited to 'src') diff --git a/src/Outputs.cpp b/src/Outputs.cpp new file mode 100644 index 0000000..a80ca08 --- /dev/null +++ b/src/Outputs.cpp @@ -0,0 +1,171 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#include "Outputs.h" +#include +#include +#include +#include +#include + +namespace Output { + +using namespace std; + +void Base::update_audio_levels(int16_t audiolevel_left, int16_t audiolevel_right) +{ + m_audio_left = audiolevel_left; + m_audio_right = audiolevel_right; +} + +File::File(const char *filename) +{ + m_fd = fopen(filename, "wb"); + if (m_fd == nullptr) { + throw runtime_error(string("Error opening output file: ") + strerror(errno)); + } +} + +File::File(FILE *fd) : m_fd(fd) { } + +File::~File() { + if (m_fd) { + fclose(m_fd); + m_fd = nullptr; + } +} + +bool File::write_frame(const uint8_t *buf, size_t len) +{ + if (m_fd == nullptr) { + throw logic_error("Invalid usage of closed File output"); + } + + return fwrite(buf, len, 1, m_fd) == 1; +} + +ZMQ::ZMQ() : + m_ctx(), + m_sock(m_ctx, ZMQ_PUB) +{ + // Do not wait at teardown to send all data out + int linger = 0; + m_sock.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +} + +ZMQ::~ZMQ() {} + +void ZMQ::connect(const char *uri, const char *keyfile) +{ + if (keyfile) { + fprintf(stderr, "Enabling encryption\n"); + + int rc = readkey(keyfile, m_secretkey); + if (rc) { + throw runtime_error("Error reading secret key"); + } + + const int yes = 1; + m_sock.setsockopt(ZMQ_CURVE_SERVER, + &yes, sizeof(yes)); + + m_sock.setsockopt(ZMQ_CURVE_SECRETKEY, + m_secretkey, CURVE_KEYLEN); + } + m_sock.connect(uri); +} + +void ZMQ::set_encoder_type(encoder_selection_t& enc, int bitrate) +{ + m_encoder = enc; + m_bitrate = bitrate; +} + +bool ZMQ::write_frame(const uint8_t *buf, size_t len) +{ + switch (m_encoder) { + case encoder_selection_t::fdk_dabplus: + return send_frame(buf, len); + case encoder_selection_t::toolame_dab: + return write_toolame(buf, len); + } + throw logic_error("Unhandled encoder in ZMQ::write_frame"); +} + +bool ZMQ::write_toolame(const uint8_t *buf, size_t len) +{ + m_toolame_buffer.insert(m_toolame_buffer.end(), + buf, buf + len); + + // ODR-DabMux expects frames of length 3*bitrate + const auto frame_len = 3 * m_bitrate; + while (m_toolame_buffer.size() > frame_len) { + vec_u8 frame(frame_len); + // this is probably not very efficient + std::copy(m_toolame_buffer.begin(), m_toolame_buffer.begin() + frame_len, frame.begin()); + + bool success = send_frame(frame.data(), frame.size()); + if (not success) { + return false; + } + + m_toolame_buffer.erase(m_toolame_buffer.begin(), m_toolame_buffer.begin() + frame_len); + } + return true; +} + +bool ZMQ::send_frame(const uint8_t *buf, size_t len) +{ + if (m_framebuf.size() != ZMQ_HEADER_SIZE + len) { + m_framebuf.resize(ZMQ_HEADER_SIZE + len); + } + + zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)m_framebuf.data(); + + try { + switch (m_encoder) { + case encoder_selection_t::fdk_dabplus: + zmq_frame_header->encoder = ZMQ_ENCODER_FDK; + break; + case encoder_selection_t::toolame_dab: + zmq_frame_header->encoder = ZMQ_ENCODER_TOOLAME; + break; + } + + zmq_frame_header->version = 1; + zmq_frame_header->datasize = len; + zmq_frame_header->audiolevel_left = m_audio_left; + zmq_frame_header->audiolevel_right = m_audio_right; + + assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= m_framebuf.size()); + + memcpy(ZMQ_FRAME_DATA(zmq_frame_header), buf, len); + + m_sock.send(m_framebuf.data(), ZMQ_FRAME_SIZE(zmq_frame_header), + ZMQ_DONTWAIT); + } + catch (zmq::error_t& e) { + fprintf(stderr, "ZeroMQ send error !\n"); + return false; + } + + return true; +} + +} diff --git a/src/Outputs.h b/src/Outputs.h new file mode 100644 index 0000000..30b20c8 --- /dev/null +++ b/src/Outputs.h @@ -0,0 +1,126 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once +#include +#include +#include +#include +#include +#include "common.h" +#include "zmq.hpp" +extern "C" { +#include "encryption.h" +} + +namespace Output { + +/*! \file Outputs.h + * + * Declaration of all outputs + */ + +class Base { + public: + virtual ~Base() {}; + + /*! Write a buffer of encoded data to the output */ + virtual bool write_frame(const uint8_t *buf, size_t len) = 0; + + /*! Update peak audio level information */ + virtual void update_audio_levels( + int16_t audiolevel_left, int16_t audiolevel_right); + + protected: + int16_t m_audio_left = 0; + int16_t m_audio_right = 0; +}; + +class File : public Base { + public: + File(const char *filename); + File(FILE *file); + File(const File&) = delete; + File& operator=(const File&) = delete; + virtual ~File() override; + + virtual bool write_frame(const uint8_t *buf, size_t len) override; + + private: + FILE *m_fd = nullptr; +}; + +/*! This defines the on-wire representation of a ZMQ message header. + * It must be compatible with the definition in ODR-DabMux. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ + uint16_t version; // we support version=1 now + uint16_t encoder; // see ZMQ_ENCODER_XYZ + + /* length of the 'data' field */ + uint32_t datasize; + + /* Audio level, peak, linear PCM */ + int16_t audiolevel_left; + int16_t audiolevel_right; + + /* Data follows this header */ +} __attribute__ ((packed)); + +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +#define ZMQ_HEADER_SIZE sizeof(struct zmq_frame_header_t) + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(struct zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(struct zmq_frame_header_t) ) + + +class ZMQ: public Base { + public: + ZMQ(); + ZMQ(const ZMQ&) = delete; + ZMQ& operator=(const ZMQ&) = delete; + virtual ~ZMQ() override; + + void connect(const char *uri, const char *keyfile); + void set_encoder_type(encoder_selection_t& enc, int bitrate); + + virtual bool write_frame(const uint8_t *buf, size_t len) override; + + private: + virtual bool write_toolame(const uint8_t *buf, size_t len); + virtual bool send_frame(const uint8_t *buf, size_t len); + + zmq::context_t m_ctx; + zmq::socket_t m_sock; + + int m_bitrate = 0; + char m_secretkey[CURVE_KEYLEN+1]; + encoder_selection_t m_encoder = encoder_selection_t::fdk_dabplus; + using vec_u8 = std::vector; + vec_u8 m_framebuf; + std::deque m_toolame_buffer; +}; + +} diff --git a/src/common.h b/src/common.h index cd856f4..774a4a0 100644 --- a/src/common.h +++ b/src/common.h @@ -16,8 +16,7 @@ * ------------------------------------------------------------------- */ -#ifndef __COMMON_H_ -#define __COMMON_H_ +#pragma once // 16 bits per sample is fine for now #define BYTES_PER_SAMPLE 2 @@ -25,5 +24,9 @@ // How many samples we insert into the queue each call #define NUM_SAMPLES_PER_CALL 10 // 10 samples @ 32kHz = 3.125ms -#endif // __COMMON_H_ +//! Enumeration of encoders we can use +enum class encoder_selection_t { + fdk_dabplus, + toolame_dab +}; diff --git a/src/odr-audioenc.cpp b/src/odr-audioenc.cpp index f56c6df..7f11edd 100644 --- a/src/odr-audioenc.cpp +++ b/src/odr-audioenc.cpp @@ -36,6 +36,7 @@ * - \ref VLCInput.h VLC Input * - \ref AlsaInput.h Alsa Input * - \ref JackInput.h JACK Input + * - \ref Outputs.h ZeroMQ and file outputs * - \ref SampleQueue.h * - \ref charset.h Charset conversion * - \ref toolame.h libtolame API @@ -54,7 +55,7 @@ #include "VLCInput.h" #include "SampleQueue.h" #include "AACDecoder.h" -#include "zmq.hpp" +#include "Outputs.h" #include "common.h" #include "wavfile.h" @@ -91,12 +92,6 @@ constexpr int MAX_FAULTS_ALLOWED = 5; using vec_u8 = std::vector; -//! Enumeration of encoders we can use -enum class encoder_selection_t { - fdk_dabplus, - toolame_dab -}; - using namespace std; struct audioenc_settings_t { @@ -470,8 +465,8 @@ int main(int argc, char *argv[]) encoder_selection_t selected_encoder = encoder_selection_t::fdk_dabplus; - // For the file output - FILE *out_fh = nullptr; + shared_ptr file_output; + shared_ptr zmq_output; vector output_uris; @@ -485,7 +480,6 @@ int main(int argc, char *argv[]) string dab_channel_mode; int dab_psy_model = 1; - deque toolame_output_buffer; /* Keep track of peaks */ int peak_left = 0; @@ -764,63 +758,38 @@ int main(int argc, char *argv[]) return 1; } - zmq::context_t zmq_ctx; - zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB); - - // Do not wait at teardown to send all data out - int linger = 0; - zmq_sock.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); + if (output_uris.empty()) { + fprintf(stderr, "No output URI defined\n"); + return 1; + } - if (not output_uris.empty()) { - for (auto uri : output_uris) { - if (uri == "-") { - if (out_fh != nullptr) { - fprintf(stderr, "You can't write to more than one file!\n"); - return 1; - } - out_fh = stdout; + for (const auto& uri : output_uris) { + if (uri == "-") { + if (file_output) { + fprintf(stderr, "You can't write to more than one file!\n"); + return 1; } - else if ((uri.compare(0, 6, "tcp://") == 0) || - (uri.compare(0, 6, "pgm://") == 0) || - (uri.compare(0, 7, "epgm://") == 0) || - (uri.compare(0, 6, "ipc://") == 0)) { - if (keyfile) { - fprintf(stderr, "Enabling encryption\n"); - - int rc = readkey(keyfile, secretkey); - if (rc) { - fprintf(stderr, "Error reading secret key\n"); - return 2; - } - - const int yes = 1; - zmq_sock.setsockopt(ZMQ_CURVE_SERVER, - &yes, sizeof(yes)); + file_output = make_shared(stdout); + } + else if ((uri.compare(0, 6, "tcp://") == 0) || + (uri.compare(0, 6, "pgm://") == 0) || + (uri.compare(0, 7, "epgm://") == 0) || + (uri.compare(0, 6, "ipc://") == 0)) { - zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, - secretkey, CURVE_KEYLEN); - } - zmq_sock.connect(uri.c_str()); + if (not zmq_output) { + zmq_output = make_shared(); } - else { // We assume it's a file name - if (out_fh != nullptr) { - fprintf(stderr, "You can't write to more than one file!\n"); - return 1; - } - - out_fh = fopen(uri.c_str(), "wb"); - if (!out_fh) { - fprintf(stderr, "Can't open output file!\n"); - return 1; - } + zmq_output->connect(uri.c_str(), keyfile); + } + else { // We assume it's a file name + if (file_output) { + fprintf(stderr, "You can't write to more than one file!\n"); + return 1; } + file_output = make_shared(uri.c_str()); } } - else { - fprintf(stderr, "No output URI defined\n"); - return 1; - } if (padlen != 0) { int flags; @@ -956,13 +925,15 @@ int main(int argc, char *argv[]) return 1; } + zmq_output->set_encoder_type(selected_encoder, bitrate); + int outbuf_size; vec_u8 zmqframebuf; vec_u8 outbuf; + if (selected_encoder == encoder_selection_t::fdk_dabplus) { outbuf_size = bitrate/8*120; outbuf.resize(24*120); - zmqframebuf.resize(ZMQ_HEADER_SIZE + 24*120); if(outbuf_size % 5 != 0) { fprintf(stderr, "Warning: (outbuf_size mod 5) = %d\n", outbuf_size % 5); @@ -972,13 +943,8 @@ int main(int argc, char *argv[]) outbuf_size = 4092; outbuf.resize(outbuf_size); fprintf(stderr, "Setting outbuf size to %zu\n", outbuf.size()); - - // ODR-DabMux expects frames of length 3*bitrate - zmqframebuf.resize(ZMQ_HEADER_SIZE + 3 * bitrate); } - zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)&zmqframebuf[0]; - unsigned char pad_buf[padlen + 1]; if (restart_on_fault) { @@ -1327,61 +1293,18 @@ int main(int argc, char *argv[]) } if (numOutBytes != 0) { - if (out_fh) { - fwrite(&outbuf[0], 1, numOutBytes, out_fh); + if (file_output) { + file_output->write_frame(outbuf.data(), numOutBytes); } - else { - // ------------ ZeroMQ transmit - try { - if (selected_encoder == encoder_selection_t::fdk_dabplus) { - zmq_frame_header->encoder = ZMQ_ENCODER_FDK; - zmq_frame_header->version = 1; - zmq_frame_header->datasize = numOutBytes; - zmq_frame_header->audiolevel_left = peak_left; - zmq_frame_header->audiolevel_right = peak_right; + else if (zmq_output) { + bool success = zmq_output->write_frame(outbuf.data(), numOutBytes); - assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= zmqframebuf.size()); - - memcpy(ZMQ_FRAME_DATA(zmq_frame_header), - &outbuf[0], numOutBytes); - - zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header), - ZMQ_DONTWAIT); - - } - else if (selected_encoder == encoder_selection_t::toolame_dab) { - toolame_output_buffer.insert(toolame_output_buffer.end(), - outbuf.begin(), outbuf.begin() + numOutBytes); - - while (toolame_output_buffer.size() > 3 * bitrate) { - zmq_frame_header->encoder = ZMQ_ENCODER_TOOLAME; - zmq_frame_header->version = 1; - zmq_frame_header->datasize = 3 * bitrate; - zmq_frame_header->audiolevel_left = peak_left; - zmq_frame_header->audiolevel_right = peak_right; - - uint8_t *encoded_frame = ZMQ_FRAME_DATA(zmq_frame_header); - - // no memcpy for std::deque - for (size_t i = 0; i < 3*bitrate; i++) { - encoded_frame[i] = toolame_output_buffer[i]; - } - - zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header), - ZMQ_DONTWAIT); - - toolame_output_buffer.erase(toolame_output_buffer.begin(), - toolame_output_buffer.begin() + 3 * bitrate); - } - } - } - catch (zmq::error_t& e) { + if (not success) { fprintf(stderr, "ZeroMQ send error !\n"); send_error_count ++; } - if (send_error_count > 10) - { + if (send_error_count > 10) { fprintf(stderr, "ZeroMQ send failed ten times, aborting!\n"); retval = 4; break; @@ -1389,8 +1312,7 @@ int main(int argc, char *argv[]) } } - if (numOutBytes != 0) - { + if (numOutBytes != 0) { if (show_level) { if (settings.channels == 1) { fprintf(stderr, "\rIn: [%-6s] %1s %1s %1s", @@ -1430,11 +1352,9 @@ int main(int argc, char *argv[]) fprintf(stderr, "\n"); - if (out_fh) { - fclose(out_fh); - } + file_output.reset(); + zmq_output.reset(); - zmq_sock.close(); free_rs_char(rs_handler); if (selected_encoder == encoder_selection_t::fdk_dabplus) { diff --git a/src/utils.h b/src/utils.h index 83b3e4d..2cb06c3 100644 --- a/src/utils.h +++ b/src/utils.h @@ -1,5 +1,4 @@ -#ifndef UTILS_H_ -#define UTILS_H_ +#pragma once #include #include @@ -17,37 +16,5 @@ */ const char* level(int channel, int peak); -/*! This defines the on-wire representation of a ZMQ message header. - * It must be compatible with the definition in ODR-DabMux. - * - * The data follows right after this header */ -struct zmq_frame_header_t -{ - uint16_t version; // we support version=1 now - uint16_t encoder; // see ZMQ_ENCODER_XYZ - - /* length of the 'data' field */ - uint32_t datasize; - - /* Audio level, peak, linear PCM */ - int16_t audiolevel_left; - int16_t audiolevel_right; - - /* Data follows this header */ -} __attribute__ ((packed)); - -#define ZMQ_ENCODER_FDK 1 -#define ZMQ_ENCODER_TOOLAME 2 - -#define ZMQ_HEADER_SIZE sizeof(struct zmq_frame_header_t) - -/* The expected frame size incl data of the given frame */ -#define ZMQ_FRAME_SIZE(f) (sizeof(struct zmq_frame_header_t) + f->datasize) - -#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(struct zmq_frame_header_t) ) - - size_t strlen_utf8(const char *s); -#endif - -- cgit v1.2.3