diff options
| author | Matthias P. Braendli <matthias.braendli@mpb.li> | 2021-11-26 16:47:15 +0100 | 
|---|---|---|
| committer | Matthias P. Braendli <matthias.braendli@mpb.li> | 2021-11-26 16:47:15 +0100 | 
| commit | 22fd7d757f11a6c976d7e711fd46cf2ce3247c44 (patch) | |
| tree | 782bd87ef76cee4bbeb723ff1a99cd2e3ae6d408 | |
| parent | 9a5cbe7ed3def3eb465174531b530c49a6203a13 (diff) | |
| download | dabmod-22fd7d757f11a6c976d7e711fd46cf2ce3247c44.tar.gz dabmod-22fd7d757f11a6c976d7e711fd46cf2ce3247c44.tar.bz2 dabmod-22fd7d757f11a6c976d7e711fd46cf2ce3247c44.zip | |
Common ba7f317, 5a0689a, c63fb05
| -rw-r--r-- | lib/Socket.cpp | 122 | ||||
| -rw-r--r-- | lib/Socket.h | 1 | ||||
| -rw-r--r-- | lib/edi/common.cpp | 93 | ||||
| -rw-r--r-- | lib/edi/common.hpp | 19 | 
4 files changed, 190 insertions, 45 deletions
| diff --git a/lib/Socket.cpp b/lib/Socket.cpp index c876f32..d12c970 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -409,6 +409,121 @@ bool TCPSocket::valid() const      return m_sock != -1;  } +void TCPSocket::connect(const std::string& hostname, int port, int timeout_ms) +{ +    if (m_sock != INVALID_SOCKET) { +        throw std::logic_error("You may only connect an invalid TCPSocket"); +    } + +    char service[NI_MAXSERV]; +    snprintf(service, NI_MAXSERV-1, "%d", port); + +    /* Obtain address(es) matching host/port */ +    struct addrinfo hints; +    memset(&hints, 0, sizeof(struct addrinfo)); +    hints.ai_family = AF_INET; +    hints.ai_socktype = SOCK_STREAM; +    hints.ai_flags = 0; +    hints.ai_protocol = 0; + +    struct addrinfo *result, *rp; +    int s = getaddrinfo(hostname.c_str(), service, &hints, &result); +    if (s != 0) { +        throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); +    } + +    int flags = 0; + +    /* getaddrinfo() returns a list of address structures. +       Try each address until we successfully connect(2). +       If socket(2) (or connect(2)) fails, we (close the socket +       and) try the next address. */ + +    for (rp = result; rp != nullptr; rp = rp->ai_next) { +        int sfd = ::socket(rp->ai_family, rp->ai_socktype, +                rp->ai_protocol); +        if (sfd == -1) +            continue; + +        flags = fcntl(sfd, F_GETFL); +        if (flags == -1) { +            std::string errstr(strerror(errno)); +            throw std::runtime_error("TCP: Could not get socket flags: " + errstr); +        } + +        if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) { +            std::string errstr(strerror(errno)); +            throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); +        } + +        int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen); +        if (ret == 0) { +            m_sock = sfd; +            break; +        } +        if (ret == -1 and errno == EINPROGRESS) { +            m_sock = sfd; +            struct pollfd fds[1]; +            fds[0].fd = m_sock; +            fds[0].events = POLLOUT; + +            int retval = poll(fds, 1, timeout_ms); + +            if (retval == -1) { +                std::string errstr(strerror(errno)); +                ::close(m_sock); +                freeaddrinfo(result); +                throw runtime_error("TCP: connect error on poll: " + errstr); +            } +            else if (retval > 0) { +                int so_error = 0; +                socklen_t len = sizeof(so_error); + +                if (getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &so_error, &len) == -1) { +                    std::string errstr(strerror(errno)); +                    ::close(m_sock); +                    freeaddrinfo(result); +                    throw runtime_error("TCP: getsockopt error connect: " + errstr); +                } + +                if (so_error == 0) { +                    break; +                } +            } +            else { +                ::close(m_sock); +                freeaddrinfo(result); +                throw runtime_error("Timeout on connect"); +            } +            break; +        } + +        ::close(sfd); +    } + +    if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) +        int val = 1; +        if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) +                == SOCKET_ERROR) { +            throw runtime_error("Can't set SO_NOSIGPIPE"); +        } +#endif +    } + +    // Don't keep the socket blocking +    if (fcntl(m_sock, F_SETFL, flags) == -1) { +        std::string errstr(strerror(errno)); +        throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); +    } + +    freeaddrinfo(result); + +    if (rp == nullptr) { +        throw runtime_error("Could not connect"); +    } +} +  void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)  {      if (m_sock != INVALID_SOCKET) { @@ -447,11 +562,15 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)              int flags = fcntl(sfd, F_GETFL);              if (flags == -1) {                  std::string errstr(strerror(errno)); +                freeaddrinfo(result); +                ::close(sfd);                  throw std::runtime_error("TCP: Could not get socket flags: " + errstr);              }              if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {                  std::string errstr(strerror(errno)); +                freeaddrinfo(result); +                ::close(sfd);                  throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr);              }          } @@ -480,7 +599,6 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock)      if (rp == nullptr) {          throw runtime_error("Could not connect");      } -  }  void TCPSocket::listen(int port, const string& name) @@ -936,10 +1054,12 @@ void TCPReceiveServer::process()                  sock.close();                  // TODO replace fprintf                  fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); +                m_queue.push(make_shared<TCPReceiveMessageDisconnected>());              }              if (num_timeouts > max_num_timeouts) {                  sock.close(); +                m_queue.push(make_shared<TCPReceiveMessageDisconnected>());              }          }      } diff --git a/lib/Socket.h b/lib/Socket.h index 33cdc05..08607a5 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -168,6 +168,7 @@ class TCPSocket {          bool valid(void) const;          void connect(const std::string& hostname, int port, bool nonblock = false); +        void connect(const std::string& hostname, int port, int timeout_ms);          void listen(int port, const std::string& name);          void close(void); diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 2f20391..c99997a 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -153,24 +153,33 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)      while (m_input_data.size() > 2) {          if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { -            const decode_state_t st = decode_afpacket(m_input_data); - -            if (st.num_bytes_consumed == 0 and not st.complete) { -                // We need to refill our buffer -                break; +            const auto r = decode_afpacket(m_input_data); +            bool leave_loop = false; +            switch (r.st) { +                case decode_state_e::Ok: +                    m_last_sequences.pseq_valid = false; +                    m_af_packet_completed(); +                    break; +                case decode_state_e::MissingData: +                    /* Continue filling buffer */ +                    leave_loop = true; +                    break; +                case decode_state_e::Error: +                    m_last_sequences.pseq_valid = false; +                    leave_loop = true; +                    break;              } -            if (st.num_bytes_consumed) { +            if (r.num_bytes_consumed) {                  vector<uint8_t> remaining_data; -                copy(m_input_data.begin() + st.num_bytes_consumed, +                copy(m_input_data.begin() + r.num_bytes_consumed,                          m_input_data.end(),                          back_inserter(remaining_data));                  m_input_data = remaining_data;              } -            m_last_sequences.pseq_valid = false; -            if (st.complete) { -                m_af_packet_completed(); +            if (leave_loop) { +                break;              }          }          else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') { @@ -194,12 +203,21 @@ void TagDispatcher::push_bytes(const vector<uint8_t> &buf)              auto af = m_pft.getNextAFPacket();              if (not af.af_packet.empty()) { -                const decode_state_t st = decode_afpacket(af.af_packet); -                m_last_sequences.pseq = af.pseq; -                m_last_sequences.pseq_valid = true; - -                if (st.complete) { -                    m_af_packet_completed(); +                const auto r = decode_afpacket(af.af_packet); + +                switch (r.st) { +                    case decode_state_e::Ok: +                        m_last_sequences.pseq = af.pseq; +                        m_last_sequences.pseq_valid = true; +                        m_af_packet_completed(); +                        break; +                    case decode_state_e::MissingData: +                        etiLog.level(error) << "ETI MissingData on PFT push_bytes"; +                        m_last_sequences.pseq_valid = false; +                        break; +                    case decode_state_e::Error: +                        m_last_sequences.pseq_valid = false; +                        break;                  }              }          } @@ -219,10 +237,10 @@ void TagDispatcher::push_packet(const Packet &packet)      }      if (buf[0] == 'A' and buf[1] == 'F') { -        const decode_state_t st = decode_afpacket(buf); +        const auto r = decode_afpacket(buf);          m_last_sequences.pseq_valid = false; -        if (st.complete) { +        if (r.st == decode_state_e::Ok) {              m_af_packet_completed();          } @@ -237,11 +255,11 @@ void TagDispatcher::push_packet(const Packet &packet)          auto af = m_pft.getNextAFPacket();          if (not af.af_packet.empty()) { -            const decode_state_t st = decode_afpacket(af.af_packet); -            m_last_sequences.pseq = af.pseq; -            m_last_sequences.pseq_valid = true; +            const auto r = decode_afpacket(af.af_packet); -            if (st.complete) { +            if (r.st == decode_state_e::Ok) { +                m_last_sequences.pseq = af.pseq; +                m_last_sequences.pseq_valid = true;                  m_af_packet_completed();              }          } @@ -261,11 +279,11 @@ void TagDispatcher::setMaxDelay(int num_af_packets)  #define AFPACKET_HEADER_LEN 10 // includes SYNC -decode_state_t TagDispatcher::decode_afpacket( +TagDispatcher::decode_result_t TagDispatcher::decode_afpacket(          const std::vector<uint8_t> &input_data)  {      if (input_data.size() < AFPACKET_HEADER_LEN) { -        return {false, 0}; +        return {decode_state_e::MissingData, 0};      }      // read length from packet @@ -274,7 +292,7 @@ decode_state_t TagDispatcher::decode_afpacket(      const size_t crclength = 2;      if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { -        return {false, 0}; +        return {decode_state_e::MissingData, 0};      }      // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional @@ -291,22 +309,23 @@ decode_state_t TagDispatcher::decode_afpacket(      }      m_last_sequences.seq = seq; +    const size_t crclen = 2;      bool has_crc = (input_data[8] & 0x80) ? true : false;      uint8_t major_revision = (input_data[8] & 0x70) >> 4;      uint8_t minor_revision = input_data[8] & 0x0F;      if (major_revision != 1 or minor_revision != 0) { -        throw invalid_argument("EDI AF Packet has wrong revision " + -                to_string(major_revision) + "." + to_string(minor_revision)); +        etiLog.level(warn) << "EDI AF Packet has wrong revision " << +                (int)major_revision << "." << (int)minor_revision; +    } + +    if (not has_crc) { +        etiLog.level(warn) << "AF packet not supported, has no CRC"; +        return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength};      }      uint8_t pt = input_data[9];      if (pt != 'T') {          // only support Tag -        return {false, 0}; -    } - - -    if (not has_crc) { -        throw invalid_argument("AF packet not supported, has no CRC"); +        return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};      }      uint16_t crc = 0xffff; @@ -318,7 +337,8 @@ decode_state_t TagDispatcher::decode_afpacket(      uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength);      if (packet_crc != crc) { -        throw invalid_argument("AF Packet crc wrong"); +        etiLog.level(warn) << "AF Packet crc wrong"; +        return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen};      }      else {          vector<uint8_t> payload(taglength); @@ -326,8 +346,9 @@ decode_state_t TagDispatcher::decode_afpacket(                  input_data.begin() + AFPACKET_HEADER_LEN + taglength,                  payload.begin()); -        return {decode_tagpacket(payload), -            AFPACKET_HEADER_LEN + taglength + 2}; +        return { +            decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error, +            AFPACKET_HEADER_LEN + taglength + crclen};      }  } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index e8c57c1..5e31984 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -49,13 +49,6 @@ struct frame_timestamp_t {      static frame_timestamp_t from_unix_epoch(std::time_t time, uint32_t tai_utc_offset, uint32_t tsta);  }; -struct decode_state_t { -    decode_state_t(bool _complete, size_t _num_bytes_consumed) : -        complete(_complete), num_bytes_consumed(_num_bytes_consumed) {} -    bool complete; -    size_t num_bytes_consumed; -}; -  using tag_name_t = std::array<uint8_t, 4>;  std::string tag_name_to_human_readable(const tag_name_t& name); @@ -122,7 +115,17 @@ class TagDispatcher {          }      private: -        decode_state_t decode_afpacket(const std::vector<uint8_t> &input_data); +        enum class decode_state_e { +            Ok, MissingData, Error +        }; +        struct decode_result_t { +            decode_result_t(decode_state_e _st, size_t _num_bytes_consumed) : +                st(_st), num_bytes_consumed(_num_bytes_consumed) {} +            decode_state_e st; +            size_t num_bytes_consumed; +        }; + +        decode_result_t decode_afpacket(const std::vector<uint8_t> &input_data);          bool decode_tagpacket(const std::vector<uint8_t> &payload);          PFT::PFT m_pft; | 
