summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/Socket.cpp70
-rw-r--r--lib/Socket.h28
-rw-r--r--lib/ThreadsafeQueue.h23
-rw-r--r--lib/edioutput/Transport.cpp41
-rw-r--r--lib/edioutput/Transport.h2
5 files changed, 135 insertions, 29 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index bfbef93..159de7e 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size)
// This suppresses the -Wlogical-op warning
#if EAGAIN == EWOULDBLOCK
- if (errno == EAGAIN) {
+ if (errno == EAGAIN)
#else
- if (errno == EAGAIN or errno == EWOULDBLOCK) {
+ if (errno == EAGAIN or errno == EWOULDBLOCK)
#endif
+ {
return 0;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
@@ -733,7 +734,9 @@ TCPConnection::~TCPConnection()
m_running = false;
vector<uint8_t> termination_marker;
queue.push(termination_marker);
- m_sender_thread.join();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
}
void TCPConnection::process()
@@ -905,4 +908,65 @@ void TCPReceiveServer::process()
}
}
+TCPSendClient::TCPSendClient(const std::string& hostname, int port) :
+ m_hostname(hostname),
+ m_port(port),
+ m_running(true)
+{
+ m_sender_thread = std::thread(&TCPSendClient::process, this);
+}
+
+TCPSendClient::~TCPSendClient()
+{
+ m_running = false;
+ m_queue.trigger_wakeup();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
+}
+
+void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ m_queue.push(buffer);
+}
+
+void TCPSendClient::process()
+{
+ try {
+ while (m_running) {
+ if (m_is_connected) {
+ try {
+ vector<uint8_t> incoming;
+ m_queue.wait_and_pop(incoming);
+ if (m_sock.sendall(incoming.data(), incoming.size()) == -1) {
+ m_is_connected = false;
+ m_sock = TCPSocket();
+ }
+ }
+ catch (const ThreadsafeQueueWakeup&) {
+ break;
+ }
+ }
+ else {
+ try {
+ m_sock.connect(m_hostname, m_port);
+ m_is_connected = true;
+ }
+ catch (const runtime_error& e) {
+ m_is_connected = false;
+ this_thread::sleep_for(chrono::seconds(1));
+ }
+ }
+ }
+ }
+ catch (const runtime_error& e) {
+ m_exception_data = e.what();
+ m_running = false;
+ }
+}
+
}
diff --git a/lib/Socket.h b/lib/Socket.h
index b9f6317..84def40 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -291,4 +291,32 @@ class TCPReceiveServer {
TCPSocket m_listener_socket;
};
+/* A TCP client that abstracts the handling of connects and disconnects.
+ */
+class TCPSendClient {
+ public:
+ TCPSendClient(const std::string& hostname, int port);
+ ~TCPSendClient();
+
+ /* Throws a runtime_error on error
+ */
+ void sendall(const std::vector<uint8_t>& buffer);
+
+ private:
+ void process();
+
+ std::string m_hostname;
+ int m_port;
+
+ bool m_is_connected = false;
+
+ TCPSocket m_sock;
+ static constexpr size_t MAX_QUEUE_SIZE = 1024;
+ ThreadsafeQueue<std::vector<uint8_t> > m_queue;
+ std::atomic<bool> m_running;
+ std::string m_exception_data;
+ std::thread m_sender_thread;
+ TCPSocket m_listener_socket;
+};
+
}
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index 62f4c96..815dfe0 100644
--- a/lib/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -52,12 +52,21 @@ public:
/* Push one element into the queue, and notify another thread that
* might be waiting.
*
+ * if max_size > 0 and the queue already contains at least max_size elements,
+ * the element gets discarded.
+ *
* returns the new queue size.
*/
- size_t push(T const& val)
+ size_t push(T const& val, size_t max_size = 0)
{
std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.push(val);
+ size_t queue_size_before = the_queue.size();
+ if (max_size == 0) {
+ the_queue.push(val);
+ }
+ else if (queue_size_before < max_size) {
+ the_queue.push(val);
+ }
size_t queue_size = the_queue.size();
lock.unlock();
@@ -66,10 +75,16 @@ public:
return queue_size;
}
- size_t push(T&& val)
+ size_t push(T&& val, size_t max_size = 0)
{
std::unique_lock<std::mutex> lock(the_mutex);
- the_queue.emplace(std::move(val));
+ size_t queue_size_before = the_queue.size();
+ if (max_size == 0) {
+ the_queue.emplace(std::move(val));
+ }
+ else if (queue_size_before < max_size) {
+ the_queue.emplace(std::move(val));
+ }
size_t queue_size = the_queue.size();
lock.unlock();
diff --git a/lib/edioutput/Transport.cpp b/lib/edioutput/Transport.cpp
index 0d5c237..4c91483 100644
--- a/lib/edioutput/Transport.cpp
+++ b/lib/edioutput/Transport.cpp
@@ -87,9 +87,8 @@ Sender::Sender(const configuration_t& conf) :
tcp_dispatchers.emplace(tcp_dest.get(), dispatcher);
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(edi_dest)) {
- auto tcp_socket = make_shared<Socket::TCPSocket>();
- tcp_socket->connect(tcp_dest->dest_addr, tcp_dest->dest_port);
- tcp_senders.emplace(tcp_dest.get(), tcp_socket);
+ auto tcp_send_client = make_shared<Socket::TCPSendClient>(tcp_dest->dest_addr, tcp_dest->dest_port);
+ tcp_senders.emplace(tcp_dest.get(), tcp_send_client);
}
else {
throw logic_error("EDI destination not implemented");
@@ -127,8 +126,18 @@ void Sender::write(const TagPacket& tagpacket)
edi_fragments = edi_interleaver.Interleave(edi_fragments);
}
+ if (m_conf.verbose) {
+ fprintf(stderr, "EDI number of PFT fragments %zu\n",
+ edi_fragments.size());
+ }
+
// Send over ethernet
- for (const auto& edi_frag : edi_fragments) {
+ for (auto& edi_frag : edi_fragments) {
+ if (m_conf.dump) {
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
+ }
+
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
Socket::InetAddress addr;
@@ -140,26 +149,21 @@ void Sender::write(const TagPacket& tagpacket)
tcp_dispatchers.at(tcp_dest.get())->write(edi_frag);
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(edi_frag.data(), edi_frag.size());
+ tcp_senders.at(tcp_dest.get())->sendall(edi_frag);
}
else {
throw logic_error("EDI destination not implemented");
}
}
-
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(edi_frag.begin(), edi_frag.end(), debug_iterator);
- }
- }
-
- if (m_conf.verbose) {
- fprintf(stderr, "EDI number of PFT fragments %zu\n",
- edi_fragments.size());
}
}
else {
// Send over ethernet
+ if (m_conf.dump) {
+ ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
+ copy(af_packet.begin(), af_packet.end(), debug_iterator);
+ }
+
for (auto& dest : m_conf.destinations) {
if (const auto& udp_dest = dynamic_pointer_cast<edi::udp_destination_t>(dest)) {
Socket::InetAddress addr;
@@ -171,17 +175,12 @@ void Sender::write(const TagPacket& tagpacket)
tcp_dispatchers.at(tcp_dest.get())->write(af_packet);
}
else if (auto tcp_dest = dynamic_pointer_cast<edi::tcp_client_t>(dest)) {
- tcp_senders.at(tcp_dest.get())->sendall(af_packet.data(), af_packet.size());
+ tcp_senders.at(tcp_dest.get())->sendall(af_packet);
}
else {
throw logic_error("EDI destination not implemented");
}
}
-
- if (m_conf.dump) {
- ostream_iterator<uint8_t> debug_iterator(edi_debug_file);
- copy(af_packet.begin(), af_packet.end(), debug_iterator);
- }
}
}
diff --git a/lib/edioutput/Transport.h b/lib/edioutput/Transport.h
index df6fe56..73b2ab6 100644
--- a/lib/edioutput/Transport.h
+++ b/lib/edioutput/Transport.h
@@ -64,7 +64,7 @@ class Sender {
std::unordered_map<udp_destination_t*, std::shared_ptr<Socket::UDPSocket>> udp_sockets;
std::unordered_map<tcp_server_t*, std::shared_ptr<Socket::TCPDataDispatcher>> tcp_dispatchers;
- std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSocket>> tcp_senders;
+ std::unordered_map<tcp_client_t*, std::shared_ptr<Socket::TCPSendClient>> tcp_senders;
};
}