1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
|
#include "zmqoutput.h"
#include <zmq.h>
#include <stdlib.h>
#include <string.h>
#include "common.h"
static void *zmq_context;
// Buffer containing at maximum one frame
unsigned char* zmqbuf;
// The current data length (smaller than allocated
// buffer size)
size_t zmqbuf_len;
static int zmq_peak_left = 0;
static int zmq_peak_right = 0;
void zmqoutput_set_peaks(int left, int right)
{
zmq_peak_left = left;
zmq_peak_right = right;
}
int zmqoutput_open(Bit_stream_struc *bs, const char* uri_list)
{
zmq_context = zmq_ctx_new();
bs->zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
if (bs->zmq_sock == NULL) {
fprintf(stderr, "Error occurred during zmq_socket: %s\n",
zmq_strerror(errno));
return -1;
}
char* uris = strdup(uri_list);
char* saveptr = NULL;
for (; ; uris = NULL) {
char* uri = strtok_r(uris, ";", &saveptr);
if (uri) {
fprintf(stderr, "Connecting ZMQ to %s\n", uri);
if (zmq_connect(bs->zmq_sock, uri) != 0) {
fprintf(stderr, "Error occurred during zmq_connect: %s\n",
zmq_strerror(errno));
free(uris);
return -1;
}
}
else {
break;
}
}
free(uris);
zmqbuf = (unsigned char*)malloc(bs->zmq_framesize);
if (zmqbuf == NULL) {
fprintf(stderr, "Unable to allocate ZMQ buffer\n");
exit(0);
}
zmqbuf_len = 0;
return 0;
}
int zmqoutput_write_byte(Bit_stream_struc *bs, unsigned char data)
{
zmqbuf[zmqbuf_len++] = data;
if (zmqbuf_len == bs->zmq_framesize) {
int frame_length = sizeof(struct zmq_frame_header) + zmqbuf_len;
struct zmq_frame_header* header =
malloc(frame_length);
uint8_t* txframe = ((uint8_t*)header) + sizeof(struct zmq_frame_header);
header->version = 1;
header->encoder = ZMQ_ENCODER_TOOLAME;
header->datasize = zmqbuf_len;
header->audiolevel_left = zmq_peak_left;
header->audiolevel_right = zmq_peak_right;
memcpy(txframe, zmqbuf, zmqbuf_len);
int send_error = zmq_send(bs->zmq_sock, header, frame_length,
ZMQ_DONTWAIT);
free(header);
header = NULL;
if (send_error < 0) {
fprintf(stderr, "ZeroMQ send failed! %s\n", zmq_strerror(errno));
}
zmqbuf_len = 0;
return bs->zmq_framesize;
}
return 0;
}
void zmqoutput_close(Bit_stream_struc *bs)
{
if (bs->zmq_sock)
zmq_close(bs->zmq_sock);
if (zmq_context)
zmq_ctx_destroy(zmq_context);
if (zmqbuf) {
free(zmqbuf);
zmqbuf = NULL;
}
}
|