summaryrefslogtreecommitdiffstats
path: root/src/dabInputZmq.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/dabInputZmq.cpp')
-rw-r--r--src/dabInputZmq.cpp141
1 files changed, 139 insertions, 2 deletions
diff --git a/src/dabInputZmq.cpp b/src/dabInputZmq.cpp
index 753d6da..ee38440 100644
--- a/src/dabInputZmq.cpp
+++ b/src/dabInputZmq.cpp
@@ -67,13 +67,140 @@ using namespace std;
extern StatsServer* global_stats;
+int readkey(string& keyfile, char* key)
+{
+ int fd = open(keyfile.c_str(), O_RDONLY);
+ if (fd < 0)
+ return fd;
+ int ret = read(fd, key, CURVE_KEYLEN);
+ if (ret < 0)
+ return ret;
+ close(fd);
+
+ /* It needs to be zero-terminated */
+ key[CURVE_KEYLEN] = '\0';
+
+ return 0;
+}
+
/***** Common functions (MPEG and AAC) ******/
-int DabInputZmqBase::open(const std::string inputUri)
+/* If necessary, unbind the socket, then check the keys,
+ * if they are ok and encryption is required, set the
+ * keys to the socket, and finally bind the socket
+ * to the new address
+ */
+void DabInputZmqBase::rebind()
{
+ if (! m_zmq_sock_bound_to.empty()) {
+ try {
+ m_zmq_sock.unbind(m_zmq_sock_bound_to.c_str());
+ }
+ catch (zmq::error_t& err) {
+ etiLog.level(warn) << "ZMQ unbind for input " << m_name << " failed";
+ }
+ }
+
+ m_zmq_sock_bound_to = "";
+
+ /* Load each key independently */
+ if (! m_config.curve_public_keyfile.empty()) {
+ int rc = readkey(m_config.curve_public_keyfile, m_curve_public_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid public key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_public_key);
+ }
+ }
+
+ if (! m_config.curve_secret_keyfile.empty()) {
+ int rc = readkey(m_config.curve_secret_keyfile, m_curve_secret_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid secret key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_secret_key);
+ }
+ }
+
+ if (! m_config.curve_encoder_keyfile.empty()) {
+ int rc = readkey(m_config.curve_encoder_keyfile, m_curve_encoder_key);
+
+ if (rc < 0) {
+ etiLog.level(warn) << "Invalid encoder key for input " <<
+ m_name;
+
+ INVALIDATE_KEY(m_curve_encoder_key);
+ }
+ }
+
+ /* If you want encryption, you need to have defined all
+ * key files
+ */
+ if ( m_config.enable_encryption &&
+ ( ! (KEY_VALID(m_curve_public_key) &&
+ KEY_VALID(m_curve_secret_key) &&
+ KEY_VALID(m_curve_encoder_key) ) ) ) {
+ throw std::runtime_error("When enabling encryption, all three "
+ "keyfiles must be set!");
+ }
+
+ if (m_config.enable_encryption) {
+ try {
+ /* We want to check that the encoder is the right one,
+ * so the encoder is the CURVE server.
+ */
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SERVERKEY,
+ m_curve_encoder_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set encoder key for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
+ }
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_CURVE_PUBLICKEY,
+ m_curve_public_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set public key for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
+ }
+
+ try {
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SECRETKEY,
+ m_curve_secret_key, CURVE_KEYLEN);
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ set secret key for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
+ }
+ }
+ else {
+ try {
+ /* This forces the socket to go to the ZMQ_NULL auth
+ * mechanism
+ */
+ const int no = 0;
+ m_zmq_sock.setsockopt(ZMQ_CURVE_SERVER, &no, sizeof(no));
+ }
+ catch (zmq::error_t& err) {
+ std::ostringstream os;
+ os << "ZMQ remove keys for input " << m_name << " failed";
+ throw std::runtime_error(os.str());
+ }
+
+ }
+
// Prepare the ZMQ socket to accept connections
try {
- m_zmq_sock.bind(inputUri.c_str());
+ m_zmq_sock.bind(m_inputUri.c_str());
}
catch (zmq::error_t& err) {
std::ostringstream os;
@@ -81,6 +208,8 @@ int DabInputZmqBase::open(const std::string inputUri)
throw std::runtime_error(os.str());
}
+ m_zmq_sock_bound_to = m_inputUri;
+
try {
m_zmq_sock.setsockopt(ZMQ_SUBSCRIBE, NULL, 0);
}
@@ -89,6 +218,14 @@ int DabInputZmqBase::open(const std::string inputUri)
os << "ZMQ set socket options for input " << m_name << " failed";
throw std::runtime_error(os.str());
}
+}
+
+int DabInputZmqBase::open(const std::string inputUri)
+{
+ m_inputUri = inputUri;
+
+ /* Let caller handle exceptions when we open() */
+ rebind();
// We want to appear in the statistics !
global_stats->registerInput(m_name);