diff options
author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-01-13 11:53:15 +0100 |
---|---|---|
committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2017-01-13 11:53:15 +0100 |
commit | ea5594186bafa5489d6086a26d71b8f3d1ccf9cd (patch) | |
tree | a307b0882a867b415c68cd7d644241abe0c971e1 /lib/UdpSocket.cpp | |
parent | f908d28e72887b68391a246ceb328cb52dcb2aaa (diff) | |
download | dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.gz dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.bz2 dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.zip |
Add threaded UDP input for EDI
Diffstat (limited to 'lib/UdpSocket.cpp')
-rw-r--r-- | lib/UdpSocket.cpp | 62 |
1 files changed, 60 insertions, 2 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp index 981d713..b88c731 100644 --- a/lib/UdpSocket.cpp +++ b/lib/UdpSocket.cpp @@ -167,8 +167,7 @@ int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination) /** * Must be called to receive data on a multicast address. - * @param groupname The multica -st address to join. + * @param groupname The multicast address to join. * @return 0 if ok, -1 if error */ int UdpSocket::joinGroup(char* groupname) @@ -254,3 +253,62 @@ InetAddress UdpPacket::getAddress() return address; } +UdpReceiver::~UdpReceiver() { + m_stop = true; + m_sock.close(); + if (m_thread.joinable()) { + m_thread.join(); + } +} + +void UdpReceiver::start(int port) { + m_port = port; + m_thread = std::thread(&UdpReceiver::m_run, this); +} + +std::vector<uint8_t> UdpReceiver::get_packet_buffer() +{ + if (m_stop) { + throw runtime_error("UDP Receiver not running"); + } + + UdpPacket p; + m_packets.wait_and_pop(p); + + return p.getBuffer(); +} + +void UdpReceiver::m_run() +{ + // Ensure that stop is set to true in case of exception or return + struct SetStopOnDestruct { + SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {} + ~SetStopOnDestruct() { m_stop = true; } + private: atomic<bool>& m_stop; + } autoSetStop(m_stop); + + m_sock.reinit(m_port, "0.0.0.0"); + + const size_t packsize = 8192; + UdpPacket packet(packsize); + + while (not m_stop) { + int ret = m_sock.receive(packet); + if (ret == 0) { + if (packet.getSize() == packsize) { + // TODO replace fprintf + fprintf(stderr, "Warning, possible UDP truncation\n"); + } + m_packets.push(packet); + } + else + { + if (inetErrNo != EINTR) { + // TODO replace fprintf + fprintf(stderr, "Socket error: %s\n", inetErrMsg); + } + m_stop = true; + } + } +} + |