computing-offload/qtfs/qtfs_common/conn.c
Yikun Jiang a68570b5d9 Add computing offloading code
1. Add computing offloading code
2. Add script.md
3. Add virsh_demo.xml

Change-Id: Id9ef883e2f0eb727eb5448b9d1c47767f46b1021
Signed-off-by: Yikun Jiang <yikunkero@gmail.com>
2023-10-23 19:29:57 +08:00

951 lines
26 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/* SPDX-License-Identifier: GPL-2.0 */
/*
* Copyright (C) 2023. Huawei Technologies Co., Ltd. All rights reserved.
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2 and
* only version 2 as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*/
#include <linux/kallsyms.h>
#include <linux/tcp.h>
#include <net/tcp.h>
#include <linux/un.h>
#include <linux/file.h>
#include "comm.h"
#include "conn.h"
#include "log.h"
#include "req.h"
#include "symbol_wrapper.h"
#include "uds_module.h"
// Transport-specific operation table; assigned in parse_param().
struct qtfs_pvar_ops_s *g_pvar_ops = NULL;
// Log level as a string; exposed as a module parameter at the end of this file.
char qtfs_log_level[QTFS_LOGLEVEL_STRLEN] = {0};
// Connection type as a string; exposed as a module parameter at the end of this file.
char qtfs_conn_type[20] = QTFS_CONN_SOCK_TYPE;
// Numeric log level, defaults to LOG_ERROR.
int log_level = LOG_ERROR;
// Upper bound on the number of pvars created by _qtfs_conn_get_param(); module parameter.
static int qtfs_conn_max_conn = QTFS_MAX_THREADS;
// Diagnostic counters; filled by qtfs_conn_list_cnt(). Not allocated in this file.
struct qtinfo *qtfs_diag_info = NULL;
bool qtfs_epoll_mode = false; // true: support any mode; false: only support fifo
// Number of pvars created so far by _qtfs_conn_get_param(); also used as the next thread index.
static atomic_t g_qtfs_conn_num;
// pvars that are currently free to be handed out.
static struct list_head g_vld_lst;
// pvars that are currently handed out to a caller.
static struct list_head g_busy_lst;
// pvars whose put was interrupted; drained by qtfs_mutex_lock_interruptible().
static struct llist_head g_lazy_put_llst;
// fifo pvars created by qtfs_fifo_get_param().
static struct list_head g_fifo_lst;
// Taken around accesses to g_vld_lst and g_busy_lst.
static struct mutex g_param_mutex;
// Taken around accesses to g_fifo_lst.
static struct mutex g_fifo_mutex;
// Checked by _qtfs_conn_get_param() and qtfs_sm_connecting() to stop work; not set in this file.
int qtfs_mod_exiting = false;
// Table of pvars indexed by cur_threadidx.
struct qtfs_conn_var_s *qtfs_thread_var[QTFS_MAX_THREADS] = {NULL};
// The single pvar used for the epoll connection; created by qtfs_epoll_establish_conn().
struct qtfs_conn_var_s *qtfs_epoll_var = NULL;
#ifdef QTFS_SERVER
// NOTE(review): not referenced in this file; presumably server-side user pointers — confirm.
struct qtfs_server_userp_s *qtfs_userps = NULL;
#endif
#ifdef QTFS_CLIENT
// Slab cache backing the pvars allocated by qtfs_fifo_get_param().
struct kmem_cache *qtfs_fifo_pvar_cache;
#endif
// try to connect remote uds server, only for unix domain socket
#define QTFS_UDS_PROXY_SUFFIX ".proxy"
/*
 * Ask the uds proxy listening on UDS_BUILD_CONN_ADDR to build a remote
 * connection for the unix socket path in @addr.
 *
 * @sock: the caller's socket; only its socket type is forwarded to the proxy.
 * @addr: address whose sun_path is sent in the request.
 * @len:  unused here, kept for interface compatibility.
 *
 * Returns 0 when the proxy answers with a non-zero rsp.ret, -EFAULT when the
 * temporary kernel socket cannot be created, and -ECONNREFUSED on any other
 * failure (connect, send, receive, or a zero rsp.ret).
 */
