aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2014-02-11 17:09:16 +0100
committerMatthias P. Braendli <matthias.braendli@mpb.li>2014-02-11 17:09:16 +0100
commit66d4072446dd0b68c6b250607c7020e0aafae4ee (patch)
treeb833efce4f4967edc58085994cf18382f4f41c17
parentd58099780dcd5c5260e9e5609f1ee0b1da247546 (diff)
downloadtoolame-dab-66d4072446dd0b68c6b250607c7020e0aafae4ee.tar.gz
toolame-dab-66d4072446dd0b68c6b250607c7020e0aafae4ee.tar.bz2
toolame-dab-66d4072446dd0b68c6b250607c7020e0aafae4ee.zip
keep bs->buf as is, create zmq specific buffer instead
-rw-r--r--Makefile3
-rw-r--r--bitstream.c65
-rw-r--r--common.h3
-rw-r--r--zmqoutput.c71
-rw-r--r--zmqoutput.h13
5 files changed, 105 insertions, 50 deletions
diff --git a/Makefile b/Makefile
index b8f7212..665387f 100644
--- a/Makefile
+++ b/Makefile
@@ -22,7 +22,8 @@ c_sources = \
tables.c \
availbits.c \
ath.c \
- encode_new.c
+ encode_new.c \
+ zmqoutput.c
OBJ = $(c_sources:.c=.o)
diff --git a/bitstream.c b/bitstream.c
index 02f1515..91573d8 100644
--- a/bitstream.c
+++ b/bitstream.c
@@ -1,7 +1,7 @@
#include <stdio.h>
#include <stdlib.h>
-#include <zmq.h>
#include <string.h>
+#include "zmqoutput.h"
#include "common.h"
#include "mem.h"
#include "bitstream.h"
@@ -104,52 +104,27 @@ int refill_buffer (Bit_stream_struc * bs)
/* empty the buffer to the output device when the buffer becomes full */
void empty_buffer (Bit_stream_struc * bs, int minimum)
{
- register int i;
+ int i;
if (bs->pt) {
for (i = bs->buf_size - 1; i >= minimum; i--)
fwrite (&bs->buf[i], sizeof (unsigned char), 1, bs->pt);
fflush (bs->pt); /* NEW SS to assist in debugging */
-
- for (i = minimum - 1; i >= 0; i--)
- bs->buf[bs->buf_size - minimum + i] = bs->buf[i];
-
- bs->buf_byte_idx = bs->buf_size - 1 - minimum;
- bs->buf_bit_idx = 8;
+ }
+ else if (bs->zmq_sock) {
+ for (i = bs->buf_size - 1; i >= minimum; i--)
+ zmqoutput_write_byte(bs, bs->buf[i]);
}
- if (bs->zmq_sock) {
- unsigned char outbuf[bs->zmq_framesize];
-
- int j = 0;
- for (i = bs->buf_size - 1; i >= minimum; i--) {
- outbuf[j++] = bs->buf[i];
- if (j >= bs->zmq_framesize)
- break;
- }
-
- if (j < bs->zmq_framesize) {
- fprintf(stderr, "not enough data in buffer ! j=%d, req'd %d",
- j, bs->zmq_framesize);
- }
-
- int send_error = zmq_send(bs->zmq_sock, outbuf, bs->zmq_framesize,
- ZMQ_DONTWAIT);
+ for (i = minimum - 1; i >= 0; i--)
+ bs->buf[bs->buf_size - minimum + i] = bs->buf[i];
- if (send_error < 0) {
- fprintf(stderr, "ZeroMQ send failed! %s\n", zmq_strerror(errno));
- }
-
- for (i = minimum - 1; i >= 0; i--)
- bs->buf[bs->buf_size - minimum + i] = bs->buf[i];
+ bs->buf_byte_idx = bs->buf_size - 1 - minimum;
+ bs->buf_bit_idx = 8;
- bs->buf_byte_idx = bs->buf_size - 1 - minimum;
- bs->buf_bit_idx = 8;
- }
}
-static void *zmq_context;
/* open the device to write the bit stream into it */
void open_bit_stream_w (Bit_stream_struc * bs, char *bs_filenam, int size)
@@ -159,20 +134,11 @@ void open_bit_stream_w (Bit_stream_struc * bs, char *bs_filenam, int size)
if (bs_filenam[0] == '-')
bs->pt = stdout;
else if (strncmp(bs_filenam, "tcp://", 4) == 0) {
- zmq_context = zmq_ctx_new();
- bs->zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
- if (bs->zmq_sock == NULL) {
- fprintf(stderr, "Error occurred during zmq_socket: %s\n",
- zmq_strerror(errno));
- abort();
+ if (zmqoutput_open(bs, bs_filenam) != 0) {
+ fprintf(stderr, "Could not initialise ZMQ\n");
+ exit(1);
}
- if (zmq_connect(bs->zmq_sock, bs_filenam) != 0) {
- fprintf(stderr, "Error occurred during zmq_connect: %s\n",
- zmq_strerror(errno));
- abort();
- }
-
- bs->pt = NULL;
+ bs->pt = NULL; // we're not using file output
}
else if ((bs->pt = fopen (bs_filenam, "wb")) == NULL) {
fprintf (stderr, "Could not create \"%s\".\n", bs_filenam);
@@ -192,7 +158,8 @@ void close_bit_stream_w (Bit_stream_struc * bs)
{
putbits (bs, 0, 7);
empty_buffer (bs, bs->buf_byte_idx + 1);
- fclose (bs->pt);
+ if (bs->pt) fclose(bs->pt);
+ zmqoutput_close(bs);
desalloc_buffer (bs);
}
diff --git a/common.h b/common.h
index c4fd303..aa50c4f 100644
--- a/common.h
+++ b/common.h
@@ -87,6 +87,9 @@
*
***********************************************************************/
+#include <stdio.h>
+#include <stdlib.h>
+
/* Structure for Reading Layer II Allocation Tables from File */
typedef struct
diff --git a/zmqoutput.c b/zmqoutput.c
new file mode 100644
index 0000000..5cf85ec
--- /dev/null
+++ b/zmqoutput.c
@@ -0,0 +1,71 @@
+#include "zmqoutput.h"
+#include <zmq.h>
+#include <stdlib.h>
+#include "common.h"
+
+static void *zmq_context;
+
+// Buffer containing at maximum one frame
+unsigned char* zmqbuf;
+
+// The current data length (smaller than allocated
+// buffer size)
+size_t zmqbuf_len;
+
+
+int zmqoutput_open(Bit_stream_struc *bs, char* uri)
+{
+ zmq_context = zmq_ctx_new();
+ bs->zmq_sock = zmq_socket(zmq_context, ZMQ_PUB);
+ if (bs->zmq_sock == NULL) {
+ fprintf(stderr, "Error occurred during zmq_socket: %s\n",
+ zmq_strerror(errno));
+ return -1;
+ }
+ if (zmq_connect(bs->zmq_sock, uri) != 0) {
+ fprintf(stderr, "Error occurred during zmq_connect: %s\n",
+ zmq_strerror(errno));
+ return -1;
+ }
+
+ zmqbuf = (unsigned char*)malloc(bs->zmq_framesize);
+ if (zmqbuf == NULL) {
+ fprintf(stderr, "Unable to allocate ZMQ buffer\n");
+ exit(0);
+ }
+ zmqbuf_len = 0;
+ return 0;
+}
+
+int zmqoutput_write_byte(Bit_stream_struc *bs, unsigned char data)
+{
+ zmqbuf[zmqbuf_len++] = data;
+
+ if (zmqbuf_len == bs->zmq_framesize) {
+
+ int send_error = zmq_send(bs->zmq_sock, zmqbuf, bs->zmq_framesize,
+ ZMQ_DONTWAIT);
+
+ if (send_error < 0) {
+ fprintf(stderr, "ZeroMQ send failed! %s\n", zmq_strerror(errno));
+ }
+
+ zmqbuf_len = 0;
+ }
+
+}
+
+void zmqoutput_close(Bit_stream_struc *bs)
+{
+ if (bs->zmq_sock)
+ zmq_close(bs->zmq_sock);
+
+ if (zmq_context)
+ zmq_ctx_destroy(zmq_context);
+
+ if (zmqbuf) {
+ free(zmqbuf);
+ zmqbuf = NULL;
+ }
+}
+
diff --git a/zmqoutput.h b/zmqoutput.h
new file mode 100644
index 0000000..8d7cb13
--- /dev/null
+++ b/zmqoutput.h
@@ -0,0 +1,13 @@
+#ifndef _ZMQOUTPUT_H_
+#define _ZMQOUTPUT_H_
+
+#include "common.h"
+
+int zmqoutput_open(Bit_stream_struc * bs, char* uri);
+
+int zmqoutput_write_byte(Bit_stream_struc *bs, unsigned char data);
+
+void zmqoutput_close(Bit_stream_struc *bs);
+
+#endif
+