aboutsummaryrefslogtreecommitdiffstats
path: root/lib/UdpSocket.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 11:53:15 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2017-01-13 11:53:15 +0100
commitea5594186bafa5489d6086a26d71b8f3d1ccf9cd (patch)
treea307b0882a867b415c68cd7d644241abe0c971e1 /lib/UdpSocket.cpp
parentf908d28e72887b68391a246ceb328cb52dcb2aaa (diff)
downloaddabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.gz
dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.tar.bz2
dabmod-ea5594186bafa5489d6086a26d71b8f3d1ccf9cd.zip
Add threaded UDP input for EDI
Diffstat (limited to 'lib/UdpSocket.cpp')
-rw-r--r--lib/UdpSocket.cpp62
1 files changed, 60 insertions, 2 deletions
diff --git a/lib/UdpSocket.cpp b/lib/UdpSocket.cpp
index 981d713..b88c731 100644
--- a/lib/UdpSocket.cpp
+++ b/lib/UdpSocket.cpp
@@ -167,8 +167,7 @@ int UdpSocket::send(const std::vector<uint8_t>& data, InetAddress destination)
/**
* Must be called to receive data on a multicast address.
- * @param groupname The multica
-st address to join.
+ * @param groupname The multicast address to join.
* @return 0 if ok, -1 if error
*/
int UdpSocket::joinGroup(char* groupname)
@@ -254,3 +253,62 @@ InetAddress UdpPacket::getAddress()
return address;
}
+UdpReceiver::~UdpReceiver() {
+ m_stop = true;
+ m_sock.close();
+ if (m_thread.joinable()) {
+ m_thread.join();
+ }
+}
+
+void UdpReceiver::start(int port) {
+ m_port = port;
+ m_thread = std::thread(&UdpReceiver::m_run, this);
+}
+
+std::vector<uint8_t> UdpReceiver::get_packet_buffer()
+{
+ if (m_stop) {
+ throw runtime_error("UDP Receiver not running");
+ }
+
+ UdpPacket p;
+ m_packets.wait_and_pop(p);
+
+ return p.getBuffer();
+}
+
+void UdpReceiver::m_run()
+{
+ // Ensure that stop is set to true in case of exception or return
+ struct SetStopOnDestruct {
+ SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
+ ~SetStopOnDestruct() { m_stop = true; }
+ private: atomic<bool>& m_stop;
+ } autoSetStop(m_stop);
+
+ m_sock.reinit(m_port, "0.0.0.0");
+
+ const size_t packsize = 8192;
+ UdpPacket packet(packsize);
+
+ while (not m_stop) {
+ int ret = m_sock.receive(packet);
+ if (ret == 0) {
+ if (packet.getSize() == packsize) {
+ // TODO replace fprintf
+ fprintf(stderr, "Warning, possible UDP truncation\n");
+ }
+ m_packets.push(packet);
+ }
+ else
+ {
+ if (inetErrNo != EINTR) {
+ // TODO replace fprintf
+ fprintf(stderr, "Socket error: %s\n", inetErrMsg);
+ }
+ m_stop = true;
+ }
+ }
+}
+