int qtfs_uds_proxy_build(struct socket *sock, struct sockaddr_un *addr, int len)
{
	int ret;
	struct uds_proxy_remote_conn_req req;
	struct uds_proxy_remote_conn_rsp rsp;
	struct sockaddr_un proxy = {.sun_family = AF_UNIX};
	struct socket *proxy_sock;
	struct msghdr msgs;
	struct msghdr msgr;
	struct kvec vec;

	ret = sock_create_kern(&init_net, AF_UNIX, SOCK_STREAM, 0, &proxy_sock);
	if (ret) {
		qtfs_err("create proxy sock failed sun path:%s", addr->sun_path);
		return -EFAULT;
	}
	memset(proxy.sun_path, 0, sizeof(proxy.sun_path));
	// bound the copy by the destination, not by the source length
	strlcpy(proxy.sun_path, UDS_BUILD_CONN_ADDR, sizeof(proxy.sun_path));
	// use the proxy socket's own ops: the caller's socket may be of another type
	ret = proxy_sock->ops->connect(proxy_sock, (struct sockaddr *)&proxy, sizeof(proxy), SOCK_NONBLOCK);
	if (ret) {
		qtfs_err("connect to uds proxy failed");
		goto err_end;
	}
	// zero the whole request so no stale stack bytes are sent to the proxy
	memset(&req, 0, sizeof(req));
	strlcpy(req.sun_path, addr->sun_path, sizeof(req.sun_path));
	// zero the response so a short receive reads as a refusal (ret == 0)
	memset(&rsp, 0, sizeof(rsp));
	memset(&msgs, 0, sizeof(struct msghdr));
	memset(&msgr, 0, sizeof(struct msghdr));
	req.type = sock->sk->sk_type;
	vec.iov_base = &req;
	vec.iov_len = sizeof(req);
	ret = kernel_sendmsg(proxy_sock, &msgs, &vec, 1, vec.iov_len);
	if (ret < 0) {
		qtfs_err("send remote connect request failed:%d", ret);
		goto err_end;
	}
	vec.iov_base = &rsp;
	vec.iov_len = sizeof(rsp);
	ret = kernel_recvmsg(proxy_sock, &msgr, &vec, 1, vec.iov_len, MSG_WAITALL);
	if (ret <= 0) {
		qtfs_err("recv remote connect response failed:%d", ret);
		goto err_end;
	}
	// a zero ret in the response is treated as a refusal
	if (rsp.ret == 0) {
		goto err_end;
	}
	qtfs_info("try to build uds proxy successed, sun path:%s", addr->sun_path);
	sock_release(proxy_sock);
	return 0;
err_end:
	sock_release(proxy_sock);
	return -ECONNREFUSED;
}
static int qtfs_uds_remote_whitelist(const char *path)
{
int i;
int ret = 1;
struct qtfs_wl_cap *cap;
read_lock(&g_qtfs_wl.rwlock);
cap = &g_qtfs_wl.cap[QTFS_WHITELIST_UDSCONNECT];
for (i = 0; i < cap->nums; i++) {
if (strncmp(path, cap->item[i], strlen(cap->item[i])) == 0) {
if (strlen(path) > strlen(cap->item[i]) && path[strlen(cap->item[i])] != '/') {
continue;
}
ret = 0;
break;
}
}
read_unlock(&g_qtfs_wl.rwlock);
return ret;
}
#define UDS_PROXYD_PRNAME "udsproxyd"
/* Returns 1 when the current task's comm is exactly UDS_PROXYD_PRNAME, else 0. */
static inline int qtfs_uds_is_proxy(void)
{
	// same length plus equal prefix is an exact string match
	return (strcmp(current->comm, UDS_PROXYD_PRNAME) == 0) ? 1 : 0;
}
#define REXEC_PRNAME "rexec"
/* Returns 1 when the current task's comm is exactly REXEC_PRNAME, else 0. */
static inline int qtfs_uds_is_rexec(void)
{
	// same length plus equal prefix is an exact string match
	return (strcmp(current->comm, REXEC_PRNAME) == 0) ? 1 : 0;
}
/*
 * connect() front end that falls back to a remote unix socket via the uds proxy.
 *
 * The local connect is tried first (skipped for the rexec process). If it
 * fails and the caller is not udsproxyd, the address is copied in, checked
 * against the whitelist, and qtfs_uds_proxy_build() is asked to set up the
 * remote side. On success the user's address is temporarily rewritten to
 * "<path>" + QTFS_UDS_PROXY_SUFFIX, connect is retried, and the original
 * address is written back.
 *
 * @fd:   socket file descriptor from user space.
 * @addr: user pointer to the socket address.
 * @len:  length of @addr as given by user space; NOT trusted.
 *
 * Returns the result of the last connect attempt (or -EINVAL when none was
 * made), or -EBADF when @fd cannot be resolved.
 */
int qtfs_uds_remote_connect_user(int fd, struct sockaddr __user *addr, int len)
{
	int sysret = -EINVAL;
	int ret;
	int err;
	int un_headlen;
	int room;
	size_t path_len;
	struct fd f;
	struct socket *sock;
	struct sockaddr_un addr_un;
	struct sockaddr_un addr_proxy;

	if (qtfs_uds_is_rexec()) {
		qtfs_info("Rexec process has no nessary to connect local server");
		goto try_conn_remote;
	}
	sysret = qtfs_syscall_connect(fd, addr, len);
	// don't try remote uds connect if: 1.local connect successed; 2.this process is udsproxyd
	if (sysret == 0 || qtfs_uds_is_proxy())
		return sysret;
try_conn_remote:
	// len comes straight from user space and the rexec path never had it
	// validated by the syscall: refuse anything that does not fit addr_un
	if (len <= 0 || (size_t)len > sizeof(addr_un))
		return sysret;
	// zero first so that a short copy still leaves a NUL terminated sun_path
	memset(&addr_un, 0, sizeof(addr_un));
	if (copy_from_user(&addr_un, addr, len)) {
		qtfs_err("copy sockaddr failed.");
		return sysret;
	}
	if (addr_un.sun_family != AF_UNIX)
		return sysret;
	// a path that fills the whole buffer has no terminator and no room for the suffix
	path_len = strnlen(addr_un.sun_path, sizeof(addr_un.sun_path));
	if (path_len == sizeof(addr_un.sun_path))
		return sysret;
	// don't try remote uds connect if sunpath not in whitelist
	if (qtfs_uds_remote_whitelist(addr_un.sun_path) != 0)
		return sysret;
	un_headlen = sizeof(struct sockaddr_un) - sizeof(addr_un.sun_path);
	// If the length given by user space is too small we can only fail here.
	// Keep the arithmetic signed so that a small len cannot wrap around.
	room = len - un_headlen - (int)strlen(QTFS_UDS_PROXY_SUFFIX);
	if (room <= 0 || path_len >= (size_t)room) {
		qtfs_err("failed to try connect remote uds server, sun path:%s too long to add suffix:%s",
			addr_un.sun_path, QTFS_UDS_PROXY_SUFFIX);
		return sysret;
	}
	qtfs_info("uds connect failed:%d try to remote connect:%s.", sysret, addr_un.sun_path);
	f = fdget(fd);
	if (f.file == NULL) {
		return -EBADF;
	}
#if (LINUX_VERSION_CODE >= KERNEL_VERSION(5, 11, 0))
	sock = sock_from_file(f.file);
#else
	sock = sock_from_file(f.file, &err);
#endif
	if (!sock) {
		goto end;
	}
	// try to connect remote uds's proxy
	ret = qtfs_uds_proxy_build(sock, &addr_un, len);
	if (ret == 0) {
		memcpy(&addr_proxy, &addr_un, sizeof(struct sockaddr_un));
		strlcat(addr_proxy.sun_path, QTFS_UDS_PROXY_SUFFIX, sizeof(addr_proxy.sun_path));
		// len was validated above to be within sizeof(struct sockaddr_un)
		if (copy_to_user(addr, &addr_proxy, len)) {
			qtfs_err("copy to addr failed sunpath:%s", addr_proxy.sun_path);
			goto end;
		}
		sysret = qtfs_syscall_connect(fd, addr, len);
		qtfs_info("try remote connect sunpath:%s ret:%d", addr_un.sun_path, sysret);
		// restore the caller's original address
		if (copy_to_user(addr, &addr_un, len)) {
			qtfs_err("resume addr failed");
			goto end;
		}
	}
end:
	fdput(f);
	return sysret;
}
/* Run the transport's conn_init hook for @pvar and return its result. */
int qtfs_conn_init(struct qtfs_conn_var_s *pvar)
{
	int rc = pvar->conn_ops->conn_init(&pvar->conn_var, pvar->user_type);

	return rc;
}
/* Run the transport's conn_fini hook for @pvar. */
void qtfs_conn_fini(struct qtfs_conn_var_s *pvar)
{
	pvar->conn_ops->conn_fini(&pvar->conn_var, pvar->user_type);
}
// extract byte n (0 = least significant) of a 32 bit magic
#define MAGIC_U32(magic, n) (((magic) >> ((n) * 8)) & 0xff)
/*
 * Consume bytes from the connection until the four bytes of
 * pvar->magic_recv have been seen in order, most significant byte first.
 * Any mismatching byte restarts the search from the first magic byte.
 * Does nothing when magic_recv is 0.
 *
 * Returns 0 when the magic was matched, when magic_recv is 0, or when a
 * receive returned 0; returns the negative receive result otherwise.
 */
