FM messaging socket reads that are triggered by FM API calls from client services have, on rare occasions, been seen to block (stall) the fmManager process. An fmManager stall can in turn stall other client service processes; in the case of mtcAgent this has been seen to lead to an uncontrolled switch of activity (Swact). This update adds a 5 second socket read timeout to FM's client services socket setup to avoid the prolonged blocking cases that lead to a Swact or that adversely affect (block) the execution of other client service processes. Setting a read timeout on Linux sockets is good programming practice: it helps ensure that an application (here FM and its client services) does not hang indefinitely if a network operation such as a socket read becomes unresponsive. Configuring a timeout helps manage network communication reliability and efficiency, especially in server-client applications such as FM where responsiveness is critical. Test Plan: PASS: Verify AIO DX system install. PASS: Verify blocked socket timeout and error log after 5 seconds. PASS: Verify unblocked socket reads complete successfully. PASS: Verify alarm assert/clear functions operate normally. PASS: Verify set socket timeout failure handling. PASS: Verify fmManager is not leaking files or memory. PASS: Verify rook-ceph apply remove 100 loop soak - no stall or swact - AIO DX - with 2 OSDs on each controller Closes-Bug: 2088025 Change-Id: I1d947bccf9faeedcc2b96c7bc398fbab77b7ae09 Signed-off-by: Eric MacDonald <eric.macdonald@windriver.com>
495 lines
11 KiB
495 lines
11 KiB
// Copyright (c) 2017,2023-2024 Wind River Systems, Inc.
// SPDX-License-Identifier: Apache-2.0
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdlib.h>
#include <sys/poll.h>
#include <sys/select.h>
#include <errno.h>
#include <unistd.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include "fmSocket.h"
#include "fmThread.h"
#include "fmMutex.h"
#include "fmLog.h"
#include "fmDbUtils.h"
// Returns the single mutex that the server code in this file locks (via
// CFmMutexGuard) around accesses to the `connections` map.
// The mutex is allocated on the first call and is never deleted.
CFmMutex & getConnectionMutex(){
static CFmMutex *m = new CFmMutex;
return *m;
// Sets `type` from the length of a socket address:
//   - length equal to sizeof(address.ipv4) -> AF_INET
//   - length equal to sizeof(address.ipv6) -> AF_INET6
//   - any other length                     -> AF_UNIX
void CFmSockAddr::set_type(socklen_t addr_len) {
if (addr_len==sizeof(address.ipv4)) {
type = AF_INET;
} else if (addr_len==sizeof(address.ipv6)) {
type = AF_INET6;
} else {
type = AF_UNIX;
// Constructs a socket object that holds no descriptor yet (m_fd == -1).
CFmSocket::CFmSocket() {
m_fd = -1;
// Destructor.
// NOTE(review): the body is not visible in this view of the file - confirm
// that it releases the descriptor held in m_fd.
CFmSocket::~CFmSocket() {
// Marks this object as holding no descriptor by resetting m_fd to -1.
// Does nothing when m_fd is already -1.
// NOTE(review): no ::close() call on the descriptor is visible in this view
// of the function - confirm the descriptor is actually released.
void CFmSocket::close() {
if (m_fd!=-1) {
//FM_INFO_LOG("close fd:(%d)", m_fd);
m_fd = -1;
// Creates a SOCK_STREAM socket for the current `address_family`, stores the
// descriptor in m_fd, then enables SO_KEEPALIVE and sets an SO_RCVTIMEO
// receive timeout of SOCKET_TIMEOUT_DEFAULT seconds.
//
// Returns true when the socket was created and both options were applied.
// Returns false, after logging errno, when ::socket() or either setsockopt()
// call fails.
bool CFmSocket::create_socket() {
int optval = 1;
socklen_t optlen = sizeof(optval);
m_fd = ::socket(address_family,SOCK_STREAM,0);
if (m_fd == -1){
FM_ERROR_LOG("Failed to create socket, error: (%d) (%s)", errno, strerror(errno));
return false;
/* Set the KEEPALIVE option active */
if(setsockopt(m_fd, SOL_SOCKET, SO_KEEPALIVE, &optval, optlen) < 0) {
FM_ERROR_LOG("Failed to setsockopt, error: (%d) (%s)", errno, strerror(errno));
return false;
//FM_DEBUG_LOG("SO_KEEPALIVE set on socket\n");
// Receive timeout: whole seconds from SOCKET_TIMEOUT_DEFAULT, no microseconds.
struct timeval timeout;
timeout.tv_sec = SOCKET_TIMEOUT_DEFAULT ;
timeout.tv_usec = 0 ;
if(setsockopt(m_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout) < 0) {
FM_ERROR_LOG("Failed to setsockopt SO_RCVTIMEO timeout, error: (%d) (%s)", errno, strerror(errno));
return false;
return true;
bool CFmSocket::connect(const char *host, int port, int address_family) {
this->address_family = address_family;
if (!create_socket()) {
return false;
switch (address_family) {
//When address is IPv4
case AF_INET:
struct sockaddr_in addr;
if (inet_aton(host,&(addr.sin_addr))==0) {
FM_INFO_LOG("inet_aton() failed\n");
return false;
addr.sin_port = htons(port);
addr.sin_family = AF_INET;
fd_set wset;
return (::connect(m_fd,(const struct sockaddr *)&addr,sizeof(addr))==0);
//When address is IPv6
case AF_INET6:
struct sockaddr_in6 addr;
if (inet_pton(AF_INET6,host,&(addr.sin6_addr))<=0) {
FM_INFO_LOG("inet_aton() failed\n");
return false;
addr.sin6_port = htons(port);
addr.sin6_family = AF_INET6;
fd_set wset;
return (::connect(m_fd,(const struct sockaddr *)&addr,sizeof(addr))==0);
//Should never get here, needed for completeness
return false;
bool CFmSocket::write(int fd, const void *data, long len) {
int offset = 0;
while (offset!=len) {
int rc = ::write(fd, ((char*)data)+offset,len-offset);
if (rc==0 || (rc==-1 && errno!=EINTR)) {
FM_ERROR_LOG("Socket Error: Failed to write to fd:(%d), len:(%d), rc:(%d), error:(%s)",
fd, len, rc, strerror(errno));
return false;
if (rc==-1 && errno==EINTR) continue;
return true;
// Sends one length-prefixed packet on `fd`: first `plen` converted with
// htonl() and written as a 32-bit header, then `plen` bytes from `data`.
// Returns false as soon as either write fails, true when both succeed.
// NOTE(review): `plen` is a long but the header is uint32_t, so the value is
// narrowed to 32 bits before it is sent.
bool CFmSocket::write_packet(int fd, const void *data, long plen) {
uint32_t len = htonl(plen);
bool rc = write(fd,&len,sizeof(len));
if (!rc) return false;
return write(fd,data,plen);
bool CFmSocket::write_packet(int fd, const std::vector<char> &data) {
return write_packet(fd,&(data[0]),data.size());
// Reads one length-prefixed packet from `fd` into `data`: a 32-bit length
// header first, then the payload.
// Returns false when a read fails or when the number of header bytes
// reported back differs from sizeof(len); returns true otherwise.
// NOTE(review): this view of the function looks incomplete - no byte-order
// conversion of the header, no exit from the retry loop and no resize of
// `data` are visible. Confirm against the full file before relying on this.
bool CFmSocket::read_packet(int fd, std::vector<char> &data) {
int32_t len = 0;
long tlen = sizeof(len);
// Up to 10 attempts at reading the length header.
int i = 10;
for ( ; i > 0 ; --i) {
if (!read(fd,&len,tlen)) {
return false;
// read() updates tlen only on failure, so this is a consistency check.
if (tlen!=sizeof(len))
FM_ERROR_LOG("Socket Error: Length does not match the data size: (%ld), (%ld)", tlen, sizeof(len));
return false;
// Payload: read `len` bytes into the caller's buffer.
tlen = len;
if (!read(fd,&(data[0]),tlen)) return false;
return true;
bool CFmSocket::read(int fd,void *data, long &len) {
int offset = 0;
while (offset!=len) {
int rc = ::read(fd, ((char*)data)+offset,len-offset);
if (rc==0 || (rc==-1 && errno!=EINTR)) {
// return code 0 means graceful close of TCP socket
if (rc !=0 ){
FM_ERROR_LOG("Failed to read from fd:(%d), rc:(%d), error:(%s), len:(%d)",
fd, rc, strerror(errno), len);
len = offset;
return false;
if (rc==-1 && errno==EINTR) continue;
return true;
// Reads one packet from this object's own descriptor (m_fd) into `data`.
// Returns the result of read_packet(m_fd, data).
bool CFmSocket::read_packet(std::vector<char> &data) {
return read_packet(m_fd,data);
// Reads `len` bytes from this object's own descriptor (m_fd) into `data`.
// Returns the result of read(m_fd, data, len); `len` is passed by reference
// to that overload.
bool CFmSocket::read(void *data, long &len) {
return read(m_fd,data,len);
// Waits on two descriptor arrays using ::select().
//
// Parameters:
//   rfd, rlen     array of descriptors passed in the read set, and its length
//   wfds, wlen    array of descriptors passed in the write set, and its length
//   timeout       timeout, seconds part
//   timeoutusec   timeout, microseconds part
//   timedout      set to true when ::select() returns 0
//
// Returns the return code of ::select().
// Entries of `wfds` that are not set in the result are overwritten with -1.
// NOTE(review): several statements are missing from this view (population of
// rset/wset, the non-EINTR error branch, the loop exit, the handling of `rfd`
// entries that are not set) - confirm against the full file.
int CFmSocket::select(int *rfd, int rlen, int *wfds, int wlen,int timeout, int timeoutusec, bool &timedout) {
fd_set rset,wset;
// Highest descriptor across both arrays; ::select() needs max_fd+1.
int max_fd = -1;
int ix = 0;
for ( ; ix < rlen ; ++ix ) {
if (max_fd < rfd[ix]) max_fd = rfd[ix];
ix = 0;
for ( ; ix < wlen ; ++ix ) {
if (max_fd < wfds[ix]) max_fd = wfds[ix];
struct timeval to;
to.tv_sec = timeout;
to.tv_usec = timeoutusec;
int rc = 0;
while (true) {
rc = ::select(max_fd+1,&rset,&wset,NULL,&to);
if (rc==-1 && errno!=EINTR) {
// Remaining -1 case: go round the loop and call ::select() again.
if (rc==-1) continue;
if (rc==0) timedout = true;
if (rc>0) {
ix = 0;
for ( ; ix < rlen ; ++ix ) {
if (!FD_ISSET(rfd[ix],&rset)) {
ix = 0;
for ( ; ix < wlen ; ++ix ) {
if (!FD_ISSET(wfds[ix],&wset)) {
wfds[ix] = -1;
return rc;
// Waits up to `timeout` seconds on a single descriptor `fd`, passed as the
// only entry of the read array, with no write descriptors.
// `timedout` and the return value are those of select().
int CFmSocket::select_read(int fd,int timeout, bool &timedout){
return select(&fd,1,NULL,0,timeout,0,timedout);
// Reports whether this object's descriptor (m_fd) still looks usable, using a
// non-blocking poll() (timeout 0).
//
// Returns false when:
//   - no descriptor is held (m_fd < 0); poll() ignores negative descriptors,
//     so without this check a closed socket would be reported as valid
//   - poll() itself fails
//   - poll() reports POLLNVAL (m_fd is not an open descriptor)
//   - poll() reports POLLRDHUP (logged as a broken pipe)
// Returns true otherwise.
bool CFmSocket::fd_valid(){
	if (m_fd < 0) {
		return false;
	}

	struct pollfd pfd;
	memset(&pfd, 0, sizeof(pfd));
	pfd.fd = m_fd;
	pfd.events = POLLRDHUP;

	if (poll(&pfd, 1, 0) < 0) {
		return false;
	}
	if (pfd.revents & POLLNVAL) {
		FM_ERROR_LOG("Socket fd:(%d) is not an open descriptor\n", m_fd);
		return false;
	}
	if (pfd.revents & POLLRDHUP) {
		// broken pipe
		FM_ERROR_LOG("A broken pipe error occurred\n");
		return false;
	}
	return true;
}
// Receives data on m_fd with ::recvfrom(), storing the sender's address in
// `addr`.
//   data  destination buffer
//   len   in: buffer size passed to ::recvfrom();
//         out: number of bytes received on success, errno on failure
// Returns true on success, false when ::recvfrom() returns -1.
bool CFmSocket::recvfrom(void *data, long &len, CFmSockAddr &addr) {
socklen_t addr_len = sizeof(addr.address);
int l = ::recvfrom(m_fd,data,len,0,addr.get_sockaddr(),&addr_len);
// On failure `len` is reused to carry errno back to the caller.
if (l==-1) { len = errno; return false; }
len = l;
return true;
// Writes `len` bytes from `data` to this object's own descriptor (m_fd).
// Returns the result of write(m_fd, data, len).
bool CFmSocket::write(const void *data, long len) {
return write(m_fd,data,len);
// Looks up `sock` in the `connections` map while holding the connection mutex.
// NOTE(review): the action taken when the entry is found is not visible in
// this view - presumably the connection is closed and erased; confirm.
void FmSocketServer::rm_socket(int sock) {
CFmMutexGuard m(getConnectionMutex());
conn_map_t::iterator it = connections.find(sock);
if (it!=connections.end()) {
// Handles one packet received from a client descriptor.
// The visible implementation only logs the descriptor and the payload size;
// `sess` is not used by the visible code.
void FmSocketServer::handle_socket_data(int fd, std::vector<char> &data,
CFmDBSession &sess) {
FM_INFO_LOG("Received data from sock:%d len %lu\n",fd,data.size());
// Probes `sock` with a zero-timeout select() on the read set.
// Returns true unless select() returns -1.
bool FmSocketServer::good_socket(int sock) {
bool timedout=false;
int rc = select(&sock,1,NULL,0,0,0,timedout);
return (rc!=-1);
// Checks the server's own descriptor (m_fd) and then a list of descriptors
// with good_socket(), logging each one that fails the probe.
// NOTE(review): the action taken for a bad m_fd, the code that fills `lsock`,
// and the cleanup of a bad entry are not visible in this view - confirm
// against the full file.
void FmSocketServer::find_bad_fd() {
if (!good_socket(m_fd)) {
std::vector<int> lsock;
int ix = 0;
int mx =lsock.size();
for ( ; ix < mx ; ++ix) {
if (!good_socket(lsock[ix])) {
FM_INFO_LOG("Found bad fd, close it:(%d)", lsock[ix]);
// Server main loop.
// Creates a DB session (the process exits with -1 if that fails), then loops
// forever: waits up to 1 second in select() on a list of descriptors, and for
// each descriptor that is still set either treats it as the listening
// descriptor (m_fd) or reads one packet from it and passes the packet to
// handle_socket_data().
// NOTE(review): the code that fills `lsock`, the declaration of `buff`, the
// select() error handling, and the bodies of the listening-descriptor and
// read-failure branches are not visible in this view - confirm against the
// full file.
bool FmSocketServer::run() {
CFmDBSession *sess;
if (fm_db_util_create_session(&sess) != true){
FM_ERROR_LOG("Fail to create DB session, exit ...\n");
exit (-1);
while (true) {
std::vector<int> lsock;
bool timedout =false;
int rc = select(&(lsock[0]),lsock.size(),NULL,0,1,0,timedout);
// Nothing became ready within the 1 second timeout: wait again.
if (timedout) continue;
if (rc==-1) {
//listening socket and close all current sockets..
int ix = 0;
int mx = lsock.size();
for ( ; ix < mx ; ++ix ) {
// select() marks descriptors that are not ready as -1; skip them.
if (lsock[ix]==-1) continue;
if (lsock[ix]==m_fd) {
bool rc = read_packet(lsock[ix],buff);
if (!rc) {
} else {
handle_socket_data(lsock[ix],buff, *sess);
return false;
// Destructor: walks every entry of the `connections` map.
// NOTE(review): the per-entry action is not visible in this view - presumably
// each client descriptor is closed; confirm.
FmSocketServer::~FmSocketServer() {
conn_map_t::iterator it = connections.begin();
conn_map_t::iterator end = connections.end();
for (; it != end ; ++it) {
// Walks every entry of the `connections` map while holding the connection
// mutex.
// NOTE(review): the loop body is not visible in this view - presumably each
// connection's descriptor is appended to `socks`; confirm.
void FmSocketServer::to_sock_array(std::vector<int> &socks) {
CFmMutexGuard m(getConnectionMutex());
conn_map_t::iterator it = connections.begin();
conn_map_t::iterator end = connections.end();
for (; it != end ; ++it) {
// (Re)creates the server's listening socket from the stored `address_family`,
// `server_addr` and `server_port`: creates the socket, sets SO_REUSEADDR and
// an SO_RCVTIMEO receive timeout of SOCKET_TIMEOUT_DEFAULT seconds, binds to
// the configured address/port and listens with a backlog of 10.
//
// Returns false when create_socket() or bind() fails, or when the address
// family is neither AF_INET nor AF_INET6.
// NOTE(review): several lines are missing from this view (declaration of
// `str`, zeroing of `addr`, the IPv6 address assignment, the return after a
// failed listen()) - confirm against the full file.
bool FmSocketServer::server_reset() {
if (!create_socket()) {
FM_INFO_LOG("Failed to create socket for port:(%d)\n", server_port);
return false;
// Set socket reusable
FM_INFO_LOG("Setting socket fd:%d as re-useable", m_fd);
int optval = 1;
// NOTE(review): the setsockopt() result is not checked here.
setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof optval);
// Set socket read timeout
struct timeval timeout;
timeout.tv_sec = SOCKET_TIMEOUT_DEFAULT ;
timeout.tv_usec = 0 ;
// NOTE(review): tv_sec is logged with %d; verify that matches its type on
// the target platform.
FM_INFO_LOG("Setting socket fd:%d with %d second read timeout", m_fd, timeout.tv_sec );
// NOTE(review): the setsockopt() result is not checked here either.
setsockopt(m_fd, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof timeout);
switch (address_family) {
//When address is IPv4
case AF_INET:
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr(server_addr.c_str());
addr.sin_port = htons(server_port);
// Convert the bound address back to text for the log message below.
int ipAddr = addr.sin_addr.s_addr;
memset(str,0, sizeof(str));
inet_ntop( AF_INET, &ipAddr, str, INET_ADDRSTRLEN );
if (bind(m_fd,(const struct sockaddr *)&addr,sizeof(addr))!=0) {
return false;
// NOTE(review): htons(server_port) logs the byte-swapped port on
// little-endian hosts; server_port is probably what was intended.
FM_INFO_LOG("FM server socket binds the addr:(%s) port:(%d)\n",str, htons(server_port));
if (::listen(m_fd,10)==-1) {
FM_INFO_LOG("listen on fd:(%d) failed, errno: (%d) (%s)\n",
m_fd, errno, strerror(errno));
return true;
//When address is IPv6
case AF_INET6:
struct sockaddr_in6 addr;
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(server_port);
// Convert the bound address back to text for the log message below.
memset(str,0, sizeof(str));
inet_ntop( AF_INET6, &addr.sin6_addr.s6_addr, str, INET6_ADDRSTRLEN );
if (bind(m_fd,(const struct sockaddr *)&addr,sizeof(addr))!=0) {
return false;
// NOTE(review): same htons(server_port) logging concern as the IPv4 branch.
FM_INFO_LOG("FM server socket binds the addr:(%s) port:(%d)\n",str, htons(server_port));
if (::listen(m_fd,10)==-1) {
FM_INFO_LOG("listen on fd:(%d) failed, errno: (%d) (%s)\n",
m_fd, errno, strerror(errno));
return true;
//Should never get here, needed for completeness
return false;
// Records the bind address, port and address family for this server and then
// sets up the listening socket through server_reset().
// Returns the result of server_reset().
bool FmSocketServer::server_sock(const char *bindaddr, int port, int address_family) {
this->address_family = address_family;
server_addr = bindaddr;
server_port = port;
return server_reset();
// Accepts one pending client connection on the listening descriptor (m_fd)
// and records it in the `connections` map, keyed by the new descriptor.
// Returns false, after logging errno, when ::accept() fails; true otherwise.
bool FmSocketServer::accept() {
client_conn con;
socklen_t alen = sizeof(con.addr);
int fd = ::accept(m_fd,con.addr.get_sockaddr(),&alen);
if (fd==-1) {
FM_INFO_LOG("accept returns fd: (%d) errno: (%d) (%s)\n",
fd, errno, strerror(errno));
return false;
con.sock = fd;
// The map is shared with other code paths, so insert under the mutex.
CFmMutexGuard m(getConnectionMutex());
connections[fd] = con;
return true;