summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias P. Braendli <matthias.braendli@mpb.li>2021-10-02 18:13:24 +0200
committerMatthias P. Braendli <matthias.braendli@mpb.li>2021-10-02 18:13:24 +0200
commit20e7c628347ec0e4a6469b53b98b5309c74a47f0 (patch)
tree3ee868b8535029ba055801954aec2ec2b22c3ea3
parent37b60a6aab4abb47bb052db6fb1b49fe277e4bdc (diff)
downloaddabmux-20e7c628347ec0e4a6469b53b98b5309c74a47f0.tar.gz
dabmux-20e7c628347ec0e4a6469b53b98b5309c74a47f0.tar.bz2
dabmux-20e7c628347ec0e4a6469b53b98b5309c74a47f0.zip
Common ba7f317: Improve EDI TCP receive
-rw-r--r--lib/Socket.cpp2
-rw-r--r--lib/edi/common.cpp90
-rw-r--r--lib/edi/common.hpp19
3 files changed, 65 insertions, 46 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp
index c876f32..7ff6b5e 100644
--- a/lib/Socket.cpp
+++ b/lib/Socket.cpp
@@ -936,10 +936,12 @@ void TCPReceiveServer::process()
sock.close();
// TODO replace fprintf
fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what());
+ m_queue.push(make_shared<TCPReceiveMessageDisconnected>());
}
if (num_timeouts > max_num_timeouts) {
sock.close();
+ m_queue.push(make_shared<TCPReceiveMessageDisconnected>());
}
}
}
diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp
index 2f20391..abaf2ed 100644
--- a/lib/edi/common.cpp
+++ b/lib/edi/common.cpp
@@ -153,25 +153,27 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)
while (m_input_data.size() > 2) {
if (m_input_data[0] == 'A' and m_input_data[1] == 'F') {
- const decode_state_t st = decode_afpacket(m_input_data);
-
- if (st.num_bytes_consumed == 0 and not st.complete) {
- // We need to refill our buffer
- break;
+ const auto r = decode_afpacket(m_input_data);
+ switch (r.st) {
+ case decode_state_e::Ok:
+ m_last_sequences.pseq_valid = false;
+ m_af_packet_completed();
+ break;
+ case decode_state_e::MissingData:
+ /* Continue filling buffer */
+ break;
+ case decode_state_e::Error:
+ m_last_sequences.pseq_valid = false;
+ break;
}
- if (st.num_bytes_consumed) {
+ if (r.num_bytes_consumed) {
vector<uint8_t> remaining_data;
- copy(m_input_data.begin() + st.num_bytes_consumed,
+ copy(m_input_data.begin() + r.num_bytes_consumed,
m_input_data.end(),
back_inserter(remaining_data));
m_input_data = remaining_data;
}
-
- m_last_sequences.pseq_valid = false;
- if (st.complete) {
- m_af_packet_completed();
- }
}
else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') {
PFT::Fragment fragment;
@@ -194,12 +196,21 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)
auto af = m_pft.getNextAFPacket();
if (not af.af_packet.empty()) {
- const decode_state_t st = decode_afpacket(af.af_packet);
- m_last_sequences.pseq = af.pseq;
- m_last_sequences.pseq_valid = true;
-
- if (st.complete) {
- m_af_packet_completed();
+ const auto r = decode_afpacket(af.af_packet);
+
+ switch (r.st) {
+ case decode_state_e::Ok:
+ m_last_sequences.pseq = af.pseq;
+ m_last_sequences.pseq_valid = true;
+ m_af_packet_completed();
+ break;
+ case decode_state_e::MissingData:
+ etiLog.level(error) << "ETI MissingData on PFT push_bytes";
+ m_last_sequences.pseq_valid = false;
+ break;
+ case decode_state_e::Error:
+ m_last_sequences.pseq_valid = false;
+ break;
}
}
}
@@ -219,10 +230,10 @@ void TagDispatcher::push_packet(const Packet &packet)
}
if (buf[0] == 'A' and buf[1] == 'F') {
- const decode_state_t st = decode_afpacket(buf);
+ const auto r = decode_afpacket(buf);
m_last_sequences.pseq_valid = false;
- if (st.complete) {
+ if (r.st == decode_state_e::Ok) {
m_af_packet_completed();
}
@@ -237,11 +248,11 @@ void TagDispatcher::push_packet(const Packet &packet)
auto af = m_pft.getNextAFPacket();
if (not af.af_packet.empty()) {
- const decode_state_t st = decode_afpacket(af.af_packet);
- m_last_sequences.pseq = af.pseq;
- m_last_sequences.pseq_valid = true;
+ const auto r = decode_afpacket(af.af_packet);
- if (st.complete) {
+ if (r.st == decode_state_e::Ok) {
+ m_last_sequences.pseq = af.pseq;
+ m_last_sequences.pseq_valid = true;
m_af_packet_completed();
}
}
@@ -261,11 +272,11 @@ void TagDispatcher::setMaxDelay(int num_af_packets)
#define AFPACKET_HEADER_LEN 10 // includes SYNC
-decode_state_t TagDispatcher::decode_afpacket(
+TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(
const std::vector<uint8_t> &input_data)
{
if (input_data.size() < AFPACKET_HEADER_LEN) {
- return {false, 0};
+ return {decode_state_e::MissingData, 0};
}
// read length from packet
@@ -274,7 +285,7 @@ decode_state_t TagDispatcher::decode_afpacket(
const size_t crclength = 2;
if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) {
- return {false, 0};
+ return {decode_state_e::MissingData, 0};
}
// SEQ wraps at 0xFFFF, unsigned integer overflow is intentional
@@ -291,22 +302,23 @@ decode_state_t TagDispatcher::decode_afpacket(
}
m_last_sequences.seq = seq;
+ const size_t crclen = 2;
bool has_crc = (input_data[8] & 0x80) ? true : false;
uint8_t major_revision = (input_data[8] & 0x70) >> 4;
uint8_t minor_revision = input_data[8] & 0x0F;
if (major_revision != 1 or minor_revision != 0) {
- throw invalid_argument("EDI AF Packet has wrong revision " +
- to_string(major_revision) + "." + to_string(minor_revision));
+ etiLog.level(warn) << "EDI AF Packet has wrong revision " <<
+ (int)major_revision << "." << (int)minor_revision;
+ }
+
+ if (not has_crc) {
+ etiLog.level(warn) << "AF packet not supported, has no CRC";
+ return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength};
}
uint8_t pt = input_data[9];
if (pt != 'T') {
// only support Tag
- return {false, 0};
- }
-
-
- if (not has_crc) {
- throw invalid_argument("AF packet not supported, has no CRC");
+ return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};
}
uint16_t crc = 0xffff;
@@ -318,7 +330,8 @@ decode_state_t TagDispatcher::decode_afpacket(
uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength);
if (packet_crc != crc) {
- throw invalid_argument("AF Packet crc wrong");
+ etiLog.level(warn) << "AF Packet crc wrong";
+ return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};
}
else {
vector<uint8_t> payload(taglength);
@@ -326,8 +339,9 @@ decode_state_t TagDispatcher::decode_afpacket(
input_data.begin() + AFPACKET_HEADER_LEN + taglength,
payload.begin());
- return {decode_tagpacket(payload),
- AFPACKET_HEADER_LEN + taglength + 2};
+ return {
+ decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error,
+ AFPACKET_HEADER_LEN + taglength + crclen};
}
}
diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp
index e8c57c1..5e31984 100644
--- a/lib/edi/common.hpp
+++ b/lib/edi/common.hpp
@@ -49,13 +49,6 @@ struct frame_timestamp_t {
static frame_timestamp_t from_unix_epoch(std::time_t time, uint32_t tai_utc_offset, uint32_t tsta);
};
-struct decode_state_t {
- decode_state_t(bool _complete, size_t _num_bytes_consumed) :
- complete(_complete), num_bytes_consumed(_num_bytes_consumed) {}
- bool complete;
- size_t num_bytes_consumed;
-};
-
using tag_name_t = std::array<uint8_t, 4>;
std::string tag_name_to_human_readable(const tag_name_t& name);
@@ -122,7 +115,17 @@ class TagDispatcher {
}
private:
- decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data);
+ enum class decode_state_e {
+ Ok, MissingData, Error
+ };
+ struct decode_result_t {
+ decode_result_t(decode_state_e _st, size_t _num_bytes_consumed) :
+ st(_st), num_bytes_consumed(_num_bytes_consumed) {}
+ decode_state_e st;
+ size_t num_bytes_consumed;
+ };
+
+ decode_result_t decode_afpacket(const std::vector<uint8_t> &input_data);
bool decode_tagpacket(const std::vector<uint8_t> &payload);
PFT::PFT m_pft;