diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/Socket.cpp | 70 | ||||
-rw-r--r-- | lib/Socket.h | 28 | ||||
-rw-r--r-- | lib/ThreadsafeQueue.h | 23 | ||||
-rw-r--r-- | lib/edioutput/Transport.cpp | 41 | ||||
-rw-r--r-- | lib/edioutput/Transport.h | 2 |
5 files changed, 135 insertions, 29 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp index bfbef93..159de7e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size) // This suppresses the -Wlogical-op warning #if EAGAIN == EWOULDBLOCK - if (errno == EAGAIN) { + if (errno == EAGAIN) #else - if (errno == EAGAIN or errno == EWOULDBLOCK) { + if (errno == EAGAIN or errno == EWOULDBLOCK) #endif + { return 0; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -733,7 +734,9 @@ TCPConnection::~TCPConnection() m_running = false; vector<uint8_t> termination_marker; queue.push(termination_marker); - m_sender_thread.join(); + if (m_sender_thread.joinable()) { + m_sender_thread.join(); + } } void TCPConnection::process() @@ -905,4 +908,65 @@ void TCPReceiveServer::process() } } +TCPSendClient::TCPSendClient(const std::string& hostname, int port) : + m_hostname(hostname), + m_port(port), + m_running(true) +{ + m_sender_thread = std::thread(&TCPSendClient::process, this); +} + +TCPSendClient::~TCPSendClient() +{ + m_running = false; + m_queue.trigger_wakeup(); + if (m_sender_thread.joinable()) { + m_sender_thread.join(); + } +} + +void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) +{ + if (not m_running) { + throw runtime_error(m_exception_data); + } + + m_queue.push(buffer); +} + +void TCPSendClient::process() +{ + try { + while (m_running) { + if (m_is_connected) { + try { + vector<uint8_t> incoming; + m_queue.wait_and_pop(incoming); + if (m_sock.sendall(incoming.data(), incoming.size()) == -1) { + m_is_connected = false; + m_sock = TCPSocket(); + } + } + catch (const ThreadsafeQueueWakeup&) { + break; + } + } + else { + try { + m_sock.connect(m_hostname, m_port); + m_is_connected = true; + } + catch (const runtime_error& e) { + m_is_connected = false; + this_thread::sleep_for(chrono::seconds(1)); + } + } + } + } + catch (const runtime_error& e) { + m_exception_data = e.what(); + m_running = false; + } +} + } diff --git a/lib/Socket.h b/lib/Socket.h index b9f6317..84def40 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -291,4 +291,32 @@ class TCPReceiveServer { TCPSocket m_listener_socket; }; +/* A TCP client that abstracts the handling of connects and disconnects. + */ +class TCPSendClient { + public: + TCPSendClient(const std::string& hostname, int port); + ~TCPSendClient(); + + /* Throws a runtime_error on error + */ + void sendall(const std::vector<uint8_t>& buffer); + + private: + void process(); + + std::string m_hostname; + int m_port; + + bool m_is_connected = false; + + TCPSocket m_sock; + static constexpr size_t MAX_QUEUE_SIZE = 1024; + ThreadsafeQueue<std::vector<uint8_t> > m_queue; + std::atomic<bool> m_running; + std::string m_exception_data; + std::thread m_sender_thread; + TCPSocket m_listener_socket; +}; + } diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 62f4c96..815dfe0 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -52,12 +52,21 @@ public: /* Push one element into the queue, and notify another thread that * might be waiting. * + * if max_size > 0 and the queue already contains at least max_size elements, + * the element gets discarded. + * * returns the new queue size. */ - size_t push(T const& val) + size_t push(T const& val, size_t max_size = 0) { std::unique_lock<std::mutex> lock(the_mutex); - the_queue.push(val); + size_t queue_size_before = the_queue.size(); + if (max_size == 0) { + the_queue.push(val); + } + else if (queue_size_before < max_size) { + the_queue.push(val); + } size_t queue_size = the_queue.size(); lock.unlock(); @@ -66,10 +75,16 @@ public: return queue_size; } - size_t push(T&& val) + size_t push(T&& val, size_t max_size = 0) { std::unique_lock<std::mutex> lock(the_mutex); - the_queue.emplace(std::move(val)); + size_t queue_size_before = the_queue.size(); + if (max_size == 0) { + the_queue.emplace(std::move(val)); + } + else if (queue_size_before < max_size) { + the_queue.emplace(std::move(val)); + } size_t queue_size = the_queue.size(); lock.unlock(); diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp index 0d5c237..4c91483 100644 --- a/lib/edioutput/Transport.cpp +++ b/lib/edioutput/Transport.cpp @@ -87,9 +87,8 @@ Sender::Sender(const configuration_t& conf) : tcp_dispatchers.emplace(tcp_dest.get(), dispatcher); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) { - auto tcp_socket = make_shared<Socket::TCPSocket>(); - tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port); - tcp_senders.emplace(tcp_dest.get(), tcp_socket); + auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port); + tcp_senders.emplace(tcp_dest.get(), tcp_send_client); } else { throw logic_error("EDI destination not implemented"); @@ -127,8 +126,18 @@ void Sender::write(const TagPacket& tagpacket) edi_fragments = edi_interleaver.Interleave(edi_fragments); } + if (m_conf.verbose) { + fprintf(stderr, "EDI number of PFT fragments %zu\n", + edi_fragments.size()); + } + // Send over ethernet - for (const auto& edi_frag : edi_fragments) { + for (auto& edi_frag : edi_fragments) { + if (m_conf.dump) { + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(edi_frag.begin(), edi_frag.end(), debug_iterator); + } + for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { Socket::InetAddress addr; @@ -140,26 +149,21 @@ void Sender::write(const TagPacket& tagpacket) tcp_dispatchers.at(tcp_dest.get())->write(edi_frag); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size()); + tcp_senders.at(tcp_dest.get())->sendall(edi_frag); } else { throw logic_error("EDI destination not implemented"); } } - - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(edi_frag.begin(), edi_frag.end(), debug_iterator); - } - } - - if (m_conf.verbose) { - fprintf(stderr, "EDI number of PFT fragments %zu\n", - edi_fragments.size()); } } else { // Send over ethernet + if (m_conf.dump) { + ostream_iterator<uint8_t> debug_iterator(edi_debug_file); + copy(af_packet.begin(), af_packet.end(), debug_iterator); + } + for (auto& dest : m_conf.destinations) { if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) { Socket::InetAddress addr; @@ -171,17 +175,12 @@ void Sender::write(const TagPacket& tagpacket) tcp_dispatchers.at(tcp_dest.get())->write(af_packet); } else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) { - tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size()); + tcp_senders.at(tcp_dest.get())->sendall(af_packet); } else { throw logic_error("EDI destination not implemented"); } } - - if (m_conf.dump) { - ostream_iterator<uint8_t> debug_iterator(edi_debug_file); - copy(af_packet.begin(), af_packet.end(), debug_iterator); - } } } diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h index df6fe56..73b2ab6 100644 --- a/lib/edioutput/Transport.h +++ b/lib/edioutput/Transport.h @@ -64,7 +64,7 @@ class Sender { std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets; std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers; - std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders; + std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders; }; } |