summaryrefslogtreecommitdiffstats
path: root/contrib/Socket.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/Socket.cpp')
-rw-r--r--contrib/Socket.cpp70
1 files changed, 67 insertions, 3 deletions
diff --git a/contrib/Socket.cpp b/contrib/Socket.cpp
index bfbef93..8b8d0e2 100644
--- a/contrib/Socket.cpp
+++ b/contrib/Socket.cpp
@@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size)
// This suppresses the -Wlogical-op warning
#if EAGAIN == EWOULDBLOCK
- if (errno == EAGAIN) {
+ if (errno == EAGAIN)
#else
- if (errno == EAGAIN or errno == EWOULDBLOCK) {
+ if (errno == EAGAIN or errno == EWOULDBLOCK)
#endif
+ {
return 0;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
@@ -733,7 +734,9 @@ TCPConnection::~TCPConnection()
m_running = false;
vector<uint8_t> termination_marker;
queue.push(termination_marker);
- m_sender_thread.join();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
}
void TCPConnection::process()
@@ -905,4 +908,65 @@ void TCPReceiveServer::process()
}
}
+TCPSendClient::TCPSendClient(const std::string& hostname, int port) :
+ m_hostname(hostname),
+ m_port(port),
+ m_running(true)
+{
+ m_sender_thread = std::thread(&TCPSendClient::process, this);
+}
+
+TCPSendClient::~TCPSendClient()
+{
+ m_running = false;
+ m_queue.trigger_wakeup();
+ if (m_sender_thread.joinable()) {
+ m_sender_thread.join();
+ }
+}
+
+void TCPSendClient::sendall(std::vector<uint8_t>&& buffer)
+{
+ if (not m_running) {
+ throw runtime_error(m_exception_data);
+ }
+
+ m_queue.push(move(buffer));
+}
+
+void TCPSendClient::process()
+{
+ try {
+ while (m_running) {
+ if (m_is_connected) {
+ try {
+ vector<uint8_t> incoming;
+ m_queue.wait_and_pop(incoming);
+ if (m_sock.sendall(incoming.data(), incoming.size()) == -1) {
+ m_is_connected = false;
+ m_sock = TCPSocket();
+ }
+ }
+ catch (const ThreadsafeQueueWakeup&) {
+ break;
+ }
+ }
+ else {
+ try {
+ m_sock.connect(m_hostname, m_port);
+ m_is_connected = true;
+ }
+ catch (const runtime_error& e) {
+ m_is_connected = false;
+ this_thread::sleep_for(chrono::seconds(1));
+ }
+ }
+ }
+ }
+ catch (const runtime_error& e) {
+ m_exception_data = e.what();
+ m_running = false;
+ }
+}
+
}