aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/zmq2edi/zmq2edi.cpp141
1 files changed, 68 insertions, 73 deletions
diff --git a/src/zmq2edi/zmq2edi.cpp b/src/zmq2edi/zmq2edi.cpp
index fadd163..8ea1bf0 100644
--- a/src/zmq2edi/zmq2edi.cpp
+++ b/src/zmq2edi/zmq2edi.cpp
@@ -240,19 +240,19 @@ int start(int argc, char **argv)
break;
case 'i':
{
- double interleave_ms = std::stod(optarg);
- if (interleave_ms != 0.0) {
- if (interleave_ms < 0) {
- throw std::runtime_error("EDI output: negative interleave value is invalid.");
+ double interleave_ms = std::stod(optarg);
+ if (interleave_ms != 0.0) {
+ if (interleave_ms < 0) {
+ throw std::runtime_error("EDI output: negative interleave value is invalid.");
+ }
+
+ auto latency_rounded = lround(interleave_ms / 24.0);
+ if (latency_rounded * 24 > 30000) {
+ throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
+ }
+
+ edi_conf.latency_frames = latency_rounded;
}
-
- auto latency_rounded = lround(interleave_ms / 24.0);
- if (latency_rounded * 24 > 30000) {
- throw std::runtime_error("EDI output: interleaving set for more than 30 seconds!");
- }
-
- edi_conf.latency_frames = latency_rounded;
- }
}
break;
case 'D':
@@ -297,92 +297,87 @@ int start(int argc, char **argv)
const char* source_url = argv[optind];
- zmq::context_t zmq_ctx(1);
- etiLog.level(info) << "Entering main loop";
size_t frame_count = 0;
+ size_t error_count = 0;
- while (true) {
- size_t error_count = 0;
+ etiLog.level(info) << "Opening ZMQ input: " << source_url;
- etiLog.level(info) << "Opening ZMQ input: " << source_url;
+ zmq::context_t zmq_ctx(1);
+ zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
+ zmq_sock.connect(source_url);
+ zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+
+ while (error_count < 10) {
+ zmq::message_t incoming;
+ zmq::pollitem_t items[1];
+ items[0].socket = zmq_sock;
+ items[0].events = ZMQ_POLLIN;
+ const long timeout_ms = 300;
+ const int num_events = zmq::poll(items, 1, timeout_ms);
+ if (num_events == 0) { // timeout
+ error_count++;
+ }
+ else {
+ // Event received: recv will not block
+ zmq_sock.recv(&incoming);
- zmq::socket_t zmq_sock(zmq_ctx, ZMQ_SUB);
- zmq_sock.connect(source_url);
- zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0); // subscribe to all messages
+ zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
- while (error_count < 10)
- {
- zmq::message_t incoming;
- zmq::pollitem_t items[1];
- items[0].socket = zmq_sock;
- items[0].events = ZMQ_POLLIN;
- const long timeout_ms = 3000;
- const int num_events = zmq::poll(items, 1, timeout_ms);
- if (num_events == 0) { // timeout
+ if (dab_msg->version != 1) {
+ etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
error_count++;
}
- else {
- // Event received: recv will not block
- zmq_sock.recv(&incoming);
- zmq_dab_message_t* dab_msg = (zmq_dab_message_t*)incoming.data();
+ int offset = sizeof(dab_msg->version) +
+ NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
- if (dab_msg->version != 1) {
- etiLog.level(error) << "ZeroMQ wrong packet version " << dab_msg->version;
+ std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+
+ for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
+ if (dab_msg->buflen[i] <= 0 or dab_msg->buflen[i] > 6144) {
+ etiLog.level(error) << "ZeroMQ buffer " << i <<
+ " has invalid length " << dab_msg->buflen[i];
error_count++;
}
+ else {
+ std::vector<uint8_t> buf(6144, 0x55);
- int offset = sizeof(dab_msg->version) +
- NUM_FRAMES_PER_ZMQ_MESSAGE * sizeof(*dab_msg->buflen);
-
- std::list<std::pair<std::vector<uint8_t>, metadata_t> > all_frames;
+ const int framesize = dab_msg->buflen[i];
- for (int i = 0; i < NUM_FRAMES_PER_ZMQ_MESSAGE; i++) {
- if (dab_msg->buflen[i] <= 0 ||
- dab_msg->buflen[i] > 6144)
- {
- etiLog.level(error) << "ZeroMQ buffer " << i << " has invalid length " <<
- dab_msg->buflen[i];
- error_count++;
- }
- else {
- std::vector<uint8_t> buf(6144, 0x55);
-
- const int framesize = dab_msg->buflen[i];
-
- memcpy(&buf.front(),
- ((uint8_t*)incoming.data()) + offset,
- framesize);
+ memcpy(&buf.front(),
+ ((uint8_t*)incoming.data()) + offset,
+ framesize);
- all_frames.emplace_back(
- std::piecewise_construct,
- std::make_tuple(std::move(buf)),
- std::make_tuple());
+ all_frames.emplace_back(
+ std::piecewise_construct,
+ std::make_tuple(std::move(buf)),
+ std::make_tuple());
- offset += framesize;
- }
+ offset += framesize;
}
+ }
- for (auto &f : all_frames) {
- size_t consumed_bytes = 0;
+ for (auto &f : all_frames) {
+ size_t consumed_bytes = 0;
- f.second = get_md_one_frame(
- static_cast<uint8_t*>(incoming.data()) + offset,
- incoming.size() - offset,
- &consumed_bytes);
+ f.second = get_md_one_frame(
+ static_cast<uint8_t*>(incoming.data()) + offset,
+ incoming.size() - offset,
+ &consumed_bytes);
- offset += consumed_bytes;
- }
+ offset += consumed_bytes;
+ }
- for (auto &f : all_frames) {
- edisender.push_frame(f);
- frame_count++;
- }
+ for (auto &f : all_frames) {
+ edisender.push_frame(f);
+ frame_count++;
}
}
}
+ etiLog.level(info) << "Quitting after " << frame_count << " frames transferred";
+
return 0;
}