/*
   Copyright (C) 2009 Her Majesty the Queen in Right of Canada (Communications
   Research Center Canada)
   */
/*
   This file is part of ODR-DabMux.

   ODR-DabMux is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as
   published by the Free Software Foundation, either version 3 of the
   License, or (at your option) any later version.

   ODR-DabMux is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.
   */

#include "dabInputSlip.h"
#include "dabInputFifo.h"
#include "TcpServer.h"
#include "UdpSocket.h"
#include "bridge.h"

#include <signal.h>
#include <string.h>
#include <limits.h>


#ifdef _WIN32
#   include <io.h>

#   define sem_t HANDLE
#   define O_NONBLOCK 0
#else
#   include <semaphore.h>
#   define O_BINARY 0
#endif


struct dabInputSlipData {
    TcpServer* server;
    UdpPacket** packets;
    UdpPacket* buffer;
    bridgeInfo* info;
    dabInputFifoStats stats;
    pthread_t thread;
    sem_t semWrite;
    sem_t semQueue;
    bool reading;
    volatile int nbPackets;
    volatile int queueSize;
    volatile int packetSize;
};


struct dabInputOperations dabInputSlipOperations = {
    dabInputSlipInit,
    dabInputSlipOpen,
    dabInputSlipSetbuf,
    dabInputSlipRead,
    NULL,
    NULL,
    dabInputSlipReadFrame,
    dabInputSetbitrate,
    dabInputSlipClose,
    dabInputSlipClean,
    NULL
};


int dabInputSlipInit(void** args)
{
    dabInputSlipData* data = new dabInputSlipData;
    memset(&data->stats, 0, sizeof(data->stats));
    data->stats.id = dabInputFifoData::nb++;
    data->server = new TcpServer();
    data->packetSize = 1500;
    data->queueSize = 10;
    data->packets = new UdpPacket*[data->queueSize];
    for (int i = 0; i < data->queueSize; ++i) {
        data->packets[i] = new UdpPacket(data->packetSize);
    }
    data->buffer = new UdpPacket(data->packetSize);
    data->nbPackets = 0;
    data->info = new bridgeInfo;
    data->thread = (pthread_t)NULL;
    bridgeInitInfo(data->info);

#ifdef _WIN32
    char semName[32];
    sprintf(semName, "semWrite%i", data->stats.id);
    data->semWrite = CreateSemaphore(NULL, 1, 1, semName);
    if (data->semWrite == NULL) {
        fprintf(stderr, "Can't init SLIP data write semaphore %s\n", semName);
        return -1;
    }
    sprintf(semName, "semQueue%i", data->stats.id);
    data->semQueue = CreateSemaphore(NULL, 1, 1, semName);
    if (data->semQueue == NULL) {
        fprintf(stderr, "Can't init SLIP data index semaphore %s\n", semName);
        return -1;
    }
#else
    if (sem_init(&data->semWrite, 0, data->queueSize) == -1) {
        perror("Can't init SLIP data write semaphore");
        return -1;
    }
    if (sem_init(&data->semQueue, 0, 1) == -1) {
        perror("Can't init SLIP data index semaphore");
        return -1;
    }
#endif
    data->reading = false;

    *args = data;
    return 0;
}