static inline int qtfs_conn_sync_magic(struct qtfs_conn_var_s *pvar, bool block)
{
	// initialised so the error log below never prints an indeterminate value
	u8 byte = 0;
	int ret = 0;
	int idx = 3;

	if (pvar->magic_recv == 0)
		return 0;
	while (idx >= 0) {
		ret = pvar->conn_ops->conn_recv(&pvar->conn_var, &byte, 1, block);
		if (ret <= 0)
			break;
		if (byte != MAGIC_U32(pvar->magic_recv, idx))
			idx = 3; // mismatch: start over with the first magic byte
		else
			idx--;
	}
	if (ret < 0) {
		if (ret != -EAGAIN)
			qtfs_err("qtfs sync magic failed ret:%d byte:%u", ret, byte);
		return ret;
	}
	return 0;
}
/*
 * Send the message prepared in pvar->vec_send, preceded by magic_send when
 * it is non-zero and followed by pvar->iov_send when it is set.
 *
 * Returns -EMSGSIZE when the message exceeds send_max, the failing send's
 * result (<= 0) on error, otherwise the bytes sent for the message plus the
 * bytes sent for the iterator.
 */
int qtfs_conn_send(struct qtfs_conn_var_s *pvar)
{
	int sent;
	int iter_sent;

	if (pvar->vec_send.iov_len > pvar->send_max)
		return -EMSGSIZE;

	if (pvar->magic_send != 0) {
		sent = pvar->conn_ops->conn_send(&pvar->conn_var, &pvar->magic_send, sizeof(pvar->magic_send));
		if (sent <= 0) {
			qtfs_err("magic send failed, ret:%d", sent);
			return sent;
		}
	}

	pvar->send_valid = pvar->vec_send.iov_len;
	sent = pvar->conn_ops->conn_send(&pvar->conn_var, pvar->vec_send.iov_base, pvar->vec_send.iov_len);
	if (sent <= 0)
		return sent;

	if (!pvar->iov_send)
		return sent;

	iter_sent = pvar->conn_ops->conn_send_iter(&pvar->conn_var, pvar->iov_send);
	// the iterator is single use: drop it whatever the outcome
	pvar->iov_send = NULL;
	if (iter_sent <= 0)
		return iter_sent;
	return sent + iter_sent;
}
/*
 * Receive one message into pvar->vec_recv: first QTFS_MSG_HEAD_LEN bytes of
 * header (blocking according to @block), then the payload, whose length is
 * taken from the header's len field and capped at the remaining buffer size.
 * The payload receive always blocks and is retried on -EAGAIN and
 * -ERESTARTSYS.
 *
 * Returns the header receive result when it is <= 0, a negative payload
 * receive result on error, otherwise header bytes plus payload bytes.
 */
int do_qtfs_conn_recv(struct qtfs_conn_var_s *pvar, bool block)
{
	struct qtreq *head = NULL;
	struct kvec payload;
	unsigned long restarts = 0;
	int head_bytes;
	int body_bytes = 0;

	head_bytes = pvar->conn_ops->conn_recv(&pvar->conn_var, pvar->vec_recv.iov_base, QTFS_MSG_HEAD_LEN, block);
	if (head_bytes <= 0)
		return head_bytes;

	payload.iov_base = pvar->vec_recv.iov_base + QTFS_MSG_HEAD_LEN;
	payload.iov_len = pvar->vec_recv.iov_len - QTFS_MSG_HEAD_LEN;
	head = pvar->vec_recv.iov_base;
	// only recv head
	if (payload.iov_len == 0)
		return body_bytes + head_bytes;

	for (;;) {
		body_bytes = pvar->conn_ops->conn_recv(&pvar->conn_var, payload.iov_base,
				(head->len < payload.iov_len) ? head->len : payload.iov_len, true);
		if (body_bytes == -EAGAIN)
			continue;
		if (body_bytes != -ERESTARTSYS)
			break;
#ifdef QTFS_CLIENT
		// count the interruption once per message
		if (restarts == 0) {
			qtinfo_cntinc(QTINF_RESTART_SYS);
			qtinfo_recverrinc(head->type);
		}
#endif
		restarts++;
		msleep(1);
	}
	if (body_bytes < 0) {
		qtfs_err("qtfs recv get invalidelen is :%d", body_bytes);
		return body_bytes;
	}
	if (body_bytes > head->len) {
		qtfs_crit("recv total:%d msg len:%lu\n", body_bytes, head->len);
		WARN_ON(1);
	}
	return body_bytes + head_bytes;
}
/*
 * Blocking receive: sync on the magic, then receive one message.
 * On success recv_valid is set to the received length capped at recv_max.
 * Returns the magic sync error, or the result of do_qtfs_conn_recv().
 */
