summaryrefslogtreecommitdiffstats
path: root/src/dabInputSlip.cpp
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2016-09-11 22:15:35 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2016-09-11 22:24:33 +0200
commit9248c1d7976ba1c37e3df147a1eb3115fe72c8d0 (patch)
treee331a2fc4600fe80ca4e2b404d4379989a95d127 /src/dabInputSlip.cpp
parent8750493994d574001e466fef21ded86730359640 (diff)
downloaddabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.gz
dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.tar.bz2
dabmux-9248c1d7976ba1c37e3df147a1eb3115fe72c8d0.zip
Drop SLIP, Refactor sockets, improve TCP output
Quite a large refactoring of the sockets, TCP and UDP, in order to improve the ETI-over-TCP output. This can now accept several simultaneous connections, and requires a throttle. The SLIP input is gone. The UDP inputs are currently broken.
Diffstat (limited to 'src/dabInputSlip.cpp')
-rw-r--r--src/dabInputSlip.cpp412
1 files changed, 0 insertions, 412 deletions
diff --git a/src/dabInputSlip.cpp b/src/dabInputSlip.cpp
deleted file mode 100644
index 0063cca..0000000
--- a/src/dabInputSlip.cpp
+++ /dev/null
@@ -1,412 +0,0 @@
-/*
- Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
- Research Center Canada)
- */
-/*
- This file is part of ODR-DabMux.
-
- ODR-DabMux is free software: you can redistribute it and/or modify
- it under the terms of the GNU General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
-
- ODR-DabMux is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License
- along with ODR-DabMux. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "dabInputSlip.h"
-#include "dabInputFifo.h"
-#include "TcpServer.h"
-#include "UdpSocket.h"
-#include "bridge.h"
-
-#include <signal.h>
-#include <string.h>
-#include <limits.h>
-
-#ifdef HAVE_FORMAT_BRIDGE
-# ifdef HAVE_INPUT_SLIP
-
-#ifdef _WIN32
-# include <io.h>
-
-# define sem_t HANDLE
-# define O_NONBLOCK 0
-#else
-# include <semaphore.h>
-# define O_BINARY 0
-#endif
-
-
-struct dabInputSlipData {
- TcpServer* server;
- UdpPacket** packets;
- UdpPacket* buffer;
- bridgeInfo* info;
- dabInputFifoStats stats;
- pthread_t thread;
- sem_t semWrite;
- sem_t semQueue;
- bool reading;
- volatile int nbPackets;
- volatile int queueSize;
- volatile int packetSize;
-};
-
-
-struct dabInputOperations dabInputSlipOperations = {
- dabInputSlipInit,
- dabInputSlipOpen,
- dabInputSlipSetbuf,
- dabInputSlipRead,
- NULL,
- NULL,
- dabInputSlipReadFrame,
- dabInputSetbitrate,
- dabInputSlipClose,
- dabInputSlipClean,
- NULL
-};
-
-
-int dabInputSlipInit(void** args)
-{
- dabInputSlipData* data = new dabInputSlipData;
- memset(&data->stats, 0, sizeof(data->stats));
- data->stats.id = dabInputFifoData::nb++;
- data->server = new TcpServer();
- data->packetSize = 1500;
- data->queueSize = 10;
- data->packets = new UdpPacket*[data->queueSize];
- for (int i = 0; i < data->queueSize; ++i) {
- data->packets[i] = new UdpPacket(data->packetSize);
- }
- data->buffer = new UdpPacket(data->packetSize);
- data->nbPackets = 0;
- data->info = new bridgeInfo;
- data->thread = (pthread_t)NULL;
- bridgeInitInfo(data->info);
-
-#ifdef _WIN32
- char semName[32];
- sprintf(semName, "semWrite%i", data->stats.id);
- data->semWrite = CreateSemaphore(NULL, 1, 1, semName);
- if (data->semWrite == NULL) {
- fprintf(stderr, "Can't init SLIP data write semaphore %s\n", semName);
- return -1;
- }
- sprintf(semName, "semQueue%i", data->stats.id);
- data->semQueue = CreateSemaphore(NULL, 1, 1, semName);
- if (data->semQueue == NULL) {
- fprintf(stderr, "Can't init SLIP data index semaphore %s\n", semName);
- return -1;
- }
-#else
- if (sem_init(&data->semWrite, 0, data->queueSize) == -1) {
- perror("Can't init SLIP data write semaphore");
- return -1;
- }
- if (sem_init(&data->semQueue, 0, 1) == -1) {
- perror("Can't init SLIP data index semaphore");
- return -1;
- }
-#endif
- data->reading = false;
-
- *args = data;
- return 0;
-}
-
-
-void* dabInputSlipThread(void* args)
-{
- dabInputSlipData* data = (dabInputSlipData*)args;
- TcpSocket* client;
-
- while ((client = data->server->accept()) != NULL) {
- int size = 0;
- etiLog.log(info, "SLIP server got a new client.\n");
-
-#ifdef _WIN32
- WaitForSingleObject(data->semWrite, INFINITE);
- WaitForSingleObject(data->semQueue, INFINITE);
-#else
- sem_wait(&data->semWrite);
- sem_wait(&data->semQueue);
-#endif
- UdpPacket* packet = data->packets[data->nbPackets];
-#ifdef _WIN32
- ReleaseSemaphore(data->semQueue, 1, NULL);
-#else
- sem_post(&data->semQueue);
-#endif
-
- while ((size = client->read(packet->getData(), packet->getSize()))
- > 0) {
- packet->setLength(size);
-#ifdef _WIN32
- WaitForSingleObject(data->semQueue, INFINITE);
-#else
- sem_wait(&data->semQueue);
-#endif
- data->nbPackets++;
-#ifdef _WIN32
- ReleaseSemaphore(data->semQueue, 1, NULL);
-#else
- sem_post(&data->semQueue);
-#endif
-
-#ifdef _WIN32
- WaitForSingleObject(data->semWrite, INFINITE);
- WaitForSingleObject(data->semQueue, INFINITE);
-#else
- sem_wait(&data->semWrite);
- sem_wait(&data->semQueue);
-#endif
- packet = data->packets[data->nbPackets];
-#ifdef _WIN32
- ReleaseSemaphore(data->semQueue, 1, NULL);
-#else
- sem_post(&data->semQueue);
-#endif
- }
- etiLog.log(info, "SLIP server client deconnected.\n");
- client->close();
- }
- etiLog.log(error, "SLIP thread can't accept new client (%s)\n",
- inetErrDesc, inetErrMsg);
-
- return NULL;
-}
-
-
-int dabInputSlipOpen(void* args, const char* inputName)
-{
- const char* address;
- long port;
- address = strchr(inputName, ':');
- if (address == NULL) {
- etiLog.log(error, "\"%s\" SLIP address format is invalid: "
- "should be [address]:port - > aborting\n", inputName);
- return -1;
- }
- ++address;
- port = strtol(address, (char **)NULL, 10);
- if ((port == LONG_MIN) || (port == LONG_MAX)) {
- etiLog.log(error, "can't convert port number in SLIP address %s\n",
- address);
- return -1;
- }
- if (port == 0) {
- etiLog.log(error, "can't use port number 0 in SLIP address\n");
- return -1;
- }
- dabInputSlipData* data = (dabInputSlipData*)args;
- if (data->server->create(port) == -1) {
- etiLog.log(error, "can't set port %i on SLIP input (%s: %s)\n",
- port, inetErrDesc, inetErrMsg);
- return -1;
- }
-
- if (data->server->listen() == -1) {
- etiLog.log(error, "can't listen on SLIP socket(%s: %s)\n",
- inetErrDesc, inetErrMsg);
- return -1;
- }
-#ifdef _WIN32
- data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputSlipThread, data, 0, NULL);
- if (data->thread == NULL) {
- fprintf(stderr, "Can't create SLIP child");
- return -1;
- }
-#else
- if (pthread_create(&data->thread, NULL, dabInputSlipThread, data)) {
- perror("Can't create SLIP child");
- return -1;
- }
-#endif
-
- etiLog.log(debug, "check return code of create\n");
- return 0;
-}
-
-
-int dabInputSlipSetbuf(void* args, int size)
-{
- dabInputSlipData* data = (dabInputSlipData*)args;
-
- if (size <= 10) {
- return -1;
- }
-
- data->packetSize = size - 10;
- if (data->packets == NULL) {
- data->packets = new UdpPacket*[data->queueSize];
- }
- for (int i = 0; i < data->queueSize; ++i) {
- if (data->packets[i] == NULL) {
- data->packets[i] = new UdpPacket(data->packetSize);
- } else {
- data->packets[i]->setSize(data->packetSize);
- }
- }
- if (data->buffer == NULL) {
- data->buffer = new UdpPacket(data->packetSize);
- } else {
- data->buffer->setSize(data->packetSize);
- }
-
- return 0;
-}
-
-
-int dabInputSlipRead(void* args, void* buffer, int size)
-{
- dabInputSlipData* data = (dabInputSlipData*)args;
-
- if (data->nbPackets > 0) { // data ready
- UdpPacket* temp;
- temp = data->buffer;
- data->buffer = data->packets[0];
-
-#ifdef _WIN32
- WaitForSingleObject(data->semQueue, INFINITE);
-#else
- sem_wait(&data->semQueue);
-#endif
- for (int i = 1; i < data->queueSize; ++i) {
- data->packets[i - 1] = data->packets[i];
- }
- data->packets[data->queueSize - 1] = temp;
- --data->nbPackets;
-#ifdef _WIN32
- ReleaseSemaphore(data->semQueue, 1, NULL);
- ReleaseSemaphore(data->semWrite, 1, NULL);
-#else
- sem_post(&data->semQueue);
- sem_post(&data->semWrite);
-#endif
- } else {
- data->buffer->setLength(0);
- }
-
- return data->buffer->getLength();
-}
-
-
-int dabInputSlipReadFrame(dabInputOperations* ops, void* args, void* buffer, int size)
-{
- int nbBytes = 0;
- dabInputSlipData* data = (dabInputSlipData*)args;
- dabInputFifoStats* stats = (dabInputFifoStats*)&data->stats;
-
-#ifdef _WIN32
- WaitForSingleObject(data->semQueue, INFINITE);
-#else
- sem_wait(&data->semQueue);
-#endif
- stats->bufferRecords[stats->bufferCount].curSize = data->nbPackets;
- stats->bufferRecords[stats->bufferCount].maxSize = data->queueSize;
-#ifdef _WIN32
- ReleaseSemaphore(data->semQueue, 1, NULL);
-#else
- sem_post(&data->semQueue);
-#endif
- if (++stats->bufferCount == NB_RECORDS) {
- etiLog.log(info, "SLIP buffer state: (%i)", stats->id);
- for (int i = 0; i < stats->bufferCount; ++i) {
- etiLog.log(info, " %i/%i",
- stats->bufferRecords[i].curSize,
- stats->bufferRecords[i].maxSize);
- }
- etiLog.log(info, "\n");
-
- stats->bufferCount = 0;
- }
-
- data->stats.frameRecords[data->stats.frameCount].curSize = 0;
- data->stats.frameRecords[data->stats.frameCount].maxSize = size;
-
- if (data->buffer->getLength() == 0) {
- ops->read(args, NULL, 0);
- }
- while ((nbBytes = writePacket(data->buffer->getData(),
- data->buffer->getLength(), buffer, size, data->info))
- != 0) {
- data->stats.frameRecords[data->stats.frameCount].curSize = nbBytes;
- ops->read(args, NULL, 0);
- }
-
- if (data->buffer->getLength() != 0) {
- data->stats.frameRecords[data->stats.frameCount].curSize = size;
- }
-
- if (++stats->frameCount == NB_RECORDS) {
- etiLog.log(info, "Data subchannel usage: (%i)",
- stats->id);
- for (int i = 0; i < stats->frameCount; ++i) {
- etiLog.log(info, " %i/%i",
- stats->frameRecords[i].curSize,
- stats->frameRecords[i].maxSize);
- }
- etiLog.log(info, "\n");
- stats->frameCount = 0;
- }
- return size;
-}
-
-
-int dabInputSlipClose(void* args)
-{
- dabInputSlipData* data = (dabInputSlipData*)args;
- data->server->close();
-#ifdef WIN32
- DWORD status;
- for (int i = 0; i < 5; ++i) {
- if (GetExitCodeThread(data->thread, &status)) {
- break;
- }
- Sleep(100);
- }
- TerminateThread(data->thread, 1);
-#else
- if (data->thread != (pthread_t)NULL) {
- pthread_kill(data->thread, SIGPIPE);
- }
-#endif
- return 0;
-}
-
-
-int dabInputSlipClean(void** args)
-{
- dabInputSlipData* data = (dabInputSlipData*)(*args);
-#ifdef _WIN32
- CloseHandle(data->thread);
- CloseHandle(data->semWrite);
- CloseHandle(data->semQueue);
-#else
- sem_destroy(&data->semWrite);
- sem_destroy(&data->semQueue);
-#endif
- for (int i = 0; i < data->queueSize; ++i) {
- if (data->packets[i] != NULL) {
- delete data->packets[i];
- }
- }
- delete []data->packets;
- delete data->buffer;
- delete data->server;
- delete data->info;
- delete data;
- return 0;
-}
-
-# endif // HAVE_INPUT_SLIP
-#endif // HAVE_FORMAT_BRIDGE
-