aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-09-23 20:30:57 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-09-23 20:30:57 +0200
commit3edd67dc81cd637e06ea22221f3aebfa0111d989 (patch)
tree27ff4d2749a6081b2439e6fa8baec8270a5a2d7b
parent09e514732788d821189c59ddc58e70355ba1a3cb (diff)
downloadODR-SourceCompanion-3edd67dc81cd637e06ea22221f3aebfa0111d989.tar.gz
ODR-SourceCompanion-3edd67dc81cd637e06ea22221f3aebfa0111d989.tar.bz2
ODR-SourceCompanion-3edd67dc81cd637e06ea22221f3aebfa0111d989.zip
Add output code from ODR-AudioEnc with EDI output
-rw-r--r--Makefile.am3
-rw-r--r--src/Outputs.cpp258
-rw-r--r--src/Outputs.h161
-rw-r--r--src/common.h32
-rw-r--r--src/odr-sourcecompanion.cpp132
5 files changed, 529 insertions, 57 deletions
diff --git a/Makefile.am b/Makefile.am
index 4884dab..6f925f6 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -9,11 +9,12 @@ endif
odr_sourcecompanion_LDADD = -lzmq
odr_sourcecompanion_CFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall
-odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc/fec -Ilib
+odr_sourcecompanion_CXXFLAGS = $(GITVERSION_FLAGS) -ggdb -O2 -Wall -Isrc -Ilib
odr_sourcecompanion_SOURCES = src/odr-sourcecompanion.cpp \
src/AACDecoder.h src/AACDecoder.cpp \
src/AVTInput.h src/AVTInput.cpp \
src/OrderedQueue.h src/OrderedQueue.cpp \
+ src/Outputs.h src/Outputs.cpp \
src/StatsPublish.h src/StatsPublish.cpp \
src/encryption.h src/encryption.c \
src/utils.h src/utils.c \
diff --git a/src/Outputs.cpp b/src/Outputs.cpp
new file mode 100644
index 0000000..d0d3ca4
--- /dev/null
+++ b/src/Outputs.cpp
@@ -0,0 +1,258 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2011 Martin Storsjo
+ * Copyright (C) 2019 Matthias P. Braendli
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#include "Outputs.h"
+#include <string>
+#include <stdexcept>
+#include <cstring>
+#include <cerrno>
+#include <cassert>
+
+namespace Output {
+
+using namespace std;
+
+void Base::update_audio_levels(int16_t audiolevel_left, int16_t audiolevel_right)
+{
+ m_audio_left = audiolevel_left;
+ m_audio_right = audiolevel_right;
+}
+
+File::File(const char *filename)
+{
+ m_fd = fopen(filename, "wb");
+ if (m_fd == nullptr) {
+ throw runtime_error(string("Error opening output file: ") + strerror(errno));
+ }
+}
+
+File::File(FILE *fd) : m_fd(fd) { }
+
+File::~File() {
+ if (m_fd) {
+ fclose(m_fd);
+ m_fd = nullptr;
+ }
+}
+
+bool File::write_frame(const uint8_t *buf, size_t len)
+{
+ if (m_fd == nullptr) {
+ throw logic_error("Invalid usage of closed File output");
+ }
+
+ return fwrite(buf, len, 1, m_fd) == 1;
+}
+
+ZMQ::ZMQ() :
+ m_ctx(),
+ m_sock(m_ctx, ZMQ_PUB)
+{
+ // Do not wait at teardown to send all data out
+ int linger = 0;
+ m_sock.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
+}
+
+ZMQ::~ZMQ() {}
+
+void ZMQ::connect(const char *uri, const char *keyfile)
+{
+ if (keyfile) {
+ fprintf(stderr, "Enabling encryption\n");
+
+ int rc = readkey(keyfile, m_secretkey);
+ if (rc) {
+ throw runtime_error("Error reading secret key");
+ }
+
+ const int yes = 1;
+ m_sock.setsockopt(ZMQ_CURVE_SERVER,
+ &yes, sizeof(yes));
+
+ m_sock.setsockopt(ZMQ_CURVE_SECRETKEY,
+ m_secretkey, CURVE_KEYLEN);
+ }
+ m_sock.connect(uri);
+}
+
+void ZMQ::set_encoder_type(encoder_selection_t& enc, int bitrate)
+{
+ m_encoder = enc;
+ m_bitrate = bitrate;
+}
+
+bool ZMQ::write_frame(const uint8_t *buf, size_t len)
+{
+ if (m_framebuf.size() != ZMQ_HEADER_SIZE + len) {
+ m_framebuf.resize(ZMQ_HEADER_SIZE + len);
+ }
+
+ zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)m_framebuf.data();
+
+ try {
+ switch (m_encoder) {
+ case encoder_selection_t::fdk_dabplus:
+ zmq_frame_header->encoder = ZMQ_ENCODER_FDK;
+ break;
+ case encoder_selection_t::toolame_dab:
+ zmq_frame_header->encoder = ZMQ_ENCODER_TOOLAME;
+ break;
+ }
+
+ zmq_frame_header->version = 1;
+ zmq_frame_header->datasize = len;
+ zmq_frame_header->audiolevel_left = m_audio_left;
+ zmq_frame_header->audiolevel_right = m_audio_right;
+
+ assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= m_framebuf.size());
+
+ memcpy(ZMQ_FRAME_DATA(zmq_frame_header), buf, len);
+
+ m_sock.send(m_framebuf.data(), ZMQ_FRAME_SIZE(zmq_frame_header),
+ ZMQ_DONTWAIT);
+ }
+ catch (zmq::error_t& e) {
+ fprintf(stderr, "ZeroMQ send error !\n");
+ return false;
+ }
+
+ return true;
+}
+
+EDI::EDI() :
+ m_clock_tai({})
+{ }
+
+EDI::~EDI() { }
+
+void EDI::add_udp_destination(const std::string& host, unsigned int port)
+{
+ auto dest = make_shared<edi::udp_destination_t>();
+ dest->dest_addr = host;
+ m_edi_conf.dest_port = port;
+ m_edi_conf.destinations.push_back(dest);
+
+ // We cannot carry AF packets over UDP, because they would be too large.
+ m_edi_conf.enable_pft = true;
+
+ // TODO make FEC configurable
+}
+
+void EDI::add_tcp_destination(const std::string& host, unsigned int port)
+{
+ auto dest = make_shared<edi::tcp_client_t>();
+ dest->dest_addr = host;
+ if (dest->dest_port != 0 and dest->dest_port != port) {
+ throw runtime_error("All EDI UDP outputs must be to the same destination port");
+ }
+ dest->dest_port = port;
+ m_edi_conf.destinations.push_back(dest);
+
+ m_edi_conf.dump = true;
+}
+
+bool EDI::enabled() const
+{
+ return not m_edi_conf.destinations.empty();
+}
+
+void EDI::set_tist(bool enable, uint32_t delay_ms)
+{
+ m_tist = enable;
+ m_delay_ms = delay_ms;
+}
+
+bool EDI::write_frame(const uint8_t *buf, size_t len)
+{
+ if (not m_edi_sender) {
+ m_edi_sender = make_shared<edi::Sender>(m_edi_conf);
+ }
+
+ if (m_edi_time == 0) {
+ using Sec = chrono::seconds;
+ const auto now = chrono::time_point_cast<Sec>(chrono::system_clock::now());
+ m_edi_time = chrono::system_clock::to_time_t(now) + (m_delay_ms / 1000);
+ m_send_version_at_time = m_edi_time;
+
+ /* TODO we still have to see if 24ms granularity is achievable, given that
+ * one DAB+ super frame is carried over more than 1 ETI frame.
+ */
+ for (int32_t sub_ms = (m_delay_ms % 1000); sub_ms > 0; sub_ms -= 24) {
+ m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2
+ }
+ }
+
+ edi::TagStarPTR edi_tagStarPtr("DSTI");
+
+ m_edi_tagDSTI.stihf = false;
+ m_edi_tagDSTI.atstf = m_tist;
+
+ m_timestamp += 24 << 14; // Shift 24ms by 14 to Timestamp level 2
+ if (m_timestamp > 0xf9FFff) {
+ m_timestamp -= 0xfa0000; // Substract 16384000, corresponding to one second
+ m_edi_time += 1;
+
+ m_num_seconds_sent++;
+ }
+
+ m_edi_tagDSTI.set_edi_time(m_edi_time, m_clock_tai.get_offset());
+ m_edi_tagDSTI.tsta = m_timestamp & 0xffffff;
+
+ m_edi_tagDSTI.rfadf = false;
+ // DFCT is handled inside the TagDSTI
+
+ edi::TagSSm edi_tagPayload;
+ // TODO make edi_tagPayload.stid configurable
+ edi_tagPayload.istd_data = buf;
+ edi_tagPayload.istd_length = len;
+
+ edi::TagODRAudioLevels edi_tagAudioLevels(m_audio_left, m_audio_right);
+
+ stringstream ss;
+ ss << PACKAGE_NAME << " " <<
+#if defined(GITVERSION)
+ GITVERSION;
+#else
+ PACKAGE_VERSION;
+#endif
+ edi::TagODRVersion edi_tagVersion(ss.str(), m_num_seconds_sent);
+
+
+ // The above Tag Items will be assembled into a TAG Packet
+ edi::TagPacket edi_tagpacket(m_edi_conf.tagpacket_alignment);
+
+ // put tags *ptr, DETI and all subchannels into one TagPacket
+ edi_tagpacket.tag_items.push_back(&edi_tagStarPtr);
+ edi_tagpacket.tag_items.push_back(&m_edi_tagDSTI);
+ edi_tagpacket.tag_items.push_back(&edi_tagPayload);
+ edi_tagpacket.tag_items.push_back(&edi_tagAudioLevels);
+
+ // Send version information only every 10 seconds to save bandwidth
+ if (m_send_version_at_time < m_edi_time) {
+ m_send_version_at_time += 10;
+ edi_tagpacket.tag_items.push_back(&edi_tagVersion);
+ }
+
+ m_edi_sender->write(edi_tagpacket);
+
+ // TODO Handle TCP disconnect
+ return true;
+}
+
+}
diff --git a/src/Outputs.h b/src/Outputs.h
new file mode 100644
index 0000000..0f1f34f
--- /dev/null
+++ b/src/Outputs.h
@@ -0,0 +1,161 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2011 Martin Storsjo
+ * Copyright (C) 2019 Matthias P. Braendli
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#pragma once
+#include <vector>
+#include <chrono>
+#include <deque>
+#include <cstdint>
+#include <cstddef>
+#include <cstdio>
+#include "common.h"
+#include "zmq.hpp"
+#include "ClockTAI.h"
+#include "edi/TagItems.h"
+#include "edi/TagPacket.h"
+#include "edi/AFPacket.h"
+#include "edi/Transport.h"
+extern "C" {
+#include "encryption.h"
+}
+
+namespace Output {
+
+/*! \file Outputs.h
+ *
+ * Declaration of all outputs
+ */
+
+class Base {
+ public:
+ virtual ~Base() {};
+
+ /*! Write a buffer of encoded data to the output */
+ virtual bool write_frame(const uint8_t *buf, size_t len) = 0;
+
+ /*! Update peak audio level information */
+ virtual void update_audio_levels(
+ int16_t audiolevel_left, int16_t audiolevel_right);
+
+ protected:
+ int16_t m_audio_left = 0;
+ int16_t m_audio_right = 0;
+};
+
+class File : public Base {
+ public:
+ File(const char *filename);
+ File(FILE *file);
+ File(const File&) = delete;
+ File& operator=(const File&) = delete;
+ virtual ~File() override;
+
+ virtual bool write_frame(const uint8_t *buf, size_t len) override;
+
+ private:
+ FILE *m_fd = nullptr;
+};
+
+/*! This defines the on-wire representation of a ZMQ message header.
+ * It must be compatible with the definition in ODR-DabMux.
+ *
+ * The data follows right after this header */
+struct zmq_frame_header_t
+{
+ uint16_t version; // we support version=1 now
+ uint16_t encoder; // see ZMQ_ENCODER_XYZ
+
+ /* length of the 'data' field */
+ uint32_t datasize;
+
+ /* Audio level, peak, linear PCM */
+ int16_t audiolevel_left;
+ int16_t audiolevel_right;
+
+ /* Data follows this header */
+} __attribute__ ((packed));
+
+#define ZMQ_ENCODER_FDK 1
+#define ZMQ_ENCODER_TOOLAME 2
+
+#define ZMQ_HEADER_SIZE sizeof(struct zmq_frame_header_t)
+
+/* The expected frame size incl data of the given frame */
+#define ZMQ_FRAME_SIZE(f) (sizeof(struct zmq_frame_header_t) + f->datasize)
+
+#define ZMQ_FRAME_DATA(f) ( ((uint8_t*)f)+sizeof(struct zmq_frame_header_t) )
+
+
+class ZMQ: public Base {
+ public:
+ ZMQ();
+ ZMQ(const ZMQ&) = delete;
+ ZMQ& operator=(const ZMQ&) = delete;
+ virtual ~ZMQ() override;
+
+ void connect(const char *uri, const char *keyfile);
+ void set_encoder_type(encoder_selection_t& enc, int bitrate);
+
+ virtual bool write_frame(const uint8_t *buf, size_t len) override;
+
+ private:
+ zmq::context_t m_ctx;
+ zmq::socket_t m_sock;
+
+ int m_bitrate = 0;
+ char m_secretkey[CURVE_KEYLEN+1];
+ encoder_selection_t m_encoder = encoder_selection_t::fdk_dabplus;
+ using vec_u8 = std::vector<uint8_t>;
+ vec_u8 m_framebuf;
+};
+
+
+class EDI: public Base {
+ public:
+ EDI();
+ EDI(const EDI&) = delete;
+ EDI& operator=(const EDI&) = delete;
+ virtual ~EDI() override;
+
+ void add_udp_destination(const std::string& host, unsigned int port);
+ void add_tcp_destination(const std::string& host, unsigned int port);
+
+ void set_tist(bool enable, uint32_t delay_ms);
+
+ bool enabled() const;
+
+ virtual bool write_frame(const uint8_t *buf, size_t len) override;
+
+ private:
+ edi::configuration_t m_edi_conf;
+ std::shared_ptr<edi::Sender> m_edi_sender;
+
+ uint32_t m_timestamp = 0;
+ uint32_t m_num_seconds_sent = 0;
+ std::time_t m_edi_time = 0;
+ std::time_t m_send_version_at_time = 0;
+
+ edi::TagDSTI m_edi_tagDSTI;
+
+ ClockTAI m_clock_tai;
+ bool m_tist = false;
+ uint32_t m_delay_ms = 0;
+};
+
+}
diff --git a/src/common.h b/src/common.h
new file mode 100644
index 0000000..774a4a0
--- /dev/null
+++ b/src/common.h
@@ -0,0 +1,32 @@
+/* ------------------------------------------------------------------
+ * Copyright (C) 2016 Matthias P. Braendli
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ * -------------------------------------------------------------------
+ */
+
+#pragma once
+
+// 16 bits per sample is fine for now
+#define BYTES_PER_SAMPLE 2
+
+// How many samples we insert into the queue each call
+#define NUM_SAMPLES_PER_CALL 10 // 10 samples @ 32kHz = 3.125ms
+
+//! Enumeration of encoders we can use
+enum class encoder_selection_t {
+ fdk_dabplus,
+ toolame_dab
+};
+
diff --git a/src/odr-sourcecompanion.cpp b/src/odr-sourcecompanion.cpp
index a69f705..8b5b701 100644
--- a/src/odr-sourcecompanion.cpp
+++ b/src/odr-sourcecompanion.cpp
@@ -27,6 +27,7 @@
#include "zmq.hpp"
#include "AVTInput.h"
+#include "Outputs.h"
#include "AACDecoder.h"
#include "StatsPublish.h"
#include <sys/time.h>
@@ -89,10 +90,10 @@ void usage(const char* name) {
" --ps Force the usage of PS\n"
" Output and pad parameters:\n"
" -o, --output=URI Output ZMQ uri. (e.g. 'tcp://localhost:9000')\n"
- " -or- Output file uri. (e.g. 'file.dabp')\n"
- " -or- a single dash '-' to denote stdout\n"
" If more than one ZMQ output is given, the socket\n"
" will be connected to all listed endpoints.\n"
+ " -e, --edi=URI EDI output uri, (e.g. 'tcp://localhost:7000')\n"
+ " -T, --timestamp-delay=DELAY_MS Enabled timestamps in EDI (requires TAI clock bulletin download) and\n"
" -k, --secret-key=FILE Enable ZMQ encryption with the given secret key.\n"
" -p, --pad=BYTES Set PAD size in bytes.\n"
" -P, --pad-fifo=FILENAME Set PAD data input fifo name"
@@ -137,12 +138,13 @@ int main(int argc, char *argv[])
string send_stats_to = "";
/* Data for ZMQ CURVE authentication */
- char* keyfile = nullptr;
- char secretkey[CURVE_KEYLEN+1];
+ char *keyfile = nullptr;
const struct option longopts[] = {
{"bitrate", required_argument, 0, 'b'},
{"channels", required_argument, 0, 'c'},
+ {"edi", required_argument, 0, 'e'},
+ {"timestamp-delay", required_argument, 0, 'T'},
{"output", required_argument, 0, 'o'},
{"pad", required_argument, 0, 'p'},
{"pad-fifo", required_argument, 0, 'P'},
@@ -183,13 +185,17 @@ int main(int argc, char *argv[])
bool allowSBR = false;
bool allowPS = false;
+ vector<string> edi_output_uris;
+ bool tist_enabled = false;
+ uint32_t tist_delay_ms = 0;
+
int bitrate = 0;
int channels = 2;
int sample_rate = 48000;
char ch = 0;
int index;
while(ch != -1) {
- ch = getopt_long(argc, argv, "hlb:c:k:o:r:p:P:I:", longopts, &index);
+ ch = getopt_long(argc, argv, "hlb:c:e:T:k:o:r:p:P:I:", longopts, &index);
switch (ch) {
case 0: // AAC-LC
allowPS = false;
@@ -204,10 +210,17 @@ int main(int argc, char *argv[])
allowSBR = true;
break;
case 'b':
- bitrate = atoi(optarg);
+ bitrate = stoi(optarg);
break;
case 'c':
- channels = atoi(optarg);
+ channels = stoi(optarg);
+ break;
+ case 'e':
+ edi_output_uris.push_back(optarg);
+ break;
+ case 'T':
+ tist_enabled = true;
+ tist_delay_ms = std::stoi(optarg);
break;
case 'k':
keyfile = optarg;
@@ -219,13 +232,13 @@ int main(int argc, char *argv[])
output_uris.push_back(optarg);
break;
case 'p':
- padlen = atoi(optarg);
+ padlen = stoi(optarg);
break;
case 'P':
pad_fifo = optarg;
break;
case 'r':
- sample_rate = atoi(optarg);
+ sample_rate = stoi(optarg);
break;
case 'S':
send_stats_to = optarg;
@@ -238,16 +251,16 @@ int main(int argc, char *argv[])
avt_output_uri = optarg;
break;
case 7:
- avt_timeout = atoi(optarg);
+ avt_timeout = stoi(optarg);
if (avt_timeout < 0) {
avt_timeout = 2000;
}
break;
case 8:
- avt_pad_port = atoi(optarg);
+ avt_pad_port = stoi(optarg);
break;
case 9:
- avt_jitterBufferSize = atoi(optarg);
+ avt_jitterBufferSize = stoi(optarg);
break;
case '?':
case 'h':
@@ -266,33 +279,52 @@ int main(int argc, char *argv[])
return 1;
}
- zmq::context_t zmq_ctx;
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB);
+ shared_ptr<Output::ZMQ> zmq_output;
+ Output::EDI edi_output;
- if (not output_uris.empty()) {
- for (auto uri : output_uris) {
- if (keyfile) {
- fprintf(stderr, "Enabling encryption\n");
+ if (output_uris.empty() and edi_output_uris.empty()) {
+ fprintf(stderr, "No output URIs defined\n");
+ return 1;
+ }
- int rc = readkey(keyfile, secretkey);
- if (rc) {
- fprintf(stderr, "Error reading secret key\n");
- return 2;
- }
+ for (const auto& uri : output_uris) {
+ if (not zmq_output) {
+ zmq_output = make_shared<Output::ZMQ>();
+ }
- const int yes = 1;
- zmq_sock.setsockopt(ZMQ_CURVE_SERVER,
- &yes, sizeof(yes));
+ zmq_output->connect(uri.c_str(), keyfile);
+ }
- zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY,
- secretkey, CURVE_KEYLEN);
+ for (const auto& uri : edi_output_uris) {
+ if (uri.compare(0, 6, "tcp://") == 0 or
+ uri.compare(0, 6, "udp://") == 0) {
+ auto host_port_sep_ix = uri.find(':', 6);
+ if (host_port_sep_ix != string::npos) {
+ auto host = uri.substr(6, host_port_sep_ix - 6);
+ auto port = std::stoi(uri.substr(host_port_sep_ix + 1));
+
+ auto proto = uri.substr(0, 3);
+ if (proto == "tcp") {
+ edi_output.add_tcp_destination(host, port);
+ }
+ else if (proto == "udp") {
+ edi_output.add_udp_destination(host, port);
+ }
+ else {
+ throw logic_error("unhandled proto");
+ }
}
- zmq_sock.connect(uri.c_str());
+ else {
+ fprintf(stderr, "Invalid EDI URL host!\n");
+ }
+ }
+ else {
+ fprintf(stderr, "Invalid EDI protocol!\n");
}
}
- else {
- fprintf(stderr, "No output URI defined\n");
- return 1;
+
+ if (not edi_output_uris.empty()) {
+ edi_output.set_tist(tist_enabled, tist_delay_ms);
}
if (padlen != 0) {
@@ -350,19 +382,15 @@ int main(int argc, char *argv[])
}
int outbuf_size;
- std::vector<uint8_t> zmqframebuf;
std::vector<uint8_t> outbuf;
outbuf_size = bitrate/8*120;
outbuf.resize(24*120);
- zmqframebuf.resize(ZMQ_HEADER_SIZE + 24*120);
if (outbuf_size % 5 != 0) {
fprintf(stderr, "Warning: (outbuf_size mod 5) = %d\n", outbuf_size % 5);
}
- zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)&zmqframebuf[0];
-
unsigned char pad_buf[padlen + 1];
fprintf(stderr, "Starting encoding\n");
@@ -449,28 +477,22 @@ int main(int argc, char *argv[])
read_bytes = numOutBytes;
if (numOutBytes != 0) {
- // ------------ ZeroMQ transmit
- try {
- zmq_frame_header->encoder = ZMQ_ENCODER_FDK;
- zmq_frame_header->version = 1;
- zmq_frame_header->datasize = numOutBytes;
- zmq_frame_header->audiolevel_left = peak_left;
- zmq_frame_header->audiolevel_right = peak_right;
-
- assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= zmqframebuf.size());
-
- memcpy(ZMQ_FRAME_DATA(zmq_frame_header),
- &outbuf[0], numOutBytes);
- zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header),
- ZMQ_DONTWAIT);
+ bool success = false;
+ if (zmq_output) {
+ zmq_output->update_audio_levels(peak_left, peak_right);
+ success = zmq_output->write_frame(outbuf.data(), outbuf.size());
}
- catch (zmq::error_t& e) {
- fprintf(stderr, "ZeroMQ send error !\n");
- send_error_count ++;
+ else if (edi_output.enabled()) {
+ edi_output.update_audio_levels(peak_left, peak_right);
+ success = edi_output.write_frame(outbuf.data(), outbuf.size());
+ }
+
+ if (not success) {
+ send_error_count++;
}
if (send_error_count > 10) {
- fprintf(stderr, "ZeroMQ send failed ten times, aborting!\n");
+ fprintf(stderr, "Send failed ten times, aborting!\n");
retval = 4;
break;
}
@@ -500,7 +522,5 @@ int main(int argc, char *argv[])
fprintf(stderr, "\n");
- zmq_sock.close();
-
return retval;
}