aboutsummaryrefslogtreecommitdiffstats
path: root/lib/Socket.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:06:09 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2021-01-16 08:06:09 +0100
commitd14814a92377084177753c7a60d83a9307ad0672 (patch)
tree3b901eed7eaacc07341d16dbcd0db0d60951a5e0 /lib/Socket.cpp
parent0efb3830dcd441ffdb53ebe69f2dc2886614fb8b (diff)
downloaddabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.gz
dabmod-d14814a92377084177753c7a60d83a9307ad0672.tar.bz2
dabmod-d14814a92377084177753c7a60d83a9307ad0672.zip
Update common code to latest, update zmq.hpp and adapt
Diffstat (limited to 'lib/Socket.cpp')
-rw-r--r--lib/Socket.cpp154
1 files changed, 93 insertions, 61 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index d41ed1c..c876f32 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -66,6 +66,18 @@ void InetAddress::resolveUdpDestination(const std::string& destination, int port
}
}
+string InetAddress::to_string() const
+{
+ char received_from_str[64] = {};
+ sockaddr *addr = reinterpret_cast<sockaddr*>(&addr);
+ const char* ret = inet_ntop(AF_INET, addr, received_from_str, 63);
+
+ if (ret == nullptr) {
+ throw invalid_argument(string("Error converting InetAddress") + strerror(errno));
+ }
+ return ret;
+}
+
UDPPacket::UDPPacket() { }
UDPPacket::UDPPacket(size_t initSize) :
@@ -74,24 +86,37 @@ UDPPacket::UDPPacket(size_t initSize) :
{ }
-UDPSocket::UDPSocket() :
- m_sock(INVALID_SOCKET)
+UDPSocket::UDPSocket()
{
reinit(0, "");
}
-UDPSocket::UDPSocket(int port) :
- m_sock(INVALID_SOCKET)
+UDPSocket::UDPSocket(int port)
{
reinit(port, "");
}
-UDPSocket::UDPSocket(int port, const std::string& name) :
- m_sock(INVALID_SOCKET)
+UDPSocket::UDPSocket(int port, const std::string& name)
{
reinit(port, name);
}
+UDPSocket::UDPSocket(UDPSocket&& other)
+{
+ m_sock = other.m_sock;
+ m_port = other.m_port;
+ other.m_port = 0;
+ other.m_sock = INVALID_SOCKET;
+}
+
+const UDPSocket& UDPSocket::operator=(UDPSocket&& other)
+{
+ m_sock = other.m_sock;
+ m_port = other.m_port;
+ other.m_port = 0;
+ other.m_sock = INVALID_SOCKET;
+ return *this;
+}
void UDPSocket::setBlocking(bool block)
{
@@ -112,6 +137,8 @@ void UDPSocket::reinit(int port, const std::string& name)
::close(m_sock);
}
+ m_port = port;
+
if (port == 0) {
// No need to bind to a given port, creating the
// socket is enough
@@ -276,72 +303,71 @@ void UDPSocket::setMulticastTTL(int ttl)
}
}
-UDPReceiver::UDPReceiver() { }
-
-UDPReceiver::~UDPReceiver() {
- m_stop = true;
- m_sock.close();
- if (m_thread.joinable()) {
- m_thread.join();
- }
+SOCKET UDPSocket::getNativeSocket() const
+{
+ return m_sock;
}
-void UDPReceiver::start(int port, const string& bindto, const string& mcastaddr, size_t max_packets_queued) {
- m_port = port;
- m_bindto = bindto;
- m_mcastaddr = mcastaddr;
- m_max_packets_queued = max_packets_queued;
- m_thread = std::thread(&UDPReceiver::m_run, this);
+int UDPSocket::getPort() const
+{
+ return m_port;
}
-std::vector<uint8_t> UDPReceiver::get_packet_buffer()
-{
- if (m_stop) {
- throw runtime_error("UDP Receiver not running");
- }
+void UDPReceiver::add_receive_port(int port, const string& bindto, const string& mcastaddr) {
+ UDPSocket sock;
- UDPPacket p;
- m_packets.wait_and_pop(p);
+ if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) {
+ sock.reinit(port, mcastaddr);
+ sock.setMulticastSource(bindto.c_str());
+ sock.joinGroup(mcastaddr.c_str(), bindto.c_str());
+ }
+ else {
+ sock.reinit(port, bindto);
+ }
- return p.buffer;
+ m_sockets.push_back(move(sock));
}
-void UDPReceiver::m_run()
+vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)
{
- // Ensure that stop is set to true in case of exception or return
- struct SetStopOnDestruct {
- SetStopOnDestruct(atomic<bool>& stop) : m_stop(stop) {}
- ~SetStopOnDestruct() { m_stop = true; }
- private: atomic<bool>& m_stop;
- } autoSetStop(m_stop);
-
- if (IN_MULTICAST(ntohl(inet_addr(m_mcastaddr.c_str())))) {
- m_sock.reinit(m_port, m_mcastaddr);
- m_sock.setMulticastSource(m_bindto.c_str());
- m_sock.joinGroup(m_mcastaddr.c_str(), m_bindto.c_str());
+ constexpr size_t MAX_FDS = 64;
+ struct pollfd fds[MAX_FDS];
+ if (m_sockets.size() > MAX_FDS) {
+ throw std::runtime_error("UDPReceiver only supports up to 64 ports");
}
- else {
- m_sock.reinit(m_port, m_bindto);
+
+ for (size_t i = 0; i < m_sockets.size(); i++) {
+ fds[i].fd = m_sockets[i].getNativeSocket();
+ fds[i].events = POLLIN;
}
- while (not m_stop) {
- constexpr size_t packsize = 8192;
- try {
- auto packet = m_sock.receive(packsize);
- if (packet.buffer.size() == packsize) {
- // TODO replace fprintf
- fprintf(stderr, "Warning, possible UDP truncation\n");
- }
+ int retval = poll(fds, m_sockets.size(), timeout_ms);
- // If this blocks, the UDP socket will lose incoming packets
- m_packets.push_wait_if_full(packet, m_max_packets_queued);
- }
- catch (const std::runtime_error& e) {
- // TODO replace fprintf
- // TODO handle intr
- fprintf(stderr, "Socket error: %s\n", e.what());
- m_stop = true;
+ if (retval == -1 and errno == EINTR) {
+ throw Interrupted();
+ }
+ else if (retval == -1) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("UDP receive with poll() error: " + errstr);
+ }
+ else if (retval > 0) {
+ vector<ReceivedPacket> received;
+
+ for (size_t i = 0; i < m_sockets.size(); i++) {
+ if (fds[i].revents & POLLIN) {
+ auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU
+ ReceivedPacket rp;
+ rp.packetdata = move(p.buffer);
+ rp.received_from = move(p.address);
+ rp.port_received_on = m_sockets[i].getPort();
+ received.push_back(move(rp));
+ }
}
+
+ return received;
+ }
+ else {
+ throw Timeout();
}
}
@@ -862,9 +888,9 @@ TCPReceiveServer::~TCPReceiveServer()
}
}
-vector<uint8_t> TCPReceiveServer::receive()
+shared_ptr<TCPReceiveMessage> TCPReceiveServer::receive()
{
- vector<uint8_t> buffer;
+ shared_ptr<TCPReceiveMessage> buffer = make_shared<TCPReceiveMessageEmpty>();
m_queue.try_pop(buffer);
// we can ignore try_pop()'s return value, because
@@ -892,11 +918,12 @@ void TCPReceiveServer::process()
}
else if (r == 0) {
sock.close();
+ m_queue.push(make_shared<TCPReceiveMessageDisconnected>());
break;
}
else {
buf.resize(r);
- m_queue.push(move(buf));
+ m_queue.push(make_shared<TCPReceiveMessageData>(move(buf)));
}
}
catch (const TCPSocket::Interrupted&) {
@@ -905,6 +932,11 @@ void TCPReceiveServer::process()
catch (const TCPSocket::Timeout&) {
num_timeouts++;
}
+ catch (const runtime_error& e) {
+ sock.close();
+ // TODO replace fprintf
+ fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what());
+ }
if (num_timeouts > max_num_timeouts) {
sock.close();