diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2021-10-02 18:13:24 +0200 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2021-10-02 18:13:24 +0200 | 
| commit | 20e7c628347ec0e4a6469b53b98b5309c74a47f0 (patch) | |
| tree | 3ee868b8535029ba055801954aec2ec2b22c3ea3 /lib | |
| parent | 37b60a6aab4abb47bb052db6fb1b49fe277e4bdc (diff) | |
| download | dabmux-20e7c628347ec0e4a6469b53b98b5309c74a47f0.tar.gz dabmux-20e7c628347ec0e4a6469b53b98b5309c74a47f0.tar.bz2 dabmux-20e7c628347ec0e4a6469b53b98b5309c74a47f0.zip  | |
Common ba7f317: Improve EDI TCP receive
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/Socket.cpp | 2 | ||||
| -rw-r--r-- | lib/edi/common.cpp | 90 | ||||
| -rw-r--r-- | lib/edi/common.hpp | 19 | 
3 files changed, 65 insertions, 46 deletions
diff --git a/lib/Socket.cpp b/lib/Socket.cpp index c876f32..7ff6b5e 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -936,10 +936,12 @@ void TCPReceiveServer::process()                  sock.close();                  // TODO replace fprintf                  fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); +                m_queue.push(make_shared<TCPReceiveMessageDisconnected>());              }              if (num_timeouts > max_num_timeouts) {                  sock.close(); +                m_queue.push(make_shared<TCPReceiveMessageDisconnected>());              }          }      } diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 2f20391..abaf2ed 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -153,25 +153,27 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)      while (m_input_data.size() > 2) {          if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { -            const decode_state_t st = decode_afpacket(m_input_data); - -            if (st.num_bytes_consumed == 0 and not st.complete) { -                // We need to refill our buffer -                break; +            const auto r = decode_afpacket(m_input_data); +            switch (r.st) { +                case decode_state_e::Ok: +                    m_last_sequences.pseq_valid = false; +                    m_af_packet_completed(); +                    break; +                case decode_state_e::MissingData: +                    /* Continue filling buffer */ +                    break; +                case decode_state_e::Error: +                    m_last_sequences.pseq_valid = false; +                    break;              } -            if (st.num_bytes_consumed) { +            if (r.num_bytes_consumed) {                  vector<uint8_t> remaining_data; -                copy(m_input_data.begin() + st.num_bytes_consumed, +                copy(m_input_data.begin() + r.num_bytes_consumed,                          m_input_data.end(),                          back_inserter(remaining_data));                  m_input_data = remaining_data;              } - -            m_last_sequences.pseq_valid = false; -            if (st.complete) { -                m_af_packet_completed(); -            }          }          else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') {              PFT::Fragment fragment; @@ -194,12 +196,21 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)              auto af = m_pft.getNextAFPacket();              if (not af.af_packet.empty()) { -                const decode_state_t st = decode_afpacket(af.af_packet); -                m_last_sequences.pseq = af.pseq; -                m_last_sequences.pseq_valid = true; - -                if (st.complete) { -                    m_af_packet_completed(); +                const auto r = decode_afpacket(af.af_packet); + +                switch (r.st) { +                    case decode_state_e::Ok: +                        m_last_sequences.pseq = af.pseq; +                        m_last_sequences.pseq_valid = true; +                        m_af_packet_completed(); +                        break; +                    case decode_state_e::MissingData: +                        etiLog.level(error) << "ETI MissingData on PFT push_bytes"; +                        m_last_sequences.pseq_valid = false; +                        break; +                    case decode_state_e::Error: +                        m_last_sequences.pseq_valid = false; +                        break;                  }              }          } @@ -219,10 +230,10 @@ void TagDispatcher::push_packet(const Packet &packet)      }      if (buf[0] == 'A' and buf[1] == 'F') { -        const decode_state_t st = decode_afpacket(buf); +        const auto r = decode_afpacket(buf);          m_last_sequences.pseq_valid = false; -        if (st.complete) { +        if (r.st == decode_state_e::Ok) {              m_af_packet_completed();          } @@ -237,11 +248,11 @@ void TagDispatcher::push_packet(const Packet &packet)          auto af = m_pft.getNextAFPacket();          if (not af.af_packet.empty()) { -            const decode_state_t st = decode_afpacket(af.af_packet); -            m_last_sequences.pseq = af.pseq; -            m_last_sequences.pseq_valid = true; +            const auto r = decode_afpacket(af.af_packet); -            if (st.complete) { +            if (r.st == decode_state_e::Ok) { +                m_last_sequences.pseq = af.pseq; +                m_last_sequences.pseq_valid = true;                  m_af_packet_completed();              }          } @@ -261,11 +272,11 @@ void TagDispatcher::setMaxDelay(int num_af_packets)  #define AFPACKET_HEADER_LEN 10 // includes SYNC -decode_state_t TagDispatcher::decode_afpacket( +TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(          const std::vector<uint8_t> &input_data)  {      if (input_data.size() < AFPACKET_HEADER_LEN) { -        return {false, 0}; +        return {decode_state_e::MissingData, 0};      }      // read length from packet @@ -274,7 +285,7 @@ decode_state_t TagDispatcher::decode_afpacket(      const size_t crclength = 2;      if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { -        return {false, 0}; +        return {decode_state_e::MissingData, 0};      }      // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional @@ -291,22 +302,23 @@ decode_state_t TagDispatcher::decode_afpacket(      }      m_last_sequences.seq = seq; +    const size_t crclen = 2;      bool has_crc = (input_data[8] & 0x80) ? true : false;      uint8_t major_revision = (input_data[8] & 0x70) >> 4;      uint8_t minor_revision = input_data[8] & 0x0F;      if (major_revision != 1 or minor_revision != 0) { -        throw invalid_argument("EDI AF Packet has wrong revision " + -                to_string(major_revision) + "." + to_string(minor_revision)); +        etiLog.level(warn) << "EDI AF Packet has wrong revision " << +                (int)major_revision << "." << (int)minor_revision; +    } + +    if (not has_crc) { +        etiLog.level(warn) << "AF packet not supported, has no CRC"; +        return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength};      }      uint8_t pt = input_data[9];      if (pt != 'T') {          // only support Tag -        return {false, 0}; -    } - - -    if (not has_crc) { -        throw invalid_argument("AF packet not supported, has no CRC"); +        return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};      }      uint16_t crc = 0xffff; @@ -318,7 +330,8 @@ decode_state_t TagDispatcher::decode_afpacket(      uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength);      if (packet_crc != crc) { -        throw invalid_argument("AF Packet crc wrong"); +        etiLog.level(warn) << "AF Packet crc wrong"; +        return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};      }      else {          vector<uint8_t> payload(taglength); @@ -326,8 +339,9 @@ decode_state_t TagDispatcher::decode_afpacket(                  input_data.begin() + AFPACKET_HEADER_LEN + taglength,                  payload.begin()); -        return {decode_tagpacket(payload), -            AFPACKET_HEADER_LEN + taglength + 2}; +        return { +            decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error, +            AFPACKET_HEADER_LEN + taglength + crclen};      }  } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index e8c57c1..5e31984 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -49,13 +49,6 @@ struct frame_timestamp_t {      static frame_timestamp_t from_unix_epoch(std::time_t time, uint32_t tai_utc_offset, uint32_t tsta);  }; -struct decode_state_t { -    decode_state_t(bool _complete, size_t _num_bytes_consumed) : -        complete(_complete), num_bytes_consumed(_num_bytes_consumed) {} -    bool complete; -    size_t num_bytes_consumed; -}; -  using tag_name_t = std::array<uint8_t, 4>;  std::string tag_name_to_human_readable(const tag_name_t& name); @@ -122,7 +115,17 @@ class TagDispatcher {          }      private: -        decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data); +        enum class decode_state_e { +            Ok, MissingData, Error +        }; +        struct decode_result_t { +            decode_result_t(decode_state_e _st, size_t _num_bytes_consumed) : +                st(_st), num_bytes_consumed(_num_bytes_consumed) {} +            decode_state_e st; +            size_t num_bytes_consumed; +        }; + +        decode_result_t decode_afpacket(const std::vector<uint8_t> &input_data);          bool decode_tagpacket(const std::vector<uint8_t> &payload);          PFT::PFT m_pft;  | 
