aboutsummaryrefslogtreecommitdiffstats
path: root/libtoolame-dab/zmqoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'libtoolame-dab/zmqoutput.c')
-rw-r--r--libtoolame-dab/zmqoutput.c120
1 files changed, 120 insertions, 0 deletions
diff --git a/libtoolame-dab/zmqoutput.c b/libtoolame-dab/zmqoutput.c
new file mode 100644
index 0000000..03007cc
--- /dev/null
+++ b/libtoolame-dab/zmqoutput.c
@@ -0,0 +1,120 @@
+#include "zmqoutput.h"
+#include <zmq.h>
+#include <stdlib.h>
+#include <string.h>
+#include "common.h"
+
+static void *zmq_context;
+
+// Buffer containing at maximum one frame
+unsigned char* zmqbuf;
+
+// The current data length (smaller than allocated
+// buffer size)
+size_t zmqbuf_len;
+
+static int zmq_peak_left = 0;
+static int zmq_peak_right = 0;
+
+void zmqoutput_set_peaks(int left, int right)
+{
+ zmq_peak_left = left;
+ zmq_peak_right = right;
+}
+
+int zmqoutput_open(Bit_stream_struc *bs, const char* uri_list)
+{
+ zmq_context = zmq_ctx_new();
+ bs->zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
+ if (bs->zmq_sock == NULL) {
+ fprintf(stderr, "Error occurred during zmq_socket: %s\n",
+ zmq_strerror(errno));
+ return -1;
+ }
+
+ char* uris = strdup(uri_list);
+ char* saveptr = NULL;
+
+ for (; ; uris = NULL) {
+ char* uri = strtok_r(uris, ";", &saveptr);
+
+
+ if (uri) {
+ fprintf(stderr, "Connecting ZMQ to %s\n", uri);
+ if (zmq_connect(bs->zmq_sock, uri) != 0) {
+ fprintf(stderr, "Error occurred during zmq_connect: %s\n",
+ zmq_strerror(errno));
+ free(uris);
+ return -1;
+ }
+ }
+ else {
+ break;
+ }
+ }
+
+ free(uris);
+
+ zmqbuf = (unsigned char*)malloc(bs->zmq_framesize);
+ if (zmqbuf == NULL) {
+ fprintf(stderr, "Unable to allocate ZMQ buffer\n");
+ exit(0);
+ }
+ zmqbuf_len = 0;
+ return 0;
+}
+
+int zmqoutput_write_byte(Bit_stream_struc *bs, unsigned char data)
+{
+ zmqbuf[zmqbuf_len++] = data;
+
+ if (zmqbuf_len == bs->zmq_framesize) {
+
+ int frame_length = sizeof(struct zmq_frame_header) + zmqbuf_len;
+
+ struct zmq_frame_header* header =
+ malloc(frame_length);
+
+ uint8_t* txframe = ((uint8_t*)header) + sizeof(struct zmq_frame_header);
+
+ header->version = 1;
+ header->encoder = ZMQ_ENCODER_TOOLAME;
+ header->datasize = zmqbuf_len;
+ header->audiolevel_left = zmq_peak_left;
+ header->audiolevel_right = zmq_peak_right;
+
+ memcpy(txframe, zmqbuf, zmqbuf_len);
+
+ int send_error = zmq_send(bs->zmq_sock, header, frame_length,
+ ZMQ_DONTWAIT);
+
+ free(header);
+ header = NULL;
+
+ if (send_error < 0) {
+ fprintf(stderr, "ZeroMQ send failed! %s\n", zmq_strerror(errno));
+ }
+
+ zmqbuf_len = 0;
+
+ return bs->zmq_framesize;
+ }
+
+ return 0;
+
+}
+
+void zmqoutput_close(Bit_stream_struc *bs)
+{
+ if (bs->zmq_sock)
+ zmq_close(bs->zmq_sock);
+
+ if (zmq_context)
+ zmq_ctx_destroy(zmq_context);
+
+ if (zmqbuf) {
+ free(zmqbuf);
+ zmqbuf = NULL;
+ }
+}
+