void* dabInputSlipThread(void* args)
{
    dabInputSlipData* data = (dabInputSlipData*)args;
    TcpSocket* client;

    while ((client = data->server->accept()) != NULL) {
        int size = 0;
        etiLog.log(info, "SLIP server got a new client.\n");

#ifdef _WIN32
        WaitForSingleObject(data->semWrite, INFINITE);
                WaitForSingleObject(data->semQueue, INFINITE);
#else
        sem_wait(&data->semWrite);
        sem_wait(&data->semQueue);
#endif
        UdpPacket* packet = data->packets[data->nbPackets];
#ifdef _WIN32
        ReleaseSemaphore(data->semQueue, 1, NULL);
#else
        sem_post(&data->semQueue);
#endif

        while ((size = client->read(packet->getData(), packet->getSize()))
                    > 0) {
            packet->setLength(size);
#ifdef _WIN32
            WaitForSingleObject(data->semQueue, INFINITE);
#else
            sem_wait(&data->semQueue);
#endif
            data->nbPackets++;
#ifdef _WIN32
            ReleaseSemaphore(data->semQueue, 1, NULL);
#else
            sem_post(&data->semQueue);
#endif
            
#ifdef _WIN32
            WaitForSingleObject(data->semWrite, INFINITE);
            WaitForSingleObject(data->semQueue, INFINITE);
#else
            sem_wait(&data->semWrite);
            sem_wait(&data->semQueue);
#endif
            packet = data->packets[data->nbPackets];
#ifdef _WIN32
            ReleaseSemaphore(data->semQueue, 1, NULL);
#else
            sem_post(&data->semQueue);
#endif
        }
        etiLog.log(info, "SLIP server client deconnected.\n");
        client->close();
    }
    etiLog.log(error, "SLIP thread can't accept new client (%s)\n",
            inetErrDesc, inetErrMsg);

    return NULL;
}


int dabInputSlipOpen(void* args, const char* inputName)
{
    const char* address;
    long port;
    address = strchr(inputName, ':');
    if (address == NULL) {
        etiLog.log(error, "\"%s\" SLIP address format is invalid: "
                "should be [address]:port - > aborting\n", inputName);
        return -1;
    }
    ++address;
    port = strtol(address, (char **)NULL, 10);
    if ((port == LONG_MIN) || (port == LONG_MAX)) {
        etiLog.log(error, "can't convert port number in SLIP address %s\n",
                address);
        return -1;
    }
    if (port == 0) {
        etiLog.log(error, "can't use port number 0 in SLIP address\n");
        return -1;
    }
    dabInputSlipData* data = (dabInputSlipData*)args;
    if (data->server->create(port) == -1) {
        etiLog.log(error, "can't set port %i on SLIP input (%s: %s)\n",
                port, inetErrDesc, inetErrMsg);
        return -1;
    }

    if (data->server->listen() == -1) {
        etiLog.log(error, "can't listen on SLIP socket(%s: %s)\n",
                inetErrDesc, inetErrMsg);
        return -1;
    }
#ifdef _WIN32
    data->thread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)dabInputSlipThread, data, 0, NULL);
    if (data->thread == NULL) {
        fprintf(stderr, "Can't create SLIP child");
        return -1;
    }
#else
    if (pthread_create(&data->thread, NULL, dabInputSlipThread, data)) {
        perror("Can't create SLIP child");
        return -1;
    }
#endif

    etiLog.log(debug, "check return code of create\n");
    return 0;
}


int dabInputSlipSetbuf(void* args, int size)
{
    dabInputSlipData* data = (dabInputSlipData*)args;

    if (size <= 10) {
        return -1;
    }

    data->packetSize = size - 10;
    if (data->packets == NULL) {
        data->packets = new UdpPacket*[data->queueSize];
    }
    for (int i = 0; i < data->queueSize; ++i) {
        if (data->packets[i] == NULL) {
            data->packets[i] = new UdpPacket(data->packetSize);
        } else {
            data->packets[i]->setSize(data->packetSize);
        }
    }
    if (data->buffer == NULL) {
        data->buffer = new UdpPacket(data->packetSize);
    } else {
        data->buffer->setSize(data->packetSize);
    }

    return 0;
}