int qtfs_conn_recv_block(struct qtfs_conn_var_s *pvar)
{
	int got = qtfs_conn_sync_magic(pvar, true);

	if (got != 0)
		return got;

	got = do_qtfs_conn_recv(pvar, true);
	if (got <= 0)
		return got;

	pvar->recv_valid = (got > pvar->recv_max) ? pvar->recv_max : got;
	return got;
}
/*
 * Receive one message with a non-blocking header read (the magic sync is
 * still requested as blocking). When nothing was received the caller is
 * delayed by 1ms; on success recv_valid is set to the received length capped
 * at recv_max.
 * Returns the magic sync error, or the result of do_qtfs_conn_recv().
 */
int qtfs_conn_recv(struct qtfs_conn_var_s *pvar)
{
	int got = qtfs_conn_sync_magic(pvar, true);

	if (got != 0)
		return got;

	got = do_qtfs_conn_recv(pvar, false);
	if (got > 0) {
		pvar->recv_valid = (got > pvar->recv_max) ? pvar->recv_max : got;
		return got;
	}
	msleep(1);
	return got;
}
int qtfs_conn_var_init(struct qtfs_conn_var_s *pvar)
{
INIT_LIST_HEAD(&pvar->lst);
// qtfs消息为130多k当作最大值作为合法性判断
if (pvar->recv_max > QTFS_MSG_LEN || pvar->send_max > QTFS_MSG_LEN ||
pvar->recv_max == 0 || pvar->recv_max == 0) {
qtfs_err("invalid recv max:%u or invalid send max:%u",
pvar->recv_max, pvar->send_max);
return QTFS_ERR;
}
pvar->vec_recv.iov_base = kmalloc(pvar->recv_max, GFP_KERNEL);
if (pvar->vec_recv.iov_base == NULL) {
qtfs_err("qtfs recv kmalloc failed, len:%u.\n", pvar->recv_max);
return QTFS_ERR;
}
pvar->vec_send.iov_base = kmalloc(pvar->send_max, GFP_KERNEL);
if (pvar->vec_send.iov_base == NULL) {
qtfs_err("qtfs send kmalloc failed, len:%u.\n", pvar->send_max);
kfree(pvar->vec_recv.iov_base);
pvar->vec_recv.iov_base = NULL;
return QTFS_ERR;
}
pvar->vec_recv.iov_len = pvar->recv_max;
pvar->vec_send.iov_len = 0;
memset(pvar->vec_recv.iov_base, 0, pvar->recv_max);
memset(pvar->vec_send.iov_base, 0, pvar->send_max);
pvar->recv_valid = 0;
pvar->send_valid = 0;
qtfs_info("init pvar thread:%d recv max:%u, send max:%u", pvar->cur_threadidx, pvar->recv_max, pvar->send_max);
return QTFS_OK;
}
/* Free the receive and send buffers of @pvar, if present, and reset their vectors. */
void qtfs_conn_var_fini(struct qtfs_conn_var_s *pvar)
{
	void *recv_buf = pvar->vec_recv.iov_base;
	void *send_buf = pvar->vec_send.iov_base;

	if (recv_buf != NULL) {
		kfree(recv_buf);
		pvar->vec_recv.iov_len = 0;
		pvar->vec_recv.iov_base = NULL;
	}
	if (send_buf != NULL) {
		kfree(send_buf);
		pvar->vec_send.iov_len = 0;
		pvar->vec_send.iov_base = NULL;
	}
}
/*
 * Wipe the used part of both message buffers (recv_valid / send_valid bytes)
 * and reset the valid counters; on the client also clear who_using.
 */
void qtfs_conn_msg_clear(struct qtfs_conn_var_s *pvar)
{
	memset(pvar->vec_recv.iov_base, 0, pvar->recv_valid);
	pvar->recv_valid = 0;

	memset(pvar->vec_send.iov_base, 0, pvar->send_valid);
	pvar->send_valid = 0;

#ifdef QTFS_CLIENT
	memset(pvar->who_using, 0, QTFS_FUNCTION_LEN);
#endif
}
/*
 * Return the data area of the send buffer when @dir is QTFS_SEND, of the
 * receive buffer otherwise.
 */
void *qtfs_conn_msg_buf(struct qtfs_conn_var_s *pvar, int dir)
{
	struct qtreq *msg;

	if (dir == QTFS_SEND)
		msg = pvar->vec_send.iov_base;
	else
		msg = pvar->vec_recv.iov_base;
	return msg->data;
}
// state machine
// printable name of pvar's current connection state
#define QTCONN_CUR_STATE(pvar) (((pvar)->state == QTCONN_INIT) ? "INIT" : \
	(((pvar)->state == QTCONN_CONNECTING) ? "CONNECTING" : \
	(((pvar)->state == QTCONN_ACTIVE) ? "ACTIVE" : "UNKNOWN")))
