/* \brief 数据分发虚拟设备实现 数据分发虚设备用于MARSIO服务进程向各从应用分发数据,并将各从应用发送的数据转发到物理网卡。 \author Qiuwen Lu \date 2017-05-09 */ #include #include #include #include #include /* ============================= Common Functions =========================================== */ static int vdev_data_create_loop_buffer(const char * prefix, const char * devsym, unsigned int nr_stream, unsigned int sz_tunnel, struct rte_ring * object[]) { for (int i = 0; i < nr_stream; i++) { char str_loop_buffer[MR_SYMBOL_MAX]; snprintf(str_loop_buffer, sizeof(str_loop_buffer), "%s-%s-%d", prefix, devsym, i); object[i] = rte_ring_create(str_loop_buffer, sz_tunnel, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); if (object[i] == NULL) { MR_WARNING("Create loop buffer for vdev %s(sym=%s, size=%d) failed. : %s\n", devsym, str_loop_buffer, sz_tunnel, __str_rte_errno()); goto err_out; } } return 0; err_out: for (int i = 0; i < nr_stream; i++) if (object[i] != NULL) rte_ring_free(object[i]); return -1; } static int vdev_data_dispatch(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags) { hash_t hash_value[MR_BURST_MAX]; for (int i = 0; i < nr_pkts; i++) hash_value[i] = pkts[i]->hash.usr; for (int i = 0; i < nr_pkts; i++) __rte_mbuf_sanity_check(pkts[i], 1); return vnode_mirror_enqueue_bulk(_vdev->vnode_rx_prod, qid, pkts, hash_value, nr_pkts); } static int vdev_data_collect(struct _vdev * _vdev, queue_id_t qid, struct rte_mbuf * pkts[], unsigned int nr_pkts, int flags) { int __nr_mbufs_out = 0; int __nr_mbufs_left = nr_pkts; int ret; // 从LTX通道取包 ret = vnode_mirror_dequeue_burst(_vdev->vnode_ltx_cons, qid, &pkts[__nr_mbufs_out], __nr_mbufs_left); __nr_mbufs_out += ret; __nr_mbufs_left -= ret; assert(__nr_mbufs_left >= 0); // 从FTX通道取包 ret = vnode_mirror_dequeue_burst(_vdev->vnode_ftx_cons, qid, &pkts[__nr_mbufs_out], __nr_mbufs_left); __nr_mbufs_out += ret; __nr_mbufs_left -= ret; assert(__nr_mbufs_left >= 0); // 剩余的容量,从TX通道取 ret = vnode_mirror_dequeue_burst(_vdev->vnode_tx_cons, qid, &pkts[__nr_mbufs_out], __nr_mbufs_left); __nr_mbufs_out += ret; __nr_mbufs_left -= ret; assert(__nr_mbufs_left >= 0); for (int i = 0; i < __nr_mbufs_out; i++) __rte_mbuf_sanity_check(pkts[i], 1); return __nr_mbufs_out; } static int vdev_data_idle_poll(struct _vdev * _vdev, queue_id_t qid) { vnode_mirror_flush(_vdev->vnode_rx_prod, qid); return 0; } static int vdev_data_stats_get(struct _vdev * _vdev, struct vdev_stat_info * stat_info) { struct vnode_prod_stat * st_prod_rx = vnode_mirror_prod_stat_get(_vdev->vnode_rx_prod); struct vnode_cons_stat * st_cons_tx = vnode_mirror_cons_stat_get(_vdev->vnode_tx_cons); struct vnode_cons_stat * st_cons_ftx = vnode_mirror_cons_stat_get(_vdev->vnode_ftx_cons); //TODO: What about nr_rxstream, nr_txstream is different? for (int i = 0; i < _vdev->nr_rxstream; i++) { stat_info->rx_on_line[i] = rte_atomic64_read(&st_prod_rx[i].on_line); stat_info->rx_deliver[i] = rte_atomic64_read(&st_prod_rx[i].deliver); stat_info->rx_missed[i] = rte_atomic64_read(&st_prod_rx[i].missed); stat_info->rx_total_len[i] = rte_atomic64_read(&st_prod_rx[i].on_line); } for (int i = 0; i < _vdev->nr_txstream; i++) { stat_info->tx_on_line[i] = rte_atomic64_read(&st_cons_tx[i].on_line); stat_info->tx_deliver[i] = rte_atomic64_read(&st_cons_tx[i].deliver); stat_info->tx_missed[i] = rte_atomic64_read(&st_cons_tx[i].missed); stat_info->tx_total_len[i] = rte_atomic64_read(&st_cons_tx[i].total_len); stat_info->ftx_on_line[i] = rte_atomic64_read(&st_cons_ftx[i].on_line); stat_info->ftx_deliver[i] = rte_atomic64_read(&st_cons_ftx[i].deliver); stat_info->ftx_missed[i] = rte_atomic64_read(&st_cons_ftx[i].missed); stat_info->ftx_total_len[i] = rte_atomic64_read(&st_cons_ftx[i].total_len); } return 0; } static int vdev_data_instance_destory(struct vdev_instance * vdi) { if (vdi->vnode_rx_cons != NULL) vnode_mirror_delete_cons(vdi->vnode_rx_cons); if (vdi->vnode_tx_prod != NULL) vnode_mirror_delete_prod(vdi->vnode_tx_prod); if (vdi->vnode_ftx_prod != NULL) vnode_mirror_delete_prod(vdi->vnode_ftx_prod); if (vdi->vnode_ltx_prod != NULL) vnode_mirror_delete_prod(vdi->vnode_ltx_prod); FREE(vdi); return 0; } /* 创建虚设备实例,应用使用 */ static struct vdev_instance * vdev_data_instance_create(struct _vdev * _vdev, const char * appsym, unsigned int nr_rxstream, unsigned int nr_txstream) { struct vdev_instance * vdi = ZMALLOC(sizeof(struct vdev_instance)); MR_VERIFY_MALLOC(vdi); vdi->vdev = &_vdev->vdev; if (nr_rxstream > 0) { vdi->vnode_rx_cons = vnode_mirror_create_cons(_vdev->vnode_rx, appsym, nr_rxstream); } if (nr_txstream > 0) { vdi->vnode_tx_prod = vnode_mirror_create_prod(_vdev->vnode_tx, appsym, nr_txstream); } if (nr_txstream > 0) { vdi->vnode_ftx_prod = vnode_mirror_create_prod(_vdev->vnode_ftx, appsym, nr_txstream); } if (nr_txstream > 0) { vdi->vnode_ltx_prod = vnode_mirror_create_prod(_vdev->vnode_ltx, appsym, nr_txstream); } if (nr_rxstream > 0 && vdi->vnode_rx_cons == NULL) { MR_ERROR("Creating VDI for device %s failed: " "Cannot create rx consumer, nr_rxstream = %d", vdi->vdev->symbol, nr_rxstream); goto errout; } if (nr_txstream > 0 && vdi->vnode_tx_prod == NULL) { MR_ERROR("Creating VDI for device %s failed: " "Cannot create TX producer, nr_txstream = %d", vdi->vdev->symbol, nr_txstream); goto errout; } if (nr_txstream > 0 && vdi->vnode_ftx_prod == NULL) { MR_ERROR("Creating VDI for device %s failed: " "Cannot create FTX producer, nr_txstream = %d", vdi->vdev->symbol, nr_txstream); goto errout; } if (nr_txstream > 0 && vdi->vnode_ltx_prod == NULL) { MR_ERROR("Creating VDI for device %s failed: " "Cannot create LTX producer, nr_txstream = %d", vdi->vdev->symbol, nr_txstream); goto errout; } vdi->direct_pool = _vdev->direct_pool; vdi->indirect_pool = _vdev->indirect_pool; vdi->nr_rxstream = nr_rxstream; vdi->nr_txstream = nr_txstream; return vdi; errout: vdev_data_instance_destory(vdi); return NULL; } static int vdev_data_destory(struct _vdev * _vdev) { return 0; } int vdev_data_create(struct vdev_main* v_main, const char * symbol, unsigned int nr_rxstream, unsigned int nr_txstream, unsigned int sz_tunnel, unsigned int sz_buffer, struct rte_mempool * direct_pool, struct rte_mempool * indirect_pool) { // 检查设备是否已经存在,不允许重复创建 struct vdev * vdev_info = vdev_lookup(v_main, symbol); if (vdev_info != NULL) { MR_WARNING("vdev %s has been created. failed. \n", vdev_info->symbol); return RT_IGNORE; } #define _ERR_VERIFY(x, ...) \ do { if(x == NULL) { MR_ERROR(__VA_ARGS__); goto errout;} } while(0) \ // 申请Info结构体的空间 struct _vdev * _vdev = ZMALLOC(sizeof(struct _vdev)); MR_VERIFY_MALLOC(_vdev); /* 填充vdev信息 */ vdev_info = &_vdev->vdev; /* 填充_vdev信息 */ snprintf(vdev_info->symbol, sizeof(vdev_info->symbol), "%s", symbol); _vdev->nr_rxstream = nr_rxstream; _vdev->nr_txstream = nr_txstream; _vdev->sz_buffer = sz_buffer; _vdev->sz_tunnel = sz_tunnel; _vdev->direct_pool = direct_pool; _vdev->indirect_pool = indirect_pool; char vnode_sym_rx[MR_SYMBOL_MAX]; snprintf(vnode_sym_rx, sizeof(vnode_sym_rx), "%s-rx", vdev_info->symbol); char vnode_sym_tx[MR_SYMBOL_MAX]; snprintf(vnode_sym_tx, sizeof(vnode_sym_tx), "%s-tx", vdev_info->symbol); char vnode_sym_ftx[MR_SYMBOL_MAX]; snprintf(vnode_sym_ftx, sizeof(vnode_sym_ftx), "%s-ftx", vdev_info->symbol); char vnode_sym_ltx[MR_SYMBOL_MAX]; snprintf(vnode_sym_ltx, sizeof(vnode_sym_ltx), "%s-ltx", vdev_info->symbol); /* 创建VNODE */ _vdev->vnode_rx = vnode_mirror_create(vnode_sym_rx, sz_tunnel, sz_buffer, indirect_pool); _vdev->vnode_tx = vnode_mirror_create(vnode_sym_tx, sz_tunnel, sz_buffer, indirect_pool); _vdev->vnode_ftx = vnode_mirror_create(vnode_sym_ftx, sz_tunnel, 0, indirect_pool); _vdev->vnode_ltx = vnode_mirror_create(vnode_sym_ltx, sz_tunnel, 0, indirect_pool); /* 错误校验 */ _ERR_VERIFY(_vdev->vnode_rx, "Create vdev %s rx vnode failed.", vdev_info->symbol); _ERR_VERIFY(_vdev->vnode_tx, "Create vdev %s tx vnode failed.", vdev_info->symbol); _ERR_VERIFY(_vdev->vnode_ftx, "Create vdev %s fast tx vnode failed. ", vdev_info->symbol); _ERR_VERIFY(_vdev->vnode_ltx, "Create vdev %s lock tx vnode failed. ", vdev_info->symbol); _vdev->vnode_rx_prod = vnode_mirror_create_prod(_vdev->vnode_rx, "sv", nr_rxstream); _vdev->vnode_tx_cons = vnode_mirror_create_cons(_vdev->vnode_tx, "sv", nr_txstream); _vdev->vnode_ftx_cons = vnode_mirror_create_cons(_vdev->vnode_ftx, "sv", nr_txstream); _vdev->vnode_ltx_cons = vnode_mirror_create_cons(_vdev->vnode_ltx, "sv", nr_txstream); /* 校验,申请VNODE、VNODE生产者、消费者是否成功 */ _ERR_VERIFY(_vdev->vnode_rx_prod, "Create vdev %s rx vnode producer failed. ", vdev_info->symbol); _ERR_VERIFY(_vdev->vnode_tx_cons, "Create vdev %s tx vnode consumer failed. ", vdev_info->symbol); _ERR_VERIFY(_vdev->vnode_ftx_cons, "Create vdev %s fast tx vnode consumer failed. ", vdev_info->symbol); _ERR_VERIFY(_vdev->vnode_ltx_cons, "Create vdev %s lock tx vnode consumer failed. ", vdev_info->symbol); /* 创建自循环缓冲区 */ _vdev->rx_loop_buffers = ZMALLOC(sizeof(struct rte_ring *) * _vdev->nr_rxstream); _vdev->tx_loop_buffers = ZMALLOC(sizeof(struct rte_ring *) * _vdev->nr_txstream); MR_VERIFY_MALLOC(_vdev->rx_loop_buffers); MR_VERIFY_MALLOC(_vdev->tx_loop_buffers); vdev_data_create_loop_buffer("rx", vdev_info->symbol, _vdev->nr_rxstream, sz_tunnel, _vdev->rx_loop_buffers); vdev_data_create_loop_buffer("tx", vdev_info->symbol, _vdev->nr_txstream, sz_tunnel, _vdev->tx_loop_buffers); _ERR_VERIFY(_vdev->rx_loop_buffers, "Create rx self-loop buffer failed. \n"); _ERR_VERIFY(_vdev->tx_loop_buffers, "Create tx self-loop buffer failed. \n"); /* 注册回调函数 */ _vdev->dispatch = vdev_data_dispatch; _vdev->collect = vdev_data_collect; _vdev->idle_pool = vdev_data_idle_poll; _vdev->destory = vdev_data_destory; _vdev->stats_get = vdev_data_stats_get; _vdev->vdi_create = vdev_data_instance_create; _vdev->vdi_destory = vdev_data_instance_destory; /* 加入虚设备列表 */ MR_VERIFY(v_main->vdev_max_idx <= RTE_DIM(v_main->_vdev_array)); vdev_info->port_id = v_main->vdev_max_idx; v_main->_vdev_array[v_main->vdev_max_idx++] = _vdev; return RT_SUCCESS; errout: if (_vdev != NULL) vdev_data_destory(_vdev); rte_free(_vdev); return RT_ERR; #undef _ERR_VERIFY }