aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2020-03-17 10:03:18 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2020-03-17 10:03:18 +0100
commit9deef28b415e45f861ba3347908dff0e4ef7036c (patch)
tree8978a4f362dff17b328d27300f9392c637b3ed8f
parent4c3d8c6b3162ddaca3b57ecd8467d99e85eeaa1b (diff)
downloadmmbtools-aux-9deef28b415e45f861ba3347908dff0e4ef7036c.tar.gz
mmbtools-aux-9deef28b415e45f861ba3347908dff0e4ef7036c.tar.bz2
mmbtools-aux-9deef28b415e45f861ba3347908dff0e4ef7036c.zip
Add TIST decoder to zmq-sub
-rw-r--r--zmqtest/zmq-sub/Makefile6
-rw-r--r--zmqtest/zmq-sub/zmq-sub.c181
-rw-r--r--zmqtest/zmq-sub/zmq-sub.cpp422
3 files changed, 425 insertions, 184 deletions
diff --git a/zmqtest/zmq-sub/Makefile b/zmqtest/zmq-sub/Makefile
index 82df891..006b912 100644
--- a/zmqtest/zmq-sub/Makefile
+++ b/zmqtest/zmq-sub/Makefile
@@ -1,10 +1,10 @@
GIT_VERSION = $(shell git describe --long --all | cut -d "-" -f 3)
-CFLAGS = -std=gnu99 -Wall -g $(ZMQ_LIB) -DGIT_VERSION=\"$(GIT_VERSION)\"
+CXXFLAGS = -std=c++11 -Wall -Wno-unused -g $(ZMQ_LIB) -DGIT_VERSION=\"$(GIT_VERSION)\" -fsanitize=address -fsanitize=undefined
LINK = -lzmq
all: zmq-sub
-zmq-sub: zmq-sub.c
- gcc $(CFLAGS) -o zmq-sub zmq-sub.c $(LINK)
+zmq-sub: zmq-sub.cpp
+ g++ $(CXXFLAGS) -o zmq-sub zmq-sub.cpp $(LINK)
diff --git a/zmqtest/zmq-sub/zmq-sub.c b/zmqtest/zmq-sub/zmq-sub.c
deleted file mode 100644
index 2abe4c3..0000000
--- a/zmqtest/zmq-sub/zmq-sub.c
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- Receive ODR-DabMux ZMQ ETI output and write to a file.
-
- The MIT License (MIT)
-
- Copyright (c) 2018 Matthias P. Braendli
-
- Permission is hereby granted, free of charge, to any person obtaining a copy
- of this software and associated documentation files (the "Software"), to deal
- in the Software without restriction, including without limitation the rights
- to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- copies of the Software, and to permit persons to whom the Software is
- furnished to do so, subject to the following conditions:
-
- The above copyright notice and this permission notice shall be included in all
- copies or substantial portions of the Software.
-
- THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- SOFTWARE.
-*/
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-#include <zmq.h>
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-#include <errno.h>
-#include <time.h>
-
-
-#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
-
-struct zmq_dab_message_t
-{
- uint32_t version;
- int16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE];
- uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144];
-};
-
-long timespecdiff_ms(struct timespec time, struct timespec oldTime)
-{
- long tv_sec;
- long tv_nsec;
- if (time.tv_nsec < oldTime.tv_nsec) {
- tv_sec = time.tv_sec - 1 - oldTime.tv_sec;
- tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec;
- }
- else {
- tv_sec = time.tv_sec - oldTime.tv_sec;
- tv_nsec = time.tv_nsec - oldTime.tv_nsec;
- }
-
- return tv_sec * 1000 + tv_nsec / 1000000;
-}
-
-
-void barf()
-{
- fprintf(stderr, "Error: %s\n", zmq_strerror(errno));
- exit(1);
-}
-
-
-void do_subscriber(const char* host, int port)
-{
- int rc;
-
- void* ctx = zmq_ctx_new();
- void* sock = zmq_socket(ctx, ZMQ_SUB);
-
- char endpoint[256];
- snprintf(endpoint, 256, "tcp://%s:%d", host, port);
-
- rc = zmq_connect(sock, endpoint);
- fprintf(stderr, "connect %d\n", rc);
- if (rc) barf();
-
- rc = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, NULL, 0);
- fprintf(stderr, "subscribe %d\n", rc);
- if (rc) barf();
-
-
- const int framelen = NUM_FRAMES_PER_ZMQ_MESSAGE * 6144;
- uint8_t eti[framelen];
-
- struct zmq_dab_message_t message;
-
- struct timespec time_start;
- size_t total_size = 0;
- size_t num_frames = 0;
- long last_sec = 0;
-
- while (1) {
- uint8_t* eti_p = eti;
-
- memset(eti, 0x55, framelen);
- rc = zmq_recv(sock, &message, framelen, 0);
-
- if (rc > 0 && message.version == 1) {
- uint8_t* buf = message.buf;
-
- struct timespec time_now;
- clock_gettime(CLOCK_MONOTONIC, &time_now);
-
- if (num_frames == 0) {
- time_start.tv_nsec = time_now.tv_nsec;
- time_start.tv_sec = time_now.tv_sec;
- last_sec = time_now.tv_sec;
- }
-
- if (time_now.tv_sec > last_sec) {
- last_sec = time_now.tv_sec;
-
- // calculate time_now - time_start in us
- long diff_ms = timespecdiff_ms(time_now, time_start);
-
- fprintf(stderr, "Received %zu bytes, %zu ETI frames in %ld milliseconds : %f bytes/second; %f ms/frame\n",
- total_size, num_frames, diff_ms, 1e3 * total_size/diff_ms,
- (double)diff_ms/num_frames);
- }
-
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- memcpy(eti_p, buf, message.buflen[i]);
- eti_p += 6144;
- buf += message.buflen[i];
-
- total_size += message.buflen[i];
- num_frames++;
- }
-
- write(STDOUT_FILENO, eti, framelen);
- }
- else if (rc < 0) {
- fprintf(stderr, "rc=%d \n", rc);
-
- barf();
- }
- }
-
- zmq_close(sock);
-
- zmq_ctx_destroy(ctx);
-}
-
-void usage(char** argv)
-{
- fprintf(stderr, "usage: %s host port\n", argv[0]);
- fprintf(stderr, "connects to odr-dabmux' ETI output at tcp://host:port using a ZeroMQ sub socket\n");
- fprintf(stderr, "and outputs raw ETI on stdout\n");
- exit(1);
-}
-
-int main(int argc, char** argv)
-{
-#ifdef GIT_VERSION
- fprintf(stderr, "zmq-sub ETI reader version %s\n", GIT_VERSION);
-#else
- fprintf(stderr, "zmq-sub ETI reader version ?\n");
-#endif
-
- if (argc < 3) {
- usage(argv);
- }
-
- char* host = argv[1];
- int port = atoi(argv[2]);
-
- fprintf(stderr, "connecting to tcp://%s:%d\n", host, port);
-
- do_subscriber(host, port);
-
- return 0;
-}
-
diff --git a/zmqtest/zmq-sub/zmq-sub.cpp b/zmqtest/zmq-sub/zmq-sub.cpp
new file mode 100644
index 0000000..68c6ab2
--- /dev/null
+++ b/zmqtest/zmq-sub/zmq-sub.cpp
@@ -0,0 +1,422 @@
+/*
+ Receive ODR-DabMux ZMQ ETI output and write to a file.
+
+ The MIT License (MIT)
+
+ Copyright (c) 2020 Matthias P. Braendli
+
+ Permission is hereby granted, free of charge, to any person obtaining a copy
+ of this software and associated documentation files (the "Software"), to deal
+ in the Software without restriction, including without limitation the rights
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ copies of the Software, and to permit persons to whom the Software is
+ furnished to do so, subject to the following conditions:
+
+ The above copyright notice and this permission notice shall be included in all
+ copies or substantial portions of the Software.
+
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ SOFTWARE.
+*/
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <ctime>
+#include <cmath>
+#include <iostream>
+#include <stdexcept>
+#include <vector>
+#include <chrono>
+#include <unistd.h>
+#include <zmq.h>
+#include <sys/types.h>
+#include <arpa/inet.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <unistd.h>
+
+using namespace std;
+
+#define NUM_FRAMES_PER_ZMQ_MESSAGE 4
+
+struct zmq_dab_message_t
+{
+ uint32_t version;
+ int16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE];
+ // Followed by buf
+ // Followed by metadata
+};
+
+long timespecdiff_ms(struct timespec time, struct timespec oldTime)
+{
+ long tv_sec;
+ long tv_nsec;
+ if (time.tv_nsec < oldTime.tv_nsec) {
+ tv_sec = time.tv_sec - 1 - oldTime.tv_sec;
+ tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec;
+ }
+ else {
+ tv_sec = time.tv_sec - oldTime.tv_sec;
+ tv_nsec = time.tv_nsec - oldTime.tv_nsec;
+ }
+
+ return tv_sec * 1000 + tv_nsec / 1000000;
+}
+
+static double get_tist_ms(const uint8_t *p /* ETI frame of length 6144 */, uint16_t dlfc)
+{
+ const int fct = p[4];
+ if (dlfc % 250 != fct) {
+ cerr << "Frame FCT=" << fct << " does not correspond to DLFC=" << dlfc << endl;
+ }
+
+ bool ficf = (p[5] & 0x80) >> 7;
+ const int nst = p[5] & 0x7F;
+ const int mid = (p[6] & 0x18) >> 3;
+
+ int ficl = 0;
+ if (ficf == 0) {
+ throw runtime_error("Not FIC in data stream!");
+ }
+ else if (mid == 3) {
+ ficl = 32;
+ }
+ else {
+ ficl = 24;
+ }
+
+ vector<uint32_t> sad(nst);
+ vector<uint32_t> stl(nst);
+ // Loop over STC subchannels:
+ for (int i=0; i < nst; i++) {
+ // EDI stream index is 1-indexed
+ const int edi_stream_id = i + 1;
+
+ uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2;
+ sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i];
+ uint32_t tpl = (p[10+4*i] & 0xFC) >> 2;
+ stl[i] = (p[10+4*i] & 0x03) * 256 + \
+ p[11+4*i];
+ }
+
+ uint16_t mnsc = 0;
+ std::memcpy(&mnsc, p + 8 + 4*nst, sizeof(uint16_t));
+
+ /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \
+ p[8 + 4*nst + 3]; */
+
+ const uint8_t *fic_data = p + 12 + 4*nst;
+ size_t fic_length = ficl * 4;
+
+ // loop over MSC subchannels
+ int offset = 0;
+ for (int i=0; i < nst; i++) {
+ const uint8_t *mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset);
+ offset += stl[i] * 8;
+ }
+
+ /*
+ const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \
+ p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */
+
+ // TIST
+ const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4;
+ uint32_t tist = (uint32_t)(p[tist_ix]) << 24 |
+ (uint32_t)(p[tist_ix+1]) << 16 |
+ (uint32_t)(p[tist_ix+2]) << 8 |
+ (uint32_t)(p[tist_ix+3]);
+
+ const double pps_offset = (tist & 0xFFFFFF) / 16384.0;
+ return pps_offset;
+}
+
+enum class output_metadata_id_e {
+ // Contains no value, can be used to group fields
+ separation_marker = 0,
+
+ // TAI-UTC offset, value is int16_t.
+ utc_offset = 1,
+
+ /* EDI Time is the number of SI seconds since 2000-01-01 T 00:00:00 UTC.
+ * value is an uint32_t */
+ edi_time = 2,
+
+ /* The DLFC field from the EDI TAG deti. value is uint16_t */
+ dlfc = 3,
+};
+
+// This metadata gets transmitted in the zmq stream
+struct metadata_t {
+ uint32_t edi_time;
+ int16_t utc_offset;
+ uint16_t dlfc;
+};
+
+static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes)
+{
+ size_t remaining = size;
+ if (remaining < 3) {
+ throw std::runtime_error("Insufficient data");
+ }
+
+ metadata_t md;
+ bool utc_offset_received = false;
+ bool edi_time_received = false;
+ bool dlfc_received = false;
+
+ while (remaining) {
+ uint8_t id = buf[0];
+ uint16_t len = (((uint16_t)buf[1]) << 8) + buf[2];
+
+ if (id == static_cast<uint8_t>(output_metadata_id_e::separation_marker)) {
+ if (len != 0) {
+ cerr << "Invalid length " << len << " for metadata: separation_marker" << endl;
+ }
+
+ if (not utc_offset_received or not edi_time_received or not dlfc_received) {
+ throw std::runtime_error("Incomplete metadata received");
+ }
+
+ remaining -= 3;
+ *consumed_bytes = size - remaining;
+ return md;
+ }
+ else if (id == static_cast<uint8_t>(output_metadata_id_e::utc_offset)) {
+ if (len != 2) {
+ cerr << "Invalid length " << len << " for metadata: utc_offset" << endl;
+ }
+ if (remaining < 2) {
+ throw std::runtime_error("Insufficient data for utc_offset");
+ }
+ uint16_t utco;
+ std::memcpy(&utco, buf + 3, sizeof(utco));
+ md.utc_offset = ntohs(utco);
+ utc_offset_received = true;
+ remaining -= 5;
+ buf += 5;
+ }
+ else if (id == static_cast<uint8_t>(output_metadata_id_e::edi_time)) {
+ if (len != 4) {
+ cerr << "Invalid length " << len << " for metadata: edi_time" << endl;
+ }
+ if (remaining < 4) {
+ throw std::runtime_error("Insufficient data for edi_time");
+ }
+ uint32_t edi_time;
+ std::memcpy(&edi_time, buf + 3, sizeof(edi_time));
+ md.edi_time = ntohl(edi_time);
+ edi_time_received = true;
+ remaining -= 7;
+ buf += 7;
+ }
+ else if (id == static_cast<uint8_t>(output_metadata_id_e::dlfc)) {
+ if (len != 2) {
+ cerr << "Invalid length " << len << " for metadata: dlfc" << endl;
+ }
+ if (remaining < 2) {
+ throw std::runtime_error("Insufficient data for dlfc");
+ }
+ uint16_t dlfc;
+ std::memcpy(&dlfc, buf + 3, sizeof(dlfc));
+ md.dlfc = ntohs(dlfc);
+ dlfc_received = true;
+ remaining -= 5;
+ buf += 5;
+ }
+ }
+
+ throw std::runtime_error("Insufficient data");
+}
+
+void barf()
+{
+ fprintf(stderr, "Error: %s\n", zmq_strerror(errno));
+ exit(1);
+}
+
+
+void do_subscriber(const char* host, int port, bool show_tist)
+{
+ ssize_t rc;
+
+ void* ctx = zmq_ctx_new();
+ void* sock = zmq_socket(ctx, ZMQ_SUB);
+
+ char endpoint[256];
+ snprintf(endpoint, 256, "tcp://%s:%d", host, port);
+
+ rc = zmq_connect(sock, endpoint);
+ fprintf(stderr, "connect %zu\n", rc);
+ if (rc) barf();
+
+ rc = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, NULL, 0);
+ fprintf(stderr, "subscribe %zu\n", rc);
+ if (rc) barf();
+
+
+ constexpr size_t ETILEN = NUM_FRAMES_PER_ZMQ_MESSAGE * 6144;
+ uint8_t eti[ETILEN];
+
+ constexpr size_t HEADER_LEN = sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(int16_t);
+ constexpr size_t MAX_MESSAGE_LEN = HEADER_LEN + ETILEN;
+ uint8_t zmq_message[MAX_MESSAGE_LEN];
+
+ struct timespec time_start;
+ size_t total_size = 0;
+ size_t num_frames = 0;
+ long last_sec = 0;
+
+ while (1) {
+ uint8_t *eti_p = eti;
+
+ memset(eti, 0x55, ETILEN);
+ rc = zmq_recv(sock, zmq_message, MAX_MESSAGE_LEN, 0);
+
+ if (rc < 0) {
+ fprintf(stderr, "rc=%zu\n", rc);
+ barf();
+ }
+ else if (rc < (ssize_t)HEADER_LEN) {
+ cerr << "Short packet received!" << endl;
+ return;
+ }
+
+ zmq_dab_message_t message = {};
+ memcpy(&message, zmq_message, sizeof(zmq_dab_message_t));
+
+ if (message.version == 1) {
+ struct timespec time_now;
+ clock_gettime(CLOCK_MONOTONIC, &time_now);
+
+ if (num_frames == 0) {
+ time_start.tv_nsec = time_now.tv_nsec;
+ time_start.tv_sec = time_now.tv_sec;
+ last_sec = time_now.tv_sec;
+ }
+
+ if (time_now.tv_sec > last_sec) {
+ last_sec = time_now.tv_sec;
+
+ // calculate time_now - time_start in us
+ long diff_ms = timespecdiff_ms(time_now, time_start);
+
+ fprintf(stderr, "Received %zu bytes, %zu ETI frames in %ld milliseconds : %f bytes/second; %f ms/frame\n",
+ total_size, num_frames, diff_ms, 1e3 * total_size/diff_ms,
+ (double)diff_ms/num_frames);
+ }
+
+ size_t offset = HEADER_LEN;
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ memcpy(eti_p, zmq_message + offset, message.buflen[i]);
+ eti_p += 6144;
+ offset += message.buflen[i];
+ total_size += message.buflen[i];
+ num_frames++;
+ }
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ size_t consumed_bytes = 0;
+
+ auto md = get_md_one_frame(
+ zmq_message + offset,
+ MAX_MESSAGE_LEN - offset,
+ &consumed_bytes);
+
+ const uint8_t *eti_frame = eti + i * 6144;
+
+ double pps_offset = get_tist_ms(eti_frame, md.dlfc);
+
+ using namespace std::chrono;
+ std::time_t posix_timestamp_1_jan_2000 = 946684800;
+
+ const auto t_frame =
+ system_clock::from_time_t(md.edi_time + posix_timestamp_1_jan_2000 - md.utc_offset) +
+ milliseconds(std::lrint(pps_offset));
+
+ const auto t_now = system_clock::now();
+ const auto delta = t_frame - t_now;
+
+ if (show_tist) {
+ fprintf(stderr, "Metadata: DLFC=%5d UTCO=%3d EDI_TIME=%10d, TIST %3ld ms, t_frame= %ld, Delta=%ld ms\n",
+ md.dlfc, md.utc_offset, md.edi_time, std::lrint(pps_offset),
+ md.edi_time + posix_timestamp_1_jan_2000 - md.utc_offset,
+ duration_cast<milliseconds>(delta).count());
+ }
+
+ offset += consumed_bytes;
+ }
+
+ write(STDOUT_FILENO, eti, ETILEN);
+ }
+ else {
+ cerr << "Wrong ZMQ message version!" << endl;
+ return;
+ }
+ }
+
+ zmq_close(sock);
+
+ zmq_ctx_destroy(ctx);
+}
+
+void usage(char** argv)
+{
+ fprintf(stderr, "usage: %s [-t] host port\n", argv[0]);
+ fprintf(stderr, "connects to odr-dabmux' ETI output at tcp://host:port using a ZeroMQ sub socket\n");
+ fprintf(stderr, "and outputs raw ETI on stdout\n\n");
+ fprintf(stderr, "Options:\n");
+ fprintf(stderr, " -t : enables TIST decoding and time delta calculation\n");
+ exit(1);
+}
+
+int main(int argc, char** argv)
+{
+#ifdef GIT_VERSION
+ fprintf(stderr, "zmq-sub ETI reader version %s\n", GIT_VERSION);
+#else
+ fprintf(stderr, "zmq-sub ETI reader version ?\n");
+#endif
+
+ int flags = 0;
+ bool show_tist = false;
+ int opt;
+ while ((opt = getopt(argc, argv, "t")) != -1) {
+ switch (opt) {
+ case 't':
+ show_tist = true;
+ break;
+ default:
+ fprintf(stderr, "Invalid option\n");
+ usage(argv);
+ }
+ }
+
+ if (optind + 1 >= argc) {
+ fprintf(stderr, "Missing host and port\n");
+ usage(argv);
+ }
+
+ char *host = argv[optind];
+ int port = atoi(argv[optind + 1]);
+
+ fprintf(stderr, "connecting to tcp://%s:%d, show tist=%s\n", host, port, show_tist ? "true" : "false");
+
+ try {
+ do_subscriber(host, port, show_tist);
+ }
+ catch (const std::runtime_error &e) {
+ cerr << "Runtime error: " << e.what() << endl;
+ }
+ catch (const std::logic_error &e) {
+ cerr << "Logic error! " << e.what() << endl;
+ }
+
+ return 0;
+}
+