From bbab73a63b8c7b50e8a8cb228999d45024fad984 Mon Sep 17 00:00:00 2001 From: "Matthias (think)" Date: Wed, 11 Jul 2012 11:49:12 +0200 Subject: added unmodified mmbtools --- src/dabInputFifo.cpp | 513 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 513 insertions(+) create mode 100644 src/dabInputFifo.cpp (limited to 'src/dabInputFifo.cpp') diff --git a/src/dabInputFifo.cpp b/src/dabInputFifo.cpp new file mode 100644 index 0000000..a2c2ef7 --- /dev/null +++ b/src/dabInputFifo.cpp @@ -0,0 +1,513 @@ +/* + Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications + Research Center Canada) + */ +/* + This file is part of CRC-DabMux. + + CRC-DabMux is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as + published by the Free Software Foundation, either version 3 of the + License, or (at your option) any later version. + + CRC-DabMux is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with CRC-DabMux. If not, see . + */ + +#include "dabInputFifo.h" +#include "dabInputPacketFile.h" +#include "dabInput.h" + +#include +#include +#include +#include +#include + + +#ifdef HAVE_FORMAT_PACKET +# ifdef HAVE_INPUT_FIFO + + +int dabInputFifoData::nb = 0; + + +struct dabInputOperations dabInputFifoOperations = { + dabInputFifoInit, + dabInputFifoOpen, + dabInputFifoSetbuf, + dabInputFifoRead, + dabInputFifoLock, + dabInputFifoUnlock, + dabInputPacketFileRead, + dabInputSetbitrate, + dabInputFifoClose, + dabInputFifoClean, + dabInputFifoRewind +}; + + +int dabInputFifoInit(void** args) +{ + dabInputFifoData* data = new dabInputFifoData; + memset(data, 0, sizeof(*data)); + data->stats.id = dabInputFifoData::nb++; + data->maxSize = 0; + data->curSize = 0; + data->head = 0; + data->tail = 0; + data->buffer = NULL; + data->packetData = NULL; + data->enhancedPacketData = NULL; + data->packetLength = 0; + data->enhancedPacketLength = 0; + data->enhancedPacketWaiting = 0; + data->full = false; + data->running = true; + data->thread = (pthread_t)NULL; +#ifdef _WIN32 + char semName[32]; + sprintf(semName, "semInfo%i", data->stats.id); + data->semInfo = CreateSemaphore(NULL, 1, 1, semName); + if (data->semInfo == NULL) { + fprintf(stderr, "Can't init FIFO data semaphore %s\n", semName); + return -1; + } + sprintf(semName, "semBuffer%i", data->stats.id); + data->semBuffer = CreateSemaphore(NULL, 1, 1, semName); + if (data->semBuffer == NULL) { + fprintf(stderr, "Can't init FIFO buffer semaphore %s\n", semName); + return -1; + } + sprintf(semName, "semFull%i", data->stats.id); + data->semFull = CreateSemaphore(NULL, 1, 1, semName); + if (data->semFull == NULL) { + fprintf(stderr, "Can't init FIFO semaphore %s\n", semName); + return -1; + } +#else + if (sem_init(&data->semInfo, 0, 1) == -1) { + perror("Can't init FIFO data semaphore"); + return -1; + } + if (sem_init(&data->semBuffer, 0, 0) == -1) { + perror("Can't init fIFO buffer semaphore"); + return -1; + } + if (sem_init(&data->semFull, 0, 0) == -1) { + perror("Can't init FIFO semaphore"); + return -1; + } +#endif + + if (data->maxSize > 0) { +#ifdef _WIN32 + ReleaseSemaphore(data->semBuffer, 1, NULL); +#else + sem_post(&data->semBuffer); +#endif + } + *args = data; + return 0; +} + + +int dabInputFifoOpen(void* args, const char* filename) +{ + dabInputFifoData* data = (dabInputFifoData*)args; + data->file = open(filename, O_RDONLY | O_BINARY | O_NONBLOCK); + if (data->file == -1) { + perror(filename); + return -1; + } +#ifdef _WIN32 +#else + int flags = fcntl(data->file, F_GETFL); + if (flags == -1) { + perror(filename); + return -1; + } + if (fcntl(data->file, F_SETFL, flags & ~O_NONBLOCK) == -1) { + perror(filename); + return -1; + } +#endif + +#ifdef _WIN32 + data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputFifoThread, data, 0, NULL); + if (data->thread == NULL) { + fprintf(stderr, "Can't create FIFO child\n"); + return -1; + } +#else + if (pthread_create(&data->thread, NULL, dabInputFifoThread, data)) { + perror("Can't create FIFO child"); + return -1; + } +#endif + + return 0; +} + + +int dabInputFifoSetbuf(void* args, int size) +{ + dabInputFifoData* data = (dabInputFifoData*)args; + + if (data->maxSize > 0) { +#ifdef _WIN32 + WaitForSingleObject(data->semBuffer, INFINITE); +#else + sem_wait(&data->semBuffer); +#endif + } + if (data->buffer != NULL) { + delete data->buffer; + } + if (size == 0) { + size = 1024; + } + data->buffer = new unsigned char[size * 16]; + data->maxSize = size * 16; +#ifdef _WIN32 + ReleaseSemaphore(data->semBuffer, 1, NULL); +#else + sem_post(&data->semBuffer); +#endif + + return 0; +} + + +int dabInputFifoRead(void* args, void* buffer, int size) +{ + //fprintf(stderr, "INFO: read %i bytes\n", size); + dabInputFifoData* data = (dabInputFifoData*)args; + dabInputFifoStats* stats = &data->stats; + int head; + int tail; + int curSize; + int maxSize; +#ifdef _WIN32 + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semInfo); +#endif + head = data->head; + tail = data->tail; + curSize = data->curSize; + maxSize = data->maxSize; +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + //fprintf(stderr, "head: %i, tail: %i, curSize: %i\n", head, tail, curSize); + if (size > curSize) { + if (curSize == 0) { + stats->empty = true; + } else { + etiLog.print(TcpLog::WARNING, "Not enough data in FIFO buffer: (%i) %i/%i\n", + data->stats.id, curSize, size); + } + return 0; + } + if (head > tail) { + memcpy(buffer, data->buffer + tail, size); +#ifdef _WIN32 + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semInfo); +#endif + data->tail += size; + data->curSize -= size; +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + return size; + } else { + if (maxSize - tail >= size) { + memcpy(buffer, data->buffer + tail, size); +#ifdef _WIN32 + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semInfo); +#endif + data->tail += size; + data->curSize -= size; +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + return size; + } else { + memcpy(buffer, data->buffer + tail, maxSize - tail); +#ifdef _WIN32 + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semInfo); +#endif + data->tail = 0; + data->curSize -= maxSize - tail; +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + return maxSize - tail + dabInputFifoRead(data, (char*)buffer + maxSize - tail, size - (maxSize - tail)); + } + } + return -1; +} + + +int dabInputFifoLock(void* args) { + dabInputFifoData* data = (dabInputFifoData*)args; + dabInputFifoStats* stats = &data->stats; + + int maxSize; + int curSize; +#ifdef _WIN32 + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semInfo); +#endif + maxSize = data->maxSize; + curSize = data->curSize; +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + + stats->bufferRecords[stats->bufferCount].curSize = curSize; + stats->bufferRecords[stats->bufferCount].maxSize = maxSize; + + if (++stats->bufferCount == NB_RECORDS) { + etiLog.print(TcpLog::INFO, "FIFO buffer state: (%i)", stats->id); + for (int i = 0; i < stats->bufferCount; ++i) { + etiLog.print(TcpLog::INFO, " %i/%i", + stats->bufferRecords[i].curSize, + stats->bufferRecords[i].maxSize); + } + etiLog.print(TcpLog::INFO, "\n"); + + if (stats->full) { + etiLog.print(TcpLog::WARNING, "FIFO buffer full: (%i)\n", + data->stats.id); + stats->full = false; + } + if (stats->empty) { + etiLog.print(TcpLog::WARNING, "FIFO buffer empty: (%i)\n", + data->stats.id); + stats->empty = false; + } + if (stats->error) { + etiLog.print(TcpLog::ERR, "FIFO input read error: (%i)\n", + data->stats.id); + stats->error = false; + } + if (stats->input) { + etiLog.print(TcpLog::ERR, "FIFO input not connected: (%i)\n", + data->stats.id); + stats->input = false; + } + + stats->bufferCount = 0; + } + return 0; +} + + +int dabInputFifoUnlock(void* args) { + dabInputFifoData* data = (dabInputFifoData*)args; + if (data->full) { +#ifdef _WIN32 + ReleaseSemaphore(data->semFull, 1, NULL); +#else + sem_post(&data->semFull); +#endif + } + return 0; +} + + +int dabInputFifoClose(void* args) +{ + dabInputFifoData* data = (dabInputFifoData*)args; + close(data->file); + return 0; +} + + +int dabInputFifoClean(void** args) +{ + dabInputFifoData* data = (dabInputFifoData*)*args; + data->running = false; + etiLog.print(TcpLog::DBG, "Wait FIFO child...\n"); +#ifdef WIN32 + DWORD status; + for (int i = 0; i < 5; ++i) { + if (GetExitCodeThread(data->thread, &status)) { + break; + } + Sleep(100); + } + TerminateThread(data->thread, 1); + if (CloseHandle(data->thread) == 0) { + etiLog.print(TcpLog::DBG, "ERROR: Failed to close FIFO child thread\n"); + } +#else + if (data->thread != (pthread_t)NULL) { + if (pthread_join(data->thread, NULL)) { + etiLog.print(TcpLog::DBG, "ERROR: FIFO child thread had not exit normally\n"); + } + } +#endif + etiLog.print(TcpLog::DBG, "Done\n"); +#ifdef _WIN32 + CloseHandle(data->semInfo); + CloseHandle(data->semFull); + CloseHandle(data->semBuffer); +#else + sem_destroy(&data->semInfo); + sem_destroy(&data->semFull); + sem_destroy(&data->semBuffer); +#endif + if (data->packetData != NULL) { + delete[] data->packetData; + } + if (data->enhancedPacketData != NULL) { + for (int i = 0; i < 12; ++i) { + if (data->enhancedPacketData[i] != NULL) { + delete[] data->enhancedPacketData[i]; + } + } + delete[] data->enhancedPacketData; + } + delete data->buffer; + delete data; + return 0; +} + + +int dabInputFifoRewind(void* args) +{ + return -1; +} + + +void* dabInputFifoThread(void* args) +{ + dabInputFifoData* data = (dabInputFifoData*)args; + int head; + int tail; + int curSize; + int maxSize; + int ret; + while (data->running) { +#ifdef _WIN32 + WaitForSingleObject(data->semBuffer, INFINITE); + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semBuffer); + sem_wait(&data->semInfo); +#endif + head = data->head; + tail = data->tail; + curSize = data->curSize; + maxSize = data->maxSize; +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + //fprintf(stderr, "thread, head: %i, tail: %i, curSize: %i\n", head, tail, curSize); + + if (curSize == maxSize) { + data->stats.full = true; + data->full = true; +#ifdef _WIN32 + WaitForSingleObject(data->semFull, INFINITE); +#else + sem_wait(&data->semFull); +#endif + } else if (head >= tail) { // 2 blocks + ret = read(data->file, data->buffer + head, maxSize - head); + if (ret == 0) { + data->stats.input = true; + data->full = true; +#ifdef _WIN32 + WaitForSingleObject(data->semFull, INFINITE); +#else + sem_wait(&data->semFull); +#endif + } else if (ret == -1) { + data->stats.error = true; + } else { +#ifdef _WIN32 + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semInfo); +#endif + data->head += ret; + data->curSize += ret; + if (data->head == maxSize) { + data->head = 0; + } +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + } + } else { // 1 block + ret = read(data->file, data->buffer + head, tail - head); + if (ret == 0) { + data->stats.input = true; + data->full = true; +#ifdef _WIN32 + WaitForSingleObject(data->semFull, INFINITE); +#else + sem_wait(&data->semFull); +#endif + } else if (ret == -1) { + data->stats.error = true; + } else { +#ifdef _WIN32 + WaitForSingleObject(data->semInfo, INFINITE); +#else + sem_wait(&data->semInfo); +#endif + data->head += ret; + data->curSize += ret; + if (data->head == maxSize) { + data->head = 0; + } +#ifdef _WIN32 + ReleaseSemaphore(data->semInfo, 1, NULL); +#else + sem_post(&data->semInfo); +#endif + } + } +#ifdef _WIN32 + ReleaseSemaphore(data->semBuffer, 1, NULL); +#else + sem_post(&data->semBuffer); +#endif + } + return NULL; +} + + +# endif +#endif -- cgit v1.2.3