diff options
Diffstat (limited to 'src/AVTInput.cpp')
-rw-r--r-- | src/AVTInput.cpp | 401 |
1 files changed, 130 insertions, 271 deletions
diff --git a/src/AVTInput.cpp b/src/AVTInput.cpp index ce65041..f59cddb 100644 --- a/src/AVTInput.cpp +++ b/src/AVTInput.cpp @@ -1,5 +1,6 @@ /* ------------------------------------------------------------------ * Copyright (C) 2017 AVT GmbH - Fabien Vercasson + * Copyright (C) 2019 Matthias P. Braendli * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,9 +24,6 @@ #include <limits.h> #include <algorithm> -#include "UdpSocket.h" -#include "OrderedQueue.h" -#include "AVTEDIInput.h" //#define PRINTF(fmt, A...) fprintf(stderr, fmt, ##A) #define PRINTF(x ...) @@ -34,161 +32,71 @@ #define DEBUG(X...) #define ERROR(fmt, A...) fprintf(stderr, "AVT: ERROR " fmt, ##A) -#define DEF_BR 64 #define MAX_AVT_FRAME_SIZE (1500) /* Max AVT MTU = 1472 */ #define MAX_PAD_FRAME_QUEUE_SIZE (6) -//#define DISTURB_INPUT +//#define DISTURB_INPUT // ETSI EN 300 797 V1.2.1 ch 8.2.1.2 uint8_t STI_FSync0[3] = { 0x1F, 0x90, 0xCA }; uint8_t STI_FSync1[3] = { 0xE0, 0x6F, 0x35 }; -// The enum values folown the AVT messages definitions. -enum { - AVT_Mono = 0, - AVT_Mono_SBR, - AVT_Stereo, - AVT_Stereo_SBR, - AVT_Stereo_SBR_PS -}; - -enum { - AVT_MonoMode_LR2 = 0, - AVT_MonoMode_L, - AVT_MonoMode_R -}; - -enum { - AVT_DAC_32 = 0, - AVT_DAC_48 -}; - -/* ------------------------------------------------------------------ - * - */ static void _dump(const uint8_t* buf, int size) { - for( int i = 0 ; i < size ; i ++) - { + for (int i = 0 ; i < size ; i++) { PRINTF("%02X ", buf[i]); if( (i+1) % 16 == 0 ) PRINTF("\n"); } - if( size % 16 != 0 ) PRINTF("\n"); + if (size % 16 != 0 ) PRINTF("\n"); } -/* ------------------------------------------------------------------ - * - */ static uint32_t unpack2(const uint8_t* buf) { - return( buf[0] << 8 | - buf[1]); + return (buf[0] << 8) | buf[1]; } -/* ------------------------------------------------------------------ - * - */ AVTInput::AVTInput(const std::string& input_uri, const std::string& output_uri, uint32_t pad_port, size_t jitterBufferSize) : _input_uri(input_uri), _output_uri(output_uri), _pad_port(pad_port), _jitterBufferSize(jitterBufferSize), - _input_socket(NULL), - _input_packet(NULL), - _output_socket(NULL), - _output_packet(NULL), - _input_pad_socket(NULL), - _input_pad_packet(NULL), - _ediInput(NULL), - _ordered(NULL), - _subChannelIndex(DEF_BR/8), - _bitRate(DEF_BR*1000), - _audioMode(AVT_Mono), - _monoMode(AVT_MonoMode_LR2), - _dac(AVT_DAC_48), - _dab24msFrameSize(DEF_BR*3), - _dummyFrameNumber(0), - _frameAlligned(false), - _currentFrame(NULL), - _currentFrameSize(0), - _nbFrames(0), - _nextFrameIndex(0), - _lastInfoFrameType(_typeCantExtract), - _lastInfoSize(0), - _infoNbFrame(0) -{ -} - -/* ------------------------------------------------------------------ - * - */ -AVTInput::~AVTInput() + _output_packet(2048), + _input_pad_packet(2048), + _ordered(5000, _jitterBufferSize), + _lastInfoFrameType(_typeCantExtract) { - delete _input_packet; - delete _input_socket; - delete _output_packet; - delete _output_socket; - delete _input_pad_packet; - delete _input_pad_socket; - delete _ediInput; - delete [] _currentFrame; - delete _ordered; - while (_padFrameQueue.size() > 0) { - std::vector<uint8_t>* frame = _padFrameQueue.front(); - _padFrameQueue.pop(); - delete frame; - } + } -/* ------------------------------------------------------------------ - * - */ int AVTInput::prepare(void) -{ - _input_socket = new UdpSocket(); - _input_packet = new UdpPacket(2048); - - if( !_output_uri.empty() ) - { - _output_socket = new UdpSocket(); - _output_packet = new UdpPacket(2048); - } - +{ UdpSocket::init(); INFO("Open input socket\n"); - int ret = _openSocketSrv(_input_socket, _input_uri.c_str()); + int ret = _openSocketSrv(&_input_socket, _input_uri.c_str()); if (ret == 0 && !_output_uri.empty()) { INFO("Open output socket\n"); - ret = _openSocketCli(_output_socket, _output_packet, _output_uri.c_str()); + ret = _openSocketCli(); } if ( ret == 0 && _pad_port > 0) { INFO("Open PAD Port %d\n", _pad_port); char uri[50]; sprintf(uri, "udp://:%d", _pad_port); - _input_pad_socket = new UdpSocket(); - _input_pad_packet = new UdpPacket(2048); - ret = _openSocketSrv(_input_pad_socket, uri); + ret = _openSocketSrv(&_input_pad_socket, uri); _purgeMessages(); } - - _ediInput = new AVTEDIInput(_jitterBufferSize*24/3); return ret; } -/* ------------------------------------------------------------------ - * - */ int AVTInput::setDabPlusParameters(int bitrate, int channels, int sample_rate, bool sbr, bool ps) { int ret = 0; - + _subChannelIndex = bitrate / 8; _bitRate = bitrate * 1000; _dab24msFrameSize = bitrate * 3; @@ -196,40 +104,36 @@ int AVTInput::setDabPlusParameters(int bitrate, int channels, int sample_rate, b ERROR("Bad bitrate for DAB+ (8..192)"); return 1; } - + if ( sample_rate != 48000 && sample_rate != 32000 ) { ERROR("Bad sample rate for DAB+ (32000,48000)"); return 1; } _dac = sample_rate == 48000 ? AVT_DAC_48 : AVT_DAC_32; - + if ( channels != 1 && channels != 2 ) { ERROR("Bad channel number for DAB+ (1,2)"); return 1; - } - _audioMode = + } + _audioMode = channels == 1 ? (sbr ? AVT_Mono_SBR : AVT_Mono) - : ( ps ? AVT_Stereo_SBR_PS : sbr ? AVT_Stereo_SBR : AVT_Stereo ); + : ( ps ? AVT_Stereo_SBR_PS : sbr ? AVT_Stereo_SBR : AVT_Stereo ); - delete _ordered; - _ordered = new OrderedQueue(5000, _jitterBufferSize); + _ordered = OrderedQueue(5000, _jitterBufferSize); - delete [] _currentFrame; - _currentFrame = new uint8_t[_subChannelIndex*8*5*3]; + _currentFrame.clear(); + _currentFrame.resize(_subChannelIndex*8*5*3); _currentFrameSize = 0; _nbFrames = 0; - _sendCtrlMessage(_output_socket, _output_packet); + _sendCtrlMessage(); return ret; } -/* ------------------------------------------------------------------ - * - */ bool AVTInput::_parseURI(const char* uri, std::string& address, long& port) -{ +{ // Skip the udp:// part if it is present if (strncmp(uri, "udp://", 6) == 0) { address = uri + 6; @@ -237,13 +141,13 @@ bool AVTInput::_parseURI(const char* uri, std::string& address, long& port) else { address = uri; } - + size_t pos = address.find(':'); if (pos == std::string::npos) { fprintf(stderr, "\"%s\" is an invalid format for udp address: " "should be [udp://][address]:port - > aborting\n", uri); - return false; + return false; } port = strtol(address.c_str()+pos+1, (char **)NULL, 10); @@ -253,28 +157,25 @@ bool AVTInput::_parseURI(const char* uri, std::string& address, long& port) uri); return false; } - + if ((port <= 0) || (port >= 65536)) { fprintf(stderr, "can't use port number %ld in udp address\n", port); return false; } address.resize(pos); - DEBUG("_parseURI <%s> -> <%s> : %ld\n", uri, address.c_str(), port); + DEBUG("_parseURI <%s> -> <%s> : %ld\n", uri, address.c_str(), port); return true; } -/* ------------------------------------------------------------------ - * From dabInputUdp::dabInputUdpOpen - */ int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri) { int returnCode = -1; - + std::string address; long port; - + if (_parseURI(uri, address, port)) { returnCode = 0; if (socket->create(port) == -1) { @@ -306,25 +207,25 @@ int AVTInput::_openSocketSrv(UdpSocket* socket, const char* uri) /* ------------------------------------------------------------------ * From ODR-dabMux DabOutputUdp::Open */ -int AVTInput::_openSocketCli(UdpSocket* socket, UdpPacket* packet, const char* uri) +int AVTInput::_openSocketCli() { std::string address; long port; - if (!_parseURI(uri, address, port)) { + if (!_parseURI(_output_uri.c_str(), address, port)) { return -1; } - if (packet->getAddress().setAddress(address.c_str()) == -1) { + if (_output_packet.getAddress().setAddress(address.c_str()) == -1) { fprintf(stderr, "Can't set address %s (%s: %s)\n", address.c_str(), inetErrDesc, inetErrMsg); return -1; } - packet->getAddress().setPort(port); + _output_packet.getAddress().setPort(port); - if (socket->create() == -1) { - fprintf(stderr, "Can't create UDP socket (%s: %s)\n", + if (_output_socket.create() == -1) { + fprintf(stderr, "Can't create UDP socket (%s: %s)\n", inetErrDesc, inetErrMsg); return -1; } @@ -340,49 +241,35 @@ ssize_t AVTInput::_read(uint8_t* buf, size_t size, bool onlyOnePacket) ssize_t nbBytes = 0; uint8_t* data = buf; + UdpPacket _input_packet(2048); - if (_input_packet->getLength() == 0) { - _input_socket->receive(*_input_packet); + if (_input_packet.getLength() == 0) { + _input_socket.receive(_input_packet); } while (nbBytes < size) { unsigned freeSize = size - nbBytes; - if (_input_packet->getLength() > freeSize) { + if (_input_packet.getLength() > freeSize) { // Not enought place in output - memcpy(&data[nbBytes], _input_packet->getData(), freeSize); + memcpy(&data[nbBytes], _input_packet.getData(), freeSize); nbBytes = size; - _input_packet->setOffset(_input_packet->getOffset() + freeSize); - } else { - unsigned length = _input_packet->getLength(); - memcpy(&data[nbBytes], _input_packet->getData(), length); + _input_packet.setOffset(_input_packet.getOffset() + freeSize); + } + else { + unsigned length = _input_packet.getLength(); + memcpy(&data[nbBytes], _input_packet.getData(), length); nbBytes += length; - _input_packet->setOffset(0); - - _input_socket->receive(*_input_packet); - if (_input_packet->getLength() == 0 || onlyOnePacket) { + _input_packet.setOffset(0); + + _input_socket.receive(_input_packet); + if (_input_packet.getLength() == 0 || onlyOnePacket) { break; } } } bzero(&data[nbBytes], size - nbBytes); - - return nbBytes; -} - -/* ------------------------------------------------------------------ - * - */ -bool AVTInput::_ediPushData(uint8_t* buf, size_t length) -{ - return _ediInput->pushData(buf, length); -} -/* ------------------------------------------------------------------ - * - */ -size_t AVTInput::_ediPopFrame(std::vector<uint8_t>& data, int32_t& frameNumber) -{ - return _ediInput->popFrame(data, frameNumber); + return nbBytes; } /* ------------------------------------------------------------------ @@ -402,7 +289,7 @@ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size, { const uint8_t* data = NULL; uint32_t index = 0; - + bool error = !_isSTI(buf+index); bool rtp = false; @@ -426,12 +313,12 @@ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size, index += 2; //uint32_t CFS = unpack2(buf+index); index += 2; - + // FC index += 5; uint32_t DFCTL = buf[index]; index += 1; - uint32_t DFCTH = buf[index] >> 3; + uint32_t DFCTH = buf[index] >> 3; uint32_t NST = unpack2(buf+index) & 0x7FF; // 11 bits index += 2; @@ -443,14 +330,14 @@ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size, data = buf+index; dataSize = STL - 2*CRCSTF; - frameNumber = DFCTH*250 + DFCTL; - + frameNumber = DFCTH*250 + DFCTL; + _info(rtp?_typeSTIRTP:_typeSTI, dataSize); } else error = true; } if( error ) ERROR("Nothing detected\n"); - + return data; } @@ -471,25 +358,25 @@ const uint8_t* AVTInput::_findDABFrameFromUDP(const uint8_t* buf, size_t size, * * 0 = ( Left + Right ) / 2 * * 1 = Left * * 2 = Right - */ -void AVTInput::_sendCtrlMessage(UdpSocket* socket, UdpPacket* packet) + */ +void AVTInput::_sendCtrlMessage() { if (!_output_uri.empty()) { uint8_t data[50]; uint32_t index = 0; - + data[index++] = 0xFD; data[index++] = 0x07; data[index++] = _subChannelIndex; data[index++] = _audioMode; data[index++] = _dac; data[index++] = _monoMode; - - packet->setOffset(0); - packet->setLength(0); - packet->addData(data, index); - socket->send(*packet); - + + _output_packet.setOffset(0); + _output_packet.setLength(0); + _output_packet.addData(data, index); + _output_socket.send(_output_packet); + INFO("Send control packet to encoder\n"); } } @@ -506,28 +393,25 @@ void AVTInput::_sendCtrlMessage(UdpSocket* socket, UdpPacket* packet) void AVTInput::_sendPADFrame(UdpPacket* packet) { if (packet && _padFrameQueue.size() > 0) { - std::vector<uint8_t>* frame = _padFrameQueue.front(); - frame = _padFrameQueue.front(); + std::vector<uint8_t> frame(move(_padFrameQueue.front())); _padFrameQueue.pop(); - + uint8_t data[500]; uint32_t index = 0; - + data[index++] = 0xFD; data[index++] = 0x18; - data[index++] = frame->size()+2; + data[index++] = frame.size()+2; data[index++] = 0xAD; - data[index++] = frame->size(); - memcpy( data+index, frame->data(), frame->size()); - index += frame->size(); + data[index++] = frame.size(); + memcpy( data+index, frame.data(), frame.size()); + index += frame.size(); packet->setOffset(0); packet->setLength(0); packet->addData(data, index); - _input_pad_socket->send(*packet); - - delete frame; + _input_pad_socket.send(*packet); } } @@ -557,18 +441,17 @@ bool AVTInput::_checkMessage() { bool dataRecevied = false; - if (_input_pad_socket) { - if (_input_pad_packet->getLength() == 0) { - _input_pad_socket->receive(*_input_pad_packet); - } + if (_input_pad_packet.getLength() == 0) { + _input_pad_socket.receive(_input_pad_packet); + } - if (_input_pad_packet->getLength() > 0) { - _interpretMessage((uint8_t*)_input_pad_packet->getData(), _input_pad_packet->getLength(), _input_pad_packet); - _input_pad_packet->setOffset(0); - _input_pad_socket->receive(*_input_pad_packet); + if (_input_pad_packet.getLength() > 0) { + _interpretMessage((uint8_t*)_input_pad_packet.getData(), _input_pad_packet.getLength(), + &_input_pad_packet); + _input_pad_packet.setOffset(0); + _input_pad_socket.receive(_input_pad_packet); - dataRecevied = true; - } + dataRecevied = true; } return dataRecevied; @@ -579,25 +462,23 @@ bool AVTInput::_checkMessage() */ void AVTInput::_purgeMessages() { - if (_input_pad_socket) { - bool dataRecevied; - int nb = 0; - do { - dataRecevied = false; - if (_input_pad_packet->getLength() == 0) { - _input_pad_socket->receive(*_input_pad_packet); - } + bool dataRecevied; + int nb = 0; + do { + dataRecevied = false; + if (_input_pad_packet.getLength() == 0) { + _input_pad_socket.receive(_input_pad_packet); + } - if (_input_pad_packet->getLength() > 0) { - nb++; - _input_pad_packet->setOffset(0); - _input_pad_socket->receive(*_input_pad_packet); + if (_input_pad_packet.getLength() > 0) { + nb++; + _input_pad_packet.setOffset(0); + _input_pad_socket.receive(_input_pad_packet); - dataRecevied = true; - } - } while (dataRecevied); - if (nb>0) DEBUG("%d messages purged\n", nb); - } + dataRecevied = true; + } + } while (dataRecevied); + if (nb>0) DEBUG("%d messages purged\n", nb); } @@ -611,42 +492,33 @@ bool AVTInput::_readFrame() uint8_t readBuf[MAX_AVT_FRAME_SIZE]; int32_t frameNumber; const uint8_t* dataPtr = NULL; - size_t dataSize = 0; + size_t dataSize = 0; std::vector<uint8_t> data; size_t readBytes = _read(readBuf, sizeof(readBuf), true/*onlyOnePacket*/); if (readBytes > 0) { dataRecevied = true; - - if (_ediPushData(readBuf, readBytes)) { - dataSize = _ediPopFrame(data, frameNumber); - if (dataSize>0) { - dataPtr = data.data(); - _info(_typeEDI, dataSize); - } - } else { - if (readBytes > _dab24msFrameSize) { - // Extract frame data and frame number from buf - dataPtr = _findDABFrameFromUDP(readBuf, readBytes, frameNumber, dataSize); - } -// if (!data) { -// // Assuming pure RAW data -// data = buf; -// dataSize = _dab24msFrameSize; -// frameNumber = _dummyFrameNumber++; -// } - if (!dataPtr) { - _info(_typeCantExtract, 0); - } + + if (readBytes > _dab24msFrameSize) { + // Extract frame data and frame number from buf + dataPtr = _findDABFrameFromUDP(readBuf, readBytes, frameNumber, dataSize); + } +// if (!data) { +// // Assuming pure RAW data +// data = buf; +// dataSize = _dab24msFrameSize; +// frameNumber = _dummyFrameNumber++; +// } + if (!dataPtr) { + _info(_typeCantExtract, 0); } if (dataPtr) { - if (dataSize == _dab24msFrameSize ) { - if( _frameAlligned || frameNumber%5 == 0) - { + if (dataSize == _dab24msFrameSize ) { + if( _frameAligned || frameNumber%5 == 0) { #if defined(DISTURB_INPUT) // Duplicate a frame - if(frameNumber%250==0) _ordered->push(frameNumber, dataPtr, dataSize); + if(frameNumber%250==0) _ordered.push(frameNumber, dataPtr, dataSize); // Invert 2 frames (content inverted, audio distrubed by this test)) if( frameNumber % 200 == 0) frameNumber += 10; @@ -655,8 +527,8 @@ bool AVTInput::_readFrame() // Remove a frame (audio distrubed, frame missing) if(frameNumber%300 > 5) #endif - _ordered->push(frameNumber, dataPtr, dataSize); - _frameAlligned = true; + _ordered.push(frameNumber, dataPtr, dataSize); + _frameAligned = true; } } else ERROR("Wrong frame size from encoder %zu != %zu\n", dataSize, _dab24msFrameSize); @@ -674,27 +546,27 @@ ssize_t AVTInput::getNextFrame(std::vector<uint8_t> &buf) ssize_t nbBytes = 0; //printf("A: _padFrameQueue size=%zu\n", _padFrameQueue.size()); - + // Read all messages from encoder (in priority) // Read all available frames from input socket while (_checkMessage() || _readFrame() ); //printf("B: _padFrameQueue size=%zu\n", _padFrameQueue.size()); - + // Assemble next frame int32_t nb = 0; - std::vector<uint8_t> part; - while (_nbFrames < 5 && (nb = _ordered->pop(part)) != 0) + std::vector<uint8_t> part; + while (_nbFrames < 5 && (nb = _ordered.pop(part)) != 0) { while (_checkMessage()); - memcpy(_currentFrame+_currentFrameSize, part.data(), nb); + memcpy(_currentFrame.data() + _currentFrameSize, part.data(), nb); _currentFrameSize += nb; _nbFrames ++; } - if (_nbFrames == 5 && _currentFrameSize <= buf.size()) { - memcpy(&buf[0], _currentFrame, _currentFrameSize); + if (_nbFrames == 5 && _currentFrameSize <= buf.size()) { + memcpy(&buf[0], _currentFrame.data(), _currentFrameSize); nbBytes = _currentFrameSize; _currentFrameSize = 0; _nbFrames = 0; @@ -713,20 +585,10 @@ void AVTInput::pushPADFrame(const uint8_t* buf, size_t size) if (_pad_port == 0) { return; } - - std::vector<uint8_t>* frame; - -// while (_padFrameQueue.size() > MAX_PAD_FRAME_QUEUE_SIZE) { -// frame = _padFrameQueue.front(); -// _padFrameQueue.pop(); -// delete frame; -// ERROR("Drop one PAD Frame\n"); -// } if (size > 0) { - frame = new std::vector<uint8_t>(size); - memcpy(frame->data(), buf, size); - std::reverse(frame->begin(), frame->end()); + std::vector<uint8_t> frame(size); + std::reverse_copy(buf, buf + size, frame.begin()); _padFrameQueue.push(frame); } } @@ -746,17 +608,14 @@ void AVTInput::_info(_frameType type, size_t size) { if (_lastInfoFrameType != type || _lastInfoSize != size) { switch (type) { - case _typeEDI: - INFO("Extracting from EDI frames of size %zu\n", size); - break; case _typeSTI: INFO("Extracting from UDP/STI frames of size %zu\n", size); - break; + break; case _typeSTIRTP: INFO("Extracting from UDP/RTP/STI frames of size %zu\n", size); - break; + break; case _typeCantExtract: - ERROR("Can't extract data from encoder frame\n"); + ERROR("Can't extract data from encoder frame\n"); break; } _lastInfoFrameType = type; |