summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/AlsaDabplus.cpp80
-rw-r--r--src/SampleQueue.h102
2 files changed, 119 insertions, 63 deletions
diff --git a/src/AlsaDabplus.cpp b/src/AlsaDabplus.cpp
index 50dcddc..103a4b2 100644
--- a/src/AlsaDabplus.cpp
+++ b/src/AlsaDabplus.cpp
@@ -245,7 +245,7 @@ int main(int argc, char *argv[]) {
uint8_t input_buf[input_size];
- int max_size = input_size + NUM_SAMPLES_PER_CALL;
+ int max_size = 2*input_size + NUM_SAMPLES_PER_CALL;
SampleQueue<uint8_t> queue(BYTES_PER_SAMPLE, channels, max_size);
@@ -266,12 +266,6 @@ int main(int argc, char *argv[]) {
fprintf(stderr, "Start ALSA capture thread\n");
alsa_in.start();
- fprintf(stderr, "Starting queue preroll\n");
- // Preroll until queue full
- while (queue.size() < input_size) {
- usleep(1000);
- }
-
int outbuf_size = subchannel_index*120;
uint8_t outbuf[20480];
@@ -282,10 +276,10 @@ int main(int argc, char *argv[]) {
fprintf(stderr, "Starting encoding\n");
int send_error_count = 0;
- struct timespec tp;
- clock_gettime(CLOCK_MONOTONIC, &tp);
+ struct timespec tp_next;
+ clock_gettime(CLOCK_MONOTONIC, &tp_next);
- int calls;
+ int calls = 0; // for checking
while (1) {
int in_identifier = IN_AUDIO_DATA;
int out_identifier = OUT_BITSTREAM_DATA;
@@ -297,20 +291,53 @@ int main(int argc, char *argv[]) {
int in_size, in_elem_size;
int out_size, out_elem_size;
+
+ // -------------- wait the right amount of time
+ struct timespec tp_now;
+ clock_gettime(CLOCK_MONOTONIC, &tp_now);
+
+ unsigned long time_now = (1000000000ul * tp_now.tv_sec) +
+ tp_now.tv_nsec;
+ unsigned long time_next = (1000000000ul * tp_next.tv_sec) +
+ tp_next.tv_nsec;
+
+ const unsigned long wait_time = 120000000ul / 2;
+
+ unsigned long waiting = wait_time - (time_now - time_next);
+ if ((time_now - time_next) < wait_time) {
+ //printf("Sleep %zuus\n", waiting / 1000);
+ usleep(waiting / 1000);
+ }
+
+ // Move our time_counter 60ms into the future.
+ // The encoder needs two calls for one frame
+ tp_next.tv_nsec += wait_time;
+ if (tp_next.tv_nsec > 1000000000L) {
+ tp_next.tv_nsec -= 1000000000L;
+ tp_next.tv_sec += 1;
+ }
+
+
+ // -------------- Read Data
memset(outbuf, 0x00, outbuf_size);
- int read = queue.pop(input_buf, input_size);
+ size_t overruns;
+ size_t read = queue.pop(input_buf, input_size, &overruns); // returns bytes
if (read != input_size) {
- fprintf(stderr, "Short read\n");
+ fprintf(stderr, "U");
+ }
+
+ if (overruns) {
+ fprintf(stderr, "O%zu", overruns);
}
// -------------- AAC Encoding
in_ptr = input_buf;
- in_size = read;
+ in_size = (int)read;
in_elem_size = BYTES_PER_SAMPLE;
- in_args.numInSamples = input_size;
+ in_args.numInSamples = input_size/BYTES_PER_SAMPLE;
in_buf.numBufs = 1;
in_buf.bufs = &in_ptr;
in_buf.bufferIdentifiers = &in_identifier;
@@ -334,14 +361,15 @@ int main(int argc, char *argv[]) {
fprintf(stderr, "Encoding failed\n");
break;
}
-
calls++;
/* Check if the encoder has generated output data */
if (out_args.numOutBytes != 0)
{
- fprintf(stderr, "data out after %d calls\n", calls);
+ // Our timing code depends on this
+ assert (calls == 2);
calls = 0;
+
// ----------- RS encoding
int row, col;
unsigned char buf_to_rs_enc[110];
@@ -377,26 +405,6 @@ int main(int argc, char *argv[]) {
if (out_args.numOutBytes + row*10 == outbuf_size)
fprintf(stderr, ".");
}
-
- // -------------- wait the right amount of time
- tp.tv_nsec += 60000000;
- if (tp.tv_nsec > 1000000000L) {
- tp.tv_nsec -= 1000000000L;
- tp.tv_sec += 1;
- }
-
- fprintf(stderr, "sleep %ld.%ld\n", tp.tv_sec, tp.tv_nsec);
- struct timespec tp_now;
- do {
- usleep(1000);
- clock_gettime(CLOCK_MONOTONIC, &tp_now);
- } while (tp_now.tv_sec < tp.tv_sec ||
- ( tp_now.tv_sec == tp.tv_sec &&
- tp_now.tv_nsec < tp.tv_nsec) );
-
- tp.tv_sec = tp_now.tv_sec;
- tp.tv_nsec = tp_now.tv_nsec;
-
}
zmq_sock.close();
diff --git a/src/SampleQueue.h b/src/SampleQueue.h
index eabc301..778526e 100644
--- a/src/SampleQueue.h
+++ b/src/SampleQueue.h
@@ -9,6 +9,8 @@
#ifndef _SAMPLE_QUEUE_H_
#define _SAMPLE_QUEUE_H_
+#define DEBUG_SAMPLE_QUEUE 0
+
#include <boost/thread.hpp>
#include <queue>
@@ -18,10 +20,23 @@
* that pushes elements into the queue, and one consumer that
* retrieves the elements.
*
- * The queue can make the consumer block until enough elements
- * are available.
+ * This queue should contain audio sample data, interleaved L/R
+ * form, 2bytes per sample. Therefore, the push and pop functions
+ * should always place or retrieve data in multiples of
+ * bytes_per_sample * number_of_channels
+ *
+ * The queue has a maximum size. If this size is reached, push()
+ * ignores new data.
+ *
+ * If pop() is called but there is not enough data in the queue,
+ * the missing samples are replaced by zeros. pop() will always
+ * write the requested length.
*/
+
+/* The template is actually not really tested for anything else
+ * than uint8_t
+ */
template<typename T>
class SampleQueue
{
@@ -30,7 +45,9 @@ public:
unsigned int channels,
size_t max_size) :
m_bytes_per_sample(bytes_per_sample),
- m_channels(channels), m_max_size(max_size) {}
+ m_channels(channels),
+ m_max_size(max_size),
+ m_overruns(0) {}
/* Push a bunch of samples into the buffer */
@@ -38,10 +55,18 @@ public:
{
boost::mutex::scoped_lock lock(m_mutex);
+ assert(len % (m_channels * m_bytes_per_sample) == 0);
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## push %s %zu, %zu >= %zu\n",
+ (m_queue.size() >= m_max_size) ? "overrun" : "ok",
+ len / 4,
+ m_queue.size() / 4,
+ m_max_size / 4);
+#endif
+
if (m_queue.size() >= m_max_size) {
- /*fprintf(stderr, "######## push overrun %zu, %zu\n",
- len,
- m_queue.size()); // */
+ m_overruns++;
return 0;
}
@@ -50,9 +75,6 @@ public:
}
size_t new_size = m_queue.size();
- lock.unlock();
-
- //m_condition_variable.notify_one();
return new_size;
}
@@ -69,14 +91,36 @@ public:
*/
size_t pop(T* buf, size_t len)
{
+ size_t ovr;
+ return pop(buf, len, ovr);
+ }
+
+ /* Get len elements, place them into the buf array.
+ * Also update the overrun variable with the information
+ * of how many overruns we saw since the last pop.
+ * Returns the number of elements it was able to take
+ * from the queue
+ */
+ size_t pop(T* buf, size_t len, size_t* overruns)
+ {
boost::mutex::scoped_lock lock(m_mutex);
- fprintf(stderr, "######## pop %zu (%zu)\n",
- len,
- m_queue.size());
+
+ assert(len % (m_channels * m_bytes_per_sample) == 0);
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "######## pop %zu (%zu), %zu overruns: ",
+ len / 4,
+ m_queue.size() / 4,
+ m_overruns);
+#endif
+ *overruns = m_overruns;
+ m_overruns = 0;
size_t ret = 0;
if (m_queue.size() < len) {
+ /* Not enough data in queue, fill with zeros */
+
size_t i;
for (i = 0; i < m_queue.size(); i++) {
buf[i] = m_queue[i];
@@ -89,8 +133,16 @@ public:
}
m_queue.resize(0);
+
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "after short pop %zu (%zu)\n",
+ len / 4,
+ m_queue.size() / 4);
+#endif
}
else {
+ /* Queue contains enough data */
+
for (size_t i = 0; i < len; i++) {
buf[i] = m_queue[i];
}
@@ -98,33 +150,29 @@ public:
ret = len;
m_queue.erase(m_queue.begin(), m_queue.begin() + len);
- }
-
- return ret;
- }
- /*
- void wait_and_pop(T& popped_value)
- {
- boost::mutex::scoped_lock lock(m_mutex);
- while(m_queue.size() < m_required_size)
- {
- m_condition_variable.wait(lock);
+#if DEBUG_SAMPLE_QUEUE
+ fprintf(stdout, "after ok pop %zu (%zu)\n",
+ len / 4,
+ m_queue.size() / 4);
+#endif
}
- popped_value = m_queue.front();
- m_queue.pop_front();
+ return ret;
}
- */
private:
std::deque<T> m_queue;
mutable boost::mutex m_mutex;
- //boost::condition_variable m_condition_variable;
unsigned int m_channels;
unsigned int m_bytes_per_sample;
size_t m_max_size;
+
+ /* Counter to keep track of number of overruns between calls
+ * to pop()
+ */
+ size_t m_overruns;
};
#endif