diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/RemoteControl.cpp | 18 | ||||
-rw-r--r-- | lib/Socket.cpp | 12 | ||||
-rw-r--r-- | lib/Socket.h | 23 | ||||
-rw-r--r-- | lib/edioutput/AFPacket.cpp (renamed from lib/edi/AFPacket.cpp) | 0 | ||||
-rw-r--r-- | lib/edioutput/AFPacket.h (renamed from lib/edi/AFPacket.h) | 0 | ||||
-rw-r--r-- | lib/edioutput/EDIConfig.h (renamed from lib/edi/EDIConfig.h) | 3 | ||||
-rw-r--r-- | lib/edioutput/Interleaver.cpp (renamed from lib/edi/Interleaver.cpp) | 0 | ||||
-rw-r--r-- | lib/edioutput/Interleaver.h (renamed from lib/edi/Interleaver.h) | 0 | ||||
-rw-r--r-- | lib/edioutput/PFT.cpp (renamed from lib/edi/PFT.cpp) | 1 | ||||
-rw-r--r-- | lib/edioutput/PFT.h (renamed from lib/edi/PFT.h) | 5 | ||||
-rw-r--r-- | lib/edioutput/TagItems.cpp (renamed from lib/edi/TagItems.cpp) | 6 | ||||
-rw-r--r-- | lib/edioutput/TagItems.h (renamed from lib/edi/TagItems.h) | 2 | ||||
-rw-r--r-- | lib/edioutput/TagPacket.cpp (renamed from lib/edi/TagPacket.cpp) | 0 | ||||
-rw-r--r-- | lib/edioutput/TagPacket.h (renamed from lib/edi/TagPacket.h) | 0 | ||||
-rw-r--r-- | lib/edioutput/Transport.cpp (renamed from lib/edi/Transport.cpp) | 6 | ||||
-rw-r--r-- | lib/edioutput/Transport.h (renamed from lib/edi/Transport.h) | 0 |
16 files changed, 44 insertions, 32 deletions
diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp index 4adb90c..9ca8d22 100644 --- a/lib/RemoteControl.cpp +++ b/lib/RemoteControl.cpp @@ -29,6 +29,7 @@ #include <algorithm> #include "RemoteControl.h" +#include "zmq.hpp" using namespace std; @@ -424,7 +425,7 @@ void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::stri bool more = true; do { zmq::message_t msg; - pSocket.recv(&msg); + pSocket.recv(msg); std::string incoming((char*)msg.data(), msg.size()); message.push_back(incoming); more = msg.more(); @@ -436,7 +437,7 @@ void RemoteControllerZmq::send_ok_reply(zmq::socket_t &pSocket) zmq::message_t msg(2); char repCode[2] = {'o', 'k'}; memcpy ((void*) msg.data(), repCode, 2); - pSocket.send(msg, 0); + pSocket.send(msg, zmq::send_flags::none); } void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::string &error) @@ -444,11 +445,11 @@ void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::str zmq::message_t msg1(4); char repCode[4] = {'f', 'a', 'i', 'l'}; memcpy ((void*) msg1.data(), repCode, 4); - pSocket.send(msg1, ZMQ_SNDMORE); + pSocket.send(msg1, zmq::send_flags::sndmore); zmq::message_t msg2(error.length()); memcpy ((void*) msg2.data(), error.c_str(), error.length()); - pSocket.send(msg2, 0); + pSocket.send(msg2, zmq::send_flags::none); } void RemoteControllerZmq::process() @@ -508,8 +509,7 @@ void RemoteControllerZmq::process() zmq::message_t zmsg(ss.str().size()); memcpy ((void*) zmsg.data(), msg_s.data(), msg_s.size()); - int flag = (--cohort_size > 0) ? ZMQ_SNDMORE : 0; - repSocket.send(zmsg, flag); + repSocket.send(zmsg, (--cohort_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none); } } else if (msg.size() == 2 && command == "show") { @@ -523,8 +523,7 @@ void RemoteControllerZmq::process() zmq::message_t zmsg(ss.str().size()); memcpy(zmsg.data(), ss.str().data(), ss.str().size()); - int flag = (--r_size > 0) ? ZMQ_SNDMORE : 0; - repSocket.send(zmsg, flag); + repSocket.send(zmsg, (--r_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none); } } catch (const ParameterError &err) { @@ -539,7 +538,7 @@ void RemoteControllerZmq::process() std::string value = rcs.get_param(module, parameter); zmq::message_t zmsg(value.size()); memcpy ((void*) zmsg.data(), value.data(), value.size()); - repSocket.send(zmsg, 0); + repSocket.send(zmsg, zmq::send_flags::none); } catch (const ParameterError &err) { send_fail_reply(repSocket, err.what()); @@ -576,4 +575,3 @@ void RemoteControllerZmq::process() } #endif - diff --git a/lib/Socket.cpp b/lib/Socket.cpp index d41ed1c..6a20429 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -862,9 +862,9 @@ TCPReceiveServer::~TCPReceiveServer() } } -vector<uint8_t> TCPReceiveServer::receive() +shared_ptr<TCPReceiveMessage> TCPReceiveServer::receive() { - vector<uint8_t> buffer; + shared_ptr<TCPReceiveMessage> buffer = make_shared<TCPReceiveMessageEmpty>(); m_queue.try_pop(buffer); // we can ignore try_pop()'s return value, because @@ -892,11 +892,12 @@ void TCPReceiveServer::process() } else if (r == 0) { sock.close(); + m_queue.push(make_shared<TCPReceiveMessageDisconnected>()); break; } else { buf.resize(r); - m_queue.push(move(buf)); + m_queue.push(make_shared<TCPReceiveMessageData>(move(buf))); } } catch (const TCPSocket::Interrupted&) { @@ -905,6 +906,11 @@ void TCPReceiveServer::process() catch (const TCPSocket::Timeout&) { num_timeouts++; } + catch (const runtime_error& e) { + sock.close(); + // TODO replace fprintf + fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); + } if (num_timeouts > max_num_timeouts) { sock.close(); diff --git a/lib/Socket.h b/lib/Socket.h index 8881be3..2291dd5 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -30,11 +30,12 @@ #include "ThreadsafeQueue.h" #include <cstdlib> -#include <iostream> -#include <vector> #include <atomic> -#include <thread> +#include <iostream> #include <list> +#include <memory> +#include <thread> +#include <vector> #include <sys/socket.h> #include <netinet/in.h> @@ -265,6 +266,14 @@ class TCPDataDispatcher std::list<TCPConnection> m_connections; }; +struct TCPReceiveMessage { virtual ~TCPReceiveMessage() {}; }; +struct TCPReceiveMessageDisconnected : public TCPReceiveMessage { }; +struct TCPReceiveMessageEmpty : public TCPReceiveMessage { }; +struct TCPReceiveMessageData : public TCPReceiveMessage { + TCPReceiveMessageData(std::vector<uint8_t> d) : data(d) {}; + std::vector<uint8_t> data; +}; + /* A TCP Server to receive data, which abstracts the handling of connects and disconnects. */ class TCPReceiveServer { @@ -276,15 +285,15 @@ class TCPReceiveServer { void start(int listen_port, const std::string& address); - // Return a vector that contains up to blocksize bytes of data, or - // and empty vector if no data is available. - std::vector<uint8_t> receive(); + // Return an instance of a subclass of TCPReceiveMessage that contains up to blocksize + // bytes of data, or TCPReceiveMessageEmpty if no data is available. + std::shared_ptr<TCPReceiveMessage> receive(); private: void process(); size_t m_blocksize = 0; - ThreadsafeQueue<std::vector<uint8_t> > m_queue; + ThreadsafeQueue<std::shared_ptr<TCPReceiveMessage> > m_queue; std::atomic<bool> m_running = ATOMIC_VAR_INIT(false); std::string m_exception_data; std::thread m_listener_thread; diff --git a/lib/edi/AFPacket.cpp b/lib/edioutput/AFPacket.cpp index b38c38b..b38c38b 100644 --- a/lib/edi/AFPacket.cpp +++ b/lib/edioutput/AFPacket.cpp diff --git a/lib/edi/AFPacket.h b/lib/edioutput/AFPacket.h index f2c4e35..f2c4e35 100644 --- a/lib/edi/AFPacket.h +++ b/lib/edioutput/AFPacket.h diff --git a/lib/edi/EDIConfig.h b/lib/edioutput/EDIConfig.h index 4f1df97..647d77e 100644 --- a/lib/edi/EDIConfig.h +++ b/lib/edioutput/EDIConfig.h @@ -44,6 +44,7 @@ struct destination_t { // Can represent both unicast and multicast destinations struct udp_destination_t : public destination_t { std::string dest_addr; + unsigned int dest_port = 0; std::string source_addr; unsigned int source_port = 0; unsigned int ttl = 10; @@ -68,10 +69,8 @@ struct configuration_t { bool dump = false; // dump a file with the EDI packets bool verbose = false; bool enable_pft = false; // Enable protection and fragmentation - bool enable_transport_header = true; // Sets Addr, Source and Dest in PFT unsigned int tagpacket_alignment = 0; std::vector<std::shared_ptr<destination_t> > destinations; - unsigned int dest_port = 0; // common destination port, because it's encoded in the transport layer unsigned int latency_frames = 0; // if nonzero, enable interleaver with a latency of latency_frames * 24ms bool enabled() const { return destinations.size() > 0; } diff --git a/lib/edi/Interleaver.cpp b/lib/edioutput/Interleaver.cpp index f26a50e..f26a50e 100644 --- a/lib/edi/Interleaver.cpp +++ b/lib/edioutput/Interleaver.cpp diff --git a/lib/edi/Interleaver.h b/lib/edioutput/Interleaver.h index 3029d5d..3029d5d 100644 --- a/lib/edi/Interleaver.h +++ b/lib/edioutput/Interleaver.h diff --git a/lib/edi/PFT.cpp b/lib/edioutput/PFT.cpp index 1e3d4da..b2f07e0 100644 --- a/lib/edi/PFT.cpp +++ b/lib/edioutput/PFT.cpp @@ -55,7 +55,6 @@ PFT::PFT() { } PFT::PFT(const configuration_t &conf) : m_k(conf.chunk_len), m_m(conf.fec), - m_dest_port(conf.dest_port), m_pseq(0), m_num_chunks(0), m_verbose(conf.verbose) diff --git a/lib/edi/PFT.h b/lib/edioutput/PFT.h index 1019915..4d138c5 100644 --- a/lib/edi/PFT.h +++ b/lib/edioutput/PFT.h @@ -68,13 +68,14 @@ class PFT private: unsigned int m_k = 207; // length of RS data word unsigned int m_m = 3; // number of fragments that can be recovered if lost - unsigned int m_dest_port = 12000; // Destination port for transport header uint16_t m_pseq = 0; size_t m_num_chunks = 0; bool m_verbose = false; - bool m_transport_header = false; + // Transport header is always deactivated + const bool m_transport_header = false; const uint16_t m_addr_source = 0; + const unsigned int m_dest_port = 0; }; } diff --git a/lib/edi/TagItems.cpp b/lib/edioutput/TagItems.cpp index 9746469..739adfa 100644 --- a/lib/edi/TagItems.cpp +++ b/lib/edioutput/TagItems.cpp @@ -212,8 +212,8 @@ std::vector<uint8_t> TagDSTI::Assemble() packet.push_back(0); packet.push_back(0); - uint8_t dfctl = dflc % 250; - uint8_t dfcth = dflc / 250; + uint8_t dfctl = dlfc % 250; + uint8_t dfcth = dlfc / 250; uint16_t dstiHeader = dfctl | (dfcth << 8) | (rfadf << 13) | (atstf << 14) | (stihf << 15); @@ -254,7 +254,7 @@ std::vector<uint8_t> TagDSTI::Assemble() packet[6] = (taglength >> 8) & 0xFF; packet[7] = taglength & 0xFF; - dflc = (dflc+1) % 5000; + dlfc = (dlfc+1) % 5000; /* std::cerr << "TagItem dsti, packet.size " << packet.size() << std::endl; diff --git a/lib/edi/TagItems.h b/lib/edioutput/TagItems.h index 5c81b01..f24dc44 100644 --- a/lib/edi/TagItems.h +++ b/lib/edioutput/TagItems.h @@ -147,7 +147,7 @@ class TagDSTI : public TagItem bool stihf = false; bool atstf = false; // presence of atst data bool rfadf = false; - uint16_t dflc = 0; // modulo 5000 frame counter + uint16_t dlfc = 0; // modulo 5000 frame counter // STI Header (optional) uint8_t stat = 0; diff --git a/lib/edi/TagPacket.cpp b/lib/edioutput/TagPacket.cpp index ec52ad7..ec52ad7 100644 --- a/lib/edi/TagPacket.cpp +++ b/lib/edioutput/TagPacket.cpp diff --git a/lib/edi/TagPacket.h b/lib/edioutput/TagPacket.h index b53b718..b53b718 100644 --- a/lib/edi/TagPacket.h +++ b/lib/edioutput/TagPacket.h diff --git a/lib/edi/Transport.cpp b/lib/edioutput/Transport.cpp index fa7588a..cfed9ec 100644 --- a/lib/edi/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -38,7 +38,7 @@ void configuration_t::print() const etiLog.level(info) << " verbose " << verbose; for (auto edi_dest : destinations) { if (auto udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(edi_dest)) { - etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << dest_port; + etiLog.level(info) << " UDP to " << udp_dest->dest_addr << ":" << udp_dest->dest_port; if (not udp_dest->source_addr.empty()) { etiLog.level(info) << " source " << udp_dest->source_addr; etiLog.level(info) << " ttl " << udp_dest->ttl; @@ -148,7 +148,7 @@ void Sender::write(const TagPacket& tagpacket) for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); udp_sockets.at(udp_dest.get())->send(edi_frag, addr); } @@ -176,7 +176,7 @@ void Sender::write(const TagPacket& tagpacket) for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { Socket::InetAddress addr; - addr.resolveUdpDestination(udp_dest->dest_addr, m_conf.dest_port); + addr.resolveUdpDestination(udp_dest->dest_addr, udp_dest->dest_port); if (af_packet.size() > 1400 and not m_udp_fragmentation_warning_printed) { fprintf(stderr, "EDI Output: AF packet larger than 1400," diff --git a/lib/edi/Transport.h b/lib/edioutput/Transport.h index 56ded3b..56ded3b 100644 --- a/lib/edi/Transport.h +++ b/lib/edioutput/Transport.h |