aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 11:53:15 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 11:53:15 +0100
commitea5594186bafa5489d6086a26d71b8f3d1ccf9cd (patch)
treea307b0882a867b415c68cd7d644241abe0c971e1 /lib
parentf908d28e72887b68391a246ceb328cb52dcb2aaa (diff)
downloaddabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.gz
dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.bz2
dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.zip
Add threaded UDP input for EDI
Diffstat (limited to 'lib')
-rw-r--r--lib/UdpSocket.cpp62
-rw-r--r--lib/UdpSocket.h29
2 files changed, 89 insertions, 2 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp
index 981d713..b88c731 100644
--- a/lib/UdpSocket.cpp
+++ b/lib/UdpSocket.cpp
@@ -167,8 +167,7 @@ int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
/**
* Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
+ * @param groupname The multicast address to join.
* @return 0 if ok, -1 if error
*/
int UdpSocket::joinGroup(char* groupname)
@@ -254,3 +253,62 @@ InetAddress UdpPacket::getAddress()
return address;
}
+UdpReceiver::~UdpReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UdpReceiver::start(int port) {
+ m_port = port;
+ m_thread = std::thread(&UdpReceiver::m_run, this);
+}
+
+std::vector<uint8_t> UdpReceiver::get_packet_buffer()
+{
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
+ }
+
+ UdpPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.getBuffer();
+}
+
+void UdpReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ m_sock.reinit(m_port, "0.0.0.0");
+
+ const size_t packsize = 8192;
+ UdpPacket packet(packsize);
+
+ while (not m_stop) {
+ int ret = m_sock.receive(packet);
+ if (ret == 0) {
+ if (packet.getSize() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ m_packets.push(packet);
+ }
+ else
+ {
+ if (inetErrNo != EINTR) {
+ // TODO replace fprintf
+ fprintf(stderr, "Socket error: %s\n", inetErrMsg);
+ }
+ m_stop = true;
+ }
+ }
+}
+
diff --git a/lib/UdpSocket.h b/lib/UdpSocket.h
index f51e87c..81a7d2b 100644
--- a/lib/UdpSocket.h
+++ b/lib/UdpSocket.h
@@ -31,6 +31,7 @@
#endif
#include "InetAddress.h"
+#include "ThreadsafeQueue.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
@@ -45,6 +46,8 @@
#include <stdlib.h>
#include <iostream>
#include <vector>
+#include <thread>
+#include <atomic>
class UdpPacket;
@@ -172,3 +175,29 @@ class UdpPacket
InetAddress address;
};
+/* Threaded UDP receiver */
+class UdpReceiver {
+ public:
+ UdpReceiver() : m_port(0), m_thread(), m_stop(false), m_packets() {}
+ ~UdpReceiver();
+ UdpReceiver(const UdpReceiver&) = delete;
+ UdpReceiver operator=(const UdpReceiver&) = delete;
+
+ // Start the receiver in a separate thread
+ void start(int port);
+
+ // Get the data contained in a UDP packet, blocks if none available
+ // In case of error, throws a runtime_error
+ std::vector<uint8_t> get_packet_buffer(void);
+
+ private:
+ void m_run(void);
+
+ int m_port;
+ std::thread m_thread;
+ std::atomic<bool> m_stop;
+ ThreadsafeQueue<UdpPacket> m_packets;
+ UdpSocket m_sock;
+};
+
+