aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/Globals.cpp2
-rw-r--r--lib/Json.cpp128
-rw-r--r--lib/Json.h65
-rw-r--r--lib/RemoteControl.cpp61
-rw-r--r--lib/RemoteControl.h14
-rw-r--r--lib/Socket.cpp233
-rw-r--r--lib/Socket.h46
-rw-r--r--lib/ThreadsafeQueue.h54
-rw-r--r--lib/edi/ETIDecoder.cpp14
-rw-r--r--lib/edi/ETIDecoder.hpp4
-rw-r--r--lib/edi/common.cpp33
-rw-r--r--lib/edi/common.hpp12
12 files changed, 570 insertions, 96 deletions
diff --git a/lib/Globals.cpp b/lib/Globals.cpp
index 6be26ec..6bd38fb 100644
--- a/lib/Globals.cpp
+++ b/lib/Globals.cpp
@@ -32,5 +32,7 @@
// the RC needs logging, and needs to be initialised later.
Logger etiLog;
+#if ENABLE_REMOTECONTROL
RemoteControllers rcs;
+#endif // ENABLE_REMOTECONTROL
diff --git a/lib/Json.cpp b/lib/Json.cpp
new file mode 100644
index 0000000..361a149
--- /dev/null
+++ b/lib/Json.cpp
@@ -0,0 +1,128 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Copyright (C) 2023
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+#include <string>
+#include <iostream>
+#include <sstream>
+#include <iomanip>
+#include <string>
+#include <algorithm>
+
+#include "Json.h"
+
+namespace json {
+ static std::string escape_json(const std::string &s) {
+ std::ostringstream o;
+ for (auto c = s.cbegin(); c != s.cend(); c++) {
+ switch (*c) {
+ case '"': o << "\\\""; break;
+ case '\\': o << "\\\\"; break;
+ case '\b': o << "\\b"; break;
+ case '\f': o << "\\f"; break;
+ case '\n': o << "\\n"; break;
+ case '\r': o << "\\r"; break;
+ case '\t': o << "\\t"; break;
+ default:
+ if ('\x00' <= *c && *c <= '\x1f') {
+ o << "\\u"
+ << std::hex << std::setw(4) << std::setfill('0') << static_cast<int>(*c);
+ } else {
+ o << *c;
+ }
+ }
+ }
+ return o.str();
+ }
+
+ std::string map_to_json(const map_t& values) {
+ std::ostringstream ss;
+ ss << "{ ";
+ size_t ix = 0;
+ for (const auto& element : values) {
+ if (ix > 0) {
+ ss << ",";
+ }
+
+ ss << "\"" << escape_json(element.first) << "\": ";
+ ss << value_to_json(element.second);
+
+ ix++;
+ }
+ ss << " }";
+
+ return ss.str();
+ }
+
+ std::string value_to_json(const value_t& value)
+ {
+ std::ostringstream ss;
+
+ if (std::holds_alternative<std::string>(value.v)) {
+ ss << "\"" << escape_json(std::get<std::string>(value.v)) << "\"";
+ }
+ else if (std::holds_alternative<double>(value.v)) {
+ ss << std::fixed << std::get<double>(value.v);
+ }
+ else if (std::holds_alternative<uint64_t>(value.v)) {
+ ss << std::get<uint64_t>(value.v);
+ }
+ else if (std::holds_alternative<int64_t>(value.v)) {
+ ss << std::get<int64_t>(value.v);
+ }
+ else if (std::holds_alternative<uint32_t>(value.v)) {
+ ss << std::get<uint32_t>(value.v);
+ }
+ else if (std::holds_alternative<int32_t>(value.v)) {
+ ss << std::get<int32_t>(value.v);
+ }
+ else if (std::holds_alternative<bool>(value.v)) {
+ ss << (std::get<bool>(value.v) ? "true" : "false");
+ }
+ else if (std::holds_alternative<std::nullopt_t>(value.v)) {
+ ss << "null";
+ }
+ else if (std::holds_alternative<std::vector<json::value_t> >(value.v)) {
+ const auto& vec = std::get<std::vector<json::value_t> >(value.v);
+ ss << "[ ";
+ size_t list_ix = 0;
+ for (const auto& list_element : vec) {
+ if (list_ix > 0) {
+ ss << ",";
+ }
+ ss << value_to_json(list_element);
+ list_ix++;
+ }
+ ss << "]";
+ }
+ else if (std::holds_alternative<std::shared_ptr<json::map_t> >(value.v)) {
+ const map_t& v = *std::get<std::shared_ptr<json::map_t> >(value.v);
+ ss << map_to_json(v);
+ }
+ else {
+ throw std::logic_error("variant alternative not handled");
+ }
+
+ return ss.str();
+ }
+}
diff --git a/lib/Json.h b/lib/Json.h
new file mode 100644
index 0000000..b082f92
--- /dev/null
+++ b/lib/Json.h
@@ -0,0 +1,65 @@
+/*
+ Copyright (C) 2007, 2008, 2009, 2010, 2011, 2012
+ Her Majesty the Queen in Right of Canada (Communications Research
+ Center Canada)
+
+ Copyright (C) 2023
+ Matthias P. Braendli, matthias.braendli@mpb.li
+
+ http://www.opendigitalradio.org
+
+ This module adds remote-control capability to some of the dabmux/dabmod modules.
+ */
+/*
+ This program is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation, either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <https://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vector>
+#include <memory>
+#include <optional>
+#include <stdexcept>
+#include <string>
+#include <unordered_map>
+#include <variant>
+
+namespace json {
+
+ // STL containers are not required to support incomplete types,
+ // hence the shared_ptr
+
+ struct value_t {
+ std::variant<
+ std::shared_ptr<std::unordered_map<std::string, value_t>>,
+ std::vector<value_t>,
+ std::string,
+ double,
+ int64_t,
+ uint64_t,
+ int32_t,
+ uint32_t,
+ bool,
+ std::nullopt_t> v;
+ };
+
+ using map_t = std::unordered_map<std::string, value_t>;
+
+ std::string map_to_json(const map_t& values);
+ std::string value_to_json(const value_t& value);
+}
diff --git a/lib/RemoteControl.cpp b/lib/RemoteControl.cpp
index 9ca8d22..dca3373 100644
--- a/lib/RemoteControl.cpp
+++ b/lib/RemoteControl.cpp
@@ -25,11 +25,15 @@
#include <list>
#include <string>
#include <iostream>
+#include <sstream>
+#include <iomanip>
#include <string>
#include <algorithm>
#include "RemoteControl.h"
-#include "zmq.hpp"
+#if defined(HAVE_ZEROMQ)
+ #include "zmq.hpp"
+#endif
using namespace std;
@@ -100,6 +104,18 @@ std::list< std::vector<std::string> > RemoteControllers::get_param_list_values(c
return allparams;
}
+
+
+std::string RemoteControllers::get_showjson() {
+ json::map_t root;
+ for (auto &controllable : rcs.controllables) {
+ root[controllable->get_rc_name()].v =
+ std::make_shared<json::map_t>(controllable->get_all_values());
+ }
+
+ return json::map_to_json(root);
+}
+
std::string RemoteControllers::get_param(const std::string& name, const std::string& param) {
RemoteControllable* controllable = get_controllable_(name);
return controllable->get_parameter(param);
@@ -121,7 +137,7 @@ RemoteControllable* RemoteControllers::get_controllable_(const std::string& name
[&](RemoteControllable* r) { return r->get_rc_name() == name; });
if (rc == controllables.end()) {
- throw ParameterError("Module name unknown");
+ throw ParameterError(string{"Module name '"} + name + "' unknown");
}
else {
return *rc;
@@ -425,10 +441,15 @@ void RemoteControllerZmq::recv_all(zmq::socket_t& pSocket, std::vector<std::stri
bool more = true;
do {
zmq::message_t msg;
- pSocket.recv(msg);
- std::string incoming((char*)msg.data(), msg.size());
- message.push_back(incoming);
- more = msg.more();
+ const auto zresult = pSocket.recv(msg);
+ if (zresult) {
+ std::string incoming((char*)msg.data(), msg.size());
+ message.push_back(incoming);
+ more = msg.more();
+ }
+ else {
+ more = false;
+ }
} while (more);
}
@@ -455,6 +476,7 @@ void RemoteControllerZmq::send_fail_reply(zmq::socket_t &pSocket, const std::str
void RemoteControllerZmq::process()
{
m_fault = false;
+ m_active = true;
// create zmq reply socket for receiving ctrl parameters
try {
@@ -512,8 +534,21 @@ void RemoteControllerZmq::process()
repSocket.send(zmsg, (--cohort_size > 0) ? zmq::send_flags::sndmore : zmq::send_flags::none);
}
}
+ else if (msg.size() == 1 && command == "showjson") {
+ try {
+ std::string json = rcs.get_showjson();
+
+ zmq::message_t zmsg(json.size());
+ memcpy(zmsg.data(), json.data(), json.size());
+
+ repSocket.send(zmsg, zmq::send_flags::none);
+ }
+ catch (const ParameterError &err) {
+ send_fail_reply(repSocket, err.what());
+ }
+ }
else if (msg.size() == 2 && command == "show") {
- std::string module((char*) msg[1].data(), msg[1].size());
+ const std::string module((char*) msg[1].data(), msg[1].size());
try {
list< vector<string> > r = rcs.get_param_list_values(module);
size_t r_size = r.size();
@@ -531,8 +566,8 @@ void RemoteControllerZmq::process()
}
}
else if (msg.size() == 3 && command == "get") {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
+ const std::string module((char*) msg[1].data(), msg[1].size());
+ const std::string parameter((char*) msg[2].data(), msg[2].size());
try {
std::string value = rcs.get_param(module, parameter);
@@ -545,9 +580,9 @@ void RemoteControllerZmq::process()
}
}
else if (msg.size() == 4 && command == "set") {
- std::string module((char*) msg[1].data(), msg[1].size());
- std::string parameter((char*) msg[2].data(), msg[2].size());
- std::string value((char*) msg[3].data(), msg[3].size());
+ const std::string module((char*) msg[1].data(), msg[1].size());
+ const std::string parameter((char*) msg[2].data(), msg[2].size());
+ const std::string value((char*) msg[3].data(), msg[3].size());
try {
rcs.set_param(module, parameter, value);
@@ -559,7 +594,7 @@ void RemoteControllerZmq::process()
}
else {
send_fail_reply(repSocket,
- "Unsupported command. commands: list, show, get, set");
+ "Unsupported command. commands: list, show, get, set, showjson");
}
}
}
diff --git a/lib/RemoteControl.h b/lib/RemoteControl.h
index 2358b3a..7dd763d 100644
--- a/lib/RemoteControl.h
+++ b/lib/RemoteControl.h
@@ -3,7 +3,7 @@
Her Majesty the Queen in Right of Canada (Communications Research
Center Canada)
- Copyright (C) 2019
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,11 +31,15 @@
# include "config.h"
#endif
+#define ENABLE_REMOTECONTROL 1
+
#if defined(HAVE_ZEROMQ)
# include "zmq.hpp"
#endif
#include <list>
+#include <unordered_map>
+#include <variant>
#include <map>
#include <memory>
#include <string>
@@ -46,6 +50,7 @@
#include "Log.h"
#include "Socket.h"
+#include "Json.h"
#define RC_ADD_PARAMETER(p, desc) { \
std::vector<std::string> p; \
@@ -113,13 +118,13 @@ class RemoteControllable {
}
/* Base function to set parameters. */
- virtual void set_parameter(
- const std::string& parameter,
- const std::string& value) = 0;
+ virtual void set_parameter(const std::string& parameter, const std::string& value) = 0;
/* Getting a parameter always returns a string. */
virtual const std::string get_parameter(const std::string& parameter) const = 0;
+ virtual const json::map_t get_all_values() const = 0;
+
protected:
std::string m_rc_name;
std::list< std::vector<std::string> > m_parameters;
@@ -135,6 +140,7 @@ class RemoteControllers {
void check_faults();
std::list< std::vector<std::string> > get_param_list_values(const std::string& name);
std::string get_param(const std::string& name, const std::string& param);
+ std::string get_showjson();
void set_param(
const std::string& name,
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index 1ff6418..938b573 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2022
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -24,12 +24,13 @@
#include "Socket.h"
-#include <iostream>
+#include <stdexcept>
#include <cstdio>
#include <cstring>
#include <cerrno>
#include <fcntl.h>
#include <poll.h>
+#include <netinet/tcp.h>
namespace Socket {
@@ -105,16 +106,20 @@ UDPSocket::UDPSocket(UDPSocket&& other)
{
m_sock = other.m_sock;
m_port = other.m_port;
+ m_multicast_source = other.m_multicast_source;
other.m_port = 0;
other.m_sock = INVALID_SOCKET;
+ other.m_multicast_source = "";
}
const UDPSocket& UDPSocket::operator=(UDPSocket&& other)
{
m_sock = other.m_sock;
m_port = other.m_port;
+ m_multicast_source = other.m_multicast_source;
other.m_port = 0;
other.m_sock = INVALID_SOCKET;
+ other.m_multicast_source = "";
return *this;
}
@@ -143,6 +148,7 @@ void UDPSocket::reinit(int port, const std::string& name)
// No need to bind to a given port, creating the
// socket is enough
m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ post_init();
return;
}
@@ -179,6 +185,7 @@ void UDPSocket::reinit(int port, const std::string& name)
if (::bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
m_sock = sfd;
+ post_init();
break;
}
@@ -188,10 +195,47 @@ void UDPSocket::reinit(int port, const std::string& name)
freeaddrinfo(result);
if (rp == nullptr) {
- throw runtime_error("Could not bind");
+ throw runtime_error(string{"Could not bind to port "} + to_string(port));
+ }
+}
+
+void UDPSocket::post_init() {
+ int pktinfo = 1;
+ if (setsockopt(m_sock, IPPROTO_IP, IP_PKTINFO, &pktinfo, sizeof(pktinfo)) == SOCKET_ERROR) {
+ throw runtime_error(string("Can't request pktinfo: ") + strerror(errno));
+ }
+
+}
+
+void UDPSocket::init_receive_multicast(int port, const string& local_if_addr, const string& mcastaddr)
+{
+ if (m_sock != INVALID_SOCKET) {
+ ::close(m_sock);
+ }
+
+ m_port = port;
+ m_sock = ::socket(AF_INET, SOCK_DGRAM, 0);
+ post_init();
+
+ int reuse_setting = 1;
+ if (setsockopt(m_sock, SOL_SOCKET, SO_REUSEADDR, &reuse_setting, sizeof(reuse_setting)) == SOCKET_ERROR) {
+ throw runtime_error("Can't reuse address");
+ }
+
+ struct sockaddr_in la;
+ memset((char *) &la, 0, sizeof(la));
+ la.sin_family = AF_INET;
+ la.sin_port = htons(port);
+ la.sin_addr.s_addr = INADDR_ANY;
+ if (::bind(m_sock, (struct sockaddr*)&la, sizeof(la))) {
+ throw runtime_error(string("Could not bind: ") + strerror(errno));
}
+
+ m_multicast_source = mcastaddr;
+ join_group(mcastaddr.c_str(), local_if_addr.c_str());
}
+
void UDPSocket::close()
{
if (m_sock != INVALID_SOCKET) {
@@ -211,16 +255,26 @@ UDPSocket::~UDPSocket()
UDPPacket UDPSocket::receive(size_t max_size)
{
+ struct sockaddr_in addr;
+ struct msghdr msg;
+ struct iovec iov;
+ constexpr size_t BUFFER_SIZE = 1024;
+ char control_buffer[BUFFER_SIZE];
+ struct cmsghdr *cmsg;
+
UDPPacket packet(max_size);
- socklen_t addrSize;
- addrSize = sizeof(*packet.address.as_sockaddr());
- ssize_t ret = recvfrom(m_sock,
- packet.buffer.data(),
- packet.buffer.size(),
- 0,
- packet.address.as_sockaddr(),
- &addrSize);
+ memset(&msg, 0, sizeof(msg));
+ msg.msg_name = &addr;
+ msg.msg_namelen = sizeof(addr);
+ msg.msg_iov = &iov;
+ iov.iov_base = packet.buffer.data();
+ iov.iov_len = packet.buffer.size();
+ msg.msg_iovlen = 1;
+ msg.msg_control = control_buffer;
+ msg.msg_controllen = sizeof(control_buffer);
+
+ ssize_t ret = recvmsg(m_sock, &msg, 0);
if (ret == SOCKET_ERROR) {
packet.buffer.resize(0);
@@ -231,12 +285,42 @@ UDPPacket UDPSocket::receive(size_t max_size)
if (errno == EAGAIN or errno == EWOULDBLOCK)
#endif
{
- return 0;
+ return packet;
}
throw runtime_error(string("Can't receive data: ") + strerror(errno));
}
- packet.buffer.resize(ret);
+ struct in_pktinfo *pktinfo = nullptr;
+ for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
+ pktinfo = (struct in_pktinfo *)CMSG_DATA(cmsg);
+ break;
+ }
+ }
+
+ if (pktinfo) {
+ char src_addr[INET_ADDRSTRLEN];
+ char dst_addr[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &(addr.sin_addr), src_addr, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &(pktinfo->ipi_addr), dst_addr, INET_ADDRSTRLEN);
+ //fprintf(stderr, "Received packet from %s to %s: %zu\n", src_addr, dst_addr, ret);
+
+ memcpy(&packet.address.addr, &addr, sizeof(addr));
+
+ if (m_multicast_source.empty() or
+ strcmp(dst_addr, m_multicast_source.c_str()) == 0) {
+ packet.buffer.resize(ret);
+ }
+ else {
+ // Ignore packet for different multicast group
+ packet.buffer.resize(0);
+ }
+ }
+ else {
+ //fprintf(stderr, "No pktinfo: %zu\n", ret);
+ packet.buffer.resize(ret);
+ }
+
return packet;
}
@@ -268,14 +352,14 @@ void UDPSocket::send(const std::string& data, InetAddress destination)
}
}
-void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
+void UDPSocket::join_group(const char* groupname, const char* if_addr)
{
ip_mreqn group;
if ((group.imr_multiaddr.s_addr = inet_addr(groupname)) == INADDR_NONE) {
throw runtime_error("Cannot convert multicast group name");
}
if (!IN_MULTICAST(ntohl(group.imr_multiaddr.s_addr))) {
- throw runtime_error("Group name is not a multicast address");
+ throw runtime_error(string("Group name '") + groupname + "' is not a multicast address");
}
if (if_addr) {
@@ -287,7 +371,7 @@ void UDPSocket::joinGroup(const char* groupname, const char* if_addr)
group.imr_ifindex = 0;
if (setsockopt(m_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &group, sizeof(group))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't join multicast group") + strerror(errno));
+ throw runtime_error(string("Can't join multicast group: ") + strerror(errno));
}
}
@@ -295,12 +379,12 @@ void UDPSocket::setMulticastSource(const char* source_addr)
{
struct in_addr addr;
if (inet_aton(source_addr, &addr) == 0) {
- throw runtime_error(string("Can't parse source address") + strerror(errno));
+ throw runtime_error(string("Can't parse source address: ") + strerror(errno));
}
if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_IF, &addr, sizeof(addr))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't set source address") + strerror(errno));
+ throw runtime_error(string("Can't set source address: ") + strerror(errno));
}
}
@@ -308,7 +392,7 @@ void UDPSocket::setMulticastTTL(int ttl)
{
if (setsockopt(m_sock, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl))
== SOCKET_ERROR) {
- throw runtime_error(string("Can't set multicast ttl") + strerror(errno));
+ throw runtime_error(string("Can't set multicast ttl: ") + strerror(errno));
}
}
@@ -326,15 +410,13 @@ void UDPReceiver::add_receive_port(int port, const string& bindto, const string&
UDPSocket sock;
if (IN_MULTICAST(ntohl(inet_addr(mcastaddr.c_str())))) {
- sock.reinit(port, mcastaddr);
- sock.setMulticastSource(bindto.c_str());
- sock.joinGroup(mcastaddr.c_str(), bindto.c_str());
+ sock.init_receive_multicast(port, bindto, mcastaddr);
}
else {
sock.reinit(port, bindto);
}
- m_sockets.push_back(move(sock));
+ m_sockets.push_back(std::move(sock));
}
vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)
@@ -365,11 +447,13 @@ vector<UDPReceiver::ReceivedPacket> UDPReceiver::receive(int timeout_ms)
for (size_t i = 0; i < m_sockets.size(); i++) {
if (fds[i].revents & POLLIN) {
auto p = m_sockets[i].receive(2048); // This is larger than the usual MTU
- ReceivedPacket rp;
- rp.packetdata = move(p.buffer);
- rp.received_from = move(p.address);
- rp.port_received_on = m_sockets[i].getPort();
- received.push_back(move(rp));
+ if (not p.buffer.empty()) {
+ ReceivedPacket rp;
+ rp.packetdata = std::move(p.buffer);
+ rp.received_from = std::move(p.address);
+ rp.port_received_on = m_sockets[i].getPort();
+ received.push_back(std::move(rp));
+ }
}
}
@@ -394,7 +478,7 @@ TCPSocket::~TCPSocket()
TCPSocket::TCPSocket(TCPSocket&& other) :
m_sock(other.m_sock),
- m_remote_address(move(other.m_remote_address))
+ m_remote_address(std::move(other.m_remote_address))
{
if (other.m_sock != -1) {
other.m_sock = -1;
@@ -610,6 +694,37 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)
}
}
+void TCPSocket::enable_keepalive(int time, int intvl, int probes)
+{
+ if (m_sock == INVALID_SOCKET) {
+ throw std::logic_error("You may not call enable_keepalive on invalid socket");
+ }
+ int optval = 1;
+ auto optlen = sizeof(optval);
+ if (setsockopt(m_sock, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set SO_KEEPALIVE: " + errstr);
+ }
+
+ optval = time;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPIDLE, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPIDLE: " + errstr);
+ }
+
+ optval = intvl;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPINTVL, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPINTVL: " + errstr);
+ }
+
+ optval = probes;
+ if (setsockopt(m_sock, SOL_TCP, TCP_KEEPCNT, &optval, optlen) < 0) {
+ std::string errstr(strerror(errno));
+ throw std::runtime_error("TCP: Could not set TCP_KEEPCNT: " + errstr);
+ }
+}
+
void TCPSocket::listen(int port, const string& name)
{
if (m_sock != INVALID_SOCKET) {
@@ -852,22 +967,33 @@ ssize_t TCPClient::recv(void *buffer, size_t length, int flags, int timeout_ms)
reconnect();
}
+ m_last_received_packet_ts = chrono::steady_clock::now();
+
return ret;
}
catch (const TCPSocket::Interrupted&) {
return -1;
}
catch (const TCPSocket::Timeout&) {
+ const auto timeout = chrono::milliseconds(timeout_ms * 5);
+ if (m_last_received_packet_ts.has_value() and
+ chrono::steady_clock::now() - *m_last_received_packet_ts > timeout)
+ {
+ // This is to catch half-closed TCP connections
+ reconnect();
+ }
+
return 0;
}
- return 0;
+ throw std::logic_error("unreachable");
}
void TCPClient::reconnect()
{
TCPSocket newsock;
m_sock = std::move(newsock);
+ m_last_received_packet_ts = nullopt;
m_sock.connect(m_hostname, m_port, true);
}
@@ -875,7 +1001,7 @@ TCPConnection::TCPConnection(TCPSocket&& sock) :
queue(),
m_running(true),
m_sender_thread(),
- m_sock(move(sock))
+ m_sock(std::move(sock))
{
#if MISSING_OWN_ADDR
auto own_addr = m_sock.getOwnAddress();
@@ -938,8 +1064,9 @@ void TCPConnection::process()
}
-TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size) :
- m_max_queue_size(max_queue_size)
+TCPDataDispatcher::TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll) :
+ m_max_queue_size(max_queue_size),
+ m_buffers_to_preroll(buffers_to_preroll)
{
}
@@ -967,12 +1094,20 @@ void TCPDataDispatcher::write(const vector<uint8_t>& data)
throw runtime_error(m_exception_data);
}
+ auto lock = unique_lock<mutex>(m_mutex);
+
+ if (m_buffers_to_preroll > 0) {
+ m_preroll_queue.push_back(data);
+ if (m_preroll_queue.size() > m_buffers_to_preroll) {
+ m_preroll_queue.pop_front();
+ }
+ }
+
for (auto& connection : m_connections) {
connection.queue.push(data);
}
- m_connections.remove_if(
- [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
+ m_connections.remove_if( [&](const TCPConnection& conn){ return conn.queue.size() > m_max_queue_size; });
}
void TCPDataDispatcher::process()
@@ -984,7 +1119,14 @@ void TCPDataDispatcher::process()
// Add a new TCPConnection to the list, constructing it from the client socket
auto sock = m_listener_socket.accept(timeout_ms);
if (sock.valid()) {
- m_connections.emplace(m_connections.begin(), move(sock));
+ auto lock = unique_lock<mutex>(m_mutex);
+ m_connections.emplace(m_connections.begin(), std::move(sock));
+
+ if (m_buffers_to_preroll > 0) {
+ for (const auto& buf : m_preroll_queue) {
+ m_connections.front().queue.push(buf);
+ }
+ }
}
}
}
@@ -1050,7 +1192,7 @@ void TCPReceiveServer::process()
}
else {
buf.resize(r);
- m_queue.push(make_shared<TCPReceiveMessageData>(move(buf)));
+ m_queue.push(make_shared<TCPReceiveMessageData>(std::move(buf)));
}
}
catch (const TCPSocket::Interrupted&) {
@@ -1091,7 +1233,7 @@ TCPSendClient::~TCPSendClient()
}
}
-void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
+TCPSendClient::ErrorStats TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
{
if (not m_running) {
throw runtime_error(m_exception_data);
@@ -1103,6 +1245,17 @@ void TCPSendClient::sendall(const std::vector<uint8_t>& buffer)
vector<uint8_t> discard;
m_queue.try_pop(discard);
}
+
+ TCPSendClient::ErrorStats es;
+ es.num_reconnects = m_num_reconnects.load();
+
+ es.has_seen_new_errors = es.num_reconnects != m_num_reconnects_prev;
+ m_num_reconnects_prev = es.num_reconnects;
+
+ auto lock = unique_lock<mutex>(m_error_mutex);
+ es.last_error = m_last_error;
+
+ return es;
}
void TCPSendClient::process()
@@ -1124,12 +1277,16 @@ void TCPSendClient::process()
}
else {
try {
+ m_num_reconnects.fetch_add(1, std::memory_order_seq_cst);
m_sock.connect(m_hostname, m_port);
m_is_connected = true;
}
catch (const runtime_error& e) {
m_is_connected = false;
this_thread::sleep_for(chrono::seconds(1));
+
+ auto lock = unique_lock<mutex>(m_error_mutex);
+ m_last_error = e.what();
}
}
}
diff --git a/lib/Socket.h b/lib/Socket.h
index f5143a0..7709145 100644
--- a/lib/Socket.h
+++ b/lib/Socket.h
@@ -2,7 +2,7 @@
Copyright (C) 2003, 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the
Queen in Right of Canada (Communications Research Center Canada)
- Copyright (C) 2020
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://www.opendigitalradio.org
@@ -31,9 +31,11 @@
#include "ThreadsafeQueue.h"
#include <cstdlib>
#include <atomic>
-#include <iostream>
+#include <chrono>
#include <list>
#include <memory>
+#include <optional>
+#include <string>
#include <thread>
#include <vector>
@@ -111,13 +113,13 @@ class UDPSocket
/** Close the already open socket, and create a new one. Throws a runtime_error on error. */
void reinit(int port);
void reinit(int port, const std::string& name);
+ void init_receive_multicast(int port, const std::string& local_if_addr, const std::string& mcastaddr);
void close(void);
void send(UDPPacket& packet);
void send(const std::vector<uint8_t>& data, InetAddress destination);
void send(const std::string& data, InetAddress destination);
UDPPacket receive(size_t max_size);
- void joinGroup(const char* groupname, const char* if_addr = nullptr);
void setMulticastSource(const char* source_addr);
void setMulticastTTL(int ttl);
@@ -129,9 +131,14 @@ class UDPSocket
SOCKET getNativeSocket() const;
int getPort() const;
+ private:
+ void join_group(const char* groupname, const char* if_addr = nullptr);
+ void post_init();
+
protected:
SOCKET m_sock = INVALID_SOCKET;
int m_port = 0;
+ std::string m_multicast_source = "";
};
/* UDP packet receiver supporting receiving from several ports at once */
@@ -173,6 +180,11 @@ class TCPSocket {
void listen(int port, const std::string& name);
void close(void);
+ /* Enable TCP keepalive. See
+ * https://tldp.org/HOWTO/TCP-Keepalive-HOWTO/usingkeepalive.html
+ */
+ void enable_keepalive(int time, int intvl, int probes);
+
/* throws a runtime_error on failure, an invalid socket on timeout */
TCPSocket accept(int timeout_ms);
@@ -226,6 +238,8 @@ class TCPClient {
TCPSocket m_sock;
std::string m_hostname;
int m_port;
+
+ std::optional<std::chrono::steady_clock::time_point> m_last_received_packet_ts;
};
/* Helper class for TCPDataDispatcher, contains a queue of pending data and
@@ -254,7 +268,7 @@ class TCPConnection
class TCPDataDispatcher
{
public:
- TCPDataDispatcher(size_t max_queue_size);
+ TCPDataDispatcher(size_t max_queue_size, size_t buffers_to_preroll);
~TCPDataDispatcher();
TCPDataDispatcher(const TCPDataDispatcher&) = delete;
TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;
@@ -266,11 +280,16 @@ class TCPDataDispatcher
void process();
size_t m_max_queue_size;
+ size_t m_buffers_to_preroll;
+
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
std::string m_exception_data;
std::thread m_listener_thread;
TCPSocket m_listener_socket;
+
+ std::mutex m_mutex;
+ std::deque<std::vector<uint8_t> > m_preroll_queue;
std::list<TCPConnection> m_connections;
};
@@ -314,10 +333,18 @@ class TCPSendClient {
public:
TCPSendClient(const std::string& hostname, int port);
~TCPSendClient();
+ TCPSendClient(const TCPSendClient&) = delete;
+ TCPSendClient& operator=(const TCPSendClient&) = delete;
- /* Throws a runtime_error on error
- */
- void sendall(const std::vector<uint8_t>& buffer);
+
+ struct ErrorStats {
+ std::string last_error = "";
+ size_t num_reconnects = 0;
+ bool has_seen_new_errors = false;
+ };
+
+ /* Throws a runtime_error when the process thread isn't running */
+ ErrorStats sendall(const std::vector<uint8_t>& buffer);
private:
void process();
@@ -334,6 +361,11 @@ class TCPSendClient {
std::string m_exception_data;
std::thread m_sender_thread;
TCPSocket m_listener_socket;
+
+ std::atomic<size_t> m_num_reconnects = ATOMIC_VAR_INIT(0);
+ size_t m_num_reconnects_prev = 0;
+ std::mutex m_error_mutex;
+ std::string m_last_error = "";
};
}
diff --git a/lib/ThreadsafeQueue.h b/lib/ThreadsafeQueue.h
index 815dfe0..8b385d6 100644
--- a/lib/ThreadsafeQueue.h
+++ b/lib/ThreadsafeQueue.h
@@ -2,7 +2,7 @@
Copyright (C) 2007, 2008, 2009, 2010, 2011 Her Majesty the Queen in
Right of Canada (Communications Research Center Canada)
- Copyright (C) 2018
+ Copyright (C) 2023
Matthias P. Braendli, matthias.braendli@mpb.li
An implementation for a threadsafe queue, depends on C++11
@@ -32,6 +32,7 @@
#include <condition_variable>
#include <queue>
#include <utility>
+#include <cassert>
/* This queue is meant to be used by two threads. One producer
* that pushes elements into the queue, and one consumer that
@@ -69,7 +70,6 @@ public:
}
size_t queue_size = the_queue.size();
lock.unlock();
-
the_rx_notification.notify_one();
return queue_size;
@@ -93,11 +93,57 @@ public:
return queue_size;
}
+ struct push_overflow_result { bool overflowed; size_t new_size; };
+
+ /* Push one element into the queue, and if queue is
+ * full remove one element from the other end.
+ *
+ * max_size == 0 is not allowed.
+ *
+ * returns the new queue size and a flag if overflow occurred.
+ */
+ push_overflow_result push_overflow(T const& val, size_t max_size)
+ {
+ assert(max_size > 0);
+ std::unique_lock<std::mutex> lock(the_mutex);
+
+ bool overflow = false;
+ while (the_queue.size() >= max_size) {
+ overflow = true;
+ the_queue.pop();
+ }
+ the_queue.push(val);
+ const size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return {overflow, queue_size};
+ }
+
+ push_overflow_result push_overflow(T&& val, size_t max_size)
+ {
+ assert(max_size > 0);
+ std::unique_lock<std::mutex> lock(the_mutex);
+
+ bool overflow = false;
+ while (the_queue.size() >= max_size) {
+ overflow = true;
+ the_queue.pop();
+ }
+ the_queue.emplace(std::move(val));
+ const size_t queue_size = the_queue.size();
+ lock.unlock();
+
+ the_rx_notification.notify_one();
+
+ return {overflow, queue_size};
+ }
+
+
/* Push one element into the queue, but wait until the
* queue size goes below the threshold.
*
- * Notify waiting thread.
- *
* returns the new queue size.
*/
size_t push_wait_if_full(T const& val, size_t threshold)
diff --git a/lib/edi/ETIDecoder.cpp b/lib/edi/ETIDecoder.cpp
index 0a4da54..1a726cf 100644
--- a/lib/edi/ETIDecoder.cpp
+++ b/lib/edi/ETIDecoder.cpp
@@ -1,5 +1,5 @@
/*
- Copyright (C) 2020
+ Copyright (C) 2024
Matthias P. Braendli, matthias.braendli@mpb.li
http://opendigitalradio.org
@@ -44,7 +44,7 @@ ETIDecoder::ETIDecoder(ETIDataCollector& data_collector) :
std::bind(&ETIDecoder::decode_estn, this, _1, _2));
m_dispatcher.register_tag("*dmy",
std::bind(&ETIDecoder::decode_stardmy, this, _1, _2));
- m_dispatcher.register_tagpacket_handler(std::bind(&ETIDecoder::decode_tagpacket, this, _1));
+ m_dispatcher.register_afpacket_handler(std::bind(&ETIDecoder::decode_afpacket, this, _1));
}
void ETIDecoder::set_verbose(bool verbose)
@@ -174,7 +174,7 @@ bool ETIDecoder::decode_deti(const std::vector<uint8_t>& value, const tag_name_t
fic.begin());
i += fic_length;
- m_data_collector.update_fic(move(fic));
+ m_data_collector.update_fic(std::move(fic));
}
if (rfudf) {
@@ -215,7 +215,7 @@ bool ETIDecoder::decode_estn(const std::vector<uint8_t>& value, const tag_name_t
value.end(),
back_inserter(stc.mst));
- m_data_collector.add_subchannel(move(stc));
+ m_data_collector.add_subchannel(std::move(stc));
return true;
}
@@ -225,9 +225,9 @@ bool ETIDecoder::decode_stardmy(const std::vector<uint8_t>&, const tag_name_t&)
return true;
}
-bool ETIDecoder::decode_tagpacket(const std::vector<uint8_t>& value)
+bool ETIDecoder::decode_afpacket(std::vector<uint8_t>&& value)
{
- m_received_tagpacket.tagpacket = value;
+ m_received_tagpacket.afpacket = std::move(value);
return true;
}
@@ -237,7 +237,7 @@ void ETIDecoder::packet_completed()
ReceivedTagPacket tp;
swap(tp, m_received_tagpacket);
- m_data_collector.assemble(move(tp));
+ m_data_collector.assemble(std::move(tp));
}
}
diff --git a/lib/edi/ETIDecoder.hpp b/lib/edi/ETIDecoder.hpp
index 3949a14..1ad6c64 100644
--- a/lib/edi/ETIDecoder.hpp
+++ b/lib/edi/ETIDecoder.hpp
@@ -58,7 +58,7 @@ struct eti_stc_data {
};
struct ReceivedTagPacket {
- std::vector<uint8_t> tagpacket;
+ std::vector<uint8_t> afpacket;
frame_timestamp_t timestamp;
seq_info_t seq;
};
@@ -133,7 +133,7 @@ class ETIDecoder {
bool decode_estn(const std::vector<uint8_t>& value, const tag_name_t& n);
bool decode_stardmy(const std::vector<uint8_t>& value, const tag_name_t& n);
- bool decode_tagpacket(const std::vector<uint8_t>& value);
+ bool decode_afpacket(std::vector<uint8_t>&& value);
void packet_completed();
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index c99997a..38eadf9 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -33,9 +33,9 @@ namespace EdiDecoder {
using namespace std;
-bool frame_timestamp_t::valid() const
+bool frame_timestamp_t::is_valid() const
{
- return tsta != 0xFFFFFF;
+ return tsta != 0xFFFFFF and seconds != 0;
}
string frame_timestamp_t::to_string() const
@@ -43,7 +43,7 @@ string frame_timestamp_t::to_string() const
const time_t seconds_in_unix_epoch = to_unix_epoch();
stringstream ss;
- if (valid()) {
+ if (is_valid()) {
ss << "Timestamp: ";
}
else {
@@ -129,10 +129,9 @@ std::string tag_name_to_human_readable(const tag_name_t& name)
return s;
}
-TagDispatcher::TagDispatcher(
- std::function<void()>&& af_packet_completed) :
- m_af_packet_completed(move(af_packet_completed)),
- m_tagpacket_handler([](const std::vector<uint8_t>& /*ignore*/){})
+TagDispatcher::TagDispatcher(std::function<void()>&& af_packet_completed) :
+ m_af_packet_completed(std::move(af_packet_completed)),
+ m_afpacket_handler([](std::vector<uint8_t>&& /*ignore*/){})
{
}
@@ -278,7 +277,6 @@ void TagDispatcher::setMaxDelay(int num_af_packets)
}
-#define AFPACKET_HEADER_LEN 10 // includes SYNC
TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(
const std::vector<uint8_t> &input_data)
{
@@ -341,25 +339,30 @@ TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(
return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};
}
else {
+ vector<uint8_t> afpacket(AFPACKET_HEADER_LEN + taglength + crclen);
+ copy(input_data.begin(),
+ input_data.begin() + AFPACKET_HEADER_LEN + taglength + crclen,
+ afpacket.begin());
+ m_afpacket_handler(std::move(afpacket));
+
vector<uint8_t> payload(taglength);
copy(input_data.begin() + AFPACKET_HEADER_LEN,
input_data.begin() + AFPACKET_HEADER_LEN + taglength,
payload.begin());
- return {
- decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error,
- AFPACKET_HEADER_LEN + taglength + crclen};
+ auto result = decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error;
+ return {result, AFPACKET_HEADER_LEN + taglength + crclen};
}
}
void TagDispatcher::register_tag(const std::string& tag, tag_handler&& h)
{
- m_handlers[tag] = move(h);
+ m_handlers[tag] = std::move(h);
}
-void TagDispatcher::register_tagpacket_handler(tagpacket_handler&& h)
+void TagDispatcher::register_afpacket_handler(afpacket_handler&& h)
{
- m_tagpacket_handler = move(h);
+ m_afpacket_handler = std::move(h);
}
@@ -428,8 +431,6 @@ bool TagDispatcher::decode_tagpacket(const vector<uint8_t> &payload)
}
}
- m_tagpacket_handler(payload);
-
return success;
}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index c3e6c40..fdd7424 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -32,12 +32,14 @@
namespace EdiDecoder {
+constexpr size_t AFPACKET_HEADER_LEN = 10; // includes SYNC
+
struct frame_timestamp_t {
uint32_t seconds = 0;
uint32_t utco = 0;
uint32_t tsta = 0xFFFFFF; // According to EN 300 797 Annex B
- bool valid() const;
+ bool is_valid() const;
std::string to_string() const;
std::time_t to_unix_epoch() const;
std::chrono::system_clock::time_point to_system_clock() const;
@@ -133,9 +135,9 @@ class TagDispatcher {
*/
void register_tag(const std::string& tag, tag_handler&& h);
- /* The complete tagpacket can also be retrieved */
- using tagpacket_handler = std::function<void(const std::vector<uint8_t>&)>;
- void register_tagpacket_handler(tagpacket_handler&& h);
+ /* The complete AF packet can also be retrieved */
+ using afpacket_handler = std::function<void(std::vector<uint8_t>&&)>;
+ void register_afpacket_handler(afpacket_handler&& h);
seq_info_t get_seq_info() const {
return m_last_sequences;
@@ -160,7 +162,7 @@ class TagDispatcher {
std::vector<uint8_t> m_input_data;
std::map<std::string, tag_handler> m_handlers;
std::function<void()> m_af_packet_completed;
- tagpacket_handler m_tagpacket_handler;
+ afpacket_handler m_afpacket_handler;
std::vector<std::string> m_ignored_tags;
};