/*
 * Try up to three times, 100ms apart, to obtain a new connection through the
 * transport's conn_new_connection hook. Stops early when the module is
 * exiting.
 * Returns 0 on success, the last hook result on failure, or QTERROR when no
 * attempt was made.
 */
static int qtfs_sm_connecting(struct qtfs_conn_var_s *pvar)
{
	int ret = QTERROR;
	int attempts_left;

	for (attempts_left = 3; qtfs_mod_exiting == false && attempts_left > 0; attempts_left--) {
		ret = pvar->conn_ops->conn_new_connection(&pvar->conn_var, pvar->user_type);
		if (ret == 0) {
			qtfs_info("qtfs sm connecting connect to a new connection.");
			break;
		}
		msleep(100);
	}
	return ret;
}
/*
 * Drive @pvar towards QTCONN_ACTIVE.
 *   ACTIVE:     nothing to do.
 *   INIT:       run conn init, move to CONNECTING, then connect.
 *   CONNECTING: connect (accept on the server, connect on the client).
 * Returns 0 on success, the init/connect error otherwise, or -EINVAL for an
 * unknown state.
 */
int qtfs_sm_active(struct qtfs_conn_var_s *pvar)
{
	int ret;

	if (pvar->state == QTCONN_ACTIVE)
		return 0;

	if (pvar->state != QTCONN_INIT && pvar->state != QTCONN_CONNECTING) {
		qtfs_err("qtfs sm active unknown state:%s.", QTCONN_CUR_STATE(pvar));
		return -EINVAL;
	}

	if (pvar->state == QTCONN_INIT) {
		ret = qtfs_conn_init(pvar);
		if (ret) {
			qtfs_err("qtfs sm active init failed, ret:%d.", ret);
			return ret;
		}
		// initialised: continue straight into the connecting step
		pvar->state = QTCONN_CONNECTING;
		qtfs_info("qtfs sm active connecting, threadidx:%d",
			pvar->cur_threadidx);
	}

	// accept(server) or connect(client)
	ret = qtfs_sm_connecting(pvar);
	if (ret == 0)
		pvar->state = QTCONN_ACTIVE;
	return ret;
}
/*
 * Re-establish the connection of @pvar.
 *   INIT:       not allowed, warns and fails.
 *   ACTIVE:     tear down, re-init, move to CONNECTING, then connect.
 *   CONNECTING: connect.
 * Returns 0 (QTOK) on success, QTERROR for a bad state or failed re-init,
 * or the connect error.
 */
int qtfs_sm_reconnect(struct qtfs_conn_var_s *pvar)
{
	int ret;

	if (pvar->state == QTCONN_INIT) {
		WARN_ON(1);
		qtfs_err("qtfs sm reconnect state error!");
		return QTERROR;
	}

	if (pvar->state != QTCONN_ACTIVE && pvar->state != QTCONN_CONNECTING) {
		qtfs_err("qtfs sm reconnect unknown state:%s.", QTCONN_CUR_STATE(pvar));
		return QTERROR;
	}

	if (pvar->state == QTCONN_ACTIVE) {
		qtfs_conn_fini(pvar);
		ret = qtfs_conn_init(pvar);
		if (ret) {
			qtfs_err("qtfs sm active init failed, ret:%d.", ret);
			// nothing usable is left: fall back to the initial state
			pvar->state = QTCONN_INIT;
			return QTERROR;
		}
		pvar->state = QTCONN_CONNECTING;
		qtfs_warn("qtfs sm reconnect thread:%d, state:%s.", pvar->cur_threadidx, QTCONN_CUR_STATE(pvar));
	}

	ret = qtfs_sm_connecting(pvar);
	if (ret == 0)
		pvar->state = QTCONN_ACTIVE;
	return ret;
}
/*
 * Tear down the connection of @pvar.
 * In ACTIVE or CONNECTING state the connection is finalised and the state
 * becomes CONNECTING on the server or INIT on the client. INIT is a no-op.
 * Returns QTOK, or QTERROR for an unknown state.
 */
int qtfs_sm_exit(struct qtfs_conn_var_s *pvar)
{
	if (pvar->state == QTCONN_INIT)
		return QTOK;

	if (pvar->state != QTCONN_ACTIVE && pvar->state != QTCONN_CONNECTING) {
		qtfs_err("qtfs sm exit unknown state:%s.", QTCONN_CUR_STATE(pvar));
		return QTERROR;
	}

	qtfs_conn_fini(pvar);
#ifdef QTFS_SERVER
	pvar->state = QTCONN_CONNECTING;
#endif
#ifdef QTFS_CLIENT
	pvar->state = QTCONN_INIT;
#endif
	qtfs_warn("qtfs sm exit thread:%d state:%s.", pvar->cur_threadidx, QTCONN_CUR_STATE(pvar));
	return QTOK;
}
/*
 * Take @lock interruptibly. Once the lock is held, finish every put that was
 * deferred onto g_lazy_put_llst: clear the pvar's messages and move it to the
 * valid list.
 * Returns the result of mutex_lock_interruptible().
 */
int qtfs_mutex_lock_interruptible(struct mutex *lock)
{
	struct llist_node *node;
	struct qtfs_conn_var_s *pvar;
	int ret = mutex_lock_interruptible(lock);

	if (ret != 0)
		return ret;

	// mutex lock successed, proc lazy put
	while ((node = llist_del_first(&g_lazy_put_llst)) != NULL) {
		pvar = llist_entry(node, struct qtfs_conn_var_s, lazy_put);
		pvar->conn_ops->conn_msg_clear(pvar);
		list_move_tail(&pvar->lst, &g_vld_lst);
		qtfs_warn("qtfs pvar lazy put idx:%d.", pvar->cur_threadidx);
	}
	return ret;
}
/*
 * Select the transport operation table and let it parse its own parameters.
 * Only the socket transport is wired up; the pcie type is reserved.
 */
