diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/dabInputZmq.cpp | 71 | ||||
-rw-r--r-- | src/dabInputZmq.h | 34 |
2 files changed, 90 insertions, 15 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp index 8bd351a..ab1aca1 100644 --- a/src/dabInputZmq.cpp +++ b/src/dabInputZmq.cpp @@ -7,6 +7,10 @@ ZeroMQ input. see www.zeromq.org for more info + For the AAC+ input, each zeromq message must contain one superframe. + + For the MPEG input, each zeromq message must contain one frame. + From the ZeroMQ manpage 'zmq': The 0MQ lightweight messaging kernel is a library which extends the standard @@ -98,10 +102,8 @@ int DabInputZmqBase::setBitrate(int bitrate) return bitrate; // TODO do a nice check here } -/******** AAC+ input ******/ - // size corresponds to a frame size. It is constant for a given bitrate -int DabInputZmqAAC::readFrame(void* buffer, int size) +int DabInputZmqBase::readFrame(void* buffer, int size) { int rc; @@ -139,6 +141,8 @@ int DabInputZmqAAC::readFrame(void* buffer, int size) * vs. AAC superframe alignment is preserved. * * TODO: of course this assumption probably doesn't hold. Fix this ! + * TODO: also, with MPEG, the above doesn't hold, so we drop five + * frames even though we could drop less. * */ m_frame_buffer.pop_front(); m_frame_buffer.pop_front(); @@ -186,8 +190,11 @@ int DabInputZmqAAC::readFrame(void* buffer, int size) } } -// Read a superframe from the socket, cut it into five frames, and push to list -int DabInputZmqAAC::readFromSocket(int framesize) + +/******** MPEG input *******/ + +// Read a MPEG frame from the socket, and push to list +int DabInputZmqMPEG::readFromSocket(int framesize) { int rc; bool messageReceived; @@ -202,12 +209,64 @@ int DabInputZmqAAC::readFromSocket(int framesize) } catch (zmq::error_t& err) { - etiLog.level(error) << "Failed to receive from zmq socket " << + etiLog.level(error) << "Failed to receive MPEG frame from zmq socket " << m_name << ": " << err.what(); } char* data = (char*)msg.data(); + if (msg.size() == framesize) + { + if (m_frame_buffer.size() > INPUT_ZMQ_MAX_BUFFER_SIZE) { + etiLog.level(warn) << + "inputZMQ " << m_name << + " buffer full (" << m_frame_buffer.size() << ")," + " dropping incoming frame !"; + messageReceived = 0; + } + else { + // copy the input frame blockwise into the frame_buffer + char* frame = new char[framesize]; + memcpy(frame, data, framesize); + m_frame_buffer.push_back(frame); + } + } + else { + etiLog.level(error) << + "inputZMQ " << m_name << + " wrong data size: recv'd " << msg.size() << + ", need " << framesize << "."; + } + + return msg.size(); +} + +/******** AAC+ input *******/ + +// Read a AAC+ superframe from the socket, cut it into five frames, +// and push to list +int DabInputZmqAAC::readFromSocket(int framesize) +{ + int rc; + bool messageReceived; + zmq::message_t msg; + + try { + messageReceived = m_zmq_sock.recv(&msg, ZMQ_DONTWAIT); + if (!messageReceived) { + return 0; + } + + } + catch (zmq::error_t& err) + { + etiLog.level(error) << + "Failed to receive AAC superframe from zmq socket " << + m_name << ": " << err.what(); + } + + char* data = (char*)msg.data(); + /* TS 102 563, Section 6: * Audio super frames are transported in five successive DAB logical frames * with additional error protection. diff --git a/src/dabInputZmq.h b/src/dabInputZmq.h index 97dd911..e3d6153 100644 --- a/src/dabInputZmq.h +++ b/src/dabInputZmq.h @@ -7,6 +7,10 @@ ZeroMQ input. see www.zeromq.org for more info + For the AAC+ input, each zeromq message must contain one superframe. + + For the MPEG input, each zeromq message must contain one frame. + From the ZeroMQ manpage 'zmq': The 0MQ lightweight messaging kernel is a library which extends the standard @@ -66,10 +70,11 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { : RemoteControllable(name), m_name(name), m_zmq_context(1), m_zmq_sock(m_zmq_context, ZMQ_SUB), - m_bitrate(0) { } + m_bitrate(0), m_prebuffering(INPUT_ZMQ_PREBUFFERING) { + } virtual int open(const std::string inputUri); - virtual int readFrame(void* buffer, int size) = 0; + virtual int readFrame(void* buffer, int size); virtual int setBitrate(int bitrate); virtual int close(); @@ -80,27 +85,38 @@ class DabInputZmqBase : public DabInputBase, public RemoteControllable { virtual string get_parameter(string parameter); protected: + virtual int readFromSocket(int framesize) = 0; + std::string m_name; zmq::context_t m_zmq_context; zmq::socket_t m_zmq_sock; // handle for the zmq socket int m_bitrate; + int m_prebuffering; + std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>] +}; + +class DabInputZmqMPEG : public DabInputZmqBase { + public: + DabInputZmqMPEG(const std::string name) + : DabInputZmqBase("MPEG " + name) { + RC_ADD_PARAMETER(buffer, + "Size of the input buffer [mpeg frames]"); + } + + private: + virtual int readFromSocket(int framesize); }; class DabInputZmqAAC : public DabInputZmqBase { public: DabInputZmqAAC(const std::string name) - : DabInputZmqBase(name), - m_prebuffering(INPUT_ZMQ_PREBUFFERING) { + : DabInputZmqBase("AAC+ " + name) { RC_ADD_PARAMETER(buffer, "Size of the input buffer [aac superframes]"); } - virtual int readFrame(void* buffer, int size); private: - int readFromSocket(int framesize); - - int m_prebuffering; - std::list<char*> m_frame_buffer; //stores elements of type char[<framesize>] + virtual int readFromSocket(int framesize); }; #endif // HAVE_INPUT_ZMQ |