summaryrefslogtreecommitdiffstats
path: root/src/InputReader.h
blob: ab45d4f47bade81776a1ad253fa89aaa09141ffb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/*
   Copyright (C) 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
   Her Majesty the Queen in Right of Canada (Communications Research
   Center Canada)

   Copyright (C) 2018
   Matthias P. Braendli, matthias.braendli@mpb.li

    http://www.opendigitalradio.org
 */
/*
   This file is part of ODR-DabMod.

   ODR-DabMod is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as
   published by the Free Software Foundation, either version 3 of the
   License, or (at your option) any later version.

   ODR-DabMod is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with ODR-DabMod.  If not, see <http://www.gnu.org/licenses/>.
 */

#pragma once

#ifdef HAVE_CONFIG_H
#   include "config.h"
#endif

#include <cstdio>
#include <vector>
#include <string>
#include <atomic>
#include <memory>
#include <thread>
#include <unistd.h>
#if defined(HAVE_ZEROMQ)
#  include "zmq.hpp"
#  include "ThreadsafeQueue.h"
#  include "RemoteControl.h"
#endif
#include "Log.h"
#include "Socket.h"
#define INVALID_SOCKET   -1

class InputReader
{
    public:
        // Put next frame into buffer. This function will never write more than
        // 6144 bytes into buffer.
        // returns number of bytes written to buffer, 0 on eof, -1 on error
        virtual int GetNextFrame(void* buffer) = 0;

        // Get some information
        virtual std::string GetPrintableInfo() const = 0;
};

class InputFileReader : public InputReader
{
    public:
        InputFileReader() = default;
        InputFileReader(const InputFileReader& other) = delete;
        InputFileReader& operator=(const InputFileReader& other) = delete;

        // open file and determine stream type
        // When loop=1, GetNextFrame will never return 0
        int Open(std::string filename, bool loop);

        // Print information about the file opened
        virtual std::string GetPrintableInfo() const override;
        virtual int GetNextFrame(void* buffer) override;

    private:
        int IdentifyType();

        // Rewind the file, and replay anew
        // returns 0 on success, -1 on failure
        int Rewind();

        bool loop_; // if shall we loop the file over and over
        std::string filename_;

        /* Known types of input streams. Description taken from the CRC
         * mmbTools forum. All values are are little-endian.  */
        enum class EtiStreamType {
            /* Not yet identified */
            None,

            /* Raw format is a bit-by-bit (but byte aligned on sync) recording
             * of a G.703 data stream. The padding is always present.
             * The raw format can also be referred to as ETI(NI, G.703) or ETI(NI).
             * Format:
                 for each frame:
                   uint8_t data[6144]
             */
            Raw,

            /* Streamed format is used for streamed applications. As the total
             * number of frames is unknown before end of transmission, the
             * corresponding field is removed. The padding can be removed from
             * data.
             * Format:
                 for each frame:
                   uint16_t frameSize
                   uint8_t data[frameSize]
             */
            Streamed,

            /* Framed format is used for file recording. It is the default format.
             * The padding can be removed from data.
             * Format:
                 uint32_t nbFrames
                 for each frame:
                   uint16_t frameSize
                   uint8_t data[frameSize]
             */
            Framed,
        };

        EtiStreamType streamtype_ = EtiStreamType::None;
        struct FILEDeleter{ void operator()(FILE* fd){ if(fd) fclose(fd);}};
        std::unique_ptr<FILE, FILEDeleter> inputfile_;

        size_t inputfilelength_ = 0;
        uint64_t nbframes_ = 0; // 64-bit because 32-bit overflow is
        // after 2**32 * 24ms ~= 3.3 years
};

class InputTcpReader : public InputReader
{
    public:
        // Endpoint is either host:port or tcp://host:port
        void Open(const std::string& endpoint);

        // Put next frame into buffer. This function will never write more than
        // 6144 bytes into buffer.
        // returns number of bytes written to buffer, 0 on eof, -1 on error
        virtual int GetNextFrame(void* buffer) override;

        virtual std::string GetPrintableInfo() const override;

    private:
        Socket::TCPClient m_tcpclient;
        std::string m_uri;
};

struct zmq_input_overflow : public std::exception
{
    const char* what () const throw ()
    {
        return "InputZMQ buffer overflow";
    }
};

#if defined(HAVE_ZEROMQ)
/* A ZeroMQ input. See www.zeromq.org for more info */

class InputZeroMQReader : public InputReader, public RemoteControllable
{
    public:
        InputZeroMQReader();
        InputZeroMQReader(const InputZeroMQReader& other) = delete;
        InputZeroMQReader& operator=(const InputZeroMQReader& other) = delete;
        ~InputZeroMQReader();

        int Open(const std::string& uri, size_t max_queued_frames);
        virtual int GetNextFrame(void* buffer) override;
        virtual std::string GetPrintableInfo() const override;

        /* Base function to set parameters. */
        virtual void set_parameter(
                const std::string& parameter,
                const std::string& value) override;

        /* Getting a parameter always returns a string. */
        virtual const std::string get_parameter(
                const std::string& parameter) const override;

    private:
        std::atomic<bool> m_running = ATOMIC_VAR_INIT(false);
        std::string m_uri;
        size_t m_max_queued_frames = 0;

        // Either must contain a full ETI frame, or one flag must be set
        struct message_t {
            std::vector<uint8_t> eti_frame;
            bool overflow = false;
            bool timeout = false;
            bool fault = false;
        };
        ThreadsafeQueue<message_t> m_in_messages;

        mutable std::mutex m_last_in_messages_size_mutex;
        size_t m_last_in_messages_size = 0;

        void RecvProcess(void);

        zmq::context_t m_zmqcontext; // is thread-safe
        std::thread m_recv_thread;
};

#endif