static void parse_param(void)
{
	// socket is the default (and currently only) connection type
	g_pvar_ops = &qtfs_conn_sock_pvar_ops;

	// transport specific parameter parsing
	qtfs_conn_sock_pvar_ops.parse_param();
}
/*
 * One-time setup of the connection layer: the fifo pvar cache (client only),
 * the pvar lists, the connection counter, the transport ops and the mutexes.
 * Returns 0 on success, -ENOMEM when the fifo cache cannot be created.
 */
int qtfs_conn_param_init(void)
{
#ifdef QTFS_CLIENT
	qtfs_fifo_pvar_cache = kmem_cache_create("qtfs_fifo_pvar",
				sizeof(struct qtfs_conn_var_s),
				0,
				(SLAB_RECLAIM_ACCOUNT | SLAB_MEM_SPREAD),
				NULL);
	if (qtfs_fifo_pvar_cache == NULL) {
		qtfs_err("qtfs fifo pvar cache create failed.\n");
		return -ENOMEM;
	}
#endif

	atomic_set(&g_qtfs_conn_num, 0);
	init_llist_head(&g_lazy_put_llst);
	INIT_LIST_HEAD(&g_fifo_lst);
	INIT_LIST_HEAD(&g_busy_lst);
	INIT_LIST_HEAD(&g_vld_lst);

	// parse module_param and choose specified channel; this sets g_pvar_ops
	parse_param();
	g_pvar_ops->param_init();

	mutex_init(&g_param_mutex);
	mutex_init(&g_fifo_mutex);
	return 0;
}
/*
 * Destroy @pvar: free its buffers, close its connection, clear its slot in
 * qtfs_thread_var (when the index is in range), unlink it and free it.
 * A NULL @pvar is ignored.
 */
void release_pvar(struct qtfs_conn_var_s *pvar)
{
	int idx;

	if (pvar == NULL)
		return;

	pvar->conn_ops->conn_var_fini(pvar);
	qtfs_sm_exit(pvar);

	idx = pvar->cur_threadidx;
	if (idx >= 0 && idx < QTFS_MAX_THREADS) {
		qtfs_thread_var[idx] = NULL;
		qtfs_info("qtfs free pvar idx:%d successed.", pvar->cur_threadidx);
	} else {
		qtfs_err("qtfs free unknown threadidx %d", pvar->cur_threadidx);
	}

	list_del(&pvar->lst);
	kfree(pvar);
}
void qtfs_conn_param_fini(void)
{
struct list_head *plst;
struct list_head *n;
int ret;
int conn_num;
int i;
#ifdef QTFS_CLIENT
kmem_cache_destroy(qtfs_fifo_pvar_cache);
#endif
ret = qtfs_mutex_lock_interruptible(&g_param_mutex);
if (ret) {
qtfs_err("qtfs conn param finish mutex lock interrup failed, ret:%d.", ret);
WARN_ON(1);
return;
}
list_for_each_safe(plst, n, &g_vld_lst) {
release_pvar((struct qtfs_conn_var_s *)plst);
}
list_for_each_safe(plst, n, &g_busy_lst) {
release_pvar((struct qtfs_conn_var_s *)plst);
}
conn_num = atomic_read(&g_qtfs_conn_num);
for (i = 0; i < conn_num; i++) {
if (qtfs_thread_var[i] != NULL) {
qtfs_err("qtfs param not free idx:%d holder:%s",
qtfs_thread_var[i]->cur_threadidx,
qtfs_thread_var[i]->who_using);
}
}
mutex_unlock(&g_param_mutex);
g_pvar_ops->param_fini();
}
struct qtfs_conn_var_s *_qtfs_conn_get_param(const char *func)
{
struct qtfs_conn_var_s *pvar = NULL;
int ret;
int cnt = 0;
if (qtfs_mod_exiting == true) {
qtfs_warn("qtfs module is exiting, good bye!");
return NULL;
}
retry:
ret = qtfs_mutex_lock_interruptible(&g_param_mutex);
if (ret) {
qtfs_err("qtfs conn get param mutex lock interrup failed, ret:%d.", ret);
return NULL;
}
if (!list_empty(&g_vld_lst))
pvar = list_last_entry(&g_vld_lst, struct qtfs_conn_var_s, lst);
if (pvar != NULL) {
list_move_tail(&pvar->lst, &g_busy_lst);
}
mutex_unlock(&g_param_mutex);
if (pvar != NULL) {
int ret;
if (pvar->state == QTCONN_ACTIVE && pvar->conn_ops->conn_connected(&pvar->conn_var) == false) {
qtfs_warn("qtfs get param thread:%d disconnected, try to reconnect.", pvar->cur_threadidx);
ret = qtfs_sm_reconnect(pvar);
} else {
ret = qtfs_sm_active(pvar);
}
if (ret != 0) {
qtfs_conn_put_param(pvar);
return (IS_ERR_VALUE((long)ret) ? ERR_PTR((long)ret) : NULL);
}
strlcpy(pvar->who_using, func, QTFS_FUNCTION_LEN);
return pvar;
}
ret = qtfs_mutex_lock_interruptible(&g_param_mutex);
if (ret) {
qtfs_err("qtfs conn get param mutex lock interrup failed, ret:%d.", ret);
return NULL;
}
if (atomic_read(&g_qtfs_conn_num) >= qtfs_conn_max_conn) {
mutex_unlock(&g_param_mutex);
cnt++;
msleep(1);
if (cnt < QTFS_GET_PARAM_MAX_RETRY)
goto retry;
qtfs_err("qtfs get param failed, the concurrency specification has reached the upper limit");
return NULL;
}
pvar = kmalloc(sizeof(struct qtfs_conn_var_s), GFP_KERNEL);
if (pvar == NULL) {
qtfs_err("qtfs get param kmalloc failed.\n");
mutex_unlock(&g_param_mutex);
return NULL;
}
memset(pvar, 0, sizeof(struct qtfs_conn_var_s));
// initialize conn_pvar here
pvar->recv_max = QTFS_MSG_LEN;
pvar->send_max = QTFS_MSG_LEN;
pvar->user_type = QTFS_CONN_TYPE_QTFS;
g_pvar_ops->pvar_init(&pvar->conn_var, &pvar->conn_ops, pvar->user_type);
if (QTFS_OK != pvar->conn_ops->conn_var_init(pvar)) {
qtfs_err("qtfs sock var init failed.\n");
kfree(pvar);
mutex_unlock(&g_param_mutex);
return NULL;
}
memcpy(pvar->who_using, func, (strlen(func) >= QTFS_FUNCTION_LEN - 1) ? (QTFS_FUNCTION_LEN - 1) : strlen(func));
pvar->cur_threadidx = atomic_read(&g_qtfs_conn_num);
qtfs_info("qtfs create new param, cur conn num:%d\n", atomic_read(&g_qtfs_conn_num));
qtfs_thread_var[pvar->cur_threadidx] = pvar;
// add to busy list
atomic_inc(&g_qtfs_conn_num);
list_add(&pvar->lst, &g_busy_lst);
pvar->state = QTCONN_INIT;
pvar->seq_num = 0;
#ifdef QTFS_CLIENT
mutex_unlock(&g_param_mutex);
ret = qtfs_sm_active(pvar);
if (ret) {
qtfs_err("qtfs get param active connection failed, ret:%d, curstate:%s", ret, QTCONN_CUR_STATE(pvar));
// put to vld list
qtfs_conn_put_param(pvar);
return (IS_ERR_VALUE((long)ret) ? ERR_PTR((long)ret) : NULL);
}
qtfs_thread_var[pvar->cur_threadidx] = pvar;
#else
if (!pvar->conn_ops->conn_inited(pvar, pvar->user_type)) {
if ((ret = qtfs_sm_active(pvar)) != 0) {
qtfs_err("qtfs get param active connection failed, ret:%d, curstate:%s", ret, QTCONN_CUR_STATE(pvar));
// put to vld list
mutex_unlock(&g_param_mutex);
qtfs_conn_put_param(pvar);
return (IS_ERR_VALUE((long)ret) ? ERR_PTR((long)ret) : NULL);
}
mutex_unlock(&g_param_mutex);
} else {
mutex_unlock(&g_param_mutex);
pvar->state = QTCONN_CONNECTING;
ret = qtfs_sm_active(pvar);
if (ret) {
qtfs_err("qtfs get param active connection failed, ret:%d curstate:%s", ret, QTCONN_CUR_STATE(pvar));
qtfs_conn_put_param(pvar);
return (IS_ERR_VALUE((long)ret) ? ERR_PTR((long)ret) : NULL);
}
}
#endif
qtinfo_cntinc(QTINF_ACTIV_CONN);
return pvar;
}
/*
 * Return the epoll pvar, creating it on first use.
 *
 * An existing pvar is activated (or reconnected when active but
 * disconnected); NULL is returned if that fails. A newly created pvar is
 * stored in qtfs_epoll_var and returned even when its first activation
 * fails. NULL is also returned when allocation or buffer init fails.
 */
