aboutsummaryrefslogtreecommitdiffstats
path: root/zmqoutput.c
blob: 75a35437ec577d979dea06488235cf9d2939683d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include "zmqoutput.h"
#include <zmq.h>
#include <stdlib.h>
#include <string.h>
#include "common.h"

// ZeroMQ context used by this module; created in zmqoutput_open()
// and destroyed in zmqoutput_close().
static void *zmq_context;

// Accumulation buffer holding at most one frame. Allocated in
// zmqoutput_open() with bs->zmq_framesize bytes and released in
// zmqoutput_close().
unsigned char* zmqbuf;

// Number of bytes currently stored in zmqbuf (never more than the
// allocated buffer size). Reset to 0 each time a frame is completed.
size_t zmqbuf_len;

// Most recent values passed to zmqoutput_set_peaks().
// NOTE(review): presumably audio peak levels for the left and right
// channels; units and valid range are not visible here -- confirm.
static int zmq_peak_left = 0;
static int zmq_peak_right = 0;

// Record the latest peak values for both channels.
//
// The arguments are copied verbatim into the file-scope variables
// zmq_peak_left and zmq_peak_right; no validation or scaling is done.
//
//   left   value stored as the left-channel peak
//   right  value stored as the right-channel peak
void zmqoutput_set_peaks(int left, int right)
{
    // The two stores are independent of each other.
    zmq_peak_right = right;
    zmq_peak_left  = left;
}

// Open the ZeroMQ output.
//
// Creates the ZeroMQ context, creates a PUB socket, connects it to uri
// and allocates the frame accumulation buffer (bs->zmq_framesize bytes).
//
//   bs   bit stream state; bs->zmq_framesize is read as the buffer size
//        and bs->zmq_sock receives the socket (NULL on failure)
//   uri  ZeroMQ endpoint passed to zmq_connect()
//
// Returns 0 on success. On any failure an error is printed to stderr,
// everything acquired so far is released again and -1 is returned, so
// the caller never has to clean up after a failed open.
int zmqoutput_open(Bit_stream_struc *bs, char* uri)
{
    // Make sure a failed open never leaves a stale socket handle behind
    // for zmqoutput_close() to trip over.
    bs->zmq_sock = NULL;
    zmqbuf_len = 0;

    zmq_context = zmq_ctx_new();
    if (zmq_context == NULL) {
        fprintf(stderr, "Error occurred during zmq_ctx_new: %s\n",
                zmq_strerror(errno));
        return -1;
    }

    bs->zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
    if (bs->zmq_sock == NULL) {
        fprintf(stderr, "Error occurred during zmq_socket: %s\n",
                zmq_strerror(errno));
        goto fail_context;
    }

    if (zmq_connect(bs->zmq_sock, uri) != 0) {
        fprintf(stderr, "Error occurred during zmq_connect: %s\n",
                zmq_strerror(errno));
        goto fail_socket;
    }

    zmqbuf = malloc(bs->zmq_framesize);
    if (zmqbuf == NULL) {
        // Report the failure to the caller like every other error path
        // instead of terminating the process with a success status.
        fprintf(stderr, "Unable to allocate ZMQ buffer\n");
        goto fail_socket;
    }

    return 0;

    // Error messages are printed before reaching the labels below, so
    // the cleanup calls cannot clobber the errno being reported.
fail_socket:
    zmq_close(bs->zmq_sock);
    bs->zmq_sock = NULL;
fail_context:
    zmq_ctx_destroy(zmq_context);
    zmq_context = NULL;
    return -1;
}

// Append one byte to the current frame and transmit the frame once it
// is complete.
//
// Bytes are collected in zmqbuf. When zmqbuf_len reaches
// bs->zmq_framesize, a message consisting of a struct zmq_frame_header
// immediately followed by the collected bytes is sent on bs->zmq_sock
// with ZMQ_DONTWAIT, and the buffer is emptied.
//
//   bs    bit stream state providing zmq_framesize and zmq_sock
//   data  byte to append
//
// Returns bs->zmq_framesize when this byte completed a frame (also when
// the frame could not be sent; the failure is reported on stderr and
// the frame is dropped), and 0 when the frame is still incomplete.
int zmqoutput_write_byte(Bit_stream_struc *bs, unsigned char data)
{
    zmqbuf[zmqbuf_len++] = data;

    if (zmqbuf_len == bs->zmq_framesize) {

        size_t frame_length = sizeof(struct zmq_frame_header) + zmqbuf_len;

        // Header and payload live in one contiguous allocation so that
        // they go out as a single ZeroMQ message.
        struct zmq_frame_header* header = malloc(frame_length);

        if (header == NULL) {
            fprintf(stderr, "Unable to allocate ZMQ frame\n");
        }
        else {
            uint8_t* payload =
                ((uint8_t*)header) + sizeof(struct zmq_frame_header);

            header->version          = 1;
            header->encoder          = ZMQ_ENCODER_TOOLAME;
            header->datasize         = zmqbuf_len;
            // Forward the levels stored by zmqoutput_set_peaks().
            header->audiolevel_left  = zmq_peak_left;
            header->audiolevel_right = zmq_peak_right;

            memcpy(payload, zmqbuf, zmqbuf_len);

            // Send from the start of the allocation: frame_length covers
            // header + payload, so starting at the payload would omit the
            // header and read past the end of the buffer.
            int send_error = zmq_send(bs->zmq_sock, header, frame_length,
                    ZMQ_DONTWAIT);

            if (send_error < 0) {
                fprintf(stderr, "ZeroMQ send failed! %s\n",
                        zmq_strerror(errno));
            }

            // zmq_send() copies the data, so the frame can be released
            // right away.
            free(header);
        }

        zmqbuf_len = 0;

        return bs->zmq_framesize;
    }

    return 0;

}

// Shut down the ZeroMQ output and release everything zmqoutput_open()
// acquired: the socket in bs->zmq_sock, the module's ZeroMQ context and
// the frame buffer.
//
// Every handle is reset to NULL after it is released, so calling this
// function again (or after a failed zmqoutput_open()) does not release
// anything twice. Bytes of an incomplete frame still in the buffer are
// discarded.
//
//   bs  bit stream state whose zmq_sock is closed and cleared
void zmqoutput_close(Bit_stream_struc *bs)
{
    // The socket has to be closed before the context is destroyed.
    if (bs->zmq_sock) {
        zmq_close(bs->zmq_sock);
        bs->zmq_sock = NULL;
    }

    if (zmq_context) {
        zmq_ctx_destroy(zmq_context);
        zmq_context = NULL;
    }

    // free(NULL) is a no-op, so no guard is needed.
    free(zmqbuf);
    zmqbuf = NULL;
    zmqbuf_len = 0;
}