From 3edd67dc81cd637e06ea22221f3aebfa0111d989 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Mon, 23 Sep 2019 20:30:57 +0200 Subject: Add output code from ODR-AudioEnc with EDI output --- src/odr-sourcecompanion.cpp | 132 +++++++++++++++++++++++++------------------- 1 file changed, 76 insertions(+), 56 deletions(-) (limited to 'src/odr-sourcecompanion.cpp') diff --git a/src/odr-sourcecompanion.cpp b/src/odr-sourcecompanion.cpp index a69f705..8b5b701 100644 --- a/src/odr-sourcecompanion.cpp +++ b/src/odr-sourcecompanion.cpp @@ -27,6 +27,7 @@ #include "zmq.hpp" #include "AVTInput.h" +#include "Outputs.h" #include "AACDecoder.h" #include "StatsPublish.h" #include @@ -89,10 +90,10 @@ void usage(const char* name) { " --ps Force the usage of PS\n" " Output and pad parameters:\n" " -o, --output=URI Output ZMQ uri. (e.g. 'tcp://localhost:9000')\n" - " -or- Output file uri. (e.g. 'file.dabp')\n" - " -or- a single dash '-' to denote stdout\n" " If more than one ZMQ output is given, the socket\n" " will be connected to all listed endpoints.\n" + " -e, --edi=URI EDI output uri, (e.g. 'tcp://localhost:7000')\n" + " -T, --timestamp-delay=DELAY_MS Enabled timestamps in EDI (requires TAI clock bulletin download) and\n" " -k, --secret-key=FILE Enable ZMQ encryption with the given secret key.\n" " -p, --pad=BYTES Set PAD size in bytes.\n" " -P, --pad-fifo=FILENAME Set PAD data input fifo name" @@ -137,12 +138,13 @@ int main(int argc, char *argv[]) string send_stats_to = ""; /* Data for ZMQ CURVE authentication */ - char* keyfile = nullptr; - char secretkey[CURVE_KEYLEN+1]; + char *keyfile = nullptr; const struct option longopts[] = { {"bitrate", required_argument, 0, 'b'}, {"channels", required_argument, 0, 'c'}, + {"edi", required_argument, 0, 'e'}, + {"timestamp-delay", required_argument, 0, 'T'}, {"output", required_argument, 0, 'o'}, {"pad", required_argument, 0, 'p'}, {"pad-fifo", required_argument, 0, 'P'}, @@ -183,13 +185,17 @@ int main(int argc, char *argv[]) bool allowSBR = false; bool allowPS = false; + vector edi_output_uris; + bool tist_enabled = false; + uint32_t tist_delay_ms = 0; + int bitrate = 0; int channels = 2; int sample_rate = 48000; char ch = 0; int index; while(ch != -1) { - ch = getopt_long(argc, argv, "hlb:c:k:o:r:p:P:I:", longopts, &index); + ch = getopt_long(argc, argv, "hlb:c:e:T:k:o:r:p:P:I:", longopts, &index); switch (ch) { case 0: // AAC-LC allowPS = false; @@ -204,10 +210,17 @@ int main(int argc, char *argv[]) allowSBR = true; break; case 'b': - bitrate = atoi(optarg); + bitrate = stoi(optarg); break; case 'c': - channels = atoi(optarg); + channels = stoi(optarg); + break; + case 'e': + edi_output_uris.push_back(optarg); + break; + case 'T': + tist_enabled = true; + tist_delay_ms = std::stoi(optarg); break; case 'k': keyfile = optarg; @@ -219,13 +232,13 @@ int main(int argc, char *argv[]) output_uris.push_back(optarg); break; case 'p': - padlen = atoi(optarg); + padlen = stoi(optarg); break; case 'P': pad_fifo = optarg; break; case 'r': - sample_rate = atoi(optarg); + sample_rate = stoi(optarg); break; case 'S': send_stats_to = optarg; @@ -238,16 +251,16 @@ int main(int argc, char *argv[]) avt_output_uri = optarg; break; case 7: - avt_timeout = atoi(optarg); + avt_timeout = stoi(optarg); if (avt_timeout < 0) { avt_timeout = 2000; } break; case 8: - avt_pad_port = atoi(optarg); + avt_pad_port = stoi(optarg); break; case 9: - avt_jitterBufferSize = atoi(optarg); + avt_jitterBufferSize = stoi(optarg); break; case '?': case 'h': @@ -266,33 +279,52 @@ int main(int argc, char *argv[]) return 1; } - zmq::context_t zmq_ctx; - zmq::socket_t zmq_sock(zmq_ctx, ZMQ_PUB); + shared_ptr zmq_output; + Output::EDI edi_output; - if (not output_uris.empty()) { - for (auto uri : output_uris) { - if (keyfile) { - fprintf(stderr, "Enabling encryption\n"); + if (output_uris.empty() and edi_output_uris.empty()) { + fprintf(stderr, "No output URIs defined\n"); + return 1; + } - int rc = readkey(keyfile, secretkey); - if (rc) { - fprintf(stderr, "Error reading secret key\n"); - return 2; - } + for (const auto& uri : output_uris) { + if (not zmq_output) { + zmq_output = make_shared(); + } - const int yes = 1; - zmq_sock.setsockopt(ZMQ_CURVE_SERVER, - &yes, sizeof(yes)); + zmq_output->connect(uri.c_str(), keyfile); + } - zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY, - secretkey, CURVE_KEYLEN); + for (const auto& uri : edi_output_uris) { + if (uri.compare(0, 6, "tcp://") == 0 or + uri.compare(0, 6, "udp://") == 0) { + auto host_port_sep_ix = uri.find(':', 6); + if (host_port_sep_ix != string::npos) { + auto host = uri.substr(6, host_port_sep_ix - 6); + auto port = std::stoi(uri.substr(host_port_sep_ix + 1)); + + auto proto = uri.substr(0, 3); + if (proto == "tcp") { + edi_output.add_tcp_destination(host, port); + } + else if (proto == "udp") { + edi_output.add_udp_destination(host, port); + } + else { + throw logic_error("unhandled proto"); + } } - zmq_sock.connect(uri.c_str()); + else { + fprintf(stderr, "Invalid EDI URL host!\n"); + } + } + else { + fprintf(stderr, "Invalid EDI protocol!\n"); } } - else { - fprintf(stderr, "No output URI defined\n"); - return 1; + + if (not edi_output_uris.empty()) { + edi_output.set_tist(tist_enabled, tist_delay_ms); } if (padlen != 0) { @@ -350,19 +382,15 @@ int main(int argc, char *argv[]) } int outbuf_size; - std::vector zmqframebuf; std::vector outbuf; outbuf_size = bitrate/8*120; outbuf.resize(24*120); - zmqframebuf.resize(ZMQ_HEADER_SIZE + 24*120); if (outbuf_size % 5 != 0) { fprintf(stderr, "Warning: (outbuf_size mod 5) = %d\n", outbuf_size % 5); } - zmq_frame_header_t *zmq_frame_header = (zmq_frame_header_t*)&zmqframebuf[0]; - unsigned char pad_buf[padlen + 1]; fprintf(stderr, "Starting encoding\n"); @@ -449,28 +477,22 @@ int main(int argc, char *argv[]) read_bytes = numOutBytes; if (numOutBytes != 0) { - // ------------ ZeroMQ transmit - try { - zmq_frame_header->encoder = ZMQ_ENCODER_FDK; - zmq_frame_header->version = 1; - zmq_frame_header->datasize = numOutBytes; - zmq_frame_header->audiolevel_left = peak_left; - zmq_frame_header->audiolevel_right = peak_right; - - assert(ZMQ_FRAME_SIZE(zmq_frame_header) <= zmqframebuf.size()); - - memcpy(ZMQ_FRAME_DATA(zmq_frame_header), - &outbuf[0], numOutBytes); - zmq_sock.send(&zmqframebuf[0], ZMQ_FRAME_SIZE(zmq_frame_header), - ZMQ_DONTWAIT); + bool success = false; + if (zmq_output) { + zmq_output->update_audio_levels(peak_left, peak_right); + success = zmq_output->write_frame(outbuf.data(), outbuf.size()); } - catch (zmq::error_t& e) { - fprintf(stderr, "ZeroMQ send error !\n"); - send_error_count ++; + else if (edi_output.enabled()) { + edi_output.update_audio_levels(peak_left, peak_right); + success = edi_output.write_frame(outbuf.data(), outbuf.size()); + } + + if (not success) { + send_error_count++; } if (send_error_count > 10) { - fprintf(stderr, "ZeroMQ send failed ten times, aborting!\n"); + fprintf(stderr, "Send failed ten times, aborting!\n"); retval = 4; break; } @@ -500,7 +522,5 @@ int main(int argc, char *argv[]) fprintf(stderr, "\n"); - zmq_sock.close(); - return retval; } -- cgit v1.2.3