diff options
| -rw-r--r-- | lib/Socket.cpp | 70 | ||||
| -rw-r--r-- | lib/Socket.h | 28 | ||||
| -rw-r--r-- | lib/ThreadsafeQueue.h | 23 | 
3 files changed, 114 insertions, 7 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp index bfbef93..159de7e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -199,10 +199,11 @@ UDPPacket UDPSocket::receive(size_t max_size)          // This suppresses the -Wlogical-op warning  #if EAGAIN == EWOULDBLOCK -        if (errno == EAGAIN) { +        if (errno == EAGAIN)  #else -        if (errno == EAGAIN or errno == EWOULDBLOCK) { +        if (errno == EAGAIN or errno == EWOULDBLOCK)  #endif +        {              return 0;          }          throw runtime_error(string("Can't receive data: ") + strerror(errno)); @@ -733,7 +734,9 @@ TCPConnection::~TCPConnection()      m_running = false;      vector<uint8_t> termination_marker;      queue.push(termination_marker); -    m_sender_thread.join(); +    if (m_sender_thread.joinable()) { +        m_sender_thread.join(); +    }  }  void TCPConnection::process() @@ -905,4 +908,65 @@ void TCPReceiveServer::process()      }  } +TCPSendClient::TCPSendClient(const std::string& hostname, int port) : +    m_hostname(hostname), +    m_port(port), +    m_running(true) +{ +    m_sender_thread = std::thread(&TCPSendClient::process, this); +} + +TCPSendClient::~TCPSendClient() +{ +    m_running = false; +    m_queue.trigger_wakeup(); +    if (m_sender_thread.joinable()) { +        m_sender_thread.join(); +    } +} + +void TCPSendClient::sendall(const std::vector<uint8_t>& buffer) +{ +    if (not m_running) { +        throw runtime_error(m_exception_data); +    } + +    m_queue.push(buffer); +} + +void TCPSendClient::process() +{ +    try { +        while (m_running) { +            if (m_is_connected) { +                try { +                    vector<uint8_t> incoming; +                    m_queue.wait_and_pop(incoming); +                    if (m_sock.sendall(incoming.data(), incoming.size()) == -1) { +                        m_is_connected = false; +                        m_sock = TCPSocket(); +                    } +                } +                catch (const ThreadsafeQueueWakeup&) { +                    break; +                } +            } +            else { +                try { +                    m_sock.connect(m_hostname, m_port); +                    m_is_connected = true; +                } +                catch (const runtime_error& e) { +                    m_is_connected = false; +                    this_thread::sleep_for(chrono::seconds(1)); +                } +            } +        } +    } +    catch (const runtime_error& e) { +        m_exception_data = e.what(); +        m_running = false; +    } +} +  } diff --git a/lib/Socket.h b/lib/Socket.h index b9f6317..84def40 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -291,4 +291,32 @@ class TCPReceiveServer {          TCPSocket m_listener_socket;  }; +/* A TCP client that abstracts the handling of connects and disconnects. + */ +class TCPSendClient { +    public: +        TCPSendClient(const std::string& hostname, int port); +        ~TCPSendClient(); + +        /* Throws a runtime_error on error +         */ +        void sendall(const std::vector<uint8_t>& buffer); + +    private: +        void process(); + +        std::string m_hostname; +        int m_port; + +        bool m_is_connected = false; + +        TCPSocket m_sock; +        static constexpr size_t MAX_QUEUE_SIZE = 1024; +        ThreadsafeQueue<std::vector<uint8_t> > m_queue; +        std::atomic<bool> m_running; +        std::string m_exception_data; +        std::thread m_sender_thread; +        TCPSocket m_listener_socket; +}; +  } diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h index 62f4c96..815dfe0 100644 --- a/lib/ThreadsafeQueue.h +++ b/lib/ThreadsafeQueue.h @@ -52,12 +52,21 @@ public:      /* Push one element into the queue, and notify another thread that       * might be waiting.       * +     * if max_size > 0 and the queue already contains at least max_size elements, +     * the element gets discarded. +     *       * returns the new queue size.       */ -    size_t push(T const& val) +    size_t push(T const& val, size_t max_size = 0)      {          std::unique_lock<std::mutex> lock(the_mutex); -        the_queue.push(val); +        size_t queue_size_before = the_queue.size(); +        if (max_size == 0) { +            the_queue.push(val); +        } +        else if (queue_size_before < max_size) { +            the_queue.push(val); +        }          size_t queue_size = the_queue.size();          lock.unlock(); @@ -66,10 +75,16 @@ public:          return queue_size;      } -    size_t push(T&& val) +    size_t push(T&& val, size_t max_size = 0)      {          std::unique_lock<std::mutex> lock(the_mutex); -        the_queue.emplace(std::move(val)); +        size_t queue_size_before = the_queue.size(); +        if (max_size == 0) { +            the_queue.emplace(std::move(val)); +        } +        else if (queue_size_before < max_size) { +            the_queue.emplace(std::move(val)); +        }          size_t queue_size = the_queue.size();          lock.unlock();  | 
