summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2019-09-23 14:44:45 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2019-09-23 14:44:45 +0200
commit907534bccee046c0499e9f936873c229b247ef95 (patch)
tree07f6995d0c8614f80a920f86660809b2fbd79b2b /src
parent015427d9e74f34dc7d0f7fbad4ad1eaad6537cce (diff)
downloaddabmux-907534bccee046c0499e9f936873c229b247ef95.tar.gz
dabmux-907534bccee046c0499e9f936873c229b247ef95.tar.bz2
dabmux-907534bccee046c0499e9f936873c229b247ef95.zip
Add buffermanagement setting to RC
Diffstat (limited to 'src')
-rw-r--r--src/ConfigParser.cpp8
-rw-r--r--src/DabMultiplexer.cpp12
-rw-r--r--src/MuxElements.cpp11
-rw-r--r--src/MuxElements.h12
-rw-r--r--src/input/Edi.cpp91
-rw-r--r--src/input/Edi.h25
-rw-r--r--src/input/inputs.h15
7 files changed, 130 insertions, 44 deletions
diff --git a/src/ConfigParser.cpp b/src/ConfigParser.cpp
index 2fb375b..d45cfef 100644
--- a/src/ConfigParser.cpp
+++ b/src/ConfigParser.cpp
@@ -956,7 +956,9 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
}
}
else if (proto == "edi") {
- subchan->input = make_shared<Inputs::Edi>(subchanuid);
+ auto inedi = make_shared<Inputs::Edi>(subchanuid);
+ rcs.enrol(inedi.get());
+ subchan->input = inedi;
}
else if (proto == "stp") {
subchan->input = make_shared<Inputs::Sti_d_Rtp>();
@@ -1023,10 +1025,10 @@ static void setup_subchannel_from_ptree(shared_ptr<DabSubchannel>& subchan,
const string bufferManagement = pt.get("buffer-management", "prebuffering");
if (bufferManagement == "prebuffering") {
- subchan->bufferManagement = BufferManagement::Prebuffering;
+ subchan->input->setBufferManagement(Inputs::BufferManagement::Prebuffering);
}
else if (bufferManagement == "timestamped") {
- subchan->bufferManagement = BufferManagement::Timestamped;
+ subchan->input->setBufferManagement(Inputs::BufferManagement::Timestamped);
}
else {
throw runtime_error("Subchannel with uid " + subchanuid + " has invalid buffer-management !");
diff --git a/src/DabMultiplexer.cpp b/src/DabMultiplexer.cpp
index 549850a..9bd3c3f 100644
--- a/src/DabMultiplexer.cpp
+++ b/src/DabMultiplexer.cpp
@@ -592,17 +592,9 @@ void DabMultiplexer::mux_frame(std::vector<std::shared_ptr<DabOutput> >& outputs
edi::TagESTn& tag = edi_subchannelToTag[subchannel.get()];
int sizeSubchannel = subchannel->getSizeByte();
- int result = -1;
- switch (subchannel->bufferManagement) {
- case BufferManagement::Prebuffering:
- result = subchannel->input->readFrame(&etiFrame[index], sizeSubchannel);
- break;
- case BufferManagement::Timestamped:
- // no need to check enableTist because we always increment the timestamp
- result = subchannel->input->readFrame(&etiFrame[index],
+ // no need to check enableTist because we always increment the timestamp
+ int result = subchannel->readFrame(&etiFrame[index],
sizeSubchannel, edi_time + m_tist_offset, tai_utc_offset, timestamp);
- break;
- }
if (result < 0) {
etiLog.log(info,
diff --git a/src/MuxElements.cpp b/src/MuxElements.cpp
index ad1fcd4..81466a8 100644
--- a/src/MuxElements.cpp
+++ b/src/MuxElements.cpp
@@ -784,6 +784,17 @@ unsigned short DabSubchannel::getSizeDWord() const
return (bitrate * 3) >> 3;
}
+size_t DabSubchannel::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta)
+{
+ switch (input->getBufferManagement()) {
+ case Inputs::BufferManagement::Prebuffering:
+ return input->readFrame(buffer, size);
+ case Inputs::BufferManagement::Timestamped:
+ return input->readFrame(buffer, size, seconds, utco, tsta);
+ }
+ throw logic_error("Unhandled case");
+}
+
LinkageSet::LinkageSet(const std::string& name,
uint16_t lsn,
bool active,
diff --git a/src/MuxElements.h b/src/MuxElements.h
index f691093..0f7e621 100644
--- a/src/MuxElements.h
+++ b/src/MuxElements.h
@@ -338,14 +338,6 @@ struct dabProtection {
};
};
-enum class BufferManagement {
- // Use a buffer in the input that doesn't consider timestamps
- Prebuffering,
-
- // Buffer incoming data until a given timestamp is reached
- Timestamped,
-};
-
class DabSubchannel
{
public:
@@ -364,10 +356,12 @@ public:
// Calculate subchannel size in number of uint64_t
unsigned short getSizeDWord(void) const;
+ // Read from the input, using the correct buffer management
+ size_t readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utco, uint32_t tsta);
+
std::string uid;
std::string inputUri;
- BufferManagement bufferManagement = BufferManagement::Prebuffering;
std::shared_ptr<Inputs::InputBase> input;
unsigned char id = 0;
subchannel_type_t type = subchannel_type_t::DABAudio;
diff --git a/src/input/Edi.cpp b/src/input/Edi.cpp
index f6d4c07..98e8e9c 100644
--- a/src/input/Edi.cpp
+++ b/src/input/Edi.cpp
@@ -44,27 +44,23 @@ namespace Inputs {
constexpr bool VERBOSE = false;
constexpr size_t TCP_BLOCKSIZE = 2048;
-/* Absolute max number of frames to be queued, both with and without timestamping */
-constexpr size_t MAX_FRAMES_QUEUED = 1000;
-
-/* When using timestamping, start discarding the front of the queue once the queue
- * is this full. Must be smaller than MAX_FRAMES_QUEUED. */
-constexpr size_t MAX_FRAMES_QUEUED_PREBUFFERING = 500;
-
-/* When not using timestamping, how many frames to prebuffer.
- * TODO should be configurable as ZMQ. */
-constexpr size_t NUM_FRAMES_PREBUFFERING = 10;
-/* Consider the buffer to be full on the receive side if it's above the overrun threshold. */
-constexpr size_t MAX_FRAMES_OVERRUN = 900;
-
-
Edi::Edi(const std::string& name) :
+ RemoteControllable(name),
m_tcp_receive_server(TCP_BLOCKSIZE),
m_sti_writer(),
m_sti_decoder(m_sti_writer, VERBOSE),
m_name(name),
m_stats(name)
-{ }
+{
+ RC_ADD_PARAMETER(buffermanagement,
+ "Set type of buffer management to use [prebuffering, timestamped]");
+
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [24ms frames]");
+
+ RC_ADD_PARAMETER(prebuffering,
+ "Min buffer level before streaming starts [24ms frames]");
+}
Edi::~Edi() {
m_running = false;
@@ -116,7 +112,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
EdiDecoder::sti_frame_t sti;
if (m_is_prebuffering) {
- m_is_prebuffering = m_frames.size() < NUM_FRAMES_PREBUFFERING;
+ m_is_prebuffering = m_frames.size() < m_num_frames_prebuffering;
if (not m_is_prebuffering) {
etiLog.level(info) << "EDI input " << m_name << " pre-buffering complete.";
}
@@ -153,13 +149,13 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size)
}
else if (sti.frame.size() == size) {
// Steady-state when everything works well
- if (m_frames.size() > MAX_FRAMES_OVERRUN) {
+ if (m_frames.size() > m_max_frames_overrun) {
m_stats.notifyOverrun();
/* If the buffer is too full, we drop as many frames as needed
* to get down to the prebuffering size. We would like to have our buffer
* filled to the prebuffering length. */
- size_t over_max = m_frames.size() - NUM_FRAMES_PREBUFFERING;
+ size_t over_max = m_frames.size() - m_num_frames_prebuffering;
while (over_max--) {
EdiDecoder::sti_frame_t discard;
@@ -234,7 +230,7 @@ size_t Edi::readFrame(uint8_t *buffer, size_t size, std::time_t seconds, int utc
else {
// Wait more, but erase the front of the frame queue to avoid
// stalling on one frame with incorrect timestamp
- if (m_frames.size() >= MAX_FRAMES_QUEUED_PREBUFFERING) {
+ if (m_frames.size() >= m_max_frames_overrun) {
m_pending_sti_frame.frame.clear();
}
m_stats.notifyUnderrun();
@@ -338,7 +334,10 @@ void Edi::m_run()
const auto sti = m_sti_writer.getFrame();
if (not sti.frame.empty()) {
- m_frames.push_wait_if_full(move(sti), MAX_FRAMES_QUEUED);
+ // We should not wait here, because we want the complete input buffering
+ // happening inside m_frames. Using the blocking function is only a protection
+ // against runaway memory usage if something goes wrong in the consumer.
+ m_frames.push_wait_if_full(move(sti), m_max_frames_overrun * 2);
work_done = true;
}
@@ -363,4 +362,56 @@ void Edi::close()
m_udp_sock.close();
}
+
+void Edi::set_parameter(const std::string& parameter, const std::string& value)
+{
+ if (parameter == "buffer") {
+ size_t new_limit = atol(value.c_str());
+ m_max_frames_overrun = new_limit;
+ }
+ else if (parameter == "prebuffering") {
+ size_t new_limit = atol(value.c_str());
+ m_num_frames_prebuffering = new_limit;
+ }
+ else if (parameter == "buffermanagement") {
+ if (value == "prebuffering") {
+ setBufferManagement(Inputs::BufferManagement::Prebuffering);
+ }
+ else if (value == "timestamped") {
+ setBufferManagement(Inputs::BufferManagement::Timestamped);
+ }
+ else {
+ throw ParameterError("Invalid value for '" + parameter + "' in controllable " + get_rc_name());
+ }
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+}
+
+const std::string Edi::get_parameter(const std::string& parameter) const
+{
+ stringstream ss;
+ if (parameter == "buffer") {
+ ss << m_max_frames_overrun;
+ }
+ else if (parameter == "prebuffering") {
+ ss << m_num_frames_prebuffering;
+ }
+ else if (parameter == "buffermanagement") {
+ switch (getBufferManagement()) {
+ case Inputs::BufferManagement::Prebuffering:
+ ss << "prebuffering";
+ break;
+ case Inputs::BufferManagement::Timestamped:
+ ss << "Timestamped";
+ break;
+ }
+ }
+ else {
+ throw ParameterError("Parameter '" + parameter + "' is not exported by controllable " + get_rc_name());
+ }
+ return ss.str();
+}
+
}
diff --git a/src/input/Edi.h b/src/input/Edi.h
index 56bbf65..0a44139 100644
--- a/src/input/Edi.h
+++ b/src/input/Edi.h
@@ -46,7 +46,7 @@ namespace Inputs {
*
* This way, the EDI decoding happens in a separate thread.
*/
-class Edi : public InputBase {
+class Edi : public InputBase, public RemoteControllable {
public:
Edi(const std::string& name);
Edi(const Edi&) = delete;
@@ -59,6 +59,10 @@ class Edi : public InputBase {
virtual int setBitrate(int bitrate);
virtual void close();
+ /* Remote control */
+ virtual void set_parameter(const std::string& parameter, const std::string& value);
+ virtual const std::string get_parameter(const std::string& parameter) const;
+
protected:
void m_run();
@@ -75,12 +79,29 @@ class Edi : public InputBase {
std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
ThreadsafeQueue<EdiDecoder::sti_frame_t> m_frames;
+ std::mutex m_rc_params_mutex;
+ // InputBase defines bufferManagement, which must also be guarded by that mutex
+
// Used in timestamp-based buffer management
EdiDecoder::sti_frame_t m_pending_sti_frame;
- // Used in prebuffering-based buffer management
+ // State variable used in prebuffering-based buffer management
bool m_is_prebuffering = true;
+ /* When using prebuffering, consider the buffer to be full on the
+ * receive side if it's above the overrun threshold.
+ *
+ * When using timestamping, start discarding the front of the queue once the queue
+ * is this full. Must be smaller than m_max_frames_queued.
+ *
+ * Parameter 'buffer' inside RC. */
+ std::atomic<size_t> m_max_frames_overrun = ATOMIC_VAR_INIT(1000);
+
+ /* When not using timestamping, how many frames to prebuffer.
+ * Parameter 'prebuffering' inside RC. */
+ std::atomic<size_t> m_num_frames_prebuffering = ATOMIC_VAR_INIT(10);
+
+
std::string m_name;
InputStat m_stats;
};
diff --git a/src/input/inputs.h b/src/input/inputs.h
index b99a88f..5d4fc60 100644
--- a/src/input/inputs.h
+++ b/src/input/inputs.h
@@ -35,6 +35,15 @@
namespace Inputs {
+enum class BufferManagement {
+ // Use a buffer in the input that doesn't consider timestamps
+ Prebuffering,
+
+ // Buffer incoming data until a given timestamp is reached
+ Timestamped,
+};
+
+
/* New input object base */
class InputBase {
public:
@@ -73,8 +82,14 @@ class InputBase {
virtual void close() = 0;
virtual ~InputBase() {}
+
+ void setBufferManagement(BufferManagement bm) { m_bufferManagement = bm; }
+ BufferManagement getBufferManagement() const { return m_bufferManagement; }
+
protected:
InputBase() {}
+
+ std::atomic<BufferManagement> m_bufferManagement = ATOMIC_VAR_INIT(BufferManagement::Prebuffering);
};
};