diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/Outputs.cpp | 171 | ||||
| -rw-r--r-- | src/Outputs.h | 126 | ||||
| -rw-r--r-- | src/common.h | 9 | ||||
| -rw-r--r-- | src/odr-audioenc.cpp | 160 | ||||
| -rw-r--r-- | src/utils.h | 35 | 
5 files changed, 344 insertions, 157 deletions
diff --git a/src/Outputs.cpp b/src/Outputs.cpp new file mode 100644 index 0000000..a80ca08 --- /dev/null +++ b/src/Outputs.cpp @@ -0,0 +1,171 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#include "Outputs.h" +#include <string> +#include <stdexcept> +#include <cstring> +#include <cerrno> +#include <cassert> + +namespace Output { + +using namespace std; + +void Base::update_audio_levels(int16_t audiolevel_left, int16_t audiolevel_right) +{ +    m_audio_left = audiolevel_left; +    m_audio_right = audiolevel_right; +} + +File::File(const char *filename) +{ +    m_fd = fopen(filename, "wb"); +    if (m_fd == nullptr) { +        throw runtime_error(string("Error opening output file: ") + strerror(errno)); +    } +} + +File::File(FILE *fd) : m_fd(fd) { } + +File::~File() { +    if (m_fd) { +        fclose(m_fd); +        m_fd = nullptr; +    } +} + +bool File::write_frame(const uint8_t *buf, size_t len) +{ +    if (m_fd == nullptr) { +        throw logic_error("Invalid usage of closed File output"); +    } + +    return fwrite(buf, len, 1, m_fd) == 1; +} + +ZMQ::ZMQ() : +    m_ctx(), +    m_sock(m_ctx, ZMQ_PUB) +{ +    // Do not wait at teardown to send all data out +    int linger = 0; +    m_sock.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +} + +ZMQ::~ZMQ() {} + +void ZMQ::connect(const char *uri, const char *keyfile) +{ +    if (keyfile) { +        fprintf(stderr, "Enabling encryption\n"); + +        int rc = readkey(keyfile, m_secretkey); +        if (rc) { +            throw runtime_error("Error reading secret key"); +        } + +        const int yes = 1; +        m_sock.setsockopt(ZMQ_CURVE_SERVER, +                &yes, sizeof(yes)); + +        m_sock.setsockopt(ZMQ_CURVE_SECRETKEY, +                m_secretkey, CURVE_KEYLEN); +    } +    m_sock.connect(uri); +} + +void ZMQ::set_encoder_type(encoder_selection_t& enc, int bitrate) +{ +    m_encoder = enc; +    m_bitrate = bitrate; +} + +bool ZMQ::write_frame(const uint8_t *buf, size_t len) +{ +    switch (m_encoder) { +        case encoder_selection_t::fdk_dabplus: +            return send_frame(buf, len); +        case encoder_selection_t::toolame_dab: +            return write_toolame(buf, len); +    } +    throw logic_error("Unhandled encoder in ZMQ::write_frame"); +} + +bool ZMQ::write_toolame(const uint8_t *buf, size_t len) +{ +    m_toolame_buffer.insert(m_toolame_buffer.end(), +            buf, buf + len); + +    // ODR-DabMux expects frames of length 3*bitrate +    const auto frame_len = 3 * m_bitrate; +    while (m_toolame_buffer.size() > frame_len) { +        vec_u8 frame(frame_len); +        // this is probably not very efficient +        std::copy(m_toolame_buffer.begin(), m_toolame_buffer.begin() + frame_len, frame.begin()); + +        bool success = send_frame(frame.data(), frame.size()); +        if (not success) { +            return false; +        } + +        m_toolame_buffer.erase(m_toolame_buffer.begin(), m_toolame_buffer.begin() + frame_len); +    } +    return true; +} + +bool ZMQ::send_frame(const uint8_t *buf, size_t len) +{ +    if (m_framebuf.size() != ZMQ_HEADER_SIZE + len) { +        m_framebuf.resize(ZMQ_HEADER_SIZE + len); +    } + +    zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)m_framebuf.data(); + +    try { +        switch (m_encoder) { +            case encoder_selection_t::fdk_dabplus: +                zmq_frame_header->encoder = ZMQ_ENCODER_FDK; +                break; +            case encoder_selection_t::toolame_dab: +                zmq_frame_header->encoder = ZMQ_ENCODER_TOOLAME; +                break; +        } + +        zmq_frame_header->version = 1; +        zmq_frame_header->datasize = len; +        zmq_frame_header->audiolevel_left = m_audio_left; +        zmq_frame_header->audiolevel_right = m_audio_right; + +        assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= m_framebuf.size()); + +        memcpy(ZMQ_FRAME_DATA(zmq_frame_header), buf, len); + +        m_sock.send(m_framebuf.data(), ZMQ_FRAME_SIZE(zmq_frame_header), +                ZMQ_DONTWAIT); +    } +    catch (zmq::error_t& e) { +        fprintf(stderr, "ZeroMQ send error !\n"); +        return false; +    } + +    return true; +} + +} diff --git a/src/Outputs.h b/src/Outputs.h new file mode 100644 index 0000000..30b20c8 --- /dev/null +++ b/src/Outputs.h @@ -0,0 +1,126 @@ +/* ------------------------------------------------------------------ + * Copyright (C) 2011 Martin Storsjo + * Copyright (C) 2019 Matthias P. Braendli + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + *    http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. + * See the License for the specific language governing permissions + * and limitations under the License. + * ------------------------------------------------------------------- + */ + +#pragma once +#include <vector> +#include <deque> +#include <cstdint> +#include <cstddef> +#include <cstdio> +#include "common.h" +#include "zmq.hpp" +extern "C" { +#include "encryption.h" +} + +namespace Output { + +/*! \file Outputs.h + * + * Declaration of all outputs + */ + +class Base { +    public: +        virtual ~Base() {}; + +        /*! Write a buffer of encoded data to the output */ +        virtual bool write_frame(const uint8_t *buf, size_t len) = 0; + +        /*! Update peak audio level information */ +        virtual void update_audio_levels( +                int16_t audiolevel_left, int16_t audiolevel_right); + +    protected: +        int16_t m_audio_left = 0; +        int16_t m_audio_right = 0; +}; + +class File : public Base { +    public: +        File(const char *filename); +        File(FILE *file); +        File(const File&) = delete; +        File& operator=(const File&) = delete; +        virtual ~File() override; + +        virtual bool write_frame(const uint8_t *buf, size_t len) override; + +    private: +        FILE *m_fd = nullptr; +}; + +/*! This defines the on-wire representation of a ZMQ message header. + * It must be compatible with the definition in ODR-DabMux. + * + * The data follows right after this header */ +struct zmq_frame_header_t +{ +    uint16_t version; // we support version=1 now +    uint16_t encoder; // see ZMQ_ENCODER_XYZ + +    /* length of the 'data' field */ +    uint32_t datasize; + +    /* Audio level, peak, linear PCM */ +    int16_t audiolevel_left; +    int16_t audiolevel_right; + +    /* Data follows this header */ +} __attribute__ ((packed)); + +#define ZMQ_ENCODER_FDK 1 +#define ZMQ_ENCODER_TOOLAME 2 + +#define ZMQ_HEADER_SIZE sizeof(struct zmq_frame_header_t) + +/* The expected frame size incl data of the given frame */ +#define ZMQ_FRAME_SIZE(f) (sizeof(struct zmq_frame_header_t) + f->datasize) + +#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(struct zmq_frame_header_t) ) + + +class ZMQ: public Base { +    public: +        ZMQ(); +        ZMQ(const ZMQ&) = delete; +        ZMQ& operator=(const ZMQ&) = delete; +        virtual ~ZMQ() override; + +        void connect(const char *uri, const char *keyfile); +        void set_encoder_type(encoder_selection_t& enc, int bitrate); + +        virtual bool write_frame(const uint8_t *buf, size_t len) override; + +    private: +        virtual bool write_toolame(const uint8_t *buf, size_t len); +        virtual bool send_frame(const uint8_t *buf, size_t len); + +        zmq::context_t m_ctx; +        zmq::socket_t m_sock; + +        int m_bitrate = 0; +        char m_secretkey[CURVE_KEYLEN+1]; +        encoder_selection_t m_encoder = encoder_selection_t::fdk_dabplus; +        using vec_u8 = std::vector<uint8_t>; +        vec_u8 m_framebuf; +        std::deque<uint8_t> m_toolame_buffer; +}; + +} diff --git a/src/common.h b/src/common.h index cd856f4..774a4a0 100644 --- a/src/common.h +++ b/src/common.h @@ -16,8 +16,7 @@   * -------------------------------------------------------------------   */ -#ifndef __COMMON_H_ -#define __COMMON_H_ +#pragma once  // 16 bits per sample is fine for now  #define BYTES_PER_SAMPLE 2 @@ -25,5 +24,9 @@  // How many samples we insert into the queue each call  #define NUM_SAMPLES_PER_CALL 10 // 10 samples @ 32kHz = 3.125ms -#endif // __COMMON_H_ +//! Enumeration of encoders we can use +enum class encoder_selection_t { +    fdk_dabplus, +    toolame_dab +}; diff --git a/src/odr-audioenc.cpp b/src/odr-audioenc.cpp index f56c6df..7f11edd 100644 --- a/src/odr-audioenc.cpp +++ b/src/odr-audioenc.cpp @@ -36,6 +36,7 @@   *  - \ref VLCInput.h VLC Input   *  - \ref AlsaInput.h Alsa Input   *  - \ref JackInput.h JACK Input + *  - \ref Outputs.h ZeroMQ and file outputs   *  - \ref SampleQueue.h   *  - \ref charset.h Charset conversion   *  - \ref toolame.h libtolame API @@ -54,7 +55,7 @@  #include "VLCInput.h"  #include "SampleQueue.h"  #include "AACDecoder.h" -#include "zmq.hpp" +#include "Outputs.h"  #include "common.h"  #include "wavfile.h" @@ -91,12 +92,6 @@ constexpr int MAX_FAULTS_ALLOWED = 5;  using vec_u8 = std::vector<uint8_t>; -//! Enumeration of encoders we can use -enum class encoder_selection_t { -    fdk_dabplus, -    toolame_dab -}; -  using namespace std;  struct audioenc_settings_t { @@ -470,8 +465,8 @@ int main(int argc, char *argv[])      encoder_selection_t selected_encoder = encoder_selection_t::fdk_dabplus; -    // For the file output -    FILE *out_fh = nullptr; +    shared_ptr<Output::File> file_output; +    shared_ptr<Output::ZMQ> zmq_output;      vector<string> output_uris; @@ -485,7 +480,6 @@ int main(int argc, char *argv[])      string dab_channel_mode;      int dab_psy_model = 1; -    deque<uint8_t> toolame_output_buffer;      /* Keep track of peaks */      int peak_left  = 0; @@ -764,63 +758,38 @@ int main(int argc, char *argv[])          return 1;      } -    zmq::context_t zmq_ctx; -    zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB); - -    // Do not wait at teardown to send all data out -    int linger = 0; -    zmq_sock.setsockopt(ZMQ_LINGER, &linger, sizeof(linger)); +    if (output_uris.empty()) { +        fprintf(stderr, "No output URI defined\n"); +        return 1; +    } -    if (not output_uris.empty()) { -        for (auto uri : output_uris) { -            if (uri == "-") { -                if (out_fh != nullptr) { -                    fprintf(stderr, "You can't write to more than one file!\n"); -                    return 1; -                } -                out_fh = stdout; +    for (const auto& uri : output_uris) { +        if (uri == "-") { +            if (file_output) { +                fprintf(stderr, "You can't write to more than one file!\n"); +                return 1;              } -            else if ((uri.compare(0, 6, "tcp://") == 0) || -                    (uri.compare(0, 6, "pgm://") == 0) || -                    (uri.compare(0, 7, "epgm://") == 0) || -                    (uri.compare(0, 6, "ipc://") == 0)) { -                if (keyfile) { -                    fprintf(stderr, "Enabling encryption\n"); - -                    int rc = readkey(keyfile, secretkey); -                    if (rc) { -                        fprintf(stderr, "Error reading secret key\n"); -                        return 2; -                    } - -                    const int yes = 1; -                    zmq_sock.setsockopt(ZMQ_CURVE_SERVER, -                            &yes, sizeof(yes)); +            file_output = make_shared<Output::File>(stdout); +        } +        else if ((uri.compare(0, 6, "tcp://") == 0) || +                (uri.compare(0, 6, "pgm://") == 0) || +                (uri.compare(0, 7, "epgm://") == 0) || +                (uri.compare(0, 6, "ipc://") == 0)) { -                    zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, -                            secretkey, CURVE_KEYLEN); -                } -                zmq_sock.connect(uri.c_str()); +            if (not zmq_output) { +                zmq_output = make_shared<Output::ZMQ>();              } -            else { // We assume it's a file name -                if (out_fh != nullptr) { -                    fprintf(stderr, "You can't write to more than one file!\n"); -                    return 1; -                } - -                out_fh = fopen(uri.c_str(), "wb"); -                if (!out_fh) { -                    fprintf(stderr, "Can't open output file!\n"); -                    return 1; -                } +            zmq_output->connect(uri.c_str(), keyfile); +        } +        else { // We assume it's a file name +            if (file_output) { +                fprintf(stderr, "You can't write to more than one file!\n"); +                return 1;              } +            file_output = make_shared<Output::File>(uri.c_str());          }      } -    else { -        fprintf(stderr, "No output URI defined\n"); -        return 1; -    }      if (padlen != 0) {          int flags; @@ -956,13 +925,15 @@ int main(int argc, char *argv[])          return 1;      } +    zmq_output->set_encoder_type(selected_encoder, bitrate); +      int outbuf_size;      vec_u8 zmqframebuf;      vec_u8 outbuf; +      if (selected_encoder == encoder_selection_t::fdk_dabplus) {          outbuf_size = bitrate/8*120;          outbuf.resize(24*120); -        zmqframebuf.resize(ZMQ_HEADER_SIZE + 24*120);          if(outbuf_size % 5 != 0) {              fprintf(stderr, "Warning: (outbuf_size mod 5) = %d\n", outbuf_size % 5); @@ -972,13 +943,8 @@ int main(int argc, char *argv[])          outbuf_size = 4092;          outbuf.resize(outbuf_size);          fprintf(stderr, "Setting outbuf size to %zu\n", outbuf.size()); - -        // ODR-DabMux expects frames of length 3*bitrate -        zmqframebuf.resize(ZMQ_HEADER_SIZE + 3 * bitrate);      } -    zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)&zmqframebuf[0]; -      unsigned char pad_buf[padlen + 1];      if (restart_on_fault) { @@ -1327,61 +1293,18 @@ int main(int argc, char *argv[])          }          if (numOutBytes != 0) { -            if (out_fh) { -                fwrite(&outbuf[0], 1, numOutBytes, out_fh); +            if (file_output) { +                file_output->write_frame(outbuf.data(), numOutBytes);              } -            else { -                // ------------ ZeroMQ transmit -                try { -                    if (selected_encoder == encoder_selection_t::fdk_dabplus) { -                        zmq_frame_header->encoder = ZMQ_ENCODER_FDK; -                        zmq_frame_header->version = 1; -                        zmq_frame_header->datasize = numOutBytes; -                        zmq_frame_header->audiolevel_left = peak_left; -                        zmq_frame_header->audiolevel_right = peak_right; +            else if (zmq_output) { +                bool success = zmq_output->write_frame(outbuf.data(), numOutBytes); -                        assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= zmqframebuf.size()); - -                        memcpy(ZMQ_FRAME_DATA(zmq_frame_header), -                                &outbuf[0], numOutBytes); - -                        zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header), -                                ZMQ_DONTWAIT); - -                    } -                    else if (selected_encoder == encoder_selection_t::toolame_dab) { -                        toolame_output_buffer.insert(toolame_output_buffer.end(), -                                outbuf.begin(), outbuf.begin() + numOutBytes); - -                        while (toolame_output_buffer.size() > 3 * bitrate) { -                            zmq_frame_header->encoder = ZMQ_ENCODER_TOOLAME; -                            zmq_frame_header->version = 1; -                            zmq_frame_header->datasize = 3 * bitrate; -                            zmq_frame_header->audiolevel_left = peak_left; -                            zmq_frame_header->audiolevel_right = peak_right; - -                            uint8_t *encoded_frame = ZMQ_FRAME_DATA(zmq_frame_header); - -                            // no memcpy for std::deque -                            for (size_t i = 0; i < 3*bitrate; i++) { -                                encoded_frame[i] = toolame_output_buffer[i]; -                            } - -                            zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header), -                                    ZMQ_DONTWAIT); - -                            toolame_output_buffer.erase(toolame_output_buffer.begin(), -                                                        toolame_output_buffer.begin() + 3 * bitrate); -                        } -                    } -                } -                catch (zmq::error_t& e) { +                if (not success) {                      fprintf(stderr, "ZeroMQ send error !\n");                      send_error_count ++;                  } -                if (send_error_count > 10) -                { +                if (send_error_count > 10) {                      fprintf(stderr, "ZeroMQ send failed ten times, aborting!\n");                      retval = 4;                      break; @@ -1389,8 +1312,7 @@ int main(int argc, char *argv[])              }          } -        if (numOutBytes != 0) -        { +        if (numOutBytes != 0) {              if (show_level) {                  if (settings.channels == 1) {                      fprintf(stderr, "\rIn: [%-6s] %1s %1s %1s", @@ -1430,11 +1352,9 @@ int main(int argc, char *argv[])      fprintf(stderr, "\n"); -    if (out_fh) { -        fclose(out_fh); -    } +    file_output.reset(); +    zmq_output.reset(); -    zmq_sock.close();      free_rs_char(rs_handler);      if (selected_encoder == encoder_selection_t::fdk_dabplus) { diff --git a/src/utils.h b/src/utils.h index 83b3e4d..2cb06c3 100644 --- a/src/utils.h +++ b/src/utils.h @@ -1,5 +1,4 @@ -#ifndef UTILS_H_ -#define UTILS_H_ +#pragma once  #include <math.h>  #include <stdint.h> @@ -17,37 +16,5 @@   */  const char* level(int channel, int peak); -/*! This defines the on-wire representation of a ZMQ message header. - * It must be compatible with the definition in ODR-DabMux. - * - * The data follows right after this header */ -struct zmq_frame_header_t -{ -    uint16_t version; // we support version=1 now -    uint16_t encoder; // see ZMQ_ENCODER_XYZ - -    /* length of the 'data' field */ -    uint32_t datasize; - -    /* Audio level, peak, linear PCM */ -    int16_t audiolevel_left; -    int16_t audiolevel_right; - -    /* Data follows this header */ -} __attribute__ ((packed)); - -#define ZMQ_ENCODER_FDK 1 -#define ZMQ_ENCODER_TOOLAME 2 - -#define ZMQ_HEADER_SIZE sizeof(struct zmq_frame_header_t) - -/* The expected frame size incl data of the given frame */ -#define ZMQ_FRAME_SIZE(f) (sizeof(struct zmq_frame_header_t) + f->datasize) - -#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(struct zmq_frame_header_t) ) - -  size_t strlen_utf8(const char *s); -#endif -  | 
