diff options
Diffstat (limited to 'src/dabInputSlip.cpp')
-rw-r--r-- | src/dabInputSlip.cpp | 408 |
1 files changed, 408 insertions, 0 deletions
diff --git a/src/dabInputSlip.cpp b/src/dabInputSlip.cpp new file mode 100644 index 0000000..7663d2c --- /dev/null +++ b/src/dabInputSlip.cpp @@ -0,0 +1,408 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + */ +/* + This file is part of CRC-DabMux. + + CRC-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>. + */ + +#include "dabInputSlip.h" +#include "dabInputFifo.h" +#include "TcpServer.h" +#include "UdpSocket.h" +#include "bridge.h" + +#include <signal.h> +#include <string.h> +#include <limits.h> + + +#ifdef _WIN32 +# include <io.h> + +# define sem_t HANDLE +# define O_NONBLOCK 0 +#else +# include <semaphore.h> +# define O_BINARY 0 +#endif + + +struct dabInputSlipData { + TcpServer* server; + UdpPacket** packets; + UdpPacket* buffer; + bridgeInfo* info; + dabInputFifoStats stats; + pthread_t thread; + sem_t semWrite; + sem_t semQueue; + bool reading; + volatile int nbPackets; + volatile int queueSize; + volatile int packetSize; +}; + + +struct dabInputOperations dabInputSlipOperations = { + dabInputSlipInit, + dabInputSlipOpen, + dabInputSlipSetbuf, + dabInputSlipRead, + NULL, + NULL, + dabInputSlipReadFrame, + dabInputSetbitrate, + dabInputSlipClose, + dabInputSlipClean, + NULL +}; + + +int dabInputSlipInit(void** args) +{ + dabInputSlipData* data = new dabInputSlipData; + memset(&data->stats, 0, sizeof(data->stats)); + data->stats.id = dabInputFifoData::nb++; + data->server = new TcpServer(); + data->packetSize = 1500; + data->queueSize = 10; + data->packets = new UdpPacket*[data->queueSize]; + for (int i = 0; i < data->queueSize; ++i) { + data->packets[i] = new UdpPacket(data->packetSize); + } + data->buffer = new UdpPacket(data->packetSize); + data->nbPackets = 0; + data->info = new bridgeInfo; + data->thread = (pthread_t)NULL; + bridgeInitInfo(data->info); + +#ifdef _WIN32 + char semName[32]; + sprintf(semName, "semWrite%i", data->stats.id); + data->semWrite = CreateSemaphore(NULL, 1, 1, semName); + if (data->semWrite == NULL) { + fprintf(stderr, "Can't init SLIP data write semaphore %s\n", semName); + return -1; + } + sprintf(semName, "semQueue%i", data->stats.id); + data->semQueue = CreateSemaphore(NULL, 1, 1, semName); + if (data->semQueue == NULL) { + fprintf(stderr, "Can't init SLIP data index semaphore %s\n", semName); + return -1; + } +#else + if (sem_init(&data->semWrite, 0, data->queueSize) == -1) { + perror("Can't init SLIP data write semaphore"); + return -1; + } + if (sem_init(&data->semQueue, 0, 1) == -1) { + perror("Can't init SLIP data index semaphore"); + return -1; + } +#endif + data->reading = false; + + *args = data; + return 0; +} + + +void* dabInputSlipThread(void* args) +{ + dabInputSlipData* data = (dabInputSlipData*)args; + TcpSocket* client; + + while ((client = data->server->accept()) != NULL) { + int size = 0; + etiLog.print(TcpLog::INFO, "SLIP server got a new client.\n"); + +#ifdef _WIN32 + WaitForSingleObject(data->semWrite, INFINITE); + WaitForSingleObject(data->semQueue, INFINITE); +#else + sem_wait(&data->semWrite); + sem_wait(&data->semQueue); +#endif + UdpPacket* packet = data->packets[data->nbPackets]; +#ifdef _WIN32 + ReleaseSemaphore(data->semQueue, 1, NULL); +#else + sem_post(&data->semQueue); +#endif + + while ((size = client->read(packet->getData(), packet->getSize())) + > 0) { + packet->setLength(size); +#ifdef _WIN32 + WaitForSingleObject(data->semQueue, INFINITE); +#else + sem_wait(&data->semQueue); +#endif + data->nbPackets++; +#ifdef _WIN32 + ReleaseSemaphore(data->semQueue, 1, NULL); +#else + sem_post(&data->semQueue); +#endif + +#ifdef _WIN32 + WaitForSingleObject(data->semWrite, INFINITE); + WaitForSingleObject(data->semQueue, INFINITE); +#else + sem_wait(&data->semWrite); + sem_wait(&data->semQueue); +#endif + packet = data->packets[data->nbPackets]; +#ifdef _WIN32 + ReleaseSemaphore(data->semQueue, 1, NULL); +#else + sem_post(&data->semQueue); +#endif + } + etiLog.print(TcpLog::INFO, "SLIP server client deconnected.\n"); + client->close(); + } + etiLog.print(TcpLog::ERR, "SLIP thread can't accept new client (%s)\n", + inetErrDesc, inetErrMsg); + + return NULL; +} + + +int dabInputSlipOpen(void* args, const char* inputName) +{ + const char* address; + long port; + address = strchr(inputName, ':'); + if (address == NULL) { + etiLog.print(TcpLog::ERR, "\"%s\" SLIP address format is invalid: " + "should be [address]:port - > aborting\n", inputName); + return -1; + } + ++address; + port = strtol(address, (char **)NULL, 10); + if ((port == LONG_MIN) || (port == LONG_MAX)) { + etiLog.print(TcpLog::ERR, "can't convert port number in SLIP address %s\n", + address); + return -1; + } + if (port == 0) { + etiLog.print(TcpLog::ERR, "can't use port number 0 in SLIP address\n"); + return -1; + } + dabInputSlipData* data = (dabInputSlipData*)args; + if (data->server->create(port) == -1) { + etiLog.print(TcpLog::ERR, "can't set port %i on SLIP input (%s: %s)\n", + port, inetErrDesc, inetErrMsg); + return -1; + } + + if (data->server->listen() == -1) { + etiLog.print(TcpLog::ERR, "can't listen on SLIP socket(%s: %s)\n", + inetErrDesc, inetErrMsg); + return -1; + } +#ifdef _WIN32 + data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputSlipThread, data, 0, NULL); + if (data->thread == NULL) { + fprintf(stderr, "Can't create SLIP child"); + return -1; + } +#else + if (pthread_create(&data->thread, NULL, dabInputSlipThread, data)) { + perror("Can't create SLIP child"); + return -1; + } +#endif + + etiLog.print(TcpLog::DBG, "check return code of create\n"); + return 0; +} + + +int dabInputSlipSetbuf(void* args, int size) +{ + dabInputSlipData* data = (dabInputSlipData*)args; + + if (size <= 10) { + return -1; + } + + data->packetSize = size - 10; + if (data->packets == NULL) { + data->packets = new UdpPacket*[data->queueSize]; + } + for (int i = 0; i < data->queueSize; ++i) { + if (data->packets[i] == NULL) { + data->packets[i] = new UdpPacket(data->packetSize); + } else { + data->packets[i]->setSize(data->packetSize); + } + } + if (data->buffer == NULL) { + data->buffer = new UdpPacket(data->packetSize); + } else { + data->buffer->setSize(data->packetSize); + } + + return 0; +} + + +int dabInputSlipRead(void* args, void* buffer, int size) +{ + dabInputSlipData* data = (dabInputSlipData*)args; + + if (data->nbPackets > 0) { // data ready + UdpPacket* temp; + temp = data->buffer; + data->buffer = data->packets[0]; + +#ifdef _WIN32 + WaitForSingleObject(data->semQueue, INFINITE); +#else + sem_wait(&data->semQueue); +#endif + for (int i = 1; i < data->queueSize; ++i) { + data->packets[i - 1] = data->packets[i]; + } + data->packets[data->queueSize - 1] = temp; + --data->nbPackets; +#ifdef _WIN32 + ReleaseSemaphore(data->semQueue, 1, NULL); + ReleaseSemaphore(data->semWrite, 1, NULL); +#else + sem_post(&data->semQueue); + sem_post(&data->semWrite); +#endif + } else { + data->buffer->setLength(0); + } + + return data->buffer->getLength(); +} + + +int dabInputSlipReadFrame(dabInputOperations* ops, void* args, void* buffer, int size) +{ + int nbBytes = 0; + dabInputSlipData* data = (dabInputSlipData*)args; + dabInputFifoStats* stats = (dabInputFifoStats*)&data->stats; + +#ifdef _WIN32 + WaitForSingleObject(data->semQueue, INFINITE); +#else + sem_wait(&data->semQueue); +#endif + stats->bufferRecords[stats->bufferCount].curSize = data->nbPackets; + stats->bufferRecords[stats->bufferCount].maxSize = data->queueSize; +#ifdef _WIN32 + ReleaseSemaphore(data->semQueue, 1, NULL); +#else + sem_post(&data->semQueue); +#endif + if (++stats->bufferCount == NB_RECORDS) { + etiLog.print(TcpLog::INFO, "SLIP buffer state: (%i)", stats->id); + for (int i = 0; i < stats->bufferCount; ++i) { + etiLog.print(TcpLog::INFO, " %i/%i", + stats->bufferRecords[i].curSize, + stats->bufferRecords[i].maxSize); + } + etiLog.print(TcpLog::INFO, "\n"); + + stats->bufferCount = 0; + } + + data->stats.frameRecords[data->stats.frameCount].curSize = 0; + data->stats.frameRecords[data->stats.frameCount].maxSize = size; + + if (data->buffer->getLength() == 0) { + ops->read(args, NULL, 0); + } + while ((nbBytes = writePacket(data->buffer->getData(), + data->buffer->getLength(), buffer, size, data->info)) + != 0) { + data->stats.frameRecords[data->stats.frameCount].curSize = nbBytes; + ops->read(args, NULL, 0); + } + + if (data->buffer->getLength() != 0) { + data->stats.frameRecords[data->stats.frameCount].curSize = size; + } + + if (++stats->frameCount == NB_RECORDS) { + etiLog.print(TcpLog::INFO, "Data subchannel usage: (%i)", + stats->id); + for (int i = 0; i < stats->frameCount; ++i) { + etiLog.print(TcpLog::INFO, " %i/%i", + stats->frameRecords[i].curSize, + stats->frameRecords[i].maxSize); + } + etiLog.print(TcpLog::INFO, "\n"); + stats->frameCount = 0; + } + return size; +} + + +int dabInputSlipClose(void* args) +{ + dabInputSlipData* data = (dabInputSlipData*)args; + data->server->close(); +#ifdef WIN32 + DWORD status; + for (int i = 0; i < 5; ++i) { + if (GetExitCodeThread(data->thread, &status)) { + break; + } + Sleep(100); + } + TerminateThread(data->thread, 1); +#else + if (data->thread != (pthread_t)NULL) { + pthread_kill(data->thread, SIGPIPE); + } +#endif + return 0; +} + + +int dabInputSlipClean(void** args) +{ + dabInputSlipData* data = (dabInputSlipData*)(*args); +#ifdef _WIN32 + CloseHandle(data->thread); + CloseHandle(data->semWrite); + CloseHandle(data->semQueue); +#else + sem_destroy(&data->semWrite); + sem_destroy(&data->semQueue); +#endif + for (int i = 0; i < data->queueSize; ++i) { + if (data->packets[i] != NULL) { + delete data->packets[i]; + } + } + delete []data->packets; + delete data->buffer; + delete data->server; + delete data->info; + delete data; + return 0; +} + + |