From 9deef28b415e45f861ba3347908dff0e4ef7036c Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Tue, 17 Mar 2020 10:03:18 +0100 Subject: Add TIST decoder to zmq-sub --- zmqtest/zmq-sub/Makefile | 6 +- zmqtest/zmq-sub/zmq-sub.c | 181 ------------------- zmqtest/zmq-sub/zmq-sub.cpp | 422 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 425 insertions(+), 184 deletions(-) delete mode 100644 zmqtest/zmq-sub/zmq-sub.c create mode 100644 zmqtest/zmq-sub/zmq-sub.cpp diff --git a/zmqtest/zmq-sub/Makefile b/zmqtest/zmq-sub/Makefile index 82df891..006b912 100644 --- a/zmqtest/zmq-sub/Makefile +++ b/zmqtest/zmq-sub/Makefile @@ -1,10 +1,10 @@ GIT_VERSION = $(shell git describe --long --all | cut -d "-" -f 3) -CFLAGS = -std=gnu99 -Wall -g $(ZMQ_LIB) -DGIT_VERSION=\"$(GIT_VERSION)\" +CXXFLAGS = -std=c++11 -Wall -Wno-unused -g $(ZMQ_LIB) -DGIT_VERSION=\"$(GIT_VERSION)\" -fsanitize=address -fsanitize=undefined LINK = -lzmq all: zmq-sub -zmq-sub: zmq-sub.c - gcc $(CFLAGS) -o zmq-sub zmq-sub.c $(LINK) +zmq-sub: zmq-sub.cpp + g++ $(CXXFLAGS) -o zmq-sub zmq-sub.cpp $(LINK) diff --git a/zmqtest/zmq-sub/zmq-sub.c b/zmqtest/zmq-sub/zmq-sub.c deleted file mode 100644 index 2abe4c3..0000000 --- a/zmqtest/zmq-sub/zmq-sub.c +++ /dev/null @@ -1,181 +0,0 @@ -/* - Receive ODR-DabMux ZMQ ETI output and write to a file. - - The MIT License (MIT) - - Copyright (c) 2018 Matthias P. Braendli - - Permission is hereby granted, free of charge, to any person obtaining a copy - of this software and associated documentation files (the "Software"), to deal - in the Software without restriction, including without limitation the rights - to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - copies of the Software, and to permit persons to whom the Software is - furnished to do so, subject to the following conditions: - - The above copyright notice and this permission notice shall be included in all - copies or substantial portions of the Software. - - THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - SOFTWARE. -*/ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - - -#define NUM_FRAMES_PER_ZMQ_MESSAGE 4 - -struct zmq_dab_message_t -{ - uint32_t version; - int16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; - uint8_t buf[NUM_FRAMES_PER_ZMQ_MESSAGE*6144]; -}; - -long timespecdiff_ms(struct timespec time, struct timespec oldTime) -{ - long tv_sec; - long tv_nsec; - if (time.tv_nsec < oldTime.tv_nsec) { - tv_sec = time.tv_sec - 1 - oldTime.tv_sec; - tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec; - } - else { - tv_sec = time.tv_sec - oldTime.tv_sec; - tv_nsec = time.tv_nsec - oldTime.tv_nsec; - } - - return tv_sec * 1000 + tv_nsec / 1000000; -} - - -void barf() -{ - fprintf(stderr, "Error: %s\n", zmq_strerror(errno)); - exit(1); -} - - -void do_subscriber(const char* host, int port) -{ - int rc; - - void* ctx = zmq_ctx_new(); - void* sock = zmq_socket(ctx, ZMQ_SUB); - - char endpoint[256]; - snprintf(endpoint, 256, "tcp://%s:%d", host, port); - - rc = zmq_connect(sock, endpoint); - fprintf(stderr, "connect %d\n", rc); - if (rc) barf(); - - rc = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, NULL, 0); - fprintf(stderr, "subscribe %d\n", rc); - if (rc) barf(); - - - const int framelen = NUM_FRAMES_PER_ZMQ_MESSAGE * 6144; - uint8_t eti[framelen]; - - struct zmq_dab_message_t message; - - struct timespec time_start; - size_t total_size = 0; - size_t num_frames = 0; - long last_sec = 0; - - while (1) { - uint8_t* eti_p = eti; - - memset(eti, 0x55, framelen); - rc = zmq_recv(sock, &message, framelen, 0); - - if (rc > 0 && message.version == 1) { - uint8_t* buf = message.buf; - - struct timespec time_now; - clock_gettime(CLOCK_MONOTONIC, &time_now); - - if (num_frames == 0) { - time_start.tv_nsec = time_now.tv_nsec; - time_start.tv_sec = time_now.tv_sec; - last_sec = time_now.tv_sec; - } - - if (time_now.tv_sec > last_sec) { - last_sec = time_now.tv_sec; - - // calculate time_now - time_start in us - long diff_ms = timespecdiff_ms(time_now, time_start); - - fprintf(stderr, "Received %zu bytes, %zu ETI frames in %ld milliseconds : %f bytes/second; %f ms/frame\n", - total_size, num_frames, diff_ms, 1e3 * total_size/diff_ms, - (double)diff_ms/num_frames); - } - - for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { - memcpy(eti_p, buf, message.buflen[i]); - eti_p += 6144; - buf += message.buflen[i]; - - total_size += message.buflen[i]; - num_frames++; - } - - write(STDOUT_FILENO, eti, framelen); - } - else if (rc < 0) { - fprintf(stderr, "rc=%d \n", rc); - - barf(); - } - } - - zmq_close(sock); - - zmq_ctx_destroy(ctx); -} - -void usage(char** argv) -{ - fprintf(stderr, "usage: %s host port\n", argv[0]); - fprintf(stderr, "connects to odr-dabmux' ETI output at tcp://host:port using a ZeroMQ sub socket\n"); - fprintf(stderr, "and outputs raw ETI on stdout\n"); - exit(1); -} - -int main(int argc, char** argv) -{ -#ifdef GIT_VERSION - fprintf(stderr, "zmq-sub ETI reader version %s\n", GIT_VERSION); -#else - fprintf(stderr, "zmq-sub ETI reader version ?\n"); -#endif - - if (argc < 3) { - usage(argv); - } - - char* host = argv[1]; - int port = atoi(argv[2]); - - fprintf(stderr, "connecting to tcp://%s:%d\n", host, port); - - do_subscriber(host, port); - - return 0; -} - diff --git a/zmqtest/zmq-sub/zmq-sub.cpp b/zmqtest/zmq-sub/zmq-sub.cpp new file mode 100644 index 0000000..68c6ab2 --- /dev/null +++ b/zmqtest/zmq-sub/zmq-sub.cpp @@ -0,0 +1,422 @@ +/* + Receive ODR-DabMux ZMQ ETI output and write to a file. + + The MIT License (MIT) + + Copyright (c) 2020 Matthias P. Braendli + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in all + copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + SOFTWARE. +*/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +#define NUM_FRAMES_PER_ZMQ_MESSAGE 4 + +struct zmq_dab_message_t +{ + uint32_t version; + int16_t buflen[NUM_FRAMES_PER_ZMQ_MESSAGE]; + // Followed by buf + // Followed by metadata +}; + +long timespecdiff_ms(struct timespec time, struct timespec oldTime) +{ + long tv_sec; + long tv_nsec; + if (time.tv_nsec < oldTime.tv_nsec) { + tv_sec = time.tv_sec - 1 - oldTime.tv_sec; + tv_nsec = 1000000000L + time.tv_nsec - oldTime.tv_nsec; + } + else { + tv_sec = time.tv_sec - oldTime.tv_sec; + tv_nsec = time.tv_nsec - oldTime.tv_nsec; + } + + return tv_sec * 1000 + tv_nsec / 1000000; +} + +static double get_tist_ms(const uint8_t *p /* ETI frame of length 6144 */, uint16_t dlfc) +{ + const int fct = p[4]; + if (dlfc % 250 != fct) { + cerr << "Frame FCT=" << fct << " does not correspond to DLFC=" << dlfc << endl; + } + + bool ficf = (p[5] & 0x80) >> 7; + const int nst = p[5] & 0x7F; + const int mid = (p[6] & 0x18) >> 3; + + int ficl = 0; + if (ficf == 0) { + throw runtime_error("Not FIC in data stream!"); + } + else if (mid == 3) { + ficl = 32; + } + else { + ficl = 24; + } + + vector sad(nst); + vector stl(nst); + // Loop over STC subchannels: + for (int i=0; i < nst; i++) { + // EDI stream index is 1-indexed + const int edi_stream_id = i + 1; + + uint32_t scid = (p[8 + 4*i] & 0xFC) >> 2; + sad[i] = (p[8+4*i] & 0x03) * 256 + p[9+4*i]; + uint32_t tpl = (p[10+4*i] & 0xFC) >> 2; + stl[i] = (p[10+4*i] & 0x03) * 256 + \ + p[11+4*i]; + } + + uint16_t mnsc = 0; + std::memcpy(&mnsc, p + 8 + 4*nst, sizeof(uint16_t)); + + /*const uint16_t crc1 = p[8 + 4*nst + 2]*256 + \ + p[8 + 4*nst + 3]; */ + + const uint8_t *fic_data = p + 12 + 4*nst; + size_t fic_length = ficl * 4; + + // loop over MSC subchannels + int offset = 0; + for (int i=0; i < nst; i++) { + const uint8_t *mst_data = (p + 12 + 4*nst + ficf*ficl*4 + offset); + offset += stl[i] * 8; + } + + /* + const uint16_t crc2 = p[12 + 4*nst + ficf*ficl*4 + offset] * 256 + \ + p[12 + 4*nst + ficf*ficl*4 + offset + 1]; */ + + // TIST + const size_t tist_ix = 12 + 4*nst + ficf*ficl*4 + offset + 4; + uint32_t tist = (uint32_t)(p[tist_ix]) << 24 | + (uint32_t)(p[tist_ix+1]) << 16 | + (uint32_t)(p[tist_ix+2]) << 8 | + (uint32_t)(p[tist_ix+3]); + + const double pps_offset = (tist & 0xFFFFFF) / 16384.0; + return pps_offset; +} + +enum class output_metadata_id_e { + // Contains no value, can be used to group fields + separation_marker = 0, + + // TAI-UTC offset, value is int16_t. + utc_offset = 1, + + /* EDI Time is the number of SI seconds since 2000-01-01 T 00:00:00 UTC. + * value is an uint32_t */ + edi_time = 2, + + /* The DLFC field from the EDI TAG deti. value is uint16_t */ + dlfc = 3, +}; + +// This metadata gets transmitted in the zmq stream +struct metadata_t { + uint32_t edi_time; + int16_t utc_offset; + uint16_t dlfc; +}; + +static metadata_t get_md_one_frame(uint8_t *buf, size_t size, size_t *consumed_bytes) +{ + size_t remaining = size; + if (remaining < 3) { + throw std::runtime_error("Insufficient data"); + } + + metadata_t md; + bool utc_offset_received = false; + bool edi_time_received = false; + bool dlfc_received = false; + + while (remaining) { + uint8_t id = buf[0]; + uint16_t len = (((uint16_t)buf[1]) << 8) + buf[2]; + + if (id == static_cast(output_metadata_id_e::separation_marker)) { + if (len != 0) { + cerr << "Invalid length " << len << " for metadata: separation_marker" << endl; + } + + if (not utc_offset_received or not edi_time_received or not dlfc_received) { + throw std::runtime_error("Incomplete metadata received"); + } + + remaining -= 3; + *consumed_bytes = size - remaining; + return md; + } + else if (id == static_cast(output_metadata_id_e::utc_offset)) { + if (len != 2) { + cerr << "Invalid length " << len << " for metadata: utc_offset" << endl; + } + if (remaining < 2) { + throw std::runtime_error("Insufficient data for utc_offset"); + } + uint16_t utco; + std::memcpy(&utco, buf + 3, sizeof(utco)); + md.utc_offset = ntohs(utco); + utc_offset_received = true; + remaining -= 5; + buf += 5; + } + else if (id == static_cast(output_metadata_id_e::edi_time)) { + if (len != 4) { + cerr << "Invalid length " << len << " for metadata: edi_time" << endl; + } + if (remaining < 4) { + throw std::runtime_error("Insufficient data for edi_time"); + } + uint32_t edi_time; + std::memcpy(&edi_time, buf + 3, sizeof(edi_time)); + md.edi_time = ntohl(edi_time); + edi_time_received = true; + remaining -= 7; + buf += 7; + } + else if (id == static_cast(output_metadata_id_e::dlfc)) { + if (len != 2) { + cerr << "Invalid length " << len << " for metadata: dlfc" << endl; + } + if (remaining < 2) { + throw std::runtime_error("Insufficient data for dlfc"); + } + uint16_t dlfc; + std::memcpy(&dlfc, buf + 3, sizeof(dlfc)); + md.dlfc = ntohs(dlfc); + dlfc_received = true; + remaining -= 5; + buf += 5; + } + } + + throw std::runtime_error("Insufficient data"); +} + +void barf() +{ + fprintf(stderr, "Error: %s\n", zmq_strerror(errno)); + exit(1); +} + + +void do_subscriber(const char* host, int port, bool show_tist) +{ + ssize_t rc; + + void* ctx = zmq_ctx_new(); + void* sock = zmq_socket(ctx, ZMQ_SUB); + + char endpoint[256]; + snprintf(endpoint, 256, "tcp://%s:%d", host, port); + + rc = zmq_connect(sock, endpoint); + fprintf(stderr, "connect %zu\n", rc); + if (rc) barf(); + + rc = zmq_setsockopt(sock, ZMQ_SUBSCRIBE, NULL, 0); + fprintf(stderr, "subscribe %zu\n", rc); + if (rc) barf(); + + + constexpr size_t ETILEN = NUM_FRAMES_PER_ZMQ_MESSAGE * 6144; + uint8_t eti[ETILEN]; + + constexpr size_t HEADER_LEN = sizeof(uint32_t) + NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(int16_t); + constexpr size_t MAX_MESSAGE_LEN = HEADER_LEN + ETILEN; + uint8_t zmq_message[MAX_MESSAGE_LEN]; + + struct timespec time_start; + size_t total_size = 0; + size_t num_frames = 0; + long last_sec = 0; + + while (1) { + uint8_t *eti_p = eti; + + memset(eti, 0x55, ETILEN); + rc = zmq_recv(sock, zmq_message, MAX_MESSAGE_LEN, 0); + + if (rc < 0) { + fprintf(stderr, "rc=%zu\n", rc); + barf(); + } + else if (rc < (ssize_t)HEADER_LEN) { + cerr << "Short packet received!" << endl; + return; + } + + zmq_dab_message_t message = {}; + memcpy(&message, zmq_message, sizeof(zmq_dab_message_t)); + + if (message.version == 1) { + struct timespec time_now; + clock_gettime(CLOCK_MONOTONIC, &time_now); + + if (num_frames == 0) { + time_start.tv_nsec = time_now.tv_nsec; + time_start.tv_sec = time_now.tv_sec; + last_sec = time_now.tv_sec; + } + + if (time_now.tv_sec > last_sec) { + last_sec = time_now.tv_sec; + + // calculate time_now - time_start in us + long diff_ms = timespecdiff_ms(time_now, time_start); + + fprintf(stderr, "Received %zu bytes, %zu ETI frames in %ld milliseconds : %f bytes/second; %f ms/frame\n", + total_size, num_frames, diff_ms, 1e3 * total_size/diff_ms, + (double)diff_ms/num_frames); + } + + size_t offset = HEADER_LEN; + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + memcpy(eti_p, zmq_message + offset, message.buflen[i]); + eti_p += 6144; + offset += message.buflen[i]; + total_size += message.buflen[i]; + num_frames++; + } + + for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) { + size_t consumed_bytes = 0; + + auto md = get_md_one_frame( + zmq_message + offset, + MAX_MESSAGE_LEN - offset, + &consumed_bytes); + + const uint8_t *eti_frame = eti + i * 6144; + + double pps_offset = get_tist_ms(eti_frame, md.dlfc); + + using namespace std::chrono; + std::time_t posix_timestamp_1_jan_2000 = 946684800; + + const auto t_frame = + system_clock::from_time_t(md.edi_time + posix_timestamp_1_jan_2000 - md.utc_offset) + + milliseconds(std::lrint(pps_offset)); + + const auto t_now = system_clock::now(); + const auto delta = t_frame - t_now; + + if (show_tist) { + fprintf(stderr, "Metadata: DLFC=%5d UTCO=%3d EDI_TIME=%10d, TIST %3ld ms, t_frame= %ld, Delta=%ld ms\n", + md.dlfc, md.utc_offset, md.edi_time, std::lrint(pps_offset), + md.edi_time + posix_timestamp_1_jan_2000 - md.utc_offset, + duration_cast(delta).count()); + } + + offset += consumed_bytes; + } + + write(STDOUT_FILENO, eti, ETILEN); + } + else { + cerr << "Wrong ZMQ message version!" << endl; + return; + } + } + + zmq_close(sock); + + zmq_ctx_destroy(ctx); +} + +void usage(char** argv) +{ + fprintf(stderr, "usage: %s [-t] host port\n", argv[0]); + fprintf(stderr, "connects to odr-dabmux' ETI output at tcp://host:port using a ZeroMQ sub socket\n"); + fprintf(stderr, "and outputs raw ETI on stdout\n\n"); + fprintf(stderr, "Options:\n"); + fprintf(stderr, " -t : enables TIST decoding and time delta calculation\n"); + exit(1); +} + +int main(int argc, char** argv) +{ +#ifdef GIT_VERSION + fprintf(stderr, "zmq-sub ETI reader version %s\n", GIT_VERSION); +#else + fprintf(stderr, "zmq-sub ETI reader version ?\n"); +#endif + + int flags = 0; + bool show_tist = false; + int opt; + while ((opt = getopt(argc, argv, "t")) != -1) { + switch (opt) { + case 't': + show_tist = true; + break; + default: + fprintf(stderr, "Invalid option\n"); + usage(argv); + } + } + + if (optind + 1 >= argc) { + fprintf(stderr, "Missing host and port\n"); + usage(argv); + } + + char *host = argv[optind]; + int port = atoi(argv[optind + 1]); + + fprintf(stderr, "connecting to tcp://%s:%d, show tist=%s\n", host, port, show_tist ? "true" : "false"); + + try { + do_subscriber(host, port, show_tist); + } + catch (const std::runtime_error &e) { + cerr << "Runtime error: " << e.what() << endl; + } + catch (const std::logic_error &e) { + cerr << "Logic error! " << e.what() << endl; + } + + return 0; +} + -- cgit v1.2.3