summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/dabInputZmq.cpp71
-rw-r--r--src/dabInputZmq.h34
2 files changed, 90 insertions, 15 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index 8bd351a..ab1aca1 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -7,6 +7,10 @@
ZeroMQ input. see www.zeromq.org for more info
+ For the AAC+ input, each zeromq message must contain one superframe.
+
+ For the MPEG input, each zeromq message must contain one frame.
+
From the ZeroMQ manpage 'zmq':
The 0MQ lightweight messaging kernel is a library which extends the standard
@@ -98,10 +102,8 @@ int DabInputZmqBase::setBitrate(int bitrate)
return bitrate; // TODO do a nice check here
}
-/******** AAC+ input ******/
-
// size corresponds to a frame size. It is constant for a given bitrate
-int DabInputZmqAAC::readFrame(void* buffer, int size)
+int DabInputZmqBase::readFrame(void* buffer, int size)
{
int rc;
@@ -139,6 +141,8 @@ int DabInputZmqAAC::readFrame(void* buffer, int size)
* vs. AAC superframe alignment is preserved.
*
* TODO: of course this assumption probably doesn't hold. Fix this !
+ * TODO: also, with MPEG, the above doesn't hold, so we drop five
+ * frames even though we could drop less.
* */
m_frame_buffer.pop_front();
m_frame_buffer.pop_front();
@@ -186,8 +190,11 @@ int DabInputZmqAAC::readFrame(void* buffer, int size)
}
}
-// Read a superframe from the socket, cut it into five frames, and push to list
-int DabInputZmqAAC::readFromSocket(int framesize)
+
+/******** MPEG input *******/
+
+// Read a MPEG frame from the socket, and push to list
+int DabInputZmqMPEG::readFromSocket(int framesize)
{
int rc;
bool messageReceived;
@@ -202,12 +209,64 @@ int DabInputZmqAAC::readFromSocket(int framesize)
}
catch (zmq::error_t& err)
{
- etiLog.level(error) << "Failed to receive from zmq socket " <<
+ etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " <<
m_name << ": " << err.what();
}
char* data = (char*)msg.data();
+ if (msg.size() == framesize)
+ {
+ if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) {
+ etiLog.level(warn) <<
+ "inputZMQ " << m_name <<
+ " buffer full (" << m_frame_buffer.size() << "),"
+ " dropping incoming frame !";
+ messageReceived = 0;
+ }
+ else {
+ // copy the input frame blockwise into the frame_buffer
+ char* frame = new char[framesize];
+ memcpy(frame, data, framesize);
+ m_frame_buffer.push_back(frame);
+ }
+ }
+ else {
+ etiLog.level(error) <<
+ "inputZMQ " << m_name <<
+ " wrong data size: recv'd " << msg.size() <<
+ ", need " << framesize << ".";
+ }
+
+ return msg.size();
+}
+
+/******** AAC+ input *******/
+
+// Read a AAC+ superframe from the socket, cut it into five frames,
+// and push to list
+int DabInputZmqAAC::readFromSocket(int framesize)
+{
+ int rc;
+ bool messageReceived;
+ zmq::message_t msg;
+
+ try {
+ messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT);
+ if (!messageReceived) {
+ return 0;
+ }
+
+ }
+ catch (zmq::error_t& err)
+ {
+ etiLog.level(error) <<
+ "Failed to receive AAC superframe from zmq socket " <<
+ m_name << ": " << err.what();
+ }
+
+ char* data = (char*)msg.data();
+
/* TS 102 563, Section 6:
* Audio super frames are transported in five successive DAB logical frames
* with additional error protection.
diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h
index 97dd911..e3d6153 100644
--- a/src/dabInputZmq.h
+++ b/src/dabInputZmq.h
@@ -7,6 +7,10 @@
ZeroMQ input. see www.zeromq.org for more info
+ For the AAC+ input, each zeromq message must contain one superframe.
+
+ For the MPEG input, each zeromq message must contain one frame.
+
From the ZeroMQ manpage 'zmq':
The 0MQ lightweight messaging kernel is a library which extends the standard
@@ -66,10 +70,11 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
: RemoteControllable(name),
m_name(name), m_zmq_context(1),
m_zmq_sock(m_zmq_context, ZMQ_SUB),
- m_bitrate(0) { }
+ m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING) {
+ }
virtual int open(const std::string inputUri);
- virtual int readFrame(void* buffer, int size) = 0;
+ virtual int readFrame(void* buffer, int size);
virtual int setBitrate(int bitrate);
virtual int close();
@@ -80,27 +85,38 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable {
virtual string get_parameter(string parameter);
protected:
+ virtual int readFromSocket(int framesize) = 0;
+
std::string m_name;
zmq::context_t m_zmq_context;
zmq::socket_t m_zmq_sock; // handle for the zmq socket
int m_bitrate;
+ int m_prebuffering;
+ std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>]
+};
+
+class DabInputZmqMPEG : public DabInputZmqBase {
+ public:
+ DabInputZmqMPEG(const std::string name)
+ : DabInputZmqBase("MPEG " + name) {
+ RC_ADD_PARAMETER(buffer,
+ "Size of the input buffer [mpeg frames]");
+ }
+
+ private:
+ virtual int readFromSocket(int framesize);
};
class DabInputZmqAAC : public DabInputZmqBase {
public:
DabInputZmqAAC(const std::string name)
- : DabInputZmqBase(name),
- m_prebuffering(INPUT_ZMQ_PREBUFFERING) {
+ : DabInputZmqBase("AAC+ " + name) {
RC_ADD_PARAMETER(buffer,
"Size of the input buffer [aac superframes]");
}
- virtual int readFrame(void* buffer, int size);
private:
- int readFromSocket(int framesize);
-
- int m_prebuffering;
- std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>]
+ virtual int readFromSocket(int framesize);
};
#endif // HAVE_INPUT_ZMQ