int dabInputSlipRead(void* args, void* buffer, int size)
{
    dabInputSlipData* data = (dabInputSlipData*)args;

    if (data->nbPackets > 0) {  // data ready
        UdpPacket* temp;
        temp = data->buffer;
        data->buffer = data->packets[0];

#ifdef _WIN32
        WaitForSingleObject(data->semQueue, INFINITE);
#else
        sem_wait(&data->semQueue);
#endif
        for (int i = 1; i < data->queueSize; ++i) {
            data->packets[i - 1] = data->packets[i];
        }
        data->packets[data->queueSize - 1] = temp;
        --data->nbPackets;
#ifdef _WIN32
        ReleaseSemaphore(data->semQueue, 1, NULL);
        ReleaseSemaphore(data->semWrite, 1, NULL);
#else
        sem_post(&data->semQueue);
        sem_post(&data->semWrite);
#endif
    } else {
        data->buffer->setLength(0);
    }

    return data->buffer->getLength();
}


int dabInputSlipReadFrame(dabInputOperations* ops, void* args, void* buffer, int size)
{
    int nbBytes = 0;
    dabInputSlipData* data = (dabInputSlipData*)args;
    dabInputFifoStats* stats = (dabInputFifoStats*)&data->stats;

#ifdef _WIN32
    WaitForSingleObject(data->semQueue, INFINITE);
#else
    sem_wait(&data->semQueue);
#endif
    stats->bufferRecords[stats->bufferCount].curSize = data->nbPackets;
    stats->bufferRecords[stats->bufferCount].maxSize = data->queueSize;
#ifdef _WIN32
    ReleaseSemaphore(data->semQueue, 1, NULL);
#else
    sem_post(&data->semQueue);
#endif
    if (++stats->bufferCount == NB_RECORDS) {
        etiLog.log(info, "SLIP buffer state: (%i)", stats->id);
        for (int i = 0; i < stats->bufferCount; ++i) {
            etiLog.log(info, " %i/%i",
                    stats->bufferRecords[i].curSize,
                    stats->bufferRecords[i].maxSize);
        }
        etiLog.log(info, "\n");

        stats->bufferCount = 0;
    }

    data->stats.frameRecords[data->stats.frameCount].curSize = 0;
    data->stats.frameRecords[data->stats.frameCount].maxSize = size;

    if (data->buffer->getLength() == 0) {
        ops->read(args, NULL, 0);
    }
    while ((nbBytes = writePacket(data->buffer->getData(),
                    data->buffer->getLength(), buffer, size, data->info))
            != 0) {
        data->stats.frameRecords[data->stats.frameCount].curSize = nbBytes;
        ops->read(args, NULL, 0);
    }

    if (data->buffer->getLength() != 0) {
        data->stats.frameRecords[data->stats.frameCount].curSize = size;
    }

    if (++stats->frameCount == NB_RECORDS) {
        etiLog.log(info, "Data subchannel usage: (%i)",
                stats->id);
        for (int i = 0; i < stats->frameCount; ++i) {
            etiLog.log(info, " %i/%i",
                    stats->frameRecords[i].curSize,
                    stats->frameRecords[i].maxSize);
        }
        etiLog.log(info, "\n");
        stats->frameCount = 0;
    }
    return size;
}


int dabInputSlipClose(void* args)
{
    dabInputSlipData* data = (dabInputSlipData*)args;
    data->server->close();
#ifdef WIN32
    DWORD status;
    for (int i = 0; i < 5; ++i) {
        if (GetExitCodeThread(data->thread, &status)) {
            break;
        }
        Sleep(100);
    }
    TerminateThread(data->thread, 1);
#else
    if (data->thread != (pthread_t)NULL) {
        pthread_kill(data->thread, SIGPIPE);
    }
#endif
    return 0;
}


int dabInputSlipClean(void** args)
{
    dabInputSlipData* data = (dabInputSlipData*)(*args);
#ifdef _WIN32
    CloseHandle(data->thread);
    CloseHandle(data->semWrite);
    CloseHandle(data->semQueue);
#else
    sem_destroy(&data->semWrite);
    sem_destroy(&data->semQueue);
#endif
    for (int i = 0; i < data->queueSize; ++i) {
        if (data->packets[i] != NULL) {
            delete data->packets[i];
        }
    }
    delete []data->packets;
    delete data->buffer;
    delete data->server;
    delete data->info;
    delete data;
    return 0;
}