aboutsummaryrefslogtreecommitdiffstats
path: root/zmqoutput.c
blob: cc8937e63c6643c4bcfb0c537ce6f56cdd651885 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include "zmqoutput.h"
#include <zmq.h>
#include <stdlib.h>
#include "common.h"

static void *zmq_context;

// Buffer containing at maximum one frame
unsigned char* zmqbuf;

// The current data length (smaller than allocated
// buffer size)
size_t zmqbuf_len;


int zmqoutput_open(Bit_stream_struc *bs, char* uri)
{
    zmq_context = zmq_ctx_new();
    bs->zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
    if (bs->zmq_sock == NULL) {
        fprintf(stderr, "Error occurred during zmq_socket: %s\n",
                zmq_strerror(errno));
        return -1;
    }
    if (zmq_connect(bs->zmq_sock, uri) != 0) {
        fprintf(stderr, "Error occurred during zmq_connect: %s\n",
                zmq_strerror(errno));
        return -1;
    }

    zmqbuf = (unsigned char*)malloc(bs->zmq_framesize);
    if (zmqbuf == NULL) {
        fprintf(stderr, "Unable to allocate ZMQ buffer\n");
        exit(0);
    }
    zmqbuf_len = 0;
    return 0;
}

int zmqoutput_write_byte(Bit_stream_struc *bs, unsigned char data)
{
    zmqbuf[zmqbuf_len++] = data;

    if (zmqbuf_len == bs->zmq_framesize) {

        int send_error = zmq_send(bs->zmq_sock, zmqbuf, bs->zmq_framesize,
                ZMQ_DONTWAIT);

        if (send_error < 0) {
            fprintf(stderr, "ZeroMQ send failed! %s\n", zmq_strerror(errno));
        }

        zmqbuf_len = 0;

        return bs->zmq_framesize;
    }

    return 0;

}

void zmqoutput_close(Bit_stream_struc *bs)
{
    if (bs->zmq_sock)
        zmq_close(bs->zmq_sock);

    if (zmq_context)
        zmq_ctx_destroy(zmq_context);

    if (zmqbuf) {
        free(zmqbuf);
        zmqbuf = NULL;
    }
}