aboutsummaryrefslogtreecommitdiffstats
path: root/libtoolame-dab/zmqoutput.c
blob: 03007ccdb7b3b09b02efb1dbdb1db3e5b9898ddb (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include "zmqoutput.h"
#include <zmq.h>
#include <stdlib.h>
#include <string.h>
#include "common.h"

// ZeroMQ context used to create the output socket. Assigned by
// zmqoutput_open() and destroyed by zmqoutput_close().
static void *zmq_context;

// Staging buffer for outgoing data. zmqoutput_open() allocates
// bs->zmq_framesize bytes for it, so it holds at most one frame.
unsigned char* zmqbuf;

// Number of bytes currently stored in zmqbuf (at most the allocated
// buffer size). Reset to 0 each time a complete frame has been handled.
size_t zmqbuf_len;

// Latest peak values recorded by zmqoutput_set_peaks(); they are copied
// into the audiolevel fields of every frame header that is sent.
static int zmq_peak_left = 0;
static int zmq_peak_right = 0;

// Remember the given left/right peak values in file-scope state.
// zmqoutput_write_byte() places them in the header of each frame it
// sends afterwards.
void zmqoutput_set_peaks(int left, int right)
{
    zmq_peak_right = right;
    zmq_peak_left  = left;
}

// Create the ZeroMQ context and PUB socket for bs, connect the socket to
// every endpoint named in uri_list and allocate the one-frame staging
// buffer.
//
//   bs        bit stream whose zmq_sock field receives the new socket and
//             whose zmq_framesize field gives the staging buffer size
//   uri_list  endpoint URIs separated by ';' (the string is not modified)
//
// Returns 0 on success and -1 on any failure. On failure, whatever was
// already created is left in bs->zmq_sock / the module state, where
// zmqoutput_close() releases it.
int zmqoutput_open(Bit_stream_struc *bs, const char* uri_list)
{
    if (uri_list == NULL) {
        fprintf(stderr, "No ZMQ URI list given\n");
        return -1;
    }

    zmq_context = zmq_ctx_new();
    if (zmq_context == NULL) {
        fprintf(stderr, "Unable to create ZMQ context\n");
        return -1;
    }

    bs->zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
    if (bs->zmq_sock == NULL) {
        fprintf(stderr, "Error occurred during zmq_socket: %s\n",
                zmq_strerror(errno));
        return -1;
    }

    // strtok_r() writes into the string it splits, so work on a copy.
    // uri_copy is never reassigned: it is the pointer that must be passed
    // to free() once tokenising is finished.
    char* uri_copy = strdup(uri_list);
    if (uri_copy == NULL) {
        fprintf(stderr, "Unable to duplicate ZMQ URI list\n");
        return -1;
    }

    char* saveptr = NULL;
    int result = 0;

    for (char* uri = strtok_r(uri_copy, ";", &saveptr);
            uri != NULL;
            uri = strtok_r(NULL, ";", &saveptr)) {
        fprintf(stderr, "Connecting ZMQ to %s\n", uri);
        if (zmq_connect(bs->zmq_sock, uri) != 0) {
            fprintf(stderr, "Error occurred during zmq_connect: %s\n",
                    zmq_strerror(errno));
            result = -1;
            break;
        }
    }

    // Single release point for the copy, reached on success and failure.
    free(uri_copy);

    if (result != 0) {
        return -1;
    }

    zmqbuf = malloc(bs->zmq_framesize);
    if (zmqbuf == NULL) {
        // Report the failure to the caller like every other error path
        // instead of terminating the process with a success status.
        fprintf(stderr, "Unable to allocate ZMQ buffer\n");
        return -1;
    }
    zmqbuf_len = 0;
    return 0;
}

// Append one byte to the staging buffer. Once the buffer holds a full
// frame (bs->zmq_framesize bytes), build a message consisting of a
// zmq_frame_header followed by the buffered bytes and send it on
// bs->zmq_sock with ZMQ_DONTWAIT.
//
// Returns bs->zmq_framesize when this byte completed a frame and 0
// otherwise. A frame that cannot be sent (allocation failure or zmq_send
// error) is reported on stderr and dropped; the return value is the same
// as for a successful send.
int zmqoutput_write_byte(Bit_stream_struc *bs, unsigned char data)
{
    zmqbuf[zmqbuf_len++] = data;

    if (zmqbuf_len != bs->zmq_framesize) {
        return 0;
    }

    const size_t frame_length = sizeof(struct zmq_frame_header) + zmqbuf_len;

    struct zmq_frame_header* header = malloc(frame_length);

    if (header == NULL) {
        fprintf(stderr, "Unable to allocate ZMQ frame\n");
    }
    else {
        // The payload is stored directly behind the header in the same
        // allocation so that both go out in one zmq_send() call.
        uint8_t* txframe = ((uint8_t*)header) + sizeof(struct zmq_frame_header);

        header->version          = 1;
        header->encoder          = ZMQ_ENCODER_TOOLAME;
        header->datasize         = zmqbuf_len;
        header->audiolevel_left  = zmq_peak_left;
        header->audiolevel_right = zmq_peak_right;

        memcpy(txframe, zmqbuf, zmqbuf_len);

        const int send_result = zmq_send(bs->zmq_sock, header, frame_length,
                ZMQ_DONTWAIT);

        // Capture errno before free(), which is allowed to modify it.
        const int send_errno = errno;

        free(header);

        if (send_result < 0) {
            fprintf(stderr, "ZeroMQ send failed! %s\n",
                    zmq_strerror(send_errno));
        }
    }

    // Start collecting the next frame whether or not this one was sent;
    // otherwise the next byte would be written past the end of zmqbuf.
    zmqbuf_len = 0;

    return bs->zmq_framesize;
}

// Release what zmqoutput_open() set up: the socket stored in
// bs->zmq_sock, the module's ZeroMQ context and the staging buffer.
//
// Every handle is cleared after it is released, so calling this function
// again (or after a failed zmqoutput_open()) does not close or free
// anything twice.
void zmqoutput_close(Bit_stream_struc *bs)
{
    if (bs->zmq_sock) {
        zmq_close(bs->zmq_sock);
        bs->zmq_sock = NULL;
    }

    if (zmq_context) {
        zmq_ctx_destroy(zmq_context);
        zmq_context = NULL;
    }

    // free(NULL) is a no-op, so no guard is needed here.
    free(zmqbuf);
    zmqbuf = NULL;
    zmqbuf_len = 0;
}