diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-04-21 14:12:50 +0200 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2020-04-21 14:12:50 +0200 |
commit | b1438d4fa31aa9e967f1f7e2d48f55ca371151d1 (patch) | |
tree | a49610744f323e6405a400a2250dd8f5a596382f /contrib/Socket.cpp | |
parent | 90201eb5ff37340b85348f762effd8124449f27f (diff) | |
download | ODR-AudioEnc-b1438d4fa31aa9e967f1f7e2d48f55ca371151d1.tar.gz ODR-AudioEnc-b1438d4fa31aa9e967f1f7e2d48f55ca371151d1.tar.bz2 ODR-AudioEnc-b1438d4fa31aa9e967f1f7e2d48f55ca371151d1.zip |
Change EDI output to handle disconnects
Diffstat (limited to 'contrib/Socket.cpp')
-rw-r--r-- | contrib/Socket.cpp | 70 |
1 files changed, 67 insertions, 3 deletions
diff --git a/contrib/Socket.cpp b/contrib/Socket.cpp index bfbef93..8b8d0e2 100644 --- a/contrib/Socket.cpp +++ b/contrib/Socket.cpp @@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size) // This suppresses the -Wlogical-op warning #if EAGAIN == EWOULDBLOCK - if (errno == EAGAIN) { + if (errno == EAGAIN) #else - if (errno == EAGAIN or errno == EWOULDBLOCK) { + if (errno == EAGAIN or errno == EWOULDBLOCK) #endif + { return 0; } throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -733,7 +734,9 @@ TCPConnection::~TCPConnection() m_running = false; vector<uint8_t> termination_marker; queue.push(termination_marker); - m_sender_thread.join(); + if (m_sender_thread.joinable()) { + m_sender_thread.join(); + } } void TCPConnection::process() @@ -905,4 +908,65 @@ void TCPReceiveServer::process() } } +TCPSendClient::TCPSendClient(const std::string& hostname, int port) : + m_hostname(hostname), + m_port(port), + m_running(true) +{ + m_sender_thread = std::thread(&TCPSendClient::process, this); +} + +TCPSendClient::~TCPSendClient() +{ + m_running = false; + m_queue.trigger_wakeup(); + if (m_sender_thread.joinable()) { + m_sender_thread.join(); + } +} + +void TCPSendClient::sendall(std::vector<uint8_t>&& buffer) +{ + if (not m_running) { + throw runtime_error(m_exception_data); + } + + m_queue.push(move(buffer)); +} + +void TCPSendClient::process() +{ + try { + while (m_running) { + if (m_is_connected) { + try { + vector<uint8_t> incoming; + m_queue.wait_and_pop(incoming); + if (m_sock.sendall(incoming.data(), incoming.size()) == -1) { + m_is_connected = false; + m_sock = TCPSocket(); + } + } + catch (const ThreadsafeQueueWakeup&) { + break; + } + } + else { + try { + m_sock.connect(m_hostname, m_port); + m_is_connected = true; + } + catch (const runtime_error& e) { + m_is_connected = false; + this_thread::sleep_for(chrono::seconds(1)); + } + } + } + } + catch (const runtime_error& e) { + m_exception_data = e.what(); + m_running = false; + } +} + } |