struct qtfs_conn_var_s *qtfs_epoll_establish_conn(void)
{
	struct qtfs_conn_var_s *pvar = qtfs_epoll_var;
	int ret;

	if (pvar != NULL) {
		if (pvar->state == QTCONN_ACTIVE && pvar->conn_ops->conn_connected(&pvar->conn_var) == false) {
			qtfs_warn("qtfs epoll get param thread:%d disconnected, try to reconnect.", pvar->cur_threadidx);
			ret = qtfs_sm_reconnect(pvar);
		} else {
			ret = qtfs_sm_active(pvar);
		}
		return ret ? NULL : pvar;
	}

	pvar = kmalloc(sizeof(struct qtfs_conn_var_s), GFP_KERNEL);
	if (pvar == NULL) {
		qtfs_err("qtfs get param kmalloc failed.\n");
		return NULL;
	}
	memset(pvar, 0, sizeof(struct qtfs_conn_var_s));

	pvar->cur_threadidx = QTFS_EPOLL_THREADIDX;
	pvar->user_type = QTFS_CONN_TYPE_EPOLL;
	pvar->send_max = QTFS_EPOLL_MSG_LEN;
	pvar->recv_max = QTFS_EPOLL_MSG_LEN;
	g_pvar_ops->pvar_init(&pvar->conn_var, &pvar->conn_ops, pvar->user_type);
	if (pvar->conn_ops->conn_var_init(pvar) != QTFS_OK) {
		qtfs_err("qtfs sock var init failed.\n");
		kfree(pvar);
		return NULL;
	}

	qtfs_epoll_var = pvar;
	pvar->state = QTCONN_INIT;
	ret = qtfs_sm_active(pvar);
	if (ret)
		qtfs_err("qtfs epoll get param active new param failed, ret:%d state:%s", ret, QTCONN_CUR_STATE(pvar));
	else
		qtfs_info("qtfs create new epoll param state:%s", QTCONN_CUR_STATE(pvar));
	// the new pvar is returned in both cases
	return pvar;
}
void qtfs_conn_put_param(struct qtfs_conn_var_s *pvar)
{
int ret;
if (!pvar) {
qtfs_err("qtfs_conn_var_s is null!!");
return;
}
ret = qtfs_mutex_lock_interruptible(&g_param_mutex);
if (ret) {
llist_add(&pvar->lazy_put, &g_lazy_put_llst);
qtfs_warn("qtfs conn put param add to lazy list idx:%d, ret:%d.", pvar->cur_threadidx, ret);
return;
}
pvar->conn_ops->conn_msg_clear(pvar);
list_move_tail(&pvar->lst, &g_vld_lst);
mutex_unlock(&g_param_mutex);
}
void qtfs_epoll_cut_conn(struct qtfs_conn_var_s *pvar)
{
int ret = 0;
if (!pvar) {
qtfs_err("qtfs_conn_var_s is null!!");
return;
}
ret = qtfs_sm_exit(pvar);
if (ret) {
qtfs_err("qtfs epoll put param exit failed, ret:%d state:%s", ret, QTCONN_CUR_STATE(pvar));
}
}
#ifdef QTFS_CLIENT
/* The fifo mechanism is different: each pvar belongs to exactly one fifo and
   lives for the whole access, from fifo open to fifo close. The param is
   taken at open and put back at close. */
