aboutsummaryrefslogtreecommitdiffstats
path: root/src/input/Udp.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/input/Udp.cpp')
-rw-r--r--src/input/Udp.cpp89
1 files changed, 34 insertions, 55 deletions
diff --git a/src/input/Udp.cpp b/src/input/Udp.cpp
index 2cb49e7..a37ee21 100644
--- a/src/input/Udp.cpp
+++ b/src/input/Udp.cpp
@@ -38,7 +38,7 @@ using namespace std;
namespace Inputs {
-int Udp::open(const std::string& name)
+void Udp::open(const std::string& name)
{
// Skip the udp:// part if it is present
const string endpoint = (name.substr(0, 6) == "udp://") ?
@@ -57,8 +57,6 @@ int Udp::open(const std::string& name)
m_name = name;
openUdpSocket(endpoint);
-
- return 0;
}
void Udp::openUdpSocket(const std::string& endpoint)
@@ -82,61 +80,50 @@ void Udp::openUdpSocket(const std::string& endpoint)
throw out_of_range("can't use port number 0 in udp address");
}
- if (m_sock.reinit(port, address) == -1) {
- stringstream ss;
- ss << "Could not init UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (m_sock.setBlocking(false) == -1) {
- stringstream ss;
- ss << "Could not set non-blocking UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ m_sock.reinit(port, address);
+ m_sock.setBlocking(false);
etiLog.level(info) << "Opened UDP port " << address << ":" << port;
}
-int Udp::readFrame(uint8_t* buffer, size_t size)
+size_t Udp::readFrame(uint8_t *buffer, size_t size)
{
// Regardless of buffer contents, try receiving data.
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
-
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
+ auto packet = m_sock.receive(32768);
- std::copy(packet.getData(), packet.getData() + packet.getSize(),
- back_inserter(m_buffer));
+ std::copy(packet.buffer.cbegin(), packet.buffer.cend(), back_inserter(m_buffer));
// Take data from the buffer if it contains enough data,
// in any case write the buffer
if (m_buffer.size() >= (size_t)size) {
std::copy(m_buffer.begin(), m_buffer.begin() + size, buffer);
+ return size;
}
else {
memset(buffer, 0x0, size);
+ return 0;
}
+}
- return size;
+size_t Udp::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ // Maybe there's a way to carry timestamps, but we don't need it.
+ memset(buffer, 0x0, size);
+ return 0;
}
int Udp::setBitrate(int bitrate)
{
if (bitrate <= 0) {
- etiLog.log(error, "Invalid bitrate (%i)\n", bitrate);
- return -1;
+ throw invalid_argument("Invalid bitrate " + to_string(bitrate) + " for " + m_name);
}
return bitrate;
}
-int Udp::close()
+void Udp::close()
{
- return m_sock.close();
+ m_sock.close();
}
@@ -165,10 +152,10 @@ static uint16_t unpack2(const uint8_t *buf)
return (((uint16_t)buf[0]) << 8) | buf[1];
}
-int Sti_d_Rtp::open(const std::string& name)
+void Sti_d_Rtp::open(const std::string& name)
{
- // Skip the sti-rtp:// part if it is present
- const string endpoint = (name.substr(0, 10) == "sti-rtp://") ?
+ // Skip the rtp:// part if it is present
+ const string endpoint = (name.substr(0, 10) == "rtp://") ?
name.substr(10) : name;
// The endpoint should be address:port
@@ -176,43 +163,34 @@ int Sti_d_Rtp::open(const std::string& name)
if (colon_pos == string::npos) {
stringstream ss;
ss << "'" << name <<
- " is an invalid format for sti-rtp address: "
- "expected [sti-rtp://]address:port";
+ " is an invalid format for rtp address: "
+ "expected [rtp://]address:port";
throw invalid_argument(ss.str());
}
m_name = name;
openUdpSocket(endpoint);
-
- return 0;
}
void Sti_d_Rtp::receive_packet()
{
- UdpPacket packet(32768);
- int ret = m_sock.receive(packet);
+ auto packet = m_sock.receive(32768);
- if (ret == -1) {
- stringstream ss;
- ss << "Could not read from UDP socket: " << inetErrMsg;
- throw runtime_error(ss.str());
- }
-
- if (packet.getSize() == 0) {
+ if (packet.buffer.empty()) {
// No packet was received
return;
}
const size_t STI_FC_LEN = 8;
- if (packet.getSize() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
+ if (packet.buffer.size() < RTP_HEADER_LEN + STI_SYNC_LEN + STI_FC_LEN) {
etiLog.level(info) << "Received too small RTP packet for " <<
m_name;
return;
}
- if (not rtpHeaderValid(packet.getData())) {
+ if (not rtpHeaderValid(packet.buffer.data())) {
etiLog.level(info) << "Received invalid RTP header for " <<
m_name;
return;
@@ -220,7 +198,7 @@ void Sti_d_Rtp::receive_packet()
// STI(PI, X)
size_t index = RTP_HEADER_LEN;
- const uint8_t *buf = packet.getData();
+ const uint8_t *buf = packet.buffer.data();
// SYNC
index++; // Advance over STAT
@@ -242,7 +220,7 @@ void Sti_d_Rtp::receive_packet()
m_name;
return;
}
- if (packet.getSize() < index + DFS) {
+ if (packet.buffer.size() < index + DFS) {
etiLog.level(info) << "Received STI too small for given DFS for " <<
m_name;
return;
@@ -270,9 +248,9 @@ void Sti_d_Rtp::receive_packet()
uint16_t NST = unpack2(buf+index) & 0x7FF; // 11 bits
index += 2;
- if (packet.getSize() < index + 4*NST) {
+ if (packet.buffer.size() < index + 4*NST) {
etiLog.level(info) << "Received STI too small to contain NST for " <<
- m_name << " packet: " << packet.getSize() << " need " <<
+ m_name << " packet: " << packet.buffer.size() << " need " <<
index + 4*NST;
return;
}
@@ -307,7 +285,7 @@ void Sti_d_Rtp::receive_packet()
}
}
-int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
+size_t Sti_d_Rtp::readFrame(uint8_t *buffer, size_t size)
{
// Make sure we fill faster than we consume in case there
// are pending packets.
@@ -316,19 +294,20 @@ int Sti_d_Rtp::readFrame(uint8_t* buffer, size_t size)
if (m_queue.empty()) {
memset(buffer, 0x0, size);
+ return 0;
}
else if (m_queue.front().size() != size) {
etiLog.level(warn) << "Invalid input data size for STI " << m_name <<
" : RX " << m_queue.front().size() << " expected " << size;
memset(buffer, 0x0, size);
m_queue.pop_front();
+ return 0;
}
else {
copy(m_queue.front().begin(), m_queue.front().end(), buffer);
m_queue.pop_front();
+ return size;
}
-
- return 0;
}
}