/* Virtual Data Deliver Node - Base Class Author : Lu Qiuwen Zheng Chao Date : 2016-12-10 TODO: Try to use RCU in thread safe mode. */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "dp_trace.h" #include "vnode_common.h" #include #include /* What is a tunnel ? * tunnel is a fifo with fixed size. we use rte_ring as tunnel. * * What is a tunnel block ? * A 2d array of tunnels, trans data from a prod to a cons * +-------------+-------------+-------------+------------+------------+ * | nr_prodq | nr_consq | descs[0][0] | desc[0][1] | .......... | * +-------------+-------------+-------------+------------+------------+ * Len = sizeof(nr_prodq) + sizeof(nr_consq) + sizeof(descs) * nr_prodq * nr_consq */ static struct tunnel_desc * tunnel_new(const char * symbol, unsigned int sz_exclusive, unsigned int sz_shared, unsigned int sz_buffer) { struct tunnel_desc * desc = ZMALLOC(sizeof(struct tunnel_desc)); MR_VERIFY_MALLOC(desc); desc->tunnel_object = rte_ring_create(symbol, sz_exclusive + sz_shared, SOCKET_ID_ANY, RING_F_SC_DEQ | RING_F_SP_ENQ); if (desc->tunnel_object == NULL) { MR_ERROR("Create tunnel %s failed : %s", symbol, strerror(errno)); goto errout; } snprintf(desc->symbol, sizeof(desc->symbol), "%s", symbol); desc->tunnel_size = sz_exclusive + sz_shared; desc->en_buffer = ZMALLOC(sizeof(void *) * MR_LIBVNODE_MAX_SZ_BURST); MR_VERIFY_MALLOC(desc->en_buffer); desc->rt_buffer = ZMALLOC(sizeof(void *) * MR_LIBVNODE_MAX_SZ_BURST); MR_VERIFY_MALLOC(desc->rt_buffer); desc->sz_en_buffer = MR_LIBVNODE_MAX_SZ_BURST; desc->sz_rt_buffer = MR_LIBVNODE_MAX_SZ_BURST; #if VNODE_CHECK_THREAD_SAFE rte_spinlock_init(&desc->lock_thread_safe_check); #endif return desc; errout: if (desc->tunnel_object != NULL) { rte_ring_free(desc->tunnel_object); desc->tunnel_object = NULL; } if (desc->en_buffer != NULL) { FREE(&desc->en_buffer); } if (desc->rt_buffer != NULL) { FREE(&desc->rt_buffer); } FREE(&desc); return NULL; } /* Delete a tunnel */ static int tunnel_delete(struct tunnel_desc * desc) { for (int i = 0; i < desc->sz_en_buffer_used; i++) { infra_rte_pktmbuf_free(desc->en_buffer[i]); } struct rte_mbuf * mbuf; while (rte_ring_dequeue(desc->tunnel_object, (void **)&mbuf) == 0) { infra_rte_pktmbuf_free(mbuf); } /* free the rt buffer */ for (int i = 0; i < desc->sz_rt_buffer_used; i++) { assert(0); infra_rte_pktmbuf_free(desc->rt_buffer[i]); } MR_VERIFY_2(rte_ring_empty(desc->tunnel_object) == 1, "Tunnel %s is not empty", desc->symbol); rte_free(desc->en_buffer); rte_free(desc->rt_buffer); rte_ring_free(desc->tunnel_object); rte_free(desc); return 0; } /* Delete a block of tunnels */ static int tunnel_block_delete(struct tunnel_block * block) { for (int prodq_id = 0; prodq_id < block->nr_prodq; prodq_id++) { for (int consq_id = 0; consq_id < block->nr_consq; consq_id++) { if (*tunnel_block_locate(block, prodq_id, consq_id) != NULL) tunnel_delete(*tunnel_block_locate(block, prodq_id, consq_id)); } } rte_free(block); return 0; } /* Alloc a block of tunnels, and init all the tunnels */ static struct tunnel_block * tunnel_block_new(const char * symbol, struct vnode_prod * prod, struct vnode_cons * cons, unsigned int sz_exclusive, unsigned int sz_shared, unsigned int sz_buffer) { unsigned int nr_prodq = prod->nr_prodq; unsigned int nr_consq = cons->nr_consq; unsigned int block_size = sizeof(struct tunnel_block) + sizeof(struct tunnel_desc *) * (nr_prodq * nr_consq); struct tunnel_block * block = (struct tunnel_block *)rte_zmalloc(NULL, block_size, 0); MR_VERIFY_MALLOC(block); block->cons = cons; block->prod = prod; block->nr_consq = nr_consq; block->nr_prodq = nr_prodq; // create tunnel for each prodq and consq in block for (int prodq_id = 0; prodq_id < nr_prodq; prodq_id++) { for (int consq_id = 0; consq_id < nr_consq; consq_id++) { char tunnel_sym[MR_SYMBOL_MAX]; snprintf(tunnel_sym, sizeof(tunnel_sym), "%s-%d-%d", symbol, prodq_id, consq_id); struct tunnel_desc * tdesc = tunnel_new(tunnel_sym, sz_exclusive, sz_shared, sz_buffer); if (tdesc == NULL) goto err; *tunnel_block_locate(block, prodq_id, consq_id) = tdesc; } } return block; err: MR_ERROR("Create tunnel block %s failed, tunnel size = %d, tunnel buffer = %d", symbol, sz_exclusive, sz_buffer); if (block != NULL) { tunnel_block_delete(block); } return NULL; } static int do_producer_join_unsafe(struct vnode * vnode, struct vnode_prod * prod) { struct vnode_cons * cons = ACCESS_ONCE(vnode->cons); struct tunnel_block * block = NULL; if (cons != NULL) { char block_sym[MR_SYMBOL_MAX]; snprintf(block_sym, sizeof(block_sym), "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); // create communication tunnel for each cons and prods block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer); if (block == NULL) { goto error; } prod->block = block; cons->block = block; } vnode->prod = prod; return 0; error: MR_ERROR("Join vnode %s producer %s failed. ", vnode->symbol, prod->symbol); return -1; } static int do_consumer_join_unsafe(struct vnode * vnode, struct vnode_cons * cons) { struct vnode_prod * prod = ACCESS_ONCE(vnode->prod); struct tunnel_block * block = NULL; if (prod != NULL) { char block_sym[MR_SYMBOL_MAX]; snprintf(block_sym, sizeof(block_sym) - 1, "%s-%s-%s", vnode->symbol, prod->symbol, cons->symbol); block = tunnel_block_new(block_sym, prod, cons, vnode->sz_tunnel, vnode->sz_shared, vnode->sz_tunnel_buffer); if (block == NULL) { goto error; } prod->block = block; cons->block = block; } vnode->cons = cons; return 0; error: MR_ERROR("Join vnode %s consumer %s failed. ", vnode->symbol, cons->symbol); return -1; } static void synchronize_dataplane() { rte_delay_ms(100); // assume each function in operation will finished after such time. } /* VNode Structure Operation Functions */ struct vnode * __vnode_common_create(const char * sym, unsigned int sz_tunnel, unsigned int sz_tunnel_buffer, unsigned int notify_cons_when_rx) { struct vnode * object = (struct vnode *)rte_zmalloc(NULL, sizeof(struct vnode), RTE_CACHE_LINE_SIZE); MR_VERIFY_MALLOC(object); snprintf(object->symbol, sizeof(object->symbol), "%s", sym); object->sz_tunnel = sz_tunnel; object->sz_tunnel_buffer = sz_tunnel_buffer; object->notify_cons_when_rx = notify_cons_when_rx; object->en_q_len_monitor = 0; rte_spinlock_init(&object->lock); return object; } /* VNode的生产者、消费者注册 */ struct vnode_prod * __vnode_common_create_prod(struct vnode * vnode, const char * symbol, int nr_prodq) { struct vnode_prod * prod = rte_zmalloc(NULL, sizeof(struct vnode_prod), RTE_CACHE_LINE_SIZE); MR_VERIFY_MALLOC(prod); snprintf(prod->symbol, sizeof(vnode->symbol), "%s", symbol); prod->vnode = vnode; prod->nr_prodq = nr_prodq; rte_spinlock_lock(&vnode->lock); int ret = do_producer_join_unsafe(vnode, prod); rte_spinlock_unlock(&vnode->lock); vnode->credits_on_loan = 0; if (ret < 0) { goto err; } return prod; err: rte_free(prod); return NULL; } struct vnode_cons * __vnode_common_create_cons(struct vnode * vnode, const char * symbol, int nr_consq) { struct vnode_cons * cons = rte_zmalloc(NULL, sizeof(struct vnode_cons), RTE_CACHE_LINE_SIZE); MR_VERIFY_MALLOC(cons); // cons description init snprintf(cons->symbol, sizeof(cons->symbol), "%s", symbol); cons->vnode = vnode; cons->nr_consq = nr_consq; // eventfd for (unsigned int qid = 0; qid < nr_consq; qid++) { struct vnode_cons_notify * cons_notify_ctx = &cons->notify[qid]; if (vnode->notify_cons_when_rx) { cons_notify_ctx->enable = 1; cons_notify_ctx->cons_running_status = CONS_STATUS_RUNNING; cons_notify_ctx->cons_notify_eventfd = eventfd(1, EFD_CLOEXEC | EFD_NONBLOCK); if (unlikely(cons_notify_ctx->cons_notify_eventfd < 0)) { MR_ERROR("failed at create eventfd for vnode consumer %s: %s", symbol, strerror(errno)); goto err; } } } vnode->max_credits_per_tunnel = vnode->max_inflight / nr_consq; vnode->credits_on_loan = 0; rte_spinlock_lock(&vnode->lock); int ret = do_consumer_join_unsafe(vnode, cons); rte_spinlock_unlock(&vnode->lock); if (ret < 0) { goto err; } return cons; err: rte_free(cons); return NULL; } /// vnode_prod/cons_lookup() functions /// lookup prod/cons description by it's name. struct vnode_prod * __vnode_common_prod_lookup(struct vnode * vnode, const char * sym) { assert(vnode != NULL && sym != NULL); rte_spinlock_lock(&vnode->lock); struct vnode_prod * ret = NULL; struct vnode_prod * prod = vnode->prod; if (strncmp(sym, prod->symbol, sizeof(prod->symbol) - 1) == 0) { ret = prod; } else { ret = NULL; } rte_spinlock_unlock(&vnode->lock); return ret; } struct vnode_cons * __vnode_common_cons_lookup(struct vnode * vnode, const char * sym) { assert(vnode != NULL && sym != NULL); rte_spinlock_lock(&vnode->lock); struct vnode_cons * ret = NULL; struct vnode_cons * cons = vnode->cons; if (strncmp(sym, cons->symbol, sizeof(cons->symbol) - 1) == 0) { ret = cons; } else { ret = NULL; } rte_spinlock_unlock(&vnode->lock); return ret; } int __vnode_common_cons_attach(struct vnode * node, struct vnode_cons * cons) { int ret; assert(node != NULL && cons != NULL && cons->vnode == node); rte_spinlock_lock(&node->lock); if (cons->cur_attach >= cons->nr_consq) { MR_ERROR("Too much attach request(cur_attach=%d, nr_consq=%d)", cons->cur_attach, cons->nr_consq); ret = -1; } else { ret = cons->cur_attach++; } rte_spinlock_unlock(&node->lock); return ret; } int __vnode_common_prod_attach(struct vnode * node, struct vnode_prod * prod) { int ret; assert(node != NULL && prod != NULL && prod->vnode == node); rte_spinlock_lock(&node->lock); if (prod->cur_attach >= prod->nr_prodq) { MR_ERROR("Too much attach request(cur_attach=%d, nr_prodq=%d)", prod->cur_attach, prod->nr_prodq); ret = -1; } else { ret = prod->cur_attach++; } rte_spinlock_unlock(&node->lock); return ret; } struct vnode_prod_stat * __vnode_common_prod_stat_get(struct vnode_prod * prod) { struct vnode_cons * cons = ACCESS_ONCE(prod->vnode->cons); struct vnode_prod_stat * prod_stat = prod->stat; /* do nothing */ if (unlikely(cons == NULL)) { return prod_stat; } /* calculate the stats at here instead of enq, deq */ for (unsigned int prodq = 0; prodq < prod->nr_prodq; prodq++) { uint64_t prod_on_line = 0; uint64_t prod_deliver = 0; uint64_t prod_total_len = 0; uint64_t prod_missed = 0; for (unsigned int consq = 0; consq < cons->nr_consq; consq++) { struct tunnel_block * block = ACCESS_ONCE(prod->block); if (block == NULL) { return prod_stat; } struct tunnel_desc * tunnel_desc = *tunnel_block_locate(block, prodq, consq); assert(tunnel_desc != NULL); prod_on_line += tunnel_desc->on_line; prod_deliver += tunnel_desc->deliver; prod_total_len += tunnel_desc->total_len; prod_missed += tunnel_desc->missed; } prod_stat[prodq].on_line = prod_on_line; prod_stat[prodq].deliver = prod_deliver; prod_stat[prodq].total_len = prod_total_len; prod_stat[prodq].missed = prod_missed; } return prod_stat; } struct vnode_cons_stat * __vnode_common_cons_stat_get(struct vnode_cons * cons) { struct vnode_prod * prod = ACCESS_ONCE(cons->vnode->prod); struct vnode_cons_stat * cons_stat = cons->stat; /* do nothing */ if (prod == NULL) { return cons_stat; } /* calculate the stats at here instead of enq, deq */ for (unsigned int consq = 0; consq < cons->nr_consq; consq++) { uint64_t consq_on_line = 0; uint64_t consq_deliver = 0; uint64_t consq_total_len = 0; uint64_t consq_missed = 0; unsigned int consq_q_len_max = 0; float consq_q_len_avg_max = 0; for (unsigned int prodq = 0; prodq < prod->nr_prodq; prodq++) { struct tunnel_block * block = ACCESS_ONCE(cons->block); if (block == NULL) { return cons_stat; } struct tunnel_desc * tunnel_desc = *tunnel_block_locate(block, prodq, consq); assert(tunnel_desc != NULL); consq_on_line += tunnel_desc->on_line; consq_deliver += tunnel_desc->deliver; consq_total_len += tunnel_desc->total_len; consq_missed += tunnel_desc->missed; consq_q_len_max = RTE_MAX(tunnel_desc->q_len, consq_q_len_max); consq_q_len_avg_max = RTE_MAX(tunnel_desc->q_len_avg, consq_q_len_avg_max); } cons_stat[consq].on_line = consq_on_line; cons_stat[consq].deliver = consq_deliver; cons_stat[consq].total_len = consq_total_len; cons_stat[consq].missed = consq_missed; /* q_len, use the max q value */ cons_stat[consq].q_len_max = consq_q_len_max; cons_stat[consq].q_len_avg_max = consq_q_len_avg_max; } return cons->stat; } int __vnode_common_delete_prod(struct vnode_prod * prod) { struct vnode * vnode = prod->vnode; rte_spinlock_lock(&vnode->lock); if (vnode->cons) { struct vnode_cons * cons = vnode->cons; struct tunnel_block * block = cons->block; cons->block = NULL; rte_atomic_thread_fence(__ATOMIC_RELEASE); /* wait for a few second */ synchronize_dataplane(); tunnel_block_delete(block); } vnode->prod = NULL; rte_free(prod); rte_spinlock_unlock(&vnode->lock); return 0; } int __vnode_common_delete_cons(struct vnode_cons * cons) { struct vnode * vnode = cons->vnode; rte_spinlock_lock(&vnode->lock); struct vnode_prod * prod = vnode->prod; if (prod != NULL) { struct tunnel_block * block = prod->block; prod->block = NULL; rte_atomic_thread_fence(__ATOMIC_RELEASE); synchronize_dataplane(); tunnel_block_delete(block); } // close event fds for notify for (unsigned int qid = 0; qid < cons->nr_consq; qid++) { if (cons->notify[qid].enable) { close(cons->notify[qid].cons_notify_eventfd); } } /* free the cons */ vnode->cons = NULL; rte_free(cons); rte_spinlock_unlock(&vnode->lock); return 0; } int __vnode_common_delete(struct vnode * vnode) { if (vnode->cons != NULL) { __vnode_common_delete_cons(vnode->cons); } if (vnode->prod != NULL) { __vnode_common_delete_prod(vnode->prod); } rte_free(vnode); return 0; } void __vnode_common_unpoison(struct vnode * vnode) { MR_ASAN_UNPOISON_MEMORY_REGION(vnode, sizeof(struct vnode)); } void __vnode_common_unpoison_prod(struct vnode_prod * prod) { } void __vnode_common_unpoison_cons(struct vnode_cons * cons) { } struct vnode_cons_notify * __vnode_common_cons_notify_ctx(struct vnode_cons * cons) { return cons->notify; }