From 22fd7d757f11a6c976d7e711fd46cf2ce3247c44 Mon Sep 17 00:00:00 2001 From: "Matthias P. Braendli" Date: Fri, 26 Nov 2021 16:47:15 +0100 Subject: Common ba7f317, 5a0689a, c63fb05 --- lib/Socket.cpp | 122 ++++++++++++++++++++++++++++++++++++++++++++++++++++- lib/Socket.h | 1 + lib/edi/common.cpp | 93 ++++++++++++++++++++++++---------------- lib/edi/common.hpp | 19 +++++---- 4 files changed, 190 insertions(+), 45 deletions(-) diff --git a/lib/Socket.cpp b/lib/Socket.cpp index c876f32..d12c970 100644 --- a/lib/Socket.cpp +++ b/lib/Socket.cpp @@ -409,6 +409,121 @@ bool TCPSocket::valid() const return m_sock != -1; } +void TCPSocket::connect(const std::string& hostname, int port, int timeout_ms) +{ + if (m_sock != INVALID_SOCKET) { + throw std::logic_error("You may only connect an invalid TCPSocket"); + } + + char service[NI_MAXSERV]; + snprintf(service, NI_MAXSERV-1, "%d", port); + + /* Obtain address(es) matching host/port */ + struct addrinfo hints; + memset(&hints, 0, sizeof(struct addrinfo)); + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = 0; + hints.ai_protocol = 0; + + struct addrinfo *result, *rp; + int s = getaddrinfo(hostname.c_str(), service, &hints, &result); + if (s != 0) { + throw runtime_error(string("getaddrinfo failed: ") + gai_strerror(s)); + } + + int flags = 0; + + /* getaddrinfo() returns a list of address structures. + Try each address until we successfully connect(2). + If socket(2) (or connect(2)) fails, we (close the socket + and) try the next address. */ + + for (rp = result; rp != nullptr; rp = rp->ai_next) { + int sfd = ::socket(rp->ai_family, rp->ai_socktype, + rp->ai_protocol); + if (sfd == -1) + continue; + + flags = fcntl(sfd, F_GETFL); + if (flags == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not get socket flags: " + errstr); + } + + if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); + } + + int ret = ::connect(sfd, rp->ai_addr, rp->ai_addrlen); + if (ret == 0) { + m_sock = sfd; + break; + } + if (ret == -1 and errno == EINPROGRESS) { + m_sock = sfd; + struct pollfd fds[1]; + fds[0].fd = m_sock; + fds[0].events = POLLOUT; + + int retval = poll(fds, 1, timeout_ms); + + if (retval == -1) { + std::string errstr(strerror(errno)); + ::close(m_sock); + freeaddrinfo(result); + throw runtime_error("TCP: connect error on poll: " + errstr); + } + else if (retval > 0) { + int so_error = 0; + socklen_t len = sizeof(so_error); + + if (getsockopt(m_sock, SOL_SOCKET, SO_ERROR, &so_error, &len) == -1) { + std::string errstr(strerror(errno)); + ::close(m_sock); + freeaddrinfo(result); + throw runtime_error("TCP: getsockopt error connect: " + errstr); + } + + if (so_error == 0) { + break; + } + } + else { + ::close(m_sock); + freeaddrinfo(result); + throw runtime_error("Timeout on connect"); + } + break; + } + + ::close(sfd); + } + + if (m_sock != INVALID_SOCKET) { +#if defined(HAVE_SO_NOSIGPIPE) + int val = 1; + if (setsockopt(m_sock, SOL_SOCKET, SO_NOSIGPIPE, &val, sizeof(val)) + == SOCKET_ERROR) { + throw runtime_error("Can't set SO_NOSIGPIPE"); + } +#endif + } + + // Don't keep the socket blocking + if (fcntl(m_sock, F_SETFL, flags) == -1) { + std::string errstr(strerror(errno)); + throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); + } + + freeaddrinfo(result); + + if (rp == nullptr) { + throw runtime_error("Could not connect"); + } +} + void TCPSocket::connect(const std::string& hostname, int port, bool nonblock) { if (m_sock != INVALID_SOCKET) { @@ -447,11 +562,15 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock) int flags = fcntl(sfd, F_GETFL); if (flags == -1) { std::string errstr(strerror(errno)); + freeaddrinfo(result); + ::close(sfd); throw std::runtime_error("TCP: Could not get socket flags: " + errstr); } if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) { std::string errstr(strerror(errno)); + freeaddrinfo(result); + ::close(sfd); throw std::runtime_error("TCP: Could not set O_NONBLOCK: " + errstr); } } @@ -480,7 +599,6 @@ void TCPSocket::connect(const std::string& hostname, int port, bool nonblock) if (rp == nullptr) { throw runtime_error("Could not connect"); } - } void TCPSocket::listen(int port, const string& name) @@ -936,10 +1054,12 @@ void TCPReceiveServer::process() sock.close(); // TODO replace fprintf fprintf(stderr, "TCP Receiver restarted after error: %s\n", e.what()); + m_queue.push(make_shared()); } if (num_timeouts > max_num_timeouts) { sock.close(); + m_queue.push(make_shared()); } } } diff --git a/lib/Socket.h b/lib/Socket.h index 33cdc05..08607a5 100644 --- a/lib/Socket.h +++ b/lib/Socket.h @@ -168,6 +168,7 @@ class TCPSocket { bool valid(void) const; void connect(const std::string& hostname, int port, bool nonblock = false); + void connect(const std::string& hostname, int port, int timeout_ms); void listen(int port, const std::string& name); void close(void); diff --git a/lib/edi/common.cpp b/lib/edi/common.cpp index 2f20391..c99997a 100644 --- a/lib/edi/common.cpp +++ b/lib/edi/common.cpp @@ -153,24 +153,33 @@ void TagDispatcher::push_bytes(const vector &buf) while (m_input_data.size() > 2) { if (m_input_data[0] == 'A' and m_input_data[1] == 'F') { - const decode_state_t st = decode_afpacket(m_input_data); - - if (st.num_bytes_consumed == 0 and not st.complete) { - // We need to refill our buffer - break; + const auto r = decode_afpacket(m_input_data); + bool leave_loop = false; + switch (r.st) { + case decode_state_e::Ok: + m_last_sequences.pseq_valid = false; + m_af_packet_completed(); + break; + case decode_state_e::MissingData: + /* Continue filling buffer */ + leave_loop = true; + break; + case decode_state_e::Error: + m_last_sequences.pseq_valid = false; + leave_loop = true; + break; } - if (st.num_bytes_consumed) { + if (r.num_bytes_consumed) { vector remaining_data; - copy(m_input_data.begin() + st.num_bytes_consumed, + copy(m_input_data.begin() + r.num_bytes_consumed, m_input_data.end(), back_inserter(remaining_data)); m_input_data = remaining_data; } - m_last_sequences.pseq_valid = false; - if (st.complete) { - m_af_packet_completed(); + if (leave_loop) { + break; } } else if (m_input_data[0] == 'P' and m_input_data[1] == 'F') { @@ -194,12 +203,21 @@ void TagDispatcher::push_bytes(const vector &buf) auto af = m_pft.getNextAFPacket(); if (not af.af_packet.empty()) { - const decode_state_t st = decode_afpacket(af.af_packet); - m_last_sequences.pseq = af.pseq; - m_last_sequences.pseq_valid = true; - - if (st.complete) { - m_af_packet_completed(); + const auto r = decode_afpacket(af.af_packet); + + switch (r.st) { + case decode_state_e::Ok: + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; + m_af_packet_completed(); + break; + case decode_state_e::MissingData: + etiLog.level(error) << "ETI MissingData on PFT push_bytes"; + m_last_sequences.pseq_valid = false; + break; + case decode_state_e::Error: + m_last_sequences.pseq_valid = false; + break; } } } @@ -219,10 +237,10 @@ void TagDispatcher::push_packet(const Packet &packet) } if (buf[0] == 'A' and buf[1] == 'F') { - const decode_state_t st = decode_afpacket(buf); + const auto r = decode_afpacket(buf); m_last_sequences.pseq_valid = false; - if (st.complete) { + if (r.st == decode_state_e::Ok) { m_af_packet_completed(); } @@ -237,11 +255,11 @@ void TagDispatcher::push_packet(const Packet &packet) auto af = m_pft.getNextAFPacket(); if (not af.af_packet.empty()) { - const decode_state_t st = decode_afpacket(af.af_packet); - m_last_sequences.pseq = af.pseq; - m_last_sequences.pseq_valid = true; + const auto r = decode_afpacket(af.af_packet); - if (st.complete) { + if (r.st == decode_state_e::Ok) { + m_last_sequences.pseq = af.pseq; + m_last_sequences.pseq_valid = true; m_af_packet_completed(); } } @@ -261,11 +279,11 @@ void TagDispatcher::setMaxDelay(int num_af_packets) #define AFPACKET_HEADER_LEN 10 // includes SYNC -decode_state_t TagDispatcher::decode_afpacket( +TagDispatcher::decode_result_t TagDispatcher::decode_afpacket( const std::vector &input_data) { if (input_data.size() < AFPACKET_HEADER_LEN) { - return {false, 0}; + return {decode_state_e::MissingData, 0}; } // read length from packet @@ -274,7 +292,7 @@ decode_state_t TagDispatcher::decode_afpacket( const size_t crclength = 2; if (input_data.size() < AFPACKET_HEADER_LEN + taglength + crclength) { - return {false, 0}; + return {decode_state_e::MissingData, 0}; } // SEQ wraps at 0xFFFF, unsigned integer overflow is intentional @@ -291,22 +309,23 @@ decode_state_t TagDispatcher::decode_afpacket( } m_last_sequences.seq = seq; + const size_t crclen = 2; bool has_crc = (input_data[8] & 0x80) ? true : false; uint8_t major_revision = (input_data[8] & 0x70) >> 4; uint8_t minor_revision = input_data[8] & 0x0F; if (major_revision != 1 or minor_revision != 0) { - throw invalid_argument("EDI AF Packet has wrong revision " + - to_string(major_revision) + "." + to_string(minor_revision)); + etiLog.level(warn) << "EDI AF Packet has wrong revision " << + (int)major_revision << "." << (int)minor_revision; + } + + if (not has_crc) { + etiLog.level(warn) << "AF packet not supported, has no CRC"; + return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength}; } uint8_t pt = input_data[9]; if (pt != 'T') { // only support Tag - return {false, 0}; - } - - - if (not has_crc) { - throw invalid_argument("AF packet not supported, has no CRC"); + return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen}; } uint16_t crc = 0xffff; @@ -318,7 +337,8 @@ decode_state_t TagDispatcher::decode_afpacket( uint16_t packet_crc = read_16b(input_data.begin() + AFPACKET_HEADER_LEN + taglength); if (packet_crc != crc) { - throw invalid_argument("AF Packet crc wrong"); + etiLog.level(warn) << "AF Packet crc wrong"; + return {decode_state_e::Error, AFPACKET_HEADER_LEN + taglength + crclen}; } else { vector payload(taglength); @@ -326,8 +346,9 @@ decode_state_t TagDispatcher::decode_afpacket( input_data.begin() + AFPACKET_HEADER_LEN + taglength, payload.begin()); - return {decode_tagpacket(payload), - AFPACKET_HEADER_LEN + taglength + 2}; + return { + decode_tagpacket(payload) ? decode_state_e::Ok : decode_state_e::Error, + AFPACKET_HEADER_LEN + taglength + crclen}; } } diff --git a/lib/edi/common.hpp b/lib/edi/common.hpp index e8c57c1..5e31984 100644 --- a/lib/edi/common.hpp +++ b/lib/edi/common.hpp @@ -49,13 +49,6 @@ struct frame_timestamp_t { static frame_timestamp_t from_unix_epoch(std::time_t time, uint32_t tai_utc_offset, uint32_t tsta); }; -struct decode_state_t { - decode_state_t(bool _complete, size_t _num_bytes_consumed) : - complete(_complete), num_bytes_consumed(_num_bytes_consumed) {} - bool complete; - size_t num_bytes_consumed; -}; - using tag_name_t = std::array; std::string tag_name_to_human_readable(const tag_name_t& name); @@ -122,7 +115,17 @@ class TagDispatcher { } private: - decode_state_t decode_afpacket(const std::vector &input_data); + enum class decode_state_e { + Ok, MissingData, Error + }; + struct decode_result_t { + decode_result_t(decode_state_e _st, size_t _num_bytes_consumed) : + st(_st), num_bytes_consumed(_num_bytes_consumed) {} + decode_state_e st; + size_t num_bytes_consumed; + }; + + decode_result_t decode_afpacket(const std::vector &input_data); bool decode_tagpacket(const std::vector &payload); PFT::PFT m_pft; -- cgit v1.2.3