/*
Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
Research Center Canada)
*/
/*
This file is part of CRC-DabMux.
CRC-DabMux is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
CRC-DabMux is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with CRC-DabMux. If not, see .
*/
#include "dabInputFifo.h"
#include "dabInputPacketFile.h"
#include "dabInput.h"
#include
#include
#include
#include
#include
#ifdef HAVE_FORMAT_PACKET
# ifdef HAVE_INPUT_FIFO
int dabInputFifoData::nb = 0;
struct dabInputOperations dabInputFifoOperations = {
dabInputFifoInit,
dabInputFifoOpen,
dabInputFifoSetbuf,
dabInputFifoRead,
dabInputFifoLock,
dabInputFifoUnlock,
dabInputPacketFileRead,
dabInputSetbitrate,
dabInputFifoClose,
dabInputFifoClean,
dabInputFifoRewind
};
int dabInputFifoInit(void** args)
{
dabInputFifoData* data = new dabInputFifoData;
memset(data, 0, sizeof(*data));
data->stats.id = dabInputFifoData::nb++;
data->maxSize = 0;
data->curSize = 0;
data->head = 0;
data->tail = 0;
data->buffer = NULL;
data->packetData = NULL;
data->enhancedPacketData = NULL;
data->packetLength = 0;
data->enhancedPacketLength = 0;
data->enhancedPacketWaiting = 0;
data->full = false;
data->running = true;
data->thread = (pthread_t)NULL;
#ifdef _WIN32
char semName[32];
sprintf(semName, "semInfo%i", data->stats.id);
data->semInfo = CreateSemaphore(NULL, 1, 1, semName);
if (data->semInfo == NULL) {
fprintf(stderr, "Can't init FIFO data semaphore %s\n", semName);
return -1;
}
sprintf(semName, "semBuffer%i", data->stats.id);
data->semBuffer = CreateSemaphore(NULL, 1, 1, semName);
if (data->semBuffer == NULL) {
fprintf(stderr, "Can't init FIFO buffer semaphore %s\n", semName);
return -1;
}
sprintf(semName, "semFull%i", data->stats.id);
data->semFull = CreateSemaphore(NULL, 1, 1, semName);
if (data->semFull == NULL) {
fprintf(stderr, "Can't init FIFO semaphore %s\n", semName);
return -1;
}
#else
if (sem_init(&data->semInfo, 0, 1) == -1) {
perror("Can't init FIFO data semaphore");
return -1;
}
if (sem_init(&data->semBuffer, 0, 0) == -1) {
perror("Can't init fIFO buffer semaphore");
return -1;
}
if (sem_init(&data->semFull, 0, 0) == -1) {
perror("Can't init FIFO semaphore");
return -1;
}
#endif
if (data->maxSize > 0) {
#ifdef _WIN32
ReleaseSemaphore(data->semBuffer, 1, NULL);
#else
sem_post(&data->semBuffer);
#endif
}
*args = data;
return 0;
}
int dabInputFifoOpen(void* args, const char* filename)
{
dabInputFifoData* data = (dabInputFifoData*)args;
data->file = open(filename, O_RDONLY | O_BINARY | O_NONBLOCK);
if (data->file == -1) {
perror(filename);
return -1;
}
#ifdef _WIN32
#else
int flags = fcntl(data->file, F_GETFL);
if (flags == -1) {
perror(filename);
return -1;
}
if (fcntl(data->file, F_SETFL, flags & ~O_NONBLOCK) == -1) {
perror(filename);
return -1;
}
#endif
#ifdef _WIN32
data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputFifoThread, data, 0, NULL);
if (data->thread == NULL) {
fprintf(stderr, "Can't create FIFO child\n");
return -1;
}
#else
if (pthread_create(&data->thread, NULL, dabInputFifoThread, data)) {
perror("Can't create FIFO child");
return -1;
}
#endif
return 0;
}
int dabInputFifoSetbuf(void* args, int size)
{
dabInputFifoData* data = (dabInputFifoData*)args;
if (data->maxSize > 0) {
#ifdef _WIN32
WaitForSingleObject(data->semBuffer, INFINITE);
#else
sem_wait(&data->semBuffer);
#endif
}
if (data->buffer != NULL) {
delete data->buffer;
}
if (size == 0) {
size = 1024;
}
data->buffer = new unsigned char[size * 16];
data->maxSize = size * 16;
#ifdef _WIN32
ReleaseSemaphore(data->semBuffer, 1, NULL);
#else
sem_post(&data->semBuffer);
#endif
return 0;
}
int dabInputFifoRead(void* args, void* buffer, int size)
{
//fprintf(stderr, "INFO: read %i bytes\n", size);
dabInputFifoData* data = (dabInputFifoData*)args;
dabInputFifoStats* stats = &data->stats;
int head;
int tail;
int curSize;
int maxSize;
#ifdef _WIN32
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semInfo);
#endif
head = data->head;
tail = data->tail;
curSize = data->curSize;
maxSize = data->maxSize;
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
//fprintf(stderr, "head: %i, tail: %i, curSize: %i\n", head, tail, curSize);
if (size > curSize) {
if (curSize == 0) {
stats->empty = true;
} else {
etiLog.print(TcpLog::WARNING, "Not enough data in FIFO buffer: (%i) %i/%i\n",
data->stats.id, curSize, size);
}
return 0;
}
if (head > tail) {
memcpy(buffer, data->buffer + tail, size);
#ifdef _WIN32
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semInfo);
#endif
data->tail += size;
data->curSize -= size;
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
return size;
} else {
if (maxSize - tail >= size) {
memcpy(buffer, data->buffer + tail, size);
#ifdef _WIN32
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semInfo);
#endif
data->tail += size;
data->curSize -= size;
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
return size;
} else {
memcpy(buffer, data->buffer + tail, maxSize - tail);
#ifdef _WIN32
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semInfo);
#endif
data->tail = 0;
data->curSize -= maxSize - tail;
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
return maxSize - tail + dabInputFifoRead(data, (char*)buffer + maxSize - tail, size - (maxSize - tail));
}
}
return -1;
}
int dabInputFifoLock(void* args) {
dabInputFifoData* data = (dabInputFifoData*)args;
dabInputFifoStats* stats = &data->stats;
int maxSize;
int curSize;
#ifdef _WIN32
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semInfo);
#endif
maxSize = data->maxSize;
curSize = data->curSize;
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
stats->bufferRecords[stats->bufferCount].curSize = curSize;
stats->bufferRecords[stats->bufferCount].maxSize = maxSize;
if (++stats->bufferCount == NB_RECORDS) {
etiLog.print(TcpLog::INFO, "FIFO buffer state: (%i)", stats->id);
for (int i = 0; i < stats->bufferCount; ++i) {
etiLog.print(TcpLog::INFO, " %i/%i",
stats->bufferRecords[i].curSize,
stats->bufferRecords[i].maxSize);
}
etiLog.print(TcpLog::INFO, "\n");
if (stats->full) {
etiLog.print(TcpLog::WARNING, "FIFO buffer full: (%i)\n",
data->stats.id);
stats->full = false;
}
if (stats->empty) {
etiLog.print(TcpLog::WARNING, "FIFO buffer empty: (%i)\n",
data->stats.id);
stats->empty = false;
}
if (stats->error) {
etiLog.print(TcpLog::ERR, "FIFO input read error: (%i)\n",
data->stats.id);
stats->error = false;
}
if (stats->input) {
etiLog.print(TcpLog::ERR, "FIFO input not connected: (%i)\n",
data->stats.id);
stats->input = false;
}
stats->bufferCount = 0;
}
return 0;
}
int dabInputFifoUnlock(void* args) {
dabInputFifoData* data = (dabInputFifoData*)args;
if (data->full) {
#ifdef _WIN32
ReleaseSemaphore(data->semFull, 1, NULL);
#else
sem_post(&data->semFull);
#endif
}
return 0;
}
int dabInputFifoClose(void* args)
{
dabInputFifoData* data = (dabInputFifoData*)args;
close(data->file);
return 0;
}
int dabInputFifoClean(void** args)
{
dabInputFifoData* data = (dabInputFifoData*)*args;
data->running = false;
etiLog.print(TcpLog::DBG, "Wait FIFO child...\n");
#ifdef WIN32
DWORD status;
for (int i = 0; i < 5; ++i) {
if (GetExitCodeThread(data->thread, &status)) {
break;
}
Sleep(100);
}
TerminateThread(data->thread, 1);
if (CloseHandle(data->thread) == 0) {
etiLog.print(TcpLog::DBG, "ERROR: Failed to close FIFO child thread\n");
}
#else
if (data->thread != (pthread_t)NULL) {
if (pthread_join(data->thread, NULL)) {
etiLog.print(TcpLog::DBG, "ERROR: FIFO child thread had not exit normally\n");
}
}
#endif
etiLog.print(TcpLog::DBG, "Done\n");
#ifdef _WIN32
CloseHandle(data->semInfo);
CloseHandle(data->semFull);
CloseHandle(data->semBuffer);
#else
sem_destroy(&data->semInfo);
sem_destroy(&data->semFull);
sem_destroy(&data->semBuffer);
#endif
if (data->packetData != NULL) {
delete[] data->packetData;
}
if (data->enhancedPacketData != NULL) {
for (int i = 0; i < 12; ++i) {
if (data->enhancedPacketData[i] != NULL) {
delete[] data->enhancedPacketData[i];
}
}
delete[] data->enhancedPacketData;
}
delete data->buffer;
delete data;
return 0;
}
int dabInputFifoRewind(void* args)
{
return -1;
}
void* dabInputFifoThread(void* args)
{
dabInputFifoData* data = (dabInputFifoData*)args;
int head;
int tail;
int curSize;
int maxSize;
int ret;
while (data->running) {
#ifdef _WIN32
WaitForSingleObject(data->semBuffer, INFINITE);
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semBuffer);
sem_wait(&data->semInfo);
#endif
head = data->head;
tail = data->tail;
curSize = data->curSize;
maxSize = data->maxSize;
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
//fprintf(stderr, "thread, head: %i, tail: %i, curSize: %i\n", head, tail, curSize);
if (curSize == maxSize) {
data->stats.full = true;
data->full = true;
#ifdef _WIN32
WaitForSingleObject(data->semFull, INFINITE);
#else
sem_wait(&data->semFull);
#endif
} else if (head >= tail) { // 2 blocks
ret = read(data->file, data->buffer + head, maxSize - head);
if (ret == 0) {
data->stats.input = true;
data->full = true;
#ifdef _WIN32
WaitForSingleObject(data->semFull, INFINITE);
#else
sem_wait(&data->semFull);
#endif
} else if (ret == -1) {
data->stats.error = true;
} else {
#ifdef _WIN32
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semInfo);
#endif
data->head += ret;
data->curSize += ret;
if (data->head == maxSize) {
data->head = 0;
}
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
}
} else { // 1 block
ret = read(data->file, data->buffer + head, tail - head);
if (ret == 0) {
data->stats.input = true;
data->full = true;
#ifdef _WIN32
WaitForSingleObject(data->semFull, INFINITE);
#else
sem_wait(&data->semFull);
#endif
} else if (ret == -1) {
data->stats.error = true;
} else {
#ifdef _WIN32
WaitForSingleObject(data->semInfo, INFINITE);
#else
sem_wait(&data->semInfo);
#endif
data->head += ret;
data->curSize += ret;
if (data->head == maxSize) {
data->head = 0;
}
#ifdef _WIN32
ReleaseSemaphore(data->semInfo, 1, NULL);
#else
sem_post(&data->semInfo);
#endif
}
}
#ifdef _WIN32
ReleaseSemaphore(data->semBuffer, 1, NULL);
#else
sem_post(&data->semBuffer);
#endif
}
return NULL;
}
# endif
#endif