#define QTFS_FIFO_MAGIC_SEND 0xa55aa55a
#define QTFS_FIFO_MAGIC_RECV 0x5aa55aa5
/*
 * Create and activate a pvar dedicated to one fifo.
 *
 * The pvar is allocated from qtfs_fifo_pvar_cache, set up with the fifo
 * message size and magic numbers, connected, and added to g_fifo_lst.
 *
 * Returns the pvar, or NULL when allocation, buffer init or activation
 * fails; nothing stays allocated or connected in that case.
 */
struct qtfs_conn_var_s *qtfs_fifo_get_param(void)
{
	int ret;
	struct qtfs_conn_var_s *pvar = kmem_cache_alloc(qtfs_fifo_pvar_cache, GFP_KERNEL);

	if (pvar == NULL) {
		qtfs_err("kmem cache alloc fifo cache failed.");
		return NULL;
	}
	memset(pvar, 0, sizeof(struct qtfs_conn_var_s));
	// initialize conn_pvar here
	pvar->recv_max = QTFS_FIFO_REQ_LEN;
	pvar->send_max = QTFS_FIFO_REQ_LEN;
	pvar->magic_send = QTFS_FIFO_MAGIC_SEND;
	pvar->magic_recv = QTFS_FIFO_MAGIC_RECV;
	pvar->user_type = QTFS_CONN_TYPE_FIFO;
	g_pvar_ops->pvar_init(&pvar->conn_var, &pvar->conn_ops, pvar->user_type);
	if (QTFS_OK != pvar->conn_ops->conn_var_init(pvar)) {
		qtfs_err("qtfs sock var init failed.\n");
		kmem_cache_free(qtfs_fifo_pvar_cache, pvar);
		return NULL;
	}
	pvar->state = QTCONN_INIT;
	ret = qtfs_sm_active(pvar);
	if (ret) {
		qtfs_err("qtfs fifo get param active new param faile, ret:%d state:%s", ret, QTCONN_CUR_STATE(pvar));
		// conn init may have succeeded before connecting failed: finalise the
		// connection like qtfs_fifo_put_param() does (no-op in INIT state)
		qtfs_sm_exit(pvar);
		pvar->conn_ops->conn_var_fini(pvar);
		kmem_cache_free(qtfs_fifo_pvar_cache, pvar);
		return NULL;
	}
	mutex_lock(&g_fifo_mutex);
	list_add(&pvar->lst, &g_fifo_lst);
	mutex_unlock(&g_fifo_mutex);
	qtfs_info("qtfs create new fifo param state:%s", QTCONN_CUR_STATE(pvar));
	return pvar;
}
/*
 * Destroy a fifo pvar: unlink it from g_fifo_lst, close its connection,
 * free its buffers and return it to qtfs_fifo_pvar_cache.
 */
void qtfs_fifo_put_param(struct qtfs_conn_var_s *pvar)
{
	// only the list manipulation needs the fifo mutex
	mutex_lock(&g_fifo_mutex);
	list_del(&pvar->lst);
	mutex_unlock(&g_fifo_mutex);

	qtfs_sm_exit(pvar);
	pvar->conn_ops->conn_var_fini(pvar);

	kmem_cache_free(qtfs_fifo_pvar_cache, pvar);
}
#endif
void qtfs_conn_list_cnt(void)
{
struct list_head *entry;
struct qtfs_conn_var_s *pvar;
#ifdef QTFS_CLIENT
int ret = 0;
ret = qtfs_mutex_lock_interruptible(&g_param_mutex);
if (ret) {
qtfs_err("qtfs conn put param mutex lock interrup failed, ret:%d.", ret);
return;
}
#endif
qtfs_diag_info->pvar_busy = 0;
qtfs_diag_info->pvar_vld = 0;
memset(qtfs_diag_info->who_using, 0, sizeof(qtfs_diag_info->who_using));
list_for_each(entry, &g_busy_lst) {
qtfs_diag_info->pvar_busy++;
pvar = (struct qtfs_conn_var_s *)entry;
if (pvar->cur_threadidx < 0 || pvar->cur_threadidx >= QTFS_MAX_THREADS)
continue;
strlcpy(qtfs_diag_info->who_using[pvar->cur_threadidx],
qtfs_thread_var[pvar->cur_threadidx]->who_using, QTFS_FUNCTION_LEN);
}
list_for_each(entry, &g_vld_lst)
qtfs_diag_info->pvar_vld++;
#ifdef QTFS_CLIENT
mutex_unlock(&g_param_mutex);
#endif
}
// Module parameters; mode 0600 makes each readable and writable by the owner only.
// Maximum number of pvars _qtfs_conn_get_param() may create.
module_param(qtfs_conn_max_conn, int, 0600);
// Log level, as a string of at most sizeof(qtfs_log_level) bytes.
module_param_string(qtfs_log_level, qtfs_log_level, sizeof(qtfs_log_level), 0600);
// Connection type, as a string of at most sizeof(qtfs_conn_type) bytes.
module_param_string(qtfs_conn_type, qtfs_conn_type, sizeof(qtfs_conn_type), 0600);