/*
   Copyright (C) 2004, 2005, 2006, 2007, 2008, 2009 Her Majesty the Queen in
   Right of Canada (Communications Research Center Canada)

   Copyright (C) 2019
   Matthias P. Braendli, matthias.braendli@mpb.li

    http://www.opendigitalradio.org
   */
/*
   This file is part of ODR-DabMux.

   ODR-DabMux is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as
   published by the Free Software Foundation, either version 3 of the
   License, or (at your option) any later version.

   ODR-DabMux is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with ODR-DabMux.  If not, see <http://www.gnu.org/licenses/>.
   */

#ifndef _TCPSOCKET
#define _TCPSOCKET

#ifdef HAVE_CONFIG_H
#   include "config.h"
#endif

#include "InetAddress.h"
#include "ThreadsafeQueue.h"
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <netdb.h>
#include <arpa/inet.h>
#include <pthread.h>
#define SOCKET           int
#define INVALID_SOCKET   -1
#define SOCKET_ERROR     -1
#define reuseopt_t       int

#include <iostream>
#include <string>
#include <vector>
#include <memory>
#include <atomic>
#include <thread>
#include <list>

/**
 *  This class represents a TCP socket.
 */
class TcpSocket
{
    public:
        /** Create a new socket that does nothing */
        TcpSocket();

        /** Create a new socket listening for incoming connections.
         *  @param port The port number on which the socket will listen.
         *  @param name The IP address on which the socket will be bound.
         *              It is used to bind the socket on a specific interface if
         *              the computer have many NICs.
         */
        TcpSocket(int port, const std::string& name);
        ~TcpSocket();
        TcpSocket(TcpSocket&& other);
        TcpSocket& operator=(TcpSocket&& other);
        TcpSocket(const TcpSocket& other) = delete;
        TcpSocket& operator=(const TcpSocket& other) = delete;

        bool isValid(void);

        int close(void);

        /** Send data over the TCP connection.
         *  @param data The buffer that will be sent.
         *  @param size Number of bytes to send.
         *  @param timeout_ms number of milliseconds before timeout, or 0 for infinite timeout
         *  return number of bytes sent, 0 on timeout, or throws runtime_error.
         */
        ssize_t send(const void* data, size_t size, int timeout_ms=0);

        /** Receive data from the socket.
         *  @param data The buffer that will receive data.
         *  @param size The buffer size.
         *  @return number of bytes received or -1 (SOCKET_ERROR) if error
         */
        ssize_t recv(void* data, size_t size);

        void listen(void);
        TcpSocket accept(void);

        /* Returns either valid socket if a connection was
         * accepted before the timeout expired, or an invalid
         * socket otherwise.
         */
        TcpSocket accept(int timeout_ms);

        /** Retrieve address this socket is bound to */
        InetAddress getOwnAddress() const;
        InetAddress getRemoteAddress() const;

    private:
        TcpSocket(SOCKET sock, InetAddress own, InetAddress remote);

        /// The address on which the socket is bound.
        InetAddress m_own_address;
        InetAddress m_remote_address;
        /// The low-level socket used by system functions.
        SOCKET m_sock;
};

/* Helper class for TCPDataDispatcher, contains a queue of pending data and
 * a sender thread. */
class TCPConnection
{
    public:
        TCPConnection(TcpSocket&& sock);
        TCPConnection(const TCPConnection&) = delete;
        TCPConnection& operator=(const TCPConnection&) = delete;
        ~TCPConnection();

        ThreadsafeQueue<std::vector<uint8_t> > queue;

    private:
        std::atomic<bool> m_running;
        std::thread m_sender_thread;
        TcpSocket m_sock;

        void process(void);
};

/* Send a TCP stream to several destinations, and automatically disconnect destinations
 * whose buffer overflows.
 */
class TCPDataDispatcher
{
    public:
        TCPDataDispatcher(size_t max_queue_size);
        ~TCPDataDispatcher();
        TCPDataDispatcher(const TCPDataDispatcher&) = delete;
        TCPDataDispatcher& operator=(const TCPDataDispatcher&) = delete;

        void start(int port, const std::string& address);
        void write(const std::vector<uint8_t>& data);

    private:
        void process(void);

        size_t m_max_queue_size;

        std::atomic<bool> m_running;
        std::thread m_listener_thread;
        TcpSocket m_listener_socket;
        std::list<TCPConnection> m_connections;
};

#endif // _TCPSOCKET