summaryrefslogtreecommitdiffstats
path: root/src/dabInputFifo.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dabInputFifo.cpp')
-rw-r--r--src/dabInputFifo.cpp513
1 files changed, 513 insertions, 0 deletions
diff --git a/src/dabInputFifo.cpp b/src/dabInputFifo.cpp
new file mode 100644
index 0000000..a2c2ef7
--- /dev/null
+++ b/src/dabInputFifo.cpp
@@ -0,0 +1,513 @@
+/*
+ Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
+ Research Center Canada)
+ */
+/*
+ This file is part of CRC-DabMux.
+
+ CRC-DabMux is free software: you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as
+ published by the Free Software Foundation, either version 3 of the
+ License, or (at your option) any later version.
+
+ CRC-DabMux is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with CRC-DabMux. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "dabInputFifo.h"
+#include "dabInputPacketFile.h"
+#include "dabInput.h"
+
+#include <unistd.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <fcntl.h>
+
+
+#ifdef HAVE_FORMAT_PACKET
+# ifdef HAVE_INPUT_FIFO
+
+
+int dabInputFifoData::nb = 0;
+
+
+struct dabInputOperations dabInputFifoOperations = {
+ dabInputFifoInit,
+ dabInputFifoOpen,
+ dabInputFifoSetbuf,
+ dabInputFifoRead,
+ dabInputFifoLock,
+ dabInputFifoUnlock,
+ dabInputPacketFileRead,
+ dabInputSetbitrate,
+ dabInputFifoClose,
+ dabInputFifoClean,
+ dabInputFifoRewind
+};
+
+
+int dabInputFifoInit(void** args)
+{
+ dabInputFifoData* data = new dabInputFifoData;
+ memset(data, 0, sizeof(*data));
+ data->stats.id = dabInputFifoData::nb++;
+ data->maxSize = 0;
+ data->curSize = 0;
+ data->head = 0;
+ data->tail = 0;
+ data->buffer = NULL;
+ data->packetData = NULL;
+ data->enhancedPacketData = NULL;
+ data->packetLength = 0;
+ data->enhancedPacketLength = 0;
+ data->enhancedPacketWaiting = 0;
+ data->full = false;
+ data->running = true;
+ data->thread = (pthread_t)NULL;
+#ifdef _WIN32
+ char semName[32];
+ sprintf(semName, "semInfo%i", data->stats.id);
+ data->semInfo = CreateSemaphore(NULL, 1, 1, semName);
+ if (data->semInfo == NULL) {
+ fprintf(stderr, "Can't init FIFO data semaphore %s\n", semName);
+ return -1;
+ }
+ sprintf(semName, "semBuffer%i", data->stats.id);
+ data->semBuffer = CreateSemaphore(NULL, 1, 1, semName);
+ if (data->semBuffer == NULL) {
+ fprintf(stderr, "Can't init FIFO buffer semaphore %s\n", semName);
+ return -1;
+ }
+ sprintf(semName, "semFull%i", data->stats.id);
+ data->semFull = CreateSemaphore(NULL, 1, 1, semName);
+ if (data->semFull == NULL) {
+ fprintf(stderr, "Can't init FIFO semaphore %s\n", semName);
+ return -1;
+ }
+#else
+ if (sem_init(&data->semInfo, 0, 1) == -1) {
+ perror("Can't init FIFO data semaphore");
+ return -1;
+ }
+ if (sem_init(&data->semBuffer, 0, 0) == -1) {
+ perror("Can't init fIFO buffer semaphore");
+ return -1;
+ }
+ if (sem_init(&data->semFull, 0, 0) == -1) {
+ perror("Can't init FIFO semaphore");
+ return -1;
+ }
+#endif
+
+ if (data->maxSize > 0) {
+#ifdef _WIN32
+ ReleaseSemaphore(data->semBuffer, 1, NULL);
+#else
+ sem_post(&data->semBuffer);
+#endif
+ }
+ *args = data;
+ return 0;
+}
+
+
+int dabInputFifoOpen(void* args, const char* filename)
+{
+ dabInputFifoData* data = (dabInputFifoData*)args;
+ data->file = open(filename, O_RDONLY | O_BINARY | O_NONBLOCK);
+ if (data->file == -1) {
+ perror(filename);
+ return -1;
+ }
+#ifdef _WIN32
+#else
+ int flags = fcntl(data->file, F_GETFL);
+ if (flags == -1) {
+ perror(filename);
+ return -1;
+ }
+ if (fcntl(data->file, F_SETFL, flags & ~O_NONBLOCK) == -1) {
+ perror(filename);
+ return -1;
+ }
+#endif
+
+#ifdef _WIN32
+ data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputFifoThread, data, 0, NULL);
+ if (data->thread == NULL) {
+ fprintf(stderr, "Can't create FIFO child\n");
+ return -1;
+ }
+#else
+ if (pthread_create(&data->thread, NULL, dabInputFifoThread, data)) {
+ perror("Can't create FIFO child");
+ return -1;
+ }
+#endif
+
+ return 0;
+}
+
+
+int dabInputFifoSetbuf(void* args, int size)
+{
+ dabInputFifoData* data = (dabInputFifoData*)args;
+
+ if (data->maxSize > 0) {
+#ifdef _WIN32
+ WaitForSingleObject(data->semBuffer, INFINITE);
+#else
+ sem_wait(&data->semBuffer);
+#endif
+ }
+ if (data->buffer != NULL) {
+ delete data->buffer;
+ }
+ if (size == 0) {
+ size = 1024;
+ }
+ data->buffer = new unsigned char[size * 16];
+ data->maxSize = size * 16;
+#ifdef _WIN32
+ ReleaseSemaphore(data->semBuffer, 1, NULL);
+#else
+ sem_post(&data->semBuffer);
+#endif
+
+ return 0;
+}
+
+
+int dabInputFifoRead(void* args, void* buffer, int size)
+{
+ //fprintf(stderr, "INFO: read %i bytes\n", size);
+ dabInputFifoData* data = (dabInputFifoData*)args;
+ dabInputFifoStats* stats = &data->stats;
+ int head;
+ int tail;
+ int curSize;
+ int maxSize;
+#ifdef _WIN32
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semInfo);
+#endif
+ head = data->head;
+ tail = data->tail;
+ curSize = data->curSize;
+ maxSize = data->maxSize;
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+ //fprintf(stderr, "head: %i, tail: %i, curSize: %i\n", head, tail, curSize);
+ if (size > curSize) {
+ if (curSize == 0) {
+ stats->empty = true;
+ } else {
+ etiLog.print(TcpLog::WARNING, "Not enough data in FIFO buffer: (%i) %i/%i\n",
+ data->stats.id, curSize, size);
+ }
+ return 0;
+ }
+ if (head > tail) {
+ memcpy(buffer, data->buffer + tail, size);
+#ifdef _WIN32
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semInfo);
+#endif
+ data->tail += size;
+ data->curSize -= size;
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+ return size;
+ } else {
+ if (maxSize - tail >= size) {
+ memcpy(buffer, data->buffer + tail, size);
+#ifdef _WIN32
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semInfo);
+#endif
+ data->tail += size;
+ data->curSize -= size;
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+ return size;
+ } else {
+ memcpy(buffer, data->buffer + tail, maxSize - tail);
+#ifdef _WIN32
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semInfo);
+#endif
+ data->tail = 0;
+ data->curSize -= maxSize - tail;
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+ return maxSize - tail + dabInputFifoRead(data, (char*)buffer + maxSize - tail, size - (maxSize - tail));
+ }
+ }
+ return -1;
+}
+
+
+int dabInputFifoLock(void* args) {
+ dabInputFifoData* data = (dabInputFifoData*)args;
+ dabInputFifoStats* stats = &data->stats;
+
+ int maxSize;
+ int curSize;
+#ifdef _WIN32
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semInfo);
+#endif
+ maxSize = data->maxSize;
+ curSize = data->curSize;
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+
+ stats->bufferRecords[stats->bufferCount].curSize = curSize;
+ stats->bufferRecords[stats->bufferCount].maxSize = maxSize;
+
+ if (++stats->bufferCount == NB_RECORDS) {
+ etiLog.print(TcpLog::INFO, "FIFO buffer state: (%i)", stats->id);
+ for (int i = 0; i < stats->bufferCount; ++i) {
+ etiLog.print(TcpLog::INFO, " %i/%i",
+ stats->bufferRecords[i].curSize,
+ stats->bufferRecords[i].maxSize);
+ }
+ etiLog.print(TcpLog::INFO, "\n");
+
+ if (stats->full) {
+ etiLog.print(TcpLog::WARNING, "FIFO buffer full: (%i)\n",
+ data->stats.id);
+ stats->full = false;
+ }
+ if (stats->empty) {
+ etiLog.print(TcpLog::WARNING, "FIFO buffer empty: (%i)\n",
+ data->stats.id);
+ stats->empty = false;
+ }
+ if (stats->error) {
+ etiLog.print(TcpLog::ERR, "FIFO input read error: (%i)\n",
+ data->stats.id);
+ stats->error = false;
+ }
+ if (stats->input) {
+ etiLog.print(TcpLog::ERR, "FIFO input not connected: (%i)\n",
+ data->stats.id);
+ stats->input = false;
+ }
+
+ stats->bufferCount = 0;
+ }
+ return 0;
+}
+
+
+int dabInputFifoUnlock(void* args) {
+ dabInputFifoData* data = (dabInputFifoData*)args;
+ if (data->full) {
+#ifdef _WIN32
+ ReleaseSemaphore(data->semFull, 1, NULL);
+#else
+ sem_post(&data->semFull);
+#endif
+ }
+ return 0;
+}
+
+
+int dabInputFifoClose(void* args)
+{
+ dabInputFifoData* data = (dabInputFifoData*)args;
+ close(data->file);
+ return 0;
+}
+
+
+int dabInputFifoClean(void** args)
+{
+ dabInputFifoData* data = (dabInputFifoData*)*args;
+ data->running = false;
+ etiLog.print(TcpLog::DBG, "Wait FIFO child...\n");
+#ifdef WIN32
+ DWORD status;
+ for (int i = 0; i < 5; ++i) {
+ if (GetExitCodeThread(data->thread, &status)) {
+ break;
+ }
+ Sleep(100);
+ }
+ TerminateThread(data->thread, 1);
+ if (CloseHandle(data->thread) == 0) {
+ etiLog.print(TcpLog::DBG, "ERROR: Failed to close FIFO child thread\n");
+ }
+#else
+ if (data->thread != (pthread_t)NULL) {
+ if (pthread_join(data->thread, NULL)) {
+ etiLog.print(TcpLog::DBG, "ERROR: FIFO child thread had not exit normally\n");
+ }
+ }
+#endif
+ etiLog.print(TcpLog::DBG, "Done\n");
+#ifdef _WIN32
+ CloseHandle(data->semInfo);
+ CloseHandle(data->semFull);
+ CloseHandle(data->semBuffer);
+#else
+ sem_destroy(&data->semInfo);
+ sem_destroy(&data->semFull);
+ sem_destroy(&data->semBuffer);
+#endif
+ if (data->packetData != NULL) {
+ delete[] data->packetData;
+ }
+ if (data->enhancedPacketData != NULL) {
+ for (int i = 0; i < 12; ++i) {
+ if (data->enhancedPacketData[i] != NULL) {
+ delete[] data->enhancedPacketData[i];
+ }
+ }
+ delete[] data->enhancedPacketData;
+ }
+ delete data->buffer;
+ delete data;
+ return 0;
+}
+
+
+int dabInputFifoRewind(void* args)
+{
+ return -1;
+}
+
+
+void* dabInputFifoThread(void* args)
+{
+ dabInputFifoData* data = (dabInputFifoData*)args;
+ int head;
+ int tail;
+ int curSize;
+ int maxSize;
+ int ret;
+ while (data->running) {
+#ifdef _WIN32
+ WaitForSingleObject(data->semBuffer, INFINITE);
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semBuffer);
+ sem_wait(&data->semInfo);
+#endif
+ head = data->head;
+ tail = data->tail;
+ curSize = data->curSize;
+ maxSize = data->maxSize;
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+ //fprintf(stderr, "thread, head: %i, tail: %i, curSize: %i\n", head, tail, curSize);
+
+ if (curSize == maxSize) {
+ data->stats.full = true;
+ data->full = true;
+#ifdef _WIN32
+ WaitForSingleObject(data->semFull, INFINITE);
+#else
+ sem_wait(&data->semFull);
+#endif
+ } else if (head >= tail) { // 2 blocks
+ ret = read(data->file, data->buffer + head, maxSize - head);
+ if (ret == 0) {
+ data->stats.input = true;
+ data->full = true;
+#ifdef _WIN32
+ WaitForSingleObject(data->semFull, INFINITE);
+#else
+ sem_wait(&data->semFull);
+#endif
+ } else if (ret == -1) {
+ data->stats.error = true;
+ } else {
+#ifdef _WIN32
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semInfo);
+#endif
+ data->head += ret;
+ data->curSize += ret;
+ if (data->head == maxSize) {
+ data->head = 0;
+ }
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+ }
+ } else { // 1 block
+ ret = read(data->file, data->buffer + head, tail - head);
+ if (ret == 0) {
+ data->stats.input = true;
+ data->full = true;
+#ifdef _WIN32
+ WaitForSingleObject(data->semFull, INFINITE);
+#else
+ sem_wait(&data->semFull);
+#endif
+ } else if (ret == -1) {
+ data->stats.error = true;
+ } else {
+#ifdef _WIN32
+ WaitForSingleObject(data->semInfo, INFINITE);
+#else
+ sem_wait(&data->semInfo);
+#endif
+ data->head += ret;
+ data->curSize += ret;
+ if (data->head == maxSize) {
+ data->head = 0;
+ }
+#ifdef _WIN32
+ ReleaseSemaphore(data->semInfo, 1, NULL);
+#else
+ sem_post(&data->semInfo);
+#endif
+ }
+ }
+#ifdef _WIN32
+ ReleaseSemaphore(data->semBuffer, 1, NULL);
+#else
+ sem_post(&data->semBuffer);
+#endif
+ }
+ return NULL;
+}
+
+
+# endif
+#endif