diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-01-13 11:53:15 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-01-13 11:53:15 +0100 |
commit | ea5594186bafa5489d6086a26d71b8f3d1ccf9cd (patch) | |
tree | a307b0882a867b415c68cd7d644241abe0c971e1 /lib | |
parent | f908d28e72887b68391a246ceb328cb52dcb2aaa (diff) | |
download | dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.gz dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.bz2 dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.zip |
Add threaded UDP input for EDI
Diffstat (limited to 'lib')
-rw-r--r-- | lib/UdpSocket.cpp | 62 | ||||
-rw-r--r-- | lib/UdpSocket.h | 29 |
2 files changed, 89 insertions, 2 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp index 981d713..b88c731 100644 --- a/lib/UdpSocket.cpp +++ b/lib/UdpSocket.cpp @@ -167,8 +167,7 @@ int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) /** * Must be called to receive data on a multicast address. - * @param groupname The multica -st address to join. + * @param groupname The multicast address to join. * @return 0 if ok, -1 if error */ int UdpSocket::joinGroup(char* groupname) @@ -254,3 +253,62 @@ InetAddress UdpPacket::getAddress() return address; } +UdpReceiver::~UdpReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UdpReceiver::start(int port) { + m_port = port; + m_thread = std::thread(&UdpReceiver::m_run, this); +} + +std::vector<uint8_t> UdpReceiver::get_packet_buffer() +{ + if (m_stop) { + throw runtime_error("UDP Receiver not running"); + } + + UdpPacket p; + m_packets.wait_and_pop(p); + + return p.getBuffer(); +} + +void UdpReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic<bool>& m_stop; + } autoSetStop(m_stop); + + m_sock.reinit(m_port, "0.0.0.0"); + + const size_t packsize = 8192; + UdpPacket packet(packsize); + + while (not m_stop) { + int ret = m_sock.receive(packet); + if (ret == 0) { + if (packet.getSize() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + m_packets.push(packet); + } + else + { + if (inetErrNo != EINTR) { + // TODO replace fprintf + fprintf(stderr, "Socket error: %s\n", inetErrMsg); + } + m_stop = true; + } + } +} + diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h index f51e87c..81a7d2b 100644 --- a/lib/UdpSocket.h +++ b/lib/UdpSocket.h @@ -31,6 +31,7 @@ #endif #include "InetAddress.h" +#include "ThreadsafeQueue.h" #include <sys/socket.h> #include <netinet/in.h> #include <unistd.h> @@ -45,6 +46,8 @@ #include <stdlib.h> #include <iostream> #include <vector> +#include <thread> +#include <atomic> class UdpPacket; @@ -172,3 +175,29 @@ class UdpPacket InetAddress address; }; +/* Threaded UDP receiver */ +class UdpReceiver { + public: + UdpReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {} + ~UdpReceiver(); + UdpReceiver(const UdpReceiver&) = delete; + UdpReceiver operator=(const UdpReceiver&) = delete; + + // Start the receiver in a separate thread + void start(int port); + + // Get the data contained in a UDP packet, blocks if none available + // In case of error, throws a runtime_error + std::vector<uint8_t> get_packet_buffer(void); + + private: + void m_run(void); + + int m_port; + std::thread m_thread; + std::atomic<bool> m_stop; + ThreadsafeQueue<UdpPacket> m_packets; + UdpSocket m_sock; +}; + + |