From 20e7c628347ec0e4a6469b53b98b5309c74a47f0 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Sat, 2 Oct 2021 18:13:24 +0200 Subject: Common ba7f317: Improve EDI TCP receive --- lib/Socket.cpp | 2 ++ lib/edi/common.cpp | 90 +++++++++++++++++++++++++++++++----------------------- lib/edi/common.hpp | 19 +++++++----- 3 files changed, 65 insertions(+), 46 deletions(-) (limited to 'lib') diff --git a/lib/Socket.cpp b/lib/Socket.cpp index c876f32..7ff6b5e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -936,10 +936,12 @@ void TCPReceiveServer::process() sock.close(); // TODO replace fprintf fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); + m_queue.push(make_shared()); } if (num_timeouts > max_num_timeouts) { sock.close(); + m_queue.push(make_shared()); } } } diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 2f20391..abaf2ed 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -153,25 +153,27 @@ void TagDispatcher::push_bytes(const vector &buf) while (m_input_data.size() > 2) { if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { - const decode_state_t st = decode_afpacket(m_input_data); - - if (st.num_bytes_consumed == 0 and not st.complete) { - // We need to refill our buffer - break; + const auto r = decode_afpacket(m_input_data); + switch (r.st) { + case decode_state_e::Ok: + m_last_sequences.pseq_valid = false; + m_af_packet_completed(); + break; + case decode_state_e::MissingData: + /* Continue filling buffer */ + break; + case decode_state_e::Error: + m_last_sequences.pseq_valid = false; + break; } - if (st.num_bytes_consumed) { + if (r.num_bytes_consumed) { vector remaining_data; - copy(m_input_data.begin() + st.num_bytes_consumed, + copy(m_input_data.begin() + r.num_bytes_consumed, m_input_data.end(), back_inserter(remaining_data)); m_input_data = remaining_data; } - - m_last_sequences.pseq_valid = false; - if (st.complete) { - m_af_packet_completed(); - } } else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') { PFT::Fragment fragment; @@ -194,12 +196,21 @@ void TagDispatcher::push_bytes(const vector &buf) auto af = m_pft.getNextAFPacket(); if (not af.af_packet.empty()) { - const decode_state_t st = decode_afpacket(af.af_packet); - m_last_sequences.pseq = af.pseq; - m_last_sequences.pseq_valid = true; - - if (st.complete) { - m_af_packet_completed(); + const auto r = decode_afpacket(af.af_packet); + + switch (r.st) { + case decode_state_e::Ok: + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; + m_af_packet_completed(); + break; + case decode_state_e::MissingData: + etiLog.level(error) << "ETI MissingData on PFT push_bytes"; + m_last_sequences.pseq_valid = false; + break; + case decode_state_e::Error: + m_last_sequences.pseq_valid = false; + break; } } } @@ -219,10 +230,10 @@ void TagDispatcher::push_packet(const Packet &packet) } if (buf[0] == 'A' and buf[1] == 'F') { - const decode_state_t st = decode_afpacket(buf); + const auto r = decode_afpacket(buf); m_last_sequences.pseq_valid = false; - if (st.complete) { + if (r.st == decode_state_e::Ok) { m_af_packet_completed(); } @@ -237,11 +248,11 @@ void TagDispatcher::push_packet(const Packet &packet) auto af = m_pft.getNextAFPacket(); if (not af.af_packet.empty()) { - const decode_state_t st = decode_afpacket(af.af_packet); - m_last_sequences.pseq = af.pseq; - m_last_sequences.pseq_valid = true; + const auto r = decode_afpacket(af.af_packet); - if (st.complete) { + if (r.st == decode_state_e::Ok) { + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; m_af_packet_completed(); } } @@ -261,11 +272,11 @@ void TagDispatcher::setMaxDelay(int num_af_packets) #define AFPACKET_HEADER_LEN 10 // includes SYNC -decode_state_t TagDispatcher::decode_afpacket( +TagDispatcher::decode_result_t TagDispatcher::decode_afpacket( const std::vector &input_data) { if (input_data.size() < AFPACKET_HEADER_LEN) { - return {false, 0}; + return {decode_state_e::MissingData, 0}; } // read length from packet @@ -274,7 +285,7 @@ decode_state_t TagDispatcher::decode_afpacket( const size_t crclength = 2; if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { - return {false, 0}; + return {decode_state_e::MissingData, 0}; } // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional @@ -291,22 +302,23 @@ decode_state_t TagDispatcher::decode_afpacket( } m_last_sequences.seq = seq; + const size_t crclen = 2; bool has_crc = (input_data[8] & 0x80) ? true : false; uint8_t major_revision = (input_data[8] & 0x70) >> 4; uint8_t minor_revision = input_data[8] & 0x0F; if (major_revision != 1 or minor_revision != 0) { - throw invalid_argument("EDI AF Packet has wrong revision " + - to_string(major_revision) + "." + to_string(minor_revision)); + etiLog.level(warn) << "EDI AF Packet has wrong revision " << + (int)major_revision << "." << (int)minor_revision; + } + + if (not has_crc) { + etiLog.level(warn) << "AF packet not supported, has no CRC"; + return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength}; } uint8_t pt = input_data[9]; if (pt != 'T') { // only support Tag - return {false, 0}; - } - - - if (not has_crc) { - throw invalid_argument("AF packet not supported, has no CRC"); + return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen}; } uint16_t crc = 0xffff; @@ -318,7 +330,8 @@ decode_state_t TagDispatcher::decode_afpacket( uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); if (packet_crc != crc) { - throw invalid_argument("AF Packet crc wrong"); + etiLog.level(warn) << "AF Packet crc wrong"; + return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen}; } else { vector payload(taglength); @@ -326,8 +339,9 @@ decode_state_t TagDispatcher::decode_afpacket( input_data.begin() + AFPACKET_HEADER_LEN + taglength, payload.begin()); - return {decode_tagpacket(payload), - AFPACKET_HEADER_LEN + taglength + 2}; + return { + decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error, + AFPACKET_HEADER_LEN + taglength + crclen}; } } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index e8c57c1..5e31984 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -49,13 +49,6 @@ struct frame_timestamp_t { static frame_timestamp_t from_unix_epoch(std::time_t time, uint32_t tai_utc_offset, uint32_t tsta); }; -struct decode_state_t { - decode_state_t(bool _complete, size_t _num_bytes_consumed) : - complete(_complete), num_bytes_consumed(_num_bytes_consumed) {} - bool complete; - size_t num_bytes_consumed; -}; - using tag_name_t = std::array; std::string tag_name_to_human_readable(const tag_name_t& name); @@ -122,7 +115,17 @@ class TagDispatcher { } private: - decode_state_t decode_afpacket(const std::vector &input_data); + enum class decode_state_e { + Ok, MissingData, Error + }; + struct decode_result_t { + decode_result_t(decode_state_e _st, size_t _num_bytes_consumed) : + st(_st), num_bytes_consumed(_num_bytes_consumed) {} + decode_state_e st; + size_t num_bytes_consumed; + }; + + decode_result_t decode_afpacket(const std::vector &input_data); bool decode_tagpacket(const std::vector &payload); PFT::PFT m_pft; -- cgit v1.2.3